From f07ec7bee05a646ce40896c56b5bc80c06c97d0a Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 8 Sep 2024 22:13:42 -0400 Subject: [PATCH] Route the coordinator, fix race conditions in the signers library --- Cargo.lock | 2 +- processor/frost-attempt-manager/Cargo.toml | 1 - .../frost-attempt-manager/src/individual.rs | 24 +- processor/frost-attempt-manager/src/lib.rs | 26 +-- processor/messages/Cargo.toml | 2 + processor/messages/src/lib.rs | 36 ++- processor/primitives/src/block.rs | 2 + processor/scanner/src/db.rs | 15 +- processor/scanner/src/eventuality/mod.rs | 2 +- processor/scanner/src/lib.rs | 20 +- processor/scheduler/primitives/src/lib.rs | 4 + processor/signers/src/coordinator.rs | 98 +++++++++ processor/signers/src/db.rs | 14 +- processor/signers/src/lib.rs | 206 +++++++++++------- processor/signers/src/transaction/mod.rs | 76 ++++--- 15 files changed, 356 insertions(+), 172 deletions(-) create mode 100644 processor/signers/src/coordinator.rs diff --git a/Cargo.lock b/Cargo.lock index b960db4d..d6b0e3de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8630,7 +8630,6 @@ name = "serai-processor-frost-attempt-manager" version = "0.1.0" dependencies = [ "borsh", - "hex", "log", "modular-frost", "parity-scale-codec", @@ -8666,6 +8665,7 @@ version = "0.1.0" dependencies = [ "borsh", "dkg", + "hex", "parity-scale-codec", "serai-coins-primitives", "serai-in-instructions-primitives", diff --git a/processor/frost-attempt-manager/Cargo.toml b/processor/frost-attempt-manager/Cargo.toml index 67bd8bb6..ad8d2a4c 100644 --- a/processor/frost-attempt-manager/Cargo.toml +++ b/processor/frost-attempt-manager/Cargo.toml @@ -26,7 +26,6 @@ frost = { package = "modular-frost", path = "../../crypto/frost", version = "^0. serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] } -hex = { version = "0.4", default-features = false, features = ["std"] } log = { version = "0.4", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } diff --git a/processor/frost-attempt-manager/src/individual.rs b/processor/frost-attempt-manager/src/individual.rs index 2591b582..6a8b3352 100644 --- a/processor/frost-attempt-manager/src/individual.rs +++ b/processor/frost-attempt-manager/src/individual.rs @@ -10,11 +10,11 @@ use frost::{ use serai_validator_sets_primitives::Session; use serai_db::{Get, DbTxn, Db, create_db}; -use messages::sign::{SignId, ProcessorMessage}; +use messages::sign::{VariantSignId, SignId, ProcessorMessage}; create_db!( FrostAttemptManager { - Attempted: (id: [u8; 32]) -> u32, + Attempted: (id: VariantSignId) -> u32, } ); @@ -28,7 +28,7 @@ pub(crate) struct SigningProtocol { // The key shares we sign with are expected to be continguous from this position. start_i: Participant, // The ID of this signing protocol. - id: [u8; 32], + id: VariantSignId, // This accepts a vector of `root` machines in order to support signing with multiple key shares. root: Vec, preprocessed: HashMap, HashMap>)>, @@ -48,10 +48,10 @@ impl SigningProtocol { db: D, session: Session, start_i: Participant, - id: [u8; 32], + id: VariantSignId, root: Vec, ) -> Self { - log::info!("starting signing protocol {}", hex::encode(id)); + log::info!("starting signing protocol {id:?}"); Self { db, @@ -100,7 +100,7 @@ impl SigningProtocol { txn.commit(); } - log::debug!("attemting a new instance of signing protocol {}", hex::encode(self.id)); + log::debug!("attemting a new instance of signing protocol {:?}", self.id); let mut our_preprocesses = HashMap::with_capacity(self.root.len()); let mut preprocessed = Vec::with_capacity(self.root.len()); @@ -137,7 +137,7 @@ impl SigningProtocol { attempt: u32, serialized_preprocesses: HashMap>, ) -> Vec { - log::debug!("handling preprocesses for signing protocol {}", hex::encode(self.id)); + log::debug!("handling preprocesses for signing protocol {:?}", self.id); let Some((machines, our_serialized_preprocesses)) = self.preprocessed.remove(&attempt) else { return vec![]; @@ -211,8 +211,8 @@ impl SigningProtocol { assert!(self.shared.insert(attempt, (shared.swap_remove(0), our_shares)).is_none()); log::debug!( - "successfully handled preprocesses for signing protocol {}, sending shares", - hex::encode(self.id) + "successfully handled preprocesses for signing protocol {:?}, sending shares", + self.id, ); msgs.push(ProcessorMessage::Shares { id: SignId { session: self.session, id: self.id, attempt }, @@ -229,7 +229,7 @@ impl SigningProtocol { attempt: u32, serialized_shares: HashMap>, ) -> Result> { - log::debug!("handling shares for signing protocol {}", hex::encode(self.id)); + log::debug!("handling shares for signing protocol {:?}", self.id); let Some((machine, our_serialized_shares)) = self.shared.remove(&attempt) else { Err(vec![])? }; @@ -272,13 +272,13 @@ impl SigningProtocol { }, }; - log::info!("finished signing for protocol {}", hex::encode(self.id)); + log::info!("finished signing for protocol {:?}", self.id); Ok(signature) } /// Cleanup the database entries for a specified signing protocol. - pub(crate) fn cleanup(txn: &mut impl DbTxn, id: [u8; 32]) { + pub(crate) fn cleanup(txn: &mut impl DbTxn, id: VariantSignId) { Attempted::del(txn, id); } } diff --git a/processor/frost-attempt-manager/src/lib.rs b/processor/frost-attempt-manager/src/lib.rs index 6666ffac..db8b0861 100644 --- a/processor/frost-attempt-manager/src/lib.rs +++ b/processor/frost-attempt-manager/src/lib.rs @@ -9,7 +9,7 @@ use frost::{Participant, sign::PreprocessMachine}; use serai_validator_sets_primitives::Session; use serai_db::{DbTxn, Db}; -use messages::sign::{ProcessorMessage, CoordinatorMessage}; +use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage}; mod individual; use individual::SigningProtocol; @@ -21,7 +21,7 @@ pub enum Response { /// A produced signature. Signature { /// The ID of the protocol this is for. - id: [u8; 32], + id: VariantSignId, /// The signature. signature: M::Signature, }, @@ -32,7 +32,7 @@ pub struct AttemptManager { db: D, session: Session, start_i: Participant, - active: HashMap<[u8; 32], SigningProtocol>, + active: HashMap>, } impl AttemptManager { @@ -46,7 +46,7 @@ impl AttemptManager { /// Register a signing protocol to attempt. /// /// This ID must be unique across all sessions, attempt managers, protocols, etc. - pub fn register(&mut self, id: [u8; 32], machines: Vec) -> Vec { + pub fn register(&mut self, id: VariantSignId, machines: Vec) -> Vec { let mut protocol = SigningProtocol::new(self.db.clone(), self.session, self.start_i, id, machines); let messages = protocol.attempt(0); @@ -60,11 +60,11 @@ impl AttemptManager { /// This does not stop the protocol from being re-registered and further worked on (with /// undefined behavior) then. The higher-level context must never call `register` again with this /// ID accordingly. - pub fn retire(&mut self, txn: &mut impl DbTxn, id: [u8; 32]) { + pub fn retire(&mut self, txn: &mut impl DbTxn, id: VariantSignId) { if self.active.remove(&id).is_none() { - log::info!("retiring protocol {}, which we didn't register/already retired", hex::encode(id)); + log::info!("retiring protocol {id:?}, which we didn't register/already retired"); } else { - log::info!("retired signing protocol {}", hex::encode(id)); + log::info!("retired signing protocol {id:?}"); } SigningProtocol::::cleanup(txn, id); } @@ -79,8 +79,8 @@ impl AttemptManager { CoordinatorMessage::Preprocesses { id, preprocesses } => { let Some(protocol) = self.active.get_mut(&id.id) else { log::trace!( - "handling preprocesses for signing protocol {}, which we're not actively running", - hex::encode(id.id) + "handling preprocesses for signing protocol {:?}, which we're not actively running", + id.id, ); return Response::Messages(vec![]); }; @@ -89,8 +89,8 @@ impl AttemptManager { CoordinatorMessage::Shares { id, shares } => { let Some(protocol) = self.active.get_mut(&id.id) else { log::trace!( - "handling shares for signing protocol {}, which we're not actively running", - hex::encode(id.id) + "handling shares for signing protocol {:?}, which we're not actively running", + id.id, ); return Response::Messages(vec![]); }; @@ -102,8 +102,8 @@ impl AttemptManager { CoordinatorMessage::Reattempt { id } => { let Some(protocol) = self.active.get_mut(&id.id) else { log::trace!( - "reattempting signing protocol {}, which we're not actively running", - hex::encode(id.id) + "reattempting signing protocol {:?}, which we're not actively running", + id.id, ); return Response::Messages(vec![]); }; diff --git a/processor/messages/Cargo.toml b/processor/messages/Cargo.toml index 0eba999d..dbadd9db 100644 --- a/processor/messages/Cargo.toml +++ b/processor/messages/Cargo.toml @@ -17,6 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] +hex = { version = "0.4", default-features = false, features = ["std"] } + scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 27d75d2e..ef907f97 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::collections::HashMap; use scale::{Encode, Decode}; @@ -85,10 +86,37 @@ pub mod key_gen { pub mod sign { use super::*; - #[derive(Clone, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize)] + #[derive(Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, BorshSerialize, BorshDeserialize)] + pub enum VariantSignId { + Cosign([u8; 32]), + Batch(u32), + SlashReport([u8; 32]), + Transaction([u8; 32]), + } + impl fmt::Debug for VariantSignId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + Self::Cosign(cosign) => { + f.debug_struct("VariantSignId::Cosign").field("0", &hex::encode(cosign)).finish() + } + Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(), + Self::SlashReport(slash_report) => f + .debug_struct("VariantSignId::SlashReport") + .field("0", &hex::encode(slash_report)) + .finish(), + Self::Transaction(tx) => { + f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish() + } + } + } + } + + #[derive( + Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize, + )] pub struct SignId { pub session: Session, - pub id: [u8; 32], + pub id: VariantSignId, pub attempt: u32, } @@ -109,11 +137,11 @@ pub mod sign { None } - pub fn session(&self) -> Session { + pub fn sign_id(&self) -> &SignId { match self { CoordinatorMessage::Preprocesses { id, .. } | CoordinatorMessage::Shares { id, .. } | - CoordinatorMessage::Reattempt { id, .. } => id.session, + CoordinatorMessage::Reattempt { id, .. } => id, } } } diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index 6f603ab2..89dff54f 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -60,6 +60,8 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { /// Check if this block resolved any Eventualities. /// + /// This MUST mutate `eventualities` to no longer contain the resolved Eventualities. + /// /// Returns tbe resolved Eventualities, indexed by the ID of the transactions which resolved /// them. fn check_for_eventuality_resolutions( diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 246e5f46..f72fa202 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -1,6 +1,7 @@ use core::marker::PhantomData; use std::io::{self, Read, Write}; +use group::GroupEncoding; use scale::{Encode, Decode, IoReader}; use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db, db_channel}; @@ -526,20 +527,20 @@ mod _completed_eventualities { db_channel! { ScannerPublic { - CompletedEventualities: (empty_key: ()) -> [u8; 32], + CompletedEventualities: (key: &[u8]) -> [u8; 32], } } } /// The IDs of completed Eventualities found on-chain, within a finalized block. -pub struct CompletedEventualities(PhantomData); -impl CompletedEventualities { - pub(crate) fn send(txn: &mut impl DbTxn, id: [u8; 32]) { - _completed_eventualities::CompletedEventualities::send(txn, (), &id); +pub struct CompletedEventualities(PhantomData); +impl CompletedEventualities { + pub(crate) fn send(txn: &mut impl DbTxn, key: &K, id: [u8; 32]) { + _completed_eventualities::CompletedEventualities::send(txn, key.to_bytes().as_ref(), &id); } /// Receive the ID of a completed Eventuality. - pub fn try_recv(txn: &mut impl DbTxn) -> Option<[u8; 32]> { - _completed_eventualities::CompletedEventualities::try_recv(txn, ()) + pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<[u8; 32]> { + _completed_eventualities::CompletedEventualities::try_recv(txn, key.to_bytes().as_ref()) } } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 7dadbe55..be5b4555 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -298,7 +298,7 @@ impl> ContinuallyRan for EventualityTas hex::encode(eventuality.id()), hex::encode(tx.as_ref()) ); - CompletedEventualities::::send(&mut txn, eventuality.id()); + CompletedEventualities::send(&mut txn, &key.key, eventuality.id()); } // Fetch all non-External outputs diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 6403605d..3323c6ff 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -362,24 +362,24 @@ impl Scanner { let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block); - let (index_run, _index_handle) = Task::new(); - let (scan_run, scan_handle) = Task::new(); - let (report_run, report_handle) = Task::new(); - let (substrate_run, substrate_handle) = Task::new(); - let (eventuality_run, eventuality_handle) = Task::new(); + let (index_task_def, _index_handle) = Task::new(); + let (scan_task_def, scan_handle) = Task::new(); + let (report_task_def, report_handle) = Task::new(); + let (substrate_task_def, substrate_handle) = Task::new(); + let (eventuality_task_def, eventuality_handle) = Task::new(); // Upon indexing a new block, scan it - tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()])); + tokio::spawn(index_task.continually_run(index_task_def, vec![scan_handle.clone()])); // Upon scanning a block, report it - tokio::spawn(scan_task.continually_run(scan_run, vec![report_handle])); + tokio::spawn(scan_task.continually_run(scan_task_def, vec![report_handle])); // Upon reporting a block, we do nothing (as the burden is on Substrate which won't be // immediately ready) - tokio::spawn(report_task.continually_run(report_run, vec![])); + tokio::spawn(report_task.continually_run(report_task_def, vec![])); // Upon handling an event from Substrate, we run the Eventuality task (as it's what's affected) - tokio::spawn(substrate_task.continually_run(substrate_run, vec![eventuality_handle])); + tokio::spawn(substrate_task.continually_run(substrate_task_def, vec![eventuality_handle])); // Upon handling the Eventualities in a block, we run the scan task as we've advanced the // window its allowed to scan - tokio::spawn(eventuality_task.continually_run(eventuality_run, vec![scan_handle])); + tokio::spawn(eventuality_task.continually_run(eventuality_task_def, vec![scan_handle])); Self { substrate_handle, _S: PhantomData } } diff --git a/processor/scheduler/primitives/src/lib.rs b/processor/scheduler/primitives/src/lib.rs index cef10d35..f146027d 100644 --- a/processor/scheduler/primitives/src/lib.rs +++ b/processor/scheduler/primitives/src/lib.rs @@ -41,6 +41,10 @@ pub trait SignableTransaction: 'static + Sized + Send + Sync + Clone { fn sign(self, keys: ThresholdKeys) -> Self::PreprocessMachine; } +/// The transaction type for a SignableTransaction. +pub type TransactionFor = + <::PreprocessMachine as PreprocessMachine>::Signature; + mod db { use serai_db::{Get, DbTxn, create_db, db_channel}; diff --git a/processor/signers/src/coordinator.rs b/processor/signers/src/coordinator.rs new file mode 100644 index 00000000..43dcc571 --- /dev/null +++ b/processor/signers/src/coordinator.rs @@ -0,0 +1,98 @@ +use serai_db::{DbTxn, Db}; + +use primitives::task::ContinuallyRan; + +use crate::{ + db::{ + RegisteredKeys, CosignerToCoordinatorMessages, BatchSignerToCoordinatorMessages, + SlashReportSignerToCoordinatorMessages, TransactionSignerToCoordinatorMessages, + }, + Coordinator, +}; + +// Fetches messages to send the coordinator and sends them. +pub(crate) struct CoordinatorTask { + db: D, + coordinator: C, +} + +impl CoordinatorTask { + pub(crate) fn new(db: D, coordinator: C) -> Self { + Self { db, coordinator } + } +} + +#[async_trait::async_trait] +impl ContinuallyRan for CoordinatorTask { + async fn run_iteration(&mut self) -> Result { + let mut iterated = false; + + for session in RegisteredKeys::get(&self.db).unwrap_or(vec![]) { + loop { + let mut txn = self.db.txn(); + let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(msg) = BatchSignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(msg) = SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(msg) = TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + } + + Ok(iterated) + } +} diff --git a/processor/signers/src/db.rs b/processor/signers/src/db.rs index ec9b879c..ae62c947 100644 --- a/processor/signers/src/db.rs +++ b/processor/signers/src/db.rs @@ -15,14 +15,8 @@ create_db! { db_channel! { SignersGlobal { - // CompletedEventualities needs to be handled by each signer, meaning we need to turn its - // effective spsc into a spmc. We do this by duplicating its message for all keys we're - // signing for. - // TODO: Populate from CompletedEventualities - CompletedEventualitiesForEachKey: (session: Session) -> [u8; 32], - - CoordinatorToTransactionSignerMessages: (session: Session) -> CoordinatorMessage, - TransactionSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, + CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage, + CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, CoordinatorToBatchSignerMessages: (session: Session) -> CoordinatorMessage, BatchSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, @@ -30,7 +24,7 @@ db_channel! { CoordinatorToSlashReportSignerMessages: (session: Session) -> CoordinatorMessage, SlashReportSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, - CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage, - CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, + CoordinatorToTransactionSignerMessages: (session: Session) -> CoordinatorMessage, + TransactionSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, } } diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index 72fe2d17..a53f2208 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -7,23 +7,42 @@ use std::collections::HashMap; use zeroize::Zeroizing; -use serai_validator_sets_primitives::Session; - -use ciphersuite::{group::GroupEncoding, Ristretto}; +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use frost::dkg::{ThresholdCore, ThresholdKeys}; +use serai_validator_sets_primitives::Session; + use serai_db::{DbTxn, Db}; -use primitives::task::TaskHandle; -use scheduler::{Transaction, SignableTransaction, TransactionsToSign}; +use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage}; + +use primitives::task::{Task, TaskHandle, ContinuallyRan}; +use scheduler::{Transaction, SignableTransaction, TransactionFor}; pub(crate) mod db; +mod coordinator; +use coordinator::CoordinatorTask; + mod transaction; +use transaction::TransactionTask; + +/// A connection to the Coordinator which messages can be published with. +#[async_trait::async_trait] +pub trait Coordinator: 'static + Send + Sync { + /// An error encountered when sending a message. + /// + /// This MUST be an ephemeral error. Retrying sending a message MUST eventually resolve without + /// manual intervention/changing the arguments. + type EphemeralError: Debug; + + /// Send a `messages::sign::ProcessorMessage`. + async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; +} /// An object capable of publishing a transaction. #[async_trait::async_trait] -pub trait TransactionPublisher: 'static + Send + Sync { +pub trait TransactionPublisher: 'static + Send + Sync + Clone { /// An error encountered when publishing a transaction. /// /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual @@ -40,9 +59,18 @@ pub trait TransactionPublisher: 'static + Send + Sync { async fn publish(&self, tx: T) -> Result<(), Self::EphemeralError>; } +struct Tasks { + cosigner: TaskHandle, + batch: TaskHandle, + slash_report: TaskHandle, + transaction: TaskHandle, +} + /// The signers used by a processor. +#[allow(non_snake_case)] pub struct Signers { - tasks: HashMap>, + coordinator_handle: TaskHandle, + tasks: HashMap, _ST: PhantomData, } @@ -62,9 +90,57 @@ impl Signers { /// Initialize the signers. /// /// This will spawn tasks for any historically registered keys. - pub fn new(db: impl Db) -> Self { + pub fn new( + mut db: impl Db, + coordinator: impl Coordinator, + publisher: &impl TransactionPublisher>, + ) -> Self { + /* + On boot, perform any database cleanup which was queued. + + We don't do this cleanup at time of dropping the task as we'd need to wait an unbounded + amount of time for the task to stop (requiring an async task), then we'd have to drain the + channels (which would be on a distinct DB transaction and risk not occurring if we rebooted + while waiting for the task to stop). This is the easiest way to handle this. + */ + { + let mut txn = db.txn(); + for (session, external_key_bytes) in db::ToCleanup::get(&txn).unwrap_or(vec![]) { + let mut external_key_bytes = external_key_bytes.as_slice(); + let external_key = + ::read_G(&mut external_key_bytes).unwrap(); + assert!(external_key_bytes.is_empty()); + + // Drain the transactions to sign + // TransactionsToSign will be fully populated by the scheduler before retiry occurs, making + // this perfect in not leaving any pending blobs behind + while scheduler::TransactionsToSign::::try_recv(&mut txn, &external_key).is_some() {} + + // Drain the completed Eventualities + // This will be fully populated by the scanner before retiry + while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {} + + // Drain our DB channels + while db::CoordinatorToCosignerMessages::try_recv(&mut txn, session).is_some() {} + while db::CosignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {} + while db::CoordinatorToBatchSignerMessages::try_recv(&mut txn, session).is_some() {} + while db::BatchSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {} + while db::CoordinatorToSlashReportSignerMessages::try_recv(&mut txn, session).is_some() {} + while db::SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {} + while db::CoordinatorToTransactionSignerMessages::try_recv(&mut txn, session).is_some() {} + while db::TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {} + } + db::ToCleanup::del(&mut txn); + txn.commit(); + } + let mut tasks = HashMap::new(); + let (coordinator_task, coordinator_handle) = Task::new(); + tokio::spawn( + CoordinatorTask::new(db.clone(), coordinator).continually_run(coordinator_task, vec![]), + ); + for session in db::RegisteredKeys::get(&db).unwrap_or(vec![]) { let buf = db::SerializedKeys::get(&db, session).unwrap(); let mut buf = buf.as_slice(); @@ -78,10 +154,23 @@ impl Signers { .push(ThresholdKeys::from(ThresholdCore::::read(&mut buf).unwrap())); } - todo!("TODO") + // TODO: Batch signer, cosigner, slash report signers + + let (transaction_task, transaction_handle) = Task::new(); + tokio::spawn( + TransactionTask::<_, ST, _>::new(db.clone(), publisher.clone(), session, external_keys) + .continually_run(transaction_task, vec![coordinator_handle.clone()]), + ); + + tasks.insert(session, Tasks { + cosigner: todo!("TODO"), + batch: todo!("TODO"), + slash_report: todo!("TODO"), + transaction: transaction_handle, + }); } - Self { tasks, _ST: PhantomData } + Self { coordinator_handle, tasks, _ST: PhantomData } } /// Register a set of keys to sign with. @@ -146,82 +235,31 @@ impl Signers { let mut to_cleanup = db::ToCleanup::get(txn).unwrap_or(vec![]); to_cleanup.push((session, external_key.to_bytes().as_ref().to_vec())); db::ToCleanup::set(txn, &to_cleanup); + } - // TODO: Handle all of the following cleanup on a task - /* - // Kill the tasks - if let Some(tasks) = self.tasks.remove(&session) { - for task in tasks { - task.close().await; + /// Queue handling a message. + /// + /// This is a cheap call and able to be done inline with a higher-level loop. + pub fn queue_message(&mut self, txn: &mut impl DbTxn, message: &CoordinatorMessage) { + let sign_id = message.sign_id(); + let tasks = self.tasks.get(&sign_id.session); + match sign_id.id { + VariantSignId::Cosign(_) => { + db::CoordinatorToCosignerMessages::send(txn, sign_id.session, message); + if let Some(tasks) = tasks { tasks.cosigner.run_now(); } + } + VariantSignId::Batch(_) => { + db::CoordinatorToBatchSignerMessages::send(txn, sign_id.session, message); + if let Some(tasks) = tasks { tasks.batch.run_now(); } + } + VariantSignId::SlashReport(_) => { + db::CoordinatorToSlashReportSignerMessages::send(txn, sign_id.session, message); + if let Some(tasks) = tasks { tasks.slash_report.run_now(); } + } + VariantSignId::Transaction(_) => { + db::CoordinatorToTransactionSignerMessages::send(txn, sign_id.session, message); + if let Some(tasks) = tasks { tasks.transaction.run_now(); } } } - - // Drain the transactions to sign - // Presumably, TransactionsToSign will be fully populated before retiry occurs, making this - // perfect in not leaving any pending blobs behind - while TransactionsToSign::::try_recv(txn, external_key).is_some() {} - - // Drain our DB channels - while db::CompletedEventualitiesForEachKey::try_recv(txn, session).is_some() {} - while db::CoordinatorToTransactionSignerMessages::try_recv(txn, session).is_some() {} - while db::TransactionSignerToCoordinatorMessages::try_recv(txn, session).is_some() {} - while db::CoordinatorToBatchSignerMessages::try_recv(txn, session).is_some() {} - while db::BatchSignerToCoordinatorMessages::try_recv(txn, session).is_some() {} - while db::CoordinatorToSlashReportSignerMessages::try_recv(txn, session).is_some() {} - while db::SlashReportSignerToCoordinatorMessages::try_recv(txn, session).is_some() {} - while db::CoordinatorToCosignerMessages::try_recv(txn, session).is_some() {} - while db::CosignerToCoordinatorMessages::try_recv(txn, session).is_some() {} - */ } } - -/* -// The signers used by a Processor, key-scoped. -struct KeySigners { - transaction: AttemptManager, - substrate: AttemptManager>, - cosigner: AttemptManager>, -} - -/// The signers used by a protocol. -pub struct Signers(HashMap, KeySigners>); - -impl Signers { - /// Create a new set of signers. - pub fn new(db: D) -> Self { - // TODO: Load the registered keys - // TODO: Load the transactions being signed - // TODO: Load the batches being signed - todo!("TODO") - } - - /// Register a transaction to sign. - pub fn sign_transaction(&mut self) -> Vec { - todo!("TODO") - } - /// Mark a transaction as signed. - pub fn signed_transaction(&mut self) { todo!("TODO") } - - /// Register a batch to sign. - pub fn sign_batch(&mut self, key: KeyFor, batch: Batch) -> Vec { - todo!("TODO") - } - /// Mark a batch as signed. - pub fn signed_batch(&mut self, batch: u32) { todo!("TODO") } - - /// Register a slash report to sign. - pub fn sign_slash_report(&mut self) -> Vec { - todo!("TODO") - } - /// Mark a slash report as signed. - pub fn signed_slash_report(&mut self) { todo!("TODO") } - - /// Start a cosigning protocol. - pub fn cosign(&mut self) { todo!("TODO") } - - /// Handle a message for a signing protocol. - pub fn handle(&mut self, msg: CoordinatorMessage) -> Vec { - todo!("TODO") - } -} -*/ diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index 8fdf8145..be08cec2 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -3,31 +3,28 @@ use std::{ time::{Duration, Instant}, }; -use frost::{dkg::ThresholdKeys, sign::PreprocessMachine}; +use frost::dkg::ThresholdKeys; use serai_validator_sets_primitives::Session; use serai_db::{DbTxn, Db}; +use messages::sign::VariantSignId; + use primitives::task::ContinuallyRan; -use scheduler::{Transaction, SignableTransaction, TransactionsToSign}; +use scheduler::{Transaction, SignableTransaction, TransactionFor, TransactionsToSign}; +use scanner::CompletedEventualities; use frost_attempt_manager::*; use crate::{ - db::{ - CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages, - CompletedEventualitiesForEachKey, - }, + db::{CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages}, TransactionPublisher, }; mod db; use db::*; -type TransactionFor = - <::PreprocessMachine as PreprocessMachine>::Signature; - // Fetches transactions to sign and signs them. pub(crate) struct TransactionTask< D: Db, @@ -76,7 +73,7 @@ impl> for keys in &keys { machines.push(signable_transaction.clone().sign(keys.clone())); } - attempt_manager.register(tx, machines); + attempt_manager.register(VariantSignId::Transaction(tx), machines); } Self { @@ -123,7 +120,7 @@ impl> for keys in &self.keys { machines.push(tx.clone().sign(keys.clone())); } - for msg in self.attempt_manager.register(tx.id(), machines) { + for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) { TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); } @@ -133,28 +130,42 @@ impl> // Check for completed Eventualities (meaning we should no longer sign for these transactions) loop { let mut txn = self.db.txn(); - let Some(id) = CompletedEventualitiesForEachKey::try_recv(&mut txn, self.session) else { + let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else { break; }; + + /* + We may have yet to register this signing protocol. + + While `TransactionsToSign` is populated before `CompletedEventualities`, we could + theoretically have `TransactionsToSign` populated with a new transaction _while iterating + over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that + edge case, we will see the completion notification before we see the transaction. + + In such a case, we break (dropping the txn, re-queueing the completion notification). On + the task's next iteration, we'll process the transaction from `TransactionsToSign` and be + able to make progress. + */ + if !self.active_signing_protocols.remove(&id) { + break; + } iterated = true; - // This may or may not be an ID this key was responsible for - if self.active_signing_protocols.remove(&id) { - // Since it was, remove this as an active signing protocol - ActiveSigningProtocols::set( - &mut txn, - self.session, - &self.active_signing_protocols.iter().copied().collect(), - ); - // Clean up the database - SerializedSignableTransactions::del(&mut txn, id); - SerializedTransactions::del(&mut txn, id); + // Since it was, remove this as an active signing protocol + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + // Clean up the database + SerializedSignableTransactions::del(&mut txn, id); + SerializedTransactions::del(&mut txn, id); + + // We retire with a txn so we either successfully flag this Eventuality as completed, and + // won't re-register it (making this retire safe), or we don't flag it, meaning we will + // re-register it, yet that's safe as we have yet to retire it + self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id)); - // We retire with a txn so we either successfully flag this Eventuality as completed, and - // won't re-register it (making this retire safe), or we don't flag it, meaning we will - // re-register it, yet that's safe as we have yet to retire it - self.attempt_manager.retire(&mut txn, id); - } txn.commit(); } @@ -178,7 +189,14 @@ impl> { let mut buf = Vec::with_capacity(256); signed_tx.write(&mut buf).unwrap(); - SerializedTransactions::set(&mut txn, id, &buf); + SerializedTransactions::set( + &mut txn, + match id { + VariantSignId::Transaction(id) => id, + _ => panic!("TransactionTask signed a non-transaction"), + }, + &buf, + ); } self