From 56a21ca6a62a15693dfb7aebdc4af53536018588 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 8 Nov 2022 21:14:03 -0500 Subject: [PATCH] Use futures mpsc instead of tokio --- Cargo.lock | 1 + .../tendermint/client/src/authority/mod.rs | 4 +- substrate/tendermint/machine/Cargo.toml | 1 + substrate/tendermint/machine/src/lib.rs | 226 ++++++++++-------- substrate/tendermint/machine/tests/ext.rs | 18 +- 5 files changed, 137 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c03c24d4..e1bc51e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8893,6 +8893,7 @@ name = "tendermint-machine" version = "0.1.0" dependencies = [ "async-trait", + "futures", "parity-scale-codec", "sp-runtime", "thiserror", diff --git a/substrate/tendermint/client/src/authority/mod.rs b/substrate/tendermint/client/src/authority/mod.rs index a51f9f71..06ea11c5 100644 --- a/substrate/tendermint/client/src/authority/mod.rs +++ b/substrate/tendermint/client/src/authority/mod.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use log::{warn, error}; use futures::{ - StreamExt, + SinkExt, StreamExt, channel::mpsc::{self, UnboundedSender}, }; @@ -170,7 +170,7 @@ impl TendermintAuthority { let (gossip_tx, mut gossip_rx) = mpsc::unbounded(); // Create the Tendermint machine - let handle = { + let mut handle = { // Set this struct as active *self.import.providers.write().await = Some(providers); self.active = Some(ActiveAuthority { diff --git a/substrate/tendermint/machine/Cargo.toml b/substrate/tendermint/machine/Cargo.toml index 17de1434..32b6add6 100644 --- a/substrate/tendermint/machine/Cargo.toml +++ b/substrate/tendermint/machine/Cargo.toml @@ -13,6 +13,7 @@ thiserror = "1" parity-scale-codec = { version = "3.2", features = ["derive"] } +futures = "0.3" tokio = { version = "1", features = ["macros", "sync", "time", "rt"] } sp-runtime = { git = "https://github.com/serai-dex/substrate", version = "6.0.0", optional = true } diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index 2eb0fb28..364f44a5 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -3,16 +3,14 @@ use core::fmt::Debug; use std::{ sync::Arc, time::{UNIX_EPOCH, SystemTime, Instant, Duration}, - collections::HashMap, + collections::{VecDeque, HashMap}, }; use parity_scale_codec::{Encode, Decode}; -use tokio::{ - task::{JoinHandle, yield_now}, - sync::mpsc::{self, error::TryRecvError}, - time::sleep, -}; +use futures::{task::Poll, StreamExt, channel::mpsc}; + +use tokio::time::sleep; /// Traits and types of the external network being integrated with to provide consensus over. pub mod ext; @@ -113,10 +111,13 @@ pub struct TendermintMachine { start_time: Instant, personal_proposal: N::Block, - queue: Vec<( + queue: VecDeque<( bool, Message::Signature>, )>, + msg_recv: mpsc::UnboundedReceiver< + SignedMessage::Signature>, + >, log: MessageLog, round: Round, @@ -129,15 +130,20 @@ pub struct TendermintMachine { timeouts: HashMap, } -/// A handle to an asynchronous task, along with a channel to inform of it of messages received. +pub type MessageSender = mpsc::UnboundedSender< + SignedMessage< + ::ValidatorId, + ::Block, + <::SignatureScheme as SignatureScheme>::Signature, + >, +>; + +/// A Tendermint machine and its channel to receive messages from the gossip layer over. pub struct TendermintHandle { /// Channel to send messages received from the P2P layer. - pub messages: mpsc::Sender< - SignedMessage::Signature>, - >, - /// Handle for the asynchronous task executing the machine. The task will automatically exit - /// when the channel is dropped. - pub handle: JoinHandle<()>, + pub messages: MessageSender, + /// Tendermint machine to be run on an asynchronous task. + pub machine: TendermintMachine, } impl TendermintMachine { @@ -175,7 +181,7 @@ impl TendermintMachine { let step = data.step(); // 27, 33, 41, 46, 60, 64 self.step = step; - self.queue.push(( + self.queue.push_back(( true, Message { sender: self.validator_id, number: self.number, round: self.round, data }, )); @@ -239,14 +245,19 @@ impl TendermintMachine { self.round(Round(0)); } - /// Create a new Tendermint machine, for the specified proposer, from the specified block, with - /// the specified block as the one to propose next, returning a handle for the machine. + /// Create a new Tendermint machine, from the specified point, with the specified block as the + /// one to propose next. This will return a channel to send messages from the gossip layer and + /// the machine itself. The machine should have `run` called from an asynchronous task. #[allow(clippy::new_ret_no_self)] - pub fn new(network: N, last: (BlockNumber, u64), proposal: N::Block) -> TendermintHandle { - let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary + pub async fn new( + network: N, + last: (BlockNumber, u64), + proposal: N::Block, + ) -> TendermintHandle { + let (msg_send, msg_recv) = mpsc::unbounded(); TendermintHandle { messages: msg_send, - handle: tokio::spawn(async move { + machine: { let last_end = UNIX_EPOCH + Duration::from_secs(last.1); // If the last block hasn't ended yet, sleep until it has @@ -271,7 +282,7 @@ impl TendermintMachine { let weights = Arc::new(network.weights()); let validator_id = signer.validator_id().await; // 01-10 - let mut machine = TendermintMachine { + TendermintMachine { network, signer, validators, @@ -284,14 +295,15 @@ impl TendermintMachine { // The end time of the last block is the start time for this one // The Commit explicitly contains the end time, so loading the last commit will provide // this. The only exception is for the genesis block, which doesn't have a commit - // Using the genesis time in place will cause this block to be created immediately after - // it, without the standard amount of separation (so their times will be equivalent or - // minimally offset) + // Using the genesis time in place will cause this block to be created immediately + // after it, without the standard amount of separation (so their times will be + // equivalent or minimally offset) // For callers wishing to avoid this, they should pass (0, GENESIS + BLOCK_TIME) start_time: last_time, personal_proposal: proposal, - queue: vec![], + queue: VecDeque::new(), + msg_recv, log: MessageLog::new(weights), round: Round(0), @@ -302,95 +314,101 @@ impl TendermintMachine { valid: None, timeouts: HashMap::new(), - }; - machine.round(Round(0)); + } + }, + } + } - loop { - // Check if any timeouts have been triggered - let now = Instant::now(); - let (t1, t2, t3) = { - let ready = |step| machine.timeouts.get(&step).unwrap_or(&now) < &now; - (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) - }; + pub async fn run(mut self) { + self.round(Round(0)); - // Propose timeout - if t1 && (machine.step == Step::Propose) { - machine.broadcast(Data::Prevote(None)); - } + 'outer: loop { + // Check if any timeouts have been triggered + let now = Instant::now(); + let (t1, t2, t3) = { + let ready = |step| self.timeouts.get(&step).unwrap_or(&now) < &now; + (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) + }; - // Prevote timeout - if t2 && (machine.step == Step::Prevote) { - machine.broadcast(Data::Precommit(None)); - } + // Propose timeout + if t1 && (self.step == Step::Propose) { + self.broadcast(Data::Prevote(None)); + } - // Precommit timeout - if t3 { - machine.round(Round(machine.round.0.wrapping_add(1))); - } + // Prevote timeout + if t2 && (self.step == Step::Prevote) { + self.broadcast(Data::Precommit(None)); + } - // Drain the channel of messages - let mut broken = false; - loop { - match msg_recv.try_recv() { - Ok(msg) => { - if !msg.verify_signature(&machine.validators) { - continue; - } - machine.queue.push((false, msg.msg)); - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => broken = true, + // Precommit timeout + if t3 { + self.round(Round(self.round.0.wrapping_add(1))); + } + + // Drain the channel of messages + loop { + match futures::poll!(self.msg_recv.next()) { + Poll::Ready(Some(msg)) => { + if !msg.verify_signature(&self.validators) { + continue; } + self.queue.push_back((false, msg.msg)); } - if broken { + Poll::Ready(None) => { + break 'outer; + } + Poll::Pending => { break; } - - // Handle the queue - let mut queue = machine.queue.drain(..).collect::>(); - for (broadcast, msg) in queue.drain(..) { - let res = machine.message(msg.clone()).await; - if res.is_err() && broadcast { - panic!("honest node had invalid behavior"); - } - - match res { - Ok(None) => (), - Ok(Some(block)) => { - let mut validators = vec![]; - let mut sigs = vec![]; - for (v, sig) in machine.log.precommitted.iter().filter_map(|(k, (id, sig))| { - Some((*k, sig.clone())).filter(|_| id == &block.id()) - }) { - validators.push(v); - sigs.push(sig); - } - - let commit = Commit { - end_time: machine.canonical_end_time(msg.round), - validators, - signature: N::SignatureScheme::aggregate(&sigs), - }; - debug_assert!(machine.network.verify_commit(block.id(), &commit)); - - let proposal = machine.network.add_block(block, commit).await; - machine.reset(msg.round, proposal).await; - } - Err(TendermintError::Malicious(validator)) => { - machine.network.slash(validator).await; - } - Err(TendermintError::Temporal) => (), - } - - if broadcast { - let sig = machine.signer.sign(&msg.encode()).await; - machine.network.broadcast(SignedMessage { msg, sig }).await; - } - } - - yield_now().await; } - }), + } + + // Handle the queue + if let Some((broadcast, msg)) = self.queue.pop_front() { + let res = self.message(msg.clone()).await; + if res.is_err() && broadcast { + panic!("honest node had invalid behavior"); + } + + match res { + Ok(None) => (), + Ok(Some(block)) => { + let mut validators = vec![]; + let mut sigs = vec![]; + for (v, sig) in self + .log + .precommitted + .iter() + .filter_map(|(k, (id, sig))| Some((*k, sig.clone())).filter(|_| id == &block.id())) + { + validators.push(v); + sigs.push(sig); + } + + let commit = Commit { + end_time: self.canonical_end_time(msg.round), + validators, + signature: N::SignatureScheme::aggregate(&sigs), + }; + debug_assert!(self.network.verify_commit(block.id(), &commit)); + + let proposal = self.network.add_block(block, commit).await; + self.reset(msg.round, proposal).await; + } + Err(TendermintError::Malicious(validator)) => { + self.network.slash(validator).await; + } + Err(TendermintError::Temporal) => (), + } + + if broadcast { + let sig = self.signer.sign(&msg.encode()).await; + self.network.broadcast(SignedMessage { msg, sig }).await; + } + } + + // futures::pending here does not work + tokio::task::yield_now().await; } } diff --git a/substrate/tendermint/machine/tests/ext.rs b/substrate/tendermint/machine/tests/ext.rs index 0dd9c331..2f7cd666 100644 --- a/substrate/tendermint/machine/tests/ext.rs +++ b/substrate/tendermint/machine/tests/ext.rs @@ -7,9 +7,10 @@ use async_trait::async_trait; use parity_scale_codec::{Encode, Decode}; +use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; -use tendermint_machine::{ext::*, SignedMessage, TendermintMachine, TendermintHandle}; +use tendermint_machine::{ext::*, SignedMessage, MessageSender, TendermintMachine, TendermintHandle}; type TestValidatorId = u16; type TestBlockId = [u8; 4]; @@ -93,7 +94,7 @@ impl Block for TestBlock { } } -struct TestNetwork(u16, Arc>>>); +struct TestNetwork(u16, Arc>>>); #[async_trait] impl Network for TestNetwork { @@ -117,8 +118,8 @@ impl Network for TestNetwork { } async fn broadcast(&mut self, msg: SignedMessage) { - for handle in self.1.write().await.iter_mut() { - handle.messages.send(msg.clone()).await.unwrap(); + for messages in self.1.write().await.iter_mut() { + messages.send(msg.clone()).await.unwrap(); } } @@ -144,17 +145,20 @@ impl Network for TestNetwork { } impl TestNetwork { - async fn new(validators: usize) -> Arc>>> { + async fn new(validators: usize) -> Arc>>> { let arc = Arc::new(RwLock::new(vec![])); { let mut write = arc.write().await; for i in 0 .. validators { let i = u16::try_from(i).unwrap(); - write.push(TendermintMachine::new( + let TendermintHandle { messages, machine } = TendermintMachine::new( TestNetwork(i, arc.clone()), (BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()), TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, - )); + ) + .await; + tokio::task::spawn(machine.run()); + write.push(messages); } } arc