From 90f2b03595780baad6f6c0273f5d07195b259b9d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 11 Apr 2023 05:49:27 -0400 Subject: [PATCH] Finish routing eventualities Also corrects some misc TODOs and tidies up some log statements. --- processor/src/coins/mod.rs | 15 ++++ processor/src/coordinator.rs | 1 - processor/src/main.rs | 17 +++- processor/src/scanner.rs | 44 +++++++++-- processor/src/signer.rs | 129 +++++++++++++++++++++---------- processor/src/tests/addresses.rs | 6 ++ processor/src/tests/scanner.rs | 3 + processor/src/tests/wallet.rs | 6 ++ 8 files changed, 171 insertions(+), 50 deletions(-) diff --git a/processor/src/coins/mod.rs b/processor/src/coins/mod.rs index 9d6b28de..c2de39a4 100644 --- a/processor/src/coins/mod.rs +++ b/processor/src/coins/mod.rs @@ -148,6 +148,21 @@ impl EventualitiesTracker { // If our self tracker already went past this block number, set it back self.block_number = self.block_number.min(block_number); } + + pub fn drop(&mut self, id: [u8; 32]) { + // O(n) due to the lack of a reverse lookup + let mut found_key = None; + for (key, value) in &self.map { + if value.0 == id { + found_key = Some(key.clone()); + break; + } + } + + if let Some(key) = found_key { + self.map.remove(&key); + } + } } impl Default for EventualitiesTracker { diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index 92cba33e..0acb2541 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -5,7 +5,6 @@ use std::{ use messages::{ProcessorMessage, CoordinatorMessage}; -// TODO: Also include the coin block height here so we can delay handling if not synced? #[derive(Clone, PartialEq, Eq, Debug)] pub struct Message { pub id: u64, diff --git a/processor/src/main.rs b/processor/src/main.rs index b5c0173a..f562925b 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -109,7 +109,10 @@ async fn get_fee(coin: &C, block_number: usize) -> C::Fee { return block.median_fee(); } Err(e) => { - error!("couldn't get block {}: {e}", block_number); + error!( + "couldn't get block {block_number} in get_fee. {} {}", + "this should only happen if the node is offline. error: ", e + ); // Since this block is considered finalized, we shouldn't be unable to get it unless the // node is offline, hence the long sleep sleep(Duration::from_secs(60)).await; @@ -455,8 +458,6 @@ async fn run(raw_db: D, coin: C, mut coordinato }, msg = scanner.events.recv() => { - // These need to be sent to the coordinator which needs to check they aren't replayed - // TODO match msg.unwrap() { ScannerEvent::Block(key, block, time, outputs) => { let key = key.to_bytes().as_ref().to_vec(); @@ -500,6 +501,13 @@ async fn run(raw_db: D, coin: C, mut coordinato substrate_signers[&key].sign(time, batch).await; }, + + ScannerEvent::Completed(id, tx) => { + // We don't know which signer had this plan, so inform all of them + for (_, signer) in signers.iter_mut() { + signer.eventuality_completion(id, &tx).await; + } + }, } }, @@ -526,7 +534,10 @@ async fn run(raw_db: D, coin: C, mut coordinato }, SignerEvent::SignedTransaction { id, tx } => { + // If we die after calling finish_signing, we'll never fire Completed + // TODO: Is that acceptable? Do we need to fire Completed before firing finish_signing? main_db.finish_signing(&key, id); + scanner.drop_eventuality(id).await; coordinator .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { key: key.to_vec(), diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 2115e86e..b2f2b4fb 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -16,13 +16,15 @@ use tokio::{ use crate::{ DbTxn, Db, - coins::{Output, EventualitiesTracker, Block, Coin}, + coins::{Output, Transaction, EventualitiesTracker, Block, Coin}, }; #[derive(Clone, Debug)] pub enum ScannerEvent { // Block scanned Block(::G, >::Id, SystemTime, Vec), + // Eventuality completion found on-chain + Completed([u8; 32], >::Id), } pub type ScannerEventChannel = mpsc::UnboundedReceiver>; @@ -68,15 +70,31 @@ impl ScannerDb { } fn add_active_key(&mut self, txn: &mut D::Transaction, key: ::G) { let mut keys = self.0.get(Self::active_keys_key()).unwrap_or(vec![]); - // TODO: Don't do this if the key is already marked active (which can happen based on reboot - // timing) - keys.extend(key.to_bytes().as_ref()); + + let key_bytes = key.to_bytes(); + + // Don't add this key if it's already present + let key_len = key_bytes.as_ref().len(); + let mut i = 0; + while i < keys.len() { + if keys[i .. (i + key_len)].as_ref() == key_bytes.as_ref() { + debug!("adding {} as an active key yet it was already present", hex::encode(key_bytes)); + return; + } + i += key_len; + } + + keys.extend(key_bytes.as_ref()); txn.put(Self::active_keys_key(), keys); } fn active_keys(&self) -> Vec<::G> { let bytes_vec = self.0.get(Self::active_keys_key()).unwrap_or(vec![]); let mut bytes: &[u8] = bytes_vec.as_ref(); + // Assumes keys will be 32 bytes when calculating the capacity + // If keys are larger, this may allocate more memory than needed + // If keys are smaller, this may require additional allocations + // Either are fine let mut res = Vec::with_capacity(bytes.len() / 32); while !bytes.is_empty() { res.push(C::Curve::read_G(&mut bytes).unwrap()); @@ -210,6 +228,10 @@ impl ScannerHandle { self.scanner.write().await.eventualities.register(block_number, id, eventuality) } + pub async fn drop_eventuality(&self, id: [u8; 32]) { + self.scanner.write().await.eventualities.drop(id); + } + /// Rotate the key being scanned for. /// /// If no key has been prior set, this will become the key with no further actions. @@ -362,9 +384,17 @@ impl Scanner { for (id, tx) in coin.get_eventuality_completions(&mut scanner.eventualities, &block).await { - // TODO: Fire Completed - let _ = id; - let _ = tx; + // This should only happen if there's a P2P net desync or there's a malicious + // validator + warn!( + "eventuality {} resolved by {}, as found on chain. this should not happen", + hex::encode(id), + hex::encode(&tx) + ); + + if !scanner.emit(ScannerEvent::Completed(id, tx)) { + return; + } } let outputs = match scanner.coin.get_outputs(&block, key).await { diff --git a/processor/src/signer.rs b/processor/src/signer.rs index 6ba62f1d..beac611e 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -49,7 +49,7 @@ impl SignerDb { &mut self, txn: &mut D::Transaction, id: [u8; 32], - tx: >::Id, + tx: &>::Id, ) { // Transactions can be completed by multiple signatures // Save every solution in order to be robust @@ -165,7 +165,11 @@ impl Signer { // If we don't have an attempt logged, it's because the coordinator is faulty OR // because we rebooted None => { - warn!("not attempting {:?}. this is an error if we didn't reboot", id); + warn!( + "not attempting {} #{}. this is an error if we didn't reboot", + hex::encode(id.id), + id.attempt + ); // Don't panic on the assumption we rebooted Err(())?; } @@ -191,6 +195,57 @@ impl Signer { } } + async fn eventuality_completion( + &mut self, + id: [u8; 32], + tx_id: &>::Id, + ) { + if let Some(eventuality) = self.db.eventuality(id) { + // Transaction hasn't hit our mempool/was dropped for a different signature + // The latter can happen given certain latency conditions/a single malicious signer + // In the case of a single malicious signer, they can drag multiple honest + // validators down with them, so we unfortunately can't slash on this case + let Ok(tx) = self.coin.get_transaction(tx_id).await else { + warn!( + "a validator claimed {} completed {} yet we didn't have that TX in our mempool", + hex::encode(tx_id), + hex::encode(id), + ); + return; + }; + + if self.coin.confirm_completion(&eventuality, &tx) { + debug!("eventuality for {} resolved in TX {}", hex::encode(id), hex::encode(tx_id)); + + // Stop trying to sign for this TX + let mut txn = self.db.0.txn(); + self.db.save_transaction(&mut txn, &tx); + self.db.complete(&mut txn, id, tx_id); + txn.commit(); + + self.signable.remove(&id); + self.attempt.remove(&id); + self.preprocessing.remove(&id); + self.signing.remove(&id); + + self.emit(SignerEvent::SignedTransaction { id, tx: tx.id() }); + } else { + warn!( + "a validator claimed {} completed {} when it did not", + hex::encode(tx_id), + hex::encode(id) + ); + } + } else { + debug!( + "signer {} informed of the completion of {}. {}", + hex::encode(self.keys.group_key().to_bytes()), + hex::encode(id), + "this signer did not have/has already completed that plan", + ); + } + } + async fn handle(&mut self, msg: CoordinatorMessage) { match msg { CoordinatorMessage::Preprocesses { id, mut preprocesses } => { @@ -201,7 +256,10 @@ impl Signer { let machine = match self.preprocessing.remove(&id.id) { // Either rebooted or RPC error, or some invariant None => { - warn!("not preprocessing for {:?}. this is an error if we didn't reboot", id); + warn!( + "not preprocessing for {}. this is an error if we didn't reboot", + hex::encode(id.id) + ); return; } Some(machine) => machine, @@ -248,7 +306,10 @@ impl Signer { panic!("never preprocessed yet signing?"); } - warn!("not preprocessing for {:?}. this is an error if we didn't reboot", id); + warn!( + "not preprocessing for {}. this is an error if we didn't reboot", + hex::encode(id.id) + ); return; } Some(machine) => machine, @@ -273,14 +334,15 @@ impl Signer { // Save the transaction in case it's needed for recovery let mut txn = self.db.0.txn(); self.db.save_transaction(&mut txn, &tx); - self.db.complete(&mut txn, id.id, tx.id()); + let tx_id = tx.id(); + self.db.complete(&mut txn, id.id, &tx_id); txn.commit(); // Publish it if let Err(e) = self.coin.publish_transaction(&tx).await { error!("couldn't publish {:?}: {:?}", tx, e); } else { - info!("published {:?}", hex::encode(tx.id())); + info!("published {}", hex::encode(&tx_id)); } // Stop trying to sign for this TX @@ -289,46 +351,23 @@ impl Signer { assert!(self.preprocessing.remove(&id.id).is_none()); assert!(self.signing.remove(&id.id).is_none()); - self.emit(SignerEvent::SignedTransaction { id: id.id, tx: tx.id() }); + self.emit(SignerEvent::SignedTransaction { id: id.id, tx: tx_id }); } - CoordinatorMessage::Completed { key: _, id, tx: tx_vec } => { + CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => { let mut tx = >::Id::default(); if tx.as_ref().len() != tx_vec.len() { + tx_vec.truncate(2 * tx.as_ref().len()); warn!( - "a validator claimed {} completed {id:?} yet that's not a valid TX ID", - hex::encode(&tx) + "a validator claimed {} completed {} yet that's not a valid TX ID", + hex::encode(&tx), + hex::encode(id), ); return; } tx.as_mut().copy_from_slice(&tx_vec); - if let Some(eventuality) = self.db.eventuality(id) { - // Transaction hasn't hit our mempool/was dropped for a different signature - // The latter can happen given certain latency conditions/a single malicious signer - // In the case of a single malicious signer, they can drag multiple honest - // validators down with them, so we unfortunately can't slash on this case - let Ok(tx) = self.coin.get_transaction(&tx).await else { - todo!("queue checking eventualities"); // or give up here? - }; - - if self.coin.confirm_completion(&eventuality, &tx) { - // Stop trying to sign for this TX - let mut txn = self.db.0.txn(); - self.db.save_transaction(&mut txn, &tx); - self.db.complete(&mut txn, id, tx.id()); - txn.commit(); - - self.signable.remove(&id); - self.attempt.remove(&id); - self.preprocessing.remove(&id); - self.signing.remove(&id); - - self.emit(SignerEvent::SignedTransaction { id, tx: tx.id() }); - } else { - warn!("a validator claimed {} completed {id:?} when it did not", hex::encode(&tx.id())); - } - } + self.eventuality_completion(id, &tx).await; } } } @@ -406,7 +445,7 @@ impl Signer { if !id.signing_set(&signer.keys.params()).contains(&signer.keys.params().i()) { continue; } - info!("selected to sign {:?}", id); + info!("selected to sign {} #{}", hex::encode(id.id), id.attempt); // If we reboot mid-sign, the current design has us abort all signs and wait for latter // attempts/new signing protocols @@ -421,7 +460,11 @@ impl Signer { // // Only run if this hasn't already been attempted if signer.db.has_attempt(&id) { - warn!("already attempted {:?}. this is an error if we didn't reboot", id); + warn!( + "already attempted {} #{}. this is an error if we didn't reboot", + hex::encode(id.id), + id.attempt + ); continue; } @@ -432,7 +475,7 @@ impl Signer { // Attempt to create the TX let machine = match signer.coin.attempt_send(tx).await { Err(e) => { - error!("failed to attempt {:?}: {:?}", id, e); + error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e); continue; } Ok(machine) => machine, @@ -503,6 +546,14 @@ impl SignerHandle { signer.signable.insert(id, (start, tx)); } + pub async fn eventuality_completion( + &self, + id: [u8; 32], + tx: &>::Id, + ) { + self.signer.write().await.eventuality_completion(id, tx).await; + } + pub async fn handle(&self, msg: CoordinatorMessage) { self.signer.write().await.handle(msg).await; } diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs index 5c07ceac..29a15f58 100644 --- a/processor/src/tests/addresses.rs +++ b/processor/src/tests/addresses.rs @@ -56,6 +56,9 @@ async fn spend( assert_eq!(outputs[0].kind(), OutputType::Change); outputs } + ScannerEvent::Completed(_, _) => { + panic!("unexpectedly got eventuality completion"); + } } } @@ -89,6 +92,9 @@ pub async fn test_addresses(coin: C) { assert_eq!(outputs[0].kind(), OutputType::Branch); outputs } + ScannerEvent::Completed(_, _) => { + panic!("unexpectedly got eventuality completion"); + } }; // Spend the branch output, creating a change output and ensuring we actually get change diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 0829f880..e4cfd706 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -56,6 +56,9 @@ pub async fn test_scanner(coin: C) { assert_eq!(outputs[0].kind(), OutputType::External); outputs } + ScannerEvent::Completed(_, _) => { + panic!("unexpectedly got eventuality completion"); + } }; (scanner, outputs) }; diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs index bd85a6c4..dd0a0aae 100644 --- a/processor/src/tests/wallet.rs +++ b/processor/src/tests/wallet.rs @@ -39,6 +39,9 @@ pub async fn test_wallet(coin: C) { assert_eq!(outputs.len(), 1); (block_id, outputs) } + ScannerEvent::Completed(_, _) => { + panic!("unexpectedly got eventuality completion"); + } } }; @@ -105,6 +108,9 @@ pub async fn test_wallet(coin: C) { assert_eq!(time, block.time()); assert_eq!(these_outputs, outputs); } + ScannerEvent::Completed(_, _) => { + panic!("unexpectedly got eventuality completion"); + } } // Check the Scanner DB can reload the outputs