From b101e2211a0b9163abd71c4ff036cb9a22f0789d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 9 Jan 2025 06:23:14 -0500 Subject: [PATCH] Complete serai-coordinator-p2p --- Cargo.lock | 24 ++---- coordinator/p2p/Cargo.toml | 2 +- coordinator/p2p/README.md | 2 +- coordinator/p2p/libp2p/src/lib.rs | 4 +- coordinator/p2p/libp2p/src/reqres.rs | 2 +- coordinator/p2p/libp2p/src/swarm.rs | 9 +- coordinator/p2p/src/heartbeat.rs | 31 +++++-- coordinator/p2p/src/lib.rs | 124 ++++++++++++++++++++++++++- coordinator/p2p/src/oneshot.rs | 35 -------- 9 files changed, 156 insertions(+), 77 deletions(-) delete mode 100644 coordinator/p2p/src/oneshot.rs diff --git a/Cargo.lock b/Cargo.lock index 379c81ba..840a6c53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,18 +840,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-io" version = "2.4.0" @@ -7465,7 +7453,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel 1.9.0", + "async-channel", "async-trait", "asynchronous-codec", "bytes", @@ -7506,7 +7494,7 @@ name = "sc-network-bitswap" version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ - "async-channel 1.9.0", + "async-channel", "cid", "futures", "libp2p-identity", @@ -7563,7 +7551,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel 1.9.0", + "async-channel", "futures", "libp2p-identity", "log", @@ -7584,7 +7572,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel 1.9.0", + "async-channel", "async-trait", "fork-tree", "futures", @@ -7958,7 +7946,7 @@ name = "sc-utils" version = "4.0.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ - "async-channel 1.9.0", + "async-channel", "futures", "futures-timer", "lazy_static", @@ -8384,7 +8372,6 @@ dependencies = [ name = "serai-coordinator-p2p" version = "0.1.0" dependencies = [ - "async-channel 2.3.1", "borsh", "futures-lite", "log", @@ -8392,6 +8379,7 @@ dependencies = [ "serai-cosign", "serai-db", "serai-task", + "tokio", "tributary-chain", ] diff --git a/coordinator/p2p/Cargo.toml b/coordinator/p2p/Cargo.toml index afb0b483..7b7c055c 100644 --- a/coordinator/p2p/Cargo.toml +++ b/coordinator/p2p/Cargo.toml @@ -26,8 +26,8 @@ serai-client = { path = "../../substrate/client", default-features = false, feat serai-cosign = { path = "../cosign" } tributary = { package = "tributary-chain", path = "../tributary" } -async-channel = { version = "2", default-features = false, features = ["std"] } futures-lite = { version = "2", default-features = false, features = ["std"] } +tokio = { version = "1", default-features = false, features = ["sync", "macros"] } log = { version = "0.4", default-features = false, features = ["std"] } serai-task = { path = "../../common/task", version = "0.1" } diff --git a/coordinator/p2p/README.md b/coordinator/p2p/README.md index 9d6d9aef..7a8de210 100644 --- a/coordinator/p2p/README.md +++ b/coordinator/p2p/README.md @@ -1,3 +1,3 @@ # Serai Coordinator P2P -The P2P abstraction used by Serai's coordinator. +The P2P abstraction used by Serai's coordinator, and tasks over it. diff --git a/coordinator/p2p/libp2p/src/lib.rs b/coordinator/p2p/libp2p/src/lib.rs index f15606d0..2f6defa7 100644 --- a/coordinator/p2p/libp2p/src/lib.rs +++ b/coordinator/p2p/libp2p/src/lib.rs @@ -19,7 +19,7 @@ use serai_client::{ Serai, }; -use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; use serai_task::{Task, ContinuallyRan}; @@ -35,7 +35,7 @@ use libp2p::{ SwarmBuilder, }; -use serai_coordinator_p2p::{oneshot, Heartbeat, TributaryBlockWithCommit}; +use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit}; /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; diff --git a/coordinator/p2p/libp2p/src/reqres.rs b/coordinator/p2p/libp2p/src/reqres.rs index 617e1027..221cbdf3 100644 --- a/coordinator/p2p/libp2p/src/reqres.rs +++ b/coordinator/p2p/libp2p/src/reqres.rs @@ -19,7 +19,7 @@ use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit}; /// The maximum message size for the request-response protocol // This is derived from the heartbeat message size as it's our largest message pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = - (tributary::BLOCK_SIZE_LIMIT * serai_coordinator_p2p::heartbeat::BLOCKS_PER_BATCH) + 1024; + 1024 + serai_coordinator_p2p::heartbeat::BATCH_SIZE_LIMIT; const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0"; diff --git a/coordinator/p2p/libp2p/src/swarm.rs b/coordinator/p2p/libp2p/src/swarm.rs index 16200954..a9b13bf0 100644 --- a/coordinator/p2p/libp2p/src/swarm.rs +++ b/coordinator/p2p/libp2p/src/swarm.rs @@ -8,7 +8,7 @@ use borsh::BorshDeserialize; use serai_client::validator_sets::primitives::ValidatorSet; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc, oneshot, RwLock}; use serai_task::TaskHandle; @@ -21,7 +21,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, }; -use serai_coordinator_p2p::{oneshot, Heartbeat}; +use serai_coordinator_p2p::Heartbeat; use crate::{ Peers, BehaviorEvent, Behavior, @@ -69,11 +69,6 @@ pub(crate) struct SwarmTask { inbound_request_response_channels: HashMap>, heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>, - /* TODO - let cosigns = Cosigning::::notable_cosigns(&self.db, global_session); - let res = reqres::Response::NotableCosigns(cosigns); - let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res); - */ notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>, inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>, } diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index f79151ad..76d160ea 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -1,7 +1,7 @@ use core::future::Future; use std::time::{Duration, SystemTime}; -use serai_client::validator_sets::primitives::ValidatorSet; +use serai_client::validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, ValidatorSet}; use futures_lite::FutureExt; @@ -15,19 +15,32 @@ use crate::{Heartbeat, Peer, P2p}; // Amount of blocks in a minute const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize; -/// The maximum amount of blocks to include/included within a batch. -pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; +/// The minimum amount of blocks to include/included within a batch, assuming there's blocks to +/// include in the batch. +/// +/// This decides the size limit of the Batch (the Block size limit multiplied by the minimum amount +/// of blocks we'll send). The actual amount of blocks sent will be the amount which fits within +/// the size limit. +pub const MIN_BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; + +/// The size limit for a batch of blocks sent in response to a Heartbeat. +/// +/// This estimates the size of a commit as `32 + (MAX_VALIDATORS * 128)`. At the time of writing, a +/// commit is `8 + (validators * 32) + (32 + (validators * 32))` (for the time, list of validators, +/// and aggregate signature). Accordingly, this should be a safe over-estimate. +pub const BATCH_SIZE_LIMIT: usize = MIN_BLOCKS_PER_BATCH * + (tributary::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128)); /// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's /// tip. /// /// If the other validator has more blocks then we do, they're expected to inform us. This forms /// the sync protocol for our Tributaries. -pub struct HeartbeatTask { - set: ValidatorSet, - tributary: Tributary, - reader: TributaryReader, - p2p: P, +pub(crate) struct HeartbeatTask { + pub(crate) set: ValidatorSet, + pub(crate) tributary: Tributary, + pub(crate) reader: TributaryReader, + pub(crate) p2p: P, } impl ContinuallyRan for HeartbeatTask { @@ -80,7 +93,7 @@ impl ContinuallyRan for HeartbeatTask impl Send + Future; } + +fn handle_notable_cosigns_request( + db: &D, + global_session: [u8; 32], + channel: oneshot::Sender>, +) { + let cosigns = Cosigning::::notable_cosigns(db, global_session); + channel.send(cosigns).expect("channel listening for cosign oneshot response was dropped?"); +} + +fn handle_heartbeat( + reader: &TributaryReader, + mut latest_block_hash: [u8; 32], + channel: oneshot::Sender>, +) { + let mut res_size = 8; + let mut res = vec![]; + // This former case should be covered by this latter case + while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) { + let Some(block_after) = reader.block_after(&latest_block_hash) else { break }; + + let block = reader.block(&block_after).unwrap().serialize(); + let commit = reader.commit(&block_after).unwrap(); + res_size += 8 + block.len() + 8 + commit.len(); + res.push(TributaryBlockWithCommit { block, commit }); + + latest_block_hash = block_after; + } + channel + .send(res) + .map_err(|_| ()) + .expect("channel listening for heartbeat oneshot response was dropped?"); +} + +/// Run the P2P instance. +/// +/// `add_tributary`'s and `retire_tributary's senders, along with `send_cosigns`'s receiver, must +/// never be dropped. `retire_tributary` is not required to only be instructed with added +/// Tributaries. +pub async fn run( + db: impl Db, + p2p: P, + mut add_tributary: mpsc::UnboundedReceiver<(ValidatorSet, Tributary)>, + mut retire_tributary: mpsc::UnboundedReceiver, + send_cosigns: mpsc::UnboundedSender, +) { + let mut readers = HashMap::>::new(); + let mut tributaries = HashMap::<[u8; 32], mpsc::UnboundedSender>>::new(); + let mut heartbeat_tasks = HashMap::::new(); + + loop { + tokio::select! { + tributary = add_tributary.recv() => { + let (set, tributary) = tributary.expect("add_tributary send was dropped?"); + let reader = tributary.reader(); + readers.insert(set, reader.clone()); + + let (heartbeat_task_def, heartbeat_task) = Task::new(); + tokio::spawn( + (HeartbeatTask { + set, + tributary: tributary.clone(), + reader: reader.clone(), + p2p: p2p.clone(), + }).continually_run(heartbeat_task_def, vec![]) + ); + heartbeat_tasks.insert(set, heartbeat_task); + + let (tributary_message_send, mut tributary_message_recv) = mpsc::unbounded_channel(); + tributaries.insert(tributary.genesis(), tributary_message_send); + // For as long as this sender exists, handle the messages from it on a dedicated task + tokio::spawn(async move { + while let Some(message) = tributary_message_recv.recv().await { + tributary.handle_message(&message).await; + } + }); + } + set = retire_tributary.recv() => { + let set = set.expect("retire_tributary send was dropped?"); + let Some(reader) = readers.remove(&set) else { continue }; + tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary"); + heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task"); + } + + (heartbeat, channel) = p2p.heartbeat() => { + if let Some(reader) = readers.get(&heartbeat.set) { + let reader = reader.clone(); // This is a cheap clone + // We spawn this on a task due to the DB reads needed + tokio::spawn(async move { + handle_heartbeat(&reader, heartbeat.latest_block_hash, channel) + }); + } + } + (global_session, channel) = p2p.notable_cosigns_request() => { + tokio::spawn({ + let db = db.clone(); + async move { handle_notable_cosigns_request(&db, global_session, channel) } + }); + } + (tributary, message) = p2p.tributary_message() => { + if let Some(tributary) = tributaries.get(&tributary) { + tributary.send(message).expect("tributary message recv was dropped?"); + } + } + cosign = p2p.cosign() => { + // We don't call `Cosigning::intake_cosign` here as that can only be called from a single + // location. We also need to intake the cosigns we produce, which means we need to merge + // these streams (signing, network) somehow. That's done with this mpsc channel + send_cosigns.send(cosign).expect("channel receiving cosigns was dropped?"); + } + } + } +} diff --git a/coordinator/p2p/src/oneshot.rs b/coordinator/p2p/src/oneshot.rs deleted file mode 100644 index bced6a1a..00000000 --- a/coordinator/p2p/src/oneshot.rs +++ /dev/null @@ -1,35 +0,0 @@ -use core::{ - pin::Pin, - task::{Poll, Context}, - future::Future, -}; - -pub use async_channel::{SendError, RecvError}; - -/// The sender for a oneshot channel. -pub struct Sender(async_channel::Sender); -impl Sender { - /// Send a value down the channel. - /// - /// Returns an error if the channel's receiver was dropped. - pub fn send(self, msg: T) -> Result<(), SendError> { - self.0.send_blocking(msg) - } -} - -/// The receiver for a oneshot channel. -pub struct Receiver(async_channel::Receiver); -impl Future for Receiver { - type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let recv = self.0.recv(); - futures_lite::pin!(recv); - recv.poll(cx) - } -} - -/// Create a new oneshot channel. -pub fn channel() -> (Sender, Receiver) { - let (send, recv) = async_channel::bounded(1); - (Sender(send), Receiver(recv)) -}