Complete serai-coordinator-p2p

This commit is contained in:
Luke Parker
2025-01-09 06:23:14 -05:00
parent 201a444e89
commit b101e2211a
9 changed files with 156 additions and 77 deletions

View File

@@ -3,18 +3,23 @@
#![deny(missing_docs)]
use core::future::Future;
use std::collections::HashMap;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_cosign::SignedCosign;
use serai_db::Db;
use tributary::{ReadWrite, TransactionTrait, Tributary, TributaryReader};
use serai_cosign::{SignedCosign, Cosigning};
/// A oneshot channel.
pub mod oneshot;
use tokio::sync::{mpsc, oneshot};
use serai_task::{Task, ContinuallyRan};
/// The heartbeat task, effecting sync of Tributaries
pub mod heartbeat;
use crate::heartbeat::HeartbeatTask;
/// A heartbeat for a Tributary.
#[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)]
@@ -74,3 +79,116 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// A cancel-safe future for the next cosign received.
fn cosign(&self) -> impl Send + Future<Output = SignedCosign>;
}
fn handle_notable_cosigns_request<D: Db>(
db: &D,
global_session: [u8; 32],
channel: oneshot::Sender<Vec<SignedCosign>>,
) {
let cosigns = Cosigning::<D>::notable_cosigns(db, global_session);
channel.send(cosigns).expect("channel listening for cosign oneshot response was dropped?");
}
fn handle_heartbeat<D: Db, T: TransactionTrait>(
reader: &TributaryReader<D, T>,
mut latest_block_hash: [u8; 32],
channel: oneshot::Sender<Vec<TributaryBlockWithCommit>>,
) {
let mut res_size = 8;
let mut res = vec![];
// This former case should be covered by this latter case
while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) {
let Some(block_after) = reader.block_after(&latest_block_hash) else { break };
let block = reader.block(&block_after).unwrap().serialize();
let commit = reader.commit(&block_after).unwrap();
res_size += 8 + block.len() + 8 + commit.len();
res.push(TributaryBlockWithCommit { block, commit });
latest_block_hash = block_after;
}
channel
.send(res)
.map_err(|_| ())
.expect("channel listening for heartbeat oneshot response was dropped?");
}
/// Run the P2P instance.
///
/// `add_tributary`'s and `retire_tributary's senders, along with `send_cosigns`'s receiver, must
/// never be dropped. `retire_tributary` is not required to only be instructed with added
/// Tributaries.
pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
db: impl Db,
p2p: P,
mut add_tributary: mpsc::UnboundedReceiver<(ValidatorSet, Tributary<TD, Tx, P>)>,
mut retire_tributary: mpsc::UnboundedReceiver<ValidatorSet>,
send_cosigns: mpsc::UnboundedSender<SignedCosign>,
) {
let mut readers = HashMap::<ValidatorSet, TributaryReader<TD, Tx>>::new();
let mut tributaries = HashMap::<[u8; 32], mpsc::UnboundedSender<Vec<u8>>>::new();
let mut heartbeat_tasks = HashMap::<ValidatorSet, _>::new();
loop {
tokio::select! {
tributary = add_tributary.recv() => {
let (set, tributary) = tributary.expect("add_tributary send was dropped?");
let reader = tributary.reader();
readers.insert(set, reader.clone());
let (heartbeat_task_def, heartbeat_task) = Task::new();
tokio::spawn(
(HeartbeatTask {
set,
tributary: tributary.clone(),
reader: reader.clone(),
p2p: p2p.clone(),
}).continually_run(heartbeat_task_def, vec![])
);
heartbeat_tasks.insert(set, heartbeat_task);
let (tributary_message_send, mut tributary_message_recv) = mpsc::unbounded_channel();
tributaries.insert(tributary.genesis(), tributary_message_send);
// For as long as this sender exists, handle the messages from it on a dedicated task
tokio::spawn(async move {
while let Some(message) = tributary_message_recv.recv().await {
tributary.handle_message(&message).await;
}
});
}
set = retire_tributary.recv() => {
let set = set.expect("retire_tributary send was dropped?");
let Some(reader) = readers.remove(&set) else { continue };
tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary");
heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task");
}
(heartbeat, channel) = p2p.heartbeat() => {
if let Some(reader) = readers.get(&heartbeat.set) {
let reader = reader.clone(); // This is a cheap clone
// We spawn this on a task due to the DB reads needed
tokio::spawn(async move {
handle_heartbeat(&reader, heartbeat.latest_block_hash, channel)
});
}
}
(global_session, channel) = p2p.notable_cosigns_request() => {
tokio::spawn({
let db = db.clone();
async move { handle_notable_cosigns_request(&db, global_session, channel) }
});
}
(tributary, message) = p2p.tributary_message() => {
if let Some(tributary) = tributaries.get(&tributary) {
tributary.send(message).expect("tributary message recv was dropped?");
}
}
cosign = p2p.cosign() => {
// We don't call `Cosigning::intake_cosign` here as that can only be called from a single
// location. We also need to intake the cosigns we produce, which means we need to merge
// these streams (signing, network) somehow. That's done with this mpsc channel
send_cosigns.send(cosign).expect("channel receiving cosigns was dropped?");
}
}
}
}