From 7e7184082251e241bb13690ddb0b937d426857dc Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 26 Aug 2024 23:15:19 -0400 Subject: [PATCH] Add helper methods Has fetched blocks checked to be the indexed blocks. Has scanned outputs be sorted, meaning they aren't subject to implicit order/may be non-deterministic (such as if handled by a threadpool). --- processor/primitives/src/block.rs | 4 ++- processor/scanner/src/eventuality.rs | 32 ++++------------- processor/scanner/src/index.rs | 2 +- processor/scanner/src/lib.rs | 52 ++++++++++++++++++++++++++-- processor/scanner/src/scan.rs | 23 ++---------- 5 files changed, 63 insertions(+), 50 deletions(-) diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index 5ca2acec..6f603ab2 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -54,7 +54,9 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { fn id(&self) -> [u8; 32]; /// Scan all outputs within this block to find the outputs spendable by this key. - fn scan_for_outputs(&self, key: Self::Key) -> Vec; + /// + /// No assumption on the order of the returned outputs is made. + fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec; /// Check if this block resolved any Eventualities. /// diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs index 83ab4eba..8fc18246 100644 --- a/processor/scanner/src/eventuality.rs +++ b/processor/scanner/src/eventuality.rs @@ -5,13 +5,11 @@ use serai_db::{DbTxn, Db}; use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to EventualityDb? -use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, KeyFor, Scheduler, ContinuallyRan}; +use crate::{ + lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan, +}; /* - Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we - operate on is `CONFIRMATIONS` blocks deep. This is true for Proof-of-Work chains yet not the API - actively used here. - When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those outputs into some scheduler, potentially causing certain transactions to begin their signing protocol. @@ -112,25 +110,7 @@ impl> ContinuallyRan for EventualityTas iterated = true; - // TODO: Add a helper to fetch an indexed block, de-duplicate with scan - let block = match self.feed.block_by_number(b).await { - Ok(block) => block, - Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, - }; - - // Check the ID of this block is the expected ID - { - let expected = - ScannerDb::::block_id(&self.db, b).expect("scannable block didn't have its ID saved"); - if block.id() != expected { - panic!( - "finalized chain reorganized from {} to {} at {}", - hex::encode(expected), - hex::encode(block.id()), - b - ); - } - } + let block = self.feed.block_by_number(b).await?; log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); @@ -171,7 +151,6 @@ impl> ContinuallyRan for EventualityTas }; // Fetch all non-External outputs - // TODO: Have a scan_for_outputs_ext which sorts for us let mut non_external_outputs = block.scan_for_outputs(key.key); non_external_outputs.retain(|output| output.kind() != OutputType::External); // Drop any outputs less than the dust limit @@ -210,6 +189,7 @@ impl> ContinuallyRan for EventualityTas let outputs_to_return = ScannerDb::::take_queued_returns(&mut txn, b); + // TODO: This also has to intake Burns let new_eventualities = self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return); for (key, new_eventualities) in new_eventualities { @@ -220,7 +200,7 @@ impl> ContinuallyRan for EventualityTas KeyFor::::from_bytes(&key_repr).unwrap() }; - let mut eventualities = ScannerDb::::eventualities(&txn, key.key); + let mut eventualities = ScannerDb::::eventualities(&txn, key); for new_eventuality in new_eventualities { eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality); } diff --git a/processor/scanner/src/index.rs b/processor/scanner/src/index.rs index 1d278015..e3c5c6ac 100644 --- a/processor/scanner/src/index.rs +++ b/processor/scanner/src/index.rs @@ -43,7 +43,7 @@ impl ContinuallyRan for IndexFinalizedTask { // Index the hashes of all blocks until the latest finalized block for b in (our_latest_finalized + 1) ..= latest_finalized { - let block = match self.feed.block_header_by_number(b).await { + let block = match self.feed.unchecked_block_header_by_number(b).await { Ok(block) => block, Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, }; diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 822acb27..5b41301e 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -23,6 +23,23 @@ mod eventuality; /// Task which reports `Batch`s to Substrate. mod report; +/// Extension traits around Block. +pub(crate) trait BlockExt: Block { + fn scan_for_outputs(&self, key: Self::Key) -> Vec; +} +impl BlockExt for B { + fn scan_for_outputs(&self, key: Self::Key) -> Vec { + let mut outputs = self.scan_for_outputs_unordered(); + outputs.sort_by(|a, b| { + use core::cmp::{Ordering, Ord}; + let res = a.id().as_ref().cmp(&b.id().as_ref()); + assert!(res != Ordering::Equal, "scanned two outputs within a block with the same ID"); + res + }); + outputs + } +} + /// A feed usable to scan a blockchain. /// /// This defines the primitive types used, along with various getters necessary for indexing. @@ -68,13 +85,44 @@ pub trait ScannerFeed: Send + Sync { async fn latest_finalized_block_number(&self) -> Result; /// Fetch a block header by its number. - async fn block_header_by_number( + /// + /// This does not check the returned BlockHeader is the header for the block we indexed. + async fn unchecked_block_header_by_number( &self, number: u64, ) -> Result<::Header, Self::EphemeralError>; /// Fetch a block by its number. - async fn block_by_number(&self, number: u64) -> Result; + /// + /// This does not check the returned Block is the block we indexed. + async fn unchecked_block_by_number( + &self, + number: u64, + ) -> Result; + + /// Fetch a block by its number. + /// + /// Panics if the block requested wasn't indexed. + async fn block_by_number(&self, getter: &impl Get, number: u64) -> Result { + let block = match self.unchecked_block_by_number(number).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, + }; + + // Check the ID of this block is the expected ID + { + let expected = + ScannerDb::::block_id(&self.db, number).expect("requested a block which wasn't indexed"); + if block.id() != expected { + panic!( + "finalized chain reorganized from {} to {} at {}", + hex::encode(expected), + hex::encode(block.id()), + number, + ); + } + } + } /// The cost to aggregate an input as of the specified block. /// diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index cd010d7c..ddc1110e 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -12,7 +12,7 @@ use primitives::{OutputType, ReceivedOutput, Block}; use crate::{ lifetime::LifetimeStage, db::{OutputWithInInstruction, ScannerDb}, - ScannerFeed, AddressFor, OutputFor, ContinuallyRan, + BlockExt, ScannerFeed, AddressFor, OutputFor, ContinuallyRan, }; // Construct an InInstruction from an external output. @@ -76,24 +76,7 @@ impl ContinuallyRan for ScanForOutputsTask { .expect("ScanForOutputsTask run before writing the start block"); for b in next_to_scan ..= latest_scannable { - let block = match self.feed.block_by_number(b).await { - Ok(block) => block, - Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, - }; - - // Check the ID of this block is the expected ID - { - let expected = - ScannerDb::::block_id(&self.db, b).expect("scannable block didn't have its ID saved"); - if block.id() != expected { - panic!( - "finalized chain reorganized from {} to {} at {}", - hex::encode(expected), - hex::encode(block.id()), - b - ); - } - } + let block = self.feed.block_by_number(b).await?; log::info!("scanning block: {} ({b})", hex::encode(block.id())); @@ -143,7 +126,7 @@ impl ContinuallyRan for ScanForOutputsTask { worthwhile, and even if they're not economically, they are technically). The alternative, we drop outputs here with a generic filter rule and then report back - the insolvency created, still doesn't work as we'd only be creating if an insolvency if + the insolvency created, still doesn't work as we'd only be creating an insolvency if the output was actually made by us (and not simply someone else sending in). We can have the Eventuality task report the insolvency, yet that requires the scanner be responsible for such filter logic. It's more flexible, and has a cleaner API,