diff --git a/networks/ethereum/relayer/src/main.rs b/networks/ethereum/relayer/src/main.rs index f5a7e0f9..6424c90f 100644 --- a/networks/ethereum/relayer/src/main.rs +++ b/networks/ethereum/relayer/src/main.rs @@ -91,7 +91,8 @@ async fn main() { let Ok(_) = socket.read_exact(&mut buf).await else { break }; let transaction = db.get(&buf[.. 4]).unwrap_or(vec![]); - let Ok(()) = socket.write_all(&u32::try_from(transaction.len()).unwrap().to_le_bytes()).await + let Ok(()) = + socket.write_all(&u32::try_from(transaction.len()).unwrap().to_le_bytes()).await else { break; }; diff --git a/processor/bitcoin/src/primitives/block.rs b/processor/bitcoin/src/primitives/block.rs index e3df7e69..02b8e595 100644 --- a/processor/bitcoin/src/primitives/block.rs +++ b/processor/bitcoin/src/primitives/block.rs @@ -43,7 +43,11 @@ impl primitives::Block for Block { primitives::BlockHeader::id(&BlockHeader(self.1.header)) } - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + _latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { let scanner = scanner(key); let mut res = vec![]; diff --git a/processor/ethereum/src/primitives/block.rs b/processor/ethereum/src/primitives/block.rs index 2c0e0505..a6268c0b 100644 --- a/processor/ethereum/src/primitives/block.rs +++ b/processor/ethereum/src/primitives/block.rs @@ -59,8 +59,19 @@ impl primitives::Block for FullEpoch { self.epoch.end_hash } - fn scan_for_outputs_unordered(&self, _key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { // Only return these outputs for the latest key + if latest_active_key != key { + return vec![]; + } + + // Associate all outputs with the latest active key + // We don't associate these with the current key within the SC as that'll cause outputs to be + // marked for forwarding if the SC is delayed to actually rotate todo!("TODO") } diff --git a/processor/monero/src/primitives/block.rs b/processor/monero/src/primitives/block.rs index 70a559c1..6afae429 100644 --- a/processor/monero/src/primitives/block.rs +++ b/processor/monero/src/primitives/block.rs @@ -40,7 +40,11 @@ impl primitives::Block for Block { self.0.block.hash() } - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + _latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { let mut scanner = GuaranteedScanner::new(view_pair(key)); scanner.register_subaddress(EXTERNAL_SUBADDRESS); scanner.register_subaddress(BRANCH_SUBADDRESS); diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index da481247..a3dec40b 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -43,7 +43,11 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { /// Scan all outputs within this block to find the outputs spendable by this key. /// /// No assumption on the order of the returned outputs is made. - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec; + fn scan_for_outputs_unordered( + &self, + latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec; /// Check if this block resolved any Eventualities. /// diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 49ab1785..884e0e2b 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -28,6 +28,7 @@ struct SeraiKeyDbEntry { key: K, } +#[derive(Clone)] pub(crate) struct SeraiKey { pub(crate) key: K, pub(crate) stage: LifetimeStage, diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 99fea2fb..bb3e4b7e 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -273,6 +273,18 @@ impl> ContinuallyRan for EventualityTas log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); let (keys, keys_with_stages) = self.keys_and_keys_with_stages(b); + let latest_active_key = { + let mut keys_with_stages = keys_with_stages.clone(); + loop { + // Use the most recent key + let (key, stage) = keys_with_stages.pop().unwrap(); + // Unless this key is active, but not yet reporting + if stage == LifetimeStage::ActiveYetNotReporting { + continue; + } + break key; + } + }; let mut txn = self.db.txn(); @@ -307,7 +319,7 @@ impl> ContinuallyRan for EventualityTas } // Fetch all non-External outputs - let mut non_external_outputs = block.scan_for_outputs(key.key); + let mut non_external_outputs = block.scan_for_outputs(latest_active_key, key.key); non_external_outputs.retain(|output| output.kind() != OutputType::External); // Drop any outputs less than the dust limit non_external_outputs.retain(|output| { diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 1b6afaa9..e591d210 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -46,11 +46,11 @@ pub(crate) fn sort_outputs /// Extension traits around Block. pub(crate) trait BlockExt: Block { - fn scan_for_outputs(&self, key: Self::Key) -> Vec; + fn scan_for_outputs(&self, latest_active_key: Self::Key, key: Self::Key) -> Vec; } impl BlockExt for B { - fn scan_for_outputs(&self, key: Self::Key) -> Vec { - let mut outputs = self.scan_for_outputs_unordered(key); + fn scan_for_outputs(&self, latest_active_key: Self::Key, key: Self::Key) -> Vec { + let mut outputs = self.scan_for_outputs_unordered(latest_active_key, key); outputs.sort_by(sort_outputs); outputs } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index b235ff15..7004a4d9 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -122,6 +122,19 @@ impl ContinuallyRan for ScanTask { let keys = ScannerGlobalDb::::active_keys_as_of_next_to_scan_for_outputs_block(&txn) .expect("scanning for a blockchain without any keys set"); + let latest_active_key = { + let mut keys = keys.clone(); + loop { + // Use the most recent key + let key = keys.pop().unwrap(); + // Unless this key is active, but not yet reporting + if key.stage == LifetimeStage::ActiveYetNotReporting { + continue; + } + break key.key; + } + }; + // The scan data for this block let mut scan_data = SenderScanData { block_number: b, @@ -157,7 +170,7 @@ impl ContinuallyRan for ScanTask { // Scan for each key for key in &keys { - for output in block.scan_for_outputs(key.key) { + for output in block.scan_for_outputs(latest_active_key, key.key) { assert_eq!(output.key(), key.key); /*