9 Commits

Author SHA1 Message Date
Luke Parker
c6d0fb477c Inline noise into OnlyValidators
libp2p does support (noise, OnlyValidators) but it'll interpret it as either,
not a chain. This will act as the desired chain.
2025-01-05 00:55:25 -05:00
Luke Parker
96518500b1 Don't hold the shared Validators write lock while making requests to Serai 2025-01-05 00:29:11 -05:00
Luke Parker
2b8f481364 Parallelize requests within Validators::update 2025-01-05 00:17:05 -05:00
Luke Parker
479ca0410a Add commentary on the use of FuturesOrdered 2025-01-04 23:28:54 -05:00
Luke Parker
9a5a661d04 Start on the task to manage the swarm 2025-01-04 23:28:29 -05:00
Luke Parker
3daeea09e6 Only let active Serai validators connect over P2P 2025-01-04 22:21:23 -05:00
Luke Parker
a64e2004ab Dial new peers when we don't have the target amount 2025-01-04 18:04:24 -05:00
Luke Parker
f9f6d40695 Use Serai validator keys as PeerIds 2025-01-04 18:03:37 -05:00
Luke Parker
4836c1676b Don't consider the Serai set in the cosigning protocol
The Serai set SHOULD be banned from setting keys so this SHOULD be unreachable.
It's now explicitly unreachable.
2025-01-04 13:52:17 -05:00
12 changed files with 640 additions and 10 deletions

1
Cargo.lock generated
View File

@@ -8327,6 +8327,7 @@ dependencies = [
"parity-scale-codec",
"rand_core",
"schnorr-signatures",
"schnorrkel",
"serai-client",
"serai-cosign",
"serai-db",

View File

@@ -25,6 +25,7 @@ bitvec = { version = "1", default-features = false, features = ["std"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
blake2 = { version = "0.10", default-features = false, features = ["std"] }
schnorrkel = { version = "0.11", default-features = false, features = ["std"] }
transcript = { package = "flexible-transcript", path = "../crypto/transcript", default-features = false, features = ["std", "recommended"] }
ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] }

View File

@@ -29,7 +29,7 @@ pub use delay::BROADCAST_FREQUENCY;
use delay::LatestCosignedBlockNumber;
/// The schnorrkel context to used when signing a cosign.
pub const COSIGN_CONTEXT: &[u8] = b"serai-cosign";
pub const COSIGN_CONTEXT: &[u8] = b"/serai/coordinator/cosign";
/// A 'global session', defined as all validator sets used for cosigning at a given moment.
///
@@ -161,6 +161,11 @@ async fn keys_for_network(
serai: &TemporalSerai<'_>,
network: NetworkId,
) -> Result<Option<(Session, KeyPair)>, String> {
// The Serai network never cosigns so it has no keys for cosigning
if network == NetworkId::Serai {
return Ok(None);
}
let Some(latest_session) =
serai.validator_sets().session(network).await.map_err(|e| format!("{e:?}"))?
else {

View File

@@ -0,0 +1,183 @@
use core::{pin::Pin, future::Future};
use std::{sync::Arc, io};
use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
use blake2::{Digest, Blake2s256};
use schnorrkel::{Keypair, PublicKey, Signature};
use serai_client::primitives::PublicKey as Public;
use tokio::sync::RwLock;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{
core::UpgradeInfo,
InboundUpgrade, OutboundUpgrade,
identity::{self, PeerId},
noise,
};
use crate::p2p::{validators::Validators, peer_id_from_public};
const PROTOCOL: &str = "/serai/coordinator/validators";
struct OnlyValidators {
validators: Arc<RwLock<Validators>>,
serai_key: Zeroizing<Keypair>,
noise_keypair: identity::Keypair,
}
impl OnlyValidators {
/// The ephemeral challenge protocol for authentication.
///
/// We use ephemeral challenges to prevent replaying signatures from historic sessions.
///
/// We don't immediately send the challenge. We only send a commitment to it. This prevents our
/// remote peer from choosing their challenge in response to our challenge, in case there was any
/// benefit to doing so.
async fn challenges<S: 'static + Send + Unpin + AsyncRead + AsyncWrite>(
socket: &mut noise::Output<S>,
) -> io::Result<([u8; 32], [u8; 32])> {
let mut our_challenge = [0; 32];
OsRng.fill_bytes(&mut our_challenge);
// Write the hash of our challenge
socket.write_all(&Blake2s256::digest(our_challenge)).await?;
// Read the hash of their challenge
let mut their_challenge_commitment = [0; 32];
socket.read_exact(&mut their_challenge_commitment).await?;
// Reveal our challenge
socket.write_all(&our_challenge).await?;
// Read their challenge
let mut their_challenge = [0; 32];
socket.read_exact(&mut their_challenge).await?;
// Verify their challenge
if <[u8; 32]>::from(Blake2s256::digest(their_challenge)) != their_challenge_commitment {
Err(io::Error::other("challenge didn't match challenge commitment"))?;
}
Ok((our_challenge, their_challenge))
}
// We sign the two noise peer IDs and the ephemeral challenges.
//
// Signing the noise peer IDs ensures we're authenticating this noise connection. The only
// expectations placed on noise are for it to prevent a MITM from impersonating the other end or
// modifying any messages sent.
//
// Signing the ephemeral challenges prevents any replays. While that should be unnecessary, as
// noise MAY prevent replays across sessions (even when the same key is used), and noise IDs
// shouldn't be reused (so it should be fine to reuse an existing signature for these noise IDs),
// it doesn't hurt.
async fn authenticate<S: 'static + Send + Unpin + AsyncRead + AsyncWrite>(
&self,
socket: &mut noise::Output<S>,
dialer_peer_id: PeerId,
dialer_challenge: [u8; 32],
listener_peer_id: PeerId,
listener_challenge: [u8; 32],
) -> io::Result<PeerId> {
// Write our public key
socket.write_all(&self.serai_key.public.to_bytes()).await?;
let msg = borsh::to_vec(&(
dialer_peer_id.to_bytes(),
dialer_challenge,
listener_peer_id.to_bytes(),
listener_challenge,
))
.unwrap();
let signature = self.serai_key.sign_simple(PROTOCOL.as_bytes(), &msg);
socket.write_all(&signature.to_bytes()).await?;
let mut public_key_and_sig = [0; 96];
socket.read_exact(&mut public_key_and_sig).await?;
let public_key = PublicKey::from_bytes(&public_key_and_sig[.. 32])
.map_err(|_| io::Error::other("invalid public key"))?;
let sig = Signature::from_bytes(&public_key_and_sig[32 ..])
.map_err(|_| io::Error::other("invalid signature serialization"))?;
public_key
.verify_simple(PROTOCOL.as_bytes(), &msg, &sig)
.map_err(|_| io::Error::other("invalid signature"))?;
let peer_id = peer_id_from_public(Public::from_raw(public_key.to_bytes()));
if !self.validators.read().await.contains(&peer_id) {
Err(io::Error::other("peer which tried to connect isn't a known active validator"))?;
}
Ok(peer_id)
}
}
impl UpgradeInfo for OnlyValidators {
type Info = <noise::Config as UpgradeInfo>::Info;
type InfoIter = <noise::Config as UpgradeInfo>::InfoIter;
fn protocol_info(&self) -> Self::InfoIter {
// A keypair only causes an error if its sign operation fails, which is only possible with RSA,
// which isn't used within this codebase
noise::Config::new(&self.noise_keypair).unwrap().protocol_info()
}
}
impl<S: 'static + Send + Unpin + AsyncRead + AsyncWrite> InboundUpgrade<S> for OnlyValidators {
type Output = (PeerId, noise::Output<S>);
type Error = io::Error;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Output, Self::Error>>>>;
fn upgrade_inbound(self, socket: S, info: Self::Info) -> Self::Future {
Box::pin(async move {
let (dialer_noise_peer_id, mut socket) = noise::Config::new(&self.noise_keypair)
.unwrap()
.upgrade_inbound(socket, info)
.await
.map_err(io::Error::other)?;
let (our_challenge, dialer_challenge) = OnlyValidators::challenges(&mut socket).await?;
let dialer_serai_validator = self
.authenticate(
&mut socket,
dialer_noise_peer_id,
dialer_challenge,
PeerId::from_public_key(&self.noise_keypair.public()),
our_challenge,
)
.await?;
Ok((dialer_serai_validator, socket))
})
}
}
impl<S: 'static + Send + Unpin + AsyncRead + AsyncWrite> OutboundUpgrade<S> for OnlyValidators {
type Output = (PeerId, noise::Output<S>);
type Error = io::Error;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Output, Self::Error>>>>;
fn upgrade_outbound(self, socket: S, info: Self::Info) -> Self::Future {
Box::pin(async move {
let (listener_noise_peer_id, mut socket) = noise::Config::new(&self.noise_keypair)
.unwrap()
.upgrade_outbound(socket, info)
.await
.map_err(io::Error::other)?;
let (our_challenge, listener_challenge) = OnlyValidators::challenges(&mut socket).await?;
let listener_serai_validator = self
.authenticate(
&mut socket,
PeerId::from_public_key(&self.noise_keypair.public()),
our_challenge,
listener_noise_peer_id,
listener_challenge,
)
.await?;
Ok((listener_serai_validator, socket))
})
}
}

106
coordinator/src/p2p/dial.rs Normal file
View File

@@ -0,0 +1,106 @@
use core::future::Future;
use std::collections::HashSet;
use rand_core::{RngCore, OsRng};
use tokio::sync::mpsc;
use serai_client::Serai;
use libp2p::{
core::multiaddr::{Protocol, Multiaddr},
swarm::dial_opts::DialOpts,
};
use serai_task::ContinuallyRan;
use crate::p2p::{PORT, Peers, validators::Validators};
const TARGET_PEERS_PER_NETWORK: usize = 5;
struct DialTask {
serai: Serai,
validators: Validators,
peers: Peers,
to_dial: mpsc::UnboundedSender<DialOpts>,
}
impl ContinuallyRan for DialTask {
// Only run every five minutes, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
self.validators.update().await?;
// If any of our peers is lacking, try to connect to more
let mut dialed = false;
let peer_counts = self
.peers
.peers
.read()
.await
.iter()
.map(|(network, peers)| (*network, peers.len()))
.collect::<Vec<_>>();
for (network, peer_count) in peer_counts {
/*
If we don't have the target amount of peers, and we don't have all the validators in the
set but one, attempt to connect to more validators within this set.
The latter clause is so if there's a set with only 3 validators, we don't infinitely try
to connect to the target amount of peers for this network as we never will. Instead, we
only try to connect to most of the validators actually present.
*/
if (peer_count < TARGET_PEERS_PER_NETWORK) &&
(peer_count <
self
.validators
.by_network()
.get(&network)
.map(HashSet::len)
.unwrap_or(0)
.saturating_sub(1))
{
let mut potential_peers =
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
if potential_peers.is_empty() {
break;
}
let index_to_dial =
usize::try_from(OsRng.next_u64() % u64::try_from(potential_peers.len()).unwrap())
.unwrap();
let randomly_selected_peer = potential_peers.swap_remove(index_to_dial);
log::info!("found peer from substrate: {randomly_selected_peer}");
// Map the peer from a Substrate P2P network peer to a Coordinator P2P network peer
let mapped_peer = randomly_selected_peer
.into_iter()
.filter_map(|protocol| match protocol {
// Drop PeerIds from the Substrate P2p network
Protocol::P2p(_) => None,
// Use our own TCP port
Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
// Pass-through any other specifications (IPv4, IPv6, etc)
other => Some(other),
})
.collect::<Multiaddr>();
log::debug!("mapped found peer: {mapped_peer}");
self
.to_dial
.send(DialOpts::unknown_peer_id().address(mapped_peer).build())
.expect("dial receiver closed?");
dialed = true;
}
}
}
Ok(dialed)
}
}
}

View File

@@ -59,12 +59,11 @@ pub(crate) fn new_behavior() -> Behavior {
})
.build();
// TODO: Don't use IdentityTransform here. Authenticate using validator keys
let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
let mut gossip = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
// Subscribe to the base topic
let topic = IdentTopic::new(BASE_TOPIC);
let _ = gossipsub.subscribe(&topic);
let _ = gossip.subscribe(&topic);
gossipsub
gossip
}

View File

@@ -1,12 +1,48 @@
use serai_client::primitives::NetworkId;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
time::{Duration, Instant},
};
use serai_client::primitives::{NetworkId, PublicKey};
use tokio::sync::{mpsc, RwLock};
use futures_util::StreamExt;
use libp2p::{
multihash::Multihash,
identity::PeerId,
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::{Validators, update_shared_validators};
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
/// The dial task, to find new peers to connect to
mod dial;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The heartbeat task, effecting sync of Tributaries
mod heartbeat;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
}
struct Peer;
impl Peer {
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
@@ -14,6 +50,11 @@ impl Peer {
}
}
#[derive(Clone)]
struct Peers {
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}
#[derive(Clone, Debug)]
struct P2p;
impl P2p {
@@ -28,3 +69,140 @@ impl tributary::P2p for P2p {
todo!("TODO")
}
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
struct SwarmTask {
to_dial: mpsc::UnboundedReceiver<DialOpts>,
validators: Arc<RwLock<Validators>>,
last_refreshed_validators: Instant,
next_refresh_validators: Instant,
peers: Peers,
rebuild_peers_at: Instant,
swarm: Swarm<Behavior>,
}
impl SwarmTask {
async fn run(mut self) {
loop {
let time_till_refresh_validators =
self.next_refresh_validators.saturating_duration_since(Instant::now());
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
tokio::select! {
biased;
// Refresh the instance of validators we use to track peers/share with authenticate
// TODO: Move this to a task
() = tokio::time::sleep(time_till_refresh_validators) => {
const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
let update = update_shared_validators(&self.validators).await;
match update {
Ok(removed) => {
for removed in removed {
let _: Result<_, _> = self.swarm.disconnect_peer_id(removed);
}
self.last_refreshed_validators = Instant::now();
self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS;
}
Err(e) => {
log::warn!("couldn't refresh validators: {e:?}");
// Increase the delay before the next refresh by using the time since the last
// refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20...
let time_since_last = self
.next_refresh_validators
.saturating_duration_since(self.last_refreshed_validators);
// But limit the delay
self.next_refresh_validators =
Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS);
},
}
}
// Rebuild the peers every 10 minutes
//
// This handles edge cases such as when a validator changes the networks they're present
// in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling
// out of control
() = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone();
let connected = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
let mut peers = HashMap::new();
for (network, validators) in validators_by_network {
peers.insert(network, validators.intersection(&connected).copied().collect());
}
*self.peers.peers.write().await = peers;
self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
}
// Dial peers we're instructed to
dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?");
let _: Result<_, _> = self.swarm.dial(dial_opts);
}
// Handle swarm events
event = self.swarm.next() => {
// `Swarm::next` will never return `Poll::Ready(None)`
// https://docs.rs/
// libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
let event = event.unwrap();
match event {
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"),
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"),
// New connection, so update peers
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
for network in networks {
self
.peers
.peers
.write()
.await
.entry(network)
.or_insert_with(HashSet::new)
.insert(peer_id);
}
},
// Connection closed, so update peers
SwarmEvent::ConnectionClosed { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
for network in networks {
self
.peers
.peers
.write()
.await
.entry(network)
.or_insert_with(HashSet::new)
.remove(&peer_id);
}
},
SwarmEvent::IncomingConnection { .. } |
SwarmEvent::IncomingConnectionError { .. } |
SwarmEvent::OutgoingConnectionError { .. } |
SwarmEvent::NewListenAddr { .. } |
SwarmEvent::ExpiredListenAddr { .. } |
SwarmEvent::ListenerClosed { .. } |
SwarmEvent::ListenerError { .. } |
SwarmEvent::Dialing { .. } => {}
}
}
}
}
}
}

View File

@@ -1,5 +1,5 @@
use core::time::Duration;
use std::io::{self, Read};
use core::{fmt, time::Duration};
use std::io;
use async_trait::async_trait;
@@ -46,6 +46,15 @@ pub(crate) enum Response {
Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>),
}
impl fmt::Debug for Response {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
(match self {
Response::Blocks(_) => fmt.debug_struct("Response::Block"),
Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"),
})
.finish_non_exhaustive()
}
}
/// The codec used for the request-response protocol.
///
@@ -53,7 +62,7 @@ pub(crate) enum Response {
/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
/// isn't an amenable API within borsh for incremental deserialization.
#[derive(Default, Clone, Copy, Debug)]
struct Codec;
pub(crate) struct Codec;
impl Codec {
async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
let mut len = [0; 4];

View File

@@ -0,0 +1,140 @@
use core::borrow::Borrow;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock;
use crate::p2p::peer_id_from_public;
pub(crate) struct Validators {
serai: Serai,
// A cache for which session we're populated with the validators of
sessions: HashMap<NetworkId, Session>,
// The validators by network
by_network: HashMap<NetworkId, HashSet<PeerId>>,
// The validators and their networks
validators: HashMap<PeerId, HashSet<NetworkId>>,
}
impl Validators {
async fn session_changes(
serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>,
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
let temporal_serai =
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let temporal_serai = temporal_serai.validator_sets();
let mut session_changes = vec![];
{
// FuturesUnordered can be bad practice as it'll cause timeouts if infrequently polled, but
// we poll it till it yields all futures with the most minimal processing possible
let mut futures = FuturesUnordered::new();
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
let sessions = sessions.borrow();
futures.push(async move {
let session = match temporal_serai.session(network).await {
Ok(Some(session)) => session,
Ok(None) => return Ok(None),
Err(e) => return Err(format!("{e:?}")),
};
if sessions.get(&network) == Some(&session) {
Ok(None)
} else {
match temporal_serai.active_network_validators(network).await {
Ok(validators) => Ok(Some((
network,
session,
validators.into_iter().map(peer_id_from_public).collect(),
))),
Err(e) => Err(format!("{e:?}")),
}
}
});
}
while let Some(session_change) = futures.next().await {
if let Some(session_change) = session_change? {
session_changes.push(session_change);
}
}
}
Ok(session_changes)
}
fn incorporate_session_changes(
&mut self,
session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
) -> HashSet<PeerId> {
let mut removed = HashSet::new();
for (network, session, validators) in session_changes {
// Remove the existing validators
for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
let mut networks = self.validators.remove(&validator).unwrap();
networks.remove(&network);
if networks.is_empty() {
removed.insert(validator);
} else {
self.validators.insert(validator, networks);
}
}
// Add the new validators
for validator in validators.iter().copied() {
self.validators.entry(validator).or_insert_with(HashSet::new).insert(network);
}
self.by_network.insert(network, validators);
// Update the session we have populated
self.sessions.insert(network, session);
}
removed
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
Ok(self.incorporate_session_changes(session_changes))
}
pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
&self.by_network
}
pub(crate) fn contains(&self, peer_id: &PeerId) -> bool {
self.validators.contains_key(peer_id)
}
pub(crate) fn networks(&self, peer_id: &PeerId) -> Option<&HashSet<NetworkId>> {
self.validators.get(peer_id)
}
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
validators: &Arc<RwLock<Validators>>,
) -> Result<HashSet<PeerId>, String> {
let session_changes = {
let validators = validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
Ok(validators.write().await.incorporate_session_changes(session_changes))
}

View File

@@ -107,6 +107,9 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
// Sync the next set of upcoming blocks all at once to minimize latency
const BLOCKS_TO_SYNC_AT_ONCE: u64 = 10;
// FuturesOrdered can be bad practice due to potentially causing tiemouts if it isn't
// sufficiently polled. Considering our processing loop is minimal and it does poll this,
// it's fine.
let mut set = FuturesOrdered::new();
for block_number in
next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)

View File

@@ -100,6 +100,11 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
// Sync the next set of upcoming blocks all at once to minimize latency
const BLOCKS_TO_SYNC_AT_ONCE: u64 = 50;
// FuturesOrdered can be bad practice due to potentially causing tiemouts if it isn't
// sufficiently polled. Our processing loop isn't minimal, itself making multiple requests,
// but the loop body should only be executed a few times a week. It's better to get through
// most blocks with this optimization, and have timeouts a few times a week, than not have
// this at all.
let mut set = FuturesOrdered::new();
for block_number in
next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)

View File

@@ -65,7 +65,7 @@ where
let validators = client.runtime_api().validators(latest_block, network).map_err(|_| {
jsonrpsee::core::Error::to_call_error(std::io::Error::other(format!(
"couldn't get validators from the latest block, which is likely a fatal bug. {}",
"please report this at https://github.com/serai-dex/serai",
"please report this at https://github.com/serai-dex/serai/issues",
)))
})?;
// Always return the protocol's bootnodes