Cache the proposal for a round

This commit is contained in:
Luke Parker
2024-04-28 03:36:18 -04:00
parent 77d6c273e0
commit 209eae9a9a
2 changed files with 421 additions and 10 deletions

View File

@@ -3,7 +3,7 @@ use core::fmt::Debug;
use std::{
sync::Arc,
time::{SystemTime, Instant, Duration},
collections::VecDeque,
collections::{VecDeque, HashMap},
};
use parity_scale_codec::{Encode, Decode};
@@ -245,6 +245,10 @@ pub struct TendermintMachine<N: Network> {
synced_block_result_send: mpsc::UnboundedSender<bool>,
block: BlockData<N>,
// TODO: Move this into the Block struct
round_proposals: HashMap<RoundNumber, (Option<RoundNumber>, N::Block)>,
// TODO: Move this into the Round struct
upons: Upons,
}
pub struct SyncedBlock<N: Network> {
@@ -346,6 +350,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
proposal,
);
// Reset the round proposals
self.round_proposals = HashMap::new();
// Start the first round
self.round(RoundNumber(0), Some(round_end));
}
@@ -383,6 +390,410 @@ impl<N: Network + 'static> TendermintMachine<N> {
}
}
fn proposal_for_round(&self, round: RoundNumber) -> Option<(Option<RoundNumber>, &N::Block)> {
self.round_proposals.get(&round).map(|(round, block)| (*round, block))
}
// L22-27
fn upon_proposal_without_valid_round(&mut self) {
if self.block.round().step != Step::Propose {
return;
}
// If we have the proposal message...
let Some((None, block)) = self.proposal_for_round(self.block.round().number) else {
return;
};
// There either needs to not be a locked value or it must be equivalent
#[allow(clippy::map_unwrap_or)]
if self
.block
.locked
.as_ref()
.map(|(_round, locked_block)| block.id() == *locked_block)
.unwrap_or(true)
{
self.broadcast(Data::Prevote(Some(block.id())));
} else {
self.broadcast(Data::Prevote(None));
}
}
// L28-33
fn upon_proposal_with_valid_round(&mut self) {
if self.block.round().step != Step::Propose {
return;
}
// If we have the proposal message...
let Some((Some(proposal_valid_round), block)) =
self.proposal_for_round(self.block.round().number)
else {
return;
};
// Check we have the necessary prevotes
if !self.block.log.has_consensus(proposal_valid_round, &Data::Prevote(Some(block.id()))) {
return;
}
// We don't check valid round < current round as the `message` function does
// If locked is None, lockedRoundp is -1 and less than valid round
#[allow(clippy::map_unwrap_or)]
let locked_clause_1 = self
.block
.locked
.as_ref()
.map(|(locked_round, _block)| locked_round.0 <= proposal_valid_round.0)
.unwrap_or(true);
// The second clause is if the locked values are equivalent. If no value is locked, they aren't
#[allow(clippy::map_unwrap_or)]
let locked_clause_2 = self
.block
.locked
.as_ref()
.map(|(_round, locked_block)| block.id() == *locked_block)
.unwrap_or(false);
if locked_clause_1 || locked_clause_2 {
self.broadcast(Data::Prevote(Some(block.id())));
} else {
self.broadcast(Data::Prevote(None));
}
}
// L34-35
fn upon_prevotes(&mut self) {
if self.upons.upon_prevotes || (self.block.round().step != Step::Prevote) {
return;
}
if self.block.log.has_participation(self.block.round().number, Step::Prevote) {
self.block.round_mut().set_timeout(Step::Prevote);
self.upons.upon_prevotes = true;
}
}
// L36-43
async fn upon_successful_current_round_prevotes(&mut self) {
// Returning if `self.step == Step::Propose` is equivalent to guarding `step >= prevote`
if self.upons.upon_successful_current_round_prevotes ||
(self.block.round().step == Step::Propose)
{
return;
}
// If we have the proposal message...
let Some((_, block)) = self.proposal_for_round(self.block.round().number) else {
return;
};
// Check we have the necessary prevotes
if !self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) {
return;
}
let block = block.clone();
self.upons.upon_successful_current_round_prevotes = true;
if self.block.round().step == Step::Prevote {
self.block.locked = Some((self.block.round().number, block.id()));
let signature = self
.signer
.sign(&commit_msg(
self.block.end_time[&self.block.round().number].canonical(),
block.id().as_ref(),
))
.await;
self.broadcast(Data::Precommit(Some((block.id(), signature))));
}
self.block.valid = Some((self.block.round().number, block));
}
// L44-46
fn upon_negative_current_round_prevotes(&mut self) {
if self.upons.upon_negative_current_round_prevotes || (self.block.round().step != Step::Prevote)
{
return;
}
if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(None)) {
self.broadcast(Data::Precommit(None));
}
self.upons.upon_negative_current_round_prevotes = true;
}
// L47-48
fn upon_precommits(&mut self) {
if self.upons.upon_precommits {
return;
}
if self.block.log.has_participation(self.block.round().number, Step::Precommit) {
self.block.round_mut().set_timeout(Step::Precommit);
self.upons.upon_precommits = true;
}
}
// L22-48
async fn all_current_round_upons(&mut self) {
self.upon_proposal_without_valid_round();
self.upon_proposal_with_valid_round();
self.upon_prevotes();
self.upon_successful_current_round_prevotes().await;
self.upon_negative_current_round_prevotes();
self.upon_precommits();
}
// L49-54
async fn upon_successful_precommits(&mut self, round: RoundNumber) -> bool {
// If we have the proposal message...
let Some((_, block)) = self.proposal_for_round(round) else { return false };
// Check we have the necessary precommits
// The precommit we check we have consensus upon uses a junk signature since message equality
// disregards the signature
if !self
.block
.log
.has_consensus(round, &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))))
{
return false;
}
// Get all participants in this commit
let mut validators = vec![];
let mut sigs = vec![];
// Get all precommits for this round
for (validator, msgs) in &self.block.log.log[&round] {
if let Some(signed) = msgs.get(&Step::Precommit) {
if let Data::Precommit(Some((id, sig))) = &signed.msg.data {
// If this precommit was for this block, include it
if *id == block.id() {
validators.push(*validator);
sigs.push(sig.clone());
}
}
}
}
// Form the commit itself
let commit_msg = commit_msg(self.block.end_time[&round].canonical(), block.id().as_ref());
let commit = Commit {
end_time: self.block.end_time[&round].canonical(),
validators: validators.clone(),
signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs),
};
debug_assert!(self.network.verify_commit(block.id(), &commit));
// Add the block and reset the machine
log::info!(
target: "tendermint",
"TendermintMachine produced block {}",
hex::encode(block.id().as_ref()),
);
let id = block.id();
let proposal = self.network.add_block(block.clone(), commit).await;
log::trace!(
target: "tendermint",
"added block {} (produced by machine)",
hex::encode(id.as_ref()),
);
self.reset(round, proposal).await;
true
}
// L49-54
async fn all_any_round_upons(&mut self, round: RoundNumber) -> bool {
self.upon_successful_precommits(round).await
}
// Returns Ok(true) if this was a Precommit which had either no signature or its signature
// validated
// Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet
// Returns Err if the signature was invalid
async fn verify_precommit_signature(
&mut self,
signed: &SignedMessageFor<N>,
) -> Result<bool, TendermintError> {
let msg = &signed.msg;
if let Data::Precommit(precommit) = &msg.data {
let Some((id, sig)) = precommit else { return Ok(true) };
// Also verify the end_time of the commit
// Only perform this verification if we already have the end_time
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
// which forces us to calculate every end time
if let Some(end_time) = self.block.end_time.get(&msg.round) {
if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig)
{
log::warn!(target: "tendermint", "validator produced an invalid commit signature");
self
.slash(
msg.sender,
SlashEvent::WithEvidence(Evidence::InvalidPrecommit(signed.encode())),
)
.await;
Err(TendermintError::Malicious)?;
}
return Ok(true);
}
}
Ok(false)
}
async fn message(&mut self, signed: &SignedMessageFor<N>) -> Result<(), TendermintError> {
let msg = &signed.msg;
if msg.block != self.block.number {
Err(TendermintError::Temporal)?;
}
// If this is a precommit, verify its signature
self.verify_precommit_signature(signed).await?;
// Only let the proposer propose
if matches!(msg.data, Data::Proposal(..)) &&
(msg.sender != self.weights.proposer(msg.block, msg.round))
{
log::warn!(target: "tendermint", "validator who wasn't the proposer proposed");
// TODO: This should have evidence
self
.slash(msg.sender, SlashEvent::Id(SlashReason::InvalidProposer, msg.block.0, msg.round.0))
.await;
Err(TendermintError::Malicious)?;
};
// If this is a proposal, verify the block
// If the block is invalid, drop the message, letting the timeout cover it
// This prevents needing to check if valid inside every `upon` block
if let Data::Proposal(_, block) = &msg.data {
match self.network.validate(block).await {
Ok(()) => {}
Err(BlockError::Temporal) => return Err(TendermintError::Temporal),
Err(BlockError::Fatal) => {
log::warn!(target: "tendermint", "validator proposed a fatally invalid block");
self
.slash(
msg.sender,
SlashEvent::Id(SlashReason::InvalidBlock, self.block.number.0, msg.round.0),
)
.await;
Err(TendermintError::Malicious)?;
}
};
}
// If this is a proposal, verify the valid round isn't fundamentally invalid
if let Data::Proposal(Some(valid_round), _) = msg.data {
if valid_round.0 >= msg.round.0 {
log::warn!(
target: "tendermint",
"proposed proposed with a syntactically invalid valid round",
);
self
.slash(msg.sender, SlashEvent::WithEvidence(Evidence::InvalidValidRound(msg.encode())))
.await;
Err(TendermintError::Malicious)?;
}
}
// Add it to the log, returning if it was already handled
match self.block.log.log(signed.clone()) {
Ok(true) => {}
Ok(false) => Err(TendermintError::AlreadyHandled)?,
Err(evidence) => {
self.slash(msg.sender, SlashEvent::WithEvidence(evidence)).await;
Err(TendermintError::Malicious)?;
}
}
log::debug!(
target: "tendermint",
"received new tendermint message (block: {}, round: {}, step: {:?})",
msg.block.0,
msg.round.0,
msg.data.step(),
);
// If this is a proposal, insert it
if let Data::Proposal(vr, block) = &msg.data {
self.round_proposals.insert(msg.round, (*vr, block.clone()));
}
// L55-56
// Jump ahead if we should
if (msg.round.0 > self.block.round().number.0) &&
(self.block.log.round_participation(msg.round) >= self.weights.fault_threshold())
{
log::debug!(
target: "tendermint",
"jumping from round {} to round {}",
self.block.round().number.0,
msg.round.0,
);
// Jump to the new round.
let old_round = self.block.round().number;
self.round(msg.round, None);
// If any jumped over/to round already has precommit messages, verify their signatures
for jumped in (old_round.0 + 1) ..= msg.round.0 {
let jumped = RoundNumber(jumped);
let round_msgs = self.block.log.log.get(&jumped).cloned().unwrap_or_default();
for (validator, msgs) in &round_msgs {
if let Some(existing) = msgs.get(&Step::Precommit) {
if let Ok(res) = self.verify_precommit_signature(existing).await {
// Ensure this actually verified the signature instead of believing it shouldn't yet
assert!(res);
} else {
// Remove the message so it isn't counted towards forming a commit/included in one
// This won't remove the fact they precommitted for this block hash in the MessageLog
// TODO: Don't even log these in the first place until we jump, preventing needing
// to do this in the first place
self
.block
.log
.log
.get_mut(&jumped)
.unwrap()
.get_mut(validator)
.unwrap()
.remove(&Step::Precommit)
.unwrap();
}
}
}
}
}
// Now that we've jumped, and:
// 1) If this is a message for an old round, verified the precommit signatures
// 2) If this is a message for what was the current round, verified the precommit signatures
// 3) If this is a message for what was a future round, verified the precommit signatures if it
// has 34+% participation
// Run all `upons` run for any round, which may produce a Commit if it has 67+% participation
// (returning true if it does, letting us return now)
// It's necessary to verify the precommit signatures before Commit production is allowed, hence
// this specific flow
if self.all_any_round_upons(msg.round).await {
return Ok(());
}
// If this is a historic round, or a future round without sufficient participation, return
if msg.round.0 != self.block.round().number.0 {
return Ok(());
}
// msg.round is now guaranteed to be equal to self.block.round().number
debug_assert_eq!(msg.round, self.block.round().number);
// Run all `upons` run for the current round
self.all_current_round_upons().await;
Ok(())
}
/// Create a new Tendermint machine, from the specified point, with the specified block as the
/// one to propose next. This will return a channel to send messages from the gossip layer and
/// the machine itself. The machine should have `run` called from an asynchronous task.
@@ -450,6 +861,15 @@ impl<N: Network + 'static> TendermintMachine<N> {
validator_id,
Some(proposal),
),
round_proposals: HashMap::new(),
upons: Upons {
upon_prevotes: false,
upon_successful_current_round_prevotes: false,
upon_negative_current_round_prevotes: false,
upon_precommits: false,
},
};
// The end time of the last block is the start time for this one

View File

@@ -86,13 +86,4 @@ impl<N: Network> MessageLog<N> {
let (_, weight) = self.message_instances(round, data);
weight >= self.weights.threshold()
}
pub(crate) fn get(
&self,
round: RoundNumber,
sender: N::ValidatorId,
step: Step,
) -> Option<&SignedMessageFor<N>> {
self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step)))
}
}