Mirror of https://github.com/serai-dex/serai.git (synced 2025-12-12)
Compare commits: a731c0005d...b2bd5d3a44 (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b2bd5d3a44 | |
| | de2d6568a4 | |
| | fd9b464b35 | |
| | 376a66b000 | |
| | 2121a9b131 | |
| | 419223c54e | |
Cargo.lock (generated): 2 changed lines
@@ -10498,7 +10498,6 @@ dependencies = [
 name = "tendermint-machine"
 version = "0.2.0"
 dependencies = [
- "async-trait",
  "futures-channel",
  "futures-util",
  "hex",
@@ -10941,7 +10940,6 @@ dependencies = [
 name = "tributary-chain"
 version = "0.1.0"
 dependencies = [
- "async-trait",
  "blake2",
  "ciphersuite",
  "flexible-transcript",
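Both lock-file removals above drop the `async-trait` dependency because the traits that used it now return `impl Future` directly (return-position `impl Trait` in traits, stable since Rust 1.75). A minimal sketch of the pattern, using a hypothetical `Broadcast` trait rather than the real `tributary::P2p`:

```rust
use core::future::Future;

// Hypothetical trait for illustration; the diff applies the same shape to
// tributary::P2p, tendermint::ext::Signer, and tendermint::ext::Network.
trait Broadcast: Send + Sync {
  // Returning `impl Send + Future` keeps the future `Send` without boxing,
  // which `#[async_trait]` achieved by returning Pin<Box<dyn Future + Send>>.
  fn broadcast(&self, msg: Vec<u8>) -> impl Send + Future<Output = ()>;
}

struct Noop;
impl Broadcast for Noop {
  fn broadcast(&self, _msg: Vec<u8>) -> impl Send + Future<Output = ()> {
    async move {}
  }
}
```

Call sites are unchanged: `noop.broadcast(msg).await` still works, so only the trait definitions and impl blocks need touching.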
@@ -1,9 +1,10 @@
 use core::future::Future;

 use std::time::{Duration, SystemTime};

 use serai_client::validator_sets::primitives::ValidatorSet;

+use futures_util::FutureExt;

 use tributary::{ReadWrite, Block, Tributary, TributaryReader};

 use serai_db::*;
@@ -11,10 +12,7 @@ use serai_task::ContinuallyRan;

 use crate::{
   tributary::Transaction,
-  p2p::{
-    reqres::{Request, Response},
-    P2p,
-  },
+  p2p::{Peer, P2p},
 };

 // Amount of blocks in a minute
@@ -28,14 +26,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
 ///
 /// If the other validator has more blocks then we do, they're expected to inform us. This forms
 /// the sync protocol for our Tributaries.
-struct HeartbeatTask<TD: Db> {
+struct HeartbeatTask<TD: Db, P: P2p> {
   set: ValidatorSet,
-  tributary: Tributary<TD, Transaction, P2p>,
+  tributary: Tributary<TD, Transaction, P>,
   reader: TributaryReader<TD, Transaction>,
-  p2p: P2p,
+  p2p: P,
 }

-impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
+impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
     async move {
       // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
@@ -74,8 +72,10 @@ impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
             tip = self.reader.tip();
             tip_is_stale = false;
           }
-          let request = Request::Heartbeat { set: self.set, latest_block_hash: tip };
-          let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer };
+          // Necessary due to https://github.com/rust-lang/rust/issues/100013
+          let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else {
+            continue 'peer;
+          };

           // This is the final batch if it has less than the maximum amount of blocks
           // (signifying there weren't more blocks after this to fill the batch with)
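The new `use futures_util::FutureExt;` import and the `.boxed()` call above exist to work around the rustc issue linked in the in-code comment (rust-lang/rust#100013). A reduced sketch of the workaround, with made-up functions rather than the coordinator's `Peer`:

```rust
use futures_util::FutureExt;

// Stand-in for a trait method that returns an opaque `impl Future`.
async fn fetch_blocks(latest: [u8; 32]) -> Option<Vec<Vec<u8>>> {
  let _ = latest;
  Some(vec![])
}

async fn sync_once() -> Option<Vec<Vec<u8>>> {
  // `.boxed()` erases the opaque future into a Pin<Box<dyn Future + Send>>,
  // which sidesteps the auto-trait/lifetime inference failure in the issue above.
  fetch_blocks([0; 32]).boxed().await
}
```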
@@ -19,14 +19,15 @@ use libp2p::{
   noise,
 };

-use crate::p2p::{validators::Validators, peer_id_from_public};
+use crate::p2p::libp2p::{validators::Validators, peer_id_from_public};

 const PROTOCOL: &str = "/serai/coordinator/validators";

-struct OnlyValidators {
-  validators: Arc<RwLock<Validators>>,
-  serai_key: Zeroizing<Keypair>,
-  noise_keypair: identity::Keypair,
+#[derive(Clone)]
+pub(crate) struct OnlyValidators {
+  pub(crate) validators: Arc<RwLock<Validators>>,
+  pub(crate) serai_key: Zeroizing<Keypair>,
+  pub(crate) noise_keypair: identity::Keypair,
 }

 impl OnlyValidators {
@@ -14,7 +14,7 @@ use libp2p::{

 use serai_task::ContinuallyRan;

-use crate::p2p::{PORT, Peers, validators::Validators};
+use crate::p2p::libp2p::{PORT, Peers, validators::Validators};

 const TARGET_PEERS_PER_NETWORK: usize = 5;
 /*
@@ -28,13 +28,19 @@ const TARGET_PEERS_PER_NETWORK: usize = 5;
 */
 // TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;

-struct DialTask {
+pub(crate) struct DialTask {
   serai: Serai,
   validators: Validators,
   peers: Peers,
   to_dial: mpsc::UnboundedSender<DialOpts>,
 }

+impl DialTask {
+  pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender<DialOpts>) -> Self {
+    DialTask { serai: serai.clone(), validators: Validators::new(serai), peers, to_dial }
+  }
+}
+
 impl ContinuallyRan for DialTask {
   // Only run every five minutes, not the default of every five seconds
   const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
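`DialTask` (and, later in this diff, `UpdateValidatorsTask`) override `DELAY_BETWEEN_ITERATIONS` on `serai_task::ContinuallyRan`. The sketch below is not serai_task's actual implementation, only an illustration of the contract those constants suggest: run an iteration, wait the configured delay, and back off toward a maximum after failures. The constants and backoff policy here are placeholders.

```rust
use std::time::Duration;

const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;

async fn continually_run(mut run_iteration: impl FnMut() -> Result<bool, String>) {
  let mut delay = DELAY_BETWEEN_ITERATIONS;
  loop {
    match run_iteration() {
      // A successful iteration restores the base cadence.
      Ok(_made_progress) => delay = DELAY_BETWEEN_ITERATIONS,
      // Failures back off, bounded by the maximum delay.
      Err(e) => {
        eprintln!("task iteration errored: {e}");
        delay = (delay * 2).min(MAX_DELAY_BETWEEN_ITERATIONS);
      }
    }
    tokio::time::sleep(Duration::from_secs(delay)).await;
  }
}
```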
@@ -15,7 +15,7 @@ pub use libp2p::gossipsub::Event;
 use serai_cosign::SignedCosign;

 // Block size limit + 16 KB of space for signatures/metadata
-const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
+pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;

 const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

coordinator/src/p2p/libp2p/mod.rs (new file, 214 lines)
@@ -0,0 +1,214 @@
use core::{future::Future, time::Duration};
use std::{
  sync::Arc,
  collections::{HashSet, HashMap},
};

use zeroize::Zeroizing;
use schnorrkel::Keypair;

use serai_client::{
  primitives::{NetworkId, PublicKey},
  validator_sets::primitives::ValidatorSet,
  Serai,
};

use tokio::sync::{mpsc, oneshot, RwLock};

use serai_task::{Task, ContinuallyRan};

use libp2p::{
  multihash::Multihash,
  identity::{self, PeerId},
  tcp::Config as TcpConfig,
  yamux,
  swarm::NetworkBehaviour,
  SwarmBuilder,
};

use crate::p2p::TributaryBlockWithCommit;

/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::UpdateValidatorsTask;

/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;

/// The dial task, to find new peers to connect to
mod dial;
use dial::DialTask;

/// The request-response messages and behavior
mod reqres;
use reqres::{Request, Response};

/// The gossip messages and behavior
mod gossip;

/// The swarm task, running it and dispatching to/from it
mod swarm;
use swarm::SwarmTask;

const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')

// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
  if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
    gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
  } else {
    reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
  };

fn peer_id_from_public(public: PublicKey) -> PeerId {
  // 0 represents the identity Multihash, that no hash was performed
  // It's an internal constant so we can't refer to the constant inside libp2p
  PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
}

struct Peer<'a> {
  outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
  id: PeerId,
}
impl crate::p2p::Peer<'_> for Peer<'_> {
  fn send_heartbeat(
    &self,
    set: ValidatorSet,
    latest_block_hash: [u8; 32],
  ) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
    const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
    async move {
      let request = Request::Heartbeat { set, latest_block_hash };
      let (sender, receiver) = oneshot::channel();
      self
        .outbound_requests
        .send((self.id, request, sender))
        .expect("outbound requests recv channel was dropped?");
      match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
        Response::None => Some(vec![]),
        Response::Blocks(blocks) => Some(blocks),
        // TODO: Disconnect this peer
        Response::NotableCosigns(_) => None,
      }
    }
  }
}

#[derive(Clone)]
struct Peers {
  peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
}

#[derive(NetworkBehaviour)]
struct Behavior {
  reqres: reqres::Behavior,
  gossip: gossip::Behavior,
}

#[derive(Clone)]
struct Libp2p {
  peers: Peers,
  outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
}
impl Libp2p {
  pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
    // Define the object we track peers with
    let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };

    // Define the dial task
    let (dial_task_def, dial_task) = Task::new();
    let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
    tokio::spawn(
      DialTask::new(serai.clone(), peers.clone(), to_dial_send)
        .continually_run(dial_task_def, vec![]),
    );

    // Define the Validators object used for validating new connections
    let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
    let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
      Ok(OnlyValidators {
        serai_key: serai_key.clone(),
        validators: connection_validators.clone(),
        noise_keypair: noise_keypair.clone(),
      })
    };

    let new_yamux = || {
      let mut config = yamux::Config::default();
      // 1 MiB default + max message size
      config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
      // 256 KiB default + max message size
      config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
      config
    };

    let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };

    let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
      .with_tokio()
      .with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
      .unwrap()
      .with_behaviour(|_| behavior)
      .unwrap()
      .build();
    swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
    swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();

    let swarm_validators = UpdateValidatorsTask::spawn(serai);

    let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
    let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
    let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();

    let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();

    let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
    let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
    let (inbound_request_responses_send, inbound_request_responses_recv) =
      mpsc::unbounded_channel();

    // Create the swarm task
    SwarmTask::spawn(
      dial_task,
      to_dial_recv,
      swarm_validators,
      peers,
      swarm,
      gossip_recv,
      signed_cosigns_send,
      tributary_gossip_send,
      outbound_requests_recv,
      heartbeat_requests_send,
      notable_cosign_requests_send,
      inbound_request_responses_recv,
    );

    // gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
    // heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
    todo!("TODO");
  }
}

impl tributary::P2p for Libp2p {
  fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
    async move { todo!("TODO") }
  }
}

impl crate::p2p::P2p for Libp2p {
  type Peer<'a> = Peer<'a>;
  fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
    async move {
      let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
        return vec![];
      };
      let mut res = vec![];
      for id in peer_ids {
        res.push(Peer { outbound_requests: &self.outbound_requests, id });
      }
      res
    }
  }
}
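`Peer::send_heartbeat` above relies on a common tokio pattern: ship the request plus a `oneshot::Sender` to the swarm task over an unbounded channel, then await the `oneshot::Receiver` under a timeout. A self-contained sketch of that flow (the `Request`/`Response` enums here are placeholders, not the coordinator's types):

```rust
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

enum Request { Ping }
enum Response { Pong }

async fn send_with_timeout(
  to_swarm: &mpsc::UnboundedSender<(Request, oneshot::Sender<Response>)>,
) -> Option<Response> {
  let (sender, receiver) = oneshot::channel();
  to_swarm.send((Request::Ping, sender)).ok()?;
  // First ok()? handles the timeout elapsing, the second the sender being dropped,
  // mirroring `timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()?` above.
  tokio::time::timeout(Duration::from_secs(5), receiver).await.ok()?.ok()
}

#[tokio::main]
async fn main() {
  let (send, mut recv) = mpsc::unbounded_channel();
  tokio::spawn(async move {
    while let Some((_req, reply)) = recv.recv().await {
      let _ = reply.send(Response::Pong);
    }
  });
  assert!(matches!(send_with_timeout(&send).await, Some(Response::Pong)));
}
```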
@@ -15,9 +15,11 @@ pub use request_response::Message;

 use serai_cosign::SignedCosign;

+use crate::p2p::TributaryBlockWithCommit;
+
 /// The maximum message size for the request-response protocol
 // This is derived from the heartbeat message size as it's our largest message
-const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
+pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
   (tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;

 const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
@@ -36,24 +38,17 @@ pub(crate) enum Request {
   NotableCosigns { global_session: [u8; 32] },
 }

-/// A tributary block and its commit.
-#[derive(Clone, BorshSerialize, BorshDeserialize)]
-pub(crate) struct TributaryBlockWithCommit {
-  pub(crate) block: Vec<u8>,
-  pub(crate) commit: Vec<u8>,
-}
-
 /// Responses which can be received via the request-response protocol.
 #[derive(Clone, BorshSerialize, BorshDeserialize)]
 pub(crate) enum Response {
-  NoResponse,
+  None,
   Blocks(Vec<TributaryBlockWithCommit>),
   NotableCosigns(Vec<SignedCosign>),
 }
 impl fmt::Debug for Response {
   fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
     match self {
-      Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
+      Response::None => fmt.debug_struct("Response::None").finish(),
       Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
       Response::NotableCosigns(_) => {
         fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()
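Both the request-response protocol and the gossip arm (`borsh::to_vec(&message)`) use Borsh for wire encoding. A minimal round-trip, assuming borsh 1.x with the `derive` feature; the struct mirrors `TributaryBlockWithCommit` from the diff but is declared locally:

```rust
use borsh::{BorshSerialize, BorshDeserialize};

#[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)]
struct TributaryBlockWithCommit {
  block: Vec<u8>,
  commit: Vec<u8>,
}

fn main() {
  let msg = TributaryBlockWithCommit { block: vec![1, 2, 3], commit: vec![4, 5] };
  // Serialize to bytes for the request-response/gossip channel...
  let bytes = borsh::to_vec(&msg).unwrap();
  // ...and decode them back on the receiving side.
  let decoded = TributaryBlockWithCommit::try_from_slice(&bytes).unwrap();
  assert_eq!(msg, decoded);
}
```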
@@ -21,13 +21,16 @@ use libp2p::{
   swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
 };

-use crate::p2p::{
+use crate::p2p::libp2p::{
   Peers, BehaviorEvent, Behavior,
   validators::Validators,
   reqres::{self, Request, Response},
   gossip,
 };

+const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
+const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
+
 /*
   `SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
   contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
@@ -43,7 +46,7 @@ use crate::p2p::{
   - Dispatching received requests
   - Sending responses
 */
-struct SwarmTask {
+pub(crate) struct SwarmTask {
   dial_task: TaskHandle,
   to_dial: mpsc::UnboundedReceiver<DialOpts>,
   last_dial_task_run: Instant,
@@ -54,12 +57,14 @@ struct SwarmTask {

   swarm: Swarm<Behavior>,

+  last_message: Instant,
+
   gossip: mpsc::UnboundedReceiver<gossip::Message>,
   signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
   tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,

-  outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
-  outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
+  outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
+  outbound_request_responses: HashMap<RequestId, oneshot::Sender<Response>>,

   inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
   heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
@@ -99,35 +104,31 @@ impl SwarmTask {
   fn handle_reqres(&mut self, event: reqres::Event) {
     match event {
       reqres::Event::Message { message, .. } => match message {
-        reqres::Message::Request { request_id, request, channel } => {
-          match request {
-            // TODO: Send these
-            reqres::Request::KeepAlive => {
-              let _: Result<_, _> =
-                self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
-            }
-            reqres::Request::Heartbeat { set, latest_block_hash } => {
-              self.inbound_request_response_channels.insert(request_id, channel);
-              let _: Result<_, _> =
-                self.heartbeat_requests.send((request_id, set, latest_block_hash));
-            }
-            reqres::Request::NotableCosigns { global_session } => {
-              self.inbound_request_response_channels.insert(request_id, channel);
-              let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
-            }
-          }
-        }
+        reqres::Message::Request { request_id, request, channel } => match request {
+          reqres::Request::KeepAlive => {
+            let _: Result<_, _> =
+              self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
+          }
+          reqres::Request::Heartbeat { set, latest_block_hash } => {
+            self.inbound_request_response_channels.insert(request_id, channel);
+            let _: Result<_, _> =
+              self.heartbeat_requests.send((request_id, set, latest_block_hash));
+          }
+          reqres::Request::NotableCosigns { global_session } => {
+            self.inbound_request_response_channels.insert(request_id, channel);
+            let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
+          }
+        },
         reqres::Message::Response { request_id, response } => {
-          // Send Some(response) as the response for the request
           if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
-            let _: Result<_, _> = channel.send(Some(response));
+            let _: Result<_, _> = channel.send(response);
           }
         }
       },
       reqres::Event::OutboundFailure { request_id, .. } => {
         // Send None as the response for the request
         if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
-          let _: Result<_, _> = channel.send(None);
+          let _: Result<_, _> = channel.send(Response::None);
         }
       }
       reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
@@ -136,9 +137,19 @@ impl SwarmTask {

   async fn run(mut self) {
     loop {
+      let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
       let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());

       tokio::select! {
+        () = tokio::time::sleep(time_till_keep_alive) => {
+          let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
+          let behavior = self.swarm.behaviour_mut();
+          for peer in peers {
+            behavior.reqres.send_request(&peer, Request::KeepAlive);
+          }
+          self.last_message = Instant::now();
+        }
+
         // Dial peers we're instructed to
         dial_opts = self.to_dial.recv() => {
           let dial_opts = dial_opts.expect("DialTask was closed?");
@@ -156,8 +167,6 @@ impl SwarmTask {
           We also use this to disconnect all peers who are no longer active in any network.
         */
         () = tokio::time::sleep(time_till_rebuild_peers) => {
-          const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
-
           let validators_by_network = self.validators.read().await.by_network().clone();
           let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();

@@ -253,6 +262,7 @@ impl SwarmTask {
           let topic = message.topic();
           let message = borsh::to_vec(&message).unwrap();
           let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
+          self.last_message = Instant::now();
         }

         request = self.outbound_requests.recv() => {
@@ -273,4 +283,54 @@ impl SwarmTask {
       }
     }
   }
+
+  #[allow(clippy::too_many_arguments)]
+  pub(crate) fn spawn(
+    dial_task: TaskHandle,
+    to_dial: mpsc::UnboundedReceiver<DialOpts>,
+
+    validators: Arc<RwLock<Validators>>,
+    peers: Peers,
+
+    swarm: Swarm<Behavior>,
+
+    gossip: mpsc::UnboundedReceiver<gossip::Message>,
+    signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
+    tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
+
+    outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
+
+    heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
+    notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
+    inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
+  ) {
+    tokio::spawn(
+      SwarmTask {
+        dial_task,
+        to_dial,
+        last_dial_task_run: Instant::now(),
+
+        validators,
+        peers,
+        rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
+
+        swarm,
+
+        last_message: Instant::now(),
+
+        gossip,
+        signed_cosigns,
+        tributary_gossip,
+
+        outbound_requests,
+        outbound_request_responses: HashMap::new(),
+
+        inbound_request_response_channels: HashMap::new(),
+        heartbeat_requests,
+        notable_cosign_requests,
+        inbound_request_responses,
+      }
+      .run(),
+    );
+  }
 }
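The keep-alive arm added to `SwarmTask::run` follows the usual shape of a `tokio::select!` event loop with a computed sleep. A reduced, hypothetical version (the constant and the time calculation are illustrative, not copied from the diff):

```rust
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

async fn event_loop(mut inbox: mpsc::UnboundedReceiver<Vec<u8>>) {
  let mut last_message = Instant::now();
  loop {
    let time_till_keep_alive =
      (last_message + KEEP_ALIVE_INTERVAL).saturating_duration_since(Instant::now());
    tokio::select! {
      // Timer arm: once the interval has elapsed without traffic, ping peers.
      () = tokio::time::sleep(time_till_keep_alive) => {
        // send a KeepAlive request to every connected peer here
        last_message = Instant::now();
      }
      // Channel arm: handle a message and reset the timer.
      msg = inbox.recv() => {
        let Some(_msg) = msg else { break };
        last_message = Instant::now();
      }
    }
  }
}
```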
@@ -1,4 +1,4 @@
-use core::borrow::Borrow;
+use core::{borrow::Borrow, future::Future};
 use std::{
   sync::Arc,
   collections::{HashSet, HashMap},
@@ -6,12 +6,14 @@ use std::{

 use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};

+use serai_task::{Task, ContinuallyRan};
+
 use libp2p::PeerId;

 use futures_util::stream::{StreamExt, FuturesUnordered};
 use tokio::sync::RwLock;

-use crate::p2p::peer_id_from_public;
+use crate::p2p::libp2p::peer_id_from_public;

 pub(crate) struct Validators {
   serai: Serai,
@@ -25,6 +27,15 @@ pub(crate) struct Validators {
 }

 impl Validators {
+  pub(crate) fn new(serai: Serai) -> Self {
+    Validators {
+      serai,
+      sessions: HashMap::new(),
+      by_network: HashMap::new(),
+      validators: HashMap::new(),
+    }
+  }
+
   async fn session_changes(
     serai: impl Borrow<Serai>,
     sessions: impl Borrow<HashMap<NetworkId, Session>>,
@@ -103,8 +114,6 @@ impl Validators {
   }

   /// Update the view of the validators.
-  ///
-  /// Returns all validators removed from the active validator set.
   pub(crate) async fn update(&mut self) -> Result<(), String> {
     let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
     self.incorporate_session_changes(session_changes);
@@ -124,19 +133,51 @@ impl Validators {
   }
 }

-/// Update the view of the validators.
-///
-/// This minimizes the time an exclusive lock is held over the validators to minimize the
-/// disruption to functioning.
-///
-/// Returns all validators removed from the active validator set.
-pub(crate) async fn update_shared_validators(
-  validators: &Arc<RwLock<Validators>>,
-) -> Result<(), String> {
-  let session_changes = {
-    let validators = validators.read().await;
-    Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
-  };
-  validators.write().await.incorporate_session_changes(session_changes);
-  Ok(())
+/// A task which updates a set of validators.
+///
+/// The validators managed by this tak will have their exclusive lock held for a minimal amount of
+/// time while the update occurs to minimize the disruption to the services relying on it.
+pub(crate) struct UpdateValidatorsTask {
+  validators: Arc<RwLock<Validators>>,
+}
+
+impl UpdateValidatorsTask {
+  /// Spawn a new instance of the UpdateValidatorsTask.
+  ///
+  /// This returns a reference to the Validators it updates after spawning itself.
+  pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
+    // The validators which will be updated
+    let validators = Arc::new(RwLock::new(Validators::new(serai)));
+
+    // Define the task
+    let (update_validators_task, update_validators_task_handle) = Task::new();
+    // Forget the handle, as dropping the handle would stop the task
+    core::mem::forget(update_validators_task_handle);
+    // Spawn the task
+    tokio::spawn(
+      (Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
+    );
+
+    // Return the validators
+    validators
+  }
+}
+
+impl ContinuallyRan for UpdateValidatorsTask {
+  // Only run every minute, not the default of every five seconds
+  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
+  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
+
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
+    async move {
+      let session_changes = {
+        let validators = self.validators.read().await;
+        Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
+          .await
+          .map_err(|e| format!("{e:?}"))?
+      };
+      self.validators.write().await.incorporate_session_changes(session_changes);
+      Ok(true)
+    }
+  }
 }
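The doc comment on `UpdateValidatorsTask` describes the locking discipline the task follows: do the slow work (querying Serai for session changes) while holding only a read lock, then take the write lock just long enough to apply the result. A reduced sketch with a stand-in `Validators` type:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Default)]
struct Validators { session: u64 }

async fn update(validators: &Arc<RwLock<Validators>>) {
  // Slow part under a read lock: other readers are not blocked while we
  // compute the change set (here just a stand-in increment).
  let next_session = {
    let current = validators.read().await;
    current.session + 1
  };
  // Exclusive lock only for the brief application of the change.
  validators.write().await.session = next_session;
}

#[tokio::main]
async fn main() {
  let validators = Arc::new(RwLock::new(Validators::default()));
  update(&validators).await;
  assert_eq!(validators.read().await.session, 1);
}
```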
@@ -1,94 +1,31 @@
 use core::future::Future;
-use std::{
-  sync::Arc,
-  collections::{HashSet, HashMap},
-};

-use serai_client::primitives::{NetworkId, PublicKey};
+use borsh::{BorshSerialize, BorshDeserialize};

-use tokio::sync::RwLock;
+use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};

-use serai_task::ContinuallyRan;
-
-use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
-
-/// A struct to sync the validators from the Serai node in order to keep track of them.
-mod validators;
-use validators::{Validators, update_shared_validators};
-
-/// The authentication protocol upgrade to limit the P2P network to active validators.
-mod authenticate;
-
-/// The dial task, to find new peers to connect to
-mod dial;
-
-/// The request-response messages and behavior
-mod reqres;
-use reqres::{Request, Response};
-
-/// The gossip messages and behavior
-mod gossip;
+/// The libp2p-backed P2p network
+mod libp2p;

 /// The heartbeat task, effecting sync of Tributaries
 mod heartbeat;

-/// The swarm task, running it and dispatching to/from it
-mod swarm;
-
-const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
-
-fn peer_id_from_public(public: PublicKey) -> PeerId {
-  // 0 represents the identity Multihash, that no hash was performed
-  // It's an internal constant so we can't refer to the constant inside libp2p
-  PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
+/// A tributary block and its commit.
+#[derive(Clone, BorshSerialize, BorshDeserialize)]
+pub(crate) struct TributaryBlockWithCommit {
+  pub(crate) block: Vec<u8>,
+  pub(crate) commit: Vec<u8>,
 }

-struct Peer;
-impl Peer {
-  async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
-    (async move { todo!("TODO") }).await
-  }
+trait Peer<'a>: Send {
+  fn send_heartbeat(
+    &self,
+    set: ValidatorSet,
+    latest_block_hash: [u8; 32],
+  ) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
 }

-#[derive(Clone)]
-struct Peers {
-  peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
-}
-
-#[derive(Clone, Debug)]
-struct P2p;
-impl P2p {
-  async fn peers(&self, set: NetworkId) -> Vec<Peer> {
-    (async move { todo!("TODO") }).await
-  }
-}
-
-#[async_trait::async_trait]
-impl tributary::P2p for P2p {
-  async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
-    todo!("TODO")
-  }
-}
-
-#[derive(NetworkBehaviour)]
-struct Behavior {
-  reqres: reqres::Behavior,
-  gossip: gossip::Behavior,
-}
-
-struct UpdateSharedValidatorsTask {
-  validators: Arc<RwLock<Validators>>,
-}
-
-impl ContinuallyRan for UpdateSharedValidatorsTask {
-  // Only run every minute, not the default of every five seconds
-  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
-  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
-
-  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
-    async move {
-      update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
-      Ok(true)
-    }
-  }
+trait P2p: Send + Sync + tributary::P2p {
+  type Peer<'a>: Peer<'a>;
+  fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
 }
@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
 workspace = true

 [dependencies]
-async-trait = { version = "0.1", default-features = false }
 thiserror = { version = "2", default-features = false, features = ["std"] }

 subtle = { version = "^2", default-features = false, features = ["std"] }
@@ -1,8 +1,6 @@
-use core::{marker::PhantomData, fmt::Debug};
+use core::{marker::PhantomData, fmt::Debug, future::Future};
 use std::{sync::Arc, io};

-use async_trait::async_trait;
-
 use zeroize::Zeroizing;

 use ciphersuite::{Ciphersuite, Ristretto};
@@ -131,20 +129,18 @@ pub trait ReadWrite: Sized {
   }
 }

-#[async_trait]
-pub trait P2p: 'static + Send + Sync + Clone + Debug {
+pub trait P2p: 'static + Send + Sync + Clone {
   /// Broadcast a message to all other members of the Tributary with the specified genesis.
   ///
   /// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
   /// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
   /// deduplication to ensure a sane amount of load.
-  async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
+  fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
 }

-#[async_trait]
 impl<P: P2p> P2p for Arc<P> {
-  async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
-    (*self).broadcast(genesis, msg).await
+  fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
+    P::broadcast(self, genesis, msg)
  }
 }

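The `impl<P: P2p> P2p for Arc<P>` rewrite above is worth a second look: with the method now returning `impl Send + Future`, the blanket impl forwards the call as a plain function call and returns the inner future untouched, instead of wrapping it in another `async` block. Extracted into a self-contained form (same shape as the diff, with unrelated items omitted):

```rust
use core::future::Future;
use std::sync::Arc;

pub trait P2p: 'static + Send + Sync + Clone {
  fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
}

impl<P: P2p> P2p for Arc<P> {
  fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
    // `&Arc<P>` deref-coerces to `&P`, so this forwards without re-wrapping the future.
    P::broadcast(self, genesis, msg)
  }
}
```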
@@ -1,8 +1,6 @@
-use core::ops::Deref;
+use core::{ops::Deref, future::Future};
 use std::{sync::Arc, collections::HashMap};

-use async_trait::async_trait;
-
 use subtle::ConstantTimeEq;
 use zeroize::{Zeroize, Zeroizing};

@@ -74,50 +72,52 @@ impl Signer {
   }
 }

-#[async_trait]
 impl SignerTrait for Signer {
   type ValidatorId = [u8; 32];
   type Signature = [u8; 64];

   /// Returns the validator's current ID. Returns None if they aren't a current validator.
-  async fn validator_id(&self) -> Option<Self::ValidatorId> {
-    Some((Ristretto::generator() * self.key.deref()).to_bytes())
+  fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
+    async move { Some((Ristretto::generator() * self.key.deref()).to_bytes()) }
   }

   /// Sign a signature with the current validator's private key.
-  async fn sign(&self, msg: &[u8]) -> Self::Signature {
-    let mut nonce = Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce"));
-    nonce.append_message(b"genesis", self.genesis);
-    nonce.append_message(b"key", Zeroizing::new(self.key.deref().to_repr()).as_ref());
-    nonce.append_message(b"message", msg);
-    let mut nonce = nonce.challenge(b"nonce");
+  fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
+    async move {
+      let mut nonce =
+        Zeroizing::new(RecommendedTranscript::new(b"Tributary Chain Tendermint Nonce"));
+      nonce.append_message(b"genesis", self.genesis);
+      nonce.append_message(b"key", Zeroizing::new(self.key.deref().to_repr()).as_ref());
+      nonce.append_message(b"message", msg);
+      let mut nonce = nonce.challenge(b"nonce");

       let mut nonce_arr = [0; 64];
       nonce_arr.copy_from_slice(nonce.as_ref());

       let nonce_ref: &mut [u8] = nonce.as_mut();
       nonce_ref.zeroize();
       let nonce_ref: &[u8] = nonce.as_ref();
       assert_eq!(nonce_ref, [0; 64].as_ref());

       let nonce =
         Zeroizing::new(<Ristretto as Ciphersuite>::F::from_bytes_mod_order_wide(&nonce_arr));
       nonce_arr.zeroize();

       assert!(!bool::from(nonce.ct_eq(&<Ristretto as Ciphersuite>::F::ZERO)));

       let challenge = challenge(
         self.genesis,
         (Ristretto::generator() * self.key.deref()).to_bytes(),
         (Ristretto::generator() * nonce.deref()).to_bytes().as_ref(),
         msg,
       );

       let sig = SchnorrSignature::<Ristretto>::sign(&self.key, nonce, challenge).serialize();

       let mut res = [0; 64];
       res.copy_from_slice(&sig);
       res
+    }
   }
 }

@@ -274,7 +274,6 @@ pub const BLOCK_PROCESSING_TIME: u32 = 999;
 pub const LATENCY_TIME: u32 = 1667;
 pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME);

-#[async_trait]
 impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> {
   type Db = D;

@@ -300,111 +299,126 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
     self.validators.clone()
   }

-  async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
-    let mut to_broadcast = vec![TENDERMINT_MESSAGE];
-    to_broadcast.extend(msg.encode());
-    self.p2p.broadcast(self.genesis, to_broadcast).await
+  fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()> {
+    async move {
+      let mut to_broadcast = vec![TENDERMINT_MESSAGE];
+      to_broadcast.extend(msg.encode());
+      self.p2p.broadcast(self.genesis, to_broadcast).await
+    }
   }

-  async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent) {
-    log::error!(
-      "validator {} triggered a slash event on tributary {} (with evidence: {})",
-      hex::encode(validator),
-      hex::encode(self.genesis),
-      matches!(slash_event, SlashEvent::WithEvidence(_)),
-    );
-
-    let signer = self.signer();
-    let Some(tx) = (match slash_event {
-      SlashEvent::WithEvidence(evidence) => {
-        // create an unsigned evidence tx
-        Some(TendermintTx::SlashEvidence(evidence))
-      }
-      SlashEvent::Id(_reason, _block, _round) => {
-        // TODO: Increase locally observed slash points
-        None
-      }
-    }) else {
-      return;
-    };
-
-    // add tx to blockchain and broadcast to peers
-    let mut to_broadcast = vec![TRANSACTION_MESSAGE];
-    tx.write(&mut to_broadcast).unwrap();
-    if self.blockchain.write().await.add_transaction::<Self>(
-      true,
-      Transaction::Tendermint(tx),
-      &self.signature_scheme(),
-    ) == Ok(true)
-    {
-      self.p2p.broadcast(signer.genesis, to_broadcast).await;
-    }
+  fn slash(
+    &mut self,
+    validator: Self::ValidatorId,
+    slash_event: SlashEvent,
+  ) -> impl Send + Future<Output = ()> {
+    async move {
+      log::error!(
+        "validator {} triggered a slash event on tributary {} (with evidence: {})",
+        hex::encode(validator),
+        hex::encode(self.genesis),
+        matches!(slash_event, SlashEvent::WithEvidence(_)),
+      );
+
+      let signer = self.signer();
+      let Some(tx) = (match slash_event {
+        SlashEvent::WithEvidence(evidence) => {
+          // create an unsigned evidence tx
+          Some(TendermintTx::SlashEvidence(evidence))
+        }
+        SlashEvent::Id(_reason, _block, _round) => {
+          // TODO: Increase locally observed slash points
+          None
+        }
+      }) else {
+        return;
+      };
+
+      // add tx to blockchain and broadcast to peers
+      let mut to_broadcast = vec![TRANSACTION_MESSAGE];
+      tx.write(&mut to_broadcast).unwrap();
+      if self.blockchain.write().await.add_transaction::<Self>(
+        true,
+        Transaction::Tendermint(tx),
+        &self.signature_scheme(),
+      ) == Ok(true)
+      {
+        self.p2p.broadcast(signer.genesis, to_broadcast).await;
+      }
+    }
   }

-  async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
-    let block =
-      Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
-    self
-      .blockchain
-      .read()
-      .await
-      .verify_block::<Self>(&block, &self.signature_scheme(), false)
-      .map_err(|e| match e {
-        BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal,
-        _ => {
-          log::warn!("Tributary Tendermint validate returning BlockError::Fatal due to {e:?}");
-          TendermintBlockError::Fatal
-        }
-      })
+  fn validate(
+    &self,
+    block: &Self::Block,
+  ) -> impl Send + Future<Output = Result<(), TendermintBlockError>> {
+    async move {
+      let block =
+        Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
+      self
+        .blockchain
+        .read()
+        .await
+        .verify_block::<Self>(&block, &self.signature_scheme(), false)
+        .map_err(|e| match e {
+          BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal,
+          _ => {
+            log::warn!("Tributary Tendermint validate returning BlockError::Fatal due to {e:?}");
+            TendermintBlockError::Fatal
+          }
+        })
+    }
   }

-  async fn add_block(
+  fn add_block(
     &mut self,
     serialized_block: Self::Block,
     commit: Commit<Self::SignatureScheme>,
-  ) -> Option<Self::Block> {
-    let invalid_block = || {
-      // There's a fatal flaw in the code, it's behind a hard fork, or the validators turned
-      // malicious
-      // All justify a halt to then achieve social consensus from
-      // TODO: Under multiple validator sets, a small validator set turning malicious knocks
-      // off the entire network. That's an unacceptable DoS.
-      panic!("validators added invalid block to tributary {}", hex::encode(self.genesis));
-    };
+  ) -> impl Send + Future<Output = Option<Self::Block>> {
+    async move {
+      let invalid_block = || {
+        // There's a fatal flaw in the code, it's behind a hard fork, or the validators turned
+        // malicious
+        // All justify a halt to then achieve social consensus from
+        // TODO: Under multiple validator sets, a small validator set turning malicious knocks
+        // off the entire network. That's an unacceptable DoS.
+        panic!("validators added invalid block to tributary {}", hex::encode(self.genesis));
+      };

-    // Tendermint should only produce valid commits
-    assert!(self.verify_commit(serialized_block.id(), &commit));
+      // Tendermint should only produce valid commits
+      assert!(self.verify_commit(serialized_block.id(), &commit));

-    let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else {
-      return invalid_block();
-    };
+      let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else {
+        return invalid_block();
+      };

-    let encoded_commit = commit.encode();
-    loop {
-      let block_res = self.blockchain.write().await.add_block::<Self>(
-        &block,
-        encoded_commit.clone(),
-        &self.signature_scheme(),
-      );
-      match block_res {
-        Ok(()) => {
-          // If we successfully added this block, break
-          break;
-        }
-        Err(BlockError::NonLocalProvided(hash)) => {
-          log::error!(
-            "missing provided transaction {} which other validators on tributary {} had",
-            hex::encode(hash),
-            hex::encode(self.genesis)
-          );
-          tokio::time::sleep(core::time::Duration::from_secs(5)).await;
-        }
-        _ => return invalid_block(),
-      }
-    }
+      let encoded_commit = commit.encode();
+      loop {
+        let block_res = self.blockchain.write().await.add_block::<Self>(
+          &block,
+          encoded_commit.clone(),
+          &self.signature_scheme(),
+        );
+        match block_res {
+          Ok(()) => {
+            // If we successfully added this block, break
+            break;
+          }
+          Err(BlockError::NonLocalProvided(hash)) => {
+            log::error!(
+              "missing provided transaction {} which other validators on tributary {} had",
+              hex::encode(hash),
+              hex::encode(self.genesis)
+            );
+            tokio::time::sleep(core::time::Duration::from_secs(5)).await;
+          }
+          _ => return invalid_block(),
+        }
+      }

-    Some(TendermintBlock(
-      self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
-    ))
+      Some(TendermintBlock(
+        self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
+      ))
+    }
   }
 }
@@ -1,11 +1,12 @@
+use core::future::Future;
+
 pub use crate::P2p;

 #[derive(Clone, Debug)]
 pub struct DummyP2p;

-#[async_trait::async_trait]
 impl P2p for DummyP2p {
-  async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
-    unimplemented!()
+  fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
+    async move { unimplemented!() }
   }
 }
@@ -1,4 +1,7 @@
+use core::future::Future;
+
 use tendermint::ext::Network;

 use crate::{
   P2p, TendermintTx,
   tendermint::{TARGET_BLOCK_TIME, TendermintNetwork},
@@ -11,10 +14,9 @@ fn assert_target_block_time() {
 #[derive(Clone, Debug)]
 pub struct DummyP2p;

-#[async_trait::async_trait]
 impl P2p for DummyP2p {
-  async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
-    unimplemented!()
+  fn broadcast(&self, _: [u8; 32], _: Vec<u8>) -> impl Send + Future<Output = ()> {
+    async move { unimplemented!() }
   }
 }

@@ -16,7 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
|
|||||||
workspace = true
|
workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = { version = "0.1", default-features = false }
|
|
||||||
thiserror = { version = "2", default-features = false, features = ["std"] }
|
thiserror = { version = "2", default-features = false, features = ["std"] }
|
||||||
|
|
||||||
hex = { version = "0.4", default-features = false, features = ["std"] }
|
hex = { version = "0.4", default-features = false, features = ["std"] }
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use core::{hash::Hash, fmt::Debug};
|
use core::{hash::Hash, fmt::Debug, future::Future};
|
||||||
use std::{sync::Arc, collections::HashSet};
|
use std::{sync::Arc, collections::HashSet};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use parity_scale_codec::{Encode, Decode};
|
use parity_scale_codec::{Encode, Decode};
|
||||||
@@ -34,7 +33,6 @@ pub struct BlockNumber(pub u64);
|
|||||||
pub struct RoundNumber(pub u32);
|
pub struct RoundNumber(pub u32);
|
||||||
|
|
||||||
/// A signer for a validator.
|
/// A signer for a validator.
|
||||||
#[async_trait]
|
|
||||||
pub trait Signer: Send + Sync {
|
pub trait Signer: Send + Sync {
|
||||||
// Type used to identify validators.
|
// Type used to identify validators.
|
||||||
type ValidatorId: ValidatorId;
|
type ValidatorId: ValidatorId;
|
||||||
@@ -42,22 +40,21 @@ pub trait Signer: Send + Sync {
   type Signature: Signature;
 
   /// Returns the validator's current ID. Returns None if they aren't a current validator.
-  async fn validator_id(&self) -> Option<Self::ValidatorId>;
+  fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>>;
   /// Sign a signature with the current validator's private key.
-  async fn sign(&self, msg: &[u8]) -> Self::Signature;
+  fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature>;
 }
 
-#[async_trait]
 impl<S: Signer> Signer for Arc<S> {
   type ValidatorId = S::ValidatorId;
   type Signature = S::Signature;
 
-  async fn validator_id(&self) -> Option<Self::ValidatorId> {
-    self.as_ref().validator_id().await
+  fn validator_id(&self) -> impl Send + Future<Output = Option<Self::ValidatorId>> {
+    self.as_ref().validator_id()
   }
 
-  async fn sign(&self, msg: &[u8]) -> Self::Signature {
-    self.as_ref().sign(msg).await
+  fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = Self::Signature> {
+    self.as_ref().sign(msg)
   }
 }
 

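Because the reworked methods still return futures, generic code over `Signer` is unaffected; callers await the returned future exactly as they awaited the old `async fn`. A sketch with a hypothetical helper (not part of this change):

```rust
// Awaiting the `impl Future` returned by `sign` looks identical to awaiting the old `async fn`.
async fn sign_with<S: Signer>(signer: &S, msg: &[u8]) -> S::Signature {
  signer.sign(msg).await
}
```

The blanket `Arc<S>` impl above also no longer awaits internally: it returns the inner signer's future directly, which is why its bodies drop the trailing `.await`.
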
@@ -210,7 +207,6 @@ pub trait Block: Send + Sync + Clone + PartialEq + Eq + Debug + Encode + Decode
 }
 
 /// Trait representing the distributed system Tendermint is providing consensus over.
-#[async_trait]
 pub trait Network: Sized + Send + Sync {
   /// The database used to back this.
   type Db: serai_db::Db;

@@ -229,6 +225,7 @@ pub trait Network: Sized + Send + Sync {
   /// This should include both the time to download the block and the actual processing time.
   ///
   /// BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME) must be divisible by 1000.
+  // TODO: Redefine as Duration
   const BLOCK_PROCESSING_TIME: u32;
   /// Network latency time in milliseconds.
   ///

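The documented invariant is simple enough to check at compile time. A sketch with made-up values (1600 ms of processing plus 3 * 800 ms of latency is 4000 ms, which is divisible by 1000); the real constants are supplied by each `Network` implementation:

```rust
// Hypothetical constants, chosen only to illustrate the documented invariant.
const BLOCK_PROCESSING_TIME: u32 = 1600;
const LATENCY_TIME: u32 = 800;
// Compilation fails if the invariant is violated.
const _: () = assert!((BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME)) % 1000 == 0);
```
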
@@ -280,15 +277,19 @@ pub trait Network: Sized + Send + Sync {
   /// Switching to unauthenticated channels in a system already providing authenticated channels is
   /// not recommended as this is a minor, temporal inefficiency, while downgrading channels may
   /// have wider implications.
-  async fn broadcast(&mut self, msg: SignedMessageFor<Self>);
+  fn broadcast(&mut self, msg: SignedMessageFor<Self>) -> impl Send + Future<Output = ()>;
 
   /// Trigger a slash for the validator in question who was definitively malicious.
   ///
   /// The exact process of triggering a slash is undefined and left to the network as a whole.
-  async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
+  fn slash(
+    &mut self,
+    validator: Self::ValidatorId,
+    slash_event: SlashEvent,
+  ) -> impl Send + Future<Output = ()>;
 
   /// Validate a block.
-  async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>;
+  fn validate(&self, block: &Self::Block) -> impl Send + Future<Output = Result<(), BlockError>>;
 
   /// Add a block, returning the proposal for the next one.
   ///

@@ -298,9 +299,9 @@ pub trait Network: Sized + Send + Sync {
   /// This deviates from the paper which will have a local node refuse to decide on a block it
   /// considers invalid. This library acknowledges the network did decide on it, leaving handling
   /// of it to the network, and outside of this scope.
-  async fn add_block(
+  fn add_block(
     &mut self,
     block: Self::Block,
     commit: Commit<Self::SignatureScheme>,
-  ) -> Option<Self::Block>;
+  ) -> impl Send + Future<Output = Option<Self::Block>>;
 }

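One reason these signatures spell out `impl Send + Future<…>` rather than using bare `async fn` in the trait: with `async fn`, generic callers have no way to require the returned future to be `Send`, whereas writing the bound into the signature obligates every implementation to return a `Send` future, which is typically what lets the machine be driven from a multithreaded executor. A sketch of a hypothetical check that relies on the explicit bound (assuming the crate's `Network` and `SignedMessageFor` items are in scope):

```rust
use core::future::Future;

// Compile-time witness that the broadcast future is `Send`; this would not type-check
// against a bare `async fn broadcast` lacking the explicit bound.
fn assert_send<F: Send + Future>(f: F) -> F {
  f
}

async fn broadcast_checked<N: Network>(network: &mut N, msg: SignedMessageFor<N>) {
  assert_send(network.broadcast(msg)).await;
}
```
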
@@ -1,10 +1,9 @@
+use core::future::Future;
 use std::{
   sync::Arc,
   time::{UNIX_EPOCH, SystemTime, Duration},
 };
 
-use async_trait::async_trait;
-
 use parity_scale_codec::{Encode, Decode};
 
 use futures_util::sink::SinkExt;

@@ -21,20 +20,21 @@ type TestValidatorId = u16;
 type TestBlockId = [u8; 4];
 
 struct TestSigner(u16);
-#[async_trait]
 impl Signer for TestSigner {
   type ValidatorId = TestValidatorId;
   type Signature = [u8; 32];
 
-  async fn validator_id(&self) -> Option<TestValidatorId> {
-    Some(self.0)
+  fn validator_id(&self) -> impl Send + Future<Output = Option<TestValidatorId>> {
+    async move { Some(self.0) }
   }
 
-  async fn sign(&self, msg: &[u8]) -> [u8; 32] {
-    let mut sig = [0; 32];
-    sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
-    sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
-    sig
+  fn sign(&self, msg: &[u8]) -> impl Send + Future<Output = [u8; 32]> {
+    async move {
+      let mut sig = [0; 32];
+      sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
+      sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
+      sig
+    }
   }
 }
 

@@ -111,7 +111,6 @@ struct TestNetwork(
   Arc<RwLock<Vec<(MessageSender<Self>, SyncedBlockSender<Self>, SyncedBlockResultReceiver)>>>,
 );
 
-#[async_trait]
 impl Network for TestNetwork {
   type Db = MemDb;
 