// serai/processor/scanner/src/eventuality/mod.rs
use group::GroupEncoding;
use serai_db::{Get, DbTxn, Db};
use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block};
use crate::{
lifetime::LifetimeStage,
db::{
OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
ScanToEventualityDb,
},
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
};
mod db;
use db::EventualityDb;
/// The latest scannable block, which is determined by this task.
///
/// This task decides when a key retires, which impacts the scan task. Accordingly, the scanner is
/// only allowed to scan `S::WINDOW_LENGTH - 1` blocks ahead so we can safely schedule keys to
/// retire `S::WINDOW_LENGTH` blocks out.
pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
assert!(S::WINDOW_LENGTH > 0);
EventualityDb::<S>::next_to_check_for_eventualities_block(getter)
.map(|b| b + S::WINDOW_LENGTH - 1)
}
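// Illustrative arithmetic, assuming a hypothetical feed with `S::WINDOW_LENGTH = 10`: if the
// next block to check for Eventualities is 100, the latest scannable block is
// `100 + 10 - 1 = 109`. The scan task may thus run at most 9 blocks ahead of this task, and a
// key retirement scheduled `WINDOW_LENGTH` blocks out (block 110) lands beyond anything
// currently scannable.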
/*
When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
outputs into some scheduler, potentially causing certain transactions to begin their signing
protocol.

Despite only scanning blocks with `CONFIRMATIONS` confirmations, we cannot assume that these
transactions (in their signed form) will only appear after `CONFIRMATIONS` more blocks. For
`CONFIRMATIONS = 10` and a scanned block numbered `1`, the blockchain will have blocks with
numbers `0 ..= 10`. While this implies the earliest the transaction will appear is when the
block number is `11`, which is `1 + CONFIRMATIONS` (the number of the scanned block, plus the
confirmations), this isn't guaranteed.

A reorganization could occur which causes all unconfirmed blocks to be replaced, with the new
blockchain having the signed transaction present immediately.

This means that in order to detect Eventuality completions, we can only check block `b+1` once
we've acknowledged block `b`, accumulated its outputs, triggered any transactions, and prepared
for their Eventualities. This is important as both the completion of Eventualities, and the scan
process, may cause a block to be considered notable (where notable blocks must be perfectly
ordered).

We do not want to fully serialize the scan flow solely because the Eventuality flow must be. If
the time to scan, acknowledge, and intake a block ever exceeded the block time, we'd form a
backlog.

The solution is to form a window of blocks we can scan/acknowledge/intake safely, such that we
only form a backlog if the latency for a block exceeds the duration of the entire window (the
number of blocks in the window * the block time).

By treating the block an Eventuality resolves in not as the block it actually does, but as the
block one window later, we enable the following flow:

- The scanner scans within its window, submitting blocks for acknowledgement.
- We have the blocks acknowledged (the consensus protocol handling this in parallel).
- The scanner checks for Eventualities completed following acknowledged blocks.
- If all Eventualities for a retiring multisig have been cleared, the notable block is one
  window later.
- The start of the window shifts to the last block we've checked for Eventualities. This means
  the end of the window is the block we just set as notable, and once that's scanned, we can
  successfully publish a batch for it in a canonical fashion.

This forms a backlog only if the latency of scanning, acknowledgement, and intake (including
checking Eventualities) exceeds the window duration (the desired property).
*/
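// Worked example of the window (illustrative, assuming `S::WINDOW_LENGTH = 10`):
// - This task has checked Eventualities through block 99, so its next block to check is 100
//   and the scan task may scan up through block 109 (`latest_scannable_block`).
// - Once block 100 has been scanned, and acknowledged if notable (with any Burns queued
//   against it intaked), this task checks it for resolutions, advancing its next block to
//   check to 101 and letting the scan task proceed through block 110.
// A backlog only forms if scanning, acknowledgement, and intake of a block take longer than
// the window's total duration, as described above.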
pub(crate) struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> {
db: D,
feed: S,
scheduler: Sch,
}
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
pub(crate) fn new(mut db: D, feed: S, scheduler: Sch, start_block: u64) -> Self {
if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block);
txn.commit();
}
Self { db, feed, scheduler }
}
// Returns whether we intaked any Burns.
fn intake_burns(&mut self) -> bool {
let mut intaked_any = false;
// If we've handled a notable block, we may have Burns being queued with it as the reference
if let Some(latest_handled_notable_block) =
EventualityDb::<S>::latest_handled_notable_block(&self.db)
{
let mut txn = self.db.txn();
// Drain the entire channel
while let Some(burns) =
SubstrateToEventualityDb::try_recv_burns(&mut txn, latest_handled_notable_block)
{
intaked_any = true;
let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
// TODO: De-duplicate this with below instance via a helper function
for (key, new_eventualities) in new_eventualities {
let key = {
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
assert_eq!(key.len(), key_repr.as_ref().len());
key_repr.as_mut().copy_from_slice(&key);
KeyFor::<S>::from_bytes(&key_repr).unwrap()
};
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
for new_eventuality in new_eventualities {
eventualities.insert(new_eventuality);
}
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
}
}
txn.commit();
}
intaked_any
}
}
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the highest acknowledged block
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
else {
// If we've never acknowledged a block, return
return Ok(false);
};
// Whether we've made any progress, to return at the end of the function
let mut made_progress = false;
// Start by intaking any Burns we have sitting around
made_progress |= self.intake_burns();
/*
Eventualities increase upon one of two cases:
1) We're fulfilling Burns
2) We acknowledged a block

We can't know the processor has intaked all Burns it should have when we process block `b`.
We solve this by executing a consensus protocol whenever a resolution for an Eventuality
created to fulfill Burns occurs. Accordingly, we force ourselves to obtain synchrony on such
blocks (and all preceding Burns).

This means we can only iterate up to the block currently pending acknowledgement.

We only know blocks will need acknowledgement *for sure* if they were scanned. The only other
causes are key activation and retirement (both scheduled outside the scan window). This makes
the exclusive upper bound the *next block to scan*.
*/
let exclusive_upper_bound = {
// Fetch the next to scan block
let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
.expect("EventualityTask run before writing the start block");
// If we haven't done any work, return
if next_to_scan == 0 {
return Ok(false);
}
next_to_scan
};
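// Illustrative: if the next block to scan is 150, this iteration may check Eventualities in
// blocks `next_to_check .. 150`. Block 150 itself only becomes checkable once it has been
// scanned (and, if notable, acknowledged).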
// Fetch the next block to check
let next_to_check = EventualityDb::<S>::next_to_check_for_eventualities_block(&self.db)
.expect("EventualityTask run before writing the start block");
// Check all blocks
for b in next_to_check .. exclusive_upper_bound {
let is_block_notable = ScannerGlobalDb::<S>::is_block_notable(&self.db, b);
if is_block_notable {
/*
If this block is notable *and* not yet acknowledged, break.

This is so that, if Burns queued prior to this block's acknowledgement caused any
Eventualities (which may resolve within this block), we have them. If it weren't for that,
it'd be so that, if this block's acknowledgement caused any Eventualities, we have them,
though those would only potentially resolve in the next block (letting us scan this block
without delay).
*/
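// Illustrative: with `highest_acknowledged = 105`, a notable block 107 is deferred by the
// break below, while a notable block 103 has already been acknowledged, so we re-drain the
// Burns channel before checking it.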
if b > highest_acknowledged {
break;
}
// Since this block is notable, ensure we've intaked all the Burns preceding it
// We can know with certainty that the channel is fully populated at this time since we've
// acknowledged a newer block (so we've handled the state up to this point and new state
// will be for the newer block)
#[allow(unused_assignments)]
{
made_progress |= self.intake_burns();
}
}
// Since we're handling this block, we are making progress
made_progress = true;
let block = self.feed.block_by_number(&self.db, b).await?;
log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
/*
This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead,
which is `<= CONFIRMATIONS`) will be the keys to use here, with only minor edge cases.

This may include a key which has yet to activate by our perception. We can simply drop
those.

This may not include a key which has retired by the next-to-scan block. This task is the
one which decides when to retire a key, and when it marks a key to be retired, it is done
with it. Accordingly, it's not an issue if such a key was dropped.
*/
let mut keys =
ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
.expect("scanning for a blockchain without any keys set");
// Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate
keys.retain(|key| key.activation_block_number <= b);
let mut txn = self.db.txn();
// Fetch the data from the scanner
let scan_data = ScanToEventualityDb::recv_scan_data(&mut txn, b);
assert_eq!(scan_data.block_number, b);
let ReceiverScanData { block_number: _, received_external_outputs, forwards, returns } =
scan_data;
let mut outputs = received_external_outputs;
for key in &keys {
let completed_eventualities = {
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities);
EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities);
completed_eventualities
};
for tx in completed_eventualities.keys() {
log::info!("eventuality resolved by {}", hex::encode(tx.as_ref()));
}
// Fetch all non-External outputs
let mut non_external_outputs = block.scan_for_outputs(key.key);
non_external_outputs.retain(|output| output.kind() != OutputType::External);
// Drop any outputs less than the dust limit
// TODO: Either further filter to outputs we made or also check cost_to_aggregate
non_external_outputs.retain(|output| {
let balance = output.balance();
balance.amount.0 >= self.feed.dust(balance.coin).0
});
/*
Now that we have all non-External outputs, we filter them to be only the outputs which
are from transactions which resolve our own Eventualities *if* the multisig is retiring.
This implements step 6 of `spec/processor/Multisig Rotation.md`.

We may receive a Change output. The only issue with accumulating this would be if it
extends the multisig's lifetime (by increasing the amount of outputs yet to be
forwarded). By checking it's one we made, either:

1) It's a legitimate Change output to be forwarded
2) It's a Change output created by a user burning coins (specifying the Change address),
   which can only be created while the multisig is actively handling `Burn`s (therefore
   ensuring this multisig cannot be kept alive ad-infinitum)

The commentary on Change outputs also applies to Branch/Forwarded. They'll presumably get
ignored if not usable, however.
*/
if key.stage == LifetimeStage::Finishing {
non_external_outputs
.retain(|output| completed_eventualities.contains_key(&output.transaction_id()));
}
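// Illustrative: while `Finishing`, a Change output from a transaction we didn't make (one
// resolving none of our Eventualities) is dropped by the filter above, so outside deposits to
// the Change address can't extend the retiring multisig's lifetime.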
// Now, we iterate over all Forwarded outputs and queue their InInstructions
for output in
non_external_outputs.iter().filter(|output| output.kind() == OutputType::Forwarded)
{
let Some(eventuality) = completed_eventualities.get(&output.transaction_id()) else {
// Output sent to the forwarding address yet not actually forwarded
continue;
};
let Some(forwarded) = eventuality.forwarded_output() else {
// This was a TX made by us, yet someone burned to the forwarding address
continue;
};
let (return_address, in_instruction) =
ScannerGlobalDb::<S>::return_address_and_in_instruction_for_forwarded_output(
&txn, &forwarded,
)
.expect("forwarded an output yet didn't save its InInstruction to the DB");
queue_output_until_block::<S>(
&mut txn,
b + S::WINDOW_LENGTH,
&OutputWithInInstruction { output: output.clone(), return_address, in_instruction },
);
}
// Accumulate all of these outputs
outputs.extend(non_external_outputs);
}
// Update the scheduler
let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
scheduler_update.outputs.sort_by(sort_outputs);
scheduler_update.forwards.sort_by(sort_outputs);
scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
// Intake the new Eventualities
let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
for (key, new_eventualities) in new_eventualities {
let key = {
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
assert_eq!(key.len(), key_repr.as_ref().len());
key_repr.as_mut().copy_from_slice(&key);
KeyFor::<S>::from_bytes(&key_repr).unwrap()
};
keys
.iter()
.find(|serai_key| serai_key.key == key)
.expect("queueing eventuality for key which isn't active");
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
for new_eventuality in new_eventualities {
eventualities.insert(new_eventuality);
}
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
}
// Now that we've intaked any Eventualities caused, check if we're retiring any keys
for key in &keys {
if key.stage == LifetimeStage::Finishing {
let eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
// TODO: This assumes the Scheduler is empty
if eventualities.active_eventualities.is_empty() {
log::info!(
"key {} has finished and is being retired",
hex::encode(key.key.to_bytes().as_ref())
);
// Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never
// has a malleable view of the keys.
ScannerGlobalDb::<S>::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key);
}
}
}
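// Illustrative retirement timing, assuming `S::WINDOW_LENGTH = 10`: if the retiring key's last
// Eventuality resolves in block 200, the key is marked to retire at block 210. At this point
// the scan task can have scanned at most through block 209 (`latest_scannable_block`), so the
// retirement is recorded before block 210 ever becomes scannable and the scan task's view of
// the active keys stays consistent.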
// Update the next-to-check block
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, b + 1);
// If this block was notable, update the latest-handled notable block
if is_block_notable {
EventualityDb::<S>::set_latest_handled_notable_block(&mut txn, b);
}
txn.commit();
}
// Run dependents if we successfully checked any blocks
Ok(made_progress)
}
}