mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-11 13:39:25 +00:00
coordinator/tributary was tributary-chain. This crate has been renamed tributary-sdk and moved to coordinator/tributary-sdk. coordinator/src/tributary was our instantion of a Tributary, the Transaction type and scan task. This has been moved to coordinator/tributary. The main reason for this was due to coordinator/main.rs becoming untidy. There is now a collection of clean, independent APIs present in the codebase. coordinator/main.rs is to compose them. Sometimes, these compositions are a bit silly (reading from a channel just to forward the message to a distinct channel). That's more than fine as the code is still readable and the value from the cleanliness of the APIs composed far exceeds the nits from having these odd compositions. This breaks down a bit as we now define a global database, and have some APIs interact with multiple other APIs. coordinator/src/tributary was a self-contained, clean API. The recently added task present in coordinator/tributary/mod.rs, which bound it to the rest of the Coordinator, wasn't. Now, coordinator/src is solely the API compositions, and all self-contained APIs are their own crates.
205 lines
7.6 KiB
Rust
205 lines
7.6 KiB
Rust
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
|
#![doc = include_str!("../README.md")]
|
|
#![deny(missing_docs)]
|
|
|
|
use core::future::Future;
|
|
use std::collections::HashMap;
|
|
|
|
use borsh::{BorshSerialize, BorshDeserialize};
|
|
|
|
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
|
|
|
|
use serai_db::Db;
|
|
use tributary_sdk::{ReadWrite, TransactionTrait, Tributary, TributaryReader};
|
|
use serai_cosign::{SignedCosign, Cosigning};
|
|
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
use serai_task::{Task, ContinuallyRan};
|
|
|
|
/// The heartbeat task, effecting sync of Tributaries
|
|
pub mod heartbeat;
|
|
use crate::heartbeat::HeartbeatTask;
|
|
|
|
/// A heartbeat for a Tributary.
|
|
#[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)]
|
|
pub struct Heartbeat {
|
|
/// The Tributary this is the heartbeat of.
|
|
pub set: ValidatorSet,
|
|
/// The hash of the latest block added to the Tributary.
|
|
pub latest_block_hash: [u8; 32],
|
|
}
|
|
|
|
/// A tributary block and its commit.
|
|
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
|
pub struct TributaryBlockWithCommit {
|
|
/// The serialized block.
|
|
pub block: Vec<u8>,
|
|
/// The serialized commit.
|
|
pub commit: Vec<u8>,
|
|
}
|
|
|
|
/// A representation of a peer.
|
|
pub trait Peer<'a>: Send {
|
|
/// Send a heartbeat to this peer.
|
|
fn send_heartbeat(
|
|
&self,
|
|
heartbeat: Heartbeat,
|
|
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
|
|
}
|
|
|
|
/// The representation of the P2P network.
|
|
pub trait P2p:
|
|
Send + Sync + Clone + tributary_sdk::P2p + serai_cosign::RequestNotableCosigns
|
|
{
|
|
/// The representation of a peer.
|
|
type Peer<'a>: Peer<'a>;
|
|
|
|
/// Fetch the peers for this network.
|
|
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
|
|
|
|
/// Broadcast a cosign.
|
|
fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future<Output = ()>;
|
|
|
|
/// A cancel-safe future for the next heartbeat received over the P2P network.
|
|
///
|
|
/// Yields the validator set its for, the latest block hash observed, and a channel to return the
|
|
/// descending blocks. This channel MUST NOT and will not have its receiver dropped before a
|
|
/// message is sent.
|
|
fn heartbeat(
|
|
&self,
|
|
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
|
|
|
|
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
|
|
///
|
|
/// Yields the global session the request is for and a channel to return the notable cosigns.
|
|
/// This channel MUST NOT and will not have its receiver dropped before a message is sent.
|
|
fn notable_cosigns_request(
|
|
&self,
|
|
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)>;
|
|
|
|
/// A cancel-safe future for the next message regarding a Tributary.
|
|
///
|
|
/// Yields the message's Tributary's genesis block hash and the message.
|
|
fn tributary_message(&self) -> impl Send + Future<Output = ([u8; 32], Vec<u8>)>;
|
|
|
|
/// A cancel-safe future for the next cosign received.
|
|
fn cosign(&self) -> impl Send + Future<Output = SignedCosign>;
|
|
}
|
|
|
|
fn handle_notable_cosigns_request<D: Db>(
|
|
db: &D,
|
|
global_session: [u8; 32],
|
|
channel: oneshot::Sender<Vec<SignedCosign>>,
|
|
) {
|
|
let cosigns = Cosigning::<D>::notable_cosigns(db, global_session);
|
|
channel.send(cosigns).expect("channel listening for cosign oneshot response was dropped?");
|
|
}
|
|
|
|
fn handle_heartbeat<D: Db, T: TransactionTrait>(
|
|
reader: &TributaryReader<D, T>,
|
|
mut latest_block_hash: [u8; 32],
|
|
channel: oneshot::Sender<Vec<TributaryBlockWithCommit>>,
|
|
) {
|
|
let mut res_size = 8;
|
|
let mut res = vec![];
|
|
// This former case should be covered by this latter case
|
|
while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) {
|
|
let Some(block_after) = reader.block_after(&latest_block_hash) else { break };
|
|
|
|
// These `break` conditions should only occur under edge cases, such as if we're actively
|
|
// deleting this Tributary due to being done with it
|
|
let Some(block) = reader.block(&block_after) else { break };
|
|
let block = block.serialize();
|
|
let Some(commit) = reader.commit(&block_after) else { break };
|
|
res_size += 8 + block.len() + 8 + commit.len();
|
|
res.push(TributaryBlockWithCommit { block, commit });
|
|
|
|
latest_block_hash = block_after;
|
|
}
|
|
channel
|
|
.send(res)
|
|
.map_err(|_| ())
|
|
.expect("channel listening for heartbeat oneshot response was dropped?");
|
|
}
|
|
|
|
/// Run the P2P instance.
|
|
///
|
|
/// `add_tributary`'s and `retire_tributary's senders, along with `send_cosigns`'s receiver, must
|
|
/// never be dropped. `retire_tributary` is not required to only be instructed with added
|
|
/// Tributaries.
|
|
pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
|
|
db: impl Db,
|
|
p2p: P,
|
|
mut add_tributary: mpsc::UnboundedReceiver<(ValidatorSet, Tributary<TD, Tx, P>)>,
|
|
mut retire_tributary: mpsc::UnboundedReceiver<ValidatorSet>,
|
|
send_cosigns: mpsc::UnboundedSender<SignedCosign>,
|
|
) {
|
|
let mut readers = HashMap::<ValidatorSet, TributaryReader<TD, Tx>>::new();
|
|
let mut tributaries = HashMap::<[u8; 32], mpsc::UnboundedSender<Vec<u8>>>::new();
|
|
let mut heartbeat_tasks = HashMap::<ValidatorSet, _>::new();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
tributary = add_tributary.recv() => {
|
|
let (set, tributary) = tributary.expect("add_tributary send was dropped");
|
|
let reader = tributary.reader();
|
|
readers.insert(set, reader.clone());
|
|
|
|
let (heartbeat_task_def, heartbeat_task) = Task::new();
|
|
tokio::spawn(
|
|
(HeartbeatTask {
|
|
set,
|
|
tributary: tributary.clone(),
|
|
reader: reader.clone(),
|
|
p2p: p2p.clone(),
|
|
}).continually_run(heartbeat_task_def, vec![])
|
|
);
|
|
heartbeat_tasks.insert(set, heartbeat_task);
|
|
|
|
let (tributary_message_send, mut tributary_message_recv) = mpsc::unbounded_channel();
|
|
tributaries.insert(tributary.genesis(), tributary_message_send);
|
|
// For as long as this sender exists, handle the messages from it on a dedicated task
|
|
tokio::spawn(async move {
|
|
while let Some(message) = tributary_message_recv.recv().await {
|
|
tributary.handle_message(&message).await;
|
|
}
|
|
});
|
|
}
|
|
set = retire_tributary.recv() => {
|
|
let set = set.expect("retire_tributary send was dropped");
|
|
let Some(reader) = readers.remove(&set) else { continue };
|
|
tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary");
|
|
heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task");
|
|
}
|
|
|
|
(heartbeat, channel) = p2p.heartbeat() => {
|
|
if let Some(reader) = readers.get(&heartbeat.set) {
|
|
let reader = reader.clone(); // This is a cheap clone
|
|
// We spawn this on a task due to the DB reads needed
|
|
tokio::spawn(async move {
|
|
handle_heartbeat(&reader, heartbeat.latest_block_hash, channel)
|
|
});
|
|
}
|
|
}
|
|
(global_session, channel) = p2p.notable_cosigns_request() => {
|
|
tokio::spawn({
|
|
let db = db.clone();
|
|
async move { handle_notable_cosigns_request(&db, global_session, channel) }
|
|
});
|
|
}
|
|
(tributary, message) = p2p.tributary_message() => {
|
|
if let Some(tributary) = tributaries.get(&tributary) {
|
|
tributary.send(message).expect("tributary message recv was dropped?");
|
|
}
|
|
}
|
|
cosign = p2p.cosign() => {
|
|
// We don't call `Cosigning::intake_cosign` here as that can only be called from a single
|
|
// location. We also need to intake the cosigns we produce, which means we need to merge
|
|
// these streams (signing, network) somehow. That's done with this mpsc channel
|
|
send_cosigns.send(cosign).expect("channel receiving cosigns was dropped");
|
|
}
|
|
}
|
|
}
|
|
}
|