diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index 7a2d68a9..59af768f 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, create_db, db_channel};
 
 use serai_in_instructions_primitives::InInstructionWithBalance;
 
-use primitives::{ReceivedOutput, EncodableG};
+use primitives::{EncodableG, ReceivedOutput};
 
 use crate::{
   lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return,
diff --git a/processor/scanner/src/index/mod.rs b/processor/scanner/src/index/mod.rs
index 7c70eedc..930ce55a 100644
--- a/processor/scanner/src/index/mod.rs
+++ b/processor/scanner/src/index/mod.rs
@@ -21,12 +21,12 @@ pub(crate) fn block_id(getter: &impl Get, block_number: u64) -> [u8; 32] {
 
   This task finds the finalized blocks, verifies they're contiguous, and saves their IDs.
 */
-struct IndexFinalizedTask<D: Db, S: ScannerFeed> {
+pub(crate) struct IndexTask<D: Db, S: ScannerFeed> {
   db: D,
   feed: S,
 }
 
-impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> {
+impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
   pub(crate) async fn new(mut db: D, feed: S, start_block: u64) -> Self {
     if IndexDb::block_id(&db, start_block).is_none() {
       // Fetch the block for its ID
@@ -36,7 +36,7 @@ impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> {
         match feed.unchecked_block_header_by_number(start_block).await {
           Ok(block) => break block,
           Err(e) => {
-            log::warn!("IndexFinalizedTask couldn't fetch start block {start_block}: {e:?}");
+            log::warn!("IndexTask couldn't fetch start block {start_block}: {e:?}");
             tokio::time::sleep(core::time::Duration::from_secs(delay)).await;
             delay += Self::DELAY_BETWEEN_ITERATIONS;
             delay = delay.min(Self::MAX_DELAY_BETWEEN_ITERATIONS);
@@ -57,7 +57,7 @@ impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> {
 }
 
 #[async_trait::async_trait]
-impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> {
+impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
     // Fetch the latest finalized block
     let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index b363faa1..3515da05 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 
 use group::GroupEncoding;
 
-use serai_db::{Get, DbTxn};
+use serai_db::{Get, DbTxn, Db};
 
 use serai_primitives::{NetworkId, Coin, Amount};
 
@@ -14,7 +14,7 @@ mod lifetime;
 
 // Database schema definition and associated functions.
 mod db;
-use db::ScannerDb;
+use db::ScannerGlobalDb;
 // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
 mod index;
 // Scans blocks for received coins.
@@ -50,7 +50,7 @@ impl<B: Block> BlockExt for B {
 ///
 /// This defines the primitive types used, along with various getters necessary for indexing.
 #[async_trait::async_trait]
-pub trait ScannerFeed: Send + Sync {
+pub trait ScannerFeed: 'static + Send + Sync + Clone {
   /// The ID of the network being scanned for.
   const NETWORK: NetworkId;
 
@@ -170,7 +170,7 @@ pub struct SchedulerUpdate<S: ScannerFeed> {
 }
 
 /// The object responsible for accumulating outputs and planning new transactions.
-pub trait Scheduler<S: ScannerFeed>: Send {
+pub trait Scheduler<S: ScannerFeed>: 'static + Send {
   /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
   ///
   /// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the
@@ -183,14 +183,38 @@ pub trait Scheduler<S: ScannerFeed>: Send {
 }
 
 /// A representation of a scanner.
-pub struct Scanner<S: ScannerFeed>(PhantomData<S>);
+#[allow(non_snake_case)]
+pub struct Scanner<S: ScannerFeed> {
+  eventuality_handle: RunNowHandle,
+  _S: PhantomData<S>,
+}
 impl<S: ScannerFeed> Scanner<S> {
   /// Create a new scanner.
   ///
   /// This will begin its execution, spawning several asynchronous tasks.
   // TODO: Take start_time and binary search here?
-  pub fn new(start_block: u64) -> Self {
-    todo!("TODO")
+  pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler<S>, start_block: u64) -> Self {
+    let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
+    let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
+    let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
+    let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block);
+
+    let (_index_handle, index_run) = RunNowHandle::new();
+    let (scan_handle, scan_run) = RunNowHandle::new();
+    let (report_handle, report_run) = RunNowHandle::new();
+    let (eventuality_handle, eventuality_run) = RunNowHandle::new();
+
+    // Upon indexing a new block, scan it
+    tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()]));
+    // Upon scanning a block, report it
+    tokio::spawn(scan_task.continually_run(scan_run, vec![report_handle]));
+    // Upon reporting a block, we do nothing
+    tokio::spawn(report_task.continually_run(report_run, vec![]));
+    // Upon handling the Eventualities in a block, we run the scan task as we've advanced the
+    // window it's allowed to scan
+    tokio::spawn(eventuality_task.continually_run(eventuality_run, vec![scan_handle]));
+
+    Self { eventuality_handle, _S: PhantomData }
   }
 
   /// Acknowledge a block.
@@ -199,19 +223,26 @@ impl<S: ScannerFeed> Scanner<S> {
   /// have achieved synchrony on it.
   pub fn acknowledge_block(
     &mut self,
-    txn: &mut impl DbTxn,
+    mut txn: impl DbTxn,
     block_number: u64,
     key_to_activate: Option<KeyFor<S>>,
   ) {
     log::info!("acknowledging block {block_number}");
 
     assert!(
-      ScannerDb::<S>::is_block_notable(txn, block_number),
+      ScannerGlobalDb::<S>::is_block_notable(&txn, block_number),
       "acknowledging a block which wasn't notable"
     );
 
-    ScannerDb::<S>::set_highest_acknowledged_block(txn, block_number);
+    ScannerGlobalDb::<S>::set_highest_acknowledged_block(&mut txn, block_number);
     if let Some(key_to_activate) = key_to_activate {
-      ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH, key_to_activate);
+      ScannerGlobalDb::<S>::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate);
     }
+
+    // Commit the txn
+    txn.commit();
+    // Run the Eventuality task since we've advanced it
+    // We couldn't successfully do this if that txn was still floating around, uncommitted
+    // The execution of this task won't actually have more work until the txn is committed
+    self.eventuality_handle.run_now();
   }
 
   /// Queue Burns.
@@ -220,7 +251,7 @@ impl<S: ScannerFeed> Scanner<S> {
   /// safely queue Burns so long as they're only actually added once we've handled the outputs from
   /// the block acknowledged prior to their queueing.
   pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) {
-    let queue_as_of = ScannerDb::<S>::highest_acknowledged_block(txn)
+    let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(txn)
       .expect("queueing Burns yet never acknowledged a block");
     todo!("TODO")
   }
@@ -228,8 +259,8 @@ impl<S: ScannerFeed> Scanner<S> {
 }
 /*
 #[derive(Clone, Debug)]
-struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
-impl<N: Network, D: Db> ScannerDb<N, D> {
+struct ScannerGlobalDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
+impl<N: Network, D: Db> ScannerGlobalDb<N, D> {
   fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
     Self::scanner_key(b"seen", id)
   }
@@ -295,7 +326,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
 
       TODO2: Only update ram_outputs after committing the TXN in question.
     */
-    let seen = ScannerDb::<N, D>::seen(&db, &id);
+    let seen = ScannerGlobalDb::<N, D>::seen(&db, &id);
     let id = id.as_ref().to_vec();
     if seen || scanner.ram_outputs.contains(&id) {
       panic!("scanned an output multiple times");
diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs
index cca2148e..745aa772 100644
--- a/processor/scanner/src/report/db.rs
+++ b/processor/scanner/src/report/db.rs
@@ -1,6 +1,4 @@
-use core::marker::PhantomData;
-
-use serai_db::{Get, DbTxn, Db, create_db};
+use serai_db::{Get, DbTxn, create_db};
 
 create_db!(
   ScannerReport {
diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs
index 95bbbbd2..18f842e2 100644
--- a/processor/scanner/src/report/mod.rs
+++ b/processor/scanner/src/report/mod.rs
@@ -1,3 +1,5 @@
+use core::marker::PhantomData;
+
 use scale::Encode;
 
 use serai_db::{DbTxn, Db};
@@ -21,13 +23,14 @@ use db::ReportDb;
   Eventualities, have processed the block. This ensures we know if this block is notable, and have
   the InInstructions for it.
 */
+#[allow(non_snake_case)]
 pub(crate) struct ReportTask<D: Db, S: ScannerFeed> {
   db: D,
-  feed: S,
+  _S: PhantomData<S>,
 }
 
 impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
-  pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
+  pub(crate) fn new(mut db: D, start_block: u64) -> Self {
     if ReportDb::next_to_potentially_report_block(&db).is_none() {
       // Initialize the DB
       let mut txn = db.txn();
@@ -35,7 +38,7 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
       txn.commit();
     }
 
-    Self { db, feed }
+    Self { db, _S: PhantomData }
   }
 }
 
diff --git a/processor/scanner/src/scan/db.rs b/processor/scanner/src/scan/db.rs
index 905e10be..9b98150f 100644
--- a/processor/scanner/src/scan/db.rs
+++ b/processor/scanner/src/scan/db.rs
@@ -1,22 +1,8 @@
 use core::marker::PhantomData;
-use std::io;
 
-use scale::Encode;
-use borsh::{BorshSerialize, BorshDeserialize};
 use serai_db::{Get, DbTxn, create_db};
 
-use serai_in_instructions_primitives::InInstructionWithBalance;
-
-use primitives::{EncodableG, ReceivedOutput, EventualityTracker};
-
-use crate::{
-  lifetime::LifetimeStage, db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor,
-  EventualityFor, Return, scan::next_to_scan_for_outputs_block,
-};
-
-// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
-trait Borshy: BorshSerialize + BorshDeserialize {}
-impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
+use crate::{db::OutputWithInInstruction, ScannerFeed};
 
 create_db!(
   ScannerScan {
diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs
index 54f9bd77..b427b535 100644
--- a/processor/scanner/src/scan/mod.rs
+++ b/processor/scanner/src/scan/mod.rs
@@ -10,7 +10,9 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
 
 use crate::{
   lifetime::LifetimeStage,
-  db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb},
+  db::{
+    OutputWithInInstruction, SenderScanData, ScannerGlobalDb, ScanToReportDb, ScanToEventualityDb,
+  },
   BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
   eventuality::latest_scannable_block,
 };
@@ -84,12 +86,12 @@ fn in_instruction_from_output<S: ScannerFeed>(
   )
 }
 
-struct ScanForOutputsTask<D: Db, S: ScannerFeed> {
+pub(crate) struct ScanTask<D: Db, S: ScannerFeed> {
   db: D,
   feed: S,
 }
 
-impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> {
+impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
   pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
     if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() {
       // Initialize the DB
@@ -103,14 +105,14 @@ impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> {
 }
 
 #[async_trait::async_trait]
-impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
+impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
     // Fetch the safe to scan block
-    let latest_scannable = latest_scannable_block::<S>(&self.db)
-      .expect("ScanForOutputsTask run before writing the start block");
+    let latest_scannable =
+      latest_scannable_block::<S>(&self.db).expect("ScanTask run before writing the start block");
     // Fetch the next block to scan
     let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db)
-      .expect("ScanForOutputsTask run before writing the start block");
+      .expect("ScanTask run before writing the start block");
 
     for b in next_to_scan ..= latest_scannable {
       let block = self.feed.block_by_number(&self.db, b).await?;
@@ -123,8 +125,8 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
 
       // Tidy the keys, then fetch them
       // We don't have to tidy them here, we just have to somewhere, so why not here?
-      ScannerDb::<S>::tidy_keys(&mut txn);
-      let keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&txn)
+      ScannerGlobalDb::<S>::tidy_keys(&mut txn);
+      let keys = ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&txn)
        .expect("scanning for a blockchain without any keys set");
 
       let mut scan_data = SenderScanData {
@@ -197,7 +199,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
             // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing
             // an invocation of a consensus/signing protocol
             if balance.amount.0 >= self.feed.dust(balance.coin).0 {
-              ScannerDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b);
+              ScannerGlobalDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b);
             }
             continue;
           }
@@ -284,10 +286,10 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
         }
       }
 
-      // Send the scan data to the eventuality task
-      ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
       // Send the InInstructions to the report task
      ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
+      // Send the scan data to the eventuality task
+      ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
       // Update the next to scan block
      ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
       txn.commit();
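
Illustrative sketch (not part of this diff): how a caller might drive the reworked Scanner API, inferred only from the signatures above. ExampleFeed, ExampleScheduler, and both block-number parameters are hypothetical placeholders, not items from this PR.

// Hypothetical usage of the new API; `ExampleFeed`/`ExampleScheduler` are assumed
// implementors of `ScannerFeed` and `Scheduler<ExampleFeed>`.
async fn drive_scanner(
  mut db: impl Db,
  feed: ExampleFeed,
  scheduler: ExampleScheduler,
  start_block: u64,
  notable_block: u64,
) {
  // `Scanner::new` now spawns the index/scan/report/eventuality tasks itself and wires them
  // together with `RunNowHandle`s, so the caller only keeps the returned `Scanner`
  let mut scanner = Scanner::new(db.clone(), feed, scheduler, start_block).await;

  // ... once Serai has achieved synchrony on `notable_block` ...
  let txn = db.txn();
  // The txn is now taken by value: `acknowledge_block` commits it itself, then nudges the
  // eventuality task via the retained `RunNowHandle`
  scanner.acknowledge_block(txn, notable_block, None);
}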