From 2a3eaf4d7ef2330bf802820e8ad8d3fc249a3c38 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 10 Jan 2025 01:20:26 -0500 Subject: [PATCH] Wrap the entire Libp2p object in an Arc Makes `Clone` calls significantly cheaper as now only the outer Arc is cloned (the inner ones have been removed). Also wraps uses of Serai in an Arc as we shouldn't actually need/want multiple caller connection pools. --- coordinator/cosign/src/intend.rs | 4 +- coordinator/cosign/src/lib.rs | 9 ++-- coordinator/p2p/libp2p/src/dial.rs | 10 ++-- coordinator/p2p/libp2p/src/lib.rs | 60 +++++++++++++++--------- coordinator/p2p/libp2p/src/validators.rs | 10 ++-- coordinator/p2p/src/lib.rs | 3 ++ 6 files changed, 59 insertions(+), 37 deletions(-) diff --git a/coordinator/cosign/src/intend.rs b/coordinator/cosign/src/intend.rs index db38c905..9fa229c5 100644 --- a/coordinator/cosign/src/intend.rs +++ b/coordinator/cosign/src/intend.rs @@ -1,5 +1,5 @@ use core::future::Future; -use std::collections::HashMap; +use std::{sync::Arc, collections::HashMap}; use serai_client::{ primitives::{SeraiAddress, Amount}, @@ -57,7 +57,7 @@ async fn block_has_events_justifying_a_cosign( /// A task to determine which blocks we should intend to cosign. pub(crate) struct CosignIntendTask { pub(crate) db: D, - pub(crate) serai: Serai, + pub(crate) serai: Arc, } impl ContinuallyRan for CosignIntendTask { diff --git a/coordinator/cosign/src/lib.rs b/coordinator/cosign/src/lib.rs index 7d909712..aa2883aa 100644 --- a/coordinator/cosign/src/lib.rs +++ b/coordinator/cosign/src/lib.rs @@ -3,7 +3,7 @@ #![deny(missing_docs)] use core::{fmt::Debug, future::Future}; -use std::collections::HashMap; +use std::{sync::Arc, collections::HashMap}; use blake2::{Digest, Blake2s256}; @@ -240,7 +240,7 @@ impl Cosigning { /// only used once at any given time. pub fn spawn( db: D, - serai: Serai, + serai: Arc, request: R, tasks_to_run_upon_cosigning: Vec, ) -> Self { @@ -334,10 +334,9 @@ impl Cosigning { } } - /// Intake a cosign from the Serai network. + /// Intake a cosign. /// - /// - Returns Err(_) if there was an error trying to validate the cosign and it should be retired - /// later. + /// - Returns Err(_) if there was an error trying to validate the cosign. /// - Returns Ok(true) if the cosign was successfully handled or could not be handled at this /// time. /// - Returns Ok(false) if the cosign was invalid. diff --git a/coordinator/p2p/libp2p/src/dial.rs b/coordinator/p2p/libp2p/src/dial.rs index f8576217..1530e34b 100644 --- a/coordinator/p2p/libp2p/src/dial.rs +++ b/coordinator/p2p/libp2p/src/dial.rs @@ -1,5 +1,5 @@ use core::future::Future; -use std::collections::HashSet; +use std::{sync::Arc, collections::HashSet}; use rand_core::{RngCore, OsRng}; @@ -29,14 +29,18 @@ const TARGET_PEERS_PER_NETWORK: usize = 5; // TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3; pub(crate) struct DialTask { - serai: Serai, + serai: Arc, validators: Validators, peers: Peers, to_dial: mpsc::UnboundedSender, } impl DialTask { - pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender) -> Self { + pub(crate) fn new( + serai: Arc, + peers: Peers, + to_dial: mpsc::UnboundedSender, + ) -> Self { DialTask { serai: serai.clone(), validators: Validators::new(serai).0, peers, to_dial } } } diff --git a/coordinator/p2p/libp2p/src/lib.rs b/coordinator/p2p/libp2p/src/lib.rs index 2f6defa7..d3f09e61 100644 --- a/coordinator/p2p/libp2p/src/lib.rs +++ b/coordinator/p2p/libp2p/src/lib.rs @@ -131,33 +131,35 @@ struct Behavior { gossip: gossip::Behavior, } -/// The libp2p-backed P2P implementation. -/// -/// The P2p trait implementation does not support backpressure and is expected to be fully -/// utilized. Failure to poll the entire API will cause unbounded memory growth. #[allow(clippy::type_complexity)] -#[derive(Clone)] -pub struct Libp2p { +struct Libp2pInner { peers: Peers, gossip: mpsc::UnboundedSender, outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender)>, - tributary_gossip: Arc)>>>, + tributary_gossip: Mutex)>>, - signed_cosigns: Arc>>, + signed_cosigns: Mutex>, signed_cosigns_send: mpsc::UnboundedSender, - heartbeat_requests: Arc>>, - notable_cosign_requests: Arc>>, + heartbeat_requests: Mutex>, + notable_cosign_requests: Mutex>, inbound_request_responses: mpsc::UnboundedSender<(RequestId, Response)>, } +/// The libp2p-backed P2P implementation. +/// +/// The P2p trait implementation does not support backpressure and is expected to be fully +/// utilized. Failure to poll the entire API will cause unbounded memory growth. +#[derive(Clone)] +pub struct Libp2p(Arc); + impl Libp2p { /// Create a new libp2p-backed P2P instance. /// /// This will spawn all of the internal tasks necessary for functioning. - pub fn new(serai_key: &Zeroizing, serai: Serai) -> Libp2p { + pub fn new(serai_key: &Zeroizing, serai: Arc) -> Libp2p { // Define the object we track peers with let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) }; @@ -239,21 +241,21 @@ impl Libp2p { inbound_request_responses_recv, ); - Libp2p { + Libp2p(Arc::new(Libp2pInner { peers, gossip: gossip_send, outbound_requests: outbound_requests_send, - tributary_gossip: Arc::new(Mutex::new(tributary_gossip_recv)), + tributary_gossip: Mutex::new(tributary_gossip_recv), - signed_cosigns: Arc::new(Mutex::new(signed_cosigns_recv)), + signed_cosigns: Mutex::new(signed_cosigns_recv), signed_cosigns_send, - heartbeat_requests: Arc::new(Mutex::new(heartbeat_requests_recv)), - notable_cosign_requests: Arc::new(Mutex::new(notable_cosign_requests_recv)), + heartbeat_requests: Mutex::new(heartbeat_requests_recv), + notable_cosign_requests: Mutex::new(notable_cosign_requests_recv), inbound_request_responses: inbound_request_responses_send, - } + })) } } @@ -261,6 +263,7 @@ impl tributary::P2p for Libp2p { fn broadcast(&self, tributary: [u8; 32], message: Vec) -> impl Send + Future { async move { self + .0 .gossip .send(Message::Tributary { tributary, message }) .expect("gossip recv channel was dropped?"); @@ -281,7 +284,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p { let request = Request::NotableCosigns { global_session }; - let peers = self.peers.peers.read().await.clone(); + let peers = self.0.peers.peers.read().await.clone(); // HashSet of all peers let peers = peers.into_values().flat_map(<_>::into_iter).collect::>(); // Vec of all peers @@ -297,6 +300,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p { let (sender, receiver) = oneshot::channel(); self + .0 .outbound_requests .send((peer, request, sender)) .expect("outbound requests recv channel was dropped?"); @@ -310,6 +314,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p { { for cosign in cosigns { self + .0 .signed_cosigns_send .send(cosign) .expect("signed_cosigns recv in this object was dropped?"); @@ -327,22 +332,29 @@ impl serai_coordinator_p2p::P2p for Libp2p { fn peers(&self, network: NetworkId) -> impl Send + Future>> { async move { - let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else { + let Some(peer_ids) = self.0.peers.peers.read().await.get(&network).cloned() else { return vec![]; }; let mut res = vec![]; for id in peer_ids { - res.push(Peer { outbound_requests: &self.outbound_requests, id }); + res.push(Peer { outbound_requests: &self.0.outbound_requests, id }); } res } } + fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future { + async move { + self.0.gossip.send(Message::Cosign(cosign)).expect("gossip recv channel was dropped?"); + } + } + fn heartbeat( &self, ) -> impl Send + Future>)> { async move { let (request_id, set, latest_block_hash) = self + .0 .heartbeat_requests .lock() .await @@ -351,7 +363,7 @@ impl serai_coordinator_p2p::P2p for Libp2p { .expect("heartbeat_requests_send was dropped?"); let (sender, receiver) = oneshot::channel(); tokio::spawn({ - let respond = self.inbound_request_responses.clone(); + let respond = self.0.inbound_request_responses.clone(); async move { // The swarm task expects us to respond to every request. If the caller drops this // channel, we'll receive `Err` and respond with `vec![]`, safely satisfying that bound @@ -375,6 +387,7 @@ impl serai_coordinator_p2p::P2p for Libp2p { ) -> impl Send + Future>)> { async move { let (request_id, global_session) = self + .0 .notable_cosign_requests .lock() .await @@ -383,7 +396,7 @@ impl serai_coordinator_p2p::P2p for Libp2p { .expect("notable_cosign_requests_send was dropped?"); let (sender, receiver) = oneshot::channel(); tokio::spawn({ - let respond = self.inbound_request_responses.clone(); + let respond = self.0.inbound_request_responses.clone(); async move { let response = if let Ok(notable_cosigns) = receiver.await { Response::NotableCosigns(notable_cosigns) @@ -401,13 +414,14 @@ impl serai_coordinator_p2p::P2p for Libp2p { fn tributary_message(&self) -> impl Send + Future)> { async move { - self.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?") + self.0.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?") } } fn cosign(&self) -> impl Send + Future { async move { self + .0 .signed_cosigns .lock() .await diff --git a/coordinator/p2p/libp2p/src/validators.rs b/coordinator/p2p/libp2p/src/validators.rs index 0ce4c91b..951a5e99 100644 --- a/coordinator/p2p/libp2p/src/validators.rs +++ b/coordinator/p2p/libp2p/src/validators.rs @@ -21,7 +21,7 @@ pub(crate) struct Changes { } pub(crate) struct Validators { - serai: Serai, + serai: Arc, // A cache for which session we're populated with the validators of sessions: HashMap, @@ -35,7 +35,7 @@ pub(crate) struct Validators { } impl Validators { - pub(crate) fn new(serai: Serai) -> (Self, mpsc::UnboundedReceiver) { + pub(crate) fn new(serai: Arc) -> (Self, mpsc::UnboundedReceiver) { let (send, recv) = mpsc::unbounded_channel(); let validators = Validators { serai, @@ -148,7 +148,7 @@ impl Validators { /// Update the view of the validators. pub(crate) async fn update(&mut self) -> Result<(), String> { - let session_changes = Self::session_changes(&self.serai, &self.sessions).await?; + let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?; self.incorporate_session_changes(session_changes); Ok(()) } @@ -174,7 +174,9 @@ impl UpdateValidatorsTask { /// Spawn a new instance of the UpdateValidatorsTask. /// /// This returns a reference to the Validators it updates after spawning itself. - pub(crate) fn spawn(serai: Serai) -> (Arc>, mpsc::UnboundedReceiver) { + pub(crate) fn spawn( + serai: Arc, + ) -> (Arc>, mpsc::UnboundedReceiver) { // The validators which will be updated let (validators, changes) = Validators::new(serai); let validators = Arc::new(RwLock::new(validators)); diff --git a/coordinator/p2p/src/lib.rs b/coordinator/p2p/src/lib.rs index d285c8f0..71eb8f2c 100644 --- a/coordinator/p2p/src/lib.rs +++ b/coordinator/p2p/src/lib.rs @@ -56,6 +56,9 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab /// Fetch the peers for this network. fn peers(&self, network: NetworkId) -> impl Send + Future>>; + /// Broadcast a cosign. + fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future; + /// A cancel-safe future for the next heartbeat received over the P2P network. /// /// Yields the validator set its for, the latest block hash observed, and a channel to return the