diff --git a/coordinator/src/p2p/libp2p/gossip.rs b/coordinator/src/p2p/libp2p/gossip.rs index 99196fb6..66a0b24a 100644 --- a/coordinator/src/p2p/libp2p/gossip.rs +++ b/coordinator/src/p2p/libp2p/gossip.rs @@ -2,9 +2,7 @@ use core::time::Duration; use blake2::{Digest, Blake2s256}; -use scale::Encode; use borsh::{BorshSerialize, BorshDeserialize}; -use serai_client::validator_sets::primitives::ValidatorSet; use libp2p::gossipsub::{ TopicHash, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, @@ -22,20 +20,20 @@ const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80); const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0"; const BASE_TOPIC: &str = "/"; -fn topic_for_set(set: ValidatorSet) -> IdentTopic { - IdentTopic::new(format!("/set/{}", hex::encode(set.encode()))) +fn topic_for_tributary(tributary: [u8; 32]) -> IdentTopic { + IdentTopic::new(format!("/tributary/{}", hex::encode(tributary))) } #[derive(Clone, BorshSerialize, BorshDeserialize)] pub(crate) enum Message { - Tributary { set: ValidatorSet, message: Vec }, + Tributary { tributary: [u8; 32], message: Vec }, Cosign(SignedCosign), } impl Message { pub(crate) fn topic(&self) -> TopicHash { match self { - Message::Tributary { set, .. } => topic_for_set(*set).hash(), + Message::Tributary { tributary, .. } => topic_for_tributary(*tributary).hash(), Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(), } } diff --git a/coordinator/src/p2p/libp2p/mod.rs b/coordinator/src/p2p/libp2p/mod.rs index 79f06c19..93db7c88 100644 --- a/coordinator/src/p2p/libp2p/mod.rs +++ b/coordinator/src/p2p/libp2p/mod.rs @@ -4,6 +4,8 @@ use std::{ collections::{HashSet, HashMap}, }; +use rand_core::{RngCore, OsRng}; + use zeroize::Zeroizing; use schnorrkel::Keypair; @@ -13,10 +15,12 @@ use serai_client::{ Serai, }; -use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; use serai_task::{Task, ContinuallyRan}; +use serai_cosign::SignedCosign; + use libp2p::{ multihash::Multihash, identity::{self, PeerId}, @@ -42,10 +46,11 @@ use dial::DialTask; /// The request-response messages and behavior mod reqres; -use reqres::{Request, Response}; +use reqres::{RequestId, Request, Response}; /// The gossip messages and behavior mod gossip; +use gossip::Message; /// The swarm task, running it and dispatching to/from it mod swarm; @@ -77,19 +82,21 @@ impl crate::p2p::Peer<'_> for Peer<'_> { set: ValidatorSet, latest_block_hash: [u8; 32], ) -> impl Send + Future>> { - const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5); async move { + const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); + let request = Request::Heartbeat { set, latest_block_hash }; let (sender, receiver) = oneshot::channel(); self .outbound_requests .send((self.id, request, sender)) .expect("outbound requests recv channel was dropped?"); - match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? { - Response::None => Some(vec![]), - Response::Blocks(blocks) => Some(blocks), - // TODO: Disconnect this peer - Response::NotableCosigns(_) => None, + if let Ok(Ok(Response::Blocks(blocks))) = + tokio::time::timeout(HEARTBEAT_TIMEOUT, receiver).await + { + Some(blocks) + } else { + None } } } @@ -109,7 +116,18 @@ struct Behavior { #[derive(Clone)] struct Libp2p { peers: Peers, + + gossip: mpsc::UnboundedSender, outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender)>, + + tributary_gossip: Arc)>>>, + + signed_cosigns: Arc>>, + signed_cosigns_send: mpsc::UnboundedSender, + + heartbeat_requests: Arc>>, + notable_cosign_requests: Arc>>, + inbound_request_responses: mpsc::UnboundedSender<(RequestId, Response)>, } impl Libp2p { @@ -174,10 +192,10 @@ impl Libp2p { dial_task, to_dial_recv, swarm_validators, - peers, + peers.clone(), swarm, gossip_recv, - signed_cosigns_send, + signed_cosigns_send.clone(), tributary_gossip_send, outbound_requests_recv, heartbeat_requests_send, @@ -185,20 +203,92 @@ impl Libp2p { inbound_request_responses_recv, ); - // gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send, - // heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send - todo!("TODO"); + Libp2p { + peers, + + gossip: gossip_send, + outbound_requests: outbound_requests_send, + + tributary_gossip: Arc::new(Mutex::new(tributary_gossip_recv)), + + signed_cosigns: Arc::new(Mutex::new(signed_cosigns_recv)), + signed_cosigns_send, + + heartbeat_requests: Arc::new(Mutex::new(heartbeat_requests_recv)), + notable_cosign_requests: Arc::new(Mutex::new(notable_cosign_requests_recv)), + inbound_request_responses: inbound_request_responses_send, + } } } impl tributary::P2p for Libp2p { - fn broadcast(&self, genesis: [u8; 32], msg: Vec) -> impl Send + Future { - async move { todo!("TODO") } + fn broadcast(&self, tributary: [u8; 32], message: Vec) -> impl Send + Future { + async move { + self + .gossip + .send(Message::Tributary { tributary, message }) + .expect("gossip recv channel was dropped?"); + } + } +} + +impl serai_cosign::RequestNotableCosigns for Libp2p { + type Error = (); + + fn request_notable_cosigns( + &self, + global_session: [u8; 32], + ) -> impl Send + Future> { + async move { + const AMOUNT_OF_PEERS_TO_REQUEST_FROM: usize = 3; + const NOTABLE_COSIGNS_TIMEOUT: Duration = Duration::from_secs(5); + + let request = Request::NotableCosigns { global_session }; + + let peers = self.peers.peers.read().await.clone(); + // HashSet of all peers + let peers = peers.into_values().flat_map(<_>::into_iter).collect::>(); + // Vec of all peers + let mut peers = peers.into_iter().collect::>(); + + let mut channels = Vec::with_capacity(AMOUNT_OF_PEERS_TO_REQUEST_FROM); + for _ in 0 .. AMOUNT_OF_PEERS_TO_REQUEST_FROM { + if peers.is_empty() { + break; + } + let i = usize::try_from(OsRng.next_u64() % u64::try_from(peers.len()).unwrap()).unwrap(); + let peer = peers.swap_remove(i); + + let (sender, receiver) = oneshot::channel(); + self + .outbound_requests + .send((peer, request, sender)) + .expect("outbound requests recv channel was dropped?"); + channels.push(receiver); + } + + // We could reduce our latency by using FuturesUnordered here but the latency isn't a concern + for channel in channels { + if let Ok(Ok(Response::NotableCosigns(cosigns))) = + tokio::time::timeout(NOTABLE_COSIGNS_TIMEOUT, channel).await + { + for cosign in cosigns { + self + .signed_cosigns_send + .send(cosign) + .expect("signed_cosigns recv in this object was dropped?"); + } + } + } + + Ok(()) + } } } impl crate::p2p::P2p for Libp2p { type Peer<'a> = Peer<'a>; + fn peers(&self, network: NetworkId) -> impl Send + Future>> { async move { let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else { @@ -211,4 +301,79 @@ impl crate::p2p::P2p for Libp2p { res } } + + fn heartbeat( + &self, + ) -> impl Send + + Future>)> + { + async move { + let (request_id, set, latest_block_hash) = self + .heartbeat_requests + .lock() + .await + .recv() + .await + .expect("heartbeat_requests_send was dropped?"); + let (sender, receiver) = oneshot::channel(); + tokio::spawn({ + let respond = self.inbound_request_responses.clone(); + async move { + let response = + if let Ok(blocks) = receiver.await { Response::Blocks(blocks) } else { Response::None }; + respond + .send((request_id, response)) + .expect("inbound_request_responses_recv was dropped?"); + } + }); + (set, latest_block_hash, sender) + } + } + + fn notable_cosigns_request( + &self, + ) -> impl Send + Future>)> { + async move { + let (request_id, global_session) = self + .notable_cosign_requests + .lock() + .await + .recv() + .await + .expect("notable_cosign_requests_send was dropped?"); + let (sender, receiver) = oneshot::channel(); + tokio::spawn({ + let respond = self.inbound_request_responses.clone(); + async move { + let response = if let Ok(notable_cosigns) = receiver.await { + Response::NotableCosigns(notable_cosigns) + } else { + Response::None + }; + respond + .send((request_id, response)) + .expect("inbound_request_responses_recv was dropped?"); + } + }); + (global_session, sender) + } + } + + fn tributary_message(&self) -> impl Send + Future)> { + async move { + self.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?") + } + } + + fn cosign(&self) -> impl Send + Future { + async move { + self + .signed_cosigns + .lock() + .await + .recv() + .await + .expect("signed_cosigns couldn't recv despite send in same object?") + } + } } diff --git a/coordinator/src/p2p/libp2p/reqres.rs b/coordinator/src/p2p/libp2p/reqres.rs index e3d761e5..f58abc8b 100644 --- a/coordinator/src/p2p/libp2p/reqres.rs +++ b/coordinator/src/p2p/libp2p/reqres.rs @@ -11,7 +11,7 @@ use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libp2p::request_response::{ self, Codec as CodecTrait, Event as GenericEvent, Config, Behaviour, ProtocolSupport, }; -pub use request_response::Message; +pub use request_response::{RequestId, Message}; use serai_cosign::SignedCosign; diff --git a/coordinator/src/p2p/libp2p/swarm.rs b/coordinator/src/p2p/libp2p/swarm.rs index 615295f4..148e615f 100644 --- a/coordinator/src/p2p/libp2p/swarm.rs +++ b/coordinator/src/p2p/libp2p/swarm.rs @@ -61,7 +61,7 @@ pub(crate) struct SwarmTask { gossip: mpsc::UnboundedReceiver, signed_cosigns: mpsc::UnboundedSender, - tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec)>, + tributary_gossip: mpsc::UnboundedSender<([u8; 32], Vec)>, outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender)>, outbound_request_responses: HashMap>, @@ -82,12 +82,13 @@ impl SwarmTask { match event { gossip::Event::Message { message, .. } => { let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else { - // TODO: Penalize the PeerId which sent this message + // TODO: Penalize the PeerId which created this message, which requires authenticating + // each message OR moving to explicit acknowledgement before re-gossiping return; }; match message { - gossip::Message::Tributary { set, message } => { - let _: Result<_, _> = self.tributary_gossip.send((set, message)); + gossip::Message::Tributary { tributary, message } => { + let _: Result<_, _> = self.tributary_gossip.send((tributary, message)); } gossip::Message::Cosign(signed_cosign) => { let _: Result<_, _> = self.signed_cosigns.send(signed_cosign); @@ -296,7 +297,7 @@ impl SwarmTask { gossip: mpsc::UnboundedReceiver, signed_cosigns: mpsc::UnboundedSender, - tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec)>, + tributary_gossip: mpsc::UnboundedSender<([u8; 32], Vec)>, outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender)>, diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 414e4ec3..9c501973 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -4,6 +4,10 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; +use tokio::sync::oneshot; + +use serai_cosign::SignedCosign; + /// The libp2p-backed P2p network mod libp2p; @@ -25,7 +29,33 @@ trait Peer<'a>: Send { ) -> impl Send + Future>>; } -trait P2p: Send + Sync + tributary::P2p { +trait P2p: Send + Sync + tributary::P2p + serai_cosign::RequestNotableCosigns { type Peer<'a>: Peer<'a>; + + /// Fetch the peers for this network. fn peers(&self, network: NetworkId) -> impl Send + Future>>; + + /// A cancel-safe future for the next heartbeat received over the P2P network. + /// + /// Yields the validator set its for, the latest block hash observed, and a channel to return the + /// descending blocks. + fn heartbeat( + &self, + ) -> impl Send + + Future>)>; + + /// A cancel-safe future for the next request for the notable cosigns of a gloabl session. + /// + /// Yields the global session the request is for and a channel to return the notable cosigns. + fn notable_cosigns_request( + &self, + ) -> impl Send + Future>)>; + + /// A cancel-safe future for the next message regarding a Tributary. + /// + /// Yields the message's Tributary's genesis block hash and the message. + fn tributary_message(&self) -> impl Send + Future)>; + + /// A cancel-safe future for the next cosign received. + fn cosign(&self) -> impl Send + Future; }