Actually implement the Peer abstraction for Libp2p

This commit is contained in:
Luke Parker
2025-01-08 17:40:08 -05:00
parent fd9b464b35
commit de2d6568a4
5 changed files with 77 additions and 34 deletions

View File

@@ -1,9 +1,10 @@
use core::future::Future; use core::future::Future;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use serai_client::validator_sets::primitives::ValidatorSet; use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::FutureExt;
use tributary::{ReadWrite, Block, Tributary, TributaryReader}; use tributary::{ReadWrite, Block, Tributary, TributaryReader};
use serai_db::*; use serai_db::*;
@@ -71,7 +72,10 @@ impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
tip = self.reader.tip(); tip = self.reader.tip();
tip_is_stale = false; tip_is_stale = false;
} }
let Ok(blocks) = peer.send_heartbeat(self.set, tip).await else { continue 'peer }; // Necessary due to https://github.com/rust-lang/rust/issues/100013
let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else {
continue 'peer;
};
// This is the final batch if it has less than the maximum amount of blocks // This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with) // (signifying there weren't more blocks after this to fill the batch with)

View File

@@ -1,4 +1,4 @@
use core::future::Future; use core::{future::Future, time::Duration};
use std::{ use std::{
sync::Arc, sync::Arc,
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
@@ -9,10 +9,11 @@ use schnorrkel::Keypair;
use serai_client::{ use serai_client::{
primitives::{NetworkId, PublicKey}, primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai, Serai,
}; };
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use serai_task::{Task, ContinuallyRan}; use serai_task::{Task, ContinuallyRan};
@@ -25,6 +26,8 @@ use libp2p::{
SwarmBuilder, SwarmBuilder,
}; };
use crate::p2p::TributaryBlockWithCommit;
/// A struct to sync the validators from the Serai node in order to keep track of them. /// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators; mod validators;
use validators::UpdateValidatorsTask; use validators::UpdateValidatorsTask;
@@ -64,10 +67,31 @@ fn peer_id_from_public(public: PublicKey) -> PeerId {
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap() PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
} }
struct Peer; struct Peer<'a> {
impl Peer { outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> { id: PeerId,
(async move { todo!("TODO") }).await }
impl crate::p2p::Peer<'_> for Peer<'_> {
fn send_heartbeat(
&self,
set: ValidatorSet,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
async move {
let request = Request::Heartbeat { set, latest_block_hash };
let (sender, receiver) = oneshot::channel();
self
.outbound_requests
.send((self.id, request, sender))
.expect("outbound requests recv channel was dropped?");
match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
Response::None => Some(vec![]),
Response::Blocks(blocks) => Some(blocks),
// TODO: Disconnect this peer
Response::NotableCosigns(_) => None,
}
}
} }
} }
@@ -82,9 +106,14 @@ struct Behavior {
gossip: gossip::Behavior, gossip: gossip::Behavior,
} }
struct LibP2p; #[derive(Clone)]
impl LibP2p { struct Libp2p {
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> LibP2p { peers: Peers,
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
}
impl Libp2p {
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
// Define the object we track peers with // Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) }; let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
@@ -161,3 +190,25 @@ impl LibP2p {
todo!("TODO"); todo!("TODO");
} }
} }
impl tributary::P2p for Libp2p {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { todo!("TODO") }
}
}
impl crate::p2p::P2p for Libp2p {
type Peer<'a> = Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
async move {
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
return vec![];
};
let mut res = vec![];
for id in peer_ids {
res.push(Peer { outbound_requests: &self.outbound_requests, id });
}
res
}
}
}

View File

@@ -15,6 +15,8 @@ pub use request_response::Message;
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;
use crate::p2p::TributaryBlockWithCommit;
/// The maximum message size for the request-response protocol /// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message // This is derived from the heartbeat message size as it's our largest message
pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
@@ -36,13 +38,6 @@ pub(crate) enum Request {
NotableCosigns { global_session: [u8; 32] }, NotableCosigns { global_session: [u8; 32] },
} }
/// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
}
/// Responses which can be received via the request-response protocol. /// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)] #[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response { pub(crate) enum Response {

View File

@@ -63,8 +63,8 @@ pub(crate) struct SwarmTask {
signed_cosigns: mpsc::UnboundedSender<SignedCosign>, signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>, tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>, outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>, outbound_request_responses: HashMap<RequestId, oneshot::Sender<Response>>,
inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>, inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>, heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
@@ -120,16 +120,15 @@ impl SwarmTask {
} }
}, },
reqres::Message::Response { request_id, response } => { reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) { if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(Some(response)); let _: Result<_, _> = channel.send(response);
} }
} }
}, },
reqres::Event::OutboundFailure { request_id, .. } => { reqres::Event::OutboundFailure { request_id, .. } => {
// Send None as the response for the request // Send None as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) { if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(None); let _: Result<_, _> = channel.send(Response::None);
} }
} }
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {} reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
@@ -299,11 +298,7 @@ impl SwarmTask {
signed_cosigns: mpsc::UnboundedSender<SignedCosign>, signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>, tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<( outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
PeerId,
Request,
oneshot::Sender<Option<Response>>,
)>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>, heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>, notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,

View File

@@ -1,7 +1,5 @@
use core::future::Future; use core::future::Future;
use tokio::time::error::Elapsed;
use borsh::{BorshSerialize, BorshDeserialize}; use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
@@ -19,15 +17,15 @@ pub(crate) struct TributaryBlockWithCommit {
pub(crate) commit: Vec<u8>, pub(crate) commit: Vec<u8>,
} }
trait Peer: Send { trait Peer<'a>: Send {
fn send_heartbeat( fn send_heartbeat(
&self, &self,
set: ValidatorSet, set: ValidatorSet,
latest_block_hash: [u8; 32], latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Result<Vec<TributaryBlockWithCommit>, Elapsed>>; ) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
} }
trait P2p: Send + Sync + tributary::P2p { trait P2p: Send + Sync + tributary::P2p {
type Peer: Peer; type Peer<'a>: Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer>>; fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
} }