From 086458d041ef863aa85428d50926b017fcf347e3 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 27 Sep 2023 00:00:31 -0400 Subject: [PATCH] Txn for handling a processor message handle_processor_messages function added to remove a very large block of nested code. MainDb cleaned to never be instantiated. --- coordinator/src/db.rs | 29 +- coordinator/src/main.rs | 664 ++++++++++++++++++++-------------------- 2 files changed, 335 insertions(+), 358 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 43bd4cb0..4393037c 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -1,3 +1,5 @@ +use core::marker::PhantomData; + use scale::{Encode, Decode}; use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch}; @@ -6,12 +8,8 @@ pub use serai_db::*; use crate::tributary::TributarySpec; #[derive(Debug)] -pub struct MainDb<'a, D: Db>(&'a mut D); -impl<'a, D: Db> MainDb<'a, D> { - pub fn new(db: &'a mut D) -> Self { - Self(db) - } - +pub struct MainDb(PhantomData); +impl MainDb { fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { D::key(b"coordinator_main", dst, key) } @@ -29,8 +27,8 @@ impl<'a, D: Db> MainDb<'a, D> { fn acive_tributaries_key() -> Vec { Self::main_key(b"active_tributaries", []) } - pub fn active_tributaries(&self) -> (Vec, Vec) { - let bytes = self.0.get(Self::acive_tributaries_key()).unwrap_or(vec![]); + pub fn active_tributaries(getter: &G) -> (Vec, Vec) { + let bytes = getter.get(Self::acive_tributaries_key()).unwrap_or(vec![]); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -40,9 +38,9 @@ impl<'a, D: Db> MainDb<'a, D> { (bytes, tributaries) } - pub fn add_active_tributary(&mut self, spec: &TributarySpec) { + pub fn add_active_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) { let key = Self::acive_tributaries_key(); - let (mut existing_bytes, existing) = self.active_tributaries(); + let (mut existing_bytes, existing) = Self::active_tributaries(txn); for tributary in &existing { if tributary == spec { return; @@ -50,9 +48,7 @@ impl<'a, D: Db> MainDb<'a, D> { } spec.write(&mut existing_bytes).unwrap(); - let mut txn = self.0.txn(); txn.put(key, existing_bytes); - txn.commit(); } fn first_preprocess_key(id: [u8; 32]) -> Vec { @@ -73,14 +69,11 @@ impl<'a, D: Db> MainDb<'a, D> { fn batch_key(network: NetworkId, id: u32) -> Vec { Self::main_key(b"batch", (network, id).encode()) } - pub fn save_batch(&mut self, batch: SignedBatch) { - let mut txn = self.0.txn(); + pub fn save_batch(txn: &mut D::Transaction<'_>, batch: SignedBatch) { txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode()); - txn.commit(); } - pub fn batch(&self, network: NetworkId, id: u32) -> Option { - self - .0 + pub fn batch(getter: &G, network: NetworkId, id: u32) -> Option { + getter .get(Self::batch_key(network, id)) .map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap()) } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index d08a43d1..b21b9a19 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -99,7 +99,7 @@ async fn add_tributary( spec.n(), spec .i(Ristretto::generator() * key.deref()) - .expect("adding a tribtuary for a set we aren't in set for"), + .expect("adding a tributary for a set we aren't in set for"), ) .unwrap(), }, @@ -173,7 +173,9 @@ pub async fn scan_substrate( log::info!("creating new tributary for {:?}", spec.set()); // Save it to the database - MainDb::new(db).add_active_tributary(&spec); + let mut txn = db.txn(); + MainDb::::add_active_tributary(&mut txn, &spec); + txn.commit(); // If we reboot before this is read, the fact it was saved to the database means it'll be // handled on reboot @@ -513,6 +515,305 @@ pub async fn publish_signed_transaction( } } +async fn handle_processor_messages( + mut db: D, + key: Zeroizing<::F>, + serai: Arc, + mut processors: Pro, + tributary: ActiveTributary, + mut recv: mpsc::UnboundedReceiver, +) { + let db_clone = db.clone(); // Enables cloning the DB while we have a txn + let pub_key = Ristretto::generator() * key.deref(); + + let ActiveTributary { spec, tributary } = tributary; + let genesis = spec.genesis(); + + loop { + let msg: processors::Message = recv.recv().await.unwrap(); + + if !MainDb::::handled_message(&db, msg.id) { + let mut txn = db.txn(); + + // TODO: We probably want to NOP here, not panic? + // TODO: We do have to track produced Batches in order to ensure their integrity + let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in"); + + let tx = match msg.msg.clone() { + ProcessorMessage::KeyGen(inner_msg) => match inner_msg { + key_gen::ProcessorMessage::Commitments { id, commitments } => { + Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())) + } + key_gen::ProcessorMessage::Shares { id, mut shares } => { + // Create a MuSig-based machine to inform Substrate of this key generation + let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt); + + let mut tx_shares = Vec::with_capacity(shares.len()); + for i in 1 ..= spec.n() { + let i = Participant::new(i).unwrap(); + if i == my_i { + continue; + } + tx_shares.push( + shares.remove(&i).expect("processor didn't send share for another validator"), + ); + } + + Some(Transaction::DkgShares { + attempt: id.attempt, + shares: tx_shares, + confirmation_nonces: nonces, + signed: Transaction::empty_signed(), + }) + } + key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => { + assert_eq!( + id.set.network, msg.network, + "processor claimed to be a different network than it was for GeneratedKeyPair", + ); + // TODO: Also check the other KeyGenId fields + + // Tell the Tributary the key pair, get back the share for the MuSig + // signature + let share = crate::tributary::generated_key_pair::( + &mut txn, + &key, + &spec, + &(Public(substrate_key), network_key.try_into().unwrap()), + id.attempt, + ); + + match share { + Ok(share) => { + Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())) + } + Err(p) => { + todo!("participant {p:?} sent invalid DKG confirmation preprocesses") + } + } + } + }, + ProcessorMessage::Sign(msg) => match msg { + sign::ProcessorMessage::Preprocess { id, preprocess } => { + if id.attempt == 0 { + MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); + + None + } else { + Some(Transaction::SignPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })) + } + } + sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData { + plan: id.id, + attempt: id.attempt, + data: share, + signed: Transaction::empty_signed(), + })), + sign::ProcessorMessage::Completed { key: _, id, tx } => { + let r = Zeroizing::new(::F::random(&mut OsRng)); + #[allow(non_snake_case)] + let R = ::generator() * r.deref(); + let mut tx = Transaction::SignCompleted { + plan: id, + tx_hash: tx, + first_signer: pub_key, + signature: SchnorrSignature { R, s: ::F::ZERO }, + }; + let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); + match &mut tx { + Transaction::SignCompleted { signature, .. } => { + *signature = signed; + } + _ => unreachable!(), + } + Some(tx) + } + }, + ProcessorMessage::Coordinator(inner_msg) => match inner_msg { + coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { + assert_eq!( + network, msg.network, + "processor claimed to be a different network than it was for SubstrateBlockAck", + ); + + // TODO: This needs to be scoped per multisig + TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); + + Some(Transaction::SubstrateBlock(block)) + } + coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { + log::info!( + "informed of batch (sign ID {}, attempt {}) for block {}", + hex::encode(id.id), + id.attempt, + hex::encode(block), + ); + // If this is the first attempt instance, wait until we synchronize around + // the batch first + if id.attempt == 0 { + MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); + + Some(Transaction::Batch(block.0, id.id)) + } else { + Some(Transaction::BatchPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })) + } + } + coordinator::ProcessorMessage::BatchShare { id, share } => { + Some(Transaction::BatchShare(SignData { + plan: id.id, + attempt: id.attempt, + data: share.to_vec(), + signed: Transaction::empty_signed(), + })) + } + }, + ProcessorMessage::Substrate(inner_msg) => match inner_msg { + processor_messages::substrate::ProcessorMessage::Update { batch } => { + assert_eq!( + batch.batch.network, msg.network, + "processor sent us a batch for a different network than it was for", + ); + // TODO: Check this key's key pair's substrate key is authorized to publish + // batches + + // Save this batch to the disk + MainDb::::save_batch(&mut txn, batch); + + /* + Use a dedicated task to publish batches due to the latency potentially + incurred. + + This does not guarantee the batch has actually been published when the + message is `ack`ed to message-queue. Accordingly, if we reboot, these + batches would be dropped (as we wouldn't see the `Update` again, triggering + our re-attempt to publish). + + The solution to this is to have the task try not to publish the batch which + caused it to be spawned, yet all saved batches which have yet to published. + This does risk having multiple tasks trying to publish all pending batches, + yet these aren't notably complex. + */ + tokio::spawn({ + let db = db_clone.clone(); + let serai = serai.clone(); + let network = msg.network; + async move { + // Since we have a new batch, publish all batches yet to be published to + // Serai + // This handles the edge-case where batch n+1 is signed before batch n is + while let Some(batch) = { + // Get the next-to-execute batch ID + let next = { + let mut first = true; + loop { + if !first { + log::error!( + "{} {network:?}", + "couldn't connect to Serai node to get the next batch ID for", + ); + tokio::time::sleep(Duration::from_secs(5)).await; + } + first = false; + + let Ok(latest_block) = serai.get_latest_block().await else { + continue; + }; + let Ok(last) = + serai.get_last_batch_for_network(latest_block.hash(), network).await + else { + continue; + }; + break if let Some(last) = last { last + 1 } else { 0 }; + } + }; + + // If we have this batch, attempt to publish it + MainDb::::batch(&db, network, next) + } { + let id = batch.batch.id; + let block = batch.batch.block; + + let tx = Serai::execute_batch(batch); + // This publish may fail if this transactions already exists in the + // mempool, which is possible, or if this batch was already executed + // on-chain + // Either case will have eventual resolution and be handled by the + // above check on if this batch should execute + if serai.publish(&tx).await.is_ok() { + log::info!("published batch {network:?} {id} (block {})", hex::encode(block)); + } + } + } + }); + + None + } + }, + }; + + // If this created a transaction, publish it + // TODO: This block may be fired multiple times, with the Tributary maintaining its + // own txns. How safe is that? + if let Some(mut tx) = tx { + log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); + + match tx.kind() { + TransactionKind::Provided(_) => { + log::trace!("providing transaction {}", hex::encode(tx.hash())); + let res = tributary.provide_transaction(tx).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + panic!("provided an invalid transaction: {res:?}"); + } + } + TransactionKind::Unsigned => { + log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash())); + // Ignores the result since we can't differentiate already in-mempool from + // already on-chain from invalid + // TODO: Don't ignore the result + tributary.add_transaction(tx).await; + } + TransactionKind::Signed(_) => { + log::trace!("getting next nonce for Tributary TX in response to processor message"); + + let nonce = loop { + let Some(nonce) = + NonceDecider::::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce") + else { + // This can be None if: + // 1) We scanned the relevant transaction(s) in a Tributary block + // 2) The processor was sent a message and responded + // 3) The Tributary TXN has yet to be committed + log::warn!("nonce has yet to be saved for processor-instigated transaction"); + sleep(Duration::from_millis(100)).await; + continue; + }; + break nonce; + }; + tx.sign(&mut OsRng, genesis, &key, nonce); + + publish_signed_transaction(&tributary, tx).await; + } + } + } + + MainDb::::save_handled_message(&mut txn, msg.id); + txn.commit(); + } + + processors.ack(msg).await; + } +} + pub async fn handle_processors( db: D, key: Zeroizing<::F>, @@ -520,9 +821,8 @@ pub async fn handle_processors( mut processors: Pro, mut new_tributary: broadcast::Receiver>, ) { - let pub_key = Ristretto::generator() * key.deref(); - let channels = Arc::new(RwLock::new(HashMap::new())); + // Listen to new tributary events tokio::spawn({ let db = db.clone(); let processors = processors.clone(); @@ -530,339 +830,27 @@ pub async fn handle_processors( async move { loop { let channels = channels.clone(); - let ActiveTributary { spec, tributary } = new_tributary.recv().await.unwrap(); - let genesis = spec.genesis(); - tokio::spawn({ - let mut db = db.clone(); - let key = key.clone(); - let serai = serai.clone(); - let mut processors = processors.clone(); - async move { - let (send, mut recv) = mpsc::unbounded_channel(); - // TODO: Support multisig rotation (not per-Tributary yet per-network?) - channels.write().await.insert(spec.set().network, send); + let tributary = new_tributary.recv().await.unwrap(); - loop { - let msg: processors::Message = recv.recv().await.unwrap(); + let (send, recv) = mpsc::unbounded_channel(); + // TODO: Support multisig rotation (not per-Tributary yet per-network?) + channels.write().await.insert(tributary.spec.set().network, send); - // TODO: We probably want to NOP here, not panic? - // TODO: We do have to track produced Batches in order to ensure their integrity - let my_i = - spec.i(pub_key).expect("processor message for network we aren't a validator in"); - - let tx = match msg.msg.clone() { - ProcessorMessage::KeyGen(inner_msg) => match inner_msg { - key_gen::ProcessorMessage::Commitments { id, commitments } => { - Some(Transaction::DkgCommitments( - id.attempt, - commitments, - Transaction::empty_signed(), - )) - } - key_gen::ProcessorMessage::Shares { id, mut shares } => { - // Create a MuSig-based machine to inform Substrate of this key generation - let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt); - - let mut tx_shares = Vec::with_capacity(shares.len()); - for i in 1 ..= spec.n() { - let i = Participant::new(i).unwrap(); - if i == my_i { - continue; - } - tx_shares.push( - shares - .remove(&i) - .expect("processor didn't send share for another validator"), - ); - } - - Some(Transaction::DkgShares { - attempt: id.attempt, - shares: tx_shares, - confirmation_nonces: nonces, - signed: Transaction::empty_signed(), - }) - } - key_gen::ProcessorMessage::GeneratedKeyPair { - id, - substrate_key, - network_key, - } => { - assert_eq!( - id.set.network, msg.network, - "processor claimed to be a different network than it was for GeneratedKeyPair", - ); - // TODO: Also check the other KeyGenId fields - - // Tell the Tributary the key pair, get back the share for the MuSig signature - let mut txn = db.txn(); - let share = crate::tributary::generated_key_pair::( - &mut txn, - &key, - &spec, - &(Public(substrate_key), network_key.try_into().unwrap()), - id.attempt, - ); - txn.commit(); - - match share { - Ok(share) => Some(Transaction::DkgConfirmed( - id.attempt, - share, - Transaction::empty_signed(), - )), - Err(p) => { - todo!("participant {p:?} sent invalid DKG confirmation preprocesses") - } - } - } - }, - ProcessorMessage::Sign(msg) => match msg { - sign::ProcessorMessage::Preprocess { id, preprocess } => { - if id.attempt == 0 { - let mut txn = db.txn(); - MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); - txn.commit(); - - None - } else { - Some(Transaction::SignPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) - } - } - sign::ProcessorMessage::Share { id, share } => { - Some(Transaction::SignShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share, - signed: Transaction::empty_signed(), - })) - } - sign::ProcessorMessage::Completed { key: _, id, tx } => { - let r = Zeroizing::new(::F::random(&mut OsRng)); - #[allow(non_snake_case)] - let R = ::generator() * r.deref(); - let mut tx = Transaction::SignCompleted { - plan: id, - tx_hash: tx, - first_signer: pub_key, - signature: SchnorrSignature { R, s: ::F::ZERO }, - }; - let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); - match &mut tx { - Transaction::SignCompleted { signature, .. } => { - *signature = signed; - } - _ => unreachable!(), - } - Some(tx) - } - }, - ProcessorMessage::Coordinator(inner_msg) => match inner_msg { - coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { - assert_eq!( - network, msg.network, - "processor claimed to be a different network than it was for SubstrateBlockAck", - ); - - // Safe to use its own txn since this is static and just needs to be written - // before we provide SubstrateBlock - let mut txn = db.txn(); - // TODO: This needs to be scoped per multisig - TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); - txn.commit(); - - Some(Transaction::SubstrateBlock(block)) - } - coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { - log::info!( - "informed of batch (sign ID {}, attempt {}) for block {}", - hex::encode(id.id), - id.attempt, - hex::encode(block), - ); - // If this is the first attempt instance, wait until we synchronize around the - // batch first - if id.attempt == 0 { - // Save the preprocess to disk so we can publish it later - // This is fine to use its own TX since it's static and just needs to be - // written before this message finishes it handling (or with this message's - // finished handling) - let mut txn = db.txn(); - MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); - txn.commit(); - - Some(Transaction::Batch(block.0, id.id)) - } else { - Some(Transaction::BatchPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) - } - } - coordinator::ProcessorMessage::BatchShare { id, share } => { - Some(Transaction::BatchShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share.to_vec(), - signed: Transaction::empty_signed(), - })) - } - }, - ProcessorMessage::Substrate(inner_msg) => match inner_msg { - processor_messages::substrate::ProcessorMessage::Update { batch } => { - assert_eq!( - batch.batch.network, msg.network, - "processor sent us a batch for a different network than it was for", - ); - // TODO: Check this key's key pair's substrate key is authorized to publish - // batches - - // Save this batch to the disk - MainDb::new(&mut db).save_batch(batch); - - /* - Use a dedicated task to publish batches due to the latency potentially - incurred. - - This does not guarantee the batch has actually been published when the - message is `ack`ed to message-queue. Accordingly, if we reboot, these batches - would be dropped (as we wouldn't see the `Update` again, triggering our - re-attempt to publish). - - The solution to this is to have the task try not to publish the batch which - caused it to be spawned, yet all saved batches which have yet to published. - This does risk having multiple tasks trying to publish all pending batches, - yet these aren't notably complex. - */ - tokio::spawn({ - let mut db = db.clone(); - let serai = serai.clone(); - let network = msg.network; - async move { - // Since we have a new batch, publish all batches yet to be published to - // Serai - // This handles the edge-case where batch n+1 is signed before batch n is - while let Some(batch) = { - // Get the next-to-execute batch ID - let next = { - let mut first = true; - loop { - if !first { - log::error!( - "{} {network:?}", - "couldn't connect to Serai node to get the next batch ID for", - ); - tokio::time::sleep(Duration::from_secs(5)).await; - } - first = false; - - let Ok(latest_block) = serai.get_latest_block().await else { - continue; - }; - let Ok(last) = serai - .get_last_batch_for_network(latest_block.hash(), network) - .await - else { - continue; - }; - break if let Some(last) = last { last + 1 } else { 0 }; - } - }; - - // If we have this batch, attempt to publish it - MainDb::new(&mut db).batch(network, next) - } { - let id = batch.batch.id; - let block = batch.batch.block; - - let tx = Serai::execute_batch(batch); - // This publish may fail if this transactions already exists in the - // mempool, which is possible, or if this batch was already executed - // on-chain - // Either case will have eventual resolution and be handled by the above - // check on if this batch should execute - if serai.publish(&tx).await.is_ok() { - log::info!( - "published batch {network:?} {id} (block {})", - hex::encode(block) - ); - } - } - } - }); - - None - } - }, - }; - - // If this created a transaction, publish it - if let Some(mut tx) = tx { - log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); - - match tx.kind() { - TransactionKind::Provided(_) => { - log::trace!("providing transaction {}", hex::encode(tx.hash())); - let res = tributary.provide_transaction(tx).await; - if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { - panic!("provided an invalid transaction: {res:?}"); - } - } - TransactionKind::Unsigned => { - log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash())); - // Ignores the result since we can't differentiate already in-mempool from - // already on-chain from invalid - // TODO: Don't ignore the result - tributary.add_transaction(tx).await; - } - TransactionKind::Signed(_) => { - log::trace!( - "getting next nonce for Tributary TX in response to processor message" - ); - - let nonce = loop { - let Some(nonce) = NonceDecider::::nonce(&db, genesis, &tx) - .expect("signed TX didn't have nonce") - else { - // This can be None if: - // 1) We scanned the relevant transaction(s) in a Tributary block - // 2) The processor was sent a message and responded - // 3) The Tributary TXN has yet to be committed - log::warn!( - "nonce has yet to be saved for processor-instigated transaction" - ); - sleep(Duration::from_millis(100)).await; - continue; - }; - break nonce; - }; - tx.sign(&mut OsRng, genesis, &key, nonce); - - publish_signed_transaction(&tributary, tx).await; - } - } - } - - // TODO: Consider a global txn for this message? - let mut txn = db.txn(); - MainDb::<'static, D>::save_handled_message(&mut txn, msg.id); - txn.commit(); - - processors.ack(msg).await; - } - } - }); + // For each new tributary, spawn a dedicated task to handle its messages from the processor + // TODO: Redo per network, not per tributary + tokio::spawn(handle_processor_messages( + db.clone(), + key.clone(), + serai.clone(), + processors.clone(), + tributary, + recv, + )); } } }); + // Dispatch task let mut last_msg = None; loop { // TODO: We dispatch this to an async task per-processor, yet we don't move to the next message @@ -871,10 +859,6 @@ pub async fn handle_processors( // Alternatively, a peek method with local delineation of handled messages would work. let msg = processors.recv().await; - if MainDb::<'static, D>::handled_message(&db, msg.id) { - processors.ack(msg).await; - continue; - } if last_msg == Some(msg.id) { sleep(Duration::from_secs(1)).await; continue; @@ -892,7 +876,7 @@ pub async fn handle_processors( } pub async fn run( - mut raw_db: D, + raw_db: D, key: Zeroizing<::F>, p2p: P, processors: Pro, @@ -902,7 +886,7 @@ pub async fn run( let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel(); // Reload active tributaries from the database - for spec in MainDb::new(&mut raw_db).active_tributaries().1 { + for spec in MainDb::::active_tributaries(&raw_db).1 { new_tributary_spec_send.send(spec).unwrap(); } @@ -975,13 +959,13 @@ pub async fn run( let key = key.clone(); let tributaries = tributaries.clone(); async move { - // SubstrateBlockAck is fired before Preprocess, creating a race between Tributary ack - // of the SubstrateBlock and the sending of all Preprocesses + // The transactions for these are fired before the preprocesses are actually + // received/saved, creating a race between Tributary ack and the availability of all + // Preprocesses // This waits until the necessary preprocess is available let get_preprocess = |raw_db, id| async move { loop { let Some(preprocess) = MainDb::::first_preprocess(raw_db, id) else { - assert_eq!(id_type, RecognizedIdType::Plan); sleep(Duration::from_millis(100)).await; continue; };