Decide flow between scan/eventuality/report

Scan now only handles External outputs, with an associated essay going over
why. Scan directly creates the InInstruction (prior planned to be done in
Report), and Eventuality is declared to end up yielding the outputs.

That will require making the Eventuality flow two-stage. One stage to evaluate
existing Eventualities and yield outputs, and one stage to incorporate new
Eventualities before advancing the scan window.
This commit is contained in:
Luke Parker
2024-08-23 20:30:06 -04:00
parent f2ee4daf43
commit bc0cc5a754
6 changed files with 350 additions and 167 deletions

View File

@@ -5,10 +5,18 @@ use tokio::sync::mpsc;
use serai_primitives::{Coin, Amount};
use primitives::{ReceivedOutput, BlockHeader, Block};
// Logic for deciding where in its lifetime a multisig is.
mod lifetime;
// Database schema definition and associated functions.
mod db;
// Task to index the blockchain, ensuring we don't reorganize finalized blocks.
mod index;
// Scans blocks for received coins.
mod scan;
/// Check blocks for transactions expected to eventually occur.
mod eventuality;
/// Task which reports `Batch`s to Substrate.
mod report;
/// A feed usable to scan a blockchain.
@@ -16,12 +24,22 @@ mod report;
/// This defines the primitive types used, along with various getters necessary for indexing.
#[async_trait::async_trait]
pub trait ScannerFeed: Send + Sync {
/// The amount of confirmations a block must have to be considered finalized.
///
/// This value must be at least `1`.
const CONFIRMATIONS: u64;
/// The amount of blocks to process in parallel.
///
/// This value must be at least `1`. This value should be the worst-case latency to handle a
/// block divided by the expected block time.
const WINDOW_LENGTH: u64;
/// The amount of blocks which will occur in 10 minutes (approximate).
///
/// This value must be at least `1`.
const TEN_MINUTES: u64;
/// The representation of a block for this blockchain.
///
/// A block is defined as a consensus event associated with a set of transactions. It is not
@@ -152,6 +170,32 @@ pub(crate) trait ContinuallyRan: Sized {
}
}
/// A representation of a scanner.
pub struct Scanner;
impl Scanner {
/// Create a new scanner.
///
/// This will begin its execution, spawning several asynchronous tasks.
// TODO: Take start_time and binary search here?
pub fn new(start_block: u64) -> Self {
todo!("TODO")
}
/// Acknowledge a block.
///
/// This means this block was ordered on Serai in relation to `Burn` events, and all validators
/// have achieved synchrony on it.
pub fn acknowledge_block(
&mut self,
block_number: u64,
key_to_activate: Option<()>,
forwarded_outputs: Vec<()>,
eventualities_created: Vec<()>,
) {
todo!("TODO")
}
}
/*
#[derive(Clone, Debug)]
pub enum ScannerEvent<N: Network> {
@@ -172,8 +216,6 @@ pub enum ScannerEvent<N: Network> {
),
}
pub type ScannerEventChannel<N> = mpsc::UnboundedReceiver<ScannerEvent<N>>;
#[derive(Clone, Debug)]
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
impl<N: Network, D: Db> ScannerDb<N, D> {
@@ -184,38 +226,6 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
getter.get(Self::seen_key(id)).is_some()
}
fn outputs_key(block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"outputs", block.as_ref())
}
fn save_outputs(
txn: &mut D::Transaction<'_>,
block: &<N::Block as Block<N>>::Id,
outputs: &[N::Output],
) {
let mut bytes = Vec::with_capacity(outputs.len() * 64);
for output in outputs {
output.write(&mut bytes).unwrap();
}
txn.put(Self::outputs_key(block), bytes);
}
fn outputs(
txn: &D::Transaction<'_>,
block: &<N::Block as Block<N>>::Id,
) -> Option<Vec<N::Output>> {
let bytes_vec = txn.get(Self::outputs_key(block))?;
let mut bytes: &[u8] = bytes_vec.as_ref();
let mut res = vec![];
while !bytes.is_empty() {
res.push(N::Output::read(&mut bytes).unwrap());
}
Some(res)
}
fn scanned_block_key() -> Vec<u8> {
Self::scanner_key(b"scanned_block", [])
}
fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec<N::Output> {
let id = Self::block(txn, block); // It may be None for the first key rotated to
let outputs =
@@ -255,36 +265,6 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
}
impl<N: Network, D: Db> ScannerHandle<N, D> {
/// Register a key to scan for.
pub async fn register_key(
&mut self,
txn: &mut D::Transaction<'_>,
activation_number: usize,
key: <N::Curve as Ciphersuite>::G,
) {
info!("Registering key {} in scanner at {activation_number}", hex::encode(key.to_bytes()));
let mut scanner_lock = self.scanner.write().await;
let scanner = scanner_lock.as_mut().unwrap();
assert!(
activation_number > scanner.ram_scanned.unwrap_or(0),
"activation block of new keys was already scanned",
);
if scanner.keys.is_empty() {
assert!(scanner.ram_scanned.is_none());
scanner.ram_scanned = Some(activation_number);
assert!(ScannerDb::<N, D>::save_scanned_block(txn, activation_number).is_empty());
}
ScannerDb::<N, D>::register_key(txn, activation_number, key);
scanner.keys.push((activation_number, key));
#[cfg(not(test))] // TODO: A test violates this. Improve the test with a better flow
assert!(scanner.keys.len() <= 2);
scanner.eventualities.insert(key.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
}
/// Acknowledge having handled a block.
///
/// Creates a lock over the Scanner, preventing its independent scanning operations until
@@ -375,53 +355,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
mut multisig_completed: mpsc::UnboundedReceiver<bool>,
) {
loop {
let (ram_scanned, latest_block_to_scan) = {
// Sleep 5 seconds to prevent hammering the node/scanner lock
sleep(Duration::from_secs(5)).await;
let ram_scanned = {
let scanner_lock = scanner_hold.read().await;
let scanner = scanner_lock.as_ref().unwrap();
// If we're not scanning for keys yet, wait until we are
if scanner.keys.is_empty() {
continue;
}
let ram_scanned = scanner.ram_scanned.unwrap();
// If a Batch has taken too long to be published, start waiting until it is before
// continuing scanning
// Solves a race condition around multisig rotation, documented in the relevant doc
// and demonstrated with mini
if let Some(needing_ack) = scanner.need_ack.front() {
let next = ram_scanned + 1;
let limit = needing_ack + N::CONFIRMATIONS;
assert!(next <= limit);
if next == limit {
continue;
}
};
ram_scanned
};
(
ram_scanned,
loop {
break match network.get_latest_block_number().await {
// Only scan confirmed blocks, which we consider effectively finalized
// CONFIRMATIONS - 1 as whatever's in the latest block already has 1 confirm
Ok(latest) => latest.saturating_sub(N::CONFIRMATIONS.saturating_sub(1)),
Err(_) => {
warn!("couldn't get latest block number");
sleep(Duration::from_secs(60)).await;
continue;
}
};
},
)
};
for block_being_scanned in (ram_scanned + 1) ..= latest_block_to_scan {
// Redo the checks for if we're too far ahead
{