Document expectations on Eventuality task and correct code determining the block safe to scan/report

This commit is contained in:
Luke Parker
2024-08-20 19:37:47 -04:00
parent 155ad48f4c
commit 74d3075dae
5 changed files with 65 additions and 278 deletions

View File

@@ -10,17 +10,17 @@ mod index;
mod scan;
mod eventuality;
mod report;
mod safe;
/// A feed usable to scan a blockchain.
///
/// This defines the primitive types used, along with various getters necessary for indexing.
#[async_trait::async_trait]
pub trait ScannerFeed: Send + Sync {
/// The amount of confirmations required for a block to be finalized.
/// The amount of blocks to process in parallel.
///
/// This value must be at least `1`.
const CONFIRMATIONS: u64;
/// This value must be at least `1`. This value should be the worst-case latency to handle a
/// block divided by the expected block time.
const WINDOW_LENGTH: u64;
/// The representation of a block for this blockchain.
///
@@ -36,19 +36,11 @@ pub trait ScannerFeed: Send + Sync {
/// resolve without manual intervention.
type EphemeralError: Debug;
/// Fetch the number of the latest block.
///
/// The block number is its zero-indexed position within a linear view of the external network's
/// consensus. The genesis block accordingly has block number 0.
async fn latest_block_number(&self) -> Result<u64, Self::EphemeralError>;
/// Fetch the number of the latest finalized block.
///
/// The block number is its zero-indexed position within a linear view of the external network's
/// consensus. The genesis block accordingly has block number 0.
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError> {
Ok(self.latest_block_number().await? - Self::CONFIRMATIONS)
}
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError>;
/// Fetch a block header by its number.
async fn block_header_by_number(
@@ -262,77 +254,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
}
}
/// The Scanner emits events relating to the blockchain, notably received outputs.
///
/// It WILL NOT fail to emit an event, even if it reboots at selected moments.
///
/// It MAY fire the same event multiple times.
#[derive(Debug)]
pub struct Scanner<N: Network, D: Db> {
_db: PhantomData<D>,
keys: Vec<(usize, <N::Curve as Ciphersuite>::G)>,
eventualities: HashMap<Vec<u8>, EventualitiesTracker<N::Eventuality>>,
ram_scanned: Option<usize>,
ram_outputs: HashSet<Vec<u8>>,
need_ack: VecDeque<usize>,
events: mpsc::UnboundedSender<ScannerEvent<N>>,
}
#[derive(Clone, Debug)]
struct ScannerHold<N: Network, D: Db> {
scanner: Arc<RwLock<Option<Scanner<N, D>>>>,
}
impl<N: Network, D: Db> ScannerHold<N, D> {
async fn read(&self) -> RwLockReadGuard<'_, Option<Scanner<N, D>>> {
loop {
let lock = self.scanner.read().await;
if lock.is_none() {
drop(lock);
tokio::task::yield_now().await;
continue;
}
return lock;
}
}
async fn write(&self) -> RwLockWriteGuard<'_, Option<Scanner<N, D>>> {
loop {
let lock = self.scanner.write().await;
if lock.is_none() {
drop(lock);
tokio::task::yield_now().await;
continue;
}
return lock;
}
}
// This is safe to not check if something else already acquired the Scanner as the only caller is
// sequential.
async fn long_term_acquire(&self) -> Scanner<N, D> {
self.scanner.write().await.take().unwrap()
}
async fn restore(&self, scanner: Scanner<N, D>) {
let _ = self.scanner.write().await.insert(scanner);
}
}
#[derive(Debug)]
pub struct ScannerHandle<N: Network, D: Db> {
scanner: ScannerHold<N, D>,
held_scanner: Option<Scanner<N, D>>,
pub events: ScannerEventChannel<N>,
pub multisig_completed: mpsc::UnboundedSender<bool>,
}
impl<N: Network, D: Db> ScannerHandle<N, D> {
pub async fn ram_scanned(&self) -> usize {
self.scanner.read().await.as_ref().unwrap().ram_scanned.unwrap_or(0)
}
/// Register a key to scan for.
pub async fn register_key(
&mut self,
@@ -363,17 +285,6 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
scanner.eventualities.insert(key.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
}
pub fn db_scanned<G: Get>(getter: &G) -> Option<usize> {
ScannerDb::<N, D>::latest_scanned_block(getter)
}
// This perform a database read which isn't safe with regards to if the value is set or not
// It may be set, when it isn't expected to be set, or not set, when it is expected to be set
// Since the value is static, if it's set, it's correctly set
pub fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
ScannerDb::<N, D>::block_number(getter, id)
}
/// Acknowledge having handled a block.
///
/// Creates a lock over the Scanner, preventing its independent scanning operations until
@@ -447,7 +358,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
network: N,
db: D,
) -> (ScannerHandle<N, D>, Vec<(usize, <N::Curve as Ciphersuite>::G)>) {
let (events_send, events_recv) = mpsc::unbounded_channel();
let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel();
let keys = ScannerDb::<N, D>::keys(&db);
@@ -455,44 +365,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
for key in &keys {
eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
}
let ram_scanned = ScannerDb::<N, D>::latest_scanned_block(&db);
let scanner = ScannerHold {
scanner: Arc::new(RwLock::new(Some(Scanner {
_db: PhantomData,
keys: keys.clone(),
eventualities,
ram_scanned,
ram_outputs: HashSet::new(),
need_ack: VecDeque::new(),
events: events_send,
}))),
};
tokio::spawn(Scanner::run(db, network, scanner.clone(), multisig_completed_recv));
(
ScannerHandle {
scanner,
held_scanner: None,
events: events_recv,
multisig_completed: multisig_completed_send,
},
keys,
)
}
fn emit(&mut self, event: ScannerEvent<N>) -> bool {
if self.events.send(event).is_err() {
info!("Scanner handler was dropped. Shutting down?");
return false;
}
true
}
// An async function, to be spawned on a task, to discover and report outputs
@@ -576,30 +448,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
info!("scanning block: {} ({block_being_scanned})", hex::encode(&block_id));
// These DB calls are safe, despite not having a txn, since they're static values
// There's no issue if they're written in advance of expected (such as on reboot)
// They're also only expected here
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned) {
if id != block_id {
panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id));
}
} else {
// TODO: Move this to an unwrap
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned.saturating_sub(1)) {
if id != block.parent() {
panic!(
"block {} doesn't build off expected parent {}",
hex::encode(block_id),
hex::encode(id),
);
}
}
let mut txn = db.txn();
ScannerDb::<N, D>::save_block(&mut txn, block_being_scanned, &block_id);
txn.commit();
}
// Scan new blocks
// TODO: This lock acquisition may be long-lived...
let mut scanner_lock = scanner_hold.write().await;
@@ -617,16 +465,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
has_activation = true;
}
let key_vec = key.to_bytes().as_ref().to_vec();
// TODO: These lines are the ones which will cause a really long-lived lock acquisition
for output in network.get_outputs(&block, key).await {
assert_eq!(output.key(), key);
if output.balance().amount.0 >= N::DUST {
outputs.push(output);
}
}
for (id, (block_number, tx, completion)) in network
.get_eventuality_completions(scanner.eventualities.get_mut(&key_vec).unwrap(), &block)
.await
@@ -778,10 +616,6 @@ impl<N: Network, D: Db> Scanner<N, D> {
let retired = scanner.keys.remove(0).1;
scanner.eventualities.remove(retired.to_bytes().as_ref());
}
// Update ram_scanned
scanner.ram_scanned = Some(block_being_scanned);
drop(scanner_lock);
// If we sent a Block event, once again check multisig_completed
if sent_block &&