diff --git a/Cargo.lock b/Cargo.lock index 980d9405..2c649c36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8998,6 +8998,7 @@ dependencies = [ "hex", "parity-scale-codec", "serai-coins-primitives", + "serai-cosign", "serai-in-instructions-primitives", "serai-primitives", "serai-validator-sets-primitives", diff --git a/coordinator/cosign/src/intend.rs b/coordinator/cosign/src/intend.rs index 9fa229c5..ebe3513c 100644 --- a/coordinator/cosign/src/intend.rs +++ b/coordinator/cosign/src/intend.rs @@ -88,7 +88,6 @@ impl ContinuallyRan for CosignIntendTask { } let block_hash = block.hash(); SubstrateBlockHash::set(&mut txn, block_number, &block_hash); - SubstrateBlockNumber::set(&mut txn, block_hash, &block_number); let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn); diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 4e932306..012d0257 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -8,7 +8,7 @@ use serai_client::{ validator_sets::primitives::{Session, ValidatorSet}, }; -use serai_cosign::CosignIntent; +use serai_cosign::SignedCosign; use serai_coordinator_substrate::NewSetInformation; @@ -66,14 +66,24 @@ pub(crate) fn prune_tributary_db(set: ValidatorSet) { create_db! { Coordinator { + // The currently active Tributaries ActiveTributaries: () -> Vec, + // The latest Tributary to have been retired for a network + // Since Tributaries are retired sequentially, this is informative to if any Tributary has been + // retired RetiredTributary: (network: NetworkId) -> Session, + // The last handled message from a Processor + LastProcessorMessage: (network: NetworkId) -> u64, + // Cosigns we produced and tried to intake yet incurred an error while doing so + ErroneousCosigns: () -> Vec, } } db_channel! { Coordinator { + // Tributaries to clean up upon reboot TributaryCleanup: () -> ValidatorSet, - PendingCosigns: (set: ValidatorSet) -> CosignIntent, + // Cosigns we produced + SignedCosigns: () -> SignedCosign, } } diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 76a034d5..f09c14cd 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -8,7 +8,7 @@ use ciphersuite::{Ciphersuite, Ristretto}; use tokio::sync::mpsc; -use serai_db::{DbTxn, Db as DbTrait}; +use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel}; use scale::Encode; use serai_client::validator_sets::primitives::ValidatorSet; @@ -19,16 +19,23 @@ use serai_task::{Task, TaskHandle, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; -use serai_cosign::Cosigning; +use serai_cosign::{Faulted, CosignIntent, Cosigning}; use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; -use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask}; +use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask}; use serai_coordinator_p2p::P2p; use crate::Db; +db_channel! { + Coordinator { + PendingCosigns: (set: ValidatorSet) -> CosignIntent, + } +} + /// Provides Cosign/Cosigned Transactions onto the Tributary. pub(crate) struct ProvideCosignCosignedTransactionsTask { db: CD, + tributary_db: TD, set: NewSetInformation, tributary: Tributary, } @@ -79,16 +86,27 @@ impl ContinuallyRan let mut txn = self.db.txn(); // Fetch the next cosign this tributary should handle - let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break }; + let Some(cosign) = PendingCosigns::try_recv(&mut txn, self.set.set) else { break }; pending_notable_cosign = cosign.notable; // If we (Serai) haven't cosigned this block, break as this is still pending - let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; + let latest = match Cosigning::::latest_cosigned_block_number(&txn) { + Ok(latest) => latest, + Err(Faulted) => { + log::error!("cosigning faulted"); + Err("cosigning faulted")? + } + }; if latest < cosign.block_number { break; } // Because we've cosigned it, provide the TX for that + { + let mut txn = self.tributary_db.txn(); + CosignIntents::provide(&mut txn, self.set.set, &cosign); + txn.commit(); + } provide_transaction( self.set.set, &self.tributary, @@ -109,7 +127,7 @@ impl ContinuallyRan // intended_cosigns will only yield up to and including the next notable cosign for cosign in Cosigning::::intended_cosigns(&mut txn, self.set.set) { // Flag this cosign as pending - crate::PendingCosigns::send(&mut txn, self.set.set, &cosign); + PendingCosigns::send(&mut txn, self.set.set, &cosign); // Provide the transaction to queue it for work provide_transaction( self.set.set, @@ -293,6 +311,7 @@ pub(crate) async fn spawn_tributary( tokio::spawn( (ProvideCosignCosignedTransactionsTask { db: db.clone(), + tributary_db: tributary_db.clone(), set: set.clone(), tributary: tributary.clone(), }) @@ -313,7 +332,7 @@ pub(crate) async fn spawn_tributary( // Spawn the scan task let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( - ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader) + ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader) // This is the only handle for this TributaryProcessorMessagesTask, so when this task is // dropped, it will be too .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), diff --git a/coordinator/tributary/src/db.rs b/coordinator/tributary/src/db.rs index c48393af..9d426d96 100644 --- a/coordinator/tributary/src/db.rs +++ b/coordinator/tributary/src/db.rs @@ -9,6 +9,8 @@ use messages::sign::{VariantSignId, SignId}; use serai_db::*; +use serai_cosign::CosignIntent; + use crate::transaction::SigningProtocolRound; /// A topic within the database which the group participates in @@ -187,6 +189,8 @@ create_db!( // The slash points a validator has accrued, with u32::MAX representing a fatal slash. SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32, + // The cosign intent for a Substrate block + CosignIntents: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> CosignIntent, // The latest Substrate block to cosign. LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32], // The hash of the block we're actively cosigning. diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 686af18d..91a77a62 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -24,7 +24,7 @@ use tributary_sdk::{ Transaction as TributaryTransaction, Block, TributaryReader, P2p, }; -use serai_cosign::Cosigning; +use serai_cosign::CosignIntent; use serai_coordinator_substrate::NewSetInformation; use messages::sign::VariantSignId; @@ -45,17 +45,34 @@ impl ProcessorMessages { } } -struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { +/// The cosign intents. +pub struct CosignIntents; +impl CosignIntents { + /// Provide a CosignIntent for this Tributary. + /// + /// This must be done before the associated `Transaction::Cosign` is provided. + pub fn provide(txn: &mut impl DbTxn, set: ValidatorSet, intent: &CosignIntent) { + db::CosignIntents::set(txn, set, intent.block_hash, intent); + } + fn take( + txn: &mut impl DbTxn, + set: ValidatorSet, + substrate_block_hash: [u8; 32], + ) -> Option { + db::CosignIntents::take(txn, set, substrate_block_hash) + } +} + +struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> { _td: PhantomData, _p2p: PhantomData

, - cosign_db: &'a CD, tributary_txn: &'a mut TDT, set: ValidatorSet, validators: &'a [SeraiAddress], total_weight: u64, validator_weights: &'a HashMap, } -impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { +impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { fn potentially_start_cosign(&mut self) { // Don't start a new cosigning instance if we're actively running one if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { @@ -74,20 +91,20 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { return; } - let Some(substrate_block_number) = - Cosigning::::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) - else { - // This is a valid panic as we shouldn't be scanning this block if we didn't provide all - // Provided transactions within it, and the block to cosign is a Provided transaction - panic!("cosigning a block our cosigner didn't index") - }; + let intent = + CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign) + .expect("Transaction::Cosign locally provided but CosignIntents wasn't populated"); + assert_eq!( + intent.block_hash, latest_substrate_block_to_cosign, + "provided CosignIntent wasn't saved by its block hash" + ); // Mark us as actively cosigning TributaryDb::start_cosigning( self.tributary_txn, self.set, latest_substrate_block_to_cosign, - substrate_block_number, + intent.block_number, ); // Send the message for the processor to start signing TributaryDb::send_message( @@ -95,8 +112,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { self.set, messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { session: self.set.session, - block_number: substrate_block_number, - block: latest_substrate_block_to_cosign, + intent, }, ); } @@ -411,8 +427,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { } /// The task to scan the Tributary, populating `ProcessorMessages`. -pub struct ScanTributaryTask { - cosign_db: CD, +pub struct ScanTributaryTask { tributary_db: TD, set: ValidatorSet, validators: Vec, @@ -422,10 +437,9 @@ pub struct ScanTributaryTask { _p2p: PhantomData

, } -impl ScanTributaryTask { +impl ScanTributaryTask { /// Create a new instance of this task. pub fn new( - cosign_db: CD, tributary_db: TD, new_set: &NewSetInformation, tributary: TributaryReader, @@ -442,7 +456,6 @@ impl ScanTributaryTask { } ScanTributaryTask { - cosign_db, tributary_db, set: new_set.set, validators, @@ -454,7 +467,7 @@ impl ScanTributaryTask { } } -impl ContinuallyRan for ScanTributaryTask { +impl ContinuallyRan for ScanTributaryTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { let (mut last_block_number, mut last_block_hash) = @@ -486,7 +499,6 @@ impl ContinuallyRan for ScanTributaryTask { (ScanBlock { _td: PhantomData::, _p2p: PhantomData::

, - cosign_db: &self.cosign_db, tributary_txn: &mut tributary_txn, set: self.set, validators: &self.validators, diff --git a/processor/messages/Cargo.toml b/processor/messages/Cargo.toml index 03dc0441..b1387301 100644 --- a/processor/messages/Cargo.toml +++ b/processor/messages/Cargo.toml @@ -29,3 +29,5 @@ serai-primitives = { path = "../../substrate/primitives", default-features = fal in-instructions-primitives = { package = "serai-in-instructions-primitives", path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] } coins-primitives = { package = "serai-coins-primitives", path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] } validator-sets-primitives = { package = "serai-validator-sets-primitives", path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] } + +serai-cosign = { path = "../../coordinator/cosign", default-features = false } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index ec072fe5..5cda454b 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -11,6 +11,8 @@ use validator_sets_primitives::{Session, KeyPair, Slash}; use coins_primitives::OutInstructionWithBalance; use in_instructions_primitives::SignedBatch; +use serai_cosign::{CosignIntent, SignedCosign}; + #[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct SubstrateContext { pub serai_time: u64, @@ -50,7 +52,8 @@ pub mod key_gen { } } - #[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] + // This set of messages is sent entirely and solely by serai-processor-key-gen. + #[derive(Clone, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { // Participated in the specified key generation protocol. Participation { session: Session, participation: Vec }, @@ -141,7 +144,8 @@ pub mod sign { } } - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] + // This set of messages is sent entirely and solely by serai-processor-frost-attempt-manager. + #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { // Participant sent an invalid message during the sign protocol. InvalidParticipant { session: Session, participant: Participant }, @@ -155,39 +159,25 @@ pub mod sign { pub mod coordinator { use super::*; - // TODO: Remove this for the one defined in serai-cosign - pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec { - const DST: &[u8] = b"Cosign"; - let mut res = vec![u8::try_from(DST.len()).unwrap()]; - res.extend(DST); - res.extend(block_number.to_le_bytes()); - res.extend(block); - res - } - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum CoordinatorMessage { /// Cosign the specified Substrate block. /// /// This is sent by the Coordinator's Tributary scanner. - CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] }, + CosignSubstrateBlock { session: Session, intent: CosignIntent }, /// Sign the slash report for this session. /// /// This is sent by the Coordinator's Tributary scanner. SignSlashReport { session: Session, report: Vec }, } - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] - pub struct PlanMeta { - pub session: Session, - pub id: [u8; 32], - } - - #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] + // This set of messages is sent entirely and solely by serai-processor-bin's implementation of + // the signers::Coordinator trait. + // TODO: Move message creation into serai-processor-signers + #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { - CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec }, + CosignedBlock { cosign: SignedCosign }, SignedBatch { batch: SignedBatch }, - SubstrateBlockAck { block: u64, plans: Vec }, SignedSlashReport { session: Session, signature: Vec }, } } @@ -231,17 +221,16 @@ pub mod substrate { }, } - #[derive(Clone, PartialEq, Eq, Debug)] - pub enum ProcessorMessage {} - impl BorshSerialize for ProcessorMessage { - fn serialize(&self, _writer: &mut W) -> borsh::io::Result<()> { - unimplemented!() - } + #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] + pub struct PlanMeta { + pub session: Session, + pub transaction: [u8; 32], } - impl BorshDeserialize for ProcessorMessage { - fn deserialize_reader(_reader: &mut R) -> borsh::io::Result { - unimplemented!() - } + + #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] + pub enum ProcessorMessage { + // TODO: Have the processor send this + SubstrateBlockAck { block: u64, plans: Vec }, } } @@ -268,7 +257,7 @@ impl_from!(sign, CoordinatorMessage, Sign); impl_from!(coordinator, CoordinatorMessage, Coordinator); impl_from!(substrate, CoordinatorMessage, Substrate); -#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] +#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] pub enum ProcessorMessage { KeyGen(key_gen::ProcessorMessage), Sign(sign::ProcessorMessage), @@ -331,8 +320,8 @@ impl CoordinatorMessage { CoordinatorMessage::Coordinator(msg) => { let (sub, id) = match msg { // We only cosign a block once, and Reattempt is a separate message - coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => { - (0, block_number.encode()) + coordinator::CoordinatorMessage::CosignSubstrateBlock { intent, .. } => { + (0, intent.block_number.encode()) } // We only sign one slash report, and Reattempt is a separate message coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()), @@ -404,17 +393,26 @@ impl ProcessorMessage { } ProcessorMessage::Coordinator(msg) => { let (sub, id) = match msg { - coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()), + coordinator::ProcessorMessage::CosignedBlock { cosign } => { + (0, cosign.cosign.block_hash.encode()) + } coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()), - coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()), - coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()), + coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (2, session.encode()), }; let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub]; res.extend(&id); res } - ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"), + ProcessorMessage::Substrate(msg) => { + let (sub, id) = match msg { + substrate::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()), + }; + + let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub]; + res.extend(&id); + res + } } } }