From f27e4e320262a4d431150163339f1d29c5b30d02 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 7 Jan 2025 16:34:19 -0500
Subject: [PATCH] Move the WIP SwarmTask to its own file

---
 coordinator/src/p2p/gossip.rs     |  13 +-
 coordinator/src/p2p/mod.rs        | 234 ++--------------------------
 coordinator/src/p2p/reqres.rs     |   8 +-
 coordinator/src/p2p/swarm.rs      | 247 ++++++++++++++++++++++++++++++
 coordinator/src/p2p/validators.rs |  23 ++-
 5 files changed, 291 insertions(+), 234 deletions(-)
 create mode 100644 coordinator/src/p2p/swarm.rs

diff --git a/coordinator/src/p2p/gossip.rs b/coordinator/src/p2p/gossip.rs
index dc6a5849..7f5a078c 100644
--- a/coordinator/src/p2p/gossip.rs
+++ b/coordinator/src/p2p/gossip.rs
@@ -7,8 +7,8 @@ use borsh::{BorshSerialize, BorshDeserialize};
 use serai_client::validator_sets::primitives::ValidatorSet;
 
 use libp2p::gossipsub::{
-  IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
-  AllowAllSubscriptionFilter, Behaviour,
+  TopicHash, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
+  IdentityTransform, AllowAllSubscriptionFilter, Behaviour,
 };
 pub use libp2p::gossipsub::Event;
 
@@ -32,6 +32,15 @@ pub(crate) enum Message {
   Cosign(SignedCosign),
 }
 
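+// The topic a `Message` is gossiped over. The swarm task is expected to publish with this,
+// per swarm.rs: `gossip.publish(message.topic(), borsh::to_vec(&message).unwrap())`.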
+impl Message {
+  pub(crate) fn topic(&self) -> TopicHash {
+    match self {
+      Message::Tributary { set, .. } => topic_for_set(*set).hash(),
+      Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(),
+    }
+  }
+}
+
 pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilter>;
 
 pub(crate) fn new_behavior() -> Behavior {
diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs
index cedaed3e..7ccb46a3 100644
--- a/coordinator/src/p2p/mod.rs
+++ b/coordinator/src/p2p/mod.rs
@@ -1,27 +1,16 @@
+use core::future::Future;
 use std::{
   sync::Arc,
   collections::{HashSet, HashMap},
-  time::{Duration, Instant},
 };
 
-use borsh::BorshDeserialize;
-
 use serai_client::primitives::{NetworkId, PublicKey};
 
-use tokio::sync::{mpsc, oneshot, RwLock};
+use tokio::sync::RwLock;
 
-use serai_db::Db;
-use serai_task::TaskHandle;
+use serai_task::ContinuallyRan;
 
-use serai_cosign::Cosigning;
-
-use futures_util::StreamExt;
-use libp2p::{
-  multihash::Multihash,
-  identity::PeerId,
-  request_response::RequestId,
-  swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
-};
+use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
 
 /// A struct to sync the validators from the Serai node in order to keep track of them.
 mod validators;
@@ -43,6 +32,9 @@ mod gossip;
 /// The heartbeat task, effecting sync of Tributaries
 mod heartbeat;
 
+/// The swarm task, running the swarm and dispatching messages to/from it
+mod swarm;
+
 const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
 
 fn peer_id_from_public(public: PublicKey) -> PeerId {
@@ -84,213 +76,19 @@ struct Behavior {
   gossip: gossip::Behavior,
 }
 
-struct SwarmTask<D: Db> {
-  dial_task: TaskHandle,
-  to_dial: mpsc::UnboundedReceiver<DialOpts>,
-  last_dial_task_run: Instant,
-
+struct UpdateSharedValidatorsTask {
   validators: Arc<RwLock<Validators>>,
-  last_refreshed_validators: Instant,
-  next_refresh_validators: Instant,
-
-  peers: Peers,
-  rebuild_peers_at: Instant,
-
-  db: D,
-  swarm: Swarm<Behavior>,
-
-  request_recv: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
-  request_resp: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
 }
 
-impl<D: Db> SwarmTask<D> {
-  async fn run(mut self) {
-    loop {
-      let time_till_refresh_validators =
-        self.next_refresh_validators.saturating_duration_since(Instant::now());
-      let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
+impl ContinuallyRan for UpdateSharedValidatorsTask {
+  // Only run every minute, not the default of every five seconds
+  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
+  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
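+  // Assuming the delay between iterations doubles upon error (as with the manual backoff this
+  // task replaces), a failing iteration would be retried after 60s, 120s, 240s, then every 300s.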
-      tokio::select! {
-        biased;
-
-        // Refresh the instance of validators we use to track peers/share with authenticate
-        // TODO: Move this to a task
-        () = tokio::time::sleep(time_till_refresh_validators) => {
-          const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
-          const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
-
-          let update = update_shared_validators(&self.validators).await;
-          match update {
-            Ok(removed) => {
-              for removed in removed {
-                let _: Result<_, _> = self.swarm.disconnect_peer_id(removed);
-              }
-              self.last_refreshed_validators = Instant::now();
-              self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS;
-            }
-            Err(e) => {
-              log::warn!("couldn't refresh validators: {e:?}");
-              // Increase the delay before the next refresh by using the time since the last
-              // refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20...
-              let time_since_last = self
-                .next_refresh_validators
-                .saturating_duration_since(self.last_refreshed_validators);
-              // But limit the delay
-              self.next_refresh_validators =
-                Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS);
-            },
-          }
-        }
-
-        // Rebuild the peers every 10 minutes
-        //
-        // This handles edge cases such as when a validator changes the networks they're present
-        // in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling
-        // out of control
-        () = tokio::time::sleep(time_till_rebuild_peers) => {
-          const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
-
-          let validators_by_network = self.validators.read().await.by_network().clone();
-          let connected = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
-          let mut peers = HashMap::new();
-          for (network, validators) in validators_by_network {
-            peers.insert(network, validators.intersection(&connected).copied().collect());
-          }
-          *self.peers.peers.write().await = peers;
-
-          self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
-        }
-
-        // Dial peers we're instructed to
-        dial_opts = self.to_dial.recv() => {
-          let dial_opts = dial_opts.expect("DialTask was closed?");
-          let _: Result<_, _> = self.swarm.dial(dial_opts);
-        }
-
-        request = self.request_recv.recv() => {
-          let (peer, request, response_channel) =
-            request.expect("channel for requests was closed?");
-          let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request);
-          self.request_resp.insert(request_id, response_channel);
-        }
-
-        // Handle swarm events
-        event = self.swarm.next() => {
-          // `Swarm::next` will never return `Poll::Ready(None)`
-          // https://docs.rs/
-          //   libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
-          let event = event.unwrap();
-          match event {
-            SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => match event {
-              reqres::Event::Message { message, .. } => match message {
-                reqres::Message::Request { request_id: _, request, channel } => {
-                  match request {
-                    // TODO: Send these
-                    reqres::Request::KeepAlive => {},
-                    reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"),
-                    reqres::Request::NotableCosigns { global_session } => {
-                      // TODO: Move this out
-                      let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
-                      let res = reqres::Response::NotableCosigns(cosigns);
-                      let _: Result<_, _> =
-                        self.swarm.behaviour_mut().reqres.send_response(channel, res);
-                    },
-                  }
-                }
-                reqres::Message::Response { request_id, response } => {
-                  // Send Some(response) as the response for the request
-                  if let Some(channel) = self.request_resp.remove(&request_id) {
-                    let _: Result<_, _> = channel.send(Some(response));
-                  }
-                },
-              }
-              reqres::Event::OutboundFailure { request_id, .. } => {
-                // Send None as the response for the request
-                if let Some(channel) = self.request_resp.remove(&request_id) {
-                  let _: Result<_, _> = channel.send(None);
-                }
-              },
-              reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {},
-            },
-            SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event {
-              gossip::Event::Message { message, .. } => {
-                let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
-                  continue
-                };
-                match message {
-                  gossip::Message::Tributary { set, message } => todo!("TODO"),
-                  gossip::Message::Cosign(signed_cosign) => todo!("TODO"),
-                }
-              }
-              gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {},
-              gossip::Event::GossipsubNotSupported { peer_id } => {
-                let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id);
-              }
-            },
-
-            // New connection, so update peers
-            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-              let Some(networks) =
-                self.validators.read().await.networks(&peer_id).cloned() else { continue };
-              for network in networks {
-                self
-                  .peers
-                  .peers
-                  .write()
-                  .await
-                  .entry(network)
-                  .or_insert_with(HashSet::new)
-                  .insert(peer_id);
-              }
-            },
-
-            // Connection closed, so update peers
-            SwarmEvent::ConnectionClosed { peer_id, .. } => {
-              let Some(networks) =
-                self.validators.read().await.networks(&peer_id).cloned() else { continue };
-              for network in networks {
-                self
-                  .peers
-                  .peers
-                  .write()
-                  .await
-                  .entry(network)
-                  .or_insert_with(HashSet::new)
-                  .remove(&peer_id);
-              }
-
-              /*
-                We want to re-run the dial task, since we lost a peer, in case we should find new
-                peers. This opens a DoS where a validator repeatedly opens/closes connections to
-                force iterations of the dial task. We prevent this by setting a minimum distance
-                since the last explicit iteration.
-
-                This is suboptimal. If we have several disconnects in immediate proximity, we'll
-                trigger the dial task upon the first (where we may still have enough peers we
-                shouldn't dial more) but not the last (where we may have so few peers left we
-                should dial more). This is accepted as the dial task will eventually run on its
-                natural timer.
-              */
-              const MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL: Duration = Duration::from_secs(60);
-              let now = Instant::now();
-              if (self.last_dial_task_run + MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL) < now {
-                self.dial_task.run_now();
-                self.last_dial_task_run = now;
-              }
-            },
-
-            // We don't handle any of these
-            SwarmEvent::IncomingConnection { .. } |
-            SwarmEvent::IncomingConnectionError { .. } |
-            SwarmEvent::OutgoingConnectionError { .. } |
-            SwarmEvent::NewListenAddr { .. } |
-            SwarmEvent::ExpiredListenAddr { .. } |
-            SwarmEvent::ListenerClosed { .. } |
-            SwarmEvent::ListenerError { .. } |
-            SwarmEvent::Dialing { .. } => {}
-          }
-        }
-      }
-    }
-  }
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
+    async move {
+      update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
+      Ok(true)
+    }
+  }
 }
diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs
index 0793e839..ad9075d7 100644
--- a/coordinator/src/p2p/reqres.rs
+++ b/coordinator/src/p2p/reqres.rs
@@ -8,8 +8,10 @@ use serai_client::validator_sets::primitives::ValidatorSet;
 
 use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
-use libp2p::request_response::{self, Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
-pub use request_response::{Message, Event};
+use libp2p::request_response::{
+  self, Codec as CodecTrait, Event as GenericEvent, Config, Behaviour, ProtocolSupport,
+};
+pub use request_response::Message;
 
 use serai_cosign::SignedCosign;
 
@@ -128,6 +130,8 @@ impl CodecTrait for Codec {
   }
 }
 
+pub(crate) type Event = GenericEvent<Request, Response>;
+
 pub(crate) type Behavior = Behaviour<Codec>;
 pub(crate) fn new_behavior() -> Behavior {
   let mut config = Config::default();
diff --git a/coordinator/src/p2p/swarm.rs b/coordinator/src/p2p/swarm.rs
new file mode 100644
index 00000000..8aab3d90
--- /dev/null
+++ b/coordinator/src/p2p/swarm.rs
@@ -0,0 +1,247 @@
+use std::{
+  sync::Arc,
+  collections::{HashSet, HashMap},
+  time::{Duration, Instant},
+};
+
+use borsh::BorshDeserialize;
+
+use tokio::sync::{mpsc, oneshot, RwLock};
+
+use serai_db::Db;
+use serai_task::TaskHandle;
+
+use serai_cosign::Cosigning;
+
+use futures_util::StreamExt;
+use libp2p::{
+  identity::PeerId,
+  request_response::RequestId,
+  swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
+};
+
+use crate::p2p::{
+  Peers, BehaviorEvent, Behavior,
+  validators::Validators,
+  reqres::{self, Request, Response},
+  gossip,
+};
+
+/*
+  `SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
+  contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
+  purposes, making this a rather large task.
+
+  Responsibilities include:
+  - Actually dialing new peers (the selection process occurs in another task)
+  - Maintaining the peers structure (as we need the Swarm object to see who our peers are)
+  - Gossiping messages
+  - Dispatching gossiped messages
+  - Sending requests
+  - Dispatching responses to requests
+  - Dispatching received requests
+  - Sending responses
+*/
+struct SwarmTask<D: Db> {
+  dial_task: TaskHandle,
+  to_dial: mpsc::UnboundedReceiver<DialOpts>,
+  last_dial_task_run: Instant,
+
+  validators: Arc<RwLock<Validators>>,
+
+  peers: Peers,
+  rebuild_peers_at: Instant,
+
+  db: D,
+  swarm: Swarm<Behavior>,
+
+  gossip: mpsc::UnboundedReceiver<gossip::Message>,
+
+  outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
+  outbound_requests_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
+}
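+
+// A sketch of how other tasks are expected to drive the request flow. The names used here
+// (`outbound_requests_send`, `peer`) are illustrative; the senders paired with the above
+// receivers are assumed to be handed out wherever this struct is constructed:
+//
+//   let (response_send, response_recv) = oneshot::channel();
+//   outbound_requests_send.send((peer, Request::KeepAlive, response_send)).unwrap();
+//   let response: Option<Response> = response_recv.await.unwrap();
+//
+// `None` means the request failed outbound, per the `OutboundFailure` handling below.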
+
+impl<D: Db> SwarmTask<D> {
+  fn handle_reqres(&mut self, event: reqres::Event) {
+    match event {
+      reqres::Event::Message { message, .. } => match message {
+        reqres::Message::Request { request_id: _, request, channel } => {
+          match request {
+            // TODO: Send these
+            reqres::Request::KeepAlive => {}
+            reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"),
+            reqres::Request::NotableCosigns { global_session } => {
+              // TODO: Move this out
+              let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
+              let res = reqres::Response::NotableCosigns(cosigns);
+              let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res);
+            }
+          }
+        }
+        reqres::Message::Response { request_id, response } => {
+          // Send Some(response) as the response for the request
+          if let Some(channel) = self.outbound_requests_responses.remove(&request_id) {
+            let _: Result<_, _> = channel.send(Some(response));
+          }
+        }
+      },
+      reqres::Event::OutboundFailure { request_id, .. } => {
+        // Send None as the response for the request
+        if let Some(channel) = self.outbound_requests_responses.remove(&request_id) {
+          let _: Result<_, _> = channel.send(None);
+        }
+      }
+      reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
+    }
+  }
+
+  fn handle_gossip(&mut self, event: gossip::Event) {
+    match event {
+      gossip::Event::Message { message, .. } => {
+        let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
+          // TODO: Penalize the PeerId which sent this message
+          return;
+        };
+        match message {
+          gossip::Message::Tributary { set, message } => todo!("TODO"),
+          gossip::Message::Cosign(signed_cosign) => todo!("TODO"),
+        }
+      }
+      gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {}
+      gossip::Event::GossipsubNotSupported { peer_id } => {
+        let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id);
+      }
+    }
+  }
+
+  async fn run(mut self) {
+    loop {
+      let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
+
+      tokio::select! {
+        // Dial peers we're instructed to
+        dial_opts = self.to_dial.recv() => {
+          let dial_opts = dial_opts.expect("DialTask was closed?");
+          let _: Result<_, _> = self.swarm.dial(dial_opts);
+        }
+
+        /*
+          Rebuild the peers every 10 minutes.
+
+          This protects against any race conditions/edge cases we have in our logic to track peers,
+          along with unrepresented behavior such as when a peer changes the networks they're active
+          in. This lets the peer tracking logic simply be 'good enough' to not become horribly
+          corrupt over the span of `TIME_BETWEEN_REBUILD_PEERS`.
+
+          We also use this to disconnect all peers who are no longer active in any network.
+        */
+ */ + () = tokio::time::sleep(time_till_rebuild_peers) => { + const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60); + + let validators_by_network = self.validators.read().await.by_network().clone(); + let connected_peers = self.swarm.connected_peers().copied().collect::>(); + + // We initially populate the list of peers to disconnect with all peers + let mut to_disconnect = connected_peers.clone(); + + // Build the new peers object + let mut peers = HashMap::new(); + for (network, validators) in validators_by_network { + peers.insert(network, validators.intersection(&connected_peers).copied().collect()); + + // If this peer is in this validator set, don't keep it flagged for disconnection + to_disconnect.retain(|peer| !validators.contains(peer)); + } + + // Write the new peers object + *self.peers.peers.write().await = peers; + self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS; + + // Disconnect all peers marked for disconnection + for peer in to_disconnect { + let _: Result<_, _> = self.swarm.disconnect_peer_id(peer); + } + } + + // Handle swarm events + event = self.swarm.next() => { + // `Swarm::next` will never return `Poll::Ready(None)` + // https://docs.rs/ + // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E + let event = event.unwrap(); + match event { + // New connection, so update peers + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let Some(networks) = + self.validators.read().await.networks(&peer_id).cloned() else { continue }; + let mut peers = self.peers.peers.write().await; + for network in networks { + peers.entry(network).or_insert_with(HashSet::new).insert(peer_id); + } + } + + // Connection closed, so update peers + SwarmEvent::ConnectionClosed { peer_id, .. } => { + let Some(networks) = + self.validators.read().await.networks(&peer_id).cloned() else { continue }; + let mut peers = self.peers.peers.write().await; + for network in networks { + peers.entry(network).or_insert_with(HashSet::new).remove(&peer_id); + } + + /* + We want to re-run the dial task, since we lost a peer, in case we should find new + peers. This opens a DoS where a validator repeatedly opens/closes connections to + force iterations of the dial task. We prevent this by setting a minimum distance + since the last explicit iteration. + + This is suboptimal. If we have several disconnects in immediate proximity, we'll + trigger the dial task upon the first (where we may still have enough peers we + shouldn't dial more) but not the last (where we may have so few peers left we + should dial more). This is accepted as the dial task will eventually run on its + natural timer. + */ + const MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL: Duration = Duration::from_secs(60); + let now = Instant::now(); + if (self.last_dial_task_run + MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL) < now { + self.dial_task.run_now(); + self.last_dial_task_run = now; + } + } + + SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => { + self.handle_reqres(event) + } + SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => { + self.handle_gossip(event) + } + + // We don't handle any of these + SwarmEvent::IncomingConnection { .. } | + SwarmEvent::IncomingConnectionError { .. } | + SwarmEvent::OutgoingConnectionError { .. } | + SwarmEvent::NewListenAddr { .. } | + SwarmEvent::ExpiredListenAddr { .. } | + SwarmEvent::ListenerClosed { .. } | + SwarmEvent::ListenerError { .. } | + SwarmEvent::Dialing { .. 
+          }
+        }
+
+        request = self.outbound_requests.recv() => {
+          let (peer, request, response_channel) =
+            request.expect("channel for requests was closed?");
+          let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request);
+          self.outbound_requests_responses.insert(request_id, response_channel);
+        }
+
+        message = self.gossip.recv() => {
+          let message = message.expect("channel for messages to gossip was closed?");
+          let topic = message.topic();
+          let message = borsh::to_vec(&message).unwrap();
+          let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
+        }
+      }
+    }
+  }
+}
diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs
index 95cbe8b1..4b3d3870 100644
--- a/coordinator/src/p2p/validators.rs
+++ b/coordinator/src/p2p/validators.rs
@@ -77,17 +77,16 @@ impl Validators {
   fn incorporate_session_changes(
     &mut self,
     session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
-  ) -> HashSet<PeerId> {
-    let mut removed = HashSet::new();
-
+  ) {
     for (network, session, validators) in session_changes {
       // Remove the existing validators
       for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
+        // Get all networks this validator is in
         let mut networks = self.validators.remove(&validator).unwrap();
+        // Remove this one
         networks.remove(&network);
-        if networks.is_empty() {
-          removed.insert(validator);
-        } else {
+        // Insert the networks back if the validator was present in other networks
+        if !networks.is_empty() {
           self.validators.insert(validator, networks);
         }
       }
@@ -101,16 +100,15 @@ impl Validators {
       // Update the session we have populated
       self.sessions.insert(network, session);
     }
-
-    removed
   }
 
   /// Update the view of the validators.
-  ///
-  /// Returns all validators removed from the active validator set.
-  pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
+  pub(crate) async fn update(&mut self) -> Result<(), String> {
     let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
-    Ok(self.incorporate_session_changes(session_changes))
+    self.incorporate_session_changes(session_changes);
+    Ok(())
   }
 
   pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
@@ -134,10 +132,11 @@ impl Validators {
-/// Returns all validators removed from the active validator set.
 pub(crate) async fn update_shared_validators(
   validators: &Arc<RwLock<Validators>>,
-) -> Result<HashSet<PeerId>, String> {
+) -> Result<(), String> {
   let session_changes = {
     let validators = validators.read().await;
     Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
   };
-  Ok(validators.write().await.incorporate_session_changes(session_changes))
+  validators.write().await.incorporate_session_changes(session_changes);
+  Ok(())
 }