Add ScanDb

This commit is contained in:
Luke Parker
2024-08-28 19:00:02 -04:00
parent 77ef25416b
commit fdfe520f9d
5 changed files with 113 additions and 53 deletions

View File

@@ -9,7 +9,10 @@ use serai_in_instructions_primitives::InInstructionWithBalance;
use primitives::{ReceivedOutput, EncodableG}; use primitives::{ReceivedOutput, EncodableG};
use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return}; use crate::{
lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return,
scan::next_to_scan_for_outputs_block,
};
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. // The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
trait Borshy: BorshSerialize + BorshDeserialize {} trait Borshy: BorshSerialize + BorshDeserialize {}
@@ -35,7 +38,7 @@ pub(crate) struct OutputWithInInstruction<S: ScannerFeed> {
} }
impl<S: ScannerFeed> OutputWithInInstruction<S> { impl<S: ScannerFeed> OutputWithInInstruction<S> {
fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { pub(crate) fn write(&self, writer: &mut impl io::Write) -> io::Result<()> {
self.output.write(writer)?; self.output.write(writer)?;
// TODO self.return_address.write(writer)?; // TODO self.return_address.write(writer)?;
self.in_instruction.encode_to(writer); self.in_instruction.encode_to(writer);
@@ -48,8 +51,6 @@ create_db!(
ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>, ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>,
RetireAt: <K: Encode>(key: K) -> u64, RetireAt: <K: Encode>(key: K) -> u64,
// The next block to scan for received outputs
NextToScanForOutputsBlock: () -> u64,
// The next block to potentially report // The next block to potentially report
NextToPotentiallyReportBlock: () -> u64, NextToPotentiallyReportBlock: () -> u64,
// Highest acknowledged block // Highest acknowledged block
@@ -74,9 +75,6 @@ create_db!(
*/ */
// This collapses from `bool` to `()`, using if the value was set for true and false otherwise // This collapses from `bool` to `()`, using if the value was set for true and false otherwise
NotableBlock: (number: u64) -> (), NotableBlock: (number: u64) -> (),
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
SerializedOutputs: (block_number: u64) -> Vec<u8>,
} }
); );
@@ -127,7 +125,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
let Some(key) = keys.first() else { return }; let Some(key) = keys.first() else { return };
// Get the block we're scanning for next // Get the block we're scanning for next
let block_number = Self::next_to_scan_for_outputs_block(txn).expect( let block_number = next_to_scan_for_outputs_block::<S>(txn).expect(
"tidying keys despite never setting the next to scan for block (done on initialization)", "tidying keys despite never setting the next to scan for block (done on initialization)",
); );
// If this key is scheduled for retiry... // If this key is scheduled for retiry...
@@ -150,7 +148,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
) -> Option<Vec<SeraiKey<KeyFor<S>>>> { ) -> Option<Vec<SeraiKey<KeyFor<S>>>> {
// We don't take this as an argument as we don't keep all historical keys in memory // We don't take this as an argument as we don't keep all historical keys in memory
// If we've scanned block 1,000,000, we can't answer the active keys as of block 0 // If we've scanned block 1,000,000, we can't answer the active keys as of block 0
let block_number = Self::next_to_scan_for_outputs_block(getter)?; let block_number = next_to_scan_for_outputs_block::<S>(getter)?;
let raw_keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> = ActiveKeys::get(getter)?; let raw_keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> = ActiveKeys::get(getter)?;
let mut keys = Vec::with_capacity(2); let mut keys = Vec::with_capacity(2);
@@ -183,25 +181,9 @@ impl<S: ScannerFeed> ScannerDb<S> {
} }
pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) { pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) {
assert!(
NextToScanForOutputsBlock::get(txn).is_none(),
"setting start block but prior set start block"
);
NextToScanForOutputsBlock::set(txn, &start_block);
NextToPotentiallyReportBlock::set(txn, &start_block); NextToPotentiallyReportBlock::set(txn, &start_block);
} }
pub(crate) fn set_next_to_scan_for_outputs_block(
txn: &mut impl DbTxn,
next_to_scan_for_outputs_block: u64,
) {
NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block);
}
pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> {
NextToScanForOutputsBlock::get(getter)
}
pub(crate) fn set_next_to_potentially_report_block( pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
next_to_potentially_report_block: u64, next_to_potentially_report_block: u64,
@@ -222,24 +204,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
HighestAcknowledgedBlock::get(getter) HighestAcknowledgedBlock::get(getter)
} }
pub(crate) fn take_queued_outputs(
txn: &mut impl DbTxn,
block_number: u64,
) -> Vec<OutputWithInInstruction<S>> {
todo!("TODO")
}
pub(crate) fn queue_output_until_block(
txn: &mut impl DbTxn,
queue_for_block: u64,
output: &OutputWithInInstruction<S>,
) {
let mut outputs =
SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128));
output.write(&mut outputs).unwrap();
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
}
/* /*
This is so verbosely named as the DB itself already flags upon external outputs. Specifically, This is so verbosely named as the DB itself already flags upon external outputs. Specifically,
if any block yields External outputs to accumulate, we flag it as notable. if any block yields External outputs to accumulate, we flag it as notable.

View File

@@ -9,6 +9,7 @@ use crate::{
lifetime::LifetimeStage, lifetime::LifetimeStage,
db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb}, db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb},
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs, BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
}; };
mod db; mod db;
@@ -104,7 +105,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
*/ */
let exclusive_upper_bound = { let exclusive_upper_bound = {
// Fetch the next to scan block // Fetch the next to scan block
let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
.expect("EventualityTask run before writing the start block"); .expect("EventualityTask run before writing the start block");
// If we haven't done any work, return // If we haven't done any work, return
if next_to_scan == 0 { if next_to_scan == 0 {
@@ -229,7 +230,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
&txn, &forwarded, &txn, &forwarded,
) )
.expect("forwarded an output yet didn't save its InInstruction to the DB"); .expect("forwarded an output yet didn't save its InInstruction to the DB");
ScannerDb::<S>::queue_output_until_block( queue_output_until_block::<S>(
&mut txn, &mut txn,
b + S::WINDOW_LENGTH, b + S::WINDOW_LENGTH,
&OutputWithInInstruction { output: output.clone(), return_address, in_instruction }, &OutputWithInInstruction { output: output.clone(), return_address, in_instruction },

View File

@@ -7,7 +7,9 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
// TODO: Localize to Report? // TODO: Localize to Report?
use crate::{ use crate::{
db::{ScannerDb, ScanToReportDb}, db::{ScannerDb, ScanToReportDb},
index, ScannerFeed, ContinuallyRan, index,
scan::next_to_scan_for_outputs_block,
ScannerFeed, ContinuallyRan,
}; };
/* /*
@@ -27,7 +29,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
let highest_reportable = { let highest_reportable = {
// Fetch the next to scan block // Fetch the next to scan block
let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
.expect("ReportTask run before writing the start block"); .expect("ReportTask run before writing the start block");
// If we haven't done any work, return // If we haven't done any work, return
if next_to_scan == 0 { if next_to_scan == 0 {

View File

@@ -0,0 +1,59 @@
use core::marker::PhantomData;
use std::io;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db};
use serai_in_instructions_primitives::InInstructionWithBalance;
use primitives::{EncodableG, ReceivedOutput, EventualityTracker};
use crate::{
lifetime::LifetimeStage, db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor,
EventualityFor, Return, scan::next_to_scan_for_outputs_block,
};
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!(
ScannerScan {
// The next block to scan for received outputs
NextToScanForOutputsBlock: () -> u64,
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
}
);
pub(crate) struct ScanDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScanDb<S> {
pub(crate) fn set_next_to_scan_for_outputs_block(
txn: &mut impl DbTxn,
next_to_scan_for_outputs_block: u64,
) {
NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block);
}
pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> {
NextToScanForOutputsBlock::get(getter)
}
pub(crate) fn take_queued_outputs(
txn: &mut impl DbTxn,
block_number: u64,
) -> Vec<OutputWithInInstruction<S>> {
todo!("TODO")
}
pub(crate) fn queue_output_until_block(
txn: &mut impl DbTxn,
queue_for_block: u64,
output: &OutputWithInInstruction<S>,
) {
let mut outputs =
SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128));
output.write(&mut outputs).unwrap();
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
}
}

View File

@@ -1,5 +1,5 @@
use scale::Decode; use scale::Decode;
use serai_db::{DbTxn, Db}; use serai_db::{Get, DbTxn, Db};
use serai_primitives::MAX_DATA_LEN; use serai_primitives::MAX_DATA_LEN;
use serai_in_instructions_primitives::{ use serai_in_instructions_primitives::{
@@ -16,6 +16,27 @@ use crate::{
eventuality::latest_scannable_block, eventuality::latest_scannable_block,
}; };
mod db;
use db::ScanDb;
pub(crate) fn next_to_scan_for_outputs_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
ScanDb::<S>::next_to_scan_for_outputs_block(getter)
}
pub(crate) fn queue_output_until_block<S: ScannerFeed>(
txn: &mut impl DbTxn,
queue_for_block: u64,
output: &OutputWithInInstruction<S>,
) {
assert!(
queue_for_block >=
next_to_scan_for_outputs_block::<S>(txn)
.expect("queueing an output despite no next-to-scan-for-outputs block"),
"queueing an output for a block already scanned"
);
ScanDb::<S>::queue_output_until_block(txn, queue_for_block, output)
}
// Construct an InInstruction from an external output. // Construct an InInstruction from an external output.
// //
// Also returns the address to return the coins to upon error. // Also returns the address to return the coins to upon error.
@@ -66,6 +87,19 @@ struct ScanForOutputsTask<D: Db, S: ScannerFeed> {
feed: S, feed: S,
} }
impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> {
pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, start_block);
txn.commit();
}
Self { db, feed }
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
@@ -73,7 +107,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
let latest_scannable = latest_scannable_block::<S>(&self.db) let latest_scannable = latest_scannable_block::<S>(&self.db)
.expect("ScanForOutputsTask run before writing the start block"); .expect("ScanForOutputsTask run before writing the start block");
// Fetch the next block to scan // Fetch the next block to scan
let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db)
.expect("ScanForOutputsTask run before writing the start block"); .expect("ScanForOutputsTask run before writing the start block");
for b in next_to_scan ..= latest_scannable { for b in next_to_scan ..= latest_scannable {
@@ -83,7 +117,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
assert_eq!(ScannerDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b); assert_eq!(ScanDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b);
// Tidy the keys, then fetch them // Tidy the keys, then fetch them
// We don't have to tidy them here, we just have to somewhere, so why not here? // We don't have to tidy them here, we just have to somewhere, so why not here?
@@ -100,7 +134,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
let mut in_instructions = vec![]; let mut in_instructions = vec![];
let queued_outputs = { let queued_outputs = {
let mut queued_outputs = ScannerDb::<S>::take_queued_outputs(&mut txn, b); let mut queued_outputs = ScanDb::<S>::take_queued_outputs(&mut txn, b);
// Sort the queued outputs in case they weren't queued in a deterministic fashion // Sort the queued outputs in case they weren't queued in a deterministic fashion
queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output)); queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output));
queued_outputs queued_outputs
@@ -217,7 +251,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
// This multisig isn't yet reporting its External outputs to avoid a DoS // This multisig isn't yet reporting its External outputs to avoid a DoS
// Queue the output to be reported when this multisig starts reporting // Queue the output to be reported when this multisig starts reporting
LifetimeStage::ActiveYetNotReporting => { LifetimeStage::ActiveYetNotReporting => {
ScannerDb::<S>::queue_output_until_block( ScanDb::<S>::queue_output_until_block(
&mut txn, &mut txn,
key.block_at_which_reporting_starts, key.block_at_which_reporting_starts,
&output_with_in_instruction, &output_with_in_instruction,
@@ -253,7 +287,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
// Send the InInstructions to the report task // Send the InInstructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions); ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
// Update the next to scan block // Update the next to scan block
ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1); ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
txn.commit(); txn.commit();
} }