2025-01-07 18:09:25 -05:00
|
|
|
use core::{borrow::Borrow, future::Future};
|
2025-01-05 00:29:11 -05:00
|
|
|
use std::{
|
|
|
|
|
sync::Arc,
|
|
|
|
|
collections::{HashSet, HashMap},
|
|
|
|
|
};
|
2025-01-04 22:21:23 -05:00
|
|
|
|
2025-11-16 11:50:24 -05:00
|
|
|
use serai_client_serai::abi::primitives::{network_id::ExternalNetworkId, validator_sets::Session};
|
|
|
|
|
use serai_client_serai::{RpcError, Serai};
|
2025-01-04 22:21:23 -05:00
|
|
|
|
2025-01-07 18:09:25 -05:00
|
|
|
use serai_task::{Task, ContinuallyRan};
|
|
|
|
|
|
2025-01-04 22:21:23 -05:00
|
|
|
use libp2p::PeerId;
|
|
|
|
|
|
2025-01-05 00:17:05 -05:00
|
|
|
use futures_util::stream::{StreamExt, FuturesUnordered};
|
2025-01-08 23:54:27 -05:00
|
|
|
use tokio::sync::{mpsc, RwLock};
|
2025-01-05 00:17:05 -05:00
|
|
|
|
2025-01-09 01:26:25 -05:00
|
|
|
use crate::peer_id_from_public;
|
2025-01-04 22:21:23 -05:00
|
|
|
|
2025-01-08 23:54:27 -05:00
|
|
|
pub(crate) struct Changes {
|
|
|
|
|
pub(crate) removed: HashSet<PeerId>,
|
|
|
|
|
pub(crate) added: HashSet<PeerId>,
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-04 22:21:23 -05:00
|
|
|
pub(crate) struct Validators {
|
2025-01-10 01:20:26 -05:00
|
|
|
serai: Arc<Serai>,
|
2025-01-04 22:21:23 -05:00
|
|
|
|
|
|
|
|
// A cache for which session we're populated with the validators of
|
2025-01-30 03:14:24 -05:00
|
|
|
sessions: HashMap<ExternalNetworkId, Session>,
|
2025-01-04 22:21:23 -05:00
|
|
|
// The validators by network
|
2025-01-30 03:14:24 -05:00
|
|
|
by_network: HashMap<ExternalNetworkId, HashSet<PeerId>>,
|
2025-01-04 23:28:29 -05:00
|
|
|
// The validators and their networks
|
2025-01-30 03:14:24 -05:00
|
|
|
validators: HashMap<PeerId, HashSet<ExternalNetworkId>>,
|
2025-01-08 23:54:27 -05:00
|
|
|
|
|
|
|
|
// The channel to send the changes down
|
|
|
|
|
changes: mpsc::UnboundedSender<Changes>,
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Validators {
|
2025-01-10 01:20:26 -05:00
|
|
|
pub(crate) fn new(serai: Arc<Serai>) -> (Self, mpsc::UnboundedReceiver<Changes>) {
|
2025-01-08 23:54:27 -05:00
|
|
|
let (send, recv) = mpsc::unbounded_channel();
|
|
|
|
|
let validators = Validators {
|
2025-01-07 18:16:34 -05:00
|
|
|
serai,
|
|
|
|
|
sessions: HashMap::new(),
|
|
|
|
|
by_network: HashMap::new(),
|
|
|
|
|
validators: HashMap::new(),
|
2025-01-08 23:54:27 -05:00
|
|
|
changes: send,
|
|
|
|
|
};
|
|
|
|
|
(validators, recv)
|
2025-01-07 18:16:34 -05:00
|
|
|
}
|
|
|
|
|
|
2025-01-05 00:29:11 -05:00
|
|
|
async fn session_changes(
|
|
|
|
|
serai: impl Borrow<Serai>,
|
2025-01-30 03:14:24 -05:00
|
|
|
sessions: impl Borrow<HashMap<ExternalNetworkId, Session>>,
|
2025-11-16 11:50:24 -05:00
|
|
|
) -> Result<Vec<(ExternalNetworkId, Session, HashSet<PeerId>)>, RpcError> {
|
2025-01-15 21:00:50 -05:00
|
|
|
/*
|
|
|
|
|
This uses the latest finalized block, not the latest cosigned block, which should be fine as
|
|
|
|
|
in the worst case, we'd connect to unexpected validators. They still shouldn't be able to
|
|
|
|
|
bypass the cosign protocol unless a historical global session was malicious, in which case
|
|
|
|
|
the cosign protocol already breaks.
|
|
|
|
|
|
|
|
|
|
Besides, we can't connect to historical validators, only the current validators.
|
|
|
|
|
*/
|
2025-01-12 18:29:08 -05:00
|
|
|
let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
|
2025-01-04 22:21:23 -05:00
|
|
|
let temporal_serai = temporal_serai.validator_sets();
|
2025-01-05 00:17:05 -05:00
|
|
|
|
|
|
|
|
let mut session_changes = vec![];
|
|
|
|
|
{
|
|
|
|
|
// FuturesUnordered can be bad practice as it'll cause timeouts if infrequently polled, but
|
|
|
|
|
// we poll it till it yields all futures with the most minimal processing possible
|
|
|
|
|
let mut futures = FuturesUnordered::new();
|
2025-11-16 11:50:24 -05:00
|
|
|
for network in ExternalNetworkId::all() {
|
2025-01-05 00:29:11 -05:00
|
|
|
let sessions = sessions.borrow();
|
2025-11-16 11:50:24 -05:00
|
|
|
let temporal_serai = temporal_serai.borrow();
|
2025-01-05 00:17:05 -05:00
|
|
|
futures.push(async move {
|
2025-11-16 11:50:24 -05:00
|
|
|
let session = match temporal_serai.current_session(network.into()).await {
|
2025-01-05 00:17:05 -05:00
|
|
|
Ok(Some(session)) => session,
|
|
|
|
|
Ok(None) => return Ok(None),
|
2025-01-12 18:29:08 -05:00
|
|
|
Err(e) => return Err(e),
|
2025-01-05 00:17:05 -05:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if sessions.get(&network) == Some(&session) {
|
|
|
|
|
Ok(None)
|
2025-01-04 23:28:29 -05:00
|
|
|
} else {
|
2025-11-16 11:50:24 -05:00
|
|
|
match temporal_serai.current_validators(network.into()).await {
|
|
|
|
|
Ok(Some(validators)) => Ok(Some((
|
2025-01-05 00:29:11 -05:00
|
|
|
network,
|
|
|
|
|
session,
|
2025-11-16 11:50:24 -05:00
|
|
|
validators
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|validator| peer_id_from_public(validator.into()))
|
|
|
|
|
.collect(),
|
2025-01-05 00:29:11 -05:00
|
|
|
))),
|
2025-11-16 11:50:24 -05:00
|
|
|
Ok(None) => panic!("network has session yet no validators"),
|
2025-01-12 18:29:08 -05:00
|
|
|
Err(e) => Err(e),
|
2025-01-05 00:17:05 -05:00
|
|
|
}
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
2025-01-05 00:17:05 -05:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
while let Some(session_change) = futures.next().await {
|
|
|
|
|
if let Some(session_change) = session_change? {
|
|
|
|
|
session_changes.push(session_change);
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
2025-01-05 00:17:05 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-05 00:29:11 -05:00
|
|
|
Ok(session_changes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn incorporate_session_changes(
|
|
|
|
|
&mut self,
|
2025-01-30 03:14:24 -05:00
|
|
|
session_changes: Vec<(ExternalNetworkId, Session, HashSet<PeerId>)>,
|
2025-01-07 16:34:19 -05:00
|
|
|
) {
|
2025-01-08 23:54:27 -05:00
|
|
|
let mut removed = HashSet::new();
|
|
|
|
|
let mut added = HashSet::new();
|
|
|
|
|
|
2025-01-05 00:29:11 -05:00
|
|
|
for (network, session, validators) in session_changes {
|
2025-01-05 00:17:05 -05:00
|
|
|
// Remove the existing validators
|
|
|
|
|
for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
|
2025-01-07 16:34:19 -05:00
|
|
|
// Get all networks this validator is in
|
2025-01-05 00:17:05 -05:00
|
|
|
let mut networks = self.validators.remove(&validator).unwrap();
|
2025-01-07 16:34:19 -05:00
|
|
|
// Remove this one
|
2025-01-05 00:17:05 -05:00
|
|
|
networks.remove(&network);
|
2025-01-07 16:34:19 -05:00
|
|
|
if !networks.is_empty() {
|
2025-01-08 23:54:27 -05:00
|
|
|
// Insert the networks back if the validator was present in other networks
|
2025-01-05 00:17:05 -05:00
|
|
|
self.validators.insert(validator, networks);
|
2025-01-08 23:54:27 -05:00
|
|
|
} else {
|
|
|
|
|
// Because this validator is no longer present in any network, mark them as removed
|
2025-01-09 00:16:45 -05:00
|
|
|
/*
|
|
|
|
|
This isn't accurate. The validator isn't present in the latest session for this
|
|
|
|
|
network. The validator was present in the prior session which has yet to retire. Our
|
|
|
|
|
lack of explicit inclusion for both the prior session and the current session causes
|
|
|
|
|
only the validators mutually present in both sessions to be responsible for all actions
|
|
|
|
|
still ongoing as the prior validator set retires.
|
|
|
|
|
|
|
|
|
|
TODO: Fix this
|
|
|
|
|
*/
|
2025-01-08 23:54:27 -05:00
|
|
|
removed.insert(validator);
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
2025-01-05 00:17:05 -05:00
|
|
|
}
|
2025-01-04 22:21:23 -05:00
|
|
|
|
2025-01-05 00:17:05 -05:00
|
|
|
// Add the new validators
|
|
|
|
|
for validator in validators.iter().copied() {
|
|
|
|
|
self.validators.entry(validator).or_insert_with(HashSet::new).insert(network);
|
2025-01-08 23:54:27 -05:00
|
|
|
added.insert(validator);
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
2025-01-05 00:17:05 -05:00
|
|
|
self.by_network.insert(network, validators);
|
|
|
|
|
|
|
|
|
|
// Update the session we have populated
|
|
|
|
|
self.sessions.insert(network, session);
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
2025-01-08 23:54:27 -05:00
|
|
|
|
|
|
|
|
// Only flag validators for removal if they weren't simultaneously added by these changes
|
|
|
|
|
removed.retain(|validator| !added.contains(validator));
|
|
|
|
|
// Send the changes, dropping the error
|
|
|
|
|
// This lets the caller opt-out of change notifications by dropping the receiver
|
|
|
|
|
let _: Result<_, _> = self.changes.send(Changes { removed, added });
|
2025-01-05 00:29:11 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Update the view of the validators.
|
2025-11-16 11:50:24 -05:00
|
|
|
pub(crate) async fn update(&mut self) -> Result<(), RpcError> {
|
2025-01-10 01:20:26 -05:00
|
|
|
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
|
2025-01-07 16:34:19 -05:00
|
|
|
self.incorporate_session_changes(session_changes);
|
|
|
|
|
Ok(())
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
|
|
|
|
|
2025-01-30 03:14:24 -05:00
|
|
|
pub(crate) fn by_network(&self) -> &HashMap<ExternalNetworkId, HashSet<PeerId>> {
|
2025-01-04 22:21:23 -05:00
|
|
|
&self.by_network
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-30 03:14:24 -05:00
|
|
|
pub(crate) fn networks(&self, peer_id: &PeerId) -> Option<&HashSet<ExternalNetworkId>> {
|
2025-01-04 23:28:29 -05:00
|
|
|
self.validators.get(peer_id)
|
2025-01-04 22:21:23 -05:00
|
|
|
}
|
|
|
|
|
}
|
2025-01-05 00:29:11 -05:00
|
|
|
|
2025-01-07 18:09:25 -05:00
|
|
|
/// A task which updates a set of validators.
|
2025-01-07 15:36:06 -05:00
|
|
|
///
|
2025-01-07 18:09:25 -05:00
|
|
|
/// The validators managed by this tak will have their exclusive lock held for a minimal amount of
|
|
|
|
|
/// time while the update occurs to minimize the disruption to the services relying on it.
|
|
|
|
|
pub(crate) struct UpdateValidatorsTask {
|
|
|
|
|
validators: Arc<RwLock<Validators>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl UpdateValidatorsTask {
|
|
|
|
|
/// Spawn a new instance of the UpdateValidatorsTask.
|
|
|
|
|
///
|
|
|
|
|
/// This returns a reference to the Validators it updates after spawning itself.
|
2025-01-10 01:20:26 -05:00
|
|
|
pub(crate) fn spawn(
|
|
|
|
|
serai: Arc<Serai>,
|
|
|
|
|
) -> (Arc<RwLock<Validators>>, mpsc::UnboundedReceiver<Changes>) {
|
2025-01-07 18:09:25 -05:00
|
|
|
// The validators which will be updated
|
2025-01-08 23:54:27 -05:00
|
|
|
let (validators, changes) = Validators::new(serai);
|
|
|
|
|
let validators = Arc::new(RwLock::new(validators));
|
2025-01-07 18:09:25 -05:00
|
|
|
|
|
|
|
|
// Define the task
|
|
|
|
|
let (update_validators_task, update_validators_task_handle) = Task::new();
|
|
|
|
|
// Forget the handle, as dropping the handle would stop the task
|
|
|
|
|
core::mem::forget(update_validators_task_handle);
|
|
|
|
|
// Spawn the task
|
|
|
|
|
tokio::spawn(
|
|
|
|
|
(Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Return the validators
|
2025-01-08 23:54:27 -05:00
|
|
|
(validators, changes)
|
2025-01-07 18:09:25 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ContinuallyRan for UpdateValidatorsTask {
|
|
|
|
|
// Only run every minute, not the default of every five seconds
|
|
|
|
|
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
|
|
|
|
|
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
|
|
|
|
|
2025-11-16 11:50:24 -05:00
|
|
|
type Error = RpcError;
|
2025-01-12 18:29:08 -05:00
|
|
|
|
|
|
|
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
2025-01-07 18:09:25 -05:00
|
|
|
async move {
|
|
|
|
|
let session_changes = {
|
|
|
|
|
let validators = self.validators.read().await;
|
2025-01-12 18:29:08 -05:00
|
|
|
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
|
2025-01-07 18:09:25 -05:00
|
|
|
};
|
|
|
|
|
self.validators.write().await.incorporate_session_changes(session_changes);
|
|
|
|
|
Ok(true)
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-01-05 00:29:11 -05:00
|
|
|
}
|