Definition and delineation of tasks within the scanner

Also defines primitives for the processor.
This commit is contained in:
Luke Parker
2024-08-20 11:57:56 -04:00
parent 57a0ba966b
commit 8763ef23ed
15 changed files with 653 additions and 105 deletions

View File

@@ -1,25 +1,91 @@
use core::marker::PhantomData;
use std::{
sync::Arc,
io::Read,
time::Duration,
collections::{VecDeque, HashSet, HashMap},
};
use core::fmt::Debug;
use ciphersuite::group::GroupEncoding;
use frost::curve::Ciphersuite;
use primitives::{ReceivedOutput, Block};
use log::{info, debug, warn};
use tokio::{
sync::{RwLockReadGuard, RwLockWriteGuard, RwLock, mpsc},
time::sleep,
};
mod db;
mod index;
use crate::{
Get, DbTxn, Db,
networks::{Output, Transaction, Eventuality, EventualitiesTracker, Block, Network},
};
/// A feed usable to scan a blockchain.
///
/// This defines the primitive types used, along with various getters necessary for indexing.
#[async_trait::async_trait]
pub trait ScannerFeed: Send + Sync {
/// The type of the key used to receive coins on this blockchain.
type Key: group::Group + group::GroupEncoding;
/// The type of the address used to specify who to send coins to on this blockchain.
type Address;
/// The type representing a received (and spendable) output.
type Output: ReceivedOutput<Self::Key, Self::Address>;
/// The representation of a block for this blockchain.
///
/// A block is defined as a consensus event associated with a set of transactions. It is not
/// necessary to literally define it as whatever the external network defines as a block. For
/// external networks which finalize block(s), this block type should be a representation of all
/// transactions within a finalization event.
type Block: Block;
/// An error encountered when fetching data from the blockchain.
///
/// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually
/// resolve without manual intervention.
type EphemeralError: Debug;
/// Fetch the number of the latest finalized block.
///
/// The block number is its zero-indexed position within a linear view of the external network's
/// consensus. The genesis block accordingly has block number 0.
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError>;
/// Fetch a block by its number.
async fn block_by_number(&self, number: u64) -> Result<Self::Block, Self::EphemeralError>;
/// Scan a block for its outputs.
async fn scan_for_outputs(
&self,
block: &Self::Block,
key: Self::Key,
) -> Result<Self::Output, Self::EphemeralError>;
}
#[async_trait::async_trait]
pub(crate) trait ContinuallyRan: Sized {
async fn run_instance(&mut self) -> Result<(), String>;
async fn continually_run(mut self) {
// The default number of seconds to sleep before running the task again
let default_sleep_before_next_task = 5;
// The current number of seconds to sleep before running the task again
// We increment this upon errors in order to not flood the logs with errors
let mut current_sleep_before_next_task = default_sleep_before_next_task;
let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| {
let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task;
// Set a limit of sleeping for two minutes
*current_sleep_before_next_task = new_sleep.max(120);
};
loop {
match self.run_instance().await {
Ok(()) => {
// Upon a successful (error-free) loop iteration, reset the amount of time we sleep
current_sleep_before_next_task = default_sleep_before_next_task;
}
Err(e) => {
log::debug!("{}", e);
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
}
}
// Don't run the task again for another few seconds
// This is at the start of the loop so we can continue without skipping this delay
tokio::time::sleep(core::time::Duration::from_secs(current_sleep_before_next_task)).await;
}
}
}
/*
#[derive(Clone, Debug)]
pub enum ScannerEvent<N: Network> {
// Block scanned
@@ -44,86 +110,6 @@ pub type ScannerEventChannel<N> = mpsc::UnboundedReceiver<ScannerEvent<N>>;
#[derive(Clone, Debug)]
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
impl<N: Network, D: Db> ScannerDb<N, D> {
fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"SCANNER", dst, key)
}
fn block_key(number: usize) -> Vec<u8> {
Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes())
}
fn block_number_key(id: &<N::Block as Block<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"block_number", id)
}
fn save_block(txn: &mut D::Transaction<'_>, number: usize, id: &<N::Block as Block<N>>::Id) {
txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes());
txn.put(Self::block_key(number), id);
}
fn block<G: Get>(getter: &G, number: usize) -> Option<<N::Block as Block<N>>::Id> {
getter.get(Self::block_key(number)).map(|id| {
let mut res = <N::Block as Block<N>>::Id::default();
res.as_mut().copy_from_slice(&id);
res
})
}
fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
getter
.get(Self::block_number_key(id))
.map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap())
}
fn keys_key() -> Vec<u8> {
Self::scanner_key(b"keys", b"")
}
fn register_key(
txn: &mut D::Transaction<'_>,
activation_number: usize,
key: <N::Curve as Ciphersuite>::G,
) {
let mut keys = txn.get(Self::keys_key()).unwrap_or(vec![]);
let key_bytes = key.to_bytes();
let key_len = key_bytes.as_ref().len();
assert_eq!(keys.len() % (8 + key_len), 0);
// Sanity check this key isn't already present
let mut i = 0;
while i < keys.len() {
if &keys[(i + 8) .. ((i + 8) + key_len)] == key_bytes.as_ref() {
panic!("adding {} as a key yet it was already present", hex::encode(key_bytes));
}
i += 8 + key_len;
}
keys.extend(u64::try_from(activation_number).unwrap().to_le_bytes());
keys.extend(key_bytes.as_ref());
txn.put(Self::keys_key(), keys);
}
fn keys<G: Get>(getter: &G) -> Vec<(usize, <N::Curve as Ciphersuite>::G)> {
let bytes_vec = getter.get(Self::keys_key()).unwrap_or(vec![]);
let mut bytes: &[u8] = bytes_vec.as_ref();
// Assumes keys will be 32 bytes when calculating the capacity
// If keys are larger, this may allocate more memory than needed
// If keys are smaller, this may require additional allocations
// Either are fine
let mut res = Vec::with_capacity(bytes.len() / (8 + 32));
while !bytes.is_empty() {
let mut activation_number = [0; 8];
bytes.read_exact(&mut activation_number).unwrap();
let activation_number = u64::from_le_bytes(activation_number).try_into().unwrap();
res.push((activation_number, N::Curve::read_G(&mut bytes).unwrap()));
}
res
}
fn retire_key(txn: &mut D::Transaction<'_>) {
let keys = Self::keys(txn);
assert_eq!(keys.len(), 2);
txn.del(Self::keys_key());
Self::register_key(txn, keys[1].0, keys[1].1);
}
fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"seen", id)
}
@@ -737,3 +723,4 @@ impl<N: Network, D: Db> Scanner<N, D> {
}
}
}
*/