5 Commits

Author SHA1 Message Date
Luke Parker
1de8136739 Remove Session from VariantSignId::SlashReport
It's only there to make the VariantSignid unique across Sessions. By localizing
the VariantSignid to a Session, we avoid this, and can better ensure we don't
queue work for historic sessions.
2024-12-30 06:16:03 -05:00
Luke Parker
445c49f030 Have the scanner's report task ensure handovers only occur if Batchs are valid
This is incomplete at this time. The logic is fine, but needs to be moved to a
distinct location to handle singular blocks which produce multiple Batches.
2024-12-30 06:11:47 -05:00
Luke Parker
5b74fc8ac1 Merge ExternalKeyForSessionToSignBatch into InfoForBatch 2024-12-30 05:34:13 -05:00
Luke Parker
e67e301fc2 Have the processor verify the published Batches match expectations 2024-12-30 05:21:26 -05:00
Luke Parker
1d50792eed Document serai-db with bounds and intent 2024-12-26 02:35:32 -05:00
24 changed files with 486 additions and 320 deletions

376
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "serai-db"
version = "0.1.0"
version = "0.1.1"
description = "A simple database trait and backends for it"
license = "MIT"
repository = "https://github.com/serai-dex/serai/tree/develop/common/db"
@@ -18,7 +18,7 @@ workspace = true
[dependencies]
parity-db = { version = "0.4", default-features = false, optional = true }
rocksdb = { version = "0.21", default-features = false, features = ["zstd"], optional = true }
rocksdb = { version = "0.23", default-features = false, features = ["zstd"], optional = true }
[features]
parity-db = ["dep:parity-db"]

8
common/db/README.md Normal file
View File

@@ -0,0 +1,8 @@
# Serai DB
An inefficient, minimal abstraction around databases.
The abstraction offers `get`, `put`, and `del` with helper functions and macros
built on top. Database iteration is not offered, forcing the caller to manually
implement indexing schemes. This ensures wide compatibility across abstracted
databases.

View File

@@ -14,26 +14,43 @@ mod parity_db;
#[cfg(feature = "parity-db")]
pub use parity_db::{ParityDb, new_parity_db};
/// An object implementing get.
/// An object implementing `get`.
pub trait Get {
/// Get a value from the database.
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>>;
}
/// An atomic database operation.
/// An atomic database transaction.
///
/// A transaction is only required to atomically commit. It is not required that two `Get` calls
/// made with the same transaction return the same result, if another transaction wrote to that
/// key.
///
/// If two transactions are created, and both write (including deletions) to the same key, behavior
/// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values
/// randomly, or any other action, at time of write or at time of commit.
#[must_use]
pub trait DbTxn: Send + Get {
/// Write a value to this key.
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
/// Delete the value from this key.
fn del(&mut self, key: impl AsRef<[u8]>);
/// Commit this transaction.
fn commit(self);
}
/// A database supporting atomic operations.
/// A database supporting atomic transaction.
pub trait Db: 'static + Send + Sync + Clone + Get {
/// The type representing a database transaction.
type Transaction<'a>: DbTxn;
/// Calculate a key for a database entry.
///
/// Keys are separated by the database, the item within the database, and the item's key itself.
fn key(db_dst: &'static [u8], item_dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
let db_len = u8::try_from(db_dst.len()).unwrap();
let dst_len = u8::try_from(item_dst.len()).unwrap();
[[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat()
}
/// Open a new transaction.
fn txn(&mut self) -> Self::Transaction<'_>;
}

View File

@@ -11,7 +11,7 @@ use crate::*;
#[derive(PartialEq, Eq, Debug)]
pub struct MemDbTxn<'a>(&'a MemDb, HashMap<Vec<u8>, Vec<u8>>, HashSet<Vec<u8>>);
impl<'a> Get for MemDbTxn<'a> {
impl Get for MemDbTxn<'_> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
if self.2.contains(key.as_ref()) {
return None;
@@ -23,7 +23,7 @@ impl<'a> Get for MemDbTxn<'a> {
.or_else(|| self.0 .0.read().unwrap().get(key.as_ref()).cloned())
}
}
impl<'a> DbTxn for MemDbTxn<'a> {
impl DbTxn for MemDbTxn<'_> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.2.remove(key.as_ref());
self.1.insert(key.as_ref().to_vec(), value.as_ref().to_vec());

View File

@@ -280,7 +280,13 @@ pub async fn main_loop<
// Substrate sets this limit to prevent DoSs from malicious validator sets
// That bound lets us consume this txn in the following loop body, as an optimization
assert!(batches.len() <= 1);
for messages::substrate::ExecutedBatch { id, in_instructions } in batches {
for messages::substrate::ExecutedBatch {
id,
publisher,
in_instructions_hash,
in_instruction_results,
} in batches
{
let key_to_activate =
KeyToActivate::<KeyFor<S>>::try_recv(txn.as_mut().unwrap()).map(|key| key.0);
@@ -288,7 +294,9 @@ pub async fn main_loop<
let _: () = scanner.acknowledge_batch(
txn.take().unwrap(),
id,
in_instructions,
publisher,
in_instructions_hash,
in_instruction_results,
/*
`acknowledge_batch` takes burns to optimize handling returns with standard
payments. That's why handling these with a Batch (and not waiting until the

View File

@@ -14,7 +14,7 @@ use messages::sign::{VariantSignId, SignId, ProcessorMessage};
create_db!(
FrostAttemptManager {
Attempted: (id: VariantSignId) -> u32,
Attempted: (session: Session, id: VariantSignId) -> u32,
}
);
@@ -92,11 +92,11 @@ impl<D: Db, M: Clone + PreprocessMachine> SigningProtocol<D, M> {
*/
{
let mut txn = self.db.txn();
let prior_attempted = Attempted::get(&txn, self.id);
let prior_attempted = Attempted::get(&txn, self.session, self.id);
if Some(attempt) <= prior_attempted {
return vec![];
}
Attempted::set(&mut txn, self.id, &attempt);
Attempted::set(&mut txn, self.session, self.id, &attempt);
txn.commit();
}
@@ -278,7 +278,7 @@ impl<D: Db, M: Clone + PreprocessMachine> SigningProtocol<D, M> {
}
/// Cleanup the database entries for a specified signing protocol.
pub(crate) fn cleanup(txn: &mut impl DbTxn, id: VariantSignId) {
Attempted::del(txn, id);
pub(crate) fn cleanup(txn: &mut impl DbTxn, session: Session, id: VariantSignId) {
Attempted::del(txn, session, id);
}
}

View File

@@ -45,7 +45,7 @@ impl<D: Db, M: Clone + PreprocessMachine> AttemptManager<D, M> {
/// Register a signing protocol to attempt.
///
/// This ID must be unique across all sessions, attempt managers, protocols, etc.
/// This ID must be unique to the session, across all attempt managers, protocols, etc.
pub fn register(&mut self, id: VariantSignId, machines: Vec<M>) -> Vec<ProcessorMessage> {
let mut protocol =
SigningProtocol::new(self.db.clone(), self.session, self.start_i, id, machines);
@@ -66,7 +66,7 @@ impl<D: Db, M: Clone + PreprocessMachine> AttemptManager<D, M> {
} else {
log::info!("retired signing protocol {id:?}");
}
SigningProtocol::<D, M>::cleanup(txn, id);
SigningProtocol::<D, M>::cleanup(txn, self.session, id);
}
/// Handle a message for a signing protocol.

View File

@@ -84,7 +84,7 @@ pub mod sign {
pub enum VariantSignId {
Cosign(u64),
Batch(u32),
SlashReport(Session),
SlashReport,
Transaction([u8; 32]),
}
impl fmt::Debug for VariantSignId {
@@ -94,9 +94,7 @@ pub mod sign {
f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish()
}
Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(),
Self::SlashReport(session) => {
f.debug_struct("VariantSignId::SlashReport").field("0", &session).finish()
}
Self::SlashReport => f.debug_struct("VariantSignId::SlashReport").finish(),
Self::Transaction(tx) => {
f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish()
}
@@ -189,7 +187,9 @@ pub mod substrate {
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct ExecutedBatch {
pub id: u32,
pub in_instructions: Vec<InInstructionResult>,
pub publisher: Session,
pub in_instructions_hash: [u8; 32],
pub in_instruction_results: Vec<InInstructionResult>,
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
@@ -197,6 +197,8 @@ pub mod substrate {
/// Keys set on the Serai blockchain.
SetKeys { serai_time: u64, session: Session, key_pair: KeyPair },
/// Slashes reported on the Serai blockchain OR the process timed out.
///
/// This is the final message for a session,
SlashesReported { session: Session },
/// A block from Serai with relevance to this processor.
Block {

View File

@@ -24,6 +24,7 @@ scale = { package = "parity-scale-codec", version = "3", default-features = fals
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
# Cryptography
blake2 = { version = "0.10", default-features = false, features = ["std"] }
group = { version = "0.13", default-features = false }
# Application
@@ -35,6 +36,7 @@ serai-db = { path = "../../common/db" }
messages = { package = "serai-processor-messages", path = "../messages" }
serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] }
serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] }
serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }

View File

@@ -7,8 +7,9 @@ use scale::{Encode, Decode, IoReader};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db, db_channel};
use serai_in_instructions_primitives::{InInstructionWithBalance, Batch};
use serai_coins_primitives::OutInstructionWithBalance;
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{InInstructionWithBalance, Batch};
use primitives::{EncodableG, ReceivedOutput};
@@ -25,11 +26,13 @@ impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
#[derive(BorshSerialize, BorshDeserialize)]
struct SeraiKeyDbEntry<K: Borshy> {
activation_block_number: u64,
session: Session,
key: K,
}
#[derive(Clone)]
pub(crate) struct SeraiKey<K> {
pub(crate) session: Session,
pub(crate) key: K,
pub(crate) stage: LifetimeStage,
pub(crate) activation_block_number: u64,
@@ -165,7 +168,7 @@ impl<S: ScannerFeed> ScannerGlobalDb<S> {
// If this new key retires a key, mark the block at which forwarding explicitly occurs notable
// This lets us obtain synchrony over the transactions we'll make to accomplish this
if let Some(key_retired_by_this) = keys.last() {
let this_keys_session = if let Some(key_retired_by_this) = keys.last() {
NotableBlock::set(
txn,
Lifetime::calculate::<S>(
@@ -182,10 +185,17 @@ impl<S: ScannerFeed> ScannerGlobalDb<S> {
),
&(),
);
}
Session(key_retired_by_this.session.0 + 1)
} else {
Session(0)
};
// Push and save the next key
keys.push(SeraiKeyDbEntry { activation_block_number, key: EncodableG(key) });
keys.push(SeraiKeyDbEntry {
activation_block_number,
session: this_keys_session,
key: EncodableG(key),
});
ActiveKeys::set(txn, &keys);
// Now tidy the keys, ensuring this has a maximum length of 2
@@ -236,6 +246,7 @@ impl<S: ScannerFeed> ScannerGlobalDb<S> {
raw_keys.get(i + 1).map(|key| key.activation_block_number),
);
keys.push(SeraiKey {
session: raw_keys[i].session,
key: raw_keys[i].key.0,
stage,
activation_block_number: raw_keys[i].activation_block_number,
@@ -477,6 +488,7 @@ db_channel! {
}
pub(crate) struct InInstructionData<S: ScannerFeed> {
pub(crate) session_to_sign_batch: Session,
pub(crate) external_key_for_session_to_sign_batch: KeyFor<S>,
pub(crate) returnable_in_instructions: Vec<Returnable<S>>,
}
@@ -488,7 +500,8 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
block_number: u64,
data: &InInstructionData<S>,
) {
let mut buf = data.external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec();
let mut buf = data.session_to_sign_batch.encode();
buf.extend(data.external_key_for_session_to_sign_batch.to_bytes().as_ref());
for returnable_in_instruction in &data.returnable_in_instructions {
returnable_in_instruction.write(&mut buf).unwrap();
}
@@ -510,6 +523,7 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
);
let mut buf = data.returnable_in_instructions.as_slice();
let session_to_sign_batch = Session::decode(&mut buf).unwrap();
let external_key_for_session_to_sign_batch = {
let mut external_key_for_session_to_sign_batch =
<KeyFor<S> as GroupEncoding>::Repr::default();
@@ -523,7 +537,11 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
while !buf.is_empty() {
returnable_in_instructions.push(Returnable::read(&mut buf).unwrap());
}
InInstructionData { external_key_for_session_to_sign_batch, returnable_in_instructions }
InInstructionData {
session_to_sign_batch,
external_key_for_session_to_sign_batch,
returnable_in_instructions,
}
}
}

View File

@@ -11,6 +11,7 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount};
use serai_validator_sets_primitives::Session;
use serai_coins_primitives::OutInstructionWithBalance;
use primitives::{task::*, Address, ReceivedOutput, Block, Payment};
@@ -437,10 +438,13 @@ impl<S: ScannerFeed> Scanner<S> {
/// `queue_burns`. Doing so will cause them to be executed multiple times.
///
/// The calls to this function must be ordered with regards to `queue_burns`.
#[allow(clippy::too_many_arguments)]
pub fn acknowledge_batch(
&mut self,
mut txn: impl DbTxn,
batch_id: u32,
publisher: Session,
in_instructions_hash: [u8; 32],
in_instruction_results: Vec<messages::substrate::InInstructionResult>,
burns: Vec<OutInstructionWithBalance>,
key_to_activate: Option<KeyFor<S>>,
@@ -451,6 +455,8 @@ impl<S: ScannerFeed> Scanner<S> {
substrate::queue_acknowledge_batch::<S>(
&mut txn,
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
burns,
key_to_activate,

View File

@@ -8,21 +8,32 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db};
use serai_primitives::Balance;
use serai_validator_sets_primitives::Session;
use primitives::EncodableG;
use crate::{ScannerFeed, KeyFor, AddressFor};
#[derive(BorshSerialize, BorshDeserialize)]
pub(crate) struct BatchInfo<K: BorshSerialize> {
pub(crate) block_number: u64,
pub(crate) session_to_sign_batch: Session,
pub(crate) external_key_for_session_to_sign_batch: K,
pub(crate) in_instructions_hash: [u8; 32],
}
create_db!(
ScannerReport {
// The next block to potentially report
NextToPotentiallyReportBlock: () -> u64,
// The last session to sign a Batch and their first Batch signed
LastSessionToSignBatchAndFirstBatch: () -> (Session, u32),
// The next Batch ID to use
NextBatchId: () -> u32,
// The block number which caused a batch
BlockNumberForBatch: (batch: u32) -> u64,
// The external key for the session which should sign a batch
ExternalKeyForSessionToSignBatch: (batch: u32) -> Vec<u8>,
// The information needed to verify a batch
InfoForBatch: <G: GroupEncoding>(batch: u32) -> BatchInfo<EncodableG<G>>,
// The return addresses for the InInstructions within a Batch
SerializedReturnAddresses: (batch: u32) -> Vec<u8>,
@@ -36,6 +47,19 @@ pub(crate) struct ReturnInformation<S: ScannerFeed> {
pub(crate) struct ReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ReportDb<S> {
pub(crate) fn set_last_session_to_sign_batch_and_first_batch(
txn: &mut impl DbTxn,
session: Session,
id: u32,
) {
LastSessionToSignBatchAndFirstBatch::set(txn, &(session, id));
}
pub(crate) fn last_session_to_sign_batch_and_first_batch(
getter: &impl Get,
) -> Option<(Session, u32)> {
LastSessionToSignBatchAndFirstBatch::get(getter)
}
pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn,
next_to_potentially_report_block: u64,
@@ -46,38 +70,37 @@ impl<S: ScannerFeed> ReportDb<S> {
NextToPotentiallyReportBlock::get(getter)
}
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn, block_number: u64) -> u32 {
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
let id = NextBatchId::get(txn).unwrap_or(0);
NextBatchId::set(txn, &(id + 1));
BlockNumberForBatch::set(txn, id, &block_number);
id
}
pub(crate) fn take_block_number_for_batch(txn: &mut impl DbTxn, id: u32) -> Option<u64> {
BlockNumberForBatch::take(txn, id)
}
pub(crate) fn save_external_key_for_session_to_sign_batch(
pub(crate) fn save_batch_info(
txn: &mut impl DbTxn,
id: u32,
external_key_for_session_to_sign_batch: &KeyFor<S>,
block_number: u64,
session_to_sign_batch: Session,
external_key_for_session_to_sign_batch: KeyFor<S>,
in_instructions_hash: [u8; 32],
) {
ExternalKeyForSessionToSignBatch::set(
InfoForBatch::set(
txn,
id,
&external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec(),
&BatchInfo {
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch: EncodableG(external_key_for_session_to_sign_batch),
in_instructions_hash,
},
);
}
pub(crate) fn take_external_key_for_session_to_sign_batch(
pub(crate) fn take_info_for_batch(
txn: &mut impl DbTxn,
id: u32,
) -> Option<KeyFor<S>> {
ExternalKeyForSessionToSignBatch::get(txn, id).map(|key_vec| {
let mut key = <KeyFor<S> as GroupEncoding>::Repr::default();
key.as_mut().copy_from_slice(&key_vec);
KeyFor::<S>::from_bytes(&key).unwrap()
})
) -> Option<BatchInfo<EncodableG<KeyFor<S>>>> {
InfoForBatch::take(txn, id)
}
pub(crate) fn save_return_information(

View File

@@ -1,35 +1,29 @@
use core::{marker::PhantomData, future::Future};
use blake2::{digest::typenum::U32, Digest, Blake2b};
use scale::Encode;
use serai_db::{DbTxn, Db};
use serai_primitives::BlockHash;
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan;
use primitives::{EncodableG, task::ContinuallyRan};
use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, Batches, BatchesToSign},
index,
scan::next_to_scan_for_outputs_block,
ScannerFeed, KeyFor,
substrate, ScannerFeed, KeyFor,
};
mod db;
pub(crate) use db::ReturnInformation;
pub(crate) use db::{BatchInfo, ReturnInformation};
use db::ReportDb;
pub(crate) fn take_block_number_for_batch<S: ScannerFeed>(
pub(crate) fn take_info_for_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<u64> {
ReportDb::<S>::take_block_number_for_batch(txn, id)
}
pub(crate) fn take_external_key_for_session_to_sign_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<KeyFor<S>> {
ReportDb::<S>::take_external_key_for_session_to_sign_batch(txn, id)
) -> Option<BatchInfo<EncodableG<KeyFor<S>>>> {
ReportDb::<S>::take_info_for_batch(txn, id)
}
pub(crate) fn take_return_information<S: ScannerFeed>(
@@ -88,33 +82,85 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
let next_to_potentially_report = ReportDb::<S>::next_to_potentially_report_block(&self.db)
.expect("ReportTask run before writing the start block");
for b in next_to_potentially_report ..= highest_reportable {
for block_number in next_to_potentially_report ..= highest_reportable {
let mut txn = self.db.txn();
// Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we
// do
let InInstructionData {
session_to_sign_batch,
external_key_for_session_to_sign_batch,
returnable_in_instructions: in_instructions,
} = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b);
let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, b);
} = ScanToReportDb::<S>::recv_in_instructions(&mut txn, block_number);
let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, block_number);
if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
}
// If this block is notable, create the Batch(s) for it
if notable {
let network = S::NETWORK;
let block_hash = index::block_id(&txn, b);
let mut batch_id = ReportDb::<S>::acquire_batch_id(&mut txn, b);
let mut batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
/*
If this is the handover Batch, the first Batch signed by a session which retires the
prior validator set, then this should only be signed after the prior validator set's
actions are fully validated.
The new session will only be responsible for signing this Batch if the prior key has
retired, successfully completed all its on-external-network actions.
We check here the prior session has successfully completed all its on-Serai-network
actions by ensuring we've validated all Batches expected from it. Only then do we sign
the Batch confirming the handover.
We also wait for the Batch confirming the handover to be accepted on-chain, ensuring we
don't verify the prior session's Batches, sign the handover Batch and the following
Batch, have the prior session publish a malicious Batch where our handover Batch should
be, before our following Batch becomes our handover Batch.
*/
if session_to_sign_batch != Session(0) {
// We may have Session(1)'s first Batch be Batch 0 if Session(0) never publishes a
// Batch. This is fine as we'll hit the distinct Session check and then set the correct
// values into this DB entry. All other sessions must complete the handover process,
// which requires having published at least one Batch
let (last_session, first_batch) =
ReportDb::<S>::last_session_to_sign_batch_and_first_batch(&txn)
.unwrap_or((Session(0), 0));
// Because this boolean was expanded, we lose short-circuiting. That's fine
let handover_batch = last_session != session_to_sign_batch;
let batch_after_handover_batch =
(last_session == session_to_sign_batch) && ((first_batch + 1) == batch_id);
if handover_batch || batch_after_handover_batch {
let verified_prior_batch = substrate::last_acknowledged_batch::<S>(&txn)
// Since `batch_id = 0` in the Session(0)-never-published-a-Batch case, we don't
// check `last_acknowledged_batch >= (batch_id - 1)` but instead this
.map(|last_acknowledged_batch| (last_acknowledged_batch + 1) >= batch_id)
// We've never verified any Batches
.unwrap_or(false);
if !verified_prior_batch {
// Drop this txn, restoring the Batch to be worked on in the future
drop(txn);
return Ok(block_number > next_to_potentially_report);
}
}
// If this is the handover Batch, update the last session to sign a Batch
if handover_batch {
ReportDb::<S>::set_last_session_to_sign_batch_and_first_batch(
&mut txn,
session_to_sign_batch,
batch_id,
);
}
}
// TODO: The above code doesn't work if we end up with two Batches (the handover and the
// following) within this one Block due to Batch size limits
// start with empty batch
let mut batches = vec![Batch {
network,
id: batch_id,
block: BlockHash(block_hash),
instructions: vec![],
}];
let mut batches = vec![Batch { network, id: batch_id, instructions: vec![] }];
// We also track the return information for the InInstructions within a Batch in case
// they error
let mut return_information = vec![vec![]];
@@ -131,15 +177,10 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
let in_instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch
batch_id = ReportDb::<S>::acquire_batch_id(&mut txn, b);
batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
// make a new batch with this instruction included
batches.push(Batch {
network,
id: batch_id,
block: BlockHash(block_hash),
instructions: vec![in_instruction],
});
batches.push(Batch { network, id: batch_id, instructions: vec![in_instruction] });
// Since we're allocating a new batch, allocate a new set of return addresses for it
return_information.push(vec![]);
}
@@ -152,14 +193,17 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
.push(return_address.map(|address| ReturnInformation { address, balance }));
}
// Save the return addresses to the database
// Now that we've finalized the Batches, save the information for each to the database
assert_eq!(batches.len(), return_information.len());
for (batch, return_information) in batches.iter().zip(&return_information) {
assert_eq!(batch.instructions.len(), return_information.len());
ReportDb::<S>::save_external_key_for_session_to_sign_batch(
ReportDb::<S>::save_batch_info(
&mut txn,
batch.id,
&external_key_for_session_to_sign_batch,
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch,
Blake2b::<U32>::digest(batch.instructions.encode()).into(),
);
ReportDb::<S>::save_return_information(&mut txn, batch.id, return_information);
}
@@ -171,7 +215,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
}
// Update the next to potentially report block
ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, b + 1);
ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, block_number + 1);
txn.commit();
}

View File

@@ -349,6 +349,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
&mut txn,
b,
&InInstructionData {
session_to_sign_batch: keys[0].session,
external_key_for_session_to_sign_batch: keys[0].key,
returnable_in_instructions: in_instructions,
},

View File

@@ -6,12 +6,15 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db, db_channel};
use serai_coins_primitives::OutInstructionWithBalance;
use serai_validator_sets_primitives::Session;
use crate::{ScannerFeed, KeyFor};
#[derive(BorshSerialize, BorshDeserialize)]
struct AcknowledgeBatchEncodable {
batch_id: u32,
publisher: Session,
in_instructions_hash: [u8; 32],
in_instruction_results: Vec<messages::substrate::InInstructionResult>,
burns: Vec<OutInstructionWithBalance>,
key_to_activate: Option<Vec<u8>>,
@@ -25,6 +28,8 @@ enum ActionEncodable {
pub(crate) struct AcknowledgeBatch<S: ScannerFeed> {
pub(crate) batch_id: u32,
pub(crate) publisher: Session,
pub(crate) in_instructions_hash: [u8; 32],
pub(crate) in_instruction_results: Vec<messages::substrate::InInstructionResult>,
pub(crate) burns: Vec<OutInstructionWithBalance>,
pub(crate) key_to_activate: Option<KeyFor<S>>,
@@ -35,6 +40,12 @@ pub(crate) enum Action<S: ScannerFeed> {
QueueBurns(Vec<OutInstructionWithBalance>),
}
create_db!(
ScannerSubstrate {
LastAcknowledgedBatch: () -> u32,
}
);
db_channel!(
ScannerSubstrate {
Actions: () -> ActionEncodable,
@@ -43,9 +54,19 @@ db_channel!(
pub(crate) struct SubstrateDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> SubstrateDb<S> {
pub(crate) fn last_acknowledged_batch(getter: &impl Get) -> Option<u32> {
LastAcknowledgedBatch::get(getter)
}
pub(crate) fn set_last_acknowledged_batch(txn: &mut impl DbTxn, id: u32) {
LastAcknowledgedBatch::set(txn, &id)
}
pub(crate) fn queue_acknowledge_batch(
txn: &mut impl DbTxn,
batch_id: u32,
publisher: Session,
in_instructions_hash: [u8; 32],
in_instruction_results: Vec<messages::substrate::InInstructionResult>,
burns: Vec<OutInstructionWithBalance>,
key_to_activate: Option<KeyFor<S>>,
@@ -54,6 +75,8 @@ impl<S: ScannerFeed> SubstrateDb<S> {
txn,
&ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable {
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
burns,
key_to_activate: key_to_activate.map(|key| key.to_bytes().as_ref().to_vec()),
@@ -69,11 +92,15 @@ impl<S: ScannerFeed> SubstrateDb<S> {
Some(match action_encodable {
ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable {
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
burns,
key_to_activate,
}) => Action::AcknowledgeBatch(AcknowledgeBatch {
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
burns,
key_to_activate: key_to_activate.map(|key| {

View File

@@ -1,8 +1,9 @@
use core::{marker::PhantomData, future::Future};
use serai_db::{DbTxn, Db};
use serai_db::{Get, DbTxn, Db};
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use serai_validator_sets_primitives::Session;
use primitives::task::ContinuallyRan;
use crate::{
@@ -13,9 +14,14 @@ use crate::{
mod db;
use db::*;
pub(crate) fn last_acknowledged_batch<S: ScannerFeed>(getter: &impl Get) -> Option<u32> {
SubstrateDb::<S>::last_acknowledged_batch(getter)
}
pub(crate) fn queue_acknowledge_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
batch_id: u32,
publisher: Session,
in_instructions_hash: [u8; 32],
in_instruction_results: Vec<messages::substrate::InInstructionResult>,
burns: Vec<OutInstructionWithBalance>,
key_to_activate: Option<KeyFor<S>>,
@@ -23,6 +29,8 @@ pub(crate) fn queue_acknowledge_batch<S: ScannerFeed>(
SubstrateDb::<S>::queue_acknowledge_batch(
txn,
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
burns,
key_to_activate,
@@ -67,28 +75,39 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
match action {
Action::AcknowledgeBatch(AcknowledgeBatch {
batch_id,
publisher,
in_instructions_hash,
in_instruction_results,
mut burns,
key_to_activate,
}) => {
// Check if we have the information for this batch
let Some(block_number) = report::take_block_number_for_batch::<S>(&mut txn, batch_id)
let Some(report::BatchInfo {
block_number,
session_to_sign_batch,
external_key_for_session_to_sign_batch,
in_instructions_hash: expected_in_instructions_hash,
}) = report::take_info_for_batch::<S>(&mut txn, batch_id)
else {
// If we don't, drop this txn (restoring the action to the database)
drop(txn);
return Ok(made_progress);
};
assert_eq!(
publisher, session_to_sign_batch,
"batch acknowledged on-chain was acknowledged by an unexpected publisher"
);
assert_eq!(
in_instructions_hash, expected_in_instructions_hash,
"batch acknowledged on-chain was distinct"
);
{
let external_key_for_session_to_sign_batch =
report::take_external_key_for_session_to_sign_batch::<S>(&mut txn, batch_id)
.unwrap();
SubstrateDb::<S>::set_last_acknowledged_batch(&mut txn, batch_id);
AcknowledgedBatches::send(
&mut txn,
&external_key_for_session_to_sign_batch,
&external_key_for_session_to_sign_batch.0,
batch_id,
);
}
// Mark we made progress and handle this
made_progress = true;

View File

@@ -376,6 +376,12 @@ impl<
/// This is a cheap call and able to be done inline from a higher-level loop.
pub fn queue_message(&mut self, txn: &mut impl DbTxn, message: &CoordinatorMessage) {
let sign_id = message.sign_id();
// Don't queue messages for already retired keys
if Some(sign_id.session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
return;
}
let tasks = self.tasks.get(&sign_id.session);
match sign_id.id {
VariantSignId::Cosign(_) => {
@@ -390,7 +396,7 @@ impl<
tasks.batch.run_now();
}
}
VariantSignId::SlashReport(_) => {
VariantSignId::SlashReport => {
db::CoordinatorToSlashReportSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks {
tasks.slash_report.run_now();
@@ -415,6 +421,11 @@ impl<
block_number: u64,
block: [u8; 32],
) {
// Don't cosign blocks with already retired keys
if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
return;
}
db::ToCosign::set(&mut txn, session, &(block_number, block));
txn.commit();
@@ -432,6 +443,11 @@ impl<
session: Session,
slash_report: &Vec<Slash>,
) {
// Don't sign slash reports with already retired keys
if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
return;
}
db::SlashReport::send(&mut txn, session, slash_report);
txn.commit();

View File

@@ -79,8 +79,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
}
}
let mut txn = self.db.txn();
for msg in self.attempt_manager.register(VariantSignId::SlashReport(self.session), machines)
{
for msg in self.attempt_manager.register(VariantSignId::SlashReport, machines) {
SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
txn.commit();
@@ -102,14 +101,15 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
}
}
Response::Signature { id, signature } => {
let VariantSignId::SlashReport(session) = id else {
panic!("SlashReportSignerTask signed a non-SlashReport")
};
assert_eq!(session, self.session);
assert_eq!(id, VariantSignId::SlashReport);
// Drain the channel
SlashReport::try_recv(&mut txn, self.session).unwrap();
// Send the signature
SlashReportSignature::send(&mut txn, session, &Signature::from(signature).encode());
SlashReportSignature::send(
&mut txn,
self.session,
&Signature::from(signature).encode(),
);
}
}

View File

@@ -13,13 +13,6 @@ const PALLET: &str = "InInstructions";
#[derive(Clone, Copy)]
pub struct SeraiInInstructions<'a>(pub(crate) &'a TemporalSerai<'a>);
impl<'a> SeraiInInstructions<'a> {
pub async fn latest_block_for_network(
&self,
network: NetworkId,
) -> Result<Option<BlockHash>, SeraiError> {
self.0.storage(PALLET, "LatestNetworkBlock", network).await
}
pub async fn last_batch_for_network(
&self,
network: NetworkId,

View File

@@ -25,9 +25,6 @@ serai_test!(
let network = NetworkId::Bitcoin;
let id = 0;
let mut block_hash = BlockHash([0; 32]);
OsRng.fill_bytes(&mut block_hash.0);
let mut address = SeraiAddress::new([0; 32]);
OsRng.fill_bytes(&mut address.0);
@@ -38,7 +35,6 @@ serai_test!(
let batch = Batch {
network,
id,
block: block_hash,
instructions: vec![InInstructionWithBalance {
instruction: InInstruction::Transfer(address),
balance,
@@ -50,15 +46,12 @@ serai_test!(
let serai = serai.as_of(block);
{
let serai = serai.in_instructions();
let latest_finalized = serai.latest_block_for_network(network).await.unwrap();
assert_eq!(latest_finalized, Some(block_hash));
let batches = serai.batch_events().await.unwrap();
assert_eq!(
batches,
vec![InInstructionsEvent::Batch {
network,
id,
block: block_hash,
instructions_hash: Blake2b::<U32>::digest(batch.instructions.encode()).into(),
}]
);

View File

@@ -52,7 +52,6 @@ pub async fn provide_batch(serai: &Serai, batch: Batch) -> [u8; 32] {
vec![InInstructionsEvent::Batch {
network: batch.network,
id: batch.id,
block: batch.block,
instructions_hash: Blake2b::<U32>::digest(batch.instructions.encode()).into(),
}],
);

View File

@@ -89,12 +89,6 @@ pub mod pallet {
#[pallet::storage]
pub(crate) type Halted<T: Config> = StorageMap<_, Identity, NetworkId, (), OptionQuery>;
// The latest block a network has acknowledged as finalized
#[pallet::storage]
#[pallet::getter(fn latest_network_block)]
pub(crate) type LatestNetworkBlock<T: Config> =
StorageMap<_, Identity, NetworkId, BlockHash, OptionQuery>;
impl<T: Config> Pallet<T> {
// Use a dedicated transaction layer when executing this InInstruction
// This lets it individually error without causing any storage modifications
@@ -262,11 +256,9 @@ pub mod pallet {
let batch = batch.batch;
LatestNetworkBlock::<T>::insert(batch.network, batch.block);
Self::deposit_event(Event::Batch {
network: batch.network,
id: batch.id,
block: batch.block,
instructions_hash: blake2_256(&batch.instructions.encode()),
});
for (i, instruction) in batch.instructions.into_iter().enumerate() {

View File

@@ -19,8 +19,7 @@ use sp_application_crypto::sr25519::Signature;
use sp_std::vec::Vec;
use sp_runtime::RuntimeDebug;
#[rustfmt::skip]
use serai_primitives::{BlockHash, Balance, NetworkId, SeraiAddress, ExternalAddress, system_address};
use serai_primitives::{Balance, NetworkId, SeraiAddress, ExternalAddress, system_address};
mod shorthand;
pub use shorthand::*;
@@ -107,7 +106,6 @@ pub struct InInstructionWithBalance {
pub struct Batch {
pub network: NetworkId,
pub id: u32,
pub block: BlockHash,
pub instructions: Vec<InInstructionWithBalance>,
}