Remove Tendermint for GRANDPA

Updates to polkadot-v0.9.40, with a variety of dependency updates accordingly.
Substrate thankfully now uses k256 0.13, pathing the way for #256. We couldn't
upgrade to polkadot-v0.9.40 without this due to polkadot-v0.9.40 having
fundamental changes to syncing. While we could've updated tendermint, it's not
worth the continued development effort given its inability to work with
multiple validator sets.

Purges sc-tendermint. Keeps tendermint-machine for #163.

Closes #137, #148, #157, #171. #96 and #99 should be re-scoped/clarified. #134
and #159 also should be clarified. #169 is also no longer a priority since
we're only considering temporal deployments of tendermint. #170 also isn't
since we're looking at effectively sharded validator sets, so there should
be no singular large set needing high performance.
This commit is contained in:
Luke Parker
2023-03-26 08:43:01 -04:00
parent 534e1bb11d
commit aea6ac104f
42 changed files with 1089 additions and 2143 deletions

24
tendermint/Cargo.toml Normal file
View File

@@ -0,0 +1,24 @@
[package]
name = "tendermint-machine"
version = "0.2.0"
description = "An implementation of the Tendermint state machine in Rust"
license = "MIT"
repository = "https://github.com/serai-dex/serai/tree/develop/tendermint"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
edition = "2021"
[dependencies]
async-trait = "0.1"
thiserror = "1"
log = "0.4"
parity-scale-codec = { version = "3", features = ["derive"] }
futures = "0.3"
tokio = { version = "1", features = ["macros", "sync", "time", "rt"] }
sp-runtime = { git = "https://github.com/serai-dex/substrate", version = "7.0.0", optional = true }
[features]
substrate = ["sp-runtime"]

21
tendermint/LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022-2023 Luke Parker
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

61
tendermint/README.md Normal file
View File

@@ -0,0 +1,61 @@
# Tendermint
An implementation of the Tendermint state machine in Rust.
This is solely the state machine, intended to be mapped to any arbitrary system.
It supports an arbitrary signature scheme, weighting, and block definition
accordingly. It is not intended to work with the Cosmos SDK, solely to be an
implementation of the [academic protocol](https://arxiv.org/pdf/1807.04938.pdf).
### Caveats
- Only SCALE serialization is supported currently. Ideally, everything from
SCALE to borsh to bincode would be supported. SCALE was chosen due to this
being under Serai, which uses Substrate, which uses SCALE. Accordingly, when
deciding which of the three (mutually incompatible) options to support...
- The only supported runtime is tokio due to requiring a `sleep` implementation.
Ideally, the runtime choice will be moved to a feature in the future.
- It is possible for `add_block` to be called on a block which failed (or never
went through in the first place) validation. This is a break from the paper
which is accepted here. This is for two reasons.
1) Serai needing this functionality.
2) If a block is committed which is invalid, either there's a malicious
majority now defining consensus OR the local node is malicious by virtue of
being faulty. Considering how either represents a fatal circumstance,
except with regards to system like Serai which have their own logic for
pseudo-valid blocks, it is accepted as a possible behavior with the caveat
any consumers must be aware of it. No machine will vote nor precommit to a
block it considers invalid, so for a network with an honest majority, this
is a non-issue.
### Paper
The [paper](https://arxiv.org/abs/1807.04938) describes the algorithm with
pseudocode on page 6. This pseudocode isn't directly implementable, nor does it
specify faulty behavior. Instead, it's solely a series of conditions which
trigger events in order to successfully achieve consensus.
The included pseudocode segments can be minimally described as follows:
```
01-09 Init
10-10 StartRound(0)
11-21 StartRound
22-27 Fresh proposal
28-33 Proposal building off a valid round with prevotes
34-35 2f+1 prevote -> schedule timeout prevote
36-43 First proposal with prevotes -> precommit Some
44-46 2f+1 nil prevote -> precommit nil
47-48 2f+1 precommit -> schedule timeout precommit
49-54 First proposal with precommits -> finalize
55-56 f+1 round > local round, jump
57-60 on timeout propose
61-64 on timeout prevote
65-67 on timeout precommit
```
The corresponding Rust code implementing these tasks are marked with their
related line numbers.

139
tendermint/src/block.rs Normal file
View File

@@ -0,0 +1,139 @@
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use crate::{
time::CanonicalInstant,
ext::{RoundNumber, BlockNumber, Block, Network},
round::RoundData,
message_log::MessageLog,
Step, Data, DataFor, Message, MessageFor,
};
pub(crate) struct BlockData<N: Network> {
pub(crate) number: BlockNumber,
pub(crate) validator_id: Option<N::ValidatorId>,
pub(crate) proposal: Option<N::Block>,
pub(crate) log: MessageLog<N>,
pub(crate) slashes: HashSet<N::ValidatorId>,
// We track the end times of each round for two reasons:
// 1) Knowing the start time of the next round
// 2) Validating precommits, which include the end time of the round which produced it
// This HashMap contains the end time of the round we're currently in and every round prior
pub(crate) end_time: HashMap<RoundNumber, CanonicalInstant>,
pub(crate) round: Option<RoundData<N>>,
pub(crate) locked: Option<(RoundNumber, <N::Block as Block>::Id)>,
pub(crate) valid: Option<(RoundNumber, N::Block)>,
}
impl<N: Network> BlockData<N> {
pub(crate) fn new(
weights: Arc<N::Weights>,
number: BlockNumber,
validator_id: Option<N::ValidatorId>,
proposal: Option<N::Block>,
) -> BlockData<N> {
BlockData {
number,
validator_id,
proposal,
log: MessageLog::new(weights),
slashes: HashSet::new(),
end_time: HashMap::new(),
// The caller of BlockData::new is expected to be populated after by the caller
round: None,
locked: None,
valid: None,
}
}
pub(crate) fn round(&self) -> &RoundData<N> {
self.round.as_ref().unwrap()
}
pub(crate) fn round_mut(&mut self) -> &mut RoundData<N> {
self.round.as_mut().unwrap()
}
// Populate the end time up to the specified round
// This is generally used when moving to the next round, where this will only populate one time,
// yet is also used when jumping rounds (when 33% of the validators are on a round ahead of us)
pub(crate) fn populate_end_time(&mut self, round: RoundNumber) {
// Starts from the current round since we only start the current round once we have have all
// the prior time data
for r in (self.round().number.0 + 1) ..= round.0 {
self.end_time.insert(
RoundNumber(r),
RoundData::<N>::new(RoundNumber(r), self.end_time[&RoundNumber(r - 1)]).end_time(),
);
}
}
// Start a new round. Optionally takes in the time for when this is the first round, and the time
// isn't simply the time of the prior round (yet rather the prior block). Returns the proposal
// data, if we are the proposer.
pub(crate) fn new_round(
&mut self,
round: RoundNumber,
proposer: N::ValidatorId,
time: Option<CanonicalInstant>,
) -> Option<DataFor<N>> {
debug_assert_eq!(round.0 == 0, time.is_some());
// If this is the first round, we don't have a prior round's end time to use as the start
// We use the passed in time instead
// If this isn't the first round, ensure we have the prior round's end time by populating the
// map with all rounds till this round
// This can happen we jump from round x to round x+n, where n != 1
// The paper says to do so whenever you observe a sufficient amount of peers on a higher round
if round.0 != 0 {
self.populate_end_time(round);
}
// 11-13
self.round = Some(RoundData::<N>::new(
round,
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
));
self.end_time.insert(round, self.round().end_time());
// 14-21
if Some(proposer) == self.validator_id {
let (round, block) = self.valid.clone().unzip();
block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
} else {
self.round_mut().set_timeout(Step::Propose);
None
}
}
// Transform Data into an actual Message, using the contextual data from this block
pub(crate) fn message(&mut self, data: DataFor<N>) -> Option<MessageFor<N>> {
debug_assert_eq!(
self.round().step,
match data.step() {
Step::Propose | Step::Prevote => Step::Propose,
Step::Precommit => Step::Prevote,
},
);
// Tendermint always sets the round's step to whatever it just broadcasted
// Consolidate all of those here to ensure they aren't missed by an oversight
// 27, 33, 41, 46, 60, 64
self.round_mut().step = data.step();
// Only return a message to if we're actually a current validator
self.validator_id.map(|validator_id| Message {
sender: validator_id,
block: self.number,
round: self.round().number,
data,
})
}
}

274
tendermint/src/ext.rs Normal file
View File

@@ -0,0 +1,274 @@
use core::{hash::Hash, fmt::Debug};
use std::{sync::Arc, collections::HashSet};
use async_trait::async_trait;
use thiserror::Error;
use parity_scale_codec::{Encode, Decode};
use crate::{SignedMessageFor, commit_msg};
/// An alias for a series of traits required for a type to be usable as a validator ID,
/// automatically implemented for all types satisfying those traits.
pub trait ValidatorId:
Send + Sync + Clone + Copy + PartialEq + Eq + Hash + Debug + Encode + Decode
{
}
impl<V: Send + Sync + Clone + Copy + PartialEq + Eq + Hash + Debug + Encode + Decode> ValidatorId
for V
{
}
/// An alias for a series of traits required for a type to be usable as a signature,
/// automatically implemented for all types satisfying those traits.
pub trait Signature: Send + Sync + Clone + PartialEq + Debug + Encode + Decode {}
impl<S: Send + Sync + Clone + PartialEq + Debug + Encode + Decode> Signature for S {}
// Type aliases which are distinct according to the type system
/// A struct containing a Block Number, wrapped to have a distinct type.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct BlockNumber(pub u64);
/// A struct containing a round number, wrapped to have a distinct type.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct RoundNumber(pub u32);
/// A signer for a validator.
#[async_trait]
pub trait Signer: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
/// Signature type.
type Signature: Signature;
/// Returns the validator's current ID. Returns None if they aren't a current validator.
async fn validator_id(&self) -> Option<Self::ValidatorId>;
/// Sign a signature with the current validator's private key.
async fn sign(&self, msg: &[u8]) -> Self::Signature;
}
#[async_trait]
impl<S: Signer> Signer for Arc<S> {
type ValidatorId = S::ValidatorId;
type Signature = S::Signature;
async fn validator_id(&self) -> Option<Self::ValidatorId> {
self.as_ref().validator_id().await
}
async fn sign(&self, msg: &[u8]) -> Self::Signature {
self.as_ref().sign(msg).await
}
}
/// A signature scheme used by validators.
pub trait SignatureScheme: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
/// Signature type.
type Signature: Signature;
/// Type representing an aggregate signature. This would presumably be a BLS signature,
/// yet even with Schnorr signatures
/// [half-aggregation is possible](https://eprint.iacr.org/2021/350).
/// It could even be a threshold signature scheme, though that's currently unexpected.
type AggregateSignature: Signature;
/// Type representing a signer of this scheme.
type Signer: Signer<ValidatorId = Self::ValidatorId, Signature = Self::Signature>;
/// Verify a signature from the validator in question.
#[must_use]
fn verify(&self, validator: Self::ValidatorId, msg: &[u8], sig: &Self::Signature) -> bool;
/// Aggregate signatures.
fn aggregate(sigs: &[Self::Signature]) -> Self::AggregateSignature;
/// Verify an aggregate signature for the list of signers.
#[must_use]
fn verify_aggregate(
&self,
signers: &[Self::ValidatorId],
msg: &[u8],
sig: &Self::AggregateSignature,
) -> bool;
}
impl<S: SignatureScheme> SignatureScheme for Arc<S> {
type ValidatorId = S::ValidatorId;
type Signature = S::Signature;
type AggregateSignature = S::AggregateSignature;
type Signer = S::Signer;
fn verify(&self, validator: Self::ValidatorId, msg: &[u8], sig: &Self::Signature) -> bool {
self.as_ref().verify(validator, msg, sig)
}
fn aggregate(sigs: &[Self::Signature]) -> Self::AggregateSignature {
S::aggregate(sigs)
}
#[must_use]
fn verify_aggregate(
&self,
signers: &[Self::ValidatorId],
msg: &[u8],
sig: &Self::AggregateSignature,
) -> bool {
self.as_ref().verify_aggregate(signers, msg, sig)
}
}
/// A commit for a specific block. The list of validators have weight exceeding the threshold for
/// a valid commit.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct Commit<S: SignatureScheme> {
/// End time of the round which created this commit, used as the start time of the next block.
pub end_time: u64,
/// Validators participating in the signature.
pub validators: Vec<S::ValidatorId>,
/// Aggregate signature.
pub signature: S::AggregateSignature,
}
/// Weights for the validators present.
pub trait Weights: Send + Sync {
type ValidatorId: ValidatorId;
/// Total weight of all validators.
fn total_weight(&self) -> u64;
/// Weight for a specific validator.
fn weight(&self, validator: Self::ValidatorId) -> u64;
/// Threshold needed for BFT consensus.
fn threshold(&self) -> u64 {
((self.total_weight() * 2) / 3) + 1
}
/// Threshold preventing BFT consensus.
fn fault_thresold(&self) -> u64 {
(self.total_weight() - self.threshold()) + 1
}
/// Weighted round robin function.
fn proposer(&self, block: BlockNumber, round: RoundNumber) -> Self::ValidatorId;
}
impl<W: Weights> Weights for Arc<W> {
type ValidatorId = W::ValidatorId;
fn total_weight(&self) -> u64 {
self.as_ref().total_weight()
}
fn weight(&self, validator: Self::ValidatorId) -> u64 {
self.as_ref().weight(validator)
}
fn proposer(&self, block: BlockNumber, round: RoundNumber) -> Self::ValidatorId {
self.as_ref().proposer(block, round)
}
}
/// Simplified error enum representing a block's validity.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Error, Encode, Decode)]
pub enum BlockError {
/// Malformed block which is wholly invalid.
#[error("invalid block")]
Fatal,
/// Valid block by syntax, with semantics which may or may not be valid yet are locally
/// considered invalid. If a block fails to validate with this, a slash will not be triggered.
#[error("invalid block under local view")]
Temporal,
}
/// Trait representing a Block.
pub trait Block: Send + Sync + Clone + PartialEq + Debug + Encode + Decode {
// Type used to identify blocks. Presumably a cryptographic hash of the block.
type Id: Send + Sync + Copy + Clone + PartialEq + AsRef<[u8]> + Debug + Encode + Decode;
/// Return the deterministic, unique ID for this block.
fn id(&self) -> Self::Id;
}
#[cfg(feature = "substrate")]
impl<B: sp_runtime::traits::Block> Block for B {
type Id = B::Hash;
fn id(&self) -> B::Hash {
self.hash()
}
}
/// Trait representing the distributed system Tendermint is providing consensus over.
#[async_trait]
pub trait Network: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
/// Signature scheme used by validators.
type SignatureScheme: SignatureScheme<ValidatorId = Self::ValidatorId>;
/// Object representing the weights of validators.
type Weights: Weights<ValidatorId = Self::ValidatorId>;
/// Type used for ordered blocks of information.
type Block: Block;
/// Maximum block processing time in seconds. This should include both the actual processing time
/// and the time to download the block.
const BLOCK_PROCESSING_TIME: u32;
/// Network latency time in seconds.
const LATENCY_TIME: u32;
/// The block time is defined as the processing time plus three times the latency.
fn block_time() -> u32 {
Self::BLOCK_PROCESSING_TIME + (3 * Self::LATENCY_TIME)
}
/// Return a handle on the signer in use, usable for the entire lifetime of the machine.
fn signer(&self) -> <Self::SignatureScheme as SignatureScheme>::Signer;
/// Return a handle on the signing scheme in use, usable for the entire lifetime of the machine.
fn signature_scheme(&self) -> Self::SignatureScheme;
/// Return a handle on the validators' weights, usable for the entire lifetime of the machine.
fn weights(&self) -> Self::Weights;
/// Verify a commit for a given block. Intended for use when syncing or when not an active
/// validator.
#[must_use]
fn verify_commit(
&self,
id: <Self::Block as Block>::Id,
commit: &Commit<Self::SignatureScheme>,
) -> bool {
if commit.validators.iter().collect::<HashSet<_>>().len() != commit.validators.len() {
return false;
}
if !self.signature_scheme().verify_aggregate(
&commit.validators,
&commit_msg(commit.end_time, id.as_ref()),
&commit.signature,
) {
return false;
}
let weights = self.weights();
commit.validators.iter().map(|v| weights.weight(*v)).sum::<u64>() >= weights.threshold()
}
/// Broadcast a message to the other validators. If authenticated channels have already been
/// established, this will double-authenticate. Switching to unauthenticated channels in a system
/// already providing authenticated channels is not recommended as this is a minor, temporal
/// inefficiency while downgrading channels may have wider implications.
async fn broadcast(&mut self, msg: SignedMessageFor<Self>);
/// Trigger a slash for the validator in question who was definitively malicious.
/// The exact process of triggering a slash is undefined and left to the network as a whole.
async fn slash(&mut self, validator: Self::ValidatorId);
/// Validate a block.
async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
/// Add a block, returning the proposal for the next one. It's possible a block, which was never
/// validated or even failed validation, may be passed here if a supermajority of validators did
/// consider it valid and created a commit for it. This deviates from the paper which will have a
/// local node refuse to decide on a block it considers invalid. This library acknowledges the
/// network did decide on it, leaving handling of it to the network, and outside of this scope.
async fn add_block(
&mut self,
block: Self::Block,
commit: Commit<Self::SignatureScheme>,
) -> Option<Self::Block>;
}

648
tendermint/src/lib.rs Normal file
View File

@@ -0,0 +1,648 @@
use core::fmt::Debug;
use std::{
sync::Arc,
time::{SystemTime, Instant, Duration},
collections::VecDeque,
};
use log::debug;
use parity_scale_codec::{Encode, Decode};
use futures::{
FutureExt, StreamExt,
future::{self, Fuse},
channel::mpsc,
};
use tokio::time::sleep;
mod time;
use time::{sys_time, CanonicalInstant};
mod round;
mod block;
use block::BlockData;
pub(crate) mod message_log;
/// Traits and types of the external network being integrated with to provide consensus over.
pub mod ext;
use ext::*;
pub(crate) fn commit_msg(end_time: u64, id: &[u8]) -> Vec<u8> {
[&end_time.to_le_bytes(), id].concat().to_vec()
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
enum Step {
Propose,
Prevote,
Precommit,
}
#[derive(Clone, Debug, Encode, Decode)]
enum Data<B: Block, S: Signature> {
Proposal(Option<RoundNumber>, B),
Prevote(Option<B::Id>),
Precommit(Option<(B::Id, S)>),
}
impl<B: Block, S: Signature> PartialEq for Data<B, S> {
fn eq(&self, other: &Data<B, S>) -> bool {
match (self, other) {
(Data::Proposal(valid_round, block), Data::Proposal(valid_round2, block2)) => {
(valid_round == valid_round2) && (block == block2)
}
(Data::Prevote(id), Data::Prevote(id2)) => id == id2,
(Data::Precommit(None), Data::Precommit(None)) => true,
(Data::Precommit(Some((id, _))), Data::Precommit(Some((id2, _)))) => id == id2,
_ => false,
}
}
}
impl<B: Block, S: Signature> Data<B, S> {
fn step(&self) -> Step {
match self {
Data::Proposal(..) => Step::Propose,
Data::Prevote(..) => Step::Prevote,
Data::Precommit(..) => Step::Precommit,
}
}
}
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
struct Message<V: ValidatorId, B: Block, S: Signature> {
sender: V,
block: BlockNumber,
round: RoundNumber,
data: Data<B, S>,
}
/// A signed Tendermint consensus message to be broadcast to the other validators.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct SignedMessage<V: ValidatorId, B: Block, S: Signature> {
msg: Message<V, B, S>,
sig: S,
}
impl<V: ValidatorId, B: Block, S: Signature> SignedMessage<V, B, S> {
/// Number of the block this message is attempting to add to the chain.
pub fn block(&self) -> BlockNumber {
self.msg.block
}
#[must_use]
pub fn verify_signature<Scheme: SignatureScheme<ValidatorId = V, Signature = S>>(
&self,
signer: &Scheme,
) -> bool {
signer.verify(self.msg.sender, &self.msg.encode(), &self.sig)
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TendermintError<V: ValidatorId> {
Malicious(V),
Temporal,
}
// Type aliases to abstract over generic hell
pub(crate) type DataFor<N> =
Data<<N as Network>::Block, <<N as Network>::SignatureScheme as SignatureScheme>::Signature>;
pub(crate) type MessageFor<N> = Message<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>;
/// Type alias to the SignedMessage type for a given Network
pub type SignedMessageFor<N> = SignedMessage<
<N as Network>::ValidatorId,
<N as Network>::Block,
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
>;
/// A machine executing the Tendermint protocol.
pub struct TendermintMachine<N: Network> {
network: N,
signer: <N::SignatureScheme as SignatureScheme>::Signer,
validators: N::SignatureScheme,
weights: Arc<N::Weights>,
queue: VecDeque<MessageFor<N>>,
msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>,
#[allow(clippy::type_complexity)]
step_recv: mpsc::UnboundedReceiver<(BlockNumber, Commit<N::SignatureScheme>, Option<N::Block>)>,
block: BlockData<N>,
}
pub type StepSender<N> = mpsc::UnboundedSender<(
BlockNumber,
Commit<<N as Network>::SignatureScheme>,
Option<<N as Network>::Block>,
)>;
pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>;
/// A Tendermint machine and its channel to receive messages from the gossip layer over.
pub struct TendermintHandle<N: Network> {
/// Channel to trigger the machine to move to the next block.
/// Takes in the the previous block's commit, along with the new proposal.
pub step: StepSender<N>,
/// Channel to send messages received from the P2P layer.
pub messages: MessageSender<N>,
/// Tendermint machine to be run on an asynchronous task.
pub machine: TendermintMachine<N>,
}
impl<N: Network + 'static> TendermintMachine<N> {
// Broadcast the given piece of data
// Tendermint messages always specify their block/round, yet Tendermint only ever broadcasts for
// the current block/round. Accordingly, instead of manually fetching those at every call-site,
// this function can simply pass the data to the block which can contextualize it
fn broadcast(&mut self, data: DataFor<N>) {
if let Some(msg) = self.block.message(data) {
// Push it on to the queue. This is done so we only handle one message at a time, and so we
// can handle our own message before broadcasting it. That way, we fail before before
// becoming malicious
self.queue.push_back(msg);
}
}
// Start a new round. Returns true if we were the proposer
fn round(&mut self, round: RoundNumber, time: Option<CanonicalInstant>) -> bool {
if let Some(data) =
self.block.new_round(round, self.weights.proposer(self.block.number, round), time)
{
self.broadcast(data);
true
} else {
false
}
}
// 53-54
async fn reset(&mut self, end_round: RoundNumber, proposal: Option<N::Block>) {
// Ensure we have the end time data for the last round
self.block.populate_end_time(end_round);
// Sleep until this round ends
let round_end = self.block.end_time[&end_round];
sleep(round_end.instant().saturating_duration_since(Instant::now())).await;
// Clear our outbound message queue
self.queue = VecDeque::new();
// Create the new block
self.block = BlockData::new(
self.weights.clone(),
BlockNumber(self.block.number.0 + 1),
self.signer.validator_id().await,
proposal,
);
// Start the first round
self.round(RoundNumber(0), Some(round_end));
}
async fn reset_by_commit(
&mut self,
commit: Commit<N::SignatureScheme>,
proposal: Option<N::Block>,
) {
let mut round = self.block.round().number;
// If this commit is for a round we don't have, jump up to it
while self.block.end_time[&round].canonical() < commit.end_time {
round.0 += 1;
self.block.populate_end_time(round);
}
// If this commit is for a prior round, find it
while self.block.end_time[&round].canonical() > commit.end_time {
if round.0 == 0 {
panic!("commit isn't for this machine's next block");
}
round.0 -= 1;
}
debug_assert_eq!(self.block.end_time[&round].canonical(), commit.end_time);
self.reset(round, proposal).await;
}
async fn slash(&mut self, validator: N::ValidatorId) {
if !self.block.slashes.contains(&validator) {
debug!(target: "tendermint", "Slashing validator {:?}", validator);
self.block.slashes.insert(validator);
self.network.slash(validator).await;
}
}
/// Create a new Tendermint machine, from the specified point, with the specified block as the
/// one to propose next. This will return a channel to send messages from the gossip layer and
/// the machine itself. The machine should have `run` called from an asynchronous task.
#[allow(clippy::new_ret_no_self)]
pub async fn new(
network: N,
last_block: BlockNumber,
last_time: u64,
proposal: N::Block,
) -> TendermintHandle<N> {
let (msg_send, msg_recv) = mpsc::unbounded();
let (step_send, step_recv) = mpsc::unbounded();
TendermintHandle {
step: step_send,
messages: msg_send,
machine: {
let sys_time = sys_time(last_time);
// If the last block hasn't ended yet, sleep until it has
sleep(sys_time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)).await;
let signer = network.signer();
let validators = network.signature_scheme();
let weights = Arc::new(network.weights());
let validator_id = signer.validator_id().await;
// 01-10
let mut machine = TendermintMachine {
network,
signer,
validators,
weights: weights.clone(),
queue: VecDeque::new(),
msg_recv,
step_recv,
block: BlockData::new(
weights,
BlockNumber(last_block.0 + 1),
validator_id,
Some(proposal),
),
};
// The end time of the last block is the start time for this one
// The Commit explicitly contains the end time, so loading the last commit will provide
// this. The only exception is for the genesis block, which doesn't have a commit
// Using the genesis time in place will cause this block to be created immediately
// after it, without the standard amount of separation (so their times will be
// equivalent or minimally offset)
// For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time())
machine.round(RoundNumber(0), Some(CanonicalInstant::new(last_time)));
machine
},
}
}
pub async fn run(mut self) {
loop {
// Also create a future for if the queue has a message
// Does not pop_front as if another message has higher priority, its future will be handled
// instead in this loop, and the popped value would be dropped with the next iteration
// While no other message has a higher priority right now, this is a safer practice
let mut queue_future =
if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() };
if let Some((broadcast, msg)) = futures::select_biased! {
// Handle a new block occuring externally (an external sync loop)
// Has the highest priority as it makes all other futures here irrelevant
msg = self.step_recv.next() => {
if let Some((block_number, commit, proposal)) = msg {
// Commit is for a block we've already moved past
if block_number != self.block.number {
continue;
}
self.reset_by_commit(commit, proposal).await;
None
} else {
break;
}
},
// Handle our messages
_ = queue_future => {
Some((true, self.queue.pop_front().unwrap()))
},
// Handle any timeouts
step = self.block.round().timeout_future().fuse() => {
// Remove the timeout so it doesn't persist, always being the selected future due to bias
// While this does enable the timeout to be entered again, the timeout setting code will
// never attempt to add a timeout after its timeout has expired
self.block.round_mut().timeouts.remove(&step);
// Only run if it's still the step in question
if self.block.round().step == step {
match step {
Step::Propose => {
// Slash the validator for not proposing when they should've
debug!(target: "tendermint", "Validator didn't propose when they should have");
self.slash(
self.weights.proposer(self.block.number, self.block.round().number)
).await;
self.broadcast(Data::Prevote(None));
},
Step::Prevote => self.broadcast(Data::Precommit(None)),
Step::Precommit => {
self.round(RoundNumber(self.block.round().number.0 + 1), None);
continue;
}
}
}
None
},
// Handle any received messages
msg = self.msg_recv.next() => {
if let Some(msg) = msg {
if !msg.verify_signature(&self.validators) {
continue;
}
Some((false, msg.msg))
} else {
break;
}
}
} {
let res = self.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
}
match res {
Ok(None) => (),
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
// Get all precommits for this round
for (validator, msgs) in &self.block.log.log[&msg.round] {
if let Some(Data::Precommit(Some((id, sig)))) = msgs.get(&Step::Precommit) {
// If this precommit was for this block, include it
if id == &block.id() {
validators.push(*validator);
sigs.push(sig.clone());
}
}
}
let commit = Commit {
end_time: self.block.end_time[&msg.round].canonical(),
validators,
signature: N::SignatureScheme::aggregate(&sigs),
};
debug_assert!(self.network.verify_commit(block.id(), &commit));
let proposal = self.network.add_block(block, commit).await;
self.reset(msg.round, proposal).await;
}
Err(TendermintError::Malicious(validator)) => self.slash(validator).await,
Err(TendermintError::Temporal) => (),
}
if broadcast {
let sig = self.signer.sign(&msg.encode()).await;
self.network.broadcast(SignedMessage { msg, sig }).await;
}
}
}
}
// Returns Ok(true) if this was a Precommit which had its signature validated
// Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet
// Returns Err if the signature was invalid
fn verify_precommit_signature(
&self,
sender: N::ValidatorId,
round: RoundNumber,
data: &DataFor<N>,
) -> Result<bool, TendermintError<N::ValidatorId>> {
if let Data::Precommit(Some((id, sig))) = data {
// Also verify the end_time of the commit
// Only perform this verification if we already have the end_time
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
// which forces us to calculate every end time
if let Some(end_time) = self.block.end_time.get(&round) {
if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) {
debug!(target: "tendermint", "Validator produced an invalid commit signature");
Err(TendermintError::Malicious(sender))?;
}
return Ok(true);
}
}
Ok(false)
}
async fn message(
&mut self,
msg: MessageFor<N>,
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
if msg.block != self.block.number {
Err(TendermintError::Temporal)?;
}
// If this is a precommit, verify its signature
self.verify_precommit_signature(msg.sender, msg.round, &msg.data)?;
// Only let the proposer propose
if matches!(msg.data, Data::Proposal(..)) &&
(msg.sender != self.weights.proposer(msg.block, msg.round))
{
debug!(target: "tendermint", "Validator who wasn't the proposer proposed");
Err(TendermintError::Malicious(msg.sender))?;
};
if !self.block.log.log(msg.clone())? {
return Ok(None);
}
// All functions, except for the finalizer and the jump, are locked to the current round
// Run the finalizer to see if it applies
// 49-52
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
let proposer = self.weights.proposer(self.block.number, msg.round);
// Get the proposal
if let Some(Data::Proposal(_, block)) = self.block.log.get(msg.round, proposer, Step::Propose)
{
// Check if it has gotten a sufficient amount of precommits
// Use a junk signature since message equality disregards the signature
if self.block.log.has_consensus(
msg.round,
Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))),
) {
return Ok(Some(block.clone()));
}
}
}
// Else, check if we need to jump ahead
#[allow(clippy::comparison_chain)]
if msg.round.0 < self.block.round().number.0 {
// Prior round, disregard if not finalizing
return Ok(None);
} else if msg.round.0 > self.block.round().number.0 {
// 55-56
// Jump, enabling processing by the below code
if self.block.log.round_participation(msg.round) > self.weights.fault_thresold() {
// If this round already has precommit messages, verify their signatures
let round_msgs = self.block.log.log[&msg.round].clone();
for (validator, msgs) in &round_msgs {
if let Some(data) = msgs.get(&Step::Precommit) {
if let Ok(res) = self.verify_precommit_signature(*validator, msg.round, data) {
// Ensure this actually verified the signature instead of believing it shouldn't yet
debug_assert!(res);
} else {
// Remove the message so it isn't counted towards forming a commit/included in one
// This won't remove the fact the precommitted for this block hash in the MessageLog
// TODO: Don't even log these in the first place until we jump, preventing needing
// to do this in the first place
self
.block
.log
.log
.get_mut(&msg.round)
.unwrap()
.get_mut(validator)
.unwrap()
.remove(&Step::Precommit);
self.slash(*validator).await;
}
}
}
// If we're the proposer, return now so we re-run processing with our proposal
// If we continue now, it'd just be wasted ops
if self.round(msg.round, None) {
return Ok(None);
}
} else {
// Future round which we aren't ready to jump to, so return for now
return Ok(None);
}
}
// The paper executes these checks when the step is prevote. Making sure this message warrants
// rerunning these checks is a sane optimization since message instances is a full iteration
// of the round map
if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
let (participation, weight) =
self.block.log.message_instances(self.block.round().number, Data::Prevote(None));
// 34-35
if participation >= self.weights.threshold() {
self.block.round_mut().set_timeout(Step::Prevote);
}
// 44-46
if weight >= self.weights.threshold() {
self.broadcast(Data::Precommit(None));
return Ok(None);
}
}
// 47-48
if matches!(msg.data, Data::Precommit(_)) &&
self.block.log.has_participation(self.block.round().number, Step::Precommit)
{
self.block.round_mut().set_timeout(Step::Precommit);
}
// All further operations require actually having the proposal in question
let proposer = self.weights.proposer(self.block.number, self.block.round().number);
let (vr, block) = if let Some(Data::Proposal(vr, block)) =
self.block.log.get(self.block.round().number, proposer, Step::Propose)
{
(vr, block)
} else {
return Ok(None);
};
// 22-33
if self.block.round().step == Step::Propose {
// Delay error handling (triggering a slash) until after we vote.
let (valid, err) = match self.network.validate(block).await {
Ok(_) => (true, Ok(None)),
Err(BlockError::Temporal) => (false, Ok(None)),
Err(BlockError::Fatal) => (false, {
debug!(target: "tendermint", "Validator proposed a fatally invalid block");
Err(TendermintError::Malicious(proposer))
}),
};
// Create a raw vote which only requires block validity as a basis for the actual vote.
let raw_vote = Some(block.id()).filter(|_| valid);
// If locked is none, it has a round of -1 according to the protocol. That satisfies
// 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some
// with different IDs, the function on 22 rejects yet the function on 28 has one other
// condition
let locked = self.block.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true);
let mut vote = raw_vote.filter(|_| locked);
if let Some(vr) = vr {
// Malformed message
if vr.0 >= self.block.round().number.0 {
debug!(target: "tendermint", "Validator claimed a round from the future was valid");
Err(TendermintError::Malicious(msg.sender))?;
}
if self.block.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) {
// Allow differing locked values if the proposal has a newer valid round
// This is the other condition described above
if let Some((locked_round, _)) = self.block.locked.as_ref() {
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
}
self.broadcast(Data::Prevote(vote));
return err;
}
} else {
self.broadcast(Data::Prevote(vote));
return err;
}
return Ok(None);
}
if self
.block
.valid
.as_ref()
.map(|(round, _)| round != &self.block.round().number)
.unwrap_or(true)
{
// 36-43
// The run once condition is implemented above. Since valid will always be set by this, it
// not being set, or only being set historically, means this has yet to be run
if self.block.log.has_consensus(self.block.round().number, Data::Prevote(Some(block.id()))) {
match self.network.validate(block).await {
Ok(_) => (),
Err(BlockError::Temporal) => (),
Err(BlockError::Fatal) => {
debug!(target: "tendermint", "Validator proposed a fatally invalid block");
Err(TendermintError::Malicious(proposer))?
}
};
self.block.valid = Some((self.block.round().number, block.clone()));
if self.block.round().step == Step::Prevote {
self.block.locked = Some((self.block.round().number, block.id()));
self.broadcast(Data::Precommit(Some((
block.id(),
self
.signer
.sign(&commit_msg(
self.block.end_time[&self.block.round().number].canonical(),
block.id().as_ref(),
))
.await,
))));
}
}
}
Ok(None)
}
}

View File

@@ -0,0 +1,108 @@
use std::{sync::Arc, collections::HashMap};
use log::debug;
use crate::{ext::*, RoundNumber, Step, Data, DataFor, MessageFor, TendermintError};
type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, DataFor<N>>>;
pub(crate) struct MessageLog<N: Network> {
weights: Arc<N::Weights>,
precommitted: HashMap<N::ValidatorId, <N::Block as Block>::Id>,
pub(crate) log: HashMap<RoundNumber, RoundLog<N>>,
}
impl<N: Network> MessageLog<N> {
pub(crate) fn new(weights: Arc<N::Weights>) -> MessageLog<N> {
MessageLog { weights, precommitted: HashMap::new(), log: HashMap::new() }
}
// Returns true if it's a new message
pub(crate) fn log(
&mut self,
msg: MessageFor<N>,
) -> Result<bool, TendermintError<N::ValidatorId>> {
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
let msgs = round.entry(msg.sender).or_insert_with(HashMap::new);
// Handle message replays without issue. It's only multiple messages which is malicious
let step = msg.data.step();
if let Some(existing) = msgs.get(&step) {
if existing != &msg.data {
debug!(
target: "tendermint",
"Validator sent multiple messages for the same block + round + step"
);
Err(TendermintError::Malicious(msg.sender))?;
}
return Ok(false);
}
// If they already precommitted to a distinct hash, error
if let Data::Precommit(Some((hash, _))) = &msg.data {
if let Some(prev) = self.precommitted.get(&msg.sender) {
if hash != prev {
debug!(target: "tendermint", "Validator precommitted to multiple blocks");
Err(TendermintError::Malicious(msg.sender))?;
}
}
self.precommitted.insert(msg.sender, *hash);
}
msgs.insert(step, msg.data);
Ok(true)
}
// For a given round, return the participating weight for this step, and the weight agreeing with
// the data.
pub(crate) fn message_instances(&self, round: RoundNumber, data: DataFor<N>) -> (u64, u64) {
let mut participating = 0;
let mut weight = 0;
for (participant, msgs) in &self.log[&round] {
if let Some(msg) = msgs.get(&data.step()) {
let validator_weight = self.weights.weight(*participant);
participating += validator_weight;
if &data == msg {
weight += validator_weight;
}
}
}
(participating, weight)
}
// Get the participation in a given round
pub(crate) fn round_participation(&self, round: RoundNumber) -> u64 {
let mut weight = 0;
if let Some(round) = self.log.get(&round) {
for participant in round.keys() {
weight += self.weights.weight(*participant);
}
};
weight
}
// Check if a supermajority of nodes have participated on a specific step
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
let mut participating = 0;
for (participant, msgs) in &self.log[&round] {
if msgs.get(&step).is_some() {
participating += self.weights.weight(*participant);
}
}
participating >= self.weights.threshold()
}
// Check if consensus has been reached on a specific piece of data
pub(crate) fn has_consensus(&self, round: RoundNumber, data: DataFor<N>) -> bool {
let (_, weight) = self.message_instances(round, data);
weight >= self.weights.threshold()
}
pub(crate) fn get(
&self,
round: RoundNumber,
sender: N::ValidatorId,
step: Step,
) -> Option<&DataFor<N>> {
self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step)))
}
}

83
tendermint/src/round.rs Normal file
View File

@@ -0,0 +1,83 @@
use std::{
marker::PhantomData,
time::{Duration, Instant},
collections::HashMap,
};
use futures::{FutureExt, future};
use tokio::time::sleep;
use crate::{
time::CanonicalInstant,
Step,
ext::{RoundNumber, Network},
};
pub(crate) struct RoundData<N: Network> {
_network: PhantomData<N>,
pub(crate) number: RoundNumber,
pub(crate) start_time: CanonicalInstant,
pub(crate) step: Step,
pub(crate) timeouts: HashMap<Step, Instant>,
}
impl<N: Network> RoundData<N> {
pub(crate) fn new(number: RoundNumber, start_time: CanonicalInstant) -> Self {
RoundData {
_network: PhantomData,
number,
start_time,
step: Step::Propose,
timeouts: HashMap::new(),
}
}
fn timeout(&self, step: Step) -> CanonicalInstant {
let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.number.0 + 1);
let adjusted_latency = N::LATENCY_TIME * (self.number.0 + 1);
let offset = Duration::from_secs(
(match step {
Step::Propose => adjusted_block + adjusted_latency,
Step::Prevote => adjusted_block + (2 * adjusted_latency),
Step::Precommit => adjusted_block + (3 * adjusted_latency),
})
.into(),
);
self.start_time + offset
}
pub(crate) fn end_time(&self) -> CanonicalInstant {
self.timeout(Step::Precommit)
}
pub(crate) fn set_timeout(&mut self, step: Step) {
let timeout = self.timeout(step).instant();
self.timeouts.entry(step).or_insert(timeout);
}
// Poll all set timeouts, returning the Step whose timeout has just expired
pub(crate) async fn timeout_future(&self) -> Step {
let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied();
(async move {
if let Some(timeout) = timeout {
sleep(timeout.saturating_duration_since(Instant::now())).await;
} else {
future::pending::<()>().await;
}
step
})
.fuse()
};
let propose_timeout = timeout_future(Step::Propose);
let prevote_timeout = timeout_future(Step::Prevote);
let precommit_timeout = timeout_future(Step::Precommit);
futures::pin_mut!(propose_timeout, prevote_timeout, precommit_timeout);
futures::select_biased! {
step = propose_timeout => step,
step = prevote_timeout => step,
step = precommit_timeout => step,
}
}
}

44
tendermint/src/time.rs Normal file
View File

@@ -0,0 +1,44 @@
use core::ops::Add;
use std::time::{UNIX_EPOCH, SystemTime, Instant, Duration};
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct CanonicalInstant {
/// Time since the epoch.
time: u64,
/// An Instant synchronized with the above time.
instant: Instant,
}
pub(crate) fn sys_time(time: u64) -> SystemTime {
UNIX_EPOCH + Duration::from_secs(time)
}
impl CanonicalInstant {
pub(crate) fn new(time: u64) -> CanonicalInstant {
// This is imprecise yet should be precise enough, as it'll resolve within a few ms
let instant_now = Instant::now();
let sys_now = SystemTime::now();
// If the time is in the future, this will be off by that much time
let elapsed = sys_now.duration_since(sys_time(time)).unwrap_or(Duration::ZERO);
// Except for the fact this panics here
let synced_instant = instant_now.checked_sub(elapsed).unwrap();
CanonicalInstant { time, instant: synced_instant }
}
pub(crate) fn canonical(&self) -> u64 {
self.time
}
pub(crate) fn instant(&self) -> Instant {
self.instant
}
}
impl Add<Duration> for CanonicalInstant {
type Output = CanonicalInstant;
fn add(self, duration: Duration) -> CanonicalInstant {
CanonicalInstant { time: self.time + duration.as_secs(), instant: self.instant + duration }
}
}

177
tendermint/tests/ext.rs Normal file
View File

@@ -0,0 +1,177 @@
use std::{
sync::Arc,
time::{UNIX_EPOCH, SystemTime, Duration},
};
use async_trait::async_trait;
use parity_scale_codec::{Encode, Decode};
use futures::SinkExt;
use tokio::{sync::RwLock, time::sleep};
use tendermint_machine::{
ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle,
};
type TestValidatorId = u16;
type TestBlockId = [u8; 4];
struct TestSigner(u16);
#[async_trait]
impl Signer for TestSigner {
type ValidatorId = TestValidatorId;
type Signature = [u8; 32];
async fn validator_id(&self) -> Option<TestValidatorId> {
Some(self.0)
}
async fn sign(&self, msg: &[u8]) -> [u8; 32] {
let mut sig = [0; 32];
sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
sig
}
}
struct TestSignatureScheme;
impl SignatureScheme for TestSignatureScheme {
type ValidatorId = TestValidatorId;
type Signature = [u8; 32];
type AggregateSignature = Vec<[u8; 32]>;
type Signer = TestSigner;
#[must_use]
fn verify(&self, validator: u16, msg: &[u8], sig: &[u8; 32]) -> bool {
(sig[.. 2] == validator.to_le_bytes()) && (sig[2 ..] == [msg, &[0; 30]].concat()[.. 30])
}
fn aggregate(sigs: &[[u8; 32]]) -> Vec<[u8; 32]> {
sigs.to_vec()
}
#[must_use]
fn verify_aggregate(
&self,
signers: &[TestValidatorId],
msg: &[u8],
sigs: &Vec<[u8; 32]>,
) -> bool {
assert_eq!(signers.len(), sigs.len());
for sig in signers.iter().zip(sigs.iter()) {
assert!(self.verify(*sig.0, msg, sig.1));
}
true
}
}
struct TestWeights;
impl Weights for TestWeights {
type ValidatorId = TestValidatorId;
fn total_weight(&self) -> u64 {
4
}
fn weight(&self, id: TestValidatorId) -> u64 {
[1; 4][usize::try_from(id).unwrap()]
}
fn proposer(&self, number: BlockNumber, round: RoundNumber) -> TestValidatorId {
TestValidatorId::try_from((number.0 + u64::from(round.0)) % 4).unwrap()
}
}
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
struct TestBlock {
id: TestBlockId,
valid: Result<(), BlockError>,
}
impl Block for TestBlock {
type Id = TestBlockId;
fn id(&self) -> TestBlockId {
self.id
}
}
#[allow(clippy::type_complexity)]
struct TestNetwork(u16, Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>>);
#[async_trait]
impl Network for TestNetwork {
type ValidatorId = TestValidatorId;
type SignatureScheme = TestSignatureScheme;
type Weights = TestWeights;
type Block = TestBlock;
const BLOCK_PROCESSING_TIME: u32 = 2;
const LATENCY_TIME: u32 = 1;
fn signer(&self) -> TestSigner {
TestSigner(self.0)
}
fn signature_scheme(&self) -> TestSignatureScheme {
TestSignatureScheme
}
fn weights(&self) -> TestWeights {
TestWeights
}
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
for (messages, _) in self.1.write().await.iter_mut() {
messages.send(msg.clone()).await.unwrap();
}
}
async fn slash(&mut self, _: TestValidatorId) {
dbg!("Slash");
todo!()
}
async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
block.valid
}
async fn add_block(
&mut self,
block: TestBlock,
commit: Commit<TestSignatureScheme>,
) -> Option<TestBlock> {
dbg!("Adding ", &block);
assert!(block.valid.is_ok());
assert!(self.verify_commit(block.id(), &commit));
Some(TestBlock { id: (u32::from_le_bytes(block.id) + 1).to_le_bytes(), valid: Ok(()) })
}
}
impl TestNetwork {
async fn new(validators: usize) -> Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>> {
let arc = Arc::new(RwLock::new(vec![]));
{
let mut write = arc.write().await;
for i in 0 .. validators {
let i = u16::try_from(i).unwrap();
let TendermintHandle { messages, machine, step } = TendermintMachine::new(
TestNetwork(i, arc.clone()),
BlockNumber(1),
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
)
.await;
tokio::task::spawn(machine.run());
write.push((messages, step));
}
}
arc
}
}
#[tokio::test]
async fn test() {
TestNetwork::new(4).await;
sleep(Duration::from_secs(30)).await;
}