diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 9f2e9e7a..0dae9b40 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -17,7 +17,7 @@ use message_queue::{Service, client::MessageQueue}; use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{SignedCosign, Cosigning}; -use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream}; +use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport}; use serai_coordinator_tributary::Transaction; mod db; @@ -148,6 +148,8 @@ async fn main() { prune_tributary_db(to_cleanup); // Drain the cosign intents created for this set while !Cosigning::::intended_cosigns(&mut txn, to_cleanup).is_empty() {} + // Remove the SignSlashReport notification + SignSlashReport::try_recv(&mut txn, to_cleanup); } // Remove retired Tributaries from ActiveTributaries @@ -198,7 +200,6 @@ async fn main() { }; // Spawn the Substrate scanners - // TODO: SignSlashReport let (substrate_task_def, substrate_task) = Task::new(); let (substrate_canonical_task_def, substrate_canonical_task) = Task::new(); tokio::spawn( diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 56fe8a37..55fae37c 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -2,6 +2,7 @@ use core::{future::Future, time::Duration}; use std::sync::Arc; use zeroize::Zeroizing; +use rand_core::OsRng; use blake2::{digest::typenum::U32, Digest, Blake2s}; use ciphersuite::{Ciphersuite, Ristretto}; @@ -12,14 +13,14 @@ use serai_db::{DbTxn, Db as DbTrait}; use scale::Encode; use serai_client::validator_sets::primitives::ValidatorSet; -use tributary_sdk::{ProvidedError, Tributary}; +use tributary_sdk::{TransactionError, ProvidedError, Tributary}; use serai_task::{Task, TaskHandle, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; use serai_cosign::Cosigning; -use serai_coordinator_substrate::NewSetInformation; +use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask}; use serai_coordinator_p2p::P2p; @@ -27,9 +28,9 @@ use crate::Db; /// Provides Cosign/Cosigned Transactions onto the Tributary. pub(crate) struct ProvideCosignCosignedTransactionsTask { - pub(crate) db: CD, - pub(crate) set: NewSetInformation, - pub(crate) tributary: Tributary, + db: CD, + set: NewSetInformation, + tributary: Tributary, } impl ContinuallyRan for ProvideCosignCosignedTransactionsTask @@ -128,9 +129,9 @@ impl ContinuallyRan /// Takes the messages from ScanTributaryTask and publishes them to the message-queue. pub(crate) struct TributaryProcessorMessagesTask { - pub(crate) tributary_db: TD, - pub(crate) set: ValidatorSet, - pub(crate) message_queue: Arc, + tributary_db: TD, + set: ValidatorSet, + message_queue: Arc, } impl ContinuallyRan for TributaryProcessorMessagesTask { fn run_iteration(&mut self) -> impl Send + Future> { @@ -155,6 +156,51 @@ impl ContinuallyRan for TributaryProcessorMessagesTask { } } +/// Checks for the notification to sign a slash report and does so if present. +pub(crate) struct SignSlashReportTask { + db: CD, + tributary_db: TD, + tributary: Tributary, + set: NewSetInformation, + key: Zeroizing<::F>, +} +impl ContinuallyRan for SignSlashReportTask { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut txn = self.db.txn(); + let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) }; + + // Fetch the slash report for this Tributary + let mut tx = + serai_coordinator_tributary::slash_report_transaction(&self.tributary_db, &self.set); + tx.sign(&mut OsRng, self.tributary.genesis(), &self.key); + + let res = self.tributary.add_transaction(tx.clone()).await; + match &res { + // Fresh publication, already published + Ok(true | false) => {} + Err( + TransactionError::TooLargeTransaction | + TransactionError::InvalidSigner | + TransactionError::InvalidNonce | + TransactionError::InvalidSignature | + TransactionError::InvalidContent, + ) => { + panic!("created an invalid SlashReport transaction, tx: {tx:?}, err: {res:?}"); + } + // We've published too many transactions recently + // Drop this txn to try to publish it again later on a future iteration + Err(TransactionError::TooManyInMempool) => return Ok(false), + // This isn't a Provided transaction so this should never be hit + Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), + } + + txn.commit(); + Ok(true) + } + } +} + /// Run the scan task whenever the Tributary adds a new block. async fn scan_on_new_block( db: CD, @@ -183,8 +229,14 @@ async fn scan_on_new_block( /// Spawn a Tributary. /// -/// This will spawn the Tributary, the Tributary scan task, forward the messages from the scan task -/// to the message queue, provide Cosign/Cosigned transactions, and inform the P2P network. +/// This will: +/// - Spawn the Tributary +/// - Inform the P2P network of the Tributary +/// - Spawn the ScanTributaryTask +/// - Spawn the ProvideCosignCosignedTransactionsTask +/// - Spawn the TributaryProcessorMessagesTask +/// - Spawn the SignSlashReportTask +/// - Iterate the scan task whenever a new block occurs (not just on the standard interval) pub(crate) async fn spawn_tributary( db: Db, message_queue: Arc, @@ -219,10 +271,16 @@ pub(crate) async fn spawn_tributary( // Spawn the Tributary let tributary_db = crate::db::tributary_db(set.set); - let tributary = - Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p) - .await - .unwrap(); + let tributary = Tributary::new( + tributary_db.clone(), + genesis, + start_time, + serai_key.clone(), + tributary_validators, + p2p, + ) + .await + .unwrap(); let reader = tributary.reader(); // Inform the P2P network @@ -256,12 +314,25 @@ pub(crate) async fn spawn_tributary( // Spawn the scan task let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( - ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db, &set, reader) + ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader) // This is the only handle for this TributaryProcessorMessagesTask, so when this task is // dropped, it will be too .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), ); + // Spawn the sign slash report task + let (sign_slash_report_task_def, sign_slash_report_task) = Task::new(); + tokio::spawn( + (SignSlashReportTask { + db: db.clone(), + tributary_db, + tributary: tributary.clone(), + set: set.clone(), + key: serai_key, + }) + .continually_run(sign_slash_report_task_def, vec![]), + ); + // Whenever a new block occurs, immediately run the scan task // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the // Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad @@ -271,6 +342,6 @@ pub(crate) async fn spawn_tributary( set.set, tributary, scan_tributary_task, - vec![provide_cosign_cosigned_transactions_task], + vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task], )); } diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs index d889d59f..54df6b3c 100644 --- a/coordinator/substrate/src/ephemeral.rs +++ b/coordinator/substrate/src/ephemeral.rs @@ -234,7 +234,7 @@ impl ContinuallyRan for EphemeralEventStream { else { panic!("AcceptedHandover event wasn't a AcceptedHandover event: {accepted_handover:?}"); }; - crate::SignSlashReport::send(&mut txn, set); + crate::SignSlashReport::send(&mut txn, *set); } txn.commit(); diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index b3f00a5e..228cbed9 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -66,8 +66,8 @@ mod _public_db { // Relevant new set, from an ephemeral event stream NewSet: () -> NewSetInformation, - // Relevant sign slash report, from an ephemeral event stream - SignSlashReport: () -> ValidatorSet, + // Potentially relevant sign slash report, from an ephemeral event stream + SignSlashReport: (set: ValidatorSet) -> (), } ); } @@ -109,12 +109,12 @@ impl NewSet { /// notifications for all relevant validator sets will be included. pub struct SignSlashReport; impl SignSlashReport { - pub(crate) fn send(txn: &mut impl DbTxn, set: &ValidatorSet) { - _public_db::SignSlashReport::send(txn, set); + pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet) { + _public_db::SignSlashReport::send(txn, set, &()); } /// Try to receive a notification to sign a slash report, returning `None` if there is none to /// receive. - pub fn try_recv(txn: &mut impl DbTxn) -> Option { - _public_db::SignSlashReport::try_recv(txn) + pub fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<()> { + _public_db::SignSlashReport::try_recv(txn, set) } } diff --git a/coordinator/tributary/src/db.rs b/coordinator/tributary/src/db.rs index 87567846..c48393af 100644 --- a/coordinator/tributary/src/db.rs +++ b/coordinator/tributary/src/db.rs @@ -184,8 +184,8 @@ create_db!( // The last handled tributary block's (number, hash) LastHandledTributaryBlock: (set: ValidatorSet) -> (u64, [u8; 32]), - // The slash points a validator has accrued, with u64::MAX representing a fatal slash. - SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u64, + // The slash points a validator has accrued, with u32::MAX representing a fatal slash. + SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32, // The latest Substrate block to cosign. LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32], @@ -316,7 +316,7 @@ impl TributaryDb { reason: &str, ) { log::warn!("{validator} fatally slashed: {reason}"); - SlashPoints::set(txn, set, validator, &u64::MAX); + SlashPoints::set(txn, set, validator, &u32::MAX); } pub(crate) fn is_fatally_slashed( @@ -324,7 +324,7 @@ impl TributaryDb { set: ValidatorSet, validator: SeraiAddress, ) -> bool { - SlashPoints::get(getter, set, validator).unwrap_or(0) == u64::MAX + SlashPoints::get(getter, set, validator).unwrap_or(0) == u32::MAX } #[allow(clippy::too_many_arguments)] diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 9b059820..686af18d 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -511,3 +511,13 @@ impl ContinuallyRan for ScanTributaryTask { } } } + +/// Create the Transaction::SlashReport to publish per the local view. +pub fn slash_report_transaction(getter: &impl Get, set: &NewSetInformation) -> Transaction { + let mut slash_points = Vec::with_capacity(set.validators.len()); + for (validator, _weight) in set.validators.iter().copied() { + let validator = SeraiAddress::from(validator); + slash_points.push(SlashPoints::get(getter, set.set, validator).unwrap_or(0)); + } + Transaction::SlashReport { slash_points, signed: Signed::default() } +} diff --git a/coordinator/tributary/src/transaction.rs b/coordinator/tributary/src/transaction.rs index f9fd016d..befad461 100644 --- a/coordinator/tributary/src/transaction.rs +++ b/coordinator/tributary/src/transaction.rs @@ -6,7 +6,7 @@ use rand_core::{RngCore, CryptoRng}; use blake2::{digest::typenum::U32, Digest, Blake2b}; use ciphersuite::{ - group::{ff::Field, GroupEncoding}, + group::{ff::Field, Group, GroupEncoding}, Ciphersuite, Ristretto, }; use schnorr::SchnorrSignature; @@ -80,6 +80,18 @@ impl Signed { } } +impl Default for Signed { + fn default() -> Self { + Self { + signer: ::G::identity(), + signature: SchnorrSignature { + R: ::G::identity(), + s: ::F::ZERO, + }, + } + } +} + /// The Tributary transaction definition used by Serai #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum Transaction {