From 100c80be9ff91fb371a458aab5628c4362dee166 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 6 Sep 2024 04:15:02 -0400 Subject: [PATCH] Finish transaction signing task with TX rebroadcast code --- .../frost-attempt-manager/src/individual.rs | 6 +- processor/frost-attempt-manager/src/lib.rs | 15 ++- processor/scheduler/primitives/src/lib.rs | 2 +- processor/signers/src/lib.rs | 6 +- processor/signers/src/transaction/db.rs | 10 ++ processor/signers/src/transaction/mod.rs | 93 +++++++++++++++++-- 6 files changed, 109 insertions(+), 23 deletions(-) diff --git a/processor/frost-attempt-manager/src/individual.rs b/processor/frost-attempt-manager/src/individual.rs index 049731c6..2591b582 100644 --- a/processor/frost-attempt-manager/src/individual.rs +++ b/processor/frost-attempt-manager/src/individual.rs @@ -278,9 +278,7 @@ impl SigningProtocol { } /// Cleanup the database entries for a specified signing protocol. - pub(crate) fn cleanup(db: &mut D, id: [u8; 32]) { - let mut txn = db.txn(); - Attempted::del(&mut txn, id); - txn.commit(); + pub(crate) fn cleanup(txn: &mut impl DbTxn, id: [u8; 32]) { + Attempted::del(txn, id); } } diff --git a/processor/frost-attempt-manager/src/lib.rs b/processor/frost-attempt-manager/src/lib.rs index 2ce46784..6666ffac 100644 --- a/processor/frost-attempt-manager/src/lib.rs +++ b/processor/frost-attempt-manager/src/lib.rs @@ -8,7 +8,7 @@ use frost::{Participant, sign::PreprocessMachine}; use serai_validator_sets_primitives::Session; -use serai_db::Db; +use serai_db::{DbTxn, Db}; use messages::sign::{ProcessorMessage, CoordinatorMessage}; mod individual; @@ -19,7 +19,12 @@ pub enum Response { /// Messages to send to the coordinator. Messages(Vec), /// A produced signature. - Signature(M::Signature), + Signature { + /// The ID of the protocol this is for. + id: [u8; 32], + /// The signature. + signature: M::Signature, + }, } /// A manager of attempts for a variety of signing protocols. @@ -55,13 +60,13 @@ impl AttemptManager { /// This does not stop the protocol from being re-registered and further worked on (with /// undefined behavior) then. The higher-level context must never call `register` again with this /// ID accordingly. - pub fn retire(&mut self, id: [u8; 32]) { + pub fn retire(&mut self, txn: &mut impl DbTxn, id: [u8; 32]) { if self.active.remove(&id).is_none() { log::info!("retiring protocol {}, which we didn't register/already retired", hex::encode(id)); } else { log::info!("retired signing protocol {}", hex::encode(id)); } - SigningProtocol::::cleanup(&mut self.db, id); + SigningProtocol::::cleanup(txn, id); } /// Handle a message for a signing protocol. @@ -90,7 +95,7 @@ impl AttemptManager { return Response::Messages(vec![]); }; match protocol.shares(id.attempt, shares) { - Ok(signature) => Response::Signature(signature), + Ok(signature) => Response::Signature { id: id.id, signature }, Err(messages) => Response::Messages(messages), } } diff --git a/processor/scheduler/primitives/src/lib.rs b/processor/scheduler/primitives/src/lib.rs index 4de4f67a..cef10d35 100644 --- a/processor/scheduler/primitives/src/lib.rs +++ b/processor/scheduler/primitives/src/lib.rs @@ -23,7 +23,7 @@ pub trait SignableTransaction: 'static + Sized + Send + Sync + Clone { /// The ciphersuite used to sign this transaction. type Ciphersuite: Ciphersuite; /// The preprocess machine for the signing protocol for this transaction. - type PreprocessMachine: Clone + PreprocessMachine; + type PreprocessMachine: Clone + PreprocessMachine; /// Read a `SignableTransaction`. fn read(reader: &mut impl io::Read) -> io::Result; diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index 7453f4b6..eb09440d 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -19,15 +19,15 @@ pub trait TransactionPublisher: 'static + Send + Sync { /// /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual /// intervention/changing the arguments. - /// - /// The transaction already being present in the mempool/on-chain SHOULD NOT be considered an - /// error. type EphemeralError: Debug; /// Publish a transaction. /// /// This will be called multiple times, with the same transaction, until the transaction is /// confirmed on-chain. + /// + /// The transaction already being present in the mempool/on-chain MUST NOT be considered an + /// error. async fn publish( &self, tx: ::Signature, diff --git a/processor/signers/src/transaction/db.rs b/processor/signers/src/transaction/db.rs index 8b137891..b77d38c7 100644 --- a/processor/signers/src/transaction/db.rs +++ b/processor/signers/src/transaction/db.rs @@ -1 +1,11 @@ +use serai_validator_sets_primitives::Session; +use serai_db::{Get, DbTxn, create_db}; + +create_db! { + TransactionSigner { + ActiveSigningProtocols: (session: Session) -> Vec<[u8; 32]>, + SerializedSignableTransactions: (id: [u8; 32]) -> Vec, + SerializedTransactions: (id: [u8; 32]) -> Vec, + } +} diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index 4ed573f4..85a6a0ab 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -1,11 +1,13 @@ -use frost::dkg::ThresholdKeys; +use std::{collections::HashSet, time::{Duration, Instant}}; + +use frost::{dkg::ThresholdKeys, sign::PreprocessMachine}; use serai_validator_sets_primitives::Session; use serai_db::{DbTxn, Db}; use primitives::task::ContinuallyRan; -use scheduler::{SignableTransaction, TransactionsToSign}; +use scheduler::{Transaction, SignableTransaction, TransactionsToSign}; use scanner::{ScannerFeed, Scheduler}; use frost_attempt_manager::*; @@ -19,6 +21,13 @@ use crate::{ }; mod db; +use db::*; + +type TransactionFor = < + < + + >::SignableTransaction as SignableTransaction>::PreprocessMachine as PreprocessMachine +>::Signature; // Fetches transactions to sign and signs them. pub(crate) struct TransactionTask< @@ -28,11 +37,16 @@ pub(crate) struct TransactionTask< P: TransactionPublisher, > { db: D, + publisher: P, + session: Session, keys: Vec::Ciphersuite>>, + + active_signing_protocols: HashSet<[u8; 32]>, attempt_manager: AttemptManager::PreprocessMachine>, - publisher: P, + + last_publication: Instant, } impl, P: TransactionPublisher> @@ -40,16 +54,35 @@ impl, P: TransactionPublisher::Ciphersuite>>, - publisher: P, ) -> Self { - let attempt_manager = AttemptManager::new( + let mut active_signing_protocols = HashSet::new(); + let mut attempt_manager = AttemptManager::new( db.clone(), session, keys.first().expect("creating a transaction signer with 0 keys").params().i(), ); - Self { db, session, keys, attempt_manager, publisher } + + // Re-register all active signing protocols + for tx in ActiveSigningProtocols::get(&db, session).unwrap_or(vec![]) { + active_signing_protocols.insert(tx); + + let signable_transaction_buf = SerializedSignableTransactions::get(&db, tx).unwrap(); + let mut signable_transaction_buf = signable_transaction_buf.as_slice(); + let signable_transaction = >::SignableTransaction::read(&mut signable_transaction_buf).unwrap(); + assert!(signable_transaction_buf.is_empty()); + assert_eq!(signable_transaction.id(), tx); + + let mut machines = Vec::with_capacity(keys.len()); + for keys in &keys { + machines.push(signable_transaction.clone().sign(keys.clone())); + } + attempt_manager.register(tx, machines); + } + + Self { db, publisher, session, keys, active_signing_protocols, attempt_manager, last_publication: Instant::now() } } } @@ -71,6 +104,15 @@ impl, P: TransactionPublisher, P: TransactionPublisher, P: TransactionPublisher, P: TransactionPublisher { - // TODO: Save this TX to the DB + Response::Signature { id, signature: signed_tx } => { + // Save this transaction to the database + { + let mut buf = Vec::with_capacity(256); + signed_tx.write(&mut buf).unwrap(); + SerializedTransactions::set(&mut txn, id, &buf); + } + // TODO: Attempt publication every minute - // TODO: On boot, reload all TXs to rebroadcast self .publisher .publish(signed_tx) @@ -124,6 +182,21 @@ impl, P: TransactionPublisher Duration::from_secs(5 * 60) { + for tx in &self.active_signing_protocols { + let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue }; + let mut tx_buf = tx_buf.as_slice(); + let tx = TransactionFor::::read(&mut tx_buf).unwrap(); + assert!(tx_buf.is_empty()); + + self.publisher.publish(tx).await.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; + } + + self.last_publication = Instant::now(); + } + Ok(iterated) } }