mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-10 21:19:24 +00:00
BlockData and RoundData structs
This commit is contained in:
@@ -16,7 +16,10 @@ use futures::{
|
|||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
mod time;
|
mod time;
|
||||||
use time::{sys_time, CanonicalInstant};
|
use time::*;
|
||||||
|
|
||||||
|
mod round;
|
||||||
|
use round::*;
|
||||||
|
|
||||||
mod message_log;
|
mod message_log;
|
||||||
use message_log::MessageLog;
|
use message_log::MessageLog;
|
||||||
@@ -103,6 +106,21 @@ enum TendermintError<V: ValidatorId> {
|
|||||||
Temporal,
|
Temporal,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct BlockData<N: Network> {
|
||||||
|
number: BlockNumber,
|
||||||
|
validator_id: Option<N::ValidatorId>,
|
||||||
|
proposal: N::Block,
|
||||||
|
|
||||||
|
log: MessageLog<N>,
|
||||||
|
slashes: HashSet<N::ValidatorId>,
|
||||||
|
end_time: HashMap<Round, CanonicalInstant>,
|
||||||
|
|
||||||
|
round: RoundData<N>,
|
||||||
|
|
||||||
|
locked: Option<(Round, <N::Block as Block>::Id)>,
|
||||||
|
valid: Option<(Round, N::Block)>,
|
||||||
|
}
|
||||||
|
|
||||||
/// A machine executing the Tendermint protocol.
|
/// A machine executing the Tendermint protocol.
|
||||||
pub struct TendermintMachine<N: Network> {
|
pub struct TendermintMachine<N: Network> {
|
||||||
network: N,
|
network: N,
|
||||||
@@ -110,11 +128,6 @@ pub struct TendermintMachine<N: Network> {
|
|||||||
validators: N::SignatureScheme,
|
validators: N::SignatureScheme,
|
||||||
weights: Arc<N::Weights>,
|
weights: Arc<N::Weights>,
|
||||||
|
|
||||||
validator_id: Option<N::ValidatorId>,
|
|
||||||
|
|
||||||
number: BlockNumber,
|
|
||||||
personal_proposal: N::Block,
|
|
||||||
|
|
||||||
queue:
|
queue:
|
||||||
VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
|
VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>,
|
||||||
msg_recv: mpsc::UnboundedReceiver<
|
msg_recv: mpsc::UnboundedReceiver<
|
||||||
@@ -122,17 +135,7 @@ pub struct TendermintMachine<N: Network> {
|
|||||||
>,
|
>,
|
||||||
step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>,
|
step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>,
|
||||||
|
|
||||||
log: MessageLog<N>,
|
block: BlockData<N>,
|
||||||
slashes: HashSet<N::ValidatorId>,
|
|
||||||
end_time: HashMap<Round, CanonicalInstant>,
|
|
||||||
round: Round,
|
|
||||||
start_time: CanonicalInstant,
|
|
||||||
step: Step,
|
|
||||||
|
|
||||||
locked: Option<(Round, <N::Block as Block>::Id)>,
|
|
||||||
valid: Option<(Round, N::Block)>,
|
|
||||||
|
|
||||||
timeouts: HashMap<Step, Instant>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type StepSender<N> =
|
pub type StepSender<N> =
|
||||||
@@ -158,126 +161,113 @@ pub struct TendermintHandle<N: Network> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<N: Network + 'static> TendermintMachine<N> {
|
impl<N: Network + 'static> TendermintMachine<N> {
|
||||||
fn timeout(&self, step: Step) -> CanonicalInstant {
|
|
||||||
let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1);
|
|
||||||
let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1);
|
|
||||||
let offset = Duration::from_secs(
|
|
||||||
(match step {
|
|
||||||
Step::Propose => adjusted_block + adjusted_latency,
|
|
||||||
Step::Prevote => adjusted_block + (2 * adjusted_latency),
|
|
||||||
Step::Precommit => adjusted_block + (3 * adjusted_latency),
|
|
||||||
})
|
|
||||||
.into(),
|
|
||||||
);
|
|
||||||
self.start_time + offset
|
|
||||||
}
|
|
||||||
|
|
||||||
fn broadcast(
|
fn broadcast(
|
||||||
&mut self,
|
&mut self,
|
||||||
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
||||||
) {
|
) {
|
||||||
if let Some(validator_id) = &self.validator_id {
|
if let Some(validator_id) = &self.block.validator_id {
|
||||||
// 27, 33, 41, 46, 60, 64
|
// 27, 33, 41, 46, 60, 64
|
||||||
self.step = data.step();
|
self.block.round.step = data.step();
|
||||||
self.queue.push_back(Message {
|
self.queue.push_back(Message {
|
||||||
sender: *validator_id,
|
sender: *validator_id,
|
||||||
number: self.number,
|
number: self.block.number,
|
||||||
round: self.round,
|
round: self.block.round.round,
|
||||||
data,
|
data,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 14-21
|
fn populate_end_time(&mut self, round: Round) {
|
||||||
fn round_propose(&mut self) -> bool {
|
for r in (self.block.round.round.0 + 1) .. round.0 {
|
||||||
if Some(self.weights.proposer(self.number, self.round)) == self.validator_id {
|
self.block.end_time.insert(
|
||||||
let (round, block) = self
|
Round(r),
|
||||||
.valid
|
RoundData::<N>::new(Round(r), self.block.end_time[&Round(r - 1)]).end_time(),
|
||||||
.clone()
|
);
|
||||||
.map(|(r, b)| (Some(r), b))
|
}
|
||||||
.unwrap_or((None, self.personal_proposal.clone()));
|
}
|
||||||
|
|
||||||
|
// Start a new round. Returns true if we were the proposer
|
||||||
|
fn round(&mut self, round: Round, time: Option<CanonicalInstant>) -> bool {
|
||||||
|
// If skipping rounds, populate end_time
|
||||||
|
self.populate_end_time(round);
|
||||||
|
|
||||||
|
// 11-13
|
||||||
|
self.block.round =
|
||||||
|
RoundData::<N>::new(round, time.unwrap_or_else(|| self.block.end_time[&Round(round.0 - 1)]));
|
||||||
|
self.block.end_time.insert(round, self.block.round.end_time());
|
||||||
|
|
||||||
|
// 14-21
|
||||||
|
if Some(self.weights.proposer(self.block.number, self.block.round.round)) ==
|
||||||
|
self.block.validator_id
|
||||||
|
{
|
||||||
|
let (round, block) = if let Some((round, block)) = &self.block.valid {
|
||||||
|
(Some(*round), block.clone())
|
||||||
|
} else {
|
||||||
|
(None, self.block.proposal.clone())
|
||||||
|
};
|
||||||
self.broadcast(Data::Proposal(round, block));
|
self.broadcast(Data::Proposal(round, block));
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
self.timeouts.insert(Step::Propose, self.timeout(Step::Propose).instant());
|
self.block.round.set_timeout(Step::Propose);
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn round(&mut self, round: Round) -> bool {
|
|
||||||
// If moving to a new round, correct the start time and populate end_time
|
|
||||||
for r in self.round.0 .. round.0 {
|
|
||||||
let end = self.timeout(Step::Precommit);
|
|
||||||
self.end_time.insert(Round(r), end);
|
|
||||||
self.round.0 += 1;
|
|
||||||
self.start_time = end;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the round regardless in case of reset
|
|
||||||
|
|
||||||
// 11-13
|
|
||||||
self.round = round;
|
|
||||||
self.step = Step::Propose;
|
|
||||||
|
|
||||||
// Write the end time
|
|
||||||
self.end_time.insert(round, self.timeout(Step::Precommit));
|
|
||||||
// Clear timeouts
|
|
||||||
self.timeouts = HashMap::new();
|
|
||||||
|
|
||||||
self.round_propose()
|
|
||||||
}
|
|
||||||
|
|
||||||
// 53-54
|
// 53-54
|
||||||
async fn reset(&mut self, end_round: Round, proposal: N::Block) {
|
async fn reset(&mut self, end_round: Round, proposal: N::Block) {
|
||||||
// Wait for the next block interval
|
// Ensure we have the end time data for the last round
|
||||||
let round_end = self.end_time[&end_round];
|
self.populate_end_time(end_round);
|
||||||
|
|
||||||
|
// Sleep until this round ends
|
||||||
|
let round_end = self.block.end_time[&end_round];
|
||||||
sleep(round_end.instant().saturating_duration_since(Instant::now())).await;
|
sleep(round_end.instant().saturating_duration_since(Instant::now())).await;
|
||||||
|
|
||||||
self.validator_id = self.signer.validator_id().await;
|
// Only keep queued messages for this block
|
||||||
|
self.queue = self.queue.drain(..).filter(|msg| msg.number == self.block.number).collect();
|
||||||
|
|
||||||
self.number.0 += 1;
|
// Create the new block
|
||||||
self.personal_proposal = proposal;
|
self.block = BlockData {
|
||||||
|
number: BlockNumber(self.block.number.0 + 1),
|
||||||
|
validator_id: self.signer.validator_id().await,
|
||||||
|
proposal,
|
||||||
|
|
||||||
self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect();
|
log: MessageLog::new(self.weights.clone()),
|
||||||
|
slashes: HashSet::new(),
|
||||||
|
end_time: HashMap::new(),
|
||||||
|
|
||||||
self.log = MessageLog::new(self.weights.clone());
|
// This will be populated in the following round() call
|
||||||
self.slashes = HashSet::new();
|
round: RoundData::<N>::new(Round(0), CanonicalInstant::new(0)),
|
||||||
self.end_time = HashMap::new();
|
|
||||||
self.start_time = round_end;
|
|
||||||
|
|
||||||
self.locked = None;
|
locked: None,
|
||||||
self.valid = None;
|
valid: None,
|
||||||
|
};
|
||||||
|
|
||||||
self.round(Round(0));
|
// Start the first round
|
||||||
|
self.round(Round(0), Some(round_end));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) {
|
async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) {
|
||||||
let mut round = None;
|
let mut round = self.block.round.round;
|
||||||
// If our start time is >= the commit's end time, it's from a previous round
|
// If this commit is for a round we don't have, jump up to it
|
||||||
if self.start_time.canonical() >= commit.end_time {
|
while self.block.end_time[&round].canonical() < commit.end_time {
|
||||||
for (round_i, end_time) in &self.end_time {
|
round.0 += 1;
|
||||||
if end_time.canonical() == commit.end_time {
|
self.populate_end_time(round);
|
||||||
round = Some(*round_i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Increment rounds until we find the round
|
|
||||||
while {
|
|
||||||
self.round(Round(self.round.0 + 1));
|
|
||||||
// Use < to prevent an infinite loop
|
|
||||||
self.end_time[&self.round].canonical() < commit.end_time
|
|
||||||
} {}
|
|
||||||
round =
|
|
||||||
Some(self.round).filter(|_| self.end_time[&self.round].canonical() == commit.end_time);
|
|
||||||
}
|
}
|
||||||
|
// If this commit is for a prior round, find it
|
||||||
|
while self.block.end_time[&round].canonical() > commit.end_time {
|
||||||
|
if round.0 == 0 {
|
||||||
|
panic!("commit isn't for this machine's next block");
|
||||||
|
}
|
||||||
|
round.0 -= 1;
|
||||||
|
}
|
||||||
|
debug_assert_eq!(self.block.end_time[&round].canonical(), commit.end_time);
|
||||||
|
|
||||||
self.reset(round.expect("commit wasn't for the machine's next block"), proposal).await;
|
self.reset(round, proposal).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn slash(&mut self, validator: N::ValidatorId) {
|
async fn slash(&mut self, validator: N::ValidatorId) {
|
||||||
if !self.slashes.contains(&validator) {
|
if !self.block.slashes.contains(&validator) {
|
||||||
self.slashes.insert(validator);
|
self.block.slashes.insert(validator);
|
||||||
self.network.slash(validator).await;
|
self.network.slash(validator).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -312,61 +302,42 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
validators,
|
validators,
|
||||||
weights: weights.clone(),
|
weights: weights.clone(),
|
||||||
|
|
||||||
validator_id,
|
|
||||||
|
|
||||||
number: BlockNumber(last.0 .0 + 1),
|
|
||||||
personal_proposal: proposal,
|
|
||||||
|
|
||||||
queue: VecDeque::new(),
|
queue: VecDeque::new(),
|
||||||
msg_recv,
|
msg_recv,
|
||||||
step_recv,
|
step_recv,
|
||||||
|
|
||||||
log: MessageLog::new(weights),
|
block: BlockData {
|
||||||
slashes: HashSet::new(),
|
number: BlockNumber(last.0 .0 + 1),
|
||||||
end_time: HashMap::new(),
|
validator_id,
|
||||||
round: Round(0),
|
proposal,
|
||||||
// The end time of the last block is the start time for this one
|
|
||||||
// The Commit explicitly contains the end time, so loading the last commit will provide
|
|
||||||
// this. The only exception is for the genesis block, which doesn't have a commit
|
|
||||||
// Using the genesis time in place will cause this block to be created immediately
|
|
||||||
// after it, without the standard amount of separation (so their times will be
|
|
||||||
// equivalent or minimally offset)
|
|
||||||
// For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time())
|
|
||||||
start_time: CanonicalInstant::new(last.1),
|
|
||||||
step: Step::Propose,
|
|
||||||
|
|
||||||
locked: None,
|
log: MessageLog::new(weights),
|
||||||
valid: None,
|
slashes: HashSet::new(),
|
||||||
|
end_time: HashMap::new(),
|
||||||
|
|
||||||
timeouts: HashMap::new(),
|
// This will be populated in the following round() call
|
||||||
|
round: RoundData::<N>::new(Round(0), CanonicalInstant::new(0)),
|
||||||
|
|
||||||
|
locked: None,
|
||||||
|
valid: None,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
machine.round(Round(0));
|
|
||||||
|
// The end time of the last block is the start time for this one
|
||||||
|
// The Commit explicitly contains the end time, so loading the last commit will provide
|
||||||
|
// this. The only exception is for the genesis block, which doesn't have a commit
|
||||||
|
// Using the genesis time in place will cause this block to be created immediately
|
||||||
|
// after it, without the standard amount of separation (so their times will be
|
||||||
|
// equivalent or minimally offset)
|
||||||
|
// For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time())
|
||||||
|
machine.round(Round(0), Some(CanonicalInstant::new(last.1)));
|
||||||
machine
|
machine
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(mut self) {
|
pub async fn run(mut self) {
|
||||||
self.round(Round(0));
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Create futures for the various timeouts
|
|
||||||
let timeout_future = |step| {
|
|
||||||
let timeout = self.timeouts.get(&step).copied();
|
|
||||||
(async move {
|
|
||||||
if let Some(timeout) = timeout {
|
|
||||||
sleep(timeout.saturating_duration_since(Instant::now())).await;
|
|
||||||
} else {
|
|
||||||
future::pending::<()>().await;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.fuse()
|
|
||||||
};
|
|
||||||
let propose_timeout = timeout_future(Step::Propose);
|
|
||||||
let prevote_timeout = timeout_future(Step::Prevote);
|
|
||||||
let precommit_timeout = timeout_future(Step::Precommit);
|
|
||||||
futures::pin_mut!(propose_timeout, prevote_timeout, precommit_timeout);
|
|
||||||
|
|
||||||
// Also create a future for if the queue has a message
|
// Also create a future for if the queue has a message
|
||||||
// Does not pop_front as if another message has higher priority, its future will be handled
|
// Does not pop_front as if another message has higher priority, its future will be handled
|
||||||
// instead in this loop, and the popped value would be dropped with the next iteration
|
// instead in this loop, and the popped value would be dropped with the next iteration
|
||||||
@@ -391,31 +362,28 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
},
|
},
|
||||||
|
|
||||||
// Handle any timeouts
|
// Handle any timeouts
|
||||||
_ = &mut propose_timeout => {
|
step = self.block.round.timeout_future().fuse() => {
|
||||||
// Remove the timeout so it doesn't persist, always being the selected future due to bias
|
// Remove the timeout so it doesn't persist, always being the selected future due to bias
|
||||||
// While this does enable the below get_entry calls to enter timeouts again, they'll
|
// While this does enable the timeout to be entered again, the timeout setting code will
|
||||||
// never attempt to add a timeout after this timeout has expired
|
// never attempt to add a timeout after its timeout has expired
|
||||||
self.timeouts.remove(&Step::Propose);
|
self.block.round.timeouts.remove(&step);
|
||||||
if self.step == Step::Propose {
|
// Only run if it's still the step in question
|
||||||
// Slash the validator for not proposing when they should've
|
if self.block.round.step == step {
|
||||||
self.slash(self.weights.proposer(self.number, self.round)).await;
|
match step {
|
||||||
self.broadcast(Data::Prevote(None));
|
Step::Propose => {
|
||||||
|
// Slash the validator for not proposing when they should've
|
||||||
|
self.slash(self.weights.proposer(self.block.number, self.block.round.round)).await;
|
||||||
|
self.broadcast(Data::Prevote(None));
|
||||||
|
},
|
||||||
|
Step::Prevote => self.broadcast(Data::Precommit(None)),
|
||||||
|
Step::Precommit => {
|
||||||
|
self.round(Round(self.block.round.round.0 + 1), None);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
_ = &mut prevote_timeout => {
|
|
||||||
self.timeouts.remove(&Step::Prevote);
|
|
||||||
if self.step == Step::Prevote {
|
|
||||||
self.broadcast(Data::Precommit(None));
|
|
||||||
}
|
|
||||||
None
|
|
||||||
},
|
|
||||||
_ = &mut precommit_timeout => {
|
|
||||||
// Technically unnecessary since round() will clear the timeouts
|
|
||||||
self.timeouts.remove(&Step::Precommit);
|
|
||||||
self.round(Round(self.round.0.wrapping_add(1)));
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
|
|
||||||
// Handle any received messages
|
// Handle any received messages
|
||||||
msg = self.msg_recv.next() => {
|
msg = self.msg_recv.next() => {
|
||||||
@@ -440,6 +408,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
let mut validators = vec![];
|
let mut validators = vec![];
|
||||||
let mut sigs = vec![];
|
let mut sigs = vec![];
|
||||||
for (v, sig) in self
|
for (v, sig) in self
|
||||||
|
.block
|
||||||
.log
|
.log
|
||||||
.precommitted
|
.precommitted
|
||||||
.iter()
|
.iter()
|
||||||
@@ -450,7 +419,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let commit = Commit {
|
let commit = Commit {
|
||||||
end_time: self.end_time[&msg.round].canonical(),
|
end_time: self.block.end_time[&msg.round].canonical(),
|
||||||
validators,
|
validators,
|
||||||
signature: N::SignatureScheme::aggregate(&sigs),
|
signature: N::SignatureScheme::aggregate(&sigs),
|
||||||
};
|
};
|
||||||
@@ -484,7 +453,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
// Only perform this verification if we already have the end_time
|
// Only perform this verification if we already have the end_time
|
||||||
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
|
// Else, there's a DoS where we receive a precommit for some round infinitely in the future
|
||||||
// which forces to calculate every end time
|
// which forces to calculate every end time
|
||||||
if let Some(end_time) = self.end_time.get(&round) {
|
if let Some(end_time) = self.block.end_time.get(&round) {
|
||||||
if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) {
|
if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) {
|
||||||
Err(TendermintError::Malicious(sender))?;
|
Err(TendermintError::Malicious(sender))?;
|
||||||
}
|
}
|
||||||
@@ -497,7 +466,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
||||||
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
|
) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> {
|
||||||
if msg.number != self.number {
|
if msg.number != self.block.number {
|
||||||
Err(TendermintError::Temporal)?;
|
Err(TendermintError::Temporal)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -511,7 +480,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
Err(TendermintError::Malicious(msg.sender))?;
|
Err(TendermintError::Malicious(msg.sender))?;
|
||||||
};
|
};
|
||||||
|
|
||||||
if !self.log.log(msg.clone())? {
|
if !self.block.log.log(msg.clone())? {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -520,13 +489,14 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
// Run the finalizer to see if it applies
|
// Run the finalizer to see if it applies
|
||||||
// 49-52
|
// 49-52
|
||||||
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
|
if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) {
|
||||||
let proposer = self.weights.proposer(self.number, msg.round);
|
let proposer = self.weights.proposer(self.block.number, msg.round);
|
||||||
|
|
||||||
// Get the proposal
|
// Get the proposal
|
||||||
if let Some(Data::Proposal(_, block)) = self.log.get(msg.round, proposer, Step::Propose) {
|
if let Some(Data::Proposal(_, block)) = self.block.log.get(msg.round, proposer, Step::Propose)
|
||||||
|
{
|
||||||
// Check if it has gotten a sufficient amount of precommits
|
// Check if it has gotten a sufficient amount of precommits
|
||||||
// Use a junk signature since message equality disregards the signature
|
// Use a junk signature since message equality disregards the signature
|
||||||
if self.log.has_consensus(
|
if self.block.log.has_consensus(
|
||||||
msg.round,
|
msg.round,
|
||||||
Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))),
|
Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))),
|
||||||
) {
|
) {
|
||||||
@@ -537,15 +507,15 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
|
|
||||||
// Else, check if we need to jump ahead
|
// Else, check if we need to jump ahead
|
||||||
#[allow(clippy::comparison_chain)]
|
#[allow(clippy::comparison_chain)]
|
||||||
if msg.round.0 < self.round.0 {
|
if msg.round.0 < self.block.round.round.0 {
|
||||||
// Prior round, disregard if not finalizing
|
// Prior round, disregard if not finalizing
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
} else if msg.round.0 > self.round.0 {
|
} else if msg.round.0 > self.block.round.round.0 {
|
||||||
// 55-56
|
// 55-56
|
||||||
// Jump, enabling processing by the below code
|
// Jump, enabling processing by the below code
|
||||||
if self.log.round_participation(msg.round) > self.weights.fault_thresold() {
|
if self.block.log.round_participation(msg.round) > self.weights.fault_thresold() {
|
||||||
// If this round already has precommit messages, verify their signatures
|
// If this round already has precommit messages, verify their signatures
|
||||||
let round_msgs = self.log.log[&msg.round].clone();
|
let round_msgs = self.block.log.log[&msg.round].clone();
|
||||||
for (validator, msgs) in &round_msgs {
|
for (validator, msgs) in &round_msgs {
|
||||||
if let Some(data) = msgs.get(&Step::Precommit) {
|
if let Some(data) = msgs.get(&Step::Precommit) {
|
||||||
if self.verify_precommit_signature(*validator, msg.round, data).is_err() {
|
if self.verify_precommit_signature(*validator, msg.round, data).is_err() {
|
||||||
@@ -555,7 +525,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
}
|
}
|
||||||
// If we're the proposer, return now so we re-run processing with our proposal
|
// If we're the proposer, return now so we re-run processing with our proposal
|
||||||
// If we continue now, it'd just be wasted ops
|
// If we continue now, it'd just be wasted ops
|
||||||
if self.round(msg.round) {
|
if self.round(msg.round, None) {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -567,12 +537,12 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
// The paper executes these checks when the step is prevote. Making sure this message warrants
|
// The paper executes these checks when the step is prevote. Making sure this message warrants
|
||||||
// rerunning these checks is a sane optimization since message instances is a full iteration
|
// rerunning these checks is a sane optimization since message instances is a full iteration
|
||||||
// of the round map
|
// of the round map
|
||||||
if (self.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
if (self.block.round.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
||||||
let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None));
|
let (participation, weight) =
|
||||||
|
self.block.log.message_instances(self.block.round.round, Data::Prevote(None));
|
||||||
// 34-35
|
// 34-35
|
||||||
if participation >= self.weights.threshold() {
|
if participation >= self.weights.threshold() {
|
||||||
let timeout = self.timeout(Step::Prevote);
|
self.block.round.set_timeout(Step::Prevote);
|
||||||
self.timeouts.entry(Step::Prevote).or_insert_with(|| timeout.instant());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 44-46
|
// 44-46
|
||||||
@@ -584,16 +554,17 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
|
|
||||||
// 47-48
|
// 47-48
|
||||||
if matches!(msg.data, Data::Precommit(_)) &&
|
if matches!(msg.data, Data::Precommit(_)) &&
|
||||||
self.log.has_participation(self.round, Step::Precommit)
|
self.block.log.has_participation(self.block.round.round, Step::Precommit)
|
||||||
{
|
{
|
||||||
let timeout = self.timeout(Step::Precommit);
|
self.block.round.set_timeout(Step::Precommit);
|
||||||
self.timeouts.entry(Step::Precommit).or_insert_with(|| timeout.instant());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let proposer = self.weights.proposer(self.number, self.round);
|
let proposer = self.weights.proposer(self.block.number, self.block.round.round);
|
||||||
if let Some(Data::Proposal(vr, block)) = self.log.get(self.round, proposer, Step::Propose) {
|
if let Some(Data::Proposal(vr, block)) =
|
||||||
|
self.block.log.get(self.block.round.round, proposer, Step::Propose)
|
||||||
|
{
|
||||||
// 22-33
|
// 22-33
|
||||||
if self.step == Step::Propose {
|
if self.block.round.step == Step::Propose {
|
||||||
// Delay error handling (triggering a slash) until after we vote.
|
// Delay error handling (triggering a slash) until after we vote.
|
||||||
let (valid, err) = match self.network.validate(block).await {
|
let (valid, err) = match self.network.validate(block).await {
|
||||||
Ok(_) => (true, Ok(None)),
|
Ok(_) => (true, Ok(None)),
|
||||||
@@ -607,19 +578,19 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
// 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some
|
// 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some
|
||||||
// with different IDs, the function on 22 rejects yet the function on 28 has one other
|
// with different IDs, the function on 22 rejects yet the function on 28 has one other
|
||||||
// condition
|
// condition
|
||||||
let locked = self.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true);
|
let locked = self.block.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true);
|
||||||
let mut vote = raw_vote.filter(|_| locked);
|
let mut vote = raw_vote.filter(|_| locked);
|
||||||
|
|
||||||
if let Some(vr) = vr {
|
if let Some(vr) = vr {
|
||||||
// Malformed message
|
// Malformed message
|
||||||
if vr.0 >= self.round.0 {
|
if vr.0 >= self.block.round.round.0 {
|
||||||
Err(TendermintError::Malicious(msg.sender))?;
|
Err(TendermintError::Malicious(msg.sender))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) {
|
if self.block.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) {
|
||||||
// Allow differing locked values if the proposal has a newer valid round
|
// Allow differing locked values if the proposal has a newer valid round
|
||||||
// This is the other condition described above
|
// This is the other condition described above
|
||||||
if let Some((locked_round, _)) = self.locked.as_ref() {
|
if let Some((locked_round, _)) = self.block.locked.as_ref() {
|
||||||
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
|
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -630,27 +601,36 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
|||||||
self.broadcast(Data::Prevote(vote));
|
self.broadcast(Data::Prevote(vote));
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
} else if self.valid.as_ref().map(|(round, _)| round != &self.round).unwrap_or(true) {
|
} else if self
|
||||||
|
.block
|
||||||
|
.valid
|
||||||
|
.as_ref()
|
||||||
|
.map(|(round, _)| round != &self.block.round.round)
|
||||||
|
.unwrap_or(true)
|
||||||
|
{
|
||||||
// 36-43
|
// 36-43
|
||||||
|
|
||||||
// The run once condition is implemented above. Sinve valid will always be set, it not
|
// The run once condition is implemented above. Sinve valid will always be set, it not
|
||||||
// being set, or only being set historically, means this has yet to be run
|
// being set, or only being set historically, means this has yet to be run
|
||||||
|
|
||||||
if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) {
|
if self.block.log.has_consensus(self.block.round.round, Data::Prevote(Some(block.id()))) {
|
||||||
match self.network.validate(block).await {
|
match self.network.validate(block).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(BlockError::Temporal) => (),
|
Err(BlockError::Temporal) => (),
|
||||||
Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?,
|
Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.valid = Some((self.round, block.clone()));
|
self.block.valid = Some((self.block.round.round, block.clone()));
|
||||||
if self.step == Step::Prevote {
|
if self.block.round.step == Step::Prevote {
|
||||||
self.locked = Some((self.round, block.id()));
|
self.block.locked = Some((self.block.round.round, block.id()));
|
||||||
self.broadcast(Data::Precommit(Some((
|
self.broadcast(Data::Precommit(Some((
|
||||||
block.id(),
|
block.id(),
|
||||||
self
|
self
|
||||||
.signer
|
.signer
|
||||||
.sign(&commit_msg(self.end_time[&self.round].canonical(), block.id().as_ref()))
|
.sign(&commit_msg(
|
||||||
|
self.block.end_time[&self.block.round.round].canonical(),
|
||||||
|
block.id().as_ref(),
|
||||||
|
))
|
||||||
.await,
|
.await,
|
||||||
))));
|
))));
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
|
|||||||
82
substrate/tendermint/machine/src/round.rs
Normal file
82
substrate/tendermint/machine/src/round.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
use std::{
|
||||||
|
marker::PhantomData,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
collections::HashMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{FutureExt, future};
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
time::CanonicalInstant,
|
||||||
|
Step,
|
||||||
|
ext::{Round, Network},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(crate) struct RoundData<N: Network> {
|
||||||
|
_network: PhantomData<N>,
|
||||||
|
pub(crate) round: Round,
|
||||||
|
pub(crate) start_time: CanonicalInstant,
|
||||||
|
pub(crate) step: Step,
|
||||||
|
pub(crate) timeouts: HashMap<Step, Instant>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: Network> RoundData<N> {
|
||||||
|
pub(crate) fn new(round: Round, start_time: CanonicalInstant) -> Self {
|
||||||
|
RoundData {
|
||||||
|
_network: PhantomData,
|
||||||
|
round,
|
||||||
|
start_time,
|
||||||
|
step: Step::Propose,
|
||||||
|
timeouts: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timeout(&self, step: Step) -> CanonicalInstant {
|
||||||
|
let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1);
|
||||||
|
let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1);
|
||||||
|
let offset = Duration::from_secs(
|
||||||
|
(match step {
|
||||||
|
Step::Propose => adjusted_block + adjusted_latency,
|
||||||
|
Step::Prevote => adjusted_block + (2 * adjusted_latency),
|
||||||
|
Step::Precommit => adjusted_block + (3 * adjusted_latency),
|
||||||
|
})
|
||||||
|
.into(),
|
||||||
|
);
|
||||||
|
self.start_time + offset
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn end_time(&self) -> CanonicalInstant {
|
||||||
|
self.timeout(Step::Precommit)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_timeout(&mut self, step: Step) {
|
||||||
|
let timeout = self.timeout(step).instant();
|
||||||
|
self.timeouts.entry(step).or_insert(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn timeout_future(&self) -> Step {
|
||||||
|
let timeout_future = |step| {
|
||||||
|
let timeout = self.timeouts.get(&step).copied();
|
||||||
|
(async move {
|
||||||
|
if let Some(timeout) = timeout {
|
||||||
|
sleep(timeout.saturating_duration_since(Instant::now())).await;
|
||||||
|
} else {
|
||||||
|
future::pending::<()>().await;
|
||||||
|
}
|
||||||
|
step
|
||||||
|
})
|
||||||
|
.fuse()
|
||||||
|
};
|
||||||
|
let propose_timeout = timeout_future(Step::Propose);
|
||||||
|
let prevote_timeout = timeout_future(Step::Prevote);
|
||||||
|
let precommit_timeout = timeout_future(Step::Precommit);
|
||||||
|
futures::pin_mut!(propose_timeout, prevote_timeout, precommit_timeout);
|
||||||
|
|
||||||
|
futures::select_biased! {
|
||||||
|
step = propose_timeout => step,
|
||||||
|
step = prevote_timeout => step,
|
||||||
|
step = precommit_timeout => step,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user