From 32df302cc4aadb37c05a9f0c43c31a8bc66faa9a Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 24 Aug 2023 21:55:59 -0400 Subject: [PATCH] Move recognized_id from a channel to an async lambda Fixes a race condition. Also fixes recognizing batch IDs. --- coordinator/src/db.rs | 4 +- coordinator/src/main.rs | 151 +++++++++++++------------ coordinator/src/tests/tributary/dkg.rs | 31 +++-- coordinator/src/tributary/handle.rs | 26 ++--- coordinator/src/tributary/scanner.rs | 26 +++-- 5 files changed, 128 insertions(+), 110 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index aa4a39e0..aac6a513 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -68,7 +68,7 @@ impl<'a, D: Db> MainDb<'a, D> { ) { let key = Self::batches_in_block_key(network, block.0); let Some(mut existing) = txn.get(&key) else { - txn.put(&key, block.0); + txn.put(&key, id); return; }; @@ -77,7 +77,7 @@ impl<'a, D: Db> MainDb<'a, D> { return; } - existing.extend(block.0); + existing.extend(id); txn.put(&key, existing); } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 54b98c97..221baf7f 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -2,7 +2,7 @@ #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] -use core::ops::Deref; +use core::{ops::Deref, future::Future}; use std::{ sync::Arc, time::{SystemTime, Duration}, @@ -21,13 +21,7 @@ use serai_client::{primitives::NetworkId, Public, Serai}; use message_queue::{Service, client::MessageQueue}; -use tokio::{ - sync::{ - mpsc::{self, UnboundedSender}, - RwLock, - }, - time::sleep, -}; +use tokio::{sync::RwLock, time::sleep}; use ::tributary::{ ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader, @@ -143,10 +137,16 @@ pub async fn scan_substrate( } #[allow(clippy::type_complexity)] -pub async fn scan_tributaries( +pub async fn scan_tributaries< + D: Db, + Pro: Processors, + P: P2p, + FRid: Future>, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, +>( raw_db: D, key: Zeroizing<::F>, - recognized_id_send: UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>, + recognized_id: RID, p2p: P, processors: Pro, serai: Arc, @@ -184,10 +184,10 @@ pub async fn scan_tributaries( } for (spec, reader) in &tributary_readers { - tributary::scanner::handle_new_blocks::<_, _, _, _, P>( + tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( &mut tributary_db, &key, - &recognized_id_send, + recognized_id.clone(), &processors, |set, tx| { let serai = serai.clone(); @@ -537,6 +537,12 @@ pub async fn handle_processors( Some(Transaction::SubstrateBlock(block)) } coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { + log::info!( + "informed of batch (sign ID {}, attempt {}) for block {}", + hex::encode(id.id), + id.attempt, + hex::encode(block), + ); // If this is the first attempt instance, synchronize around the block first if id.attempt == 0 { // Save the preprocess to disk so we can publish it later @@ -678,16 +684,74 @@ pub async fn run( .await; } + // When we reach synchrony on an event requiring signing, send our preprocess for it + let recognized_id = { + let raw_db = raw_db.clone(); + let key = key.clone(); + let tributaries = tributaries.clone(); + move |network, genesis, id_type, id| { + let raw_db = raw_db.clone(); + let key = key.clone(); + let tributaries = tributaries.clone(); + async move { + let (ids, txs) = match id_type { + RecognizedIdType::Block => { + let block = id; + + 
let ids = MainDb::::batches_in_block(&raw_db, network, block); + let mut txs = vec![]; + for id in &ids { + txs.push(Transaction::BatchPreprocess(SignData { + plan: *id, + attempt: 0, + data: MainDb::::first_preprocess(&raw_db, *id), + signed: Transaction::empty_signed(), + })); + } + (ids, txs) + } + + RecognizedIdType::Plan => ( + vec![id], + vec![Transaction::SignPreprocess(SignData { + plan: id, + attempt: 0, + data: MainDb::::first_preprocess(&raw_db, id), + signed: Transaction::empty_signed(), + })], + ), + }; + + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + panic!("tributary we don't have came to consensus on an ExternalBlock"); + }; + let tributary = tributary.tributary.read().await; + + for mut tx in txs { + // TODO: Same note as prior nonce acquisition + log::trace!("getting next nonce for Tributary TX containing Batch signing data"); + let nonce = tributary + .next_nonce(Ristretto::generator() * key.deref()) + .await + .expect("publishing a TX to a tributary we aren't in"); + tx.sign(&mut OsRng, genesis, &key, nonce); + + publish_transaction(&tributary, tx).await; + } + + ids + } + } + }; + // Handle new blocks for each Tributary - // TODO: This channel is unsafe. The Tributary may send an event, which then is marked handled, - // before it actually is. This must be a blocking function. - let (recognized_id_send, mut recognized_id_recv) = mpsc::unbounded_channel(); { let raw_db = raw_db.clone(); tokio::spawn(scan_tributaries( raw_db, key.clone(), - recognized_id_send, + recognized_id, p2p.clone(), processors.clone(), serai.clone(), @@ -695,61 +759,6 @@ pub async fn run( )); } - // When we reach consensus on a new external block, send our BatchPreprocess for it - tokio::spawn({ - let raw_db = raw_db.clone(); - let key = key.clone(); - let tributaries = tributaries.clone(); - async move { - loop { - if let Some((network, genesis, id_type, id)) = recognized_id_recv.recv().await { - let txs = match id_type { - RecognizedIdType::Block => { - let mut txs = vec![]; - for id in MainDb::::batches_in_block(&raw_db, network, id) { - txs.push(Transaction::BatchPreprocess(SignData { - plan: id, - attempt: 0, - data: MainDb::::first_preprocess(&raw_db, id), - signed: Transaction::empty_signed(), - })); - } - txs - } - - RecognizedIdType::Plan => vec![Transaction::SignPreprocess(SignData { - plan: id, - attempt: 0, - data: MainDb::::first_preprocess(&raw_db, id), - signed: Transaction::empty_signed(), - })], - }; - - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - panic!("tributary we don't have came to consensus on an ExternalBlock"); - }; - let tributary = tributary.tributary.read().await; - - for mut tx in txs { - // TODO: Same note as prior nonce acquisition - log::trace!("getting next nonce for Tributary TX containing Batch signing data"); - let nonce = tributary - .next_nonce(Ristretto::generator() * key.deref()) - .await - .expect("publishing a TX to a tributary we aren't in"); - tx.sign(&mut OsRng, genesis, &key, nonce); - - publish_transaction(&tributary, tx).await; - } - } else { - log::warn!("recognized_id_send was dropped. 
are we shutting down?"); - break; - } - } - } - }); - // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block // in a while (presumably because we're behind) tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone())); diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 03aaeda0..7da9fdf2 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -11,7 +11,7 @@ use frost::Participant; use sp_runtime::traits::Verify; -use tokio::{time::sleep, sync::mpsc}; +use tokio::time::sleep; use serai_db::{DbTxn, Db, MemDb}; @@ -83,11 +83,12 @@ async fn dkg_test() { ) -> (TributaryDb, MemProcessors) { let mut scanner_db = TributaryDb(MemDb::new()); let processors = MemProcessors::new(); - // Uses a brand new channel since this channel won't be used within this test - handle_new_blocks::<_, _, _, _, LocalP2p>( + handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, key, - &mpsc::unbounded_channel().0, + |_, _, _, _| async { + panic!("provided TX caused recognized_id to be called in new_processors") + }, &processors, |_, _| async { panic!("test tried to publish a new Serai TX in new_processors") }, spec, @@ -108,10 +109,12 @@ async fn dkg_test() { sleep(Duration::from_secs(Tributary::::block_time().into())).await; // Verify the scanner emits a KeyGen::Commitments message - handle_new_blocks::<_, _, _, _, LocalP2p>( + handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - &mpsc::unbounded_channel().0, + |_, _, _, _| async { + panic!("provided TX caused recognized_id to be called after Commitments") + }, &processors, |_, _| async { panic!("test tried to publish a new Serai TX after Commitments") }, &spec, @@ -186,10 +189,12 @@ async fn dkg_test() { } // With just 4 sets of shares, nothing should happen yet - handle_new_blocks::<_, _, _, _, LocalP2p>( + handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - &mpsc::unbounded_channel().0, + |_, _, _, _| async { + panic!("provided TX caused recognized_id to be called after some shares") + }, &processors, |_, _| async { panic!("test tried to publish a new Serai TX after some shares") }, &spec, @@ -227,10 +232,10 @@ async fn dkg_test() { }; // Any scanner which has handled the prior blocks should only emit the new event - handle_new_blocks::<_, _, _, _, LocalP2p>( + handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - &mpsc::unbounded_channel().0, + |_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") }, &processors, |_, _| async { panic!("test tried to publish a new Serai TX") }, &spec, @@ -294,10 +299,12 @@ async fn dkg_test() { } // The scanner should successfully try to publish a transaction with a validly signed signature - handle_new_blocks::<_, _, _, _, LocalP2p>( + handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - &mpsc::unbounded_channel().0, + |_, _, _, _| async { + panic!("provided TX caused recognized_id to be called after DKG confirmation") + }, &processors, |set, tx| { let spec = spec.clone(); diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index 51fbb647..ae29bc0d 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -15,8 +15,6 @@ use frost::{ }; use frost_schnorrkel::Schnorrkel; -use tokio::sync::mpsc::UnboundedSender; - use serai_client::{ Signature, primitives::NetworkId, @@ -224,8 
+222,10 @@ pub fn generated_key_pair( pub async fn handle_application_tx< D: Db, Pro: Processors, - F: Future, - PST: Clone + Fn(ValidatorSet, Encoded) -> F, + FPst: Future, + PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, + FRid: Future>, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, >( tx: Transaction, spec: &TributarySpec, @@ -233,7 +233,7 @@ pub async fn handle_application_tx< publish_serai_tx: PST, genesis: [u8; 32], key: &Zeroizing<::F>, - recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>, + recognized_id: RID, txn: &mut ::Transaction<'_>, ) { // Used to determine if an ID is acceptable @@ -431,11 +431,10 @@ pub async fn handle_application_tx< } Transaction::ExternalBlock(block) => { - // Because this external block has been finalized, its batch ID should be authorized - TributaryDb::::recognize_id(txn, Zone::Batch.label(), genesis, block); - recognized_id - .send((spec.set().network, genesis, RecognizedIdType::Block, block)) - .expect("recognized_id_recv was dropped. are we shutting down?"); + // Because this external block has been finalized, its batch IDs should be authorized + for id in recognized_id(spec.set().network, genesis, RecognizedIdType::Block, block).await { + TributaryDb::::recognize_id(txn, Zone::Batch.label(), genesis, id); + } } Transaction::SubstrateBlock(block) => { @@ -446,9 +445,10 @@ pub async fn handle_application_tx< for id in plan_ids { TributaryDb::::recognize_id(txn, Zone::Sign.label(), genesis, id); - recognized_id - .send((spec.set().network, genesis, RecognizedIdType::Plan, id)) - .expect("recognized_id_recv was dropped. are we shutting down?"); + assert_eq!( + recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await, + vec![id] + ); } } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index a6894e8a..0dd07a02 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -8,8 +8,6 @@ use serai_client::{ primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, }; -use tokio::sync::mpsc::UnboundedSender; - use tributary::{ Transaction as TributaryTransaction, Block, TributaryReader, tendermint::{ @@ -39,13 +37,15 @@ pub enum RecognizedIdType { async fn handle_block< D: Db, Pro: Processors, - F: Future, - PST: Clone + Fn(ValidatorSet, Encoded) -> F, + FPst: Future, + PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, + FRid: Future>, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, P: P2p, >( db: &mut TributaryDb, key: &Zeroizing<::F>, - recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>, + recognized_id: RID, processors: &Pro, publish_serai_tx: PST, spec: &TributarySpec, @@ -79,14 +79,14 @@ async fn handle_block< // TODO: disconnect the node from network/ban from further participation in Tributary } TributaryTransaction::Application(tx) => { - handle_application_tx::( + handle_application_tx::( tx, spec, processors, publish_serai_tx.clone(), genesis, key, - recognized_id, + recognized_id.clone(), &mut txn, ) .await; @@ -105,13 +105,15 @@ async fn handle_block< pub async fn handle_new_blocks< D: Db, Pro: Processors, - F: Future, - PST: Clone + Fn(ValidatorSet, Encoded) -> F, + FPst: Future, + PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, + FRid: Future>, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, P: P2p, >( db: &mut TributaryDb, key: &Zeroizing<::F>, - recognized_id: 
&UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>, + recognized_id: RID, processors: &Pro, publish_serai_tx: PST, spec: &TributarySpec, @@ -121,10 +123,10 @@ pub async fn handle_new_blocks< let mut last_block = db.last_block(genesis); while let Some(next) = tributary.block_after(&last_block) { let block = tributary.block(&next).unwrap(); - handle_block::<_, _, _, _, P>( + handle_block::<_, _, _, _, _, _, P>( db, key, - recognized_id, + recognized_id.clone(), processors, publish_serai_tx.clone(), spec,
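
Note on the race being fixed: the removed TODO above ("The Tributary may send an
event, which then is marked handled, before it actually is") describes it
directly. With the channel, the scanner sent the recognized ID and moved on, so
the block could be marked handled while the receiving task had not yet published
the preprocess transaction; a crash in that window lost the event. Below is a
minimal sketch of the difference, assuming hypothetical stand-in names
(scan, mark_handled) rather than the coordinator's real API:

use core::future::Future;

// Minimal sketch; `mark_handled` and the closure's body are stand-ins.
async fn scan<FRid: Future<Output = Vec<[u8; 32]>>>(
  recognized_id: impl Fn([u8; 32]) -> FRid,
  block: [u8; 32],
) {
  // Old design (racy): `send` returned immediately, so the scanner could
  // mark the block handled before the receiver published any preprocess:
  //   recognized_id_send.send(block).unwrap();
  //   mark_handled(block); // may run before the event is actually handled

  // New design: awaiting the closure blocks the scanner until the preprocess
  // transactions are published, so `mark_handled` only runs once the event
  // genuinely has been handled.
  let _batch_ids = recognized_id(block).await;
  mark_handled(block);
}

fn mark_handled(_block: [u8; 32]) { /* persist "handled" in the DB */ }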
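
Note on the `recognized_id` construction in main.rs: since the bound is now
`Clone + Fn(...) -> impl Future`, the caller clones its captures twice, once
into the `move` closure and again inside it, so each returned future owns its
own handles and the closure remains callable. A self-contained sketch of that
pattern, with `Db` as a stand-in for the real database handle and tokio assumed
as the runtime:

use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
struct Db; // stand-in for the real database handle
impl Db {
  fn first_preprocess(&self, _id: [u8; 32]) -> Vec<u8> { vec![] }
}

#[tokio::main]
async fn main() {
  let db = Db;
  let tributaries = Arc::new(RwLock::new(()));

  // Clone the captures into the closure, then clone them again inside it so
  // every future returned by the closure owns its own copies. Because the
  // body only borrows the captures (to clone them), the closure is `Fn`, and
  // because `Db` and `Arc` are `Clone`, the closure is `Clone` too.
  let recognized_id = {
    let db = db.clone();
    let tributaries = tributaries.clone();
    move |id: [u8; 32]| {
      let db = db.clone();
      let tributaries = tributaries.clone();
      async move {
        let _guard = tributaries.read().await;
        db.first_preprocess(id)
      }
    }
  };

  // Each call yields an independent future; the closure itself stays usable.
  let preprocess = recognized_id([0; 32]).await;
  assert!(preprocess.is_empty());
}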