mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-09 20:59:23 +00:00
Remove async recursion
Greatly increases safety as well by ensuring only one message is processed at once.
This commit is contained in:
@@ -10,6 +10,5 @@ edition = "2021"
|
||||
[dependencies]
|
||||
parity-scale-codec = { version = "3.2", features = ["derive"] }
|
||||
|
||||
async-recursion = "1.0"
|
||||
async-trait = "0.1"
|
||||
tokio = { version = "1", features = ["macros", "rt", "sync"] }
|
||||
|
||||
@@ -166,6 +166,7 @@ pub trait Network: Send + Sync {
|
||||
|
||||
/// Trigger a slash for the validator in question who was definitively malicious.
|
||||
/// The exact process of triggering a slash is undefined and left to the network as a whole.
|
||||
// TODO: This is spammed right now.
|
||||
async fn slash(&mut self, validator: Self::ValidatorId);
|
||||
|
||||
/// Validate a block.
|
||||
|
||||
@@ -93,6 +93,11 @@ pub struct TendermintMachine<N: Network> {
|
||||
start_time: Instant,
|
||||
personal_proposal: N::Block,
|
||||
|
||||
queue: Vec<(
|
||||
bool,
|
||||
Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
||||
)>,
|
||||
|
||||
log: MessageLog<N>,
|
||||
round: Round,
|
||||
step: Step,
|
||||
@@ -128,37 +133,36 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
self.start_time + offset
|
||||
}
|
||||
|
||||
#[async_recursion::async_recursion]
|
||||
async fn broadcast(
|
||||
fn broadcast(
|
||||
&mut self,
|
||||
data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>,
|
||||
) -> Option<N::Block> {
|
||||
) {
|
||||
let step = data.step();
|
||||
let msg = Message { sender: self.proposer, number: self.number, round: self.round, data };
|
||||
let res = self.message(msg.clone()).await.unwrap();
|
||||
// 27, 33, 41, 46, 60, 64
|
||||
self.step = step; // TODO: Before or after the above handling call?
|
||||
|
||||
let sig = self.signer.sign(&msg.encode());
|
||||
self.network.write().await.broadcast(SignedMessage { msg, sig }).await;
|
||||
res
|
||||
self.step = step;
|
||||
self.queue.push((
|
||||
true,
|
||||
Message { sender: self.proposer, number: self.number, round: self.round, data },
|
||||
));
|
||||
}
|
||||
|
||||
// 14-21
|
||||
async fn round_propose(&mut self) {
|
||||
fn round_propose(&mut self) -> bool {
|
||||
if self.weights.proposer(self.number, self.round) == self.proposer {
|
||||
let (round, block) = self
|
||||
.valid
|
||||
.clone()
|
||||
.map(|(r, b)| (Some(r), b))
|
||||
.unwrap_or((None, self.personal_proposal.clone()));
|
||||
debug_assert!(self.broadcast(Data::Proposal(round, block)).await.is_none());
|
||||
self.broadcast(Data::Proposal(round, block));
|
||||
true
|
||||
} else {
|
||||
self.timeouts.insert(Step::Propose, self.timeout(Step::Propose));
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
async fn round(&mut self, round: Round) {
|
||||
fn round(&mut self, round: Round) -> bool {
|
||||
dbg!(round);
|
||||
|
||||
// Correct the start time
|
||||
@@ -172,7 +176,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
|
||||
self.round = round;
|
||||
self.step = Step::Propose;
|
||||
self.round_propose().await;
|
||||
self.round_propose()
|
||||
}
|
||||
|
||||
// 53-54
|
||||
@@ -186,7 +190,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
self.locked = None;
|
||||
self.valid = None;
|
||||
|
||||
self.round(Round(0)).await;
|
||||
self.round(Round(0));
|
||||
}
|
||||
|
||||
/// Create a new Tendermint machine, for the specified proposer, from the specified block, with
|
||||
@@ -217,6 +221,8 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
start_time: Instant::now(),
|
||||
personal_proposal: proposal,
|
||||
|
||||
queue: vec![],
|
||||
|
||||
log: MessageLog::new(weights),
|
||||
round: Round(0),
|
||||
step: Step::Propose,
|
||||
@@ -226,7 +232,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
|
||||
timeouts: HashMap::new(),
|
||||
};
|
||||
machine.round_propose().await;
|
||||
machine.round_propose();
|
||||
|
||||
loop {
|
||||
// Check if any timeouts have been triggered
|
||||
@@ -238,55 +244,76 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
|
||||
// Propose timeout
|
||||
if t1 && (machine.step == Step::Propose) {
|
||||
debug_assert!(machine.broadcast(Data::Prevote(None)).await.is_none());
|
||||
machine.broadcast(Data::Prevote(None));
|
||||
}
|
||||
|
||||
// Prevote timeout
|
||||
if t2 && (machine.step == Step::Prevote) {
|
||||
debug_assert!(machine.broadcast(Data::Precommit(None)).await.is_none());
|
||||
machine.broadcast(Data::Precommit(None));
|
||||
}
|
||||
|
||||
// Precommit timeout
|
||||
if t3 {
|
||||
machine.round(Round(machine.round.0.wrapping_add(1))).await;
|
||||
machine.round(Round(machine.round.0.wrapping_add(1)));
|
||||
}
|
||||
|
||||
// If there's a message, handle it
|
||||
match msg_recv.try_recv() {
|
||||
Ok(msg) => {
|
||||
if !machine.signer.verify(msg.msg.sender, &msg.msg.encode(), msg.sig) {
|
||||
yield_now().await;
|
||||
continue;
|
||||
}
|
||||
|
||||
match machine.message(msg.msg).await {
|
||||
Ok(None) => (),
|
||||
Ok(Some(block)) => {
|
||||
let mut validators = vec![];
|
||||
let mut sigs = vec![];
|
||||
for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| {
|
||||
Some((*k, sig.clone())).filter(|_| id == &block.id())
|
||||
}) {
|
||||
validators.push(v);
|
||||
sigs.push(sig);
|
||||
}
|
||||
|
||||
let commit =
|
||||
Commit { validators, signature: N::SignatureScheme::aggregate(&sigs) };
|
||||
debug_assert!(machine.network.read().await.verify_commit(block.id(), &commit));
|
||||
|
||||
let proposal = machine.network.write().await.add_block(block, commit);
|
||||
machine.reset(proposal).await
|
||||
// Drain the channel of messages
|
||||
let mut broken = false;
|
||||
loop {
|
||||
match msg_recv.try_recv() {
|
||||
Ok(msg) => {
|
||||
if !machine.signer.verify(msg.msg.sender, &msg.msg.encode(), msg.sig) {
|
||||
continue;
|
||||
}
|
||||
Err(TendermintError::Malicious(validator)) => {
|
||||
machine.network.write().await.slash(validator).await
|
||||
}
|
||||
Err(TendermintError::Temporal) => (),
|
||||
machine.queue.push((false, msg.msg));
|
||||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => broken = true,
|
||||
}
|
||||
Err(TryRecvError::Empty) => yield_now().await,
|
||||
Err(TryRecvError::Disconnected) => break,
|
||||
}
|
||||
if broken {
|
||||
break;
|
||||
}
|
||||
|
||||
// Handle the queue
|
||||
let mut queue = machine.queue.drain(..).collect::<Vec<_>>();
|
||||
for (broadcast, msg) in queue.drain(..) {
|
||||
let res = machine.message(msg.clone()).await;
|
||||
if res.is_err() && broadcast {
|
||||
panic!("honest node had invalid behavior");
|
||||
}
|
||||
|
||||
match res {
|
||||
Ok(None) => (),
|
||||
Ok(Some(block)) => {
|
||||
let mut validators = vec![];
|
||||
let mut sigs = vec![];
|
||||
for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| {
|
||||
Some((*k, sig.clone())).filter(|_| id == &block.id())
|
||||
}) {
|
||||
validators.push(v);
|
||||
sigs.push(sig);
|
||||
}
|
||||
|
||||
let commit = Commit { validators, signature: N::SignatureScheme::aggregate(&sigs) };
|
||||
debug_assert!(machine.network.read().await.verify_commit(block.id(), &commit));
|
||||
|
||||
let proposal = machine.network.write().await.add_block(block, commit);
|
||||
machine.reset(proposal).await;
|
||||
}
|
||||
Err(TendermintError::Malicious(validator)) => {
|
||||
machine.network.write().await.slash(validator).await;
|
||||
}
|
||||
Err(TendermintError::Temporal) => (),
|
||||
}
|
||||
|
||||
if broadcast {
|
||||
let sig = machine.signer.sign(&msg.encode());
|
||||
machine.network.write().await.broadcast(SignedMessage { msg, sig }).await;
|
||||
}
|
||||
}
|
||||
|
||||
yield_now().await;
|
||||
}
|
||||
}),
|
||||
}
|
||||
@@ -348,21 +375,50 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
// 55-56
|
||||
// Jump, enabling processing by the below code
|
||||
if self.log.round_participation(self.round) > self.weights.fault_thresold() {
|
||||
self.round(msg.round).await;
|
||||
// If we're the proposer, return to avoid a double process
|
||||
if self.round(msg.round) {
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
// Future round which we aren't ready to jump to, so return for now
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// The paper executes these checks when the step is prevote. Making sure this message warrants
|
||||
// rerunning these checks is a sane optimization since message instances is a full iteration
|
||||
// of the round map
|
||||
if (self.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
||||
let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None));
|
||||
// 34-35
|
||||
if participation >= self.weights.threshold() {
|
||||
let timeout = self.timeout(Step::Prevote);
|
||||
self.timeouts.entry(Step::Prevote).or_insert(timeout);
|
||||
}
|
||||
|
||||
// 44-46
|
||||
if weight >= self.weights.threshold() {
|
||||
self.broadcast(Data::Precommit(None));
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// 47-48
|
||||
if matches!(msg.data, Data::Precommit(_)) &&
|
||||
self.log.has_participation(self.round, Step::Precommit)
|
||||
{
|
||||
let timeout = self.timeout(Step::Precommit);
|
||||
self.timeouts.entry(Step::Precommit).or_insert(timeout);
|
||||
}
|
||||
|
||||
let proposer = self.weights.proposer(self.number, self.round);
|
||||
if let Some(Data::Proposal(vr, block)) = self.log.get(self.round, proposer, Step::Propose) {
|
||||
// 22-33
|
||||
if self.step == Step::Propose {
|
||||
// Delay error handling (triggering a slash) until after we vote.
|
||||
let (valid, err) = match self.network.write().await.validate(block) {
|
||||
Ok(_) => (true, Ok(())),
|
||||
Err(BlockError::Temporal) => (false, Ok(())),
|
||||
Ok(_) => (true, Ok(None)),
|
||||
Err(BlockError::Temporal) => (false, Ok(None)),
|
||||
Err(BlockError::Fatal) => (false, Err(TendermintError::Malicious(proposer))),
|
||||
};
|
||||
// Create a raw vote which only requires block validity as a basis for the actual vote.
|
||||
@@ -388,13 +444,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0));
|
||||
}
|
||||
|
||||
debug_assert!(self.broadcast(Data::Prevote(vote)).await.is_none());
|
||||
self.broadcast(Data::Prevote(vote));
|
||||
return err;
|
||||
}
|
||||
} else {
|
||||
debug_assert!(self.broadcast(Data::Prevote(vote)).await.is_none());
|
||||
self.broadcast(Data::Prevote(vote));
|
||||
return err;
|
||||
}
|
||||
|
||||
err?;
|
||||
} else if self.valid.as_ref().map(|(round, _)| round != &self.round).unwrap_or(true) {
|
||||
// 36-43
|
||||
|
||||
@@ -411,44 +467,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
self.valid = Some((self.round, block.clone()));
|
||||
if self.step == Step::Prevote {
|
||||
self.locked = Some((self.round, block.id()));
|
||||
return Ok(
|
||||
self
|
||||
.broadcast(Data::Precommit(Some((
|
||||
block.id(),
|
||||
self.signer.sign(block.id().as_ref()),
|
||||
))))
|
||||
.await,
|
||||
);
|
||||
self.broadcast(Data::Precommit(Some((
|
||||
block.id(),
|
||||
self.signer.sign(block.id().as_ref()),
|
||||
))));
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The paper executes these checks when the step is prevote. Making sure this message warrants
|
||||
// rerunning these checks is a sane optimization since message instances is a full iteration
|
||||
// of the round map
|
||||
if (self.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
||||
let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None));
|
||||
// 34-35
|
||||
if participation >= self.weights.threshold() {
|
||||
let timeout = self.timeout(Step::Prevote);
|
||||
self.timeouts.entry(Step::Prevote).or_insert(timeout);
|
||||
}
|
||||
|
||||
// 44-46
|
||||
if weight >= self.weights.threshold() {
|
||||
debug_assert!(self.broadcast(Data::Precommit(None)).await.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
// 47-48
|
||||
if matches!(msg.data, Data::Precommit(_)) &&
|
||||
self.log.has_participation(self.round, Step::Precommit)
|
||||
{
|
||||
let timeout = self.timeout(Step::Precommit);
|
||||
self.timeouts.entry(Step::Precommit).or_insert(timeout);
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user