Move from polling loops to a pure IO model for sc_tendermint's gossip

This commit is contained in:
Luke Parker
2022-11-08 02:14:39 -05:00
parent c31a55cce0
commit 5dab3352f2
2 changed files with 81 additions and 60 deletions

View File

@@ -49,6 +49,8 @@ impl<T: TendermintValidator> Validator<T::Block> for TendermintGossip<T> {
return ValidationResult::Discard; return ValidationResult::Discard;
} }
// Verify the signature here so we don't carry invalid messages in our gossip layer
// This will cause double verification of the signature, yet that's a minimal cost
if !msg.verify_signature(&self.signature_scheme) { if !msg.verify_signature(&self.signature_scheme) {
return ValidationResult::Discard; return ValidationResult::Discard;
} }

View File

@@ -5,9 +5,12 @@ use std::{
use async_trait::async_trait; use async_trait::async_trait;
use log::warn; use log::{warn, error};
use tokio::task::yield_now; use futures::{
StreamExt,
channel::mpsc::{self, UnboundedSender},
};
use sp_core::{Encode, Decode}; use sp_core::{Encode, Decode};
use sp_keystore::CryptoStore; use sp_keystore::CryptoStore;
@@ -52,13 +55,11 @@ use import_future::ImportFuture;
struct ActiveAuthority<T: TendermintValidator> { struct ActiveAuthority<T: TendermintValidator> {
signer: TendermintSigner<T>, signer: TendermintSigner<T>,
// Block whose gossip is being tracked // Notification channel for when we start a new number
number: Arc<RwLock<u64>>, new_number: UnboundedSender<u64>,
// Outgoing message queue, placed here as the GossipEngine itself can't be // Outgoing message queue, placed here as the GossipEngine itself can't be
gossip_queue: Arc< gossip: UnboundedSender<
RwLock< SignedMessage<u16, T::Block, <TendermintValidators<T> as SignatureScheme>::Signature>,
Vec<SignedMessage<u16, T::Block, <TendermintValidators<T> as SignatureScheme>::Signature>>,
>,
>, >,
// Block producer // Block producer
@@ -148,21 +149,26 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
registry: Option<&Registry>, registry: Option<&Registry>,
) { ) {
let (best_hash, last) = self.get_last(); let (best_hash, last) = self.get_last();
let mut last_number = last.0 .0 + 1; let new_number = last.0 .0 + 1;
// Shared references between us and the Tendermint machine (and its actions via its Network // Shared references between us and the Tendermint machine (and its actions via its Network
// trait) // trait)
let number = Arc::new(RwLock::new(last_number)); let number = Arc::new(RwLock::new(new_number));
let gossip_queue = Arc::new(RwLock::new(vec![]));
// Create the gossip network // Create the gossip network
let mut gossip = GossipEngine::new( let mut gossip = GossipEngine::new(
network.clone(), network.clone(),
PROTOCOL_NAME, PROTOCOL_NAME,
protocol,
Arc::new(TendermintGossip::new(number.clone(), self.import.validators.clone())), Arc::new(TendermintGossip::new(number.clone(), self.import.validators.clone())),
registry, registry,
); );
// This should only have a single value, yet a bounded channel with a capacity of 1 would cause
// a firm bound. It's not worth having a backlog crash the node since we aren't constrained
let (new_number_tx, mut new_number_rx) = mpsc::unbounded();
let (gossip_tx, mut gossip_rx) = mpsc::unbounded();
// Create the Tendermint machine // Create the Tendermint machine
let handle = { let handle = {
// Set this struct as active // Set this struct as active
@@ -170,8 +176,8 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
self.active = Some(ActiveAuthority { self.active = Some(ActiveAuthority {
signer: TendermintSigner(keys, self.import.validators.clone()), signer: TendermintSigner(keys, self.import.validators.clone()),
number: number.clone(), new_number: new_number_tx,
gossip_queue: gossip_queue.clone(), gossip: gossip_tx,
env, env,
announce: network, announce: network,
@@ -186,55 +192,51 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
}; };
// Start receiving messages about the Tendermint process for this block // Start receiving messages about the Tendermint process for this block
let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(last_number)); let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));
'outer: loop { loop {
// Send out any queued messages futures::select_biased! {
let mut queue = gossip_queue.write().unwrap().drain(..).collect::<Vec<_>>(); // GossipEngine closed down
for msg in queue.drain(..) { _ = gossip => break,
gossip.gossip_message(TendermintGossip::<T>::topic(msg.number().0), msg.encode(), false);
}
// Handle any received messages // Machine reached a new height
// This inner loop enables handling all pending messages before acquiring the out-queue lock new_number = new_number_rx.next() => {
// again if let Some(new_number) = new_number {
// TODO: Move to a select model. The disadvantage of this is we'll more frequently acquire *number.write().unwrap() = new_number;
// the above lock, despite lack of reason to do so recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));
let _ = futures::poll!(&mut gossip); } else {
'inner: loop { break;
match recv.try_next() { }
Ok(Some(msg)) => handle },
.messages
.send(match SignedMessage::decode(&mut msg.message.as_ref()) {
Ok(msg) => msg,
Err(e) => {
warn!(target: "tendermint", "Couldn't decode valid message: {}", e);
continue;
}
})
.await
.unwrap(),
// Ok(None) IS NOT when there aren't messages available. It's when the channel is closed // Message to broadcast
// If we're no longer receiving messages from the network, it must no longer be running msg = gossip_rx.next() => {
// Accordingly, we should no longer be running either if let Some(msg) = msg {
Ok(None) => break 'outer, let topic = TendermintGossip::<T>::topic(msg.number().0);
gossip.gossip_message(topic, msg.encode(), false);
} else {
break;
}
},
// No messages available // Received a message
Err(_) => { msg = recv.next() => {
// Check if the block updated and we should be listening on a different topic if let Some(msg) = msg {
let curr = *number.read().unwrap(); handle
if last_number != curr { .messages
last_number = curr; .send(match SignedMessage::decode(&mut msg.message.as_ref()) {
// TODO: Will this return existing messages on the new height? Or will those have Ok(msg) => msg,
// been ignored and are now gone? Err(e) => {
recv = gossip.messages_for(TendermintGossip::<T>::topic(last_number)); // This is guaranteed to be valid thanks to the gossip validator, assuming
} // that pipeline is correct. That's why this doesn't panic
error!(target: "tendermint", "Couldn't decode valid message: {}", e);
// If there are no messages available, yield to not hog the thread, then return to the continue;
// outer loop }
yield_now().await; })
break 'inner; .await
.unwrap()
} else {
break;
} }
} }
} }
@@ -267,7 +269,13 @@ impl<T: TendermintValidator> Network for TendermintAuthority<T> {
&mut self, &mut self,
msg: SignedMessage<u16, Self::Block, <TendermintValidators<T> as SignatureScheme>::Signature>, msg: SignedMessage<u16, Self::Block, <TendermintValidators<T> as SignatureScheme>::Signature>,
) { ) {
self.active.as_mut().unwrap().gossip_queue.write().unwrap().push(msg); if self.active.as_mut().unwrap().gossip.unbounded_send(msg).is_err() {
warn!(
target: "tendermint",
"Attempted to broadcast a message except the gossip channel is closed. {}",
"Is the node shutting down?"
);
}
} }
async fn slash(&mut self, _validator: u16) { async fn slash(&mut self, _validator: u16) {
@@ -344,7 +352,18 @@ impl<T: TendermintValidator> Network for TendermintAuthority<T> {
.finalize_block(BlockId::Hash(hash), Some(justification), true) .finalize_block(BlockId::Hash(hash), Some(justification), true)
.map_err(|_| Error::InvalidJustification) .map_err(|_| Error::InvalidJustification)
.unwrap(); .unwrap();
*self.active.as_mut().unwrap().number.write().unwrap() += 1;
let number: u64 = match (*block.header().number()).try_into() {
Ok(number) => number,
Err(_) => panic!("BlockNumber exceeded u64"),
};
if self.active.as_mut().unwrap().new_number.unbounded_send(number + 1).is_err() {
warn!(
target: "tendermint",
"Attempted to send a new number to the gossip handler except it's closed. {}",
"Is the node shutting down?"
);
}
self.active.as_ref().unwrap().announce.announce_block(hash, None); self.active.as_ref().unwrap().announce.announce_block(hash, None);
self.get_proposal(block.header()).await self.get_proposal(block.header()).await