From 3daeea09e630623cb58965ca1292069779ca2051 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 4 Jan 2025 22:21:23 -0500 Subject: [PATCH] Only let active Serai validators connect over P2P --- coordinator/src/p2p/authenticate.rs | 24 ++++++---- coordinator/src/p2p/dial.rs | 47 +++++--------------- coordinator/src/p2p/gossip.rs | 7 ++- coordinator/src/p2p/mod.rs | 47 +++++++++++++++++++- coordinator/src/p2p/reqres.rs | 13 +++++- coordinator/src/p2p/validators.rs | 69 +++++++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 50 deletions(-) create mode 100644 coordinator/src/p2p/validators.rs diff --git a/coordinator/src/p2p/authenticate.rs b/coordinator/src/p2p/authenticate.rs index 4b61d381..99d98515 100644 --- a/coordinator/src/p2p/authenticate.rs +++ b/coordinator/src/p2p/authenticate.rs @@ -1,5 +1,5 @@ use core::{pin::Pin, future::Future}; -use std::io; +use std::{sync::Arc, io}; use zeroize::Zeroizing; use rand_core::{RngCore, OsRng}; @@ -7,14 +7,19 @@ use rand_core::{RngCore, OsRng}; use blake2::{Digest, Blake2s256}; use schnorrkel::{Keypair, PublicKey, Signature}; -use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt}; -use libp2p::{ - core::UpgradeInfo, InboundUpgrade, OutboundUpgrade, multihash::Multihash, identity::PeerId, noise, -}; +use serai_client::primitives::PublicKey as Public; + +use tokio::sync::RwLock; + +use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use libp2p::{core::UpgradeInfo, InboundUpgrade, OutboundUpgrade, identity::PeerId, noise}; + +use crate::p2p::{validators::Validators, peer_id_from_public}; const PROTOCOL: &str = "/serai/coordinator/validators"; struct OnlyValidators { + validators: Arc>, serai_key: Zeroizing, our_peer_id: PeerId, } @@ -97,9 +102,12 @@ impl OnlyValidators { .verify_simple(PROTOCOL.as_bytes(), &msg, &sig) .map_err(|_| io::Error::other("invalid signature"))?; - // 0 represents the identity Multihash, that no hash was performed - // It's an internal constant so we can't refer to the constant inside libp2p - Ok(PeerId::from_multihash(Multihash::wrap(0, &public_key.to_bytes()).unwrap()).unwrap()) + let peer_id = peer_id_from_public(Public::from_raw(public_key.to_bytes())); + if !self.validators.read().await.contains(&peer_id) { + Err(io::Error::other("peer which tried to connect isn't a known active validator"))?; + } + + Ok(peer_id) } } diff --git a/coordinator/src/p2p/dial.rs b/coordinator/src/p2p/dial.rs index 94ee664e..2e427ee7 100644 --- a/coordinator/src/p2p/dial.rs +++ b/coordinator/src/p2p/dial.rs @@ -1,15 +1,10 @@ use core::future::Future; -use std::collections::HashMap; use rand_core::{RngCore, OsRng}; use tokio::sync::mpsc; -use serai_client::{ - primitives::{NetworkId, PublicKey}, - validator_sets::primitives::Session, - Serai, -}; +use serai_client::Serai; use libp2p::{ core::multiaddr::{Protocol, Multiaddr}, @@ -18,16 +13,13 @@ use libp2p::{ use serai_task::ContinuallyRan; -use crate::p2p::{PORT, Peers}; +use crate::p2p::{PORT, Peers, validators::Validators}; const TARGET_PEERS_PER_NETWORK: usize = 5; struct DialTask { serai: Serai, - - sessions: HashMap, - validators: HashMap>, - + validators: Validators, peers: Peers, to_dial: mpsc::UnboundedSender, } @@ -37,29 +29,7 @@ impl ContinuallyRan for DialTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { - let temporal_serai = - self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; - let temporal_serai = temporal_serai.validator_sets(); - for network in serai_client::primitives::NETWORKS { - if network == NetworkId::Serai { - continue; - } - let Some(session) = temporal_serai.session(network).await.map_err(|e| format!("{e:?}"))? - else { - continue; - }; - // If the session has changed, populate it with the current validators - if self.sessions.get(&network) != Some(&session) { - self.validators.insert( - network, - temporal_serai - .active_network_validators(network) - .await - .map_err(|e| format!("{e:?}"))?, - ); - self.sessions.insert(network, session); - } - } + self.validators.update().await?; // If any of our peers is lacking, try to connect to more let mut dialed = false; @@ -81,7 +51,14 @@ impl ContinuallyRan for DialTask { only try to connect to most of the validators actually present. */ if (peer_count < TARGET_PEERS_PER_NETWORK) && - (peer_count < self.validators[&network].len().saturating_sub(1)) + (peer_count < + self + .validators + .validators() + .get(&network) + .map(Vec::len) + .unwrap_or(0) + .saturating_sub(1)) { let mut potential_peers = self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?; diff --git a/coordinator/src/p2p/gossip.rs b/coordinator/src/p2p/gossip.rs index b5b8ebd9..8e32180b 100644 --- a/coordinator/src/p2p/gossip.rs +++ b/coordinator/src/p2p/gossip.rs @@ -59,12 +59,11 @@ pub(crate) fn new_behavior() -> Behavior { }) .build(); - // TODO: Don't use IdentityTransform here. Authenticate using validator keys - let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap(); + let mut gossip = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap(); // Subscribe to the base topic let topic = IdentTopic::new(BASE_TOPIC); - let _ = gossipsub.subscribe(&topic); + let _ = gossip.subscribe(&topic); - gossipsub + gossip } diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 09b7402d..97f00cbf 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -1,12 +1,46 @@ -use serai_client::primitives::NetworkId; +use std::{ + sync::{Arc, RwLock}, + collections::{HashSet, HashMap}, +}; +use serai_client::primitives::{NetworkId, PublicKey}; + +use tokio::sync::mpsc; + +use futures_util::StreamExt; +use libp2p::{ + multihash::Multihash, + identity::PeerId, + swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm}, +}; + +/// A struct to sync the validators from the Serai node in order to keep track of them. +mod validators; + +/// The authentication protocol upgrade to limit the P2P network to active validators. +mod authenticate; + +/// The dial task, to find new peers to connect to +mod dial; + +/// The request-response messages and behavior mod reqres; use reqres::{Request, Response}; +/// The gossip messages and behavior mod gossip; +/// The heartbeat task, effecting sync of Tributaries mod heartbeat; +const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') + +fn peer_id_from_public(public: PublicKey) -> PeerId { + // 0 represents the identity Multihash, that no hash was performed + // It's an internal constant so we can't refer to the constant inside libp2p + PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap() +} + struct Peer; impl Peer { async fn send(&self, request: Request) -> Result { @@ -14,6 +48,11 @@ impl Peer { } } +#[derive(Clone)] +struct Peers { + peers: Arc>>>, +} + #[derive(Clone, Debug)] struct P2p; impl P2p { @@ -28,3 +67,9 @@ impl tributary::P2p for P2p { todo!("TODO") } } + +#[derive(NetworkBehaviour)] +struct Behavior { + reqres: reqres::Behavior, + gossip: gossip::Behavior, +} diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs index d1b1a2ec..b5d87c1c 100644 --- a/coordinator/src/p2p/reqres.rs +++ b/coordinator/src/p2p/reqres.rs @@ -1,4 +1,4 @@ -use core::time::Duration; +use core::{fmt, time::Duration}; use std::io::{self, Read}; use async_trait::async_trait; @@ -46,6 +46,15 @@ pub(crate) enum Response { Blocks(Vec), NotableCosigns(Vec), } +impl fmt::Debug for Response { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + (match self { + Response::Blocks(_) => fmt.debug_struct("Response::Block"), + Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"), + }) + .finish_non_exhaustive() + } +} /// The codec used for the request-response protocol. /// @@ -53,7 +62,7 @@ pub(crate) enum Response { /// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there /// isn't an amenable API within borsh for incremental deserialization. #[derive(Default, Clone, Copy, Debug)] -struct Codec; +pub(crate) struct Codec; impl Codec { async fn read(io: &mut (impl Unpin + AsyncRead)) -> io::Result { let mut len = [0; 4]; diff --git a/coordinator/src/p2p/validators.rs b/coordinator/src/p2p/validators.rs new file mode 100644 index 00000000..1bd10110 --- /dev/null +++ b/coordinator/src/p2p/validators.rs @@ -0,0 +1,69 @@ +use std::collections::HashMap; + +use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; + +use libp2p::PeerId; + +use crate::p2p::peer_id_from_public; + +pub(crate) struct Validators { + serai: Serai, + + // A cache for which session we're populated with the validators of + sessions: HashMap, + // The validators by network + by_network: HashMap>, + // The set of all validators (as a HashMap to represent the amount of inclusions) + set: HashMap, +} + +impl Validators { + pub(crate) async fn update(&mut self) -> Result<(), String> { + let temporal_serai = + self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + let temporal_serai = temporal_serai.validator_sets(); + for network in serai_client::primitives::NETWORKS { + if network == NetworkId::Serai { + continue; + } + let Some(session) = temporal_serai.session(network).await.map_err(|e| format!("{e:?}"))? + else { + continue; + }; + // If the session has changed, populate it with the current validators + if self.sessions.get(&network) != Some(&session) { + let new_validators = + temporal_serai.active_network_validators(network).await.map_err(|e| format!("{e:?}"))?; + let new_validators = + new_validators.into_iter().map(peer_id_from_public).collect::>(); + + // Remove the existing validators + for validator in self.by_network.remove(&network).unwrap_or(vec![]) { + let mut inclusions = self.set.remove(&validator).unwrap(); + inclusions -= 1; + if inclusions != 0 { + self.set.insert(validator, inclusions); + } + } + + // Add the new validators + for validator in new_validators.iter().copied() { + *self.set.entry(validator).or_insert(0) += 1; + } + self.by_network.insert(network, new_validators); + + // Update the session we have populated + self.sessions.insert(network, session); + } + } + Ok(()) + } + + pub(crate) fn validators(&self) -> &HashMap> { + &self.by_network + } + + pub(crate) fn contains(&self, peer_id: &PeerId) -> bool { + self.set.contains_key(peer_id) + } +}