Finish mapping Libp2p to the P2p trait API

This commit is contained in:
Luke Parker
2025-01-08 19:39:09 -05:00
parent b2bd5d3a44
commit ce83b41712
5 changed files with 222 additions and 28 deletions

View File

@@ -4,6 +4,8 @@ use std::{
collections::{HashSet, HashMap},
};
use rand_core::{RngCore, OsRng};
use zeroize::Zeroizing;
use schnorrkel::Keypair;
@@ -13,10 +15,12 @@ use serai_client::{
Serai,
};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use serai_task::{Task, ContinuallyRan};
use serai_cosign::SignedCosign;
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
@@ -42,10 +46,11 @@ use dial::DialTask;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
use reqres::{RequestId, Request, Response};
/// The gossip messages and behavior
mod gossip;
use gossip::Message;
/// The swarm task, running it and dispatching to/from it
mod swarm;
@@ -77,19 +82,21 @@ impl crate::p2p::Peer<'_> for Peer<'_> {
set: ValidatorSet,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
async move {
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
let request = Request::Heartbeat { set, latest_block_hash };
let (sender, receiver) = oneshot::channel();
self
.outbound_requests
.send((self.id, request, sender))
.expect("outbound requests recv channel was dropped?");
match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
Response::None => Some(vec![]),
Response::Blocks(blocks) => Some(blocks),
// TODO: Disconnect this peer
Response::NotableCosigns(_) => None,
if let Ok(Ok(Response::Blocks(blocks))) =
tokio::time::timeout(HEARTBEAT_TIMEOUT, receiver).await
{
Some(blocks)
} else {
None
}
}
}
@@ -109,7 +116,18 @@ struct Behavior {
#[derive(Clone)]
struct Libp2p {
peers: Peers,
gossip: mpsc::UnboundedSender<Message>,
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
tributary_gossip: Arc<Mutex<mpsc::UnboundedReceiver<([u8; 32], Vec<u8>)>>>,
signed_cosigns: Arc<Mutex<mpsc::UnboundedReceiver<SignedCosign>>>,
signed_cosigns_send: mpsc::UnboundedSender<SignedCosign>,
heartbeat_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, ValidatorSet, [u8; 32])>>>,
notable_cosign_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, [u8; 32])>>>,
inbound_request_responses: mpsc::UnboundedSender<(RequestId, Response)>,
}
impl Libp2p {
@@ -174,10 +192,10 @@ impl Libp2p {
dial_task,
to_dial_recv,
swarm_validators,
peers,
peers.clone(),
swarm,
gossip_recv,
signed_cosigns_send,
signed_cosigns_send.clone(),
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
@@ -185,20 +203,92 @@ impl Libp2p {
inbound_request_responses_recv,
);
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
todo!("TODO");
Libp2p {
peers,
gossip: gossip_send,
outbound_requests: outbound_requests_send,
tributary_gossip: Arc::new(Mutex::new(tributary_gossip_recv)),
signed_cosigns: Arc::new(Mutex::new(signed_cosigns_recv)),
signed_cosigns_send,
heartbeat_requests: Arc::new(Mutex::new(heartbeat_requests_recv)),
notable_cosign_requests: Arc::new(Mutex::new(notable_cosign_requests_recv)),
inbound_request_responses: inbound_request_responses_send,
}
}
}
impl tributary::P2p for Libp2p {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { todo!("TODO") }
fn broadcast(&self, tributary: [u8; 32], message: Vec<u8>) -> impl Send + Future<Output = ()> {
async move {
self
.gossip
.send(Message::Tributary { tributary, message })
.expect("gossip recv channel was dropped?");
}
}
}
impl serai_cosign::RequestNotableCosigns for Libp2p {
type Error = ();
fn request_notable_cosigns(
&self,
global_session: [u8; 32],
) -> impl Send + Future<Output = Result<(), Self::Error>> {
async move {
const AMOUNT_OF_PEERS_TO_REQUEST_FROM: usize = 3;
const NOTABLE_COSIGNS_TIMEOUT: Duration = Duration::from_secs(5);
let request = Request::NotableCosigns { global_session };
let peers = self.peers.peers.read().await.clone();
// HashSet of all peers
let peers = peers.into_values().flat_map(<_>::into_iter).collect::<HashSet<_>>();
// Vec of all peers
let mut peers = peers.into_iter().collect::<Vec<_>>();
let mut channels = Vec::with_capacity(AMOUNT_OF_PEERS_TO_REQUEST_FROM);
for _ in 0 .. AMOUNT_OF_PEERS_TO_REQUEST_FROM {
if peers.is_empty() {
break;
}
let i = usize::try_from(OsRng.next_u64() % u64::try_from(peers.len()).unwrap()).unwrap();
let peer = peers.swap_remove(i);
let (sender, receiver) = oneshot::channel();
self
.outbound_requests
.send((peer, request, sender))
.expect("outbound requests recv channel was dropped?");
channels.push(receiver);
}
// We could reduce our latency by using FuturesUnordered here but the latency isn't a concern
for channel in channels {
if let Ok(Ok(Response::NotableCosigns(cosigns))) =
tokio::time::timeout(NOTABLE_COSIGNS_TIMEOUT, channel).await
{
for cosign in cosigns {
self
.signed_cosigns_send
.send(cosign)
.expect("signed_cosigns recv in this object was dropped?");
}
}
}
Ok(())
}
}
}
impl crate::p2p::P2p for Libp2p {
type Peer<'a> = Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
async move {
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
@@ -211,4 +301,79 @@ impl crate::p2p::P2p for Libp2p {
res
}
}
fn heartbeat(
&self,
) -> impl Send
+ Future<Output = (ValidatorSet, [u8; 32], oneshot::Sender<Vec<TributaryBlockWithCommit>>)>
{
async move {
let (request_id, set, latest_block_hash) = self
.heartbeat_requests
.lock()
.await
.recv()
.await
.expect("heartbeat_requests_send was dropped?");
let (sender, receiver) = oneshot::channel();
tokio::spawn({
let respond = self.inbound_request_responses.clone();
async move {
let response =
if let Ok(blocks) = receiver.await { Response::Blocks(blocks) } else { Response::None };
respond
.send((request_id, response))
.expect("inbound_request_responses_recv was dropped?");
}
});
(set, latest_block_hash, sender)
}
}
fn notable_cosigns_request(
&self,
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)> {
async move {
let (request_id, global_session) = self
.notable_cosign_requests
.lock()
.await
.recv()
.await
.expect("notable_cosign_requests_send was dropped?");
let (sender, receiver) = oneshot::channel();
tokio::spawn({
let respond = self.inbound_request_responses.clone();
async move {
let response = if let Ok(notable_cosigns) = receiver.await {
Response::NotableCosigns(notable_cosigns)
} else {
Response::None
};
respond
.send((request_id, response))
.expect("inbound_request_responses_recv was dropped?");
}
});
(global_session, sender)
}
}
fn tributary_message(&self) -> impl Send + Future<Output = ([u8; 32], Vec<u8>)> {
async move {
self.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?")
}
}
fn cosign(&self) -> impl Send + Future<Output = SignedCosign> {
async move {
self
.signed_cosigns
.lock()
.await
.recv()
.await
.expect("signed_cosigns couldn't recv despite send in same object?")
}
}
}