From 2db53d5434fa3dfcdb1ce16c88adf4b022b64714 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Sun, 27 Aug 2023 05:01:19 -0400
Subject: [PATCH] Use &self for handle_message and sync_block in Tributary

They used &mut self to prevent concurrent execution. This uses a lock over
the channel to achieve the same exclusivity, without requiring a lock over
the entire tributary.

This fixes post-provided Provided transactions: sync_block waited for the TX
to be provided, yet it never would be, as sync_block held a mutable reference
over the entire Tributary, preventing any other read/write operation of any
scope.

A timeout which was increased (bc2f23f72b008c758bde4a17e834bb85aef07569)
while this bug went unidentified has thankfully been decreased back.

Also shims in basic support for Completed, which was the WIP when this bug
was identified.
---
 coordinator/src/main.rs             | 17 ++++++++-------
 coordinator/src/tributary/handle.rs | 16 ++++++++++++--
 coordinator/src/tributary/mod.rs    |  3 +++
 coordinator/tributary/src/lib.rs    | 33 +++++++++++++++++++----------
 tests/coordinator/src/tests/mod.rs  |  2 +-
 5 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 17f89c96..183c010f 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -281,7 +281,7 @@ pub async fn handle_p2p(
         };

         log::trace!("handling message for tributary {:?}", tributary.spec.set());
-        if tributary.tributary.write().await.handle_message(&msg.msg).await {
+        if tributary.tributary.read().await.handle_message(&msg.msg).await {
           P2p::broadcast(&p2p, msg.kind, msg.msg).await;
         }
       }
@@ -385,11 +385,7 @@
         return;
       };

-      // TODO: Add a check which doesn't require write to see if this is the next block in
-      // line
-      // If it's in the future, hold it for up to T time
-
-      let res = tributary.tributary.write().await.sync_block(block, msg.msg).await;
+      let res = tributary.tributary.read().await.sync_block(block, msg.msg).await;
       log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
     }
   });
@@ -518,8 +514,9 @@
             data: share,
             signed: Transaction::empty_signed(),
           })),
-          // TODO
-          sign::ProcessorMessage::Completed { .. } => todo!(),
+          sign::ProcessorMessage::Completed { key: _, id, tx } => {
+            Some(Transaction::SignCompleted(id, tx, Transaction::empty_signed()))
+          }
         },
         ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
           coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
@@ -624,7 +621,9 @@

     // If this created a transaction, publish it
     if let Some(mut tx) = tx {
+      log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
       let tributaries = tributaries.read().await;
+      log::trace!("read global tributaries");
       let Some(tributary) = tributaries.get(&genesis) else {
         // TODO: This can happen since Substrate tells the Processor to generate commitments
         // at the same time it tells the Tributary to be created
@@ -632,9 +631,11 @@
         panic!("processor is operating on tributary we don't have");
       };
       let tributary = tributary.tributary.read().await;
+      log::trace!("read specific tributary");

       match tx.kind() {
         TransactionKind::Provided(_) => {
+          log::trace!("providing transaction {}", hex::encode(tx.hash()));
           let res = tributary.provide_transaction(tx).await;
           if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
             panic!("provided an invalid transaction: {res:?}");
diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs
index 37fd4108..9f9cda79 100644
--- a/coordinator/src/tributary/handle.rs
+++ b/coordinator/src/tributary/handle.rs
@@ -589,8 +589,20 @@ pub async fn handle_application_tx<
         None => {}
       }
     }
-    Transaction::SignCompleted(_, _, _) => {
-      // TODO
+    Transaction::SignCompleted(id, tx, signed) => {
+      // TODO: Confirm this is a valid ID
+      // TODO: Confirm this signer hasn't prior published a completion
+      let Some(key_pair) = TributaryDb::<D>::key_pair(txn, spec.set()) else { todo!() };
+      processors
+        .send(
+          spec.set().network,
+          CoordinatorMessage::Sign(sign::CoordinatorMessage::Completed {
+            key: key_pair.1.to_vec(),
+            id,
+            tx,
+          }),
+        )
+        .await;
     }
   }
 }
diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs
index b9ab5c38..2c4f4d63 100644
--- a/coordinator/src/tributary/mod.rs
+++ b/coordinator/src/tributary/mod.rs
@@ -243,6 +243,9 @@ pub enum Transaction {
   SignPreprocess(SignData),
   SignShare(SignData),

+  // TODO: We can't make this an Unsigned as we need to prevent spam, which requires a max of 1
+  // claim per sender
+  // Can we de-duplicate across senders though, if they claim the same hash completes?
   SignCompleted([u8; 32], Vec<u8>, Signed),
 }

diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs
index b7767181..e63a29d0 100644
--- a/coordinator/tributary/src/lib.rs
+++ b/coordinator/tributary/src/lib.rs
@@ -8,7 +8,7 @@ use zeroize::Zeroizing;
 use ciphersuite::{Ciphersuite, Ristretto};

 use scale::Decode;
-use futures::{StreamExt, SinkExt};
+use futures::{StreamExt, SinkExt, channel::mpsc::UnboundedReceiver};

 use ::tendermint::{
   ext::{BlockNumber, Commit, Block as BlockTrait, Network},
   SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
@@ -144,7 +144,7 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
   genesis: [u8; 32],
   network: TendermintNetwork<D, T, P>,

-  synced_block: SyncedBlockSender<TendermintNetwork<D, T, P>>,
+  synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
   synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
   messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
 }
@@ -188,7 +188,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
       db,
       genesis,
       network,
-      synced_block,
+      synced_block: Arc::new(RwLock::new(synced_block)),
       synced_block_result: Arc::new(RwLock::new(synced_block_result)),
       messages: Arc::new(RwLock::new(messages)),
     })
   }
@@ -239,11 +239,12 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
     res
   }

-  // Sync a block.
-  // TODO: Since we have a static validator set, we should only need the tail commit?
-  pub async fn sync_block(&mut self, block: Block<T>, commit: Vec<u8>) -> bool {
-    let mut result = self.synced_block_result.write().await;
-
+  async fn sync_block_internal(
+    &self,
+    block: Block<T>,
+    commit: Vec<u8>,
+    result: &mut UnboundedReceiver<bool>,
+  ) -> bool {
     let (tip, block_number) = {
       let blockchain = self.network.blockchain.read().await;
       (blockchain.tip(), blockchain.block_number())
@@ -272,12 +273,22 @@
     }

     let number = BlockNumber((block_number + 1).into());
-    self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap();
+    self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap();
     result.next().await.unwrap()
   }

+  // Sync a block.
+  // TODO: Since we have a static validator set, we should only need the tail commit?
+  pub async fn sync_block(&self, block: Block<T>, commit: Vec<u8>) -> bool {
+    let mut result = self.synced_block_result.write().await;
+    self.sync_block_internal(block, commit, &mut result).await
+  }
+
   // Return true if the message should be rebroadcasted.
-  pub async fn handle_message(&mut self, msg: &[u8]) -> bool {
+  pub async fn handle_message(&self, msg: &[u8]) -> bool {
+    // Acquire the lock now to prevent sync_block from being run at the same time
+    let mut sync_block = self.synced_block_result.write().await;
+
     match msg.first() {
       Some(&TRANSACTION_MESSAGE) => {
         let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
@@ -316,7 +327,7 @@
           return false;
         };
         let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
-        if self.sync_block(block, commit).await {
+        if self.sync_block_internal(block, commit, &mut sync_block).await {
           log::debug!("synced block over p2p net instead of building the commit ourselves");
         }
         false
diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs
index 72d30906..0e07d835 100644
--- a/tests/coordinator/src/tests/mod.rs
+++ b/tests/coordinator/src/tests/mod.rs
@@ -43,6 +43,6 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
 pub(crate) async fn wait_for_tributary() {
   tokio::time::sleep(Duration::from_secs(20)).await;
   if std::env::var("GITHUB_CI") == Ok("true".to_string()) {
-    tokio::time::sleep(Duration::from_secs(60)).await;
+    tokio::time::sleep(Duration::from_secs(40)).await;
   }
 }
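
Note for reviewers (not part of the patch): below is a minimal, self-contained
sketch of the locking pattern this commit adopts. The names (Node, sync,
process, to_machine) are hypothetical, not from the Tributary codebase, and it
assumes the futures crate plus tokio with the rt, macros, and sync features.
Both entry points take &self yet never run concurrently with each other,
because each write-locks the result channel for its full duration; unrelated
&self methods (like provide_transaction in the real code) stay free to run.

use std::sync::Arc;

use futures::{StreamExt, SinkExt, channel::mpsc};
use tokio::sync::RwLock;

struct Node {
  // Requests into a background state machine, and its results back out.
  to_machine: Arc<RwLock<mpsc::UnboundedSender<u64>>>,
  results: Arc<RwLock<mpsc::UnboundedReceiver<bool>>>,
}

impl Node {
  // Takes &self, yet excludes process(): it write-locks the result receiver
  // for its entire duration.
  async fn sync(&self, block: u64) -> bool {
    let mut results = self.results.write().await;
    self.to_machine.write().await.send(block).await.unwrap();
    results.next().await.unwrap()
  }

  // Acquiring the same lock up front serializes this with sync(), mirroring
  // how handle_message locks synced_block_result before doing anything else.
  async fn process(&self, msg: u64) -> bool {
    let _exclusive = self.results.write().await;
    msg % 2 == 0
  }
}

#[tokio::main]
async fn main() {
  let (to_machine, mut from_node) = mpsc::unbounded();
  let (mut to_node, results) = mpsc::unbounded();
  // Stub state machine: acknowledge every block it's sent.
  tokio::spawn(async move {
    while let Some(_block) = from_node.next().await {
      to_node.send(true).await.unwrap();
    }
  });

  let node = Node {
    to_machine: Arc::new(RwLock::new(to_machine)),
    results: Arc::new(RwLock::new(results)),
  };
  assert!(node.sync(1).await);
  assert!(!node.process(3).await);
}

Because the exclusivity lives in the channel lock rather than in &mut self,
handle_message can take the lock once and hand the receiver to
sync_block_internal, while a Provided transaction can still be provided
concurrently instead of deadlocking as described above.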