Route burns through the scanner

This commit is contained in:
Luke Parker
2024-08-29 12:45:47 -04:00
parent 8ac501028d
commit f9d02d43c2
6 changed files with 223 additions and 39 deletions

View File

@@ -11,6 +11,8 @@ create_db!(
ScannerEventuality {
// The next block to check for resolving eventualities
NextToCheckForEventualitiesBlock: () -> u64,
// The latest block this task has handled which was notable
LatestHandledNotableBlock: () -> u64,
SerializedEventualities: <K: Encode>(key: K) -> Vec<u8>,
}
@@ -22,16 +24,22 @@ impl<S: ScannerFeed> EventualityDb<S> {
txn: &mut impl DbTxn,
next_to_check_for_eventualities_block: u64,
) {
assert!(
next_to_check_for_eventualities_block != 0,
"next-to-check-for-eventualities block was 0 when it's bound non-zero"
);
NextToCheckForEventualitiesBlock::set(txn, &next_to_check_for_eventualities_block);
}
pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option<u64> {
NextToCheckForEventualitiesBlock::get(getter)
}
pub(crate) fn set_latest_handled_notable_block(
txn: &mut impl DbTxn,
latest_handled_notable_block: u64,
) {
LatestHandledNotableBlock::set(txn, &latest_handled_notable_block);
}
pub(crate) fn latest_handled_notable_block(getter: &impl Get) -> Option<u64> {
LatestHandledNotableBlock::get(getter)
}
pub(crate) fn set_eventualities(
txn: &mut impl DbTxn,
key: KeyFor<S>,

View File

@@ -6,7 +6,10 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality,
use crate::{
lifetime::LifetimeStage,
db::{OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, ScanToEventualityDb},
db::{
OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
ScanToEventualityDb,
},
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
};
@@ -20,6 +23,7 @@ use db::EventualityDb;
/// only allowed to scan `S::WINDOW_LENGTH - 1` blocks ahead so we can safely schedule keys to
/// retire `S::WINDOW_LENGTH` blocks out.
pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
assert!(S::WINDOW_LENGTH > 0);
EventualityDb::<S>::next_to_check_for_eventualities_block(getter)
.map(|b| b + S::WINDOW_LENGTH - 1)
}
@@ -79,24 +83,81 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
// We can receive outputs in `start_block`, but any descending transactions will be in the
// next block
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block + 1);
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block);
txn.commit();
}
Self { db, feed, scheduler }
}
// Returns a boolean of if we intaked any Burns.
fn intake_burns(&mut self) -> bool {
let mut intaked_any = false;
// If we've handled an notable block, we may have Burns being queued with it as the reference
if let Some(latest_handled_notable_block) =
EventualityDb::<S>::latest_handled_notable_block(&self.db)
{
let mut txn = self.db.txn();
// Drain the entire channel
while let Some(burns) =
SubstrateToEventualityDb::try_recv_burns(&mut txn, latest_handled_notable_block)
{
intaked_any = true;
let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
// TODO: De-duplicate this with below instance via a helper function
for (key, new_eventualities) in new_eventualities {
let key = {
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
assert_eq!(key.len(), key_repr.as_ref().len());
key_repr.as_mut().copy_from_slice(&key);
KeyFor::<S>::from_bytes(&key_repr).unwrap()
};
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
for new_eventuality in new_eventualities {
eventualities.insert(new_eventuality);
}
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
}
}
txn.commit();
}
intaked_any
}
}
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the highest acknowledged block
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
else {
// If we've never acknowledged a block, return
return Ok(false);
};
// A boolean of if we've made any progress to return at the end of the function
let mut made_progress = false;
// Start by intaking any Burns we have sitting around
made_progress |= self.intake_burns();
/*
The set of Eventualities only increase when a block is acknowledged. Accordingly, we can only
iterate up to (and including) the block currently pending acknowledgement. "including" is
because even if block `b` causes new Eventualities, they'll only potentially resolve in block
`b + 1`.
Eventualities increase upon one of two cases:
1) We're fulfilling Burns
2) We acknowledged a block
We can't know the processor has intaked all Burns it should have when we process block `b`.
We solve this by executing a consensus protocol whenever a resolution for an Eventuality
created to fulfill Burns occurs. Accordingly, we force ourselves to obtain synchrony on such
blocks (and all preceding Burns).
This means we can only iterate up to the block currently pending acknowledgement.
We only know blocks will need acknowledgement *for sure* if they were scanned. The only other
causes are key activation and retirement (both scheduled outside the scan window). This makes
@@ -113,32 +174,38 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
next_to_scan
};
// Fetch the highest acknowledged block
let highest_acknowledged = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
.expect("EventualityTask run before writing the start block");
// Fetch the next block to check
let next_to_check = EventualityDb::<S>::next_to_check_for_eventualities_block(&self.db)
.expect("EventualityTask run before writing the start block");
// Check all blocks
let mut iterated = false;
for b in next_to_check .. exclusive_upper_bound {
// If the prior block was notable *and* not acknowledged, break
// This is so if it caused any Eventualities (which may resolve this block), we have them
{
// This `- 1` is safe as next to check is bound to be non-zero
// This is possible since even if we receive coins in block 0, any transactions we'd make
// would resolve in block 1 (the first block we'll check under this non-zero rule)
let prior_block = b - 1;
if ScannerGlobalDb::<S>::is_block_notable(&self.db, prior_block) &&
(prior_block > highest_acknowledged)
{
let is_block_notable = ScannerGlobalDb::<S>::is_block_notable(&self.db, b);
if is_block_notable {
/*
If this block is notable *and* not acknowledged, break.
This is so if Burns queued prior to this block's acknowledgement caused any Eventualities
(which may resolve this block), we have them. If it wasn't for that, it'd be so if this
block's acknowledgement caused any Eventualities, we have them, though those would only
potentially resolve in the next block (letting us scan this block without delay).
*/
if b > highest_acknowledged {
break;
}
// Since this block is notable, ensure we've intaked all the Burns preceding it
// We can know with certainty that the channel is fully populated at this time since we've
// acknowledged a newer block (so we've handled the state up to this point and new state
// will be for the newer block)
#[allow(unused_assignments)]
{
made_progress |= self.intake_burns();
}
}
iterated = true;
// Since we're handling this block, we are making progress
made_progress = true;
let block = self.feed.block_by_number(&self.db, b).await?;
@@ -186,6 +253,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
let mut non_external_outputs = block.scan_for_outputs(key.key);
non_external_outputs.retain(|output| output.kind() != OutputType::External);
// Drop any outputs less than the dust limit
// TODO: Either further filter to outputs we made or also check cost_to_aggregate
non_external_outputs.retain(|output| {
let balance = output.balance();
balance.amount.0 >= self.feed.dust(balance.coin).0
@@ -288,10 +356,16 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// Update the next-to-check block
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
// If this block was notable, update the latest-handled notable block
if is_block_notable {
EventualityDb::<S>::set_latest_handled_notable_block(&mut txn, b);
}
txn.commit();
}
// Run dependents if we successfully checked any blocks
Ok(iterated)
Ok(made_progress)
}
}