From 0198d4cc468200620284ef45a20c6abc03e06700 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 24 Oct 2023 03:00:37 -0400
Subject: [PATCH] Add a TributaryState struct with higher-level DB logic

---
 Cargo.lock                             |   1 +
 coordinator/Cargo.toml                 |   1 +
 coordinator/src/tests/tributary/dkg.rs |   3 +-
 coordinator/src/tributary/db.rs        |  92 +++++++++++++++-
 coordinator/src/tributary/handle.rs    | 142 +++++++++----------------
 5 files changed, 142 insertions(+), 97 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ca59f348..c28e1551 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8188,6 +8188,7 @@ name = "serai-coordinator"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "bincode",
  "blake2",
  "ciphersuite",
  "env_logger",
diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml
index d00e9abe..5ce736ac 100644
--- a/coordinator/Cargo.toml
+++ b/coordinator/Cargo.toml
@@ -42,6 +42,7 @@ tributary = { package = "tributary-chain", path = "./tributary" }
 serai-client = { path = "../substrate/client", features = ["serai"] }
 
 hex = "0.4"
+bincode = "1"
 serde_json = { version = "1", default-features = false }
 
 log = "0.4"
diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs
index 433faad3..a3c6643e 100644
--- a/coordinator/src/tests/tributary/dkg.rs
+++ b/coordinator/src/tests/tributary/dkg.rs
@@ -283,8 +283,7 @@ async fn dkg_test() {
   let mut txs = vec![];
   for key in keys.iter() {
     let attempt = 0;
-    // This is fine to re-use the one DB as such, due to exactly how this specific call is coded,
-    // albeit poor
+    let (mut scanner_db, _) = new_processors(key, &spec, &tributaries[0].1).await;
     let mut txn = scanner_db.0.txn();
     let share =
       crate::tributary::generated_key_pair::<MemDb>(&mut txn, key, &spec, &key_pair, 0).unwrap();
diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs
index 3a936f63..4f5fdb58 100644
--- a/coordinator/src/tributary/db.rs
+++ b/coordinator/src/tributary/db.rs
@@ -1,13 +1,18 @@
-use std::io::Read;
+use core::{marker::PhantomData, ops::Deref};
+use std::{io::Read, collections::HashMap};
 
 use scale::{Encode, Decode};
 
+use zeroize::Zeroizing;
 use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
+use frost::Participant;
 
 use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};
 
 pub use serai_db::*;
 
+use crate::tributary::TributarySpec;
+
 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
 pub enum Topic {
   Dkg,
@@ -125,6 +130,29 @@ impl<D: Db> TributaryDb<D> {
     })
   }
 
+  fn confirmation_nonces_key(genesis: [u8; 32], attempt: u32) -> Vec<u8> {
+    Self::tributary_key(b"confirmation_nonces", (genesis, attempt).encode())
+  }
+  pub fn save_confirmation_nonces(
+    txn: &mut D::Transaction<'_>,
+    genesis: [u8; 32],
+    attempt: u32,
+    nonces: HashMap<Participant, Vec<u8>>,
+  ) {
+    let nonces =
+      nonces.into_iter().map(|(key, value)| (u16::from(key), value)).collect::<HashMap<_, _>>();
+    txn.put(Self::confirmation_nonces_key(genesis, attempt), bincode::serialize(&nonces).unwrap())
+  }
+  pub fn confirmation_nonces<G: Get>(
+    getter: &G,
+    genesis: [u8; 32],
+    attempt: u32,
+  ) -> Option<HashMap<Participant, Vec<u8>>> {
+    let bytes = getter.get(Self::confirmation_nonces_key(genesis, attempt))?;
+    let map: HashMap<u16, Vec<u8>> = bincode::deserialize(&bytes).unwrap();
+    Some(map.into_iter().map(|(key, value)| (Participant::new(key).unwrap(), value)).collect())
+  }
+
   // The key pair which we're actively working on completing
   fn currently_completing_key_pair_key(genesis: [u8; 32]) -> Vec<u8> {
     Self::tributary_key(b"currently_completing_key_pair", genesis)
@@ -221,3 +249,65 @@ impl<D: Db> TributaryDb<D> {
     txn.put(Self::event_key(&id, index), []);
   }
 }
+
+pub enum DataSet {
+  Participating(HashMap<Participant, Vec<u8>>),
+  NotParticipating,
+}
+
+pub enum Accumulation {
+  Ready(DataSet),
+  NotReady,
+}
+
+pub struct TributaryState<D: Db>(PhantomData<D>);
+impl<D: Db> TributaryState<D> {
+  pub fn accumulate(
+    txn: &mut D::Transaction<'_>,
+    our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+    spec: &TributarySpec,
+    data_spec: &DataSpecification,
+    signer: <Ristretto as Ciphersuite>::G,
+    data: &[u8],
+  ) -> Accumulation {
+    if TributaryDb::<D>::data(txn, spec.genesis(), data_spec, signer).is_some() {
+      panic!("accumulating data for a participant multiple times");
+    }
+    let received = TributaryDb::<D>::set_data(txn, spec.genesis(), data_spec, signer, data);
+
+    // If we have all the needed commitments/preprocesses/shares, tell the processor
+    // TODO: This needs to be coded by weight, not by validator count
+    let needed = if data_spec.topic == Topic::Dkg { spec.n() } else { spec.t() };
+    if received == needed {
+      return Accumulation::Ready({
+        let mut data = HashMap::new();
+        for validator in spec.validators().iter().map(|validator| validator.0) {
+          data.insert(
+            spec.i(validator).unwrap(),
+            if let Some(data) = TributaryDb::<D>::data(txn, spec.genesis(), data_spec, validator) {
+              data
+            } else {
+              continue;
+            },
+          );
+        }
+        assert_eq!(data.len(), usize::from(needed));
+
+        // Remove our own piece of data, if we were involved
+        if data
+          .remove(
+            &spec
+              .i(Ristretto::generator() * our_key.deref())
+              .expect("handling a message for a Tributary we aren't part of"),
+          )
+          .is_some()
+        {
+          DataSet::Participating(data)
+        } else {
+          DataSet::NotParticipating
+        }
+      });
+    }
+    Accumulation::NotReady
+  }
+}
diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs
index cde71d3c..4641741b 100644
--- a/coordinator/src/tributary/handle.rs
+++ b/coordinator/src/tributary/handle.rs
@@ -1,5 +1,4 @@
 use core::{ops::Deref, future::Future};
-use std::collections::HashMap;
 
 use zeroize::Zeroizing;
 
@@ -21,12 +20,13 @@ use processor_messages::{
   sign::{self, SignId},
 };
 
-use serai_db::{Get, Db};
+use serai_db::Db;
 
 use crate::{
   processors::Processors,
   tributary::{
-    Transaction, TributarySpec, Topic, DataSpecification, TributaryDb,
+    Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, DataSet, Accumulation,
+    TributaryState,
     nonce_decider::NonceDecider,
     dkg_confirmer::DkgConfirmer,
     scanner::{RecognizedIdType, RIDTrait},
@@ -46,41 +46,6 @@ const BATCH_SHARE: &str = "b_share";
 const SIGN_PREPROCESS: &str = "s_preprocess";
 const SIGN_SHARE: &str = "s_share";
 
-fn read_known_to_exist_data<D: Db, G: Get>(
-  getter: &G,
-  spec: &TributarySpec,
-  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  data_spec: &DataSpecification,
-  needed: u16,
-) -> Option<HashMap<Participant, Vec<u8>>> {
-  let mut data = HashMap::new();
-  for validator in spec.validators().iter().map(|validator| validator.0) {
-    data.insert(
-      spec.i(validator).unwrap(),
-      if let Some(data) = TributaryDb::<D>::data(getter, spec.genesis(), data_spec, validator) {
-        data
-      } else {
-        continue;
-      },
-    );
-  }
-  assert_eq!(data.len(), usize::from(needed));
-
-  // Remove our own piece of data, if we were involved
-  if data
-    .remove(
-      &spec
-        .i(Ristretto::generator() * key.deref())
-        .expect("handling a message for a Tributary we aren't part of"),
-    )
-    .is_some()
-  {
-    Some(data)
-  } else {
-    None
-  }
-}
-
 pub fn dkg_confirmation_nonces(
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   spec: &TributarySpec,
@@ -98,16 +63,7 @@ pub fn generated_key_pair<D: Db>(
   attempt: u32,
 ) -> Result<[u8; 32], Participant> {
   TributaryDb::<D>::save_currently_completing_key_pair(txn, spec.genesis(), key_pair);
-
-  let Some(preprocesses) = read_known_to_exist_data::<D, _>(
-    txn,
-    spec,
-    key,
-    &DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_NONCES, attempt },
-    spec.n(),
-  ) else {
-    panic!("wasn't a participant in confirming a key pair");
-  };
+  let preprocesses = TributaryDb::<D>::confirmation_nonces(txn, spec.genesis(), attempt).unwrap();
   DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
 }
 
@@ -152,19 +108,19 @@ pub(crate) async fn handle_application_tx<
         signed.signer.to_bytes(),
         "published data for ID without an attempt",
       );
-      return None;
+      return Accumulation::NotReady;
     };
 
     // If they've already published a TX for this attempt, slash
     if TributaryDb::<D>::data(txn, genesis, data_spec, signed.signer).is_some() {
       fatal_slash::<D>(txn, genesis, signed.signer.to_bytes(), "published data multiple times");
-      return None;
+      return Accumulation::NotReady;
    }
 
     // If the attempt is lesser than the blockchain's, slash
     if data_spec.attempt < curr_attempt {
       // TODO: Slash for being late
-      return None;
+      return Accumulation::NotReady;
     }
     // If the attempt is greater, this is a premature publication, full slash
     if data_spec.attempt > curr_attempt {
@@ -174,7 +130,7 @@ pub(crate) async fn handle_application_tx<
         signed.signer.to_bytes(),
         "published data with an attempt which hasn't started",
       );
-      return None;
+      return Accumulation::NotReady;
     }
 
     // TODO: We can also full slash if shares before all commitments, or share before the
@@ -182,16 +138,8 @@ pub(crate) async fn handle_application_tx<
 
     // TODO: If this is shares, we need to check they are part of the selected signing set
 
-    // Store this data
-    let received = TributaryDb::<D>::set_data(txn, genesis, data_spec, signed.signer, &bytes);
-
-    // If we have all the needed commitments/preprocesses/shares, tell the processor
-    // TODO: This needs to be coded by weight, not by validator count
-    let needed = if data_spec.topic == Topic::Dkg { spec.n() } else { spec.t() };
-    if received == needed {
-      return Some(read_known_to_exist_data::<D, _>(txn, spec, key, data_spec, needed));
-    }
-    None
+    // Accumulate this data
+    TributaryState::<D>::accumulate(txn, key, spec, data_spec, signed.signer, &bytes)
   };
 
   match tx {
@@ -202,7 +150,7 @@ pub(crate) async fn handle_application_tx<
         bytes,
         &signed,
       ) {
-        Some(Some(commitments)) => {
+        Accumulation::Ready(DataSet::Participating(commitments)) => {
           log::info!("got all DkgCommitments for {}", hex::encode(genesis));
           processors
             .send(
@@ -214,8 +162,10 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => panic!("wasn't a participant in DKG commitments"),
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {
+          panic!("wasn't a participant in DKG commitments")
+        }
+        Accumulation::NotReady => {}
       }
     }
 
@@ -257,9 +207,16 @@ pub(crate) async fn handle_application_tx<
         bytes,
         &signed,
       ) {
-        Some(Some(shares)) => {
+        Accumulation::Ready(DataSet::Participating(shares)) => {
           log::info!("got all DkgShares for {}", hex::encode(genesis));
-          assert!(confirmation_nonces.is_some());
+
+          let Accumulation::Ready(DataSet::Participating(confirmation_nonces)) =
+            confirmation_nonces
+          else {
+            panic!("got all DKG shares yet confirmation nonces aren't Ready(Participating(_))");
+          };
+          TributaryDb::<D>::save_confirmation_nonces(txn, genesis, attempt, confirmation_nonces);
+
           processors
             .send(
               spec.set().network,
@@ -270,8 +227,10 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => panic!("wasn't a participant in DKG shares"),
-        None => assert!(confirmation_nonces.is_none()),
+        Accumulation::Ready(DataSet::NotParticipating) => {
+          panic!("wasn't a participant in DKG shares")
+        }
+        Accumulation::NotReady => assert!(matches!(confirmation_nonces, Accumulation::NotReady)),
       }
     }
 
@@ -282,19 +241,12 @@ pub(crate) async fn handle_application_tx<
         shares.to_vec(),
         &signed,
       ) {
-        Some(Some(shares)) => {
+        Accumulation::Ready(DataSet::Participating(shares)) => {
           log::info!("got all DkgConfirmed for {}", hex::encode(genesis));
 
-          let Some(preprocesses) = read_known_to_exist_data::<D, _>(
-            txn,
-            spec,
-            key,
-            &DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_NONCES, attempt },
-            spec.n(),
-          ) else {
-            panic!("wasn't a participant in DKG confirmation nonces");
-          };
-
+          let preprocesses = TributaryDb::<D>::confirmation_nonces(txn, genesis, attempt).unwrap();
+          // TODO: This can technically happen under very very very specific timing as the txn put
+          // happens before DkgConfirmed, yet the txn commit isn't guaranteed to
           let key_pair = TributaryDb::<D>::currently_completing_key_pair(txn, genesis)
             .unwrap_or_else(|| {
               panic!(
@@ -314,8 +266,10 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => panic!("wasn't a participant in DKG confirmination shares"),
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {
+          panic!("wasn't a participant in DKG confirmination shares")
+        }
+        Accumulation::NotReady => {}
       }
     }
 
@@ -350,7 +304,7 @@ pub(crate) async fn handle_application_tx<
         data.data,
         &data.signed,
       ) {
-        Some(Some(preprocesses)) => {
+        Accumulation::Ready(DataSet::Participating(preprocesses)) => {
           NonceDecider::<D>::selected_for_signing_batch(txn, genesis, data.plan);
           let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
           processors
@@ -363,8 +317,8 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => {}
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {}
+        Accumulation::NotReady => {}
       }
     }
     Transaction::BatchShare(data) => {
@@ -378,7 +332,7 @@ pub(crate) async fn handle_application_tx<
         data.data,
         &data.signed,
      ) {
-        Some(Some(shares)) => {
+        Accumulation::Ready(DataSet::Participating(shares)) => {
           let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
           processors
             .send(
@@ -393,8 +347,8 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => {}
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {}
+        Accumulation::NotReady => {}
      }
     }
 
@@ -410,7 +364,7 @@ pub(crate) async fn handle_application_tx<
         data.data,
         &data.signed,
       ) {
-        Some(Some(preprocesses)) => {
+        Accumulation::Ready(DataSet::Participating(preprocesses)) => {
           NonceDecider::<D>::selected_for_signing_plan(txn, genesis, data.plan);
           processors
             .send(
@@ -429,8 +383,8 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => {}
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {}
+        Accumulation::NotReady => {}
       }
     }
     Transaction::SignShare(data) => {
@@ -445,7 +399,7 @@ pub(crate) async fn handle_application_tx<
         data.data,
         &data.signed,
       ) {
-        Some(Some(shares)) => {
+        Accumulation::Ready(DataSet::Participating(shares)) => {
           processors
             .send(
               spec.set().network,
@@ -463,8 +417,8 @@ pub(crate) async fn handle_application_tx<
             )
             .await;
         }
-        Some(None) => {}
-        None => {}
+        Accumulation::Ready(DataSet::NotParticipating) => {}
+        Accumulation::NotReady => {}
       }
     }
     Transaction::SignCompleted { plan, tx_hash, .. } => {