diff --git a/Cargo.lock b/Cargo.lock index 4c2cbfa0..ac7fd188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-std" version = "1.12.0" @@ -8857,6 +8868,8 @@ dependencies = [ name = "tendermint-machine" version = "0.1.0" dependencies = [ + "async-recursion", + "async-trait", "tokio", ] diff --git a/substrate/tendermint/Cargo.toml b/substrate/tendermint/Cargo.toml index aa53034b..d626357a 100644 --- a/substrate/tendermint/Cargo.toml +++ b/substrate/tendermint/Cargo.toml @@ -8,4 +8,6 @@ authors = ["Luke Parker "] edition = "2021" [dependencies] -tokio = "1" +async-recursion = "1.0" +async-trait = "0.1" +tokio = { version = "1", features = ["macros", "rt", "sync"] } diff --git a/substrate/tendermint/src/ext.rs b/substrate/tendermint/src/ext.rs index ed6150c4..3df8b62a 100644 --- a/substrate/tendermint/src/ext.rs +++ b/substrate/tendermint/src/ext.rs @@ -1,7 +1,10 @@ use core::{hash::Hash, fmt::Debug}; +use std::sync::Arc; -pub trait ValidatorId: Clone + Copy + PartialEq + Eq + Hash + Debug {} -impl ValidatorId for V {} +use crate::Message; + +pub trait ValidatorId: Send + Sync + Clone + Copy + PartialEq + Eq + Hash + Debug {} +impl ValidatorId for V {} // Type aliases which are distinct according to the type system #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] @@ -17,23 +20,42 @@ pub enum BlockError { Temporal, } -pub trait Block: Clone + PartialEq { - type Id: Copy + Clone + PartialEq; +pub trait Block: Send + Sync + Clone + PartialEq + Debug { + type Id: Send + Sync + Copy + Clone + PartialEq + Debug; fn id(&self) -> Self::Id; } -pub trait Network { +pub trait Weights: Send + Sync { type ValidatorId: ValidatorId; - type Block: Block; fn total_weight(&self) -> u64; fn weight(&self, validator: Self::ValidatorId) -> u64; fn threshold(&self) -> u64 { ((self.total_weight() * 2) / 3) + 1 } + fn fault_thresold(&self) -> u64 { + (self.total_weight() - self.threshold()) + 1 + } + /// Weighted round robin function. fn proposer(&self, number: BlockNumber, round: Round) -> Self::ValidatorId; - - fn validate(&mut self, block: Self::Block) -> Result<(), BlockError>; +} + +#[async_trait::async_trait] +pub trait Network: Send + Sync { + type ValidatorId: ValidatorId; + type Weights: Weights; + type Block: Block; + + fn weights(&self) -> Arc; + + async fn broadcast(&mut self, msg: Message); + + // TODO: Should this take a verifiable reason? + async fn slash(&mut self, validator: Self::ValidatorId); + + fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; + // Add a block and return the proposal for the next one + fn add_block(&mut self, block: Self::Block) -> Self::Block; } diff --git a/substrate/tendermint/src/lib.rs b/substrate/tendermint/src/lib.rs index 7bb15baf..e36c0750 100644 --- a/substrate/tendermint/src/lib.rs +++ b/substrate/tendermint/src/lib.rs @@ -1,7 +1,18 @@ +use std::{sync::Arc, time::Instant, collections::HashMap}; + +use tokio::{ + task::{JoinHandle, yield_now}, + sync::{ + RwLock, + mpsc::{self, error::TryRecvError}, + }, +}; + pub mod ext; use ext::*; mod message_log; +use message_log::MessageLog; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] enum Step { @@ -10,9 +21,9 @@ enum Step { Precommit, } -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, Debug)] enum Data { - Proposal(Option, B), + Proposal(Option, B), Prevote(Option), Precommit(Option), } @@ -27,8 +38,8 @@ impl Data { } } -#[derive(Clone, PartialEq)] -struct Message { +#[derive(Clone, PartialEq, Debug)] +pub struct Message { sender: V, number: BlockNumber, @@ -38,158 +49,155 @@ struct Message { } #[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum TendermintError { +pub enum TendermintError { Malicious(V), - Offline(V), Temporal, } -/* -use std::collections::HashMap; +pub struct TendermintMachine { + network: Arc>, + weights: Arc, + proposer: N::ValidatorId, -use tokio::{ - task::{JoinHandle, spawn}, - sync::mpsc, -}; + number: BlockNumber, + personal_proposal: N::Block, -#[derive(Debug)] -struct TendermintMachine { - proposer: ValidatorId, - personal_proposal: Option, - - number: u32, - - log_map: HashMap>>, - precommitted: HashMap, - - round: u32, + log: MessageLog, + round: Round, step: Step, - locked: Option<(u32, Block)>, - valid: Option<(u32, Block)>, - timeouts: Arc>>, // TODO: Remove Arc RwLock + locked: Option<(Round, N::Block)>, + valid: Option<(Round, N::Block)>, + + timeouts: HashMap, } -#[derive(Debug)] -struct TendermintHandle { - block: Arc>>, - messages: mpsc::Sender, - broadcast: mpsc::Receiver, - handle: JoinHandle<()>, +pub struct TendermintHandle { + // Messages received + pub messages: mpsc::Sender>, + // Async task executing the machine + pub handle: JoinHandle<()>, } -impl TendermintMachine { - fn broadcast(&self, data: Data) -> Option { +impl TendermintMachine { + fn timeout(&self, step: Step) -> Instant { + todo!() + } + + #[async_recursion::async_recursion] + async fn broadcast(&mut self, data: Data) -> Option { let msg = Message { sender: self.proposer, number: self.number, round: self.round, data }; - let res = self.message(msg).unwrap(); - self.broadcast.send(msg).unwrap(); + let res = self.message(msg.clone()).await.unwrap(); + self.network.write().await.broadcast(msg).await; res } // 14-21 - fn round_propose(&mut self) { - // This will happen if it's a new block and propose hasn't been called yet - if self.personal_proposal.is_none() { - // Ensure it's actually a new block. Else, the caller failed to provide necessary data yet - // is still executing the machine - debug_assert_eq!(self.round, 0); - return; - } - - if proposer(self.number, self.round) == self.proposer { - let (round, block) = if let Some((round, block)) = self.valid { - (Some(round), block) + async fn round_propose(&mut self) { + if self.weights.proposer(self.number, self.round) == self.proposer { + let (round, block) = if let Some((round, block)) = &self.valid { + (Some(*round), block.clone()) } else { - (None, self.personal_proposal.unwrap()) + (None, self.personal_proposal.clone()) }; - debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none()); + debug_assert!(self.broadcast(Data::Proposal(round, block)).await.is_none()); } else { - self.timeouts.write().unwrap().insert(Step::Propose, self.timeout(Step::Propose)); + self.timeouts.insert(Step::Propose, self.timeout(Step::Propose)); } } // 11-13 - fn round(&mut self, round: u32) { + async fn round(&mut self, round: Round) { self.round = round; self.step = Step::Propose; - self.round_propose(); - } - - /// Called whenever a new block occurs - fn propose(&mut self, block: Block) { - self.personal_proposal = Some(block); - self.round_propose(); + self.round_propose().await; } // 1-9 - fn reset(&mut self) { - self.personal_proposal = None; + async fn reset(&mut self, proposal: N::Block) { + self.number.0 += 1; + self.personal_proposal = proposal; - self.number += 1; - - self.log_map = HashMap::new(); - self.precommitted = HashMap::new(); + self.log = MessageLog::new(self.network.read().await.weights()); self.locked = None; self.valid = None; - self.round(0); + self.timeouts = HashMap::new(); + + self.round(Round(0)).await; } // 10 - pub fn new(proposer: ValidatorId, number: u32) -> TendermintHandle { - let block = Arc::new(RwLock::new(None)); + pub fn new( + network: N, + proposer: N::ValidatorId, + number: BlockNumber, + proposal: N::Block, + ) -> TendermintHandle { let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary - let (broadcast_send, broadcast_recv) = mpsc::channel(5); TendermintHandle { - block: block.clone(), messages: msg_send, - broadcast: broadcast_recv, - handle: tokio::spawn(async { - let machine = TendermintMachine { + handle: tokio::spawn(async move { + let weights = network.weights(); + let network = Arc::new(RwLock::new(network)); + let mut machine = TendermintMachine { + network, + weights: weights.clone(), proposer, - personal_proposal: None, number, + personal_proposal: proposal, - log_map: HashMap::new(), - precommitted: HashMap::new(), + log: MessageLog::new(weights), + round: Round(0), + step: Step::Propose, locked: None, valid: None, - round: 0, - step: Step::Propose, + timeouts: HashMap::new(), }; + dbg!("Proposing"); + machine.round_propose().await; loop { - if self.personal_proposal.is_none() { - let block = block.lock().unwrap(); - if block.is_some() { - self.personal_proposal = Some(block.take()); - } else { - tokio::yield_now().await; - continue; - } - } - + // Check if any timeouts have been triggered let now = Instant::now(); let (t1, t2, t3) = { - let timeouts = self.timeouts.read().unwrap(); - let ready = |step| timeouts.get(step).unwrap_or(now) < now; + let ready = |step| machine.timeouts.get(&step).unwrap_or(&now) < &now; (ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) }; - if t1 { // Propose timeout - } - if t2 { // Prevote timeout - } - if t3 { // Precommit timeout + // Propose timeout + if t1 { + todo!() } - match recv.try_recv() { - Ok(msg) => machine.message(msg), - Err(TryRecvError::Empty) => tokio::yield_now().await, + // Prevote timeout + if t2 { + todo!() + } + + // Precommit timeout + if t3 { + todo!() + } + + // If there's a message, handle it + match msg_recv.try_recv() { + Ok(msg) => match machine.message(msg).await { + Ok(None) => (), + Ok(Some(block)) => { + let proposal = machine.network.write().await.add_block(block); + machine.reset(proposal).await + } + Err(TendermintError::Malicious(validator)) => { + machine.network.write().await.slash(validator).await + } + Err(TendermintError::Temporal) => (), + }, + Err(TryRecvError::Empty) => yield_now().await, Err(TryRecvError::Disconnected) => break, } } @@ -198,32 +206,27 @@ impl TendermintMachine { } // 49-54 - fn check_committed(&mut self, round_num: u32) -> Option { - let proposer = proposer(self.number, round_num); - // Safe as we only check for rounds which we received a message for - let round = self.log_map[&round_num]; + fn check_committed(&mut self, round: Round) -> Option { + let proposer = self.weights.proposer(self.number, round); // Get the proposal - if let Some(proposal) = round.get(&proposer).map(|p| p.get(&Step::Propose)).flatten() { + if let Some(proposal) = self.log.get(round, proposer, Step::Propose) { // Destructure debug_assert!(matches!(proposal, Data::Proposal(..))); if let Data::Proposal(_, block) = proposal { // Check if it has gotten a sufficient amount of precommits let (participants, weight) = - self.message_instances(round_num, Data::Precommit(Some(block.hash))); + self.log.message_instances(round, Data::Precommit(Some(block.id()))); - let threshold = ((VALIDATORS / 3) * 2) + 1; - if weight >= threshold.into() { - self.reset(); - return Some(*block); + let threshold = self.weights.threshold(); + if weight >= threshold { + return Some(block.clone()); } // 47-48 - if participants >= threshold.into() { - let map = self.timeouts.write().unwrap(); - if !map.contains_key(Step::Precommit) { - map.insert(Step::Precommit, self.timeout(Step::Precommit)); - } + if participants >= threshold { + let timeout = self.timeout(Step::Precommit); + self.timeouts.entry(Step::Precommit).or_insert(timeout); } } } @@ -231,16 +234,21 @@ impl TendermintMachine { None } - fn message(&mut self, msg: Message) -> Result, TendermintError> { + async fn message( + &mut self, + msg: Message, + ) -> Result, TendermintError> { if msg.number != self.number { Err(TendermintError::Temporal)?; } - if matches!(msg.data, Data::Proposal(..)) && (msg.sender != proposer(msg.height, msg.round)) { + if matches!(msg.data, Data::Proposal(..)) && + (msg.sender != self.weights.proposer(msg.number, msg.round)) + { Err(TendermintError::Malicious(msg.sender))?; }; - if !self.log(msg)? { + if !self.log.log(msg.clone())? { return Ok(None); } @@ -254,36 +262,38 @@ impl TendermintMachine { } // Else, check if we need to jump ahead - let round = self.log_map[&self.round]; - if msg.round < self.round { + if msg.round.0 < self.round.0 { return Ok(None); - } else if msg.round > self.round { + } else if msg.round.0 > self.round.0 { // 55-56 - // TODO: Move to weight - if round.len() > ((VALIDATORS / 3) + 1).into() { + if self.log.round_participation(self.round) > self.weights.fault_thresold() { self.round(msg.round); } else { return Ok(None); } } + let proposal = self + .log + .get(self.round, self.weights.proposer(self.number, self.round), Step::Propose) + .cloned(); if self.step == Step::Propose { - if let Some(proposal) = - round.get(&proposer(self.number, self.round)).map(|p| p.get(&Step::Propose)).flatten() - { + if let Some(proposal) = &proposal { debug_assert!(matches!(proposal, Data::Proposal(..))); if let Data::Proposal(vr, block) = proposal { if let Some(vr) = vr { // 28-33 - let vr = *vr; - if (vr < self.round) && self.has_consensus(vr, Data::Prevote(Some(block.hash))) { + if (vr.0 < self.round.0) && self.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) + { debug_assert!(self - .broadcast(Data::Prevote(Some(block.hash).filter(|_| { + .broadcast(Data::Prevote(Some(block.id()).filter(|_| { self .locked - .map(|(round, value)| (round <= vr) || (block == &value)) + .as_ref() + .map(|(round, value)| (round.0 <= vr.0) || (block.id() == value.id())) .unwrap_or(true) }))) + .await .is_none()); self.step = Step::Prevote; } else { @@ -291,11 +301,16 @@ impl TendermintMachine { } } else { // 22-27 - valid(&block).map_err(|_| TendermintError::Malicious(msg.sender))?; + self + .network + .write() + .await + .validate(block) + .map_err(|_| TendermintError::Malicious(msg.sender))?; debug_assert!(self - .broadcast(Data::Prevote(Some(block.hash).filter( - |_| self.locked.is_none() || self.locked.map(|locked| &locked.1) == Some(block) - ))) + .broadcast(Data::Prevote(Some(block.id()).filter(|_| self.locked.is_none() || + self.locked.as_ref().map(|locked| locked.1.id()) == Some(block.id())))) + .await .is_none()); self.step = Step::Prevote; } @@ -304,23 +319,36 @@ impl TendermintMachine { } if self.step == Step::Prevote { - let (participation, weight) = self.message_instances(self.round, Data::Prevote(None)); + let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None)); // 34-35 - if participation > (((VALIDATORS / 3) * 2) + 1).into() { - let map = self.timeouts.write().unwrap(); - if !map.contains_key(Step::Prevote) { - map.insert(Step::Prevote, self.timeout(Step::Prevote)) - } + if participation > self.weights.threshold() { + let timeout = self.timeout(Step::Prevote); + self.timeouts.entry(Step::Prevote).or_insert(timeout); } // 44-46 - if (weight > (((VALIDATORS / 3) * 2) + 1).into()) && first { - debug_assert!(self.broadcast(Data::Precommit(None)).is_none()); + if weight > self.weights.threshold() { + debug_assert!(self.broadcast(Data::Precommit(None)).await.is_none()); self.step = Step::Precommit; } } + if (self.valid.is_none()) && ((self.step == Step::Prevote) || (self.step == Step::Precommit)) { + if let Some(proposal) = proposal { + debug_assert!(matches!(proposal, Data::Proposal(..))); + if let Data::Proposal(_, block) = proposal { + if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) { + self.valid = Some((self.round, block.clone())); + if self.step == Step::Prevote { + self.locked = self.valid.clone(); + self.step = Step::Precommit; + return Ok(self.broadcast(Data::Precommit(Some(block.id()))).await); + } + } + } + } + } + Ok(None) } } -*/ diff --git a/substrate/tendermint/src/message_log.rs b/substrate/tendermint/src/message_log.rs index 2df03548..45505db4 100644 --- a/substrate/tendermint/src/message_log.rs +++ b/substrate/tendermint/src/message_log.rs @@ -3,14 +3,14 @@ use std::{sync::Arc, collections::HashMap}; use crate::{ext::*, Round, Step, Data, Message, TendermintError}; pub(crate) struct MessageLog { - network: Arc, + weights: Arc, precommitted: HashMap::Id>, log: HashMap>>>, } impl MessageLog { - pub(crate) fn new(network: Arc) -> MessageLog { - MessageLog { network, precommitted: HashMap::new(), log: HashMap::new() } + pub(crate) fn new(weights: Arc) -> MessageLog { + MessageLog { weights, precommitted: HashMap::new(), log: HashMap::new() } } // Returns true if it's a new message @@ -51,7 +51,7 @@ impl MessageLog { let mut weight = 0; for (participant, msgs) in &self.log[&round] { if let Some(msg) = msgs.get(&data.step()) { - let validator_weight = self.network.weight(*participant); + let validator_weight = self.weights.weight(*participant); participating += validator_weight; if &data == msg { weight += validator_weight; @@ -61,6 +61,17 @@ impl MessageLog { (participating, weight) } + // Get the participation in a given round + pub(crate) fn round_participation(&self, round: Round) -> u64 { + let mut weight = 0; + if let Some(round) = self.log.get(&round) { + for participant in round.keys() { + weight += self.weights.weight(*participant); + } + }; + weight + } + // Get the participation in a given round for a given step. pub(crate) fn participation(&self, round: Round, step: Step) -> u64 { let (participating, _) = self.message_instances( @@ -76,13 +87,13 @@ impl MessageLog { // Check if there's been a BFT level of participation pub(crate) fn has_participation(&self, round: Round, step: Step) -> bool { - self.participation(round, step) >= self.network.threshold() + self.participation(round, step) >= self.weights.threshold() } // Check if consensus has been reached on a specific piece of data pub(crate) fn has_consensus(&self, round: Round, data: Data) -> bool { let (_, weight) = self.message_instances(round, data); - weight >= self.network.threshold() + weight >= self.weights.threshold() } pub(crate) fn get( diff --git a/substrate/tendermint/tests/ext.rs b/substrate/tendermint/tests/ext.rs index fe967b8b..ef19a973 100644 --- a/substrate/tendermint/tests/ext.rs +++ b/substrate/tendermint/tests/ext.rs @@ -1,36 +1,99 @@ -use tendermint_machine::ext::*; +use std::sync::Arc; -#[derive(Clone, PartialEq)] +use tokio::sync::{RwLock, mpsc}; + +use tendermint_machine::{ext::*, Message, TendermintMachine, TendermintHandle}; + +type TestValidatorId = u16; +type TestBlockId = u32; + +#[derive(Clone, PartialEq, Debug)] struct TestBlock { - id: u32, + id: TestBlockId, valid: Result<(), BlockError>, } impl Block for TestBlock { - type Id = u32; + type Id = TestBlockId; - fn id(&self) -> u32 { + fn id(&self) -> TestBlockId { self.id } } -struct TestNetwork; -impl Network for TestNetwork { - type ValidatorId = u16; - type Block = TestBlock; +struct TestWeights; +impl Weights for TestWeights { + type ValidatorId = TestValidatorId; fn total_weight(&self) -> u64 { 5 } - fn weight(&self, id: u16) -> u64 { + fn weight(&self, id: TestValidatorId) -> u64 { [1, 1, 1, 1, 1][usize::try_from(id).unwrap()] } - fn proposer(&self, number: BlockNumber, round: Round) -> u16 { - u16::try_from((number.0 + u32::from(round.0)) % 5).unwrap() - } - - fn validate(&mut self, block: TestBlock) -> Result<(), BlockError> { - block.valid + fn proposer(&self, number: BlockNumber, round: Round) -> TestValidatorId { + TestValidatorId::try_from((number.0 + u32::from(round.0)) % 5).unwrap() + } +} + +struct TestNetwork(Arc>>>); + +#[async_trait::async_trait] +impl Network for TestNetwork { + type ValidatorId = TestValidatorId; + type Weights = TestWeights; + type Block = TestBlock; + + fn weights(&self) -> Arc { + Arc::new(TestWeights) + } + + async fn broadcast(&mut self, msg: Message) { + for handle in self.0.write().await.iter_mut() { + handle.messages.send(msg.clone()).await.unwrap(); + } + } + + async fn slash(&mut self, validator: TestValidatorId) { + dbg!("Slash"); + todo!() + } + + fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { + block.valid + } + + fn add_block(&mut self, block: TestBlock) -> TestBlock { + dbg!("Adding ", &block); + assert!(block.valid.is_ok()); + TestBlock { id: block.id + 1, valid: Ok(()) } + } +} + +impl TestNetwork { + async fn new(validators: usize) -> Arc>>> { + let arc = Arc::new(RwLock::new(vec![])); + { + let mut write = arc.write().await; + for i in 0 .. validators { + write.push(TendermintMachine::new( + TestNetwork(arc.clone()), + u16::try_from(i).unwrap(), + BlockNumber(1), + TestBlock { id: 1, valid: Ok(()) }, + )); + } + } + dbg!("Created all machines"); + arc + } +} + +#[tokio::test] +async fn test() { + TestNetwork::new(4).await; + loop { + tokio::task::yield_now().await; } }