From e2e7a70f1eb092f9702f780a3f09c73f960ccfed Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 12 Nov 2022 07:12:05 -0500 Subject: [PATCH] Clean up time code in tendermint-machine --- substrate/tendermint/machine/src/lib.rs | 187 +++++++++--------- .../tendermint/machine/src/message_log.rs | 2 +- substrate/tendermint/machine/src/time.rs | 43 ++++ substrate/tendermint/machine/tests/ext.rs | 14 +- 4 files changed, 148 insertions(+), 98 deletions(-) create mode 100644 substrate/tendermint/machine/src/time.rs diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index 6b674ae5..3e388051 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -2,7 +2,7 @@ use core::fmt::Debug; use std::{ sync::Arc, - time::{UNIX_EPOCH, SystemTime, Instant, Duration}, + time::{SystemTime, Instant, Duration}, collections::{VecDeque, HashSet, HashMap}, }; @@ -15,13 +15,16 @@ use futures::{ }; use tokio::time::sleep; -/// Traits and types of the external network being integrated with to provide consensus over. -pub mod ext; -use ext::*; +mod time; +use time::{sys_time, CanonicalInstant}; mod message_log; use message_log::MessageLog; +/// Traits and types of the external network being integrated with to provide consensus over. +pub mod ext; +use ext::*; + pub(crate) fn commit_msg(end_time: u64, id: &[u8]) -> Vec { [&end_time.to_le_bytes(), id].concat().to_vec() } @@ -110,8 +113,6 @@ pub struct TendermintMachine { validator_id: Option, number: BlockNumber, - canonical_start_time: u64, - start_time: Instant, personal_proposal: N::Block, queue: @@ -123,8 +124,9 @@ pub struct TendermintMachine { log: MessageLog, slashes: HashSet, + end_time: HashMap, round: Round, - end_time: HashMap, + start_time: CanonicalInstant, step: Step, locked: Option<(Round, ::Id)>, @@ -133,6 +135,9 @@ pub struct TendermintMachine { timeouts: HashMap, } +pub type StepSender = + mpsc::UnboundedSender<(Commit<::SignatureScheme>, ::Block)>; + pub type MessageSender = mpsc::UnboundedSender< SignedMessage< ::ValidatorId, @@ -145,7 +150,7 @@ pub type MessageSender = mpsc::UnboundedSender< pub struct TendermintHandle { /// Channel to trigger the machine to move to the next height. /// Takes in the the previous block's commit, along with the new proposal. - pub step: mpsc::UnboundedSender<(Commit, N::Block)>, + pub step: StepSender, /// Channel to send messages received from the P2P layer. pub messages: MessageSender, /// Tendermint machine to be run on an asynchronous task. @@ -153,19 +158,7 @@ pub struct TendermintHandle { } impl TendermintMachine { - // Get the canonical end time for a given round, represented as seconds since the epoch - // While we have the Instant already in end_time, converting it to a SystemTime would be lossy, - // potentially enough to cause a consensus failure. Independently tracking this variable ensures - // that won't happen - fn canonical_end_time(&self, round: Round) -> u64 { - let mut time = self.canonical_start_time; - for r in 0 .. u64::from(round.0 + 1) { - time += (r + 1) * u64::from(N::block_time()); - } - time - } - - fn timeout(&self, step: Step) -> Instant { + fn timeout(&self, step: Step) -> CanonicalInstant { let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1); let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1); let offset = Duration::from_secs( @@ -206,27 +199,31 @@ impl TendermintMachine { self.broadcast(Data::Proposal(round, block)); true } else { - self.timeouts.insert(Step::Propose, self.timeout(Step::Propose)); + self.timeouts.insert(Step::Propose, self.timeout(Step::Propose).instant()); false } } fn round(&mut self, round: Round) -> bool { - // Correct the start time + // If moving to a new round, correct the start time and populate end_time for r in self.round.0 .. round.0 { let end = self.timeout(Step::Precommit); self.end_time.insert(Round(r), end); - self.start_time = end; self.round.0 += 1; + self.start_time = end; } - debug_assert_eq!(self.round, round); + + // Write the round regardless in case of reset // 11-13 + self.round = round; + self.step = Step::Propose; + + // Write the end time + self.end_time.insert(round, self.timeout(Step::Precommit)); // Clear timeouts self.timeouts = HashMap::new(); - self.end_time.insert(round, self.timeout(Step::Precommit)); - self.step = Step::Propose; self.round_propose() } @@ -234,13 +231,11 @@ impl TendermintMachine { async fn reset(&mut self, end_round: Round, proposal: N::Block) { // Wait for the next block interval let round_end = self.end_time[&end_round]; - sleep(round_end.saturating_duration_since(Instant::now())).await; + sleep(round_end.instant().saturating_duration_since(Instant::now())).await; self.validator_id = self.signer.validator_id().await; self.number.0 += 1; - self.canonical_start_time = self.canonical_end_time(end_round); - self.start_time = round_end; self.personal_proposal = proposal; self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect(); @@ -248,6 +243,7 @@ impl TendermintMachine { self.log = MessageLog::new(self.weights.clone()); self.slashes = HashSet::new(); self.end_time = HashMap::new(); + self.start_time = round_end; self.locked = None; self.valid = None; @@ -256,23 +252,27 @@ impl TendermintMachine { } async fn reset_by_commit(&mut self, commit: Commit, proposal: N::Block) { - // Determine the Round number this commit ended on - let mut round = Round(0); - // Use < to prevent an infinite loop - while self.canonical_end_time(round) < commit.end_time { - round.0 += 1; + let mut round = None; + // If our start time is >= the commit's end time, it's from a previous round + if self.start_time.canonical() >= commit.end_time { + for (round_i, end_time) in &self.end_time { + if end_time.canonical() == commit.end_time { + round = Some(*round_i); + break; + } + } + } else { + // Increment rounds until we find the round + while { + self.round(Round(self.round.0 + 1)); + // Use < to prevent an infinite loop + self.end_time[&self.round].canonical() < commit.end_time + } {} + round = + Some(self.round).filter(|_| self.end_time[&self.round].canonical() == commit.end_time); } - debug_assert_eq!( - self.canonical_end_time(round), - commit.end_time, - "resetting by commit for a different block" - ); - // Populate the various pieces of round info - if self.round.0 < round.0 { - self.round(round); - } - self.reset(round, proposal).await; + self.reset(round.expect("commit wasn't for the machine's next block"), proposal).await; } async fn slash(&mut self, validator: N::ValidatorId) { @@ -297,24 +297,9 @@ impl TendermintMachine { step: step_send, messages: msg_send, machine: { - let last_end = UNIX_EPOCH + Duration::from_secs(last.1); - + let last_time = sys_time(last.1); // If the last block hasn't ended yet, sleep until it has - { - let now = SystemTime::now(); - if last_end > now { - sleep(last_end.duration_since(now).unwrap_or(Duration::ZERO)).await; - } - } - - // Convert the last time to an instant - // This is imprecise yet should be precise enough, given this library only has - // second accuracy - let last_time = { - let instant_now = Instant::now(); - let sys_now = SystemTime::now(); - instant_now - sys_now.duration_since(last_end).unwrap_or(Duration::ZERO) - }; + sleep(last_time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)).await; let signer = network.signer(); let validators = network.signature_scheme(); @@ -330,15 +315,6 @@ impl TendermintMachine { validator_id, number: BlockNumber(last.0 .0 + 1), - canonical_start_time: last.1, - // The end time of the last block is the start time for this one - // The Commit explicitly contains the end time, so loading the last commit will provide - // this. The only exception is for the genesis block, which doesn't have a commit - // Using the genesis time in place will cause this block to be created immediately - // after it, without the standard amount of separation (so their times will be - // equivalent or minimally offset) - // For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time()) - start_time: last_time, personal_proposal: proposal, queue: VecDeque::new(), @@ -347,8 +323,16 @@ impl TendermintMachine { log: MessageLog::new(weights), slashes: HashSet::new(), - round: Round(0), end_time: HashMap::new(), + round: Round(0), + // The end time of the last block is the start time for this one + // The Commit explicitly contains the end time, so loading the last commit will provide + // this. The only exception is for the genesis block, which doesn't have a commit + // Using the genesis time in place will cause this block to be created immediately + // after it, without the standard amount of separation (so their times will be + // equivalent or minimally offset) + // For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time()) + start_time: CanonicalInstant::new(last.1), step: Step::Propose, locked: None, @@ -371,9 +355,9 @@ impl TendermintMachine { let timeout = self.timeouts.get(&step).copied(); (async move { if let Some(timeout) = timeout { - sleep(timeout.saturating_duration_since(Instant::now())).await + sleep(timeout.saturating_duration_since(Instant::now())).await; } else { - future::pending::<()>().await + future::pending::<()>().await; } }) .fuse() @@ -466,7 +450,7 @@ impl TendermintMachine { } let commit = Commit { - end_time: self.canonical_end_time(msg.round), + end_time: self.end_time[&msg.round].canonical(), validators, signature: N::SignatureScheme::aggregate(&sigs), }; @@ -489,6 +473,26 @@ impl TendermintMachine { } } + fn verify_precommit_signature( + &self, + sender: N::ValidatorId, + round: Round, + data: &Data::Signature>, + ) -> Result<(), TendermintError> { + if let Data::Precommit(Some((id, sig))) = data { + // Also verify the end_time of the commit + // Only perform this verification if we already have the end_time + // Else, there's a DoS where we receive a precommit for some round infinitely in the future + // which forces to calculate every end time + if let Some(end_time) = self.end_time.get(&round) { + if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) { + Err(TendermintError::Malicious(sender))?; + } + } + } + Ok(()) + } + async fn message( &mut self, msg: Message::Signature>, @@ -497,17 +501,8 @@ impl TendermintMachine { Err(TendermintError::Temporal)?; } - // Verify the end time and signature if this is a precommit - if let Data::Precommit(Some((id, sig))) = &msg.data { - if !self.validators.verify( - msg.sender, - &commit_msg(self.canonical_end_time(msg.round), id.as_ref()), - sig, - ) { - // Since we verified this validator actually sent the message, they're malicious - Err(TendermintError::Malicious(msg.sender))?; - } - } + // If this is a precommit, verify its signature + self.verify_precommit_signature(msg.sender, msg.round, &msg.data)?; // Only let the proposer propose if matches!(msg.data, Data::Proposal(..)) && @@ -548,8 +543,18 @@ impl TendermintMachine { } else if msg.round.0 > self.round.0 { // 55-56 // Jump, enabling processing by the below code - if self.log.round_participation(self.round) > self.weights.fault_thresold() { - // If we're the proposer, return to avoid a double process + if self.log.round_participation(msg.round) > self.weights.fault_thresold() { + // If this round already has precommit messages, verify their signatures + let round_msgs = self.log.log[&msg.round].clone(); + for (validator, msgs) in &round_msgs { + if let Some(data) = msgs.get(&Step::Precommit) { + if self.verify_precommit_signature(*validator, msg.round, data).is_err() { + self.slash(*validator).await; + } + } + } + // If we're the proposer, return now so we re-run processing with our proposal + // If we continue now, it'd just be wasted ops if self.round(msg.round) { return Ok(None); } @@ -567,7 +572,7 @@ impl TendermintMachine { // 34-35 if participation >= self.weights.threshold() { let timeout = self.timeout(Step::Prevote); - self.timeouts.entry(Step::Prevote).or_insert(timeout); + self.timeouts.entry(Step::Prevote).or_insert_with(|| timeout.instant()); } // 44-46 @@ -582,7 +587,7 @@ impl TendermintMachine { self.log.has_participation(self.round, Step::Precommit) { let timeout = self.timeout(Step::Precommit); - self.timeouts.entry(Step::Precommit).or_insert(timeout); + self.timeouts.entry(Step::Precommit).or_insert_with(|| timeout.instant()); } let proposer = self.weights.proposer(self.number, self.round); @@ -645,7 +650,7 @@ impl TendermintMachine { block.id(), self .signer - .sign(&commit_msg(self.canonical_end_time(self.round), block.id().as_ref())) + .sign(&commit_msg(self.end_time[&self.round].canonical(), block.id().as_ref())) .await, )))); return Ok(None); diff --git a/substrate/tendermint/machine/src/message_log.rs b/substrate/tendermint/machine/src/message_log.rs index 2536cad1..793ad7a2 100644 --- a/substrate/tendermint/machine/src/message_log.rs +++ b/substrate/tendermint/machine/src/message_log.rs @@ -8,7 +8,7 @@ pub(crate) struct MessageLog { N::ValidatorId, (::Id, ::Signature), >, - log: HashMap< + pub(crate) log: HashMap< Round, HashMap< N::ValidatorId, diff --git a/substrate/tendermint/machine/src/time.rs b/substrate/tendermint/machine/src/time.rs new file mode 100644 index 00000000..b6bd1c18 --- /dev/null +++ b/substrate/tendermint/machine/src/time.rs @@ -0,0 +1,43 @@ +use core::ops::Add; +use std::time::{UNIX_EPOCH, SystemTime, Instant, Duration}; + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub(crate) struct CanonicalInstant { + /// Time since the epoch. + time: u64, + /// An Instant synchronized with the above time. + instant: Instant, +} + +pub(crate) fn sys_time(time: u64) -> SystemTime { + UNIX_EPOCH + Duration::from_secs(time) +} + +impl CanonicalInstant { + pub(crate) fn new(time: u64) -> CanonicalInstant { + // This is imprecise yet should be precise enough, as it'll resolve within a few ms + let instant_now = Instant::now(); + let sys_now = SystemTime::now(); + + // If the time is in the future, this will be off by that much time + let elapsed = sys_now.duration_since(sys_time(time)).unwrap_or(Duration::ZERO); + let synced_instant = instant_now - elapsed; + + CanonicalInstant { time, instant: synced_instant } + } + + pub(crate) fn canonical(&self) -> u64 { + self.time + } + + pub(crate) fn instant(&self) -> Instant { + self.instant + } +} + +impl Add for CanonicalInstant { + type Output = CanonicalInstant; + fn add(self, duration: Duration) -> CanonicalInstant { + CanonicalInstant { time: self.time + duration.as_secs(), instant: self.instant + duration } + } +} diff --git a/substrate/tendermint/machine/tests/ext.rs b/substrate/tendermint/machine/tests/ext.rs index 62552ff7..d94a2f3b 100644 --- a/substrate/tendermint/machine/tests/ext.rs +++ b/substrate/tendermint/machine/tests/ext.rs @@ -10,7 +10,9 @@ use parity_scale_codec::{Encode, Decode}; use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; -use tendermint_machine::{ext::*, SignedMessage, MessageSender, TendermintMachine, TendermintHandle}; +use tendermint_machine::{ + ext::*, SignedMessage, StepSender, MessageSender, TendermintMachine, TendermintHandle, +}; type TestValidatorId = u16; type TestBlockId = [u8; 4]; @@ -94,7 +96,7 @@ impl Block for TestBlock { } } -struct TestNetwork(u16, Arc>>>); +struct TestNetwork(u16, Arc, StepSender)>>>); #[async_trait] impl Network for TestNetwork { @@ -119,7 +121,7 @@ impl Network for TestNetwork { } async fn broadcast(&mut self, msg: SignedMessage) { - for messages in self.1.write().await.iter_mut() { + for (messages, _) in self.1.write().await.iter_mut() { messages.send(msg.clone()).await.unwrap(); } } @@ -146,20 +148,20 @@ impl Network for TestNetwork { } impl TestNetwork { - async fn new(validators: usize) -> Arc>>> { + async fn new(validators: usize) -> Arc, StepSender)>>> { let arc = Arc::new(RwLock::new(vec![])); { let mut write = arc.write().await; for i in 0 .. validators { let i = u16::try_from(i).unwrap(); - let TendermintHandle { messages, machine, .. } = TendermintMachine::new( + let TendermintHandle { messages, machine, step } = TendermintMachine::new( TestNetwork(i, arc.clone()), (BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()), TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, ) .await; tokio::task::spawn(machine.run()); - write.push(messages); + write.push((messages, step)); } } arc