From c13e0c75aeaafa24caef0b3bed36e936e11a3b00 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 13 Nov 2022 18:11:09 -0500 Subject: [PATCH] Move more code into block.rs Introduces type-aliases to obtain Data/Message/SignedMessage solely from a Network object. Fixes a bug regarding stepping when you're not an active validator. --- substrate/tendermint/machine/src/block.rs | 68 ++++++++++++++ substrate/tendermint/machine/src/ext.rs | 11 +-- substrate/tendermint/machine/src/lib.rs | 89 ++++++------------- .../tendermint/machine/src/message_log.rs | 26 ++---- substrate/tendermint/machine/tests/ext.rs | 4 +- 5 files changed, 106 insertions(+), 92 deletions(-) diff --git a/substrate/tendermint/machine/src/block.rs b/substrate/tendermint/machine/src/block.rs index a6b7b190..3e7dc6a5 100644 --- a/substrate/tendermint/machine/src/block.rs +++ b/substrate/tendermint/machine/src/block.rs @@ -8,6 +8,7 @@ use crate::{ ext::{RoundNumber, BlockNumber, Block, Network}, round::RoundData, message_log::MessageLog, + Step, Data, DataFor, Message, MessageFor, }; pub(crate) struct BlockData { @@ -56,4 +57,71 @@ impl BlockData { pub(crate) fn round_mut(&mut self) -> &mut RoundData { self.round.as_mut().unwrap() } + + pub(crate) fn populate_end_time(&mut self, round: RoundNumber) { + for r in (self.round().number.0 + 1) .. round.0 { + self.end_time.insert( + RoundNumber(r), + RoundData::::new(RoundNumber(r), self.end_time[&RoundNumber(r - 1)]).end_time(), + ); + } + } + + // Start a new round. Optionally takes in the time for when this is the first round, and the time + // isn't simply the time of the prior round (yet rather the prior block). Returns the proposal + // data, if we are the proposer. + pub(crate) fn new_round( + &mut self, + round: RoundNumber, + proposer: N::ValidatorId, + time: Option, + ) -> Option> { + debug_assert_eq!(round.0 == 0, time.is_some()); + + // If skipping rounds, populate end_time + if round.0 != 0 { + self.populate_end_time(round); + } + + // 11-13 + self.round = Some(RoundData::::new( + round, + time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), + )); + self.end_time.insert(round, self.round().end_time()); + + // 14-21 + if Some(proposer) == self.validator_id { + let (round, block) = if let Some((round, block)) = &self.valid { + (Some(*round), block.clone()) + } else { + (None, self.proposal.clone()) + }; + Some(Data::Proposal(round, block)) + } else { + self.round_mut().set_timeout(Step::Propose); + None + } + } + + // Transform Data into an actual Message, using the contextual data from this block + pub(crate) fn message(&mut self, data: DataFor) -> Option> { + debug_assert_eq!( + self.round().step, + match data.step() { + Step::Propose | Step::Prevote => Step::Propose, + Step::Precommit => Step::Prevote, + }, + ); + // 27, 33, 41, 46, 60, 64 + self.round_mut().step = data.step(); + + // Only return a message to if we're actually a current validator + self.validator_id.map(|validator_id| Message { + sender: validator_id, + number: self.number, + round: self.round().number, + data, + }) + } } diff --git a/substrate/tendermint/machine/src/ext.rs b/substrate/tendermint/machine/src/ext.rs index ba411e6f..b7295f2c 100644 --- a/substrate/tendermint/machine/src/ext.rs +++ b/substrate/tendermint/machine/src/ext.rs @@ -6,7 +6,7 @@ use thiserror::Error; use parity_scale_codec::{Encode, Decode}; -use crate::{SignedMessage, commit_msg}; +use crate::{SignedMessageFor, commit_msg}; /// An alias for a series of traits required for a type to be usable as a validator ID, /// automatically implemented for all types satisfying those traits. @@ -249,14 +249,7 @@ pub trait Network: Send + Sync { /// established, this will double-authenticate. Switching to unauthenticated channels in a system /// already providing authenticated channels is not recommended as this is a minor, temporal /// inefficiency while downgrading channels may have wider implications. - async fn broadcast( - &mut self, - msg: SignedMessage< - Self::ValidatorId, - Self::Block, - ::Signature, - >, - ); + async fn broadcast(&mut self, msg: SignedMessageFor); /// Trigger a slash for the validator in question who was definitively malicious. /// The exact process of triggering a slash is undefined and left to the network as a whole. diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index cdd3e90a..876b11ec 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -19,7 +19,6 @@ mod time; use time::{sys_time, CanonicalInstant}; mod round; -use round::RoundData; mod block; use block::BlockData; @@ -108,6 +107,21 @@ enum TendermintError { Temporal, } +// Type aliases to abstract over generic hell +pub(crate) type DataFor = + Data<::Block, <::SignatureScheme as SignatureScheme>::Signature>; +pub(crate) type MessageFor = Message< + ::ValidatorId, + ::Block, + <::SignatureScheme as SignatureScheme>::Signature, +>; +/// Type alias to the SignedMessage type for a given Network +pub type SignedMessageFor = SignedMessage< + ::ValidatorId, + ::Block, + <::SignatureScheme as SignatureScheme>::Signature, +>; + /// A machine executing the Tendermint protocol. pub struct TendermintMachine { network: N, @@ -115,11 +129,8 @@ pub struct TendermintMachine { validators: N::SignatureScheme, weights: Arc, - queue: - VecDeque::Signature>>, - msg_recv: mpsc::UnboundedReceiver< - SignedMessage::Signature>, - >, + queue: VecDeque>, + msg_recv: mpsc::UnboundedReceiver>, step_recv: mpsc::UnboundedReceiver<(Commit, N::Block)>, block: BlockData, @@ -128,13 +139,7 @@ pub struct TendermintMachine { pub type StepSender = mpsc::UnboundedSender<(Commit<::SignatureScheme>, ::Block)>; -pub type MessageSender = mpsc::UnboundedSender< - SignedMessage< - ::ValidatorId, - ::Block, - <::SignatureScheme as SignatureScheme>::Signature, - >, ->; +pub type MessageSender = mpsc::UnboundedSender>; /// A Tendermint machine and its channel to receive messages from the gossip layer over. pub struct TendermintHandle { @@ -148,58 +153,20 @@ pub struct TendermintHandle { } impl TendermintMachine { - fn broadcast( - &mut self, - data: Data::Signature>, - ) { - if let Some(validator_id) = self.block.validator_id { - // 27, 33, 41, 46, 60, 64 - self.block.round_mut().step = data.step(); - self.queue.push_back(Message { - sender: validator_id, - number: self.block.number, - round: self.block.round().number, - data, - }); - } - } - - fn populate_end_time(&mut self, round: RoundNumber) { - for r in (self.block.round().number.0 + 1) .. round.0 { - self.block.end_time.insert( - RoundNumber(r), - RoundData::::new(RoundNumber(r), self.block.end_time[&RoundNumber(r - 1)]).end_time(), - ); + fn broadcast(&mut self, data: DataFor) { + if let Some(msg) = self.block.message(data) { + self.queue.push_back(msg); } } // Start a new round. Returns true if we were the proposer fn round(&mut self, round: RoundNumber, time: Option) -> bool { - // If skipping rounds, populate end_time - if round.0 != 0 { - self.populate_end_time(round); - } - - // 11-13 - self.block.round = Some(RoundData::::new( - round, - time.unwrap_or_else(|| self.block.end_time[&RoundNumber(round.0 - 1)]), - )); - self.block.end_time.insert(round, self.block.round().end_time()); - - // 14-21 - if Some(self.weights.proposer(self.block.number, self.block.round().number)) == - self.block.validator_id + if let Some(data) = + self.block.new_round(round, self.weights.proposer(self.block.number, round), time) { - let (round, block) = if let Some((round, block)) = &self.block.valid { - (Some(*round), block.clone()) - } else { - (None, self.block.proposal.clone()) - }; - self.broadcast(Data::Proposal(round, block)); + self.broadcast(data); true } else { - self.block.round_mut().set_timeout(Step::Propose); false } } @@ -207,7 +174,7 @@ impl TendermintMachine { // 53-54 async fn reset(&mut self, end_round: RoundNumber, proposal: N::Block) { // Ensure we have the end time data for the last round - self.populate_end_time(end_round); + self.block.populate_end_time(end_round); // Sleep until this round ends let round_end = self.block.end_time[&end_round]; @@ -233,7 +200,7 @@ impl TendermintMachine { // If this commit is for a round we don't have, jump up to it while self.block.end_time[&round].canonical() < commit.end_time { round.0 += 1; - self.populate_end_time(round); + self.block.populate_end_time(round); } // If this commit is for a prior round, find it while self.block.end_time[&round].canonical() > commit.end_time { @@ -417,7 +384,7 @@ impl TendermintMachine { &self, sender: N::ValidatorId, round: RoundNumber, - data: &Data::Signature>, + data: &DataFor, ) -> Result<(), TendermintError> { if let Data::Precommit(Some((id, sig))) = data { // Also verify the end_time of the commit @@ -435,7 +402,7 @@ impl TendermintMachine { async fn message( &mut self, - msg: Message::Signature>, + msg: MessageFor, ) -> Result, TendermintError> { if msg.number != self.block.number { Err(TendermintError::Temporal)?; diff --git a/substrate/tendermint/machine/src/message_log.rs b/substrate/tendermint/machine/src/message_log.rs index e9877130..0592160d 100644 --- a/substrate/tendermint/machine/src/message_log.rs +++ b/substrate/tendermint/machine/src/message_log.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, collections::HashMap}; -use crate::{ext::*, RoundNumber, Step, Data, Message, TendermintError}; +use crate::{ext::*, RoundNumber, Step, Data, DataFor, MessageFor, TendermintError}; pub(crate) struct MessageLog { weights: Arc, @@ -8,13 +8,7 @@ pub(crate) struct MessageLog { N::ValidatorId, (::Id, ::Signature), >, - pub(crate) log: HashMap< - RoundNumber, - HashMap< - N::ValidatorId, - HashMap::Signature>>, - >, - >, + pub(crate) log: HashMap>>>, } impl MessageLog { @@ -25,7 +19,7 @@ impl MessageLog { // Returns true if it's a new message pub(crate) fn log( &mut self, - msg: Message::Signature>, + msg: MessageFor, ) -> Result> { let round = self.log.entry(msg.round).or_insert_with(HashMap::new); let msgs = round.entry(msg.sender).or_insert_with(HashMap::new); @@ -55,11 +49,7 @@ impl MessageLog { // For a given round, return the participating weight for this step, and the weight agreeing with // the data. - pub(crate) fn message_instances( - &self, - round: RoundNumber, - data: Data::Signature>, - ) -> (u64, u64) { + pub(crate) fn message_instances(&self, round: RoundNumber, data: DataFor) -> (u64, u64) { let mut participating = 0; let mut weight = 0; for (participant, msgs) in &self.log[&round] { @@ -97,11 +87,7 @@ impl MessageLog { } // Check if consensus has been reached on a specific piece of data - pub(crate) fn has_consensus( - &self, - round: RoundNumber, - data: Data::Signature>, - ) -> bool { + pub(crate) fn has_consensus(&self, round: RoundNumber, data: DataFor) -> bool { let (_, weight) = self.message_instances(round, data); weight >= self.weights.threshold() } @@ -111,7 +97,7 @@ impl MessageLog { round: RoundNumber, sender: N::ValidatorId, step: Step, - ) -> Option<&Data::Signature>> { + ) -> Option<&DataFor> { self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step))) } } diff --git a/substrate/tendermint/machine/tests/ext.rs b/substrate/tendermint/machine/tests/ext.rs index 69b8e55f..086c96d8 100644 --- a/substrate/tendermint/machine/tests/ext.rs +++ b/substrate/tendermint/machine/tests/ext.rs @@ -11,7 +11,7 @@ use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; use tendermint_machine::{ - ext::*, SignedMessage, StepSender, MessageSender, TendermintMachine, TendermintHandle, + ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle, }; type TestValidatorId = u16; @@ -120,7 +120,7 @@ impl Network for TestNetwork { TestWeights } - async fn broadcast(&mut self, msg: SignedMessage) { + async fn broadcast(&mut self, msg: SignedMessageFor) { for (messages, _) in self.1.write().await.iter_mut() { messages.send(msg.clone()).await.unwrap(); }