From 4054e44471db146ae15d127c71424daf195b6b79 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 11 Sep 2024 04:54:03 -0400 Subject: [PATCH] Start on the new processor main loop --- processor/bitcoin/src/db.rs | 8 + processor/bitcoin/src/key_gen.rs | 6 +- processor/bitcoin/src/main.rs | 83 ++++++++++ processor/messages/src/lib.rs | 11 +- processor/src/main.rs | 259 ------------------------------- 5 files changed, 99 insertions(+), 268 deletions(-) diff --git a/processor/bitcoin/src/db.rs b/processor/bitcoin/src/db.rs index 1d73ebfe..94a7c0ba 100644 --- a/processor/bitcoin/src/db.rs +++ b/processor/bitcoin/src/db.rs @@ -1,5 +1,13 @@ +use serai_client::validator_sets::primitives::Session; + use serai_db::{Get, DbTxn, create_db}; +create_db! { + Processor { + ExternalKeyForSession: (session: Session) -> Vec, + } +} + create_db! { BitcoinProcessor { LatestBlockToYieldAsFinalized: () -> u64, diff --git a/processor/bitcoin/src/key_gen.rs b/processor/bitcoin/src/key_gen.rs index 16183231..416677e7 100644 --- a/processor/bitcoin/src/key_gen.rs +++ b/processor/bitcoin/src/key_gen.rs @@ -1,12 +1,10 @@ use ciphersuite::{group::GroupEncoding, Ciphersuite, Secp256k1}; use frost::ThresholdKeys; -use key_gen::KeyGenParams; - use crate::scan::scanner; -pub(crate) struct KeyGen; -impl KeyGenParams for KeyGen { +pub(crate) struct KeyGenParams; +impl key_gen::KeyGenParams for KeyGenParams { const ID: &'static str = "Bitcoin"; type ExternalNetworkCurve = Secp256k1; diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index d86a4ba1..bb788d1e 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -6,6 +6,10 @@ static ALLOCATOR: zalloc::ZeroizingAlloc = zalloc::ZeroizingAlloc(std::alloc::System); +use ciphersuite::Ciphersuite; + +use serai_db::{DbTxn, Db}; + mod primitives; pub(crate) use primitives::*; @@ -14,8 +18,11 @@ mod scan; // App-logic trait satisfactions mod key_gen; +use crate::key_gen::KeyGenParams; mod rpc; +use rpc::Rpc; mod scheduler; +use scheduler::Scheduler; // Our custom code for Bitcoin mod db; @@ -29,6 +36,82 @@ pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> res } +/// Fetch the next message from the Coordinator. +/// +/// This message is guaranteed to have never been handled before, where handling is defined as +/// this `txn` being committed. +async fn next_message(_txn: &mut impl DbTxn) -> messages::CoordinatorMessage { + todo!("TODO") +} + +async fn send_message(_msg: messages::ProcessorMessage) { + todo!("TODO") +} + +async fn coordinator_loop( + mut db: D, + mut key_gen: ::key_gen::KeyGen, + mut signers: signers::Signers, Scheduler, Rpc>, + mut scanner: Option>>, +) { + loop { + let mut txn = Some(db.txn()); + let msg = next_message(txn.as_mut().unwrap()).await; + match msg { + messages::CoordinatorMessage::KeyGen(msg) => { + // This is a computationally expensive call yet it happens infrequently + for msg in key_gen.handle(txn.as_mut().unwrap(), msg) { + send_message(messages::ProcessorMessage::KeyGen(msg)).await; + } + } + // These are cheap calls which are fine to be here in this loop + messages::CoordinatorMessage::Sign(msg) => signers.queue_message(txn.as_mut().unwrap(), &msg), + messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { + session, + block_number, + block, + }, + ) => signers.cosign_block(txn.take().unwrap(), session, block_number, block), + messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SignSlashReport { session, report }, + ) => signers.sign_slash_report(txn.take().unwrap(), session, &report), + messages::CoordinatorMessage::Substrate(msg) => match msg { + messages::substrate::CoordinatorMessage::SetKeys { serai_time, session, key_pair } => { + db::ExternalKeyForSession::set(txn.as_mut().unwrap(), session, &key_pair.1.into_inner()); + todo!("TODO: Register in signers"); + todo!("TODO: Scanner activation") + } + messages::substrate::CoordinatorMessage::SlashesReported { session } => { + let key_bytes = db::ExternalKeyForSession::get(txn.as_ref().unwrap(), session).unwrap(); + let mut key_bytes = key_bytes.as_slice(); + let key = + ::ExternalNetworkCurve::read_G(&mut key_bytes) + .unwrap(); + assert!(key_bytes.is_empty()); + + signers.retire_session(txn.as_mut().unwrap(), session, &key) + } + messages::substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { + block, + batch_id, + in_instruction_succeededs, + burns, + key_to_activate, + } => todo!("TODO"), + messages::substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { + block, + burns, + } => todo!("TODO"), + }, + }; + // If the txn wasn't already consumed and committed, commit it + if let Some(txn) = txn { + txn.commit(); + } + } +} + #[tokio::main] async fn main() {} diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 998c7cea..ae1ab6d5 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -113,8 +113,6 @@ pub mod sign { pub attempt: u32, } - // TODO: Make this generic to the ID once we introduce topics into the message-queue and remove - // the global ProcessorMessage/CoordinatorMessage #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum CoordinatorMessage { // Received preprocesses for the specified signing protocol. @@ -185,8 +183,10 @@ pub mod substrate { #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub enum CoordinatorMessage { - /// Keys set on the Serai network. + /// Keys set on the Serai blockchain. SetKeys { serai_time: u64, session: Session, key_pair: KeyPair }, + /// Slashes reported on the Serai blockchain OR the process timed out. + SlashesReported { session: Session }, /// The data from a block which acknowledged a Batch. BlockWithBatchAcknowledgement { block: u64, @@ -305,11 +305,12 @@ impl CoordinatorMessage { CoordinatorMessage::Substrate(msg) => { let (sub, id) = match msg { substrate::CoordinatorMessage::SetKeys { session, .. } => (0, session.encode()), + substrate::CoordinatorMessage::SlashesReported { session } => (1, session.encode()), substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { block, .. } => { - (1, block.encode()) + (2, block.encode()) } substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { block, .. } => { - (2, block.encode()) + (3, block.encode()) } }; diff --git a/processor/src/main.rs b/processor/src/main.rs index 10406729..51123b92 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -1,21 +1,3 @@ -use std::{time::Duration, collections::HashMap}; - -use zeroize::{Zeroize, Zeroizing}; - -use ciphersuite::{ - group::{ff::PrimeField, GroupEncoding}, - Ciphersuite, Ristretto, -}; -use dkg::evrf::EvrfCurve; - -use log::{info, warn}; -use tokio::time::sleep; - -use serai_client::{ - primitives::{BlockHash, NetworkId}, - validator_sets::primitives::{Session, KeyPair}, -}; - use messages::{ coordinator::{ SubstrateSignableId, PlanMeta, CoordinatorMessage as CoordinatorCoordinatorMessage, @@ -27,112 +9,18 @@ use serai_env as env; use message_queue::{Service, client::MessageQueue}; -mod networks; -use networks::{Block, Network}; -#[cfg(feature = "bitcoin")] -use networks::Bitcoin; -#[cfg(feature = "ethereum")] -use networks::Ethereum; -#[cfg(feature = "monero")] -use networks::Monero; - mod db; pub use db::*; mod coordinator; pub use coordinator::*; -use serai_processor_key_gen as key_gen; -use key_gen::{SessionDb, KeyConfirmed, KeyGen}; - -mod signer; -use signer::Signer; - -mod cosigner; -use cosigner::Cosigner; - -mod batch_signer; -use batch_signer::BatchSigner; - -mod slash_report_signer; -use slash_report_signer::SlashReportSigner; - mod multisigs; use multisigs::{MultisigEvent, MultisigManager}; #[cfg(test)] mod tests; -#[global_allocator] -static ALLOCATOR: zalloc::ZeroizingAlloc = - zalloc::ZeroizingAlloc(std::alloc::System); - -// Items which are mutably borrowed by Tributary. -// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't -// violated. -struct TributaryMutable { - // The following are actually mutably borrowed by Substrate as well. - // - Substrate triggers key gens, and determines which to use. - // - SubstrateBlock events cause scheduling which causes signing. - // - // This is still considered Tributary-mutable as most mutation (preprocesses/shares) happens by - // the Tributary. - // - // Creation of tasks is by Substrate, yet this is safe since the mutable borrow is transferred to - // Tributary. - // - // Tributary stops mutating a key gen attempt before Substrate is made aware of it, ensuring - // Tributary drops its mutable borrow before Substrate acquires it. Tributary will maintain a - // mutable borrow on the *key gen task*, yet the finalization code can successfully run for any - // attempt. - // - // The only other note is how the scanner may cause a signer task to be dropped, effectively - // invalidating the Tributary's mutable borrow. The signer is coded to allow for attempted usage - // of a dropped task. - key_gen: KeyGen, - signers: HashMap>, - - // This is also mutably borrowed by the Scanner. - // The Scanner starts new sign tasks. - // The Tributary mutates already-created signed tasks, potentially completing them. - // Substrate may mark tasks as completed, invalidating any existing mutable borrows. - // The safety of this follows as written above. - - // There should only be one BatchSigner at a time (see #277) - batch_signer: Option>, - - // Solely mutated by the tributary. - cosigner: Option, - slash_report_signer: Option, -} - -// Items which are mutably borrowed by Substrate. -// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't -// violated. - -/* - The MultisigManager contains the Scanner and Schedulers. - - The scanner is expected to autonomously operate, scanning blocks as they appear. When a block is - sufficiently confirmed, the scanner causes the Substrate signer to sign a batch. It itself only - mutates its list of finalized blocks, to protect against re-orgs, and its in-memory state though. - - Disk mutations to the scan-state only happens once the relevant `Batch` is included on Substrate. - It can't be mutated as soon as the `Batch` is signed as we need to know the order of `Batch`s - relevant to `Burn`s. - - Schedulers take in new outputs, confirmed in `Batch`s, and outbound payments, triggered by - `Burn`s. - - Substrate also decides when to move to a new multisig, hence why this entire object is - Substrate-mutable. - - Since MultisigManager should always be verifiable, and the Tributary is temporal, MultisigManager - being entirely SubstrateMutable shows proper data pipe-lining. -*/ - -type SubstrateMutable = MultisigManager; - async fn handle_coordinator_msg( txn: &mut D::Transaction<'_>, network: &N, @@ -141,54 +29,6 @@ async fn handle_coordinator_msg( substrate_mutable: &mut SubstrateMutable, msg: &Message, ) { - // If this message expects a higher block number than we have, halt until synced - async fn wait( - txn: &D::Transaction<'_>, - substrate_mutable: &SubstrateMutable, - block_hash: &BlockHash, - ) { - let mut needed_hash = >::Id::default(); - needed_hash.as_mut().copy_from_slice(&block_hash.0); - - loop { - // Ensure our scanner has scanned this block, which means our daemon has this block at - // a sufficient depth - if substrate_mutable.block_number(txn, &needed_hash).await.is_none() { - warn!( - "node is desynced. we haven't scanned {} which should happen after {} confirms", - hex::encode(&needed_hash), - N::CONFIRMATIONS, - ); - sleep(Duration::from_secs(10)).await; - continue; - }; - break; - } - - // TODO2: Sanity check we got an AckBlock (or this is the AckBlock) for the block in question - - /* - let synced = |context: &SubstrateContext, key| -> Result<(), ()> { - // Check that we've synced this block and can actually operate on it ourselves - let latest = scanner.latest_scanned(key); - if usize::try_from(context.network_latest_finalized_block).unwrap() < latest { - log::warn!( - "external network node disconnected/desynced from rest of the network. \ - our block: {latest:?}, network's acknowledged: {}", - context.network_latest_finalized_block, - ); - Err(())?; - } - Ok(()) - }; - */ - } - - if let Some(required) = msg.msg.required_block() { - // wait only reads from, it doesn't mutate, substrate_mutable - wait(txn, substrate_mutable, &required).await; - } - async fn activate_key( network: &N, substrate_mutable: &mut SubstrateMutable, @@ -220,105 +60,6 @@ async fn handle_coordinator_msg( } match msg.msg.clone() { - CoordinatorMessage::KeyGen(msg) => { - for msg in tributary_mutable.key_gen.handle(txn, msg) { - coordinator.send(msg).await; - } - } - - CoordinatorMessage::Sign(msg) => { - if let Some(msg) = tributary_mutable - .signers - .get_mut(&msg.session()) - .expect("coordinator told us to sign with a signer we don't have") - .handle(txn, msg) - .await - { - coordinator.send(msg).await; - } - } - - CoordinatorMessage::Coordinator(msg) => match msg { - CoordinatorCoordinatorMessage::CosignSubstrateBlock { id, block_number } => { - let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { - panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock") - }; - let Some(keys) = tributary_mutable.key_gen.substrate_keys_by_session(id.session) else { - panic!("didn't have key shares for the key we were told to cosign with"); - }; - if let Some((cosigner, msg)) = - Cosigner::new(txn, id.session, keys, block_number, block, id.attempt) - { - tributary_mutable.cosigner = Some(cosigner); - coordinator.send(msg).await; - } else { - log::warn!("Cosigner::new returned None"); - } - } - CoordinatorCoordinatorMessage::SignSlashReport { id, report } => { - assert_eq!(id.id, SubstrateSignableId::SlashReport); - let Some(keys) = tributary_mutable.key_gen.substrate_keys_by_session(id.session) else { - panic!("didn't have key shares for the key we were told to perform a slash report with"); - }; - if let Some((slash_report_signer, msg)) = - SlashReportSigner::new(txn, N::NETWORK, id.session, keys, report, id.attempt) - { - tributary_mutable.slash_report_signer = Some(slash_report_signer); - coordinator.send(msg).await; - } else { - log::warn!("SlashReportSigner::new returned None"); - } - } - _ => { - let (is_cosign, is_batch, is_slash_report) = match msg { - CoordinatorCoordinatorMessage::CosignSubstrateBlock { .. } | - CoordinatorCoordinatorMessage::SignSlashReport { .. } => (false, false, false), - CoordinatorCoordinatorMessage::SubstratePreprocesses { ref id, .. } | - CoordinatorCoordinatorMessage::SubstrateShares { ref id, .. } => ( - matches!(&id.id, SubstrateSignableId::CosigningSubstrateBlock(_)), - matches!(&id.id, SubstrateSignableId::Batch(_)), - matches!(&id.id, SubstrateSignableId::SlashReport), - ), - CoordinatorCoordinatorMessage::BatchReattempt { .. } => (false, true, false), - }; - - if is_cosign { - if let Some(cosigner) = tributary_mutable.cosigner.as_mut() { - if let Some(msg) = cosigner.handle(txn, msg) { - coordinator.send(msg).await; - } - } else { - log::warn!( - "received message for cosigner yet didn't have a cosigner. {}", - "this is an error if we didn't reboot", - ); - } - } else if is_batch { - if let Some(msg) = tributary_mutable - .batch_signer - .as_mut() - .expect( - "coordinator told us to sign a batch when we don't currently have a Substrate signer", - ) - .handle(txn, msg) - { - coordinator.send(msg).await; - } - } else if is_slash_report { - if let Some(slash_report_signer) = tributary_mutable.slash_report_signer.as_mut() { - if let Some(msg) = slash_report_signer.handle(txn, msg) { - coordinator.send(msg).await; - } - } else { - log::warn!( - "received message for slash report signer yet didn't have {}", - "a slash report signer. this is an error if we didn't reboot", - ); - } - } - } - }, - CoordinatorMessage::Substrate(msg) => { match msg { messages::substrate::CoordinatorMessage::ConfirmKeyPair { context, session, key_pair } => {