Have the Tributary scanner DB be distinct from the cosign DB

Allows deleting the entire Tributary scanner DB upon retiry.
This commit is contained in:
Luke Parker
2025-01-10 02:22:58 -05:00
parent 2a3eaf4d7e
commit 091d485fd8
2 changed files with 58 additions and 41 deletions

View File

@@ -4,3 +4,4 @@ pub use transaction::Transaction;
mod db; mod db;
mod scan; mod scan;
pub(crate) use scan::ScanTributaryTask;

View File

@@ -32,41 +32,43 @@ use crate::{
}, },
}; };
struct ScanBlock<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> { struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
_db: PhantomData<D>,
_p2p: PhantomData<P>, _p2p: PhantomData<P>,
txn: &'a mut DT, cosign_db: &'a CD,
tributary_txn: &'a mut TDT,
set: ValidatorSet, set: ValidatorSet,
validators: &'a [SeraiAddress], validators: &'a [SeraiAddress],
total_weight: u64, total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>, validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>, tributary: &'a TributaryReader<TD, Transaction>,
} }
impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> { impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
fn potentially_start_cosign(&mut self) { fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one // Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.txn, self.set) { if TributaryDb::actively_cosigning(self.tributary_txn, self.set) {
return; return;
} }
// Start cosigning the latest intended-to-be-cosigned block // Start cosigning the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) = let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set) TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else { else {
return; return;
}; };
let Some(substrate_block_number) = let Some(substrate_block_number) =
Cosigning::<D>::finalized_block_number(self.txn, latest_substrate_block_to_cosign) Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else { else {
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
// Provided transactions within it, and the block to cosign is a Provided transaction
panic!("cosigning a block our cosigner didn't index") panic!("cosigning a block our cosigner didn't index")
}; };
// Mark us as actively cosigning // Mark us as actively cosigning
TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number); TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number);
// Send the message for the processor to start signing // Send the message for the processor to start signing
TributaryDb::send_message( TributaryDb::send_message(
self.txn, self.tributary_txn,
self.set, self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session, session: self.set.session,
@@ -81,7 +83,11 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() { if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed // Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector // TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(self.txn, self.set, SeraiAddress(signer.to_bytes())) { if TributaryDb::is_fatally_slashed(
self.tributary_txn,
self.set,
SeraiAddress(signer.to_bytes()),
) {
return; return;
} }
} }
@@ -94,7 +100,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Check the participant voted to be removed actually exists // Check the participant voted to be removed actually exists
if !self.validators.iter().any(|validator| *validator == participant) { if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash( TributaryDb::fatal_slash(
self.txn, self.tributary_txn,
self.set, self.set,
signer, signer,
"voted to remove non-existent participant", "voted to remove non-existent participant",
@@ -103,7 +109,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
} }
match TributaryDb::accumulate( match TributaryDb::accumulate(
self.txn, self.tributary_txn,
self.set, self.set,
self.validators, self.validators,
self.total_weight, self.total_weight,
@@ -115,7 +121,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
) { ) {
DataSet::None => {} DataSet::None => {}
DataSet::Participating(_) => { DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.txn, self.set, participant, "voted to remove"); TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
} }
}; };
} }
@@ -123,7 +129,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Send the participation to the processor // Send the participation to the processor
Transaction::DkgParticipation { participation, signed } => { Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message( TributaryDb::send_message(
self.txn, self.tributary_txn,
self.set, self.set,
messages::key_gen::CoordinatorMessage::Participation { messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session, session: self.set.session,
@@ -143,16 +149,20 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
Transaction::Cosign { substrate_block_hash } => { Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block // Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash); TributaryDb::set_latest_substrate_block_to_cosign(
self.tributary_txn,
self.set,
substrate_block_hash,
);
// Start a new cosign if we weren't already working on one // Start a new cosign if we weren't already working on one
self.potentially_start_cosign(); self.potentially_start_cosign();
} }
Transaction::Cosigned { substrate_block_hash } => { Transaction::Cosigned { substrate_block_hash } => {
TributaryDb::finish_cosigning(self.txn, self.set); TributaryDb::finish_cosigning(self.tributary_txn, self.set);
// Fetch the latest intended-to-be-cosigned block // Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) = let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set) TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else { else {
return; return;
}; };
@@ -178,7 +188,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if slash_points.len() != self.validators.len() { if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash( TributaryDb::fatal_slash(
self.txn, self.tributary_txn,
self.set, self.set,
signer, signer,
"slash report was for a distinct amount of signers", "slash report was for a distinct amount of signers",
@@ -188,7 +198,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it // Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate( match TributaryDb::accumulate(
self.txn, self.tributary_txn,
self.set, self.set,
self.validators, self.validators,
self.total_weight, self.total_weight,
@@ -260,7 +270,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Recognize the topic for signing the slash report // Recognize the topic for signing the slash report
TributaryDb::recognize_topic( TributaryDb::recognize_topic(
self.txn, self.tributary_txn,
self.set, self.set,
Topic::Sign { Topic::Sign {
id: VariantSignId::SlashReport, id: VariantSignId::SlashReport,
@@ -270,7 +280,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
); );
// Send the message for the processor to start signing // Send the message for the processor to start signing
TributaryDb::send_message( TributaryDb::send_message(
self.txn, self.tributary_txn,
self.set, self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport { messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session, session: self.set.session,
@@ -287,7 +297,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
TributaryDb::fatal_slash( TributaryDb::fatal_slash(
self.txn, self.tributary_txn,
self.set, self.set,
signer, signer,
"signer signed with a distinct amount of key shares than they had key shares", "signer signed with a distinct amount of key shares than they had key shares",
@@ -296,7 +306,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
} }
match TributaryDb::accumulate( match TributaryDb::accumulate(
self.txn, self.tributary_txn,
self.set, self.set,
self.validators, self.validators,
self.total_weight, self.total_weight,
@@ -312,7 +322,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
let flatten_data_set = |data_set| todo!("TODO"); let flatten_data_set = |data_set| todo!("TODO");
let data_set = flatten_data_set(data_set); let data_set = flatten_data_set(data_set);
TributaryDb::send_message( TributaryDb::send_message(
self.txn, self.tributary_txn,
self.set, self.set,
match round { match round {
SigningProtocolRound::Preprocess => { SigningProtocolRound::Preprocess => {
@@ -330,7 +340,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
} }
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) { fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.txn, self.set, block_number); TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
for tx in block.transactions { for tx in block.transactions {
match tx { match tx {
@@ -356,7 +366,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Since anything with evidence is fundamentally faulty behavior, not just temporal // Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed // errors, mark the node as fatally slashed
TributaryDb::fatal_slash( TributaryDb::fatal_slash(
self.txn, self.tributary_txn,
self.set, self.set,
SeraiAddress(msgs.0.msg.sender), SeraiAddress(msgs.0.msg.sender),
&format!("invalid tendermint messages: {msgs:?}"), &format!("invalid tendermint messages: {msgs:?}"),
@@ -370,20 +380,21 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
} }
} }
struct ScanTributaryTask<D: Db, TD: Db, P: P2p> { pub(crate) struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
db: D, pub(crate) cosign_db: CD,
set: ValidatorSet, pub(crate) tributary_db: TD,
validators: Vec<SeraiAddress>, pub(crate) set: ValidatorSet,
total_weight: u64, pub(crate) validators: Vec<SeraiAddress>,
validator_weights: HashMap<SeraiAddress, u64>, pub(crate) total_weight: u64,
tributary: TributaryReader<TD, Transaction>, pub(crate) validator_weights: HashMap<SeraiAddress, u64>,
_p2p: PhantomData<P>, pub(crate) tributary: TributaryReader<TD, Transaction>,
pub(crate) _p2p: PhantomData<P>,
} }
impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> { impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move { async move {
let (mut last_block_number, mut last_block_hash) = let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.db, self.set) TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
.unwrap_or((0, self.tributary.genesis())); .unwrap_or((0, self.tributary.genesis()));
let mut made_progess = false; let mut made_progess = false;
@@ -407,11 +418,11 @@ impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
} }
} }
let mut txn = self.db.txn(); let mut tributary_txn = self.tributary_db.txn();
(ScanBlock { (ScanBlock {
_db: PhantomData::<D>,
_p2p: PhantomData::<P>, _p2p: PhantomData::<P>,
txn: &mut txn, cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn,
set: self.set, set: self.set,
validators: &self.validators, validators: &self.validators,
total_weight: self.total_weight, total_weight: self.total_weight,
@@ -419,10 +430,15 @@ impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
tributary: &self.tributary, tributary: &self.tributary,
}) })
.handle_block(block_number, block); .handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(&mut txn, self.set, block_number, block_hash); TributaryDb::set_last_handled_tributary_block(
&mut tributary_txn,
self.set,
block_number,
block_hash,
);
last_block_number = block_number; last_block_number = block_number;
last_block_hash = block_hash; last_block_hash = block_hash;
txn.commit(); tributary_txn.commit();
made_progess = true; made_progess = true;
} }