From 933b17aa911d1b27c8c76d5c4715b329d6cbfa5a Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 21 Apr 2024 10:14:22 -0400 Subject: [PATCH] Revert coordinator/tributary to fd4f247917cb83de19f933f75e8cf90008b25976 \#560 is causing notable CI failures, with its logs including slashes at 10x the prior rate. --- coordinator/tributary/src/lib.rs | 34 +- coordinator/tributary/src/tendermint/mod.rs | 32 +- coordinator/tributary/tendermint/src/block.rs | 17 +- coordinator/tributary/tendermint/src/ext.rs | 2 +- coordinator/tributary/tendermint/src/lib.rs | 978 ++++++++---------- .../tributary/tendermint/src/message_log.rs | 15 +- coordinator/tributary/tendermint/src/round.rs | 2 - coordinator/tributary/tendermint/tests/ext.rs | 2 +- 8 files changed, 529 insertions(+), 553 deletions(-) diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 121ac385..dcf38c68 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt::Debug}; -use std::{sync::Arc, io}; +use std::{sync::Arc, io, collections::VecDeque}; use async_trait::async_trait; @@ -154,6 +154,14 @@ pub struct Tributary { synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, + + p2p_meta_task_handle: Arc, +} + +impl Drop for Tributary { + fn drop(&mut self) { + self.p2p_meta_task_handle.abort(); + } } impl Tributary { @@ -185,7 +193,28 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; + let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new())); + // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the + // P2P layer + let p2p_meta_task_handle = Arc::new( + tokio::spawn({ + let to_rebroadcast = to_rebroadcast.clone(); + let p2p = p2p.clone(); + async move { + loop { + let to_rebroadcast = to_rebroadcast.read().await.clone(); + for msg in to_rebroadcast { + p2p.broadcast(genesis, msg).await; + } + tokio::time::sleep(core::time::Duration::from_secs(60)).await; + } + } + }) + .abort_handle(), + ); + + let network = + TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new( @@ -206,6 +235,7 @@ impl Tributary { synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), + p2p_meta_task_handle, }) } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index 0ce6232c..e38efa5d 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -1,5 +1,8 @@ use core::ops::Deref; -use std::{sync::Arc, collections::HashMap}; +use std::{ + sync::Arc, + collections::{VecDeque, HashMap}, +}; use async_trait::async_trait; @@ -267,6 +270,8 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, + pub(crate) to_rebroadcast: Arc>>>, + pub(crate) p2p: P, } @@ -303,6 +308,26 @@ impl Network for TendermintNetwork async fn broadcast(&mut self, msg: SignedMessageFor) { let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); + + // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second + // until the block it's trying to build is complete + // If the P2P layer drops a message before all nodes obtained access, or a node had an + // intermittent failure, this will ensure reconcilliation + // This is atrocious if there's no content-based deduplication protocol for messages actively + // being gossiped + // LibP2p, as used by Serai, is configured to content-based deduplicate + { + let mut to_rebroadcast_lock = self.to_rebroadcast.write().await; + to_rebroadcast_lock.push_back(to_broadcast.clone()); + // We should have, ideally, 3 * validators messages within a round + // Therefore, this should keep the most recent 2-rounds + // TODO: This isn't perfect. Each participant should just rebroadcast their latest round of + // messages + while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) { + to_rebroadcast_lock.pop_front(); + } + } + self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -341,7 +366,7 @@ impl Network for TendermintNetwork } } - async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { + async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; self @@ -403,6 +428,9 @@ impl Network for TendermintNetwork } } + // Since we've added a valid block, clear to_rebroadcast + *self.to_rebroadcast.write().await = VecDeque::new(); + Some(TendermintBlock( self.blockchain.write().await.build_block::(&self.signature_scheme()).serialize(), )) diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 236b4816..6dfacfdb 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -3,6 +3,7 @@ use std::{ collections::{HashSet, HashMap}, }; +use parity_scale_codec::Encode; use serai_db::{Get, DbTxn, Db}; use crate::{ @@ -19,7 +20,7 @@ pub(crate) struct BlockData { pub(crate) number: BlockNumber, pub(crate) validator_id: Option, - pub(crate) our_proposal: Option, + pub(crate) proposal: Option, pub(crate) log: MessageLog, pub(crate) slashes: HashSet, @@ -42,7 +43,7 @@ impl BlockData { weights: Arc, number: BlockNumber, validator_id: Option, - our_proposal: Option, + proposal: Option, ) -> BlockData { BlockData { db, @@ -50,7 +51,7 @@ impl BlockData { number, validator_id, - our_proposal, + proposal, log: MessageLog::new(weights), slashes: HashSet::new(), @@ -107,17 +108,17 @@ impl BlockData { self.populate_end_time(round); } - // L11-13 + // 11-13 self.round = Some(RoundData::::new( round, time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), )); self.end_time.insert(round, self.round().end_time()); - // L14-21 + // 14-21 if Some(proposer) == self.validator_id { let (round, block) = self.valid.clone().unzip(); - block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block)) + block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block)) } else { self.round_mut().set_timeout(Step::Propose); None @@ -197,8 +198,8 @@ impl BlockData { assert!(!new_round); None?; } - // Put that we're sending this message to the DB - txn.put(&msg_key, []); + // Put this message to the DB + txn.put(&msg_key, res.encode()); txn.commit(); } diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index 3869d9d9..b3d568a2 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync { async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); /// Validate a block. - async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; + async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; /// Add a block, returning the proposal for the next one. /// diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 0e328e02..da80a41c 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -6,7 +6,7 @@ use std::{ collections::VecDeque, }; -use parity_scale_codec::{Encode, Decode, IoReader}; +use parity_scale_codec::{Encode, Decode}; use futures_channel::mpsc; use futures_util::{ @@ -15,8 +15,6 @@ use futures_util::{ }; use tokio::time::sleep; -use serai_db::{Get, DbTxn, Db}; - pub mod time; use time::{sys_time, CanonicalInstant}; @@ -32,11 +30,6 @@ pub(crate) mod message_log; pub mod ext; use ext::*; -const MESSAGE_TAPE_KEY: &[u8] = b"tendermint-machine-message_tape"; -fn message_tape_key(genesis: [u8; 32]) -> Vec { - [MESSAGE_TAPE_KEY, &genesis].concat() -} - pub fn commit_msg(end_time: u64, id: &[u8]) -> Vec { [&end_time.to_le_bytes(), id].concat() } @@ -110,23 +103,9 @@ impl SignedMessage { } } -#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] -pub enum SlashReason { - FailToPropose, - InvalidBlock, - InvalidProposer, -} - -#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] -pub enum Evidence { - ConflictingMessages(Vec, Vec), - InvalidPrecommit(Vec), - InvalidValidRound(Vec), -} - #[derive(Clone, PartialEq, Eq, Debug)] -pub enum TendermintError { - Malicious, +pub enum TendermintError { + Malicious(N::ValidatorId, Option), Temporal, AlreadyHandled, InvalidEvidence, @@ -147,6 +126,20 @@ pub type SignedMessageFor = SignedMessage< <::SignatureScheme as SignatureScheme>::Signature, >; +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] +pub enum SlashReason { + FailToPropose, + InvalidBlock, + InvalidMessage, +} + +#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] +pub enum Evidence { + ConflictingMessages(Vec, Vec), + InvalidPrecommit(Vec), + InvalidValidRound(Vec), +} + pub fn decode_signed_message(mut data: &[u8]) -> Option> { SignedMessageFor::::decode(&mut data).ok() } @@ -154,7 +147,7 @@ pub fn decode_signed_message(mut data: &[u8]) -> Option( data: &[u8], schema: &N::SignatureScheme, -) -> Result, TendermintError> { +) -> Result, TendermintError> { let msg = decode_signed_message::(data).ok_or(TendermintError::InvalidEvidence)?; // verify that evidence messages are signed correctly @@ -169,7 +162,7 @@ pub fn verify_tendermint_evience( evidence: &Evidence, schema: &N::SignatureScheme, commit: impl Fn(u64) -> Option>, -) -> Result<(), TendermintError> { +) -> Result<(), TendermintError> { match evidence { Evidence::ConflictingMessages(first, second) => { let first = decode_and_verify_signed_message::(first, schema)?.msg; @@ -193,16 +186,15 @@ pub fn verify_tendermint_evience( }; // TODO: We need to be passed in the genesis time to handle this edge case if msg.block.0 == 0 { - Err(TendermintError::InvalidEvidence)? - // todo!("invalid precommit signature on first block") + todo!("invalid precommit signature on first block") } // get the last commit let prior_commit = match commit(msg.block.0 - 1) { Some(c) => c, - // If we have yet to sync the block in question, we will return InvalidEvidence based + // If we have yet to sync the block in question, we will return InvalidContent based // on our own temporal ambiguity - // This will also cause an InvalidEvidence for anything using a non-existent block, + // This will also cause an InvalidContent for anything using a non-existent block, // yet that's valid behavior // TODO: Double check the ramifications of this _ => Err(TendermintError::InvalidEvidence)?, @@ -237,16 +229,6 @@ pub enum SlashEvent { WithEvidence(Evidence), } -// Struct for if various upon handlers have been triggered to ensure they don't trigger multiple -// times. -#[derive(Clone, PartialEq, Eq, Debug)] -struct Upons { - upon_prevotes: bool, - upon_successful_current_round_prevotes: bool, - upon_negative_current_round_prevotes: bool, - upon_precommits: bool, -} - /// A machine executing the Tendermint protocol. pub struct TendermintMachine { db: N::Db, @@ -263,7 +245,6 @@ pub struct TendermintMachine { synced_block_result_send: mpsc::UnboundedSender, block: BlockData, - upons: Upons, } pub struct SyncedBlock { @@ -344,13 +325,6 @@ impl TendermintMachine { ); sleep(time_until_round_end).await; - // Clear the message tape - { - let mut txn = self.db.txn(); - txn.del(&message_tape_key(self.genesis)); - txn.commit(); - } - // Clear our outbound message queue self.queue = VecDeque::new(); @@ -364,14 +338,6 @@ impl TendermintMachine { proposal, ); - // Clear upons - self.upons = Upons { - upon_prevotes: false, - upon_successful_current_round_prevotes: false, - upon_negative_current_round_prevotes: false, - upon_precommits: false, - }; - // Start the first round self.round(RoundNumber(0), Some(round_end)); } @@ -409,414 +375,6 @@ impl TendermintMachine { } } - fn proposal_for_round(&self, round: RoundNumber) -> Option<(Option, &N::Block)> { - let proposer = self.weights.proposer(self.block.number, round); - if let Some(proposal_signed) = self.block.log.get(round, proposer, Step::Propose) { - if let Data::Proposal(vr, block) = &proposal_signed.msg.data { - Some((*vr, block)) - } else { - panic!("message for Step::Propose didn't have Data::Proposal"); - } - } else { - None? - } - } - - // L22-27 - fn upon_proposal_without_valid_round(&mut self) { - if self.block.round().step != Step::Propose { - return; - } - - // If we have the proposal message... - let Some((None, block)) = self.proposal_for_round(self.block.round().number) else { - return; - }; - - // There either needs to not be a locked value or it must be equivalent - #[allow(clippy::map_unwrap_or)] - if self - .block - .locked - .as_ref() - .map(|(_round, locked_block)| block.id() == *locked_block) - .unwrap_or(true) - { - self.broadcast(Data::Prevote(Some(block.id()))); - } else { - self.broadcast(Data::Prevote(None)); - } - } - - // L28-33 - fn upon_proposal_with_valid_round(&mut self) { - if self.block.round().step != Step::Propose { - return; - } - - // If we have the proposal message... - let Some((Some(proposal_valid_round), block)) = - self.proposal_for_round(self.block.round().number) - else { - return; - }; - - // Check we have the necessary prevotes - if !self.block.log.has_consensus(proposal_valid_round, &Data::Prevote(Some(block.id()))) { - return; - } - - // We don't check valid round < current round as the `message` function does - - // If locked is None, lockedRoundp is -1 and less than valid round - #[allow(clippy::map_unwrap_or)] - let locked_clause_1 = self - .block - .locked - .as_ref() - .map(|(locked_round, _block)| locked_round.0 <= proposal_valid_round.0) - .unwrap_or(true); - // The second clause is if the locked values are equivalent. If no value is locked, they aren't - #[allow(clippy::map_unwrap_or)] - let locked_clause_2 = self - .block - .locked - .as_ref() - .map(|(_round, locked_block)| block.id() == *locked_block) - .unwrap_or(false); - - if locked_clause_1 || locked_clause_2 { - self.broadcast(Data::Prevote(Some(block.id()))); - } else { - self.broadcast(Data::Prevote(None)); - } - } - - // L34-35 - fn upon_prevotes(&mut self) { - if self.upons.upon_prevotes || (self.block.round().step != Step::Prevote) { - return; - } - - if self.block.log.has_participation(self.block.round().number, Step::Prevote) { - self.block.round_mut().set_timeout(Step::Prevote); - self.upons.upon_prevotes = true; - } - } - - // L36-43 - async fn upon_successful_current_round_prevotes(&mut self) { - // Returning if `self.step == Step::Propose` is equivalent to guarding `step >= prevote` - if self.upons.upon_successful_current_round_prevotes || - (self.block.round().step == Step::Propose) - { - return; - } - - // If we have the proposal message... - let Some((_, block)) = self.proposal_for_round(self.block.round().number) else { - return; - }; - - // Check we have the necessary prevotes - if !self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { - return; - } - - let block = block.clone(); - self.upons.upon_successful_current_round_prevotes = true; - - if self.block.round().step == Step::Prevote { - self.block.locked = Some((self.block.round().number, block.id())); - let signature = self - .signer - .sign(&commit_msg( - self.block.end_time[&self.block.round().number].canonical(), - block.id().as_ref(), - )) - .await; - self.broadcast(Data::Precommit(Some((block.id(), signature)))); - } - self.block.valid = Some((self.block.round().number, block)); - } - - // L44-46 - fn upon_negative_current_round_prevotes(&mut self) { - if self.upons.upon_negative_current_round_prevotes || (self.block.round().step != Step::Prevote) - { - return; - } - - if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(None)) { - self.broadcast(Data::Precommit(None)); - } - - self.upons.upon_negative_current_round_prevotes = true; - } - - // L47-48 - fn upon_precommits(&mut self) { - if self.upons.upon_precommits { - return; - } - - if self.block.log.has_participation(self.block.round().number, Step::Precommit) { - self.block.round_mut().set_timeout(Step::Precommit); - self.upons.upon_precommits = true; - } - } - - // L22-48 - async fn all_current_round_upons(&mut self) { - self.upon_proposal_without_valid_round(); - self.upon_proposal_with_valid_round(); - self.upon_prevotes(); - self.upon_successful_current_round_prevotes().await; - self.upon_negative_current_round_prevotes(); - self.upon_precommits(); - } - - // L49-54 - async fn upon_successful_precommits(&mut self, round: RoundNumber) -> bool { - // If we have the proposal message... - let Some((_, block)) = self.proposal_for_round(round) else { return false }; - - // Check we have the necessary precommits - // The precommit we check we have consensus upon uses a junk signature since message equality - // disregards the signature - if !self - .block - .log - .has_consensus(round, &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await)))) - { - return false; - } - - // Get all participants in this commit - let mut validators = vec![]; - let mut sigs = vec![]; - // Get all precommits for this round - for (validator, msgs) in &self.block.log.log[&round] { - if let Some(signed) = msgs.get(&Step::Precommit) { - if let Data::Precommit(Some((id, sig))) = &signed.msg.data { - // If this precommit was for this block, include it - if *id == block.id() { - validators.push(*validator); - sigs.push(sig.clone()); - } - } - } - } - - // Form the commit itself - let commit_msg = commit_msg(self.block.end_time[&round].canonical(), block.id().as_ref()); - let commit = Commit { - end_time: self.block.end_time[&round].canonical(), - validators: validators.clone(), - signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), - }; - debug_assert!(self.network.verify_commit(block.id(), &commit)); - - // Add the block and reset the machine - log::info!( - target: "tendermint", - "TendermintMachine produced block {}", - hex::encode(block.id().as_ref()), - ); - let id = block.id(); - let proposal = self.network.add_block(block.clone(), commit).await; - log::trace!( - target: "tendermint", - "added block {} (produced by machine)", - hex::encode(id.as_ref()), - ); - self.reset(round, proposal).await; - - true - } - - // L49-54 - async fn all_any_round_upons(&mut self, round: RoundNumber) -> bool { - self.upon_successful_precommits(round).await - } - - // Returns Ok(true) if this was a Precommit which had either no signature or its signature - // validated - // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet - // Returns Err if the signature was invalid - async fn verify_precommit_signature( - &mut self, - signed: &SignedMessageFor, - ) -> Result { - let msg = &signed.msg; - if let Data::Precommit(precommit) = &msg.data { - let Some((id, sig)) = precommit else { return Ok(true) }; - // Also verify the end_time of the commit - // Only perform this verification if we already have the end_time - // Else, there's a DoS where we receive a precommit for some round infinitely in the future - // which forces us to calculate every end time - if let Some(end_time) = self.block.end_time.get(&msg.round) { - if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) - { - log::warn!(target: "tendermint", "validator produced an invalid commit signature"); - self - .slash( - msg.sender, - SlashEvent::WithEvidence(Evidence::InvalidPrecommit(signed.encode())), - ) - .await; - Err(TendermintError::Malicious)?; - } - return Ok(true); - } - } - Ok(false) - } - - async fn message(&mut self, signed: &SignedMessageFor) -> Result<(), TendermintError> { - let msg = &signed.msg; - if msg.block != self.block.number { - Err(TendermintError::Temporal)?; - } - - // If this is a precommit, verify its signature - self.verify_precommit_signature(signed).await?; - - // Only let the proposer propose - if matches!(msg.data, Data::Proposal(..)) && - (msg.sender != self.weights.proposer(msg.block, msg.round)) - { - log::warn!(target: "tendermint", "validator who wasn't the proposer proposed"); - // TODO: This should have evidence - self - .slash(msg.sender, SlashEvent::Id(SlashReason::InvalidProposer, msg.block.0, msg.round.0)) - .await; - Err(TendermintError::Malicious)?; - }; - - // If this is a proposal, verify the block - // If the block is invalid, drop the message, letting the timeout cover it - // This prevents needing to check if valid inside every `upon` block - if let Data::Proposal(_, block) = &msg.data { - match self.network.validate(block).await { - Ok(()) => {} - Err(BlockError::Temporal) => return Err(TendermintError::Temporal), - Err(BlockError::Fatal) => { - log::warn!(target: "tendermint", "validator proposed a fatally invalid block"); - self - .slash( - msg.sender, - SlashEvent::Id(SlashReason::InvalidBlock, self.block.number.0, msg.round.0), - ) - .await; - Err(TendermintError::Malicious)?; - } - }; - } - - // If this is a proposal, verify the valid round isn't fundamentally invalid - if let Data::Proposal(Some(valid_round), _) = msg.data { - if valid_round.0 >= msg.round.0 { - log::warn!( - target: "tendermint", - "proposed proposed with a syntactically invalid valid round", - ); - self - .slash(msg.sender, SlashEvent::WithEvidence(Evidence::InvalidValidRound(msg.encode()))) - .await; - Err(TendermintError::Malicious)?; - } - } - - // Add it to the log, returning if it was already handled - match self.block.log.log(signed.clone()) { - Ok(true) => {} - Ok(false) => Err(TendermintError::AlreadyHandled)?, - Err(evidence) => { - self.slash(msg.sender, SlashEvent::WithEvidence(evidence)).await; - Err(TendermintError::Malicious)?; - } - } - log::debug!( - target: "tendermint", - "received new tendermint message (block: {}, round: {}, step: {:?})", - msg.block.0, - msg.round.0, - msg.data.step(), - ); - - // L55-56 - // Jump ahead if we should - if (msg.round.0 > self.block.round().number.0) && - (self.block.log.round_participation(msg.round) >= self.weights.fault_threshold()) - { - log::debug!( - target: "tendermint", - "jumping from round {} to round {}", - self.block.round().number.0, - msg.round.0, - ); - - // Jump to the new round. - let old_round = self.block.round().number; - self.round(msg.round, None); - - // If any jumped over/to round already has precommit messages, verify their signatures - for jumped in (old_round.0 + 1) ..= msg.round.0 { - let jumped = RoundNumber(jumped); - let round_msgs = self.block.log.log.get(&jumped).cloned().unwrap_or_default(); - for (validator, msgs) in &round_msgs { - if let Some(existing) = msgs.get(&Step::Precommit) { - if let Ok(res) = self.verify_precommit_signature(existing).await { - // Ensure this actually verified the signature instead of believing it shouldn't yet - assert!(res); - } else { - // Remove the message so it isn't counted towards forming a commit/included in one - // This won't remove the fact they precommitted for this block hash in the MessageLog - // TODO: Don't even log these in the first place until we jump, preventing needing - // to do this in the first place - self - .block - .log - .log - .get_mut(&jumped) - .unwrap() - .get_mut(validator) - .unwrap() - .remove(&Step::Precommit) - .unwrap(); - } - } - } - } - } - - // Now that we've jumped, and: - // 1) If this is a message for an old round, verified the precommit signatures - // 2) If this is a message for what was the current round, verified the precommit signatures - // 3) If this is a message for what was a future round, verified the precommit signatures if it - // has 34+% participation - // Run all `upons` run for any round, which may produce a Commit if it has 67+% participation - // (returning true if it does, letting us return now) - // It's necessary to verify the precommit signatures before Commit production is allowed, hence - // this specific flow - if self.all_any_round_upons(msg.round).await { - return Ok(()); - } - - // If this is a historic round, or a future round without sufficient participation, return - if msg.round.0 != self.block.round().number.0 { - return Ok(()); - } - // msg.round is now guaranteed to be equal to self.block.round().number - debug_assert_eq!(msg.round, self.block.round().number); - - // Run all `upons` run for the current round - self.all_current_round_upons().await; - - Ok(()) - } - /// Create a new Tendermint machine, from the specified point, with the specified block as the /// one to propose next. This will return a channel to send messages from the gossip layer and /// the machine itself. The machine should have `run` called from an asynchronous task. @@ -861,7 +419,7 @@ impl TendermintMachine { let validators = network.signature_scheme(); let weights = Arc::new(network.weights()); let validator_id = signer.validator_id().await; - // L01-10 + // 01-10 let mut machine = TendermintMachine { db: db.clone(), genesis, @@ -884,13 +442,6 @@ impl TendermintMachine { validator_id, Some(proposal), ), - - upons: Upons { - upon_prevotes: false, - upon_successful_current_round_prevotes: false, - upon_negative_current_round_prevotes: false, - upon_precommits: false, - }, }; // The end time of the last block is the start time for this one @@ -909,16 +460,16 @@ impl TendermintMachine { pub async fn run(mut self) { log::debug!(target: "tendermint", "running TendermintMachine"); - let mut rebroadcast_future = Box::pin(sleep(Duration::from_secs(60))).fuse(); loop { // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled // instead in this loop, and the popped value would be dropped with the next iteration + // While no other message has a higher priority right now, this is a safer practice let mut queue_future = if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; if let Some((our_message, msg, mut sig)) = futures_util::select_biased! { - // Handle a new block occurring externally (from an external sync loop) + // Handle a new block occurring externally (an external sync loop) // Has the highest priority as it makes all other futures here irrelevant msg = self.synced_block_recv.next() => { if let Some(SyncedBlock { number, block, commit }) = msg { @@ -952,21 +503,18 @@ impl TendermintMachine { Some((true, self.queue.pop_front().unwrap(), None)) }, - // L57-67 // Handle any timeouts step = self.block.round().timeout_future().fuse() => { // Remove the timeout so it doesn't persist, always being the selected future due to bias // While this does enable the timeout to be entered again, the timeout setting code will // never attempt to add a timeout after its timeout has expired - // (due to it setting an `upon` boolean) self.block.round_mut().timeouts.remove(&step); - - match step { - Step::Propose => { - // Only run if it's still the step in question - if self.block.round().step == step { + // Only run if it's still the step in question + if self.block.round().step == step { + match step { + Step::Propose => { // Slash the validator for not proposing when they should've - log::debug!(target: "tendermint", "validator didn't propose when they should have"); + log::debug!(target: "tendermint", "Validator didn't propose when they should have"); // this slash will be voted on. self.slash( self.weights.proposer(self.block.number, self.block.round().number), @@ -977,42 +525,14 @@ impl TendermintMachine { ), ).await; self.broadcast(Data::Prevote(None)); + }, + Step::Prevote => self.broadcast(Data::Precommit(None)), + Step::Precommit => { + self.round(RoundNumber(self.block.round().number.0 + 1), None); + continue; } - }, - Step::Prevote => { - // Only run if it's still the step in question - if self.block.round().step == step { - self.broadcast(Data::Precommit(None)) - } - }, - Step::Precommit => { - self.round(RoundNumber(self.block.round().number.0 + 1), None); } - }; - - // Execute the upons now that the state has changed - self.all_any_round_upons(self.block.round().number).await; - self.all_current_round_upons().await; - - None - }, - - // If it's been more than 60s, rebroadcast our own messages - () = rebroadcast_future => { - let key = message_tape_key(self.genesis); - let messages = self.db.get(key).unwrap_or(vec![]); - let mut messages = messages.as_slice(); - - while !messages.is_empty() { - self.network.broadcast( - SignedMessageFor::::decode(&mut IoReader(&mut messages)) - .expect("saved invalid message to DB") - ).await; } - - // Reset the rebroadcast future - rebroadcast_future = Box::pin(sleep(core::time::Duration::from_secs(60))).fuse(); - None }, @@ -1034,31 +554,429 @@ impl TendermintMachine { } let sig = sig.unwrap(); + // TODO: message may internally call broadcast. We should check within broadcast it's not + // broadcasting our own message at this time. let signed_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; let res = self.message(&signed_msg).await; - // If this is our message, and we hit an invariant, we could be slashed. - // We only broadcast our message after running it ourselves, to ensure it doesn't error, to - // ensure we don't get slashed on invariants. if res.is_err() && our_message { panic!("honest node (ourselves) had invalid behavior"); } + // Only now should we allow broadcasts since we're sure an invariant wasn't reached causing + // us to have invalid messages. - // Save this message to a linear tape of all our messages for this block, if ours - // TODO: Since we do this after we mark this message as sent to prevent equivocations, a - // precisely time reboot could cause this message marked as sent yet not added to the tape - if our_message { - let message_tape_key = message_tape_key(self.genesis); - let mut txn = self.db.txn(); - let mut message_tape = txn.get(&message_tape_key).unwrap_or(vec![]); - message_tape.extend(signed_msg.encode()); - txn.put(&message_tape_key, message_tape); + if res.is_ok() { + // Re-broadcast this since it's an original consensus message + self.network.broadcast(signed_msg).await; } - // Re-broadcast this since it's an original consensus message worth handling - if res.is_ok() { - self.network.broadcast(signed_msg).await; + match res { + Ok(None) => {} + Ok(Some(block)) => { + let mut validators = vec![]; + let mut sigs = vec![]; + // Get all precommits for this round + for (validator, msgs) in &self.block.log.log[&msg.round] { + if let Some(signed) = msgs.get(&Step::Precommit) { + if let Data::Precommit(Some((id, sig))) = &signed.msg.data { + // If this precommit was for this block, include it + if *id == block.id() { + validators.push(*validator); + sigs.push(sig.clone()); + } + } + } + } + + let commit_msg = + commit_msg(self.block.end_time[&msg.round].canonical(), block.id().as_ref()); + let commit = Commit { + end_time: self.block.end_time[&msg.round].canonical(), + validators: validators.clone(), + signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), + }; + debug_assert!(self.network.verify_commit(block.id(), &commit)); + + log::info!( + target: "tendermint", + "TendermintMachine produced block {}", + hex::encode(block.id().as_ref()), + ); + let id = block.id(); + let proposal = self.network.add_block(block, commit).await; + log::trace!( + target: "tendermint", + "added block {} (produced by machine)", + hex::encode(id.as_ref()), + ); + self.reset(msg.round, proposal).await; + } + Err(TendermintError::Malicious(sender, evidence)) => { + let current_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; + + let slash = if let Some(ev) = evidence { + // if the malicious message contains a block, only vote to slash + // TODO: Should this decision be made at a higher level? + // A higher-level system may be able to verify if the contained block is fatally + // invalid + // A higher-level system may accept the bandwidth size of this, even if the issue is + // just the valid round field + if let Data::Proposal(_, _) = ¤t_msg.msg.data { + SlashEvent::Id( + SlashReason::InvalidBlock, + self.block.number.0, + self.block.round().number.0, + ) + } else { + // slash with evidence otherwise + SlashEvent::WithEvidence(ev) + } + } else { + // we don't have evidence. Slash with vote. + SlashEvent::Id( + SlashReason::InvalidMessage, + self.block.number.0, + self.block.round().number.0, + ) + }; + + // Each message that we're voting to slash over needs to be re-broadcasted so other + // validators also trigger their own votes + // TODO: should this be inside slash function? + if let SlashEvent::Id(_, _, _) = slash { + self.network.broadcast(current_msg).await; + } + + self.slash(sender, slash).await + } + Err( + TendermintError::Temporal | + TendermintError::AlreadyHandled | + TendermintError::InvalidEvidence, + ) => (), } } } } + + // Returns Ok(true) if this was a Precommit which had either no signature or its signature + // validated + // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet + // Returns Err if the signature was invalid + fn verify_precommit_signature( + &self, + signed: &SignedMessageFor, + ) -> Result> { + let msg = &signed.msg; + if let Data::Precommit(precommit) = &msg.data { + let Some((id, sig)) = precommit else { return Ok(true) }; + // Also verify the end_time of the commit + // Only perform this verification if we already have the end_time + // Else, there's a DoS where we receive a precommit for some round infinitely in the future + // which forces us to calculate every end time + if let Some(end_time) = self.block.end_time.get(&msg.round) { + if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) + { + log::warn!(target: "tendermint", "Validator produced an invalid commit signature"); + Err(TendermintError::Malicious( + msg.sender, + Some(Evidence::InvalidPrecommit(signed.encode())), + ))?; + } + return Ok(true); + } + } + Ok(false) + } + + async fn message( + &mut self, + signed: &SignedMessageFor, + ) -> Result, TendermintError> { + let msg = &signed.msg; + if msg.block != self.block.number { + Err(TendermintError::Temporal)?; + } + + if (msg.block == self.block.number) && + (msg.round == self.block.round().number) && + (msg.data.step() == Step::Propose) + { + log::trace!( + target: "tendermint", + "received Propose for block {}, round {}", + msg.block.0, + msg.round.0, + ); + } + + // If this is a precommit, verify its signature + self.verify_precommit_signature(signed)?; + + // Only let the proposer propose + if matches!(msg.data, Data::Proposal(..)) && + (msg.sender != self.weights.proposer(msg.block, msg.round)) + { + log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); + // TODO: This should have evidence + Err(TendermintError::Malicious(msg.sender, None))?; + }; + + if !self.block.log.log(signed.clone())? { + return Err(TendermintError::AlreadyHandled); + } + log::debug!( + target: "tendermint", + "received new tendermint message (block: {}, round: {}, step: {:?})", + msg.block.0, + msg.round.0, + msg.data.step(), + ); + + // All functions, except for the finalizer and the jump, are locked to the current round + + // Run the finalizer to see if it applies + // 49-52 + if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) { + let proposer = self.weights.proposer(self.block.number, msg.round); + + // Get the proposal + if let Some(proposal_signed) = self.block.log.get(msg.round, proposer, Step::Propose) { + if let Data::Proposal(_, block) = &proposal_signed.msg.data { + // Check if it has gotten a sufficient amount of precommits + // Uses a junk signature since message equality disregards the signature + if self.block.log.has_consensus( + msg.round, + &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), + ) { + // If msg.round is in the future, these Precommits won't have their inner signatures + // verified + // It should be impossible for msg.round to be in the future however, as this requires + // 67% of validators to Precommit, and we jump on 34% participating in the new round + // The one exception would be if a validator had 34%, and could cause participation to + // go from 33% (not enough to jump) to 67%, without executing the below code + // This also would require the local machine to be outside of allowed time tolerances, + // or the validator with 34% to not be publishing Prevotes (as those would cause a + // a jump) + // Both are invariants + // TODO: Replace this panic with an inner signature check + assert!(msg.round.0 <= self.block.round().number.0); + + log::debug!(target: "tendermint", "block {} has consensus", msg.block.0); + return Ok(Some(block.clone())); + } + } + } + } + + // Else, check if we need to jump ahead + #[allow(clippy::comparison_chain)] + if msg.round.0 < self.block.round().number.0 { + // Prior round, disregard if not finalizing + return Ok(None); + } else if msg.round.0 > self.block.round().number.0 { + // 55-56 + // Jump, enabling processing by the below code + if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { + log::debug!( + target: "tendermint", + "jumping from round {} to round {}", + self.block.round().number.0, + msg.round.0, + ); + + // Jump to the new round. + let proposer = self.round(msg.round, None); + + // If this round already has precommit messages, verify their signatures + let round_msgs = self.block.log.log[&msg.round].clone(); + for (validator, msgs) in &round_msgs { + if let Some(existing) = msgs.get(&Step::Precommit) { + if let Ok(res) = self.verify_precommit_signature(existing) { + // Ensure this actually verified the signature instead of believing it shouldn't yet + assert!(res); + } else { + // Remove the message so it isn't counted towards forming a commit/included in one + // This won't remove the fact they precommitted for this block hash in the MessageLog + // TODO: Don't even log these in the first place until we jump, preventing needing + // to do this in the first place + let msg = self + .block + .log + .log + .get_mut(&msg.round) + .unwrap() + .get_mut(validator) + .unwrap() + .remove(&Step::Precommit) + .unwrap(); + + // Slash the validator for publishing an invalid commit signature + self + .slash( + *validator, + SlashEvent::WithEvidence(Evidence::InvalidPrecommit(msg.encode())), + ) + .await; + } + } + } + + // If we're the proposer, return now we don't waste time on the current round + // (as it doesn't have a proposal, since we didn't propose, and cannot complete) + if proposer { + return Ok(None); + } + } else { + // Future round which we aren't ready to jump to, so return for now + return Ok(None); + } + } + + // msg.round is now guaranteed to be equal to self.block.round().number + debug_assert_eq!(msg.round, self.block.round().number); + + // The paper executes these checks when the step is prevote. Making sure this message warrants + // rerunning these checks is a sane optimization since message instances is a full iteration + // of the round map + if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { + let (participation, weight) = + self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); + let threshold_weight = self.weights.threshold(); + if participation < threshold_weight { + log::trace!( + target: "tendermint", + "progess towards setting prevote timeout, participation: {}, needed: {}", + participation, + threshold_weight, + ); + } + // 34-35 + if participation >= threshold_weight { + log::trace!( + target: "tendermint", + "setting timeout for prevote due to sufficient participation", + ); + self.block.round_mut().set_timeout(Step::Prevote); + } + + // 44-46 + if weight >= threshold_weight { + self.broadcast(Data::Precommit(None)); + return Ok(None); + } + } + + // 47-48 + if matches!(msg.data, Data::Precommit(_)) && + self.block.log.has_participation(self.block.round().number, Step::Precommit) + { + log::trace!( + target: "tendermint", + "setting timeout for precommit due to sufficient participation", + ); + self.block.round_mut().set_timeout(Step::Precommit); + } + + // All further operations require actually having the proposal in question + let proposer = self.weights.proposer(self.block.number, self.block.round().number); + let (vr, block) = if let Some(proposal_signed) = + self.block.log.get(self.block.round().number, proposer, Step::Propose) + { + if let Data::Proposal(vr, block) = &proposal_signed.msg.data { + (vr, block) + } else { + panic!("message for Step::Propose didn't have Data::Proposal"); + } + } else { + return Ok(None); + }; + + // 22-33 + if self.block.round().step == Step::Propose { + // Delay error handling (triggering a slash) until after we vote. + let (valid, err) = match self.network.validate(block).await { + Ok(()) => (true, Ok(None)), + Err(BlockError::Temporal) => (false, Ok(None)), + Err(BlockError::Fatal) => (false, { + log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); + // TODO: Produce evidence of this for the higher level code to decide what to do with + Err(TendermintError::Malicious(proposer, None)) + }), + }; + // Create a raw vote which only requires block validity as a basis for the actual vote. + let raw_vote = Some(block.id()).filter(|_| valid); + + // If locked is none, it has a round of -1 according to the protocol. That satisfies + // 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some + // with different IDs, the function on 22 rejects yet the function on 28 has one other + // condition + let locked = self.block.locked.as_ref().map_or(true, |(_, id)| id == &block.id()); + let mut vote = raw_vote.filter(|_| locked); + + if let Some(vr) = vr { + // Malformed message + if vr.0 >= self.block.round().number.0 { + log::warn!(target: "tendermint", "Validator claimed a round from the future was valid"); + Err(TendermintError::Malicious( + msg.sender, + Some(Evidence::InvalidValidRound(signed.encode())), + ))?; + } + + if self.block.log.has_consensus(*vr, &Data::Prevote(Some(block.id()))) { + // Allow differing locked values if the proposal has a newer valid round + // This is the other condition described above + if let Some((locked_round, _)) = self.block.locked.as_ref() { + vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0)); + } + + self.broadcast(Data::Prevote(vote)); + return err; + } + } else { + self.broadcast(Data::Prevote(vote)); + return err; + } + + return Ok(None); + } + + if self.block.valid.as_ref().map_or(true, |(round, _)| round != &self.block.round().number) { + // 36-43 + + // The run once condition is implemented above. Since valid will always be set by this, it + // not being set, or only being set historically, means this has yet to be run + + if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { + match self.network.validate(block).await { + // BlockError::Temporal is due to a temporal error we have, yet a supermajority of the + // network does not, Because we do not believe this block to be fatally invalid, and + // because a supermajority deems it valid, accept it. + Ok(()) | Err(BlockError::Temporal) => (), + Err(BlockError::Fatal) => { + log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); + // TODO: Produce evidence of this for the higher level code to decide what to do with + Err(TendermintError::Malicious(proposer, None))? + } + }; + + self.block.valid = Some((self.block.round().number, block.clone())); + if self.block.round().step == Step::Prevote { + self.block.locked = Some((self.block.round().number, block.id())); + self.broadcast(Data::Precommit(Some(( + block.id(), + self + .signer + .sign(&commit_msg( + self.block.end_time[&self.block.round().number].canonical(), + block.id().as_ref(), + )) + .await, + )))); + } + } + } + + Ok(None) + } } diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary/tendermint/src/message_log.rs index e65568ca..3959852d 100644 --- a/coordinator/tributary/tendermint/src/message_log.rs +++ b/coordinator/tributary/tendermint/src/message_log.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap}; use parity_scale_codec::Encode; -use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence}; +use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; type RoundLog = HashMap<::ValidatorId, HashMap>>; pub(crate) struct MessageLog { @@ -16,7 +16,7 @@ impl MessageLog { } // Returns true if it's a new message - pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result { + pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result> { let msg = &signed.msg; // Clarity, and safety around default != new edge cases let round = self.log.entry(msg.round).or_insert_with(HashMap::new); @@ -30,7 +30,10 @@ impl MessageLog { target: "tendermint", "Validator sent multiple messages for the same block + round + step" ); - Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?; + Err(TendermintError::Malicious( + msg.sender, + Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())), + ))?; } return Ok(false); } @@ -44,8 +47,7 @@ impl MessageLog { pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor) -> (u64, u64) { let mut participating = 0; let mut weight = 0; - let Some(log) = self.log.get(&round) else { return (0, 0) }; - for (participant, msgs) in log { + for (participant, msgs) in &self.log[&round] { if let Some(msg) = msgs.get(&data.step()) { let validator_weight = self.weights.weight(*participant); participating += validator_weight; @@ -71,8 +73,7 @@ impl MessageLog { // Check if a supermajority of nodes have participated on a specific step pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool { let mut participating = 0; - let Some(log) = self.log.get(&round) else { return false }; - for (participant, msgs) in log { + for (participant, msgs) in &self.log[&round] { if msgs.get(&step).is_some() { participating += self.weights.weight(*participant); } diff --git a/coordinator/tributary/tendermint/src/round.rs b/coordinator/tributary/tendermint/src/round.rs index a97e3ed1..445c2784 100644 --- a/coordinator/tributary/tendermint/src/round.rs +++ b/coordinator/tributary/tendermint/src/round.rs @@ -57,7 +57,6 @@ impl RoundData { // Poll all set timeouts, returning the Step whose timeout has just expired pub(crate) async fn timeout_future(&self) -> Step { - /* let now = Instant::now(); log::trace!( target: "tendermint", @@ -65,7 +64,6 @@ impl RoundData { self.step, self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::>() ); - */ let timeout_future = |step| { let timeout = self.timeouts.get(&step).copied(); diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index bec95ddc..3b3cf7c3 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -145,7 +145,7 @@ impl Network for TestNetwork { println!("Slash for {id} due to {event:?}"); } - async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> { + async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { block.valid }