diff --git a/Cargo.lock b/Cargo.lock index ea9b74df..a6b1c37e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8318,6 +8318,7 @@ dependencies = [ "blake2", "borsh", "ciphersuite", + "dkg", "env_logger", "frost-schnorrkel", "hex", @@ -8387,6 +8388,7 @@ version = "0.1.0" dependencies = [ "bitvec", "borsh", + "dkg", "futures", "log", "parity-scale-codec", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index ce3ceda1..bd3c2e3d 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -26,6 +26,7 @@ blake2 = { version = "0.10", default-features = false, features = ["std"] } schnorrkel = { version = "0.11", default-features = false, features = ["std"] } ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] } +dkg = { path = "../crypto/dkg", default-features = false, features = ["std"] } schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-features = false, features = ["std"] } frost = { package = "modular-frost", path = "../crypto/frost" } frost-schnorrkel = { path = "../crypto/schnorrkel" } diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index b17e569b..6f336bcc 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -3,6 +3,8 @@ use std::{path::Path, fs}; pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait}; use serai_db::{create_db, db_channel}; +use dkg::Participant; + use serai_client::{ primitives::NetworkId, validator_sets::primitives::{Session, ValidatorSet}, @@ -95,6 +97,8 @@ mod _internal_db { Coordinator { // Tributary transactions to publish TributaryTransactions: (set: ValidatorSet) -> Transaction, + // Participants to remove + RemoveParticipant: (set: ValidatorSet) -> Participant, } } } @@ -111,3 +115,16 @@ impl TributaryTransactions { _internal_db::TributaryTransactions::try_recv(txn, set) } } + +pub(crate) struct RemoveParticipant; +impl RemoveParticipant { + pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, participant: Participant) { + // If this set has yet to be retired, send this transaction + if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) { + _internal_db::RemoveParticipant::send(txn, set, &participant); + } + } + pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option { + _internal_db::RemoveParticipant::try_recv(txn, set) + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0048ebd1..c739d390 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -14,7 +14,7 @@ use borsh::BorshDeserialize; use tokio::sync::mpsc; use serai_client::{ - primitives::{NetworkId, PublicKey, Signature}, + primitives::{NetworkId, SeraiAddress, Signature}, validator_sets::primitives::ValidatorSet, Serai, }; @@ -209,28 +209,12 @@ async fn handle_network( network_key, } => todo!("TODO Transaction::DkgConfirmationPreprocess"), messages::key_gen::ProcessorMessage::Blame { session, participant } => { - let set = ValidatorSet { network, session }; - TributaryTransactions::send( - &mut txn, - set, - &Transaction::RemoveParticipant { - participant: todo!("TODO"), - signed: Signed::default(), - }, - ); + RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant); } }, messages::ProcessorMessage::Sign(msg) => match msg { messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => { - let set = ValidatorSet { network, session }; - TributaryTransactions::send( - &mut txn, - set, - &Transaction::RemoveParticipant { - participant: todo!("TODO"), - signed: Signed::default(), - }, - ); + RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant); } messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => { let set = ValidatorSet { network, session: id.session }; @@ -371,6 +355,8 @@ async fn main() { while !Cosigning::::intended_cosigns(&mut txn, to_cleanup).is_empty() {} // Drain the transactions to publish for this set while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {} + // Drain the participants to remove for this set + while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {} // Remove the SignSlashReport notification SignSlashReport::try_recv(&mut txn, to_cleanup); } @@ -434,7 +420,7 @@ async fn main() { EphemeralEventStream::new( db.clone(), serai.clone(), - PublicKey::from_raw((::generator() * serai_key.deref()).to_bytes()), + SeraiAddress((::generator() * serai_key.deref()).to_bytes()), ) .continually_run(substrate_ephemeral_task_def, vec![substrate_task]), ); diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 2ebfd223..6f3020cb 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -26,7 +26,7 @@ use serai_coordinator_tributary::{ }; use serai_coordinator_p2p::P2p; -use crate::{Db, TributaryTransactions}; +use crate::{Db, TributaryTransactions, RemoveParticipant}; create_db! { Coordinator { @@ -158,8 +158,14 @@ impl ContinuallyRan #[must_use] async fn add_signed_unsigned_transaction( tributary: &Tributary, - tx: &Transaction, + key: &Zeroizing<::F>, + mut tx: Transaction, ) -> bool { + // If this is a signed transaction, sign it + if matches!(tx.kind(), TransactionKind::Signed(_, _)) { + tx.sign(&mut OsRng, tributary.genesis(), key); + } + let res = tributary.add_transaction(tx.clone()).await; match &res { // Fresh publication, already published @@ -191,7 +197,7 @@ pub(crate) struct AddTributaryTransactionsTask db: CD, tributary_db: TD, tributary: Tributary, - set: ValidatorSet, + set: NewSetInformation, key: Zeroizing<::F>, } impl ContinuallyRan for AddTributaryTransactionsTask { @@ -204,23 +210,20 @@ impl ContinuallyRan for AddTributaryTransactio // Provide/add all transactions sent our way loop { let mut txn = self.db.txn(); - let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break }; + let Some(tx) = TributaryTransactions::try_recv(&mut txn, self.set.set) else { break }; let kind = tx.kind(); match kind { - TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await, + TransactionKind::Provided(_) => { + provide_transaction(self.set.set, &self.tributary, tx).await + } TransactionKind::Unsigned | TransactionKind::Signed(_, _) => { - // If this is a signed transaction, sign it - if matches!(kind, TransactionKind::Signed(_, _)) { - tx.sign(&mut OsRng, self.tributary.genesis(), &self.key); - } - // If this is a transaction with signing data, check the topic is recognized before // publishing let topic = tx.topic(); let still_requires_recognition = if let Some(topic) = topic { (topic.requires_recognition() && - (!RecognizedTopics::recognized(&self.tributary_db, self.set, topic))) + (!RecognizedTopics::recognized(&self.tributary_db, self.set.set, topic))) .then_some(topic) } else { None @@ -229,11 +232,11 @@ impl ContinuallyRan for AddTributaryTransactio // Queue the transaction until the topic is recognized // We use the Tributary DB for this so it's cleaned up when the Tributary DB is let mut txn = self.tributary_db.txn(); - PublishOnRecognition::set(&mut txn, self.set, topic, &tx); + PublishOnRecognition::set(&mut txn, self.set.set, topic, &tx); txn.commit(); } else { // Actually add the transaction - if !add_signed_unsigned_transaction(&self.tributary, &tx).await { + if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await { break; } } @@ -248,12 +251,12 @@ impl ContinuallyRan for AddTributaryTransactio loop { let mut txn = self.tributary_db.txn(); let Some(topic) = - RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set) + RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set.set) else { break; }; - if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set, topic) { - if !add_signed_unsigned_transaction(&self.tributary, &tx).await { + if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set.set, topic) { + if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await { break; } } @@ -262,6 +265,21 @@ impl ContinuallyRan for AddTributaryTransactio txn.commit(); } + // Publish any participant removals + loop { + let mut txn = self.db.txn(); + let Some(participant) = RemoveParticipant::try_recv(&mut txn, self.set.set) else { break }; + let tx = Transaction::RemoveParticipant { + participant: self.set.participant_indexes_reverse_lookup[&participant], + signed: Default::default(), + }; + if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await { + break; + } + made_progress = true; + txn.commit(); + } + Ok(made_progress) } } @@ -487,7 +505,7 @@ pub(crate) async fn spawn_tributary( db: db.clone(), tributary_db, tributary: tributary.clone(), - set: set.set, + set: set.clone(), key: serai_key, }) .continually_run(add_tributary_transactions_task_def, vec![]), diff --git a/coordinator/substrate/Cargo.toml b/coordinator/substrate/Cargo.toml index f4eeaa59..c733cc31 100644 --- a/coordinator/substrate/Cargo.toml +++ b/coordinator/substrate/Cargo.toml @@ -22,6 +22,9 @@ bitvec = { version = "1", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +dkg = { path = "../../crypto/dkg", default-features = false, features = ["std"] } + serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] } log = { version = "0.4", default-features = false, features = ["std"] } diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs index 3ea8de98..18c11d00 100644 --- a/coordinator/substrate/src/ephemeral.rs +++ b/coordinator/substrate/src/ephemeral.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::stream::{StreamExt, FuturesOrdered}; use serai_client::{ - primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve}, + primitives::{NetworkId, SeraiAddress, EmbeddedEllipticCurve}, validator_sets::primitives::MAX_KEY_SHARES_PER_SET, Serai, }; @@ -26,14 +26,14 @@ create_db!( pub struct EphemeralEventStream { db: D, serai: Arc, - validator: PublicKey, + validator: SeraiAddress, } impl EphemeralEventStream { /// Create a new ephemeral event stream. /// /// Only one of these may exist over the provided database. - pub fn new(db: D, serai: Arc, validator: PublicKey) -> Self { + pub fn new(db: D, serai: Arc, validator: SeraiAddress) -> Self { Self { db, serai, validator } } } @@ -145,6 +145,10 @@ impl ContinuallyRan for EphemeralEventStream { "block #{block_number} declared a new set but didn't have the participants" ))? }; + let validators = validators + .into_iter() + .map(|(validator, weight)| (SeraiAddress::from(validator), weight)) + .collect::>(); let in_set = validators.iter().any(|(validator, _)| *validator == self.validator); if in_set { if u16::try_from(validators.len()).is_err() { @@ -177,14 +181,16 @@ impl ContinuallyRan for EphemeralEventStream { embedded_elliptic_curve_keys.push_back(async move { tokio::try_join!( // One future to fetch the substrate embedded key - serai - .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519), + serai.embedded_elliptic_curve_key( + validator.into(), + EmbeddedEllipticCurve::Embedwards25519 + ), // One future to fetch the external embedded key, if there is a distinct curve async { // `embedded_elliptic_curves` is documented to have the second entry be the // network-specific curve (if it exists and is distinct from Embedwards25519) if let Some(curve) = set.network.embedded_elliptic_curves().get(1) { - serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some) + serai.embedded_elliptic_curve_key(validator.into(), *curve).await.map(Some) } else { Ok(None) } @@ -215,19 +221,22 @@ impl ContinuallyRan for EphemeralEventStream { } } - crate::NewSet::send( - &mut txn, - &NewSetInformation { - set: *set, - serai_block: block.block_hash, - declaration_time: block.time, - // TODO: Why do we have this as an explicit field here? - // Shouldn't thiis be inlined into the Processor's key gen code, where it's used? - threshold: ((total_weight * 2) / 3) + 1, - validators, - evrf_public_keys, - }, - ); + let mut new_set = NewSetInformation { + set: *set, + serai_block: block.block_hash, + declaration_time: block.time, + // TODO: Why do we have this as an explicit field here? + // Shouldn't this be inlined into the Processor's key gen code, where it's used? + threshold: ((total_weight * 2) / 3) + 1, + validators, + evrf_public_keys, + participant_indexes: Default::default(), + participant_indexes_reverse_lookup: Default::default(), + }; + // These aren't serialized, and we immediately serialize and drop this, so this isn't + // necessary. It's just good practice not have this be dirty + new_set.init_participant_indexes(); + crate::NewSet::send(&mut txn, &new_set); } } diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index c8e437f4..f313eb36 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -2,11 +2,15 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] +use std::collections::HashMap; + use scale::{Encode, Decode}; -use borsh::{io, BorshSerialize, BorshDeserialize}; +use borsh::{BorshSerialize, BorshDeserialize}; + +use dkg::Participant; use serai_client::{ - primitives::{NetworkId, PublicKey, Signature}, + primitives::{NetworkId, SeraiAddress, Signature}, validator_sets::primitives::{Session, ValidatorSet, KeyPair, SlashReport}, in_instructions::primitives::SignedBatch, Transaction, @@ -26,22 +30,9 @@ pub use publish_batch::PublishBatchTask; mod publish_slash_report; pub use publish_slash_report::PublishSlashReportTask; -fn borsh_serialize_validators( - validators: &Vec<(PublicKey, u16)>, - writer: &mut W, -) -> Result<(), io::Error> { - // This doesn't use `encode_to` as `encode_to` panics if the writer returns an error - writer.write_all(&validators.encode()) -} - -fn borsh_deserialize_validators( - reader: &mut R, -) -> Result, io::Error> { - Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other) -} - /// The information for a new set. #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] +#[borsh(init = init_participant_indexes)] pub struct NewSetInformation { /// The set. pub set: ValidatorSet, @@ -52,13 +43,34 @@ pub struct NewSetInformation { /// The threshold to use. pub threshold: u16, /// The validators, with the amount of key shares they have. - #[borsh( - serialize_with = "borsh_serialize_validators", - deserialize_with = "borsh_deserialize_validators" - )] - pub validators: Vec<(PublicKey, u16)>, + pub validators: Vec<(SeraiAddress, u16)>, /// The eVRF public keys. pub evrf_public_keys: Vec<([u8; 32], Vec)>, + /// The participant indexes, indexed by their validator. + #[borsh(skip)] + pub participant_indexes: HashMap>, + /// The validators, indexed by their participant indexes. + #[borsh(skip)] + pub participant_indexes_reverse_lookup: HashMap, +} + +impl NewSetInformation { + fn init_participant_indexes(&mut self) { + let mut next_i = 1; + self.participant_indexes = HashMap::with_capacity(self.validators.len()); + self.participant_indexes_reverse_lookup = HashMap::with_capacity(self.validators.len()); + for (validator, weight) in &self.validators { + let mut these_is = Vec::with_capacity((*weight).into()); + for _ in 0 .. *weight { + let this_i = Participant::new(next_i).unwrap(); + next_i += 1; + + these_is.push(this_i); + self.participant_indexes_reverse_lookup.insert(this_i, *validator); + } + self.participant_indexes.insert(*validator, these_is); + } + } } mod _public_db { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 6b8616aa..f37928c3 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -515,7 +515,6 @@ impl ScanTributaryTask { let mut total_weight = 0; let mut validator_weights = HashMap::with_capacity(new_set.validators.len()); for (validator, weight) in new_set.validators.iter().copied() { - let validator = SeraiAddress::from(validator); let weight = u64::from(weight); validators.push(validator); total_weight += weight; @@ -597,7 +596,6 @@ impl ContinuallyRan for ScanTributaryTask { pub fn slash_report_transaction(getter: &impl Get, set: &NewSetInformation) -> Transaction { let mut slash_points = Vec::with_capacity(set.validators.len()); for (validator, _weight) in set.validators.iter().copied() { - let validator = SeraiAddress::from(validator); slash_points.push(SlashPoints::get(getter, set.set, validator).unwrap_or(0)); } Transaction::SlashReport { slash_points, signed: Signed::default() }