diff --git a/processor/primitives/src/lib.rs b/processor/primitives/src/lib.rs
index b0b7ae04..7a8be219 100644
--- a/processor/primitives/src/lib.rs
+++ b/processor/primitives/src/lib.rs
@@ -45,15 +45,20 @@ pub trait Id:
 }
 impl<const N: usize> Id for [u8; N] where [u8; N]: Default {}
 
-/// A wrapper for a group element which implements the borsh traits.
+/// A wrapper for a group element which implements the scale/borsh traits.
 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
-pub struct BorshG<G: GroupEncoding>(pub G);
-impl<G: GroupEncoding> BorshSerialize for BorshG<G> {
+pub struct EncodableG<G: GroupEncoding>(pub G);
+impl<G: GroupEncoding> Encode for EncodableG<G> {
+  fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
+    f(self.0.to_bytes().as_ref())
+  }
+}
+impl<G: GroupEncoding> BorshSerialize for EncodableG<G> {
   fn serialize<W: borsh::io::Write>(&self, writer: &mut W) -> borsh::io::Result<()> {
     writer.write_all(self.0.to_bytes().as_ref())
   }
 }
-impl<G: GroupEncoding> BorshDeserialize for BorshG<G> {
+impl<G: GroupEncoding> BorshDeserialize for EncodableG<G> {
   fn deserialize_reader<R: borsh::io::Read>(reader: &mut R) -> borsh::io::Result<Self> {
     let mut repr = G::Repr::default();
     reader.read_exact(repr.as_mut())?;
diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index d53bf7c7..a37e05f4 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, create_db, db_channel};
 
 use serai_in_instructions_primitives::InInstructionWithBalance;
 
-use primitives::{ReceivedOutput, BorshG};
+use primitives::{ReceivedOutput, EncodableG};
 
 use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return};
 
@@ -24,6 +24,7 @@ struct SeraiKeyDbEntry<K: Borshy> {
 pub(crate) struct SeraiKey<K> {
   pub(crate) key: K,
   pub(crate) stage: LifetimeStage,
+  pub(crate) activation_block_number: u64,
   pub(crate) block_at_which_reporting_starts: u64,
 }
 
@@ -45,11 +46,10 @@ impl<S: ScannerFeed> OutputWithInInstruction<S> {
 create_db!(
   Scanner {
     ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>,
+    RetireAt: <K: Encode>(key: K) -> u64,
 
     // The next block to scan for received outputs
     NextToScanForOutputsBlock: () -> u64,
-    // The next block to check for resolving eventualities
-    NextToCheckForEventualitiesBlock: () -> u64,
     // The next block to potentially report
     NextToPotentiallyReportBlock: () -> u64,
     // Highest acknowledged block
@@ -95,29 +95,52 @@ impl<S: ScannerFeed> ScannerDb<S> {
   /// activation_block_number is inclusive, so the key will be scanned for starting at the
   /// specified block.
   pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: KeyFor<S>) {
-    // Set this block as notable
+    // Set the block which has a key activate as notable
     NotableBlock::set(txn, activation_block_number, &());
 
     // TODO: Panic if we've ever seen this key before
 
     // Push the key
-    let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> = ActiveKeys::get(txn).unwrap_or(vec![]);
-    keys.push(SeraiKeyDbEntry { activation_block_number, key: BorshG(key) });
+    let mut keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> =
+      ActiveKeys::get(txn).unwrap_or(vec![]);
+    keys.push(SeraiKeyDbEntry { activation_block_number, key: EncodableG(key) });
     ActiveKeys::set(txn, &keys);
   }
 
   /// Retire a key.
   ///
   /// The key retired must be the oldest key. There must be another key actively tracked.
-  // TODO: This will be called from the Eventuality task yet this field is read by the scan task
-  // We need to write the argument for its safety
-  pub(crate) fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
-    let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> =
+  pub(crate) fn retire_key(txn: &mut impl DbTxn, at_block: u64, key: KeyFor<S>) {
+    // Set the block which has a key retire as notable
+    NotableBlock::set(txn, at_block, &());
+
+    let keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> =
       ActiveKeys::get(txn).expect("retiring key yet no active keys");
     assert!(keys.len() > 1, "retiring our only key");
     assert_eq!(keys[0].key.0, key, "not retiring the oldest key");
-    keys.remove(0);
-    ActiveKeys::set(txn, &keys);
+
+    RetireAt::set(txn, EncodableG(key), &at_block);
+  }
+  pub(crate) fn tidy_keys(txn: &mut impl DbTxn) {
+    let mut keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> =
+      ActiveKeys::get(txn).expect("retiring key yet no active keys");
+    let Some(key) = keys.first() else { return };
+
+    // Get the block we're scanning for next
+    let block_number = Self::next_to_scan_for_outputs_block(txn).expect(
+      "tidying keys despite never setting the next to scan for block (done on initialization)",
+    );
+    // If this key is scheduled for retiry...
+    if let Some(retire_at) = RetireAt::get(txn, key.key) {
+      // And is retired by/at this block...
+      if retire_at <= block_number {
+        // Remove it from the list of keys
+        let key = keys.remove(0);
+        ActiveKeys::set(txn, &keys);
+        // Also clean up the retiry block
+        RetireAt::del(txn, key.key);
+      }
+    }
   }
 
   /// Fetch the active keys, as of the next-to-scan-for-outputs Block.
   ///
@@ -129,9 +152,16 @@ impl<S: ScannerFeed> ScannerDb<S> {
     // If we've scanned block 1,000,000, we can't answer the active keys as of block 0
     let block_number = Self::next_to_scan_for_outputs_block(getter)?;
 
-    let raw_keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> = ActiveKeys::get(getter)?;
+    let raw_keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> = ActiveKeys::get(getter)?;
     let mut keys = Vec::with_capacity(2);
     for i in 0 .. raw_keys.len() {
+      // Ensure this key isn't retired
+      if let Some(retire_at) = RetireAt::get(getter, raw_keys[i].key) {
+        if retire_at <= block_number {
+          continue;
+        }
+      }
+      // Ensure this key isn't yet to activate
       if block_number < raw_keys[i].activation_block_number {
         continue;
       }
@@ -141,7 +171,12 @@ impl<S: ScannerFeed> ScannerDb<S> {
         raw_keys[i].activation_block_number,
         raw_keys.get(i + 1).map(|key| key.activation_block_number),
       );
-      keys.push(SeraiKey { key: raw_keys[i].key.0, stage, block_at_which_reporting_starts });
+      keys.push(SeraiKey {
+        key: raw_keys[i].key.0,
+        stage,
+        activation_block_number: raw_keys[i].activation_block_number,
+        block_at_which_reporting_starts,
+      });
     }
     assert!(keys.len() <= 2, "more than two keys active");
     Some(keys)
@@ -154,19 +189,9 @@ impl<S: ScannerFeed> ScannerDb<S> {
     );
     NextToScanForOutputsBlock::set(txn, &start_block);
 
-    // We can receive outputs in this block, but any descending transactions will be in the next
-    // block. This, with the check on-set, creates a bound that this value in the DB is non-zero.
-    NextToCheckForEventualitiesBlock::set(txn, &(start_block + 1));
     NextToPotentiallyReportBlock::set(txn, &start_block);
   }
 
-  pub(crate) fn latest_scannable_block(getter: &impl Get) -> Option<u64> {
-    // We can only scan up to whatever block we've checked the Eventualities of, plus the window
-    // length. Since this returns an inclusive bound, we need to subtract 1
-    // See `eventuality.rs` for more info
-    NextToCheckForEventualitiesBlock::get(getter).map(|b| b + S::WINDOW_LENGTH - 1)
-  }
-
   pub(crate) fn set_next_to_scan_for_outputs_block(
     txn: &mut impl DbTxn,
     next_to_scan_for_outputs_block: u64,
@@ -177,20 +202,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
     NextToScanForOutputsBlock::get(getter)
   }
 
-  pub(crate) fn set_next_to_check_for_eventualities_block(
-    txn: &mut impl DbTxn,
-    next_to_check_for_eventualities_block: u64,
-  ) {
-    assert!(
-      next_to_check_for_eventualities_block != 0,
-      "next to check for eventualities block was 0 when it's bound non-zero"
-    );
-    NextToCheckForEventualitiesBlock::set(txn, &next_to_check_for_eventualities_block);
-  }
-  pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option<u64> {
-    NextToCheckForEventualitiesBlock::get(getter)
-  }
-
   pub(crate) fn set_next_to_potentially_report_block(
     txn: &mut impl DbTxn,
     next_to_potentially_report_block: u64,
@@ -229,7 +240,15 @@ impl<S: ScannerFeed> ScannerDb<S> {
     SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
   }
 
-  pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) {
+  /*
+    This is so verbosely named as the DB itself already flags upon external outputs. Specifically,
+    if any block yields External outputs to accumulate, we flag it as notable.
+
+    There is the slight edge case where some External outputs are queued for accumulation later.
+    We consider those outputs received as of the block they're queued to (maintaining the policy
+    that any block in which we receive outputs is notable).
+  */
+  pub(crate) fn flag_notable_due_to_non_external_output(txn: &mut impl DbTxn, block_number: u64) {
     assert!(
       NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number,
       "already potentially reported a block we're only now flagging as notable"
@@ -298,6 +317,17 @@ db_channel! {
 pub(crate) struct ScanToEventualityDb<S: ScannerFeed>(PhantomData<S>);
 impl<S: ScannerFeed> ScanToEventualityDb<S> {
   pub(crate) fn send_scan_data(txn: &mut impl DbTxn, block_number: u64, data: &SenderScanData<S>) {
+    // If we received an External output to accumulate, or have an External output to forward
+    // (meaning we received an External output), or have an External output to return (again
+    // meaning we received an External output), set this block as notable due to receiving outputs.
+    // The non-External output case is covered with `flag_notable_due_to_non_external_output`.
+    if !(data.received_external_outputs.is_empty() &&
+      data.forwards.is_empty() &&
+      data.returns.is_empty())
+    {
+      NotableBlock::set(txn, block_number, &());
+    }
+
     /*
       SerializedForwardedOutputsIndex: (block_number: u64) -> Vec<u8>,
       SerializedForwardedOutput: (output_id: &[u8]) -> Vec<u8>,
@@ -357,11 +387,6 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
     block_number: u64,
     in_instructions: Vec<InInstructionWithBalance>,
   ) {
-    if !in_instructions.is_empty() {
-      // Set this block as notable
-      NotableBlock::set(txn, block_number, &());
-    }
-
     InInstructions::send(txn, (), &BlockBoundInInstructions { block_number, in_instructions });
   }
 
diff --git a/processor/scanner/src/eventuality/db.rs b/processor/scanner/src/eventuality/db.rs
index e379532d..baed33c4 100644
--- a/processor/scanner/src/eventuality/db.rs
+++ b/processor/scanner/src/eventuality/db.rs
@@ -13,12 +13,29 @@ impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
 
 create_db!(
   ScannerEventuality {
+    // The next block to check for resolving eventualities
+    NextToCheckForEventualitiesBlock: () -> u64,
+
     SerializedEventualities: () -> Vec<u8>,
   }
 );
 
 pub(crate) struct EventualityDb<S: ScannerFeed>(PhantomData<S>);
 impl<S: ScannerFeed> EventualityDb<S> {
+  pub(crate) fn set_next_to_check_for_eventualities_block(
+    txn: &mut impl DbTxn,
+    next_to_check_for_eventualities_block: u64,
+  ) {
+    assert!(
+      next_to_check_for_eventualities_block != 0,
+      "next-to-check-for-eventualities block was 0 when it's bound non-zero"
+    );
+    NextToCheckForEventualitiesBlock::set(txn, &next_to_check_for_eventualities_block);
+  }
+  pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option<u64> {
+    NextToCheckForEventualitiesBlock::get(getter)
+  }
+
   pub(crate) fn set_eventualities(
     txn: &mut impl DbTxn,
     key: KeyFor<S>,
diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index 3a472ce2..f682bf36 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -1,6 +1,6 @@
 use group::GroupEncoding;
 
-use serai_db::{DbTxn, Db};
+use serai_db::{Get, DbTxn, Db};
 
 use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block};
 
@@ -14,6 +14,16 @@ use crate::{
 mod db;
 use db::EventualityDb;
 
+/// The latest scannable block, which is determined by this task.
+///
+/// This task decides when a key retires, which impacts the scan task. Accordingly, the scanner is
+/// only allowed to scan `S::WINDOW_LENGTH - 1` blocks ahead so we can safely schedule keys to
+/// retire `S::WINDOW_LENGTH` blocks out.
+pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
+  EventualityDb::<S>::next_to_check_for_eventualities_block(getter)
+    .map(|b| b + S::WINDOW_LENGTH - 1)
+}
+
 /*
   When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
   outputs into some scheduler, potentially causing certain transactions to begin their signing
@@ -64,6 +74,21 @@ struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> {
   scheduler: Sch,
 }
 
+impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
+  pub(crate) fn new(mut db: D, feed: S, scheduler: Sch, start_block: u64) -> Self {
+    if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() {
+      // Initialize the DB
+      let mut txn = db.txn();
+      // We can receive outputs in `start_block`, but any descending transactions will be in the
+      // next block
+      EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block + 1);
+      txn.commit();
+    }
+
+    Self { db, feed, scheduler }
+  }
+}
+
 #[async_trait::async_trait]
 impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
@@ -93,7 +118,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
       .expect("EventualityTask run before writing the start block");
 
     // Fetch the next block to check
-    let next_to_check = ScannerDb::<S>::next_to_check_for_eventualities_block(&self.db)
+    let next_to_check = EventualityDb::<S>::next_to_check_for_eventualities_block(&self.db)
       .expect("EventualityTask run before writing the start block");
 
     // Check all blocks
@@ -121,21 +146,19 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
 
       /*
         This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead,
-        which is `<= CONFIRMATIONS`) will be the keys to use here.
+        which is `<= CONFIRMATIONS`) will be the keys to use here, with only minor edge cases.
 
-        If we had added a new key (which hasn't actually actived by the block we're currently
-        working on), it won't have any Eventualities for at least `CONFIRMATIONS` blocks (so it'd
-        have no impact here).
+        This may include a key which has yet to activate by our perception. We can simply drop
+        those.
 
-        As for retiring a key, that's done on this task's timeline. We ensure we don't bork the
-        scanner by officially retiring the key `WINDOW_LENGTH` blocks in the future (ensuring the
-        scanner never has a malleable view of the keys).
+        This may not include a key which has retired by the next-to-scan block. This task is the
+        one which decides when to retire a key, and when it marks a key to be retired, it is done
+        with it. Accordingly, it's not an issue if such a key was dropped.
       */
-      // TODO: Ensure the add key/remove key DB fns are called by the same task to prevent issues
-      // there
-      // TODO: On register eventuality, assert the above timeline assumptions
       let mut keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
         .expect("scanning for a blockchain without any keys set");
+      // Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate
+      keys.retain(|key| b >= key.activation_block_number);
 
       let mut txn = self.db.txn();
 
@@ -146,20 +169,16 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
         scan_data;
       let mut outputs = received_external_outputs;
 
-      for key in keys {
-        let (eventualities_is_empty, completed_eventualities) = {
+      for key in &keys {
+        let completed_eventualities = {
           let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
           let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities);
           EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities);
-          (eventualities.active_eventualities.is_empty(), completed_eventualities)
+          completed_eventualities
        };
 
-        for (tx, completed_eventuality) in completed_eventualities {
-          log::info!(
-            "eventuality {} resolved by {}",
-            hex::encode(completed_eventuality.id()),
-            hex::encode(tx.as_ref())
-          );
+        for tx in completed_eventualities.keys() {
+          log::info!("eventuality resolved by {}", hex::encode(tx.as_ref()));
         }
 
         // Fetch all non-External outputs
@@ -221,10 +240,12 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
         outputs.extend(non_external_outputs);
       }
 
+      // Update the scheduler
       let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
       scheduler_update.outputs.sort_by(sort_outputs);
       scheduler_update.forwards.sort_by(sort_outputs);
       scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
+      // Intake the new Eventualities
       let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
       for (key, new_eventualities) in new_eventualities {
         let key = {
@@ -234,6 +255,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
           KeyFor::<S>::from_bytes(&key_repr).unwrap()
         };
 
+        keys
+          .iter()
+          .find(|serai_key| serai_key.key == key)
+          .expect("queueing eventuality for key which isn't active");
+
         let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
         for new_eventuality in new_eventualities {
           eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality);
@@ -241,24 +267,26 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
         EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
       }
 
-      for key in keys {
+      // Now that we've intaked any Eventualities caused, check if we're retiring any keys
+      for key in &keys {
         if key.stage == LifetimeStage::Finishing {
           let eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
+          // TODO: This assumes the Scheduler is empty
           if eventualities.active_eventualities.is_empty() {
             log::info!(
              "key {} has finished and is being retired",
              hex::encode(key.key.to_bytes().as_ref())
            );
 
-            ScannerDb::<S>::flag_notable(&mut txn, b + S::WINDOW_LENGTH);
-            // TODO: Retire the key
-            todo!("TODO")
+            // Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never
+            // has a malleable view of the keys.
+            ScannerDb::<S>::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key);
           }
         }
       }
 
-      // Update the next to check block
-      ScannerDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
+      // Update the next-to-check block
+      EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
       txn.commit();
     }
 
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 2d19207f..b363faa1 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -14,6 +14,7 @@ mod lifetime;
 
 // Database schema definition and associated functions.
 mod db;
+use db::ScannerDb;
 // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
 mod index;
 // Scans blocks for received coins.
@@ -208,7 +209,9 @@ impl<S: ScannerFeed> Scanner<S> {
       "acknowledging a block which wasn't notable"
     );
     ScannerDb::<S>::set_highest_acknowledged_block(txn, block_number);
-    ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH);
+    if let Some(key_to_activate) = key_to_activate {
+      ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH, key_to_activate);
+    }
   }
 
   /// Queue Burns.
@@ -249,11 +252,6 @@ impl ScannerDb {
       // Return this block's outputs so they can be pruned from the RAM cache
      outputs
    }
-    fn latest_scanned_block<G: Get>(getter: &G) -> Option<usize> {
-      getter
-        .get(Self::scanned_block_key())
-        .map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap())
-    }
   }
 
   // Panic if we've already seen these outputs
diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs
index f176680e..201f64a1 100644
--- a/processor/scanner/src/scan.rs
+++ b/processor/scanner/src/scan.rs
@@ -13,6 +13,7 @@ use crate::{
   lifetime::LifetimeStage,
   db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb},
   BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
+  eventuality::latest_scannable_block,
 };
 
 // Construct an InInstruction from an external output.
@@ -69,7 +70,7 @@ struct ScanForOutputsTask<D: Db, S: ScannerFeed> {
 impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
     // Fetch the safe to scan block
-    let latest_scannable = ScannerDb::<S>::latest_scannable_block(&self.db)
+    let latest_scannable = latest_scannable_block::<S>(&self.db)
       .expect("ScanForOutputsTask run before writing the start block");
     // Fetch the next block to scan
     let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db)
@@ -80,12 +81,16 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
 
       log::info!("scanning block: {} ({b})", hex::encode(block.id()));
 
-      assert_eq!(ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db).unwrap(), b);
-      let keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
-        .expect("scanning for a blockchain without any keys set");
-
       let mut txn = self.db.txn();
 
+      assert_eq!(ScannerDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b);
+
+      // Tidy the keys, then fetch them
+      // We don't have to tidy them here, we just have to somewhere, so why not here?
+      ScannerDb::<S>::tidy_keys(&mut txn);
+      let keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&txn)
+        .expect("scanning for a blockchain without any keys set");
+
       let mut scan_data = SenderScanData {
         block_number: b,
         received_external_outputs: vec![],
@@ -156,7 +161,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
             // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing
             // an invocation of a consensus/signing protocol
             if balance.amount.0 >= self.feed.dust(balance.coin).0 {
-              ScannerDb::<S>::flag_notable(&mut txn, b);
+              ScannerDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b);
             }
             continue;
           }
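
A minimal, self-contained sketch of the window invariant this change relies on (not part of the
diff; `WINDOW_LENGTH = 10`, the free-standing `latest_scannable_block` helper, and `main` are
illustrative stand-ins for `S::WINDOW_LENGTH` and the functions touched above). It demonstrates
why retiring a key `WINDOW_LENGTH` blocks out, from the Eventuality task, keeps the scan task's
view of the keys from being malleable:

fn main() {
  // Stand-in for `S::WINDOW_LENGTH` (the real value is per-ScannerFeed).
  const WINDOW_LENGTH: u64 = 10;

  // Mirrors `latest_scannable_block` from the diff: the scan task may only advance to
  // `WINDOW_LENGTH - 1` blocks past the next block the Eventuality task has to check.
  fn latest_scannable_block(next_to_check: u64, window_length: u64) -> u64 {
    next_to_check + window_length - 1
  }

  for b in 1u64 .. 1_000 {
    // While the Eventuality task is still processing block `b`, the next block to check is `b`.
    let next_to_check = b;
    // If block `b` finishes a key, the key is scheduled to retire at `b + WINDOW_LENGTH`.
    let retire_at = b + WINDOW_LENGTH;
    // The scan task cannot reach the retiry height before the Eventuality task commits the
    // transaction recording the retiry (which also advances next-to-check to `b + 1`), so the
    // scan task never observes the key set change out from under it.
    assert!(latest_scannable_block(next_to_check, WINDOW_LENGTH) < retire_at);
  }
}

Once that transaction commits, next-to-check becomes `b + 1`, the block at the retiry height
becomes scannable, and `tidy_keys` can observe the recorded `RetireAt` entry before the scan task
fetches the active keys for that block.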