From 2b8f481364a7598bd244cba833a2160a0345eb3c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 5 Jan 2025 00:17:05 -0500 Subject: [PATCH] Parallelize requests within Validators::update --- coordinator/src/p2p/dial.rs | 5 +- coordinator/src/p2p/mod.rs | 4 +- coordinator/src/p2p/validators.rs | 86 ++++++++++++++++++++----------- 3 files changed, 60 insertions(+), 35 deletions(-) diff --git a/coordinator/src/p2p/dial.rs b/coordinator/src/p2p/dial.rs index 59c976cd..13d7f7ff 100644 --- a/coordinator/src/p2p/dial.rs +++ b/coordinator/src/p2p/dial.rs @@ -26,8 +26,9 @@ struct DialTask { } impl ContinuallyRan for DialTask { - // Only run every thirty seconds, not the default of every five - const DELAY_BETWEEN_ITERATIONS: u64 = 30; + // Only run every five minutes, not the default of every five seconds + const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; + const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60; fn run_iteration(&mut self) -> impl Send + Future> { async move { diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index c1f6eca5..6ca71816 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -101,8 +101,8 @@ impl SwarmTask { // Refresh the instance of validators we use to track peers/share with authenticate () = tokio::time::sleep(time_till_refresh_validators) => { - const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5); - const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(120); + const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60); + const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60); let update = self.validators.write().await.update().await; match update { diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs index 26487a59..3956a547 100644 --- a/coordinator/src/p2p/validators.rs +++ b/coordinator/src/p2p/validators.rs @@ -4,6 +4,8 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, S use libp2p::PeerId; +use futures_util::stream::{StreamExt, FuturesUnordered}; + use crate::p2p::peer_id_from_public; pub(crate) struct Validators { @@ -27,41 +29,63 @@ impl Validators { let temporal_serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; let temporal_serai = temporal_serai.validator_sets(); - for network in serai_client::primitives::NETWORKS { - if network == NetworkId::Serai { - continue; - } - let Some(session) = temporal_serai.session(network).await.map_err(|e| format!("{e:?}"))? - else { - continue; - }; - // If the session has changed, populate it with the current validators - if self.sessions.get(&network) != Some(&session) { - let new_validators = - temporal_serai.active_network_validators(network).await.map_err(|e| format!("{e:?}"))?; - let new_validators = - new_validators.into_iter().map(peer_id_from_public).collect::>(); - // Remove the existing validators - for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) { - let mut networks = self.validators.remove(&validator).unwrap(); - networks.remove(&network); - if networks.is_empty() { - removed.insert(validator); + let mut session_changes = vec![]; + { + // FuturesUnordered can be bad practice as it'll cause timeouts if infrequently polled, but + // we poll it till it yields all futures with the most minimal processing possible + let mut futures = FuturesUnordered::new(); + for network in serai_client::primitives::NETWORKS { + if network == NetworkId::Serai { + continue; + } + let sessions = &self.sessions; + futures.push(async move { + let session = match temporal_serai.session(network).await { + Ok(Some(session)) => session, + Ok(None) => return Ok(None), + Err(e) => return Err(format!("{e:?}")), + }; + + if sessions.get(&network) == Some(&session) { + Ok(None) } else { - self.validators.insert(validator, networks); + match temporal_serai.active_network_validators(network).await { + Ok(validators) => Ok(Some((network, session, validators))), + Err(e) => Err(format!("{e:?}")), + } } - } - - // Add the new validators - for validator in new_validators.iter().copied() { - self.validators.entry(validator).or_insert_with(HashSet::new).insert(network); - } - self.by_network.insert(network, new_validators); - - // Update the session we have populated - self.sessions.insert(network, session); + }); } + while let Some(session_change) = futures.next().await { + if let Some(session_change) = session_change? { + session_changes.push(session_change); + } + } + } + + for (network, session, validators) in session_changes { + let validators = validators.into_iter().map(peer_id_from_public).collect::>(); + + // Remove the existing validators + for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) { + let mut networks = self.validators.remove(&validator).unwrap(); + networks.remove(&network); + if networks.is_empty() { + removed.insert(validator); + } else { + self.validators.insert(validator, networks); + } + } + + // Add the new validators + for validator in validators.iter().copied() { + self.validators.entry(validator).or_insert_with(HashSet::new).insert(network); + } + self.by_network.insert(network, validators); + + // Update the session we have populated + self.sessions.insert(network, session); } Ok(removed)