Clean up time code in tendermint-machine

This commit is contained in:
Luke Parker
2022-11-12 07:12:05 -05:00
parent b53759c6ec
commit e2e7a70f1e
4 changed files with 148 additions and 98 deletions

View File

@@ -2,7 +2,7 @@ use core::fmt::Debug;
use std::{ use std::{
sync::Arc, sync::Arc,
time::{UNIX_EPOCH, SystemTime, Instant, Duration}, time::{SystemTime, Instant, Duration},
collections::{VecDeque, HashSet, HashMap}, collections::{VecDeque, HashSet, HashMap},
}; };
@@ -15,13 +15,16 @@ use futures::{
}; };
use tokio::time::sleep; use tokio::time::sleep;
/// Traits and types of the external network being integrated with to provide consensus over. mod time;
pub mod ext; use time::{sys_time, CanonicalInstant};
use ext::*;
mod message_log; mod message_log;
use message_log::MessageLog; use message_log::MessageLog;
/// Traits and types of the external network being integrated with to provide consensus over.
pub mod ext;
use ext::*;
pub(crate) fn commit_msg(end_time: u64, id: &[u8]) -> Vec<u8> { pub(crate) fn commit_msg(end_time: u64, id: &[u8]) -> Vec<u8> {
[&end_time.to_le_bytes(), id].concat().to_vec() [&end_time.to_le_bytes(), id].concat().to_vec()
} }
@@ -110,8 +113,6 @@ pub struct TendermintMachine<N: Network> {
validator_id: Option<N::ValidatorId>, validator_id: Option<N::ValidatorId>,
number: BlockNumber, number: BlockNumber,
canonical_start_time: u64,
start_time: Instant,
personal_proposal: N::Block, personal_proposal: N::Block,
queue: queue:
@@ -123,8 +124,9 @@ pub struct TendermintMachine<N: Network> {
log: MessageLog<N>, log: MessageLog<N>,
slashes: HashSet<N::ValidatorId>, slashes: HashSet<N::ValidatorId>,
end_time: HashMap<Round, CanonicalInstant>,
round: Round, round: Round,
end_time: HashMap<Round, Instant>, start_time: CanonicalInstant,
step: Step, step: Step,
locked: Option<(Round, <N::Block as Block>::Id)>, locked: Option<(Round, <N::Block as Block>::Id)>,
@@ -133,6 +135,9 @@ pub struct TendermintMachine<N: Network> {
timeouts: HashMap<Step, Instant>, timeouts: HashMap<Step, Instant>,
} }
pub type StepSender<N> =
mpsc::UnboundedSender<(Commit<<N as Network>::SignatureScheme>, <N as Network>::Block)>;
pub type MessageSender<N> = mpsc::UnboundedSender< pub type MessageSender<N> = mpsc::UnboundedSender<
SignedMessage< SignedMessage<
<N as Network>::ValidatorId, <N as Network>::ValidatorId,
@@ -145,7 +150,7 @@ pub type MessageSender<N> = mpsc::UnboundedSender<
pub struct TendermintHandle<N: Network> { pub struct TendermintHandle<N: Network> {
/// Channel to trigger the machine to move to the next height. /// Channel to trigger the machine to move to the next height.
/// Takes in the the previous block's commit, along with the new proposal. /// Takes in the the previous block's commit, along with the new proposal.
pub step: mpsc::UnboundedSender<(Commit<N::SignatureScheme>, N::Block)>, pub step: StepSender<N>,
/// Channel to send messages received from the P2P layer. /// Channel to send messages received from the P2P layer.
pub messages: MessageSender<N>, pub messages: MessageSender<N>,
/// Tendermint machine to be run on an asynchronous task. /// Tendermint machine to be run on an asynchronous task.
@@ -153,19 +158,7 @@ pub struct TendermintHandle<N: Network> {
} }
impl<N: Network + 'static> TendermintMachine<N> { impl<N: Network + 'static> TendermintMachine<N> {
// Get the canonical end time for a given round, represented as seconds since the epoch fn timeout(&self, step: Step) -> CanonicalInstant {
// While we have the Instant already in end_time, converting it to a SystemTime would be lossy,
// potentially enough to cause a consensus failure. Independently tracking this variable ensures
// that won't happen
fn canonical_end_time(&self, round: Round) -> u64 {
let mut time = self.canonical_start_time;
for r in 0 .. u64::from(round.0 + 1) {
time += (r + 1) * u64::from(N::block_time());
}
time
}
fn timeout(&self, step: Step) -> Instant {
let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1); let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1);
let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1); let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1);
let offset = Duration::from_secs( let offset = Duration::from_secs(
@@ -206,27 +199,31 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.broadcast(Data::Proposal(round, block)); self.broadcast(Data::Proposal(round, block));
true true
} else { } else {
self.timeouts.insert(Step::Propose, self.timeout(Step::Propose)); self.timeouts.insert(Step::Propose, self.timeout(Step::Propose).instant());
false false
} }
} }
fn round(&mut self, round: Round) -> bool { fn round(&mut self, round: Round) -> bool {
// Correct the start time // If moving to a new round, correct the start time and populate end_time
for r in self.round.0 .. round.0 { for r in self.round.0 .. round.0 {
let end = self.timeout(Step::Precommit); let end = self.timeout(Step::Precommit);
self.end_time.insert(Round(r), end); self.end_time.insert(Round(r), end);
self.start_time = end;
self.round.0 += 1; self.round.0 += 1;
self.start_time = end;
} }
debug_assert_eq!(self.round, round);
// Write the round regardless in case of reset
// 11-13 // 11-13
self.round = round;
self.step = Step::Propose;
// Write the end time
self.end_time.insert(round, self.timeout(Step::Precommit));
// Clear timeouts // Clear timeouts
self.timeouts = HashMap::new(); self.timeouts = HashMap::new();
self.end_time.insert(round, self.timeout(Step::Precommit));
self.step = Step::Propose;
self.round_propose() self.round_propose()
} }
@@ -234,13 +231,11 @@ impl<N: Network + 'static> TendermintMachine<N> {
async fn reset(&mut self, end_round: Round, proposal: N::Block) { async fn reset(&mut self, end_round: Round, proposal: N::Block) {
// Wait for the next block interval // Wait for the next block interval
let round_end = self.end_time[&end_round]; let round_end = self.end_time[&end_round];
sleep(round_end.saturating_duration_since(Instant::now())).await; sleep(round_end.instant().saturating_duration_since(Instant::now())).await;
self.validator_id = self.signer.validator_id().await; self.validator_id = self.signer.validator_id().await;
self.number.0 += 1; self.number.0 += 1;
self.canonical_start_time = self.canonical_end_time(end_round);
self.start_time = round_end;
self.personal_proposal = proposal; self.personal_proposal = proposal;
self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect(); self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect();
@@ -248,6 +243,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.log = MessageLog::new(self.weights.clone()); self.log = MessageLog::new(self.weights.clone());
self.slashes = HashSet::new(); self.slashes = HashSet::new();
self.end_time = HashMap::new(); self.end_time = HashMap::new();
self.start_time = round_end;
self.locked = None; self.locked = None;
self.valid = None; self.valid = None;
@@ -256,23 +252,27 @@ impl<N: Network + 'static> TendermintMachine<N> {
} }
async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) { async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) {
// Determine the Round number this commit ended on let mut round = None;
let mut round = Round(0); // If our start time is >= the commit's end time, it's from a previous round
// Use < to prevent an infinite loop if self.start_time.canonical() >= commit.end_time {
while self.canonical_end_time(round) < commit.end_time { for (round_i, end_time) in &self.end_time {
round.0 += 1; if end_time.canonical() == commit.end_time {
round = Some(*round_i);
break;
}
}
} else {
// Increment rounds until we find the round
while {
self.round(Round(self.round.0 + 1));
// Use < to prevent an infinite loop
self.end_time[&self.round].canonical() < commit.end_time
} {}
round =
Some(self.round).filter(|_| self.end_time[&self.round].canonical() == commit.end_time);
} }
debug_assert_eq!(
self.canonical_end_time(round),
commit.end_time,
"resetting by commit for a different block"
);
// Populate the various pieces of round info self.reset(round.expect("commit wasn't for the machine's next block"), proposal).await;
if self.round.0 < round.0 {
self.round(round);
}
self.reset(round, proposal).await;
} }
async fn slash(&mut self, validator: N::ValidatorId) { async fn slash(&mut self, validator: N::ValidatorId) {
@@ -297,24 +297,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
step: step_send, step: step_send,
messages: msg_send, messages: msg_send,
machine: { machine: {
let last_end = UNIX_EPOCH + Duration::from_secs(last.1); let last_time = sys_time(last.1);
// If the last block hasn't ended yet, sleep until it has // If the last block hasn't ended yet, sleep until it has
{ sleep(last_time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)).await;
let now = SystemTime::now();
if last_end > now {
sleep(last_end.duration_since(now).unwrap_or(Duration::ZERO)).await;
}
}
// Convert the last time to an instant
// This is imprecise yet should be precise enough, given this library only has
// second accuracy
let last_time = {
let instant_now = Instant::now();
let sys_now = SystemTime::now();
instant_now - sys_now.duration_since(last_end).unwrap_or(Duration::ZERO)
};
let signer = network.signer(); let signer = network.signer();
let validators = network.signature_scheme(); let validators = network.signature_scheme();
@@ -330,15 +315,6 @@ impl<N: Network + 'static> TendermintMachine<N> {
validator_id, validator_id,
number: BlockNumber(last.0 .0 + 1), number: BlockNumber(last.0 .0 + 1),
canonical_start_time: last.1,
// The end time of the last block is the start time for this one
// The Commit explicitly contains the end time, so loading the last commit will provide
// this. The only exception is for the genesis block, which doesn't have a commit
// Using the genesis time in place will cause this block to be created immediately
// after it, without the standard amount of separation (so their times will be
// equivalent or minimally offset)
// For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time())
start_time: last_time,
personal_proposal: proposal, personal_proposal: proposal,
queue: VecDeque::new(), queue: VecDeque::new(),
@@ -347,8 +323,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
log: MessageLog::new(weights), log: MessageLog::new(weights),
slashes: HashSet::new(), slashes: HashSet::new(),
round: Round(0),
end_time: HashMap::new(), end_time: HashMap::new(),
round: Round(0),
// The end time of the last block is the start time for this one
// The Commit explicitly contains the end time, so loading the last commit will provide
// this. The only exception is for the genesis block, which doesn't have a commit
// Using the genesis time in place will cause this block to be created immediately
// after it, without the standard amount of separation (so their times will be
// equivalent or minimally offset)
// For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time())
start_time: CanonicalInstant::new(last.1),
step: Step::Propose, step: Step::Propose,
locked: None, locked: None,
@@ -371,9 +355,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
let timeout = self.timeouts.get(&step).copied(); let timeout = self.timeouts.get(&step).copied();
(async move { (async move {
if let Some(timeout) = timeout { if let Some(timeout) = timeout {
sleep(timeout.saturating_duration_since(Instant::now())).await sleep(timeout.saturating_duration_since(Instant::now())).await;
} else { } else {
future::pending::<()>().await future::pending::<()>().await;
} }
}) })
.fuse() .fuse()
@@ -466,7 +450,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
} }
let commit = Commit { let commit = Commit {
end_time: self.canonical_end_time(msg.round), end_time: self.end_time[&msg.round].canonical(),
validators, validators,
signature: N::SignatureScheme::aggregate(&sigs), signature: N::SignatureScheme::aggregate(&sigs),
}; };
@@ -489,6 +473,26 @@ impl<N: Network + 'static> TendermintMachine<N> {
} }
} }
fn verify_precommit_signature(
&self,
sender: N::ValidatorId,
round: Round,
data: &Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
) -> Result<(), TendermintError<N::ValidatorId>> {
if let Data::Precommit(Some((id, sig))) = data {
// Also verify the end_time of the commit
// Only perform this verification if we already have the end_time
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
// which forces to calculate every end time
if let Some(end_time) = self.end_time.get(&round) {
if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) {
Err(TendermintError::Malicious(sender))?;
}
}
}
Ok(())
}
async fn message( async fn message(
&mut self, &mut self,
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
@@ -497,17 +501,8 @@ impl<N: Network + 'static> TendermintMachine<N> {
Err(TendermintError::Temporal)?; Err(TendermintError::Temporal)?;
} }
// Verify the end time and signature if this is a precommit // If this is a precommit, verify its signature
if let Data::Precommit(Some((id, sig))) = &msg.data { self.verify_precommit_signature(msg.sender, msg.round, &msg.data)?;
if !self.validators.verify(
msg.sender,
&commit_msg(self.canonical_end_time(msg.round), id.as_ref()),
sig,
) {
// Since we verified this validator actually sent the message, they're malicious
Err(TendermintError::Malicious(msg.sender))?;
}
}
// Only let the proposer propose // Only let the proposer propose
if matches!(msg.data, Data::Proposal(..)) && if matches!(msg.data, Data::Proposal(..)) &&
@@ -548,8 +543,18 @@ impl<N: Network + 'static> TendermintMachine<N> {
} else if msg.round.0 > self.round.0 { } else if msg.round.0 > self.round.0 {
// 55-56 // 55-56
// Jump, enabling processing by the below code // Jump, enabling processing by the below code
if self.log.round_participation(self.round) > self.weights.fault_thresold() { if self.log.round_participation(msg.round) > self.weights.fault_thresold() {
// If we're the proposer, return to avoid a double process // If this round already has precommit messages, verify their signatures
let round_msgs = self.log.log[&msg.round].clone();
for (validator, msgs) in &round_msgs {
if let Some(data) = msgs.get(&Step::Precommit) {
if self.verify_precommit_signature(*validator, msg.round, data).is_err() {
self.slash(*validator).await;
}
}
}
// If we're the proposer, return now so we re-run processing with our proposal
// If we continue now, it'd just be wasted ops
if self.round(msg.round) { if self.round(msg.round) {
return Ok(None); return Ok(None);
} }
@@ -567,7 +572,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// 34-35 // 34-35
if participation >= self.weights.threshold() { if participation >= self.weights.threshold() {
let timeout = self.timeout(Step::Prevote); let timeout = self.timeout(Step::Prevote);
self.timeouts.entry(Step::Prevote).or_insert(timeout); self.timeouts.entry(Step::Prevote).or_insert_with(|| timeout.instant());
} }
// 44-46 // 44-46
@@ -582,7 +587,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
self.log.has_participation(self.round, Step::Precommit) self.log.has_participation(self.round, Step::Precommit)
{ {
let timeout = self.timeout(Step::Precommit); let timeout = self.timeout(Step::Precommit);
self.timeouts.entry(Step::Precommit).or_insert(timeout); self.timeouts.entry(Step::Precommit).or_insert_with(|| timeout.instant());
} }
let proposer = self.weights.proposer(self.number, self.round); let proposer = self.weights.proposer(self.number, self.round);
@@ -645,7 +650,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
block.id(), block.id(),
self self
.signer .signer
.sign(&commit_msg(self.canonical_end_time(self.round), block.id().as_ref())) .sign(&commit_msg(self.end_time[&self.round].canonical(), block.id().as_ref()))
.await, .await,
)))); ))));
return Ok(None); return Ok(None);

View File

@@ -8,7 +8,7 @@ pub(crate) struct MessageLog<N: Network> {
N::ValidatorId, N::ValidatorId,
(<N::Block as Block>::Id, <N::SignatureScheme as SignatureScheme>::Signature), (<N::Block as Block>::Id, <N::SignatureScheme as SignatureScheme>::Signature),
>, >,
log: HashMap< pub(crate) log: HashMap<
Round, Round,
HashMap< HashMap<
N::ValidatorId, N::ValidatorId,

View File

@@ -0,0 +1,43 @@
use core::ops::Add;
use std::time::{UNIX_EPOCH, SystemTime, Instant, Duration};
/// A point in time tracked two ways at once: as whole seconds since the Unix epoch (the
/// canonical value) and as a monotonic `Instant` usable for local sleeps and timeouts.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct CanonicalInstant {
  /// Time since the epoch, in seconds.
  time: u64,
  /// An Instant synchronized with the above time.
  instant: Instant,
}

/// Convert a count of seconds since the Unix epoch into a `SystemTime`.
pub(crate) fn sys_time(time: u64) -> SystemTime {
  UNIX_EPOCH + Duration::from_secs(time)
}

impl CanonicalInstant {
  /// Create a `CanonicalInstant` for `time` seconds since the Unix epoch.
  ///
  /// The `Instant` is derived by sampling `Instant::now()` and `SystemTime::now()` back to back
  /// and shifting the former by the distance between the latter and `time`. The two clocks are
  /// not read atomically, so the result is only as precise as the gap between the two reads.
  ///
  /// # Panics
  ///
  /// Panics if the resulting `Instant` isn't representable on this platform.
  pub(crate) fn new(time: u64) -> CanonicalInstant {
    // Sample both clocks as close together as possible
    let instant_now = Instant::now();
    let sys_now = SystemTime::now();

    let instant = match sys_now.duration_since(sys_time(time)) {
      // The time is now or in the past, so move the Instant backwards by the elapsed amount
      Ok(elapsed) => instant_now
        .checked_sub(elapsed)
        .expect("canonical time is too far in the past to be represented as an Instant"),
      // The time is in the future. The error carries how far ahead it is, so move the Instant
      // forwards by that amount instead of treating the time as if it were now
      Err(ahead) => instant_now + ahead.duration(),
    };

    CanonicalInstant { time, instant }
  }

  /// The canonical time, in seconds since the Unix epoch.
  pub(crate) fn canonical(&self) -> u64 {
    self.time
  }

  /// The `Instant` synchronized with the canonical time.
  pub(crate) fn instant(&self) -> Instant {
    self.instant
  }
}

impl Add<Duration> for CanonicalInstant {
  type Output = CanonicalInstant;

  /// Advance both representations by `duration`.
  ///
  /// The canonical time only has second resolution, so it advances by `duration.as_secs()`;
  /// any sub-second part of `duration` is applied to the `Instant` alone.
  fn add(self, duration: Duration) -> CanonicalInstant {
    CanonicalInstant { time: self.time + duration.as_secs(), instant: self.instant + duration }
  }
}

View File

@@ -10,7 +10,9 @@ use parity_scale_codec::{Encode, Decode};
use futures::SinkExt; use futures::SinkExt;
use tokio::{sync::RwLock, time::sleep}; use tokio::{sync::RwLock, time::sleep};
use tendermint_machine::{ext::*, SignedMessage, MessageSender, TendermintMachine, TendermintHandle}; use tendermint_machine::{
ext::*, SignedMessage, StepSender, MessageSender, TendermintMachine, TendermintHandle,
};
type TestValidatorId = u16; type TestValidatorId = u16;
type TestBlockId = [u8; 4]; type TestBlockId = [u8; 4];
@@ -94,7 +96,7 @@ impl Block for TestBlock {
} }
} }
struct TestNetwork(u16, Arc<RwLock<Vec<MessageSender<Self>>>>); struct TestNetwork(u16, Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>>);
#[async_trait] #[async_trait]
impl Network for TestNetwork { impl Network for TestNetwork {
@@ -119,7 +121,7 @@ impl Network for TestNetwork {
} }
async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) { async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) {
for messages in self.1.write().await.iter_mut() { for (messages, _) in self.1.write().await.iter_mut() {
messages.send(msg.clone()).await.unwrap(); messages.send(msg.clone()).await.unwrap();
} }
} }
@@ -146,20 +148,20 @@ impl Network for TestNetwork {
} }
impl TestNetwork { impl TestNetwork {
async fn new(validators: usize) -> Arc<RwLock<Vec<MessageSender<Self>>>> { async fn new(validators: usize) -> Arc<RwLock<Vec<(MessageSender<Self>, StepSender<Self>)>>> {
let arc = Arc::new(RwLock::new(vec![])); let arc = Arc::new(RwLock::new(vec![]));
{ {
let mut write = arc.write().await; let mut write = arc.write().await;
for i in 0 .. validators { for i in 0 .. validators {
let i = u16::try_from(i).unwrap(); let i = u16::try_from(i).unwrap();
let TendermintHandle { messages, machine, .. } = TendermintMachine::new( let TendermintHandle { messages, machine, step } = TendermintMachine::new(
TestNetwork(i, arc.clone()), TestNetwork(i, arc.clone()),
(BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()), (BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()),
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
) )
.await; .await;
tokio::task::spawn(machine.run()); tokio::task::spawn(machine.run());
write.push(messages); write.push((messages, step));
} }
} }
arc arc