From 9833911e0630a598db547f73ef10eab5c392c1ad Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 9 Jan 2025 01:41:42 -0500 Subject: [PATCH] Promote Request::Heartbeat from an enum variant to a struct --- coordinator/p2p/libp2p/src/lib.rs | 24 ++++++++++++------------ coordinator/p2p/libp2p/src/reqres.rs | 5 ++--- coordinator/p2p/libp2p/src/swarm.rs | 4 +++- coordinator/p2p/src/cosign.rs | 0 coordinator/p2p/src/heartbeat.rs | 17 ++++++++++++++--- coordinator/p2p/src/lib.rs | 15 +++++++++++---- 6 files changed, 42 insertions(+), 23 deletions(-) create mode 100644 coordinator/p2p/src/cosign.rs diff --git a/coordinator/p2p/libp2p/src/lib.rs b/coordinator/p2p/libp2p/src/lib.rs index 0778813f..2f6defa7 100644 --- a/coordinator/p2p/libp2p/src/lib.rs +++ b/coordinator/p2p/libp2p/src/lib.rs @@ -35,7 +35,7 @@ use libp2p::{ SwarmBuilder, }; -use serai_coordinator_p2p::TributaryBlockWithCommit; +use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit}; /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; @@ -88,13 +88,12 @@ pub struct Peer<'a> { impl serai_coordinator_p2p::Peer<'_> for Peer<'_> { fn send_heartbeat( &self, - set: ValidatorSet, - latest_block_hash: [u8; 32], + heartbeat: Heartbeat, ) -> impl Send + Future>> { async move { const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); - let request = Request::Heartbeat { set, latest_block_hash }; + let request = Request::Heartbeat(heartbeat); let (sender, receiver) = oneshot::channel(); self .outbound_requests @@ -341,9 +340,7 @@ impl serai_coordinator_p2p::P2p for Libp2p { fn heartbeat( &self, - ) -> impl Send - + Future>)> - { + ) -> impl Send + Future>)> { async move { let (request_id, set, latest_block_hash) = self .heartbeat_requests @@ -357,16 +354,19 @@ impl serai_coordinator_p2p::P2p for Libp2p { let respond = self.inbound_request_responses.clone(); async move { // The swarm task expects us to respond to every request. If the caller drops this - // channel, we'll receive `Err` and respond with `None`, safely satisfying that bound + // channel, we'll receive `Err` and respond with `vec![]`, safely satisfying that bound // without requiring the caller send a value down this channel - let response = - if let Ok(blocks) = receiver.await { Response::Blocks(blocks) } else { Response::None }; + let response = if let Ok(blocks) = receiver.await { + Response::Blocks(blocks) + } else { + Response::Blocks(vec![]) + }; respond .send((request_id, response)) .expect("inbound_request_responses_recv was dropped?"); } }); - (set, latest_block_hash, sender) + (Heartbeat { set, latest_block_hash }, sender) } } @@ -388,7 +388,7 @@ impl serai_coordinator_p2p::P2p for Libp2p { let response = if let Ok(notable_cosigns) = receiver.await { Response::NotableCosigns(notable_cosigns) } else { - Response::None + Response::NotableCosigns(vec![]) }; respond .send((request_id, response)) diff --git a/coordinator/p2p/libp2p/src/reqres.rs b/coordinator/p2p/libp2p/src/reqres.rs index 4f8fa236..617e1027 100644 --- a/coordinator/p2p/libp2p/src/reqres.rs +++ b/coordinator/p2p/libp2p/src/reqres.rs @@ -4,7 +4,6 @@ use std::io; use async_trait::async_trait; use borsh::{BorshSerialize, BorshDeserialize}; -use serai_client::validator_sets::primitives::ValidatorSet; use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -15,7 +14,7 @@ pub use request_response::{RequestId, Message}; use serai_cosign::SignedCosign; -use serai_coordinator_p2p::TributaryBlockWithCommit; +use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit}; /// The maximum message size for the request-response protocol // This is derived from the heartbeat message size as it's our largest message @@ -31,7 +30,7 @@ pub(crate) enum Request { /// intervals. /// /// If our peers have more blocks than us, they're expected to respond with those blocks. - Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] }, + Heartbeat(Heartbeat), /// A request for the notable cosigns for a global session. NotableCosigns { global_session: [u8; 32] }, } diff --git a/coordinator/p2p/libp2p/src/swarm.rs b/coordinator/p2p/libp2p/src/swarm.rs index e0a6762b..0c3e2664 100644 --- a/coordinator/p2p/libp2p/src/swarm.rs +++ b/coordinator/p2p/libp2p/src/swarm.rs @@ -21,6 +21,8 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, }; +use serai_coordinator_p2p::Heartbeat; + use crate::{ Peers, BehaviorEvent, Behavior, validators::{self, Validators}, @@ -105,7 +107,7 @@ impl SwarmTask { match event { reqres::Event::Message { message, .. } => match message { reqres::Message::Request { request_id, request, channel } => match request { - reqres::Request::Heartbeat { set, latest_block_hash } => { + reqres::Request::Heartbeat(Heartbeat { set, latest_block_hash }) => { self.inbound_request_response_channels.insert(request_id, channel); let _: Result<_, _> = self.heartbeat_requests.send((request_id, set, latest_block_hash)); diff --git a/coordinator/p2p/src/cosign.rs b/coordinator/p2p/src/cosign.rs new file mode 100644 index 00000000..e69de29b diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index 87827e7f..4966c471 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -10,7 +10,7 @@ use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader}; use serai_db::*; use serai_task::ContinuallyRan; -use crate::{Peer, P2p}; +use crate::{Heartbeat, Peer, P2p}; // Amount of blocks in a minute const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize; @@ -70,7 +70,11 @@ impl ContinuallyRan for HeartbeatTask ContinuallyRan for HeartbeatTask: Send { /// Send a heartbeat to this peer. fn send_heartbeat( &self, - set: ValidatorSet, - latest_block_hash: [u8; 32], + heartbeat: Heartbeat, ) -> impl Send + Future>>; } @@ -48,8 +56,7 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab /// descending blocks. fn heartbeat( &self, - ) -> impl Send - + Future>)>; + ) -> impl Send + Future>)>; /// A cancel-safe future for the next request for the notable cosigns of a gloabl session. ///