Don't hold the shared Validators write lock while making requests to Serai

This commit is contained in:
Luke Parker
2025-01-05 00:29:11 -05:00
parent 2b8f481364
commit 96518500b1
2 changed files with 50 additions and 15 deletions

View File

@@ -17,7 +17,7 @@ use libp2p::{
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::Validators;
use validators::{Validators, update_shared_validators};
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
@@ -104,7 +104,7 @@ impl SwarmTask {
const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
let update = self.validators.write().await.update().await;
let update = update_shared_validators(&self.validators).await;
match update {
Ok(removed) => {
for removed in removed {

View File

@@ -1,10 +1,15 @@
use std::collections::{HashSet, HashMap};
use core::borrow::Borrow;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock;
use crate::p2p::peer_id_from_public;
@@ -20,14 +25,12 @@ pub(crate) struct Validators {
}
impl Validators {
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
let mut removed = HashSet::new();
async fn session_changes(
serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>,
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
let temporal_serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let temporal_serai = temporal_serai.validator_sets();
let mut session_changes = vec![];
@@ -39,7 +42,7 @@ impl Validators {
if network == NetworkId::Serai {
continue;
}
let sessions = &self.sessions;
let sessions = sessions.borrow();
futures.push(async move {
let session = match temporal_serai.session(network).await {
Ok(Some(session)) => session,
@@ -51,7 +54,11 @@ impl Validators {
Ok(None)
} else {
match temporal_serai.active_network_validators(network).await {
Ok(validators) => Ok(Some((network, session, validators))),
Ok(validators) => Ok(Some((
network,
session,
validators.into_iter().map(peer_id_from_public).collect(),
))),
Err(e) => Err(format!("{e:?}")),
}
}
@@ -64,9 +71,16 @@ impl Validators {
}
}
for (network, session, validators) in session_changes {
let validators = validators.into_iter().map(peer_id_from_public).collect::<HashSet<_>>();
Ok(session_changes)
}
fn incorporate_session_changes(
&mut self,
session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
) -> HashSet<PeerId> {
let mut removed = HashSet::new();
for (network, session, validators) in session_changes {
// Remove the existing validators
for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
let mut networks = self.validators.remove(&validator).unwrap();
@@ -88,7 +102,15 @@ impl Validators {
self.sessions.insert(network, session);
}
Ok(removed)
removed
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
Ok(self.incorporate_session_changes(session_changes))
}
pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
@@ -103,3 +125,16 @@ impl Validators {
self.validators.get(peer_id)
}
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
validators: &Arc<RwLock<Validators>>,
) -> Result<HashSet<PeerId>, String> {
let session_changes = {
let validators = validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
Ok(validators.write().await.incorporate_session_changes(session_changes))
}