use core::marker::PhantomData; use std::{ sync::Arc, time::{SystemTime, Duration}, collections::{HashSet, HashMap}, }; use group::GroupEncoding; use frost::curve::Ciphersuite; use log::{info, debug, warn}; use tokio::{ sync::{RwLock, mpsc}, time::sleep, }; use crate::{ DbTxn, Db, coins::{Output, Transaction, EventualitiesTracker, Block, Coin}, }; #[derive(Clone, Debug)] pub enum ScannerEvent { // Block scanned Block(::G, >::Id, SystemTime, Vec), // Eventuality completion found on-chain Completed([u8; 32], >::Id), } pub type ScannerEventChannel = mpsc::UnboundedReceiver>; #[derive(Clone, Debug)] struct ScannerDb(D, PhantomData); impl ScannerDb { fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { D::key(b"SCANNER", dst, key) } fn block_key(number: usize) -> Vec { Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes()) } fn block_number_key(id: &>::Id) -> Vec { Self::scanner_key(b"block_number", id) } fn save_block( &mut self, txn: &mut D::Transaction, number: usize, id: &>::Id, ) { txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes()); txn.put(Self::block_key(number), id); } fn block(&self, number: usize) -> Option<>::Id> { self.0.get(Self::block_key(number)).map(|id| { let mut res = >::Id::default(); res.as_mut().copy_from_slice(&id); res }) } fn block_number(&self, id: &>::Id) -> Option { self .0 .get(Self::block_number_key(id)) .map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap()) } fn active_keys_key() -> Vec { Self::scanner_key(b"active_keys", b"") } fn add_active_key(&mut self, txn: &mut D::Transaction, key: ::G) { let mut keys = self.0.get(Self::active_keys_key()).unwrap_or(vec![]); let key_bytes = key.to_bytes(); // Don't add this key if it's already present let key_len = key_bytes.as_ref().len(); let mut i = 0; while i < keys.len() { if keys[i .. (i + key_len)].as_ref() == key_bytes.as_ref() { debug!("adding {} as an active key yet it was already present", hex::encode(key_bytes)); return; } i += key_len; } keys.extend(key_bytes.as_ref()); txn.put(Self::active_keys_key(), keys); } fn active_keys(&self) -> Vec<::G> { let bytes_vec = self.0.get(Self::active_keys_key()).unwrap_or(vec![]); let mut bytes: &[u8] = bytes_vec.as_ref(); // Assumes keys will be 32 bytes when calculating the capacity // If keys are larger, this may allocate more memory than needed // If keys are smaller, this may require additional allocations // Either are fine let mut res = Vec::with_capacity(bytes.len() / 32); while !bytes.is_empty() { res.push(C::Curve::read_G(&mut bytes).unwrap()); } res } fn seen_key(id: &::Id) -> Vec { Self::scanner_key(b"seen", id) } fn seen(&self, id: &::Id) -> bool { self.0.get(Self::seen_key(id)).is_some() } fn outputs_key( key: &::G, block: &>::Id, ) -> Vec { let key_bytes = key.to_bytes(); let key = key_bytes.as_ref(); // This should be safe without the bincode serialize. Using bincode lets us not worry/have to // think about this let db_key = bincode::serialize(&(key, block.as_ref())).unwrap(); // Assert this is actually length prefixing debug_assert!(db_key.len() >= (1 + key.len() + 1 + block.as_ref().len())); Self::scanner_key(b"outputs", db_key) } fn save_outputs( &mut self, txn: &mut D::Transaction, key: &::G, block: &>::Id, outputs: &[C::Output], ) { let mut bytes = Vec::with_capacity(outputs.len() * 64); for output in outputs { output.write(&mut bytes).unwrap(); } txn.put(Self::outputs_key(key, block), bytes); } fn outputs( &self, key: &::G, block: &>::Id, ) -> Option> { let bytes_vec = self.0.get(Self::outputs_key(key, block))?; let mut bytes: &[u8] = bytes_vec.as_ref(); let mut res = vec![]; while !bytes.is_empty() { res.push(C::Output::read(&mut bytes).unwrap()); } Some(res) } fn scanned_block_key(key: &::G) -> Vec { Self::scanner_key(b"scanned_block", key.to_bytes()) } fn save_scanned_block( &mut self, txn: &mut D::Transaction, key: &::G, block: usize, ) -> Vec { let new_key = self.0.get(Self::scanned_block_key(key)).is_none(); let outputs = self.block(block).and_then(|id| self.outputs(key, &id)); // Either this is a new key, with no outputs, or we're acknowledging this block // If we're acknowledging it, we should have outputs available assert_eq!(new_key, outputs.is_none()); let outputs = outputs.unwrap_or(vec![]); // Mark all the outputs from this block as seen for output in &outputs { txn.put(Self::seen_key(&output.id()), b""); } txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); // Return this block's outputs so they can be pruned from the RAM cache outputs } fn latest_scanned_block(&self, key: ::G) -> usize { let bytes = self.0.get(Self::scanned_block_key(&key)).unwrap_or(vec![0; 8]); u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap() } } /// The Scanner emits events relating to the blockchain, notably received outputs. /// It WILL NOT fail to emit an event, even if it reboots at selected moments. /// It MAY fire the same event multiple times. #[derive(Debug)] pub struct Scanner { coin: C, db: ScannerDb, keys: Vec<::G>, eventualities: EventualitiesTracker, ram_scanned: HashMap, usize>, ram_outputs: HashSet>, events: mpsc::UnboundedSender>, } #[derive(Debug)] pub struct ScannerHandle { scanner: Arc>>, pub events: ScannerEventChannel, } impl ScannerHandle { pub async fn ram_scanned(&self) -> usize { let mut res = None; for scanned in self.scanner.read().await.ram_scanned.values() { if res.is_none() { res = Some(*scanned); } // Returns the lowest scanned value so no matter the keys interacted with, this is // sufficiently scanned res = Some(res.unwrap().min(*scanned)); } res.unwrap_or(0) } pub async fn register_eventuality( &self, block_number: usize, id: [u8; 32], eventuality: C::Eventuality, ) { self.scanner.write().await.eventualities.register(block_number, id, eventuality) } pub async fn drop_eventuality(&self, id: [u8; 32]) { self.scanner.write().await.eventualities.drop(id); } /// Rotate the key being scanned for. /// /// If no key has been prior set, this will become the key with no further actions. /// /// If a key has been prior set, both keys will be scanned for as detailed in the Multisig /// documentation. The old key will eventually stop being scanned for, leaving just the /// updated-to key. pub async fn rotate_key(&self, activation_number: usize, key: ::G) { let mut scanner = self.scanner.write().await; if !scanner.keys.is_empty() { // Protonet will have a single, static validator set // TODO2 panic!("only a single key is supported at this time"); } info!("Rotating to key {}", hex::encode(key.to_bytes())); let mut txn = scanner.db.0.txn(); assert!(scanner.db.save_scanned_block(&mut txn, &key, activation_number).is_empty()); scanner.db.add_active_key(&mut txn, key); txn.commit(); scanner.keys.push(key); } pub async fn block_number(&self, id: &>::Id) -> Option { self.scanner.read().await.db.block_number(id) } /// Acknowledge having handled a block for a key. pub async fn ack_block( &self, key: ::G, id: >::Id, ) -> Vec { let mut scanner = self.scanner.write().await; debug!("Block {} acknowledged", hex::encode(&id)); let number = scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned"); let mut txn = scanner.db.0.txn(); let outputs = scanner.db.save_scanned_block(&mut txn, &key, number); txn.commit(); for output in &outputs { scanner.ram_outputs.remove(output.id().as_ref()); } outputs } } impl Scanner { #[allow(clippy::new_ret_no_self)] pub fn new(coin: C, db: D) -> (ScannerHandle, Vec<::G>) { let (events_send, events_recv) = mpsc::unbounded_channel(); let db = ScannerDb(db, PhantomData); let keys = db.active_keys(); let scanner = Arc::new(RwLock::new(Scanner { coin, db, keys: keys.clone(), eventualities: EventualitiesTracker::new(), ram_scanned: HashMap::new(), ram_outputs: HashSet::new(), events: events_send, })); tokio::spawn(Scanner::run(scanner.clone())); (ScannerHandle { scanner, events: events_recv }, keys) } fn emit(&mut self, event: ScannerEvent) -> bool { if self.events.send(event).is_err() { info!("Scanner handler was dropped. Shutting down?"); return false; } true } // An async function, to be spawned on a task, to discover and report outputs async fn run(scanner: Arc>) { loop { // Only check every five seconds for new blocks sleep(Duration::from_secs(5)).await; // Scan new blocks { let mut scanner = scanner.write().await; let latest = scanner.coin.get_latest_block_number().await; let latest = match latest { // Only scan confirmed blocks, which we consider effectively finalized // CONFIRMATIONS - 1 as whatever's in the latest block already has 1 confirm Ok(latest) => latest.saturating_sub(C::CONFIRMATIONS.saturating_sub(1)), Err(_) => { warn!("Couldn't get {}'s latest block number", C::ID); sleep(Duration::from_secs(60)).await; continue; } }; for key in scanner.keys.clone() { let key_vec = key.to_bytes().as_ref().to_vec(); let latest_scanned = { // Grab the latest scanned block according to the DB let db_scanned = scanner.db.latest_scanned_block(key); // We may, within this process's lifetime, have scanned more blocks // If they're still being processed, we will not have officially written them to the DB // as scanned yet // That way, if the process terminates, and is rebooted, we'll rescan from a handled // point, re-firing all events along the way, enabling them to be properly processed // In order to not re-fire them within this process's lifetime, check our RAM cache // of what we've scanned // We are allowed to re-fire them within this lifetime. It's just wasteful let ram_scanned = scanner.ram_scanned.get(&key_vec).cloned().unwrap_or(0); // Pick whichever is higher db_scanned.max(ram_scanned) }; for i in (latest_scanned + 1) ..= latest { // TODO2: Check for key deprecation let block = match scanner.coin.get_block(i).await { Ok(block) => block, Err(_) => { warn!("Couldn't get {} block {i}", C::ID); break; } }; let block_id = block.id(); if let Some(id) = scanner.db.block(i) { // TODO2: Also check this block builds off the previous block if id != block_id { panic!("{} reorg'd from {id:?} to {:?}", C::ID, hex::encode(block_id)); } } else { info!("Found new block: {}", hex::encode(&block_id)); let mut txn = scanner.db.0.txn(); scanner.db.save_block(&mut txn, i, &block_id); txn.commit(); } // Clone coin because we can't borrow it while also mutably borrowing the eventualities // Thankfully, coin is written to be a cheap clone let coin = scanner.coin.clone(); for (id, tx) in coin.get_eventuality_completions(&mut scanner.eventualities, &block).await { // This should only happen if there's a P2P net desync or there's a malicious // validator warn!( "eventuality {} resolved by {}, as found on chain. this should not happen", hex::encode(id), hex::encode(&tx) ); if !scanner.emit(ScannerEvent::Completed(id, tx)) { return; } } let outputs = match scanner.coin.get_outputs(&block, key).await { Ok(outputs) => outputs, Err(_) => { warn!("Couldn't scan {} block {i:?}", C::ID); break; } }; // Panic if we've already seen these outputs for output in &outputs { let id = output.id(); info!( "block {} had output {} worth {}", hex::encode(&block_id), hex::encode(&id), output.amount(), ); // On Bitcoin, the output ID should be unique for a given chain // On Monero, it's trivial to make an output sharing an ID with another // We should only scan outputs with valid IDs however, which will be unique let seen = scanner.db.seen(&id); let id = id.as_ref().to_vec(); if seen || scanner.ram_outputs.contains(&id) { panic!("scanned an output multiple times"); } scanner.ram_outputs.insert(id); } if outputs.is_empty() { continue; } // Save the outputs to disk let mut txn = scanner.db.0.txn(); scanner.db.save_outputs(&mut txn, &key, &block_id, &outputs); txn.commit(); const TIME_TOLERANCE: u64 = 15; let now = SystemTime::now(); let mut time = block.time(); // Block is older than the tolerance // This isn't an issue, yet shows our daemon may have fallen behind/been disconnected if now.duration_since(time).unwrap_or(Duration::ZERO) > Duration::from_secs(TIME_TOLERANCE) { warn!( "the time is {} and we only just received a block dated {}", (now.duration_since(SystemTime::UNIX_EPOCH)).expect("now before epoch").as_secs(), (time.duration_since(SystemTime::UNIX_EPOCH)) .expect("block time before epoch") .as_secs(), ); } // If this block is in the future, either this server's clock is wrong OR the block's // miner's clock is wrong. The latter is the problem // // This time is used to schedule signing sessions over the content of this block // If it's in the future, the first attempt won't time out until this block is no // longer in the future // // Since we don't need consensus, if this time is more than 15s in the future, // set it to the local time // // As long as a supermajority of nodes set a time within ~15s of each other, this // should be fine // TODO2: Make more robust if time.duration_since(now).unwrap_or(Duration::ZERO) > Duration::from_secs(TIME_TOLERANCE) { time = now; } // Send all outputs if !scanner.emit(ScannerEvent::Block(key, block_id, time, outputs)) { return; } // Write this number as scanned so we won't re-fire these outputs scanner.ram_scanned.insert(key_vec.clone(), i); } } } } } }