mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-10 13:09:24 +00:00
Rebase onto develop, which reverted this PR, and re-apply this PR
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use core::{marker::PhantomData, fmt::Debug};
|
||||
use std::{sync::Arc, io, collections::VecDeque};
|
||||
use std::{sync::Arc, io};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
@@ -154,14 +154,6 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
|
||||
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
|
||||
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
|
||||
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
|
||||
|
||||
p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
|
||||
}
|
||||
|
||||
impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
|
||||
fn drop(&mut self) {
|
||||
self.p2p_meta_task_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||
@@ -193,28 +185,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||
);
|
||||
let blockchain = Arc::new(RwLock::new(blockchain));
|
||||
|
||||
let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
|
||||
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
|
||||
// P2P layer
|
||||
let p2p_meta_task_handle = Arc::new(
|
||||
tokio::spawn({
|
||||
let to_rebroadcast = to_rebroadcast.clone();
|
||||
let p2p = p2p.clone();
|
||||
async move {
|
||||
loop {
|
||||
let to_rebroadcast = to_rebroadcast.read().await.clone();
|
||||
for msg in to_rebroadcast {
|
||||
p2p.broadcast(genesis, msg).await;
|
||||
}
|
||||
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
.abort_handle(),
|
||||
);
|
||||
|
||||
let network =
|
||||
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
|
||||
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
|
||||
|
||||
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
|
||||
TendermintMachine::new(
|
||||
@@ -235,7 +206,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||
synced_block: Arc::new(RwLock::new(synced_block)),
|
||||
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
|
||||
messages: Arc::new(RwLock::new(messages)),
|
||||
p2p_meta_task_handle,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
use core::ops::Deref;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
collections::{VecDeque, HashMap},
|
||||
};
|
||||
use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
@@ -270,8 +267,6 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
|
||||
pub(crate) validators: Arc<Validators>,
|
||||
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
||||
|
||||
pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,
|
||||
|
||||
pub(crate) p2p: P,
|
||||
}
|
||||
|
||||
@@ -308,26 +303,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
||||
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
||||
to_broadcast.extend(msg.encode());
|
||||
|
||||
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
|
||||
// until the block it's trying to build is complete
|
||||
// If the P2P layer drops a message before all nodes obtained access, or a node had an
|
||||
// intermittent failure, this will ensure reconcilliation
|
||||
// This is atrocious if there's no content-based deduplication protocol for messages actively
|
||||
// being gossiped
|
||||
// LibP2p, as used by Serai, is configured to content-based deduplicate
|
||||
{
|
||||
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
|
||||
to_rebroadcast_lock.push_back(to_broadcast.clone());
|
||||
// We should have, ideally, 3 * validators messages within a round
|
||||
// Therefore, this should keep the most recent 2-rounds
|
||||
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
|
||||
// messages
|
||||
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
|
||||
to_rebroadcast_lock.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
self.p2p.broadcast(self.genesis, to_broadcast).await
|
||||
}
|
||||
|
||||
@@ -366,7 +341,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||
}
|
||||
}
|
||||
|
||||
async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
|
||||
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
|
||||
let block =
|
||||
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
|
||||
self
|
||||
@@ -428,9 +403,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||
}
|
||||
}
|
||||
|
||||
// Since we've added a valid block, clear to_rebroadcast
|
||||
*self.to_rebroadcast.write().await = VecDeque::new();
|
||||
|
||||
Some(TendermintBlock(
|
||||
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
|
||||
))
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::{
|
||||
collections::{HashSet, HashMap},
|
||||
};
|
||||
|
||||
use parity_scale_codec::Encode;
|
||||
use serai_db::{Get, DbTxn, Db};
|
||||
|
||||
use crate::{
|
||||
@@ -20,7 +19,7 @@ pub(crate) struct BlockData<N: Network> {
|
||||
|
||||
pub(crate) number: BlockNumber,
|
||||
pub(crate) validator_id: Option<N::ValidatorId>,
|
||||
pub(crate) proposal: Option<N::Block>,
|
||||
pub(crate) our_proposal: Option<N::Block>,
|
||||
|
||||
pub(crate) log: MessageLog<N>,
|
||||
pub(crate) slashes: HashSet<N::ValidatorId>,
|
||||
@@ -43,7 +42,7 @@ impl<N: Network> BlockData<N> {
|
||||
weights: Arc<N::Weights>,
|
||||
number: BlockNumber,
|
||||
validator_id: Option<N::ValidatorId>,
|
||||
proposal: Option<N::Block>,
|
||||
our_proposal: Option<N::Block>,
|
||||
) -> BlockData<N> {
|
||||
BlockData {
|
||||
db,
|
||||
@@ -51,7 +50,7 @@ impl<N: Network> BlockData<N> {
|
||||
|
||||
number,
|
||||
validator_id,
|
||||
proposal,
|
||||
our_proposal,
|
||||
|
||||
log: MessageLog::new(weights),
|
||||
slashes: HashSet::new(),
|
||||
@@ -108,17 +107,17 @@ impl<N: Network> BlockData<N> {
|
||||
self.populate_end_time(round);
|
||||
}
|
||||
|
||||
// 11-13
|
||||
// L11-13
|
||||
self.round = Some(RoundData::<N>::new(
|
||||
round,
|
||||
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
|
||||
));
|
||||
self.end_time.insert(round, self.round().end_time());
|
||||
|
||||
// 14-21
|
||||
// L14-21
|
||||
if Some(proposer) == self.validator_id {
|
||||
let (round, block) = self.valid.clone().unzip();
|
||||
block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
|
||||
block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block))
|
||||
} else {
|
||||
self.round_mut().set_timeout(Step::Propose);
|
||||
None
|
||||
@@ -198,8 +197,8 @@ impl<N: Network> BlockData<N> {
|
||||
assert!(!new_round);
|
||||
None?;
|
||||
}
|
||||
// Put this message to the DB
|
||||
txn.put(&msg_key, res.encode());
|
||||
// Put that we're sending this message to the DB
|
||||
txn.put(&msg_key, []);
|
||||
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
@@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync {
|
||||
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
|
||||
|
||||
/// Validate a block.
|
||||
async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
|
||||
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>;
|
||||
|
||||
/// Add a block, returning the proposal for the next one.
|
||||
///
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::{
|
||||
collections::{VecDeque, HashMap},
|
||||
};
|
||||
|
||||
use parity_scale_codec::{Encode, Decode};
|
||||
use parity_scale_codec::{Encode, Decode, IoReader};
|
||||
|
||||
use futures_channel::mpsc;
|
||||
use futures_util::{
|
||||
@@ -15,6 +15,8 @@ use futures_util::{
|
||||
};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use serai_db::{Get, DbTxn, Db};
|
||||
|
||||
pub mod time;
|
||||
use time::{sys_time, CanonicalInstant};
|
||||
|
||||
@@ -30,6 +32,11 @@ pub(crate) mod message_log;
|
||||
pub mod ext;
|
||||
use ext::*;
|
||||
|
||||
const MESSAGE_TAPE_KEY: &[u8] = b"tendermint-machine-message_tape";
|
||||
fn message_tape_key(genesis: [u8; 32]) -> Vec<u8> {
|
||||
[MESSAGE_TAPE_KEY, &genesis].concat()
|
||||
}
|
||||
|
||||
pub fn commit_msg(end_time: u64, id: &[u8]) -> Vec<u8> {
|
||||
[&end_time.to_le_bytes(), id].concat()
|
||||
}
|
||||
@@ -103,9 +110,23 @@ impl<V: ValidatorId, B: Block, S: Signature> SignedMessage<V, B, S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
|
||||
pub enum SlashReason {
|
||||
FailToPropose,
|
||||
InvalidBlock,
|
||||
InvalidProposer,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)]
|
||||
pub enum Evidence {
|
||||
ConflictingMessages(Vec<u8>, Vec<u8>),
|
||||
InvalidPrecommit(Vec<u8>),
|
||||
InvalidValidRound(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub enum TendermintError<N: Network> {
|
||||
Malicious(N::ValidatorId, Option<Evidence>),
|
||||
pub enum TendermintError {
|
||||
Malicious,
|
||||
Temporal,
|
||||
AlreadyHandled,
|
||||
InvalidEvidence,
|
||||
@@ -126,20 +147,6 @@ pub type SignedMessageFor<N> = SignedMessage<
|
||||
<<N as Network>::SignatureScheme as SignatureScheme>::Signature,
|
||||
>;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
|
||||
pub enum SlashReason {
|
||||
FailToPropose,
|
||||
InvalidBlock,
|
||||
InvalidMessage,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)]
|
||||
pub enum Evidence {
|
||||
ConflictingMessages(Vec<u8>, Vec<u8>),
|
||||
InvalidPrecommit(Vec<u8>),
|
||||
InvalidValidRound(Vec<u8>),
|
||||
}
|
||||
|
||||
pub fn decode_signed_message<N: Network>(mut data: &[u8]) -> Option<SignedMessageFor<N>> {
|
||||
SignedMessageFor::<N>::decode(&mut data).ok()
|
||||
}
|
||||
@@ -147,7 +154,7 @@ pub fn decode_signed_message<N: Network>(mut data: &[u8]) -> Option<SignedMessag
|
||||
fn decode_and_verify_signed_message<N: Network>(
|
||||
data: &[u8],
|
||||
schema: &N::SignatureScheme,
|
||||
) -> Result<SignedMessageFor<N>, TendermintError<N>> {
|
||||
) -> Result<SignedMessageFor<N>, TendermintError> {
|
||||
let msg = decode_signed_message::<N>(data).ok_or(TendermintError::InvalidEvidence)?;
|
||||
|
||||
// verify that evidence messages are signed correctly
|
||||
@@ -162,7 +169,7 @@ pub fn verify_tendermint_evience<N: Network>(
|
||||
evidence: &Evidence,
|
||||
schema: &N::SignatureScheme,
|
||||
commit: impl Fn(u64) -> Option<Commit<N::SignatureScheme>>,
|
||||
) -> Result<(), TendermintError<N>> {
|
||||
) -> Result<(), TendermintError> {
|
||||
match evidence {
|
||||
Evidence::ConflictingMessages(first, second) => {
|
||||
let first = decode_and_verify_signed_message::<N>(first, schema)?.msg;
|
||||
@@ -186,15 +193,16 @@ pub fn verify_tendermint_evience<N: Network>(
|
||||
};
|
||||
// TODO: We need to be passed in the genesis time to handle this edge case
|
||||
if msg.block.0 == 0 {
|
||||
todo!("invalid precommit signature on first block")
|
||||
Err(TendermintError::InvalidEvidence)?
|
||||
// todo!("invalid precommit signature on first block")
|
||||
}
|
||||
|
||||
// get the last commit
|
||||
let prior_commit = match commit(msg.block.0 - 1) {
|
||||
Some(c) => c,
|
||||
// If we have yet to sync the block in question, we will return InvalidContent based
|
||||
// If we have yet to sync the block in question, we will return InvalidEvidence based
|
||||
// on our own temporal ambiguity
|
||||
// This will also cause an InvalidContent for anything using a non-existent block,
|
||||
// This will also cause an InvalidEvidence for anything using a non-existent block,
|
||||
// yet that's valid behavior
|
||||
// TODO: Double check the ramifications of this
|
||||
_ => Err(TendermintError::InvalidEvidence)?,
|
||||
@@ -229,6 +237,16 @@ pub enum SlashEvent {
|
||||
WithEvidence(Evidence),
|
||||
}
|
||||
|
||||
// Struct for if various upon handlers have been triggered to ensure they don't trigger multiple
|
||||
// times.
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
struct Upons {
|
||||
upon_prevotes: bool,
|
||||
upon_successful_current_round_prevotes: bool,
|
||||
upon_negative_current_round_prevotes: bool,
|
||||
upon_precommits: bool,
|
||||
}
|
||||
|
||||
/// A machine executing the Tendermint protocol.
|
||||
pub struct TendermintMachine<N: Network> {
|
||||
db: N::Db,
|
||||
@@ -337,6 +355,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
);
|
||||
sleep(time_until_round_end).await;
|
||||
|
||||
// Clear the message tape
|
||||
{
|
||||
let mut txn = self.db.txn();
|
||||
txn.del(&message_tape_key(self.genesis));
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
// Clear our outbound message queue
|
||||
self.queue = VecDeque::new();
|
||||
|
||||
@@ -838,7 +863,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
let validators = network.signature_scheme();
|
||||
let weights = Arc::new(network.weights());
|
||||
let validator_id = signer.validator_id().await;
|
||||
// 01-10
|
||||
// L01-10
|
||||
let mut machine = TendermintMachine {
|
||||
db: db.clone(),
|
||||
genesis,
|
||||
@@ -888,16 +913,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
pub async fn run(mut self) {
|
||||
log::debug!(target: "tendermint", "running TendermintMachine");
|
||||
|
||||
let mut rebroadcast_future = Box::pin(sleep(Duration::from_secs(60))).fuse();
|
||||
loop {
|
||||
// Also create a future for if the queue has a message
|
||||
// Does not pop_front as if another message has higher priority, its future will be handled
|
||||
// instead in this loop, and the popped value would be dropped with the next iteration
|
||||
// While no other message has a higher priority right now, this is a safer practice
|
||||
let mut queue_future =
|
||||
if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() };
|
||||
|
||||
if let Some((our_message, msg, mut sig)) = futures_util::select_biased! {
|
||||
// Handle a new block occurring externally (an external sync loop)
|
||||
// Handle a new block occurring externally (from an external sync loop)
|
||||
// Has the highest priority as it makes all other futures here irrelevant
|
||||
msg = self.synced_block_recv.next() => {
|
||||
if let Some(SyncedBlock { number, block, commit }) = msg {
|
||||
@@ -931,16 +956,19 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
Some((true, self.queue.pop_front().unwrap(), None))
|
||||
},
|
||||
|
||||
// L57-67
|
||||
// Handle any timeouts
|
||||
step = self.block.round().timeout_future().fuse() => {
|
||||
// Remove the timeout so it doesn't persist, always being the selected future due to bias
|
||||
// While this does enable the timeout to be entered again, the timeout setting code will
|
||||
// never attempt to add a timeout after its timeout has expired
|
||||
// (due to it setting an `upon` boolean)
|
||||
self.block.round_mut().timeouts.remove(&step);
|
||||
// Only run if it's still the step in question
|
||||
if self.block.round().step == step {
|
||||
match step {
|
||||
Step::Propose => {
|
||||
|
||||
match step {
|
||||
Step::Propose => {
|
||||
// Only run if it's still the step in question
|
||||
if self.block.round().step == step {
|
||||
// Slash the validator for not proposing when they should've
|
||||
log::debug!(target: "tendermint", "validator didn't propose when they should have");
|
||||
// this slash will be voted on.
|
||||
@@ -953,14 +981,42 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
),
|
||||
).await;
|
||||
self.broadcast(Data::Prevote(None));
|
||||
},
|
||||
Step::Prevote => self.broadcast(Data::Precommit(None)),
|
||||
Step::Precommit => {
|
||||
self.round(RoundNumber(self.block.round().number.0 + 1), None);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Step::Prevote => {
|
||||
// Only run if it's still the step in question
|
||||
if self.block.round().step == step {
|
||||
self.broadcast(Data::Precommit(None))
|
||||
}
|
||||
},
|
||||
Step::Precommit => {
|
||||
self.round(RoundNumber(self.block.round().number.0 + 1), None);
|
||||
}
|
||||
};
|
||||
|
||||
// Execute the upons now that the state has changed
|
||||
self.all_any_round_upons(self.block.round().number).await;
|
||||
self.all_current_round_upons().await;
|
||||
|
||||
None
|
||||
},
|
||||
|
||||
// If it's been more than 60s, rebroadcast our own messages
|
||||
() = rebroadcast_future => {
|
||||
let key = message_tape_key(self.genesis);
|
||||
let messages = self.db.get(key).unwrap_or(vec![]);
|
||||
let mut messages = messages.as_slice();
|
||||
|
||||
while !messages.is_empty() {
|
||||
self.network.broadcast(
|
||||
SignedMessageFor::<N>::decode(&mut IoReader(&mut messages))
|
||||
.expect("saved invalid message to DB")
|
||||
).await;
|
||||
}
|
||||
|
||||
// Reset the rebroadcast future
|
||||
rebroadcast_future = Box::pin(sleep(core::time::Duration::from_secs(60))).fuse();
|
||||
|
||||
None
|
||||
},
|
||||
|
||||
@@ -982,429 +1038,31 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
}
|
||||
let sig = sig.unwrap();
|
||||
|
||||
// TODO: message may internally call broadcast. We should check within broadcast it's not
|
||||
// broadcasting our own message at this time.
|
||||
let signed_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() };
|
||||
let res = self.message(&signed_msg).await;
|
||||
// If this is our message, and we hit an invariant, we could be slashed.
|
||||
// We only broadcast our message after running it ourselves, to ensure it doesn't error, to
|
||||
// ensure we don't get slashed on invariants.
|
||||
if res.is_err() && our_message {
|
||||
panic!("honest node (ourselves) had invalid behavior");
|
||||
}
|
||||
// Only now should we allow broadcasts since we're sure an invariant wasn't reached causing
|
||||
// us to have invalid messages.
|
||||
|
||||
// Save this message to a linear tape of all our messages for this block, if ours
|
||||
// TODO: Since we do this after we mark this message as sent to prevent equivocations, a
|
||||
// precisely time reboot could cause this message marked as sent yet not added to the tape
|
||||
if our_message {
|
||||
let message_tape_key = message_tape_key(self.genesis);
|
||||
let mut txn = self.db.txn();
|
||||
let mut message_tape = txn.get(&message_tape_key).unwrap_or(vec![]);
|
||||
message_tape.extend(signed_msg.encode());
|
||||
txn.put(&message_tape_key, message_tape);
|
||||
}
|
||||
|
||||
// Re-broadcast this since it's an original consensus message worth handling
|
||||
if res.is_ok() {
|
||||
// Re-broadcast this since it's an original consensus message
|
||||
self.network.broadcast(signed_msg).await;
|
||||
}
|
||||
|
||||
match res {
|
||||
Ok(None) => {}
|
||||
Ok(Some(block)) => {
|
||||
let mut validators = vec![];
|
||||
let mut sigs = vec![];
|
||||
// Get all precommits for this round
|
||||
for (validator, msgs) in &self.block.log.log[&msg.round] {
|
||||
if let Some(signed) = msgs.get(&Step::Precommit) {
|
||||
if let Data::Precommit(Some((id, sig))) = &signed.msg.data {
|
||||
// If this precommit was for this block, include it
|
||||
if *id == block.id() {
|
||||
validators.push(*validator);
|
||||
sigs.push(sig.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let commit_msg =
|
||||
commit_msg(self.block.end_time[&msg.round].canonical(), block.id().as_ref());
|
||||
let commit = Commit {
|
||||
end_time: self.block.end_time[&msg.round].canonical(),
|
||||
validators: validators.clone(),
|
||||
signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs),
|
||||
};
|
||||
debug_assert!(self.network.verify_commit(block.id(), &commit));
|
||||
|
||||
log::info!(
|
||||
target: "tendermint",
|
||||
"TendermintMachine produced block {}",
|
||||
hex::encode(block.id().as_ref()),
|
||||
);
|
||||
let id = block.id();
|
||||
let proposal = self.network.add_block(block, commit).await;
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"added block {} (produced by machine)",
|
||||
hex::encode(id.as_ref()),
|
||||
);
|
||||
self.reset(msg.round, proposal).await;
|
||||
}
|
||||
Err(TendermintError::Malicious(sender, evidence)) => {
|
||||
let current_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() };
|
||||
|
||||
let slash = if let Some(ev) = evidence {
|
||||
// if the malicious message contains a block, only vote to slash
|
||||
// TODO: Should this decision be made at a higher level?
|
||||
// A higher-level system may be able to verify if the contained block is fatally
|
||||
// invalid
|
||||
// A higher-level system may accept the bandwidth size of this, even if the issue is
|
||||
// just the valid round field
|
||||
if let Data::Proposal(_, _) = ¤t_msg.msg.data {
|
||||
SlashEvent::Id(
|
||||
SlashReason::InvalidBlock,
|
||||
self.block.number.0,
|
||||
self.block.round().number.0,
|
||||
)
|
||||
} else {
|
||||
// slash with evidence otherwise
|
||||
SlashEvent::WithEvidence(ev)
|
||||
}
|
||||
} else {
|
||||
// we don't have evidence. Slash with vote.
|
||||
SlashEvent::Id(
|
||||
SlashReason::InvalidMessage,
|
||||
self.block.number.0,
|
||||
self.block.round().number.0,
|
||||
)
|
||||
};
|
||||
|
||||
// Each message that we're voting to slash over needs to be re-broadcasted so other
|
||||
// validators also trigger their own votes
|
||||
// TODO: should this be inside slash function?
|
||||
if let SlashEvent::Id(_, _, _) = slash {
|
||||
self.network.broadcast(current_msg).await;
|
||||
}
|
||||
|
||||
self.slash(sender, slash).await
|
||||
}
|
||||
Err(
|
||||
TendermintError::Temporal |
|
||||
TendermintError::AlreadyHandled |
|
||||
TendermintError::InvalidEvidence,
|
||||
) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns Ok(true) if this was a Precommit which had either no signature or its signature
|
||||
// validated
|
||||
// Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet
|
||||
// Returns Err if the signature was invalid
|
||||
fn verify_precommit_signature(
|
||||
&self,
|
||||
signed: &SignedMessageFor<N>,
|
||||
) -> Result<bool, TendermintError<N>> {
|
||||
let msg = &signed.msg;
|
||||
if let Data::Precommit(precommit) = &msg.data {
|
||||
let Some((id, sig)) = precommit else { return Ok(true) };
|
||||
// Also verify the end_time of the commit
|
||||
// Only perform this verification if we already have the end_time
|
||||
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
|
||||
// which forces us to calculate every end time
|
||||
if let Some(end_time) = self.block.end_time.get(&msg.round) {
|
||||
if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig)
|
||||
{
|
||||
log::warn!(target: "tendermint", "Validator produced an invalid commit signature");
|
||||
Err(TendermintError::Malicious(
|
||||
msg.sender,
|
||||
Some(Evidence::InvalidPrecommit(signed.encode())),
|
||||
))?;
|
||||
}
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn message(
|
||||
&mut self,
|
||||
signed: &SignedMessageFor<N>,
|
||||
) -> Result<Option<N::Block>, TendermintError<N>> {
|
||||
let msg = &signed.msg;
|
||||
if msg.block != self.block.number {
|
||||
Err(TendermintError::Temporal)?;
|
||||
}
|
||||
|
||||
if (msg.block == self.block.number) &&
|
||||
(msg.round == self.block.round().number) &&
|
||||
(msg.data.step() == Step::Propose)
|
||||
{
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"received Propose for block {}, round {}",
|
||||
msg.block.0,
|
||||
msg.round.0,
|
||||
);
|
||||
}
|
||||
|
||||
// If this is a precommit, verify its signature
|
||||
self.verify_precommit_signature(signed)?;
|
||||
|
||||
// Only let the proposer propose
|
||||
if matches!(msg.data, Data::Proposal(..)) &&
|
||||
(msg.sender != self.weights.proposer(msg.block, msg.round))
|
||||
{
|
||||
log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed");
|
||||
// TODO: This should have evidence
|
||||
Err(TendermintError::Malicious(msg.sender, None))?;
|
||||
};
|
||||
|
||||
if !self.block.log.log(signed.clone())? {
|
||||
return Err(TendermintError::AlreadyHandled);
|
||||
}
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"received new tendermint message (block: {}, round: {}, step: {:?})",
|
||||
msg.block.0,
|
||||
msg.round.0,
|
||||
msg.data.step(),
|
||||
);
|
||||
|
||||
// All functions, except for the finalizer and the jump, are locked to the current round
|
||||
|
||||
// Run the finalizer to see if it applies
|
||||
// 49-52
|
||||
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
|
||||
let proposer = self.weights.proposer(self.block.number, msg.round);
|
||||
|
||||
// Get the proposal
|
||||
if let Some(proposal_signed) = self.block.log.get(msg.round, proposer, Step::Propose) {
|
||||
if let Data::Proposal(_, block) = &proposal_signed.msg.data {
|
||||
// Check if it has gotten a sufficient amount of precommits
|
||||
// Uses a junk signature since message equality disregards the signature
|
||||
if self.block.log.has_consensus(
|
||||
msg.round,
|
||||
&Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))),
|
||||
) {
|
||||
// If msg.round is in the future, these Precommits won't have their inner signatures
|
||||
// verified
|
||||
// It should be impossible for msg.round to be in the future however, as this requires
|
||||
// 67% of validators to Precommit, and we jump on 34% participating in the new round
|
||||
// The one exception would be if a validator had 34%, and could cause participation to
|
||||
// go from 33% (not enough to jump) to 67%, without executing the below code
|
||||
// This also would require the local machine to be outside of allowed time tolerances,
|
||||
// or the validator with 34% to not be publishing Prevotes (as those would cause a
|
||||
// a jump)
|
||||
// Both are invariants
|
||||
// TODO: Replace this panic with an inner signature check
|
||||
assert!(msg.round.0 <= self.block.round().number.0);
|
||||
|
||||
log::debug!(target: "tendermint", "block {} has consensus", msg.block.0);
|
||||
return Ok(Some(block.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Else, check if we need to jump ahead
|
||||
#[allow(clippy::comparison_chain)]
|
||||
if msg.round.0 < self.block.round().number.0 {
|
||||
// Prior round, disregard if not finalizing
|
||||
return Ok(None);
|
||||
} else if msg.round.0 > self.block.round().number.0 {
|
||||
// 55-56
|
||||
// Jump, enabling processing by the below code
|
||||
if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() {
|
||||
log::debug!(
|
||||
target: "tendermint",
|
||||
"jumping from round {} to round {}",
|
||||
self.block.round().number.0,
|
||||
msg.round.0,
|
||||
);
|
||||
|
||||
// Jump to the new round.
|
||||
let proposer = self.round(msg.round, None);
|
||||
|
||||
// If this round already has precommit messages, verify their signatures
|
||||
let round_msgs = self.block.log.log[&msg.round].clone();
|
||||
for (validator, msgs) in &round_msgs {
|
||||
if let Some(existing) = msgs.get(&Step::Precommit) {
|
||||
if let Ok(res) = self.verify_precommit_signature(existing) {
|
||||
// Ensure this actually verified the signature instead of believing it shouldn't yet
|
||||
assert!(res);
|
||||
} else {
|
||||
// Remove the message so it isn't counted towards forming a commit/included in one
|
||||
// This won't remove the fact they precommitted for this block hash in the MessageLog
|
||||
// TODO: Don't even log these in the first place until we jump, preventing needing
|
||||
// to do this in the first place
|
||||
let msg = self
|
||||
.block
|
||||
.log
|
||||
.log
|
||||
.get_mut(&msg.round)
|
||||
.unwrap()
|
||||
.get_mut(validator)
|
||||
.unwrap()
|
||||
.remove(&Step::Precommit)
|
||||
.unwrap();
|
||||
|
||||
// Slash the validator for publishing an invalid commit signature
|
||||
self
|
||||
.slash(
|
||||
*validator,
|
||||
SlashEvent::WithEvidence(Evidence::InvalidPrecommit(msg.encode())),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we're the proposer, return now we don't waste time on the current round
|
||||
// (as it doesn't have a proposal, since we didn't propose, and cannot complete)
|
||||
if proposer {
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
// Future round which we aren't ready to jump to, so return for now
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// msg.round is now guaranteed to be equal to self.block.round().number
|
||||
debug_assert_eq!(msg.round, self.block.round().number);
|
||||
|
||||
// The paper executes these checks when the step is prevote. Making sure this message warrants
|
||||
// rerunning these checks is a sane optimization since message instances is a full iteration
|
||||
// of the round map
|
||||
if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
||||
let (participation, weight) =
|
||||
self.block.log.message_instances(self.block.round().number, &Data::Prevote(None));
|
||||
let threshold_weight = self.weights.threshold();
|
||||
if participation < threshold_weight {
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"progess towards setting prevote timeout, participation: {}, needed: {}",
|
||||
participation,
|
||||
threshold_weight,
|
||||
);
|
||||
}
|
||||
// 34-35
|
||||
if participation >= threshold_weight {
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"setting timeout for prevote due to sufficient participation",
|
||||
);
|
||||
self.block.round_mut().set_timeout(Step::Prevote);
|
||||
}
|
||||
|
||||
// 44-46
|
||||
if weight >= threshold_weight {
|
||||
self.broadcast(Data::Precommit(None));
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// 47-48
|
||||
if matches!(msg.data, Data::Precommit(_)) &&
|
||||
self.block.log.has_participation(self.block.round().number, Step::Precommit)
|
||||
{
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"setting timeout for precommit due to sufficient participation",
|
||||
);
|
||||
self.block.round_mut().set_timeout(Step::Precommit);
|
||||
}
|
||||
|
||||
// All further operations require actually having the proposal in question
|
||||
let proposer = self.weights.proposer(self.block.number, self.block.round().number);
|
||||
let (vr, block) = if let Some(proposal_signed) =
|
||||
self.block.log.get(self.block.round().number, proposer, Step::Propose)
|
||||
{
|
||||
if let Data::Proposal(vr, block) = &proposal_signed.msg.data {
|
||||
(vr, block)
|
||||
} else {
|
||||
panic!("message for Step::Propose didn't have Data::Proposal");
|
||||
}
|
||||
} else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// 22-33
|
||||
if self.block.round().step == Step::Propose {
|
||||
// Delay error handling (triggering a slash) until after we vote.
|
||||
let (valid, err) = match self.network.validate(block).await {
|
||||
Ok(()) => (true, Ok(None)),
|
||||
Err(BlockError::Temporal) => (false, Ok(None)),
|
||||
Err(BlockError::Fatal) => (false, {
|
||||
log::warn!(target: "tendermint", "Validator proposed a fatally invalid block");
|
||||
// TODO: Produce evidence of this for the higher level code to decide what to do with
|
||||
Err(TendermintError::Malicious(proposer, None))
|
||||
}),
|
||||
};
|
||||
// Create a raw vote which only requires block validity as a basis for the actual vote.
|
||||
let raw_vote = Some(block.id()).filter(|_| valid);
|
||||
|
||||
// If locked is none, it has a round of -1 according to the protocol. That satisfies
|
||||
// 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some
|
||||
// with different IDs, the function on 22 rejects yet the function on 28 has one other
|
||||
// condition
|
||||
let locked = self.block.locked.as_ref().map_or(true, |(_, id)| id == &block.id());
|
||||
let mut vote = raw_vote.filter(|_| locked);
|
||||
|
||||
if let Some(vr) = vr {
|
||||
// Malformed message
|
||||
if vr.0 >= self.block.round().number.0 {
|
||||
log::warn!(target: "tendermint", "Validator claimed a round from the future was valid");
|
||||
Err(TendermintError::Malicious(
|
||||
msg.sender,
|
||||
Some(Evidence::InvalidValidRound(signed.encode())),
|
||||
))?;
|
||||
}
|
||||
|
||||
if self.block.log.has_consensus(*vr, &Data::Prevote(Some(block.id()))) {
|
||||
// Allow differing locked values if the proposal has a newer valid round
|
||||
// This is the other condition described above
|
||||
if let Some((locked_round, _)) = self.block.locked.as_ref() {
|
||||
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
|
||||
}
|
||||
|
||||
self.broadcast(Data::Prevote(vote));
|
||||
return err;
|
||||
}
|
||||
} else {
|
||||
self.broadcast(Data::Prevote(vote));
|
||||
return err;
|
||||
}
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if self.block.valid.as_ref().map_or(true, |(round, _)| round != &self.block.round().number) {
|
||||
// 36-43
|
||||
|
||||
// The run once condition is implemented above. Since valid will always be set by this, it
|
||||
// not being set, or only being set historically, means this has yet to be run
|
||||
|
||||
if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) {
|
||||
match self.network.validate(block).await {
|
||||
// BlockError::Temporal is due to a temporal error we have, yet a supermajority of the
|
||||
// network does not, Because we do not believe this block to be fatally invalid, and
|
||||
// because a supermajority deems it valid, accept it.
|
||||
Ok(()) | Err(BlockError::Temporal) => (),
|
||||
Err(BlockError::Fatal) => {
|
||||
log::warn!(target: "tendermint", "Validator proposed a fatally invalid block");
|
||||
// TODO: Produce evidence of this for the higher level code to decide what to do with
|
||||
Err(TendermintError::Malicious(proposer, None))?
|
||||
}
|
||||
};
|
||||
|
||||
self.block.valid = Some((self.block.round().number, block.clone()));
|
||||
if self.block.round().step == Step::Prevote {
|
||||
self.block.locked = Some((self.block.round().number, block.id()));
|
||||
self.broadcast(Data::Precommit(Some((
|
||||
block.id(),
|
||||
self
|
||||
.signer
|
||||
.sign(&commit_msg(
|
||||
self.block.end_time[&self.block.round().number].canonical(),
|
||||
block.id().as_ref(),
|
||||
))
|
||||
.await,
|
||||
))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use parity_scale_codec::Encode;
|
||||
|
||||
use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
|
||||
use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence};
|
||||
|
||||
type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>;
|
||||
pub(crate) struct MessageLog<N: Network> {
|
||||
@@ -16,7 +16,7 @@ impl<N: Network> MessageLog<N> {
|
||||
}
|
||||
|
||||
// Returns true if it's a new message
|
||||
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, TendermintError<N>> {
|
||||
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, Evidence> {
|
||||
let msg = &signed.msg;
|
||||
// Clarity, and safety around default != new edge cases
|
||||
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
|
||||
@@ -30,10 +30,7 @@ impl<N: Network> MessageLog<N> {
|
||||
target: "tendermint",
|
||||
"Validator sent multiple messages for the same block + round + step"
|
||||
);
|
||||
Err(TendermintError::Malicious(
|
||||
msg.sender,
|
||||
Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())),
|
||||
))?;
|
||||
Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?;
|
||||
}
|
||||
return Ok(false);
|
||||
}
|
||||
@@ -47,7 +44,8 @@ impl<N: Network> MessageLog<N> {
|
||||
pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) {
|
||||
let mut participating = 0;
|
||||
let mut weight = 0;
|
||||
for (participant, msgs) in &self.log[&round] {
|
||||
let Some(log) = self.log.get(&round) else { return (0, 0) };
|
||||
for (participant, msgs) in log {
|
||||
if let Some(msg) = msgs.get(&data.step()) {
|
||||
let validator_weight = self.weights.weight(*participant);
|
||||
participating += validator_weight;
|
||||
@@ -73,7 +71,8 @@ impl<N: Network> MessageLog<N> {
|
||||
// Check if a supermajority of nodes have participated on a specific step
|
||||
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
|
||||
let mut participating = 0;
|
||||
for (participant, msgs) in &self.log[&round] {
|
||||
let Some(log) = self.log.get(&round) else { return false };
|
||||
for (participant, msgs) in log {
|
||||
if msgs.get(&step).is_some() {
|
||||
participating += self.weights.weight(*participant);
|
||||
}
|
||||
|
||||
@@ -57,6 +57,7 @@ impl<N: Network> RoundData<N> {
|
||||
|
||||
// Poll all set timeouts, returning the Step whose timeout has just expired
|
||||
pub(crate) async fn timeout_future(&self) -> Step {
|
||||
/*
|
||||
let now = Instant::now();
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
@@ -64,6 +65,7 @@ impl<N: Network> RoundData<N> {
|
||||
self.step,
|
||||
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
|
||||
);
|
||||
*/
|
||||
|
||||
let timeout_future = |step| {
|
||||
let timeout = self.timeouts.get(&step).copied();
|
||||
|
||||
@@ -145,7 +145,7 @@ impl Network for TestNetwork {
|
||||
println!("Slash for {id} due to {event:?}");
|
||||
}
|
||||
|
||||
async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
|
||||
async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> {
|
||||
block.valid
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user