6 Commits

Author SHA1 Message Date
Luke Parker
b2bd5d3a44 Remove Debug bound on tributary::P2p 2025-01-08 17:40:32 -05:00
Luke Parker
de2d6568a4 Actually implement the Peer abstraction for Libp2p 2025-01-08 17:40:08 -05:00
Luke Parker
fd9b464b35 Add a trait for the P2p network used in the coordinator
Moves all of the Libp2p code to a dedicated directory. Makes the Heartbeat task
abstract over any P2p network.
2025-01-08 17:01:37 -05:00
Luke Parker
376a66b000 Remove async-trait from tendermint-machine, tributary-chain 2025-01-08 16:41:11 -05:00
Luke Parker
2121a9b131 Spawn the task to select validators to dial 2025-01-07 18:17:36 -05:00
Luke Parker
419223c54e Build the swarm
Moves UpdateSharedValidatorsTask to validators.rs. While prior planned to
re-use a validators object across connecting and peer state management, the
current plan is to use an independent validators object for each to minimize
any contention. They should be built infrequently enough, and cheap enough to
update in the majority case (due to quickly checking if an update is needed),
that this is fine.
2025-01-07 18:09:25 -05:00
18 changed files with 587 additions and 324 deletions

2
Cargo.lock generated
View File

@@ -10498,7 +10498,6 @@ dependencies = [
name = "tendermint-machine" name = "tendermint-machine"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"async-trait",
"futures-channel", "futures-channel",
"futures-util", "futures-util",
"hex", "hex",
@@ -10941,7 +10940,6 @@ dependencies = [
name = "tributary-chain" name = "tributary-chain"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"blake2", "blake2",
"ciphersuite", "ciphersuite",
"flexible-transcript", "flexible-transcript",

View File

@@ -1,9 +1,10 @@
use core::future::Future; use core::future::Future;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use serai_client::validator_sets::primitives::ValidatorSet; use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::FutureExt;
use tributary::{ReadWrite, Block, Tributary, TributaryReader}; use tributary::{ReadWrite, Block, Tributary, TributaryReader};
use serai_db::*; use serai_db::*;
@@ -11,10 +12,7 @@ use serai_task::ContinuallyRan;
use crate::{ use crate::{
tributary::Transaction, tributary::Transaction,
p2p::{ p2p::{Peer, P2p},
reqres::{Request, Response},
P2p,
},
}; };
// Amount of blocks in a minute // Amount of blocks in a minute
@@ -28,14 +26,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
/// ///
/// If the other validator has more blocks then we do, they're expected to inform us. This forms /// If the other validator has more blocks then we do, they're expected to inform us. This forms
/// the sync protocol for our Tributaries. /// the sync protocol for our Tributaries.
struct HeartbeatTask<TD: Db> { struct HeartbeatTask<TD: Db, P: P2p> {
set: ValidatorSet, set: ValidatorSet,
tributary: Tributary<TD, Transaction, P2p>, tributary: Tributary<TD, Transaction, P>,
reader: TributaryReader<TD, Transaction>, reader: TributaryReader<TD, Transaction>,
p2p: P2p, p2p: P,
} }
impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> { impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move { async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
@@ -74,8 +72,10 @@ impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
tip = self.reader.tip(); tip = self.reader.tip();
tip_is_stale = false; tip_is_stale = false;
} }
let request = Request::Heartbeat { set: self.set, latest_block_hash: tip }; // Necessary due to https://github.com/rust-lang/rust/issues/100013
let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer }; let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else {
continue 'peer;
};
// This is the final batch if it has less than the maximum amount of blocks // This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with) // (signifying there weren't more blocks after this to fill the batch with)

View File

@@ -19,14 +19,15 @@ use libp2p::{
noise, noise,
}; };
use crate::p2p::{validators::Validators, peer_id_from_public}; use crate::p2p::libp2p::{validators::Validators, peer_id_from_public};
const PROTOCOL: &str = "/serai/coordinator/validators"; const PROTOCOL: &str = "/serai/coordinator/validators";
struct OnlyValidators { #[derive(Clone)]
validators: Arc<RwLock<Validators>>, pub(crate) struct OnlyValidators {
serai_key: Zeroizing<Keypair>, pub(crate) validators: Arc<RwLock<Validators>>,
noise_keypair: identity::Keypair, pub(crate) serai_key: Zeroizing<Keypair>,
pub(crate) noise_keypair: identity::Keypair,
} }
impl OnlyValidators { impl OnlyValidators {

View File

@@ -14,7 +14,7 @@ use libp2p::{
use serai_task::ContinuallyRan; use serai_task::ContinuallyRan;
use crate::p2p::{PORT, Peers, validators::Validators}; use crate::p2p::libp2p::{PORT, Peers, validators::Validators};
const TARGET_PEERS_PER_NETWORK: usize = 5; const TARGET_PEERS_PER_NETWORK: usize = 5;
/* /*
@@ -28,13 +28,19 @@ const TARGET_PEERS_PER_NETWORK: usize = 5;
*/ */
// TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3; // TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;
struct DialTask { pub(crate) struct DialTask {
serai: Serai, serai: Serai,
validators: Validators, validators: Validators,
peers: Peers, peers: Peers,
to_dial: mpsc::UnboundedSender<DialOpts>, to_dial: mpsc::UnboundedSender<DialOpts>,
} }
impl DialTask {
pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender<DialOpts>) -> Self {
DialTask { serai: serai.clone(), validators: Validators::new(serai), peers, to_dial }
}
}
impl ContinuallyRan for DialTask { impl ContinuallyRan for DialTask {
// Only run every five minutes, not the default of every five seconds // Only run every five minutes, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;

View File

@@ -15,7 +15,7 @@ pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;
// Block size limit + 16 KB of space for signatures/metadata // Block size limit + 16 KB of space for signatures/metadata
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384; pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80); const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

View File

@@ -0,0 +1,214 @@
use core::{future::Future, time::Duration};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing;
use schnorrkel::Keypair;
use serai_client::{
primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai,
};
use tokio::sync::{mpsc, oneshot, RwLock};
use serai_task::{Task, ContinuallyRan};
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
tcp::Config as TcpConfig,
yamux,
swarm::NetworkBehaviour,
SwarmBuilder,
};
use crate::p2p::TributaryBlockWithCommit;
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::UpdateValidatorsTask;
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;
/// The dial task, to find new peers to connect to
mod dial;
use dial::DialTask;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The swarm task, running it and dispatching to/from it
mod swarm;
use swarm::SwarmTask;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
} else {
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
};
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
}
struct Peer<'a> {
outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
id: PeerId,
}
impl crate::p2p::Peer<'_> for Peer<'_> {
fn send_heartbeat(
&self,
set: ValidatorSet,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
async move {
let request = Request::Heartbeat { set, latest_block_hash };
let (sender, receiver) = oneshot::channel();
self
.outbound_requests
.send((self.id, request, sender))
.expect("outbound requests recv channel was dropped?");
match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
Response::None => Some(vec![]),
Response::Blocks(blocks) => Some(blocks),
// TODO: Disconnect this peer
Response::NotableCosigns(_) => None,
}
}
}
}
#[derive(Clone)]
struct Peers {
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
#[derive(Clone)]
struct Libp2p {
peers: Peers,
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
}
impl Libp2p {
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
// Define the dial task
let (dial_task_def, dial_task) = Task::new();
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
tokio::spawn(
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
.continually_run(dial_task_def, vec![]),
);
// Define the Validators object used for validating new connections
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
Ok(OnlyValidators {
serai_key: serai_key.clone(),
validators: connection_validators.clone(),
noise_keypair: noise_keypair.clone(),
})
};
let new_yamux = || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
// 256 KiB default + max message size
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config
};
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| behavior)
.unwrap()
.build();
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
let swarm_validators = UpdateValidatorsTask::spawn(serai);
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
let (inbound_request_responses_send, inbound_request_responses_recv) =
mpsc::unbounded_channel();
// Create the swarm task
SwarmTask::spawn(
dial_task,
to_dial_recv,
swarm_validators,
peers,
swarm,
gossip_recv,
signed_cosigns_send,
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
notable_cosign_requests_send,
inbound_request_responses_recv,
);
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
todo!("TODO");
}
}
impl tributary::P2p for Libp2p {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { todo!("TODO") }
}
}
impl crate::p2p::P2p for Libp2p {
type Peer<'a> = Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
async move {
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
return vec![];
};
let mut res = vec![];
for id in peer_ids {
res.push(Peer { outbound_requests: &self.outbound_requests, id });
}
res
}
}
}

View File

@@ -15,9 +15,11 @@ pub use request_response::Message;
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;
use crate::p2p::TributaryBlockWithCommit;
/// The maximum message size for the request-response protocol /// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message // This is derived from the heartbeat message size as it's our largest message
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024; (tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0"; const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
@@ -36,24 +38,17 @@ pub(crate) enum Request {
NotableCosigns { global_session: [u8; 32] }, NotableCosigns { global_session: [u8; 32] },
} }
/// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
}
/// Responses which can be received via the request-response protocol. /// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)] #[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response { pub(crate) enum Response {
NoResponse, None,
Blocks(Vec<TributaryBlockWithCommit>), Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>), NotableCosigns(Vec<SignedCosign>),
} }
impl fmt::Debug for Response { impl fmt::Debug for Response {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(), Response::None => fmt.debug_struct("Response::None").finish(),
Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(), Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
Response::NotableCosigns(_) => { Response::NotableCosigns(_) => {
fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive() fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()

View File

@@ -21,13 +21,16 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
}; };
use crate::p2p::{ use crate::p2p::libp2p::{
Peers, BehaviorEvent, Behavior, Peers, BehaviorEvent, Behavior,
validators::Validators, validators::Validators,
reqres::{self, Request, Response}, reqres::{self, Request, Response},
gossip, gossip,
}; };
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
/* /*
`SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the `SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
@@ -43,7 +46,7 @@ use crate::p2p::{
- Dispatching received requests - Dispatching received requests
- Sending responses - Sending responses
*/ */
struct SwarmTask { pub(crate) struct SwarmTask {
dial_task: TaskHandle, dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>, to_dial: mpsc::UnboundedReceiver<DialOpts>,
last_dial_task_run: Instant, last_dial_task_run: Instant,
@@ -54,12 +57,14 @@ struct SwarmTask {
swarm: Swarm<Behavior>, swarm: Swarm<Behavior>,
last_message: Instant,
gossip: mpsc::UnboundedReceiver<gossip::Message>, gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>, signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>, tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>, outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>, outbound_request_responses: HashMap<RequestId, oneshot::Sender<Response>>,
inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>, inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>, heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
@@ -99,35 +104,31 @@ impl SwarmTask {
fn handle_reqres(&mut self, event: reqres::Event) { fn handle_reqres(&mut self, event: reqres::Event) {
match event { match event {
reqres::Event::Message { message, .. } => match message { reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id, request, channel } => { reqres::Message::Request { request_id, request, channel } => match request {
match request { reqres::Request::KeepAlive => {
// TODO: Send these let _: Result<_, _> =
reqres::Request::KeepAlive => { self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
}
reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash));
}
reqres::Request::NotableCosigns { global_session } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
} }
} reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash));
}
reqres::Request::NotableCosigns { global_session } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
},
reqres::Message::Response { request_id, response } => { reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) { if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(Some(response)); let _: Result<_, _> = channel.send(response);
} }
} }
}, },
reqres::Event::OutboundFailure { request_id, .. } => { reqres::Event::OutboundFailure { request_id, .. } => {
// Send None as the response for the request // Send None as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) { if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(None); let _: Result<_, _> = channel.send(Response::None);
} }
} }
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {} reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
@@ -136,9 +137,19 @@ impl SwarmTask {
async fn run(mut self) { async fn run(mut self) {
loop { loop {
let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now()); let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
tokio::select! { tokio::select! {
() = tokio::time::sleep(time_till_keep_alive) => {
let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
let behavior = self.swarm.behaviour_mut();
for peer in peers {
behavior.reqres.send_request(&peer, Request::KeepAlive);
}
self.last_message = Instant::now();
}
// Dial peers we're instructed to // Dial peers we're instructed to
dial_opts = self.to_dial.recv() => { dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?"); let dial_opts = dial_opts.expect("DialTask was closed?");
@@ -156,8 +167,6 @@ impl SwarmTask {
We also use this to disconnect all peers who are no longer active in any network. We also use this to disconnect all peers who are no longer active in any network.
*/ */
() = tokio::time::sleep(time_till_rebuild_peers) => { () = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone(); let validators_by_network = self.validators.read().await.by_network().clone();
let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>(); let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
@@ -253,6 +262,7 @@ impl SwarmTask {
let topic = message.topic(); let topic = message.topic();
let message = borsh::to_vec(&message).unwrap(); let message = borsh::to_vec(&message).unwrap();
let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message); let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
self.last_message = Instant::now();
} }
request = self.outbound_requests.recv() => { request = self.outbound_requests.recv() => {
@@ -273,4 +283,54 @@ impl SwarmTask {
} }
} }
} }
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn(
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
validators: Arc<RwLock<Validators>>,
peers: Peers,
swarm: Swarm<Behavior>,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
) {
tokio::spawn(
SwarmTask {
dial_task,
to_dial,
last_dial_task_run: Instant::now(),
validators,
peers,
rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
swarm,
last_message: Instant::now(),
gossip,
signed_cosigns,
tributary_gossip,
outbound_requests,
outbound_request_responses: HashMap::new(),
inbound_request_response_channels: HashMap::new(),
heartbeat_requests,
notable_cosign_requests,
inbound_request_responses,
}
.run(),
);
}
} }

View File

@@ -1,4 +1,4 @@
use core::borrow::Borrow; use core::{borrow::Borrow, future::Future};
use std::{ use std::{
sync::Arc, sync::Arc,
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
@@ -6,12 +6,14 @@ use std::{
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::{Task, ContinuallyRan};
use libp2p::PeerId; use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered}; use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::p2p::peer_id_from_public; use crate::p2p::libp2p::peer_id_from_public;
pub(crate) struct Validators { pub(crate) struct Validators {
serai: Serai, serai: Serai,
@@ -25,6 +27,15 @@ pub(crate) struct Validators {
} }
impl Validators { impl Validators {
pub(crate) fn new(serai: Serai) -> Self {
Validators {
serai,
sessions: HashMap::new(),
by_network: HashMap::new(),
validators: HashMap::new(),
}
}
async fn session_changes( async fn session_changes(
serai: impl Borrow<Serai>, serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>, sessions: impl Borrow<HashMap<NetworkId, Session>>,
@@ -103,8 +114,6 @@ impl Validators {
} }
/// Update the view of the validators. /// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<(), String> { pub(crate) async fn update(&mut self) -> Result<(), String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?; let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes); self.incorporate_session_changes(session_changes);
@@ -124,19 +133,51 @@ impl Validators {
} }
} }
/// Update the view of the validators. /// A task which updates a set of validators.
/// ///
/// This minimizes the time an exclusive lock is held over the validators to minimize the /// The validators managed by this tak will have their exclusive lock held for a minimal amount of
/// disruption to functioning. /// time while the update occurs to minimize the disruption to the services relying on it.
/// pub(crate) struct UpdateValidatorsTask {
/// Returns all validators removed from the active validator set. validators: Arc<RwLock<Validators>>,
pub(crate) async fn update_shared_validators( }
validators: &Arc<RwLock<Validators>>,
) -> Result<(), String> { impl UpdateValidatorsTask {
let session_changes = { /// Spawn a new instance of the UpdateValidatorsTask.
let validators = validators.read().await; ///
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await? /// This returns a reference to the Validators it updates after spawning itself.
}; pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
validators.write().await.incorporate_session_changes(session_changes); // The validators which will be updated
Ok(()) let validators = Arc::new(RwLock::new(Validators::new(serai)));
// Define the task
let (update_validators_task, update_validators_task_handle) = Task::new();
// Forget the handle, as dropping the handle would stop the task
core::mem::forget(update_validators_task_handle);
// Spawn the task
tokio::spawn(
(Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
);
// Return the validators
validators
}
}
impl ContinuallyRan for UpdateValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let session_changes = {
let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
.await
.map_err(|e| format!("{e:?}"))?
};
self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true)
}
}
} }

View File

@@ -1,94 +1,31 @@
use core::future::Future; use core::future::Future;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use serai_client::primitives::{NetworkId, PublicKey}; use borsh::{BorshSerialize, BorshDeserialize};
use tokio::sync::RwLock; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_task::ContinuallyRan; /// The libp2p-backed P2p network
mod libp2p;
use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::{Validators, update_shared_validators};
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
/// The dial task, to find new peers to connect to
mod dial;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The heartbeat task, effecting sync of Tributaries /// The heartbeat task, effecting sync of Tributaries
mod heartbeat; mod heartbeat;
/// The swarm task, running it and dispatching to/from it /// A tributary block and its commit.
mod swarm; #[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
} }
struct Peer; trait Peer<'a>: Send {
impl Peer { fn send_heartbeat(
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> { &self,
(async move { todo!("TODO") }).await set: ValidatorSet,
} latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
} }
#[derive(Clone)] trait P2p: Send + Sync + tributary::P2p {
struct Peers { type Peer<'a>: Peer<'a>;
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>, fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
}
#[derive(Clone, Debug)]
struct P2p;
impl P2p {
async fn peers(&self, set: NetworkId) -> Vec<Peer> {
(async move { todo!("TODO") }).await
}
}
#[async_trait::async_trait]
impl tributary::P2p for P2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
todo!("TODO")
}
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
struct UpdateSharedValidatorsTask {
validators: Arc<RwLock<Validators>>,
}
impl ContinuallyRan for UpdateSharedValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
Ok(true)
}
}
} }

View File

@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true workspace = true
[dependencies] [dependencies]
async-trait = { version = "0.1", default-features = false }
thiserror = { version = "2", default-features = false, features = ["std"] } thiserror = { version = "2", default-features = false, features = ["std"] }
subtle = { version = "^2", default-features = false, features = ["std"] } subtle = { version = "^2", default-features = false, features = ["std"] }

View File

@@ -1,8 +1,6 @@
use core::{marker::PhantomData, fmt::Debug}; use core::{marker::PhantomData, fmt::Debug, future::Future};
use std::{sync::Arc, io}; use std::{sync::Arc, io};
use async_trait::async_trait;
use zeroize::Zeroizing; use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto}; use ciphersuite::{Ciphersuite, Ristretto};
@@ -131,20 +129,18 @@ pub trait ReadWrite: Sized {
} }
} }
#[async_trait] pub trait P2p: 'static + Send + Sync + Clone {
pub trait P2p: 'static + Send + Sync + Clone + Debug {
/// Broadcast a message to all other members of the Tributary with the specified genesis. /// Broadcast a message to all other members of the Tributary with the specified genesis.
/// ///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't /// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based /// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load. /// deduplication to ensure a sane amount of load.
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>); fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
} }
#[async_trait]
impl<P: P2p> P2p for Arc<P> { impl<P: P2p> P2p for Arc<P> {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) { fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
(*self).broadcast(genesis, msg).await P::broadcast(self, genesis, msg)
} }
} }

View File

@@ -1,8 +1,6 @@
use core::ops::Deref; use core::{ops::Deref, future::Future};
use std::{sync::Arc, collections::HashMap}; use std::{sync::Arc, collections::HashMap};
use async_trait::async_trait;
use subtle::ConstantTimeEq; use subtle::ConstantTimeEq;
use zeroize::{Zeroize, Zeroizing}; use zeroize::{Zeroize, Zeroizing};
@@ -74,50 +72,52 @@ impl Signer {
} }
} }
#[async_trait]
impl SignerTrait for Signer { impl SignerTrait for Signer {
type ValidatorId = [u8; 32]; type ValidatorId = [u8; 32];
type Signature = [u8; 64]; type Signature = [u8; 64];
/// Returns the validator's current ID. Returns None if they aren't a current validator. /// Returns the validator's current ID. Returns None if they aren't a current validator.
async fn validator_id(&self) -> Option<Self::ValidatorId> { fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
Some((Ristretto::generator() * self.key.deref()).to_bytes()) async move { Some((Ristretto::generator() * self.key.deref()).to_bytes()) }
} }
/// Sign a signature with the current validator's private key. /// Sign a signature with the current validator's private key.
async fn sign(&self, msg: &[u8]) -> Self::Signature { fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
let mut nonce = Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce")); async move {
nonce.append_message(b"genesis", self.genesis); let mut nonce =
nonce.append_message(b"key", Zeroizing::new(self.key.deref().to_repr()).as_ref()); Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce"));
nonce.append_message(b"message", msg); nonce.append_message(b"genesis", self.genesis);
let mut nonce = nonce.challenge(b"nonce"); nonce.append_message(b"key", Zeroizing::new(self.key.deref().to_repr()).as_ref());
nonce.append_message(b"message", msg);
let mut nonce = nonce.challenge(b"nonce");
let mut nonce_arr = [0; 64]; let mut nonce_arr = [0; 64];
nonce_arr.copy_from_slice(nonce.as_ref()); nonce_arr.copy_from_slice(nonce.as_ref());
let nonce_ref: &mut [u8] = nonce.as_mut(); let nonce_ref: &mut [u8] = nonce.as_mut();
nonce_ref.zeroize(); nonce_ref.zeroize();
let nonce_ref: &[u8] = nonce.as_ref(); let nonce_ref: &[u8] = nonce.as_ref();
assert_eq!(nonce_ref, [0; 64].as_ref()); assert_eq!(nonce_ref, [0; 64].as_ref());
let nonce = let nonce =
Zeroizing::new(<Ristretto as Ciphersuite>::F::from_bytes_mod_order_wide(&nonce_arr)); Zeroizing::new(<Ristretto as Ciphersuite>::F::from_bytes_mod_order_wide(&nonce_arr));
nonce_arr.zeroize(); nonce_arr.zeroize();
assert!(!bool::from(nonce.ct_eq(&<Ristretto as Ciphersuite>::F::ZERO))); assert!(!bool::from(nonce.ct_eq(&<Ristretto as Ciphersuite>::F::ZERO)));
let challenge = challenge( let challenge = challenge(
self.genesis, self.genesis,
(Ristretto::generator() * self.key.deref()).to_bytes(), (Ristretto::generator() * self.key.deref()).to_bytes(),
(Ristretto::generator() * nonce.deref()).to_bytes().as_ref(), (Ristretto::generator() * nonce.deref()).to_bytes().as_ref(),
msg, msg,
); );
let sig = SchnorrSignature::<Ristretto>::sign(&self.key, nonce, challenge).serialize(); let sig = SchnorrSignature::<Ristretto>::sign(&self.key, nonce, challenge).serialize();
let mut res = [0; 64]; let mut res = [0; 64];
res.copy_from_slice(&sig); res.copy_from_slice(&sig);
res res
}
} }
} }
@@ -274,7 +274,6 @@ pub const BLOCK_PROCESSING_TIME: u32 = 999;
pub const LATENCY_TIME: u32 = 1667; pub const LATENCY_TIME: u32 = 1667;
pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME); pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME);
#[async_trait]
impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> { impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> {
type Db = D; type Db = D;
@@ -300,111 +299,126 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
self.validators.clone() self.validators.clone()
} }
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) { fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()> {
let mut to_broadcast = vec![TENDERMINT_MESSAGE]; async move {
to_broadcast.extend(msg.encode()); let mut to_broadcast = vec![TENDERMINT_MESSAGE];
self.p2p.broadcast(self.genesis, to_broadcast).await to_broadcast.extend(msg.encode());
} self.p2p.broadcast(self.genesis, to_broadcast).await
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent) {
log::error!(
"validator {} triggered a slash event on tributary {} (with evidence: {})",
hex::encode(validator),
hex::encode(self.genesis),
matches!(slash_event, SlashEvent::WithEvidence(_)),
);
let signer = self.signer();
let Some(tx) = (match slash_event {
SlashEvent::WithEvidence(evidence) => {
// create an unsigned evidence tx
Some(TendermintTx::SlashEvidence(evidence))
}
SlashEvent::Id(_reason, _block, _round) => {
// TODO: Increase locally observed slash points
None
}
}) else {
return;
};
// add tx to blockchain and broadcast to peers
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
if self.blockchain.write().await.add_transaction::<Self>(
true,
Transaction::Tendermint(tx),
&self.signature_scheme(),
) == Ok(true)
{
self.p2p.broadcast(signer.genesis, to_broadcast).await;
} }
} }
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { fn slash(
let block = &mut self,
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; validator: Self::ValidatorId,
self slash_event: SlashEvent,
.blockchain ) -> impl Send + Future<Output = ()> {
.read() async move {
.await log::error!(
.verify_block::<Self>(&block, &self.signature_scheme(), false) "validator {} triggered a slash event on tributary {} (with evidence: {})",
.map_err(|e| match e { hex::encode(validator),
BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal, hex::encode(self.genesis),
_ => { matches!(slash_event, SlashEvent::WithEvidence(_)),
log::warn!("Tributary Tendermint validate returning BlockError::Fatal due to {e:?}"); );
TendermintBlockError::Fatal
let signer = self.signer();
let Some(tx) = (match slash_event {
SlashEvent::WithEvidence(evidence) => {
// create an unsigned evidence tx
Some(TendermintTx::SlashEvidence(evidence))
} }
}) SlashEvent::Id(_reason, _block, _round) => {
// TODO: Increase locally observed slash points
None
}
}) else {
return;
};
// add tx to blockchain and broadcast to peers
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
if self.blockchain.write().await.add_transaction::<Self>(
true,
Transaction::Tendermint(tx),
&self.signature_scheme(),
) == Ok(true)
{
self.p2p.broadcast(signer.genesis, to_broadcast).await;
}
}
} }
async fn add_block( fn validate(
&self,
block: &Self::Block,
) -> impl Send + Future<Output = Result<(), TendermintBlockError>> {
async move {
let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self
.blockchain
.read()
.await
.verify_block::<Self>(&block, &self.signature_scheme(), false)
.map_err(|e| match e {
BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal,
_ => {
log::warn!("Tributary Tendermint validate returning BlockError::Fatal due to {e:?}");
TendermintBlockError::Fatal
}
})
}
}
fn add_block(
&mut self, &mut self,
serialized_block: Self::Block, serialized_block: Self::Block,
commit: Commit<Self::SignatureScheme>, commit: Commit<Self::SignatureScheme>,
) -> Option<Self::Block> { ) -> impl Send + Future<Output = Option<Self::Block>> {
let invalid_block = || { async move {
// There's a fatal flaw in the code, it's behind a hard fork, or the validators turned let invalid_block = || {
// malicious // There's a fatal flaw in the code, it's behind a hard fork, or the validators turned
// All justify a halt to then achieve social consensus from // malicious
// TODO: Under multiple validator sets, a small validator set turning malicious knocks // All justify a halt to then achieve social consensus from
// off the entire network. That's an unacceptable DoS. // TODO: Under multiple validator sets, a small validator set turning malicious knocks
panic!("validators added invalid block to tributary {}", hex::encode(self.genesis)); // off the entire network. That's an unacceptable DoS.
}; panic!("validators added invalid block to tributary {}", hex::encode(self.genesis));
};
// Tendermint should only produce valid commits // Tendermint should only produce valid commits
assert!(self.verify_commit(serialized_block.id(), &commit)); assert!(self.verify_commit(serialized_block.id(), &commit));
let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else { let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else {
return invalid_block(); return invalid_block();
}; };
let encoded_commit = commit.encode(); let encoded_commit = commit.encode();
loop { loop {
let block_res = self.blockchain.write().await.add_block::<Self>( let block_res = self.blockchain.write().await.add_block::<Self>(
&block, &block,
encoded_commit.clone(), encoded_commit.clone(),
&self.signature_scheme(), &self.signature_scheme(),
); );
match block_res { match block_res {
Ok(()) => { Ok(()) => {
// If we successfully added this block, break // If we successfully added this block, break
break; break;
}
Err(BlockError::NonLocalProvided(hash)) => {
log::error!(
"missing provided transaction {} which other validators on tributary {} had",
hex::encode(hash),
hex::encode(self.genesis)
);
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
_ => return invalid_block(),
} }
Err(BlockError::NonLocalProvided(hash)) => {
log::error!(
"missing provided transaction {} which other validators on tributary {} had",
hex::encode(hash),
hex::encode(self.genesis)
);
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
_ => return invalid_block(),
} }
}
Some(TendermintBlock( Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(), self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
)) ))
}
} }
} }

View File

@@ -1,11 +1,12 @@
use core::future::Future;
pub use crate::P2p; pub use crate::P2p;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DummyP2p; pub struct DummyP2p;
#[async_trait::async_trait]
impl P2p for DummyP2p { impl P2p for DummyP2p {
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) { fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
unimplemented!() async move { unimplemented!() }
} }
} }

View File

@@ -1,4 +1,7 @@
use core::future::Future;
use tendermint::ext::Network; use tendermint::ext::Network;
use crate::{ use crate::{
P2p, TendermintTx, P2p, TendermintTx,
tendermint::{TARGET_BLOCK_TIME, TendermintNetwork}, tendermint::{TARGET_BLOCK_TIME, TendermintNetwork},
@@ -11,10 +14,9 @@ fn assert_target_block_time() {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DummyP2p; pub struct DummyP2p;
#[async_trait::async_trait]
impl P2p for DummyP2p { impl P2p for DummyP2p {
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) { fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
unimplemented!() async move { unimplemented!() }
} }
} }

View File

@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true workspace = true
[dependencies] [dependencies]
async-trait = { version = "0.1", default-features = false }
thiserror = { version = "2", default-features = false, features = ["std"] } thiserror = { version = "2", default-features = false, features = ["std"] }
hex = { version = "0.4", default-features = false, features = ["std"] } hex = { version = "0.4", default-features = false, features = ["std"] }

View File

@@ -1,7 +1,6 @@
use core::{hash::Hash, fmt::Debug}; use core::{hash::Hash, fmt::Debug, future::Future};
use std::{sync::Arc, collections::HashSet}; use std::{sync::Arc, collections::HashSet};
use async_trait::async_trait;
use thiserror::Error; use thiserror::Error;
use parity_scale_codec::{Encode, Decode}; use parity_scale_codec::{Encode, Decode};
@@ -34,7 +33,6 @@ pub struct BlockNumber(pub u64);
pub struct RoundNumber(pub u32); pub struct RoundNumber(pub u32);
/// A signer for a validator. /// A signer for a validator.
#[async_trait]
pub trait Signer: Send + Sync { pub trait Signer: Send + Sync {
// Type used to identify validators. // Type used to identify validators.
type ValidatorId: ValidatorId; type ValidatorId: ValidatorId;
@@ -42,22 +40,21 @@ pub trait Signer: Send + Sync {
type Signature: Signature; type Signature: Signature;
/// Returns the validator's current ID. Returns None if they aren't a current validator. /// Returns the validator's current ID. Returns None if they aren't a current validator.
async fn validator_id(&self) -> Option<Self::ValidatorId>; fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>>;
/// Sign a signature with the current validator's private key. /// Sign a signature with the current validator's private key.
async fn sign(&self, msg: &[u8]) -> Self::Signature; fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature>;
} }
#[async_trait]
impl<S: Signer> Signer for Arc<S> { impl<S: Signer> Signer for Arc<S> {
type ValidatorId = S::ValidatorId; type ValidatorId = S::ValidatorId;
type Signature = S::Signature; type Signature = S::Signature;
async fn validator_id(&self) -> Option<Self::ValidatorId> { fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
self.as_ref().validator_id().await self.as_ref().validator_id()
} }
async fn sign(&self, msg: &[u8]) -> Self::Signature { fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
self.as_ref().sign(msg).await self.as_ref().sign(msg)
} }
} }
@@ -210,7 +207,6 @@ pub trait Block: Send + Sync + Clone + PartialEq + Eq + Debug + Encode + Decode
} }
/// Trait representing the distributed system Tendermint is providing consensus over. /// Trait representing the distributed system Tendermint is providing consensus over.
#[async_trait]
pub trait Network: Sized + Send + Sync { pub trait Network: Sized + Send + Sync {
/// The database used to back this. /// The database used to back this.
type Db: serai_db::Db; type Db: serai_db::Db;
@@ -229,6 +225,7 @@ pub trait Network: Sized + Send + Sync {
/// This should include both the time to download the block and the actual processing time. /// This should include both the time to download the block and the actual processing time.
/// ///
/// BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME) must be divisible by 1000. /// BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME) must be divisible by 1000.
// TODO: Redefine as Duration
const BLOCK_PROCESSING_TIME: u32; const BLOCK_PROCESSING_TIME: u32;
/// Network latency time in milliseconds. /// Network latency time in milliseconds.
/// ///
@@ -280,15 +277,19 @@ pub trait Network: Sized + Send + Sync {
/// Switching to unauthenticated channels in a system already providing authenticated channels is /// Switching to unauthenticated channels in a system already providing authenticated channels is
/// not recommended as this is a minor, temporal inefficiency, while downgrading channels may /// not recommended as this is a minor, temporal inefficiency, while downgrading channels may
/// have wider implications. /// have wider implications.
async fn broadcast(&mut self, msg: SignedMessageFor<Self>); fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()>;
/// Trigger a slash for the validator in question who was definitively malicious. /// Trigger a slash for the validator in question who was definitively malicious.
/// ///
/// The exact process of triggering a slash is undefined and left to the network as a whole. /// The exact process of triggering a slash is undefined and left to the network as a whole.
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); fn slash(
&mut self,
validator: Self::ValidatorId,
slash_event: SlashEvent,
) -> impl Send + Future<Output = ()>;
/// Validate a block. /// Validate a block.
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; fn validate(&self, block: &Self::Block) -> impl Send + Future<Output = Result<(), BlockError>>;
/// Add a block, returning the proposal for the next one. /// Add a block, returning the proposal for the next one.
/// ///
@@ -298,9 +299,9 @@ pub trait Network: Sized + Send + Sync {
/// This deviates from the paper which will have a local node refuse to decide on a block it /// This deviates from the paper which will have a local node refuse to decide on a block it
/// considers invalid. This library acknowledges the network did decide on it, leaving handling /// considers invalid. This library acknowledges the network did decide on it, leaving handling
/// of it to the network, and outside of this scope. /// of it to the network, and outside of this scope.
async fn add_block( fn add_block(
&mut self, &mut self,
block: Self::Block, block: Self::Block,
commit: Commit<Self::SignatureScheme>, commit: Commit<Self::SignatureScheme>,
) -> Option<Self::Block>; ) -> impl Send + Future<Output = Option<Self::Block>>;
} }

View File

@@ -1,10 +1,9 @@
use core::future::Future;
use std::{ use std::{
sync::Arc, sync::Arc,
time::{UNIX_EPOCH, SystemTime, Duration}, time::{UNIX_EPOCH, SystemTime, Duration},
}; };
use async_trait::async_trait;
use parity_scale_codec::{Encode, Decode}; use parity_scale_codec::{Encode, Decode};
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
@@ -21,20 +20,21 @@ type TestValidatorId = u16;
type TestBlockId = [u8; 4]; type TestBlockId = [u8; 4];
struct TestSigner(u16); struct TestSigner(u16);
#[async_trait]
impl Signer for TestSigner { impl Signer for TestSigner {
type ValidatorId = TestValidatorId; type ValidatorId = TestValidatorId;
type Signature = [u8; 32]; type Signature = [u8; 32];
async fn validator_id(&self) -> Option<TestValidatorId> { fn validator_id(&self) -> impl Send + Future<Output = Option<TestValidatorId>> {
Some(self.0) async move { Some(self.0) }
} }
async fn sign(&self, msg: &[u8]) -> [u8; 32] { fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = [u8; 32]> {
let mut sig = [0; 32]; async move {
sig[.. 2].copy_from_slice(&self.0.to_le_bytes()); let mut sig = [0; 32];
sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]); sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
sig sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
sig
}
} }
} }
@@ -111,7 +111,6 @@ struct TestNetwork(
Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>, Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>,
); );
#[async_trait]
impl Network for TestNetwork { impl Network for TestNetwork {
type Db = MemDb; type Db = MemDb;