From 9a5a661d04215e319f20851449ad3f9aa174bcab Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 4 Jan 2025 23:28:29 -0500 Subject: [PATCH] Start on the task to manage the swarm --- coordinator/Cargo.toml | 1 + coordinator/src/p2p/dial.rs | 8 +- coordinator/src/p2p/mod.rs | 141 +++++++++++++++++++++++++++++- coordinator/src/p2p/reqres.rs | 2 +- coordinator/src/p2p/validators.rs | 44 +++++---- 5 files changed, 175 insertions(+), 21 deletions(-) diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 9515bd74..d0f8cb24 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -25,6 +25,7 @@ bitvec = { version = "1", default-features = false, features = ["std"] } rand_core = { version = "0.6", default-features = false, features = ["std"] } blake2 = { version = "0.10", default-features = false, features = ["std"] } +schnorrkel = { version = "0.11", default-features = false, features = ["std"] } transcript = { package = "flexible-transcript", path = "../crypto/transcript", default-features = false, features = ["std", "recommended"] } ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] } diff --git a/coordinator/src/p2p/dial.rs b/coordinator/src/p2p/dial.rs index 2e427ee7..59c976cd 100644 --- a/coordinator/src/p2p/dial.rs +++ b/coordinator/src/p2p/dial.rs @@ -1,4 +1,5 @@ use core::future::Future; +use std::collections::HashSet; use rand_core::{RngCore, OsRng}; @@ -25,6 +26,7 @@ struct DialTask { } impl ContinuallyRan for DialTask { + // Only run every thirty seconds, not the default of every five const DELAY_BETWEEN_ITERATIONS: u64 = 30; fn run_iteration(&mut self) -> impl Send + Future> { @@ -37,7 +39,7 @@ impl ContinuallyRan for DialTask { .peers .peers .read() - .unwrap() + .await .iter() .map(|(network, peers)| (*network, peers.len())) .collect::>(); @@ -54,9 +56,9 @@ impl ContinuallyRan for DialTask { (peer_count < self .validators - .validators() + .by_network() .get(&network) - .map(Vec::len) +
.map(HashSet::len) .unwrap_or(0) .saturating_sub(1)) { diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 97f00cbf..c1f6eca5 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -1,11 +1,12 @@ use std::{ - sync::{Arc, RwLock}, + sync::Arc, collections::{HashSet, HashMap}, + time::{Duration, Instant}, }; use serai_client::primitives::{NetworkId, PublicKey}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use futures_util::StreamExt; use libp2p::{ @@ -16,6 +17,7 @@ use libp2p::{ /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; +use validators::Validators; /// The authentication protocol upgrade to limit the P2P network to active validators. mod authenticate; @@ -73,3 +75,138 @@ struct Behavior { reqres: reqres::Behavior, gossip: gossip::Behavior, } + +/// The task driving the swarm: refreshes the validators, tracks connected peers, and dials. +struct SwarmTask { + to_dial: mpsc::UnboundedReceiver, + + validators: Arc>, + last_refreshed_validators: Instant, + next_refresh_validators: Instant, + + peers: Peers, + rebuild_peers_at: Instant, + + swarm: Swarm, +} + +impl SwarmTask { + async fn run(mut self) { + loop { + let time_till_refresh_validators = + self.next_refresh_validators.saturating_duration_since(Instant::now()); + let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now()); + + tokio::select!
{ + biased; + + // Refresh the instance of validators we use to track peers/share with authenticate + () = tokio::time::sleep(time_till_refresh_validators) => { + const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5); + const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(120); + + let update = self.validators.write().await.update().await; + match update { + Ok(removed) => { + for removed in removed { + let _: Result<_, _> = self.swarm.disconnect_peer_id(removed); + } + self.last_refreshed_validators = Instant::now(); + self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS; + } + Err(e) => { + log::warn!("couldn't refresh validators: {e:?}"); + // Increase the delay before the next refresh by using the time since the last + // refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20... + let time_since_last = self + .next_refresh_validators + .saturating_duration_since(self.last_refreshed_validators); + // But limit the delay + self.next_refresh_validators = + Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS); + }, + } + } + + // Rebuild the peers every 10 minutes + // + // This handles edge cases such as when a validator changes the networks they're present + // in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling + // out of control + () = tokio::time::sleep(time_till_rebuild_peers) => { + const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60); + + let validators_by_network = self.validators.read().await.by_network().clone(); + let connected = self.swarm.connected_peers().copied().collect::>(); + let mut peers = HashMap::new(); + for (network, validators) in validators_by_network { + peers.insert(network, validators.intersection(&connected).copied().collect()); + } + *self.peers.peers.write().await = peers; + + self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS; + } + + // Dial peers
we're instructed to + dial_opts = self.to_dial.recv() => { + let dial_opts = dial_opts.expect("DialTask was closed?"); + let _: Result<_, _> = self.swarm.dial(dial_opts); + } + + // Handle swarm events + event = self.swarm.next() => { + // `Swarm::next` will never return `Poll::Ready(None)` + // https://docs.rs/ + // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E + let event = event.unwrap(); + match event { + SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"), + SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"), + // New connection, so update peers + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let Some(networks) = + self.validators.read().await.networks(&peer_id).cloned() else { continue }; + for network in networks { + self + .peers + .peers + .write() + .await + .entry(network) + .or_insert_with(HashSet::new) + .insert(peer_id); + } + }, + // Connection closed, so update peers + SwarmEvent::ConnectionClosed { peer_id, num_established, .. } => { + // This event fires per connection; only drop the peer once their last connection closed + if num_established != 0 { + continue; + } + let Some(networks) = + self.validators.read().await.networks(&peer_id).cloned() else { continue }; + for network in networks { + self + .peers + .peers + .write() + .await + .entry(network) + .or_insert_with(HashSet::new) + .remove(&peer_id); + } + }, + SwarmEvent::IncomingConnection { .. } | + SwarmEvent::IncomingConnectionError { .. } | + SwarmEvent::OutgoingConnectionError { .. } | + SwarmEvent::NewListenAddr { .. } | + SwarmEvent::ExpiredListenAddr { .. } | + SwarmEvent::ListenerClosed { .. } | + SwarmEvent::ListenerError { .. } | + SwarmEvent::Dialing { ..
} => {} + } + } + } + } + } +} diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs index b5d87c1c..7faf2f8b 100644 --- a/coordinator/src/p2p/reqres.rs +++ b/coordinator/src/p2p/reqres.rs @@ -1,5 +1,5 @@ use core::{fmt, time::Duration}; -use std::io::{self, Read}; +use std::io; use async_trait::async_trait; diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs index 1bd10110..26487a59 100644 --- a/coordinator/src/p2p/validators.rs +++ b/coordinator/src/p2p/validators.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashSet, HashMap}; use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; @@ -12,13 +12,18 @@ pub(crate) struct Validators { // A cache for which session we're populated with the validators of sessions: HashMap, // The validators by network - by_network: HashMap>, - // The set of all validators (as a HashMap to represent the amount of inclusions) - set: HashMap, + by_network: HashMap>, + // The validators and their networks + validators: HashMap>, } impl Validators { - pub(crate) async fn update(&mut self) -> Result<(), String> { + /// Update the view of the validators. + /// + /// Returns all validators removed from the active validator set.
+ pub(crate) async fn update(&mut self) -> Result, String> { + let mut removed = HashSet::new(); + let temporal_serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; let temporal_serai = temporal_serai.validator_sets(); @@ -35,20 +40,24 @@ impl Validators { let new_validators = temporal_serai.active_network_validators(network).await.map_err(|e| format!("{e:?}"))?; let new_validators = - new_validators.into_iter().map(peer_id_from_public).collect::>(); + new_validators.into_iter().map(peer_id_from_public).collect::>(); // Remove the existing validators - for validator in self.by_network.remove(&network).unwrap_or(vec![]) { - let mut inclusions = self.set.remove(&validator).unwrap(); - inclusions -= 1; - if inclusions != 0 { - self.set.insert(validator, inclusions); + for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) { + let mut networks = self.validators.remove(&validator).unwrap(); + networks.remove(&network); + if networks.is_empty() { + removed.insert(validator); + } else { + self.validators.insert(validator, networks); } } // Add the new validators for validator in new_validators.iter().copied() { - *self.set.entry(validator).or_insert(0) += 1; + self.validators.entry(validator).or_insert_with(HashSet::new).insert(network); + // They're (still) an active validator, so they must not be reported as removed + removed.remove(&validator); } self.by_network.insert(network, new_validators); @@ -56,14 +65,19 @@ impl Validators { self.sessions.insert(network, session); } } - Ok(()) + + Ok(removed) } - pub(crate) fn validators(&self) -> &HashMap> { + pub(crate) fn by_network(&self) -> &HashMap> { &self.by_network } pub(crate) fn contains(&self, peer_id: &PeerId) -> bool { - self.set.contains_key(peer_id) + self.validators.contains_key(peer_id) + } + + pub(crate) fn networks(&self, peer_id: &PeerId) -> Option<&HashSet> { + self.validators.get(peer_id) } }