diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index a445be16..2ebfd223 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -21,11 +21,19 @@ use message_queue::{Service, Metadata, client::MessageQueue}; use serai_cosign::{Faulted, CosignIntent, Cosigning}; use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; -use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask}; +use serai_coordinator_tributary::{ + Topic, Transaction, ProcessorMessages, CosignIntents, RecognizedTopics, ScanTributaryTask, +}; use serai_coordinator_p2p::P2p; use crate::{Db, TributaryTransactions}; +create_db! { + Coordinator { + PublishOnRecognition: (set: ValidatorSet, topic: Topic) -> Transaction, + } +} + db_channel! { Coordinator { PendingCosigns: (set: ValidatorSet) -> CosignIntent, @@ -147,6 +155,37 @@ impl ContinuallyRan } } +#[must_use] +async fn add_signed_unsigned_transaction( + tributary: &Tributary, + tx: &Transaction, +) -> bool { + let res = tributary.add_transaction(tx.clone()).await; + match &res { + // Fresh publication, already published + Ok(true | false) => {} + // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after + // on-chain inclusion of the TX with nonce #n, so it is invalid within our context + Err( + TransactionError::TooLargeTransaction | + TransactionError::InvalidSigner | + TransactionError::InvalidNonce | + TransactionError::InvalidSignature | + TransactionError::InvalidContent, + ) => { + panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}"); + } + // We've published too many transactions recently + Err(TransactionError::TooManyInMempool) => { + return false; + } + // This isn't a Provided transaction so this should never be hit + Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), + } + + true +} + /// Adds all of the transactions sent via `TributaryTransactions`. pub(crate) struct AddTributaryTransactionsTask { db: CD, @@ -161,6 +200,8 @@ impl ContinuallyRan for AddTributaryTransactio fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; + + // Provide/add all transactions sent our way loop { let mut txn = self.db.txn(); let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break }; @@ -174,29 +215,27 @@ impl ContinuallyRan for AddTributaryTransactio tx.sign(&mut OsRng, self.tributary.genesis(), &self.key); } - // Actually add the transaction - // TODO: If this is a preprocess, make sure the topic has been recognized - let res = self.tributary.add_transaction(tx.clone()).await; - match &res { - // Fresh publication, already published - Ok(true | false) => {} - Err( - TransactionError::TooLargeTransaction | - TransactionError::InvalidSigner | - TransactionError::InvalidNonce | - TransactionError::InvalidSignature | - TransactionError::InvalidContent, - ) => { - panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}"); - } - // We've published too many transactions recently - // Drop this txn to try to publish it again later on a future iteration - Err(TransactionError::TooManyInMempool) => { - drop(txn); + // If this is a transaction with signing data, check the topic is recognized before + // publishing + let topic = tx.topic(); + let still_requires_recognition = if let Some(topic) = topic { + (topic.requires_recognition() && + (!RecognizedTopics::recognized(&self.tributary_db, self.set, topic))) + .then_some(topic) + } else { + None + }; + if let Some(topic) = still_requires_recognition { + // Queue the transaction until the topic is recognized + // We use the Tributary DB for this so it's cleaned up when the Tributary DB is + let mut txn = self.tributary_db.txn(); + PublishOnRecognition::set(&mut txn, self.set, topic, &tx); + txn.commit(); + } else { + // Actually add the transaction + if !add_signed_unsigned_transaction(&self.tributary, &tx).await { break; } - // This isn't a Provided transaction so this should never be hit - Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), } } } @@ -204,6 +243,25 @@ impl ContinuallyRan for AddTributaryTransactio made_progress = true; txn.commit(); } + + // Provide/add all transactions due to newly recognized topics + loop { + let mut txn = self.tributary_db.txn(); + let Some(topic) = + RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set) + else { + break; + }; + if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set, topic) { + if !add_signed_unsigned_transaction(&self.tributary, &tx).await { + break; + } + } + + made_progress = true; + txn.commit(); + } + Ok(made_progress) } } diff --git a/coordinator/tributary/src/db.rs b/coordinator/tributary/src/db.rs index 08fac488..aefe45d3 100644 --- a/coordinator/tributary/src/db.rs +++ b/coordinator/tributary/src/db.rs @@ -15,20 +15,35 @@ use crate::transaction::SigningProtocolRound; /// A topic within the database which the group participates in #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)] -pub(crate) enum Topic { +pub enum Topic { /// Vote to remove a participant - RemoveParticipant { participant: SeraiAddress }, + RemoveParticipant { + /// The participant to remove + participant: SeraiAddress, + }, // DkgParticipation isn't represented here as participations are immediately sent to the // processor, not accumulated within this databse /// Participation in the signing protocol to confirm the DKG results on Substrate - DkgConfirmation { attempt: u32, round: SigningProtocolRound }, + DkgConfirmation { + /// The attempt number this is for + attempt: u32, + /// The round of the signing protocol + round: SigningProtocolRound, + }, /// The local view of the SlashReport, to be aggregated into the final SlashReport SlashReport, /// Participation in a signing protocol - Sign { id: VariantSignId, attempt: u32, round: SigningProtocolRound }, + Sign { + /// The ID of the signing protocol + id: VariantSignId, + /// The attempt number this is for + attempt: u32, + /// The round of the signing protocol + round: SigningProtocolRound, + }, } enum Participating { @@ -138,16 +153,17 @@ impl Topic { } } - fn requires_whitelisting(&self) -> bool { + /// If this topic requires recognition before entries are permitted for it. + pub fn requires_recognition(&self) -> bool { #[allow(clippy::match_same_arms)] match self { - // We don't require whitelisting to remove a participant + // We don't require recognition to remove a participant Topic::RemoveParticipant { .. } => false, - // We don't require whitelisting for the first attempt, solely the re-attempts + // We don't require recognition for the first attempt, solely the re-attempts Topic::DkgConfirmation { attempt, .. } => *attempt != 0, - // We don't require whitelisting for the slash report + // We don't require recognition for the slash report Topic::SlashReport { .. } => false, - // We do require whitelisting for every sign protocol + // We do require recognition for every sign protocol Topic::Sign { .. } => true, } } @@ -198,7 +214,7 @@ create_db!( // If this block has already been cosigned. Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (), - // The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain. + // The plans to recognize upon a `Transaction::SubstrateBlock` being included on-chain. SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>, // The weight accumulated for a topic. @@ -214,6 +230,7 @@ create_db!( db_channel!( CoordinatorTributary { ProcessorMessages: (set: ValidatorSet) -> messages::CoordinatorMessage, + RecognizedTopics: (set: ValidatorSet) -> Topic, } ); @@ -262,7 +279,7 @@ impl TributaryDb { ); ActivelyCosigning::set(txn, set, &substrate_block_hash); - TributaryDb::recognize_topic( + Self::recognize_topic( txn, set, Topic::Sign { @@ -292,6 +309,10 @@ impl TributaryDb { pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) { AccumulatedWeight::set(txn, set, topic, &0); + RecognizedTopics::send(txn, set, &topic); + } + pub(crate) fn recognized(getter: &impl Get, set: ValidatorSet, topic: Topic) -> bool { + AccumulatedWeight::get(getter, set, topic).is_some() } pub(crate) fn start_of_block(txn: &mut impl DbTxn, set: ValidatorSet, block_number: u64) { @@ -350,8 +371,13 @@ impl TributaryDb { // nonces on transactions (deterministically to the topic) let accumulated_weight = AccumulatedWeight::get(txn, set, topic); - if topic.requires_whitelisting() && accumulated_weight.is_none() { - Self::fatal_slash(txn, set, validator, "participated in unrecognized topic"); + if topic.requires_recognition() && accumulated_weight.is_none() { + Self::fatal_slash( + txn, + set, + validator, + "participated in unrecognized topic which requires recognition", + ); return DataSet::None; } let mut accumulated_weight = accumulated_weight.unwrap_or(0); diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index f0aa8029..bd6119dd 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -34,6 +34,7 @@ pub use transaction::{SigningProtocolRound, Signed, Transaction}; mod db; use db::*; +pub use db::Topic; /// Messages to send to the Processors. pub struct ProcessorMessages; @@ -62,10 +63,28 @@ impl CosignIntents { } } -/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain. +/// An interface to the topics recognized on this Tributary. +pub struct RecognizedTopics; +impl RecognizedTopics { + /// If this topic has been recognized by this Tributary. + /// + /// This will either be by explicit recognition or participation. + pub fn recognized(getter: &impl Get, set: ValidatorSet, topic: Topic) -> bool { + TributaryDb::recognized(getter, set, topic) + } + /// The next topic requiring recognition which has been recognized by this Tributary. + pub fn try_recv_topic_requiring_recognition( + txn: &mut impl DbTxn, + set: ValidatorSet, + ) -> Option { + db::RecognizedTopics::try_recv(txn, set) + } +} + +/// The plans to recognize upon a `Transaction::SubstrateBlock` being included on-chain. pub struct SubstrateBlockPlans; impl SubstrateBlockPlans { - /// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included + /// Set the plans to recognize upon the associated `Transaction::SubstrateBlock` being included /// on-chain. /// /// This must be done before the associated `Transaction::Cosign` is provided. @@ -75,7 +94,7 @@ impl SubstrateBlockPlans { substrate_block_hash: [u8; 32], plans: &Vec<[u8; 32]>, ) { - db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans); + db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, plans); } fn take( txn: &mut impl DbTxn, @@ -154,6 +173,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } } + let topic = tx.topic(); match tx { // Accumulate this vote and fatally slash the participant if past the threshold Transaction::RemoveParticipant { participant, signed } => { @@ -176,7 +196,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { self.validators, self.total_weight, block_number, - Topic::RemoveParticipant { participant }, + topic.unwrap(), signer, self.validator_weights[&signer], &(), @@ -244,7 +264,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { self.potentially_start_cosign(); } Transaction::SubstrateBlock { hash } => { - // Whitelist all of the IDs this Substrate block causes to be signed + // Recognize all of the IDs this Substrate block causes to be signed let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect( "Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated", ); @@ -261,7 +281,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } } Transaction::Batch { hash } => { - // Whitelist the signing of this batch + // Recognize the signing of this batch TributaryDb::recognize_topic( self.tributary_txn, self.set, @@ -293,7 +313,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { self.validators, self.total_weight, block_number, - Topic::SlashReport, + topic.unwrap(), signer, self.validator_weights[&signer], &slash_points, @@ -351,7 +371,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Create the resulting slash report let mut slash_report = vec![]; - for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) { + for (_, points) in self.validators.iter().copied().zip(amortized_slash_report) { // TODO: Natively store this as a `Slash` if points == u32::MAX { slash_report.push(Slash::Fatal); @@ -385,7 +405,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } Transaction::Sign { id, attempt, round, data, signed } => { - let topic = Topic::Sign { id, attempt, round }; + let topic = topic.unwrap(); let signer = signer(signed); if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { diff --git a/coordinator/tributary/src/transaction.rs b/coordinator/tributary/src/transaction.rs index 2cc4600c..f72d2620 100644 --- a/coordinator/tributary/src/transaction.rs +++ b/coordinator/tributary/src/transaction.rs @@ -25,6 +25,8 @@ use tributary_sdk::{ }, }; +use crate::db::Topic; + /// The round this data is for, within a signing protocol. #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)] pub enum SigningProtocolRound { @@ -180,7 +182,7 @@ pub enum Transaction { /// /// This is provided after the block has been cosigned. /// - /// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s + /// With the acknowledgement of a Substrate block, we can recognize all the `VariantSignId`s /// resulting from its handling. SubstrateBlock { /// The hash of the Substrate block @@ -318,6 +320,36 @@ impl TransactionTrait for Transaction { } impl Transaction { + /// The topic in the database for this transaction. + pub fn topic(&self) -> Option { + #[allow(clippy::match_same_arms)] // This doesn't make semantic sense here + match self { + Transaction::RemoveParticipant { participant, .. } => { + Some(Topic::RemoveParticipant { participant: *participant }) + } + + Transaction::DkgParticipation { .. } => None, + Transaction::DkgConfirmationPreprocess { attempt, .. } => { + Some(Topic::DkgConfirmation { attempt: *attempt, round: SigningProtocolRound::Preprocess }) + } + Transaction::DkgConfirmationShare { attempt, .. } => { + Some(Topic::DkgConfirmation { attempt: *attempt, round: SigningProtocolRound::Share }) + } + + // Provided TXs + Transaction::Cosign { .. } | + Transaction::Cosigned { .. } | + Transaction::SubstrateBlock { .. } | + Transaction::Batch { .. } => None, + + Transaction::Sign { id, attempt, round, .. } => { + Some(Topic::Sign { id: *id, attempt: *attempt, round: *round }) + } + + Transaction::SlashReport { .. } => Some(Topic::SlashReport), + } + } + /// Sign a transaction. /// /// Panics if signing a transaction whose type isn't `TransactionKind::Signed`.