From 0ce9aad9b2a34f936e52ee815beeae404c61b325 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 12 Jan 2025 07:32:45 -0500 Subject: [PATCH] Add flow to add transactions onto Tributaries --- coordinator/src/db.rs | 30 +++++- coordinator/src/main.rs | 51 ++++++++-- coordinator/src/tributary.rs | 160 +++++++++++++++++++++++-------- coordinator/tributary/src/db.rs | 3 + coordinator/tributary/src/lib.rs | 54 ++++++++++- processor/messages/src/lib.rs | 4 +- 6 files changed, 246 insertions(+), 56 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 012d0257..8bb2fd24 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -9,8 +9,8 @@ use serai_client::{ }; use serai_cosign::SignedCosign; - use serai_coordinator_substrate::NewSetInformation; +use serai_coordinator_tributary::Transaction; #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] pub(crate) type Db = serai_db::ParityDb; @@ -81,9 +81,33 @@ create_db! { db_channel! { Coordinator { - // Tributaries to clean up upon reboot - TributaryCleanup: () -> ValidatorSet, // Cosigns we produced SignedCosigns: () -> SignedCosign, + // Tributaries to clean up upon reboot + TributaryCleanup: () -> ValidatorSet, + } +} + +mod _internal_db { + use super::*; + + db_channel! { + Coordinator { + // Tributary transactions to publish + TributaryTransactions: (set: ValidatorSet) -> Transaction, + } + } +} + +pub(crate) struct TributaryTransactions; +impl TributaryTransactions { + pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) { + // If this set has yet to be retired, send this transaction + if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) { + _internal_db::TributaryTransactions::send(txn, set, tx); + } + } + pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option { + _internal_db::TributaryTransactions::try_recv(txn, set) } } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index f549378d..f693650a 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,5 +1,5 @@ use core::{ops::Deref, time::Duration}; -use std::{sync::Arc, time::Instant}; +use std::{sync::Arc, collections::HashMap, time::Instant}; use zeroize::{Zeroize, Zeroizing}; use rand_core::{RngCore, OsRng}; @@ -15,6 +15,7 @@ use tokio::sync::mpsc; use serai_client::{ primitives::{NetworkId, PublicKey}, + validator_sets::primitives::ValidatorSet, Serai, }; use message_queue::{Service, client::MessageQueue}; @@ -23,7 +24,7 @@ use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{Faulted, SignedCosign, Cosigning}; use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport}; -use serai_coordinator_tributary::Transaction; +use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans}; mod db; use db::*; @@ -178,7 +179,12 @@ async fn handle_processor_messages( match msg { messages::ProcessorMessage::KeyGen(msg) => match msg { messages::key_gen::ProcessorMessage::Participation { session, participation } => { - todo!("TODO Transaction::DkgParticipation") + let set = ValidatorSet { network, session }; + TributaryTransactions::send( + &mut txn, + set, + &Transaction::DkgParticipation { participation, signed: Signed::default() }, + ); } messages::key_gen::ProcessorMessage::GeneratedKeyPair { session, @@ -186,12 +192,28 @@ async fn handle_processor_messages( network_key, } => todo!("TODO Transaction::DkgConfirmationPreprocess"), messages::key_gen::ProcessorMessage::Blame { session, participant } => { - todo!("TODO Transaction::RemoveParticipant") + let set = ValidatorSet { network, session }; + TributaryTransactions::send( + &mut txn, + set, + &Transaction::RemoveParticipant { + participant: todo!("TODO"), + signed: Signed::default(), + }, + ); } }, messages::ProcessorMessage::Sign(msg) => match msg { messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => { - todo!("TODO Transaction::RemoveParticipant") + let set = ValidatorSet { network, session }; + TributaryTransactions::send( + &mut txn, + set, + &Transaction::RemoveParticipant { + participant: todo!("TODO"), + signed: Signed::default(), + }, + ); } messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => { todo!("TODO Transaction::Batch + Transaction::Sign") @@ -211,7 +233,22 @@ async fn handle_processor_messages( }, messages::ProcessorMessage::Substrate(msg) => match msg { messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => { - todo!("TODO Transaction::SubstrateBlock") + let mut by_session = HashMap::new(); + for plan in plans { + by_session + .entry(plan.session) + .or_insert_with(|| Vec::with_capacity(1)) + .push(plan.transaction_plan_id); + } + for (session, plans) in by_session { + let set = ValidatorSet { network, session }; + SubstrateBlockPlans::set(&mut txn, set, block, &plans); + TributaryTransactions::send( + &mut txn, + set, + &Transaction::SubstrateBlock { hash: block }, + ); + } } }, } @@ -274,6 +311,8 @@ async fn main() { prune_tributary_db(to_cleanup); // Drain the cosign intents created for this set while !Cosigning::::intended_cosigns(&mut txn, to_cleanup).is_empty() {} + // Drain the transactions to publish for this set + while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {} // Remove the SignSlashReport notification SignSlashReport::try_recv(&mut txn, to_cleanup); } diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index f09c14cd..a96cf225 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -13,7 +13,7 @@ use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel}; use scale::Encode; use serai_client::validator_sets::primitives::ValidatorSet; -use tributary_sdk::{TransactionError, ProvidedError, Tributary}; +use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary}; use serai_task::{Task, TaskHandle, ContinuallyRan}; @@ -24,7 +24,7 @@ use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask}; use serai_coordinator_p2p::P2p; -use crate::Db; +use crate::{Db, TributaryTransactions}; db_channel! { Coordinator { @@ -32,6 +32,40 @@ db_channel! { } } +/// Provide a Provided Transaction to the Tributary. +/// +/// This is not a well-designed function. This is specific to the context in which its called, +/// within this file. It should only be considered an internal helper for this domain alone. +async fn provide_transaction( + set: ValidatorSet, + tributary: &Tributary, + tx: Transaction, +) { + match tributary.provide_transaction(tx.clone()).await { + // The Tributary uses its own DB, so we may provide this multiple times if we reboot before + // committing the txn which provoked this + Ok(()) | Err(ProvidedError::AlreadyProvided) => {} + Err(ProvidedError::NotProvided) => { + panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); + } + Err(ProvidedError::InvalidProvided(e)) => { + panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") + } + // The Tributary's scan task won't advance if we don't have the Provided transactions + // present on-chain, and this enters an infinite loop to block the calling task from + // advancing + Err(ProvidedError::LocalMismatchesOnChain) => loop { + log::error!( + "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", + set, + tx, + ); + // Print this every five minutes as this does need to be handled + tokio::time::sleep(Duration::from_secs(5 * 60)).await; + }, + } +} + /// Provides Cosign/Cosigned Transactions onto the Tributary. pub(crate) struct ProvideCosignCosignedTransactionsTask { db: CD, @@ -43,40 +77,6 @@ impl ContinuallyRan for ProvideCosignCosignedTransactionsTask { fn run_iteration(&mut self) -> impl Send + Future> { - /// Provide a Provided Transaction to the Tributary. - /// - /// This is not a well-designed function. This is specific to the context in which its called, - /// within this file. It should only be considered an internal helper for this domain alone. - async fn provide_transaction( - set: ValidatorSet, - tributary: &Tributary, - tx: Transaction, - ) { - match tributary.provide_transaction(tx.clone()).await { - // The Tributary uses its own DB, so we may provide this multiple times if we reboot before - // committing the txn which provoked this - Ok(()) | Err(ProvidedError::AlreadyProvided) => {} - Err(ProvidedError::NotProvided) => { - panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); - } - Err(ProvidedError::InvalidProvided(e)) => { - panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") - } - Err(ProvidedError::LocalMismatchesOnChain) => loop { - // The Tributary's scan task won't advance if we don't have the Provided transactions - // present on-chain, and this enters an infinite loop to block the calling task from - // advancing - log::error!( - "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", - set, - tx, - ); - // Print this every five minutes as this does need to be handled - tokio::time::sleep(Duration::from_secs(5 * 60)).await; - }, - } - } - async move { let mut made_progress = false; @@ -145,6 +145,66 @@ impl ContinuallyRan } } +/// Adds all of the transactions sent via `TributaryTransactions`. +pub(crate) struct AddTributaryTransactionsTask { + db: CD, + tributary_db: TD, + tributary: Tributary, + set: ValidatorSet, + key: Zeroizing<::F>, +} +impl ContinuallyRan for AddTributaryTransactionsTask { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + loop { + let mut txn = self.db.txn(); + let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break }; + + let kind = tx.kind(); + match kind { + TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await, + TransactionKind::Unsigned | TransactionKind::Signed(_, _) => { + // If this is a signed transaction, sign it + if matches!(kind, TransactionKind::Signed(_, _)) { + tx.sign(&mut OsRng, self.tributary.genesis(), &self.key); + } + + // Actually add the transaction + // TODO: If this is a preprocess, make sure the topic has been recognized + let res = self.tributary.add_transaction(tx.clone()).await; + match &res { + // Fresh publication, already published + Ok(true | false) => {} + Err( + TransactionError::TooLargeTransaction | + TransactionError::InvalidSigner | + TransactionError::InvalidNonce | + TransactionError::InvalidSignature | + TransactionError::InvalidContent, + ) => { + panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}"); + } + // We've published too many transactions recently + // Drop this txn to try to publish it again later on a future iteration + Err(TransactionError::TooManyInMempool) => { + drop(txn); + break; + } + // This isn't a Provided transaction so this should never be hit + Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), + } + } + } + + made_progress = true; + txn.commit(); + } + Ok(made_progress) + } + } +} + /// Takes the messages from ScanTributaryTask and publishes them to the message-queue. pub(crate) struct TributaryProcessorMessagesTask { tributary_db: TD, @@ -207,7 +267,10 @@ impl ContinuallyRan for SignSlashReportTask return Ok(false), + Err(TransactionError::TooManyInMempool) => { + drop(txn); + return Ok(false); + } // This isn't a Provided transaction so this should never be hit Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), } @@ -343,14 +406,27 @@ pub(crate) async fn spawn_tributary( tokio::spawn( (SignSlashReportTask { db: db.clone(), - tributary_db, + tributary_db: tributary_db.clone(), tributary: tributary.clone(), set: set.clone(), - key: serai_key, + key: serai_key.clone(), }) .continually_run(sign_slash_report_task_def, vec![]), ); + // Spawn the add transactions task + let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new(); + tokio::spawn( + (AddTributaryTransactionsTask { + db: db.clone(), + tributary_db, + tributary: tributary.clone(), + set: set.set, + key: serai_key, + }) + .continually_run(add_tributary_transactions_task_def, vec![]), + ); + // Whenever a new block occurs, immediately run the scan task // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the // Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad @@ -360,6 +436,10 @@ pub(crate) async fn spawn_tributary( set.set, tributary, scan_tributary_task, - vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task], + vec![ + provide_cosign_cosigned_transactions_task, + sign_slash_report_task, + add_tributary_transactions_task, + ], )); } diff --git a/coordinator/tributary/src/db.rs b/coordinator/tributary/src/db.rs index 9d426d96..08fac488 100644 --- a/coordinator/tributary/src/db.rs +++ b/coordinator/tributary/src/db.rs @@ -198,6 +198,9 @@ create_db!( // If this block has already been cosigned. Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (), + // The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain. + SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>, + // The weight accumulated for a topic. AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64, // The entries accumulated for a topic, by validator. diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 91a77a62..e897afe5 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -30,8 +30,7 @@ use serai_coordinator_substrate::NewSetInformation; use messages::sign::VariantSignId; mod transaction; -pub(crate) use transaction::{SigningProtocolRound, Signed}; -pub use transaction::Transaction; +pub use transaction::{SigningProtocolRound, Signed, Transaction}; mod db; use db::*; @@ -63,6 +62,30 @@ impl CosignIntents { } } +/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain. +pub struct SubstrateBlockPlans; +impl SubstrateBlockPlans { + /// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included + /// on-chain. + /// + /// This must be done before the associated `Transaction::Cosign` is provided. + pub fn set( + txn: &mut impl DbTxn, + set: ValidatorSet, + substrate_block_hash: [u8; 32], + plans: &Vec<[u8; 32]>, + ) { + db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans); + } + fn take( + txn: &mut impl DbTxn, + set: ValidatorSet, + substrate_block_hash: [u8; 32], + ) -> Option> { + db::SubstrateBlockPlans::take(txn, set, substrate_block_hash) + } +} + struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> { _td: PhantomData, _p2p: PhantomData

, @@ -222,11 +245,32 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } Transaction::SubstrateBlock { hash } => { // Whitelist all of the IDs this Substrate block causes to be signed - todo!("TODO") + let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect( + "Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated", + ); + for plan in plans { + TributaryDb::recognize_topic( + self.tributary_txn, + self.set, + Topic::Sign { + id: VariantSignId::Transaction(plan), + attempt: 0, + round: SigningProtocolRound::Preprocess, + }, + ); + } } Transaction::Batch { hash } => { - // Whitelist the signing of this batch, publishing our own preprocess - todo!("TODO") + // Whitelist the signing of this batch + TributaryDb::recognize_topic( + self.tributary_txn, + self.set, + Topic::Sign { + id: VariantSignId::Batch(hash), + attempt: 0, + round: SigningProtocolRound::Preprocess, + }, + ); } Transaction::SlashReport { slash_points, signed } => { diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 5cda454b..acf01775 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -224,13 +224,13 @@ pub mod substrate { #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct PlanMeta { pub session: Session, - pub transaction: [u8; 32], + pub transaction_plan_id: [u8; 32], } #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { // TODO: Have the processor send this - SubstrateBlockAck { block: u64, plans: Vec }, + SubstrateBlockAck { block: [u8; 32], plans: Vec }, } }