Rename Report task to Batch task

This commit is contained in:
Luke Parker
2024-12-30 10:49:35 -05:00
parent 458f4fe170
commit f0094b3c7c
11 changed files with 319 additions and 341 deletions

View File

@@ -196,18 +196,6 @@ impl signers::Coordinator for CoordinatorSend {
}
}
fn publish_batch(
&mut self,
batch: Batch,
) -> impl Send + Future<Output = Result<(), Self::EphemeralError>> {
async move {
self.send(&messages::ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::Batch { batch },
));
Ok(())
}
}
fn publish_signed_batch(
&mut self,
batch: SignedBatch,

View File

@@ -9,7 +9,7 @@ use dkg::Participant;
use serai_primitives::BlockHash;
use validator_sets_primitives::{Session, KeyPair, Slash};
use coins_primitives::OutInstructionWithBalance;
use in_instructions_primitives::{Batch, SignedBatch};
use in_instructions_primitives::SignedBatch;
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct SubstrateContext {
@@ -208,9 +208,17 @@ pub mod substrate {
},
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
Batch { batch: Batch },
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum ProcessorMessage {}
impl BorshSerialize for ProcessorMessage {
fn serialize<W: borsh::io::Write>(&self, _writer: &mut W) -> borsh::io::Result<()> {
unimplemented!()
}
}
impl BorshDeserialize for ProcessorMessage {
fn deserialize_reader<R: borsh::io::Read>(_reader: &mut R) -> borsh::io::Result<Self> {
unimplemented!()
}
}
}
@@ -383,15 +391,7 @@ impl ProcessorMessage {
res.extend(&id);
res
}
ProcessorMessage::Substrate(msg) => {
let (sub, id) = match msg {
substrate::ProcessorMessage::Batch { batch } => (0, batch.id.encode()),
};
let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub];
res.extend(&id);
res
}
ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"),
}
}
}

View File

@@ -5,11 +5,10 @@ use group::GroupEncoding;
use scale::{Encode, Decode, IoReader};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db, db_channel};
use serai_db::{Get, DbTxn, create_db};
use serai_primitives::Balance;
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::Batch;
use primitives::EncodableG;
use crate::{ScannerFeed, KeyFor, AddressFor};
@@ -23,9 +22,9 @@ pub(crate) struct BatchInfo<K: BorshSerialize> {
}
create_db!(
ScannerReport {
// The next block to potentially report
NextToPotentiallyReportBlock: () -> u64,
ScannerBatch {
// The next block to create batches for
NextBlockToBatch: () -> u64,
// The last session to sign a Batch and their first Batch signed
LastSessionToSignBatchAndFirstBatch: () -> (Session, u32),
@@ -41,19 +40,13 @@ create_db!(
}
);
db_channel!(
ScannerReport {
InternalBatches: <G: GroupEncoding>() -> (Session, EncodableG<G>, Batch),
}
);
pub(crate) struct ReturnInformation<S: ScannerFeed> {
pub(crate) address: AddressFor<S>,
pub(crate) balance: Balance,
}
pub(crate) struct ReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ReportDb<S> {
pub(crate) struct BatchDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> BatchDb<S> {
pub(crate) fn set_last_session_to_sign_batch_and_first_batch(
txn: &mut impl DbTxn,
session: Session,
@@ -67,14 +60,11 @@ impl<S: ScannerFeed> ReportDb<S> {
LastSessionToSignBatchAndFirstBatch::get(getter)
}
pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn,
next_to_potentially_report_block: u64,
) {
NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block);
pub(crate) fn set_next_block_to_batch(txn: &mut impl DbTxn, next_block_to_batch: u64) {
NextBlockToBatch::set(txn, &next_block_to_batch);
}
pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option<u64> {
NextToPotentiallyReportBlock::get(getter)
pub(crate) fn next_block_to_batch(getter: &impl Get) -> Option<u64> {
NextBlockToBatch::get(getter)
}
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {

View File

@@ -0,0 +1,252 @@
use core::{marker::PhantomData, future::Future};
use blake2::{digest::typenum::U32, Digest, Blake2b};
use scale::Encode;
use serai_db::{DbTxn, Db};
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::{EncodableG, task::ContinuallyRan};
use crate::{
db::{
Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb,
BatchesToSign,
},
scan::next_to_scan_for_outputs_block,
substrate, ScannerFeed, KeyFor,
};
mod db;
pub(crate) use db::{BatchInfo, ReturnInformation};
use db::BatchDb;
pub(crate) fn take_info_for_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<BatchInfo<EncodableG<KeyFor<S>>>> {
BatchDb::<S>::take_info_for_batch(txn, id)
}
pub(crate) fn take_return_information<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
BatchDb::<S>::take_return_information(txn, id)
}
/*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
We only produce batches once both tasks, scanning for received outputs and checking for resolved
Eventualities, have processed the block. This ensures we know if this block is notable, and have
the InInstructions for it.
*/
#[allow(non_snake_case)]
pub(crate) struct BatchTask<D: Db, S: ScannerFeed> {
db: D,
_S: PhantomData<S>,
}
impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
pub(crate) fn new(mut db: D, start_block: u64) -> Self {
if BatchDb::<S>::next_block_to_batch(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
BatchDb::<S>::set_next_block_to_batch(&mut txn, start_block);
txn.commit();
}
Self { db, _S: PhantomData }
}
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let highest_batchable = {
// Fetch the next to scan block
let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
.expect("BatchTask run before writing the start block");
// If we haven't done any work, return
if next_to_scan == 0 {
return Ok(false);
}
// The last scanned block is the block prior to this
#[allow(clippy::let_and_return)]
let last_scanned = next_to_scan - 1;
// The last scanned block is the highest batchable block as we only scan blocks within a
// window where it's safe to immediately report the block
// See `eventuality.rs` for more info
last_scanned
};
let next_block_to_batch = BatchDb::<S>::next_block_to_batch(&self.db)
.expect("BatchTask run before writing the start block");
for block_number in next_block_to_batch ..= highest_batchable {
let mut txn = self.db.txn();
// Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we
// do
let InInstructionData {
session_to_sign_batch,
external_key_for_session_to_sign_batch,
returnable_in_instructions: in_instructions,
} = ScanToBatchDb::<S>::recv_in_instructions(&mut txn, block_number);
let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, block_number);
if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
}
// If this block is notable, create the Batch(s) for it
if notable {
let network = S::NETWORK;
let mut batch_id = BatchDb::<S>::acquire_batch_id(&mut txn);
// start with empty batch
let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }];
// We also track the return information for the InInstructions within a Batch in case
// they error
let mut return_information = vec![vec![]];
for Returnable { return_address, in_instruction } in in_instructions {
let balance = in_instruction.balance;
let batch = batches.last_mut().unwrap();
batch.instructions.push(in_instruction);
// check if batch is over-size
if batch.encode().len() > MAX_BATCH_SIZE {
// pop the last instruction so it's back in size
let in_instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch
batch_id = BatchDb::<S>::acquire_batch_id(&mut txn);
// make a new batch with this instruction included
batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] });
// Since we're allocating a new batch, allocate a new set of return addresses for it
return_information.push(vec![]);
}
// For the set of return addresses for the InInstructions for the batch we just pushed
// onto, push this InInstruction's return addresses
return_information
.last_mut()
.unwrap()
.push(return_address.map(|address| ReturnInformation { address, balance }));
}
// Now that we've finalized the Batches, save the information for each to the database
assert_eq!(batches.len(), return_information.len());
for (batch, return_information) in batches.iter().zip(&return_information) {
assert_eq!(batch.instructions.len(), return_information.len());
BatchDb::<S>::save_batch_info(
&mut txn,
batch.id,
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch,
Blake2b::<U32>::digest(batch.instructions.encode()).into(),
);
BatchDb::<S>::save_return_information(&mut txn, batch.id, return_information);
}
for batch in batches {
BatchToReportDb::<S>::send_batch(
&mut txn,
&BatchData {
session_to_sign_batch,
external_key_for_session_to_sign_batch: EncodableG(
external_key_for_session_to_sign_batch,
),
batch,
},
);
}
}
// Update the next block to batch
BatchDb::<S>::set_next_block_to_batch(&mut txn, block_number + 1);
txn.commit();
}
// TODO: This should be its own task. The above doesn't error, doesn't return early, so this
// is fine, but this is precarious and would be better as its own task
loop {
let mut txn = self.db.txn();
let Some(BatchData {
session_to_sign_batch,
external_key_for_session_to_sign_batch,
batch,
}) = BatchToReportDb::<S>::try_recv_batch(&mut txn)
else {
break;
};
/*
If this is the handover Batch, the first Batch signed by a session which retires the
prior validator set, then this should only be signed after the prior validator set's
actions are fully validated.
The new session will only be responsible for signing this Batch if the prior key has
retired, successfully completed all its on-external-network actions.
We check here the prior session has successfully completed all its on-Serai-network
actions by ensuring we've validated all Batches expected from it. Only then do we sign
the Batch confirming the handover.
We also wait for the Batch confirming the handover to be accepted on-chain, ensuring we
don't verify the prior session's Batches, sign the handover Batch and the following
Batch, have the prior session publish a malicious Batch where our handover Batch should
be, before our following Batch becomes our handover Batch.
*/
if session_to_sign_batch != Session(0) {
// We may have Session(1)'s first Batch be Batch 0 if Session(0) never publishes a
// Batch. This is fine as we'll hit the distinct Session check and then set the correct
// values into this DB entry. All other sessions must complete the handover process,
// which requires having published at least one Batch
let (last_session, first_batch) =
BatchDb::<S>::last_session_to_sign_batch_and_first_batch(&txn)
.unwrap_or((Session(0), 0));
// Because this boolean was expanded, we lose short-circuiting. That's fine
let handover_batch = last_session != session_to_sign_batch;
let batch_after_handover_batch =
(last_session == session_to_sign_batch) && ((first_batch + 1) == batch.id);
if handover_batch || batch_after_handover_batch {
let verified_prior_batch = substrate::last_acknowledged_batch::<S>(&txn)
// Since `batch.id = 0` in the Session(0)-never-published-a-Batch case, we don't
// check `last_acknowledged_batch >= (batch.id - 1)` but instead this
.map(|last_acknowledged_batch| (last_acknowledged_batch + 1) >= batch.id)
// We've never verified any Batches
.unwrap_or(false);
if !verified_prior_batch {
// Drop the txn to restore the Batch to report to the DB
drop(txn);
break;
}
}
// If this is the handover Batch, update the last session to sign a Batch
if handover_batch {
BatchDb::<S>::set_last_session_to_sign_batch_and_first_batch(
&mut txn,
session_to_sign_batch,
batch.id,
);
}
}
BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch.0, &batch);
txn.commit();
}
// Run dependents if were able to batch any blocks
Ok(next_block_to_batch <= highest_batchable)
}
}
}

View File

@@ -482,7 +482,7 @@ struct BlockBoundInInstructions {
}
db_channel! {
ScannerScanReport {
ScannerScanBatch {
InInstructions: () -> BlockBoundInInstructions,
}
}
@@ -493,8 +493,8 @@ pub(crate) struct InInstructionData<S: ScannerFeed> {
pub(crate) returnable_in_instructions: Vec<Returnable<S>>,
}
pub(crate) struct ScanToReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScanToReportDb<S> {
pub(crate) struct ScanToBatchDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScanToBatchDb<S> {
pub(crate) fn send_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
@@ -545,6 +545,30 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
}
}
#[derive(BorshSerialize, BorshDeserialize)]
pub(crate) struct BatchData<K: BorshSerialize + BorshDeserialize> {
pub(crate) session_to_sign_batch: Session,
pub(crate) external_key_for_session_to_sign_batch: K,
pub(crate) batch: Batch,
}
db_channel! {
ScannerBatchReport {
BatchToReport: <K: Borshy>() -> BatchData<K>,
}
}
pub(crate) struct BatchToReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> BatchToReportDb<S> {
pub(crate) fn send_batch(txn: &mut impl DbTxn, batch_data: &BatchData<EncodableG<KeyFor<S>>>) {
BatchToReport::send(txn, batch_data);
}
pub(crate) fn try_recv_batch(txn: &mut impl DbTxn) -> Option<BatchData<EncodableG<KeyFor<S>>>> {
BatchToReport::try_recv(txn)
}
}
db_channel! {
ScannerSubstrateEventuality {
Burns: (acknowledged_block: u64) -> Vec<OutInstructionWithBalance>,
@@ -583,7 +607,6 @@ mod _public_db {
db_channel! {
ScannerPublic {
Batches: () -> Batch,
BatchesToSign: (key: &[u8]) -> Batch,
AcknowledgedBatches: (key: &[u8]) -> u32,
CompletedEventualities: (key: &[u8]) -> [u8; 32],
@@ -591,21 +614,6 @@ mod _public_db {
}
}
/// The batches to publish.
///
/// This is used for auditing the Batches published to Serai.
pub struct Batches;
impl Batches {
pub(crate) fn send(txn: &mut impl DbTxn, batch: &Batch) {
_public_db::Batches::send(txn, batch);
}
/// Receive a batch to publish.
pub fn try_recv(txn: &mut impl DbTxn) -> Option<Batch> {
_public_db::Batches::try_recv(txn)
}
}
/// The batches to sign and publish.
///
/// This is used for publishing Batches onto Serai.

View File

@@ -23,13 +23,13 @@ pub use lifetime::LifetimeStage;
// Database schema definition and associated functions.
mod db;
use db::ScannerGlobalDb;
pub use db::{Batches, BatchesToSign, AcknowledgedBatches, CompletedEventualities};
pub use db::{BatchesToSign, AcknowledgedBatches, CompletedEventualities};
// Task to index the blockchain, ensuring we don't reorganize finalized blocks.
mod index;
// Scans blocks for received coins.
mod scan;
/// Task which reports Batches to Substrate.
mod report;
/// Task which creates Batches for Substrate.
mod batch;
/// Task which handles events from Substrate once we can.
mod substrate;
/// Check blocks for transactions expected to eventually occur.
@@ -379,24 +379,24 @@ impl<S: ScannerFeed> Scanner<S> {
let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
let batch_task = batch::BatchTask::<_, S>::new(db.clone(), start_block);
let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
let eventuality_task =
eventuality::EventualityTask::<_, _, _>::new(db, feed, scheduler, start_block);
let (index_task_def, _index_handle) = Task::new();
let (scan_task_def, scan_handle) = Task::new();
let (report_task_def, report_handle) = Task::new();
let (batch_task_def, batch_handle) = Task::new();
let (substrate_task_def, substrate_handle) = Task::new();
let (eventuality_task_def, eventuality_handle) = Task::new();
// Upon indexing a new block, scan it
tokio::spawn(index_task.continually_run(index_task_def, vec![scan_handle.clone()]));
// Upon scanning a block, report it
tokio::spawn(scan_task.continually_run(scan_task_def, vec![report_handle]));
// Upon reporting a block, we do nothing (as the burden is on Substrate which won't be
// immediately ready)
tokio::spawn(report_task.continually_run(report_task_def, vec![]));
// Upon scanning a block, creates the batches for it
tokio::spawn(scan_task.continually_run(scan_task_def, vec![batch_handle]));
// Upon creating batches for a block, we do nothing (as the burden is on Substrate which won't
// be immediately ready)
tokio::spawn(batch_task.continually_run(batch_task_def, vec![]));
// Upon handling an event from Substrate, we run the Eventuality task (as it's what's affected)
tokio::spawn(substrate_task.continually_run(substrate_task_def, vec![eventuality_handle]));
// Upon handling the Eventualities in a block, we run the scan task as we've advanced the

View File

@@ -1,240 +0,0 @@
use core::{marker::PhantomData, future::Future};
use blake2::{digest::typenum::U32, Digest, Blake2b};
use scale::Encode;
use serai_db::{DbTxn, Db};
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::{EncodableG, task::ContinuallyRan};
use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign},
scan::next_to_scan_for_outputs_block,
substrate, ScannerFeed, KeyFor,
};
mod db;
pub(crate) use db::{BatchInfo, ReturnInformation, InternalBatches};
use db::ReportDb;
pub(crate) fn take_info_for_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<BatchInfo<EncodableG<KeyFor<S>>>> {
ReportDb::<S>::take_info_for_batch(txn, id)
}
pub(crate) fn take_return_information<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
ReportDb::<S>::take_return_information(txn, id)
}
/*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
We only report blocks once both tasks, scanning for received outputs and checking for resolved
Eventualities, have processed the block. This ensures we know if this block is notable, and have
the InInstructions for it.
*/
#[allow(non_snake_case)]
pub(crate) struct ReportTask<D: Db, S: ScannerFeed> {
db: D,
_S: PhantomData<S>,
}
impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
pub(crate) fn new(mut db: D, start_block: u64) -> Self {
if ReportDb::<S>::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, start_block);
txn.commit();
}
Self { db, _S: PhantomData }
}
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let highest_reportable = {
// Fetch the next to scan block
let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
.expect("ReportTask run before writing the start block");
// If we haven't done any work, return
if next_to_scan == 0 {
return Ok(false);
}
// The last scanned block is the block prior to this
#[allow(clippy::let_and_return)]
let last_scanned = next_to_scan - 1;
// The last scanned block is the highest reportable block as we only scan blocks within a
// window where it's safe to immediately report the block
// See `eventuality.rs` for more info
last_scanned
};
let next_to_potentially_report = ReportDb::<S>::next_to_potentially_report_block(&self.db)
.expect("ReportTask run before writing the start block");
for block_number in next_to_potentially_report ..= highest_reportable {
let mut txn = self.db.txn();
// Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we
// do
let InInstructionData {
session_to_sign_batch,
external_key_for_session_to_sign_batch,
returnable_in_instructions: in_instructions,
} = ScanToReportDb::<S>::recv_in_instructions(&mut txn, block_number);
let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, block_number);
if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
}
// If this block is notable, create the Batch(s) for it
if notable {
let network = S::NETWORK;
let mut batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
// start with empty batch
let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }];
// We also track the return information for the InInstructions within a Batch in case
// they error
let mut return_information = vec![vec![]];
for Returnable { return_address, in_instruction } in in_instructions {
let balance = in_instruction.balance;
let batch = batches.last_mut().unwrap();
batch.instructions.push(in_instruction);
// check if batch is over-size
if batch.encode().len() > MAX_BATCH_SIZE {
// pop the last instruction so it's back in size
let in_instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch
batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
// make a new batch with this instruction included
batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] });
// Since we're allocating a new batch, allocate a new set of return addresses for it
return_information.push(vec![]);
}
// For the set of return addresses for the InInstructions for the batch we just pushed
// onto, push this InInstruction's return addresses
return_information
.last_mut()
.unwrap()
.push(return_address.map(|address| ReturnInformation { address, balance }));
}
// Now that we've finalized the Batches, save the information for each to the database
assert_eq!(batches.len(), return_information.len());
for (batch, return_information) in batches.iter().zip(&return_information) {
assert_eq!(batch.instructions.len(), return_information.len());
ReportDb::<S>::save_batch_info(
&mut txn,
batch.id,
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch,
Blake2b::<U32>::digest(batch.instructions.encode()).into(),
);
ReportDb::<S>::save_return_information(&mut txn, batch.id, return_information);
}
for batch in batches {
InternalBatches::send(
&mut txn,
&(session_to_sign_batch, EncodableG(external_key_for_session_to_sign_batch), batch),
);
}
}
// Update the next to potentially report block
ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, block_number + 1);
txn.commit();
}
// TODO: This should be its own task. The above doesn't error, doesn't return early, so this
// is fine, but this is precarious and would be better as its own task
{
let mut txn = self.db.txn();
while let Some((session_to_sign_batch, external_key_for_session_to_sign_batch, batch)) =
InternalBatches::<KeyFor<S>>::peek(&txn)
{
/*
If this is the handover Batch, the first Batch signed by a session which retires the
prior validator set, then this should only be signed after the prior validator set's
actions are fully validated.
The new session will only be responsible for signing this Batch if the prior key has
retired, successfully completed all its on-external-network actions.
We check here the prior session has successfully completed all its on-Serai-network
actions by ensuring we've validated all Batches expected from it. Only then do we sign
the Batch confirming the handover.
We also wait for the Batch confirming the handover to be accepted on-chain, ensuring we
don't verify the prior session's Batches, sign the handover Batch and the following
Batch, have the prior session publish a malicious Batch where our handover Batch should
be, before our following Batch becomes our handover Batch.
*/
if session_to_sign_batch != Session(0) {
// We may have Session(1)'s first Batch be Batch 0 if Session(0) never publishes a
// Batch. This is fine as we'll hit the distinct Session check and then set the correct
// values into this DB entry. All other sessions must complete the handover process,
// which requires having published at least one Batch
let (last_session, first_batch) =
ReportDb::<S>::last_session_to_sign_batch_and_first_batch(&txn)
.unwrap_or((Session(0), 0));
// Because this boolean was expanded, we lose short-circuiting. That's fine
let handover_batch = last_session != session_to_sign_batch;
let batch_after_handover_batch =
(last_session == session_to_sign_batch) && ((first_batch + 1) == batch.id);
if handover_batch || batch_after_handover_batch {
let verified_prior_batch = substrate::last_acknowledged_batch::<S>(&txn)
// Since `batch.id = 0` in the Session(0)-never-published-a-Batch case, we don't
// check `last_acknowledged_batch >= (batch.id - 1)` but instead this
.map(|last_acknowledged_batch| (last_acknowledged_batch + 1) >= batch.id)
// We've never verified any Batches
.unwrap_or(false);
if !verified_prior_batch {
break;
}
}
// If this is the handover Batch, update the last session to sign a Batch
if handover_batch {
ReportDb::<S>::set_last_session_to_sign_batch_and_first_batch(
&mut txn,
session_to_sign_batch,
batch.id,
);
}
}
// Since we should handle this batch now, recv it from the channel
InternalBatches::<KeyFor<S>>::try_recv(&mut txn).unwrap();
Batches::send(&mut txn, &batch);
BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch.0, &batch);
}
txn.commit();
}
// Run dependents if we decided to report any blocks
Ok(next_to_potentially_report <= highest_reportable)
}
}
}

View File

@@ -14,7 +14,7 @@ use crate::{
lifetime::LifetimeStage,
db::{
OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, InInstructionData,
ScanToReportDb, ScanToEventualityDb,
ScanToBatchDb, ScanToEventualityDb,
},
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
eventuality::latest_scannable_block,
@@ -345,7 +345,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
// We need to also specify which key is responsible for signing the Batch for these, which
// will always be the oldest key (as the new key signing the Batch signifies handover
// acceptance)
ScanToReportDb::<S>::send_in_instructions(
ScanToBatchDb::<S>::send_in_instructions(
&mut txn,
b,
&InInstructionData {

View File

@@ -8,7 +8,7 @@ use serai_validator_sets_primitives::Session;
use primitives::task::ContinuallyRan;
use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
report, ScannerFeed, KeyFor,
batch, ScannerFeed, KeyFor,
};
mod db;
@@ -82,12 +82,12 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
key_to_activate,
}) => {
// Check if we have the information for this batch
let Some(report::BatchInfo {
let Some(batch::BatchInfo {
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch,
in_instructions_hash: expected_in_instructions_hash,
}) = report::take_info_for_batch::<S>(&mut txn, batch_id)
}) = batch::take_info_for_batch::<S>(&mut txn, batch_id)
else {
// If we don't, drop this txn (restoring the action to the database)
drop(txn);
@@ -143,7 +143,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
// Return the balances for any InInstructions which failed to execute
{
let return_information = report::take_return_information::<S>(&mut txn, batch_id)
let return_information = batch::take_return_information::<S>(&mut txn, batch_id)
.expect("didn't save the return information for Batch we published");
assert_eq!(
in_instruction_results.len(),
@@ -159,7 +159,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
continue;
}
if let Some(report::ReturnInformation { address, balance }) = return_information {
if let Some(batch::ReturnInformation { address, balance }) = return_information {
burns.push(OutInstructionWithBalance {
instruction: OutInstruction { address: address.into() },
balance,

View File

@@ -136,20 +136,6 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
}
}
// Publish the Batches
{
let mut txn = self.db.txn();
while let Some(batch) = scanner::Batches::try_recv(&mut txn) {
iterated = true;
self
.coordinator
.publish_batch(batch)
.await
.map_err(|e| format!("couldn't publish Batch: {e:?}"))?;
}
txn.commit();
}
// Publish the signed Batches
{
let mut txn = self.db.txn();

View File

@@ -64,12 +64,6 @@ pub trait Coordinator: 'static + Send + Sync {
signature: Signature,
) -> impl Send + Future<Output = Result<(), Self::EphemeralError>>;
/// Publish a `Batch`.
fn publish_batch(
&mut self,
batch: Batch,
) -> impl Send + Future<Output = Result<(), Self::EphemeralError>>;
/// Publish a `SignedBatch`.
fn publish_signed_batch(
&mut self,