#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![doc = include_str!("../README.md")] #![deny(missing_docs)] use core::future::Future; use std::collections::HashMap; use borsh::{BorshSerialize, BorshDeserialize}; use serai_primitives::{network_id::ExternalNetworkId, validator_sets::ExternalValidatorSet}; use serai_db::Db; use tributary_sdk::{ReadWrite, TransactionTrait, Tributary, TributaryReader}; use serai_cosign::{SignedCosign, Cosigning}; use tokio::sync::{mpsc, oneshot}; use serai_task::{Task, ContinuallyRan}; /// The heartbeat task, effecting sync of Tributaries pub mod heartbeat; use crate::heartbeat::HeartbeatTask; /// A heartbeat for a Tributary. #[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)] pub struct Heartbeat { /// The Tributary this is the heartbeat of. pub set: ExternalValidatorSet, /// The hash of the latest block added to the Tributary. pub latest_block_hash: [u8; 32], } /// A tributary block and its commit. #[derive(Clone, BorshSerialize, BorshDeserialize)] pub struct TributaryBlockWithCommit { /// The serialized block. pub block: Vec, /// The serialized commit. pub commit: Vec, } /// A representation of a peer. pub trait Peer<'a>: Send { /// Send a heartbeat to this peer. fn send_heartbeat( &self, heartbeat: Heartbeat, ) -> impl Send + Future>>; } /// The representation of the P2P network. pub trait P2p: Send + Sync + Clone + tributary_sdk::P2p + serai_cosign::RequestNotableCosigns { /// The representation of a peer. type Peer<'a>: Peer<'a>; /// Fetch the peers for this network. fn peers(&self, network: ExternalNetworkId) -> impl Send + Future>>; /// Broadcast a cosign. fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future; /// A cancel-safe future for the next heartbeat received over the P2P network. /// /// Yields the validator set its for, the latest block hash observed, and a channel to return the /// descending blocks. This channel MUST NOT and will not have its receiver dropped before a /// message is sent. fn heartbeat( &self, ) -> impl Send + Future>)>; /// A cancel-safe future for the next request for the notable cosigns of a gloabl session. /// /// Yields the global session the request is for and a channel to return the notable cosigns. /// This channel MUST NOT and will not have its receiver dropped before a message is sent. fn notable_cosigns_request( &self, ) -> impl Send + Future>)>; /// A cancel-safe future for the next message regarding a Tributary. /// /// Yields the message's Tributary's genesis block hash and the message. fn tributary_message(&self) -> impl Send + Future)>; /// A cancel-safe future for the next cosign received. fn cosign(&self) -> impl Send + Future; } fn handle_notable_cosigns_request( db: &D, global_session: [u8; 32], channel: oneshot::Sender>, ) { let cosigns = Cosigning::::notable_cosigns(db, global_session); channel.send(cosigns).expect("channel listening for cosign oneshot response was dropped?"); } fn handle_heartbeat( reader: &TributaryReader, mut latest_block_hash: [u8; 32], channel: oneshot::Sender>, ) { let mut res_size = 8; let mut res = vec![]; // This former case should be covered by this latter case while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) { let Some(block_after) = reader.block_after(&latest_block_hash) else { break }; // These `break` conditions should only occur under edge cases, such as if we're actively // deleting this Tributary due to being done with it let Some(block) = reader.block(&block_after) else { break }; let block = block.serialize(); let Some(commit) = reader.commit(&block_after) else { break }; res_size += 8 + block.len() + 8 + commit.len(); res.push(TributaryBlockWithCommit { block, commit }); latest_block_hash = block_after; } channel .send(res) .map_err(|_| ()) .expect("channel listening for heartbeat oneshot response was dropped?"); } /// Run the P2P instance. /// /// `add_tributary`'s and `retire_tributary's senders, along with `send_cosigns`'s receiver, must /// never be dropped. `retire_tributary` is not required to only be instructed with added /// Tributaries. pub async fn run( db: impl Db, p2p: P, mut add_tributary: mpsc::UnboundedReceiver<(ExternalValidatorSet, Tributary)>, mut retire_tributary: mpsc::UnboundedReceiver, send_cosigns: mpsc::UnboundedSender, ) { let mut readers = HashMap::>::new(); let mut tributaries = HashMap::<[u8; 32], mpsc::UnboundedSender>>::new(); let mut heartbeat_tasks = HashMap::::new(); loop { tokio::select! { tributary = add_tributary.recv() => { let (set, tributary) = tributary.expect("add_tributary send was dropped"); let reader = tributary.reader(); readers.insert(set, reader.clone()); let (heartbeat_task_def, heartbeat_task) = Task::new(); tokio::spawn( (HeartbeatTask { set, tributary: tributary.clone(), reader: reader.clone(), p2p: p2p.clone(), }).continually_run(heartbeat_task_def, vec![]) ); heartbeat_tasks.insert(set, heartbeat_task); let (tributary_message_send, mut tributary_message_recv) = mpsc::unbounded_channel(); tributaries.insert(tributary.genesis(), tributary_message_send); // For as long as this sender exists, handle the messages from it on a dedicated task tokio::spawn(async move { while let Some(message) = tributary_message_recv.recv().await { tributary.handle_message(&message).await; } }); } set = retire_tributary.recv() => { let set = set.expect("retire_tributary send was dropped"); let Some(reader) = readers.remove(&set) else { continue }; tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary"); heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task"); } (heartbeat, channel) = p2p.heartbeat() => { if let Some(reader) = readers.get(&heartbeat.set) { let reader = reader.clone(); // This is a cheap clone // We spawn this on a task due to the DB reads needed tokio::spawn(async move { handle_heartbeat(&reader, heartbeat.latest_block_hash, channel) }); } } (global_session, channel) = p2p.notable_cosigns_request() => { tokio::spawn({ let db = db.clone(); async move { handle_notable_cosigns_request(&db, global_session, channel) } }); } (tributary, message) = p2p.tributary_message() => { if let Some(tributary) = tributaries.get(&tributary) { tributary.send(message).expect("tributary message recv was dropped?"); } } cosign = p2p.cosign() => { // We don't call `Cosigning::intake_cosign` here as that can only be called from a single // location. We also need to intake the cosigns we produce, which means we need to merge // these streams (signing, network) somehow. That's done with this mpsc channel send_cosigns.send(cosign).expect("channel receiving cosigns was dropped"); } } } }