From d081934725352d37f7e969d399710316fe2c38a5 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 12 Oct 2022 21:36:40 -0400 Subject: [PATCH] Time code --- substrate/consensus/src/tendermint/mod.rs | 94 +++++++++++++++++------ 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/substrate/consensus/src/tendermint/mod.rs b/substrate/consensus/src/tendermint/mod.rs index 3057c01d..54c83c08 100644 --- a/substrate/consensus/src/tendermint/mod.rs +++ b/substrate/consensus/src/tendermint/mod.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use tokio::{task::{JoinHandle, spawn}, sync::mpsc}; + type ValidatorId = u16; const VALIDATORS: ValidatorId = 5; @@ -68,11 +70,7 @@ fn proposer(height: u32, round: u32) -> ValidatorId { ValidatorId::try_from((height + round) % u32::try_from(VALIDATORS).unwrap()).unwrap() } -fn broadcast(msg: Message) { - todo!(); -} - -#[derive(Clone, PartialEq, Eq, Debug)] +#[derive(Debug)] struct TendermintMachine { proposer: ValidatorId, personal_proposal: Option, @@ -86,13 +84,23 @@ struct TendermintMachine { step: Step, locked: Option<(u32, Block)>, valid: Option<(u32, Block)>, + + timeouts: Arc>>, // TODO: Remove Arc RwLock +} + +#[derive(Debug)] +struct TendermintHandle { + block: Arc>>, + messages: mpsc::Sender, + broadcast: mpsc::Receiver, + handle: JoinHandle<()>, } impl TendermintMachine { fn broadcast(&self, data: Data) -> Option { let msg = Message { sender: self.proposer, height: self.height, round: self.round, data }; let res = self.message(msg).unwrap(); - broadcast(msg); + self.broadcast.send(msg).unwrap(); res } @@ -114,7 +122,7 @@ impl TendermintMachine { }; debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none()); } else { - // TODO schedule timeout propose + self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit)); } } @@ -126,7 +134,7 @@ impl TendermintMachine { } /// Called whenever a new height occurs - pub fn propose(&mut self, block: Block) { + fn propose(&mut self, block: Block) { self.personal_proposal = Some(block); self.round_propose(); } @@ -147,21 +155,63 @@ impl TendermintMachine { } // 10 - pub fn new(proposer: ValidatorId, height: u32) -> TendermintMachine { - TendermintMachine { - proposer, - personal_proposal: None, + pub fn new(proposer: ValidatorId, height: u32) -> TendermintHandle { + let block = Arc::new(RwLock::new(None)); + let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary + let (broadcast_send, broadcast_recv) = mpsc::channel(5); + TendermintHandle { + block: block.clone(), + messages: msg_send, + broadcast: broadcast_recv, + handle: tokio::spawn(async { + let machine = TendermintMachine { + proposer, + personal_proposal: None, - height, + height, - log_map: HashMap::new(), - precommitted: HashMap::new(), + log_map: HashMap::new(), + precommitted: HashMap::new(), - locked: None, - valid: None, + locked: None, + valid: None, - round: 0, - step: Step::Propose + round: 0, + step: Step::Propose + }; + + loop { + if self.personal_proposal.is_none() { + let block = block.lock().unwrap(); + if block.is_some() { + self.personal_proposal = Some(block.take()); + } else { + tokio::yield_now().await; + continue; + } + } + + let now = Instant::now(); + let (t1, t2, t3) = { +let timeouts = self.timeouts.read().unwrap(); +let ready = |step| timeouts.get(step).unwrap_or(now) < now; +(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit)) +}; + +if t1 { // Propose timeout +} +if t2 { // Prevote timeout +} +if t3 { // Precommit timeout +} + + match recv.try_recv() { + Ok(msg) => machine.message(msg), + Err(TryRecvError::Empty) => tokio::yield_now().await, + Err(TryRecvError::Disconnected) => break + } + } + }) } } @@ -269,7 +319,7 @@ impl TendermintMachine { None } - pub fn message(&mut self, msg: Message) -> Result, TendermintError> { + fn message(&mut self, msg: Message) -> Result, TendermintError> { if msg.height != self.height { Err(TendermintError::Temporal)?; } @@ -332,7 +382,7 @@ impl TendermintMachine { let (participation, weight) = self.message_instances(self.round, Data::Prevote(None)); // 34-35 if (participation > (((VALIDATORS / 3) * 2) + 1).into()) && first { - // TODO: Schedule timeout prevote + self.timeouts.write().unwrap().insert(Step::Prevote, self.timeout(Step::Prevote)); } // 44-46 @@ -344,7 +394,7 @@ impl TendermintMachine { // 47-48 if self.has_participation(self.round, Step::Precommit) && first { - // TODO: Schedule timeout precommit + self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit)); } Ok(None)