mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Only let active Serai validators connect over P2P
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
use core::{pin::Pin, future::Future};
|
use core::{pin::Pin, future::Future};
|
||||||
use std::io;
|
use std::{sync::Arc, io};
|
||||||
|
|
||||||
use zeroize::Zeroizing;
|
use zeroize::Zeroizing;
|
||||||
use rand_core::{RngCore, OsRng};
|
use rand_core::{RngCore, OsRng};
|
||||||
@@ -7,14 +7,19 @@ use rand_core::{RngCore, OsRng};
|
|||||||
use blake2::{Digest, Blake2s256};
|
use blake2::{Digest, Blake2s256};
|
||||||
use schnorrkel::{Keypair, PublicKey, Signature};
|
use schnorrkel::{Keypair, PublicKey, Signature};
|
||||||
|
|
||||||
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
|
use serai_client::primitives::PublicKey as Public;
|
||||||
use libp2p::{
|
|
||||||
core::UpgradeInfo, InboundUpgrade, OutboundUpgrade, multihash::Multihash, identity::PeerId, noise,
|
use tokio::sync::RwLock;
|
||||||
};
|
|
||||||
|
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
use libp2p::{core::UpgradeInfo, InboundUpgrade, OutboundUpgrade, identity::PeerId, noise};
|
||||||
|
|
||||||
|
use crate::p2p::{validators::Validators, peer_id_from_public};
|
||||||
|
|
||||||
const PROTOCOL: &str = "/serai/coordinator/validators";
|
const PROTOCOL: &str = "/serai/coordinator/validators";
|
||||||
|
|
||||||
struct OnlyValidators {
|
struct OnlyValidators {
|
||||||
|
validators: Arc<RwLock<Validators>>,
|
||||||
serai_key: Zeroizing<Keypair>,
|
serai_key: Zeroizing<Keypair>,
|
||||||
our_peer_id: PeerId,
|
our_peer_id: PeerId,
|
||||||
}
|
}
|
||||||
@@ -97,9 +102,12 @@ impl OnlyValidators {
|
|||||||
.verify_simple(PROTOCOL.as_bytes(), &msg, &sig)
|
.verify_simple(PROTOCOL.as_bytes(), &msg, &sig)
|
||||||
.map_err(|_| io::Error::other("invalid signature"))?;
|
.map_err(|_| io::Error::other("invalid signature"))?;
|
||||||
|
|
||||||
// 0 represents the identity Multihash, that no hash was performed
|
let peer_id = peer_id_from_public(Public::from_raw(public_key.to_bytes()));
|
||||||
// It's an internal constant so we can't refer to the constant inside libp2p
|
if !self.validators.read().await.contains(&peer_id) {
|
||||||
Ok(PeerId::from_multihash(Multihash::wrap(0, &public_key.to_bytes()).unwrap()).unwrap())
|
Err(io::Error::other("peer which tried to connect isn't a known active validator"))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(peer_id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,10 @@
|
|||||||
use core::future::Future;
|
use core::future::Future;
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use rand_core::{RngCore, OsRng};
|
use rand_core::{RngCore, OsRng};
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use serai_client::{
|
use serai_client::Serai;
|
||||||
primitives::{NetworkId, PublicKey},
|
|
||||||
validator_sets::primitives::Session,
|
|
||||||
Serai,
|
|
||||||
};
|
|
||||||
|
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
core::multiaddr::{Protocol, Multiaddr},
|
core::multiaddr::{Protocol, Multiaddr},
|
||||||
@@ -18,16 +13,13 @@ use libp2p::{
|
|||||||
|
|
||||||
use serai_task::ContinuallyRan;
|
use serai_task::ContinuallyRan;
|
||||||
|
|
||||||
use crate::p2p::{PORT, Peers};
|
use crate::p2p::{PORT, Peers, validators::Validators};
|
||||||
|
|
||||||
const TARGET_PEERS_PER_NETWORK: usize = 5;
|
const TARGET_PEERS_PER_NETWORK: usize = 5;
|
||||||
|
|
||||||
struct DialTask {
|
struct DialTask {
|
||||||
serai: Serai,
|
serai: Serai,
|
||||||
|
validators: Validators,
|
||||||
sessions: HashMap<NetworkId, Session>,
|
|
||||||
validators: HashMap<NetworkId, Vec<PublicKey>>,
|
|
||||||
|
|
||||||
peers: Peers,
|
peers: Peers,
|
||||||
to_dial: mpsc::UnboundedSender<DialOpts>,
|
to_dial: mpsc::UnboundedSender<DialOpts>,
|
||||||
}
|
}
|
||||||
@@ -37,29 +29,7 @@ impl ContinuallyRan for DialTask {
|
|||||||
|
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||||
async move {
|
async move {
|
||||||
let temporal_serai =
|
self.validators.update().await?;
|
||||||
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
|
||||||
let temporal_serai = temporal_serai.validator_sets();
|
|
||||||
for network in serai_client::primitives::NETWORKS {
|
|
||||||
if network == NetworkId::Serai {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let Some(session) = temporal_serai.session(network).await.map_err(|e| format!("{e:?}"))?
|
|
||||||
else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
// If the session has changed, populate it with the current validators
|
|
||||||
if self.sessions.get(&network) != Some(&session) {
|
|
||||||
self.validators.insert(
|
|
||||||
network,
|
|
||||||
temporal_serai
|
|
||||||
.active_network_validators(network)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{e:?}"))?,
|
|
||||||
);
|
|
||||||
self.sessions.insert(network, session);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If any of our peers is lacking, try to connect to more
|
// If any of our peers is lacking, try to connect to more
|
||||||
let mut dialed = false;
|
let mut dialed = false;
|
||||||
@@ -81,7 +51,14 @@ impl ContinuallyRan for DialTask {
|
|||||||
only try to connect to most of the validators actually present.
|
only try to connect to most of the validators actually present.
|
||||||
*/
|
*/
|
||||||
if (peer_count < TARGET_PEERS_PER_NETWORK) &&
|
if (peer_count < TARGET_PEERS_PER_NETWORK) &&
|
||||||
(peer_count < self.validators[&network].len().saturating_sub(1))
|
(peer_count <
|
||||||
|
self
|
||||||
|
.validators
|
||||||
|
.validators()
|
||||||
|
.get(&network)
|
||||||
|
.map(Vec::len)
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_sub(1))
|
||||||
{
|
{
|
||||||
let mut potential_peers =
|
let mut potential_peers =
|
||||||
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
|
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
|||||||
@@ -59,12 +59,11 @@ pub(crate) fn new_behavior() -> Behavior {
|
|||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// TODO: Don't use IdentityTransform here. Authenticate using validator keys
|
let mut gossip = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
|
||||||
let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
|
|
||||||
|
|
||||||
// Subscribe to the base topic
|
// Subscribe to the base topic
|
||||||
let topic = IdentTopic::new(BASE_TOPIC);
|
let topic = IdentTopic::new(BASE_TOPIC);
|
||||||
let _ = gossipsub.subscribe(&topic);
|
let _ = gossip.subscribe(&topic);
|
||||||
|
|
||||||
gossipsub
|
gossip
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,12 +1,46 @@
|
|||||||
use serai_client::primitives::NetworkId;
|
use std::{
|
||||||
|
sync::{Arc, RwLock},
|
||||||
|
collections::{HashSet, HashMap},
|
||||||
|
};
|
||||||
|
|
||||||
|
use serai_client::primitives::{NetworkId, PublicKey};
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use libp2p::{
|
||||||
|
multihash::Multihash,
|
||||||
|
identity::PeerId,
|
||||||
|
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A struct to sync the validators from the Serai node in order to keep track of them.
|
||||||
|
mod validators;
|
||||||
|
|
||||||
|
/// The authentication protocol upgrade to limit the P2P network to active validators.
|
||||||
|
mod authenticate;
|
||||||
|
|
||||||
|
/// The dial task, to find new peers to connect to
|
||||||
|
mod dial;
|
||||||
|
|
||||||
|
/// The request-response messages and behavior
|
||||||
mod reqres;
|
mod reqres;
|
||||||
use reqres::{Request, Response};
|
use reqres::{Request, Response};
|
||||||
|
|
||||||
|
/// The gossip messages and behavior
|
||||||
mod gossip;
|
mod gossip;
|
||||||
|
|
||||||
|
/// The heartbeat task, effecting sync of Tributaries
|
||||||
mod heartbeat;
|
mod heartbeat;
|
||||||
|
|
||||||
|
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
|
||||||
|
|
||||||
|
fn peer_id_from_public(public: PublicKey) -> PeerId {
|
||||||
|
// 0 represents the identity Multihash, that no hash was performed
|
||||||
|
// It's an internal constant so we can't refer to the constant inside libp2p
|
||||||
|
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
struct Peer;
|
struct Peer;
|
||||||
impl Peer {
|
impl Peer {
|
||||||
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
|
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
|
||||||
@@ -14,6 +48,11 @@ impl Peer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct Peers {
|
||||||
|
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
struct P2p;
|
struct P2p;
|
||||||
impl P2p {
|
impl P2p {
|
||||||
@@ -28,3 +67,9 @@ impl tributary::P2p for P2p {
|
|||||||
todo!("TODO")
|
todo!("TODO")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(NetworkBehaviour)]
|
||||||
|
struct Behavior {
|
||||||
|
reqres: reqres::Behavior,
|
||||||
|
gossip: gossip::Behavior,
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use core::time::Duration;
|
use core::{fmt, time::Duration};
|
||||||
use std::io::{self, Read};
|
use std::io::{self, Read};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
@@ -46,6 +46,15 @@ pub(crate) enum Response {
|
|||||||
Blocks(Vec<TributaryBlockWithCommit>),
|
Blocks(Vec<TributaryBlockWithCommit>),
|
||||||
NotableCosigns(Vec<SignedCosign>),
|
NotableCosigns(Vec<SignedCosign>),
|
||||||
}
|
}
|
||||||
|
impl fmt::Debug for Response {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
(match self {
|
||||||
|
Response::Blocks(_) => fmt.debug_struct("Response::Block"),
|
||||||
|
Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"),
|
||||||
|
})
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The codec used for the request-response protocol.
|
/// The codec used for the request-response protocol.
|
||||||
///
|
///
|
||||||
@@ -53,7 +62,7 @@ pub(crate) enum Response {
|
|||||||
/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
|
/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
|
||||||
/// isn't an amenable API within borsh for incremental deserialization.
|
/// isn't an amenable API within borsh for incremental deserialization.
|
||||||
#[derive(Default, Clone, Copy, Debug)]
|
#[derive(Default, Clone, Copy, Debug)]
|
||||||
struct Codec;
|
pub(crate) struct Codec;
|
||||||
impl Codec {
|
impl Codec {
|
||||||
async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
|
async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
|
||||||
let mut len = [0; 4];
|
let mut len = [0; 4];
|
||||||
|
|||||||
69
coordinator/src/p2p/validators.rs
Normal file
69
coordinator/src/p2p/validators.rs
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
|
||||||
|
|
||||||
|
use libp2p::PeerId;
|
||||||
|
|
||||||
|
use crate::p2p::peer_id_from_public;
|
||||||
|
|
||||||
|
pub(crate) struct Validators {
|
||||||
|
serai: Serai,
|
||||||
|
|
||||||
|
// A cache for which session we're populated with the validators of
|
||||||
|
sessions: HashMap<NetworkId, Session>,
|
||||||
|
// The validators by network
|
||||||
|
by_network: HashMap<NetworkId, Vec<PeerId>>,
|
||||||
|
// The set of all validators (as a HashMap<PeerId, usize> to represent the amount of inclusions)
|
||||||
|
set: HashMap<PeerId, usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validators {
|
||||||
|
pub(crate) async fn update(&mut self) -> Result<(), String> {
|
||||||
|
let temporal_serai =
|
||||||
|
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
let temporal_serai = temporal_serai.validator_sets();
|
||||||
|
for network in serai_client::primitives::NETWORKS {
|
||||||
|
if network == NetworkId::Serai {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Some(session) = temporal_serai.session(network).await.map_err(|e| format!("{e:?}"))?
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
// If the session has changed, populate it with the current validators
|
||||||
|
if self.sessions.get(&network) != Some(&session) {
|
||||||
|
let new_validators =
|
||||||
|
temporal_serai.active_network_validators(network).await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
let new_validators =
|
||||||
|
new_validators.into_iter().map(peer_id_from_public).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// Remove the existing validators
|
||||||
|
for validator in self.by_network.remove(&network).unwrap_or(vec![]) {
|
||||||
|
let mut inclusions = self.set.remove(&validator).unwrap();
|
||||||
|
inclusions -= 1;
|
||||||
|
if inclusions != 0 {
|
||||||
|
self.set.insert(validator, inclusions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the new validators
|
||||||
|
for validator in new_validators.iter().copied() {
|
||||||
|
*self.set.entry(validator).or_insert(0) += 1;
|
||||||
|
}
|
||||||
|
self.by_network.insert(network, new_validators);
|
||||||
|
|
||||||
|
// Update the session we have populated
|
||||||
|
self.sessions.insert(network, session);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn validators(&self) -> &HashMap<NetworkId, Vec<PeerId>> {
|
||||||
|
&self.by_network
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn contains(&self, peer_id: &PeerId) -> bool {
|
||||||
|
self.set.contains_key(peer_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user