// serai/substrate/tendermint/src/lib.rs

use core::fmt::Debug;
2022-10-16 07:54:07 -04:00
use std::{
sync::Arc,
time::{Instant, Duration},
collections::HashMap,
};
2022-10-16 07:30:11 -04:00
use parity_scale_codec::{Encode, Decode};
2022-10-16 07:30:11 -04:00
use tokio::{
task::{JoinHandle, yield_now},
sync::{
RwLock,
mpsc::{self, error::TryRecvError},
},
};
2022-10-17 08:07:23 -04:00
/// Traits and types of the external network being integrated with to provide consensus over.
pub mod ext;
use ext::*;
2022-10-02 23:23:58 -04:00
mod message_log;
2022-10-16 07:30:11 -04:00
use message_log::MessageLog;
2022-10-12 21:36:40 -04:00
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
2022-10-02 23:23:58 -04:00
enum Step {
Propose,
Prevote,
Precommit,
}
#[derive(Clone, Debug, Encode, Decode)]
enum Data<B: Block, S: Signature> {
2022-10-16 07:30:11 -04:00
Proposal(Option<Round>, B),
Prevote(Option<B::Id>),
Precommit(Option<(B::Id, S)>),
}
/// Equality over the voted-on content only.
///
/// The signature carried by a precommit is deliberately ignored, so two precommits for the same
/// block ID compare equal regardless of who signed them.
impl<B: Block, S: Signature> PartialEq for Data<B, S> {
  fn eq(&self, other: &Data<B, S>) -> bool {
    match (self, other) {
      (Data::Proposal(r, b), Data::Proposal(r2, b2)) => (r == r2) && (b == b2),
      (Data::Prevote(i), Data::Prevote(i2)) => i == i2,
      (Data::Precommit(None), Data::Precommit(None)) => true,
      // Compare the block IDs, not the signatures
      (Data::Precommit(Some((i, _))), Data::Precommit(Some((i2, _)))) => i == i2,
      _ => false,
    }
  }
}
impl<B: Block, S: Signature> Data<B, S> {
2022-10-02 23:23:58 -04:00
fn step(&self) -> Step {
match self {
Data::Proposal(..) => Step::Propose,
Data::Prevote(..) => Step::Prevote,
Data::Precommit(..) => Step::Precommit,
}
}
}
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
2022-10-17 08:07:23 -04:00
struct Message<V: ValidatorId, B: Block, S: Signature> {
sender: V,
2022-10-02 23:23:58 -04:00
number: BlockNumber,
round: Round,
2022-10-02 23:23:58 -04:00
data: Data<B, S>,
2022-10-02 23:23:58 -04:00
}
2022-10-17 08:07:23 -04:00
/// A signed Tendermint consensus message to be broadcast to the other validators.
#[derive(Clone, PartialEq, Debug, Encode, Decode)]
pub struct SignedMessage<V: ValidatorId, B: Block, S: Signature> {
// The consensus message itself.
msg: Message<V, B, S>,
// The sender's signature over the SCALE encoding of `msg`.
sig: S,
}
2022-10-17 08:07:23 -04:00
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TendermintError<V: ValidatorId> {
Malicious(V),
2022-10-02 23:23:58 -04:00
Temporal,
}
2022-10-17 08:07:23 -04:00
/// A machine executing the Tendermint protocol.
2022-10-16 07:30:11 -04:00
pub struct TendermintMachine<N: Network> {
network: Arc<RwLock<N>>,
signer: Arc<N::SignatureScheme>,
2022-10-16 07:30:11 -04:00
weights: Arc<N::Weights>,
proposer: N::ValidatorId,
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
number: BlockNumber,
2022-10-16 07:54:07 -04:00
start_time: Instant,
2022-10-16 07:30:11 -04:00
personal_proposal: N::Block,
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
log: MessageLog<N>,
round: Round,
2022-10-02 23:23:58 -04:00
step: Step,
2022-10-12 21:36:40 -04:00
2022-10-16 07:30:11 -04:00
locked: Option<(Round, N::Block)>,
valid: Option<(Round, N::Block)>,
timeouts: HashMap<Step, Instant>,
2022-10-12 21:36:40 -04:00
}
2022-10-17 08:07:23 -04:00
/// A handle to an asynchronous task, along with a channel to inform of it of messages received.
2022-10-16 07:30:11 -04:00
pub struct TendermintHandle<N: Network> {
2022-10-17 08:07:23 -04:00
/// Channel to send messages received from the P2P layer.
pub messages: mpsc::Sender<
SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
>,
2022-10-17 08:07:23 -04:00
/// Handle for the asynchronous task executing the machine. The task will automatically exit
/// when the channel is dropped.
2022-10-16 07:30:11 -04:00
pub handle: JoinHandle<()>,
2022-10-02 23:23:58 -04:00
}
2022-10-16 07:30:11 -04:00
impl<N: Network + 'static> TendermintMachine<N> {
fn timeout(&self, step: Step) -> Instant {
2022-10-16 07:54:07 -04:00
let mut round_time = Duration::from_secs(N::BLOCK_TIME.into());
2022-10-17 08:07:23 -04:00
round_time *= self.round.0 + 1;
2022-10-16 07:54:07 -04:00
let step_time = round_time / 3;
let offset = match step {
Step::Propose => step_time,
Step::Prevote => step_time * 2,
Step::Precommit => step_time * 3,
};
self.start_time + offset
2022-10-16 07:30:11 -04:00
}
#[async_recursion::async_recursion]
async fn broadcast(
&mut self,
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> Option<N::Block> {
2022-10-16 09:09:14 -04:00
let step = data.step();
let msg = Message { sender: self.proposer, number: self.number, round: self.round, data };
2022-10-16 07:30:11 -04:00
let res = self.message(msg.clone()).await.unwrap();
2022-10-16 09:09:14 -04:00
self.step = step; // TODO: Before or after the above handling call?
let sig = self.signer.sign(&msg.encode());
self.network.write().await.broadcast(SignedMessage { msg, sig }).await;
2022-10-02 23:23:58 -04:00
res
}
// 14-21
2022-10-16 07:30:11 -04:00
async fn round_propose(&mut self) {
if self.weights.proposer(self.number, self.round) == self.proposer {
let (round, block) = if let Some((round, block)) = &self.valid {
(Some(*round), block.clone())
2022-10-02 23:23:58 -04:00
} else {
2022-10-16 07:30:11 -04:00
(None, self.personal_proposal.clone())
2022-10-02 23:23:58 -04:00
};
2022-10-16 07:30:11 -04:00
debug_assert!(self.broadcast(Data::Proposal(round, block)).await.is_none());
2022-10-02 23:23:58 -04:00
} else {
2022-10-16 07:30:11 -04:00
self.timeouts.insert(Step::Propose, self.timeout(Step::Propose));
2022-10-02 23:23:58 -04:00
}
}
// 11-13
2022-10-16 07:30:11 -04:00
async fn round(&mut self, round: Round) {
2022-10-16 09:16:44 -04:00
dbg!(round);
2022-10-16 09:09:14 -04:00
2022-10-16 07:54:07 -04:00
// Correct the start time
for _ in self.round.0 .. round.0 {
self.start_time = self.timeout(Step::Precommit);
}
2022-10-16 09:16:44 -04:00
// Clear timeouts
self.timeouts = HashMap::new();
2022-10-02 23:23:58 -04:00
self.round = round;
2022-10-16 09:16:44 -04:00
self.step = Step::Propose;
2022-10-16 07:30:11 -04:00
self.round_propose().await;
2022-10-02 23:23:58 -04:00
}
// 1-9
2022-10-16 07:30:11 -04:00
async fn reset(&mut self, proposal: N::Block) {
self.number.0 += 1;
2022-10-16 07:54:07 -04:00
self.start_time = Instant::now();
2022-10-16 07:30:11 -04:00
self.personal_proposal = proposal;
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
self.log = MessageLog::new(self.network.read().await.weights());
2022-10-02 23:23:58 -04:00
self.locked = None;
self.valid = None;
2022-10-16 07:30:11 -04:00
self.round(Round(0)).await;
2022-10-02 23:23:58 -04:00
}
2022-10-17 08:07:23 -04:00
/// Create a new Tendermint machine, for the specified proposer, from the specified block, with
/// the specified block as the one to propose next, returning a handle for the machine.
2022-10-02 23:23:58 -04:00
// 10
2022-10-16 09:16:44 -04:00
#[allow(clippy::new_ret_no_self)]
2022-10-16 07:30:11 -04:00
pub fn new(
network: N,
proposer: N::ValidatorId,
number: BlockNumber,
proposal: N::Block,
) -> TendermintHandle<N> {
2022-10-12 21:36:40 -04:00
let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary
TendermintHandle {
messages: msg_send,
2022-10-16 07:30:11 -04:00
handle: tokio::spawn(async move {
let signer = network.signature_scheme();
2022-10-16 07:30:11 -04:00
let weights = network.weights();
let network = Arc::new(RwLock::new(network));
let mut machine = TendermintMachine {
network,
signer,
2022-10-16 07:30:11 -04:00
weights: weights.clone(),
2022-10-12 21:36:40 -04:00
proposer,
number,
2022-10-16 07:54:07 -04:00
start_time: Instant::now(),
2022-10-16 07:30:11 -04:00
personal_proposal: proposal,
2022-10-12 21:36:40 -04:00
2022-10-16 07:30:11 -04:00
log: MessageLog::new(weights),
round: Round(0),
step: Step::Propose,
2022-10-12 21:36:40 -04:00
locked: None,
valid: None,
2022-10-16 07:30:11 -04:00
timeouts: HashMap::new(),
2022-10-12 21:36:40 -04:00
};
2022-10-16 07:30:11 -04:00
machine.round_propose().await;
2022-10-12 21:36:40 -04:00
loop {
2022-10-16 07:30:11 -04:00
// Check if any timeouts have been triggered
2022-10-12 21:36:40 -04:00
let now = Instant::now();
let (t1, t2, t3) = {
2022-10-16 07:30:11 -04:00
let ready = |step| machine.timeouts.get(&step).unwrap_or(&now) < &now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
};
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
// Propose timeout
2022-10-16 09:09:14 -04:00
if t1 && (machine.step == Step::Propose) {
debug_assert!(machine.broadcast(Data::Prevote(None)).await.is_none());
}
2022-10-16 07:30:11 -04:00
// Prevote timeout
2022-10-16 09:09:14 -04:00
if t2 && (machine.step == Step::Prevote) {
debug_assert!(machine.broadcast(Data::Precommit(None)).await.is_none());
}
2022-10-16 07:30:11 -04:00
// Precommit timeout
if t3 {
2022-10-16 09:09:14 -04:00
machine.round(Round(machine.round.0 + 1)).await;
}
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
// If there's a message, handle it
match msg_recv.try_recv() {
Ok(msg) => {
if !machine.signer.verify(msg.msg.sender, &msg.msg.encode(), msg.sig) {
yield_now().await;
continue;
2022-10-16 07:30:11 -04:00
}
match machine.message(msg.msg).await {
Ok(None) => (),
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| {
Some((*k, sig.clone())).filter(|_| id == &block.id())
}) {
validators.push(v);
sigs.push(sig);
}
2022-10-17 08:07:23 -04:00
let commit =
Commit { validators, signature: N::SignatureScheme::aggregate(&sigs) };
debug_assert!(machine.network.read().await.verify_commit(block.id(), &commit));
let proposal = machine.network.write().await.add_block(block, commit);
machine.reset(proposal).await
}
Err(TendermintError::Malicious(validator)) => {
machine.network.write().await.slash(validator).await
}
Err(TendermintError::Temporal) => (),
2022-10-16 07:30:11 -04:00
}
}
2022-10-16 07:30:11 -04:00
Err(TryRecvError::Empty) => yield_now().await,
Err(TryRecvError::Disconnected) => break,
2022-10-12 21:36:40 -04:00
}
}
}),
2022-10-02 23:23:58 -04:00
}
}
// 49-54
2022-10-16 07:30:11 -04:00
fn check_committed(&mut self, round: Round) -> Option<N::Block> {
let proposer = self.weights.proposer(self.number, round);
2022-10-02 23:23:58 -04:00
// Get the proposal
2022-10-16 07:30:11 -04:00
if let Some(proposal) = self.log.get(round, proposer, Step::Propose) {
2022-10-02 23:23:58 -04:00
// Destructure
debug_assert!(matches!(proposal, Data::Proposal(..)));
if let Data::Proposal(_, block) = proposal {
// Check if it has gotten a sufficient amount of precommits
let (participants, weight) = self
.log
// Use a junk signature since message equality is irrelevant to the signature
.message_instances(round, Data::Precommit(Some((block.id(), self.signer.sign(&[])))));
2022-10-02 23:23:58 -04:00
2022-10-16 07:30:11 -04:00
let threshold = self.weights.threshold();
if weight >= threshold {
return Some(block.clone());
2022-10-02 23:23:58 -04:00
}
// 47-48
2022-10-16 07:30:11 -04:00
if participants >= threshold {
let timeout = self.timeout(Step::Precommit);
self.timeouts.entry(Step::Precommit).or_insert(timeout);
2022-10-02 23:23:58 -04:00
}
}
}
None
}
2022-10-16 07:30:11 -04:00
async fn message(
&mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
2022-10-16 07:30:11 -04:00
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
if let Data::Precommit(Some((id, sig))) = &msg.data {
if !self.signer.verify(msg.sender, id.as_ref(), sig.clone()) {
Err(TendermintError::Malicious(msg.sender))?;
}
}
if msg.number != self.number {
2022-10-02 23:23:58 -04:00
Err(TendermintError::Temporal)?;
}
2022-10-16 07:30:11 -04:00
if matches!(msg.data, Data::Proposal(..)) &&
(msg.sender != self.weights.proposer(msg.number, msg.round))
{
Err(TendermintError::Malicious(msg.sender))?;
};
2022-10-16 07:30:11 -04:00
if !self.log.log(msg.clone())? {
2022-10-02 23:23:58 -04:00
return Ok(None);
}
// All functions, except for the finalizer and the jump, are locked to the current round
// Run the finalizer to see if it applies
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
let block = self.check_committed(msg.round);
if block.is_some() {
return Ok(block);
}
}
// Else, check if we need to jump ahead
2022-10-16 09:16:44 -04:00
#[allow(clippy::comparison_chain)]
2022-10-16 07:30:11 -04:00
if msg.round.0 < self.round.0 {
2022-10-02 23:23:58 -04:00
return Ok(None);
2022-10-16 07:30:11 -04:00
} else if msg.round.0 > self.round.0 {
2022-10-02 23:23:58 -04:00
// 55-56
2022-10-16 07:30:11 -04:00
if self.log.round_participation(self.round) > self.weights.fault_thresold() {
2022-10-16 09:09:14 -04:00
self.round(msg.round).await;
2022-10-02 23:23:58 -04:00
} else {
return Ok(None);
}
}
2022-10-16 07:30:11 -04:00
let proposal = self
.log
.get(self.round, self.weights.proposer(self.number, self.round), Step::Propose)
.cloned();
2022-10-02 23:23:58 -04:00
if self.step == Step::Propose {
2022-10-16 07:30:11 -04:00
if let Some(proposal) = &proposal {
2022-10-02 23:23:58 -04:00
debug_assert!(matches!(proposal, Data::Proposal(..)));
if let Data::Proposal(vr, block) = proposal {
if let Some(vr) = vr {
// 28-33
2022-10-16 07:30:11 -04:00
if (vr.0 < self.round.0) && self.log.has_consensus(*vr, Data::Prevote(Some(block.id())))
{
debug_assert!(self
2022-10-16 07:30:11 -04:00
.broadcast(Data::Prevote(Some(block.id()).filter(|_| {
self
.locked
2022-10-16 07:30:11 -04:00
.as_ref()
.map(|(round, value)| (round.0 <= vr.0) || (block.id() == value.id()))
.unwrap_or(true)
})))
2022-10-16 07:30:11 -04:00
.await
.is_none());
2022-10-02 23:23:58 -04:00
} else {
Err(TendermintError::Malicious(msg.sender))?;
}
} else {
// 22-27
2022-10-17 08:07:23 -04:00
self.network.write().await.validate(block).map_err(|e| match e {
BlockError::Temporal => TendermintError::Temporal,
BlockError::Fatal => TendermintError::Malicious(msg.sender),
})?;
debug_assert!(self
2022-10-16 07:30:11 -04:00
.broadcast(Data::Prevote(Some(block.id()).filter(|_| self.locked.is_none() ||
self.locked.as_ref().map(|locked| locked.1.id()) == Some(block.id()))))
.await
.is_none());
2022-10-02 23:23:58 -04:00
}
}
}
}
if self.step == Step::Prevote {
2022-10-16 07:30:11 -04:00
let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None));
2022-10-02 23:23:58 -04:00
// 34-35
if participation >= self.weights.threshold() {
2022-10-16 07:30:11 -04:00
let timeout = self.timeout(Step::Prevote);
self.timeouts.entry(Step::Prevote).or_insert(timeout);
2022-10-02 23:23:58 -04:00
}
// 44-46
if weight >= self.weights.threshold() {
2022-10-16 07:30:11 -04:00
debug_assert!(self.broadcast(Data::Precommit(None)).await.is_none());
2022-10-02 23:23:58 -04:00
}
}
2022-10-16 09:09:14 -04:00
// 36-43
2022-10-16 07:30:11 -04:00
if (self.valid.is_none()) && ((self.step == Step::Prevote) || (self.step == Step::Precommit)) {
if let Some(proposal) = proposal {
debug_assert!(matches!(proposal, Data::Proposal(..)));
if let Data::Proposal(_, block) = proposal {
if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) {
self.valid = Some((self.round, block.clone()));
if self.step == Step::Prevote {
self.locked = self.valid.clone();
return Ok(
self
.broadcast(Data::Precommit(Some((
block.id(),
self.signer.sign(block.id().as_ref()),
))))
.await,
);
2022-10-16 07:30:11 -04:00
}
}
}
}
}
2022-10-02 23:23:58 -04:00
Ok(None)
}
}