From 7312fa8d3ccd8bc812085159afbc2eaa04021666 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 15 Jan 2025 12:08:28 -0500 Subject: [PATCH] Spawn PublishSlashReportTask Updates it so that it'll try for every network instead of returning after any network fails. Uses the SlashReport type throughout the codebase. --- coordinator/src/main.rs | 27 ++++- coordinator/substrate/src/lib.rs | 21 +--- .../substrate/src/publish_slash_report.rs | 112 ++++++++++-------- coordinator/tributary/src/lib.rs | 4 +- processor/messages/src/lib.rs | 10 +- substrate/abi/src/validator_sets.rs | 2 +- substrate/client/src/serai/validator_sets.rs | 11 +- substrate/runtime/src/abi.rs | 20 +--- substrate/validator-sets/pallet/src/lib.rs | 2 +- .../primitives/src/slash_points.rs | 24 ++++ 10 files changed, 132 insertions(+), 101 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 5895f74c..0048ebd1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -14,7 +14,7 @@ use borsh::BorshDeserialize; use tokio::sync::mpsc; use serai_client::{ - primitives::{NetworkId, PublicKey}, + primitives::{NetworkId, PublicKey, Signature}, validator_sets::primitives::ValidatorSet, Serai, }; @@ -25,6 +25,7 @@ use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{Faulted, SignedCosign, Cosigning}; use serai_coordinator_substrate::{ CanonicalEventStream, EphemeralEventStream, SignSlashReport, SignedBatches, PublishBatchTask, + SlashReports, PublishSlashReportTask, }; use serai_coordinator_tributary::{SigningProtocolRound, Signed, Transaction, SubstrateBlockPlans}; @@ -161,6 +162,7 @@ async fn handle_network( .unwrap() .continually_run(publish_batch_task_def, vec![]), ); + // Forget its handle so it always runs in the background core::mem::forget(publish_batch_task); } @@ -274,8 +276,17 @@ async fn handle_network( messages::coordinator::ProcessorMessage::SignedBatch { batch } => { SignedBatches::send(&mut txn, &batch); } - messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => { - todo!("TODO PublishSlashReportTask") + messages::coordinator::ProcessorMessage::SignedSlashReport { + session, + slash_report, + signature, + } => { + SlashReports::set( + &mut txn, + ValidatorSet { network, session }, + slash_report, + Signature(signature), + ); } }, messages::ProcessorMessage::Substrate(msg) => match msg { @@ -472,6 +483,16 @@ async fn main() { tokio::spawn(handle_network(db.clone(), message_queue.clone(), serai.clone(), network)); } + // Spawn the task to publish slash reports + { + let (publish_slash_report_task_def, publish_slash_report_task) = Task::new(); + tokio::spawn( + PublishSlashReportTask::new(db, serai).continually_run(publish_slash_report_task_def, vec![]), + ); + // Always have this run in the background + core::mem::forget(publish_slash_report_task); + } + // Run the spawned tasks ad-infinitum core::future::pending().await } diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index a03c05dd..c8e437f4 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -6,8 +6,8 @@ use scale::{Encode, Decode}; use borsh::{io, BorshSerialize, BorshDeserialize}; use serai_client::{ - primitives::{NetworkId, PublicKey, Signature, SeraiAddress}, - validator_sets::primitives::{Session, ValidatorSet, KeyPair}, + primitives::{NetworkId, PublicKey, Signature}, + validator_sets::primitives::{Session, ValidatorSet, KeyPair, SlashReport}, in_instructions::primitives::SignedBatch, Transaction, }; @@ -183,10 +183,6 @@ impl SignedBatches { } } -/// The slash report was invalid. -#[derive(Debug)] -pub struct InvalidSlashReport; - /// The slash reports to publish onto Serai. pub struct SlashReports; impl SlashReports { @@ -194,30 +190,25 @@ impl SlashReports { /// /// This only saves the most recent slashes as only a single session is eligible to have its /// slashes reported at once. - /// - /// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as - /// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected - /// as invalid here. pub fn set( txn: &mut impl DbTxn, set: ValidatorSet, - slashes: Vec<(SeraiAddress, u32)>, + slash_report: SlashReport, signature: Signature, - ) -> Result<(), InvalidSlashReport> { + ) { // If we have a more recent slash report, don't write this historic one if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) { if existing_session.0 >= set.session.0 { - return Ok(()); + return; } } let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes( set.network, - slashes.try_into().map_err(|_| InvalidSlashReport)?, + slash_report, signature, ); _public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode())); - Ok(()) } pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> { let (session, tx) = _public_db::SlashReports::take(txn, network)?; diff --git a/coordinator/substrate/src/publish_slash_report.rs b/coordinator/substrate/src/publish_slash_report.rs index 9c20fcdd..a26d4bd6 100644 --- a/coordinator/substrate/src/publish_slash_report.rs +++ b/coordinator/substrate/src/publish_slash_report.rs @@ -22,66 +22,80 @@ impl PublishSlashReportTask { } } +impl PublishSlashReportTask { + // Returns if a slash report was successfully published + async fn publish(&mut self, network: NetworkId) -> Result { + let mut txn = self.db.txn(); + let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else { + // No slash report to publish + return Ok(false); + }; + + let serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + let serai = serai.validator_sets(); + let session_after_slash_report = Session(session.0 + 1); + let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?; + let current_session = current_session.map(|session| session.0); + // Only attempt to publish the slash report for session #n while session #n+1 is still + // active + let session_after_slash_report_retired = current_session > Some(session_after_slash_report.0); + if session_after_slash_report_retired { + // Commit the txn to drain this slash report from the database and not try it again later + txn.commit(); + return Ok(false); + } + + if Some(session_after_slash_report.0) != current_session { + // We already checked the current session wasn't greater, and they're not equal + assert!(current_session < Some(session_after_slash_report.0)); + // This would mean the Serai node is resyncing and is behind where it prior was + Err("have a slash report for a session Serai has yet to retire".to_string())?; + } + + // If this session which should publish a slash report already has, move on + let key_pending_slash_report = + serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?; + if key_pending_slash_report.is_none() { + txn.commit(); + return Ok(false); + }; + + match self.serai.publish(&slash_report).await { + Ok(()) => { + txn.commit(); + Ok(true) + } + // This could be specific to this TX (such as an already in mempool error) and it may be + // worthwhile to continue iteration with the other pending slash reports. We assume this + // error ephemeral and that the latency incurred for this ephemeral error to resolve is + // miniscule compared to the window available to publish the slash report. That makes + // this a non-issue. + Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}")), + } + } +} + impl ContinuallyRan for PublishSlashReportTask { type Error = String; fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; + let mut error = None; for network in serai_client::primitives::NETWORKS { if network == NetworkId::Serai { continue; }; - let mut txn = self.db.txn(); - let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else { - // No slash report to publish - continue; - }; - - let serai = - self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; - let serai = serai.validator_sets(); - let session_after_slash_report = Session(session.0 + 1); - let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?; - let current_session = current_session.map(|session| session.0); - // Only attempt to publish the slash report for session #n while session #n+1 is still - // active - let session_after_slash_report_retired = - current_session > Some(session_after_slash_report.0); - if session_after_slash_report_retired { - // Commit the txn to drain this slash report from the database and not try it again later - txn.commit(); - continue; - } - - if Some(session_after_slash_report.0) != current_session { - // We already checked the current session wasn't greater, and they're not equal - assert!(current_session < Some(session_after_slash_report.0)); - // This would mean the Serai node is resyncing and is behind where it prior was - Err("have a slash report for a session Serai has yet to retire".to_string())?; - } - - // If this session which should publish a slash report already has, move on - let key_pending_slash_report = - serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?; - if key_pending_slash_report.is_none() { - txn.commit(); - continue; - }; - - match self.serai.publish(&slash_report).await { - Ok(()) => { - txn.commit(); - made_progress = true; - } - // This could be specific to this TX (such as an already in mempool error) and it may be - // worthwhile to continue iteration with the other pending slash reports. We assume this - // error ephemeral and that the latency incurred for this ephemeral error to resolve is - // miniscule compared to the window available to publish the slash report. That makes - // this a non-issue. - Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?, - } + let network_res = self.publish(network).await; + // We made progress if any network successfully published their slash report + made_progress |= network_res == Ok(true); + // We want to yield the first error *after* attempting for every network + error = error.or(network_res.err()); + } + // Yield the error + if let Some(error) = error { + Err(error)? } Ok(made_progress) } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index bd6119dd..6b8616aa 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -371,7 +371,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Create the resulting slash report let mut slash_report = vec![]; - for (_, points) in self.validators.iter().copied().zip(amortized_slash_report) { + for points in amortized_slash_report { // TODO: Natively store this as a `Slash` if points == u32::MAX { slash_report.push(Slash::Fatal); @@ -397,7 +397,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { self.set, messages::coordinator::CoordinatorMessage::SignSlashReport { session: self.set.session, - report: slash_report, + slash_report: slash_report.try_into().unwrap(), }, ); } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index acf01775..b8f496ab 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -7,7 +7,7 @@ use borsh::{BorshSerialize, BorshDeserialize}; use dkg::Participant; use serai_primitives::BlockHash; -use validator_sets_primitives::{Session, KeyPair, Slash}; +use validator_sets_primitives::{Session, KeyPair, SlashReport}; use coins_primitives::OutInstructionWithBalance; use in_instructions_primitives::SignedBatch; @@ -100,7 +100,9 @@ pub mod sign { Self::Cosign(cosign) => { f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish() } - Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(), + Self::Batch(batch) => { + f.debug_struct("VariantSignId::Batch").field("0", &hex::encode(batch)).finish() + } Self::SlashReport => f.debug_struct("VariantSignId::SlashReport").finish(), Self::Transaction(tx) => { f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish() @@ -168,7 +170,7 @@ pub mod coordinator { /// Sign the slash report for this session. /// /// This is sent by the Coordinator's Tributary scanner. - SignSlashReport { session: Session, report: Vec }, + SignSlashReport { session: Session, slash_report: SlashReport }, } // This set of messages is sent entirely and solely by serai-processor-bin's implementation of @@ -178,7 +180,7 @@ pub mod coordinator { pub enum ProcessorMessage { CosignedBlock { cosign: SignedCosign }, SignedBatch { batch: SignedBatch }, - SignedSlashReport { session: Session, signature: Vec }, + SignedSlashReport { session: Session, slash_report: SlashReport, signature: [u8; 64] }, } } diff --git a/substrate/abi/src/validator_sets.rs b/substrate/abi/src/validator_sets.rs index ec8a5714..85317c0d 100644 --- a/substrate/abi/src/validator_sets.rs +++ b/substrate/abi/src/validator_sets.rs @@ -21,7 +21,7 @@ pub enum Call { }, report_slashes { network: NetworkId, - slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>, + slashes: SlashReport, signature: Signature, }, allocate { diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs index c92e4f89..882f7af6 100644 --- a/substrate/client/src/serai/validator_sets.rs +++ b/substrate/client/src/serai/validator_sets.rs @@ -5,10 +5,10 @@ use sp_runtime::BoundedVec; use serai_abi::primitives::Amount; pub use serai_abi::validator_sets::primitives; -use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair}; +use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair, SlashReport}; use crate::{ - primitives::{EmbeddedEllipticCurve, NetworkId, SeraiAddress}, + primitives::{EmbeddedEllipticCurve, NetworkId}, Transaction, Serai, TemporalSerai, SeraiError, }; @@ -238,12 +238,7 @@ impl<'a> SeraiValidatorSets<'a> { pub fn report_slashes( network: NetworkId, - // TODO: This bounds a maximum length but takes more space than just publishing all the u32s - // (50 * (32 + 4)) > (150 * 4) - slashes: sp_runtime::BoundedVec< - (SeraiAddress, u32), - sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET_U32 / 3 }>, - >, + slashes: SlashReport, signature: Signature, ) -> Transaction { Serai::unsigned(serai_abi::Call::ValidatorSets( diff --git a/substrate/runtime/src/abi.rs b/substrate/runtime/src/abi.rs index 107389c1..81c8b202 100644 --- a/substrate/runtime/src/abi.rs +++ b/substrate/runtime/src/abi.rs @@ -111,13 +111,7 @@ impl From for RuntimeCall { serai_abi::validator_sets::Call::report_slashes { network, slashes, signature } => { RuntimeCall::ValidatorSets(validator_sets::Call::report_slashes { network, - slashes: <_>::try_from( - slashes - .into_iter() - .map(|(addr, slash)| (PublicKey::from(addr), slash)) - .collect::>(), - ) - .unwrap(), + slashes, signature, }) } @@ -301,17 +295,7 @@ impl TryInto for RuntimeCall { } } validator_sets::Call::report_slashes { network, slashes, signature } => { - serai_abi::validator_sets::Call::report_slashes { - network, - slashes: <_>::try_from( - slashes - .into_iter() - .map(|(addr, slash)| (SeraiAddress::from(addr), slash)) - .collect::>(), - ) - .unwrap(), - signature, - } + serai_abi::validator_sets::Call::report_slashes { network, slashes, signature } } validator_sets::Call::allocate { network, amount } => { serai_abi::validator_sets::Call::allocate { network, amount } diff --git a/substrate/validator-sets/pallet/src/lib.rs b/substrate/validator-sets/pallet/src/lib.rs index 2ba1b45f..4fbddda4 100644 --- a/substrate/validator-sets/pallet/src/lib.rs +++ b/substrate/validator-sets/pallet/src/lib.rs @@ -1010,7 +1010,7 @@ pub mod pallet { pub fn report_slashes( origin: OriginFor, network: NetworkId, - slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>, + slashes: SlashReport, signature: Signature, ) -> DispatchResult { ensure_none(origin)?; diff --git a/substrate/validator-sets/primitives/src/slash_points.rs b/substrate/validator-sets/primitives/src/slash_points.rs index c045a4ef..d420157e 100644 --- a/substrate/validator-sets/primitives/src/slash_points.rs +++ b/substrate/validator-sets/primitives/src/slash_points.rs @@ -210,6 +210,30 @@ impl Slash { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct SlashReport(pub BoundedVec>); +#[cfg(feature = "borsh")] +impl BorshSerialize for SlashReport { + fn serialize(&self, writer: &mut W) -> borsh::io::Result<()> { + BorshSerialize::serialize(self.0.as_slice(), writer) + } +} +#[cfg(feature = "borsh")] +impl BorshDeserialize for SlashReport { + fn deserialize_reader(reader: &mut R) -> borsh::io::Result { + let slashes = Vec::::deserialize_reader(reader)?; + slashes + .try_into() + .map(Self) + .map_err(|_| borsh::io::Error::other("length of slash report exceeds max validators")) + } +} + +impl TryFrom> for SlashReport { + type Error = &'static str; + fn try_from(slashes: Vec) -> Result { + slashes.try_into().map(Self).map_err(|_| "length of slash report exceeds max validators") + } +} + // This is assumed binding to the ValidatorSet via the key signed with pub fn report_slashes_message(slashes: &SlashReport) -> Vec { (b"ValidatorSets-report_slashes", slashes).encode()