From 8634d90b6b3ac2dc8a163db1c44367ca101a0a56 Mon Sep 17 00:00:00 2001 From: econsta <143191603+econsta@users.noreply.github.com> Date: Mon, 20 Nov 2023 12:03:45 +0400 Subject: [PATCH] implement db macro for processor/signer.rs (#438) * implement db macro for processor/signer.rs * Use () * CompletedDb -> CompletionsDb * () -> &() --------- Co-authored-by: Luke Parker --- processor/src/signer.rs | 183 ++++++++++++++++++---------------------- 1 file changed, 83 insertions(+), 100 deletions(-) diff --git a/processor/src/signer.rs b/processor/src/signer.rs index 3f07c070..b84646ee 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -2,7 +2,6 @@ use core::{marker::PhantomData, fmt}; use std::collections::HashMap; use rand_core::OsRng; - use ciphersuite::group::GroupEncoding; use frost::{ ThresholdKeys, FrostError, @@ -13,69 +12,74 @@ use log::{info, debug, warn, error}; use scale::Encode; use messages::sign::*; + +pub use serai_db::*; + use crate::{ Get, DbTxn, Db, networks::{Transaction, Eventuality, Network}, }; -#[derive(Debug)] -struct SignerDb(D, PhantomData); -impl SignerDb { - fn sign_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"SIGNER", dst, key) +create_db!( + SignerDb { + CompletionsDb: (id: [u8; 32]) -> Vec, + EventualityDb: (id: [u8; 32]) -> Vec, + AttemptDb: (id: &SignId) -> (), + TransactionDb: (id: &[u8]) -> Vec, + ActiveSignsDb: () -> Vec<[u8; 32]>, + CompletedOnChainDb: (id: &[u8; 32]) -> (), } +); - fn active_signs_key() -> Vec { - Self::sign_key(b"active_signs", []) - } - fn completed_on_chain_key(id: &[u8; 32]) -> Vec { - Self::sign_key(b"completed_on_chain", id) - } - fn active_signs(getter: &G) -> Vec<[u8; 32]> { - let active = getter.get(Self::active_signs_key()).unwrap_or(vec![]); - let mut active_ref = active.as_slice(); - let mut res = vec![]; - while !active_ref.is_empty() { - res.push(active_ref[.. 32].try_into().unwrap()); - active_ref = &active_ref[32 ..]; - } - res - } - fn add_active_sign(txn: &mut D::Transaction<'_>, id: &[u8; 32]) { - if txn.get(Self::completed_on_chain_key(id)).is_some() { +impl ActiveSignsDb { + fn add_active_sign(txn: &mut impl DbTxn, id: &[u8; 32]) { + if CompletedOnChainDb::get(txn, id).is_some() { return; } - let key = Self::active_signs_key(); - let mut active = txn.get(&key).unwrap_or(vec![]); - active.extend(id); - txn.put(key, active); + let mut active = ActiveSignsDb::get(txn).unwrap_or_default(); + active.push(*id); + ActiveSignsDb::set(txn, &active); } - fn complete_on_chain(txn: &mut D::Transaction<'_>, id: &[u8; 32]) { - txn.put(Self::completed_on_chain_key(id), []); - txn.put( - Self::active_signs_key(), - Self::active_signs(txn) +} + +impl CompletedOnChainDb { + fn complete_on_chain(txn: &mut impl DbTxn, id: &[u8; 32]) { + CompletedOnChainDb::set(txn, id, &()); + ActiveSignsDb::set( + txn, + &ActiveSignsDb::get(txn) + .unwrap_or_default() .into_iter() .filter(|active| active != id) .flatten() .collect::>(), ); } +} +impl CompletionsDb { + fn completions( + getter: &impl Get, + id: [u8; 32], + ) -> Vec<>::Id> { + let completions = Self::get(getter, id).unwrap_or_default(); + let mut completions_ref = completions.as_slice(); + let mut res = vec![]; + while !completions_ref.is_empty() { + let mut id = >::Id::default(); + let id_len = id.as_ref().len(); + id.as_mut().copy_from_slice(&completions_ref[.. id_len]); + completions_ref = &completions_ref[id_len ..]; + res.push(id); + } + res + } - fn transaction_key(id: &>::Id) -> Vec { - Self::sign_key(b"tx", id) - } - fn completions_key(id: [u8; 32]) -> Vec { - Self::sign_key(b"completed", id) - } - fn complete(txn: &mut D::Transaction<'_>, id: [u8; 32], tx: &N::Transaction) { + fn complete(txn: &mut impl DbTxn, id: [u8; 32], tx: &N::Transaction) { + let tx_id = tx.id(); // Transactions can be completed by multiple signatures // Save every solution in order to be robust - let tx_id = tx.id(); - txn.put(Self::transaction_key(&tx_id), tx.serialize()); - - let mut existing = txn.get(Self::completions_key(id)).unwrap_or(vec![]); - + TransactionDb::save_transaction::(txn, tx); + let mut existing = Self::get(txn, id).unwrap_or_default(); // Don't add this TX if it's already present let tx_len = tx_id.as_ref().len(); assert_eq!(existing.len() % tx_len, 0); @@ -89,50 +93,30 @@ impl SignerDb { } existing.extend(tx_id.as_ref()); - txn.put(Self::completions_key(id), existing); + Self::set(txn, id, &existing); } - fn completions(getter: &G, id: [u8; 32]) -> Vec<>::Id> { - let completions = getter.get(Self::completions_key(id)).unwrap_or(vec![]); - let mut completions_ref = completions.as_slice(); - let mut res = vec![]; - while !completions_ref.is_empty() { - let mut id = >::Id::default(); - let id_len = id.as_ref().len(); - id.as_mut().copy_from_slice(&completions_ref[.. id_len]); - completions_ref = &completions_ref[id_len ..]; - res.push(id); - } - res +} + +impl EventualityDb { + fn save_eventuality(txn: &mut impl DbTxn, id: [u8; 32], eventuality: N::Eventuality) { + txn.put(Self::key(id), eventuality.serialize()); } - fn transaction( - getter: &G, + + fn eventuality(getter: &impl Get, id: [u8; 32]) -> Option { + Some(N::Eventuality::read(&mut getter.get(Self::key(id))?.as_slice()).unwrap()) + } +} + +impl TransactionDb { + fn save_transaction(txn: &mut impl DbTxn, tx: &N::Transaction) { + Self::set(txn, tx.id().as_ref(), &tx.serialize()); + } + + fn transaction( + getter: &impl Get, id: >::Id, ) -> Option { - getter - .get(Self::transaction_key(&id)) - .map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap()) - } - - fn eventuality_key(id: [u8; 32]) -> Vec { - Self::sign_key(b"eventuality", id) - } - fn save_eventuality(txn: &mut D::Transaction<'_>, id: [u8; 32], eventuality: N::Eventuality) { - txn.put(Self::eventuality_key(id), eventuality.serialize()); - } - fn eventuality(getter: &G, id: [u8; 32]) -> Option { - Some( - N::Eventuality::read::<&[u8]>(&mut getter.get(Self::eventuality_key(id))?.as_ref()).unwrap(), - ) - } - - fn attempt_key(id: &SignId) -> Vec { - Self::sign_key(b"attempt", id.encode()) - } - fn attempt(txn: &mut D::Transaction<'_>, id: &SignId) { - txn.put(Self::attempt_key(id), []); - } - fn has_attempt(getter: &G, id: &SignId) -> bool { - getter.get(Self::attempt_key(id)).is_some() + Self::get(getter, id.as_ref()).map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap()) } } @@ -175,12 +159,12 @@ impl Signer { pub async fn rebroadcast_task(db: D, network: N) { log::info!("rebroadcasting transactions for plans whose completions yet to be confirmed..."); loop { - for active in SignerDb::::active_signs(&db) { - for completion in SignerDb::::completions(&db, active) { + for active in ActiveSignsDb::get(&db).unwrap_or_default() { + for completion in CompletionsDb::completions::(&db, active) { log::info!("rebroadcasting {}", hex::encode(&completion)); // TODO: Don't drop the error entirely. Check for invariants let _ = network - .publish_transaction(&SignerDb::::transaction(&db, completion).unwrap()) + .publish_transaction(&TransactionDb::transaction::(&db, completion).unwrap()) .await; } } @@ -237,7 +221,7 @@ impl Signer { #[must_use] fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool { - if !SignerDb::::completions(txn, id).is_empty() { + if !CompletionsDb::completions::(txn, id).is_empty() { debug!( "SignTransaction/Reattempt order for {}, which we've already completed signing", hex::encode(id) @@ -284,8 +268,8 @@ impl Signer { let first_completion = !self.already_completed(txn, id); // Save this completion to the DB - SignerDb::::complete_on_chain(txn, &id); - SignerDb::::complete(txn, id, &tx); + CompletedOnChainDb::complete_on_chain(txn, &id); + CompletionsDb::complete::(txn, id, &tx); if first_completion { Some(self.complete(id, tx.id())) @@ -303,7 +287,7 @@ impl Signer { id: [u8; 32], tx_id: &>::Id, ) -> Option { - if let Some(eventuality) = SignerDb::::eventuality(txn, id) { + if let Some(eventuality) = EventualityDb::eventuality::(txn, id) { // Transaction hasn't hit our mempool/was dropped for a different signature // The latter can happen given certain latency conditions/a single malicious signer // In the case of a single malicious signer, they can drag multiple honest validators down @@ -324,7 +308,7 @@ impl Signer { let first_completion = !self.already_completed(txn, id); // Save this completion to the DB - SignerDb::::complete(txn, id, &tx); + CompletionsDb::complete::(txn, id, &tx); if first_completion { return Some(self.complete(id, tx.id())); @@ -338,7 +322,7 @@ impl Signer { } } else { // If we don't have this in RAM, it should be because we already finished signing it - assert!(!SignerDb::::completions(txn, id).is_empty()); + assert!(!CompletionsDb::completions::(txn, id).is_empty()); info!( "signer {} informed of the eventuality completion for plan {}, {}", hex::encode(self.keys[0].group_key().to_bytes()), @@ -405,7 +389,7 @@ impl Signer { // // Only run if this hasn't already been attempted // TODO: This isn't complete as this txn may not be committed with the expected timing - if SignerDb::::has_attempt(txn, &id) { + if AttemptDb::get(txn, &id).is_some() { warn!( "already attempted {} #{}. this is an error if we didn't reboot", hex::encode(id.id), @@ -413,8 +397,7 @@ impl Signer { ); return None; } - - SignerDb::::attempt(txn, &id); + AttemptDb::set(txn, &id, &()); // Attempt to create the TX let mut machines = vec![]; @@ -451,13 +434,13 @@ impl Signer { ) -> Option { // The caller is expected to re-issue sign orders on reboot // This is solely used by the rebroadcast task - SignerDb::::add_active_sign(txn, &id); + ActiveSignsDb::add_active_sign(txn, &id); if self.already_completed(txn, id) { return None; } - SignerDb::::save_eventuality(txn, id, eventuality); + EventualityDb::save_eventuality::(txn, id, eventuality); self.signable.insert(id, tx); self.attempt(txn, id, 0).await @@ -605,7 +588,7 @@ impl Signer { }; // Save the transaction in case it's needed for recovery - SignerDb::::complete(txn, id.id, &tx); + CompletionsDb::complete::(txn, id.id, &tx); // Publish it let tx_id = tx.id();