mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Convert coordinator/substrate/db to use create_db macro (#436)
* chore: implement create_db for substrate (fix broken branch) * Correct rebase artifacts * chore: remove todo statement * chore: rename BlockDb to NextBlock * chore: return empty tuple instead of empty array for event storage * Finish rebasing * .Minor tweaks to remove leftover variables These may be rebase artifacts. --------- Co-authored-by: Luke Parker <lukeparker5132@gmail.com>
This commit is contained in:
@@ -178,7 +178,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
|
||||
network_had_event(&mut burns, &mut batches, network);
|
||||
|
||||
let mut txn = db.txn();
|
||||
SubstrateDb::<D>::save_batch_instructions_hash(&mut txn, network, id, instructions_hash);
|
||||
BatchInstructionsHashDb::set(&mut txn, network, id, &instructions_hash);
|
||||
txn.commit();
|
||||
|
||||
// Make sure this is the only Batch event for this network in this Block
|
||||
@@ -239,7 +239,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
|
||||
// Handle a specific Substrate block, returning an error when it fails to get data
|
||||
// (not blocking / holding)
|
||||
async fn handle_block<D: Db, Pro: Processors>(
|
||||
db: &mut SubstrateDb<D>,
|
||||
db: &mut D,
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
|
||||
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
|
||||
@@ -268,11 +268,11 @@ async fn handle_block<D: Db, Pro: Processors>(
|
||||
continue;
|
||||
}
|
||||
|
||||
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
|
||||
if EventDb::is_unhandled(db, &hash, event_id) {
|
||||
log::info!("found fresh new set event {:?}", new_set);
|
||||
let mut txn = db.0.txn();
|
||||
let mut txn = db.txn();
|
||||
handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
|
||||
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
|
||||
EventDb::handle_event(&mut txn, &hash, event_id);
|
||||
txn.commit();
|
||||
}
|
||||
event_id += 1;
|
||||
@@ -280,15 +280,15 @@ async fn handle_block<D: Db, Pro: Processors>(
|
||||
|
||||
// If a key pair was confirmed, inform the processor
|
||||
for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
|
||||
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
|
||||
if EventDb::is_unhandled(db, &hash, event_id) {
|
||||
log::info!("found fresh key gen event {:?}", key_gen);
|
||||
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
|
||||
handle_key_gen(processors, serai, &block, set, key_pair).await?;
|
||||
} else {
|
||||
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
|
||||
}
|
||||
let mut txn = db.0.txn();
|
||||
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
|
||||
let mut txn = db.txn();
|
||||
EventDb::handle_event(&mut txn, &hash, event_id);
|
||||
txn.commit();
|
||||
}
|
||||
event_id += 1;
|
||||
@@ -303,12 +303,12 @@ async fn handle_block<D: Db, Pro: Processors>(
|
||||
continue;
|
||||
}
|
||||
|
||||
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
|
||||
if EventDb::is_unhandled(db, &hash, event_id) {
|
||||
log::info!("found fresh set retired event {:?}", retired_set);
|
||||
let mut txn = db.0.txn();
|
||||
let mut txn = db.txn();
|
||||
crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
|
||||
tributary_retired.send(set).unwrap();
|
||||
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
|
||||
EventDb::handle_event(&mut txn, &hash, event_id);
|
||||
txn.commit();
|
||||
}
|
||||
event_id += 1;
|
||||
@@ -319,18 +319,18 @@ async fn handle_block<D: Db, Pro: Processors>(
|
||||
// following events share data collection
|
||||
// This does break the uniqueness of (hash, event_id) -> one event, yet
|
||||
// (network, (hash, event_id)) remains valid as a unique ID for an event
|
||||
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
|
||||
handle_batch_and_burns(&mut db.0, processors, serai, &block).await?;
|
||||
if EventDb::is_unhandled(db, &hash, event_id) {
|
||||
handle_batch_and_burns(db, processors, serai, &block).await?;
|
||||
}
|
||||
let mut txn = db.0.txn();
|
||||
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
|
||||
let mut txn = db.txn();
|
||||
EventDb::handle_event(&mut txn, &hash, event_id);
|
||||
txn.commit();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
db: &mut SubstrateDb<D>,
|
||||
db: &mut D,
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
|
||||
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
|
||||
@@ -394,7 +394,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
}
|
||||
}
|
||||
|
||||
let mut txn = db.0.txn();
|
||||
let mut txn = db.txn();
|
||||
let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
|
||||
IntendedCosign::set_intended_cosign(&mut txn, 1);
|
||||
txn.commit();
|
||||
@@ -506,7 +506,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
// cosigned
|
||||
if let Some(has_no_cosigners) = has_no_cosigners {
|
||||
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
|
||||
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
|
||||
LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number());
|
||||
} else {
|
||||
CosignTriggered::set(&mut txn, &());
|
||||
for (set, block, hash) in cosign {
|
||||
@@ -518,7 +518,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
}
|
||||
|
||||
// Reduce to the latest cosigned block
|
||||
let latest_number = latest_number.min(SubstrateDb::<D>::latest_cosigned_block(&db.0));
|
||||
let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));
|
||||
|
||||
if latest_number < *next_block {
|
||||
return Ok(());
|
||||
@@ -540,7 +540,9 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
)
|
||||
.await?;
|
||||
*next_block += 1;
|
||||
db.set_next_block(*next_block);
|
||||
let mut txn = db.txn();
|
||||
NextBlock::set(&mut txn, next_block);
|
||||
txn.commit();
|
||||
log::info!("handled substrate block {b}");
|
||||
}
|
||||
|
||||
@@ -548,7 +550,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
||||
}
|
||||
|
||||
pub async fn scan_task<D: Db, Pro: Processors>(
|
||||
db: D,
|
||||
mut db: D,
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
processors: Pro,
|
||||
serai: Arc<Serai>,
|
||||
@@ -556,9 +558,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
|
||||
tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
|
||||
) {
|
||||
log::info!("scanning substrate");
|
||||
|
||||
let mut db = SubstrateDb::new(db);
|
||||
let mut next_substrate_block = db.next_block();
|
||||
let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default();
|
||||
|
||||
/*
|
||||
let new_substrate_block_notifier = {
|
||||
@@ -680,7 +680,7 @@ pub(crate) async fn verify_published_batches<D: Db>(
|
||||
// TODO: Localize from MainDb to SubstrateDb
|
||||
let last = crate::LastVerifiedBatchDb::get(txn, network);
|
||||
for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
|
||||
let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
|
||||
let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else {
|
||||
break;
|
||||
};
|
||||
let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user