Mirror of https://github.com/serai-dex/serai.git (synced 2025-12-12 14:09:25 +00:00)

Compare commits: 985261574c ... c6d0fb477c (9 commits)
| Author | SHA1 | Date |
|---|---|---|
| | c6d0fb477c | |
| | 96518500b1 | |
| | 2b8f481364 | |
| | 479ca0410a | |
| | 9a5a661d04 | |
| | 3daeea09e6 | |
| | a64e2004ab | |
| | f9f6d40695 | |
| | 4836c1676b | |
Cargo.lock (generated, 1 change)
@@ -8327,6 +8327,7 @@ dependencies = [
 "parity-scale-codec",
 "rand_core",
 "schnorr-signatures",
 "schnorrkel",
 "serai-client",
 "serai-cosign",
 "serai-db",
@@ -25,6 +25,7 @@ bitvec = { version = "1", default-features = false, features = ["std"] }
 rand_core = { version = "0.6", default-features = false, features = ["std"] }

 blake2 = { version = "0.10", default-features = false, features = ["std"] }
 schnorrkel = { version = "0.11", default-features = false, features = ["std"] }

 transcript = { package = "flexible-transcript", path = "../crypto/transcript", default-features = false, features = ["std", "recommended"] }
 ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] }
@@ -29,7 +29,7 @@ pub use delay::BROADCAST_FREQUENCY;
 use delay::LatestCosignedBlockNumber;

 /// The schnorrkel context to use when signing a cosign.
-pub const COSIGN_CONTEXT: &[u8] = b"serai-cosign";
+pub const COSIGN_CONTEXT: &[u8] = b"/serai/coordinator/cosign";

 /// A 'global session', defined as all validator sets used for cosigning at a given moment.
 ///
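As an aside on the context change: schnorrkel's signing context domain-separates signatures, so a cosign produced under the new context no longer verifies under the old one. A minimal sketch of that property (not part of the diff, assuming the same `schnorrkel` and `rand_core` crates used elsewhere in this PR):

use rand_core::OsRng;
use schnorrkel::Keypair;

fn main() {
  let keypair = Keypair::generate_with(OsRng);
  let msg = b"example cosign payload";

  // Sign under the new context
  let sig = keypair.sign_simple(b"/serai/coordinator/cosign", msg);
  assert!(keypair.public.verify_simple(b"/serai/coordinator/cosign", msg, &sig).is_ok());

  // The same signature does not verify under the old context
  assert!(keypair.public.verify_simple(b"serai-cosign", msg, &sig).is_err());
}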
@@ -161,6 +161,11 @@ async fn keys_for_network(
   serai: &TemporalSerai<'_>,
   network: NetworkId,
 ) -> Result<Option<(Session, KeyPair)>, String> {
+  // The Serai network never cosigns so it has no keys for cosigning
+  if network == NetworkId::Serai {
+    return Ok(None);
+  }
+
   let Some(latest_session) =
     serai.validator_sets().session(network).await.map_err(|e| format!("{e:?}"))?
   else {
coordinator/src/p2p/authenticate.rs (new file, 183 lines)
@@ -0,0 +1,183 @@
use core::{pin::Pin, future::Future};
use std::{sync::Arc, io};

use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};

use blake2::{Digest, Blake2s256};
use schnorrkel::{Keypair, PublicKey, Signature};

use serai_client::primitives::PublicKey as Public;

use tokio::sync::RwLock;

use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{
  core::UpgradeInfo,
  InboundUpgrade, OutboundUpgrade,
  identity::{self, PeerId},
  noise,
};

use crate::p2p::{validators::Validators, peer_id_from_public};

const PROTOCOL: &str = "/serai/coordinator/validators";

struct OnlyValidators {
  validators: Arc<RwLock<Validators>>,
  serai_key: Zeroizing<Keypair>,
  noise_keypair: identity::Keypair,
}

impl OnlyValidators {
  /// The ephemeral challenge protocol for authentication.
  ///
  /// We use ephemeral challenges to prevent replaying signatures from historic sessions.
  ///
  /// We don't immediately send the challenge. We only send a commitment to it. This prevents our
  /// remote peer from choosing their challenge in response to our challenge, in case there was any
  /// benefit to doing so.
  async fn challenges<S: 'static + Send + Unpin + AsyncRead + AsyncWrite>(
    socket: &mut noise::Output<S>,
  ) -> io::Result<([u8; 32], [u8; 32])> {
    let mut our_challenge = [0; 32];
    OsRng.fill_bytes(&mut our_challenge);

    // Write the hash of our challenge
    socket.write_all(&Blake2s256::digest(our_challenge)).await?;

    // Read the hash of their challenge
    let mut their_challenge_commitment = [0; 32];
    socket.read_exact(&mut their_challenge_commitment).await?;

    // Reveal our challenge
    socket.write_all(&our_challenge).await?;

    // Read their challenge
    let mut their_challenge = [0; 32];
    socket.read_exact(&mut their_challenge).await?;

    // Verify their challenge
    if <[u8; 32]>::from(Blake2s256::digest(their_challenge)) != their_challenge_commitment {
      Err(io::Error::other("challenge didn't match challenge commitment"))?;
    }

    Ok((our_challenge, their_challenge))
  }

  // We sign the two noise peer IDs and the ephemeral challenges.
  //
  // Signing the noise peer IDs ensures we're authenticating this noise connection. The only
  // expectations placed on noise are for it to prevent a MITM from impersonating the other end or
  // modifying any messages sent.
  //
  // Signing the ephemeral challenges prevents any replays. While that should be unnecessary, as
  // noise MAY prevent replays across sessions (even when the same key is used), and noise IDs
  // shouldn't be reused (so it should be fine to reuse an existing signature for these noise IDs),
  // it doesn't hurt.
  async fn authenticate<S: 'static + Send + Unpin + AsyncRead + AsyncWrite>(
    &self,
    socket: &mut noise::Output<S>,
    dialer_peer_id: PeerId,
    dialer_challenge: [u8; 32],
    listener_peer_id: PeerId,
    listener_challenge: [u8; 32],
  ) -> io::Result<PeerId> {
    // Write our public key
    socket.write_all(&self.serai_key.public.to_bytes()).await?;

    let msg = borsh::to_vec(&(
      dialer_peer_id.to_bytes(),
      dialer_challenge,
      listener_peer_id.to_bytes(),
      listener_challenge,
    ))
    .unwrap();
    let signature = self.serai_key.sign_simple(PROTOCOL.as_bytes(), &msg);
    socket.write_all(&signature.to_bytes()).await?;

    let mut public_key_and_sig = [0; 96];
    socket.read_exact(&mut public_key_and_sig).await?;
    let public_key = PublicKey::from_bytes(&public_key_and_sig[.. 32])
      .map_err(|_| io::Error::other("invalid public key"))?;
    let sig = Signature::from_bytes(&public_key_and_sig[32 ..])
      .map_err(|_| io::Error::other("invalid signature serialization"))?;

    public_key
      .verify_simple(PROTOCOL.as_bytes(), &msg, &sig)
      .map_err(|_| io::Error::other("invalid signature"))?;

    let peer_id = peer_id_from_public(Public::from_raw(public_key.to_bytes()));
    if !self.validators.read().await.contains(&peer_id) {
      Err(io::Error::other("peer which tried to connect isn't a known active validator"))?;
    }

    Ok(peer_id)
  }
}

impl UpgradeInfo for OnlyValidators {
  type Info = <noise::Config as UpgradeInfo>::Info;
  type InfoIter = <noise::Config as UpgradeInfo>::InfoIter;
  fn protocol_info(&self) -> Self::InfoIter {
    // A keypair only causes an error if its sign operation fails, which is only possible with RSA,
    // which isn't used within this codebase
    noise::Config::new(&self.noise_keypair).unwrap().protocol_info()
  }
}

impl<S: 'static + Send + Unpin + AsyncRead + AsyncWrite> InboundUpgrade<S> for OnlyValidators {
  type Output = (PeerId, noise::Output<S>);
  type Error = io::Error;
  type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Output, Self::Error>>>>;

  fn upgrade_inbound(self, socket: S, info: Self::Info) -> Self::Future {
    Box::pin(async move {
      let (dialer_noise_peer_id, mut socket) = noise::Config::new(&self.noise_keypair)
        .unwrap()
        .upgrade_inbound(socket, info)
        .await
        .map_err(io::Error::other)?;

      let (our_challenge, dialer_challenge) = OnlyValidators::challenges(&mut socket).await?;
      let dialer_serai_validator = self
        .authenticate(
          &mut socket,
          dialer_noise_peer_id,
          dialer_challenge,
          PeerId::from_public_key(&self.noise_keypair.public()),
          our_challenge,
        )
        .await?;
      Ok((dialer_serai_validator, socket))
    })
  }
}

impl<S: 'static + Send + Unpin + AsyncRead + AsyncWrite> OutboundUpgrade<S> for OnlyValidators {
  type Output = (PeerId, noise::Output<S>);
  type Error = io::Error;
  type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Output, Self::Error>>>>;

  fn upgrade_outbound(self, socket: S, info: Self::Info) -> Self::Future {
    Box::pin(async move {
      let (listener_noise_peer_id, mut socket) = noise::Config::new(&self.noise_keypair)
        .unwrap()
        .upgrade_outbound(socket, info)
        .await
        .map_err(io::Error::other)?;

      let (our_challenge, listener_challenge) = OnlyValidators::challenges(&mut socket).await?;
      let listener_serai_validator = self
        .authenticate(
          &mut socket,
          PeerId::from_public_key(&self.noise_keypair.public()),
          our_challenge,
          listener_noise_peer_id,
          listener_challenge,
        )
        .await?;
      Ok((listener_serai_validator, socket))
    })
  }
}
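The commit-reveal exchange in `challenges` can be illustrated on its own. A minimal standalone sketch (not part of the diff; it reuses the `blake2` and `rand_core` APIs the file above uses):

use blake2::{Digest, Blake2s256};
use rand_core::{RngCore, OsRng};

fn main() {
  // Each side samples a random 32-byte challenge
  let mut challenge = [0; 32];
  OsRng.fill_bytes(&mut challenge);

  // It first sends only the Blake2s-256 hash of its challenge, and reveals the
  // preimage after receiving the other side's hash, so neither side can pick
  // their challenge as a function of the other's
  let commitment = <[u8; 32]>::from(Blake2s256::digest(challenge));

  // The check each side performs upon the reveal
  assert_eq!(<[u8; 32]>::from(Blake2s256::digest(challenge)), commitment);
}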
coordinator/src/p2p/dial.rs (new file, 106 lines)
@@ -0,0 +1,106 @@
use core::future::Future;
use std::collections::HashSet;

use rand_core::{RngCore, OsRng};

use tokio::sync::mpsc;

use serai_client::Serai;

use libp2p::{
  core::multiaddr::{Protocol, Multiaddr},
  swarm::dial_opts::DialOpts,
};

use serai_task::ContinuallyRan;

use crate::p2p::{PORT, Peers, validators::Validators};

const TARGET_PEERS_PER_NETWORK: usize = 5;

struct DialTask {
  serai: Serai,
  validators: Validators,
  peers: Peers,
  to_dial: mpsc::UnboundedSender<DialOpts>,
}

impl ContinuallyRan for DialTask {
  // Only run every five minutes, not the default of every five seconds
  const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;

  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
    async move {
      self.validators.update().await?;

      // If any of our peers is lacking, try to connect to more
      let mut dialed = false;
      let peer_counts = self
        .peers
        .peers
        .read()
        .await
        .iter()
        .map(|(network, peers)| (*network, peers.len()))
        .collect::<Vec<_>>();
      for (network, peer_count) in peer_counts {
        /*
          If we don't have the target amount of peers, and we don't have all the validators in the
          set but one, attempt to connect to more validators within this set.

          The latter clause is so if there's a set with only 3 validators, we don't infinitely try
          to connect to the target amount of peers for this network as we never will. Instead, we
          only try to connect to most of the validators actually present.
        */
        if (peer_count < TARGET_PEERS_PER_NETWORK) &&
          (peer_count <
            self
              .validators
              .by_network()
              .get(&network)
              .map(HashSet::len)
              .unwrap_or(0)
              .saturating_sub(1))
        {
          let mut potential_peers =
            self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
          for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
            if potential_peers.is_empty() {
              break;
            }
            let index_to_dial =
              usize::try_from(OsRng.next_u64() % u64::try_from(potential_peers.len()).unwrap())
                .unwrap();
            let randomly_selected_peer = potential_peers.swap_remove(index_to_dial);

            log::info!("found peer from substrate: {randomly_selected_peer}");

            // Map the peer from a Substrate P2P network peer to a Coordinator P2P network peer
            let mapped_peer = randomly_selected_peer
              .into_iter()
              .filter_map(|protocol| match protocol {
                // Drop PeerIds from the Substrate P2p network
                Protocol::P2p(_) => None,
                // Use our own TCP port
                Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
                // Pass-through any other specifications (IPv4, IPv6, etc)
                other => Some(other),
              })
              .collect::<Multiaddr>();

            log::debug!("mapped found peer: {mapped_peer}");

            self
              .to_dial
              .send(DialOpts::unknown_peer_id().address(mapped_peer).build())
              .expect("dial receiver closed?");
            dialed = true;
          }
        }
      }

      Ok(dialed)
    }
  }
}
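The address-mapping rule in `DialTask` can be exercised in isolation. A sketch (not part of the diff; the helper name and the example address are hypothetical, while the `Multiaddr` APIs are the ones the file above uses):

use libp2p::core::multiaddr::{Protocol, Multiaddr};

const PORT: u16 = 30563;

fn map_peer(addr: Multiaddr) -> Multiaddr {
  addr
    .into_iter()
    .filter_map(|protocol| match protocol {
      // Drop the Substrate node's PeerId, which is unrelated to the coordinator's
      Protocol::P2p(_) => None,
      // Swap the Substrate P2P port for the coordinator's own port
      Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
      // Keep the transport-level components (IPv4, IPv6, DNS, ...)
      other => Some(other),
    })
    .collect()
}

fn main() {
  let addr: Multiaddr = "/ip4/192.0.2.1/tcp/30333".parse().unwrap();
  assert_eq!(map_peer(addr).to_string(), "/ip4/192.0.2.1/tcp/30563");
}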
@@ -59,12 +59,11 @@ pub(crate) fn new_behavior() -> Behavior {
   })
   .build();

-  // TODO: Don't use IdentityTransform here. Authenticate using validator keys
-  let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
+  let mut gossip = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();

   // Subscribe to the base topic
   let topic = IdentTopic::new(BASE_TOPIC);
-  let _ = gossipsub.subscribe(&topic);
+  let _ = gossip.subscribe(&topic);

-  gossipsub
+  gossip
 }
@@ -1,12 +1,48 @@
-use serai_client::primitives::NetworkId;
+use std::{
+  sync::Arc,
+  collections::{HashSet, HashMap},
+  time::{Duration, Instant},
+};
+
+use serai_client::primitives::{NetworkId, PublicKey};
+
+use tokio::sync::{mpsc, RwLock};
+
+use futures_util::StreamExt;
+use libp2p::{
+  multihash::Multihash,
+  identity::PeerId,
+  swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
+};
+
+/// A struct to sync the validators from the Serai node in order to keep track of them.
+mod validators;
+use validators::{Validators, update_shared_validators};
+
+/// The authentication protocol upgrade to limit the P2P network to active validators.
+mod authenticate;
+
+/// The dial task, to find new peers to connect to
+mod dial;
+
 /// The request-response messages and behavior
 mod reqres;
 use reqres::{Request, Response};

 /// The gossip messages and behavior
 mod gossip;

 /// The heartbeat task, effecting sync of Tributaries
 mod heartbeat;

+const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
+
+fn peer_id_from_public(public: PublicKey) -> PeerId {
+  // 0 represents the identity Multihash, that no hash was performed
+  // It's an internal constant so we can't refer to the constant inside libp2p
+  PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
+}
+
 struct Peer;
 impl Peer {
   async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
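`peer_id_from_public` relies on the identity multihash (code 0) embedding the 32-byte Serai public key into the `PeerId` verbatim, which is what lets `authenticate.rs` recover and check keys. A sketch (not part of the diff; raw bytes stand in for an actual `PublicKey`):

use libp2p::{multihash::Multihash, identity::PeerId};

fn main() {
  let public: [u8; 32] = [7; 32];
  // Code 0 is the identity hash function: the "digest" is the input itself,
  // so the key is carried inside the PeerId rather than hashed away
  let peer_id = PeerId::from_multihash(Multihash::wrap(0, &public).unwrap()).unwrap();
  println!("{peer_id}");
}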
@@ -14,6 +50,11 @@ impl Peer {
   }
 }

+#[derive(Clone)]
+struct Peers {
+  peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
+}
+
 #[derive(Clone, Debug)]
 struct P2p;
 impl P2p {
||||
#[derive(Clone, Debug)]
|
||||
struct P2p;
|
||||
impl P2p {
|
||||
@@ -28,3 +69,140 @@ impl tributary::P2p for P2p {
|
||||
todo!("TODO")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct Behavior {
|
||||
reqres: reqres::Behavior,
|
||||
gossip: gossip::Behavior,
|
||||
}
|
||||
|
||||
struct SwarmTask {
|
||||
to_dial: mpsc::UnboundedReceiver<DialOpts>,
|
||||
|
||||
validators: Arc<RwLock<Validators>>,
|
||||
last_refreshed_validators: Instant,
|
||||
next_refresh_validators: Instant,
|
||||
|
||||
peers: Peers,
|
||||
rebuild_peers_at: Instant,
|
||||
|
||||
swarm: Swarm<Behavior>,
|
||||
}
|
||||
|
||||
impl SwarmTask {
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
let time_till_refresh_validators =
|
||||
self.next_refresh_validators.saturating_duration_since(Instant::now());
|
||||
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
|
||||
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
// Refresh the instance of validators we use to track peers/share with authenticate
|
||||
// TODO: Move this to a task
|
||||
() = tokio::time::sleep(time_till_refresh_validators) => {
|
||||
const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
|
||||
const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
let update = update_shared_validators(&self.validators).await;
|
||||
match update {
|
||||
Ok(removed) => {
|
||||
for removed in removed {
|
||||
let _: Result<_, _> = self.swarm.disconnect_peer_id(removed);
|
||||
}
|
||||
self.last_refreshed_validators = Instant::now();
|
||||
self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS;
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("couldn't refresh validators: {e:?}");
|
||||
// Increase the delay before the next refresh by using the time since the last
|
||||
// refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20...
|
||||
let time_since_last = self
|
||||
.next_refresh_validators
|
||||
.saturating_duration_since(self.last_refreshed_validators);
|
||||
// But limit the delay
|
||||
self.next_refresh_validators =
|
||||
Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Rebuild the peers every 10 minutes
|
||||
//
|
||||
// This handles edge cases such as when a validator changes the networks they're present
|
||||
// in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling
|
||||
// out of control
|
||||
() = tokio::time::sleep(time_till_rebuild_peers) => {
|
||||
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
let validators_by_network = self.validators.read().await.by_network().clone();
|
||||
let connected = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
|
||||
let mut peers = HashMap::new();
|
||||
for (network, validators) in validators_by_network {
|
||||
peers.insert(network, validators.intersection(&connected).copied().collect());
|
||||
}
|
||||
*self.peers.peers.write().await = peers;
|
||||
|
||||
self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
|
||||
}
|
||||
|
||||
// Dial peers we're instructed to
|
||||
dial_opts = self.to_dial.recv() => {
|
||||
let dial_opts = dial_opts.expect("DialTask was closed?");
|
||||
let _: Result<_, _> = self.swarm.dial(dial_opts);
|
||||
}
|
||||
|
||||
// Handle swarm events
|
||||
event = self.swarm.next() => {
|
||||
// `Swarm::next` will never return `Poll::Ready(None)`
|
||||
// https://docs.rs/
|
||||
// libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
|
||||
let event = event.unwrap();
|
||||
match event {
|
||||
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"),
|
||||
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"),
|
||||
// New connection, so update peers
|
||||
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
|
||||
let Some(networks) =
|
||||
self.validators.read().await.networks(&peer_id).cloned() else { continue };
|
||||
for network in networks {
|
||||
self
|
||||
.peers
|
||||
.peers
|
||||
.write()
|
||||
.await
|
||||
.entry(network)
|
||||
.or_insert_with(HashSet::new)
|
||||
.insert(peer_id);
|
||||
}
|
||||
},
|
||||
// Connection closed, so update peers
|
||||
SwarmEvent::ConnectionClosed { peer_id, .. } => {
|
||||
let Some(networks) =
|
||||
self.validators.read().await.networks(&peer_id).cloned() else { continue };
|
||||
for network in networks {
|
||||
self
|
||||
.peers
|
||||
.peers
|
||||
.write()
|
||||
.await
|
||||
.entry(network)
|
||||
.or_insert_with(HashSet::new)
|
||||
.remove(&peer_id);
|
||||
}
|
||||
},
|
||||
SwarmEvent::IncomingConnection { .. } |
|
||||
SwarmEvent::IncomingConnectionError { .. } |
|
||||
SwarmEvent::OutgoingConnectionError { .. } |
|
||||
SwarmEvent::NewListenAddr { .. } |
|
||||
SwarmEvent::ExpiredListenAddr { .. } |
|
||||
SwarmEvent::ListenerClosed { .. } |
|
||||
SwarmEvent::ListenerError { .. } |
|
||||
SwarmEvent::Dialing { .. } => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
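The failure path's backoff works by rescheduling the next refresh by the time since the last successful one, so consecutive failures roughly double the delay until it hits the cap. A standalone sketch of that arithmetic (not part of the diff; plain integer seconds stand in for `Instant`s):

fn main() {
  const MAX: u64 = 5 * 60; // MAX_TIME_BETWEEN_REFRESH_VALIDATORS, in seconds
  let last_success = 0u64; // the last successful refresh never advances while failing
  let mut next_attempt = 5u64; // the first retry, five seconds later

  for _ in 0 .. 8 {
    // Each failed attempt reschedules by the time since the last success,
    // doubling the gap until it reaches the cap
    let delay = (next_attempt - last_success).min(MAX);
    println!("next retry in {delay}s");
    next_attempt += delay;
  }
}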
@@ -1,5 +1,5 @@
-use core::time::Duration;
-use std::io::{self, Read};
+use core::{fmt, time::Duration};
+use std::io;

 use async_trait::async_trait;
@@ -46,6 +46,15 @@ pub(crate) enum Response {
   Blocks(Vec<TributaryBlockWithCommit>),
   NotableCosigns(Vec<SignedCosign>),
 }
+impl fmt::Debug for Response {
+  fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+    (match self {
+      Response::Blocks(_) => fmt.debug_struct("Response::Blocks"),
+      Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"),
+    })
+    .finish_non_exhaustive()
+  }
+}

 /// The codec used for the request-response protocol.
 ///
@@ -53,7 +62,7 @@ pub(crate) enum Response {
 /// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
 /// isn't an amenable API within borsh for incremental deserialization.
 #[derive(Default, Clone, Copy, Debug)]
-struct Codec;
+pub(crate) struct Codec;
 impl Codec {
   async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
     let mut len = [0; 4];
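`Codec::read` starts by reading a four-byte length prefix before the borsh-encoded message. A synchronous sketch of that framing (not part of the diff; the little-endian encoding and the lack of a length bound are assumptions for illustration):

use std::io::{self, Read};
use borsh::BorshDeserialize;

fn read_message<M: BorshDeserialize>(io: &mut impl Read) -> io::Result<M> {
  // Read the four-byte length prefix
  let mut len = [0; 4];
  io.read_exact(&mut len)?;
  let len = usize::try_from(u32::from_le_bytes(len)).unwrap();

  // Read exactly that many bytes, then deserialize with borsh
  let mut buf = vec![0; len];
  io.read_exact(&mut buf)?;
  M::try_from_slice(&buf).map_err(io::Error::other)
}

fn main() -> io::Result<()> {
  // Frame a u32 the same way and read it back
  let msg = borsh::to_vec(&42u32).unwrap();
  let mut framed = u32::try_from(msg.len()).unwrap().to_le_bytes().to_vec();
  framed.extend(&msg);
  let value: u32 = read_message(&mut framed.as_slice())?;
  assert_eq!(value, 42);
  Ok(())
}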
coordinator/src/p2p/validators.rs (new file, 140 lines)
@@ -0,0 +1,140 @@
use core::borrow::Borrow;
use std::{
  sync::Arc,
  collections::{HashSet, HashMap},
};

use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};

use libp2p::PeerId;

use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock;

use crate::p2p::peer_id_from_public;

pub(crate) struct Validators {
  serai: Serai,

  // A cache for which session we're populated with the validators of
  sessions: HashMap<NetworkId, Session>,
  // The validators by network
  by_network: HashMap<NetworkId, HashSet<PeerId>>,
  // The validators and their networks
  validators: HashMap<PeerId, HashSet<NetworkId>>,
}

impl Validators {
  async fn session_changes(
    serai: impl Borrow<Serai>,
    sessions: impl Borrow<HashMap<NetworkId, Session>>,
  ) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
    let temporal_serai =
      serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
    let temporal_serai = temporal_serai.validator_sets();

    let mut session_changes = vec![];
    {
      // FuturesUnordered can be bad practice as it'll cause timeouts if infrequently polled, but
      // we poll it till it yields all futures with the most minimal processing possible
      let mut futures = FuturesUnordered::new();
      for network in serai_client::primitives::NETWORKS {
        if network == NetworkId::Serai {
          continue;
        }
        let sessions = sessions.borrow();
        futures.push(async move {
          let session = match temporal_serai.session(network).await {
            Ok(Some(session)) => session,
            Ok(None) => return Ok(None),
            Err(e) => return Err(format!("{e:?}")),
          };

          if sessions.get(&network) == Some(&session) {
            Ok(None)
          } else {
            match temporal_serai.active_network_validators(network).await {
              Ok(validators) => Ok(Some((
                network,
                session,
                validators.into_iter().map(peer_id_from_public).collect(),
              ))),
              Err(e) => Err(format!("{e:?}")),
            }
          }
        });
      }
      while let Some(session_change) = futures.next().await {
        if let Some(session_change) = session_change? {
          session_changes.push(session_change);
        }
      }
    }

    Ok(session_changes)
  }

  fn incorporate_session_changes(
    &mut self,
    session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
  ) -> HashSet<PeerId> {
    let mut removed = HashSet::new();

    for (network, session, validators) in session_changes {
      // Remove the existing validators
      for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
        let mut networks = self.validators.remove(&validator).unwrap();
        networks.remove(&network);
        if networks.is_empty() {
          removed.insert(validator);
        } else {
          self.validators.insert(validator, networks);
        }
      }

      // Add the new validators
      for validator in validators.iter().copied() {
        self.validators.entry(validator).or_insert_with(HashSet::new).insert(network);
      }
      self.by_network.insert(network, validators);

      // Update the session we have populated
      self.sessions.insert(network, session);
    }

    removed
  }

  /// Update the view of the validators.
  ///
  /// Returns all validators removed from the active validator set.
  pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
    let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
    Ok(self.incorporate_session_changes(session_changes))
  }

  pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
    &self.by_network
  }

  pub(crate) fn contains(&self, peer_id: &PeerId) -> bool {
    self.validators.contains_key(peer_id)
  }

  pub(crate) fn networks(&self, peer_id: &PeerId) -> Option<&HashSet<NetworkId>> {
    self.validators.get(peer_id)
  }
}

/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
  validators: &Arc<RwLock<Validators>>,
) -> Result<HashSet<PeerId>, String> {
  let session_changes = {
    let validators = validators.read().await;
    Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
  };
  Ok(validators.write().await.incorporate_session_changes(session_changes))
}
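The bookkeeping in `incorporate_session_changes` only reports a validator as removed once it belongs to no network at all, which is what keeps multi-network validators connected through a single set rotation. A reduced sketch (not part of the diff; string names stand in for `PeerId`s and `NetworkId`s):

use std::collections::{HashMap, HashSet};

fn main() {
  // A validator active on two networks
  let mut networks_by_validator: HashMap<&str, HashSet<&str>> = HashMap::new();
  networks_by_validator.insert("validator-a", HashSet::from(["bitcoin", "monero"]));

  // A session change rotates them out of bitcoin's set
  let mut networks = networks_by_validator.remove("validator-a").unwrap();
  networks.remove("bitcoin");

  if networks.is_empty() {
    // Only now would the swarm disconnect them
    println!("validator-a removed entirely");
  } else {
    // Still a monero validator, so they're kept (and stay connected)
    networks_by_validator.insert("validator-a", networks);
  }
}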
@@ -107,6 +107,9 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {

       // Sync the next set of upcoming blocks all at once to minimize latency
       const BLOCKS_TO_SYNC_AT_ONCE: u64 = 10;
+      // FuturesOrdered can be bad practice due to potentially causing timeouts if it isn't
+      // sufficiently polled. Considering our processing loop is minimal and it does poll this,
+      // it's fine.
       let mut set = FuturesOrdered::new();
       for block_number in
         next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)
@@ -100,6 +100,11 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {

       // Sync the next set of upcoming blocks all at once to minimize latency
       const BLOCKS_TO_SYNC_AT_ONCE: u64 = 50;
+      // FuturesOrdered can be bad practice due to potentially causing timeouts if it isn't
+      // sufficiently polled. Our processing loop isn't minimal, itself making multiple requests,
+      // but the loop body should only be executed a few times a week. It's better to get through
+      // most blocks with this optimization, and have timeouts a few times a week, than not have
+      // this at all.
       let mut set = FuturesOrdered::new();
       for block_number in
         next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)
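Both event streams use the same batching pattern: queue the next several block requests at once, then drain them in order, so the per-block network latency overlaps rather than accumulating. A sketch (not part of the diff; `fetch_block` is a hypothetical stand-in for the actual RPC call):

use futures_util::stream::{StreamExt, FuturesOrdered};

// Hypothetical stand-in for fetching a block over RPC
async fn fetch_block(number: u64) -> u64 {
  number
}

async fn sync_batch(next_block: u64, latest_finalized_block: u64) {
  const BLOCKS_TO_SYNC_AT_ONCE: u64 = 10;

  // Queue up to BLOCKS_TO_SYNC_AT_ONCE + 1 requests at once...
  let mut set = FuturesOrdered::new();
  for block_number in
    next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)
  {
    set.push_back(fetch_block(block_number));
  }

  // ...and consume them in block order as they resolve
  while let Some(block) = set.next().await {
    let _ = block; // process the block here
  }
}

#[tokio::main]
async fn main() {
  sync_batch(100, 200).await;
}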
@@ -65,7 +65,7 @@ where
     let validators = client.runtime_api().validators(latest_block, network).map_err(|_| {
       jsonrpsee::core::Error::to_call_error(std::io::Error::other(format!(
         "couldn't get validators from the latest block, which is likely a fatal bug. {}",
-        "please report this at https://github.com/serai-dex/serai",
+        "please report this at https://github.com/serai-dex/serai/issues",
       )))
     })?;
     // Always return the protocol's bootnodes