From aee0bde45d40f3be7a0fb43bd0a3e4c4b486b4bc Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 30 Oct 2022 05:37:23 -0400 Subject: [PATCH] Connect broadcast --- substrate/tendermint/client/src/tendermint.rs | 70 ++++++++++++------- substrate/tendermint/machine/src/lib.rs | 2 +- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/substrate/tendermint/client/src/tendermint.rs b/substrate/tendermint/client/src/tendermint.rs index 54f9e6ca..deb51c5f 100644 --- a/substrate/tendermint/client/src/tendermint.rs +++ b/substrate/tendermint/client/src/tendermint.rs @@ -48,6 +48,7 @@ pub(crate) struct TendermintImport { validators: Arc>, number: Arc>, + gossip_queue: Arc>>>, importing_block: Arc::Hash>>>, pub(crate) machine: Arc>>>, @@ -108,33 +109,48 @@ impl TendermintAuthority { let mut last_number = last_number.0 + 1; let mut recv = gossip .messages_for(TendermintGossip::>::topic::(last_number)); - loop { - match recv.try_next() { - Ok(Some(msg)) => handle - .messages - .send(match SignedMessage::decode(&mut msg.message.as_ref()) { - Ok(msg) => msg, - Err(e) => { - warn!("couldn't decode valid message: {}", e); - continue; + 'outer: loop { + // Send out any queued messages + let mut queue = self.0.gossip_queue.write().unwrap().drain(..).collect::>(); + for msg in queue.drain(..) { + gossip.gossip_message( + TendermintGossip::>::topic::(msg.number().0), + msg.encode(), + false, + ); + } + + // Handle any received messages + // Makes sure to handle all pending messages before acquiring the out-queue lock again + 'inner: loop { + match recv.try_next() { + Ok(Some(msg)) => handle + .messages + .send(match SignedMessage::decode(&mut msg.message.as_ref()) { + Ok(msg) => msg, + Err(e) => { + warn!("couldn't decode valid message: {}", e); + continue; + } + }) + .await + .unwrap(), + Ok(None) => break 'outer, + // No messages available + Err(_) => { + // Check if we the block updated and should be listening on a different topic + let curr = *self.0.number.read().unwrap(); + if last_number != curr { + last_number = curr; + // TODO: Will this return existing messages on the new height? Or will those have been + // ignored and are now gone? + recv = gossip.messages_for(TendermintGossip::>::topic::< + T::Block, + >(last_number)); } - }) - .await - .unwrap(), - Ok(None) => break, - // No messages available - Err(_) => { - // Check if we the block updated and should be listening on a different topic - let curr = *self.0.number.read().unwrap(); - if last_number != curr { - last_number = curr; - // TODO: Will this return existing messages on the new height? Or will those have been - // ignored and are now gone? - recv = gossip.messages_for(TendermintGossip::>::topic::< - T::Block, - >(last_number)); + yield_now().await; + break 'inner; } - yield_now().await; } } } @@ -149,6 +165,7 @@ impl Clone for TendermintImport { validators: self.validators.clone(), number: self.number.clone(), + gossip_queue: self.gossip_queue.clone(), importing_block: self.importing_block.clone(), machine: self.machine.clone(), @@ -175,6 +192,7 @@ impl TendermintImport { validators: Arc::new(TendermintValidators::new(client.clone())), number: Arc::new(RwLock::new(0)), + gossip_queue: Arc::new(RwLock::new(vec![])), importing_block: Arc::new(RwLock::new(None)), machine: Arc::new(RwLock::new(None)), @@ -359,7 +377,7 @@ impl Network for TendermintImport { } async fn broadcast(&mut self, msg: SignedMessage) { - // TODO + self.gossip_queue.write().unwrap().push(msg); } async fn slash(&mut self, validator: u16) { diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index cb73bcf0..6efc16fd 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -278,7 +278,7 @@ impl TendermintMachine { weights: weights.clone(), proposer, - number: BlockNumber(last.0.0 + 1), + number: BlockNumber(last.0 .0 + 1), canonical_start_time: last.1, // The end time of the last block is the start time for this one // The Commit explicitly contains the end time, so loading the last commit will provide