Timeout futures

This commit is contained in:
Luke Parker
2022-11-08 21:14:57 -05:00
parent 56a21ca6a6
commit 2cb1d35d89

View File

@@ -8,7 +8,7 @@ use std::{
use parity_scale_codec::{Encode, Decode}; use parity_scale_codec::{Encode, Decode};
use futures::{task::Poll, StreamExt, channel::mpsc}; use futures::{task::Poll, future, StreamExt, channel::mpsc};
use tokio::time::sleep; use tokio::time::sleep;
@@ -324,24 +324,34 @@ impl<N: Network + 'static> TendermintMachine<N> {
'outer: loop { 'outer: loop {
// Check if any timeouts have been triggered // Check if any timeouts have been triggered
let now = Instant::now(); let timeout_future = |step| {
let (t1, t2, t3) = { let timeout = self.timeouts.get(&step).copied();
let ready = |step| self.timeouts.get(&step).unwrap_or(&now) < &now; async move {
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) if let Some(timeout) = timeout {
sleep(timeout.saturating_duration_since(Instant::now())).await
} else {
future::pending::<()>().await
}
}
};
tokio::pin! {
let propose_timeout = timeout_future(Step::Propose);
let prevote_timeout = timeout_future(Step::Prevote);
let precommit_timeout = timeout_future(Step::Precommit);
}; };
// Propose timeout // Propose timeout
if t1 && (self.step == Step::Propose) { if futures::poll!(&mut propose_timeout).is_ready() && (self.step == Step::Propose) {
self.broadcast(Data::Prevote(None)); self.broadcast(Data::Prevote(None));
} }
// Prevote timeout // Prevote timeout
if t2 && (self.step == Step::Prevote) { if futures::poll!(&mut prevote_timeout).is_ready() && (self.step == Step::Prevote) {
self.broadcast(Data::Precommit(None)); self.broadcast(Data::Precommit(None));
} }
// Precommit timeout // Precommit timeout
if t3 { if futures::poll!(&mut precommit_timeout).is_ready() {
self.round(Round(self.round.0.wrapping_add(1))); self.round(Round(self.round.0.wrapping_add(1)));
} }