From 2cb1d35d89210e00856001130d446ef1b6389cca Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 8 Nov 2022 21:14:57 -0500 Subject: [PATCH] Timeout futures --- substrate/tendermint/machine/src/lib.rs | 26 +++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index 364f44a5..a043ed24 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -8,7 +8,7 @@ use std::{ use parity_scale_codec::{Encode, Decode}; -use futures::{task::Poll, StreamExt, channel::mpsc}; +use futures::{task::Poll, future, StreamExt, channel::mpsc}; use tokio::time::sleep; @@ -324,24 +324,34 @@ impl TendermintMachine { 'outer: loop { // Check if any timeouts have been triggered - let now = Instant::now(); - let (t1, t2, t3) = { - let ready = |step| self.timeouts.get(&step).unwrap_or(&now) < &now; - (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) + let timeout_future = |step| { + let timeout = self.timeouts.get(&step).copied(); + async move { + if let Some(timeout) = timeout { + sleep(timeout.saturating_duration_since(Instant::now())).await + } else { + future::pending::<()>().await + } + } + }; + tokio::pin! { + let propose_timeout = timeout_future(Step::Propose); + let prevote_timeout = timeout_future(Step::Prevote); + let precommit_timeout = timeout_future(Step::Precommit); }; // Propose timeout - if t1 && (self.step == Step::Propose) { + if futures::poll!(&mut propose_timeout).is_ready() && (self.step == Step::Propose) { self.broadcast(Data::Prevote(None)); } // Prevote timeout - if t2 && (self.step == Step::Prevote) { + if futures::poll!(&mut prevote_timeout).is_ready() && (self.step == Step::Prevote) { self.broadcast(Data::Precommit(None)); } // Precommit timeout - if t3 { + if futures::poll!(&mut precommit_timeout).is_ready() { self.round(Round(self.round.0.wrapping_add(1))); }