6 Commits

Author SHA1 Message Date
Luke Parker
b2bd5d3a44 Remove Debug bound on tributary::P2p 2025-01-08 17:40:32 -05:00
Luke Parker
de2d6568a4 Actually implement the Peer abstraction for Libp2p 2025-01-08 17:40:08 -05:00
Luke Parker
fd9b464b35 Add a trait for the P2p network used in the coordinator
Moves all of the Libp2p code to a dedicated directory. Makes the Heartbeat task
abstract over any P2p network.
2025-01-08 17:01:37 -05:00
Luke Parker
376a66b000 Remove async-trait from tendermint-machine, tributary-chain 2025-01-08 16:41:11 -05:00
Luke Parker
2121a9b131 Spawn the task to select validators to dial 2025-01-07 18:17:36 -05:00
Luke Parker
419223c54e Build the swarm
Moves UpdateSharedValidatorsTask to validators.rs. While prior planned to
re-use a validators object across connecting and peer state management, the
current plan is to use an independent validators object for each to minimize
any contention. They should be built infrequently enough, and cheap enough to
update in the majority case (due to quickly checking if an update is needed),
that this is fine.
2025-01-07 18:09:25 -05:00
18 changed files with 587 additions and 324 deletions

2
Cargo.lock generated
View File

@@ -10498,7 +10498,6 @@ dependencies = [
name = "tendermint-machine"
version = "0.2.0"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"hex",
@@ -10941,7 +10940,6 @@ dependencies = [
name = "tributary-chain"
version = "0.1.0"
dependencies = [
"async-trait",
"blake2",
"ciphersuite",
"flexible-transcript",

View File

@@ -1,9 +1,10 @@
use core::future::Future;
use std::time::{Duration, SystemTime};
use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::FutureExt;
use tributary::{ReadWrite, Block, Tributary, TributaryReader};
use serai_db::*;
@@ -11,10 +12,7 @@ use serai_task::ContinuallyRan;
use crate::{
tributary::Transaction,
p2p::{
reqres::{Request, Response},
P2p,
},
p2p::{Peer, P2p},
};
// Amount of blocks in a minute
@@ -28,14 +26,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
///
/// If the other validator has more blocks then we do, they're expected to inform us. This forms
/// the sync protocol for our Tributaries.
struct HeartbeatTask<TD: Db> {
struct HeartbeatTask<TD: Db, P: P2p> {
set: ValidatorSet,
tributary: Tributary<TD, Transaction, P2p>,
tributary: Tributary<TD, Transaction, P>,
reader: TributaryReader<TD, Transaction>,
p2p: P2p,
p2p: P,
}
impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
@@ -74,8 +72,10 @@ impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
tip = self.reader.tip();
tip_is_stale = false;
}
let request = Request::Heartbeat { set: self.set, latest_block_hash: tip };
let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer };
// Necessary due to https://github.com/rust-lang/rust/issues/100013
let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else {
continue 'peer;
};
// This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with)

View File

@@ -19,14 +19,15 @@ use libp2p::{
noise,
};
use crate::p2p::{validators::Validators, peer_id_from_public};
use crate::p2p::libp2p::{validators::Validators, peer_id_from_public};
const PROTOCOL: &str = "/serai/coordinator/validators";
struct OnlyValidators {
validators: Arc<RwLock<Validators>>,
serai_key: Zeroizing<Keypair>,
noise_keypair: identity::Keypair,
#[derive(Clone)]
pub(crate) struct OnlyValidators {
pub(crate) validators: Arc<RwLock<Validators>>,
pub(crate) serai_key: Zeroizing<Keypair>,
pub(crate) noise_keypair: identity::Keypair,
}
impl OnlyValidators {

View File

@@ -14,7 +14,7 @@ use libp2p::{
use serai_task::ContinuallyRan;
use crate::p2p::{PORT, Peers, validators::Validators};
use crate::p2p::libp2p::{PORT, Peers, validators::Validators};
const TARGET_PEERS_PER_NETWORK: usize = 5;
/*
@@ -28,13 +28,19 @@ const TARGET_PEERS_PER_NETWORK: usize = 5;
*/
// TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;
struct DialTask {
pub(crate) struct DialTask {
serai: Serai,
validators: Validators,
peers: Peers,
to_dial: mpsc::UnboundedSender<DialOpts>,
}
impl DialTask {
pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender<DialOpts>) -> Self {
DialTask { serai: serai.clone(), validators: Validators::new(serai), peers, to_dial }
}
}
impl ContinuallyRan for DialTask {
// Only run every five minutes, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;

View File

@@ -15,7 +15,7 @@ pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign;
// Block size limit + 16 KB of space for signatures/metadata
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

View File

@@ -0,0 +1,214 @@
use core::{future::Future, time::Duration};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing;
use schnorrkel::Keypair;
use serai_client::{
primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai,
};
use tokio::sync::{mpsc, oneshot, RwLock};
use serai_task::{Task, ContinuallyRan};
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
tcp::Config as TcpConfig,
yamux,
swarm::NetworkBehaviour,
SwarmBuilder,
};
use crate::p2p::TributaryBlockWithCommit;
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::UpdateValidatorsTask;
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;
/// The dial task, to find new peers to connect to
mod dial;
use dial::DialTask;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The swarm task, running it and dispatching to/from it
mod swarm;
use swarm::SwarmTask;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
} else {
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
};
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
}
struct Peer<'a> {
outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
id: PeerId,
}
impl crate::p2p::Peer<'_> for Peer<'_> {
fn send_heartbeat(
&self,
set: ValidatorSet,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
async move {
let request = Request::Heartbeat { set, latest_block_hash };
let (sender, receiver) = oneshot::channel();
self
.outbound_requests
.send((self.id, request, sender))
.expect("outbound requests recv channel was dropped?");
match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
Response::None => Some(vec![]),
Response::Blocks(blocks) => Some(blocks),
// TODO: Disconnect this peer
Response::NotableCosigns(_) => None,
}
}
}
}
#[derive(Clone)]
struct Peers {
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
#[derive(Clone)]
struct Libp2p {
peers: Peers,
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
}
impl Libp2p {
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
// Define the dial task
let (dial_task_def, dial_task) = Task::new();
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
tokio::spawn(
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
.continually_run(dial_task_def, vec![]),
);
// Define the Validators object used for validating new connections
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
Ok(OnlyValidators {
serai_key: serai_key.clone(),
validators: connection_validators.clone(),
noise_keypair: noise_keypair.clone(),
})
};
let new_yamux = || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
// 256 KiB default + max message size
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config
};
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| behavior)
.unwrap()
.build();
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
let swarm_validators = UpdateValidatorsTask::spawn(serai);
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
let (inbound_request_responses_send, inbound_request_responses_recv) =
mpsc::unbounded_channel();
// Create the swarm task
SwarmTask::spawn(
dial_task,
to_dial_recv,
swarm_validators,
peers,
swarm,
gossip_recv,
signed_cosigns_send,
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
notable_cosign_requests_send,
inbound_request_responses_recv,
);
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
todo!("TODO");
}
}
impl tributary::P2p for Libp2p {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { todo!("TODO") }
}
}
impl crate::p2p::P2p for Libp2p {
type Peer<'a> = Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
async move {
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
return vec![];
};
let mut res = vec![];
for id in peer_ids {
res.push(Peer { outbound_requests: &self.outbound_requests, id });
}
res
}
}
}

View File

@@ -15,9 +15,11 @@ pub use request_response::Message;
use serai_cosign::SignedCosign;
use crate::p2p::TributaryBlockWithCommit;
/// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
@@ -36,24 +38,17 @@ pub(crate) enum Request {
NotableCosigns { global_session: [u8; 32] },
}
/// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
}
/// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response {
NoResponse,
None,
Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>),
}
impl fmt::Debug for Response {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
Response::None => fmt.debug_struct("Response::None").finish(),
Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
Response::NotableCosigns(_) => {
fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()

View File

@@ -21,13 +21,16 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
};
use crate::p2p::{
use crate::p2p::libp2p::{
Peers, BehaviorEvent, Behavior,
validators::Validators,
reqres::{self, Request, Response},
gossip,
};
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
/*
`SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
@@ -43,7 +46,7 @@ use crate::p2p::{
- Dispatching received requests
- Sending responses
*/
struct SwarmTask {
pub(crate) struct SwarmTask {
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
last_dial_task_run: Instant,
@@ -54,12 +57,14 @@ struct SwarmTask {
swarm: Swarm<Behavior>,
last_message: Instant,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Response>>,
inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
@@ -99,12 +104,10 @@ impl SwarmTask {
fn handle_reqres(&mut self, event: reqres::Event) {
match event {
reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id, request, channel } => {
match request {
// TODO: Send these
reqres::Message::Request { request_id, request, channel } => match request {
reqres::Request::KeepAlive => {
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
}
reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
@@ -115,19 +118,17 @@ impl SwarmTask {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
}
}
},
reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(Some(response));
let _: Result<_, _> = channel.send(response);
}
}
},
reqres::Event::OutboundFailure { request_id, .. } => {
// Send None as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(None);
let _: Result<_, _> = channel.send(Response::None);
}
}
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
@@ -136,9 +137,19 @@ impl SwarmTask {
async fn run(mut self) {
loop {
let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
tokio::select! {
() = tokio::time::sleep(time_till_keep_alive) => {
let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
let behavior = self.swarm.behaviour_mut();
for peer in peers {
behavior.reqres.send_request(&peer, Request::KeepAlive);
}
self.last_message = Instant::now();
}
// Dial peers we're instructed to
dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?");
@@ -156,8 +167,6 @@ impl SwarmTask {
We also use this to disconnect all peers who are no longer active in any network.
*/
() = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone();
let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
@@ -253,6 +262,7 @@ impl SwarmTask {
let topic = message.topic();
let message = borsh::to_vec(&message).unwrap();
let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
self.last_message = Instant::now();
}
request = self.outbound_requests.recv() => {
@@ -273,4 +283,54 @@ impl SwarmTask {
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn(
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
validators: Arc<RwLock<Validators>>,
peers: Peers,
swarm: Swarm<Behavior>,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
) {
tokio::spawn(
SwarmTask {
dial_task,
to_dial,
last_dial_task_run: Instant::now(),
validators,
peers,
rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
swarm,
last_message: Instant::now(),
gossip,
signed_cosigns,
tributary_gossip,
outbound_requests,
outbound_request_responses: HashMap::new(),
inbound_request_response_channels: HashMap::new(),
heartbeat_requests,
notable_cosign_requests,
inbound_request_responses,
}
.run(),
);
}
}

View File

@@ -1,4 +1,4 @@
use core::borrow::Borrow;
use core::{borrow::Borrow, future::Future};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
@@ -6,12 +6,14 @@ use std::{
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::{Task, ContinuallyRan};
use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered};
use tokio::sync::RwLock;
use crate::p2p::peer_id_from_public;
use crate::p2p::libp2p::peer_id_from_public;
pub(crate) struct Validators {
serai: Serai,
@@ -25,6 +27,15 @@ pub(crate) struct Validators {
}
impl Validators {
pub(crate) fn new(serai: Serai) -> Self {
Validators {
serai,
sessions: HashMap::new(),
by_network: HashMap::new(),
validators: HashMap::new(),
}
}
async fn session_changes(
serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>,
@@ -103,8 +114,6 @@ impl Validators {
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<(), String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes);
@@ -124,19 +133,51 @@ impl Validators {
}
}
/// Update the view of the validators.
/// A task which updates a set of validators.
///
/// This minimizes the time an exclusive lock is held over the validators to minimize the
/// disruption to functioning.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
validators: &Arc<RwLock<Validators>>,
) -> Result<(), String> {
let session_changes = {
let validators = validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
validators.write().await.incorporate_session_changes(session_changes);
Ok(())
/// The validators managed by this tak will have their exclusive lock held for a minimal amount of
/// time while the update occurs to minimize the disruption to the services relying on it.
pub(crate) struct UpdateValidatorsTask {
validators: Arc<RwLock<Validators>>,
}
impl UpdateValidatorsTask {
/// Spawn a new instance of the UpdateValidatorsTask.
///
/// This returns a reference to the Validators it updates after spawning itself.
pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
// The validators which will be updated
let validators = Arc::new(RwLock::new(Validators::new(serai)));
// Define the task
let (update_validators_task, update_validators_task_handle) = Task::new();
// Forget the handle, as dropping the handle would stop the task
core::mem::forget(update_validators_task_handle);
// Spawn the task
tokio::spawn(
(Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
);
// Return the validators
validators
}
}
impl ContinuallyRan for UpdateValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let session_changes = {
let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
.await
.map_err(|e| format!("{e:?}"))?
};
self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true)
}
}
}

View File

@@ -1,94 +1,31 @@
use core::future::Future;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use serai_client::primitives::{NetworkId, PublicKey};
use borsh::{BorshSerialize, BorshDeserialize};
use tokio::sync::RwLock;
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_task::ContinuallyRan;
use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::{Validators, update_shared_validators};
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
/// The dial task, to find new peers to connect to
mod dial;
/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};
/// The gossip messages and behavior
mod gossip;
/// The libp2p-backed P2p network
mod libp2p;
/// The heartbeat task, effecting sync of Tributaries
mod heartbeat;
/// The swarm task, running it and dispatching to/from it
mod swarm;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
/// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
}
struct Peer;
impl Peer {
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
(async move { todo!("TODO") }).await
}
trait Peer<'a>: Send {
fn send_heartbeat(
&self,
set: ValidatorSet,
latest_block_hash: [u8; 32],
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
}
#[derive(Clone)]
struct Peers {
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}
#[derive(Clone, Debug)]
struct P2p;
impl P2p {
async fn peers(&self, set: NetworkId) -> Vec<Peer> {
(async move { todo!("TODO") }).await
}
}
#[async_trait::async_trait]
impl tributary::P2p for P2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
todo!("TODO")
}
}
#[derive(NetworkBehaviour)]
struct Behavior {
reqres: reqres::Behavior,
gossip: gossip::Behavior,
}
struct UpdateSharedValidatorsTask {
validators: Arc<RwLock<Validators>>,
}
impl ContinuallyRan for UpdateSharedValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
Ok(true)
}
}
trait P2p: Send + Sync + tributary::P2p {
type Peer<'a>: Peer<'a>;
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
}

View File

@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true
[dependencies]
async-trait = { version = "0.1", default-features = false }
thiserror = { version = "2", default-features = false, features = ["std"] }
subtle = { version = "^2", default-features = false, features = ["std"] }

View File

@@ -1,8 +1,6 @@
use core::{marker::PhantomData, fmt::Debug};
use core::{marker::PhantomData, fmt::Debug, future::Future};
use std::{sync::Arc, io};
use async_trait::async_trait;
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
@@ -131,20 +129,18 @@ pub trait ReadWrite: Sized {
}
}
#[async_trait]
pub trait P2p: 'static + Send + Sync + Clone + Debug {
pub trait P2p: 'static + Send + Sync + Clone {
/// Broadcast a message to all other members of the Tributary with the specified genesis.
///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load.
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
}
#[async_trait]
impl<P: P2p> P2p for Arc<P> {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
(*self).broadcast(genesis, msg).await
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
P::broadcast(self, genesis, msg)
}
}

View File

@@ -1,8 +1,6 @@
use core::ops::Deref;
use core::{ops::Deref, future::Future};
use std::{sync::Arc, collections::HashMap};
use async_trait::async_trait;
use subtle::ConstantTimeEq;
use zeroize::{Zeroize, Zeroizing};
@@ -74,19 +72,20 @@ impl Signer {
}
}
#[async_trait]
impl SignerTrait for Signer {
type ValidatorId = [u8; 32];
type Signature = [u8; 64];
/// Returns the validator's current ID. Returns None if they aren't a current validator.
async fn validator_id(&self) -> Option<Self::ValidatorId> {
Some((Ristretto::generator() * self.key.deref()).to_bytes())
fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
async move { Some((Ristretto::generator() * self.key.deref()).to_bytes()) }
}
/// Sign a signature with the current validator's private key.
async fn sign(&self, msg: &[u8]) -> Self::Signature {
let mut nonce = Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce"));
fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
async move {
let mut nonce =
Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce"));
nonce.append_message(b"genesis", self.genesis);
nonce.append_message(b"key", Zeroizing::new(self.key.deref().to_repr()).as_ref());
nonce.append_message(b"message", msg);
@@ -120,6 +119,7 @@ impl SignerTrait for Signer {
res
}
}
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Validators {
@@ -274,7 +274,6 @@ pub const BLOCK_PROCESSING_TIME: u32 = 999;
pub const LATENCY_TIME: u32 = 1667;
pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME);
#[async_trait]
impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> {
type Db = D;
@@ -300,13 +299,20 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
self.validators.clone()
}
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()> {
async move {
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());
self.p2p.broadcast(self.genesis, to_broadcast).await
}
}
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent) {
fn slash(
&mut self,
validator: Self::ValidatorId,
slash_event: SlashEvent,
) -> impl Send + Future<Output = ()> {
async move {
log::error!(
"validator {} triggered a slash event on tributary {} (with evidence: {})",
hex::encode(validator),
@@ -340,8 +346,13 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
self.p2p.broadcast(signer.genesis, to_broadcast).await;
}
}
}
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
fn validate(
&self,
block: &Self::Block,
) -> impl Send + Future<Output = Result<(), TendermintBlockError>> {
async move {
let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self
@@ -357,12 +368,14 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
})
}
}
async fn add_block(
fn add_block(
&mut self,
serialized_block: Self::Block,
commit: Commit<Self::SignatureScheme>,
) -> Option<Self::Block> {
) -> impl Send + Future<Output = Option<Self::Block>> {
async move {
let invalid_block = || {
// There's a fatal flaw in the code, it's behind a hard fork, or the validators turned
// malicious
@@ -408,3 +421,4 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
))
}
}
}

View File

@@ -1,11 +1,12 @@
use core::future::Future;
pub use crate::P2p;
#[derive(Clone, Debug)]
pub struct DummyP2p;
#[async_trait::async_trait]
impl P2p for DummyP2p {
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
unimplemented!()
fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { unimplemented!() }
}
}

View File

@@ -1,4 +1,7 @@
use core::future::Future;
use tendermint::ext::Network;
use crate::{
P2p, TendermintTx,
tendermint::{TARGET_BLOCK_TIME, TendermintNetwork},
@@ -11,10 +14,9 @@ fn assert_target_block_time() {
#[derive(Clone, Debug)]
pub struct DummyP2p;
#[async_trait::async_trait]
impl P2p for DummyP2p {
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
unimplemented!()
fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
async move { unimplemented!() }
}
}

View File

@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true
[dependencies]
async-trait = { version = "0.1", default-features = false }
thiserror = { version = "2", default-features = false, features = ["std"] }
hex = { version = "0.4", default-features = false, features = ["std"] }

View File

@@ -1,7 +1,6 @@
use core::{hash::Hash, fmt::Debug};
use core::{hash::Hash, fmt::Debug, future::Future};
use std::{sync::Arc, collections::HashSet};
use async_trait::async_trait;
use thiserror::Error;
use parity_scale_codec::{Encode, Decode};
@@ -34,7 +33,6 @@ pub struct BlockNumber(pub u64);
pub struct RoundNumber(pub u32);
/// A signer for a validator.
#[async_trait]
pub trait Signer: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
@@ -42,22 +40,21 @@ pub trait Signer: Send + Sync {
type Signature: Signature;
/// Returns the validator's current ID. Returns None if they aren't a current validator.
async fn validator_id(&self) -> Option<Self::ValidatorId>;
fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>>;
/// Sign a signature with the current validator's private key.
async fn sign(&self, msg: &[u8]) -> Self::Signature;
fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature>;
}
#[async_trait]
impl<S: Signer> Signer for Arc<S> {
type ValidatorId = S::ValidatorId;
type Signature = S::Signature;
async fn validator_id(&self) -> Option<Self::ValidatorId> {
self.as_ref().validator_id().await
fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
self.as_ref().validator_id()
}
async fn sign(&self, msg: &[u8]) -> Self::Signature {
self.as_ref().sign(msg).await
fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
self.as_ref().sign(msg)
}
}
@@ -210,7 +207,6 @@ pub trait Block: Send + Sync + Clone + PartialEq + Eq + Debug + Encode + Decode
}
/// Trait representing the distributed system Tendermint is providing consensus over.
#[async_trait]
pub trait Network: Sized + Send + Sync {
/// The database used to back this.
type Db: serai_db::Db;
@@ -229,6 +225,7 @@ pub trait Network: Sized + Send + Sync {
/// This should include both the time to download the block and the actual processing time.
///
/// BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME) must be divisible by 1000.
// TODO: Redefine as Duration
const BLOCK_PROCESSING_TIME: u32;
/// Network latency time in milliseconds.
///
@@ -280,15 +277,19 @@ pub trait Network: Sized + Send + Sync {
/// Switching to unauthenticated channels in a system already providing authenticated channels is
/// not recommended as this is a minor, temporal inefficiency, while downgrading channels may
/// have wider implications.
async fn broadcast(&mut self, msg: SignedMessageFor<Self>);
fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()>;
/// Trigger a slash for the validator in question who was definitively malicious.
///
/// The exact process of triggering a slash is undefined and left to the network as a whole.
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
fn slash(
&mut self,
validator: Self::ValidatorId,
slash_event: SlashEvent,
) -> impl Send + Future<Output = ()>;
/// Validate a block.
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>;
fn validate(&self, block: &Self::Block) -> impl Send + Future<Output = Result<(), BlockError>>;
/// Add a block, returning the proposal for the next one.
///
@@ -298,9 +299,9 @@ pub trait Network: Sized + Send + Sync {
/// This deviates from the paper which will have a local node refuse to decide on a block it
/// considers invalid. This library acknowledges the network did decide on it, leaving handling
/// of it to the network, and outside of this scope.
async fn add_block(
fn add_block(
&mut self,
block: Self::Block,
commit: Commit<Self::SignatureScheme>,
) -> Option<Self::Block>;
) -> impl Send + Future<Output = Option<Self::Block>>;
}

View File

@@ -1,10 +1,9 @@
use core::future::Future;
use std::{
sync::Arc,
time::{UNIX_EPOCH, SystemTime, Duration},
};
use async_trait::async_trait;
use parity_scale_codec::{Encode, Decode};
use futures_util::sink::SinkExt;
@@ -21,22 +20,23 @@ type TestValidatorId = u16;
type TestBlockId = [u8; 4];
struct TestSigner(u16);
#[async_trait]
impl Signer for TestSigner {
type ValidatorId = TestValidatorId;
type Signature = [u8; 32];
async fn validator_id(&self) -> Option<TestValidatorId> {
Some(self.0)
fn validator_id(&self) -> impl Send + Future<Output = Option<TestValidatorId>> {
async move { Some(self.0) }
}
async fn sign(&self, msg: &[u8]) -> [u8; 32] {
fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = [u8; 32]> {
async move {
let mut sig = [0; 32];
sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
sig
}
}
}
#[derive(Clone)]
struct TestSignatureScheme;
@@ -111,7 +111,6 @@ struct TestNetwork(
Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>,
);
#[async_trait]
impl Network for TestNetwork {
type Db = MemDb;