Pass the lifetime information to the scheduler

Enables it to decide which keys to use for fulfillment/change.
This commit is contained in:
Luke Parker
2024-08-29 17:37:45 -04:00
parent 4f6d91037e
commit 2ca7fccb08
4 changed files with 95 additions and 61 deletions

View File

@@ -1,17 +1,12 @@
use core::marker::PhantomData;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db};
use primitives::{EncodableG, Eventuality, EventualityTracker};
use crate::{ScannerFeed, KeyFor, EventualityFor};
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!(
ScannerEventuality {
// The next block to check for resolving eventualities
@@ -20,8 +15,6 @@ create_db!(
LatestHandledNotableBlock: () -> u64,
SerializedEventualities: <K: Encode>(key: K) -> Vec<u8>,
RetiredKey: <K: Borshy>(block_number: u64) -> K,
}
);
@@ -72,19 +65,4 @@ impl<S: ScannerFeed> EventualityDb<S> {
}
res
}
pub(crate) fn retire_key(txn: &mut impl DbTxn, block_number: u64, key: KeyFor<S>) {
assert!(
RetiredKey::get::<EncodableG<KeyFor<S>>>(txn, block_number).is_none(),
"retiring multiple keys within the same block"
);
RetiredKey::set(txn, block_number, &EncodableG(key));
}
pub(crate) fn take_retired_key(txn: &mut impl DbTxn, block_number: u64) -> Option<KeyFor<S>> {
let res = RetiredKey::get::<EncodableG<KeyFor<S>>>(txn, block_number).map(|res| res.0);
if res.is_some() {
RetiredKey::del::<EncodableG<KeyFor<S>>>(txn, block_number);
}
res
}
}

View File

@@ -9,7 +9,7 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality,
use crate::{
lifetime::LifetimeStage,
db::{
OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
ScanToEventualityDb,
},
BlockExt, ScannerFeed, KeyFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs,
@@ -115,6 +115,34 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
Self { db, feed, scheduler }
}
fn keys_and_keys_with_stages(
&self,
block_number: u64,
) -> (Vec<SeraiKey<KeyFor<S>>>, Vec<(KeyFor<S>, LifetimeStage)>) {
/*
This is proper as the keys for the next-to-scan block (at most `WINDOW_LENGTH` ahead,
which is `<= CONFIRMATIONS`) will be the keys to use here, with only minor edge cases.
This may include a key which has yet to activate by our perception. We can simply drop
those.
This may not include a key which has retired by the next-to-scan block. This task is the
one which decides when to retire a key, and when it marks a key to be retired, it is done
with it. Accordingly, it's not an issue if such a key was dropped.
This also may include a key we've retired which has yet to officially retire. That's fine as
we'll do nothing with it, and the Scheduler traits document this behavior.
*/
assert!(S::WINDOW_LENGTH <= S::CONFIRMATIONS);
let mut keys = ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
.expect("scanning for a blockchain without any keys set");
// Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate
keys.retain(|key| block_number <= key.activation_block_number);
let keys_with_stages = keys.iter().map(|key| (key.key, key.stage)).collect::<Vec<_>>();
(keys, keys_with_stages)
}
// Returns a boolean of if we intaked any Burns.
fn intake_burns(&mut self) -> bool {
let mut intaked_any = false;
@@ -123,6 +151,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
if let Some(latest_handled_notable_block) =
EventualityDb::<S>::latest_handled_notable_block(&self.db)
{
// We always intake Burns per this block as it's the block we have consensus on
// We would have a consensus failure if some thought the change should be the old key and
// others the new key
let (_keys, keys_with_stages) = self.keys_and_keys_with_stages(latest_handled_notable_block);
let mut txn = self.db.txn();
// Drain the entire channel
while let Some(burns) =
@@ -130,7 +163,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
{
intaked_any = true;
let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
let new_eventualities = self.scheduler.fulfill(&mut txn, &keys_with_stages, burns);
intake_eventualities::<S>(&mut txn, new_eventualities);
}
txn.commit();
@@ -154,6 +187,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
let mut made_progress = false;
// Start by intaking any Burns we have sitting around
// It's important we run this regardless of if we have a new block to handle
made_progress |= self.intake_burns();
/*
@@ -206,8 +240,8 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// Since this block is notable, ensure we've intaked all the Burns preceding it
// We can know with certainty that the channel is fully populated at this time since we've
// acknowledged a newer block (so we've handled the state up to this point and new state
// will be for the newer block)
// acknowledged a newer block (so we've handled the state up to this point and any new
// state will be for the newer block)
#[allow(unused_assignments)]
{
made_progress |= self.intake_burns();
@@ -221,22 +255,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
/*
This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead,
which is `<= CONFIRMATIONS`) will be the keys to use here, with only minor edge cases.
This may include a key which has yet to activate by our perception. We can simply drop
those.
This may not include a key which has retired by the next-to-scan block. This task is the
one which decides when to retire a key, and when it marks a key to be retired, it is done
with it. Accordingly, it's not an issue if such a key was dropped.
*/
let mut keys =
ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
.expect("scanning for a blockchain without any keys set");
// Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate
keys.retain(|key| b <= key.activation_block_number);
let (keys, keys_with_stages) = self.keys_and_keys_with_stages(b);
let mut txn = self.db.txn();
@@ -331,7 +350,8 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
scheduler_update.forwards.sort_by(sort_outputs);
scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
// Intake the new Eventualities
let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
let new_eventualities =
self.scheduler.update(&mut txn, &keys_with_stages, scheduler_update);
for key in new_eventualities.keys() {
keys
.iter()
@@ -345,7 +365,10 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// If this is the block at which forwarding starts for this key, flush it
// We do this after we issue the above update for any efficiencies gained by doing so
if key.block_at_which_forwarding_starts == Some(b) {
assert!(key.key != keys.last().unwrap().key);
assert!(
key.key != keys.last().unwrap().key,
"key which was forwarding was the last key (which has no key after it to forward to)"
);
self.scheduler.flush_key(&mut txn, key.key, keys.last().unwrap().key);
}
@@ -361,18 +384,15 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never
// has a malleable view of the keys.
let retire_at = b + S::WINDOW_LENGTH;
ScannerGlobalDb::<S>::retire_key(&mut txn, retire_at, key.key);
EventualityDb::<S>::retire_key(&mut txn, retire_at, key.key);
ScannerGlobalDb::<S>::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key);
// We tell the scheduler to retire it now as we're done with it, and this fn doesn't
// require it be called with a canonical order
self.scheduler.retire_key(&mut txn, key.key);
}
}
}
// If we retired any key at this block, retire it within the scheduler
if let Some(key) = EventualityDb::<S>::take_retired_key(&mut txn, b) {
self.scheduler.retire_key(&mut txn, key);
}
// Update the next-to-check block
EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);