mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-10 21:19:24 +00:00
Use a single txn for an entire coordinator message
Removes direct DB accesses whre possible. Documents the safety of the rest. Does uncover one case of unsafety not previously noted.
This commit is contained in:
@@ -37,7 +37,7 @@ pub enum ScannerEvent<C: Coin> {
|
||||
pub type ScannerEventChannel<C> = mpsc::UnboundedReceiver<ScannerEvent<C>>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ScannerDb<C: Coin, D: Db>(D, PhantomData<C>);
|
||||
struct ScannerDb<C: Coin, D: Db>(PhantomData<C>, PhantomData<D>);
|
||||
impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
D::key(b"SCANNER", dst, key)
|
||||
@@ -60,9 +60,8 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
res
|
||||
})
|
||||
}
|
||||
fn block_number(&self, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
|
||||
self
|
||||
.0
|
||||
fn block_number<G: Get>(getter: &G, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
|
||||
getter
|
||||
.get(Self::block_number_key(id))
|
||||
.map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap())
|
||||
}
|
||||
@@ -91,8 +90,8 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
keys.extend(key_bytes.as_ref());
|
||||
txn.put(Self::active_keys_key(), keys);
|
||||
}
|
||||
fn active_keys(&self) -> Vec<<C::Curve as Ciphersuite>::G> {
|
||||
let bytes_vec = self.0.get(Self::active_keys_key()).unwrap_or(vec![]);
|
||||
fn active_keys<G: Get>(getter: &G) -> Vec<<C::Curve as Ciphersuite>::G> {
|
||||
let bytes_vec = getter.get(Self::active_keys_key()).unwrap_or(vec![]);
|
||||
let mut bytes: &[u8] = bytes_vec.as_ref();
|
||||
|
||||
// Assumes keys will be 32 bytes when calculating the capacity
|
||||
@@ -109,8 +108,8 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
fn seen_key(id: &<C::Output as Output>::Id) -> Vec<u8> {
|
||||
Self::scanner_key(b"seen", id)
|
||||
}
|
||||
fn seen(&self, id: &<C::Output as Output>::Id) -> bool {
|
||||
self.0.get(Self::seen_key(id)).is_some()
|
||||
fn seen<G: Get>(getter: &G, id: &<C::Output as Output>::Id) -> bool {
|
||||
getter.get(Self::seen_key(id)).is_some()
|
||||
}
|
||||
|
||||
fn next_batch_key() -> Vec<u8> {
|
||||
@@ -201,9 +200,8 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
// Return this block's outputs so they can be pruned from the RAM cache
|
||||
(id, outputs)
|
||||
}
|
||||
fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize {
|
||||
let bytes = self
|
||||
.0
|
||||
fn latest_scanned_block<G: Get>(getter: &G, key: <C::Curve as Ciphersuite>::G) -> usize {
|
||||
let bytes = getter
|
||||
.get(Self::scanned_block_key(&key))
|
||||
.expect("asking for latest scanned block of key which wasn't rotated to");
|
||||
u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()
|
||||
@@ -216,7 +214,7 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
|
||||
#[derive(Debug)]
|
||||
pub struct Scanner<C: Coin, D: Db> {
|
||||
coin: C,
|
||||
db: ScannerDb<C, D>,
|
||||
db: D,
|
||||
keys: Vec<<C::Curve as Ciphersuite>::G>,
|
||||
|
||||
eventualities: EventualitiesTracker<C::Eventuality>,
|
||||
@@ -267,7 +265,12 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
|
||||
/// If a key has been prior set, both keys will be scanned for as detailed in the Multisig
|
||||
/// documentation. The old key will eventually stop being scanned for, leaving just the
|
||||
/// updated-to key.
|
||||
pub async fn rotate_key(&mut self, activation_number: usize, key: <C::Curve as Ciphersuite>::G) {
|
||||
pub async fn rotate_key(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
activation_number: usize,
|
||||
key: <C::Curve as Ciphersuite>::G,
|
||||
) {
|
||||
let mut scanner = self.scanner.write().await;
|
||||
if !scanner.keys.is_empty() {
|
||||
// Protonet will have a single, static validator set
|
||||
@@ -276,21 +279,25 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
|
||||
}
|
||||
|
||||
info!("Rotating to key {}", hex::encode(key.to_bytes()));
|
||||
let mut txn = scanner.db.0.txn();
|
||||
let (_, outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, activation_number);
|
||||
|
||||
let (_, outputs) = ScannerDb::<C, D>::save_scanned_block(txn, &key, activation_number);
|
||||
scanner.ram_scanned.insert(key.to_bytes().as_ref().to_vec(), activation_number);
|
||||
assert!(outputs.is_empty());
|
||||
ScannerDb::<C, D>::add_active_key(&mut txn, key);
|
||||
txn.commit();
|
||||
|
||||
ScannerDb::<C, D>::add_active_key(txn, key);
|
||||
scanner.keys.push(key);
|
||||
}
|
||||
|
||||
pub async fn block_number(&self, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
|
||||
self.scanner.read().await.db.block_number(id)
|
||||
// This is safe, despite not having a txn, since it's a static value
|
||||
// At worst, it's not set when it's expected to be set, yet that should be handled contextually
|
||||
ScannerDb::<C, D>::block_number(&self.scanner.read().await.db, id)
|
||||
}
|
||||
|
||||
/// Acknowledge having handled a block for a key.
|
||||
pub async fn ack_up_to_block(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
key: <C::Curve as Ciphersuite>::G,
|
||||
id: <C::Block as Block<C>>::Id,
|
||||
) -> (Vec<BlockHash>, Vec<C::Output>) {
|
||||
@@ -298,23 +305,20 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
|
||||
debug!("Block {} acknowledged", hex::encode(&id));
|
||||
|
||||
// Get the number for this block
|
||||
let number =
|
||||
scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned");
|
||||
let number = ScannerDb::<C, D>::block_number(txn, &id)
|
||||
.expect("main loop trying to operate on data we haven't scanned");
|
||||
// Get the number of the last block we acknowledged
|
||||
let prior = scanner.db.latest_scanned_block(key);
|
||||
let prior = ScannerDb::<C, D>::latest_scanned_block(txn, key);
|
||||
|
||||
let mut blocks = vec![];
|
||||
let mut outputs = vec![];
|
||||
let mut txn = scanner.db.0.txn();
|
||||
for number in (prior + 1) ..= number {
|
||||
let (block, these_outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number);
|
||||
let (block, these_outputs) = ScannerDb::<C, D>::save_scanned_block(txn, &key, number);
|
||||
let block = BlockHash(block.unwrap().as_ref().try_into().unwrap());
|
||||
blocks.push(block);
|
||||
outputs.extend(these_outputs);
|
||||
}
|
||||
assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref());
|
||||
// TODO: This likely needs to be atomic with the scheduler?
|
||||
txn.commit();
|
||||
|
||||
for output in &outputs {
|
||||
assert!(scanner.ram_outputs.remove(output.id().as_ref()));
|
||||
@@ -329,8 +333,14 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
pub fn new(coin: C, db: D) -> (ScannerHandle<C, D>, Vec<<C::Curve as Ciphersuite>::G>) {
|
||||
let (events_send, events_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let db = ScannerDb(db, PhantomData);
|
||||
let keys = db.active_keys();
|
||||
let keys = ScannerDb::<C, D>::active_keys(&db);
|
||||
let mut ram_scanned = HashMap::new();
|
||||
for key in keys.clone() {
|
||||
ram_scanned.insert(
|
||||
key.to_bytes().as_ref().to_vec(),
|
||||
ScannerDb::<C, D>::latest_scanned_block(&db, key),
|
||||
);
|
||||
}
|
||||
|
||||
let scanner = Arc::new(RwLock::new(Scanner {
|
||||
coin,
|
||||
@@ -339,7 +349,7 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
|
||||
eventualities: EventualitiesTracker::new(),
|
||||
|
||||
ram_scanned: HashMap::new(),
|
||||
ram_scanned,
|
||||
ram_outputs: HashSet::new(),
|
||||
|
||||
events: events_send,
|
||||
@@ -380,21 +390,7 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
|
||||
for key in scanner.keys.clone() {
|
||||
let key_vec = key.to_bytes().as_ref().to_vec();
|
||||
let latest_scanned = {
|
||||
// Grab the latest scanned block according to the DB
|
||||
let db_scanned = scanner.db.latest_scanned_block(key);
|
||||
// We may, within this process's lifetime, have scanned more blocks
|
||||
// If they're still being processed, we will not have officially written them to the DB
|
||||
// as scanned yet
|
||||
// That way, if the process terminates, and is rebooted, we'll rescan from a handled
|
||||
// point, re-firing all events along the way, enabling them to be properly processed
|
||||
// In order to not re-fire them within this process's lifetime, check our RAM cache
|
||||
// of what we've scanned
|
||||
// We are allowed to re-fire them within this lifetime. It's just wasteful
|
||||
let ram_scanned = scanner.ram_scanned.get(&key_vec).cloned().unwrap_or(0);
|
||||
// Pick whichever is higher
|
||||
db_scanned.max(ram_scanned)
|
||||
};
|
||||
let latest_scanned = scanner.ram_scanned[&key_vec];
|
||||
|
||||
for i in (latest_scanned + 1) ..= latest {
|
||||
// TODO2: Check for key deprecation
|
||||
@@ -408,14 +404,15 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
};
|
||||
let block_id = block.id();
|
||||
|
||||
if let Some(id) = ScannerDb::<C, D>::block(&scanner.db.0, i) {
|
||||
// These block calls are safe, despite not having a txn, since they're static values
|
||||
if let Some(id) = ScannerDb::<C, D>::block(&scanner.db, i) {
|
||||
if id != block_id {
|
||||
panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id));
|
||||
}
|
||||
} else {
|
||||
info!("Found new block: {}", hex::encode(&block_id));
|
||||
|
||||
if let Some(id) = ScannerDb::<C, D>::block(&scanner.db.0, i.saturating_sub(1)) {
|
||||
if let Some(id) = ScannerDb::<C, D>::block(&scanner.db, i.saturating_sub(1)) {
|
||||
if id != block.parent() {
|
||||
panic!(
|
||||
"block {} doesn't build off expected parent {}",
|
||||
@@ -425,7 +422,7 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut txn = scanner.db.0.txn();
|
||||
let mut txn = scanner.db.txn();
|
||||
ScannerDb::<C, D>::save_block(&mut txn, i, &block_id);
|
||||
txn.commit();
|
||||
}
|
||||
@@ -470,7 +467,35 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
// On Bitcoin, the output ID should be unique for a given chain
|
||||
// On Monero, it's trivial to make an output sharing an ID with another
|
||||
// We should only scan outputs with valid IDs however, which will be unique
|
||||
let seen = scanner.db.seen(&id);
|
||||
|
||||
/*
|
||||
The safety of this code must satisfy the following conditions:
|
||||
1) seen is not set for the first occurrence
|
||||
2) seen is set for any future occurrence
|
||||
|
||||
seen is only written to after this code completes. Accordingly, it cannot be set
|
||||
before the first occurrence UNLESSS it's set, yet the last scanned block isn't.
|
||||
They are both written in the same database transaction, preventing this.
|
||||
|
||||
As for future occurrences, the RAM entry ensures they're handled properly even if
|
||||
the database has yet to be set.
|
||||
|
||||
On reboot, which will clear the RAM, if seen wasn't set, neither was latest scanned
|
||||
block. Accordingly, this will scan from some prior block, re-populating the RAM.
|
||||
|
||||
If seen was set, then this will be successfully read.
|
||||
|
||||
There's also no concern ram_outputs was pruned, yet seen wasn't set, as pruning
|
||||
from ram_outputs will acquire a write lock (preventing this code from acquiring
|
||||
its own write lock and running), and during its holding of the write lock, it
|
||||
commits the transaction setting seen and the latest scanned block.
|
||||
|
||||
This last case isn't true. Committing seen/latest_scanned_block happens after
|
||||
relinquishing the write lock.
|
||||
|
||||
TODO: Only update ram_outputs after committing the TXN in question.
|
||||
*/
|
||||
let seen = ScannerDb::<C, D>::seen(&scanner.db, &id);
|
||||
let id = id.as_ref().to_vec();
|
||||
if seen || scanner.ram_outputs.contains(&id) {
|
||||
panic!("scanned an output multiple times");
|
||||
@@ -483,7 +508,7 @@ impl<C: Coin, D: Db> Scanner<C, D> {
|
||||
}
|
||||
|
||||
// Save the outputs to disk
|
||||
let mut txn = scanner.db.0.txn();
|
||||
let mut txn = scanner.db.txn();
|
||||
let batch = ScannerDb::<C, D>::save_outputs(&mut txn, &key, &block_id, &outputs);
|
||||
txn.commit();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user