diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 3ea41161..b4d7c27b 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -226,32 +226,6 @@ impl ScannerDb { NotableBlock::set(txn, block_number, &()); } - // TODO: Use a DbChannel here, and send the instructions to the report task and the outputs to - // the eventuality task? That way this cleans up after itself - pub(crate) fn set_in_instructions( - txn: &mut impl DbTxn, - block_number: u64, - outputs: Vec>, - ) { - if !outputs.is_empty() { - // Set this block as notable - NotableBlock::set(txn, block_number, &()); - } - - let mut buf = Vec::with_capacity(outputs.len() * 128); - for output in outputs { - output.write(&mut buf).unwrap(); - } - SerializedOutputs::set(txn, block_number, &buf); - } - - pub(crate) fn in_instructions( - getter: &impl Get, - block_number: u64, - ) -> Option>> { - todo!("TODO") - } - pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool { NotableBlock::get(getter, number).is_some() } @@ -352,3 +326,44 @@ impl ScanToEventualityDb { todo!("TODO") } } + +#[derive(BorshSerialize, BorshDeserialize)] +pub(crate) struct BlockBoundInInstructions { + pub(crate) block_number: u64, + pub(crate) in_instructions: Vec, +} + +db_channel! { + ScannerScanReport { + InInstructions: (empty_key: ()) -> BlockBoundInInstructions, + } +} + +pub(crate) struct ScanToReportDb(PhantomData); +impl ScanToReportDb { + pub(crate) fn send_in_instructions( + txn: &mut impl DbTxn, + block_number: u64, + in_instructions: Vec, + ) { + if !in_instructions.is_empty() { + // Set this block as notable + NotableBlock::set(txn, block_number, &()); + } + + InInstructions::send(txn, (), &BlockBoundInInstructions { block_number, in_instructions }); + } + + pub(crate) fn recv_in_instructions( + txn: &mut impl DbTxn, + block_number: u64, + ) -> Vec { + let data = InInstructions::try_recv(txn, ()) + .expect("receiving InInstructions for a scanned block not yet sent"); + assert_eq!( + block_number, data.block_number, + "received InInstructions for a scanned block distinct than expected" + ); + data.in_instructions + } +} diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index f2caf692..39a72106 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -4,10 +4,11 @@ use serai_db::{DbTxn, Db}; use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; -use primitives::ReceivedOutput; - -// TODO: Localize to ReportDb? -use crate::{db::ScannerDb, index, ScannerFeed, ContinuallyRan}; +// TODO: Localize to Report? +use crate::{ + db::{ScannerDb, ScanToReportDb}, + index, ScannerFeed, ContinuallyRan, +}; /* This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. @@ -47,23 +48,15 @@ impl ContinuallyRan for ReportTask { for b in next_to_potentially_report ..= highest_reportable { let mut txn = self.db.txn(); + // Receive the InInstructions for this block + // We always do this as we can't trivially tell if we should recv InInstructions before we do + let in_instructions = ScanToReportDb::::recv_in_instructions(&mut txn, b); + let notable = ScannerDb::::is_block_notable(&txn, b); + if !notable { + assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); + } // If this block is notable, create the Batch(s) for it - if ScannerDb::::is_block_notable(&txn, b) { - let in_instructions = { - let mut in_instructions = ScannerDb::::in_instructions(&txn, b) - .expect("reporting block which didn't set its InInstructions"); - // Sort these before reporting them in case anything we did is non-deterministic/to have - // a well-defined order (not implicit to however we got this result, enabling different - // methods to be used in the future) - in_instructions.sort_by(|a, b| { - use core::cmp::{Ordering, Ord}; - let res = a.output.id().as_ref().cmp(b.output.id().as_ref()); - assert!(res != Ordering::Equal); - res - }); - in_instructions - }; - + if notable { let network = S::NETWORK; let block_hash = index::block_id(&txn, b); let mut batch_id = ScannerDb::::acquire_batch_id(&mut txn); @@ -74,7 +67,7 @@ impl ContinuallyRan for ReportTask { for instruction in in_instructions { let batch = batches.last_mut().unwrap(); - batch.instructions.push(instruction.in_instruction); + batch.instructions.push(instruction); // check if batch is over-size if batch.encode().len() > MAX_BATCH_SIZE { diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index d8312e3b..861a9725 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -11,7 +11,7 @@ use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to ScanDb? use crate::{ lifetime::LifetimeStage, - db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToEventualityDb}, + db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb}, BlockExt, ScannerFeed, AddressFor, OutputFor, Return, ContinuallyRan, }; @@ -92,7 +92,25 @@ impl ContinuallyRan for ScanForOutputsTask { forwards: vec![], returns: vec![], }; - let mut in_instructions = ScannerDb::::take_queued_outputs(&mut txn, b); + let mut in_instructions = vec![]; + + let queued_outputs = { + let mut queued_outputs = ScannerDb::::take_queued_outputs(&mut txn, b); + + // Sort the queued outputs in case they weren't queued in a deterministic fashion + queued_outputs.sort_by(|a, b| { + use core::cmp::{Ordering, Ord}; + let res = a.output.id().as_ref().cmp(b.output.id().as_ref()); + assert!(res != Ordering::Equal); + res + }); + + queued_outputs + }; + for queued_output in queued_outputs { + scan_data.received_external_outputs.push(queued_output.output); + in_instructions.push(queued_output.in_instruction); + } // Scan for each key for key in keys { @@ -228,14 +246,14 @@ impl ContinuallyRan for ScanForOutputsTask { assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); scan_data.received_external_outputs.push(output_with_in_instruction.output.clone()); - in_instructions.push(output_with_in_instruction); + in_instructions.push(output_with_in_instruction.in_instruction); } } - // Save the outputs to return + // Send the scan data to the eventuality task ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data); - // Save the in instructions - ScannerDb::::set_in_instructions(&mut txn, b, in_instructions); + // Send the in instructions to the report task + ScanToReportDb::::send_in_instructions(&mut txn, b, in_instructions); // Update the next to scan block ScannerDb::::set_next_to_scan_for_outputs_block(&mut txn, b + 1); txn.commit();