Have Scanner::new spawn tasks

This commit is contained in:
Luke Parker
2024-08-28 20:16:06 -04:00
parent 65f3f48517
commit 738636c238
7 changed files with 73 additions and 53 deletions

View File

@@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, create_db, db_channel};
use serai_in_instructions_primitives::InInstructionWithBalance; use serai_in_instructions_primitives::InInstructionWithBalance;
use primitives::{ReceivedOutput, EncodableG}; use primitives::{EncodableG, ReceivedOutput};
use crate::{ use crate::{
lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return, lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return,

View File

@@ -21,12 +21,12 @@ pub(crate) fn block_id(getter: &impl Get, block_number: u64) -> [u8; 32] {
This task finds the finalized blocks, verifies they're continguous, and saves their IDs. This task finds the finalized blocks, verifies they're continguous, and saves their IDs.
*/ */
struct IndexFinalizedTask<D: Db, S: ScannerFeed> { pub(crate) struct IndexTask<D: Db, S: ScannerFeed> {
db: D, db: D,
feed: S, feed: S,
} }
impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> { impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
pub(crate) async fn new(mut db: D, feed: S, start_block: u64) -> Self { pub(crate) async fn new(mut db: D, feed: S, start_block: u64) -> Self {
if IndexDb::block_id(&db, start_block).is_none() { if IndexDb::block_id(&db, start_block).is_none() {
// Fetch the block for its ID // Fetch the block for its ID
@@ -36,7 +36,7 @@ impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> {
match feed.unchecked_block_header_by_number(start_block).await { match feed.unchecked_block_header_by_number(start_block).await {
Ok(block) => break block, Ok(block) => break block,
Err(e) => { Err(e) => {
log::warn!("IndexFinalizedTask couldn't fetch start block {start_block}: {e:?}"); log::warn!("IndexTask couldn't fetch start block {start_block}: {e:?}");
tokio::time::sleep(core::time::Duration::from_secs(delay)).await; tokio::time::sleep(core::time::Duration::from_secs(delay)).await;
delay += Self::DELAY_BETWEEN_ITERATIONS; delay += Self::DELAY_BETWEEN_ITERATIONS;
delay = delay.min(Self::MAX_DELAY_BETWEEN_ITERATIONS); delay = delay.min(Self::MAX_DELAY_BETWEEN_ITERATIONS);
@@ -57,7 +57,7 @@ impl<D: Db, S: ScannerFeed> IndexFinalizedTask<D, S> {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the latest finalized block // Fetch the latest finalized block
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)

View File

@@ -3,7 +3,7 @@ use std::collections::HashMap;
use group::GroupEncoding; use group::GroupEncoding;
use serai_db::{Get, DbTxn}; use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount}; use serai_primitives::{NetworkId, Coin, Amount};
@@ -14,7 +14,7 @@ mod lifetime;
// Database schema definition and associated functions. // Database schema definition and associated functions.
mod db; mod db;
use db::ScannerDb; use db::ScannerGlobalDb;
// Task to index the blockchain, ensuring we don't reorganize finalized blocks. // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
mod index; mod index;
// Scans blocks for received coins. // Scans blocks for received coins.
@@ -50,7 +50,7 @@ impl<B: Block> BlockExt for B {
/// ///
/// This defines the primitive types used, along with various getters necessary for indexing. /// This defines the primitive types used, along with various getters necessary for indexing.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait ScannerFeed: Send + Sync { pub trait ScannerFeed: 'static + Send + Sync + Clone {
/// The ID of the network being scanned for. /// The ID of the network being scanned for.
const NETWORK: NetworkId; const NETWORK: NetworkId;
@@ -170,7 +170,7 @@ pub struct SchedulerUpdate<S: ScannerFeed> {
} }
/// The object responsible for accumulating outputs and planning new transactions. /// The object responsible for accumulating outputs and planning new transactions.
pub trait Scheduler<S: ScannerFeed>: Send { pub trait Scheduler<S: ScannerFeed>: 'static + Send {
/// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for. /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
/// ///
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the /// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the
@@ -183,14 +183,38 @@ pub trait Scheduler<S: ScannerFeed>: Send {
} }
/// A representation of a scanner. /// A representation of a scanner.
pub struct Scanner<S: ScannerFeed>(PhantomData<S>); #[allow(non_snake_case)]
pub struct Scanner<S: ScannerFeed> {
eventuality_handle: RunNowHandle,
_S: PhantomData<S>,
}
impl<S: ScannerFeed> Scanner<S> { impl<S: ScannerFeed> Scanner<S> {
/// Create a new scanner. /// Create a new scanner.
/// ///
/// This will begin its execution, spawning several asynchronous tasks. /// This will begin its execution, spawning several asynchronous tasks.
// TODO: Take start_time and binary search here? // TODO: Take start_time and binary search here?
pub fn new(start_block: u64) -> Self { pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler<S>, start_block: u64) -> Self {
todo!("TODO") let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block);
let (_index_handle, index_run) = RunNowHandle::new();
let (scan_handle, scan_run) = RunNowHandle::new();
let (report_handle, report_run) = RunNowHandle::new();
let (eventuality_handle, eventuality_run) = RunNowHandle::new();
// Upon indexing a new block, scan it
tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()]));
// Upon scanning a block, report it
tokio::spawn(scan_task.continually_run(scan_run, vec![report_handle]));
// Upon reporting a block, we do nothing
tokio::spawn(report_task.continually_run(report_run, vec![]));
// Upon handling the Eventualities in a block, we run the scan task as we've advanced the
// window its allowed to scan
tokio::spawn(eventuality_task.continually_run(eventuality_run, vec![scan_handle]));
Self { eventuality_handle, _S: PhantomData }
} }
/// Acknowledge a block. /// Acknowledge a block.
@@ -199,19 +223,26 @@ impl<S: ScannerFeed> Scanner<S> {
/// have achieved synchrony on it. /// have achieved synchrony on it.
pub fn acknowledge_block( pub fn acknowledge_block(
&mut self, &mut self,
txn: &mut impl DbTxn, mut txn: impl DbTxn,
block_number: u64, block_number: u64,
key_to_activate: Option<KeyFor<S>>, key_to_activate: Option<KeyFor<S>>,
) { ) {
log::info!("acknowledging block {block_number}"); log::info!("acknowledging block {block_number}");
assert!( assert!(
ScannerDb::<S>::is_block_notable(txn, block_number), ScannerGlobalDb::<S>::is_block_notable(&txn, block_number),
"acknowledging a block which wasn't notable" "acknowledging a block which wasn't notable"
); );
ScannerDb::<S>::set_highest_acknowledged_block(txn, block_number); ScannerGlobalDb::<S>::set_highest_acknowledged_block(&mut txn, block_number);
if let Some(key_to_activate) = key_to_activate { if let Some(key_to_activate) = key_to_activate {
ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH, key_to_activate); ScannerGlobalDb::<S>::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate);
} }
// Commit the txn
txn.commit();
// Run the Eventuality task since we've advanced it
// We couldn't successfully do this if that txn was still floating around, uncommitted
// The execution of this task won't actually have more work until the txn is committed
self.eventuality_handle.run_now();
} }
/// Queue Burns. /// Queue Burns.
@@ -220,7 +251,7 @@ impl<S: ScannerFeed> Scanner<S> {
/// safely queue Burns so long as they're only actually added once we've handled the outputs from /// safely queue Burns so long as they're only actually added once we've handled the outputs from
/// the block acknowledged prior to their queueing. /// the block acknowledged prior to their queueing.
pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) { pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) {
let queue_as_of = ScannerDb::<S>::highest_acknowledged_block(txn) let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(txn)
.expect("queueing Burns yet never acknowledged a block"); .expect("queueing Burns yet never acknowledged a block");
todo!("TODO") todo!("TODO")
} }
@@ -228,8 +259,8 @@ impl<S: ScannerFeed> Scanner<S> {
/* /*
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>); struct ScannerGlobalDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
impl<N: Network, D: Db> ScannerDb<N, D> { impl<N: Network, D: Db> ScannerGlobalDb<N, D> {
fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> { fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"seen", id) Self::scanner_key(b"seen", id)
} }
@@ -295,7 +326,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
TODO2: Only update ram_outputs after committing the TXN in question. TODO2: Only update ram_outputs after committing the TXN in question.
*/ */
let seen = ScannerDb::<N, D>::seen(&db, &id); let seen = ScannerGlobalDb::<N, D>::seen(&db, &id);
let id = id.as_ref().to_vec(); let id = id.as_ref().to_vec();
if seen || scanner.ram_outputs.contains(&id) { if seen || scanner.ram_outputs.contains(&id) {
panic!("scanned an output multiple times"); panic!("scanned an output multiple times");

View File

@@ -1,6 +1,4 @@
use core::marker::PhantomData; use serai_db::{Get, DbTxn, create_db};
use serai_db::{Get, DbTxn, Db, create_db};
create_db!( create_db!(
ScannerReport { ScannerReport {

View File

@@ -1,3 +1,5 @@
use core::marker::PhantomData;
use scale::Encode; use scale::Encode;
use serai_db::{DbTxn, Db}; use serai_db::{DbTxn, Db};
@@ -21,13 +23,14 @@ use db::ReportDb;
Eventualities, have processed the block. This ensures we know if this block is notable, and have Eventualities, have processed the block. This ensures we know if this block is notable, and have
the InInstructions for it. the InInstructions for it.
*/ */
#[allow(non_snake_case)]
pub(crate) struct ReportTask<D: Db, S: ScannerFeed> { pub(crate) struct ReportTask<D: Db, S: ScannerFeed> {
db: D, db: D,
feed: S, _S: PhantomData<S>,
} }
impl<D: Db, S: ScannerFeed> ReportTask<D, S> { impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { pub(crate) fn new(mut db: D, start_block: u64) -> Self {
if ReportDb::next_to_potentially_report_block(&db).is_none() { if ReportDb::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB // Initialize the DB
let mut txn = db.txn(); let mut txn = db.txn();
@@ -35,7 +38,7 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
txn.commit(); txn.commit();
} }
Self { db, feed } Self { db, _S: PhantomData }
} }
} }

View File

@@ -1,22 +1,8 @@
use core::marker::PhantomData; use core::marker::PhantomData;
use std::io;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db}; use serai_db::{Get, DbTxn, create_db};
use serai_in_instructions_primitives::InInstructionWithBalance; use crate::{db::OutputWithInInstruction, ScannerFeed};
use primitives::{EncodableG, ReceivedOutput, EventualityTracker};
use crate::{
lifetime::LifetimeStage, db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor,
EventualityFor, Return, scan::next_to_scan_for_outputs_block,
};
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!( create_db!(
ScannerScan { ScannerScan {

View File

@@ -10,7 +10,9 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
use crate::{ use crate::{
lifetime::LifetimeStage, lifetime::LifetimeStage,
db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb}, db::{
OutputWithInInstruction, SenderScanData, ScannerGlobalDb, ScanToReportDb, ScanToEventualityDb,
},
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs, BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
eventuality::latest_scannable_block, eventuality::latest_scannable_block,
}; };
@@ -84,12 +86,12 @@ fn in_instruction_from_output<S: ScannerFeed>(
) )
} }
struct ScanForOutputsTask<D: Db, S: ScannerFeed> { pub(crate) struct ScanTask<D: Db, S: ScannerFeed> {
db: D, db: D,
feed: S, feed: S,
} }
impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> { impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() { if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() {
// Initialize the DB // Initialize the DB
@@ -103,14 +105,14 @@ impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the safe to scan block // Fetch the safe to scan block
let latest_scannable = latest_scannable_block::<S>(&self.db) let latest_scannable =
.expect("ScanForOutputsTask run before writing the start block"); latest_scannable_block::<S>(&self.db).expect("ScanTask run before writing the start block");
// Fetch the next block to scan // Fetch the next block to scan
let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db) let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db)
.expect("ScanForOutputsTask run before writing the start block"); .expect("ScanTask run before writing the start block");
for b in next_to_scan ..= latest_scannable { for b in next_to_scan ..= latest_scannable {
let block = self.feed.block_by_number(&self.db, b).await?; let block = self.feed.block_by_number(&self.db, b).await?;
@@ -123,8 +125,8 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
// Tidy the keys, then fetch them // Tidy the keys, then fetch them
// We don't have to tidy them here, we just have to somewhere, so why not here? // We don't have to tidy them here, we just have to somewhere, so why not here?
ScannerDb::<S>::tidy_keys(&mut txn); ScannerGlobalDb::<S>::tidy_keys(&mut txn);
let keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&txn) let keys = ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&txn)
.expect("scanning for a blockchain without any keys set"); .expect("scanning for a blockchain without any keys set");
let mut scan_data = SenderScanData { let mut scan_data = SenderScanData {
@@ -197,7 +199,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
// We ensure it's over the dust limit to prevent people sending 1 satoshi from causing // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing
// an invocation of a consensus/signing protocol // an invocation of a consensus/signing protocol
if balance.amount.0 >= self.feed.dust(balance.coin).0 { if balance.amount.0 >= self.feed.dust(balance.coin).0 {
ScannerDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b); ScannerGlobalDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b);
} }
continue; continue;
} }
@@ -284,10 +286,10 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
} }
} }
// Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
// Send the InInstructions to the report task // Send the InInstructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions); ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
// Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
// Update the next to scan block // Update the next to scan block
ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1); ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
txn.commit(); txn.commit();