Promote Request::Heartbeat from an enum variant to a struct

This commit is contained in:
Luke Parker
2025-01-09 01:41:42 -05:00
parent 465e8498c4
commit 9833911e06
6 changed files with 42 additions and 23 deletions

View File

@@ -35,7 +35,7 @@ use libp2p::{
SwarmBuilder, SwarmBuilder,
}; };
use serai_coordinator_p2p::TributaryBlockWithCommit; use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
/// A struct to sync the validators from the Serai node in order to keep track of them. /// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators; mod validators;
@@ -88,13 +88,12 @@ pub struct Peer<'a> {
impl serai_coordinator_p2p::Peer<'_> for Peer<'_> { impl serai_coordinator_p2p::Peer<'_> for Peer<'_> {
fn send_heartbeat( fn send_heartbeat(
&self, &self,
set: ValidatorSet, heartbeat: Heartbeat,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> { ) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
async move { async move {
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
let request = Request::Heartbeat { set, latest_block_hash }; let request = Request::Heartbeat(heartbeat);
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self self
.outbound_requests .outbound_requests
@@ -341,9 +340,7 @@ impl serai_coordinator_p2p::P2p for Libp2p {
fn heartbeat( fn heartbeat(
&self, &self,
) -> impl Send ) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)> {
+ Future<Output = (ValidatorSet, [u8; 32], oneshot::Sender<Vec<TributaryBlockWithCommit>>)>
{
async move { async move {
let (request_id, set, latest_block_hash) = self let (request_id, set, latest_block_hash) = self
.heartbeat_requests .heartbeat_requests
@@ -357,16 +354,19 @@ impl serai_coordinator_p2p::P2p for Libp2p {
let respond = self.inbound_request_responses.clone(); let respond = self.inbound_request_responses.clone();
async move { async move {
// The swarm task expects us to respond to every request. If the caller drops this // The swarm task expects us to respond to every request. If the caller drops this
// channel, we'll receive `Err` and respond with `None`, safely satisfying that bound // channel, we'll receive `Err` and respond with `vec![]`, safely satisfying that bound
// without requiring the caller send a value down this channel // without requiring the caller send a value down this channel
let response = let response = if let Ok(blocks) = receiver.await {
if let Ok(blocks) = receiver.await { Response::Blocks(blocks) } else { Response::None }; Response::Blocks(blocks)
} else {
Response::Blocks(vec![])
};
respond respond
.send((request_id, response)) .send((request_id, response))
.expect("inbound_request_responses_recv was dropped?"); .expect("inbound_request_responses_recv was dropped?");
} }
}); });
(set, latest_block_hash, sender) (Heartbeat { set, latest_block_hash }, sender)
} }
} }
@@ -388,7 +388,7 @@ impl serai_coordinator_p2p::P2p for Libp2p {
let response = if let Ok(notable_cosigns) = receiver.await { let response = if let Ok(notable_cosigns) = receiver.await {
Response::NotableCosigns(notable_cosigns) Response::NotableCosigns(notable_cosigns)
} else { } else {
Response::None Response::NotableCosigns(vec![])
}; };
respond respond
.send((request_id, response)) .send((request_id, response))

View File

@@ -4,7 +4,6 @@ use std::io;
use async_trait::async_trait; use async_trait::async_trait;
use borsh::{BorshSerialize, BorshDeserialize}; use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
@@ -15,7 +14,7 @@ pub use request_response::{RequestId, Message};
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;
use serai_coordinator_p2p::TributaryBlockWithCommit; use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
/// The maximum message size for the request-response protocol /// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message // This is derived from the heartbeat message size as it's our largest message
@@ -31,7 +30,7 @@ pub(crate) enum Request {
/// intervals. /// intervals.
/// ///
/// If our peers have more blocks than us, they're expected to respond with those blocks. /// If our peers have more blocks than us, they're expected to respond with those blocks.
Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] }, Heartbeat(Heartbeat),
/// A request for the notable cosigns for a global session. /// A request for the notable cosigns for a global session.
NotableCosigns { global_session: [u8; 32] }, NotableCosigns { global_session: [u8; 32] },
} }

View File

@@ -21,6 +21,8 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
}; };
use serai_coordinator_p2p::Heartbeat;
use crate::{ use crate::{
Peers, BehaviorEvent, Behavior, Peers, BehaviorEvent, Behavior,
validators::{self, Validators}, validators::{self, Validators},
@@ -105,7 +107,7 @@ impl SwarmTask {
match event { match event {
reqres::Event::Message { message, .. } => match message { reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id, request, channel } => match request { reqres::Message::Request { request_id, request, channel } => match request {
reqres::Request::Heartbeat { set, latest_block_hash } => { reqres::Request::Heartbeat(Heartbeat { set, latest_block_hash }) => {
self.inbound_request_response_channels.insert(request_id, channel); self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash)); self.heartbeat_requests.send((request_id, set, latest_block_hash));

View File

View File

@@ -10,7 +10,7 @@ use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader};
use serai_db::*; use serai_db::*;
use serai_task::ContinuallyRan; use serai_task::ContinuallyRan;
use crate::{Peer, P2p}; use crate::{Heartbeat, Peer, P2p};
// Amount of blocks in a minute // Amount of blocks in a minute
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize; const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
@@ -70,7 +70,11 @@ impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD,
tip_is_stale = false; tip_is_stale = false;
} }
// Necessary due to https://github.com/rust-lang/rust/issues/100013 // Necessary due to https://github.com/rust-lang/rust/issues/100013
let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else { let Some(blocks) = peer
.send_heartbeat(Heartbeat { set: self.set, latest_block_hash: tip })
.boxed()
.await
else {
continue 'peer; continue 'peer;
}; };
@@ -88,7 +92,14 @@ impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD,
// Attempt to sync the block // Attempt to sync the block
if !self.tributary.sync_block(block, block_with_commit.commit).await { if !self.tributary.sync_block(block, block_with_commit.commit).await {
// The block may be invalid or may simply be stale // The block may be invalid or stale if we added a block elsewhere
if (!tip_is_stale) && (tip != self.reader.tip()) {
// Since the Tributary's tip advanced on its own, return
return Ok(false);
}
// Since this block was invalid or stale in a way non-trivial to detect, try to
// sync with the next peer
continue 'peer; continue 'peer;
} }

View File

@@ -15,6 +15,15 @@ use serai_cosign::SignedCosign;
/// The heartbeat task, effecting sync of Tributaries /// The heartbeat task, effecting sync of Tributaries
pub mod heartbeat; pub mod heartbeat;
/// A heartbeat for a Tributary.
#[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)]
pub struct Heartbeat {
/// The Tributary this is the heartbeat of.
pub set: ValidatorSet,
/// The hash of the latest block added to the Tributary.
pub latest_block_hash: [u8; 32],
}
/// A tributary block and its commit. /// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)] #[derive(Clone, BorshSerialize, BorshDeserialize)]
pub struct TributaryBlockWithCommit { pub struct TributaryBlockWithCommit {
@@ -29,8 +38,7 @@ pub trait Peer<'a>: Send {
/// Send a heartbeat to this peer. /// Send a heartbeat to this peer.
fn send_heartbeat( fn send_heartbeat(
&self, &self,
set: ValidatorSet, heartbeat: Heartbeat,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>; ) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
} }
@@ -48,8 +56,7 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// descending blocks. /// descending blocks.
fn heartbeat( fn heartbeat(
&self, &self,
) -> impl Send ) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
+ Future<Output = (ValidatorSet, [u8; 32], oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session. /// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
/// ///