From 96f1d26f7a38c24270066e720efcddf2e53ce1eb Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Wed, 15 Nov 2023 16:57:21 -0500
Subject: [PATCH] Add a cosigning protocol to ensure finalizations are unique
 (#433)

* Add a function to deterministically decide which Serai blocks should be
  co-signed

  Has a 5 minute latency between co-signs, also used as the maximal latency
  before a co-sign is started.

* Get all active tributaries we're in at a specific block

* Add and route CosignSubstrateBlock, a new provided TX

* Split queued cosigns per network

* Rename BatchSignId to SubstrateSignId

* Add SubstrateSignableId, a meta-type for either Batch or Block, and
  modularize around it

* Handle the CosignSubstrateBlock provided TX

* Revert substrate_signer.rs to develop (and patch to still work)

  Due to SubstrateSigner moving when the prior multisig closes, yet
  cosigning occurring with the most recent key, a single SubstrateSigner
  can be reused. We could manage multiple SubstrateSigners, yet considering
  the much lower specifications for cosigning, I'd rather treat it
  distinctly.

* Route cosigning through the processor

* Add note to rename SubstrateSigner post-PR

  I don't want to do so now in order to preserve the diff's clarity.

* Implement cosign evaluation into the coordinator

* Get tests to compile

* Bug fixes, mark blocks without cosigners available as cosigned

* Correct the ID Batch preprocesses are saved under, add log statements

* Create a dedicated function to handle cosigns

* Correct the flow around Batch verification/queueing

  Verifying `Batch`s could stall when a `Batch` was signed before its
  predecessors/before the block it's contained in was cosigned (the latter
  being inevitable as we can't sign a block containing a signed batch
  before signing the batch).

  Now, Batch verification happens on a distinct async task in order to not
  block the handling of processor messages. This task is the sole caller of
  verify in order to ensure last_verified_batch isn't unexpectedly mutated.

  When the processor message handler needs to access it, or needs to queue
  a Batch, it associates the DB TXN with a lock preventing the other task
  from doing so.

  This lock, as currently implemented, is a poor and inefficient design. It
  should be modified to the pattern used for cosign management.
  Additionally, a new primitive of a DB-backed channel may be immensely
  valuable.

  Fixes a standing potential deadlock and a deadlock introduced with the
  cosigning protocol.

* Working full-stack tests

  After the last commit, this only required extending a timeout.

* Replace "co-sign" with "cosign" to make finding text easier

* Update the coordinator tests to support cosigning

* Inline prior_batch calculation to prevent panic on rotation

  Noticed when doing a final review of the branch.
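For reviewers, the first bullet's decision rule, condensed into a standalone
sketch (hypothetical names; the actual logic lives in handle_new_blocks in
coordinator/src/substrate/mod.rs below, driven by IntendedCosign and
COSIGN_DISTANCE). With one block every 6 seconds, COSIGN_DISTANCE spaces
cosigns roughly 5 minutes apart, while key gens force a cosign since they
rotate the cosigner set:

  const COSIGN_DISTANCE: u64 = 5 * 60 / 6; // 5 minutes, expressed in 6-second blocks

  #[derive(PartialEq)]
  enum HasEvents { KeyGen, Yes, No }

  // Whether `block` should be cosigned, given the last block we intended to
  // cosign and the first skipped block which had events (if any)
  fn should_cosign(
    block: u64,
    events: HasEvents,
    last_intended: u64,
    skipped_with_events: Option<u64>,
  ) -> bool {
    // Key gens change who the cosigners are, so they're always cosigned
    if events == HasEvents::KeyGen {
      return true;
    }
    // Outside the distance window, any block with events is cosigned
    if (events == HasEvents::Yes) && (block >= (last_intended + COSIGN_DISTANCE)) {
      return true;
    }
    // A skipped block which had events must still be covered within the
    // distance, so the maximally latent block building on it is cosigned
    if let Some(skipped) = skipped_with_events {
      if block >= (skipped + COSIGN_DISTANCE) {
        return true;
      }
    }
    false
  }

Tracking the first skipped block which had events bounds the latency before
those events are covered by a cosign: some block at most COSIGN_DISTANCE
later is cosigned no matter what.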
---
 coordinator/Cargo.toml                        |   1 +
 coordinator/src/cosign_evaluator.rs           | 209 +++++++++
 coordinator/src/main.rs                       | 421 ++++++++++++------
 coordinator/src/p2p.rs                        |  28 ++
 coordinator/src/substrate/db.rs               |  91 +++-
 coordinator/src/substrate/mod.rs              | 204 ++++++++-
 coordinator/src/tests/tributary/handle_p2p.rs |   8 +-
 coordinator/src/tests/tributary/mod.rs        |  63 ++-
 coordinator/src/tests/tributary/sync.rs       |  11 +-
 coordinator/src/tributary/db.rs               |  11 +-
 coordinator/src/tributary/handle.rs           |  64 ++-
 coordinator/src/tributary/mod.rs              |  88 ++--
 coordinator/src/tributary/nonce_decider.rs    |  39 +-
 processor/messages/src/lib.rs                 |  63 ++-
 processor/src/cosigner.rs                     | 286 ++++++++++++
 processor/src/key_gen.rs                      |  23 +-
 processor/src/main.rs                         |  85 +++-
 processor/src/substrate_signer.rs             | 169 ++++---
 processor/src/tests/cosigner.rs               | 126 ++++++
 processor/src/tests/mod.rs                    |   1 +
 processor/src/tests/substrate_signer.rs       |  12 +-
 substrate/client/src/serai/validator_sets.rs  |   7 +
 tests/coordinator/src/lib.rs                  |   8 +-
 tests/coordinator/src/tests/batch.rs          |  33 +-
 tests/coordinator/src/tests/cosign.rs         | 172 +++++++
 tests/coordinator/src/tests/mod.rs            |   3 +
 tests/coordinator/src/tests/sign.rs           |   6 +-
 tests/full-stack/src/tests/mint_and_burn.rs   |   2 +-
 tests/processor/src/tests/batch.rs            |  14 +-
 29 files changed, 1900 insertions(+), 348 deletions(-)
 create mode 100644 coordinator/src/cosign_evaluator.rs
 create mode 100644 processor/src/cosigner.rs
 create mode 100644 processor/src/tests/cosigner.rs
 create mode 100644 tests/coordinator/src/tests/cosign.rs

diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml
index 6b938895..b5bda710 100644
--- a/coordinator/Cargo.toml
+++ b/coordinator/Cargo.toml
@@ -37,6 +37,7 @@ processor-messages = { package = "serai-processor-messages", path = "../processo
 message-queue = { package = "serai-message-queue", path = "../message-queue" }
 tributary = { package = "tributary-chain", path = "./tributary" }
 
+sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
 serai-client = { path = "../substrate/client", default-features = false, features = ["serai"] }
 
 hex = { version = "0.4", default-features = false, features = ["std"] }
diff --git a/coordinator/src/cosign_evaluator.rs b/coordinator/src/cosign_evaluator.rs
new file mode 100644
index 00000000..60b0cbe1
--- /dev/null
+++ b/coordinator/src/cosign_evaluator.rs
@@ -0,0 +1,209 @@
+use core::time::Duration;
+use std::{
+  sync::{Arc, Mutex, RwLock},
+  collections::{HashSet, HashMap},
+};
+
+use tokio::{sync::mpsc, time::sleep};
+
+use scale::Encode;
+use sp_application_crypto::RuntimePublic;
+use serai_client::{
+  primitives::{NETWORKS, NetworkId, Signature},
+  validator_sets::primitives::{Session, ValidatorSet},
+  SeraiError, Serai,
+};
+
+use serai_db::{DbTxn, Db};
+
+use processor_messages::coordinator::cosign_block_msg;
+
+use crate::{
+  p2p::{CosignedBlock, P2pMessageKind, P2p},
+  substrate::SubstrateDb,
+};
+
+pub struct CosignEvaluator<D: Db> {
+  db: Mutex<D>,
+  serai: Arc<Serai>,
+  stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
+  latest_cosigns: RwLock<HashMap<NetworkId, (u64, CosignedBlock)>>,
+}
+
+impl<D: Db> CosignEvaluator<D> {
+  fn update_latest_cosign(&self) {
+    let stakes_lock = self.stakes.read().unwrap();
+    // If we haven't gotten the stake data yet, return
+    let Some(stakes) = stakes_lock.as_ref() else { return };
+
+    let total_stake = stakes.values().cloned().sum::<u64>();
+
+    let latest_cosigns = self.latest_cosigns.read().unwrap();
+    let mut highest_block = 0;
+    for (block_num, _) in latest_cosigns.values() {
+      let mut networks = HashSet::new();
+      for (network, (sub_block_num, _)) in &*latest_cosigns {
+        if sub_block_num >= block_num {
+          networks.insert(network);
+        }
+      }
+      let sum_stake =
+        networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
+      let needed_stake = ((total_stake * 2) / 3) + 1;
+      if (total_stake == 0) || (sum_stake > needed_stake) {
+        highest_block = highest_block.max(*block_num);
+      }
+    }
+
+    let mut db_lock = self.db.lock().unwrap();
+    let mut txn = db_lock.txn();
+    if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
+      log::info!("setting latest cosigned block to {}", highest_block);
+      SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, highest_block);
+    }
+    txn.commit();
+  }
+
+  async fn update_stakes(&self) -> Result<(), SeraiError> {
+    let serai = self.serai.as_of(self.serai.latest_block_hash().await?);
+
+    let mut stakes = HashMap::new();
+    for network in NETWORKS {
+      // Use whether this network has published a Batch as a short-circuit for whether they've
+      // ever set a key
+      let set_key = serai.in_instructions().last_batch_for_network(network).await?.is_some();
+      if set_key {
+        stakes.insert(
+          network,
+          serai
+            .validator_sets()
+            .total_allocated_stake(network)
+            .await?
+            .expect("network which published a batch didn't have a stake set")
+            .0,
+        );
+      }
+    }
+
+    // Since we've successfully built stakes, set it
+    *self.stakes.write().unwrap() = Some(stakes);
+
+    self.update_latest_cosign();
+
+    Ok(())
+  }
+
+  // Uses Err to signify a message should be retried
+  async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
+    let Some(block) = self.serai.block(cosign.block).await? else {
+      log::warn!("received cosign for an unknown block");
+      return Ok(());
+    };
+
+    // If this is an old cosign, don't bother handling it
+    if block.number() <
+      self.latest_cosigns.read().unwrap().get(&cosign.network).map(|cosign| cosign.0).unwrap_or(0)
+    {
+      log::debug!("received old cosign from {:?}", cosign.network);
+      return Ok(());
+    }
+
+    // Get the key for this network as of the prior block
+    let serai = self.serai.as_of(block.header().parent_hash.into());
+
+    let Some(latest_session) = serai.validator_sets().session(cosign.network).await? else {
+      log::warn!("received cosign from {:?}, which doesn't yet have a session", cosign.network);
+      return Ok(());
+    };
+    let prior_session = Session(latest_session.0.saturating_sub(1));
+    let set_with_keys = if serai
+      .validator_sets()
+      .keys(ValidatorSet { network: cosign.network, session: prior_session })
+      .await?
+      .is_some()
+    {
+      ValidatorSet { network: cosign.network, session: prior_session }
+    } else {
+      ValidatorSet { network: cosign.network, session: latest_session }
+    };
+
+    let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
+      log::warn!("received cosign for a block we didn't have keys for");
+      return Ok(());
+    };
+
+    if !keys.0.verify(&cosign_block_msg(cosign.block), &Signature(cosign.signature)) {
+      log::warn!("received cosigned block with an invalid signature");
+      return Ok(());
+    }
+
+    log::info!("received cosign for block {} by {:?}", block.number(), cosign.network);
+    self.latest_cosigns.write().unwrap().insert(cosign.network, (block.number(), cosign));
+
+    self.update_latest_cosign();
+
+    Ok(())
+  }
+
+  #[allow(clippy::new_ret_no_self)]
+  pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
+    let evaluator = Arc::new(Self {
+      db: Mutex::new(db),
+      serai,
+      stakes: RwLock::new(None),
+      latest_cosigns: RwLock::new(HashMap::new()),
+    });
+
+    // Spawn a task to update stakes regularly
+    tokio::spawn({
+      let evaluator = evaluator.clone();
+      async move {
+        loop {
+          // Run this until it passes
+          while evaluator.update_stakes().await.is_err() {
+            log::warn!("couldn't update stakes in the cosign evaluator");
+            // Try again in 10 seconds
+            sleep(Duration::from_secs(10)).await;
+          }
+          // Run it every 10 minutes as we don't need the exact stake data for this to be valid
+          sleep(Duration::from_secs(10 * 60)).await;
+        }
+      }
+    });
+
+    // Spawn a task to receive cosigns and handle them
+    let (send, mut recv) = mpsc::unbounded_channel();
+    tokio::spawn({
+      let evaluator = evaluator.clone();
+      async move {
+        while let Some(msg) = recv.recv().await {
+          while evaluator.handle_new_cosign(msg).await.is_err() {
+            // Try again in 10 seconds
+            sleep(Duration::from_secs(10)).await;
+          }
+        }
+      }
+    });
+
+    // Spawn a task to rebroadcast the most recent cosigns
+    tokio::spawn({
+      async move {
+        loop {
+          let cosigns = evaluator
+            .latest_cosigns
+            .read()
+            .unwrap()
+            .values()
+            .map(|cosign| cosign.1)
+            .collect::<Vec<_>>();
+          for cosign in cosigns {
+            P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, cosign.encode()).await;
+          }
+          sleep(Duration::from_secs(60)).await;
+        }
+      }
+    });
+
+    // Return the channel to send cosigns
+    send
+  }
+}
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 64c941d1..be549d77 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -1,6 +1,6 @@
 use core::ops::Deref;
 use std::{
-  sync::Arc,
+  sync::{OnceLock, Arc},
   time::Duration,
   collections::{VecDeque, HashSet, HashMap},
 };
@@ -18,6 +18,7 @@ use frost::Participant;
 use serai_db::{DbTxn, Db};
 use serai_env as env;
 
+use scale::Encode;
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet},
@@ -27,7 +28,7 @@ use serai_client::{
 use message_queue::{Service, client::MessageQueue};
 
 use tokio::{
-  sync::{RwLock, mpsc, broadcast},
+  sync::{Mutex, RwLock, mpsc, broadcast},
   time::sleep,
 };
@@ -46,13 +47,20 @@ use db::MainDb;
 mod p2p;
 pub use p2p::*;
 
-use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};
+use processor_messages::{
+  key_gen, sign,
+  coordinator::{self, SubstrateSignableId},
+  ProcessorMessage,
+};
 
 pub mod processors;
 use processors::Processors;
 
 mod substrate;
-use substrate::SubstrateDb;
+use substrate::{CosignTransactions, SubstrateDb};
+
+mod cosign_evaluator;
+use cosign_evaluator::CosignEvaluator;
 
 #[cfg(test)]
 pub mod tests;
@@ -162,10 +170,16 @@ async fn publish_signed_transaction(
   }
 }
 
+// TODO: Find a better pattern for this
+static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+
+#[allow(clippy::too_many_arguments)]
 async fn handle_processor_message<D: Db, P: P2p>(
   db: &mut D,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   serai: &Serai,
+  p2p: &P,
+  cosign_channel:
&mpsc::UnboundedSender, tributaries: &HashMap>, network: NetworkId, msg: &processors::Message, @@ -174,6 +188,7 @@ async fn handle_processor_message( return true; } + let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; let mut txn = db.txn(); let mut relevant_tributary = match &msg.msg { @@ -270,12 +285,29 @@ async fn handle_processor_message( coordinator::ProcessorMessage::InvalidParticipant { id, .. } => { Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) } + coordinator::ProcessorMessage::CosignPreprocess { id, .. } => { + Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) + } coordinator::ProcessorMessage::BatchPreprocess { id, .. } => { Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) } - coordinator::ProcessorMessage::BatchShare { id, .. } => { + coordinator::ProcessorMessage::SubstrateShare { id, .. } => { Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) } + coordinator::ProcessorMessage::CosignedBlock { block, signature } => { + let cosigned_block = CosignedBlock { + network, + block: *block, + signature: { + let mut arr = [0; 64]; + arr.copy_from_slice(signature); + arr + }, + }; + cosign_channel.send(cosigned_block).unwrap(); + P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, cosigned_block.encode()).await; + None + } }, // These don't return a relevant Tributary as there's no Tributary with action expected ProcessorMessage::Substrate(inner_msg) => match inner_msg { @@ -284,20 +316,7 @@ async fn handle_processor_message( batch.network, msg.network, "processor sent us a batch for a different network than it was for", ); - let this_batch_id = batch.id; MainDb::::save_expected_batch(&mut txn, batch); - - // Re-define batch - // We can't drop it, yet it shouldn't be accidentally used in the following block - #[allow(clippy::let_unit_value, unused_variables)] - let batch = (); - - // This won't be complete, as this call is when a `Batch` message is received, which - // will be before we get a `SignedBatch` - // It is, however, incremental - // When we need a complete version, we use another call, continuously called as-needed - substrate::verify_published_batches::(&mut txn, msg.network, this_batch_id).await; - None } // If this is a new Batch, immediately publish it (if we can) @@ -323,8 +342,6 @@ async fn handle_processor_message( next += 1; } - let start_id = batches.front().map(|batch| batch.batch.id); - let last_id = batches.back().map(|batch| batch.batch.id); while let Some(batch) = batches.pop_front() { // If this Batch should no longer be published, continue if substrate::get_expected_next_batch(serai, network).await > batch.batch.id { @@ -357,40 +374,8 @@ async fn handle_processor_message( sleep(Duration::from_secs(5)).await; } } - // Verify the `Batch`s we just published - if let Some(last_id) = last_id { - loop { - let verified = - substrate::verify_published_batches::(&mut txn, msg.network, last_id).await; - if verified == Some(last_id) { - break; - } - } - } - // Check if any of these `Batch`s were a handover `Batch` - // If so, we need to publish any delayed `Batch` provided transactions - let mut relevant = None; - if let Some(start_id) = start_id { - let last_id = last_id.unwrap(); - for batch in start_id .. last_id { - if let Some(set) = MainDb::::is_handover_batch(&txn, msg.network, batch) { - // relevant may already be Some. 
This is a safe over-write, as we don't need to - // be concerned for handovers of Tributaries which have completed their handovers - // While this does bypass the checks that Tributary would've performed at the - // time, if we ever actually participate in a handover, we will verify *all* - // prior `Batch`s, including the ones which would've been explicitly verified - // then - // - // We should only declare this session relevant if it's relevant to us - // We only set handover `Batch`s when we're trying to produce said `Batch`, so this - // would be a `Batch` we were involved in the production of - // Accordingly, iy's relevant - relevant = Some(set.session); - } - } - } - relevant + None } }, }; @@ -598,10 +583,18 @@ async fn handle_processor_message( // slash) and censor transactions (yet don't explicitly ban) vec![] } + coordinator::ProcessorMessage::CosignPreprocess { id, preprocesses } => { + vec![Transaction::SubstratePreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocesses, + signed: Transaction::empty_signed(), + })] + } coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocesses } => { log::info!( "informed of batch (sign ID {}, attempt {}) for block {}", - hex::encode(id.id), + hex::encode(id.id.encode()), id.attempt, hex::encode(block), ); @@ -613,69 +606,79 @@ async fn handle_processor_message( &mut txn, spec.set().network, RecognizedIdType::Batch, - &id.id, + &{ + let SubstrateSignableId::Batch(id) = id.id else { + panic!("BatchPreprocess SubstrateSignableId wasn't Batch") + }; + id.encode() + }, preprocesses, ); + let intended = Transaction::Batch( + block.0, + match id.id { + SubstrateSignableId::Batch(id) => id, + _ => panic!("BatchPreprocess did not contain Batch ID"), + }, + ); + // If this is the new key's first Batch, only create this TX once we verify all // all prior published `Batch`s + // TODO: This assumes BatchPreprocess is immediately after Batch + // Ensure that assumption let last_received = MainDb::::last_received_batch(&txn, msg.network).unwrap(); let handover_batch = MainDb::::handover_batch(&txn, spec.set()); - if handover_batch.is_none() { - MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); - if last_received != 0 { - // Decrease by 1, to get the ID of the Batch prior to this Batch - let prior_sets_last_batch = last_received - 1; - // TODO: If we're looping here, we're not handling the messages we need to in order - // to create the Batch we're looking for - // Don't have the processor yield the handover batch untill the batch before is - // acknowledged on-chain? - loop { - let successfully_verified = substrate::verify_published_batches::( - &mut txn, - msg.network, - prior_sets_last_batch, - ) - .await; - if successfully_verified == Some(prior_sets_last_batch) { - break; + let mut queue = false; + if let Some(handover_batch) = handover_batch { + // There is a race condition here. We may verify all `Batch`s from the prior set, + // start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed + // before `n` (or at the same time), yet then the prior set forges a malicious + // `Batch` `n`. + // + // The malicious `Batch` `n` would be publishable to Serai, as Serai can't + // distinguish what's intended to be a handover `Batch`, yet then anyone could + // publish the new set's `n+1`, causing their acceptance of the handover. + // + // To fix this, if this is after the handover `Batch` and we have yet to verify + // publication of the handover `Batch`, don't yet yield the provided. 
+ if last_received > handover_batch { + if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { + if last_verified < handover_batch { + queue = true; } - sleep(Duration::from_secs(5)).await; + } else { + queue = true; } } + } else { + MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); + // If this isn't the first batch, meaning we do have to verify all prior batches, and + // the prior Batch hasn't been verified yet... + if (last_received != 0) && + MainDb::::last_verified_batch(&txn, msg.network) + .map(|last_verified| last_verified < (last_received - 1)) + .unwrap_or(true) + { + // Withhold this TX until we verify all prior `Batch`s + queue = true; + } } - // There is a race condition here. We may verify all `Batch`s from the prior set, - // start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed - // before `n` (or at the same time), yet then the prior set forges a malicious - // `Batch` `n`. - // - // The malicious `Batch` `n` would be publishable to Serai, as Serai can't - // distinguish what's intended to be a handover `Batch`, yet then anyone could - // publish the new set's `n+1`, causing their acceptance of the handover. - // - // To fix this, if this is after the handover `Batch` and we have yet to verify - // publication of the handover `Batch`, don't yet yield the provided. - let handover_batch = MainDb::::handover_batch(&txn, spec.set()).unwrap(); - let intended = Transaction::Batch(block.0, id.id); - let mut res = vec![intended.clone()]; - if last_received > handover_batch { - if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { - if last_verified < handover_batch { - res = vec![]; - } - } else { - res = vec![]; - } - } - - if res.is_empty() { + if queue { MainDb::::queue_batch(&mut txn, spec.set(), intended); + vec![] + } else { + // Because this is post-verification of the handover batch, take all queued `Batch`s + // now to ensure we don't provide this before an already queued Batch + // This *may* be an unreachable case due to how last_verified_batch is set, yet it + // doesn't hurt to have as a defensive pattern + let mut res = MainDb::::take_queued_batches(&mut txn, spec.set()); + res.push(intended); + res } - - res } else { - vec![Transaction::BatchPreprocess(SignData { + vec![Transaction::SubstratePreprocess(SignData { plan: id.id, attempt: id.attempt, data: preprocesses, @@ -683,24 +686,19 @@ async fn handle_processor_message( })] } } - coordinator::ProcessorMessage::BatchShare { id, shares } => { - vec![Transaction::BatchShare(SignData { + coordinator::ProcessorMessage::SubstrateShare { id, shares } => { + vec![Transaction::SubstrateShare(SignData { plan: id.id, attempt: id.attempt, data: shares.into_iter().map(|share| share.to_vec()).collect(), signed: Transaction::empty_signed(), })] } + coordinator::ProcessorMessage::CosignedBlock { .. } => unreachable!(), }, ProcessorMessage::Substrate(inner_msg) => match inner_msg { processor_messages::substrate::ProcessorMessage::Batch { .. } => unreachable!(), - processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => { - // We only reach here if this SignedBatch triggered the publication of a handover - // Batch - // Since the handover `Batch` was successfully published and verified, we no longer - // have to worry about the above n+1 attack - MainDb::::take_queued_batches(&mut txn, spec.set()) - } + processor_messages::substrate::ProcessorMessage::SignedBatch { .. 
} => unreachable!(), }, }; @@ -766,11 +764,14 @@ async fn handle_processor_message( true } +#[allow(clippy::too_many_arguments)] async fn handle_processor_messages( mut db: D, key: Zeroizing<::F>, serai: Arc, mut processors: Pro, + p2p: P, + cosign_channel: mpsc::UnboundedSender, network: NetworkId, mut tributary_event: mpsc::UnboundedReceiver>, ) { @@ -794,10 +795,154 @@ async fn handle_processor_messages( } // TODO: Check this ID is sane (last handled ID or expected next ID) - let msg = processors.recv(network).await; - if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await { + let Ok(msg) = tokio::time::timeout(Duration::from_secs(1), processors.recv(network)).await + else { + continue; + }; + log::trace!("entering handle_processor_message for {:?}", network); + if handle_processor_message( + &mut db, + &key, + &serai, + &p2p, + &cosign_channel, + &tributaries, + network, + &msg, + ) + .await + { processors.ack(msg).await; } + log::trace!("exited handle_processor_message for {:?}", network); + } +} + +#[allow(clippy::too_many_arguments)] +async fn handle_cosigns_and_batch_publication( + mut db: D, + network: NetworkId, + mut tributary_event: mpsc::UnboundedReceiver>, +) { + let mut tributaries = HashMap::new(); + 'outer: loop { + // TODO: Create a better async flow for this, as this does still hammer this task + tokio::task::yield_now().await; + + match tributary_event.try_recv() { + Ok(event) => match event { + TributaryEvent::NewTributary(tributary) => { + let set = tributary.spec.set(); + assert_eq!(set.network, network); + tributaries.insert(set.session, tributary); + } + TributaryEvent::TributaryRetired(set) => { + tributaries.remove(&set.session); + } + }, + Err(mpsc::error::TryRecvError::Empty) => {} + Err(mpsc::error::TryRecvError::Disconnected) => { + panic!("handle_processor_messages tributary_event sender closed") + } + } + + // Handle pending cosigns + while let Some((session, block, hash)) = CosignTransactions::peek_cosign(&db, network) { + let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else { + log::warn!("didn't yet have tributary we're supposed to cosign with"); + break; + }; + log::info!( + "{network:?} {session:?} cosigning block #{block} (hash {}...)", + hex::encode(&hash[.. 8]) + ); + let tx = Transaction::CosignSubstrateBlock(hash); + let res = tributary.provide_transaction(tx.clone()).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + if res == Err(ProvidedError::LocalMismatchesOnChain) { + // Spin, since this is a crit for this Tributary + loop { + log::error!( + "{}. 
tributary: {}, provided: {:?}", + "tributary added distinct CosignSubstrateBlock", + hex::encode(spec.genesis()), + &tx, + ); + sleep(Duration::from_secs(60)).await; + } + } + panic!("provided an invalid CosignSubstrateBlock: {res:?}"); + } + CosignTransactions::take_cosign(db.txn(), network); + } + + // Verify any publifshed `Batch`s + { + let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; + let mut txn = db.txn(); + let mut to_publish = vec![]; + let start_id = MainDb::::last_verified_batch(&txn, network) + .map(|already_verified| already_verified + 1) + .unwrap_or(0); + if let Some(last_id) = + substrate::verify_published_batches::(&mut txn, network, u32::MAX).await + { + // Check if any of these `Batch`s were a handover `Batch` or the `Batch` before a handover + // `Batch` + // If so, we need to publish queued provided `Batch` transactions + for batch in start_id ..= last_id { + let is_pre_handover = MainDb::::is_handover_batch(&txn, network, batch + 1); + if let Some(set) = is_pre_handover { + let mut queued = MainDb::::take_queued_batches(&mut txn, set); + // is_handover_batch is only set for handover `Batch`s we're participating in, making + // this safe + if queued.is_empty() { + panic!("knew the next Batch was a handover yet didn't queue it"); + } + + // Only publish the handover Batch + to_publish.push((set.session, queued.remove(0))); + // Re-queue the remaining batches + for remaining in queued { + MainDb::::queue_batch(&mut txn, set, remaining); + } + } + + let is_handover = MainDb::::is_handover_batch(&txn, network, batch); + if let Some(set) = is_handover { + for queued in MainDb::::take_queued_batches(&mut txn, set) { + to_publish.push((set.session, queued)); + } + } + } + } + + for (session, tx) in to_publish { + let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else { + log::warn!("didn't yet have tributary we're supposed to provide a queued Batch for"); + // Safe since this will drop the txn updating the most recently queued batch + continue 'outer; + }; + let res = tributary.provide_transaction(tx.clone()).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + if res == Err(ProvidedError::LocalMismatchesOnChain) { + // Spin, since this is a crit for this Tributary + loop { + log::error!( + "{}. 
tributary: {}, provided: {:?}", + "tributary added distinct Batch", + hex::encode(spec.genesis()), + &tx, + ); + sleep(Duration::from_secs(60)).await; + } + } + panic!("provided an invalid Batch: {res:?}"); + } + } + + txn.commit(); + } } } @@ -806,6 +951,8 @@ pub async fn handle_processors( key: Zeroizing<::F>, serai: Arc, processors: Pro, + p2p: P, + cosign_channel: mpsc::UnboundedSender, mut tributary_event: broadcast::Receiver>, ) { let mut channels = HashMap::new(); @@ -813,26 +960,34 @@ pub async fn handle_processors( if network == NetworkId::Serai { continue; } - let (send, recv) = mpsc::unbounded_channel(); + let (processor_send, processor_recv) = mpsc::unbounded_channel(); tokio::spawn(handle_processor_messages( db.clone(), key.clone(), serai.clone(), processors.clone(), + p2p.clone(), + cosign_channel.clone(), network, - recv, + processor_recv, )); - channels.insert(network, send); + let (cosign_send, cosign_recv) = mpsc::unbounded_channel(); + tokio::spawn(handle_cosigns_and_batch_publication(db.clone(), network, cosign_recv)); + channels.insert(network, (processor_send, cosign_send)); } // Listen to new tributary events loop { match tributary_event.recv().await.unwrap() { - TributaryEvent::NewTributary(tributary) => channels[&tributary.spec.set().network] - .send(TributaryEvent::NewTributary(tributary)) - .unwrap(), + TributaryEvent::NewTributary(tributary) => { + let (c1, c2) = &channels[&tributary.spec.set().network]; + c1.send(TributaryEvent::NewTributary(tributary.clone())).unwrap(); + c2.send(TributaryEvent::NewTributary(tributary)).unwrap(); + } TributaryEvent::TributaryRetired(set) => { - channels[&set.network].send(TributaryEvent::TributaryRetired(set)).unwrap() + let (c1, c2) = &channels[&set.network]; + c1.send(TributaryEvent::TributaryRetired(set)).unwrap(); + c2.send(TributaryEvent::TributaryRetired(set)).unwrap(); } }; } @@ -944,6 +1099,7 @@ pub async fn run( }); move |set: ValidatorSet, genesis, id_type, id: Vec, nonce| { + log::debug!("recognized ID {:?} {}", id_type, hex::encode(&id)); let mut raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); @@ -956,6 +1112,7 @@ pub async fn run( loop { let Some(preprocess) = MainDb::::first_preprocess(raw_db, set.network, id_type, id) else { + log::warn!("waiting for preprocess for recognized ID"); sleep(Duration::from_millis(100)).await; continue; }; @@ -964,9 +1121,9 @@ pub async fn run( }; let mut tx = match id_type { - RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData { + RecognizedIdType::Batch => Transaction::SubstratePreprocess(SignData { data: get_preprocess(&raw_db, id_type, &id).await, - plan: id.try_into().unwrap(), + plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()), attempt: 0, signed: Transaction::empty_signed(), }), @@ -1029,11 +1186,27 @@ pub async fn run( // in a while (presumably because we're behind) tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), tributary_event_listener_3)); + // Create the Cosign evaluator + let cosign_channel = CosignEvaluator::new(raw_db.clone(), p2p.clone(), serai.clone()); + // Handle P2P messages - tokio::spawn(p2p::handle_p2p_task(p2p, tributary_event_listener_4)); + tokio::spawn(p2p::handle_p2p_task( + p2p.clone(), + cosign_channel.clone(), + tributary_event_listener_4, + )); // Handle all messages from processors - handle_processors(raw_db, key, serai, processors, tributary_event_listener_5).await; + handle_processors( + raw_db, + key, + serai, + processors, + p2p, + cosign_channel, + 
tributary_event_listener_5, + ) + .await; } #[tokio::main] diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 9dd1ff2f..a78c97a5 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -8,6 +8,9 @@ use std::{ use async_trait::async_trait; +use scale::{Encode, Decode}; +use serai_client::primitives::NetworkId; + use serai_db::Db; use tokio::{ @@ -37,12 +40,20 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; // TODO: Use distinct topics const LIBP2P_TOPIC: &str = "serai-coordinator"; +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)] +pub struct CosignedBlock { + pub network: NetworkId, + pub block: [u8; 32], + pub signature: [u8; 64], +} + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { KeepAlive, Tributary([u8; 32]), Heartbeat([u8; 32]), Block([u8; 32]), + CosignedBlock, } impl P2pMessageKind { @@ -64,6 +75,9 @@ impl P2pMessageKind { res.extend(genesis); res } + P2pMessageKind::CosignedBlock => { + vec![4] + } } } @@ -87,6 +101,7 @@ impl P2pMessageKind { reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Block(genesis) }), + 4 => Some(P2pMessageKind::CosignedBlock), _ => None, } } @@ -122,6 +137,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), + P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(), } ); self.broadcast_raw(actual_msg).await; @@ -148,6 +164,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), + P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(), } ); Message { sender, kind, msg } @@ -433,6 +450,7 @@ pub async fn heartbeat_tributaries_task( pub async fn handle_p2p_task( p2p: P, + cosign_channel: mpsc::UnboundedSender, mut tributary_event: broadcast::Receiver>, ) { let channels = Arc::new(RwLock::new(HashMap::<_, mpsc::UnboundedSender>>::new())); @@ -562,6 +580,8 @@ pub async fn handle_p2p_task( res ); } + + P2pMessageKind::CosignedBlock => unreachable!(), } } } @@ -596,6 +616,14 @@ pub async fn handle_p2p_task( channel.send(msg).unwrap(); } } + P2pMessageKind::CosignedBlock => { + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(msg) = CosignedBlock::decode(&mut scale::IoReader(&mut msg_ref)) else { + log::error!("received CosignedBlock message with invalidly serialized contents"); + continue; + }; + cosign_channel.send(msg).unwrap(); + } } } } diff --git a/coordinator/src/substrate/db.rs b/coordinator/src/substrate/db.rs index 2f702bd0..437af37a 100644 --- a/coordinator/src/substrate/db.rs +++ b/coordinator/src/substrate/db.rs @@ -1,12 +1,79 @@ +use std::sync::{OnceLock, MutexGuard, Mutex}; + use scale::{Encode, Decode}; pub use serai_db::*; use serai_client::{ primitives::NetworkId, - validator_sets::primitives::{Session, KeyPair}, + validator_sets::primitives::{Session, ValidatorSet, KeyPair}, }; +create_db! 
{ + NewSubstrateDb { + CosignTriggered: () -> (), + IntendedCosign: () -> (u64, Option), + BlockHasEvents: (block: u64) -> u8, + CosignTransactions: (network: NetworkId) -> Vec<(Session, u64, [u8; 32])> + } +} + +impl IntendedCosign { + pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) { + Self::set(txn, &(intended, None::)); + } + pub fn set_skipped_cosign(txn: &mut impl DbTxn, skipped: u64) { + let (intended, prior_skipped) = Self::get(txn).unwrap(); + assert!(prior_skipped.is_none()); + Self::set(txn, &(intended, Some(skipped))); + } +} + +// This guarantees: +// 1) Appended transactions are appended +// 2) Taking cosigns does not clear any TXs which weren't taken +// 3) Taking does actually clear the set +static COSIGN_LOCK: OnceLock> = OnceLock::new(); +pub struct CosignTxn(T, MutexGuard<'static, ()>); +impl CosignTxn { + pub fn new(txn: T) -> Self { + Self(txn, COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()) + } + pub fn commit(self) { + self.0.commit(); + } +} +impl CosignTransactions { + // Append a cosign transaction. + pub fn append_cosign( + txn: &mut CosignTxn, + set: ValidatorSet, + number: u64, + hash: [u8; 32], + ) { + #[allow(clippy::unwrap_or_default)] + let mut txs = CosignTransactions::get(&txn.0, set.network).unwrap_or(vec![]); + txs.push((set.session, number, hash)); + CosignTransactions::set(&mut txn.0, set.network, &txs); + } + // Peek at the next cosign transaction. + pub fn peek_cosign(getter: &impl Get, network: NetworkId) -> Option<(Session, u64, [u8; 32])> { + let mut to_cosign = CosignTransactions::get(getter, network)?; + if to_cosign.is_empty() { + None? + } + Some(to_cosign.swap_remove(0)) + } + // Take the next transaction, panicking if it doesn't exist. + pub fn take_cosign(mut txn: impl DbTxn, network: NetworkId) { + let _lock = COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap(); + let mut txs = CosignTransactions::get(&txn, network).unwrap(); + txs.remove(0); + CosignTransactions::set(&mut txn, network, &txs); + txn.commit(); + } +} + #[derive(Debug)] pub struct SubstrateDb(pub D); impl SubstrateDb { @@ -18,16 +85,30 @@ impl SubstrateDb { D::key(b"coordinator_substrate", dst, key) } - fn block_key() -> Vec { - Self::substrate_key(b"block", []) + fn next_block_key() -> Vec { + Self::substrate_key(b"next_block", []) } pub fn set_next_block(&mut self, block: u64) { let mut txn = self.0.txn(); - txn.put(Self::block_key(), block.to_le_bytes()); + txn.put(Self::next_block_key(), block.to_le_bytes()); txn.commit(); } pub fn next_block(&self) -> u64 { - u64::from_le_bytes(self.0.get(Self::block_key()).unwrap_or(vec![0; 8]).try_into().unwrap()) + u64::from_le_bytes(self.0.get(Self::next_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap()) + } + + fn latest_cosigned_block_key() -> Vec { + Self::substrate_key(b"latest_cosigned_block", []) + } + pub fn set_latest_cosigned_block(txn: &mut D::Transaction<'_>, latest_cosigned_block: u64) { + txn.put(Self::latest_cosigned_block_key(), latest_cosigned_block.to_le_bytes()); + } + pub fn latest_cosigned_block(getter: &G) -> u64 { + let db = u64::from_le_bytes( + getter.get(Self::latest_cosigned_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap(), + ); + // Mark the genesis as cosigned + db.max(1) } fn event_key(id: &[u8], index: u32) -> Vec { diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 78bdfd26..8a4cb186 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -8,11 +8,12 @@ use zeroize::Zeroizing; use 
ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; +use scale::{Encode, Decode}; use serai_client::{ SeraiError, Block, Serai, TemporalSerai, primitives::{BlockHash, NetworkId}, validator_sets::{ - primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares}, + primitives::{Session, ValidatorSet, KeyPair, amortize_excess_key_shares}, ValidatorSetsEvent, }, in_instructions::InInstructionsEvent, @@ -363,12 +364,191 @@ async fn handle_new_blocks( next_block: &mut u64, ) -> Result<(), SeraiError> { // Check if there's been a new Substrate block - let latest = serai.latest_block().await?; - let latest_number = latest.number(); + let latest_number = serai.latest_block().await?.number(); + + // TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark + // cosigned, + // TODO: Can we remove any of these events while maintaining security? + { + // If: + // A) This block has events and it's been at least X blocks since the last cosign or + // B) This block doesn't have events but it's been X blocks since a skipped block which did + // have events or + // C) This block key gens (which changes who the cosigners are) + // cosign this block. + const COSIGN_DISTANCE: u64 = 5 * 60 / 6; // 5 minutes, expressed in blocks + + #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] + enum HasEvents { + KeyGen, + Yes, + No, + } + async fn block_has_events( + txn: &mut impl DbTxn, + serai: &Serai, + block: u64, + ) -> Result { + let cached = BlockHasEvents::get(txn, block); + match cached { + None => { + let serai = serai.as_of( + serai + .block_by_number(block) + .await? + .expect("couldn't get block which should've been finalized") + .hash(), + ); + + if !serai.validator_sets().key_gen_events().await?.is_empty() { + return Ok(HasEvents::KeyGen); + } + + let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() && + serai.in_instructions().batch_events().await?.is_empty() && + serai.validator_sets().new_set_events().await?.is_empty() && + serai.validator_sets().set_retired_events().await?.is_empty(); + + let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes }; + + let has_events = has_events.encode(); + assert_eq!(has_events.len(), 1); + BlockHasEvents::set(txn, block, &has_events[0]); + Ok(HasEvents::Yes) + } + Some(code) => Ok(HasEvents::decode(&mut [code].as_slice()).unwrap()), + } + } + + let mut txn = db.0.txn(); + let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else { + IntendedCosign::set_intended_cosign(&mut txn, 1); + txn.commit(); + return Ok(()); + }; + + // If we haven't flagged skipped, and a block within the distance had events, flag the first + // such block as skipped + let mut distance_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE; + // If we've never triggered a cosign, don't skip any cosigns + if CosignTriggered::get(&txn).is_none() { + distance_end_exclusive = 0; + } + if skipped_block.is_none() { + for b in (last_intended_to_cosign_block + 1) .. distance_end_exclusive { + if b > latest_number { + break; + } + + if block_has_events(&mut txn, serai, b).await? 
== HasEvents::Yes { + skipped_block = Some(b); + log::debug!("skipping cosigning {b} due to proximity to prior cosign"); + IntendedCosign::set_skipped_cosign(&mut txn, b); + break; + } + } + } + + let mut has_no_cosigners = None; + let mut cosign = vec![]; + + // Block we should cosign no matter what if no prior blocks qualified for cosigning + let maximally_latent_cosign_block = + skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE); + for block in (last_intended_to_cosign_block + 1) ..= latest_number { + let mut set = false; + + let block_has_events = block_has_events(&mut txn, serai, block).await?; + // If this block is within the distance, + if block < distance_end_exclusive { + // and set a key, cosign it + if block_has_events == HasEvents::KeyGen { + IntendedCosign::set_intended_cosign(&mut txn, block); + set = true; + // Carry skipped if it isn't included by cosigning this block + if let Some(skipped) = skipped_block { + if skipped > block { + IntendedCosign::set_skipped_cosign(&mut txn, block); + } + } + } + } else if (Some(block) == maximally_latent_cosign_block) || + (block_has_events != HasEvents::No) + { + // Since this block was outside the distance and had events/was maximally latent, cosign it + IntendedCosign::set_intended_cosign(&mut txn, block); + set = true; + } + + if set { + // Get the keys as of the prior block + // That means if this block is setting new keys (which won't lock in until we process this + // block), we won't freeze up waiting for the yet-to-be-processed keys to sign this block + let actual_block = serai + .block_by_number(block) + .await? + .expect("couldn't get block which should've been finalized"); + let serai = serai.as_of(actual_block.header().parent_hash.into()); + + has_no_cosigners = Some(actual_block.clone()); + + for network in serai_client::primitives::NETWORKS { + // Get the latest session to have set keys + let Some(latest_session) = serai.validator_sets().session(network).await? else { + continue; + }; + let prior_session = Session(latest_session.0.saturating_sub(1)); + let set_with_keys = if serai + .validator_sets() + .keys(ValidatorSet { network, session: prior_session }) + .await? 
+ .is_some() + { + ValidatorSet { network, session: prior_session } + } else { + let set = ValidatorSet { network, session: latest_session }; + if serai.validator_sets().keys(set).await?.is_none() { + continue; + } + set + }; + + // Since this is a valid cosigner, don't flag this block as having no cosigners + has_no_cosigners = None; + log::debug!("{:?} will be cosigning {block}", set_with_keys.network); + + if in_set(key, &serai, set_with_keys).await?.unwrap() { + cosign.push((set_with_keys, block, actual_block.hash())); + } + } + + break; + } + } + + // If this block doesn't have cosigners, yet does have events, automatically mark it as + // cosigned + if let Some(has_no_cosigners) = has_no_cosigners { + log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number()); + SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number()); + txn.commit(); + } else { + CosignTriggered::set(&mut txn, &()); + let mut txn = CosignTxn::new(txn); + for (set, block, hash) in cosign { + log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session); + CosignTransactions::append_cosign(&mut txn, set, block, hash); + } + txn.commit(); + } + } + + // Reduce to the latest cosigned block + let latest_number = latest_number.min(SubstrateDb::<D>::latest_cosigned_block(&db.0)); + if latest_number < *next_block { return Ok(()); } - let mut latest = Some(latest); for b in *next_block ..= latest_number { log::info!("found substrate block {b}"); @@ -379,14 +559,10 @@ async fn handle_new_blocks( tributary_retired, processors, serai, - if b == latest_number { - latest.take().unwrap() - } else { - serai - .block_by_number(b) - .await? - .expect("couldn't get block before the latest finalized block") - }, + serai + .block_by_number(b) + .await? + .expect("couldn't get block before the latest finalized block"), ) .await?; *next_block += 1; @@ -495,7 +671,9 @@ pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) - /// Verifies `Batch`s which have already been indexed from Substrate. /// -/// This has a slight malleability in that doesn't verify *who* published a Batch is as expected. +/// Spins if a distinct `Batch` is detected on-chain. +/// +/// This has a slight malleability in that it doesn't verify *who* published a `Batch` is as expected. /// This is deemed fine.
pub(crate) async fn verify_published_batches( txn: &mut D::Transaction<'_>, diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs index 3cbc686e..00ef6d34 100644 --- a/coordinator/src/tests/tributary/handle_p2p.rs +++ b/coordinator/src/tests/tributary/handle_p2p.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use rand_core::OsRng; -use tokio::{sync::broadcast, time::sleep}; +use tokio::{ + sync::{mpsc, broadcast}, + time::sleep, +}; use serai_db::MemDb; @@ -32,7 +35,8 @@ async fn handle_p2p_test() { let tributary = Arc::new(tributary); tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); - tokio::spawn(handle_p2p_task(p2p, new_tributary_recv)); + let (cosign_send, _) = mpsc::unbounded_channel(); + tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv)); new_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary })) .map_err(|_| "failed to send ActiveTributary") diff --git a/coordinator/src/tests/tributary/mod.rs b/coordinator/src/tests/tributary/mod.rs index cec3689b..c17db906 100644 --- a/coordinator/src/tests/tributary/mod.rs +++ b/coordinator/src/tests/tributary/mod.rs @@ -2,6 +2,9 @@ use core::fmt::Debug; use rand_core::{RngCore, OsRng}; +use scale::{Encode, Decode}; +use processor_messages::coordinator::SubstrateSignableId; + use tributary::{ReadWrite, tests::random_signed}; use crate::tributary::{SignData, Transaction}; @@ -28,10 +31,10 @@ fn random_vec(rng: &mut R, limit: usize) -> Vec { res } -fn random_sign_data(rng: &mut R) -> SignData { - let mut plan = [0; N]; - rng.fill_bytes(&mut plan); - +fn random_sign_data( + rng: &mut R, + plan: Id, +) -> SignData { SignData { plan, attempt: random_u32(&mut OsRng), @@ -80,10 +83,18 @@ fn tx_size_limit() { #[test] fn serialize_sign_data() { - test_read_write(random_sign_data::<_, 3>(&mut OsRng)); - test_read_write(random_sign_data::<_, 8>(&mut OsRng)); - test_read_write(random_sign_data::<_, 16>(&mut OsRng)); - test_read_write(random_sign_data::<_, 24>(&mut OsRng)); + let mut plan = [0; 3]; + OsRng.fill_bytes(&mut plan); + test_read_write(random_sign_data::<_, _>(&mut OsRng, plan)); + let mut plan = [0; 5]; + OsRng.fill_bytes(&mut plan); + test_read_write(random_sign_data::<_, _>(&mut OsRng, plan)); + let mut plan = [0; 8]; + OsRng.fill_bytes(&mut plan); + test_read_write(random_sign_data::<_, _>(&mut OsRng, plan)); + let mut plan = [0; 24]; + OsRng.fill_bytes(&mut plan); + test_read_write(random_sign_data::<_, _>(&mut OsRng, plan)); } #[test] @@ -168,6 +179,12 @@ fn serialize_transaction() { random_signed(&mut OsRng), )); + { + let mut block = [0; 32]; + OsRng.fill_bytes(&mut block); + test_read_write(Transaction::CosignSubstrateBlock(block)); + } + { let mut block = [0; 32]; OsRng.fill_bytes(&mut block); @@ -177,11 +194,33 @@ fn serialize_transaction() { } test_read_write(Transaction::SubstrateBlock(OsRng.next_u64())); - test_read_write(Transaction::BatchPreprocess(random_sign_data(&mut OsRng))); - test_read_write(Transaction::BatchShare(random_sign_data(&mut OsRng))); + { + let mut plan = [0; 5]; + OsRng.fill_bytes(&mut plan); + test_read_write(Transaction::SubstratePreprocess(random_sign_data( + &mut OsRng, + SubstrateSignableId::Batch(plan), + ))); + } + { + let mut plan = [0; 5]; + OsRng.fill_bytes(&mut plan); + test_read_write(Transaction::SubstrateShare(random_sign_data( + &mut OsRng, + SubstrateSignableId::Batch(plan), + ))); + } - 
test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng))); - test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng))); + { + let mut plan = [0; 32]; + OsRng.fill_bytes(&mut plan); + test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng, plan))); + } + { + let mut plan = [0; 32]; + OsRng.fill_bytes(&mut plan); + test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng, plan))); + } { let mut plan = [0; 32]; diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 1ff2d6b1..1267368f 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -5,7 +5,10 @@ use rand_core::OsRng; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; -use tokio::{sync::broadcast, time::sleep}; +use tokio::{ + sync::{mpsc, broadcast}, + time::sleep, +}; use serai_db::MemDb; @@ -42,7 +45,8 @@ async fn sync_test() { let tributary = Arc::new(tributary); tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); - let thread = tokio::spawn(handle_p2p_task(p2p, new_tributary_recv)); + let (cosign_send, _) = mpsc::unbounded_channel(); + let thread = tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv)); new_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary })) .map_err(|_| "failed to send ActiveTributary") @@ -77,7 +81,8 @@ async fn sync_test() { let syncer_key = Ristretto::generator() * *syncer_key; let syncer_tributary = Arc::new(syncer_tributary); let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5); - tokio::spawn(handle_p2p_task(syncer_p2p.clone(), syncer_tributary_recv)); + let (cosign_send, _) = mpsc::unbounded_channel(); + tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv)); syncer_tributary_send .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 65ebbf78..6ff6f76b 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -9,6 +9,8 @@ use frost::Participant; use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair}; +use processor_messages::coordinator::SubstrateSignableId; + pub use serai_db::*; use crate::tributary::TributarySpec; @@ -16,16 +18,21 @@ use crate::tributary::TributarySpec; #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum Topic { Dkg, - Batch([u8; 5]), + SubstrateSign(SubstrateSignableId), Sign([u8; 32]), } impl Topic { fn as_key(&self, genesis: [u8; 32]) -> Vec { let mut res = genesis.to_vec(); + #[allow(unused_assignments)] // False positive + let mut id_buf = vec![]; let (label, id) = match self { Topic::Dkg => (b"dkg".as_slice(), [].as_slice()), - Topic::Batch(id) => (b"batch".as_slice(), id.as_slice()), + Topic::SubstrateSign(id) => { + id_buf = id.encode(); + (b"substrate_sign".as_slice(), id_buf.as_slice()) + } Topic::Sign(id) => (b"sign".as_slice(), id.as_slice()), }; res.push(u8::try_from(label.len()).unwrap()); diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index ba995c87..e5f22585 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -18,7 +18,7 @@ use tributary::{Signed, TransactionKind, TransactionTrait}; use processor_messages::{ key_gen::{self, KeyGenId}, - coordinator::{self, BatchSignId}, + coordinator::{self, SubstrateSignableId, 
SubstrateSignId}, sign::{self, SignId}, }; @@ -498,10 +498,50 @@ pub(crate) async fn handle_application_tx< } } + Transaction::CosignSubstrateBlock(hash) => { + TributaryDb::::recognize_topic( + txn, + genesis, + Topic::SubstrateSign(SubstrateSignableId::CosigningSubstrateBlock(hash)), + ); + NonceDecider::handle_substrate_signable( + txn, + genesis, + SubstrateSignableId::CosigningSubstrateBlock(hash), + ); + + let key = loop { + let Some(key_pair) = TributaryDb::::key_pair(txn, spec.set()) else { + // This can happen based on a timing condition + log::warn!("CosignSubstrateBlock yet keys weren't set yet"); + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + continue; + }; + break key_pair.0.into(); + }; + processors + .send( + spec.set().network, + coordinator::CoordinatorMessage::CosignSubstrateBlock { + id: SubstrateSignId { + key, + id: SubstrateSignableId::CosigningSubstrateBlock(hash), + attempt: 0, + }, + }, + ) + .await; + } + Transaction::Batch(_, batch) => { // Because this Batch has achieved synchrony, its batch ID should be authorized - TributaryDb::::recognize_topic(txn, genesis, Topic::Batch(batch)); - let nonce = NonceDecider::handle_batch(txn, genesis, batch); + TributaryDb::::recognize_topic( + txn, + genesis, + Topic::SubstrateSign(SubstrateSignableId::Batch(batch)), + ); + let nonce = + NonceDecider::handle_substrate_signable(txn, genesis, SubstrateSignableId::Batch(batch)); recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec(), nonce).await; } @@ -518,14 +558,14 @@ pub(crate) async fn handle_application_tx< } } - Transaction::BatchPreprocess(data) => { + Transaction::SubstratePreprocess(data) => { let Ok(_) = check_sign_data_len::(txn, spec, data.signed.signer, data.data.len()) else { return; }; match handle( txn, &DataSpecification { - topic: Topic::Batch(data.plan), + topic: Topic::SubstrateSign(data.plan), label: BATCH_PREPROCESS, attempt: data.attempt, }, @@ -534,13 +574,13 @@ pub(crate) async fn handle_application_tx< ) { Accumulation::Ready(DataSet::Participating(mut preprocesses)) => { unflatten(spec, &mut preprocesses); - NonceDecider::selected_for_signing_batch(txn, genesis, data.plan); + NonceDecider::selected_for_signing_substrate(txn, genesis, data.plan); let key = TributaryDb::::key_pair(txn, spec.set()).unwrap().0 .0; processors .send( spec.set().network, - coordinator::CoordinatorMessage::BatchPreprocesses { - id: BatchSignId { key, id: data.plan, attempt: data.attempt }, + coordinator::CoordinatorMessage::SubstratePreprocesses { + id: SubstrateSignId { key, id: data.plan, attempt: data.attempt }, preprocesses, }, ) @@ -550,14 +590,14 @@ pub(crate) async fn handle_application_tx< Accumulation::NotReady => {} } } - Transaction::BatchShare(data) => { + Transaction::SubstrateShare(data) => { let Ok(_) = check_sign_data_len::(txn, spec, data.signed.signer, data.data.len()) else { return; }; match handle( txn, &DataSpecification { - topic: Topic::Batch(data.plan), + topic: Topic::SubstrateSign(data.plan), label: BATCH_SHARE, attempt: data.attempt, }, @@ -570,8 +610,8 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - coordinator::CoordinatorMessage::BatchShares { - id: BatchSignId { key, id: data.plan, attempt: data.attempt }, + coordinator::CoordinatorMessage::SubstrateShares { + id: SubstrateSignId { key, id: data.plan, attempt: data.attempt }, shares: shares .into_iter() .map(|(validator, share)| (validator, share.try_into().unwrap())) diff --git a/coordinator/src/tributary/mod.rs 
b/coordinator/src/tributary/mod.rs index ab4631de..4af7964c 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -1,4 +1,7 @@ -use core::ops::{Deref, Range}; +use core::{ + ops::{Deref, Range}, + fmt::Debug, +}; use std::io::{self, Read, Write}; use zeroize::Zeroizing; @@ -15,6 +18,7 @@ use schnorr::SchnorrSignature; use frost::Participant; use scale::{Encode, Decode}; +use processor_messages::coordinator::SubstrateSignableId; use serai_client::{ primitives::{NetworkId, PublicKey}, @@ -167,8 +171,8 @@ impl TributarySpec { } #[derive(Clone, PartialEq, Eq, Debug)] -pub struct SignData { - pub plan: [u8; N], +pub struct SignData { + pub plan: Id, pub attempt: u32, pub data: Vec>, @@ -176,10 +180,10 @@ pub struct SignData { pub signed: Signed, } -impl ReadWrite for SignData { +impl ReadWrite for SignData { fn read(reader: &mut R) -> io::Result { - let mut plan = [0; N]; - reader.read_exact(&mut plan)?; + let plan = Id::decode(&mut scale::IoReader(&mut *reader)) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid plan in SignData"))?; let mut attempt = [0; 4]; reader.read_exact(&mut attempt)?; @@ -208,7 +212,7 @@ impl ReadWrite for SignData { } fn write(&self, writer: &mut W) -> io::Result<()> { - writer.write_all(&self.plan)?; + writer.write_all(&self.plan.encode())?; writer.write_all(&self.attempt.to_le_bytes())?; writer.write_all(&[u8::try_from(self.data.len()).unwrap()])?; @@ -253,6 +257,9 @@ pub enum Transaction { }, DkgConfirmed(u32, [u8; 32], Signed), + // Co-sign a Substrate block. + CosignSubstrateBlock([u8; 32]), + // When we have synchrony on a batch, we can allow signing it // TODO (never?): This is less efficient compared to an ExternalBlock provided transaction, // which would be binding over the block hash and automatically achieve synchrony on all @@ -263,11 +270,11 @@ pub enum Transaction { // IDs SubstrateBlock(u64), - BatchPreprocess(SignData<5>), - BatchShare(SignData<5>), + SubstratePreprocess(SignData), + SubstrateShare(SignData), - SignPreprocess(SignData<32>), - SignShare(SignData<32>), + SignPreprocess(SignData<[u8; 32]>), + SignShare(SignData<[u8; 32]>), // This is defined as an Unsigned transaction in order to de-duplicate SignCompleted amongst // reporters (who should all report the same thing) // We do still track the signer in order to prevent a single signer from publishing arbitrarily @@ -415,6 +422,12 @@ impl ReadWrite for Transaction { } 5 => { + let mut block = [0; 32]; + reader.read_exact(&mut block)?; + Ok(Transaction::CosignSubstrateBlock(block)) + } + + 6 => { let mut block = [0; 32]; reader.read_exact(&mut block)?; let mut batch = [0; 5]; @@ -422,19 +435,19 @@ impl ReadWrite for Transaction { Ok(Transaction::Batch(block, batch)) } - 6 => { + 7 => { let mut block = [0; 8]; reader.read_exact(&mut block)?; Ok(Transaction::SubstrateBlock(u64::from_le_bytes(block))) } - 7 => SignData::read(reader).map(Transaction::BatchPreprocess), - 8 => SignData::read(reader).map(Transaction::BatchShare), + 8 => SignData::read(reader).map(Transaction::SubstratePreprocess), + 9 => SignData::read(reader).map(Transaction::SubstrateShare), - 9 => SignData::read(reader).map(Transaction::SignPreprocess), - 10 => SignData::read(reader).map(Transaction::SignShare), + 10 => SignData::read(reader).map(Transaction::SignPreprocess), + 11 => SignData::read(reader).map(Transaction::SignShare), - 11 => { + 12 => { let mut plan = [0; 32]; reader.read_exact(&mut plan)?; @@ -534,36 +547,41 @@ impl ReadWrite for Transaction { signed.write(writer) } - 
Transaction::Batch(block, batch) => { + Transaction::CosignSubstrateBlock(block) => { writer.write_all(&[5])?; + writer.write_all(block) + } + + Transaction::Batch(block, batch) => { + writer.write_all(&[6])?; writer.write_all(block)?; writer.write_all(batch) } Transaction::SubstrateBlock(block) => { - writer.write_all(&[6])?; + writer.write_all(&[7])?; writer.write_all(&block.to_le_bytes()) } - Transaction::BatchPreprocess(data) => { - writer.write_all(&[7])?; + Transaction::SubstratePreprocess(data) => { + writer.write_all(&[8])?; data.write(writer) } - Transaction::BatchShare(data) => { - writer.write_all(&[8])?; + Transaction::SubstrateShare(data) => { + writer.write_all(&[9])?; data.write(writer) } Transaction::SignPreprocess(data) => { - writer.write_all(&[9])?; - data.write(writer) - } - Transaction::SignShare(data) => { writer.write_all(&[10])?; data.write(writer) } - Transaction::SignCompleted { plan, tx_hash, first_signer, signature } => { + Transaction::SignShare(data) => { writer.write_all(&[11])?; + data.write(writer) + } + Transaction::SignCompleted { plan, tx_hash, first_signer, signature } => { + writer.write_all(&[12])?; writer.write_all(plan)?; writer .write_all(&[u8::try_from(tx_hash.len()).expect("tx hash length exceed 255 bytes")])?; @@ -585,11 +603,13 @@ impl TransactionTrait for Transaction { Transaction::InvalidDkgShare { signed, .. } => TransactionKind::Signed(signed), Transaction::DkgConfirmed(_, _, signed) => TransactionKind::Signed(signed), + Transaction::CosignSubstrateBlock(_) => TransactionKind::Provided("cosign"), + Transaction::Batch(_, _) => TransactionKind::Provided("batch"), Transaction::SubstrateBlock(_) => TransactionKind::Provided("serai"), - Transaction::BatchPreprocess(data) => TransactionKind::Signed(&data.signed), - Transaction::BatchShare(data) => TransactionKind::Signed(&data.signed), + Transaction::SubstratePreprocess(data) => TransactionKind::Signed(&data.signed), + Transaction::SubstrateShare(data) => TransactionKind::Signed(&data.signed), Transaction::SignPreprocess(data) => TransactionKind::Signed(&data.signed), Transaction::SignShare(data) => TransactionKind::Signed(&data.signed), @@ -607,7 +627,7 @@ impl TransactionTrait for Transaction { } fn verify(&self) -> Result<(), TransactionError> { - if let Transaction::BatchShare(data) = self { + if let Transaction::SubstrateShare(data) = self { for data in &data.data { if data.len() != 32 { Err(TransactionError::InvalidContent)?; @@ -655,11 +675,13 @@ impl Transaction { Transaction::InvalidDkgShare { ref mut signed, .. 
} => signed, Transaction::DkgConfirmed(_, _, ref mut signed) => signed, + Transaction::CosignSubstrateBlock(_) => panic!("signing CosignSubstrateBlock"), + Transaction::Batch(_, _) => panic!("signing Batch"), Transaction::SubstrateBlock(_) => panic!("signing SubstrateBlock"), - Transaction::BatchPreprocess(ref mut data) => &mut data.signed, - Transaction::BatchShare(ref mut data) => &mut data.signed, + Transaction::SubstratePreprocess(ref mut data) => &mut data.signed, + Transaction::SubstrateShare(ref mut data) => &mut data.signed, Transaction::SignPreprocess(ref mut data) => &mut data.signed, Transaction::SignShare(ref mut data) => &mut data.signed, diff --git a/coordinator/src/tributary/nonce_decider.rs b/coordinator/src/tributary/nonce_decider.rs index 74b8ebbf..37dcbc65 100644 --- a/coordinator/src/tributary/nonce_decider.rs +++ b/coordinator/src/tributary/nonce_decider.rs @@ -1,11 +1,13 @@ use serai_db::{Get, DbTxn, create_db}; +use processor_messages::coordinator::SubstrateSignableId; + use crate::tributary::Transaction; use scale::Encode; -const BATCH_CODE: u8 = 0; -const BATCH_SIGNING_CODE: u8 = 1; +const SUBSTRATE_CODE: u8 = 0; +const SUBSTRATE_SIGNING_CODE: u8 = 1; const PLAN_CODE: u8 = 2; const PLAN_SIGNING_CODE: u8 = 3; @@ -30,9 +32,13 @@ impl NextNonceDb { /// transactions in response. Enables rebooting/rebuilding validators with full safety. pub struct NonceDecider; impl NonceDecider { - pub fn handle_batch(txn: &mut impl DbTxn, genesis: [u8; 32], batch: [u8; 5]) -> u32 { + pub fn handle_substrate_signable( + txn: &mut impl DbTxn, + genesis: [u8; 32], + id: SubstrateSignableId, + ) -> u32 { let nonce_for = NextNonceDb::allocate_nonce(txn, genesis); - ItemNonceDb::set(txn, genesis, BATCH_CODE, &batch, &nonce_for); + ItemNonceDb::set(txn, genesis, SUBSTRATE_CODE, &id.encode(), &nonce_for); nonce_for } @@ -53,12 +59,16 @@ impl NonceDecider { // TODO: The processor won't yield shares for this if the signing protocol aborts. We need to // detect when we're expecting shares for an aborted protocol and insert a dummy transaction // there. 
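For reference on the `&id.encode()` keys above: SCALE prefixes an enum's encoding with its variant index, so Batch IDs and cosigning IDs occupy disjoint key spaces even when stored under the same code. A standalone sketch, with the enum mirroring the processor/messages definition in this patch, and parity-scale-codec's derive (re-exported under the `scale` alias) assumed:

  use scale::Encode;

  #[derive(Encode)]
  enum SubstrateSignableId {
    CosigningSubstrateBlock([u8; 32]),
    Batch([u8; 5]),
  }

  fn main() {
    let cosign = SubstrateSignableId::CosigningSubstrateBlock([0; 32]).encode();
    let batch = SubstrateSignableId::Batch([0; 5]).encode();
    assert_eq!(cosign[0], 0); // variant index 0
    assert_eq!(batch[0], 1); // variant index 1
    assert_eq!(cosign.len(), 1 + 32); // fixed-size arrays encode without a length prefix
    assert_eq!(batch.len(), 1 + 5);
    assert_ne!(cosign, batch); // distinct key spaces under the same DB code
  }
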
- pub fn selected_for_signing_batch(txn: &mut impl DbTxn, genesis: [u8; 32], batch: [u8; 5]) { + pub fn selected_for_signing_substrate( + txn: &mut impl DbTxn, + genesis: [u8; 32], + id: SubstrateSignableId, + ) { let nonce_for = NextNonceDb::allocate_nonce(txn, genesis); - ItemNonceDb::set(txn, genesis, BATCH_SIGNING_CODE, &batch, &nonce_for); + ItemNonceDb::set(txn, genesis, SUBSTRATE_SIGNING_CODE, &id.encode(), &nonce_for); } - // TODO: Same TODO as selected_for_signing_batch + // TODO: Same TODO as selected_for_signing_substrate pub fn selected_for_signing_plan(txn: &mut impl DbTxn, genesis: [u8; 32], plan: [u8; 32]) { let nonce_for = NextNonceDb::allocate_nonce(txn, genesis); ItemNonceDb::set(txn, genesis, PLAN_SIGNING_CODE, &plan, &nonce_for); @@ -86,23 +96,26 @@ impl NonceDecider { assert_eq!(*attempt, 0); Some(Some(2)) } + + Transaction::CosignSubstrateBlock(_) => None, + Transaction::Batch(_, _) => None, Transaction::SubstrateBlock(_) => None, - Transaction::BatchPreprocess(data) => { + Transaction::SubstratePreprocess(data) => { assert_eq!(data.attempt, 0); - Some(ItemNonceDb::get(getter, genesis, BATCH_CODE, &data.plan)) + Some(ItemNonceDb::get(getter, genesis, SUBSTRATE_CODE, &data.plan.encode())) } - Transaction::BatchShare(data) => { + Transaction::SubstrateShare(data) => { assert_eq!(data.attempt, 0); - Some(ItemNonceDb::get(getter, genesis, BATCH_SIGNING_CODE, &data.plan)) + Some(ItemNonceDb::get(getter, genesis, SUBSTRATE_SIGNING_CODE, &data.plan.encode())) } Transaction::SignPreprocess(data) => { assert_eq!(data.attempt, 0); - Some(ItemNonceDb::get(getter, genesis, PLAN_CODE, &data.plan)) + Some(ItemNonceDb::get(getter, genesis, PLAN_CODE, &data.plan.encode())) } Transaction::SignShare(data) => { assert_eq!(data.attempt, 0); - Some(ItemNonceDb::get(getter, genesis, PLAN_SIGNING_CODE, &data.plan)) + Some(ItemNonceDb::get(getter, genesis, PLAN_SIGNING_CODE, &data.plan.encode())) } Transaction::SignCompleted { .. } => None, } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index d08afffb..055b4180 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -156,20 +156,37 @@ pub mod sign { pub mod coordinator { use super::*; + pub fn cosign_block_msg(block: [u8; 32]) -> Vec { + const DST: &[u8] = b"Cosign"; + let mut res = vec![u8::try_from(DST.len()).unwrap()]; + res.extend(DST); + res.extend(block); + res + } + + #[derive( + Clone, Copy, PartialEq, Eq, Hash, Debug, Zeroize, Encode, Decode, Serialize, Deserialize, + )] + pub enum SubstrateSignableId { + CosigningSubstrateBlock([u8; 32]), + Batch([u8; 5]), + } + #[derive(Clone, PartialEq, Eq, Hash, Debug, Zeroize, Encode, Decode, Serialize, Deserialize)] - pub struct BatchSignId { + pub struct SubstrateSignId { pub key: [u8; 32], - pub id: [u8; 5], + pub id: SubstrateSignableId, pub attempt: u32, } #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum CoordinatorMessage { + CosignSubstrateBlock { id: SubstrateSignId }, // Uses Vec instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64] - BatchPreprocesses { id: BatchSignId, preprocesses: HashMap> }, - BatchShares { id: BatchSignId, shares: HashMap }, + SubstratePreprocesses { id: SubstrateSignId, preprocesses: HashMap> }, + SubstrateShares { id: SubstrateSignId, shares: HashMap }, // Re-attempt a batch signing protocol. 
- BatchReattempt { id: BatchSignId }, + BatchReattempt { id: SubstrateSignId }, } impl CoordinatorMessage { @@ -179,16 +196,18 @@ pub mod coordinator { // This synchrony obtained lets us ignore the synchrony requirement offered here pub fn required_block(&self) -> Option { match self { - CoordinatorMessage::BatchPreprocesses { .. } => None, - CoordinatorMessage::BatchShares { .. } => None, + CoordinatorMessage::CosignSubstrateBlock { .. } => None, + CoordinatorMessage::SubstratePreprocesses { .. } => None, + CoordinatorMessage::SubstrateShares { .. } => None, CoordinatorMessage::BatchReattempt { .. } => None, } } pub fn key(&self) -> &[u8] { match self { - CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key, - CoordinatorMessage::BatchShares { id, .. } => &id.key, + CoordinatorMessage::CosignSubstrateBlock { id } => &id.key, + CoordinatorMessage::SubstratePreprocesses { id, .. } => &id.key, + CoordinatorMessage::SubstrateShares { id, .. } => &id.key, CoordinatorMessage::BatchReattempt { id } => &id.key, } } @@ -203,9 +222,11 @@ pub mod coordinator { #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { SubstrateBlockAck { network: NetworkId, block: u64, plans: Vec }, - InvalidParticipant { id: BatchSignId, participant: Participant }, - BatchPreprocess { id: BatchSignId, block: BlockHash, preprocesses: Vec> }, - BatchShare { id: BatchSignId, shares: Vec<[u8; 32]> }, + InvalidParticipant { id: SubstrateSignId, participant: Participant }, + CosignPreprocess { id: SubstrateSignId, preprocesses: Vec> }, + BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec> }, + SubstrateShare { id: SubstrateSignId, shares: Vec<[u8; 32]> }, + CosignedBlock { block: [u8; 32], signature: Vec }, } } @@ -350,10 +371,12 @@ impl CoordinatorMessage { } CoordinatorMessage::Coordinator(msg) => { let (sub, id) = match msg { - // Unique since this embeds the batch ID (hash of it, including its network) and attempt - coordinator::CoordinatorMessage::BatchPreprocesses { id, .. } => (0, id.encode()), - coordinator::CoordinatorMessage::BatchShares { id, .. } => (1, id.encode()), - coordinator::CoordinatorMessage::BatchReattempt { id, .. } => (2, id.encode()), + // Unique since this is the entire message + coordinator::CoordinatorMessage::CosignSubstrateBlock { id } => (0, id.encode()), + // Unique since this embeds the batch ID (including its network) and attempt + coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. } => (1, id.encode()), + coordinator::CoordinatorMessage::SubstrateShares { id, .. } => (2, id.encode()), + coordinator::CoordinatorMessage::BatchReattempt { id, .. } => (3, id.encode()), }; let mut res = vec![COORDINATOR_UID, TYPE_COORDINATOR_UID, sub]; @@ -420,10 +443,12 @@ impl ProcessorMessage { coordinator::ProcessorMessage::SubstrateBlockAck { network, block, .. } => { (0, (network, block).encode()) } - // Unique since BatchSignId + // Unique since SubstrateSignId coordinator::ProcessorMessage::InvalidParticipant { id, .. } => (1, id.encode()), - coordinator::ProcessorMessage::BatchPreprocess { id, .. } => (2, id.encode()), - coordinator::ProcessorMessage::BatchShare { id, .. } => (3, id.encode()), + coordinator::ProcessorMessage::CosignPreprocess { id, .. } => (2, id.encode()), + coordinator::ProcessorMessage::BatchPreprocess { id, .. } => (3, id.encode()), + coordinator::ProcessorMessage::SubstrateShare { id, .. } => (4, id.encode()), + coordinator::ProcessorMessage::CosignedBlock { block, .. 
} => (5, block.encode()), }; let mut res = vec![PROCESSSOR_UID, TYPE_COORDINATOR_UID, sub]; diff --git a/processor/src/cosigner.rs b/processor/src/cosigner.rs new file mode 100644 index 00000000..dc8008fd --- /dev/null +++ b/processor/src/cosigner.rs @@ -0,0 +1,286 @@ +use core::fmt; +use std::collections::HashMap; + +use rand_core::OsRng; + +use ciphersuite::group::GroupEncoding; +use frost::{ + curve::Ristretto, + ThresholdKeys, FrostError, + algorithm::Algorithm, + sign::{ + Writable, PreprocessMachine, SignMachine, SignatureMachine, AlgorithmMachine, + AlgorithmSignMachine, AlgorithmSignatureMachine, + }, +}; +use frost_schnorrkel::Schnorrkel; + +use log::{info, warn}; + +use scale::Encode; + +use messages::coordinator::*; +use crate::{Get, DbTxn, create_db}; + +create_db! { + CosignerDb { + Completed: (id: [u8; 32]) -> (), + Attempt: (id: [u8; 32], attempt: u32) -> () + } +} + +type Preprocess = as PreprocessMachine>::Preprocess; +type SignatureShare = as SignMachine< + >::Signature, +>>::SignatureShare; + +pub struct Cosigner { + #[allow(dead_code)] // False positive + keys: Vec>, + + id: [u8; 32], + attempt: u32, + #[allow(clippy::type_complexity)] + preprocessing: Option<(Vec>, Vec)>, + #[allow(clippy::type_complexity)] + signing: Option<(AlgorithmSignatureMachine, Vec)>, +} + +impl fmt::Debug for Cosigner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt + .debug_struct("Cosigner") + .field("id", &self.id) + .field("attempt", &self.attempt) + .field("preprocessing", &self.preprocessing.is_some()) + .field("signing", &self.signing.is_some()) + .finish_non_exhaustive() + } +} + +impl Cosigner { + pub fn new( + txn: &mut impl DbTxn, + keys: Vec>, + id: [u8; 32], + attempt: u32, + ) -> Option<(Cosigner, ProcessorMessage)> { + assert!(!keys.is_empty()); + + if Completed::get(txn, id).is_some() { + return None; + } + + if Attempt::get(txn, id, attempt).is_some() { + warn!( + "already attempted cosigning {}, attempt #{}. this is an error if we didn't reboot", + hex::encode(id), + attempt, + ); + return None; + } + Attempt::set(txn, id, attempt, &()); + + info!("cosigning block {} with attempt #{}", hex::encode(id), attempt); + + let mut machines = vec![]; + let mut preprocesses = vec![]; + let mut serialized_preprocesses = vec![]; + for keys in &keys { + // b"substrate" is a literal from sp-core + let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), keys.clone()); + + let (machine, preprocess) = machine.preprocess(&mut OsRng); + machines.push(machine); + serialized_preprocesses.push(preprocess.serialize()); + preprocesses.push(preprocess); + } + let preprocessing = Some((machines, preprocesses)); + + let substrate_sign_id = SubstrateSignId { + key: keys[0].group_key().to_bytes(), + id: SubstrateSignableId::CosigningSubstrateBlock(id), + attempt, + }; + + Some(( + Cosigner { keys, id, attempt, preprocessing, signing: None }, + ProcessorMessage::CosignPreprocess { + id: substrate_sign_id, + preprocesses: serialized_preprocesses, + }, + )) + } + + #[must_use] + pub async fn handle( + &mut self, + txn: &mut impl DbTxn, + msg: CoordinatorMessage, + ) -> Option { + match msg { + CoordinatorMessage::CosignSubstrateBlock { .. 
} => { + panic!("Cosigner passed CosignSubstrateBlock") + } + + CoordinatorMessage::SubstratePreprocesses { id, preprocesses } => { + assert_eq!(id.key, self.keys[0].group_key().to_bytes()); + let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { + panic!("cosigner passed Batch") + }; + if block != self.id { + panic!("given preprocesses for a distinct block than cosigner is signing") + } + if id.attempt != self.attempt { + panic!("given preprocesses for a distinct attempt than cosigner is signing") + } + + let (machines, our_preprocesses) = match self.preprocessing.take() { + // Either rebooted or RPC error, or some invariant + None => { + warn!( + "not preprocessing for {}. this is an error if we didn't reboot", + hex::encode(block), + ); + return None; + } + Some(preprocess) => preprocess, + }; + + let mut parsed = HashMap::new(); + for l in { + let mut keys = preprocesses.keys().cloned().collect::>(); + keys.sort(); + keys + } { + let mut preprocess_ref = preprocesses.get(&l).unwrap().as_slice(); + let Ok(res) = machines[0].read_preprocess(&mut preprocess_ref) else { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }); + }; + if !preprocess_ref.is_empty() { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }); + } + parsed.insert(l, res); + } + let preprocesses = parsed; + + // Only keep a single machine as we only need one to get the signature + let mut signature_machine = None; + let mut shares = vec![]; + let mut serialized_shares = vec![]; + for (m, machine) in machines.into_iter().enumerate() { + let mut preprocesses = preprocesses.clone(); + for (i, our_preprocess) in our_preprocesses.clone().into_iter().enumerate() { + if i != m { + assert!(preprocesses.insert(self.keys[i].params().i(), our_preprocess).is_none()); + } + } + + let (machine, share) = match machine.sign(preprocesses, &cosign_block_msg(self.id)) { + Ok(res) => res, + Err(e) => match e { + FrostError::InternalError(_) | + FrostError::InvalidParticipant(_, _) | + FrostError::InvalidSigningSet(_) | + FrostError::InvalidParticipantQuantity(_, _) | + FrostError::DuplicatedParticipant(_) | + FrostError::MissingParticipant(_) => unreachable!(), + + FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }) + } + }, + }; + if m == 0 { + signature_machine = Some(machine); + } + + let mut share_bytes = [0; 32]; + share_bytes.copy_from_slice(&share.serialize()); + serialized_shares.push(share_bytes); + + shares.push(share); + } + self.signing = Some((signature_machine.unwrap(), shares)); + + // Broadcast our shares + Some(ProcessorMessage::SubstrateShare { id, shares: serialized_shares }) + } + + CoordinatorMessage::SubstrateShares { id, shares } => { + assert_eq!(id.key, self.keys[0].group_key().to_bytes()); + let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { + panic!("cosigner passed Batch") + }; + if block != self.id { + panic!("given preprocesses for a distinct block than cosigner is signing") + } + if id.attempt != self.attempt { + panic!("given preprocesses for a distinct attempt than cosigner is signing") + } + + let (machine, our_shares) = match self.signing.take() { + // Rebooted, RPC error, or some invariant + None => { + // If preprocessing has this ID, it means we were never sent the preprocess by the + // coordinator + if self.preprocessing.is_some() { + panic!("never preprocessed yet signing?"); + } + + warn!( + "not preprocessing for {}. 
this is an error if we didn't reboot", + hex::encode(block) + ); + return None; + } + Some(signing) => signing, + }; + + let mut parsed = HashMap::new(); + for l in { + let mut keys = shares.keys().cloned().collect::>(); + keys.sort(); + keys + } { + let mut share_ref = shares.get(&l).unwrap().as_slice(); + let Ok(res) = machine.read_share(&mut share_ref) else { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }); + }; + if !share_ref.is_empty() { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }); + } + parsed.insert(l, res); + } + let mut shares = parsed; + + for (i, our_share) in our_shares.into_iter().enumerate().skip(1) { + assert!(shares.insert(self.keys[i].params().i(), our_share).is_none()); + } + + let sig = match machine.complete(shares) { + Ok(res) => res, + Err(e) => match e { + FrostError::InternalError(_) | + FrostError::InvalidParticipant(_, _) | + FrostError::InvalidSigningSet(_) | + FrostError::InvalidParticipantQuantity(_, _) | + FrostError::DuplicatedParticipant(_) | + FrostError::MissingParticipant(_) => unreachable!(), + + FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { + return Some(ProcessorMessage::InvalidParticipant { id, participant: l }) + } + }, + }; + + info!("cosigned {} with attempt #{}", hex::encode(block), id.attempt); + + Completed::set(txn, block, &()); + + Some(ProcessorMessage::CosignedBlock { block, signature: sig.to_bytes().to_vec() }) + } + CoordinatorMessage::BatchReattempt { .. } => panic!("BatchReattempt passed to Cosigner"), + } + } +} diff --git a/processor/src/key_gen.rs b/processor/src/key_gen.rs index 940edd37..1448819d 100644 --- a/processor/src/key_gen.rs +++ b/processor/src/key_gen.rs @@ -36,7 +36,10 @@ create_db!( // Overwriting its commitments would be accordingly poor CommitmentsDb: (key: &KeyGenId) -> HashMap>, GeneratedKeysDb: (set: &ValidatorSet, substrate_key: &[u8; 32], network_key: &[u8]) -> Vec, - KeysDb: (network_key: &[u8]) -> Vec + // These do assume a key is only used once across sets, which holds true so long as a single + // participant is honest in their execution of the protocol + KeysDb: (network_key: &[u8]) -> Vec, + NetworkKey: (substrate_key: [u8; 32]) -> Vec } ); @@ -102,6 +105,7 @@ impl KeysDb { keys.1[0].group_key().to_bytes().as_ref(), ); txn.put(KeysDb::key(keys.1[0].group_key().to_bytes().as_ref()), keys_vec); + NetworkKey::set(txn, key_pair.0.into(), &key_pair.1.clone().into_inner()); keys } @@ -115,6 +119,16 @@ impl KeysDb { assert_eq!(&res.1[0].group_key(), network_key); Some(res) } + + pub fn substrate_keys_by_substrate_key( + getter: &impl Get, + substrate_key: &[u8; 32], + ) -> Option>> { + let network_key = NetworkKey::get(getter, *substrate_key)?; + let res = GeneratedKeysDb::read_keys::(getter, &Self::key(&network_key))?.1; + assert_eq!(&res.0[0].group_key().to_bytes(), substrate_key); + Some(res.0) + } } type SecretShareMachines = @@ -152,6 +166,13 @@ impl KeyGen { KeysDb::keys::(&self.db, key) } + pub fn substrate_keys_by_substrate_key( + &self, + substrate_key: &[u8; 32], + ) -> Option>> { + KeysDb::substrate_keys_by_substrate_key::(&self.db, substrate_key) + } + pub async fn handle( &mut self, txn: &mut D::Transaction<'_>, diff --git a/processor/src/main.rs b/processor/src/main.rs index 17f3b171..97c66018 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -13,7 +13,12 @@ use serai_client::{ validator_sets::primitives::{ValidatorSet, KeyPair}, }; -use messages::{coordinator::PlanMeta, CoordinatorMessage}; +use 
messages::{ + coordinator::{ + SubstrateSignableId, PlanMeta, CoordinatorMessage as CoordinatorCoordinatorMessage, + }, + CoordinatorMessage, +}; use serai_env as env; @@ -44,6 +49,9 @@ use key_gen::{KeyConfirmed, KeyGen}; mod signer; use signer::Signer; +mod cosigner; +use cosigner::Cosigner; + mod substrate_signer; use substrate_signer::SubstrateSigner; @@ -86,6 +94,9 @@ struct TributaryMutable { // There should only be one SubstrateSigner at a time (see #277) substrate_signer: Option>, + + // Solely mutated by the tributary. + cosigner: Option, } // Items which are mutably borrowed by Substrate. @@ -218,16 +229,58 @@ async fn handle_coordinator_msg( } CoordinatorMessage::Coordinator(msg) => { - if let Some(msg) = tributary_mutable - .substrate_signer - .as_mut() - .expect( - "coordinator told us to sign a batch when we don't have a Substrate signer at this time", - ) - .handle(txn, msg) - .await - { - coordinator.send(msg).await; + let is_batch = match msg { + CoordinatorCoordinatorMessage::CosignSubstrateBlock { .. } => false, + CoordinatorCoordinatorMessage::SubstratePreprocesses { ref id, .. } => { + matches!(&id.id, SubstrateSignableId::Batch(_)) + } + CoordinatorCoordinatorMessage::SubstrateShares { ref id, .. } => { + matches!(&id.id, SubstrateSignableId::Batch(_)) + } + CoordinatorCoordinatorMessage::BatchReattempt { .. } => true, + }; + if is_batch { + if let Some(msg) = tributary_mutable + .substrate_signer + .as_mut() + .expect( + "coordinator told us to sign a batch when we don't currently have a Substrate signer", + ) + .handle(txn, msg) + .await + { + coordinator.send(msg).await; + } + } else { + match msg { + CoordinatorCoordinatorMessage::CosignSubstrateBlock { id } => { + let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { + panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock") + }; + let Some(keys) = tributary_mutable.key_gen.substrate_keys_by_substrate_key(&id.key) + else { + panic!("didn't have key shares for the key we were told to cosign with"); + }; + if let Some((cosigner, msg)) = Cosigner::new(txn, keys, block, id.attempt) { + tributary_mutable.cosigner = Some(cosigner); + coordinator.send(msg).await; + } else { + log::warn!("Cosigner::new returned None"); + } + } + _ => { + if let Some(cosigner) = tributary_mutable.cosigner.as_mut() { + if let Some(msg) = cosigner.handle(txn, msg).await { + coordinator.send(msg).await; + } + } else { + log::warn!( + "received message for cosigner yet didn't have a cosigner. {}", + "this is an error if we didn't reboot", + ); + } + } + } } } @@ -240,6 +293,7 @@ async fn handle_coordinator_msg( if context.network_latest_finalized_block.0 == [0; 32] { assert!(tributary_mutable.signers.is_empty()); assert!(tributary_mutable.substrate_signer.is_none()); + assert!(tributary_mutable.cosigner.is_none()); // We can't check this as existing is no longer pub // assert!(substrate_mutable.existing.as_ref().is_none()); @@ -337,7 +391,7 @@ async fn handle_coordinator_msg( } } - // Since this block was acknowledged, we no longer have to sign the batches for it + // Since this block was acknowledged, we no longer have to sign the batches within it if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { for batch_id in batches { substrate_signer.batch_signed(txn, batch_id); @@ -480,7 +534,11 @@ async fn boot( // This hedges against being dropped due to full mempools, temporarily too low of a fee... 
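 // (i.e., rebroadcast_task periodically re-publishes our already-signed transactions, so a
 // transient drop is retried rather than terminal)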
 tokio::spawn(Signer::<N, D>::rebroadcast_task(raw_db.clone(), network.clone()));
 
-  (main_db, TributaryMutable { key_gen, substrate_signer, signers }, multisig_manager)
+  (
+    main_db,
+    TributaryMutable { key_gen, substrate_signer, cosigner: None, signers },
+    multisig_manager,
+  )
 }
 
 #[allow(clippy::await_holding_lock)] // Needed for txn, unfortunately can't be down-scoped
 async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
@@ -553,6 +611,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
       for batch in batches {
         info!("created batch {} ({} instructions)", batch.id, batch.instructions.len());
 
+        // The coordinator expects BatchPreprocess to immediately follow Batch
         coordinator.send(
           messages::substrate::ProcessorMessage::Batch { batch: batch.clone() }
         ).await;
diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs
index 0a6e0b6d..e9a40bb7 100644
--- a/processor/src/substrate_signer.rs
+++ b/processor/src/substrate_signer.rs
@@ -48,14 +48,14 @@ impl<D: Db> SubstrateSignerDb<D> {
     getter.get(Self::completed_key(id)).is_some()
   }
 
-  fn attempt_key(id: &BatchSignId) -> Vec<u8> {
-    Self::sign_key(b"attempt", id.encode())
+  fn attempt_key(id: [u8; 5], attempt: u32) -> Vec<u8> {
+    Self::sign_key(b"attempt", (id, attempt).encode())
   }
-  fn attempt(txn: &mut D::Transaction<'_>, id: &BatchSignId) {
-    txn.put(Self::attempt_key(id), []);
+  fn attempt(txn: &mut D::Transaction<'_>, id: [u8; 5], attempt: u32) {
+    txn.put(Self::attempt_key(id, attempt), []);
   }
-  fn has_attempt<G: Get>(getter: &G, id: &BatchSignId) -> bool {
-    getter.get(Self::attempt_key(id)).is_some()
+  fn has_attempt<G: Get>(getter: &G, id: [u8; 5], attempt: u32) -> bool {
+    getter.get(Self::attempt_key(id, attempt)).is_some()
   }
 
   fn save_batch(txn: &mut D::Transaction<'_>, batch: &SignedBatch) {
@@ -68,6 +68,7 @@ type SignatureShare = <AlgorithmSignMachine<Ristretto, Schnorrkel> as SignMachine<
 <Schnorrkel as Algorithm<Ristretto>>::Signature,
 >>::SignatureShare;
 
+// TODO: Rename BatchSigner
 pub struct SubstrateSigner<D: Db> {
   db: PhantomData<D>,
@@ -110,22 +111,27 @@ impl<D: Db> SubstrateSigner<D> {
     }
   }
 
-  fn verify_id(&self, id: &BatchSignId) -> Result<(), ()> {
+  fn verify_id(&self, id: &SubstrateSignId) -> Result<([u8; 32], [u8; 5], u32), ()> {
+    let SubstrateSignId { key, id, attempt } = id;
+    let SubstrateSignableId::Batch(id) = id else { panic!("SubstrateSigner handed non-Batch") };
+
+    assert_eq!(key, &self.keys[0].group_key().to_bytes());
+
     // Check the attempt lines up
-    match self.attempt.get(&id.id) {
+    match self.attempt.get(id) {
       // If we don't have an attempt logged, it's because the coordinator is faulty OR because we
       // rebooted OR we detected the signed batch on chain
       // The latter is the expected flow for batches not actively being participated in
       None => {
-        warn!("not attempting batch {} #{}", hex::encode(id.id), id.attempt);
+        warn!("not attempting batch {} #{}", hex::encode(id), attempt);
         Err(())?;
       }
-      Some(attempt) => {
-        if attempt != &id.attempt {
+      Some(our_attempt) => {
+        if attempt != our_attempt {
           warn!(
             "sent signing data for batch {} #{} yet we have attempt #{}",
-            hex::encode(id.id),
-            id.attempt,
-            attempt
+            hex::encode(id),
+            attempt,
+            our_attempt
           );
           Err(())?;
@@ -133,7 +139,7 @@
         }
       }
     }
 
-    Ok(())
+    Ok((*key, *id, *attempt))
   }
 
   #[must_use]
@@ -176,8 +182,7 @@
     // Update the attempt number
     self.attempt.insert(id, attempt);
 
-    let id = BatchSignId { key: self.keys[0].group_key().to_bytes(), id, attempt };
-    info!("signing batch {} #{}", hex::encode(id.id), id.attempt);
+    info!("signing batch {} #{}", hex::encode(id), attempt);
 
     // If we reboot mid-sign, the current design has us abort all signs and wait for latter
     // attempts/new signing protocols
@@ -192,16
+197,15 @@ impl SubstrateSigner { // // Only run if this hasn't already been attempted // TODO: This isn't complete as this txn may not be committed with the expected timing - if SubstrateSignerDb::::has_attempt(txn, &id) { + if SubstrateSignerDb::::has_attempt(txn, id, attempt) { warn!( "already attempted batch {}, attempt #{}. this is an error if we didn't reboot", - hex::encode(id.id), - id.attempt + hex::encode(id), + attempt ); return None; } - - SubstrateSignerDb::::attempt(txn, &id); + SubstrateSignerDb::::attempt(txn, id, attempt); let mut machines = vec![]; let mut preprocesses = vec![]; @@ -215,7 +219,13 @@ impl SubstrateSigner { serialized_preprocesses.push(preprocess.serialize()); preprocesses.push(preprocess); } - self.preprocessing.insert(id.id, (machines, preprocesses)); + self.preprocessing.insert(id, (machines, preprocesses)); + + let id = SubstrateSignId { + key: self.keys[0].group_key().to_bytes(), + id: SubstrateSignableId::Batch(id), + attempt, + }; // Broadcast our preprocesses Some(ProcessorMessage::BatchPreprocess { id, block, preprocesses: serialized_preprocesses }) @@ -246,17 +256,22 @@ impl SubstrateSigner { msg: CoordinatorMessage, ) -> Option { match msg { - CoordinatorMessage::BatchPreprocesses { id, preprocesses } => { - if self.verify_id(&id).is_err() { - return None; - } + CoordinatorMessage::CosignSubstrateBlock { .. } => { + panic!("SubstrateSigner passed CosignSubstrateBlock") + } - let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) { + CoordinatorMessage::SubstratePreprocesses { id, preprocesses } => { + let (key, id, attempt) = self.verify_id(&id).ok()?; + + let substrate_sign_id = + SubstrateSignId { key, id: SubstrateSignableId::Batch(id), attempt }; + + let (machines, our_preprocesses) = match self.preprocessing.remove(&id) { // Either rebooted or RPC error, or some invariant None => { warn!( "not preprocessing for {}. 
this is an error if we didn't reboot", - hex::encode(id.id), + hex::encode(id), ); return None; } @@ -271,10 +286,16 @@ impl SubstrateSigner { } { let mut preprocess_ref = preprocesses.get(&l).unwrap().as_slice(); let Ok(res) = machines[0].read_preprocess(&mut preprocess_ref) else { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()); + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ); }; if !preprocess_ref.is_empty() { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()); + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ); } parsed.insert(l, res); } @@ -292,22 +313,26 @@ impl SubstrateSigner { } } - let (machine, share) = - match machine.sign(preprocesses, &batch_message(&self.signable[&id.id])) { - Ok(res) => res, - Err(e) => match e { - FrostError::InternalError(_) | - FrostError::InvalidParticipant(_, _) | - FrostError::InvalidSigningSet(_) | - FrostError::InvalidParticipantQuantity(_, _) | - FrostError::DuplicatedParticipant(_) | - FrostError::MissingParticipant(_) => unreachable!(), + let (machine, share) = match machine + .sign(preprocesses, &batch_message(&self.signable[&id])) + { + Ok(res) => res, + Err(e) => match e { + FrostError::InternalError(_) | + FrostError::InvalidParticipant(_, _) | + FrostError::InvalidSigningSet(_) | + FrostError::InvalidParticipantQuantity(_, _) | + FrostError::DuplicatedParticipant(_) | + FrostError::MissingParticipant(_) => unreachable!(), - FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()) - } - }, - }; + FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ) + } + }, + }; if m == 0 { signature_machine = Some(machine); } @@ -318,29 +343,33 @@ impl SubstrateSigner { shares.push(share); } - self.signing.insert(id.id, (signature_machine.unwrap(), shares)); + self.signing.insert(id, (signature_machine.unwrap(), shares)); // Broadcast our shares - Some((ProcessorMessage::BatchShare { id, shares: serialized_shares }).into()) + Some( + (ProcessorMessage::SubstrateShare { id: substrate_sign_id, shares: serialized_shares }) + .into(), + ) } - CoordinatorMessage::BatchShares { id, shares } => { - if self.verify_id(&id).is_err() { - return None; - } + CoordinatorMessage::SubstrateShares { id, shares } => { + let (key, id, attempt) = self.verify_id(&id).ok()?; - let (machine, our_shares) = match self.signing.remove(&id.id) { + let substrate_sign_id = + SubstrateSignId { key, id: SubstrateSignableId::Batch(id), attempt }; + + let (machine, our_shares) = match self.signing.remove(&id) { // Rebooted, RPC error, or some invariant None => { // If preprocessing has this ID, it means we were never sent the preprocess by the // coordinator - if self.preprocessing.contains_key(&id.id) { + if self.preprocessing.contains_key(&id) { panic!("never preprocessed yet signing?"); } warn!( "not preprocessing for {}. 
this is an error if we didn't reboot", - hex::encode(id.id) + hex::encode(id) ); return None; } @@ -355,10 +384,16 @@ impl SubstrateSigner { } { let mut share_ref = shares.get(&l).unwrap().as_slice(); let Ok(res) = machine.read_share(&mut share_ref) else { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()); + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ); }; if !share_ref.is_empty() { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()); + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ); } parsed.insert(l, res); } @@ -379,30 +414,36 @@ impl SubstrateSigner { FrostError::MissingParticipant(_) => unreachable!(), FrostError::InvalidPreprocess(l) | FrostError::InvalidShare(l) => { - return Some((ProcessorMessage::InvalidParticipant { id, participant: l }).into()) + return Some( + (ProcessorMessage::InvalidParticipant { id: substrate_sign_id, participant: l }) + .into(), + ) } }, }; - info!("signed batch {} with attempt #{}", hex::encode(id.id), id.attempt); + info!("signed batch {} with attempt #{}", hex::encode(id), attempt); let batch = - SignedBatch { batch: self.signable.remove(&id.id).unwrap(), signature: sig.into() }; + SignedBatch { batch: self.signable.remove(&id).unwrap(), signature: sig.into() }; // Save the batch in case it's needed for recovery SubstrateSignerDb::::save_batch(txn, &batch); - SubstrateSignerDb::::complete(txn, id.id); + SubstrateSignerDb::::complete(txn, id); // Stop trying to sign for this batch - assert!(self.attempt.remove(&id.id).is_some()); - assert!(self.preprocessing.remove(&id.id).is_none()); - assert!(self.signing.remove(&id.id).is_none()); + assert!(self.attempt.remove(&id).is_some()); + assert!(self.preprocessing.remove(&id).is_none()); + assert!(self.signing.remove(&id).is_none()); Some((messages::substrate::ProcessorMessage::SignedBatch { batch }).into()) } CoordinatorMessage::BatchReattempt { id } => { - self.attempt(txn, id.id, id.attempt).await.map(Into::into) + let SubstrateSignableId::Batch(batch_id) = id.id else { + panic!("BatchReattempt passed non-Batch ID") + }; + self.attempt(txn, batch_id, id.attempt).await.map(Into::into) } } } diff --git a/processor/src/tests/cosigner.rs b/processor/src/tests/cosigner.rs new file mode 100644 index 00000000..a910e6d2 --- /dev/null +++ b/processor/src/tests/cosigner.rs @@ -0,0 +1,126 @@ +use std::collections::HashMap; + +use rand_core::{RngCore, OsRng}; + +use ciphersuite::group::GroupEncoding; +use frost::{ + curve::Ristretto, + Participant, + dkg::tests::{key_gen, clone_without}, +}; + +use sp_application_crypto::{RuntimePublic, sr25519::Public}; + +use serai_db::{DbTxn, Db, MemDb}; + +use serai_client::primitives::*; + +use messages::coordinator::*; +use crate::cosigner::Cosigner; + +#[tokio::test] +async fn test_cosigner() { + let keys = key_gen::<_, Ristretto>(&mut OsRng); + + let participant_one = Participant::new(1).unwrap(); + + let block = [0xaa; 32]; + + let actual_id = SubstrateSignId { + key: keys.values().next().unwrap().group_key().to_bytes(), + id: SubstrateSignableId::CosigningSubstrateBlock(block), + attempt: (OsRng.next_u64() >> 32).try_into().unwrap(), + }; + + let mut signing_set = vec![]; + while signing_set.len() < usize::from(keys.values().next().unwrap().params().t()) { + let candidate = Participant::new( + u16::try_from((OsRng.next_u64() % u64::try_from(keys.len()).unwrap()) + 1).unwrap(), + ) + 
.unwrap(); + if signing_set.contains(&candidate) { + continue; + } + signing_set.push(candidate); + } + + let mut signers = HashMap::new(); + let mut dbs = HashMap::new(); + let mut preprocesses = HashMap::new(); + for i in 1 ..= keys.len() { + let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); + let keys = keys.get(&i).unwrap().clone(); + + let mut db = MemDb::new(); + let mut txn = db.txn(); + let (signer, preprocess) = + Cosigner::new(&mut txn, vec![keys], block, actual_id.attempt).unwrap(); + + match preprocess { + // All participants should emit a preprocess + ProcessorMessage::CosignPreprocess { id, preprocesses: mut these_preprocesses } => { + assert_eq!(id, actual_id); + assert_eq!(these_preprocesses.len(), 1); + if signing_set.contains(&i) { + preprocesses.insert(i, these_preprocesses.swap_remove(0)); + } + } + _ => panic!("didn't get preprocess back"), + } + txn.commit(); + + signers.insert(i, signer); + dbs.insert(i, db); + } + + let mut shares = HashMap::new(); + for i in &signing_set { + let mut txn = dbs.get_mut(i).unwrap().txn(); + match signers + .get_mut(i) + .unwrap() + .handle( + &mut txn, + CoordinatorMessage::SubstratePreprocesses { + id: actual_id.clone(), + preprocesses: clone_without(&preprocesses, i), + }, + ) + .await + .unwrap() + { + ProcessorMessage::SubstrateShare { id, shares: mut these_shares } => { + assert_eq!(id, actual_id); + assert_eq!(these_shares.len(), 1); + shares.insert(*i, these_shares.swap_remove(0)); + } + _ => panic!("didn't get share back"), + } + txn.commit(); + } + + for i in &signing_set { + let mut txn = dbs.get_mut(i).unwrap().txn(); + match signers + .get_mut(i) + .unwrap() + .handle( + &mut txn, + CoordinatorMessage::SubstrateShares { + id: actual_id.clone(), + shares: clone_without(&shares, i), + }, + ) + .await + .unwrap() + { + ProcessorMessage::CosignedBlock { block: signed_block, signature } => { + assert_eq!(signed_block, block); + assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) + .verify(&cosign_block_msg(block), &Signature(signature.try_into().unwrap()))); + } + _ => panic!("didn't get cosigned block back"), + } + txn.commit(); + } +} diff --git a/processor/src/tests/mod.rs b/processor/src/tests/mod.rs index 05f9cd0b..37d68719 100644 --- a/processor/src/tests/mod.rs +++ b/processor/src/tests/mod.rs @@ -7,6 +7,7 @@ pub(crate) use scanner::{test_scanner, test_no_deadlock_in_multisig_completed}; mod signer; pub(crate) use signer::{sign, test_signer}; +mod cosigner; mod substrate_signer; mod wallet; diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs index def5b747..b47fb9c7 100644 --- a/processor/src/tests/substrate_signer.rs +++ b/processor/src/tests/substrate_signer.rs @@ -18,7 +18,7 @@ use serai_client::{primitives::*, in_instructions::primitives::*}; use messages::{ substrate, - coordinator::{self, BatchSignId, CoordinatorMessage}, + coordinator::{self, SubstrateSignableId, SubstrateSignId, CoordinatorMessage}, ProcessorMessage, }; use crate::substrate_signer::SubstrateSigner; @@ -48,9 +48,9 @@ async fn test_substrate_signer() { ], }; - let actual_id = BatchSignId { + let actual_id = SubstrateSignId { key: keys.values().next().unwrap().group_key().to_bytes(), - id: (batch.network, batch.id).encode().try_into().unwrap(), + id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()), attempt: 0, }; @@ -107,7 +107,7 @@ async fn test_substrate_signer() { .unwrap() .handle( &mut txn, - CoordinatorMessage::BatchPreprocesses { + 
CoordinatorMessage::SubstratePreprocesses { id: actual_id.clone(), preprocesses: clone_without(&preprocesses, i), }, @@ -115,7 +115,7 @@ async fn test_substrate_signer() { .await .unwrap() { - ProcessorMessage::Coordinator(coordinator::ProcessorMessage::BatchShare { + ProcessorMessage::Coordinator(coordinator::ProcessorMessage::SubstrateShare { id, shares: mut these_shares, }) => { @@ -135,7 +135,7 @@ async fn test_substrate_signer() { .unwrap() .handle( &mut txn, - CoordinatorMessage::BatchShares { + CoordinatorMessage::SubstrateShares { id: actual_id.clone(), shares: clone_without(&shares, i), }, diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs index 8294c6c6..2bb47479 100644 --- a/substrate/client/src/serai/validator_sets.rs +++ b/substrate/client/src/serai/validator_sets.rs @@ -55,6 +55,13 @@ impl<'a> SeraiValidatorSets<'a> { self.0.storage(PALLET, "AllocationPerKeyShare", Some(vec![scale_value(network)])).await } + pub async fn total_allocated_stake( + &self, + network: NetworkId, + ) -> Result, SeraiError> { + self.0.storage(PALLET, "TotalAllocatedStake", Some(vec![scale_value(network)])).await + } + pub async fn allocation( &self, network: NetworkId, diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index bfe6d0d9..699d275c 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -223,9 +223,11 @@ impl Processor { /// Receive a message from the coordinator as a processor. pub async fn recv_message(&mut self) -> CoordinatorMessage { - let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(Service::Coordinator)) - .await - .unwrap(); + // Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes + let msg = + tokio::time::timeout(Duration::from_secs(6 * 60), self.queue.next(Service::Coordinator)) + .await + .unwrap(); assert_eq!(msg.from, Service::Coordinator); assert_eq!(msg.id, self.next_recv_id); self.queue.ack(Service::Coordinator, msg.id).await; diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs index 49ceb20b..c1a802e9 100644 --- a/tests/coordinator/src/tests/batch.rs +++ b/tests/coordinator/src/tests/batch.rs @@ -23,7 +23,10 @@ use serai_client::{ InInstructionsEvent, }, }; -use messages::{coordinator::BatchSignId, SubstrateContext, CoordinatorMessage}; +use messages::{ + coordinator::{SubstrateSignableId, SubstrateSignId}, + SubstrateContext, CoordinatorMessage, +}; use crate::{*, tests::*}; @@ -35,9 +38,9 @@ pub async fn batch( ) -> u64 { let mut id = [0; 5]; OsRng.fill_bytes(&mut id); - let id = BatchSignId { + let id = SubstrateSignId { key: (::generator() * **substrate_key).to_bytes(), - id, + id: SubstrateSignableId::Batch(id), attempt: 0, }; @@ -83,7 +86,10 @@ pub async fn batch( let first_preprocesses = processors[known_signer].recv_message().await; let participants = match first_preprocesses { CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::BatchPreprocesses { id: this_id, preprocesses }, + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { + id: this_id, + preprocesses, + }, ) => { assert_eq!(&id, &this_id); assert_eq!(preprocesses.len(), THRESHOLD - 1); @@ -97,7 +103,7 @@ pub async fn batch( participants.insert(known_signer_i); participants } - _ => panic!("coordinator didn't send back BatchPreprocesses"), + _ => panic!("coordinator didn't send back SubstratePreprocesses"), }; for i in participants.clone() { @@ -117,7 +123,7 @@ pub async fn 
batch( assert_eq!( processor.recv_message().await, CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::BatchPreprocesses { + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id: id.clone(), preprocesses } @@ -129,7 +135,7 @@ pub async fn batch( let processor = &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; processor - .send_message(messages::coordinator::ProcessorMessage::BatchShare { + .send_message(messages::coordinator::ProcessorMessage::SubstrateShare { id: id.clone(), shares: vec![[u8::try_from(u16::from(i)).unwrap(); 32]], }) @@ -148,7 +154,7 @@ pub async fn batch( assert_eq!( processor.recv_message().await, - CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::BatchShares { + CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares { id: id.clone(), shares, }) @@ -174,7 +180,10 @@ pub async fn batch( let serai = processors[0].serai().await; let mut last_serai_block = serai.latest_block().await.unwrap().number(); - for processor in processors.iter_mut() { + for (i, processor) in processors.iter_mut().enumerate() { + if i == excluded_signer { + continue; + } processor .send_message(messages::substrate::ProcessorMessage::SignedBatch { batch: batch.clone() }) .await; @@ -214,9 +223,9 @@ pub async fn batch( // Verify the coordinator sends SubstrateBlock to all processors let last_block = serai.block_by_number(last_serai_block).await.unwrap().unwrap(); - for processor in processors.iter_mut() { + for i in 0 .. processors.len() { assert_eq!( - processor.recv_message().await, + potentially_cosign(processors, i, processor_is, substrate_key).await, messages::CoordinatorMessage::Substrate( messages::substrate::CoordinatorMessage::SubstrateBlock { context: SubstrateContext { @@ -232,7 +241,7 @@ pub async fn batch( ); // Send the ack as expected, though it shouldn't trigger any observable behavior - processor + processors[i] .send_message(messages::ProcessorMessage::Coordinator( messages::coordinator::ProcessorMessage::SubstrateBlockAck { network: batch.batch.network, diff --git a/tests/coordinator/src/tests/cosign.rs b/tests/coordinator/src/tests/cosign.rs new file mode 100644 index 00000000..4de3ce20 --- /dev/null +++ b/tests/coordinator/src/tests/cosign.rs @@ -0,0 +1,172 @@ +use std::collections::{HashSet, HashMap}; + +use zeroize::Zeroizing; +use rand_core::{RngCore, OsRng}; + +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; +use dkg::Participant; + +use serai_client::primitives::Signature; +use messages::{ + coordinator::{SubstrateSignableId, cosign_block_msg}, + CoordinatorMessage, +}; + +use crate::{*, tests::*}; + +pub async fn potentially_cosign( + processors: &mut [Processor], + primary_processor: usize, + processor_is: &[u8], + substrate_key: &Zeroizing<::F>, +) -> CoordinatorMessage { + let msg = processors[primary_processor].recv_message().await; + let messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id }, + ) = msg.clone() + else { + return msg; + }; + let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { + panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock id") + }; + + for (i, processor) in processors.iter_mut().enumerate() { + if i == primary_processor { + continue; + } + assert_eq!(msg, processor.recv_message().await); + } + + // Select a random participant to exclude, so we know for sure who *is* participating + 
assert_eq!(COORDINATORS - THRESHOLD, 1); + let excluded_signer = + usize::try_from(OsRng.next_u64() % u64::try_from(processors.len()).unwrap()).unwrap(); + for (i, processor) in processors.iter_mut().enumerate() { + if i == excluded_signer { + continue; + } + + processor + .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { + id: id.clone(), + preprocesses: vec![[processor_is[i]; 64].to_vec()], + }) + .await; + } + + // Send from the excluded signer so they don't stay stuck + processors[excluded_signer] + .send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { + id: id.clone(), + preprocesses: vec![[processor_is[excluded_signer]; 64].to_vec()], + }) + .await; + + // Read from a known signer to find out who was selected to sign + let known_signer = (excluded_signer + 1) % COORDINATORS; + let first_preprocesses = processors[known_signer].recv_message().await; + let participants = match first_preprocesses { + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { + id: this_id, + preprocesses, + }, + ) => { + assert_eq!(&id, &this_id); + assert_eq!(preprocesses.len(), THRESHOLD - 1); + let known_signer_i = Participant::new(u16::from(processor_is[known_signer])).unwrap(); + assert!(!preprocesses.contains_key(&known_signer_i)); + + let mut participants = preprocesses.keys().cloned().collect::>(); + for (p, preprocess) in preprocesses { + assert_eq!(preprocess, vec![u8::try_from(u16::from(p)).unwrap(); 64]); + } + participants.insert(known_signer_i); + participants + } + _ => panic!("coordinator didn't send back SubstratePreprocesses"), + }; + + for i in participants.clone() { + if u16::from(i) == u16::from(processor_is[known_signer]) { + continue; + } + + let processor = + &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; + let mut preprocesses = participants + .clone() + .into_iter() + .map(|i| (i, [u8::try_from(u16::from(i)).unwrap(); 64].to_vec())) + .collect::>(); + preprocesses.remove(&i); + + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SubstratePreprocesses { + id: id.clone(), + preprocesses + } + ) + ); + } + + for i in participants.clone() { + let processor = + &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; + processor + .send_message(messages::coordinator::ProcessorMessage::SubstrateShare { + id: id.clone(), + shares: vec![[u8::try_from(u16::from(i)).unwrap(); 32]], + }) + .await; + } + for i in participants.clone() { + let processor = + &mut processors[processor_is.iter().position(|p_i| u16::from(*p_i) == u16::from(i)).unwrap()]; + let mut shares = participants + .clone() + .into_iter() + .map(|i| (i, [u8::try_from(u16::from(i)).unwrap(); 32])) + .collect::>(); + shares.remove(&i); + + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares { + id: id.clone(), + shares, + }) + ); + } + + // Expand to a key pair as Schnorrkel expects + // It's the private key + 32-bytes of entropy for nonces + the public key + let mut schnorrkel_key_pair = [0; 96]; + schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr()); + OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]); + schnorrkel_key_pair[64 ..] 
+    .copy_from_slice(&(<Ristretto as Ciphersuite>::generator() * **substrate_key).to_bytes());
+  let signature = Signature(
+    schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
+      .unwrap()
+      .sign_simple(b"substrate", &cosign_block_msg(block))
+      .to_bytes(),
+  );
+
+  for (i, processor) in processors.iter_mut().enumerate() {
+    if i == excluded_signer {
+      continue;
+    }
+    processor
+      .send_message(messages::coordinator::ProcessorMessage::CosignedBlock {
+        block,
+        signature: signature.0.to_vec(),
+      })
+      .await;
+  }
+
+  processors[primary_processor].recv_message().await
+}
diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs
index 0e84ec66..b8fd248c 100644
--- a/tests/coordinator/src/tests/mod.rs
+++ b/tests/coordinator/src/tests/mod.rs
@@ -9,6 +9,9 @@ use crate::*;
 
 mod key_gen;
 pub use key_gen::key_gen;
 
+mod cosign;
+pub use cosign::potentially_cosign;
+
 mod batch;
 pub use batch::batch;
diff --git a/tests/coordinator/src/tests/sign.rs b/tests/coordinator/src/tests/sign.rs
index a00935ee..7ee0c1e1 100644
--- a/tests/coordinator/src/tests/sign.rs
+++ b/tests/coordinator/src/tests/sign.rs
@@ -328,9 +328,9 @@ async fn sign_test() {
   let plan_id = plan_id;
 
   // We should now get a SubstrateBlock
-  for processor in processors.iter_mut() {
+  for i in 0 .. processors.len() {
     assert_eq!(
-      processor.recv_message().await,
+      potentially_cosign(&mut processors, i, &participant_is, &substrate_key).await,
       messages::CoordinatorMessage::Substrate(
         messages::substrate::CoordinatorMessage::SubstrateBlock {
           context: SubstrateContext {
@@ -346,7 +346,7 @@
     );
 
     // Send the ACK, claiming there's a plan to sign
-    processor
+    processors[i]
       .send_message(messages::ProcessorMessage::Coordinator(
         messages::coordinator::ProcessorMessage::SubstrateBlockAck {
           network: NetworkId::Bitcoin,
diff --git a/tests/full-stack/src/tests/mint_and_burn.rs b/tests/full-stack/src/tests/mint_and_burn.rs
index cc5b3af0..fc3c5ee8 100644
--- a/tests/full-stack/src/tests/mint_and_burn.rs
+++ b/tests/full-stack/src/tests/mint_and_burn.rs
@@ -555,7 +555,7 @@ async fn mint_and_burn_test() {
-    // Check for up to 5 minutes
+    // Check for up to 15 minutes
     let mut found = false;
     let mut i = 0;
-    while i < (5 * 6) {
+    while i < (15 * 6) {
       if let Ok(hash) = rpc.get_block_hash(start_bitcoin_block).await {
         let block = rpc.get_block(&hash).await.unwrap();
         start_bitcoin_block += 1;
diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs
index 40dfcc0c..96486567 100644
--- a/tests/processor/src/tests/batch.rs
+++ b/tests/processor/src/tests/batch.rs
@@ -26,10 +26,10 @@ pub(crate) async fn recv_batch_preprocesses(
   substrate_key: &[u8; 32],
   batch: &Batch,
   attempt: u32,
-) -> (BatchSignId, HashMap<Participant, Vec<u8>>) {
-  let id = BatchSignId {
+) -> (SubstrateSignId, HashMap<Participant, Vec<u8>>) {
+  let id = SubstrateSignId {
     key: *substrate_key,
-    id: (batch.network, batch.id).encode().try_into().unwrap(),
+    id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()),
     attempt,
   };
 
@@ -86,7 +86,7 @@ pub(crate) async fn recv_batch_preprocesses(
 pub(crate) async fn sign_batch(
   coordinators: &mut [Coordinator],
   key: [u8; 32],
-  id: BatchSignId,
+  id: SubstrateSignId,
   preprocesses: HashMap<Participant, Vec<u8>>,
 ) -> SignedBatch {
   assert_eq!(preprocesses.len(), THRESHOLD);
@@ -96,7 +96,7 @@ pub(crate) async fn sign_batch(
     if preprocesses.contains_key(&i) {
       coordinator
-        .send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses {
+        .send_message(messages::coordinator::CoordinatorMessage::SubstratePreprocesses {
           id: id.clone(),
           preprocesses:
clone_without(&preprocesses, &i), }) @@ -111,7 +111,7 @@ pub(crate) async fn sign_batch( if preprocesses.contains_key(&i) { match coordinator.recv_message().await { messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::BatchShare { + messages::coordinator::ProcessorMessage::SubstrateShare { id: this_id, shares: mut these_shares, }, @@ -130,7 +130,7 @@ pub(crate) async fn sign_batch( if preprocesses.contains_key(&i) { coordinator - .send_message(messages::coordinator::CoordinatorMessage::BatchShares { + .send_message(messages::coordinator::CoordinatorMessage::SubstrateShares { id: id.clone(), shares: clone_without(&shares, &i), })
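
For reference, the message being cosigned is the block hash framed by a length-prefixed "Cosign" DST (cosign_block_msg above), signed under the same b"substrate" schnorrkel context sp-core's sr25519 uses, as the cosign test does. A standalone round-trip sketch, with a freshly generated keypair standing in for a validator key:

  use schnorrkel::Keypair;

  // Mirrors processor_messages::coordinator::cosign_block_msg from this patch
  fn cosign_block_msg(block: [u8; 32]) -> Vec<u8> {
    const DST: &[u8] = b"Cosign";
    let mut res = vec![u8::try_from(DST.len()).unwrap()];
    res.extend(DST);
    res.extend(block);
    res
  }

  fn main() {
    let pair = Keypair::generate();
    let block = [0xaa; 32];
    let sig = pair.sign_simple(b"substrate", &cosign_block_msg(block));
    // The DST means this signature can't be confused with one over a Batch
    assert!(pair.public.verify_simple(b"substrate", &cosign_block_msg(block), &sig).is_ok());
  }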