Redo Tendermint folder structure

This commit is contained in:
Luke Parker
2022-10-27 06:33:58 -04:00
parent 4c2dd9b306
commit 66f7663cb2
23 changed files with 56 additions and 8 deletions

View File

@@ -0,0 +1,19 @@
[package]
name = "tendermint-machine"
version = "0.1.0"
description = "An implementation of the Tendermint state machine in Rust"
license = "MIT"
repository = "https://github.com/serai-dex/serai/tree/develop/substrate/tendermint/machine"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
edition = "2021"
[dependencies]
parity-scale-codec = { version = "3.2", features = ["derive"] }
async-trait = "0.1"
tokio = { version = "1", features = ["macros", "sync", "time", "rt"] }
sp-runtime = { git = "https://github.com/serai-dex/substrate", optional = true }
[features]
substrate = ["sp-runtime"]

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 Luke Parker
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,70 @@
# Tendermint
An implementation of the Tendermint state machine in Rust.
This is solely the state machine, intended to be mapped to any arbitrary system.
It supports an arbitrary signature scheme, weighting, and block definition
accordingly. It is not intended to work with the Cosmos SDK, solely to be an
implementation of the [academic protocol](https://arxiv.org/pdf/1807.04938.pdf).
### Caveats
- Only SCALE serialization is supported currently. Ideally, everything from
SCALE to borsh to bincode would be supported. SCALE was chosen due to this
being under Serai, which uses Substrate, which uses SCALE. Accordingly, when
deciding which of the three (mutually incompatible) options to support...
- tokio is explicitly used for the asynchronous task which runs the Tendermint
machine. Ideally, `futures-rs` would be used enabling any async runtime to be
used.
- It is possible for `add_block` to be called on a block which failed (or never
went through in the first place) validation. This is a break from the paper
which is accepted here. This is for two reasons.
1) Serai needing this functionality.
2) If a block is committed which is invalid, either there's a malicious
majority now defining consensus OR the local node is malicious by virtue of
being faulty. Considering how either represents a fatal circumstance,
except with regards to system like Serai which have their own logic for
pseudo-valid blocks, it is accepted as a possible behavior with the caveat
any consumers must be aware of it. No machine will vote nor precommit to a
block it considers invalid, so for a network with an honest majority, this
is a non-issue.
### Paper
The [paper](https://arxiv.org/abs/1807.04938) describes the algorithm with
pseudocode on page 6. This pseudocode is written as a series of conditions for
advancement. This is extremely archaic, as its a fraction of the actually
required code. This is due to its hand-waving away of data tracking, lack of
comments (beyond the entire rest of the paper, of course), and lack of
specification regarding faulty nodes.
While the "hand-waving" is both legitimate and expected, as it's not the paper's
job to describe a full message processing loop nor efficient variable handling,
it does leave behind ambiguities and annoyances, not to mention an overall
structure which cannot be directly translated. This section is meant to be a
description of it as used for translation.
The included pseudocode segments can be minimally described as follows:
```
01-09 Init
10-10 StartRound(0)
11-21 StartRound
22-27 Fresh proposal
28-33 Proposal building off a valid round with prevotes
34-35 2f+1 prevote -> schedule timeout prevote
36-43 First proposal with prevotes -> precommit Some
44-46 2f+1 nil prevote -> precommit nil
47-48 2f+1 precommit -> schedule timeout precommit
49-54 First proposal with precommits -> finalize
55-56 f+1 round > local round, jump
57-60 on timeout propose
61-64 on timeout prevote
65-67 on timeout precommit
```
The corresponding Rust code implementing these tasks are marked with their
related line numbers.

View File

@@ -0,0 +1,194 @@
use core::{hash::Hash, fmt::Debug};
use std::sync::Arc;
use parity_scale_codec::{Encode, Decode};
use crate::{SignedMessage, commit_msg};
/// An alias for a series of traits required for a type to be usable as a validator ID,
/// automatically implemented for all types satisfying those traits.
pub trait ValidatorId:
Send + Sync + Clone + Copy + PartialEq + Eq + Hash + Debug + Encode + Decode
{
}
impl<V: Send + Sync + Clone + Copy + PartialEq + Eq + Hash + Debug + Encode + Decode> ValidatorId
for V
{
}
/// An alias for a series of traits required for a type to be usable as a signature,
/// automatically implemented for all types satisfying those traits.
pub trait Signature: Send + Sync + Clone + PartialEq + Debug + Encode + Decode {}
impl<S: Send + Sync + Clone + PartialEq + Debug + Encode + Decode> Signature for S {}
// Type aliases which are distinct according to the type system
/// A struct containing a Block Number, wrapped to have a distinct type.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct BlockNumber(pub u64);
/// A struct containing a round number, wrapped to have a distinct type.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct Round(pub u32);
/// A signature scheme used by validators.
pub trait SignatureScheme: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
/// Signature type.
type Signature: Signature;
/// Type representing an aggregate signature. This would presumably be a BLS signature,
/// yet even with Schnorr signatures
/// [half-aggregation is possible](https://eprint.iacr.org/2021/350).
/// It could even be a threshold signature scheme, though that's currently unexpected.
type AggregateSignature: Signature;
/// Sign a signature with the current validator's private key.
fn sign(&self, msg: &[u8]) -> Self::Signature;
/// Verify a signature from the validator in question.
#[must_use]
fn verify(&self, validator: Self::ValidatorId, msg: &[u8], sig: &Self::Signature) -> bool;
/// Aggregate signatures.
fn aggregate(sigs: &[Self::Signature]) -> Self::AggregateSignature;
/// Verify an aggregate signature for the list of signers.
#[must_use]
fn verify_aggregate(
&self,
signers: &[Self::ValidatorId],
msg: &[u8],
sig: &Self::AggregateSignature,
) -> bool;
}
/// A commit for a specific block. The list of validators have weight exceeding the threshold for
/// a valid commit.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct Commit<S: SignatureScheme> {
/// End time of the round, used as the start time of next round.
pub end_time: u64,
/// Validators participating in the signature.
pub validators: Vec<S::ValidatorId>,
/// Aggregate signature.
pub signature: S::AggregateSignature,
}
/// Weights for the validators present.
pub trait Weights: Send + Sync {
type ValidatorId: ValidatorId;
/// Total weight of all validators.
fn total_weight(&self) -> u64;
/// Weight for a specific validator.
fn weight(&self, validator: Self::ValidatorId) -> u64;
/// Threshold needed for BFT consensus.
fn threshold(&self) -> u64 {
((self.total_weight() * 2) / 3) + 1
}
/// Threshold preventing BFT consensus.
fn fault_thresold(&self) -> u64 {
(self.total_weight() - self.threshold()) + 1
}
/// Weighted round robin function.
fn proposer(&self, number: BlockNumber, round: Round) -> Self::ValidatorId;
}
/// Simplified error enum representing a block's validity.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
pub enum BlockError {
/// Malformed block which is wholly invalid.
Fatal,
/// Valid block by syntax, with semantics which may or may not be valid yet are locally
/// considered invalid. If a block fails to validate with this, a slash will not be triggered.
Temporal,
}
/// Trait representing a Block.
pub trait Block: Send + Sync + Clone + PartialEq + Debug + Encode + Decode {
// Type used to identify blocks. Presumably a cryptographic hash of the block.
type Id: Send + Sync + Copy + Clone + PartialEq + AsRef<[u8]> + Debug + Encode + Decode;
/// Return the deterministic, unique ID for this block.
fn id(&self) -> Self::Id;
}
#[cfg(feature = "substrate")]
impl<B: sp_runtime::traits::Block> Block for B {
type Id = B::Hash;
fn id(&self) -> B::Hash {
self.hash()
}
}
/// Trait representing the distributed system Tendermint is providing consensus over.
#[async_trait::async_trait]
pub trait Network: Send + Sync {
// Type used to identify validators.
type ValidatorId: ValidatorId;
/// Signature scheme used by validators.
type SignatureScheme: SignatureScheme<ValidatorId = Self::ValidatorId>;
/// Object representing the weights of validators.
type Weights: Weights<ValidatorId = Self::ValidatorId>;
/// Type used for ordered blocks of information.
type Block: Block;
// Block time in seconds
const BLOCK_TIME: u32;
/// Return the signature scheme in use. The instance is expected to have the validators' public
/// keys, along with an instance of the private key of the current validator.
fn signature_scheme(&self) -> Arc<Self::SignatureScheme>;
/// Return a reference to the validators' weights.
fn weights(&self) -> Arc<Self::Weights>;
/// Verify a commit for a given block. Intended for use when syncing or when not an active
/// validator.
#[must_use]
fn verify_commit(
&self,
id: <Self::Block as Block>::Id,
commit: &Commit<Self::SignatureScheme>,
) -> bool {
if !self.signature_scheme().verify_aggregate(
&commit.validators,
&commit_msg(commit.end_time, id.as_ref()),
&commit.signature,
) {
return false;
}
let weights = self.weights();
commit.validators.iter().map(|v| weights.weight(*v)).sum::<u64>() >= weights.threshold()
}
/// Broadcast a message to the other validators. If authenticated channels have already been
/// established, this will double-authenticate. Switching to unauthenticated channels in a system
/// already providing authenticated channels is not recommended as this is a minor, temporal
/// inefficiency while downgrading channels may have wider implications.
async fn broadcast(
&mut self,
msg: SignedMessage<
Self::ValidatorId,
Self::Block,
<Self::SignatureScheme as SignatureScheme>::Signature,
>,
);
/// Trigger a slash for the validator in question who was definitively malicious.
/// The exact process of triggering a slash is undefined and left to the network as a whole.
// TODO: This is spammed right now.
async fn slash(&mut self, validator: Self::ValidatorId);
/// Validate a block.
async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
/// Add a block, returning the proposal for the next one. It's possible a block, which was never
/// validated or even failed validation, may be passed here if a supermajority of validators did
/// consider it valid and created a commit for it. This deviates from the paper which will have a
/// local node refuse to decide on a block it considers invalid. This library acknowledges the
/// network did decide on it, leaving handling of it to the network, and outside of this scope.
async fn add_block(
&mut self,
block: Self::Block,
commit: Commit<Self::SignatureScheme>,
) -> Self::Block;
}

View File

@@ -0,0 +1,532 @@
use core::fmt::Debug;
use std::{
sync::Arc,
time::{UNIX_EPOCH, SystemTime, Instant, Duration},
collections::HashMap,
};
use parity_scale_codec::{Encode, Decode};
use tokio::{
task::{JoinHandle, yield_now},
sync::{
RwLock,
mpsc::{self, error::TryRecvError},
},
time::sleep,
};
/// Traits and types of the external network being integrated with to provide consensus over.
pub mod ext;
use ext::*;
mod message_log;
use message_log::MessageLog;
pub(crate) fn commit_msg(end_time: u64, id: &[u8]) -> Vec<u8> {
[&end_time.to_le_bytes(), id].concat().to_vec()
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
enum Step {
Propose,
Prevote,
Precommit,
}
#[derive(Clone, Debug, Encode, Decode)]
enum Data<B: Block, S: Signature> {
Proposal(Option<Round>, B),
Prevote(Option<B::Id>),
Precommit(Option<(B::Id, S)>),
}
impl<B: Block, S: Signature> PartialEq for Data<B, S> {
fn eq(&self, other: &Data<B, S>) -> bool {
match (self, other) {
(Data::Proposal(r, b), Data::Proposal(r2, b2)) => (r == r2) && (b == b2),
(Data::Prevote(i), Data::Prevote(i2)) => i == i2,
(Data::Precommit(None), Data::Precommit(None)) => true,
(Data::Precommit(Some((i, _))), Data::Precommit(Some((i2, _)))) => i == i2,
_ => false,
}
}
}
impl<B: Block, S: Signature> Data<B, S> {
fn step(&self) -> Step {
match self {
Data::Proposal(..) => Step::Propose,
Data::Prevote(..) => Step::Prevote,
Data::Precommit(..) => Step::Precommit,
}
}
}
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
struct Message<V: ValidatorId, B: Block, S: Signature> {
sender: V,
number: BlockNumber,
round: Round,
data: Data<B, S>,
}
/// A signed Tendermint consensus message to be broadcast to the other validators.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct SignedMessage<V: ValidatorId, B: Block, S: Signature> {
msg: Message<V, B, S>,
sig: S,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TendermintError<V: ValidatorId> {
Malicious(V),
Temporal,
}
/// A machine executing the Tendermint protocol.
pub struct TendermintMachine<N: Network> {
network: Arc<RwLock<N>>,
signer: Arc<N::SignatureScheme>,
weights: Arc<N::Weights>,
proposer: N::ValidatorId,
number: BlockNumber,
canonical_start_time: u64,
start_time: Instant,
personal_proposal: N::Block,
queue: Vec<(
bool,
Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
)>,
log: MessageLog<N>,
round: Round,
end_time: HashMap<Round, Instant>,
step: Step,
locked: Option<(Round, <N::Block as Block>::Id)>,
valid: Option<(Round, N::Block)>,
timeouts: HashMap<Step, Instant>,
}
/// A handle to an asynchronous task, along with a channel to inform of it of messages received.
pub struct TendermintHandle<N: Network> {
/// Channel to send messages received from the P2P layer.
pub messages: mpsc::Sender<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
/// Handle for the asynchronous task executing the machine. The task will automatically exit
/// when the channel is dropped.
pub handle: JoinHandle<()>,
}
impl<N: Network + 'static> TendermintMachine<N> {
// Get the canonical end time for a given round, represented as seconds since the epoch
// While we have the Instant already in end_time, converting it to a SystemTime would be lossy,
// potentially enough to cause a consensus failure. Independently tracking this variable ensures
// that won't happen
fn canonical_end_time(&self, round: Round) -> u64 {
let mut time = self.canonical_start_time;
for r in 0 .. u64::from(round.0 + 1) {
time += (r + 1) * u64::from(N::BLOCK_TIME);
}
time
}
fn timeout(&self, step: Step) -> Instant {
let mut round_time = Duration::from_secs(N::BLOCK_TIME.into());
round_time *= self.round.0 + 1;
let step_time = round_time / 3;
let offset = match step {
Step::Propose => step_time,
Step::Prevote => step_time * 2,
Step::Precommit => step_time * 3,
};
self.start_time + offset
}
fn broadcast(
&mut self,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) {
let step = data.step();
// 27, 33, 41, 46, 60, 64
self.step = step;
self.queue.push((
true,
Message { sender: self.proposer, number: self.number, round: self.round, data },
));
}
// 14-21
fn round_propose(&mut self) -> bool {
if self.weights.proposer(self.number, self.round) == self.proposer {
let (round, block) = self
.valid
.clone()
.map(|(r, b)| (Some(r), b))
.unwrap_or((None, self.personal_proposal.clone()));
self.broadcast(Data::Proposal(round, block));
true
} else {
self.timeouts.insert(Step::Propose, self.timeout(Step::Propose));
false
}
}
fn round(&mut self, round: Round) -> bool {
// Correct the start time
for r in self.round.0 .. round.0 {
let end = self.timeout(Step::Precommit);
self.end_time.insert(Round(r), end);
self.start_time = end;
}
// 11-13
// Clear timeouts
self.timeouts = HashMap::new();
self.round = round;
self.end_time.insert(round, self.timeout(Step::Precommit));
self.step = Step::Propose;
self.round_propose()
}
// 53-54
async fn reset(&mut self, end_round: Round, proposal: N::Block) {
// Wait for the next block interval
let round_end = self.end_time[&end_round];
sleep(round_end.saturating_duration_since(Instant::now())).await;
self.number.0 += 1;
self.canonical_start_time = self.canonical_end_time(end_round);
self.start_time = round_end;
self.personal_proposal = proposal;
self.queue = self.queue.drain(..).filter(|msg| msg.1.number == self.number).collect();
self.log = MessageLog::new(self.network.read().await.weights());
self.end_time = HashMap::new();
self.locked = None;
self.valid = None;
self.round(Round(0));
}
/// Create a new Tendermint machine, for the specified proposer, from the specified block, with
/// the specified block as the one to propose next, returning a handle for the machine.
#[allow(clippy::new_ret_no_self)]
pub fn new(
network: N,
proposer: N::ValidatorId,
start: (BlockNumber, u64),
proposal: N::Block,
) -> TendermintHandle<N> {
// Convert the start time to an instant
// This is imprecise yet should be precise enough
let start_time = {
let instant_now = Instant::now();
let sys_now = SystemTime::now();
instant_now -
sys_now
.duration_since(UNIX_EPOCH + Duration::from_secs(start.1))
.unwrap_or(Duration::ZERO)
};
let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary
TendermintHandle {
messages: msg_send,
handle: tokio::spawn(async move {
let signer = network.signature_scheme();
let weights = network.weights();
let network = Arc::new(RwLock::new(network));
// 01-10
let mut machine = TendermintMachine {
network,
signer,
weights: weights.clone(),
proposer,
number: start.0,
canonical_start_time: start.1,
start_time,
personal_proposal: proposal,
queue: vec![],
log: MessageLog::new(weights),
round: Round(0),
end_time: HashMap::new(),
step: Step::Propose,
locked: None,
valid: None,
timeouts: HashMap::new(),
};
machine.round(Round(0));
loop {
// Check if any timeouts have been triggered
let now = Instant::now();
let (t1, t2, t3) = {
let ready = |step| machine.timeouts.get(&step).unwrap_or(&now) < &now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
};
// Propose timeout
if t1 && (machine.step == Step::Propose) {
machine.broadcast(Data::Prevote(None));
}
// Prevote timeout
if t2 && (machine.step == Step::Prevote) {
machine.broadcast(Data::Precommit(None));
}
// Precommit timeout
if t3 {
machine.round(Round(machine.round.0.wrapping_add(1)));
}
// Drain the channel of messages
let mut broken = false;
loop {
match msg_recv.try_recv() {
Ok(msg) => {
if !machine.signer.verify(msg.msg.sender, &msg.msg.encode(), &msg.sig) {
continue;
}
machine.queue.push((false, msg.msg));
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => broken = true,
}
}
if broken {
break;
}
// Handle the queue
let mut queue = machine.queue.drain(..).collect::<Vec<_>>();
for (broadcast, msg) in queue.drain(..) {
let res = machine.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
}
match res {
Ok(None) => (),
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| {
Some((*k, sig.clone())).filter(|_| id == &block.id())
}) {
validators.push(v);
sigs.push(sig);
}
let commit = Commit {
end_time: machine.canonical_end_time(msg.round),
validators,
signature: N::SignatureScheme::aggregate(&sigs),
};
debug_assert!(machine.network.read().await.verify_commit(block.id(), &commit));
let proposal = machine.network.write().await.add_block(block, commit).await;
machine.reset(msg.round, proposal).await;
}
Err(TendermintError::Malicious(validator)) => {
machine.network.write().await.slash(validator).await;
}
Err(TendermintError::Temporal) => (),
}
if broadcast {
let sig = machine.signer.sign(&msg.encode());
machine.network.write().await.broadcast(SignedMessage { msg, sig }).await;
}
}
yield_now().await;
}
}),
}
}
async fn message(
&mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
if msg.number != self.number {
Err(TendermintError::Temporal)?;
}
// Verify the end time and signature if this is a precommit
if let Data::Precommit(Some((id, sig))) = &msg.data {
if !self.signer.verify(
msg.sender,
&commit_msg(self.canonical_end_time(msg.round), id.as_ref()),
sig,
) {
// Since we verified this validator actually sent the message, they're malicious
Err(TendermintError::Malicious(msg.sender))?;
}
}
// Only let the proposer propose
if matches!(msg.data, Data::Proposal(..)) &&
(msg.sender != self.weights.proposer(msg.number, msg.round))
{
Err(TendermintError::Malicious(msg.sender))?;
};
if !self.log.log(msg.clone())? {
return Ok(None);
}
// All functions, except for the finalizer and the jump, are locked to the current round
// Run the finalizer to see if it applies
// 49-52
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
let proposer = self.weights.proposer(self.number, msg.round);
// Get the proposal
if let Some(Data::Proposal(_, block)) = self.log.get(msg.round, proposer, Step::Propose) {
// Check if it has gotten a sufficient amount of precommits
// Use a junk signature since message equality disregards the signature
if self
.log
.has_consensus(msg.round, Data::Precommit(Some((block.id(), self.signer.sign(&[])))))
{
return Ok(Some(block.clone()));
}
}
}
// Else, check if we need to jump ahead
#[allow(clippy::comparison_chain)]
if msg.round.0 < self.round.0 {
// Prior round, disregard if not finalizing
return Ok(None);
} else if msg.round.0 > self.round.0 {
// 55-56
// Jump, enabling processing by the below code
if self.log.round_participation(self.round) > self.weights.fault_thresold() {
// If we're the proposer, return to avoid a double process
if self.round(msg.round) {
return Ok(None);
}
} else {
// Future round which we aren't ready to jump to, so return for now
return Ok(None);
}
}
// The paper executes these checks when the step is prevote. Making sure this message warrants
// rerunning these checks is a sane optimization since message instances is a full iteration
// of the round map
if (self.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None));
// 34-35
if participation >= self.weights.threshold() {
let timeout = self.timeout(Step::Prevote);
self.timeouts.entry(Step::Prevote).or_insert(timeout);
}
// 44-46
if weight >= self.weights.threshold() {
self.broadcast(Data::Precommit(None));
return Ok(None);
}
}
// 47-48
if matches!(msg.data, Data::Precommit(_)) &&
self.log.has_participation(self.round, Step::Precommit)
{
let timeout = self.timeout(Step::Precommit);
self.timeouts.entry(Step::Precommit).or_insert(timeout);
}
let proposer = self.weights.proposer(self.number, self.round);
if let Some(Data::Proposal(vr, block)) = self.log.get(self.round, proposer, Step::Propose) {
// 22-33
if self.step == Step::Propose {
// Delay error handling (triggering a slash) until after we vote.
let (valid, err) = match self.network.write().await.validate(block).await {
Ok(_) => (true, Ok(None)),
Err(BlockError::Temporal) => (false, Ok(None)),
Err(BlockError::Fatal) => (false, Err(TendermintError::Malicious(proposer))),
};
// Create a raw vote which only requires block validity as a basis for the actual vote.
let raw_vote = Some(block.id()).filter(|_| valid);
// If locked is none, it has a round of -1 according to the protocol. That satisfies
// 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some
// with different IDs, the function on 22 rejects yet the function on 28 has one other
// condition
let locked = self.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true);
let mut vote = raw_vote.filter(|_| locked);
if let Some(vr) = vr {
// Malformed message
if vr.0 >= self.round.0 {
Err(TendermintError::Malicious(msg.sender))?;
}
if self.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) {
// Allow differing locked values if the proposal has a newer valid round
// This is the other condition described above
if let Some((locked_round, _)) = self.locked.as_ref() {
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
}
self.broadcast(Data::Prevote(vote));
return err;
}
} else {
self.broadcast(Data::Prevote(vote));
return err;
}
} else if self.valid.as_ref().map(|(round, _)| round != &self.round).unwrap_or(true) {
// 36-43
// The run once condition is implemented above. Sinve valid will always be set, it not
// being set, or only being set historically, means this has yet to be run
if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) {
match self.network.write().await.validate(block).await {
Ok(_) => (),
Err(BlockError::Temporal) => (),
Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?,
};
self.valid = Some((self.round, block.clone()));
if self.step == Step::Prevote {
self.locked = Some((self.round, block.id()));
self.broadcast(Data::Precommit(Some((
block.id(),
self
.signer
.sign(&commit_msg(self.canonical_end_time(self.round), block.id().as_ref())),
))));
return Ok(None);
}
}
}
}
Ok(None)
}
}

View File

@@ -0,0 +1,117 @@
use std::{sync::Arc, collections::HashMap};
use crate::{ext::*, Round, Step, Data, Message, TendermintError};
pub(crate) struct MessageLog<N: Network> {
weights: Arc<N::Weights>,
pub(crate) precommitted: HashMap<
N::ValidatorId,
(<N::Block as Block>::Id, <N::SignatureScheme as SignatureScheme>::Signature),
>,
log: HashMap<
Round,
HashMap<
N::ValidatorId,
HashMap<Step, Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
>,
>,
}
impl<N: Network> MessageLog<N> {
pub(crate) fn new(weights: Arc<N::Weights>) -> MessageLog<N> {
MessageLog { weights, precommitted: HashMap::new(), log: HashMap::new() }
}
// Returns true if it's a new message
pub(crate) fn log(
&mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> Result<bool, TendermintError<N::ValidatorId>> {
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
let msgs = round.entry(msg.sender).or_insert_with(HashMap::new);
// Handle message replays without issue. It's only multiple messages which is malicious
let step = msg.data.step();
if let Some(existing) = msgs.get(&step) {
if existing != &msg.data {
Err(TendermintError::Malicious(msg.sender))?;
}
return Ok(false);
}
// If they already precommitted to a distinct hash, error
if let Data::Precommit(Some((hash, sig))) = &msg.data {
if let Some((prev, _)) = self.precommitted.get(&msg.sender) {
if hash != prev {
Err(TendermintError::Malicious(msg.sender))?;
}
}
self.precommitted.insert(msg.sender, (*hash, sig.clone()));
}
msgs.insert(step, msg.data);
Ok(true)
}
// For a given round, return the participating weight for this step, and the weight agreeing with
// the data.
pub(crate) fn message_instances(
&self,
round: Round,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> (u64, u64) {
let mut participating = 0;
let mut weight = 0;
for (participant, msgs) in &self.log[&round] {
if let Some(msg) = msgs.get(&data.step()) {
let validator_weight = self.weights.weight(*participant);
participating += validator_weight;
if &data == msg {
weight += validator_weight;
}
}
}
(participating, weight)
}
// Get the participation in a given round
pub(crate) fn round_participation(&self, round: Round) -> u64 {
let mut weight = 0;
if let Some(round) = self.log.get(&round) {
for participant in round.keys() {
weight += self.weights.weight(*participant);
}
};
weight
}
// Check if a supermajority of nodes have participated on a specific step
pub(crate) fn has_participation(&self, round: Round, step: Step) -> bool {
let mut participating = 0;
for (participant, msgs) in &self.log[&round] {
if msgs.get(&step).is_some() {
participating += self.weights.weight(*participant);
}
}
participating >= self.weights.threshold()
}
// Check if consensus has been reached on a specific piece of data
pub(crate) fn has_consensus(
&self,
round: Round,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> bool {
let (_, weight) = self.message_instances(round, data);
weight >= self.weights.threshold()
}
pub(crate) fn get(
&self,
round: Round,
sender: N::ValidatorId,
step: Step,
) -> Option<&Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>> {
self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step)))
}
}

View File

@@ -0,0 +1,153 @@
use std::{
sync::Arc,
time::{UNIX_EPOCH, SystemTime, Duration},
};
use parity_scale_codec::{Encode, Decode};
use tokio::{sync::RwLock, time::sleep};
use tendermint_machine::{ext::*, SignedMessage, TendermintMachine, TendermintHandle};
type TestValidatorId = u16;
type TestBlockId = [u8; 4];
struct TestSignatureScheme(u16);
impl SignatureScheme for TestSignatureScheme {
type ValidatorId = TestValidatorId;
type Signature = [u8; 32];
type AggregateSignature = Vec<[u8; 32]>;
fn sign(&self, msg: &[u8]) -> [u8; 32] {
let mut sig = [0; 32];
sig[.. 2].copy_from_slice(&self.0.to_le_bytes());
sig[2 .. (2 + 30.min(msg.len()))].copy_from_slice(&msg[.. 30.min(msg.len())]);
sig
}
#[must_use]
fn verify(&self, validator: u16, msg: &[u8], sig: &[u8; 32]) -> bool {
(sig[.. 2] == validator.to_le_bytes()) && (&sig[2 ..] == &[msg, &[0; 30]].concat()[.. 30])
}
fn aggregate(sigs: &[[u8; 32]]) -> Vec<[u8; 32]> {
sigs.to_vec()
}
#[must_use]
fn verify_aggregate(
&self,
signers: &[TestValidatorId],
msg: &[u8],
sigs: &Vec<[u8; 32]>,
) -> bool {
assert_eq!(signers.len(), sigs.len());
for sig in signers.iter().zip(sigs.iter()) {
assert!(self.verify(*sig.0, msg, sig.1));
}
true
}
}
struct TestWeights;
impl Weights for TestWeights {
type ValidatorId = TestValidatorId;
fn total_weight(&self) -> u64 {
4
}
fn weight(&self, id: TestValidatorId) -> u64 {
[1; 4][usize::try_from(id).unwrap()]
}
fn proposer(&self, number: BlockNumber, round: Round) -> TestValidatorId {
TestValidatorId::try_from((number.0 + u64::from(round.0)) % 4).unwrap()
}
}
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
struct TestBlock {
id: TestBlockId,
valid: Result<(), BlockError>,
}
impl Block for TestBlock {
type Id = TestBlockId;
fn id(&self) -> TestBlockId {
self.id
}
}
struct TestNetwork(u16, Arc<RwLock<Vec<TendermintHandle<Self>>>>);
#[async_trait::async_trait]
impl Network for TestNetwork {
type ValidatorId = TestValidatorId;
type SignatureScheme = TestSignatureScheme;
type Weights = TestWeights;
type Block = TestBlock;
const BLOCK_TIME: u32 = 1;
fn signature_scheme(&self) -> Arc<TestSignatureScheme> {
Arc::new(TestSignatureScheme(self.0))
}
fn weights(&self) -> Arc<TestWeights> {
Arc::new(TestWeights)
}
async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) {
for handle in self.1.write().await.iter_mut() {
handle.messages.send(msg.clone()).await.unwrap();
}
}
async fn slash(&mut self, _: TestValidatorId) {
dbg!("Slash");
todo!()
}
async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
block.valid
}
async fn add_block(
&mut self,
block: TestBlock,
commit: Commit<TestSignatureScheme>,
) -> TestBlock {
dbg!("Adding ", &block);
assert!(block.valid.is_ok());
assert!(self.verify_commit(block.id(), &commit));
TestBlock { id: (u32::from_le_bytes(block.id) + 1).to_le_bytes(), valid: Ok(()) }
}
}
impl TestNetwork {
async fn new(validators: usize) -> Arc<RwLock<Vec<TendermintHandle<Self>>>> {
let arc = Arc::new(RwLock::new(vec![]));
{
let mut write = arc.write().await;
for i in 0 .. validators {
let i = u16::try_from(i).unwrap();
write.push(TendermintMachine::new(
TestNetwork(i, arc.clone()),
i,
(BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()),
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
));
}
}
arc
}
}
#[tokio::test]
async fn test() {
TestNetwork::new(4).await;
for _ in 0 .. 10 {
sleep(Duration::from_secs(1)).await;
}
}