diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index f92002d6..53bb9030 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, Db}; use serai_primitives::{NetworkId, Coin, Amount}; use serai_in_instructions_primitives::Batch; -use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; +use serai_coins_primitives::OutInstructionWithBalance; use primitives::{task::*, Address, ReceivedOutput, Block}; @@ -17,15 +17,16 @@ pub use lifetime::LifetimeStage; // Database schema definition and associated functions. mod db; -use db::{ScannerGlobalDb, SubstrateToEventualityDb}; // Task to index the blockchain, ensuring we don't reorganize finalized blocks. mod index; // Scans blocks for received coins. mod scan; +/// Task which reports Batches to Substrate. +mod report; +/// Task which handles events from Substrate once we can. +mod substrate; /// Check blocks for transactions expected to eventually occur. mod eventuality; -/// Task which reports `Batch`s to Substrate. -mod report; pub(crate) fn sort_outputs>( a: &O, @@ -280,7 +281,7 @@ pub trait Scheduler: 'static + Send { /// A representation of a scanner. #[allow(non_snake_case)] pub struct Scanner { - eventuality_handle: RunNowHandle, + substrate_handle: RunNowHandle, _S: PhantomData, } impl Scanner { @@ -297,24 +298,29 @@ impl Scanner { let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block); + let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block); let (_index_handle, index_run) = RunNowHandle::new(); let (scan_handle, scan_run) = RunNowHandle::new(); let (report_handle, report_run) = RunNowHandle::new(); + let (substrate_handle, substrate_run) = RunNowHandle::new(); let (eventuality_handle, eventuality_run) = RunNowHandle::new(); // Upon indexing a new block, scan it tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()])); // Upon scanning a block, report it tokio::spawn(scan_task.continually_run(scan_run, vec![report_handle])); - // Upon reporting a block, we do nothing + // Upon reporting a block, we do nothing (as the burden is on Substrate which won't be + // immediately ready) tokio::spawn(report_task.continually_run(report_run, vec![])); + // Upon handling an event from Substrate, we run the Eventuality task (as it's what's affected) + tokio::spawn(substrate_task.continually_run(substrate_run, vec![eventuality_handle])); // Upon handling the Eventualities in a block, we run the scan task as we've advanced the // window its allowed to scan tokio::spawn(eventuality_task.continually_run(eventuality_run, vec![scan_handle])); - Self { eventuality_handle, _S: PhantomData } + Self { substrate_handle, _S: PhantomData } } /// Acknowledge a Batch having been published on Serai. @@ -335,80 +341,23 @@ impl Scanner { mut txn: impl DbTxn, batch_id: u32, in_instruction_succeededs: Vec, - mut burns: Vec, + burns: Vec, key_to_activate: Option>, ) { log::info!("acknowledging batch {batch_id}"); - // TODO: We need to take all of these arguments and send them to a task - // Then, when we do have this block number, we need to execute this function - let block_number = report::take_block_number_for_batch::(&mut txn, batch_id) - .expect("didn't have the block number for a Batch"); - - assert!( - ScannerGlobalDb::::is_block_notable(&txn, block_number), - "acknowledging a block which wasn't notable" + // Queue acknowledging this block via the Substrate task + substrate::queue_acknowledge_batch::( + &mut txn, + batch_id, + in_instruction_succeededs, + burns, + key_to_activate, ); - if let Some(prior_highest_acknowledged_block) = - ScannerGlobalDb::::highest_acknowledged_block(&txn) - { - // If a single block produced multiple Batches, the block number won't increment - assert!( - block_number >= prior_highest_acknowledged_block, - "acknowledging blocks out-of-order" - ); - for b in (prior_highest_acknowledged_block + 1) .. block_number { - assert!( - !ScannerGlobalDb::::is_block_notable(&txn, b), - "skipped acknowledging a block which was notable" - ); - } - } - - ScannerGlobalDb::::set_highest_acknowledged_block(&mut txn, block_number); - if let Some(key_to_activate) = key_to_activate { - ScannerGlobalDb::::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate); - } - - // Return the balances for any InInstructions which failed to execute - { - let return_information = report::take_return_information::(&mut txn, batch_id) - .expect("didn't save the return information for Batch we published"); - assert_eq!( - in_instruction_succeededs.len(), - return_information.len(), - "amount of InInstruction succeededs differed from amount of return information saved" - ); - - // We map these into standard Burns - for (succeeded, return_information) in - in_instruction_succeededs.into_iter().zip(return_information) - { - if succeeded { - continue; - } - - if let Some(report::ReturnInformation { address, balance }) = return_information { - burns.push(OutInstructionWithBalance { - instruction: OutInstruction { address: address.into(), data: None }, - balance, - }); - } - } - } - - if !burns.is_empty() { - // We send these Burns as stemming from this block we just acknowledged - // This causes them to be acted on after we accumulate the outputs from this block - SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns); - } - - // Commit the txn + // Commit this txn so this data is flushed txn.commit(); - // Run the Eventuality task since we've advanced it - // We couldn't successfully do this if that txn was still floating around, uncommitted - // The execution of this task won't actually have more work until the txn is committed - self.eventuality_handle.run_now(); + // Then run the Substrate task + self.substrate_handle.run_now(); } /// Queue Burns. @@ -442,14 +391,16 @@ impl Scanner { latency and likely practically require we add regularly scheduled notable blocks (which may be unnecessary). */ - pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: &Vec) { + pub fn queue_burns(&mut self, mut txn: impl DbTxn, burns: Vec) { if burns.is_empty() { return; } - let queue_as_of = ScannerGlobalDb::::highest_acknowledged_block(txn) - .expect("queueing Burns yet never acknowledged a block"); - - SubstrateToEventualityDb::send_burns(txn, queue_as_of, burns) + // Queue queueing these burns via the Substrate task + substrate::queue_queue_burns::(&mut txn, burns); + // Commit this txn so this data is flushed + txn.commit(); + // Then run the Substrate task + self.substrate_handle.run_now(); } } diff --git a/processor/scanner/src/substrate/db.rs b/processor/scanner/src/substrate/db.rs new file mode 100644 index 00000000..697897c2 --- /dev/null +++ b/processor/scanner/src/substrate/db.rs @@ -0,0 +1,89 @@ +use core::marker::PhantomData; + +use group::GroupEncoding; + +use borsh::{BorshSerialize, BorshDeserialize}; +use serai_db::{Get, DbTxn, create_db, db_channel}; + +use serai_coins_primitives::OutInstructionWithBalance; + +use crate::{ScannerFeed, KeyFor}; + +#[derive(BorshSerialize, BorshDeserialize)] +struct AcknowledgeBatchEncodable { + batch_id: u32, + in_instruction_succeededs: Vec, + burns: Vec, + key_to_activate: Option>, +} + +#[derive(BorshSerialize, BorshDeserialize)] +enum ActionEncodable { + AcknowledgeBatch(AcknowledgeBatchEncodable), + QueueBurns(Vec), +} + +pub(crate) struct AcknowledgeBatch { + pub(crate) batch_id: u32, + pub(crate) in_instruction_succeededs: Vec, + pub(crate) burns: Vec, + pub(crate) key_to_activate: Option>, +} + +pub(crate) enum Action { + AcknowledgeBatch(AcknowledgeBatch), + QueueBurns(Vec), +} + +db_channel!( + ScannerSubstrate { + Actions: (empty_key: ()) -> ActionEncodable, + } +); + +pub(crate) struct SubstrateDb(PhantomData); +impl SubstrateDb { + pub(crate) fn queue_acknowledge_batch( + txn: &mut impl DbTxn, + batch_id: u32, + in_instruction_succeededs: Vec, + burns: Vec, + key_to_activate: Option>, + ) { + Actions::send( + txn, + (), + &ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable { + batch_id, + in_instruction_succeededs, + burns, + key_to_activate: key_to_activate.map(|key| key.to_bytes().as_ref().to_vec()), + }), + ); + } + pub(crate) fn queue_queue_burns(txn: &mut impl DbTxn, burns: Vec) { + Actions::send(txn, (), &ActionEncodable::QueueBurns(burns)); + } + + pub(crate) fn next_action(txn: &mut impl DbTxn) -> Option> { + let action_encodable = Actions::try_recv(txn, ())?; + Some(match action_encodable { + ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable { + batch_id, + in_instruction_succeededs, + burns, + key_to_activate, + }) => Action::AcknowledgeBatch(AcknowledgeBatch { + batch_id, + in_instruction_succeededs, + burns, + key_to_activate: key_to_activate.map(|key| { + let mut repr = as GroupEncoding>::Repr::default(); + repr.as_mut().copy_from_slice(&key); + KeyFor::::from_bytes(&repr).unwrap() + }), + }), + ActionEncodable::QueueBurns(burns) => Action::QueueBurns(burns), + }) + } +} diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs new file mode 100644 index 00000000..4feb85d5 --- /dev/null +++ b/processor/scanner/src/substrate/mod.rs @@ -0,0 +1,162 @@ +use core::marker::PhantomData; + +use serai_db::{DbTxn, Db}; + +use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; + +use primitives::task::ContinuallyRan; +use crate::{ + db::{ScannerGlobalDb, SubstrateToEventualityDb}, + report, ScannerFeed, KeyFor, +}; + +mod db; +use db::*; + +pub(crate) fn queue_acknowledge_batch( + txn: &mut impl DbTxn, + batch_id: u32, + in_instruction_succeededs: Vec, + burns: Vec, + key_to_activate: Option>, +) { + SubstrateDb::::queue_acknowledge_batch( + txn, + batch_id, + in_instruction_succeededs, + burns, + key_to_activate, + ) +} +pub(crate) fn queue_queue_burns( + txn: &mut impl DbTxn, + burns: Vec, +) { + SubstrateDb::::queue_queue_burns(txn, burns) +} + +/* + When Serai acknowledges a Batch, we can only handle it once we've scanned the chain and generated + the same Batch ourselves. This takes the `acknowledge_batch`, `queue_burns` arguments and sits on + them until we're able to process them. +*/ +#[allow(non_snake_case)] +pub(crate) struct SubstrateTask { + db: D, + _S: PhantomData, +} + +impl SubstrateTask { + pub(crate) fn new(db: D) -> Self { + Self { db, _S: PhantomData } + } +} + +#[async_trait::async_trait] +impl ContinuallyRan for SubstrateTask { + async fn run_iteration(&mut self) -> Result { + let mut made_progress = false; + loop { + // Fetch the next action to handle + let mut txn = self.db.txn(); + let Some(action) = SubstrateDb::::next_action(&mut txn) else { + drop(txn); + return Ok(made_progress); + }; + + match action { + Action::AcknowledgeBatch(AcknowledgeBatch { + batch_id, + in_instruction_succeededs, + mut burns, + key_to_activate, + }) => { + // Check if we have the information for this batch + let Some(block_number) = report::take_block_number_for_batch::(&mut txn, batch_id) + else { + // If we don't, drop this txn (restoring the action to the database) + drop(txn); + return Ok(made_progress); + }; + + // Mark we made progress and handle this + made_progress = true; + + assert!( + ScannerGlobalDb::::is_block_notable(&txn, block_number), + "acknowledging a block which wasn't notable" + ); + if let Some(prior_highest_acknowledged_block) = + ScannerGlobalDb::::highest_acknowledged_block(&txn) + { + // If a single block produced multiple Batches, the block number won't increment + assert!( + block_number >= prior_highest_acknowledged_block, + "acknowledging blocks out-of-order" + ); + for b in (prior_highest_acknowledged_block + 1) .. block_number { + assert!( + !ScannerGlobalDb::::is_block_notable(&txn, b), + "skipped acknowledging a block which was notable" + ); + } + } + + ScannerGlobalDb::::set_highest_acknowledged_block(&mut txn, block_number); + if let Some(key_to_activate) = key_to_activate { + ScannerGlobalDb::::queue_key( + &mut txn, + block_number + S::WINDOW_LENGTH, + key_to_activate, + ); + } + + // Return the balances for any InInstructions which failed to execute + { + let return_information = report::take_return_information::(&mut txn, batch_id) + .expect("didn't save the return information for Batch we published"); + assert_eq!( + in_instruction_succeededs.len(), + return_information.len(), + "amount of InInstruction succeededs differed from amount of return information saved" + ); + + // We map these into standard Burns + for (succeeded, return_information) in + in_instruction_succeededs.into_iter().zip(return_information) + { + if succeeded { + continue; + } + + if let Some(report::ReturnInformation { address, balance }) = return_information { + burns.push(OutInstructionWithBalance { + instruction: OutInstruction { address: address.into(), data: None }, + balance, + }); + } + } + } + + if !burns.is_empty() { + // We send these Burns as stemming from this block we just acknowledged + // This causes them to be acted on after we accumulate the outputs from this block + SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns); + } + } + + Action::QueueBurns(burns) => { + // We can instantly handle this so long as we've handled all prior actions + made_progress = true; + + let queue_as_of = ScannerGlobalDb::::highest_acknowledged_block(&txn) + .expect("queueing Burns yet never acknowledged a block"); + + SubstrateToEventualityDb::send_burns(&mut txn, queue_as_of, &burns); + } + } + + txn.commit(); + } + } +}