From 5897efd7c770be1ac8a62adcf778bb3116e2235a Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 14 Oct 2023 16:09:24 -0400 Subject: [PATCH] Clean out create_new_tributary It made sense when the task was in main.rs. Now that it isn't, it's a pointless indirection. --- coordinator/src/substrate/mod.rs | 56 +++++++++++++------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 12673782..49b1d1a5 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -47,10 +47,10 @@ async fn in_set( Ok(Some(participants.iter().any(|participant| participant.0 == key))) } -async fn handle_new_set( - db: &mut D, +async fn handle_new_set( + txn: &mut D::Transaction<'_>, key: &Zeroizing<::F>, - create_new_tributary: CNT, + new_tributary_spec: &mpsc::UnboundedSender, serai: &Serai, block: &Block, set: ValidatorSet, @@ -107,7 +107,18 @@ async fn handle_new_set( let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY; let spec = TributarySpec::new(block.hash(), time, set, set_data); - create_new_tributary(db, spec.clone()); + + log::info!("creating new tributary for {:?}", spec.set()); + + // Save it to the database now, not on the channel receiver's side, so this is safe against + // reboots + // If this txn finishes, and we reboot, then this'll be reloaded from active Tributaries + // If this txn doesn't finish, this will be re-fired + // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the + // prior fired event may have not been received yet + crate::MainDb::::add_active_tributary(txn, &spec); + + new_tributary_spec.send(spec).unwrap(); } else { log::info!("not present in set {:?}", set); } @@ -245,11 +256,10 @@ async fn handle_batch_and_burns( // Handle a specific Substrate block, returning an error when it fails to get data // (not blocking / holding) -#[allow(clippy::needless_pass_by_ref_mut)] // False positive? -async fn handle_block( +async fn handle_block( db: &mut SubstrateDb, key: &Zeroizing<::F>, - create_new_tributary: CNT, + new_tributary_spec: &mpsc::UnboundedSender, processors: &Pro, serai: &Serai, block: Block, @@ -277,8 +287,8 @@ async fn handle_block::handled_event(&db.0, hash, event_id) { log::info!("found fresh new set event {:?}", new_set); - handle_new_set(&mut db.0, key, create_new_tributary.clone(), serai, &block, set).await?; let mut txn = db.0.txn(); + handle_new_set::(&mut txn, key, new_tributary_spec, serai, &block, set).await?; SubstrateDb::::handle_event(&mut txn, hash, event_id); txn.commit(); } @@ -322,10 +332,10 @@ async fn handle_block( +async fn handle_new_blocks( db: &mut SubstrateDb, key: &Zeroizing<::F>, - create_new_tributary: CNT, + new_tributary_spec: &mpsc::UnboundedSender, processors: &Pro, serai: &Serai, next_block: &mut u64, @@ -343,7 +353,7 @@ async fn handle_new_blocks( match handle_new_blocks( &mut db, &key, - |db: &mut D, spec: TributarySpec| { - log::info!("creating new tributary for {:?}", spec.set()); - - // Check it isn't already present in the DB due to rescanning this block upon reboot - for existing_spec in crate::MainDb::::active_tributaries(db).1 { - if spec.set() == existing_spec.set() { - log::warn!( - "already created tributary {:?}, this should only happen on reboot", - spec.set() - ); - return; - } - } - - // Save it to the database - let mut txn = db.txn(); - crate::MainDb::::add_active_tributary(&mut txn, &spec); - txn.commit(); - - // If we reboot before this is read, the fact it was saved to the database means it'll be - // handled on reboot - new_tributary_spec.send(spec).unwrap(); - }, + &new_tributary_spec, &processors, &serai, &mut next_substrate_block,