From 49c221cca2774b071710fefc9851ef13229686ec Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 3 Jan 2025 13:02:29 -0500 Subject: [PATCH] Restore request-response code to the coordinator --- Cargo.lock | 1 + coordinator/Cargo.toml | 2 + coordinator/src/main.rs | 1 + coordinator/src/p2p/reqres.rs | 126 +++++++++++++++++++++++ coordinator/src/tributary/transaction.rs | 4 +- 5 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 coordinator/src/p2p/reqres.rs diff --git a/Cargo.lock b/Cargo.lock index a0d77739..802c5f00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8328,6 +8328,7 @@ dependencies = [ "rand_core", "schnorr-signatures", "serai-client", + "serai-cosign", "serai-db", "serai-env", "serai-message-queue", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 2af0f822..9515bd74 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -56,6 +56,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] } +serai-cosign = { path = "./cosign" } + [dev-dependencies] tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index c3eb8d80..2316af2e 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,4 +1,5 @@ mod tributary; +mod p2p; fn main() { todo!("TODO") diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs new file mode 100644 index 00000000..fbd0388d --- /dev/null +++ b/coordinator/src/p2p/reqres.rs @@ -0,0 +1,126 @@ +use core::time::Duration; +use std::io::{self, Read}; + +use async_trait::async_trait; + +use borsh::{BorshSerialize, BorshDeserialize}; +use serai_client::validator_sets::primitives::ValidatorSet; + +use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport}; + +use serai_cosign::SignedCosign; + +/// The maximum message size for the request-response protocol +// This is derived from the heartbeat message size as it's our largest message +const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = + (tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024; + +const PROTOCOL: &str = "/serai/coordinator"; + +/// Requests which can be made via the request-response protocol. +#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)] +pub(crate) enum Request { + /// A keep-alive to prevent our connections from being dropped. + KeepAlive, + /// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular + /// intervals. + /// + /// If our peers have more blocks than us, they're expected to respond with those blocks. + Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] }, + /// A request for the notable cosigns for a global session. + NotableCosigns { global_session: [u8; 32] }, +} + +/// A tributary block and its commit. +#[derive(Clone, BorshSerialize, BorshDeserialize)] +pub(crate) struct TributaryBlockWithCommit { + pub(crate) block: Vec, + pub(crate) commit: Vec, +} + +/// Responses which can be received via the request-response protocol. +#[derive(Clone, BorshSerialize, BorshDeserialize)] +pub(crate) enum Response { + Blocks(Vec), + NotableCosigns(Vec), +} + +/// The codec used for the request-response protocol. +/// +/// We don't use CBOR or JSON, but use borsh to create `Vec`s we then length-prefix. While +/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there +/// isn't an amenable API within borsh for incremental deserialization. +#[derive(Default, Clone, Copy, Debug)] +struct Codec; +impl Codec { + async fn read(io: &mut (impl Unpin + AsyncRead)) -> io::Result { + let mut len = [0; 4]; + io.read_exact(&mut len).await?; + let len = usize::try_from(u32::from_le_bytes(len)).expect("not at least a 32-bit platform?"); + if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE { + Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?; + } + // This may be a non-trivial allocation easily causable + // While we could chunk the read, meaning we only perform the allocation as bandwidth is used, + // the max message size should be sufficiently sane + let mut buf = vec![0; len]; + io.read_exact(&mut buf).await?; + let mut buf = buf.as_slice(); + let res = M::deserialize(&mut buf)?; + if !buf.is_empty() { + Err(io::Error::other("p2p message had extra data appended to it"))?; + } + Ok(res) + } + async fn write(io: &mut (impl Unpin + AsyncWrite), msg: &impl BorshSerialize) -> io::Result<()> { + let msg = borsh::to_vec(msg).unwrap(); + io.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await?; + io.write_all(&msg).await + } +} +#[async_trait] +impl CodecTrait for Codec { + type Protocol = &'static str; + type Request = Request; + type Response = Response; + + async fn read_request( + &mut self, + _: &Self::Protocol, + io: &mut R, + ) -> io::Result { + Self::read(io).await + } + async fn read_response( + &mut self, + proto: &Self::Protocol, + io: &mut R, + ) -> io::Result { + Self::read(io).await + } + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut W, + req: Request, + ) -> io::Result<()> { + Self::write(io, &req).await + } + async fn write_response( + &mut self, + proto: &Self::Protocol, + io: &mut W, + res: Response, + ) -> io::Result<()> { + Self::write(io, &res).await + } +} + +pub(crate) type Behavior = Behaviour; +pub(crate) fn new_behavior() -> Behavior { + let mut config = Config::default(); + config.set_request_timeout(Duration::from_secs(5)); + Behavior::new([(PROTOCOL, ProtocolSupport::Full)], config) +} diff --git a/coordinator/src/tributary/transaction.rs b/coordinator/src/tributary/transaction.rs index 65391296..0befbf36 100644 --- a/coordinator/src/tributary/transaction.rs +++ b/coordinator/src/tributary/transaction.rs @@ -43,12 +43,14 @@ impl SigningProtocolRound { } } -/// `tributary::Signed` without the nonce. +/// `tributary::Signed` but without the nonce. /// /// All of our nonces are deterministic to the type of transaction and fields within. #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct Signed { + /// The signer. pub signer: ::G, + /// The signature. pub signature: SchnorrSignature, }