From 201a444e8927c80bdafc856280af1d3637364249 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 9 Jan 2025 02:16:05 -0500 Subject: [PATCH] Remove tokio dependency from serai-coordinator-p2p Re-implements tokio::mpsc::oneshot with a thin wrapper around async-channel. Also replaces futures-util with futures-lite. --- Cargo.lock | 41 ++++++++++++++++++++--------- coordinator/p2p/Cargo.toml | 4 +-- coordinator/p2p/libp2p/src/lib.rs | 4 +-- coordinator/p2p/libp2p/src/swarm.rs | 4 +-- coordinator/p2p/src/cosign.rs | 0 coordinator/p2p/src/heartbeat.rs | 2 +- coordinator/p2p/src/lib.rs | 5 ++-- coordinator/p2p/src/oneshot.rs | 35 ++++++++++++++++++++++++ 8 files changed, 73 insertions(+), 22 deletions(-) delete mode 100644 coordinator/p2p/src/cosign.rs create mode 100644 coordinator/p2p/src/oneshot.rs diff --git a/Cargo.lock b/Cargo.lock index dfeea6f6..379c81ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -840,6 +840,18 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-io" version = "2.4.0" @@ -2528,7 +2540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3047,7 +3059,10 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" dependencies = [ + "fastrand", "futures-core", + "futures-io", + "parking", "pin-project-lite", ] @@ -3548,7 +3563,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -4112,7 +4127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -6907,7 +6922,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7450,7 +7465,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel", + "async-channel 1.9.0", "async-trait", "asynchronous-codec", "bytes", @@ -7491,7 +7506,7 @@ name = "sc-network-bitswap" version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ - "async-channel", + "async-channel 1.9.0", "cid", "futures", "libp2p-identity", @@ -7548,7 +7563,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel", + "async-channel 1.9.0", "futures", "libp2p-identity", "log", @@ -7569,7 +7584,7 @@ version = "0.10.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ "array-bytes", - "async-channel", + "async-channel 1.9.0", "async-trait", "fork-tree", "futures", @@ -7943,7 +7958,7 @@ name = "sc-utils" version = "4.0.0-dev" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" dependencies = [ - "async-channel", + "async-channel 1.9.0", "futures", "futures-timer", "lazy_static", @@ -8369,14 +8384,14 @@ dependencies = [ name = "serai-coordinator-p2p" version = "0.1.0" dependencies = [ + "async-channel 2.3.1", "borsh", - "futures-util", + "futures-lite", "log", "serai-client", "serai-cosign", "serai-db", "serai-task", - "tokio", "tributary-chain", ] @@ -10528,7 +10543,7 @@ dependencies = [ "getrandom", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -11752,7 +11767,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/coordinator/p2p/Cargo.toml b/coordinator/p2p/Cargo.toml index 44183258..afb0b483 100644 --- a/coordinator/p2p/Cargo.toml +++ b/coordinator/p2p/Cargo.toml @@ -26,8 +26,8 @@ serai-client = { path = "../../substrate/client", default-features = false, feat serai-cosign = { path = "../cosign" } tributary = { package = "tributary-chain", path = "../tributary" } -futures-util = { version = "0.3", default-features = false, features = ["std"] } -tokio = { version = "1", default-features = false, features = ["sync"] } +async-channel = { version = "2", default-features = false, features = ["std"] } +futures-lite = { version = "2", default-features = false, features = ["std"] } log = { version = "0.4", default-features = false, features = ["std"] } serai-task = { path = "../../common/task", version = "0.1" } diff --git a/coordinator/p2p/libp2p/src/lib.rs b/coordinator/p2p/libp2p/src/lib.rs index 2f6defa7..f15606d0 100644 --- a/coordinator/p2p/libp2p/src/lib.rs +++ b/coordinator/p2p/libp2p/src/lib.rs @@ -19,7 +19,7 @@ use serai_client::{ Serai, }; -use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; +use tokio::sync::{mpsc, Mutex, RwLock}; use serai_task::{Task, ContinuallyRan}; @@ -35,7 +35,7 @@ use libp2p::{ SwarmBuilder, }; -use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit}; +use serai_coordinator_p2p::{oneshot, Heartbeat, TributaryBlockWithCommit}; /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; diff --git a/coordinator/p2p/libp2p/src/swarm.rs b/coordinator/p2p/libp2p/src/swarm.rs index 0c3e2664..16200954 100644 --- a/coordinator/p2p/libp2p/src/swarm.rs +++ b/coordinator/p2p/libp2p/src/swarm.rs @@ -8,7 +8,7 @@ use borsh::BorshDeserialize; use serai_client::validator_sets::primitives::ValidatorSet; -use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::sync::{mpsc, RwLock}; use serai_task::TaskHandle; @@ -21,7 +21,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, }; -use serai_coordinator_p2p::Heartbeat; +use serai_coordinator_p2p::{oneshot, Heartbeat}; use crate::{ Peers, BehaviorEvent, Behavior, diff --git a/coordinator/p2p/src/cosign.rs b/coordinator/p2p/src/cosign.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index 4966c471..f79151ad 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -3,7 +3,7 @@ use std::time::{Duration, SystemTime}; use serai_client::validator_sets::primitives::ValidatorSet; -use futures_util::FutureExt; +use futures_lite::FutureExt; use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader}; diff --git a/coordinator/p2p/src/lib.rs b/coordinator/p2p/src/lib.rs index 262cb3bf..11c9cf53 100644 --- a/coordinator/p2p/src/lib.rs +++ b/coordinator/p2p/src/lib.rs @@ -8,10 +8,11 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; -use tokio::sync::oneshot; - use serai_cosign::SignedCosign; +/// A oneshot channel. +pub mod oneshot; + /// The heartbeat task, effecting sync of Tributaries pub mod heartbeat; diff --git a/coordinator/p2p/src/oneshot.rs b/coordinator/p2p/src/oneshot.rs new file mode 100644 index 00000000..bced6a1a --- /dev/null +++ b/coordinator/p2p/src/oneshot.rs @@ -0,0 +1,35 @@ +use core::{ + pin::Pin, + task::{Poll, Context}, + future::Future, +}; + +pub use async_channel::{SendError, RecvError}; + +/// The sender for a oneshot channel. +pub struct Sender(async_channel::Sender); +impl Sender { + /// Send a value down the channel. + /// + /// Returns an error if the channel's receiver was dropped. + pub fn send(self, msg: T) -> Result<(), SendError> { + self.0.send_blocking(msg) + } +} + +/// The receiver for a oneshot channel. +pub struct Receiver(async_channel::Receiver); +impl Future for Receiver { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let recv = self.0.recv(); + futures_lite::pin!(recv); + recv.poll(cx) + } +} + +/// Create a new oneshot channel. +pub fn channel() -> (Sender, Receiver) { + let (send, recv) = async_channel::bounded(1); + (Sender(send), Receiver(recv)) +}