From fd9b464b359eeaa035b8e919a274766504c33e9c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 8 Jan 2025 17:01:37 -0500 Subject: [PATCH] Add a trait for the P2p network used in the coordinator Moves all of the Libp2p code to a dedicated directory. Makes the Heartbeat task abstract over any P2p network. --- coordinator/src/p2p/heartbeat.rs | 16 +- .../src/p2p/{ => libp2p}/authenticate.rs | 2 +- coordinator/src/p2p/{ => libp2p}/dial.rs | 2 +- coordinator/src/p2p/{ => libp2p}/gossip.rs | 0 coordinator/src/p2p/libp2p/mod.rs | 163 ++++++++++++++++ coordinator/src/p2p/{ => libp2p}/reqres.rs | 0 coordinator/src/p2p/{ => libp2p}/swarm.rs | 4 +- .../src/p2p/{ => libp2p}/validators.rs | 2 +- coordinator/src/p2p/mod.rs | 183 ++---------------- 9 files changed, 194 insertions(+), 178 deletions(-) rename coordinator/src/p2p/{ => libp2p}/authenticate.rs (98%) rename coordinator/src/p2p/{ => libp2p}/dial.rs (98%) rename coordinator/src/p2p/{ => libp2p}/gossip.rs (100%) create mode 100644 coordinator/src/p2p/libp2p/mod.rs rename coordinator/src/p2p/{ => libp2p}/reqres.rs (100%) rename coordinator/src/p2p/{ => libp2p}/swarm.rs (99%) rename coordinator/src/p2p/{ => libp2p}/validators.rs (99%) diff --git a/coordinator/src/p2p/heartbeat.rs b/coordinator/src/p2p/heartbeat.rs index 85b07dc6..0f000dcc 100644 --- a/coordinator/src/p2p/heartbeat.rs +++ b/coordinator/src/p2p/heartbeat.rs @@ -11,10 +11,7 @@ use serai_task::ContinuallyRan; use crate::{ tributary::Transaction, - p2p::{ - reqres::{Request, Response}, - P2p, - }, + p2p::{Peer, P2p}, }; // Amount of blocks in a minute @@ -28,14 +25,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; /// /// If the other validator has more blocks then we do, they're expected to inform us. This forms /// the sync protocol for our Tributaries. -struct HeartbeatTask { +struct HeartbeatTask { set: ValidatorSet, - tributary: Tributary, + tributary: Tributary, reader: TributaryReader, - p2p: P2p, + p2p: P, } -impl ContinuallyRan for HeartbeatTask { +impl ContinuallyRan for HeartbeatTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol @@ -74,8 +71,7 @@ impl ContinuallyRan for HeartbeatTask { tip = self.reader.tip(); tip_is_stale = false; } - let request = Request::Heartbeat { set: self.set, latest_block_hash: tip }; - let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer }; + let Ok(blocks) = peer.send_heartbeat(self.set, tip).await else { continue 'peer }; // This is the final batch if it has less than the maximum amount of blocks // (signifying there weren't more blocks after this to fill the batch with) diff --git a/coordinator/src/p2p/authenticate.rs b/coordinator/src/p2p/libp2p/authenticate.rs similarity index 98% rename from coordinator/src/p2p/authenticate.rs rename to coordinator/src/p2p/libp2p/authenticate.rs index c678a034..d00d0dac 100644 --- a/coordinator/src/p2p/authenticate.rs +++ b/coordinator/src/p2p/libp2p/authenticate.rs @@ -19,7 +19,7 @@ use libp2p::{ noise, }; -use crate::p2p::{validators::Validators, peer_id_from_public}; +use crate::p2p::libp2p::{validators::Validators, peer_id_from_public}; const PROTOCOL: &str = "/serai/coordinator/validators"; diff --git a/coordinator/src/p2p/dial.rs b/coordinator/src/p2p/libp2p/dial.rs similarity index 98% rename from coordinator/src/p2p/dial.rs rename to coordinator/src/p2p/libp2p/dial.rs index 74eaba9a..03795a51 100644 --- a/coordinator/src/p2p/dial.rs +++ b/coordinator/src/p2p/libp2p/dial.rs @@ -14,7 +14,7 @@ use libp2p::{ use serai_task::ContinuallyRan; -use crate::p2p::{PORT, Peers, validators::Validators}; +use crate::p2p::libp2p::{PORT, Peers, validators::Validators}; const TARGET_PEERS_PER_NETWORK: usize = 5; /* diff --git a/coordinator/src/p2p/gossip.rs b/coordinator/src/p2p/libp2p/gossip.rs similarity index 100% rename from coordinator/src/p2p/gossip.rs rename to coordinator/src/p2p/libp2p/gossip.rs diff --git a/coordinator/src/p2p/libp2p/mod.rs b/coordinator/src/p2p/libp2p/mod.rs new file mode 100644 index 00000000..b103a63f --- /dev/null +++ b/coordinator/src/p2p/libp2p/mod.rs @@ -0,0 +1,163 @@ +use core::future::Future; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; + +use zeroize::Zeroizing; +use schnorrkel::Keypair; + +use serai_client::{ + primitives::{NetworkId, PublicKey}, + Serai, +}; + +use tokio::sync::{mpsc, RwLock}; + +use serai_task::{Task, ContinuallyRan}; + +use libp2p::{ + multihash::Multihash, + identity::{self, PeerId}, + tcp::Config as TcpConfig, + yamux, + swarm::NetworkBehaviour, + SwarmBuilder, +}; + +/// A struct to sync the validators from the Serai node in order to keep track of them. +mod validators; +use validators::UpdateValidatorsTask; + +/// The authentication protocol upgrade to limit the P2P network to active validators. +mod authenticate; +use authenticate::OnlyValidators; + +/// The dial task, to find new peers to connect to +mod dial; +use dial::DialTask; + +/// The request-response messages and behavior +mod reqres; +use reqres::{Request, Response}; + +/// The gossip messages and behavior +mod gossip; + +/// The swarm task, running it and dispatching to/from it +mod swarm; +use swarm::SwarmTask; + +const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') + +// usize::max, manually implemented, as max isn't a const fn +const MAX_LIBP2P_MESSAGE_SIZE: usize = + if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE { + gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE + } else { + reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE + }; + +fn peer_id_from_public(public: PublicKey) -> PeerId { + // 0 represents the identity Multihash, that no hash was performed + // It's an internal constant so we can't refer to the constant inside libp2p + PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap() +} + +struct Peer; +impl Peer { + async fn send(&self, request: Request) -> Result { + (async move { todo!("TODO") }).await + } +} + +#[derive(Clone)] +struct Peers { + peers: Arc>>>, +} + +#[derive(NetworkBehaviour)] +struct Behavior { + reqres: reqres::Behavior, + gossip: gossip::Behavior, +} + +struct LibP2p; +impl LibP2p { + pub(crate) fn new(serai_key: &Zeroizing, serai: Serai) -> LibP2p { + // Define the object we track peers with + let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) }; + + // Define the dial task + let (dial_task_def, dial_task) = Task::new(); + let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel(); + tokio::spawn( + DialTask::new(serai.clone(), peers.clone(), to_dial_send) + .continually_run(dial_task_def, vec![]), + ); + + // Define the Validators object used for validating new connections + let connection_validators = UpdateValidatorsTask::spawn(serai.clone()); + let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> { + Ok(OnlyValidators { + serai_key: serai_key.clone(), + validators: connection_validators.clone(), + noise_keypair: noise_keypair.clone(), + }) + }; + + let new_yamux = || { + let mut config = yamux::Config::default(); + // 1 MiB default + max message size + config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE); + // 256 KiB default + max message size + config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap()); + config + }; + + let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() }; + + let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519()) + .with_tokio() + .with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux) + .unwrap() + .with_behaviour(|_| behavior) + .unwrap() + .build(); + swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); + swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap(); + + let swarm_validators = UpdateValidatorsTask::spawn(serai); + + let (gossip_send, gossip_recv) = mpsc::unbounded_channel(); + let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel(); + let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel(); + + let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel(); + + let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel(); + let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel(); + let (inbound_request_responses_send, inbound_request_responses_recv) = + mpsc::unbounded_channel(); + + // Create the swarm task + SwarmTask::spawn( + dial_task, + to_dial_recv, + swarm_validators, + peers, + swarm, + gossip_recv, + signed_cosigns_send, + tributary_gossip_send, + outbound_requests_recv, + heartbeat_requests_send, + notable_cosign_requests_send, + inbound_request_responses_recv, + ); + + // gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send, + // heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send + todo!("TODO"); + } +} diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/libp2p/reqres.rs similarity index 100% rename from coordinator/src/p2p/reqres.rs rename to coordinator/src/p2p/libp2p/reqres.rs diff --git a/coordinator/src/p2p/swarm.rs b/coordinator/src/p2p/libp2p/swarm.rs similarity index 99% rename from coordinator/src/p2p/swarm.rs rename to coordinator/src/p2p/libp2p/swarm.rs index 87440c92..3962e81b 100644 --- a/coordinator/src/p2p/swarm.rs +++ b/coordinator/src/p2p/libp2p/swarm.rs @@ -21,7 +21,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, }; -use crate::p2p::{ +use crate::p2p::libp2p::{ Peers, BehaviorEvent, Behavior, validators::Validators, reqres::{self, Request, Response}, @@ -286,7 +286,7 @@ impl SwarmTask { } #[allow(clippy::too_many_arguments)] - pub(crate) fn new( + pub(crate) fn spawn( dial_task: TaskHandle, to_dial: mpsc::UnboundedReceiver, diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/libp2p/validators.rs similarity index 99% rename from coordinator/src/p2p/validators.rs rename to coordinator/src/p2p/libp2p/validators.rs index 5a639148..b5be7c9e 100644 --- a/coordinator/src/p2p/validators.rs +++ b/coordinator/src/p2p/libp2p/validators.rs @@ -13,7 +13,7 @@ use libp2p::PeerId; use futures_util::stream::{StreamExt, FuturesUnordered}; use tokio::sync::RwLock; -use crate::p2p::peer_id_from_public; +use crate::p2p::libp2p::peer_id_from_public; pub(crate) struct Validators { serai: Serai, diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index ba09b273..534e44dc 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -1,176 +1,33 @@ -use std::{ - sync::Arc, - collections::{HashSet, HashMap}, -}; +use core::future::Future; -use zeroize::Zeroizing; -use schnorrkel::Keypair; +use tokio::time::error::Elapsed; -use serai_client::{ - primitives::{NetworkId, PublicKey}, - Serai, -}; +use borsh::{BorshSerialize, BorshDeserialize}; -use tokio::sync::{mpsc, RwLock}; +use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; -use serai_task::{Task, ContinuallyRan}; - -use libp2p::{ - multihash::Multihash, - identity::{self, PeerId}, - tcp::Config as TcpConfig, - yamux, - swarm::NetworkBehaviour, - SwarmBuilder, -}; - -/// A struct to sync the validators from the Serai node in order to keep track of them. -mod validators; -use validators::UpdateValidatorsTask; - -/// The authentication protocol upgrade to limit the P2P network to active validators. -mod authenticate; -use authenticate::OnlyValidators; - -/// The dial task, to find new peers to connect to -mod dial; -use dial::DialTask; - -/// The request-response messages and behavior -mod reqres; -use reqres::{Request, Response}; - -/// The gossip messages and behavior -mod gossip; +/// The libp2p-backed P2p network +mod libp2p; /// The heartbeat task, effecting sync of Tributaries mod heartbeat; -/// The swarm task, running it and dispatching to/from it -mod swarm; -use swarm::SwarmTask; - -const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') - -// usize::max, manually implemented, as max isn't a const fn -const MAX_LIBP2P_MESSAGE_SIZE: usize = - if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE { - gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE - } else { - reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE - }; - -fn peer_id_from_public(public: PublicKey) -> PeerId { - // 0 represents the identity Multihash, that no hash was performed - // It's an internal constant so we can't refer to the constant inside libp2p - PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap() +/// A tributary block and its commit. +#[derive(Clone, BorshSerialize, BorshDeserialize)] +pub(crate) struct TributaryBlockWithCommit { + pub(crate) block: Vec, + pub(crate) commit: Vec, } -struct Peer; -impl Peer { - async fn send(&self, request: Request) -> Result { - (async move { todo!("TODO") }).await - } +trait Peer: Send { + fn send_heartbeat( + &self, + set: ValidatorSet, + latest_block_hash: [u8; 32], + ) -> impl Send + Future, Elapsed>>; } -#[derive(Clone)] -struct Peers { - peers: Arc>>>, -} - -#[derive(Clone, Debug)] -struct P2p; -impl P2p { - async fn peers(&self, set: NetworkId) -> Vec { - (async move { todo!("TODO") }).await - } -} - -#[async_trait::async_trait] -impl tributary::P2p for P2p { - async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { - todo!("TODO") - } -} - -#[derive(NetworkBehaviour)] -struct Behavior { - reqres: reqres::Behavior, - gossip: gossip::Behavior, -} - -pub(crate) fn new(serai_key: &Zeroizing, serai: Serai) -> P2p { - // Define the object we track peers with - let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) }; - - // Define the dial task - let (dial_task_def, dial_task) = Task::new(); - let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel(); - tokio::spawn( - DialTask::new(serai.clone(), peers.clone(), to_dial_send) - .continually_run(dial_task_def, vec![]), - ); - - // Define the Validators object used for validating new connections - let connection_validators = UpdateValidatorsTask::spawn(serai.clone()); - let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> { - Ok(OnlyValidators { - serai_key: serai_key.clone(), - validators: connection_validators.clone(), - noise_keypair: noise_keypair.clone(), - }) - }; - - let new_yamux = || { - let mut config = yamux::Config::default(); - // 1 MiB default + max message size - config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE); - // 256 KiB default + max message size - config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap()); - config - }; - - let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() }; - - let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519()) - .with_tokio() - .with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux) - .unwrap() - .with_behaviour(|_| behavior) - .unwrap() - .build(); - swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); - swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap(); - - let swarm_validators = UpdateValidatorsTask::spawn(serai); - - let (gossip_send, gossip_recv) = mpsc::unbounded_channel(); - let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel(); - let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel(); - - let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel(); - - let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel(); - let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel(); - let (inbound_request_responses_send, inbound_request_responses_recv) = mpsc::unbounded_channel(); - - // Create the swarm task - SwarmTask::new( - dial_task, - to_dial_recv, - swarm_validators, - peers, - swarm, - gossip_recv, - signed_cosigns_send, - tributary_gossip_send, - outbound_requests_recv, - heartbeat_requests_send, - notable_cosign_requests_send, - inbound_request_responses_recv, - ); - - // gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send, - // heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send - todo!("TODO"); +trait P2p: Send + Sync + tributary::P2p { + type Peer: Peer; + fn peers(&self, network: NetworkId) -> impl Send + Future>; }