diff --git a/coordinator/src/cosign_evaluator.rs b/coordinator/src/cosign_evaluator.rs
index a38377b4..6d5b94fc 100644
--- a/coordinator/src/cosign_evaluator.rs
+++ b/coordinator/src/cosign_evaluator.rs
@@ -9,7 +9,7 @@ use tokio::{
   time::sleep,
 };
 
-use scale::Encode;
+use borsh::BorshSerialize;
 use sp_application_crypto::RuntimePublic;
 use serai_client::{
   primitives::{NETWORKS, NetworkId, Signature},
@@ -28,7 +28,8 @@ use crate::{
 
 create_db! {
   CosignDb {
-    ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec<u8>,
+    ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> CosignedBlock,
+    LatestCosign: (network: NetworkId) -> CosignedBlock,
     DistinctChain: (set: ValidatorSet) -> (),
   }
 }
@@ -37,7 +38,7 @@
 pub struct CosignEvaluator<D: Db> {
   db: Mutex<D>,
   serai: Arc<Serai>,
   stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
-  latest_cosigns: RwLock<HashMap<NetworkId, (u64, CosignedBlock)>>,
+  latest_cosigns: RwLock<HashMap<NetworkId, CosignedBlock>>,
 }
 
 impl<D: Db> CosignEvaluator<D> {
@@ -50,10 +51,10 @@ impl<D: Db> CosignEvaluator<D> {
     let latest_cosigns = self.latest_cosigns.read().await;
     let mut highest_block = 0;
-    for (block_num, _) in latest_cosigns.values() {
+    for cosign in latest_cosigns.values() {
       let mut networks = HashSet::new();
-      for (network, (sub_block_num, _)) in &*latest_cosigns {
-        if sub_block_num >= block_num {
+      for (network, sub_cosign) in &*latest_cosigns {
+        if sub_cosign.block_number >= cosign.block_number {
           networks.insert(network);
         }
       }
@@ -61,7 +62,7 @@ impl<D: Db> CosignEvaluator<D> {
         networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
       let needed_stake = ((total_stake * 2) / 3) + 1;
       if (total_stake == 0) || (sum_stake > needed_stake) {
-        highest_block = highest_block.max(*block_num);
+        highest_block = highest_block.max(cosign.block_number);
       }
     }
@@ -106,7 +107,7 @@ impl<D: Db> CosignEvaluator<D> {
   async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
     // If we already have this cosign or a newer cosign, return
     if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
-      if latest.0 >= cosign.block_number {
+      if latest.block_number >= cosign.block_number {
         return Ok(());
       }
     }
@@ -180,7 +181,8 @@ impl<D: Db> CosignEvaluator<D> {
     {
       let mut db = self.db.lock().await;
       let mut txn = db.txn();
-      ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
+      ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign);
+      LatestCosign::set(&mut txn, set_with_keys.network, &(cosign));
       txn.commit();
     }
@@ -258,7 +260,7 @@ impl<D: Db> CosignEvaluator<D> {
     } else {
       {
         let mut latest_cosigns = self.latest_cosigns.write().await;
-        latest_cosigns.insert(cosign.network, (block.number(), cosign));
+        latest_cosigns.insert(cosign.network, cosign);
       }
       self.update_latest_cosign().await;
     }
@@ -268,11 +270,18 @@ impl<D: Db> CosignEvaluator<D> {
   #[allow(clippy::new_ret_no_self)]
   pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
+    let mut latest_cosigns = HashMap::new();
+    for network in NETWORKS {
+      if let Some(cosign) = LatestCosign::get(&db, network) {
+        latest_cosigns.insert(network, cosign);
+      }
+    }
+
     let evaluator = Arc::new(Self {
       db: Mutex::new(db),
       serai,
       stakes: RwLock::new(None),
-      latest_cosigns: RwLock::new(HashMap::new()),
+      latest_cosigns: RwLock::new(latest_cosigns),
     });
 
     // Spawn a task to update stakes regularly
@@ -310,15 +319,11 @@ impl<D: Db> CosignEvaluator<D> {
     tokio::spawn({
       async move {
         loop {
-          let cosigns = evaluator
-            .latest_cosigns
-            .read()
-            .await
-            .values()
-            .map(|cosign| cosign.1)
-            .collect::<Vec<_>>();
+          let cosigns = evaluator.latest_cosigns.read().await.values().cloned().collect::<Vec<_>>();
           for cosign in cosigns {
-            P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, cosign.encode()).await;
+            let mut buf = vec![];
+            cosign.serialize(&mut buf).unwrap();
+            P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
           }
           sleep(Duration::from_secs(60)).await;
         }
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 840af4a2..3a09b510 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -18,6 +18,7 @@ use frost::Participant;
 use serai_db::{DbTxn, Db};
 
 use scale::Encode;
+use borsh::BorshSerialize;
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet, KeyPair},
@@ -248,7 +249,9 @@ async fn handle_processor_message<D: Db, Pro: Processors, P: P2p>(
           },
         };
         cosign_channel.send(cosigned_block).unwrap();
-        P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, cosigned_block.encode()).await;
+        let mut buf = vec![];
+        cosigned_block.serialize(&mut buf).unwrap();
+        P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
         None
       }
     },
@@ -555,7 +558,7 @@ async fn handle_processor_message<D: Db, Pro: Processors, P: P2p>(
               let SubstrateSignableId::Batch(id) = id.id else {
                 panic!("BatchPreprocess SubstrateSignableId wasn't Batch")
               };
-              id.encode()
+              id.to_le_bytes()
             },
             preprocesses.into_iter().map(Into::into).collect(),
           );
@@ -1057,7 +1060,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
         let mut tx = match id_type {
           RecognizedIdType::Batch => Transaction::SubstrateSign(SignData {
             data: get_preprocess(&raw_db, id_type, &id).await,
-            plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()),
+            plan: SubstrateSignableId::Batch(u32::from_le_bytes(id.try_into().unwrap())),
             label: Label::Preprocess,
             attempt: 0,
             signed: Transaction::empty_signed(),
diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs
index f3dda43b..c3f27b7c 100644
--- a/coordinator/src/p2p.rs
+++ b/coordinator/src/p2p.rs
@@ -8,7 +8,7 @@ use std::{
 
 use async_trait::async_trait;
 
-use scale::{Encode, Decode};
+use borsh::{BorshSerialize, BorshDeserialize};
 use serai_client::primitives::NetworkId;
 
 use serai_db::Db;
@@ -39,7 +39,7 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
 
 const LIBP2P_TOPIC: &str = "serai-coordinator";
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
 pub struct CosignedBlock {
   pub network: NetworkId,
   pub block_number: u64,
@@ -705,8 +705,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
           }
         }
         P2pMessageKind::CosignedBlock => {
-          let mut msg_ref: &[u8] = msg.msg.as_ref();
-          let Ok(msg) = CosignedBlock::decode(&mut scale::IoReader(&mut msg_ref)) else {
+          let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
            log::error!("received CosignedBlock message with invalidly serialized contents");
            continue;
          };
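
The two paths above move CosignedBlock between SCALE and borsh: the broadcast side now serializes into a buffer with BorshSerialize, and the P2P handler decodes with BorshDeserialize::deserialize_reader. A minimal round-trip sketch of the new wire handling, using a simplified stand-in struct (a bare u8 replaces serai_client's NetworkId so the sketch compiles without the workspace):

  use borsh::{BorshSerialize, BorshDeserialize};

  // Stand-in for the patched CosignedBlock; the field layout mirrors p2p.rs
  #[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)]
  struct ExampleCosignedBlock {
    network: u8, // assumption: stand-in for NetworkId
    block_number: u64,
    block: [u8; 32],
    signature: [u8; 64],
  }

  fn main() {
    let cosign = ExampleCosignedBlock {
      network: 1,
      block_number: 42,
      block: [0xaa; 32],
      signature: [0xbb; 64],
    };

    // Serialize as the patched broadcast loop does: into a Vec<u8> buffer
    let mut buf = vec![];
    cosign.serialize(&mut buf).unwrap();

    // Deserialize as the patched P2pMessageKind::CosignedBlock arm does
    let decoded = ExampleCosignedBlock::deserialize_reader(&mut buf.as_slice()).unwrap();
    assert_eq!(cosign, decoded);
  }
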
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index 9c702f8f..999a1e3e 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -12,7 +12,7 @@ use serai_client::{
   SeraiError, Block, Serai, TemporalSerai,
   primitives::{BlockHash, NetworkId},
   validator_sets::{
-    primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
+    primitives::{ValidatorSet, amortize_excess_key_shares},
     ValidatorSetsEvent,
   },
   in_instructions::InInstructionsEvent,
@@ -25,7 +25,11 @@ use processor_messages::SubstrateContext;
 
 use tokio::{sync::mpsc, time::sleep};
 
-use crate::{Db, processors::Processors, tributary::TributarySpec};
+use crate::{
+  Db,
+  processors::Processors,
+  tributary::{TributarySpec, SeraiDkgRemoval, SeraiDkgCompleted},
+};
 
 mod db;
 pub use db::*;
@@ -114,37 +118,6 @@ async fn handle_new_set<D: Db>(
   Ok(())
 }
 
-async fn handle_key_gen<Pro: Processors>(
-  processors: &Pro,
-  serai: &Serai,
-  block: &Block,
-  set: ValidatorSet,
-  key_pair: KeyPair,
-) -> Result<(), SeraiError> {
-  processors
-    .send(
-      set.network,
-      processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
-        context: SubstrateContext {
-          serai_time: block.time().unwrap() / 1000,
-          network_latest_finalized_block: serai
-            .as_of(block.hash())
-            .in_instructions()
-            .latest_block_for_network(set.network)
-            .await?
-            // The processor treats this as a magic value which will cause it to find a network
-            // block which has a time greater than or equal to the Serai time
-            .unwrap_or(BlockHash([0; 32])),
-        },
-        session: set.session,
-        key_pair,
-      },
-    )
-    .await;
-
-  Ok(())
-}
-
 async fn handle_batch_and_burns<Pro: Processors>(
   txn: &mut impl DbTxn,
   processors: &Pro,
@@ -249,6 +222,19 @@ async fn handle_block<D: Db, Pro: Processors>(
   // Define an indexed event ID.
   let mut event_id = 0;
 
+  if HandledEvent::is_unhandled(db, hash, event_id) {
+    let mut txn = db.txn();
+    for removal in serai.as_of(hash).validator_sets().participant_removed_events().await? {
+      let ValidatorSetsEvent::ParticipantRemoved { set, removed } = removal else {
+        panic!("ParticipantRemoved event wasn't ParticipantRemoved: {removal:?}");
+      };
+      SeraiDkgRemoval::set(&mut txn, set, removed.0, &());
+    }
+    HandledEvent::handle_event(&mut txn, hash, event_id);
+    txn.commit();
+  }
+  event_id += 1;
+
   // If a new validator set was activated, create tributary/inform processor to do a DKG
   for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
     // Individually mark each event as handled so on reboot, we minimize duplicates
@@ -279,12 +265,31 @@ async fn handle_block<D: Db, Pro: Processors>(
   for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
     if HandledEvent::is_unhandled(db, hash, event_id) {
       log::info!("found fresh key gen event {:?}", key_gen);
-      if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
-        handle_key_gen(processors, serai, &block, set, key_pair).await?;
-      } else {
+      let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else {
         panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
-      }
+      };
+      processors
+        .send(
+          set.network,
+          processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
+            context: SubstrateContext {
+              serai_time: block.time().unwrap() / 1000,
+              network_latest_finalized_block: serai
+                .as_of(block.hash())
+                .in_instructions()
+                .latest_block_for_network(set.network)
+                .await?
+                // The processor treats this as a magic value which will cause it to find a network
+                // block which has a time greater than or equal to the Serai time
+                .unwrap_or(BlockHash([0; 32])),
+            },
+            session: set.session,
+            key_pair,
+          },
+        )
+        .await;
       let mut txn = db.txn();
+      SeraiDkgCompleted::set(&mut txn, set, &());
       HandledEvent::handle_event(&mut txn, hash, event_id);
       txn.commit();
     }
diff --git a/coordinator/src/tests/tributary/mod.rs b/coordinator/src/tests/tributary/mod.rs
index 66f65ee4..0b1eae1c 100644
--- a/coordinator/src/tests/tributary/mod.rs
+++ b/coordinator/src/tests/tributary/mod.rs
@@ -223,27 +223,24 @@ fn serialize_transaction() {
   {
     let mut block = [0; 32];
     OsRng.fill_bytes(&mut block);
-    let mut batch = [0; 5];
-    OsRng.fill_bytes(&mut batch);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::Batch { block, batch });
   }
 
   test_read_write(Transaction::SubstrateBlock(OsRng.next_u64()));
 
   {
-    let mut plan = [0; 5];
-    OsRng.fill_bytes(&mut plan);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::SubstrateSign(random_sign_data(
       &mut OsRng,
-      SubstrateSignableId::Batch(plan),
+      SubstrateSignableId::Batch(batch),
       Label::Preprocess,
     )));
   }
   {
-    let mut plan = [0; 5];
-    OsRng.fill_bytes(&mut plan);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::SubstrateSign(random_sign_data(
       &mut OsRng,
-      SubstrateSignableId::Batch(plan),
+      SubstrateSignableId::Batch(batch),
       Label::Share,
     )));
  }
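
These tests exercise the batch identifier's change from the 5-byte SCALE encoding of (network, id) to the bare u32 batch id, which Transaction::Batch serializes little-endian later in this patch. A sketch of that wire round-trip (the helper names here are illustrative, not from the codebase):

  use std::io::{self, Read, Write};

  // Write the batch id as Transaction::Batch's ReadWrite impl now does
  fn write_batch_id(writer: &mut impl Write, batch: u32) -> io::Result<()> {
    writer.write_all(&batch.to_le_bytes())
  }

  // Read it back: four little-endian bytes, one fewer than the old [u8; 5] id
  fn read_batch_id(reader: &mut impl Read) -> io::Result<u32> {
    let mut buf = [0; 4];
    reader.read_exact(&mut buf)?;
    Ok(u32::from_le_bytes(buf))
  }

  fn main() -> io::Result<()> {
    let mut wire = vec![];
    write_batch_id(&mut wire, 5)?;
    assert_eq!(wire.len(), 4);
    assert_eq!(read_batch_id(&mut wire.as_slice())?, 5);
    Ok(())
  }
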
unknown topic") + 1; + Self::set(txn, genesis, &topic, &next); + next + } + pub fn attempt(getter: &impl Get, genesis: [u8; 32], topic: Topic) -> Option { let attempt = Self::get(getter, genesis, &topic); // Don't require explicit recognition of the Dkg topic as it starts when the chain does @@ -92,6 +106,42 @@ impl AttemptDb { } } +impl ReattemptDb { + pub fn schedule_reattempt( + txn: &mut impl DbTxn, + genesis: [u8; 32], + current_block_number: u32, + topic: Topic, + ) { + // 5 minutes + const BASE_REATTEMPT_DELAY: u32 = (5 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME; + // 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5 + // Assumes no event will take longer than 15 minutes, yet grows the time in case there are + // network bandwidth issues + let reattempt_delay = BASE_REATTEMPT_DELAY * + ((AttemptDb::attempt(txn, genesis, topic) + .expect("scheduling re-attempt for unknown topic") / + 3) + + 1) + .min(3); + let upon_block = current_block_number + reattempt_delay; + + #[allow(clippy::unwrap_or_default)] + let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]); + reattempts.push(topic); + Self::set(txn, genesis, upon_block, &reattempts); + } + + pub fn take(txn: &mut impl DbTxn, genesis: [u8; 32], block_number: u32) -> Vec { + #[allow(clippy::unwrap_or_default)] + let res = Self::get(txn, genesis, block_number).unwrap_or(vec![]); + if !res.is_empty() { + Self::del(txn, genesis, block_number); + } + res + } +} + impl SignedTransactionDb { pub fn take_signed_transaction( txn: &mut impl DbTxn, diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index bca41712..18f8ad32 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -26,12 +26,9 @@ use serai_db::*; use crate::{ processors::Processors, tributary::{ - SignData, Transaction, TributarySpec, SeraiBlockNumber, Topic, Label, DataSpecification, - DataSet, Accumulation, + *, signing_protocol::{DkgConfirmer, DkgRemoval}, scanner::{RecognizedIdType, RIDTrait, PstTxType, PSTTrait, PTTTrait, TributaryBlockHandler}, - FatallySlashed, DkgShare, DkgCompleted, PlanIds, ConfirmationNonces, RemovalNonces, DkgKeyPair, - AttemptDb, DataReceived, DataDb, }, P2p, }; @@ -106,6 +103,7 @@ impl::G, data: &Vec, ) -> Accumulation { + log::debug!("accumulating entry for {:?} attempt #{}", &data_spec.topic, &data_spec.attempt); let genesis = self.spec.genesis(); if DataDb::get(self.txn, genesis, data_spec, &signer.to_bytes()).is_some() { panic!("accumulating data for a participant multiple times"); @@ -121,13 +119,37 @@ impl= needed) { + let needs_everyone = + (data_spec.topic == Topic::Dkg) || (data_spec.topic == Topic::DkgConfirmation); + let needed = if needs_everyone { self.spec.n() } else { self.spec.t() }; + if received_range.contains(&needed) { + log::debug!( + "accumulation for entry {:?} attempt #{} is ready", + &data_spec.topic, + &data_spec.attempt + ); return Accumulation::Ready({ let mut data = HashMap::new(); for validator in self.spec.validators().iter().map(|validator| validator.0) { @@ -187,6 +209,12 @@ impl, + pub block_number: u32, _p2p: PhantomData

, } impl TributaryBlockHandler<'_, T, Pro, PST, PTT, RID, P> { + async fn dkg_removal_attempt(&mut self, removing: [u8; 32], attempt: u32) { + let preprocess = + (DkgRemoval { spec: self.spec, key: self.our_key, txn: self.txn, removing, attempt }) + .preprocess(); + let mut tx = Transaction::DkgRemoval(SignData { + plan: removing, + attempt, + label: Label::Preprocess, + data: vec![preprocess.to_vec()], + signed: Transaction::empty_signed(), + }); + tx.sign(&mut OsRng, self.spec.genesis(), self.our_key); + self.publish_tributary_tx.publish_tributary_tx(tx).await; + } + pub async fn fatal_slash(&mut self, slashing: [u8; 32], reason: &str) { + // TODO: If this fatal slash puts the remaining set below the threshold, spin + let genesis = self.spec.genesis(); log::warn!("fatally slashing {}. reason: {}", hex::encode(slashing), reason); @@ -144,23 +163,7 @@ impl { + if DkgCompleted::get(self.txn, genesis).is_none() { + // Since it wasn't completed, instruct the processor to start the next attempt + let id = + processor_messages::key_gen::KeyGenId { session: self.spec.set().session, attempt }; + let our_i = self.spec.i(Ristretto::generator() * self.our_key.deref()).unwrap(); + + // TODO: Handle removed parties (modify n/i to accept list of removed) + // TODO: Don't fatal slash, yet don't include, parties who have been offline so long as + // we still meet the needed threshold. We'd need a complete DKG protocol we then remove + // the offline participants from. publishing the DKG protocol completed without them. + + let params = + frost::ThresholdParams::new(self.spec.t(), self.spec.n(), our_i.start).unwrap(); + let shares = u16::from(our_i.end) - u16::from(our_i.start); + + self + .processors + .send( + self.spec.set().network, + processor_messages::key_gen::CoordinatorMessage::GenerateKey { id, params, shares }, + ) + .await; + } + } + Topic::DkgConfirmation => { + panic!("re-attempting DkgConfirmation when we should be re-attempting the Dkg") + } + Topic::DkgRemoval(removing) => { + if DkgCompleted::get(self.txn, genesis).is_none() && + LocallyDkgRemoved::get(self.txn, genesis, removing).is_none() && + SeraiDkgCompleted::get(self.txn, self.spec.set()).is_none() && + SeraiDkgRemoval::get(self.txn, self.spec.set(), removing).is_none() + { + // Since it wasn't completed, attempt a new DkgRemoval + self.dkg_removal_attempt(removing, attempt).await; + } + } + Topic::SubstrateSign(inner_id) => { + let id = processor_messages::coordinator::SubstrateSignId { + session: self.spec.set().session, + id: inner_id, + attempt, + }; + match inner_id { + SubstrateSignableId::CosigningSubstrateBlock(block) => { + let block_number = SeraiBlockNumber::get(self.txn, block) + .expect("couldn't get the block number for prior attempted cosign"); + + // Check if the cosigner has a signature from our set for this block/a newer one + let latest_cosign = + crate::cosign_evaluator::LatestCosign::get(self.txn, self.spec.set().network) + .map(|cosign| cosign.block_number) + .unwrap_or(0); + if latest_cosign < block_number { + // Instruct the processor to start the next attempt + self + .processors + .send( + self.spec.set().network, + processor_messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { + id, + block_number, + }, + ) + .await; + } + } + SubstrateSignableId::Batch(batch) => { + // If the Batch hasn't appeared on-chain... 
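
For reference, the backoff in ReattemptDb::schedule_reattempt can be worked standalone: the delay is a multiple of a 5-minute base, stepping up every third attempt and capping at 15 minutes. This sketch assumes a TARGET_BLOCK_TIME of 1000 milliseconds purely for illustration; the real constant lives in tributary::tendermint and may differ:

  // Assumption: a 1000ms block time, giving a 300-block base delay
  const TARGET_BLOCK_TIME: u32 = 1000;
  // 5 minutes, denominated in blocks
  const BASE_REATTEMPT_DELAY: u32 = (5 * 60 * 1000) / TARGET_BLOCK_TIME;

  // Mirrors the delay formula from schedule_reattempt
  fn reattempt_delay(attempt: u32) -> u32 {
    BASE_REATTEMPT_DELAY * ((attempt / 3) + 1).min(3)
  }

  fn main() {
    // Attempts 0 ..= 2 wait 5 minutes, 3 ..= 5 wait 10, anything later caps at 15
    assert_eq!(reattempt_delay(0), BASE_REATTEMPT_DELAY);
    assert_eq!(reattempt_delay(3), 2 * BASE_REATTEMPT_DELAY);
    assert_eq!(reattempt_delay(7), 3 * BASE_REATTEMPT_DELAY);
  }
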
diff --git a/coordinator/src/tributary/transaction.rs b/coordinator/src/tributary/transaction.rs
index 4a2748d5..65a8eae8 100644
--- a/coordinator/src/tributary/transaction.rs
+++ b/coordinator/src/tributary/transaction.rs
@@ -132,7 +132,6 @@ impl SignData {
 pub enum Transaction {
   RemoveParticipant(Participant),
 
-  // Once this completes successfully, no more instances should be created.
   DkgCommitments {
     attempt: u32,
     commitments: Vec<Vec<u8>>,
@@ -170,7 +169,7 @@ pub enum Transaction {
   // with the current processor, yet it would still be an improvement.
   Batch {
     block: [u8; 32],
-    batch: [u8; 5],
+    batch: u32,
   },
   // When a Serai block is finalized, with the contained batches, we can allow the associated plan
   // IDs
@@ -230,13 +229,13 @@ impl Debug for Transaction {
       Transaction::Batch { block, batch } => fmt
         .debug_struct("Transaction::Batch")
         .field("block", &hex::encode(block))
-        .field("batch", &hex::encode(batch))
+        .field("batch", &batch)
         .finish(),
       Transaction::SubstrateBlock(block) => {
         fmt.debug_struct("Transaction::SubstrateBlock").field("block", block).finish()
       }
       Transaction::SubstrateSign(sign_data) => {
-        fmt.debug_struct("Transaction::Substrate").field("sign_data", sign_data).finish()
+        fmt.debug_struct("Transaction::SubstrateSign").field("sign_data", sign_data).finish()
       }
       Transaction::Sign(sign_data) => {
         fmt.debug_struct("Transaction::Sign").field("sign_data", sign_data).finish()
@@ -390,9 +389,9 @@ impl ReadWrite for Transaction {
       7 => {
         let mut block = [0; 32];
         reader.read_exact(&mut block)?;
-        let mut batch = [0; 5];
+        let mut batch = [0; 4];
         reader.read_exact(&mut batch)?;
-        Ok(Transaction::Batch { block, batch })
+        Ok(Transaction::Batch { block, batch: u32::from_le_bytes(batch) })
       }
 
       8 => {
@@ -514,7 +513,7 @@ impl ReadWrite for Transaction {
       Transaction::Batch { block, batch } => {
         writer.write_all(&[7])?;
         writer.write_all(block)?;
-        writer.write_all(batch)
+        writer.write_all(&batch.to_le_bytes())
       }
 
       Transaction::SubstrateBlock(block) => {
diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs
index fe4f8f89..828145ac 100644
--- a/processor/messages/src/lib.rs
+++ b/processor/messages/src/lib.rs
@@ -168,7 +168,7 @@ pub mod coordinator {
   )]
   pub enum SubstrateSignableId {
     CosigningSubstrateBlock([u8; 32]),
-    Batch([u8; 5]),
+    Batch(u32),
   }
 
   #[derive(Clone, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize)]
diff --git a/processor/src/batch_signer.rs b/processor/src/batch_signer.rs
index 9b8cb995..003397ef 100644
--- a/processor/src/batch_signer.rs
+++ b/processor/src/batch_signer.rs
@@ -16,7 +16,6 @@ use frost_schnorrkel::Schnorrkel;
 
 use log::{info, debug, warn};
 
-use scale::Encode;
 use serai_client::{
   primitives::{NetworkId, BlockHash},
   in_instructions::primitives::{Batch, SignedBatch, batch_message},
@@ -26,15 +25,10 @@ use serai_client::{
 use messages::coordinator::*;
 use crate::{Get, DbTxn, Db, create_db};
 
-// Generate an ID unique to a Batch
-fn batch_sign_id(network: NetworkId, id: u32) -> [u8; 5] {
-  (network, id).encode().try_into().unwrap()
-}
-
 create_db!(
   BatchSignerDb {
-    CompletedDb: (id: [u8; 5]) -> (),
-    AttemptDb: (id: [u8; 5], attempt: u32) -> (),
+    CompletedDb: (id: u32) -> (),
+    AttemptDb: (id: u32, attempt: u32) -> (),
     BatchDb: (block: BlockHash) -> SignedBatch
   }
 );
@@ -51,14 +45,12 @@ pub struct BatchSigner<D: Db> {
   session: Session,
   keys: Vec<Zeroizing<ThresholdKeys<Ristretto>>>,
 
-  signable: HashMap<[u8; 5], Batch>,
-  attempt: HashMap<[u8; 5], u32>,
+  signable: HashMap<u32, Batch>,
+  attempt: HashMap<u32, u32>,
   #[allow(clippy::type_complexity)]
-  preprocessing:
-    HashMap<[u8; 5], (Vec<AlgorithmSignMachine<Ristretto, Schnorrkel>>, Vec<Preprocess<Ristretto, ()>>)>,
+  preprocessing:
+    HashMap<u32, (Vec<AlgorithmSignMachine<Ristretto, Schnorrkel>>, Vec<Preprocess<Ristretto, ()>>)>,
   #[allow(clippy::type_complexity)]
-  signing:
-    HashMap<[u8; 5], (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare<Ristretto>>)>,
+  signing:
+    HashMap<u32, (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare<Ristretto>>)>,
 }
 
 impl<D: Db> fmt::Debug for BatchSigner<D> {
@@ -92,7 +84,7 @@ impl<D: Db> BatchSigner<D> {
     }
   }
 
-  fn verify_id(&self, id: &SubstrateSignId) -> Result<(Session, [u8; 5], u32), ()> {
+  fn verify_id(&self, id: &SubstrateSignId) -> Result<(Session, u32, u32), ()> {
     let SubstrateSignId { session, id, attempt } = id;
     let SubstrateSignableId::Batch(id) = id else { panic!("BatchSigner handed non-Batch") };
 
@@ -104,17 +96,12 @@ impl<D: Db> BatchSigner<D> {
       // rebooted OR we detected the signed batch on chain
       // The latter is the expected flow for batches not actively being participated in
       None => {
-        warn!("not attempting batch {} #{}", hex::encode(id), attempt);
+        warn!("not attempting batch {id} #{attempt}");
         Err(())?;
       }
       Some(our_attempt) => {
         if attempt != our_attempt {
-          warn!(
-            "sent signing data for batch {} #{} yet we have attempt #{}",
-            hex::encode(id),
-            attempt,
-            attempt
-          );
+          warn!("sent signing data for batch {id} #{attempt} yet we have attempt #{our_attempt}");
           Err(())?;
         }
       }
@@ -127,7 +114,7 @@ impl<D: Db> BatchSigner<D> {
   async fn attempt(
     &mut self,
     txn: &mut D::Transaction<'_>,
-    id: [u8; 5],
+    id: u32,
     attempt: u32,
   ) -> Option<ProcessorMessage> {
     // See above commentary for why this doesn't emit SignedBatch
@@ -138,12 +125,7 @@ impl<D: Db> BatchSigner<D> {
     // Check if we're already working on this attempt
     if let Some(curr_attempt) = self.attempt.get(&id) {
       if curr_attempt >= &attempt {
-        warn!(
-          "told to attempt {} #{} yet we're already working on {}",
-          hex::encode(id),
-          attempt,
-          curr_attempt
-        );
+        warn!("told to attempt {id} #{attempt} yet we're already working on {curr_attempt}");
         return None;
       }
     }
@@ -163,7 +145,7 @@ impl<D: Db> BatchSigner<D> {
     // Update the attempt number
     self.attempt.insert(id, attempt);
 
-    info!("signing batch {} #{}", hex::encode(id), attempt);
+    info!("signing batch {id} #{attempt}");
 
     // If we reboot mid-sign, the current design has us abort all signs and wait for latter
     // attempts/new signing protocols
@@ -180,9 +162,7 @@ impl<D: Db> BatchSigner<D> {
     // TODO: This isn't complete as this txn may not be committed with the expected timing
     if AttemptDb::get(txn, id, attempt).is_some() {
       warn!(
-        "already attempted batch {}, attempt #{}. this is an error if we didn't reboot",
-        hex::encode(id),
-        attempt
+        "already attempted batch {id}, attempt #{attempt}. this is an error if we didn't reboot"
       );
       return None;
     }
@@ -215,7 +195,7 @@ impl<D: Db> BatchSigner<D> {
     batch: Batch,
   ) -> Option<ProcessorMessage> {
     debug_assert_eq!(self.network, batch.network);
-    let id = batch_sign_id(batch.network, batch.id);
+    let id = batch.id;
     if CompletedDb::get(txn, id).is_some() {
       debug!("Sign batch order for ID we've already completed signing");
       // See batch_signed for commentary on why this simply returns
@@ -246,10 +226,7 @@ impl<D: Db> BatchSigner<D> {
     let (machines, our_preprocesses) = match self.preprocessing.remove(&id) {
       // Either rebooted or RPC error, or some invariant
       None => {
-        warn!(
-          "not preprocessing for {}. this is an error if we didn't reboot",
-          hex::encode(id),
-        );
+        warn!("not preprocessing for {id}. this is an error if we didn't reboot");
         return None;
       }
       Some(preprocess) => preprocess,
@@ -344,10 +321,7 @@ impl<D: Db> BatchSigner<D> {
           panic!("never preprocessed yet signing?");
         }
 
-        warn!(
-          "not preprocessing for {}. this is an error if we didn't reboot",
-          hex::encode(id)
-        );
+        warn!("not preprocessing for {id}. this is an error if we didn't reboot");
         return None;
       }
       Some(signing) => signing,
@@ -399,7 +373,7 @@ impl<D: Db> BatchSigner<D> {
       },
     };
 
-    info!("signed batch {} with attempt #{}", hex::encode(id), attempt);
+    info!("signed batch {id} with attempt #{attempt}");
 
     let batch = SignedBatch { batch: self.signable.remove(&id).unwrap(), signature: sig.into() };
 
@@ -426,15 +400,13 @@ impl<D: Db> BatchSigner<D> {
   }
 
   pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, id: u32) {
-    let sign_id = batch_sign_id(self.network, id);
-
     // Stop trying to sign for this batch
-    CompletedDb::set(txn, sign_id, &());
+    CompletedDb::set(txn, id, &());
 
-    self.signable.remove(&sign_id);
-    self.attempt.remove(&sign_id);
-    self.preprocessing.remove(&sign_id);
-    self.signing.remove(&sign_id);
+    self.signable.remove(&id);
+    self.attempt.remove(&id);
+    self.preprocessing.remove(&id);
+    self.signing.remove(&id);
 
     // This doesn't emit SignedBatch because it doesn't have access to the SignedBatch
     // This function is expected to only be called once Substrate acknowledges this block,
diff --git a/processor/src/tests/batch_signer.rs b/processor/src/tests/batch_signer.rs
index 0564db5a..eb9e3359 100644
--- a/processor/src/tests/batch_signer.rs
+++ b/processor/src/tests/batch_signer.rs
@@ -13,7 +13,6 @@ use sp_application_crypto::{RuntimePublic, sr25519::Public};
 
 use serai_db::{DbTxn, Db, MemDb};
 
-use scale::Encode;
 #[rustfmt::skip]
 use serai_client::{primitives::*, in_instructions::primitives::*, validator_sets::primitives::Session};
 
@@ -49,11 +48,8 @@ async fn test_batch_signer() {
     ],
   };
 
-  let actual_id = SubstrateSignId {
-    session: Session(0),
-    id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()),
-    attempt: 0,
-  };
+  let actual_id =
+    SubstrateSignId { session: Session(0), id: SubstrateSignableId::Batch(batch.id), attempt: 0 };
 
   let mut signing_set = vec![];
   while signing_set.len() < usize::from(keys.values().next().unwrap().params().t()) {
diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs
index be9b64b0..bf04f5e8 100644
--- a/substrate/client/src/serai/validator_sets.rs
+++ b/substrate/client/src/serai/validator_sets.rs
@@ -35,6 +35,23 @@ impl<'a> SeraiValidatorSets<'a> {
       .await
   }
 
+  pub async fn participant_removed_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
+    self
+      .0
+      .events(|event| {
+        if let serai_abi::Event::ValidatorSets(event) = event {
+          if matches!(event, ValidatorSetsEvent::ParticipantRemoved { .. }) {
+            Some(event.clone())
+          } else {
+            None
+          }
+        } else {
+          None
+        }
+      })
+      .await
+  }
+
   pub async fn key_gen_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
     self
       .0
diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs
index a07d7ea4..316c8d4b 100644
--- a/tests/coordinator/src/lib.rs
+++ b/tests/coordinator/src/lib.rs
@@ -269,9 +269,22 @@ impl Processor {
       assert_eq!(msg.id, *next_recv_id);
 
       let msg_msg = borsh::from_slice(&msg.msg).unwrap();
-      if !is_cosign_message(&msg_msg) {
+      // Remove any BatchReattempts clogging the pipe
+      // TODO: Set up a wrapper around serai-client so we aren't throwing this away, yet instead
+      // leave it for the tests
+      if matches!(
+        msg_msg,
+        messages::CoordinatorMessage::Coordinator(
+          messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
+        )
+      ) {
+        queue.ack(Service::Coordinator, msg.id).await;
+        *next_recv_id += 1;
         continue;
       }
+      if !is_cosign_message(&msg_msg) {
+        continue;
+      };
       queue.ack(Service::Coordinator, msg.id).await;
       *next_recv_id += 1;
       msg_msg
@@ -393,17 +406,13 @@ impl Processor {
     *next_send_id += 1;
   }
 
-  /// Receive a message from the coordinator as a processor.
-  pub async fn recv_message(&mut self) -> CoordinatorMessage {
+  async fn recv_message_inner(&mut self) -> CoordinatorMessage {
     loop {
       tokio::task::yield_now().await;
 
       let mut queue_lock = self.queue.lock().await;
       let (_, next_recv_id, queue) = &mut *queue_lock;
-      // Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes
-      let msg = tokio::time::timeout(Duration::from_secs(6 * 60), queue.next(Service::Coordinator))
-        .await
-        .unwrap();
+      let msg = queue.next(Service::Coordinator).await;
       assert_eq!(msg.from, Service::Coordinator);
       assert_eq!(msg.id, *next_recv_id);
 
@@ -419,6 +428,13 @@ impl Processor {
     }
   }
 
+  /// Receive a message from the coordinator as a processor.
+  pub async fn recv_message(&mut self) -> CoordinatorMessage {
+    // Set a timeout of 15 minutes to allow effectively any protocol to occur without a fear of
+    // an arbitrary timeout cutting it short
+    tokio::time::timeout(Duration::from_secs(15 * 60), self.recv_message_inner()).await.unwrap()
+  }
+
   pub async fn set_substrate_key(
     &mut self,
     substrate_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs
index 501cdc0a..54cda6c9 100644
--- a/tests/coordinator/src/tests/batch.rs
+++ b/tests/coordinator/src/tests/batch.rs
@@ -38,9 +38,7 @@ pub async fn batch(
   substrate_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   batch: Batch,
 ) -> u64 {
-  let mut id = [0; 5];
-  OsRng.fill_bytes(&mut id);
-  let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(id), attempt: 0 };
+  let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(batch.id), attempt: 0 };
 
   for processor in processors.iter_mut() {
     processor
@@ -222,8 +220,19 @@ pub async fn batch(
   // Verify the coordinator sends SubstrateBlock to all processors
   let last_block = serai.finalized_block_by_number(last_serai_block).await.unwrap().unwrap();
   for processor in processors {
+    // Handle a potential re-attempt message in the pipeline
+    let mut received = processor.recv_message().await;
+    if matches!(
+      received,
+      messages::CoordinatorMessage::Coordinator(
+        messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
+      )
+    ) {
+      received = processor.recv_message().await
+    }
+
     assert_eq!(
-      processor.recv_message().await,
+      received,
       messages::CoordinatorMessage::Substrate(
         messages::substrate::CoordinatorMessage::SubstrateBlock {
           context: SubstrateContext {
diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs
index 0e84ec66..330b16c5 100644
--- a/tests/coordinator/src/tests/mod.rs
+++ b/tests/coordinator/src/tests/mod.rs
@@ -46,8 +46,8 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
 
 // Use an RPC to evaluate if a condition was met, with the following time being a timeout
 // https://github.com/serai-dex/serai/issues/340
 pub(crate) async fn wait_for_tributary() {
-  tokio::time::sleep(Duration::from_secs(20)).await;
+  tokio::time::sleep(Duration::from_secs(15)).await;
   if std::env::var("GITHUB_CI") == Ok("true".to_string()) {
-    tokio::time::sleep(Duration::from_secs(40)).await;
+    tokio::time::sleep(Duration::from_secs(6)).await;
   }
 }
diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs
index 9c678b98..459f42ee 100644
--- a/tests/processor/src/tests/batch.rs
+++ b/tests/processor/src/tests/batch.rs
@@ -7,7 +7,6 @@ use dkg::{Participant, tests::clone_without};
 
 use messages::{coordinator::*, SubstrateContext};
 
-use scale::Encode;
 use serai_client::{
   primitives::{
     BlockHash, Amount, Balance, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId,
@@ -28,11 +27,7 @@ pub(crate) async fn recv_batch_preprocesses(
   batch: &Batch,
   attempt: u32,
 ) -> (SubstrateSignId, HashMap<Participant, Vec<u8>>) {
-  let id = SubstrateSignId {
-    session,
-    id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()),
-    attempt,
-  };
+  let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(batch.id), attempt };
 
   let mut block = None;
   let mut preprocesses = HashMap::new();
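
Returning to the cosign_evaluator.rs changes at the top of this patch: update_latest_cosign accepts a block once networks holding more than two-thirds of total stake have cosigned it or something newer. A standalone sketch of that quorum rule, with networks and stakes reduced to plain integers (an illustration, not the actual API):

  use std::collections::HashMap;

  fn highest_cosigned_block(
    stakes: &HashMap<u8, u64>,         // stand-in for HashMap<NetworkId, u64>
    latest_cosigns: &HashMap<u8, u64>, // network -> latest cosigned block number
  ) -> u64 {
    let total_stake = stakes.values().copied().sum::<u64>();
    let mut highest_block = 0;
    for block_number in latest_cosigns.values() {
      // Sum the stake of every network whose latest cosign covers this block
      let sum_stake = latest_cosigns
        .iter()
        .filter(|(_, sub)| *sub >= block_number)
        .map(|(network, _)| stakes.get(network).copied().unwrap_or(0))
        .sum::<u64>();
      let needed_stake = ((total_stake * 2) / 3) + 1;
      if (total_stake == 0) || (sum_stake > needed_stake) {
        highest_block = highest_block.max(*block_number);
      }
    }
    highest_block
  }

  fn main() {
    let stakes = HashMap::from([(0, 100), (1, 100), (2, 100)]);
    // Only one network has cosigned block 7; all three have cosigned block 5
    let cosigns = HashMap::from([(0, 7), (1, 5), (2, 5)]);
    assert_eq!(highest_cosigned_block(&stakes, &cosigns), 5);
  }
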