Remove tokio dependency from serai-coordinator-p2p

Re-implements tokio::mpsc::oneshot with a thin wrapper around async-channel.

Also replaces futures-util with futures-lite.
This commit is contained in:
Luke Parker
2025-01-09 02:16:05 -05:00
parent 9833911e06
commit 201a444e89
8 changed files with 73 additions and 22 deletions

View File

@@ -26,8 +26,8 @@ serai-client = { path = "../../substrate/client", default-features = false, feat
serai-cosign = { path = "../cosign" }
tributary = { package = "tributary-chain", path = "../tributary" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["sync"] }
async-channel = { version = "2", default-features = false, features = ["std"] }
futures-lite = { version = "2", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
serai-task = { path = "../../common/task", version = "0.1" }

View File

@@ -19,7 +19,7 @@ use serai_client::{
Serai,
};
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use tokio::sync::{mpsc, Mutex, RwLock};
use serai_task::{Task, ContinuallyRan};
@@ -35,7 +35,7 @@ use libp2p::{
SwarmBuilder,
};
use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
use serai_coordinator_p2p::{oneshot, Heartbeat, TributaryBlockWithCommit};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;

View File

@@ -8,7 +8,7 @@ use borsh::BorshDeserialize;
use serai_client::validator_sets::primitives::ValidatorSet;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::sync::{mpsc, RwLock};
use serai_task::TaskHandle;
@@ -21,7 +21,7 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
};
use serai_coordinator_p2p::Heartbeat;
use serai_coordinator_p2p::{oneshot, Heartbeat};
use crate::{
Peers, BehaviorEvent, Behavior,

View File

@@ -3,7 +3,7 @@ use std::time::{Duration, SystemTime};
use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::FutureExt;
use futures_lite::FutureExt;
use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader};

View File

@@ -8,10 +8,11 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use tokio::sync::oneshot;
use serai_cosign::SignedCosign;
/// A oneshot channel.
pub mod oneshot;
/// The heartbeat task, effecting sync of Tributaries
pub mod heartbeat;

View File

@@ -0,0 +1,35 @@
use core::{
pin::Pin,
task::{Poll, Context},
future::Future,
};
pub use async_channel::{SendError, RecvError};
/// The sender for a oneshot channel.
pub struct Sender<T: Send>(async_channel::Sender<T>);
impl<T: Send> Sender<T> {
/// Send a value down the channel.
///
/// Returns an error if the channel's receiver was dropped.
pub fn send(self, msg: T) -> Result<(), SendError<T>> {
self.0.send_blocking(msg)
}
}
/// The receiver for a oneshot channel.
pub struct Receiver<T: Send>(async_channel::Receiver<T>);
impl<T: Send> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let recv = self.0.recv();
futures_lite::pin!(recv);
recv.poll(cx)
}
}
/// Create a new oneshot channel.
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (send, recv) = async_channel::bounded(1);
(Sender(send), Receiver(recv))
}