From 091d485fd88eaf04732d901f65880eb6aa301b1b Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 10 Jan 2025 02:22:58 -0500 Subject: [PATCH] Have the Tributary scanner DB be distinct from the cosign DB Allows deleting the entire Tributary scanner DB upon retiry. --- coordinator/src/tributary/mod.rs | 1 + coordinator/src/tributary/scan.rs | 98 ++++++++++++++++++------------- 2 files changed, 58 insertions(+), 41 deletions(-) diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 6d748940..60f005e3 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -4,3 +4,4 @@ pub use transaction::Transaction; mod db; mod scan; +pub(crate) use scan::ScanTributaryTask; diff --git a/coordinator/src/tributary/scan.rs b/coordinator/src/tributary/scan.rs index fec89f28..9da982e5 100644 --- a/coordinator/src/tributary/scan.rs +++ b/coordinator/src/tributary/scan.rs @@ -32,41 +32,43 @@ use crate::{ }, }; -struct ScanBlock<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> { - _db: PhantomData, +struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { _p2p: PhantomData

, - txn: &'a mut DT, + cosign_db: &'a CD, + tributary_txn: &'a mut TDT, set: ValidatorSet, validators: &'a [SeraiAddress], total_weight: u64, validator_weights: &'a HashMap, tributary: &'a TributaryReader, } -impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { +impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { fn potentially_start_cosign(&mut self) { // Don't start a new cosigning instance if we're actively running one - if TributaryDb::actively_cosigning(self.txn, self.set) { + if TributaryDb::actively_cosigning(self.tributary_txn, self.set) { return; } // Start cosigning the latest intended-to-be-cosigned block let Some(latest_substrate_block_to_cosign) = - TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set) + TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) else { return; }; let Some(substrate_block_number) = - Cosigning::::finalized_block_number(self.txn, latest_substrate_block_to_cosign) + Cosigning::::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) else { + // This is a valid panic as we shouldn't be scanning this block if we didn't provide all + // Provided transactions within it, and the block to cosign is a Provided transaction panic!("cosigning a block our cosigner didn't index") }; // Mark us as actively cosigning - TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number); + TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number); // Send the message for the processor to start signing TributaryDb::send_message( - self.txn, + self.tributary_txn, self.set, messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { session: self.set.session, @@ -81,7 +83,11 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() { // Don't handle transactions from those fatally slashed // TODO: The fact they can publish these TXs makes this a notable spam vector - if TributaryDb::is_fatally_slashed(self.txn, self.set, SeraiAddress(signer.to_bytes())) { + if TributaryDb::is_fatally_slashed( + self.tributary_txn, + self.set, + SeraiAddress(signer.to_bytes()), + ) { return; } } @@ -94,7 +100,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { // Check the participant voted to be removed actually exists if !self.validators.iter().any(|validator| *validator == participant) { TributaryDb::fatal_slash( - self.txn, + self.tributary_txn, self.set, signer, "voted to remove non-existent participant", @@ -103,7 +109,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { } match TributaryDb::accumulate( - self.txn, + self.tributary_txn, self.set, self.validators, self.total_weight, @@ -115,7 +121,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { ) { DataSet::None => {} DataSet::Participating(_) => { - TributaryDb::fatal_slash(self.txn, self.set, participant, "voted to remove"); + TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove"); } }; } @@ -123,7 +129,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { // Send the participation to the processor Transaction::DkgParticipation { participation, signed } => { TributaryDb::send_message( - self.txn, + self.tributary_txn, self.set, messages::key_gen::CoordinatorMessage::Participation { session: self.set.session, @@ -143,16 +149,20 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { Transaction::Cosign { substrate_block_hash } => { // Update the latest intended-to-be-cosigned Substrate block - TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash); + TributaryDb::set_latest_substrate_block_to_cosign( + self.tributary_txn, + self.set, + substrate_block_hash, + ); // Start a new cosign if we weren't already working on one self.potentially_start_cosign(); } Transaction::Cosigned { substrate_block_hash } => { - TributaryDb::finish_cosigning(self.txn, self.set); + TributaryDb::finish_cosigning(self.tributary_txn, self.set); // Fetch the latest intended-to-be-cosigned block let Some(latest_substrate_block_to_cosign) = - TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set) + TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) else { return; }; @@ -178,7 +188,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { if slash_points.len() != self.validators.len() { TributaryDb::fatal_slash( - self.txn, + self.tributary_txn, self.set, signer, "slash report was for a distinct amount of signers", @@ -188,7 +198,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { // Accumulate, and if past the threshold, calculate *the* slash report and start signing it match TributaryDb::accumulate( - self.txn, + self.tributary_txn, self.set, self.validators, self.total_weight, @@ -260,7 +270,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { // Recognize the topic for signing the slash report TributaryDb::recognize_topic( - self.txn, + self.tributary_txn, self.set, Topic::Sign { id: VariantSignId::SlashReport, @@ -270,7 +280,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { ); // Send the message for the processor to start signing TributaryDb::send_message( - self.txn, + self.tributary_txn, self.set, messages::coordinator::CoordinatorMessage::SignSlashReport { session: self.set.session, @@ -287,7 +297,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { TributaryDb::fatal_slash( - self.txn, + self.tributary_txn, self.set, signer, "signer signed with a distinct amount of key shares than they had key shares", @@ -296,7 +306,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { } match TributaryDb::accumulate( - self.txn, + self.tributary_txn, self.set, self.validators, self.total_weight, @@ -312,7 +322,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { let flatten_data_set = |data_set| todo!("TODO"); let data_set = flatten_data_set(data_set); TributaryDb::send_message( - self.txn, + self.tributary_txn, self.set, match round { SigningProtocolRound::Preprocess => { @@ -330,7 +340,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { } fn handle_block(mut self, block_number: u64, block: Block) { - TributaryDb::start_of_block(self.txn, self.set, block_number); + TributaryDb::start_of_block(self.tributary_txn, self.set, block_number); for tx in block.transactions { match tx { @@ -356,7 +366,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { // Since anything with evidence is fundamentally faulty behavior, not just temporal // errors, mark the node as fatally slashed TributaryDb::fatal_slash( - self.txn, + self.tributary_txn, self.set, SeraiAddress(msgs.0.msg.sender), &format!("invalid tendermint messages: {msgs:?}"), @@ -370,20 +380,21 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { } } -struct ScanTributaryTask { - db: D, - set: ValidatorSet, - validators: Vec, - total_weight: u64, - validator_weights: HashMap, - tributary: TributaryReader, - _p2p: PhantomData

, +pub(crate) struct ScanTributaryTask { + pub(crate) cosign_db: CD, + pub(crate) tributary_db: TD, + pub(crate) set: ValidatorSet, + pub(crate) validators: Vec, + pub(crate) total_weight: u64, + pub(crate) validator_weights: HashMap, + pub(crate) tributary: TributaryReader, + pub(crate) _p2p: PhantomData

, } -impl ContinuallyRan for ScanTributaryTask { +impl ContinuallyRan for ScanTributaryTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { let (mut last_block_number, mut last_block_hash) = - TributaryDb::last_handled_tributary_block(&self.db, self.set) + TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) .unwrap_or((0, self.tributary.genesis())); let mut made_progess = false; @@ -407,11 +418,11 @@ impl ContinuallyRan for ScanTributaryTask { } } - let mut txn = self.db.txn(); + let mut tributary_txn = self.tributary_db.txn(); (ScanBlock { - _db: PhantomData::, _p2p: PhantomData::

, - txn: &mut txn, + cosign_db: &self.cosign_db, + tributary_txn: &mut tributary_txn, set: self.set, validators: &self.validators, total_weight: self.total_weight, @@ -419,10 +430,15 @@ impl ContinuallyRan for ScanTributaryTask { tributary: &self.tributary, }) .handle_block(block_number, block); - TributaryDb::set_last_handled_tributary_block(&mut txn, self.set, block_number, block_hash); + TributaryDb::set_last_handled_tributary_block( + &mut tributary_txn, + self.set, + block_number, + block_hash, + ); last_block_number = block_number; last_block_hash = block_hash; - txn.commit(); + tributary_txn.commit(); made_progess = true; }