diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs index ef83d7e0..0fc7257e 100644 --- a/processor/bin/src/lib.rs +++ b/processor/bin/src/lib.rs @@ -280,7 +280,13 @@ pub async fn main_loop< // Substrate sets this limit to prevent DoSs from malicious validator sets // That bound lets us consume this txn in the following loop body, as an optimization assert!(batches.len() <= 1); - for messages::substrate::ExecutedBatch { id, in_instructions } in batches { + for messages::substrate::ExecutedBatch { + id, + publisher, + in_instructions_hash, + in_instruction_results, + } in batches + { let key_to_activate = KeyToActivate::>::try_recv(txn.as_mut().unwrap()).map(|key| key.0); @@ -288,7 +294,9 @@ pub async fn main_loop< let _: () = scanner.acknowledge_batch( txn.take().unwrap(), id, - in_instructions, + publisher, + in_instructions_hash, + in_instruction_results, /* `acknowledge_batch` takes burns to optimize handling returns with standard payments. That's why handling these with a Batch (and not waiting until the diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml index c46adde3..1fc70e0f 100644 --- a/processor/scanner/Cargo.toml +++ b/processor/scanner/Cargo.toml @@ -24,6 +24,7 @@ scale = { package = "parity-scale-codec", version = "3", default-features = fals borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } # Cryptography +blake2 = { version = "0.10", default-features = false, features = ["std"] } group = { version = "0.13", default-features = false } # Application @@ -35,6 +36,7 @@ serai-db = { path = "../../common/db" } messages = { package = "serai-processor-messages", path = "../messages" } serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } +serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] } serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] } serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] } diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index a985ba43..80b716ae 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -7,8 +7,9 @@ use scale::{Encode, Decode, IoReader}; use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db, db_channel}; -use serai_in_instructions_primitives::{InInstructionWithBalance, Batch}; use serai_coins_primitives::OutInstructionWithBalance; +use serai_validator_sets_primitives::Session; +use serai_in_instructions_primitives::{InInstructionWithBalance, Batch}; use primitives::{EncodableG, ReceivedOutput}; @@ -25,11 +26,13 @@ impl Borshy for T {} #[derive(BorshSerialize, BorshDeserialize)] struct SeraiKeyDbEntry { activation_block_number: u64, + session: Session, key: K, } #[derive(Clone)] pub(crate) struct SeraiKey { + pub(crate) session: Session, pub(crate) key: K, pub(crate) stage: LifetimeStage, pub(crate) activation_block_number: u64, @@ -165,7 +168,7 @@ impl ScannerGlobalDb { // If this new key retires a key, mark the block at which forwarding explicitly occurs notable // This lets us obtain synchrony over the transactions we'll make to accomplish this - if let Some(key_retired_by_this) = keys.last() { + let this_keys_session = if let Some(key_retired_by_this) = keys.last() { NotableBlock::set( txn, Lifetime::calculate::( @@ -182,10 +185,17 @@ impl ScannerGlobalDb { ), &(), ); - } + Session(key_retired_by_this.session.0 + 1) + } else { + Session(0) + }; // Push and save the next key - keys.push(SeraiKeyDbEntry { activation_block_number, key: EncodableG(key) }); + keys.push(SeraiKeyDbEntry { + activation_block_number, + session: this_keys_session, + key: EncodableG(key), + }); ActiveKeys::set(txn, &keys); // Now tidy the keys, ensuring this has a maximum length of 2 @@ -236,6 +246,7 @@ impl ScannerGlobalDb { raw_keys.get(i + 1).map(|key| key.activation_block_number), ); keys.push(SeraiKey { + session: raw_keys[i].session, key: raw_keys[i].key.0, stage, activation_block_number: raw_keys[i].activation_block_number, @@ -477,6 +488,7 @@ db_channel! { } pub(crate) struct InInstructionData { + pub(crate) session_to_sign_batch: Session, pub(crate) external_key_for_session_to_sign_batch: KeyFor, pub(crate) returnable_in_instructions: Vec>, } @@ -488,7 +500,8 @@ impl ScanToReportDb { block_number: u64, data: &InInstructionData, ) { - let mut buf = data.external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec(); + let mut buf = data.session_to_sign_batch.encode(); + buf.extend(data.external_key_for_session_to_sign_batch.to_bytes().as_ref()); for returnable_in_instruction in &data.returnable_in_instructions { returnable_in_instruction.write(&mut buf).unwrap(); } @@ -510,6 +523,7 @@ impl ScanToReportDb { ); let mut buf = data.returnable_in_instructions.as_slice(); + let session_to_sign_batch = Session::decode(&mut buf).unwrap(); let external_key_for_session_to_sign_batch = { let mut external_key_for_session_to_sign_batch = as GroupEncoding>::Repr::default(); @@ -523,7 +537,11 @@ impl ScanToReportDb { while !buf.is_empty() { returnable_in_instructions.push(Returnable::read(&mut buf).unwrap()); } - InInstructionData { external_key_for_session_to_sign_batch, returnable_in_instructions } + InInstructionData { + session_to_sign_batch, + external_key_for_session_to_sign_batch, + returnable_in_instructions, + } } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 5046753c..1ef4f8c2 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -11,6 +11,7 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, Db}; use serai_primitives::{NetworkId, Coin, Amount}; +use serai_validator_sets_primitives::Session; use serai_coins_primitives::OutInstructionWithBalance; use primitives::{task::*, Address, ReceivedOutput, Block, Payment}; @@ -437,10 +438,13 @@ impl Scanner { /// `queue_burns`. Doing so will cause them to be executed multiple times. /// /// The calls to this function must be ordered with regards to `queue_burns`. + #[allow(clippy::too_many_arguments)] pub fn acknowledge_batch( &mut self, mut txn: impl DbTxn, batch_id: u32, + publisher: Session, + in_instructions_hash: [u8; 32], in_instruction_results: Vec, burns: Vec, key_to_activate: Option>, @@ -451,6 +455,8 @@ impl Scanner { substrate::queue_acknowledge_batch::( &mut txn, batch_id, + publisher, + in_instructions_hash, in_instruction_results, burns, key_to_activate, diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs index 186accac..b6503e86 100644 --- a/processor/scanner/src/report/db.rs +++ b/processor/scanner/src/report/db.rs @@ -8,9 +8,17 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db}; use serai_primitives::Balance; +use serai_validator_sets_primitives::Session; use crate::{ScannerFeed, KeyFor, AddressFor}; +#[derive(BorshSerialize, BorshDeserialize)] +pub(crate) struct BatchInfo { + pub(crate) block_number: u64, + pub(crate) publisher: Session, + pub(crate) in_instructions_hash: [u8; 32], +} + create_db!( ScannerReport { // The next block to potentially report @@ -18,10 +26,11 @@ create_db!( // The next Batch ID to use NextBatchId: () -> u32, - // The block number which caused a batch - BlockNumberForBatch: (batch: u32) -> u64, + // The information needed to verify a batch + InfoForBatch: (batch: u32) -> BatchInfo, // The external key for the session which should sign a batch + // TODO: Merge this with InfoForBatch ExternalKeyForSessionToSignBatch: (batch: u32) -> Vec, // The return addresses for the InInstructions within a Batch @@ -46,15 +55,24 @@ impl ReportDb { NextToPotentiallyReportBlock::get(getter) } - pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn, block_number: u64) -> u32 { + pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { let id = NextBatchId::get(txn).unwrap_or(0); NextBatchId::set(txn, &(id + 1)); - BlockNumberForBatch::set(txn, id, &block_number); id } - pub(crate) fn take_block_number_for_batch(txn: &mut impl DbTxn, id: u32) -> Option { - BlockNumberForBatch::take(txn, id) + pub(crate) fn save_batch_info( + txn: &mut impl DbTxn, + id: u32, + block_number: u64, + publisher: Session, + in_instructions_hash: [u8; 32], + ) { + InfoForBatch::set(txn, id, &BatchInfo { block_number, publisher, in_instructions_hash }); + } + + pub(crate) fn take_info_for_batch(txn: &mut impl DbTxn, id: u32) -> Option { + InfoForBatch::take(txn, id) } pub(crate) fn save_external_key_for_session_to_sign_batch( diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index afb1b672..f5208460 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -1,28 +1,28 @@ use core::{marker::PhantomData, future::Future}; +use blake2::{digest::typenum::U32, Digest, Blake2b}; + use scale::Encode; use serai_db::{DbTxn, Db}; -use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::task::ContinuallyRan; use crate::{ db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign}, - index, scan::next_to_scan_for_outputs_block, ScannerFeed, KeyFor, }; mod db; -pub(crate) use db::ReturnInformation; +pub(crate) use db::{BatchInfo, ReturnInformation}; use db::ReportDb; -pub(crate) fn take_block_number_for_batch( +pub(crate) fn take_info_for_batch( txn: &mut impl DbTxn, id: u32, -) -> Option { - ReportDb::::take_block_number_for_batch(txn, id) +) -> Option { + ReportDb::::take_info_for_batch(txn, id) } pub(crate) fn take_external_key_for_session_to_sign_batch( @@ -88,33 +88,28 @@ impl ContinuallyRan for ReportTask { let next_to_potentially_report = ReportDb::::next_to_potentially_report_block(&self.db) .expect("ReportTask run before writing the start block"); - for b in next_to_potentially_report ..= highest_reportable { + for block_number in next_to_potentially_report ..= highest_reportable { let mut txn = self.db.txn(); // Receive the InInstructions for this block // We always do this as we can't trivially tell if we should recv InInstructions before we // do let InInstructionData { + session_to_sign_batch, external_key_for_session_to_sign_batch, returnable_in_instructions: in_instructions, - } = ScanToReportDb::::recv_in_instructions(&mut txn, b); - let notable = ScannerGlobalDb::::is_block_notable(&txn, b); + } = ScanToReportDb::::recv_in_instructions(&mut txn, block_number); + let notable = ScannerGlobalDb::::is_block_notable(&txn, block_number); if !notable { assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); } // If this block is notable, create the Batch(s) for it if notable { let network = S::NETWORK; - let block_hash = index::block_id(&txn, b); - let mut batch_id = ReportDb::::acquire_batch_id(&mut txn, b); + let mut batch_id = ReportDb::::acquire_batch_id(&mut txn); // start with empty batch - let mut batches = vec![Batch { - network, - id: batch_id, - block: BlockHash(block_hash), - instructions: vec![], - }]; + let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }]; // We also track the return information for the InInstructions within a Batch in case // they error let mut return_information = vec![vec![]]; @@ -131,15 +126,10 @@ impl ContinuallyRan for ReportTask { let in_instruction = batch.instructions.pop().unwrap(); // bump the id for the new batch - batch_id = ReportDb::::acquire_batch_id(&mut txn, b); + batch_id = ReportDb::::acquire_batch_id(&mut txn); // make a new batch with this instruction included - batches.push(Batch { - network, - id: batch_id, - block: BlockHash(block_hash), - instructions: vec![in_instruction], - }); + batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] }); // Since we're allocating a new batch, allocate a new set of return addresses for it return_information.push(vec![]); } @@ -152,10 +142,17 @@ impl ContinuallyRan for ReportTask { .push(return_address.map(|address| ReturnInformation { address, balance })); } - // Save the return addresses to the database + // Now that we've finalized the Batches, save the information for each to the database assert_eq!(batches.len(), return_information.len()); for (batch, return_information) in batches.iter().zip(&return_information) { assert_eq!(batch.instructions.len(), return_information.len()); + ReportDb::::save_batch_info( + &mut txn, + batch.id, + block_number, + session_to_sign_batch, + Blake2b::::digest(batch.instructions.encode()).into(), + ); ReportDb::::save_external_key_for_session_to_sign_batch( &mut txn, batch.id, @@ -171,7 +168,7 @@ impl ContinuallyRan for ReportTask { } // Update the next to potentially report block - ReportDb::::set_next_to_potentially_report_block(&mut txn, b + 1); + ReportDb::::set_next_to_potentially_report_block(&mut txn, block_number + 1); txn.commit(); } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 0ebdf992..14506092 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -349,6 +349,7 @@ impl ContinuallyRan for ScanTask { &mut txn, b, &InInstructionData { + session_to_sign_batch: keys[0].session, external_key_for_session_to_sign_batch: keys[0].key, returnable_in_instructions: in_instructions, }, diff --git a/processor/scanner/src/substrate/db.rs b/processor/scanner/src/substrate/db.rs index c1a1b0e2..d0037ac8 100644 --- a/processor/scanner/src/substrate/db.rs +++ b/processor/scanner/src/substrate/db.rs @@ -6,12 +6,15 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db, db_channel}; use serai_coins_primitives::OutInstructionWithBalance; +use serai_validator_sets_primitives::Session; use crate::{ScannerFeed, KeyFor}; #[derive(BorshSerialize, BorshDeserialize)] struct AcknowledgeBatchEncodable { batch_id: u32, + publisher: Session, + in_instructions_hash: [u8; 32], in_instruction_results: Vec, burns: Vec, key_to_activate: Option>, @@ -25,6 +28,8 @@ enum ActionEncodable { pub(crate) struct AcknowledgeBatch { pub(crate) batch_id: u32, + pub(crate) publisher: Session, + pub(crate) in_instructions_hash: [u8; 32], pub(crate) in_instruction_results: Vec, pub(crate) burns: Vec, pub(crate) key_to_activate: Option>, @@ -46,6 +51,8 @@ impl SubstrateDb { pub(crate) fn queue_acknowledge_batch( txn: &mut impl DbTxn, batch_id: u32, + publisher: Session, + in_instructions_hash: [u8; 32], in_instruction_results: Vec, burns: Vec, key_to_activate: Option>, @@ -54,6 +61,8 @@ impl SubstrateDb { txn, &ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable { batch_id, + publisher, + in_instructions_hash, in_instruction_results, burns, key_to_activate: key_to_activate.map(|key| key.to_bytes().as_ref().to_vec()), @@ -69,11 +78,15 @@ impl SubstrateDb { Some(match action_encodable { ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable { batch_id, + publisher, + in_instructions_hash, in_instruction_results, burns, key_to_activate, }) => Action::AcknowledgeBatch(AcknowledgeBatch { batch_id, + publisher, + in_instructions_hash, in_instruction_results, burns, key_to_activate: key_to_activate.map(|key| { diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index ce28470d..aced7d53 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -3,6 +3,7 @@ use core::{marker::PhantomData, future::Future}; use serai_db::{DbTxn, Db}; use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; +use serai_validator_sets_primitives::Session; use primitives::task::ContinuallyRan; use crate::{ @@ -16,6 +17,8 @@ use db::*; pub(crate) fn queue_acknowledge_batch( txn: &mut impl DbTxn, batch_id: u32, + publisher: Session, + in_instructions_hash: [u8; 32], in_instruction_results: Vec, burns: Vec, key_to_activate: Option>, @@ -23,6 +26,8 @@ pub(crate) fn queue_acknowledge_batch( SubstrateDb::::queue_acknowledge_batch( txn, batch_id, + publisher, + in_instructions_hash, in_instruction_results, burns, key_to_activate, @@ -67,17 +72,31 @@ impl ContinuallyRan for SubstrateTask { match action { Action::AcknowledgeBatch(AcknowledgeBatch { batch_id, + publisher, + in_instructions_hash, in_instruction_results, mut burns, key_to_activate, }) => { // Check if we have the information for this batch - let Some(block_number) = report::take_block_number_for_batch::(&mut txn, batch_id) + let Some(report::BatchInfo { + block_number, + publisher: expected_publisher, + in_instructions_hash: expected_in_instructions_hash, + }) = report::take_info_for_batch::(&mut txn, batch_id) else { // If we don't, drop this txn (restoring the action to the database) drop(txn); return Ok(made_progress); }; + assert_eq!( + publisher, expected_publisher, + "batch acknowledged on-chain was acknowledged by an unexpected publisher" + ); + assert_eq!( + in_instructions_hash, expected_in_instructions_hash, + "batch acknowledged on-chain was distinct" + ); { let external_key_for_session_to_sign_batch = diff --git a/substrate/client/src/serai/in_instructions.rs b/substrate/client/src/serai/in_instructions.rs index a8b47bfc..50c9ed96 100644 --- a/substrate/client/src/serai/in_instructions.rs +++ b/substrate/client/src/serai/in_instructions.rs @@ -13,13 +13,6 @@ const PALLET: &str = "InInstructions"; #[derive(Clone, Copy)] pub struct SeraiInInstructions<'a>(pub(crate) &'a TemporalSerai<'a>); impl<'a> SeraiInInstructions<'a> { - pub async fn latest_block_for_network( - &self, - network: NetworkId, - ) -> Result, SeraiError> { - self.0.storage(PALLET, "LatestNetworkBlock", network).await - } - pub async fn last_batch_for_network( &self, network: NetworkId, diff --git a/substrate/client/tests/batch.rs b/substrate/client/tests/batch.rs index 17e4d374..c19a4422 100644 --- a/substrate/client/tests/batch.rs +++ b/substrate/client/tests/batch.rs @@ -25,9 +25,6 @@ serai_test!( let network = NetworkId::Bitcoin; let id = 0; - let mut block_hash = BlockHash([0; 32]); - OsRng.fill_bytes(&mut block_hash.0); - let mut address = SeraiAddress::new([0; 32]); OsRng.fill_bytes(&mut address.0); @@ -38,7 +35,6 @@ serai_test!( let batch = Batch { network, id, - block: block_hash, instructions: vec![InInstructionWithBalance { instruction: InInstruction::Transfer(address), balance, @@ -50,15 +46,12 @@ serai_test!( let serai = serai.as_of(block); { let serai = serai.in_instructions(); - let latest_finalized = serai.latest_block_for_network(network).await.unwrap(); - assert_eq!(latest_finalized, Some(block_hash)); let batches = serai.batch_events().await.unwrap(); assert_eq!( batches, vec![InInstructionsEvent::Batch { network, id, - block: block_hash, instructions_hash: Blake2b::::digest(batch.instructions.encode()).into(), }] ); diff --git a/substrate/client/tests/common/in_instructions.rs b/substrate/client/tests/common/in_instructions.rs index 103940ab..5f29f2ba 100644 --- a/substrate/client/tests/common/in_instructions.rs +++ b/substrate/client/tests/common/in_instructions.rs @@ -52,7 +52,6 @@ pub async fn provide_batch(serai: &Serai, batch: Batch) -> [u8; 32] { vec![InInstructionsEvent::Batch { network: batch.network, id: batch.id, - block: batch.block, instructions_hash: Blake2b::::digest(batch.instructions.encode()).into(), }], ); diff --git a/substrate/in-instructions/pallet/src/lib.rs b/substrate/in-instructions/pallet/src/lib.rs index 1cb05c40..5b394c3d 100644 --- a/substrate/in-instructions/pallet/src/lib.rs +++ b/substrate/in-instructions/pallet/src/lib.rs @@ -89,12 +89,6 @@ pub mod pallet { #[pallet::storage] pub(crate) type Halted = StorageMap<_, Identity, NetworkId, (), OptionQuery>; - // The latest block a network has acknowledged as finalized - #[pallet::storage] - #[pallet::getter(fn latest_network_block)] - pub(crate) type LatestNetworkBlock = - StorageMap<_, Identity, NetworkId, BlockHash, OptionQuery>; - impl Pallet { // Use a dedicated transaction layer when executing this InInstruction // This lets it individually error without causing any storage modifications @@ -262,11 +256,9 @@ pub mod pallet { let batch = batch.batch; - LatestNetworkBlock::::insert(batch.network, batch.block); Self::deposit_event(Event::Batch { network: batch.network, id: batch.id, - block: batch.block, instructions_hash: blake2_256(&batch.instructions.encode()), }); for (i, instruction) in batch.instructions.into_iter().enumerate() { diff --git a/substrate/in-instructions/primitives/src/lib.rs b/substrate/in-instructions/primitives/src/lib.rs index 1455e423..ef88061b 100644 --- a/substrate/in-instructions/primitives/src/lib.rs +++ b/substrate/in-instructions/primitives/src/lib.rs @@ -19,8 +19,7 @@ use sp_application_crypto::sr25519::Signature; use sp_std::vec::Vec; use sp_runtime::RuntimeDebug; -#[rustfmt::skip] -use serai_primitives::{BlockHash, Balance, NetworkId, SeraiAddress, ExternalAddress, system_address}; +use serai_primitives::{Balance, NetworkId, SeraiAddress, ExternalAddress, system_address}; mod shorthand; pub use shorthand::*; @@ -107,7 +106,6 @@ pub struct InInstructionWithBalance { pub struct Batch { pub network: NetworkId, pub id: u32, - pub block: BlockHash, pub instructions: Vec, }