Add a trait for the P2p network used in the coordinator

Moves all of the Libp2p code to a dedicated directory. Makes the Heartbeat task
abstract over any P2p network.
This commit is contained in:
Luke Parker
2025-01-08 17:01:37 -05:00
parent 376a66b000
commit fd9b464b35
9 changed files with 194 additions and 178 deletions

View File

@@ -11,10 +11,7 @@ use serai_task::ContinuallyRan;
use crate::{ use crate::{
tributary::Transaction, tributary::Transaction,
p2p::{ p2p::{Peer, P2p},
reqres::{Request, Response},
P2p,
},
}; };
// Amount of blocks in a minute // Amount of blocks in a minute
@@ -28,14 +25,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
/// ///
/// If the other validator has more blocks then we do, they're expected to inform us. This forms /// If the other validator has more blocks then we do, they're expected to inform us. This forms
/// the sync protocol for our Tributaries. /// the sync protocol for our Tributaries.
struct HeartbeatTask<TD: Db> { struct HeartbeatTask<TD: Db, P: P2p> {
set: ValidatorSet, set: ValidatorSet,
tributary: Tributary<TD, Transaction, P2p>, tributary: Tributary<TD, Transaction, P>,
reader: TributaryReader<TD, Transaction>, reader: TributaryReader<TD, Transaction>,
p2p: P2p, p2p: P,
} }
impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> { impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move { async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
@@ -74,8 +71,7 @@ impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
tip = self.reader.tip(); tip = self.reader.tip();
tip_is_stale = false; tip_is_stale = false;
} }
let request = Request::Heartbeat { set: self.set, latest_block_hash: tip }; let Ok(blocks) = peer.send_heartbeat(self.set, tip).await else { continue 'peer };
let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer };
// This is the final batch if it has less than the maximum amount of blocks // This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with) // (signifying there weren't more blocks after this to fill the batch with)

View File

@@ -19,7 +19,7 @@ use libp2p::{
noise, noise,
}; };
use crate::p2p::{validators::Validators, peer_id_from_public}; use crate::p2p::libp2p::{validators::Validators, peer_id_from_public};
const PROTOCOL: &str = "/serai/coordinator/validators"; const PROTOCOL: &str = "/serai/coordinator/validators";

View File

@@ -14,7 +14,7 @@ use libp2p::{
use serai_task::ContinuallyRan; use serai_task::ContinuallyRan;
use crate::p2p::{PORT, Peers, validators::Validators}; use crate::p2p::libp2p::{PORT, Peers, validators::Validators};
const TARGET_PEERS_PER_NETWORK: usize = 5; const TARGET_PEERS_PER_NETWORK: usize = 5;
/* /*

View File

@@ -0,0 +1,163 @@
use core::future::Future;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing;
use schnorrkel::Keypair;
use serai_client::{
primitives::{NetworkId, PublicKey},
Serai,
};
use tokio::sync::{mpsc, RwLock};
use serai_task::{Task, ContinuallyRan};
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
tcp::Config as TcpConfig,
yamux,
swarm::NetworkBehaviour,
SwarmBuilder,
};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::UpdateValidatorsTask;
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;
/// The dial task, to find new peers to connect to
mod dial;
use dial::DialTask;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The swarm task, running it and dispatching to/from it
mod swarm;
use swarm::SwarmTask;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
} else {
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
};
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
}
struct Peer;
impl Peer {
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
(async move { todo!("TODO") }).await
}
}
#[derive(Clone)]
struct Peers {
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
struct LibP2p;
impl LibP2p {
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> LibP2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
// Define the dial task
let (dial_task_def, dial_task) = Task::new();
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
tokio::spawn(
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
.continually_run(dial_task_def, vec![]),
);
// Define the Validators object used for validating new connections
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
Ok(OnlyValidators {
serai_key: serai_key.clone(),
validators: connection_validators.clone(),
noise_keypair: noise_keypair.clone(),
})
};
let new_yamux = || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
// 256 KiB default + max message size
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config
};
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| behavior)
.unwrap()
.build();
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
let swarm_validators = UpdateValidatorsTask::spawn(serai);
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
let (inbound_request_responses_send, inbound_request_responses_recv) =
mpsc::unbounded_channel();
// Create the swarm task
SwarmTask::spawn(
dial_task,
to_dial_recv,
swarm_validators,
peers,
swarm,
gossip_recv,
signed_cosigns_send,
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
notable_cosign_requests_send,
inbound_request_responses_recv,
);
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
todo!("TODO");
}
}

View File

@@ -21,7 +21,7 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
}; };
use crate::p2p::{ use crate::p2p::libp2p::{
Peers, BehaviorEvent, Behavior, Peers, BehaviorEvent, Behavior,
validators::Validators, validators::Validators,
reqres::{self, Request, Response}, reqres::{self, Request, Response},
@@ -286,7 +286,7 @@ impl SwarmTask {
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) fn new( pub(crate) fn spawn(
dial_task: TaskHandle, dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>, to_dial: mpsc::UnboundedReceiver<DialOpts>,

View File

@@ -13,7 +13,7 @@ use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered}; use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::p2p::peer_id_from_public; use crate::p2p::libp2p::peer_id_from_public;
pub(crate) struct Validators { pub(crate) struct Validators {
serai: Serai, serai: Serai,

View File

@@ -1,176 +1,33 @@
use std::{ use core::future::Future;
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing; use tokio::time::error::Elapsed;
use schnorrkel::Keypair;
use serai_client::{ use borsh::{BorshSerialize, BorshDeserialize};
primitives::{NetworkId, PublicKey},
Serai,
};
use tokio::sync::{mpsc, RwLock}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_task::{Task, ContinuallyRan}; /// The libp2p-backed P2p network
mod libp2p;
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
tcp::Config as TcpConfig,
yamux,
swarm::NetworkBehaviour,
SwarmBuilder,
};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::UpdateValidatorsTask;
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;
/// The dial task, to find new peers to connect to
mod dial;
use dial::DialTask;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The heartbeat task, effecting sync of Tributaries /// The heartbeat task, effecting sync of Tributaries
mod heartbeat; mod heartbeat;
/// The swarm task, running it and dispatching to/from it /// A tributary block and its commit.
mod swarm; #[derive(Clone, BorshSerialize, BorshDeserialize)]
use swarm::SwarmTask; pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') pub(crate) commit: Vec<u8>,
// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
} else {
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
};
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
} }
struct Peer; trait Peer: Send {
impl Peer { fn send_heartbeat(
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> { &self,
(async move { todo!("TODO") }).await set: ValidatorSet,
} latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Result<Vec<TributaryBlockWithCommit>, Elapsed>>;
} }
#[derive(Clone)] trait P2p: Send + Sync + tributary::P2p {
struct Peers { type Peer: Peer;
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>, fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer>>;
}
#[derive(Clone, Debug)]
struct P2p;
impl P2p {
async fn peers(&self, set: NetworkId) -> Vec<Peer> {
(async move { todo!("TODO") }).await
}
}
#[async_trait::async_trait]
impl tributary::P2p for P2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
todo!("TODO")
}
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> P2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
// Define the dial task
let (dial_task_def, dial_task) = Task::new();
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
tokio::spawn(
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
.continually_run(dial_task_def, vec![]),
);
// Define the Validators object used for validating new connections
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
Ok(OnlyValidators {
serai_key: serai_key.clone(),
validators: connection_validators.clone(),
noise_keypair: noise_keypair.clone(),
})
};
let new_yamux = || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
// 256 KiB default + max message size
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config
};
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| behavior)
.unwrap()
.build();
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
let swarm_validators = UpdateValidatorsTask::spawn(serai);
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
let (inbound_request_responses_send, inbound_request_responses_recv) = mpsc::unbounded_channel();
// Create the swarm task
SwarmTask::new(
dial_task,
to_dial_recv,
swarm_validators,
peers,
swarm,
gossip_recv,
signed_cosigns_send,
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
notable_cosign_requests_send,
inbound_request_responses_recv,
);
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
todo!("TODO");
} }