mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-11 21:49:26 +00:00
implements the db macro for processor/src/multisigs/scanner.rs
This commit is contained in:
@@ -10,11 +10,13 @@ use ciphersuite::group::GroupEncoding;
|
||||
use frost::curve::Ciphersuite;
|
||||
|
||||
use log::{info, debug, warn};
|
||||
use serai_db::create_db;
|
||||
use tokio::{
|
||||
sync::{RwLockReadGuard, RwLockWriteGuard, RwLock, mpsc},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
use scale::Encode;
|
||||
use crate::{
|
||||
Get, DbTxn, Db,
|
||||
networks::{Output, Transaction, EventualitiesTracker, Block, Network},
|
||||
@@ -30,48 +32,61 @@ pub enum ScannerEvent<N: Network> {
|
||||
|
||||
pub type ScannerEventChannel<N> = mpsc::UnboundedReceiver<ScannerEvent<N>>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
|
||||
impl<N: Network, D: Db> ScannerDb<N, D> {
|
||||
fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
D::key(b"SCANNER", dst, key)
|
||||
create_db!(
|
||||
ScannerDb {
|
||||
BlockKeyDb: (number: u64) -> Vec<u8>,
|
||||
BlockNumberKeyDb: (id: Vec<u8>) -> u64,
|
||||
KeysDb: () -> Vec<u8>,
|
||||
SeenDb: (id: Vec<u8>) -> Vec<u8>,
|
||||
OutputsDb: (block: Vec<u8>) -> Vec<u8>,
|
||||
ScannedBlocksDb: () -> u64,
|
||||
RetirementBlocksDb: (key: Vec<u8>) -> u64
|
||||
}
|
||||
);
|
||||
|
||||
impl BlockKeyDb {
|
||||
|
||||
fn save_block<N: Network>(txn: &mut impl DbTxn, number: usize, id: &<N::Block as Block<N>>::Id) {
|
||||
Self::set(
|
||||
txn,
|
||||
u64::try_from(number).unwrap(),
|
||||
&BlockNumberKeyDb::to_block_number_key::<N>(id)
|
||||
);
|
||||
BlockNumberKeyDb::set(
|
||||
txn,
|
||||
BlockNumberKeyDb::to_block_number_key::<N>(id),
|
||||
&u64::try_from(number).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
fn block_key(number: usize) -> Vec<u8> {
|
||||
Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes())
|
||||
}
|
||||
fn block_number_key(id: &<N::Block as Block<N>>::Id) -> Vec<u8> {
|
||||
Self::scanner_key(b"block_number", id)
|
||||
}
|
||||
fn save_block(txn: &mut D::Transaction<'_>, number: usize, id: &<N::Block as Block<N>>::Id) {
|
||||
txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes());
|
||||
txn.put(Self::block_key(number), id);
|
||||
}
|
||||
fn block<G: Get>(getter: &G, number: usize) -> Option<<N::Block as Block<N>>::Id> {
|
||||
getter.get(Self::block_key(number)).map(|id| {
|
||||
fn block<N: Network>(getter: &impl Get, number: usize) -> Option<<N::Block as Block<N>>::Id> {
|
||||
Self::get(getter, number.try_into().unwrap()).map(|bytes| {
|
||||
let mut res = <N::Block as Block<N>>::Id::default();
|
||||
res.as_mut().copy_from_slice(&id);
|
||||
res.as_mut().copy_from_slice(&bytes);
|
||||
res
|
||||
})
|
||||
}
|
||||
fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
|
||||
getter
|
||||
.get(Self::block_number_key(id))
|
||||
.map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap())
|
||||
}
|
||||
|
||||
impl BlockNumberKeyDb {
|
||||
fn to_block_number_key<N: Network>(id: &<N::Block as Block<N>>::Id) -> Vec<u8> {
|
||||
id.as_ref().into()
|
||||
}
|
||||
|
||||
fn keys_key() -> Vec<u8> {
|
||||
Self::scanner_key(b"keys", b"")
|
||||
fn block_number<N: Network>(getter: &impl Get, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
|
||||
let key = Self::to_block_number_key::<N>(id);
|
||||
Self::get(getter, key).map(|number| usize::try_from(number).unwrap())
|
||||
}
|
||||
fn register_key(
|
||||
txn: &mut D::Transaction<'_>,
|
||||
}
|
||||
|
||||
impl KeysDb {
|
||||
fn register_key<N: Network>(
|
||||
txn: &mut impl DbTxn,
|
||||
activation_number: usize,
|
||||
key: <N::Curve as Ciphersuite>::G,
|
||||
) {
|
||||
let mut keys = txn.get(Self::keys_key()).unwrap_or(vec![]);
|
||||
|
||||
let mut keys = Self::get(txn).unwrap_or_default();
|
||||
let key_bytes = key.to_bytes();
|
||||
|
||||
let key_len = key_bytes.as_ref().len();
|
||||
assert_eq!(keys.len() % (8 + key_len), 0);
|
||||
|
||||
@@ -86,10 +101,11 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
|
||||
|
||||
keys.extend(u64::try_from(activation_number).unwrap().to_le_bytes());
|
||||
keys.extend(key_bytes.as_ref());
|
||||
txn.put(Self::keys_key(), keys);
|
||||
Self::set(txn, &keys);
|
||||
}
|
||||
fn keys<G: Get>(getter: &G) -> Vec<(usize, <N::Curve as Ciphersuite>::G)> {
|
||||
let bytes_vec = getter.get(Self::keys_key()).unwrap_or(vec![]);
|
||||
|
||||
fn keys<N: Network>(getter: &impl Get) -> Vec<(usize, <N::Curve as Ciphersuite>::G)> {
|
||||
let bytes_vec = Self::get(getter).unwrap_or_default();
|
||||
let mut bytes: &[u8] = bytes_vec.as_ref();
|
||||
|
||||
// Assumes keys will be 32 bytes when calculating the capacity
|
||||
@@ -100,31 +116,36 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
|
||||
while !bytes.is_empty() {
|
||||
let mut activation_number = [0; 8];
|
||||
bytes.read_exact(&mut activation_number).unwrap();
|
||||
let activation_number = u64::from_le_bytes(activation_number).try_into().unwrap();
|
||||
|
||||
res.push((activation_number, N::Curve::read_G(&mut bytes).unwrap()));
|
||||
res.push((u64::from_le_bytes(activation_number).try_into().unwrap(), N::Curve::read_G(&mut bytes).unwrap()));
|
||||
}
|
||||
res
|
||||
}
|
||||
fn retire_key(txn: &mut D::Transaction<'_>) {
|
||||
let keys = Self::keys(txn);
|
||||
|
||||
fn retire_key<N: Network>(txn: &mut impl DbTxn) {
|
||||
let keys = Self::keys::<N>(txn);
|
||||
assert_eq!(keys.len(), 2);
|
||||
txn.del(Self::keys_key());
|
||||
Self::register_key(txn, keys[1].0, keys[1].1);
|
||||
txn.del(Self::key());
|
||||
Self::register_key::<N>(txn, keys[1].0, keys[1].1);
|
||||
}
|
||||
}
|
||||
|
||||
impl SeenDb {
|
||||
fn to_seen_key<N: Network>(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
|
||||
id.as_ref().into()
|
||||
}
|
||||
|
||||
fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
|
||||
Self::scanner_key(b"seen", id)
|
||||
fn seen<N: Network>(getter: &impl Get, id: &<N::Output as Output<N>>::Id) -> bool {
|
||||
Self::get(getter, Self::to_seen_key::<N>(id)).is_some()
|
||||
}
|
||||
fn seen<G: Get>(getter: &G, id: &<N::Output as Output<N>>::Id) -> bool {
|
||||
getter.get(Self::seen_key(id)).is_some()
|
||||
}
|
||||
|
||||
impl OutputsDb {
|
||||
fn to_outputs_key<N: Network>(block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
|
||||
block.as_ref().into()
|
||||
}
|
||||
|
||||
fn outputs_key(block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
|
||||
Self::scanner_key(b"outputs", block.as_ref())
|
||||
}
|
||||
fn save_outputs(
|
||||
txn: &mut D::Transaction<'_>,
|
||||
fn save_outputs<N: Network>(
|
||||
txn: &mut impl DbTxn,
|
||||
block: &<N::Block as Block<N>>::Id,
|
||||
outputs: &[N::Output],
|
||||
) {
|
||||
@@ -132,13 +153,14 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
|
||||
for output in outputs {
|
||||
output.write(&mut bytes).unwrap();
|
||||
}
|
||||
txn.put(Self::outputs_key(block), bytes);
|
||||
Self::set(txn, Self::to_outputs_key::<N>(block), &bytes);
|
||||
}
|
||||
fn outputs(
|
||||
txn: &D::Transaction<'_>,
|
||||
|
||||
fn outputs<N: Network>(
|
||||
txn: &impl DbTxn,
|
||||
block: &<N::Block as Block<N>>::Id,
|
||||
) -> Option<Vec<N::Output>> {
|
||||
let bytes_vec = txn.get(Self::outputs_key(block))?;
|
||||
let bytes_vec = Self::get(txn, Self::to_outputs_key::<N>(block))?;
|
||||
let mut bytes: &[u8] = bytes_vec.as_ref();
|
||||
|
||||
let mut res = vec![];
|
||||
@@ -147,46 +169,44 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
|
||||
}
|
||||
Some(res)
|
||||
}
|
||||
}
|
||||
|
||||
fn scanned_block_key() -> Vec<u8> {
|
||||
Self::scanner_key(b"scanned_block", [])
|
||||
}
|
||||
|
||||
fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec<N::Output> {
|
||||
let id = Self::block(txn, block); // It may be None for the first key rotated to
|
||||
let outputs =
|
||||
if let Some(id) = id.as_ref() { Self::outputs(txn, id).unwrap_or(vec![]) } else { vec![] };
|
||||
|
||||
impl ScannedBlocksDb {
|
||||
fn save_scanned_block<N: Network>(txn: &mut impl DbTxn, block: usize) -> Vec<N::Output> {
|
||||
let id = BlockKeyDb::block::<N>(txn, block);
|
||||
let outputs = id.as_ref().and_then(|id| OutputsDb::outputs::<N>(txn, id)).unwrap_or_default();
|
||||
|
||||
// Mark all the outputs from this block as seen
|
||||
for output in &outputs {
|
||||
txn.put(Self::seen_key(&output.id()), b"");
|
||||
SeenDb::set(txn, SeenDb::to_seen_key::<N>(&output.id()), b"");
|
||||
}
|
||||
|
||||
txn.put(Self::scanned_block_key(), u64::try_from(block).unwrap().to_le_bytes());
|
||||
Self::set(txn, &u64::try_from(block).unwrap());
|
||||
|
||||
// Return this block's outputs so they can be pruned from the RAM cache
|
||||
outputs
|
||||
}
|
||||
fn latest_scanned_block<G: Get>(getter: &G) -> Option<usize> {
|
||||
getter
|
||||
.get(Self::scanned_block_key())
|
||||
.map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap())
|
||||
|
||||
fn latest_scanned_block(getter: &impl Get) -> Option<usize> {
|
||||
Self::get(getter)
|
||||
.map(|number| usize::try_from(number).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
impl RetirementBlocksDb {
|
||||
fn to_retirement_block_key<N: Network>(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> {
|
||||
key.to_bytes().as_ref().to_vec()
|
||||
}
|
||||
|
||||
fn retirement_block_key(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> {
|
||||
Self::scanner_key(b"retirement_block", key.to_bytes())
|
||||
}
|
||||
fn save_retirement_block(
|
||||
txn: &mut D::Transaction<'_>,
|
||||
fn save_retirement_block<N: Network>(
|
||||
txn: &mut impl DbTxn,
|
||||
key: &<N::Curve as Ciphersuite>::G,
|
||||
block: usize,
|
||||
) {
|
||||
txn.put(Self::retirement_block_key(key), u64::try_from(block).unwrap().to_le_bytes());
|
||||
Self::set(txn, Self::to_retirement_block_key::<N>(key), &u64::try_from(block).unwrap());
|
||||
}
|
||||
fn retirement_block<G: Get>(getter: &G, key: &<N::Curve as Ciphersuite>::G) -> Option<usize> {
|
||||
getter
|
||||
.get(Self::retirement_block_key(key))
|
||||
.map(|bytes| usize::try_from(u64::from_le_bytes(bytes.try_into().unwrap())).unwrap())
|
||||
|
||||
fn retirement_block<N: Network>(getter: &impl Get, key: &<N::Curve as Ciphersuite>::G) -> Option<usize> {
|
||||
Self::get(getter, Self::to_retirement_block_key::<N>(key)).map(|number| usize::try_from(number).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,10 +300,10 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
|
||||
if scanner.keys.is_empty() {
|
||||
assert!(scanner.ram_scanned.is_none());
|
||||
scanner.ram_scanned = Some(activation_number);
|
||||
assert!(ScannerDb::<N, D>::save_scanned_block(txn, activation_number).is_empty());
|
||||
assert!(ScannedBlocksDb::save_scanned_block::<N>(txn, activation_number).is_empty());
|
||||
}
|
||||
|
||||
ScannerDb::<N, D>::register_key(txn, activation_number, key);
|
||||
KeysDb::register_key::<N>(txn, activation_number, key);
|
||||
scanner.keys.push((activation_number, key));
|
||||
#[cfg(not(test))] // TODO: A test violates this. Improve the test with a better flow
|
||||
assert!(scanner.keys.len() <= 2);
|
||||
@@ -292,14 +312,14 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
|
||||
}
|
||||
|
||||
pub fn db_scanned<G: Get>(getter: &G) -> Option<usize> {
|
||||
ScannerDb::<N, D>::latest_scanned_block(getter)
|
||||
ScannedBlocksDb::latest_scanned_block(getter)
|
||||
}
|
||||
|
||||
// This perform a database read which isn't safe with regards to if the value is set or not
|
||||
// It may be set, when it isn't expected to be set, or not set, when it is expected to be set
|
||||
// Since the value is static, if it's set, it's correctly set
|
||||
pub fn block_number<G: Get>(getter: &G, id: &<N::Block as Block<N>>::Id) -> Option<usize> {
|
||||
ScannerDb::<N, D>::block_number(getter, id)
|
||||
BlockNumberKeyDb::block_number::<N>(getter, id)
|
||||
}
|
||||
|
||||
/// Acknowledge having handled a block.
|
||||
@@ -318,11 +338,11 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
|
||||
let mut scanner = self.scanner.long_term_acquire().await;
|
||||
|
||||
// Get the number for this block
|
||||
let number = ScannerDb::<N, D>::block_number(txn, &id)
|
||||
let number = BlockNumberKeyDb::block_number::<N>(txn, &id)
|
||||
.expect("main loop trying to operate on data we haven't scanned");
|
||||
log::trace!("block {} was {number}", hex::encode(&id));
|
||||
|
||||
let outputs = ScannerDb::<N, D>::save_scanned_block(txn, number);
|
||||
let outputs = ScannedBlocksDb::save_scanned_block::<N>(txn, number);
|
||||
// This has a race condition if we try to ack a block we scanned on a prior boot, and we have
|
||||
// yet to scan it on this boot
|
||||
assert!(number <= scanner.ram_scanned.unwrap());
|
||||
@@ -335,10 +355,10 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
|
||||
self.held_scanner = Some(scanner);
|
||||
|
||||
// Load the key from the DB, as it will have already been removed from RAM if retired
|
||||
let key = ScannerDb::<N, D>::keys(txn)[0].1;
|
||||
let is_retirement_block = ScannerDb::<N, D>::retirement_block(txn, &key) == Some(number);
|
||||
let key = KeysDb::keys::<N>(txn)[0].1;
|
||||
let is_retirement_block = RetirementBlocksDb::retirement_block::<N>(txn, &key) == Some(number);
|
||||
if is_retirement_block {
|
||||
ScannerDb::<N, D>::retire_key(txn);
|
||||
KeysDb::retire_key::<N>(txn);
|
||||
}
|
||||
(is_retirement_block, outputs)
|
||||
}
|
||||
@@ -378,13 +398,13 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
let (events_send, events_recv) = mpsc::unbounded_channel();
|
||||
let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let keys = ScannerDb::<N, D>::keys(&db);
|
||||
let keys = KeysDb::keys::<N>(&db);
|
||||
let mut eventualities = HashMap::new();
|
||||
for key in &keys {
|
||||
eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
|
||||
}
|
||||
|
||||
let ram_scanned = ScannerDb::<N, D>::latest_scanned_block(&db);
|
||||
let ram_scanned = ScannedBlocksDb::latest_scanned_block(&db);
|
||||
|
||||
let scanner = ScannerHold {
|
||||
scanner: Arc::new(RwLock::new(Some(Scanner {
|
||||
@@ -510,13 +530,13 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
// These DB calls are safe, despite not having a txn, since they're static values
|
||||
// There's no issue if they're written in advance of expected (such as on reboot)
|
||||
// They're also only expected here
|
||||
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned) {
|
||||
if let Some(id) = BlockKeyDb::block::<N>(&db, block_being_scanned) {
|
||||
if id != block_id {
|
||||
panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id));
|
||||
}
|
||||
} else {
|
||||
// TODO: Move this to an unwrap
|
||||
if let Some(id) = ScannerDb::<N, D>::block(&db, block_being_scanned.saturating_sub(1)) {
|
||||
if let Some(id) = BlockKeyDb::block::<N>(&db, block_being_scanned.saturating_sub(1)) {
|
||||
if id != block.parent() {
|
||||
panic!(
|
||||
"block {} doesn't build off expected parent {}",
|
||||
@@ -527,7 +547,7 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
}
|
||||
|
||||
let mut txn = db.txn();
|
||||
ScannerDb::<N, D>::save_block(&mut txn, block_being_scanned, &block_id);
|
||||
BlockKeyDb::save_block::<N>(&mut txn, block_being_scanned, &block_id);
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
@@ -617,7 +637,7 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
|
||||
TODO2: Only update ram_outputs after committing the TXN in question.
|
||||
*/
|
||||
let seen = ScannerDb::<N, D>::seen(&db, &id);
|
||||
let seen = SeenDb::seen::<N>(&db, &id);
|
||||
let id = id.as_ref().to_vec();
|
||||
if seen || scanner.ram_outputs.contains(&id) {
|
||||
panic!("scanned an output multiple times");
|
||||
@@ -644,9 +664,9 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
if completed {
|
||||
let mut txn = db.txn();
|
||||
// The retiring key is the earliest one still around
|
||||
let retiring_key = ScannerDb::<N, D>::keys(&txn)[0].1;
|
||||
let retiring_key = KeysDb::keys::<N>(&txn)[0].1;
|
||||
// This value is static w.r.t. the key
|
||||
ScannerDb::<N, D>::save_retirement_block(
|
||||
RetirementBlocksDb::save_retirement_block::<N>(
|
||||
&mut txn,
|
||||
&retiring_key,
|
||||
block_number + N::CONFIRMATIONS,
|
||||
@@ -679,11 +699,11 @@ impl<N: Network, D: Db> Scanner<N, D> {
|
||||
// - There's outputs
|
||||
// as only those blocks are meaningful and warrant obtaining synchrony over
|
||||
let is_retirement_block =
|
||||
ScannerDb::<N, D>::retirement_block(&db, &scanner.keys[0].1) == Some(block_being_scanned);
|
||||
RetirementBlocksDb::retirement_block::<N>(&db, &scanner.keys[0].1) == Some(block_being_scanned);
|
||||
let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) {
|
||||
// Save the outputs to disk
|
||||
let mut txn = db.txn();
|
||||
ScannerDb::<N, D>::save_outputs(&mut txn, &block_id, &outputs);
|
||||
OutputsDb::save_outputs::<N>(&mut txn, &block_id, &outputs);
|
||||
txn.commit();
|
||||
|
||||
// Send all outputs
|
||||
|
||||
Reference in New Issue
Block a user