diff --git a/coordinator/src/p2p/authenticate.rs b/coordinator/src/p2p/authenticate.rs
index ffa8a33b..c678a034 100644
--- a/coordinator/src/p2p/authenticate.rs
+++ b/coordinator/src/p2p/authenticate.rs
@@ -23,10 +23,11 @@ use crate::p2p::{validators::Validators, peer_id_from_public};
 
 const PROTOCOL: &str = "/serai/coordinator/validators";
 
-struct OnlyValidators {
-  validators: Arc<RwLock<Validators>>,
-  serai_key: Zeroizing<Keypair>,
-  noise_keypair: identity::Keypair,
+#[derive(Clone)]
+pub(crate) struct OnlyValidators {
+  pub(crate) validators: Arc<RwLock<Validators>>,
+  pub(crate) serai_key: Zeroizing<Keypair>,
+  pub(crate) noise_keypair: identity::Keypair,
 }
 
 impl OnlyValidators {
diff --git a/coordinator/src/p2p/gossip.rs b/coordinator/src/p2p/gossip.rs
index 7f5a078c..99196fb6 100644
--- a/coordinator/src/p2p/gossip.rs
+++ b/coordinator/src/p2p/gossip.rs
@@ -15,7 +15,7 @@ pub use libp2p::gossipsub::Event;
 use serai_cosign::SignedCosign;
 
 // Block size limit + 16 KB of space for signatures/metadata
-const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
+pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
 
 const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
 
diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs
index 7ccb46a3..b104e94f 100644
--- a/coordinator/src/p2p/mod.rs
+++ b/coordinator/src/p2p/mod.rs
@@ -1,23 +1,36 @@
-use core::future::Future;
 use std::{
   sync::Arc,
   collections::{HashSet, HashMap},
 };
 
-use serai_client::primitives::{NetworkId, PublicKey};
+use zeroize::Zeroizing;
+use schnorrkel::Keypair;
 
-use tokio::sync::RwLock;
+use serai_client::{
+  primitives::{NetworkId, PublicKey},
+  Serai,
+};
 
-use serai_task::ContinuallyRan;
+use tokio::sync::{mpsc, RwLock};
 
-use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
+use serai_task::Task;
+
+use libp2p::{
+  multihash::Multihash,
+  identity::{self, PeerId},
+  tcp::Config as TcpConfig,
+  yamux,
+  swarm::NetworkBehaviour,
+  SwarmBuilder,
+};
 
 /// A struct to sync the validators from the Serai node in order to keep track of them.
 mod validators;
-use validators::{Validators, update_shared_validators};
+use validators::{Validators, UpdateValidatorsTask};
 
 /// The authentication protocol upgrade to limit the P2P network to active validators.
 mod authenticate;
+use authenticate::OnlyValidators;
 
 /// The dial task, to find new peers to connect to
 mod dial;
@@ -34,9 +47,18 @@
 mod heartbeat;
 
 /// The swarm task, running it and dispatching to/from it
 mod swarm;
+use swarm::SwarmTask;
 
 const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
 
+// usize::max, manually implemented, as max isn't a const fn
+const MAX_LIBP2P_MESSAGE_SIZE: usize =
+  if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
+    gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
+  } else {
+    reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
+  };
+
 fn peer_id_from_public(public: PublicKey) -> PeerId {
   // 0 represents the identity Multihash, that no hash was performed
   // It's an internal constant so we can't refer to the constant inside libp2p
@@ -76,19 +98,73 @@ struct Behavior {
   gossip: gossip::Behavior,
 }
 
-struct UpdateSharedValidatorsTask {
-  validators: Arc<RwLock<Validators>>,
-}
+pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> P2p {
+  // Define the object we track peers with
+  let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
 
-impl ContinuallyRan for UpdateSharedValidatorsTask {
-  // Only run every minute, not the default of every five seconds
-  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
-  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
+  // Define the dial task
+  let (dial_task_def, dial_task) = Task::new();
+  let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
+  todo!("TODO: Dial task");
 
-  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
-    async move {
-      update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
-      Ok(true)
-    }
-  }
+  // Define the Validators object used for validating new connections
+  let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
+  let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
+    Ok(OnlyValidators {
+      serai_key: serai_key.clone(),
+      validators: connection_validators.clone(),
+      noise_keypair: noise_keypair.clone(),
+    })
+  };
+
+  let new_yamux = || {
+    let mut config = yamux::Config::default();
+    // 1 MiB default + max message size
+    config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
+    // 256 KiB default + max message size
+    config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
+    config
+  };
+
+  let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
+
+  let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
+    .with_tokio()
+    .with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
+    .unwrap()
+    .with_behaviour(|_| behavior)
+    .unwrap()
+    .build();
+  swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
+  swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
+
+  let swarm_validators = UpdateValidatorsTask::spawn(serai);
+
+  let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
+  let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
+  let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
+
+  let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
+
+  let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
+  let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
+  let (inbound_request_responses_send, inbound_request_responses_recv) = mpsc::unbounded_channel();
+
+  // Create the swarm task
+  SwarmTask::new(
+    dial_task,
+    to_dial_recv,
+    swarm_validators,
+    peers,
+    swarm,
+    gossip_recv,
+    signed_cosigns_send,
+    tributary_gossip_send,
+    outbound_requests_recv,
+    heartbeat_requests_send,
+    notable_cosign_requests_send,
+    inbound_request_responses_recv,
+  );
+
+  todo!("TODO")
 }
diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs
index fd6efcbd..cf7575e4 100644
--- a/coordinator/src/p2p/reqres.rs
+++ b/coordinator/src/p2p/reqres.rs
@@ -17,7 +17,7 @@ use serai_cosign::SignedCosign;
 
 /// The maximum message size for the request-response protocol
 // This is derived from the heartbeat message size as it's our largest message
-const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
+pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
   (tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
 
 const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
@@ -46,14 +46,14 @@ pub(crate) struct TributaryBlockWithCommit {
 /// Responses which can be received via the request-response protocol.
 #[derive(Clone, BorshSerialize, BorshDeserialize)]
 pub(crate) enum Response {
-  NoResponse,
+  None,
   Blocks(Vec<TributaryBlockWithCommit>),
   NotableCosigns(Vec<SignedCosign>),
 }
 impl fmt::Debug for Response {
   fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
     match self {
-      Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
+      Response::None => fmt.debug_struct("Response::None").finish(),
       Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
       Response::NotableCosigns(_) => {
         fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()
diff --git a/coordinator/src/p2p/swarm.rs b/coordinator/src/p2p/swarm.rs
index 10d14a6d..87440c92 100644
--- a/coordinator/src/p2p/swarm.rs
+++ b/coordinator/src/p2p/swarm.rs
@@ -28,6 +28,9 @@ use crate::p2p::{
   gossip,
 };
 
+const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
+const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
+
 /*
   `SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
   contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
@@ -43,7 +46,7 @@ use crate::p2p::{
   - Dispatching received requests
   - Sending responses
 */
-struct SwarmTask {
+pub(crate) struct SwarmTask {
   dial_task: TaskHandle,
   to_dial: mpsc::UnboundedReceiver<DialOpts>,
   last_dial_task_run: Instant,
@@ -54,6 +57,8 @@
 
   swarm: Swarm<Behavior>,
 
+  last_message: Instant,
+
   gossip: mpsc::UnboundedReceiver<gossip::Message>,
   signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
   tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
@@ -99,24 +104,21 @@ impl SwarmTask {
   fn handle_reqres(&mut self, event: reqres::Event) {
     match event {
       reqres::Event::Message { message, .. } => match message {
-        reqres::Message::Request { request_id, request, channel } => {
-          match request {
-            // TODO: Send these
-            reqres::Request::KeepAlive => {
-              let _: Result<_, _> =
-                self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
-            }
-            reqres::Request::Heartbeat { set, latest_block_hash } => {
-              self.inbound_request_response_channels.insert(request_id, channel);
-              let _: Result<_, _> =
-                self.heartbeat_requests.send((request_id, set, latest_block_hash));
-            }
-            reqres::Request::NotableCosigns { global_session } => {
-              self.inbound_request_response_channels.insert(request_id, channel);
-              let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
-            }
+        reqres::Message::Request { request_id, request, channel } => match request {
+          reqres::Request::KeepAlive => {
+            let _: Result<_, _> =
+              self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
           }
-        }
+          reqres::Request::Heartbeat { set, latest_block_hash } => {
+            self.inbound_request_response_channels.insert(request_id, channel);
+            let _: Result<_, _> =
+              self.heartbeat_requests.send((request_id, set, latest_block_hash));
+          }
+          reqres::Request::NotableCosigns { global_session } => {
+            self.inbound_request_response_channels.insert(request_id, channel);
+            let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
+          }
+        },
         reqres::Message::Response { request_id, response } => {
           // Send Some(response) as the response for the request
           if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
@@ -136,9 +138,19 @@
   async fn run(mut self) {
     loop {
+      let time_till_keep_alive =
+        (self.last_message + KEEP_ALIVE_INTERVAL).saturating_duration_since(Instant::now());
       let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
 
       tokio::select! {
+        () = tokio::time::sleep(time_till_keep_alive) => {
+          let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
+          let behavior = self.swarm.behaviour_mut();
+          for peer in peers {
+            behavior.reqres.send_request(&peer, Request::KeepAlive);
+          }
+          self.last_message = Instant::now();
+        }
+
         // Dial peers we're instructed to
         dial_opts = self.to_dial.recv() => {
          let dial_opts = dial_opts.expect("DialTask was closed?");
@@ -156,8 +168,6 @@
          We also use this to disconnect all peers who are no longer active in any network.
         */
         () = tokio::time::sleep(time_till_rebuild_peers) => {
-          const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
-
           let validators_by_network = self.validators.read().await.by_network().clone();
 
           let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
@@ -253,6 +263,7 @@
           let topic = message.topic();
           let message = borsh::to_vec(&message).unwrap();
           let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
+          self.last_message = Instant::now();
         }
 
         request = self.outbound_requests.recv() => {
@@ -273,4 +284,58 @@
       }
     }
   }
+
+  #[allow(clippy::too_many_arguments)]
+  pub(crate) fn new(
+    dial_task: TaskHandle,
+    to_dial: mpsc::UnboundedReceiver<DialOpts>,
+
+    validators: Arc<RwLock<Validators>>,
+    peers: Peers,
+
+    swarm: Swarm<Behavior>,
+
+    gossip: mpsc::UnboundedReceiver<gossip::Message>,
+    signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
+    tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
+
+    outbound_requests: mpsc::UnboundedReceiver<(
+      PeerId,
+      Request,
+      oneshot::Sender<Option<Response>>,
+    )>,
+
+    heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
+    notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
+    inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
+  ) {
+    tokio::spawn(
+      SwarmTask {
+        dial_task,
+        to_dial,
+        last_dial_task_run: Instant::now(),
+
+        validators,
+        peers,
+        rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
+
+        swarm,
+
+        last_message: Instant::now(),
+
+        gossip,
+        signed_cosigns,
+        tributary_gossip,
+
+        outbound_requests,
+        outbound_request_responses: HashMap::new(),
+
+        inbound_request_response_channels: HashMap::new(),
+        heartbeat_requests,
+        notable_cosign_requests,
+        inbound_request_responses,
+      }
+      .run(),
+    );
+  }
 }
diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs
index 4b3d3870..5d802f4b 100644
--- a/coordinator/src/p2p/validators.rs
+++ b/coordinator/src/p2p/validators.rs
@@ -1,4 +1,4 @@
-use core::borrow::Borrow;
+use core::{borrow::Borrow, future::Future};
 use std::{
   sync::Arc,
   collections::{HashSet, HashMap},
@@ -6,6 +6,8 @@ use std::{
 use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
 
+use serai_task::{Task, ContinuallyRan};
+
 use libp2p::PeerId;
 
 use futures_util::stream::{StreamExt, FuturesUnordered};
@@ -103,8 +105,6 @@
   }
 
   /// Update the view of the validators.
-  ///
-  /// Returns all validators removed from the active validator set.
   pub(crate) async fn update(&mut self) -> Result<(), String> {
     let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
     self.incorporate_session_changes(session_changes);
@@ -124,19 +124,56 @@
   }
 }
 
-/// Update the view of the validators.
+/// A task which updates a set of validators.
 ///
-/// This minimizes the time an exclusive lock is held over the validators to minimize the
-/// disruption to functioning.
-///
-/// Returns all validators removed from the active validator set.
-pub(crate) async fn update_shared_validators(
-  validators: &Arc<RwLock<Validators>>,
-) -> Result<(), String> {
-  let session_changes = {
-    let validators = validators.read().await;
-    Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
-  };
-  validators.write().await.incorporate_session_changes(session_changes);
-  Ok(())
+/// The validators managed by this task will have their exclusive lock held for a minimal amount of
+/// time while the update occurs, to minimize the disruption to the services relying on it.
+pub(crate) struct UpdateValidatorsTask {
+  validators: Arc<RwLock<Validators>>,
+}
+
+impl UpdateValidatorsTask {
+  /// Spawn a new instance of the UpdateValidatorsTask.
+  ///
+  /// This returns a reference to the Validators it updates after spawning itself.
+  pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
+    // The validators which will be updated
+    let validators = Arc::new(RwLock::new(Validators {
+      serai,
+      sessions: HashMap::new(),
+      by_network: HashMap::new(),
+      validators: HashMap::new(),
+    }));
+
+    // Define the task
+    let (update_validators_task, update_validators_task_handle) = Task::new();
+    // Forget the handle, as dropping the handle would stop the task
+    core::mem::forget(update_validators_task_handle);
+    // Spawn the task
+    tokio::spawn(
+      (Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
+    );
+
+    // Return the validators
+    validators
+  }
+}
+
+impl ContinuallyRan for UpdateValidatorsTask {
+  // Only run every minute, not the default of every five seconds
+  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
+  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
+
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
+    async move {
+      let session_changes = {
+        let validators = self.validators.read().await;
+        Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
+          .await
+          .map_err(|e| format!("{e:?}"))?
+      };
+      self.validators.write().await.incorporate_session_changes(session_changes);
+      Ok(true)
+    }
+  }
 }
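A minimal, standalone sketch of the keep-alive timing the patch adds to SwarmTask::run, assuming only std types. KEEP_ALIVE_INTERVAL mirrors the constant added to swarm.rs; the free function and main are illustrative helpers that do not appear in the diff:

use std::time::{Duration, Instant};

const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

// Duration until the next keep-alive is due, clamped to zero once it is overdue.
// The swarm task sleeps on this in its select! loop, then sends Request::KeepAlive
// to every connected peer and resets last_message; publishing a gossip message
// also resets last_message, pushing the next keep-alive back.
fn time_till_keep_alive(last_message: Instant) -> Duration {
  (last_message + KEEP_ALIVE_INTERVAL).saturating_duration_since(Instant::now())
}

fn main() {
  let last_message = Instant::now();
  // Immediately after a message, at most the full interval remains.
  assert!(time_till_keep_alive(last_message) <= KEEP_ALIVE_INTERVAL);
}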