From 7275a95907de065509b9310d38e03dad2f2c8205 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 13 Oct 2023 23:36:07 -0400 Subject: [PATCH] Break handle_processor_messages out to handle_processor_message, move a helper fn to substrate --- coordinator/src/main.rs | 1037 ++++++++++++++---------------- coordinator/src/substrate/mod.rs | 160 +++-- 2 files changed, 606 insertions(+), 591 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 853e2ca1..91971b7b 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -18,7 +18,11 @@ use frost::Participant; use serai_db::{DbTxn, Db}; use serai_env as env; -use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Public, Serai}; +use serai_client::{ + primitives::NetworkId, + validator_sets::primitives::{Session, ValidatorSet}, + Public, Serai, +}; use message_queue::{Service, client::MessageQueue}; @@ -108,19 +112,18 @@ async fn add_tributary( } async fn publish_signed_transaction( - db: &mut D, + txn: &mut D::Transaction<'_>, tributary: &Tributary, tx: Transaction, ) { log::debug!("publishing transaction {}", hex::encode(tx.hash())); - let mut txn = db.txn(); let signer = if let TransactionKind::Signed(signed) = tx.kind() { let signer = signed.signer; // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - MainDb::::save_signed_transaction(&mut txn, signed.nonce, tx); + MainDb::::save_signed_transaction(txn, signed.nonce, tx); signer } else { @@ -130,7 +133,7 @@ async fn publish_signed_transaction( // If we're trying to publish 5, when the last transaction published was 3, this will delay // publication until the point in time we publish 4 while let Some(tx) = MainDb::::take_signed_transaction( - &mut txn, + txn, tributary .next_nonce(signer) .await @@ -142,39 +145,497 @@ async fn publish_signed_transaction( // Our use case only calls this function sequentially assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); } - txn.commit(); } -/// Verifies `Batch`s which have already been indexed from Substrate. -async fn verify_published_batches( - txn: &mut D::Transaction<'_>, +async fn handle_processor_message( + db: &mut D, + key: &Zeroizing<::F>, + serai: &Serai, + tributaries: &HashMap>, network: NetworkId, - optimistic_up_to: u32, -) -> Option { - let last = MainDb::::last_verified_batch(txn, network); - for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to { - let Some(on_chain) = SubstrateDb::::batch_instructions_hash(txn, network, id) else { - break; - }; - let off_chain = MainDb::::expected_batch(txn, network, id).unwrap(); - if on_chain != off_chain { - // Halt operations on this network and spin, as this is a critical fault - loop { - log::error!( - "{}! 
network: {:?} id: {} off-chain: {} on-chain: {}", - "on-chain batch doesn't match off-chain", - network, - id, - hex::encode(off_chain), - hex::encode(on_chain), - ); - sleep(Duration::from_secs(60)).await; - } - } - MainDb::::save_last_verified_batch(txn, network, id); + msg: &processors::Message, +) -> bool { + if MainDb::::handled_message(db, msg.network, msg.id) { + return true; } - MainDb::::last_verified_batch(txn, network) + let mut txn = db.txn(); + + let mut relevant_tributary = match &msg.msg { + // We'll only receive these if we fired GenerateKey, which we'll only do if if we're + // in-set, making the Tributary relevant + ProcessorMessage::KeyGen(inner_msg) => match inner_msg { + key_gen::ProcessorMessage::Commitments { id, .. } => Some(id.set.session), + key_gen::ProcessorMessage::Shares { id, .. } => Some(id.set.session), + key_gen::ProcessorMessage::GeneratedKeyPair { id, .. } => Some(id.set.session), + }, + // TODO: Review replacing key with Session in messages? + ProcessorMessage::Sign(inner_msg) => match inner_msg { + // We'll only receive Preprocess and Share if we're actively signing + sign::ProcessorMessage::Preprocess { id, .. } => { + Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) + } + sign::ProcessorMessage::Share { id, .. } => { + Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) + } + // While the Processor's Scanner will always emit Completed, that's routed through the + // Signer and only becomes a ProcessorMessage::Completed if the Signer is present and + // confirms it + sign::ProcessorMessage::Completed { key, .. } => { + Some(SubstrateDb::::session_for_key(&txn, key).unwrap()) + } + }, + ProcessorMessage::Coordinator(inner_msg) => match inner_msg { + // This is a special case as it's relevant to *all* Tributaries for this network + // It doesn't return a Tributary to become `relevant_tributary` though + coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { + assert_eq!( + *network, msg.network, + "processor claimed to be a different network than it was for SubstrateBlockAck", + ); + + // TODO: Find all Tributaries active at this Substrate block, and make sure we have + // them all (if we were present in them) + + for tributary in tributaries.values() { + // TODO: This needs to be scoped per multisig + TributaryDb::::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, plans); + + let tx = Transaction::SubstrateBlock(*block); + log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); + log::trace!("providing transaction {}", hex::encode(tx.hash())); + let res = tributary.tributary.provide_transaction(tx).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + if res == Err(ProvidedError::LocalMismatchesOnChain) { + // Spin, since this is a crit for this Tributary + loop { + log::error!( + "{}. tributary: {}, provided: SubstrateBlock({})", + "tributary added distinct provided to delayed locally provided TX", + hex::encode(tributary.spec.genesis()), + block, + ); + sleep(Duration::from_secs(60)).await; + } + } + panic!("provided an invalid transaction: {res:?}"); + } + } + + None + } + // We'll only fire these if we are the Substrate signer, making the Tributary relevant + coordinator::ProcessorMessage::BatchPreprocess { id, .. } => { + Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) + } + coordinator::ProcessorMessage::BatchShare { id, .. 
} => { + Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) + } + }, + // These don't return a relevant Tributary as there's no Tributary with action expected + ProcessorMessage::Substrate(inner_msg) => match inner_msg { + processor_messages::substrate::ProcessorMessage::Batch { batch } => { + assert_eq!( + batch.network, msg.network, + "processor sent us a batch for a different network than it was for", + ); + let this_batch_id = batch.id; + MainDb::::save_expected_batch(&mut txn, batch); + + // Re-define batch + // We can't drop it, yet it shouldn't be accidentally used in the following block + #[allow(clippy::let_unit_value, unused_variables)] + let batch = (); + + // This won't be complete, as this call is when a `Batch` message is received, which + // will be before we get a `SignedBatch` + // It is, however, incremental + // When we need a complete version, we use another call, continuously called as-needed + substrate::verify_published_batches::(&mut txn, msg.network, this_batch_id).await; + + None + } + // If this is a new Batch, immediately publish it (if we can) + processor_messages::substrate::ProcessorMessage::SignedBatch { batch } => { + assert_eq!( + batch.batch.network, msg.network, + "processor sent us a signed batch for a different network than it was for", + ); + // TODO: Check this key's key pair's substrate key is authorized to publish batches + + log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id); + + // Save this batch to the disk + MainDb::::save_batch(&mut txn, batch.clone()); + + // Get the next-to-execute batch ID + let mut next = substrate::get_expected_next_batch(serai, network).await; + + // Since we have a new batch, publish all batches yet to be published to Serai + // This handles the edge-case where batch n+1 is signed before batch n is + let mut batches = VecDeque::new(); + while let Some(batch) = MainDb::::batch(&txn, network, next) { + batches.push_back(batch); + next += 1; + } + + let start_id = batches.front().map(|batch| batch.batch.id); + let last_id = batches.back().map(|batch| batch.batch.id); + while let Some(batch) = batches.pop_front() { + // If this Batch should no longer be published, continue + if substrate::get_expected_next_batch(serai, network).await > batch.batch.id { + continue; + } + + let tx = Serai::execute_batch(batch.clone()); + log::debug!("attempting to publish batch {:?} {}", batch.batch.network, batch.batch.id,); + // This publish may fail if this transactions already exists in the mempool, which is + // possible, or if this batch was already executed on-chain + // Either case will have eventual resolution and be handled by the above check on if + // this batch should execute + let res = serai.publish(&tx).await; + if res.is_ok() { + log::info!( + "published batch {network:?} {} (block {})", + batch.batch.id, + hex::encode(batch.batch.block), + ); + } else { + log::debug!( + "couldn't publish batch {:?} {}: {:?}", + batch.batch.network, + batch.batch.id, + res, + ); + // If we failed to publish it, restore it + batches.push_front(batch); + // Sleep for a few seconds before retrying to prevent hammering the node + sleep(Duration::from_secs(5)).await; + } + } + // Verify the `Batch`s we just published + if let Some(last_id) = last_id { + loop { + let verified = + substrate::verify_published_batches::(&mut txn, msg.network, last_id).await; + if verified == Some(last_id) { + break; + } + } + } + + // Check if any of these `Batch`s were a handover `Batch` + // If so, we need to publish any delayed 
`Batch` provided transactions + let mut relevant = None; + if let Some(start_id) = start_id { + let last_id = last_id.unwrap(); + for batch in start_id .. last_id { + if let Some(set) = MainDb::::is_handover_batch(&txn, msg.network, batch) { + // relevant may already be Some. This is a safe over-write, as we don't need to + // be concerned for handovers of Tributaries which have completed their handovers + // While this does bypass the checks that Tributary would've performed at the + // time, if we ever actually participate in a handover, we will verify *all* + // prior `Batch`s, including the ones which would've been explicitly verified + // then + // + // We should only declare this session relevant if it's relevant to us + // We only set handover `Batch`s when we're trying to produce said `Batch`, so this + // would be a `Batch` we were involved in the production of + // Accordingly, iy's relevant + relevant = Some(set.session); + } + } + } + relevant + } + }, + }; + + // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired + if let Some(relevant_tributary_value) = relevant_tributary { + if !is_active_set( + serai, + ValidatorSet { network: msg.network, session: relevant_tributary_value }, + ) + .await + { + relevant_tributary = None; + } + } + + // If there's a relevant Tributary... + if let Some(relevant_tributary) = relevant_tributary { + // Make sure we have it + // Per the reasoning above, we only return a Tributary as relevant if we're a participant + // Accordingly, we do *need* to have this Tributary now to handle it UNLESS the Tributary has + // already completed and this is simply an old message (which we prior checked) + let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else { + // Since we don't, sleep for a fraction of a second and return false, signaling we didn't + // handle this message + // At the start of the loop which calls this function, we'll check for new tributaries, making + // this eventually resolve + sleep(Duration::from_millis(100)).await; + return false; + }; + + let genesis = spec.genesis(); + let pub_key = Ristretto::generator() * key.deref(); + + let txs = match msg.msg.clone() { + ProcessorMessage::KeyGen(inner_msg) => match inner_msg { + key_gen::ProcessorMessage::Commitments { id, commitments } => { + vec![Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())] + } + key_gen::ProcessorMessage::Shares { id, mut shares } => { + // Create a MuSig-based machine to inform Substrate of this key generation + let nonces = crate::tributary::dkg_confirmation_nonces(key, spec, id.attempt); + + let mut tx_shares = Vec::with_capacity(shares.len()); + for i in 1 ..= spec.n() { + let i = Participant::new(i).unwrap(); + if i == + spec + .i(pub_key) + .expect("processor message to DKG for a session we aren't a validator in") + { + continue; + } + tx_shares + .push(shares.remove(&i).expect("processor didn't send share for another validator")); + } + + vec![Transaction::DkgShares { + attempt: id.attempt, + shares: tx_shares, + confirmation_nonces: nonces, + signed: Transaction::empty_signed(), + }] + } + key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => { + assert_eq!( + id.set.network, msg.network, + "processor claimed to be a different network than it was for GeneratedKeyPair", + ); + // TODO2: Also check the other KeyGenId fields + + // Tell the Tributary the key pair, get back the share for the MuSig signature + let share = 
crate::tributary::generated_key_pair::( + &mut txn, + key, + spec, + &(Public(substrate_key), network_key.try_into().unwrap()), + id.attempt, + ); + + match share { + Ok(share) => { + vec![Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())] + } + Err(p) => { + todo!("participant {p:?} sent invalid DKG confirmation preprocesses") + } + } + } + }, + ProcessorMessage::Sign(msg) => match msg { + sign::ProcessorMessage::Preprocess { id, preprocess } => { + if id.attempt == 0 { + MainDb::::save_first_preprocess(&mut txn, network, id.id, preprocess); + + vec![] + } else { + vec![Transaction::SignPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })] + } + } + sign::ProcessorMessage::Share { id, share } => vec![Transaction::SignShare(SignData { + plan: id.id, + attempt: id.attempt, + data: share, + signed: Transaction::empty_signed(), + })], + sign::ProcessorMessage::Completed { key: _, id, tx } => { + let r = Zeroizing::new(::F::random(&mut OsRng)); + #[allow(non_snake_case)] + let R = ::generator() * r.deref(); + let mut tx = Transaction::SignCompleted { + plan: id, + tx_hash: tx, + first_signer: pub_key, + signature: SchnorrSignature { R, s: ::F::ZERO }, + }; + let signed = SchnorrSignature::sign(key, r, tx.sign_completed_challenge()); + match &mut tx { + Transaction::SignCompleted { signature, .. } => { + *signature = signed; + } + _ => unreachable!(), + } + vec![tx] + } + }, + ProcessorMessage::Coordinator(inner_msg) => match inner_msg { + coordinator::ProcessorMessage::SubstrateBlockAck { .. } => unreachable!(), + coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { + log::info!( + "informed of batch (sign ID {}, attempt {}) for block {}", + hex::encode(id.id), + id.attempt, + hex::encode(block), + ); + + // If this is the first attempt instance, wait until we synchronize around the batch + // first + if id.attempt == 0 { + MainDb::::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess); + + // If this is the new key's first Batch, only create this TX once we verify all + // all prior published `Batch`s + let last_received = MainDb::::last_received_batch(&txn, msg.network).unwrap(); + let handover_batch = MainDb::::handover_batch(&txn, spec.set()); + if handover_batch.is_none() { + MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); + if last_received != 0 { + // Decrease by 1, to get the ID of the Batch prior to this Batch + let prior_sets_last_batch = last_received - 1; + loop { + let successfully_verified = substrate::verify_published_batches::( + &mut txn, + msg.network, + prior_sets_last_batch, + ) + .await; + if successfully_verified == Some(prior_sets_last_batch) { + break; + } + sleep(Duration::from_secs(5)).await; + } + } + } + + // There is a race condition here. We may verify all `Batch`s from the prior set, + // start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed + // before `n` (or at the same time), yet then the prior set forges a malicious + // `Batch` `n`. + // + // The malicious `Batch` `n` would be publishable to Serai, as Serai can't + // distinguish what's intended to be a handover `Batch`, yet then anyone could + // publish the new set's `n+1`, causing their acceptance of the handover. + // + // To fix this, if this is after the handover `Batch` and we have yet to verify + // publication of the handover `Batch`, don't yet yield the provided. 
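A minimal model of this gating rule may help; the names here are invented for illustration, while the real logic (over MainDb) follows immediately below.

// Hypothetical distillation of the rule described above: a Batch provided
// transaction past the handover `Batch` may only be yielded once the handover
// `Batch` itself has been verified as published.
fn may_yield_batch(last_received: u32, handover: u32, last_verified: Option<u32>) -> bool {
  if last_received <= handover {
    // At or before the handover Batch, there's nothing to defer
    return true;
  }
  // Past the handover Batch: require the handover Batch be verified first
  matches!(last_verified, Some(verified) if verified >= handover)
}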
+ let handover_batch = MainDb::::handover_batch(&txn, spec.set()).unwrap(); + let intended = Transaction::Batch(block.0, id.id); + let mut res = vec![intended.clone()]; + if last_received > handover_batch { + if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { + if last_verified < handover_batch { + res = vec![]; + } + } else { + res = vec![]; + } + } + + if res.is_empty() { + MainDb::::queue_batch(&mut txn, spec.set(), intended); + } + + res + } else { + vec![Transaction::BatchPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })] + } + } + coordinator::ProcessorMessage::BatchShare { id, share } => { + vec![Transaction::BatchShare(SignData { + plan: id.id, + attempt: id.attempt, + data: share.to_vec(), + signed: Transaction::empty_signed(), + })] + } + }, + ProcessorMessage::Substrate(inner_msg) => match inner_msg { + processor_messages::substrate::ProcessorMessage::Batch { .. } => unreachable!(), + processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => { + // We only reach here if this SignedBatch triggered the publication of a handover + // Batch + // Since the handover `Batch` was successfully published and verified, we no longer + // have to worry about the above n+1 attack + MainDb::::take_queued_batches(&mut txn, spec.set()) + } + }, + }; + + // If this created transactions, publish them + for mut tx in txs { + log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); + + match tx.kind() { + TransactionKind::Provided(_) => { + log::trace!("providing transaction {}", hex::encode(tx.hash())); + let res = tributary.provide_transaction(tx.clone()).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + if res == Err(ProvidedError::LocalMismatchesOnChain) { + // Spin, since this is a crit for this Tributary + loop { + log::error!( + "{}. 
tributary: {}, provided: {:?}", + "tributary added distinct provided to delayed locally provided TX", + hex::encode(spec.genesis()), + &tx, + ); + sleep(Duration::from_secs(60)).await; + } + } + panic!("provided an invalid transaction: {res:?}"); + } + } + TransactionKind::Unsigned => { + log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash())); + // Ignores the result since we can't differentiate already in-mempool from + // already on-chain from invalid + // TODO: Don't ignore the result + tributary.add_transaction(tx).await; + } + TransactionKind::Signed(_) => { + log::trace!("getting next nonce for Tributary TX in response to processor message"); + + let nonce = loop { + let Some(nonce) = + NonceDecider::::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce") + else { + // This can be None if the following events occur, in order: + // 1) We scanned the relevant transaction(s) in a Tributary block + // 2) The processor was sent a message and responded + // 3) The Tributary TXN has yet to be committed + log::warn!("nonce has yet to be saved for processor-instigated transaction"); + sleep(Duration::from_millis(100)).await; + continue; + }; + break nonce; + }; + tx.sign(&mut OsRng, genesis, key, nonce); + + publish_signed_transaction(&mut txn, tributary, tx).await; + } + } + } + } + + MainDb::::save_handled_message(&mut txn, msg.network, msg.id); + txn.commit(); + + true } async fn handle_processor_messages( @@ -185,11 +646,7 @@ async fn handle_processor_messages( network: NetworkId, mut new_tributary: mpsc::UnboundedReceiver>, ) { - let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn - let pub_key = Ristretto::generator() * key.deref(); - let mut tributaries = HashMap::new(); - loop { match new_tributary.try_recv() { Ok(tributary) => { @@ -205,507 +662,9 @@ async fn handle_processor_messages( // TODO: Check this ID is sane (last handled ID or expected next ID) let msg = processors.recv(network).await; - - if !MainDb::::handled_message(&db, msg.network, msg.id) { - let mut txn = db.txn(); - - let mut relevant_tributary = match &msg.msg { - // We'll only receive these if we fired GenerateKey, which we'll only do if if we're - // in-set, making the Tributary relevant - ProcessorMessage::KeyGen(inner_msg) => match inner_msg { - key_gen::ProcessorMessage::Commitments { id, .. } => Some(id.set.session), - key_gen::ProcessorMessage::Shares { id, .. } => Some(id.set.session), - key_gen::ProcessorMessage::GeneratedKeyPair { id, .. } => Some(id.set.session), - }, - // TODO: Review replacing key with Session in messages? - ProcessorMessage::Sign(inner_msg) => match inner_msg { - // We'll only receive Preprocess and Share if we're actively signing - sign::ProcessorMessage::Preprocess { id, .. } => { - Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) - } - sign::ProcessorMessage::Share { id, .. } => { - Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) - } - // While the Processor's Scanner will always emit Completed, that's routed through the - // Signer and only becomes a ProcessorMessage::Completed if the Signer is present and - // confirms it - sign::ProcessorMessage::Completed { key, .. 
} => { - Some(SubstrateDb::::session_for_key(&txn, key).unwrap()) - } - }, - ProcessorMessage::Coordinator(inner_msg) => match inner_msg { - // This is a special case as it's relevant to *all* Tributaries for this network - // It doesn't return a Tributary to become `relevant_tributary` though - coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { - assert_eq!( - *network, msg.network, - "processor claimed to be a different network than it was for SubstrateBlockAck", - ); - - // TODO: Find all Tributaries active at this Substrate block, and make sure we have - // them all (if we were present in them) - - for tributary in tributaries.values() { - // TODO: This needs to be scoped per multisig - TributaryDb::::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, plans); - - let tx = Transaction::SubstrateBlock(*block); - log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); - log::trace!("providing transaction {}", hex::encode(tx.hash())); - let res = tributary.tributary.provide_transaction(tx).await; - if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { - if res == Err(ProvidedError::LocalMismatchesOnChain) { - // Spin, since this is a crit for this Tributary - loop { - log::error!( - "{}. tributary: {}, provided: SubstrateBlock({})", - "tributary added distinct provided to delayed locally provided TX", - hex::encode(tributary.spec.genesis()), - block, - ); - sleep(Duration::from_secs(60)).await; - } - } - panic!("provided an invalid transaction: {res:?}"); - } - } - - None - } - // We'll only fire these if we are the Substrate signer, making the Tributary relevant - coordinator::ProcessorMessage::BatchPreprocess { id, .. } => { - Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) - } - coordinator::ProcessorMessage::BatchShare { id, .. 
} => { - Some(SubstrateDb::::session_for_key(&txn, &id.key).unwrap()) - } - }, - // These don't return a relevant Tributary as there's no Tributary with action expected - ProcessorMessage::Substrate(inner_msg) => match inner_msg { - processor_messages::substrate::ProcessorMessage::Batch { batch } => { - assert_eq!( - batch.network, msg.network, - "processor sent us a batch for a different network than it was for", - ); - let this_batch_id = batch.id; - MainDb::::save_expected_batch(&mut txn, batch); - - // Re-define batch - // We can't drop it, yet it shouldn't be accidentally used in the following block - #[allow(clippy::let_unit_value, unused_variables)] - let batch = (); - - // This won't be complete, as this call is when a `Batch` message is received, which - // will be before we get a `SignedBatch` - // It is, however, incremental - // When we need a complete version, we use another call, continuously called as-needed - verify_published_batches::(&mut txn, msg.network, this_batch_id).await; - - None - } - // If this is a new Batch, immediately publish it (if we can) - processor_messages::substrate::ProcessorMessage::SignedBatch { batch } => { - assert_eq!( - batch.batch.network, msg.network, - "processor sent us a signed batch for a different network than it was for", - ); - // TODO: Check this key's key pair's substrate key is authorized to publish batches - - log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id); - - // Save this batch to the disk - MainDb::::save_batch(&mut txn, batch.clone()); - - // Get the next-to-execute batch ID - async fn get_next(serai: &Serai, network: NetworkId) -> u32 { - let mut first = true; - loop { - if !first { - log::error!( - "{} {network:?}", - "couldn't connect to Serai node to get the next batch ID for", - ); - sleep(Duration::from_secs(5)).await; - } - first = false; - - let Ok(latest_block) = serai.get_latest_block().await else { - continue; - }; - let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await - else { - continue; - }; - break if let Some(last) = last { last + 1 } else { 0 }; - } - } - let mut next = get_next(&serai, network).await; - - // Since we have a new batch, publish all batches yet to be published to Serai - // This handles the edge-case where batch n+1 is signed before batch n is - let mut batches = VecDeque::new(); - while let Some(batch) = MainDb::::batch(&txn, network, next) { - batches.push_back(batch); - next += 1; - } - - let start_id = batches.front().map(|batch| batch.batch.id); - let last_id = batches.back().map(|batch| batch.batch.id); - while let Some(batch) = batches.pop_front() { - // If this Batch should no longer be published, continue - if get_next(&serai, network).await > batch.batch.id { - continue; - } - - let tx = Serai::execute_batch(batch.clone()); - log::debug!( - "attempting to publish batch {:?} {}", - batch.batch.network, - batch.batch.id, - ); - // This publish may fail if this transactions already exists in the mempool, which is - // possible, or if this batch was already executed on-chain - // Either case will have eventual resolution and be handled by the above check on if - // this batch should execute - let res = serai.publish(&tx).await; - if res.is_ok() { - log::info!( - "published batch {network:?} {} (block {})", - batch.batch.id, - hex::encode(batch.batch.block), - ); - } else { - log::debug!( - "couldn't publish batch {:?} {}: {:?}", - batch.batch.network, - batch.batch.id, - res, - ); - // If we failed to publish it, restore it - 
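This requeue-and-retry keeps publication strictly in ID order even though signing may complete out of order. A self-contained sketch of the contract behind the restore below (u32 IDs standing in for SignedBatches; illustrative only, not repo code):

use std::collections::VecDeque;

// Drain queued batch IDs in order: skip anything the chain already executed,
// and on a failed publish put the batch back at the front for a later retry.
fn drain_publishable(
  queue: &mut VecDeque<u32>,
  next_on_chain: impl Fn() -> u32,
  publish: impl Fn(u32) -> bool,
) {
  while let Some(batch) = queue.pop_front() {
    // Already executed on-chain; nothing to do
    if next_on_chain() > batch {
      continue;
    }
    if !publish(batch) {
      // Mirror the push_front below: restore it so a later pass retries
      queue.push_front(batch);
      break;
    }
  }
}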
batches.push_front(batch); - // Sleep for a few seconds before retrying to prevent hammering the node - sleep(Duration::from_secs(5)).await; - } - } - // Verify the `Batch`s we just published - if let Some(last_id) = last_id { - loop { - let verified = verify_published_batches::(&mut txn, msg.network, last_id).await; - if verified == Some(last_id) { - break; - } - } - } - - // Check if any of these `Batch`s were a handover `Batch` - // If so, we need to publish any delayed `Batch` provided transactions - let mut relevant = None; - if let Some(start_id) = start_id { - let last_id = last_id.unwrap(); - for batch in start_id .. last_id { - if let Some(set) = MainDb::::is_handover_batch(&txn, msg.network, batch) { - // relevant may already be Some. This is a safe over-write, as we don't need to - // be concerned for handovers of Tributaries which have completed their handovers - // While this does bypass the checks that Tributary would've performed at the - // time, if we ever actually participate in a handover, we will verify *all* - // prior `Batch`s, including the ones which would've been explicitly verified - // then - relevant = Some(set.session); - } - } - } - relevant - } - }, - }; - - // If we have a relevant Tributary, check it's actually still relevant and has yet to be - // retired - if let Some(relevant_tributary_value) = relevant_tributary { - if !is_active_set( - &serai, - ValidatorSet { network: msg.network, session: relevant_tributary_value }, - ) - .await - { - relevant_tributary = None; - } - } - - // If there's a relevant Tributary... - if let Some(relevant_tributary) = relevant_tributary { - // Make sure we have it - // Per the reasoning above, we only return a Tributary as relevant if we're a participant - // Accordingly, we do *need* to have this Tributary now to handle it UNLESS the Tributary - // has already completed and this is simply an old message - let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else { - // Since we don't, sleep for a fraction of a second and move to the next loop iteration - // At the start of the loop, we'll check for new tributaries, making this eventually - // resolve - sleep(Duration::from_millis(100)).await; - continue; - }; - - let genesis = spec.genesis(); - - let txs = match msg.msg.clone() { - ProcessorMessage::KeyGen(inner_msg) => match inner_msg { - key_gen::ProcessorMessage::Commitments { id, commitments } => { - vec![Transaction::DkgCommitments( - id.attempt, - commitments, - Transaction::empty_signed(), - )] - } - key_gen::ProcessorMessage::Shares { id, mut shares } => { - // Create a MuSig-based machine to inform Substrate of this key generation - let nonces = crate::tributary::dkg_confirmation_nonces(&key, spec, id.attempt); - - let mut tx_shares = Vec::with_capacity(shares.len()); - for i in 1 ..= spec.n() { - let i = Participant::new(i).unwrap(); - if i == - spec - .i(pub_key) - .expect("processor message to DKG for a session we aren't a validator in") - { - continue; - } - tx_shares.push( - shares.remove(&i).expect("processor didn't send share for another validator"), - ); - } - - vec![Transaction::DkgShares { - attempt: id.attempt, - shares: tx_shares, - confirmation_nonces: nonces, - signed: Transaction::empty_signed(), - }] - } - key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => { - assert_eq!( - id.set.network, msg.network, - "processor claimed to be a different network than it was for GeneratedKeyPair", - ); - // TODO2: Also check the other KeyGenId fields 
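That TODO2 is satisfiable with a couple of asserts. A hedged sketch — `id.set` and `id.attempt` are the fields this patch already reads, but the helper itself (and where its expected values come from) is hypothetical:

// Hypothetical check for the remaining KeyGenId fields: besides the network,
// the session and attempt should also match what this coordinator expects.
fn check_key_gen_id(id: &KeyGenId, expected_set: ValidatorSet, expected_attempt: u32) {
  assert_eq!(id.set, expected_set, "GeneratedKeyPair for an unexpected validator set");
  assert_eq!(id.attempt, expected_attempt, "GeneratedKeyPair for an unexpected attempt");
}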
- - // Tell the Tributary the key pair, get back the share for the MuSig signature - let share = crate::tributary::generated_key_pair::( - &mut txn, - &key, - spec, - &(Public(substrate_key), network_key.try_into().unwrap()), - id.attempt, - ); - - match share { - Ok(share) => { - vec![Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())] - } - Err(p) => { - todo!("participant {p:?} sent invalid DKG confirmation preprocesses") - } - } - } - }, - ProcessorMessage::Sign(msg) => match msg { - sign::ProcessorMessage::Preprocess { id, preprocess } => { - if id.attempt == 0 { - MainDb::::save_first_preprocess(&mut txn, network, id.id, preprocess); - - vec![] - } else { - vec![Transaction::SignPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })] - } - } - sign::ProcessorMessage::Share { id, share } => vec![Transaction::SignShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share, - signed: Transaction::empty_signed(), - })], - sign::ProcessorMessage::Completed { key: _, id, tx } => { - let r = Zeroizing::new(::F::random(&mut OsRng)); - #[allow(non_snake_case)] - let R = ::generator() * r.deref(); - let mut tx = Transaction::SignCompleted { - plan: id, - tx_hash: tx, - first_signer: pub_key, - signature: SchnorrSignature { R, s: ::F::ZERO }, - }; - let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); - match &mut tx { - Transaction::SignCompleted { signature, .. } => { - *signature = signed; - } - _ => unreachable!(), - } - vec![tx] - } - }, - ProcessorMessage::Coordinator(inner_msg) => match inner_msg { - coordinator::ProcessorMessage::SubstrateBlockAck { .. } => unreachable!(), - coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { - log::info!( - "informed of batch (sign ID {}, attempt {}) for block {}", - hex::encode(id.id), - id.attempt, - hex::encode(block), - ); - - // If this is the first attempt instance, wait until we synchronize around the batch - // first - if id.attempt == 0 { - MainDb::::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess); - - // If this is the new key's first Batch, only create this TX once we verify all - // all prior published `Batch`s - let last_received = MainDb::::last_received_batch(&txn, msg.network).unwrap(); - let handover_batch = MainDb::::handover_batch(&txn, spec.set()); - if handover_batch.is_none() { - MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); - if last_received != 0 { - // Decrease by 1, to get the ID of the Batch prior to this Batch - let prior_sets_last_batch = last_received - 1; - loop { - let successfully_verified = - verify_published_batches::(&mut txn, msg.network, prior_sets_last_batch) - .await; - if successfully_verified == Some(prior_sets_last_batch) { - break; - } - sleep(Duration::from_secs(5)).await; - } - } - } - - // There is a race condition here. We may verify all `Batch`s from the prior set, - // start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed - // before `n` (or at the same time), yet then the prior set forges a malicious - // `Batch` `n`. - // - // The malicious `Batch` `n` would be publishable to Serai, as Serai can't - // distinguish what's intended to be a handover `Batch`, yet then anyone could - // publish the new set's `n+1`, causing their acceptance of the handover. 
- // - // To fix this, if this is after the handover `Batch` and we have yet to verify - // publication of the handover `Batch`, don't yet yield the provided. - let handover_batch = MainDb::::handover_batch(&txn, spec.set()).unwrap(); - let intended = Transaction::Batch(block.0, id.id); - let mut res = vec![intended.clone()]; - if last_received > handover_batch { - if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { - if last_verified < handover_batch { - res = vec![]; - } - } else { - res = vec![]; - } - } - - if res.is_empty() { - MainDb::::queue_batch(&mut txn, spec.set(), intended); - } - - res - } else { - vec![Transaction::BatchPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })] - } - } - coordinator::ProcessorMessage::BatchShare { id, share } => { - vec![Transaction::BatchShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share.to_vec(), - signed: Transaction::empty_signed(), - })] - } - }, - ProcessorMessage::Substrate(inner_msg) => match inner_msg { - processor_messages::substrate::ProcessorMessage::Batch { .. } => unreachable!(), - processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => { - // We only reach here if this SignedBatch triggered the publication of a handover - // Batch - // Since the handover `Batch` was successfully published and verified, we no longer - // have to worry about the above n+1 attack - MainDb::::take_queued_batches(&mut txn, spec.set()) - } - }, - }; - - // If this created transactions, publish them - for mut tx in txs { - log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); - - match tx.kind() { - TransactionKind::Provided(_) => { - log::trace!("providing transaction {}", hex::encode(tx.hash())); - let res = tributary.provide_transaction(tx.clone()).await; - if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { - if res == Err(ProvidedError::LocalMismatchesOnChain) { - // Spin, since this is a crit for this Tributary - loop { - log::error!( - "{}. 
tributary: {}, provided: {:?}", - "tributary added distinct provided to delayed locally provided TX", - hex::encode(spec.genesis()), - &tx, - ); - sleep(Duration::from_secs(60)).await; - } - } - panic!("provided an invalid transaction: {res:?}"); - } - } - TransactionKind::Unsigned => { - log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash())); - // Ignores the result since we can't differentiate already in-mempool from - // already on-chain from invalid - // TODO: Don't ignore the result - tributary.add_transaction(tx).await; - } - TransactionKind::Signed(_) => { - log::trace!("getting next nonce for Tributary TX in response to processor message"); - - let nonce = loop { - let Some(nonce) = NonceDecider::::nonce(&txn, genesis, &tx) - .expect("signed TX didn't have nonce") - else { - // This can be None if: - // 1) We scanned the relevant transaction(s) in a Tributary block - // 2) The processor was sent a message and responded - // 3) The Tributary TXN has yet to be committed - log::warn!("nonce has yet to be saved for processor-instigated transaction"); - sleep(Duration::from_millis(100)).await; - continue; - }; - break nonce; - }; - tx.sign(&mut OsRng, genesis, &key, nonce); - - publish_signed_transaction(&mut db_clone, tributary, tx).await; - } - } - } - } - - MainDb::::save_handled_message(&mut txn, msg.network, msg.id); - txn.commit(); + if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await { + processors.ack(msg).await; } - - processors.ack(msg).await; } } @@ -863,7 +822,9 @@ pub async fn run( // TODO: This may happen if the task above is simply slow panic!("tributary we don't have came to consensus on an Batch"); }; - publish_signed_transaction(&mut raw_db, tributary, tx).await; + let mut txn = raw_db.txn(); + publish_signed_transaction(&mut txn, tributary, tx).await; + txn.commit(); } } }; diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 1ba2e2c7..6f963650 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -359,59 +359,6 @@ async fn handle_new_blocks bool { - // TODO: Track this from the Substrate scanner to reduce our overhead? 
We'd only have a DB - // call, instead of a series of network requests - let latest = loop { - let Ok(res) = serai.get_latest_block_hash().await else { - log::error!( - "couldn't get the latest block hash from serai when checking tributary relevancy" - ); - sleep(Duration::from_secs(5)).await; - continue; - }; - break res; - }; - - let latest_session = loop { - let Ok(res) = serai.get_session(set.network, latest).await else { - log::error!("couldn't get the latest session from serai when checking tributary relevancy"); - sleep(Duration::from_secs(5)).await; - continue; - }; - // If the on-chain Session is None, then this Session is greater and therefore, for the - // purposes here, active - let Some(res) = res else { return true }; - break res; - }; - - if latest_session.0 > set.session.0 { - // If we're on the Session after the Session after this Session, then this Session is - // definitively completed - if latest_session.0 > (set.session.0 + 1) { - return false; - } else { - // Since the next session has started, check its handover status - let keys = loop { - let Ok(res) = serai.get_keys(set, latest).await else { - log::error!( - "couldn't get the keys for a session from serai when checking tributary relevancy" - ); - sleep(Duration::from_secs(5)).await; - continue; - }; - break res; - }; - // If the keys have been deleted, then this Tributary is retired - if keys.is_none() { - return false; - } - } - } - - true -} - pub async fn scan_task( db: D, key: Zeroizing<::F>, @@ -494,3 +441,110 @@ pub async fn scan_task( } } } + +/// Returns if a ValidatorSet has yet to be retired. +pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool { + // TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB + // call, instead of a series of network requests + let latest = loop { + let Ok(res) = serai.get_latest_block_hash().await else { + log::error!( + "couldn't get the latest block hash from serai when checking tributary relevancy" + ); + sleep(Duration::from_secs(5)).await; + continue; + }; + break res; + }; + + let latest_session = loop { + let Ok(res) = serai.get_session(set.network, latest).await else { + log::error!("couldn't get the latest session from serai when checking tributary relevancy"); + sleep(Duration::from_secs(5)).await; + continue; + }; + // If the on-chain Session is None, then this Session is greater and therefore, for the + // purposes here, active + let Some(res) = res else { return true }; + break res; + }; + + if latest_session.0 > set.session.0 { + // If we're on the Session after the Session after this Session, then this Session is + // definitively completed + if latest_session.0 > (set.session.0 + 1) { + return false; + } else { + // Since the next session has started, check its handover status + let keys = loop { + let Ok(res) = serai.get_keys(set, latest).await else { + log::error!( + "couldn't get the keys for a session from serai when checking tributary relevancy" + ); + sleep(Duration::from_secs(5)).await; + continue; + }; + break res; + }; + // If the keys have been deleted, then this Tributary is retired + if keys.is_none() { + return false; + } + } + } + + true +} + +/// Gets the expected ID for the next Batch. 
+pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -> u32 {
+  let mut first = true;
+  loop {
+    if !first {
+      log::error!("{} {network:?}", "couldn't connect to Serai node to get the next batch ID for",);
+      sleep(Duration::from_secs(5)).await;
+    }
+    first = false;
+
+    let Ok(latest_block) = serai.get_latest_block().await else {
+      continue;
+    };
+    let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await else {
+      continue;
+    };
+    break if let Some(last) = last { last + 1 } else { 0 };
+  }
+}
+
+/// Verifies `Batch`s which have already been indexed from Substrate.
+pub(crate) async fn verify_published_batches<D: Db>(
+  txn: &mut D::Transaction<'_>,
+  network: NetworkId,
+  optimistic_up_to: u32,
+) -> Option<u32> {
+  // TODO: Localize from MainDb to SubstrateDb
+  let last = crate::MainDb::<D>::last_verified_batch(txn, network);
+  for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
+    let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
+      break;
+    };
+    let off_chain = crate::MainDb::<D>::expected_batch(txn, network, id).unwrap();
+    if on_chain != off_chain {
+      // Halt operations on this network and spin, as this is a critical fault
+      loop {
+        log::error!(
+          "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
+          "on-chain batch doesn't match off-chain",
+          network,
+          id,
+          hex::encode(off_chain),
+          hex::encode(on_chain),
+        );
+        sleep(Duration::from_secs(60)).await;
+      }
+    }
+    crate::MainDb::<D>::save_last_verified_batch(txn, network, id);
+  }
+
+  crate::MainDb::<D>::last_verified_batch(txn, network)
+}
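Callers needing a specific `Batch` verified before proceeding loop on this function until the returned watermark reaches their target, as `handle_processor_message` does above. A sketch of that calling pattern (the wrapper name is hypothetical; the loop body mirrors the patch):

// Hypothetical wrapper over the calling pattern used in main.rs: poll until
// the verification watermark reaches `id`, backing off between attempts.
async fn wait_until_verified<D: Db>(txn: &mut D::Transaction<'_>, network: NetworkId, id: u32) {
  loop {
    if verify_published_batches::<D>(txn, network, id).await == Some(id) {
      break;
    }
    // The Batch may simply not be indexed from Substrate yet
    sleep(Duration::from_secs(5)).await;
  }
}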