diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index e05712cf..e5d0e23b 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -196,18 +196,6 @@ impl signers::Coordinator for CoordinatorSend { } } - fn publish_batch( - &mut self, - batch: Batch, - ) -> impl Send + Future> { - async move { - self.send(&messages::ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::Batch { batch }, - )); - Ok(()) - } - } - fn publish_signed_batch( &mut self, batch: SignedBatch, diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 748cf39b..bbab3186 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -9,7 +9,7 @@ use dkg::Participant; use serai_primitives::BlockHash; use validator_sets_primitives::{Session, KeyPair, Slash}; use coins_primitives::OutInstructionWithBalance; -use in_instructions_primitives::{Batch, SignedBatch}; +use in_instructions_primitives::SignedBatch; #[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct SubstrateContext { @@ -208,9 +208,17 @@ pub mod substrate { }, } - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] - pub enum ProcessorMessage { - Batch { batch: Batch }, + #[derive(Clone, PartialEq, Eq, Debug)] + pub enum ProcessorMessage {} + impl BorshSerialize for ProcessorMessage { + fn serialize(&self, _writer: &mut W) -> borsh::io::Result<()> { + unimplemented!() + } + } + impl BorshDeserialize for ProcessorMessage { + fn deserialize_reader(_reader: &mut R) -> borsh::io::Result { + unimplemented!() + } } } @@ -383,15 +391,7 @@ impl ProcessorMessage { res.extend(&id); res } - ProcessorMessage::Substrate(msg) => { - let (sub, id) = match msg { - substrate::ProcessorMessage::Batch { batch } => (0, batch.id.encode()), - }; - - let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub]; - res.extend(&id); - res - } + ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"), } } } diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/batch/db.rs similarity index 83% rename from processor/scanner/src/report/db.rs rename to processor/scanner/src/batch/db.rs index d5b8fbd1..edca6d4a 100644 --- a/processor/scanner/src/report/db.rs +++ b/processor/scanner/src/batch/db.rs @@ -5,11 +5,10 @@ use group::GroupEncoding; use scale::{Encode, Decode, IoReader}; use borsh::{BorshSerialize, BorshDeserialize}; -use serai_db::{Get, DbTxn, create_db, db_channel}; +use serai_db::{Get, DbTxn, create_db}; use serai_primitives::Balance; use serai_validator_sets_primitives::Session; -use serai_in_instructions_primitives::Batch; use primitives::EncodableG; use crate::{ScannerFeed, KeyFor, AddressFor}; @@ -23,9 +22,9 @@ pub(crate) struct BatchInfo { } create_db!( - ScannerReport { - // The next block to potentially report - NextToPotentiallyReportBlock: () -> u64, + ScannerBatch { + // The next block to create batches for + NextBlockToBatch: () -> u64, // The last session to sign a Batch and their first Batch signed LastSessionToSignBatchAndFirstBatch: () -> (Session, u32), @@ -41,19 +40,13 @@ create_db!( } ); -db_channel!( - ScannerReport { - InternalBatches: () -> (Session, EncodableG, Batch), - } -); - pub(crate) struct ReturnInformation { pub(crate) address: AddressFor, pub(crate) balance: Balance, } -pub(crate) struct ReportDb(PhantomData); -impl ReportDb { +pub(crate) struct BatchDb(PhantomData); +impl BatchDb { pub(crate) fn 
set_last_session_to_sign_batch_and_first_batch( txn: &mut impl DbTxn, session: Session, @@ -67,14 +60,11 @@ impl ReportDb { LastSessionToSignBatchAndFirstBatch::get(getter) } - pub(crate) fn set_next_to_potentially_report_block( - txn: &mut impl DbTxn, - next_to_potentially_report_block: u64, - ) { - NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block); + pub(crate) fn set_next_block_to_batch(txn: &mut impl DbTxn, next_block_to_batch: u64) { + NextBlockToBatch::set(txn, &next_block_to_batch); } - pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option { - NextToPotentiallyReportBlock::get(getter) + pub(crate) fn next_block_to_batch(getter: &impl Get) -> Option { + NextBlockToBatch::get(getter) } pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { diff --git a/processor/scanner/src/batch/mod.rs b/processor/scanner/src/batch/mod.rs new file mode 100644 index 00000000..e12cda89 --- /dev/null +++ b/processor/scanner/src/batch/mod.rs @@ -0,0 +1,252 @@ +use core::{marker::PhantomData, future::Future}; + +use blake2::{digest::typenum::U32, Digest, Blake2b}; + +use scale::Encode; +use serai_db::{DbTxn, Db}; + +use serai_validator_sets_primitives::Session; +use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; + +use primitives::{EncodableG, task::ContinuallyRan}; +use crate::{ + db::{ + Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb, + BatchesToSign, + }, + scan::next_to_scan_for_outputs_block, + substrate, ScannerFeed, KeyFor, +}; + +mod db; +pub(crate) use db::{BatchInfo, ReturnInformation}; +use db::BatchDb; + +pub(crate) fn take_info_for_batch( + txn: &mut impl DbTxn, + id: u32, +) -> Option>>> { + BatchDb::::take_info_for_batch(txn, id) +} + +pub(crate) fn take_return_information( + txn: &mut impl DbTxn, + id: u32, +) -> Option>>> { + BatchDb::::take_return_information(txn, id) +} + +/* + This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. + + We only produce batches once both tasks, scanning for received outputs and checking for resolved + Eventualities, have processed the block. This ensures we know if this block is notable, and have + the InInstructions for it. 
+*/ +#[allow(non_snake_case)] +pub(crate) struct BatchTask { + db: D, + _S: PhantomData, +} + +impl BatchTask { + pub(crate) fn new(mut db: D, start_block: u64) -> Self { + if BatchDb::::next_block_to_batch(&db).is_none() { + // Initialize the DB + let mut txn = db.txn(); + BatchDb::::set_next_block_to_batch(&mut txn, start_block); + txn.commit(); + } + + Self { db, _S: PhantomData } + } +} + +impl ContinuallyRan for BatchTask { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let highest_batchable = { + // Fetch the next to scan block + let next_to_scan = next_to_scan_for_outputs_block::(&self.db) + .expect("BatchTask run before writing the start block"); + // If we haven't done any work, return + if next_to_scan == 0 { + return Ok(false); + } + // The last scanned block is the block prior to this + #[allow(clippy::let_and_return)] + let last_scanned = next_to_scan - 1; + // The last scanned block is the highest batchable block as we only scan blocks within a + // window where it's safe to immediately report the block + // See `eventuality.rs` for more info + last_scanned + }; + + let next_block_to_batch = BatchDb::::next_block_to_batch(&self.db) + .expect("BatchTask run before writing the start block"); + + for block_number in next_block_to_batch ..= highest_batchable { + let mut txn = self.db.txn(); + + // Receive the InInstructions for this block + // We always do this as we can't trivially tell if we should recv InInstructions before we + // do + let InInstructionData { + session_to_sign_batch, + external_key_for_session_to_sign_batch, + returnable_in_instructions: in_instructions, + } = ScanToBatchDb::::recv_in_instructions(&mut txn, block_number); + + let notable = ScannerGlobalDb::::is_block_notable(&txn, block_number); + if !notable { + assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); + } + // If this block is notable, create the Batch(s) for it + if notable { + let network = S::NETWORK; + let mut batch_id = BatchDb::::acquire_batch_id(&mut txn); + + // start with empty batch + let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }]; + // We also track the return information for the InInstructions within a Batch in case + // they error + let mut return_information = vec![vec![]]; + + for Returnable { return_address, in_instruction } in in_instructions { + let balance = in_instruction.balance; + + let batch = batches.last_mut().unwrap(); + batch.instructions.push(in_instruction); + + // check if batch is over-size + if batch.encode().len() > MAX_BATCH_SIZE { + // pop the last instruction so it's back in size + let in_instruction = batch.instructions.pop().unwrap(); + + // bump the id for the new batch + batch_id = BatchDb::::acquire_batch_id(&mut txn); + + // make a new batch with this instruction included + batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] }); + // Since we're allocating a new batch, allocate a new set of return addresses for it + return_information.push(vec![]); + } + + // For the set of return addresses for the InInstructions for the batch we just pushed + // onto, push this InInstruction's return addresses + return_information + .last_mut() + .unwrap() + .push(return_address.map(|address| ReturnInformation { address, balance })); + } + + // Now that we've finalized the Batches, save the information for each to the database + assert_eq!(batches.len(), return_information.len()); + for (batch, return_information) in batches.iter().zip(&return_information) { 
+ assert_eq!(batch.instructions.len(), return_information.len()); + BatchDb::::save_batch_info( + &mut txn, + batch.id, + block_number, + session_to_sign_batch, + external_key_for_session_to_sign_batch, + Blake2b::::digest(batch.instructions.encode()).into(), + ); + BatchDb::::save_return_information(&mut txn, batch.id, return_information); + } + + for batch in batches { + BatchToReportDb::::send_batch( + &mut txn, + &BatchData { + session_to_sign_batch, + external_key_for_session_to_sign_batch: EncodableG( + external_key_for_session_to_sign_batch, + ), + batch, + }, + ); + } + } + + // Update the next block to batch + BatchDb::::set_next_block_to_batch(&mut txn, block_number + 1); + + txn.commit(); + } + + // TODO: This should be its own task. The above doesn't error, doesn't return early, so this + // is fine, but this is precarious and would be better as its own task + loop { + let mut txn = self.db.txn(); + let Some(BatchData { + session_to_sign_batch, + external_key_for_session_to_sign_batch, + batch, + }) = BatchToReportDb::::try_recv_batch(&mut txn) + else { + break; + }; + + /* + If this is the handover Batch, the first Batch signed by a session which retires the + prior validator set, then this should only be signed after the prior validator set's + actions are fully validated. + + The new session will only be responsible for signing this Batch if the prior key has + retired, successfully completed all its on-external-network actions. + + We check here the prior session has successfully completed all its on-Serai-network + actions by ensuring we've validated all Batches expected from it. Only then do we sign + the Batch confirming the handover. + + We also wait for the Batch confirming the handover to be accepted on-chain, ensuring we + don't verify the prior session's Batches, sign the handover Batch and the following + Batch, have the prior session publish a malicious Batch where our handover Batch should + be, before our following Batch becomes our handover Batch. + */ + if session_to_sign_batch != Session(0) { + // We may have Session(1)'s first Batch be Batch 0 if Session(0) never publishes a + // Batch. This is fine as we'll hit the distinct Session check and then set the correct + // values into this DB entry. All other sessions must complete the handover process, + // which requires having published at least one Batch + let (last_session, first_batch) = + BatchDb::::last_session_to_sign_batch_and_first_batch(&txn) + .unwrap_or((Session(0), 0)); + // Because this boolean was expanded, we lose short-circuiting. 
That's fine + let handover_batch = last_session != session_to_sign_batch; + let batch_after_handover_batch = + (last_session == session_to_sign_batch) && ((first_batch + 1) == batch.id); + if handover_batch || batch_after_handover_batch { + let verified_prior_batch = substrate::last_acknowledged_batch::(&txn) + // Since `batch.id = 0` in the Session(0)-never-published-a-Batch case, we don't + // check `last_acknowledged_batch >= (batch.id - 1)` but instead this + .map(|last_acknowledged_batch| (last_acknowledged_batch + 1) >= batch.id) + // We've never verified any Batches + .unwrap_or(false); + if !verified_prior_batch { + // Drop the txn to restore the Batch to report to the DB + drop(txn); + break; + } + } + + // If this is the handover Batch, update the last session to sign a Batch + if handover_batch { + BatchDb::::set_last_session_to_sign_batch_and_first_batch( + &mut txn, + session_to_sign_batch, + batch.id, + ); + } + } + + BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch.0, &batch); + txn.commit(); + } + + // Run dependents if were able to batch any blocks + Ok(next_block_to_batch <= highest_batchable) + } + } +} diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 80b716ae..8790da31 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -482,7 +482,7 @@ struct BlockBoundInInstructions { } db_channel! { - ScannerScanReport { + ScannerScanBatch { InInstructions: () -> BlockBoundInInstructions, } } @@ -493,8 +493,8 @@ pub(crate) struct InInstructionData { pub(crate) returnable_in_instructions: Vec>, } -pub(crate) struct ScanToReportDb(PhantomData); -impl ScanToReportDb { +pub(crate) struct ScanToBatchDb(PhantomData); +impl ScanToBatchDb { pub(crate) fn send_in_instructions( txn: &mut impl DbTxn, block_number: u64, @@ -545,6 +545,30 @@ impl ScanToReportDb { } } +#[derive(BorshSerialize, BorshDeserialize)] +pub(crate) struct BatchData { + pub(crate) session_to_sign_batch: Session, + pub(crate) external_key_for_session_to_sign_batch: K, + pub(crate) batch: Batch, +} + +db_channel! { + ScannerBatchReport { + BatchToReport: () -> BatchData, + } +} + +pub(crate) struct BatchToReportDb(PhantomData); +impl BatchToReportDb { + pub(crate) fn send_batch(txn: &mut impl DbTxn, batch_data: &BatchData>>) { + BatchToReport::send(txn, batch_data); + } + + pub(crate) fn try_recv_batch(txn: &mut impl DbTxn) -> Option>>> { + BatchToReport::try_recv(txn) + } +} + db_channel! { ScannerSubstrateEventuality { Burns: (acknowledged_block: u64) -> Vec, @@ -583,7 +607,6 @@ mod _public_db { db_channel! { ScannerPublic { - Batches: () -> Batch, BatchesToSign: (key: &[u8]) -> Batch, AcknowledgedBatches: (key: &[u8]) -> u32, CompletedEventualities: (key: &[u8]) -> [u8; 32], @@ -591,21 +614,6 @@ mod _public_db { } } -/// The batches to publish. -/// -/// This is used for auditing the Batches published to Serai. -pub struct Batches; -impl Batches { - pub(crate) fn send(txn: &mut impl DbTxn, batch: &Batch) { - _public_db::Batches::send(txn, batch); - } - - /// Receive a batch to publish. - pub fn try_recv(txn: &mut impl DbTxn) -> Option { - _public_db::Batches::try_recv(txn) - } -} - /// The batches to sign and publish. /// /// This is used for publishing Batches onto Serai. 
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 1ef4f8c2..e5b37969 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -23,13 +23,13 @@ pub use lifetime::LifetimeStage; // Database schema definition and associated functions. mod db; use db::ScannerGlobalDb; -pub use db::{Batches, BatchesToSign, AcknowledgedBatches, CompletedEventualities}; +pub use db::{BatchesToSign, AcknowledgedBatches, CompletedEventualities}; // Task to index the blockchain, ensuring we don't reorganize finalized blocks. mod index; // Scans blocks for received coins. mod scan; -/// Task which reports Batches to Substrate. -mod report; +/// Task which creates Batches for Substrate. +mod batch; /// Task which handles events from Substrate once we can. mod substrate; /// Check blocks for transactions expected to eventually occur. @@ -379,24 +379,24 @@ impl Scanner { let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); - let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block); + let batch_task = batch::BatchTask::<_, S>::new(db.clone(), start_block); let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); let eventuality_task = eventuality::EventualityTask::<_, _, _>::new(db, feed, scheduler, start_block); let (index_task_def, _index_handle) = Task::new(); let (scan_task_def, scan_handle) = Task::new(); - let (report_task_def, report_handle) = Task::new(); + let (batch_task_def, batch_handle) = Task::new(); let (substrate_task_def, substrate_handle) = Task::new(); let (eventuality_task_def, eventuality_handle) = Task::new(); // Upon indexing a new block, scan it tokio::spawn(index_task.continually_run(index_task_def, vec![scan_handle.clone()])); - // Upon scanning a block, report it - tokio::spawn(scan_task.continually_run(scan_task_def, vec![report_handle])); - // Upon reporting a block, we do nothing (as the burden is on Substrate which won't be - // immediately ready) - tokio::spawn(report_task.continually_run(report_task_def, vec![])); + // Upon scanning a block, creates the batches for it + tokio::spawn(scan_task.continually_run(scan_task_def, vec![batch_handle])); + // Upon creating batches for a block, we do nothing (as the burden is on Substrate which won't + // be immediately ready) + tokio::spawn(batch_task.continually_run(batch_task_def, vec![])); // Upon handling an event from Substrate, we run the Eventuality task (as it's what's affected) tokio::spawn(substrate_task.continually_run(substrate_task_def, vec![eventuality_handle])); // Upon handling the Eventualities in a block, we run the scan task as we've advanced the diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs deleted file mode 100644 index d3c2995a..00000000 --- a/processor/scanner/src/report/mod.rs +++ /dev/null @@ -1,240 +0,0 @@ -use core::{marker::PhantomData, future::Future}; - -use blake2::{digest::typenum::U32, Digest, Blake2b}; - -use scale::Encode; -use serai_db::{DbTxn, Db}; - -use serai_validator_sets_primitives::Session; -use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; - -use primitives::{EncodableG, task::ContinuallyRan}; -use crate::{ - db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign}, - scan::next_to_scan_for_outputs_block, - substrate, ScannerFeed, KeyFor, -}; - -mod db; -pub(crate) use db::{BatchInfo, ReturnInformation, InternalBatches}; -use 
db::ReportDb; - -pub(crate) fn take_info_for_batch( - txn: &mut impl DbTxn, - id: u32, -) -> Option>>> { - ReportDb::::take_info_for_batch(txn, id) -} - -pub(crate) fn take_return_information( - txn: &mut impl DbTxn, - id: u32, -) -> Option>>> { - ReportDb::::take_return_information(txn, id) -} - -/* - This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. - - We only report blocks once both tasks, scanning for received outputs and checking for resolved - Eventualities, have processed the block. This ensures we know if this block is notable, and have - the InInstructions for it. -*/ -#[allow(non_snake_case)] -pub(crate) struct ReportTask { - db: D, - _S: PhantomData, -} - -impl ReportTask { - pub(crate) fn new(mut db: D, start_block: u64) -> Self { - if ReportDb::::next_to_potentially_report_block(&db).is_none() { - // Initialize the DB - let mut txn = db.txn(); - ReportDb::::set_next_to_potentially_report_block(&mut txn, start_block); - txn.commit(); - } - - Self { db, _S: PhantomData } - } -} - -impl ContinuallyRan for ReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { - async move { - let highest_reportable = { - // Fetch the next to scan block - let next_to_scan = next_to_scan_for_outputs_block::(&self.db) - .expect("ReportTask run before writing the start block"); - // If we haven't done any work, return - if next_to_scan == 0 { - return Ok(false); - } - // The last scanned block is the block prior to this - #[allow(clippy::let_and_return)] - let last_scanned = next_to_scan - 1; - // The last scanned block is the highest reportable block as we only scan blocks within a - // window where it's safe to immediately report the block - // See `eventuality.rs` for more info - last_scanned - }; - - let next_to_potentially_report = ReportDb::::next_to_potentially_report_block(&self.db) - .expect("ReportTask run before writing the start block"); - - for block_number in next_to_potentially_report ..= highest_reportable { - let mut txn = self.db.txn(); - - // Receive the InInstructions for this block - // We always do this as we can't trivially tell if we should recv InInstructions before we - // do - let InInstructionData { - session_to_sign_batch, - external_key_for_session_to_sign_batch, - returnable_in_instructions: in_instructions, - } = ScanToReportDb::::recv_in_instructions(&mut txn, block_number); - - let notable = ScannerGlobalDb::::is_block_notable(&txn, block_number); - if !notable { - assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); - } - // If this block is notable, create the Batch(s) for it - if notable { - let network = S::NETWORK; - let mut batch_id = ReportDb::::acquire_batch_id(&mut txn); - - // start with empty batch - let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }]; - // We also track the return information for the InInstructions within a Batch in case - // they error - let mut return_information = vec![vec![]]; - - for Returnable { return_address, in_instruction } in in_instructions { - let balance = in_instruction.balance; - - let batch = batches.last_mut().unwrap(); - batch.instructions.push(in_instruction); - - // check if batch is over-size - if batch.encode().len() > MAX_BATCH_SIZE { - // pop the last instruction so it's back in size - let in_instruction = batch.instructions.pop().unwrap(); - - // bump the id for the new batch - batch_id = ReportDb::::acquire_batch_id(&mut txn); - - // make a new batch with this instruction included - 
batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] }); - // Since we're allocating a new batch, allocate a new set of return addresses for it - return_information.push(vec![]); - } - - // For the set of return addresses for the InInstructions for the batch we just pushed - // onto, push this InInstruction's return addresses - return_information - .last_mut() - .unwrap() - .push(return_address.map(|address| ReturnInformation { address, balance })); - } - - // Now that we've finalized the Batches, save the information for each to the database - assert_eq!(batches.len(), return_information.len()); - for (batch, return_information) in batches.iter().zip(&return_information) { - assert_eq!(batch.instructions.len(), return_information.len()); - ReportDb::::save_batch_info( - &mut txn, - batch.id, - block_number, - session_to_sign_batch, - external_key_for_session_to_sign_batch, - Blake2b::::digest(batch.instructions.encode()).into(), - ); - ReportDb::::save_return_information(&mut txn, batch.id, return_information); - } - - for batch in batches { - InternalBatches::send( - &mut txn, - &(session_to_sign_batch, EncodableG(external_key_for_session_to_sign_batch), batch), - ); - } - } - - // Update the next to potentially report block - ReportDb::::set_next_to_potentially_report_block(&mut txn, block_number + 1); - - txn.commit(); - } - - // TODO: This should be its own task. The above doesn't error, doesn't return early, so this - // is fine, but this is precarious and would be better as its own task - { - let mut txn = self.db.txn(); - while let Some((session_to_sign_batch, external_key_for_session_to_sign_batch, batch)) = - InternalBatches::>::peek(&txn) - { - /* - If this is the handover Batch, the first Batch signed by a session which retires the - prior validator set, then this should only be signed after the prior validator set's - actions are fully validated. - - The new session will only be responsible for signing this Batch if the prior key has - retired, successfully completed all its on-external-network actions. - - We check here the prior session has successfully completed all its on-Serai-network - actions by ensuring we've validated all Batches expected from it. Only then do we sign - the Batch confirming the handover. - - We also wait for the Batch confirming the handover to be accepted on-chain, ensuring we - don't verify the prior session's Batches, sign the handover Batch and the following - Batch, have the prior session publish a malicious Batch where our handover Batch should - be, before our following Batch becomes our handover Batch. - */ - if session_to_sign_batch != Session(0) { - // We may have Session(1)'s first Batch be Batch 0 if Session(0) never publishes a - // Batch. This is fine as we'll hit the distinct Session check and then set the correct - // values into this DB entry. All other sessions must complete the handover process, - // which requires having published at least one Batch - let (last_session, first_batch) = - ReportDb::::last_session_to_sign_batch_and_first_batch(&txn) - .unwrap_or((Session(0), 0)); - // Because this boolean was expanded, we lose short-circuiting. 
That's fine - let handover_batch = last_session != session_to_sign_batch; - let batch_after_handover_batch = - (last_session == session_to_sign_batch) && ((first_batch + 1) == batch.id); - if handover_batch || batch_after_handover_batch { - let verified_prior_batch = substrate::last_acknowledged_batch::(&txn) - // Since `batch.id = 0` in the Session(0)-never-published-a-Batch case, we don't - // check `last_acknowledged_batch >= (batch.id - 1)` but instead this - .map(|last_acknowledged_batch| (last_acknowledged_batch + 1) >= batch.id) - // We've never verified any Batches - .unwrap_or(false); - if !verified_prior_batch { - break; - } - } - - // If this is the handover Batch, update the last session to sign a Batch - if handover_batch { - ReportDb::::set_last_session_to_sign_batch_and_first_batch( - &mut txn, - session_to_sign_batch, - batch.id, - ); - } - } - - // Since we should handle this batch now, recv it from the channel - InternalBatches::>::try_recv(&mut txn).unwrap(); - - Batches::send(&mut txn, &batch); - BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch.0, &batch); - } - txn.commit(); - } - - // Run dependents if we decided to report any blocks - Ok(next_to_potentially_report <= highest_reportable) - } - } -} diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 14506092..25127ace 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -14,7 +14,7 @@ use crate::{ lifetime::LifetimeStage, db::{ OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, InInstructionData, - ScanToReportDb, ScanToEventualityDb, + ScanToBatchDb, ScanToEventualityDb, }, BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs, eventuality::latest_scannable_block, @@ -345,7 +345,7 @@ impl ContinuallyRan for ScanTask { // We need to also specify which key is responsible for signing the Batch for these, which // will always be the oldest key (as the new key signing the Batch signifies handover // acceptance) - ScanToReportDb::::send_in_instructions( + ScanToBatchDb::::send_in_instructions( &mut txn, b, &InInstructionData { diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index fddc7453..506debd4 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -8,7 +8,7 @@ use serai_validator_sets_primitives::Session; use primitives::task::ContinuallyRan; use crate::{ db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches}, - report, ScannerFeed, KeyFor, + batch, ScannerFeed, KeyFor, }; mod db; @@ -82,12 +82,12 @@ impl ContinuallyRan for SubstrateTask { key_to_activate, }) => { // Check if we have the information for this batch - let Some(report::BatchInfo { + let Some(batch::BatchInfo { block_number, session_to_sign_batch, external_key_for_session_to_sign_batch, in_instructions_hash: expected_in_instructions_hash, - }) = report::take_info_for_batch::(&mut txn, batch_id) + }) = batch::take_info_for_batch::(&mut txn, batch_id) else { // If we don't, drop this txn (restoring the action to the database) drop(txn); @@ -143,7 +143,7 @@ impl ContinuallyRan for SubstrateTask { // Return the balances for any InInstructions which failed to execute { - let return_information = report::take_return_information::(&mut txn, batch_id) + let return_information = batch::take_return_information::(&mut txn, batch_id) .expect("didn't save the return information for Batch we published"); assert_eq!( 
in_instruction_results.len(), @@ -159,7 +159,7 @@ impl ContinuallyRan for SubstrateTask { continue; } - if let Some(report::ReturnInformation { address, balance }) = return_information { + if let Some(batch::ReturnInformation { address, balance }) = return_information { burns.push(OutInstructionWithBalance { instruction: OutInstruction { address: address.into() }, balance, diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 1e3c84d2..b57742a5 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -136,20 +136,6 @@ impl ContinuallyRan for CoordinatorTask { } } - // Publish the Batches - { - let mut txn = self.db.txn(); - while let Some(batch) = scanner::Batches::try_recv(&mut txn) { - iterated = true; - self - .coordinator - .publish_batch(batch) - .await - .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; - } - txn.commit(); - } - // Publish the signed Batches { let mut txn = self.db.txn(); diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index 4943e91d..40e538aa 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -64,12 +64,6 @@ pub trait Coordinator: 'static + Send + Sync { signature: Signature, ) -> impl Send + Future>; - /// Publish a `Batch`. - fn publish_batch( - &mut self, - batch: Batch, - ) -> impl Send + Future>; - /// Publish a `SignedBatch`. fn publish_signed_batch( &mut self,
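One pattern worth calling out from processor/messages/src/lib.rs near the top of this diff: `substrate::ProcessorMessage` is kept as an uninhabited enum with panicking Borsh impls rather than being removed outright, which preserves the `ProcessorMessage::Substrate` variant's slot. A rough standalone sketch of that pattern, assuming borsh's default std feature (the patch itself uses `unimplemented!()` bodies):

use borsh::{BorshSerialize, BorshDeserialize};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Uninhabited {}

impl BorshSerialize for Uninhabited {
  fn serialize<W: borsh::io::Write>(&self, _writer: &mut W) -> borsh::io::Result<()> {
    // No value of this type can exist, so this body is provably unreachable
    match *self {}
  }
}

impl BorshDeserialize for Uninhabited {
  fn deserialize_reader<R: borsh::io::Read>(_reader: &mut R) -> borsh::io::Result<Self> {
    Err(borsh::io::Error::new(
      borsh::io::ErrorKind::InvalidData,
      "no variants to deserialize",
    ))
  }
}

The empty `match` makes the unreachability explicit to the compiler; `unimplemented!()`, as used in the patch, behaves the same way in practice since the method can never be called with a live value.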