From 5e0e91c85dc360aa43e069c6c1773b1651fb57a9 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 14 Jan 2025 01:58:26 -0500 Subject: [PATCH] Add tasks to publish data onto Serai --- Cargo.lock | 1 + coordinator/Cargo.toml | 2 +- coordinator/src/main.rs | 6 +- coordinator/substrate/Cargo.toml | 4 +- coordinator/substrate/README.md | 10 +- coordinator/substrate/src/lib.rs | 122 +++++++++++++++++- coordinator/substrate/src/publish_batch.rs | 66 ++++++++++ .../src/publish_slash_report.rs} | 38 +++--- coordinator/substrate/src/set_keys.rs | 88 +++++++++++++ coordinator/tributary/src/lib.rs | 2 +- substrate/client/src/serai/validator_sets.rs | 2 + 11 files changed, 303 insertions(+), 38 deletions(-) create mode 100644 coordinator/substrate/src/publish_batch.rs rename coordinator/{src/serai.rs => substrate/src/publish_slash_report.rs} (77%) create mode 100644 coordinator/substrate/src/set_keys.rs diff --git a/Cargo.lock b/Cargo.lock index 2c649c36..ea9b74df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8385,6 +8385,7 @@ dependencies = [ name = "serai-coordinator-substrate" version = "0.1.0" dependencies = [ + "bitvec", "borsh", "futures", "log", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 2eec60c8..ce3ceda1 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -30,7 +30,7 @@ schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default- frost = { package = "modular-frost", path = "../crypto/frost" } frost-schnorrkel = { path = "../crypto/schnorrkel" } -scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] } zalloc = { path = "../common/zalloc" } serai-db = { path = "../common/db" } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b6bb275d..e1f6708a 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs 
@@ -39,8 +39,6 @@ mod p2p { pub use serai_coordinator_libp2p_p2p::Libp2p; } -mod serai; - // Use a zeroizing allocator for this entire application // While secrets should already be zeroized, the presence of secret keys in a networked application // (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't @@ -227,10 +225,10 @@ async fn handle_processor_messages( SignedCosigns::send(&mut txn, &cosign); } messages::coordinator::ProcessorMessage::SignedBatch { batch } => { - todo!("TODO Save to DB, have task read from DB and publish to Serai") + todo!("TODO PublishBatchTask") } messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => { - todo!("TODO Save to DB, have task read from DB and publish to Serai") + todo!("TODO PublishSlashReportTask") } }, messages::ProcessorMessage::Substrate(msg) => match msg { diff --git a/coordinator/substrate/Cargo.toml b/coordinator/substrate/Cargo.toml index 21577d62..f4eeaa59 100644 --- a/coordinator/substrate/Cargo.toml +++ b/coordinator/substrate/Cargo.toml @@ -18,7 +18,9 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +bitvec = { version = "1", default-features = false, features = ["std"] } + +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] } diff --git a/coordinator/substrate/README.md b/coordinator/substrate/README.md index 83d217aa..1bce3218 100644 --- a/coordinator/substrate/README.md +++ b/coordinator/substrate/README.md @@ -1,6 +1,6 @@ -# Serai Coordinate Substrate Scanner +# Serai Coordinator Substrate -This is the scanner of the 
Serai blockchain for the purposes of Serai's coordinator. +This crate manages the Serai coordinator's interactions with Serai's Substrate blockchain. Two event streams are defined: @@ -12,3 +12,9 @@ Two event streams are defined: The canonical event stream is available without provision of a validator's public key. The ephemeral event stream requires provision of a validator's public key. Both are ordered within themselves, yet there are no ordering guarantees across the two. + +Additionally, a collection of tasks is defined to publish data onto Serai: + +- `SetKeysTask`, which sets the keys generated via DKGs onto Serai. +- `PublishBatchTask`, which publishes `Batch`s onto Serai. +- `PublishSlashReportTask`, which publishes `SlashReport`s onto Serai. diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index 228cbed9..dc8056a7 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -6,8 +6,10 @@ use scale::{Encode, Decode}; use borsh::{io, BorshSerialize, BorshDeserialize}; use serai_client::{ - primitives::{PublicKey, NetworkId}, - validator_sets::primitives::ValidatorSet, + primitives::{NetworkId, PublicKey, Signature, SeraiAddress}, + validator_sets::primitives::{Session, ValidatorSet, KeyPair}, + in_instructions::primitives::SignedBatch, + Transaction, }; use serai_db::*; @@ -17,6 +19,13 @@ pub use canonical::CanonicalEventStream; mod ephemeral; pub use ephemeral::EphemeralEventStream; +mod set_keys; +pub use set_keys::SetKeysTask; +mod publish_batch; +pub use publish_batch::PublishBatchTask; +mod publish_slash_report; +pub use publish_slash_report::PublishSlashReportTask; + fn borsh_serialize_validators( validators: &Vec<(PublicKey, u16)>, writer: &mut W, ) -> Result { @@ -53,11 +62,7 @@ pub struct NewSetInformation { } mod _public_db { - use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; - - use serai_db::*; - - use crate::NewSetInformation; + use super::*; db_channel!( 
CoordinatorSubstrate { @@ -68,6 +73,18 @@ mod _public_db { NewSet: () -> NewSetInformation, // Potentially relevant sign slash report, from an ephemeral event stream SignSlashReport: (set: ValidatorSet) -> (), + + // Signed batches to publish onto the Serai network + SignedBatches: (network: NetworkId) -> SignedBatch, + } + ); + + create_db!( + CoordinatorSubstrate { + // Keys to set on the Serai network + Keys: (network: NetworkId) -> (Session, Vec), + // Slash reports to publish onto the Serai network + SlashReports: (network: NetworkId) -> (Session, Vec), } ); } @@ -118,3 +135,94 @@ impl SignSlashReport { _public_db::SignSlashReport::try_recv(txn, set) } } + +/// The keys to set on Serai. +pub struct Keys; +impl Keys { + /// Set the keys to report for a validator set. + /// + /// This only saves the most recent keys as only a single session is eligible to have its keys + /// reported at once. + pub fn set( + txn: &mut impl DbTxn, + set: ValidatorSet, + key_pair: KeyPair, + signature_participants: bitvec::vec::BitVec, + signature: Signature, + ) { + // If we have a more recent pair of keys, don't write this historic one + if let Some((existing_session, _)) = _public_db::Keys::get(txn, set.network) { + if existing_session.0 >= set.session.0 { + return; + } + } + + let tx = serai_client::validator_sets::SeraiValidatorSets::set_keys( + set.network, + key_pair, + signature_participants, + signature, + ); + _public_db::Keys::set(txn, set.network, &(set.session, tx.encode())); + } + pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> { + let (session, tx) = _public_db::Keys::take(txn, network)?; + Some((session, <_>::decode(&mut tx.as_slice()).unwrap())) + } +} + +/// The signed batches to publish onto Serai. +pub struct SignedBatches; +impl SignedBatches { + /// Send a `SignedBatch` to publish onto Serai. + /// + /// These will be published sequentially. Out-of-order sending risks hanging the task. 
+ pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) { + _public_db::SignedBatches::send(txn, batch.batch.network, batch); + } + pub(crate) fn try_recv(txn: &mut impl DbTxn, network: NetworkId) -> Option { + _public_db::SignedBatches::try_recv(txn, network) + } +} + +/// The slash report was invalid. +#[derive(Debug)] +pub struct InvalidSlashReport; + +/// The slash reports to publish onto Serai. +pub struct SlashReports; +impl SlashReports { + /// Set the slashes to report for a validator set. + /// + /// This only saves the most recent slashes as only a single session is eligible to have its + /// slashes reported at once. + /// + /// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as + /// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected + /// as invalid here. + pub fn set( + txn: &mut impl DbTxn, + set: ValidatorSet, + slashes: Vec<(SeraiAddress, u32)>, + signature: Signature, + ) -> Result<(), InvalidSlashReport> { + // If we have a more recent slash report, don't write this historic one + if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) { + if existing_session.0 >= set.session.0 { + return Ok(()); + } + } + + let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes( + set.network, + slashes.try_into().map_err(|_| InvalidSlashReport)?, + signature, + ); + _public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode())); + Ok(()) + } + pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> { + let (session, tx) = _public_db::SlashReports::take(txn, network)?; + Some((session, <_>::decode(&mut tx.as_slice()).unwrap())) + } +} diff --git a/coordinator/substrate/src/publish_batch.rs b/coordinator/substrate/src/publish_batch.rs new file mode 100644 index 00000000..6d186266 --- /dev/null +++ b/coordinator/substrate/src/publish_batch.rs @@ -0,0 +1,66 @@ +use core::future::Future; 
+use std::sync::Arc; + +use serai_db::{DbTxn, Db}; + +use serai_client::{primitives::NetworkId, SeraiError, Serai}; + +use serai_task::ContinuallyRan; + +use crate::SignedBatches; + +/// Publish `SignedBatch`s from `SignedBatches` onto Serai. +pub struct PublishBatchTask { + db: D, + serai: Arc, + network: NetworkId, +} + +impl PublishBatchTask { + /// Create a task to publish `SignedBatch`s onto Serai. + /// + /// Returns None if `network == NetworkId::Serai`. + // TODO: ExternalNetworkId + pub fn new(db: D, serai: Arc, network: NetworkId) -> Option { + if network == NetworkId::Serai { + None? + }; + Some(Self { db, serai, network }) + } +} + +impl ContinuallyRan for PublishBatchTask { + type Error = SeraiError; + + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + + loop { + let mut txn = self.db.txn(); + let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else { + // No batch to publish at this time + break; + }; + + // Publish this Batch if it hasn't already been published + let serai = self.serai.as_of_latest_finalized_block().await?; + let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?; + if last_batch < Some(batch.batch.id) { + // This stream of Batches *should* be sequential within the larger context of the Serai + // coordinator. In this library, we use a more relaxed definition and don't assert + // sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but + // that is a documented fault of the `SignedBatches` API. 
+ self + .serai + .publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch)) + .await?; + } + + txn.commit(); + made_progress = true; + } + Ok(made_progress) + } + } +} diff --git a/coordinator/src/serai.rs b/coordinator/substrate/src/publish_slash_report.rs similarity index 77% rename from coordinator/src/serai.rs rename to coordinator/substrate/src/publish_slash_report.rs index 20599b3d..9c20fcdd 100644 --- a/coordinator/src/serai.rs +++ b/coordinator/substrate/src/publish_slash_report.rs @@ -1,25 +1,28 @@ use core::future::Future; use std::sync::Arc; -use serai_db::{Get, DbTxn, Db as DbTrait, create_db}; +use serai_db::{DbTxn, Db}; -use scale::Decode; use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; use serai_task::ContinuallyRan; -create_db! { - CoordinatorSerai { - SlashReports: (network: NetworkId) -> (Session, Vec), +use crate::SlashReports; + +/// Publish slash reports from `SlashReports` onto Serai. +pub struct PublishSlashReportTask { + db: D, + serai: Arc, +} + +impl PublishSlashReportTask { + /// Create a task to publish slash reports onto Serai. + pub fn new(db: D, serai: Arc) -> Self { + Self { db, serai } } } -/// Publish `SlashReport`s from `SlashReports` onto Serai. 
-pub struct PublishSlashReportTask { - db: CD, - serai: Arc, -} -impl ContinuallyRan for PublishSlashReportTask { +impl ContinuallyRan for PublishSlashReportTask { type Error = String; fn run_iteration(&mut self) -> impl Send + Future> { @@ -35,7 +38,6 @@ impl ContinuallyRan for PublishSlashReportTask { // No slash report to publish continue; }; - let slash_report = serai_client::Transaction::decode(&mut slash_report.as_slice()).unwrap(); let serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; @@ -48,7 +50,7 @@ impl ContinuallyRan for PublishSlashReportTask { let session_after_slash_report_retired = current_session > Some(session_after_slash_report.0); if session_after_slash_report_retired { - // Commit the txn to drain this SlashReport from the database and not try it again later + // Commit the txn to drain this slash report from the database and not try it again later txn.commit(); continue; } @@ -57,7 +59,7 @@ impl ContinuallyRan for PublishSlashReportTask { // We already checked the current session wasn't greater, and they're not equal assert!(current_session < Some(session_after_slash_report.0)); // This would mean the Serai node is resyncing and is behind where it prior was - Err("have a SlashReport for a session Serai has yet to retire".to_string())?; + Err("have a slash report for a session Serai has yet to retire".to_string())?; } // If this session which should publish a slash report already has, move on @@ -68,14 +70,6 @@ impl ContinuallyRan for PublishSlashReportTask { continue; }; - /* - let tx = serai_client::SeraiValidatorSets::report_slashes( - network, - slash_report, - signature.clone(), - ); - */ - match self.serai.publish(&slash_report).await { Ok(()) => { txn.commit(); diff --git a/coordinator/substrate/src/set_keys.rs b/coordinator/substrate/src/set_keys.rs new file mode 100644 index 00000000..129bb703 --- /dev/null +++ b/coordinator/substrate/src/set_keys.rs @@ -0,0 +1,88 @@ +use core::future::Future; +use 
std::sync::Arc; + +use serai_db::{DbTxn, Db}; + +use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai}; + +use serai_task::ContinuallyRan; + +use crate::Keys; + +/// Set keys from `Keys` on Serai. +pub struct SetKeysTask { + db: D, + serai: Arc, +} + +impl SetKeysTask { + /// Create a task to set keys onto Serai. + pub fn new(db: D, serai: Arc) -> Self { + Self { db, serai } + } +} + +impl ContinuallyRan for SetKeysTask { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + for network in serai_client::primitives::NETWORKS { + if network == NetworkId::Serai { + continue; + }; + + let mut txn = self.db.txn(); + let Some((session, keys)) = Keys::take(&mut txn, network) else { + // No keys to set + continue; + }; + + let serai = + self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + let serai = serai.validator_sets(); + let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?; + let current_session = current_session.map(|session| session.0); + // Only attempt to set these keys if this isn't a retired session + if Some(session.0) < current_session { + // Commit the txn to take these keys from the database and not try it again later + txn.commit(); + continue; + } + + if Some(session.0) != current_session { + // We already checked the current session wasn't greater, and they're not equal + assert!(current_session < Some(session.0)); + // This would mean the Serai node is resyncing and is behind where it prior was + Err("have a keys for a session Serai has yet to start".to_string())?; + } + + // If this session already has had its keys set, move on + if serai + .keys(ValidatorSet { network, session }) + .await + .map_err(|e| format!("{e:?}"))?
+ .is_some() + { + txn.commit(); + continue; + }; + + match self.serai.publish(&keys).await { + Ok(()) => { + txn.commit(); + made_progress = true; + } + // This could be specific to this TX (such as an already in mempool error) and it may be + // worthwhile to continue iteration with the other pending keys. We assume this + // error is ephemeral and that the latency incurred for this ephemeral error to resolve is + // minuscule compared to the window reasonable to set the keys. That makes this a + // non-issue. + Err(e) => Err(format!("couldn't publish set keys transaction: {e:?}"))?, + } + } + Ok(made_progress) + } + } +} diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 83300a0d..80724c76 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -206,7 +206,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } Transaction::DkgConfirmationShare { attempt, share, signed } => { // Accumulate the shares into our own FROST attempt manager - todo!("TODO") + todo!("TODO: SetKeysTask") } Transaction::Cosign { substrate_block_hash } => { diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs index 89990406..8eb50b70 100644 --- a/substrate/client/src/serai/validator_sets.rs +++ b/substrate/client/src/serai/validator_sets.rs @@ -238,6 +238,8 @@ impl<'a> SeraiValidatorSets<'a> { pub fn report_slashes( network: NetworkId, + // TODO: This bounds a maximum length but takes more space than just publishing all the u32s + // (50 * (32 + 4)) > (150 * 4) slashes: sp_runtime::BoundedVec< (SeraiAddress, u32), sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET / 3 }>,