From 96518500b19a2e02aff925846c3fa920b04ea5bb Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 5 Jan 2025 00:29:11 -0500 Subject: [PATCH] Don't hold the shared Validators write lock while making requests to Serai --- coordinator/src/p2p/mod.rs | 4 +- coordinator/src/p2p/validators.rs | 61 ++++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 6ca71816..dbecc7c7 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -17,7 +17,7 @@ use libp2p::{ /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; -use validators::Validators; +use validators::{Validators, update_shared_validators}; /// The authentication protocol upgrade to limit the P2P network to active validators. mod authenticate; @@ -104,7 +104,7 @@ impl SwarmTask { const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60); const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60); - let update = self.validators.write().await.update().await; + let update = update_shared_validators(&self.validators).await; match update { Ok(removed) => { for removed in removed { diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs index 3956a547..c3d3b2ba 100644 --- a/coordinator/src/p2p/validators.rs +++ b/coordinator/src/p2p/validators.rs @@ -1,10 +1,15 @@ -use std::collections::{HashSet, HashMap}; +use core::borrow::Borrow; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; use libp2p::PeerId; use futures_util::stream::{StreamExt, FuturesUnordered}; +use tokio::sync::RwLock; use crate::p2p::peer_id_from_public; @@ -20,14 +25,12 @@ pub(crate) struct Validators { } impl Validators { - /// Update the view of the validators. - /// - /// Returns all validators removed from the active validator set. - pub(crate) async fn update(&mut self) -> Result, String> { - let mut removed = HashSet::new(); - + async fn session_changes( + serai: impl Borrow, + sessions: impl Borrow>, + ) -> Result)>, String> { let temporal_serai = - self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; let temporal_serai = temporal_serai.validator_sets(); let mut session_changes = vec![]; @@ -39,7 +42,7 @@ impl Validators { if network == NetworkId::Serai { continue; } - let sessions = &self.sessions; + let sessions = sessions.borrow(); futures.push(async move { let session = match temporal_serai.session(network).await { Ok(Some(session)) => session, @@ -51,7 +54,11 @@ impl Validators { Ok(None) } else { match temporal_serai.active_network_validators(network).await { - Ok(validators) => Ok(Some((network, session, validators))), + Ok(validators) => Ok(Some(( + network, + session, + validators.into_iter().map(peer_id_from_public).collect(), + ))), Err(e) => Err(format!("{e:?}")), } } @@ -64,9 +71,16 @@ impl Validators { } } - for (network, session, validators) in session_changes { - let validators = validators.into_iter().map(peer_id_from_public).collect::>(); + Ok(session_changes) + } + fn incorporate_session_changes( + &mut self, + session_changes: Vec<(NetworkId, Session, HashSet)>, + ) -> HashSet { + let mut removed = HashSet::new(); + + for (network, session, validators) in session_changes { // Remove the existing validators for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) { let mut networks = self.validators.remove(&validator).unwrap(); @@ -88,7 +102,15 @@ impl Validators { self.sessions.insert(network, session); } - Ok(removed) + removed + } + + /// Update the view of the validators. + /// + /// Returns all validators removed from the active validator set. + pub(crate) async fn update(&mut self) -> Result, String> { + let session_changes = Self::session_changes(&self.serai, &self.sessions).await?; + Ok(self.incorporate_session_changes(session_changes)) } pub(crate) fn by_network(&self) -> &HashMap> { @@ -103,3 +125,16 @@ impl Validators { self.validators.get(peer_id) } } + +/// Update the view of the validators. +/// +/// Returns all validators removed from the active validator set. +pub(crate) async fn update_shared_validators( + validators: &Arc>, +) -> Result, String> { + let session_changes = { + let validators = validators.read().await; + Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await? + }; + Ok(validators.write().await.incorporate_session_changes(session_changes)) +}