diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index a043ed24..d3fbd74a 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -8,8 +8,11 @@ use std::{ use parity_scale_codec::{Encode, Decode}; -use futures::{task::Poll, future, StreamExt, channel::mpsc}; - +use futures::{ + FutureExt, StreamExt, + future::{self, Fuse}, + channel::mpsc, +}; use tokio::time::sleep; /// Traits and types of the external network being integrated with to provide consensus over. @@ -111,10 +114,8 @@ pub struct TendermintMachine { start_time: Instant, personal_proposal: N::Block, - queue: VecDeque<( - bool, - Message::Signature>, - )>, + queue: + VecDeque::Signature>>, msg_recv: mpsc::UnboundedReceiver< SignedMessage::Signature>, >, @@ -181,10 +182,12 @@ impl TendermintMachine { let step = data.step(); // 27, 33, 41, 46, 60, 64 self.step = step; - self.queue.push_back(( - true, - Message { sender: self.validator_id, number: self.number, round: self.round, data }, - )); + self.queue.push_back(Message { + sender: self.validator_id, + number: self.number, + round: self.round, + data, + }); } // 14-21 @@ -234,7 +237,7 @@ impl TendermintMachine { self.start_time = round_end; self.personal_proposal = proposal; - self.queue = self.queue.drain(..).filter(|msg| msg.1.number == self.number).collect(); + self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect(); self.log = MessageLog::new(self.weights.clone()); self.end_time = HashMap::new(); @@ -322,17 +325,18 @@ impl TendermintMachine { pub async fn run(mut self) { self.round(Round(0)); - 'outer: loop { - // Check if any timeouts have been triggered + loop { + // Create futures for the various timeouts let timeout_future = |step| { let timeout = self.timeouts.get(&step).copied(); - async move { + (async move { if let Some(timeout) = timeout { sleep(timeout.saturating_duration_since(Instant::now())).await } else { future::pending::<()>().await } - } + }) + .fuse() }; tokio::pin! { let propose_timeout = timeout_future(Step::Propose); @@ -340,41 +344,56 @@ impl TendermintMachine { let precommit_timeout = timeout_future(Step::Precommit); }; - // Propose timeout - if futures::poll!(&mut propose_timeout).is_ready() && (self.step == Step::Propose) { - self.broadcast(Data::Prevote(None)); - } + // Also create a future for if the queue has a message + // Does not pop_front as if another message has higher priority, its future will be handled + // instead in this loop, and the popped value would be dropped with the next iteration + // While no other message has a higher priority right now, this is a safer practice + let mut queue_future = + if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; - // Prevote timeout - if futures::poll!(&mut prevote_timeout).is_ready() && (self.step == Step::Prevote) { - self.broadcast(Data::Precommit(None)); - } + if let Some((broadcast, msg)) = futures::select_biased! { + // Handle our messages + _ = queue_future => { + Some((true, self.queue.pop_front().unwrap())) + }, - // Precommit timeout - if futures::poll!(&mut precommit_timeout).is_ready() { - self.round(Round(self.round.0.wrapping_add(1))); - } + // Handle any timeouts + _ = &mut propose_timeout => { + // Remove the timeout so it doesn't persist, always being the selected future due to bias + // While this does enable the below get_entry calls to enter timeouts again, they'll + // never attempt to add a timeout after this timeout has expired + self.timeouts.remove(&Step::Propose); + if self.step == Step::Propose { + self.broadcast(Data::Prevote(None)); + } + None + }, + _ = &mut prevote_timeout => { + self.timeouts.remove(&Step::Prevote); + if self.step == Step::Prevote { + self.broadcast(Data::Precommit(None)); + } + None + }, + _ = &mut precommit_timeout => { + // Technically unnecessary since round() will clear the timeouts + self.timeouts.remove(&Step::Precommit); + self.round(Round(self.round.0.wrapping_add(1))); + continue; + }, - // Drain the channel of messages - loop { - match futures::poll!(self.msg_recv.next()) { - Poll::Ready(Some(msg)) => { + // Handle any received messages + msg = self.msg_recv.next() => { + if let Some(msg) = msg { if !msg.verify_signature(&self.validators) { continue; } - self.queue.push_back((false, msg.msg)); - } - Poll::Ready(None) => { - break 'outer; - } - Poll::Pending => { + Some((false, msg.msg)) + } else { break; } } - } - - // Handle the queue - if let Some((broadcast, msg)) = self.queue.pop_front() { + } { let res = self.message(msg.clone()).await; if res.is_err() && broadcast { panic!("honest node had invalid behavior"); @@ -416,9 +435,6 @@ impl TendermintMachine { self.network.broadcast(SignedMessage { msg, sig }).await; } } - - // futures::pending here does not work - tokio::task::yield_now().await; } }