diff --git a/coordinator/src/dkg_confirmation.rs b/coordinator/src/dkg_confirmation.rs index e09b0a4d..16857614 100644 --- a/coordinator/src/dkg_confirmation.rs +++ b/coordinator/src/dkg_confirmation.rs @@ -373,7 +373,7 @@ impl ContinuallyRan for ConfirmDkgTask { use bitvec::prelude::*; signature_participants = bitvec![u8, Lsb0; 0; 0]; let mut i = 0; - for (validator, _) in self.set.validators { + for (validator, _) in &self.set.validators { if Some(validator) == musig_validators.get(i) { signature_participants.push(true); i += 1; diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index fdb9c090..b3cffb68 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -173,18 +173,32 @@ async fn add_signed_unsigned_transaction( match &res { // Fresh publication, already published Ok(true | false) => {} - // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after - // on-chain inclusion of the TX with nonce #n, so it is invalid within our context - // TODO: We need to handle publishing #n when #n already on-chain Err( TransactionError::TooLargeTransaction | TransactionError::InvalidSigner | - TransactionError::InvalidNonce | TransactionError::InvalidSignature | TransactionError::InvalidContent, ) => { panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}"); } + // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after + // on-chain inclusion of the TX with nonce #n, so it is invalid within our context unless the + // issue is this transaction was already included on-chain + Err(TransactionError::InvalidNonce) => { + let TransactionKind::Signed(order, signed) = tx.kind() else { + panic!("non-Signed transaction had InvalidNonce"); + }; + let next_nonce = tributary + .next_nonce(&signed.signer, &order) + .await + .expect("signer who is a present validator didn't have a nonce"); + assert!(next_nonce != signed.nonce); + // We're publishing an old transaction + if next_nonce > signed.nonce { + return true; + } + panic!("nonce in transaction wasn't contiguous with nonce on-chain"); + } // We've published too many transactions recently Err(TransactionError::TooManyInMempool) => { return false; @@ -196,6 +210,43 @@ async fn add_signed_unsigned_transaction( true } +async fn add_with_recognition_check( + set: ValidatorSet, + tributary_db: &mut TD, + tributary: &Tributary, + key: &Zeroizing<::F>, + tx: Transaction, +) -> bool { + let kind = tx.kind(); + match kind { + TransactionKind::Provided(_) => provide_transaction(set, tributary, tx).await, + TransactionKind::Unsigned | TransactionKind::Signed(_, _) => { + // If this is a transaction with signing data, check the topic is recognized before + // publishing + let topic = tx.topic(); + let still_requires_recognition = if let Some(topic) = topic { + (topic.requires_recognition() && (!RecognizedTopics::recognized(tributary_db, set, topic))) + .then_some(topic) + } else { + None + }; + if let Some(topic) = still_requires_recognition { + // Queue the transaction until the topic is recognized + // We use the Tributary DB for this so it's cleaned up when the Tributary DB is + let mut tributary_txn = tributary_db.txn(); + PublishOnRecognition::set(&mut tributary_txn, set, topic, &tx); + tributary_txn.commit(); + } else { + // Actually add the transaction + if !add_signed_unsigned_transaction(tributary, key, tx).await { + return false; + } + } + } + } + true +} + /// Adds all of the transactions sent via `TributaryTransactionsFromProcessorMessages`. pub(crate) struct AddTributaryTransactionsTask { db: CD, @@ -214,49 +265,44 @@ impl ContinuallyRan for AddTributaryTransactio // Provide/add all transactions sent our way loop { let mut txn = self.db.txn(); - // This gives priority to DkgConfirmation as that will only yield transactions at the start - // of the Tributary, ensuring this will be exhausted and yield to ProcessorMessages - let tx = match TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set) { - Some(tx) => tx, - None => { - let Some(tx) = - TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set) - else { - break; - }; - tx - } + let Some(tx) = TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set) + else { + break; }; - let kind = tx.kind(); - match kind { - TransactionKind::Provided(_) => { - provide_transaction(self.set.set, &self.tributary, tx).await - } - TransactionKind::Unsigned | TransactionKind::Signed(_, _) => { - // If this is a transaction with signing data, check the topic is recognized before - // publishing - let topic = tx.topic(); - let still_requires_recognition = if let Some(topic) = topic { - (topic.requires_recognition() && - (!RecognizedTopics::recognized(&self.tributary_db, self.set.set, topic))) - .then_some(topic) - } else { - None - }; - if let Some(topic) = still_requires_recognition { - // Queue the transaction until the topic is recognized - // We use the Tributary DB for this so it's cleaned up when the Tributary DB is - let mut txn = self.tributary_db.txn(); - PublishOnRecognition::set(&mut txn, self.set.set, topic, &tx); - txn.commit(); - } else { - // Actually add the transaction - if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await { - break; - } - } - } + if !add_with_recognition_check( + self.set.set, + &mut self.tributary_db, + &self.tributary, + &self.key, + tx, + ) + .await + { + break; + } + + made_progress = true; + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(tx) = TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set) + else { + break; + }; + + if !add_with_recognition_check( + self.set.set, + &mut self.tributary_db, + &self.tributary, + &self.key, + tx, + ) + .await + { + break; } made_progress = true; @@ -265,20 +311,20 @@ impl ContinuallyRan for AddTributaryTransactio // Provide/add all transactions due to newly recognized topics loop { - let mut txn = self.tributary_db.txn(); + let mut tributary_txn = self.tributary_db.txn(); let Some(topic) = - RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set.set) + RecognizedTopics::try_recv_topic_requiring_recognition(&mut tributary_txn, self.set.set) else { break; }; - if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set.set, topic) { + if let Some(tx) = PublishOnRecognition::take(&mut tributary_txn, self.set.set, topic) { if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await { break; } } made_progress = true; - txn.commit(); + tributary_txn.commit(); } // Publish any participant removals