From fda90e23c9d18da10d8807e515f03c6a9f39e041 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Fri, 1 Sep 2023 00:59:10 -0400
Subject: [PATCH] Reduce and clarify data flow in Tributary scanner

---
 coordinator/src/main.rs              |   2 +
 coordinator/src/tributary/db.rs      | 106 ++++++------
 coordinator/src/tributary/handle.rs  | 234 ++++++++++++---------------
 coordinator/src/tributary/scanner.rs |   1 -
 4 files changed, 168 insertions(+), 175 deletions(-)

diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index fe8f8952..682bad94 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -359,6 +359,8 @@ pub async fn handle_p2p(
       }
 
       // TODO2: Rate limit this per timestamp
+      // And/or slash on Heartbeat which justifies a response, since the node obviously was
+      // offline and we must now use our bandwidth to compensate for them?
       P2pMessageKind::Heartbeat(genesis) => {
         if msg.msg.len() != 40 {
           log::error!("validator sent invalid heartbeat");
diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs
index 6b423b03..c4398289 100644
--- a/coordinator/src/tributary/db.rs
+++ b/coordinator/src/tributary/db.rs
@@ -8,6 +8,47 @@ use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};
 
 pub use serai_db::*;
 
+// Used to determine if an ID is acceptable
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub enum Zone {
+  Dkg,
+  Batch,
+  Sign,
+}
+
+impl Zone {
+  fn label(&self) -> &'static str {
+    match self {
+      Zone::Dkg => "dkg",
+      Zone::Batch => "batch",
+      Zone::Sign => "sign",
+    }
+  }
+}
+
+// A struct to refer to a piece of data all validators will presumably provide a value for.
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub struct DataSpecification {
+  pub zone: Zone,
+  pub label: &'static str,
+  pub id: [u8; 32],
+  pub attempt: u32,
+}
+
+impl DataSpecification {
+  fn as_key(&self, genesis: [u8; 32]) -> Vec<u8> {
+    // TODO: Use a proper transcript here to avoid conflicts?
+    [
+      genesis.as_ref(),
+      self.zone.label().as_bytes(),
+      self.label.as_bytes(),
+      self.id.as_ref(),
+      self.attempt.to_le_bytes().as_ref(),
+    ]
+    .concat()
+  }
+}
+
 #[derive(Debug)]
 pub struct TributaryDb<D: Db>(pub D);
 impl<D: Db> TributaryDb<D> {
@@ -97,24 +138,17 @@ impl<D: Db> TributaryDb<D> {
     Some(KeyPair::decode(&mut getter.get(Self::key_pair_key(set))?.as_slice()).unwrap())
   }
 
-  fn recognized_id_key(label: &'static str, genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> {
-    Self::tributary_key(b"recognized", [label.as_bytes(), genesis.as_ref(), id.as_ref()].concat())
+  fn recognized_id_key(genesis: [u8; 32], zone: Zone, id: [u8; 32]) -> Vec<u8> {
+    Self::tributary_key(
+      b"recognized",
+      [genesis.as_ref(), zone.label().as_bytes(), id.as_ref()].concat(),
+    )
   }
-  pub fn recognized_id<G: Get>(
-    getter: &G,
-    label: &'static str,
-    genesis: [u8; 32],
-    id: [u8; 32],
-  ) -> bool {
-    getter.get(Self::recognized_id_key(label, genesis, id)).is_some()
+  pub fn recognized_id<G: Get>(getter: &G, genesis: [u8; 32], zone: Zone, id: [u8; 32]) -> bool {
+    getter.get(Self::recognized_id_key(genesis, zone, id)).is_some()
   }
-  pub fn recognize_id(
-    txn: &mut D::Transaction<'_>,
-    label: &'static str,
-    genesis: [u8; 32],
-    id: [u8; 32],
-  ) {
-    txn.put(Self::recognized_id_key(label, genesis, id), [])
+  pub fn recognize_id(txn: &mut D::Transaction<'_>, genesis: [u8; 32], zone: Zone, id: [u8; 32]) {
+    txn.put(Self::recognized_id_key(genesis, zone, id), [])
   }
 
   fn attempt_key(genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> {
@@ -127,62 +161,42 @@ impl<D: Db> TributaryDb<D> {
     )
   }
 
-  fn data_received_key(
-    label: &'static [u8],
-    genesis: [u8; 32],
-    id: [u8; 32],
-    attempt: u32,
-  ) -> Vec<u8> {
-    Self::tributary_key(
-      b"data_received",
-      [label, genesis.as_ref(), id.as_ref(), attempt.to_le_bytes().as_ref()].concat(),
-    )
+  // Key for the amount of instances received thus far
+  fn data_received_key(genesis: [u8; 32], data_spec: &DataSpecification) -> Vec<u8> {
+    Self::tributary_key(b"data_received", data_spec.as_key(genesis))
   }
   fn data_key(
-    label: &'static [u8],
     genesis: [u8; 32],
-    id: [u8; 32],
-    attempt: u32,
+    data_spec: &DataSpecification,
     signer: <Ristretto as Ciphersuite>::G,
   ) -> Vec<u8> {
     Self::tributary_key(
       b"data",
-      [
-        label,
-        genesis.as_ref(),
-        id.as_ref(),
-        attempt.to_le_bytes().as_ref(),
-        signer.to_bytes().as_ref(),
-      ]
-      .concat(),
+      [data_spec.as_key(genesis).as_slice(), signer.to_bytes().as_ref()].concat(),
     )
   }
   pub fn data<G: Get>(
-    label: &'static [u8],
     getter: &G,
     genesis: [u8; 32],
-    id: [u8; 32],
-    attempt: u32,
+    data_spec: &DataSpecification,
     signer: <Ristretto as Ciphersuite>::G,
   ) -> Option<Vec<u8>> {
-    getter.get(Self::data_key(label, genesis, id, attempt, signer))
+    getter.get(Self::data_key(genesis, data_spec, signer))
   }
   pub fn set_data(
-    label: &'static [u8],
    txn: &mut D::Transaction<'_>,
     genesis: [u8; 32],
-    id: [u8; 32],
-    attempt: u32,
+    data_spec: &DataSpecification,
     signer: <Ristretto as Ciphersuite>::G,
     data: &[u8],
   ) -> u16 {
-    let received_key = Self::data_received_key(label, genesis, id, attempt);
+    let received_key = Self::data_received_key(genesis, data_spec);
     let mut received =
       u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap());
     received += 1;
     txn.put(received_key, received.to_le_bytes());
-    txn.put(Self::data_key(label, genesis, id, attempt, signer), data);
+    txn.put(Self::data_key(genesis, data_spec, signer), data);
     received
   }
diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs
index 868500c0..ae5d3164 100644
--- a/coordinator/src/tributary/handle.rs
+++ b/coordinator/src/tributary/handle.rs
@@ -33,13 +33,18 @@ use processor_messages::{
 
 use serai_db::{Get, Db};
 
-use crate::processors::Processors;
-use super::{Transaction, TributarySpec, TributaryDb, scanner::RecognizedIdType};
+use crate::{
+  processors::Processors,
+  tributary::{
+    Transaction, TributarySpec, Zone, DataSpecification, TributaryDb, scanner::RecognizedIdType,
+  },
+};
 
-const DKG_CONFIRMATION_NONCES: &[u8] = b"dkg_confirmation_nonces";
-const DKG_CONFIRMATION_SHARES: &[u8] = b"dkg_confirmation_shares";
+const DKG_CONFIRMATION_NONCES: &str = "confirmation_nonces";
+const DKG_CONFIRMATION_SHARES: &str = "confirmation_shares";
 
-// Instead of maintaing state, this simply re-creates the machine(s) in-full on every call.
+// Instead of maintaining state, this simply re-creates the machine(s) in-full on every call (which
+// should only be once per tributary).
 // This simplifies data flow and prevents requiring multiple paths.
 // While more expensive, this only runs an O(n) algorithm, which is tolerable to run multiple
 // times.
@@ -154,27 +159,18 @@ impl DkgConfirmer {
   }
 }
 
-#[allow(clippy::too_many_arguments)] // TODO
 fn read_known_to_exist_data<D: Db, G: Get>(
   getter: &G,
   spec: &TributarySpec,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  label: &'static [u8],
-  id: [u8; 32],
+  data_spec: &DataSpecification,
   needed: u16,
-  attempt: u32,
-  bytes: Vec<u8>,
-  signed: Option<&Signed>,
 ) -> Option<HashMap<Participant, Vec<u8>>> {
   let mut data = HashMap::new();
   for validator in spec.validators().iter().map(|validator| validator.0) {
     data.insert(
       spec.i(validator).unwrap(),
-      if Some(&validator) == signed.map(|signed| &signed.signer) {
-        bytes.clone()
-      } else if let Some(data) =
-        TributaryDb::<D>::data(label, getter, spec.genesis(), id, attempt, validator)
-      {
+      if let Some(data) = TributaryDb::<D>::data(getter, spec.genesis(), data_spec, validator) {
         data
       } else {
         continue;
@@ -220,19 +216,14 @@ pub fn generated_key_pair(
     txn,
     spec,
     key,
-    DKG_CONFIRMATION_NONCES,
-    [0; 32],
+    &DataSpecification { zone: Zone::Dkg, label: DKG_CONFIRMATION_NONCES, id: [0; 32], attempt },
     spec.n(),
-    attempt,
-    vec![],
-    None,
   ) else {
     panic!("wasn't a participant in confirming a key pair");
   };
   DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
 }
 
-#[allow(clippy::too_many_arguments)] // TODO
 pub async fn handle_application_tx<
   D: Db,
   Pro: Processors,
@@ -245,93 +236,68 @@ pub async fn handle_application_tx<
   spec: &TributarySpec,
   processors: &Pro,
   publish_serai_tx: PST,
-  genesis: [u8; 32],
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   recognized_id: RID,
   txn: &mut <D as Db>::Transaction<'_>,
 ) {
-  // Used to determine if an ID is acceptable
-  #[derive(Clone, Copy, PartialEq, Eq, Debug)]
-  enum Zone {
-    Dkg,
-    Batch,
-    Sign,
-  }
+  let genesis = spec.genesis();
 
-  impl Zone {
-    fn label(&self) -> &'static str {
-      match self {
-        Zone::Dkg => {
-          panic!("getting the label for dkg despite dkg code paths not needing a label")
-        }
-        Zone::Batch => "batch",
-        Zone::Sign => "sign",
-      }
+  let handle = |txn: &mut _, data_spec: &DataSpecification, bytes: Vec<u8>, signed: &Signed| {
+    if data_spec.zone == Zone::Dkg {
+      // Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
+      assert_eq!(data_spec.id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
+    } else if !TributaryDb::<D>::recognized_id(txn, genesis, data_spec.zone, data_spec.id) {
+      // TODO: Full slash
+      todo!();
    }
-  }
 
-  let handle =
-    |txn: &mut _, zone: Zone, label, needed, id, attempt, bytes: Vec<u8>, signed: &Signed| {
-      if zone == Zone::Dkg {
-        // Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
-        assert_eq!(id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
-      } else if !TributaryDb::<D>::recognized_id(txn, zone.label(), genesis, id) {
+    // If they've already published a TX for this attempt, slash
+    if let Some(data) = TributaryDb::<D>::data(txn, genesis, data_spec, signed.signer) {
+      if data != bytes {
        // TODO: Full slash
        todo!();
      }
 
-      // If they've already published a TX for this attempt, slash
-      if let Some(data) = TributaryDb::<D>::data(label, txn, genesis, id, attempt, signed.signer) {
-        if data != bytes {
-          // TODO: Full slash
-          todo!();
-        }
 
+      // TODO: Slash
+      return None;
+    }
-        // TODO: Slash
-        return None;
-      }
 
+    // If the attempt is lesser than the blockchain's, slash
+    let curr_attempt = TributaryDb::<D>::attempt(txn, genesis, data_spec.id);
+    if data_spec.attempt < curr_attempt {
+      // TODO: Slash for being late
+      return None;
+    }
+    if data_spec.attempt > curr_attempt {
+      // TODO: Full slash
+      todo!();
+    }
-      // If the attempt is lesser than the blockchain's, slash
-      let curr_attempt = TributaryDb::<D>::attempt(txn, genesis, id);
-      if attempt < curr_attempt {
-        // TODO: Slash for being late
-        return None;
-      }
-      if attempt > curr_attempt {
-        // TODO: Full slash
-        todo!();
-      }
 
+    // TODO: We can also full slash if shares before all commitments, or share before the
+    // necessary preprocesses
-      // TODO: We can also full slash if shares before all commitments, or share before the
-      // necessary preprocesses
 
+    // TODO: If this is shares, we need to check they are part of the selected signing set
-      // TODO: If this is shares, we need to check they are part of the selected signing set
 
+    // Store this data
+    let received = TributaryDb::<D>::set_data(txn, genesis, data_spec, signed.signer, &bytes);
-      // Store this data
-      let received =
-        TributaryDb::<D>::set_data(label, txn, genesis, id, attempt, signed.signer, &bytes);
-
-      // If we have all the needed commitments/preprocesses/shares, tell the processor
-      // TODO: This needs to be coded by weight, not by validator count
-      if received == needed {
-        return Some(read_known_to_exist_data::<D, _>(
-          txn,
-          spec,
-          key,
-          label,
-          id,
-          needed,
-          attempt,
-          bytes,
-          Some(signed),
-        ));
-      }
-      None
-    };
+    // If we have all the needed commitments/preprocesses/shares, tell the processor
+    // TODO: This needs to be coded by weight, not by validator count
+    let needed = if data_spec.zone == Zone::Dkg { spec.n() } else { spec.t() };
+    if received == needed {
+      return Some(read_known_to_exist_data::<D, _>(txn, spec, key, data_spec, needed));
+    }
+    None
+  };
 
   match tx {
     Transaction::DkgCommitments(attempt, bytes, signed) => {
-      match handle(txn, Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, &signed) {
+      match handle(
+        txn,
+        &DataSpecification { zone: Zone::Dkg, label: "commitments", id: [0; 32], attempt },
+        bytes,
+        &signed,
+      ) {
         Some(Some(commitments)) => {
           log::info!("got all DkgCommitments for {}", hex::encode(genesis));
           processors
@@ -377,15 +343,21 @@ pub async fn handle_application_tx<
 
       let confirmation_nonces = handle(
         txn,
-        Zone::Dkg,
-        DKG_CONFIRMATION_NONCES,
-        spec.n(),
-        [0; 32],
-        attempt,
+        &DataSpecification {
+          zone: Zone::Dkg,
+          label: DKG_CONFIRMATION_NONCES,
+          id: [0; 32],
+          attempt,
+        },
         confirmation_nonces.to_vec(),
         &signed,
       );
-      match handle(txn, Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, &signed) {
+      match handle(
+        txn,
+        &DataSpecification { zone: Zone::Dkg, label: "shares", id: [0; 32], attempt },
+        bytes,
+        &signed,
+      ) {
         Some(Some(shares)) => {
           log::info!("got all DkgShares for {}", hex::encode(genesis));
           assert!(confirmation_nonces.is_some());
@@ -407,11 +379,12 @@ pub async fn handle_application_tx<
     Transaction::DkgConfirmed(attempt, shares, signed) => {
       match handle(
         txn,
-        Zone::Dkg,
-        DKG_CONFIRMATION_SHARES,
-        spec.n(),
-        [0; 32],
-        attempt,
+        &DataSpecification {
+          zone: Zone::Dkg,
+          label: DKG_CONFIRMATION_SHARES,
+          id: [0; 32],
+          attempt,
+        },
         shares.to_vec(),
         &signed,
       ) {
@@ -422,12 +395,13 @@ pub async fn handle_application_tx<
             txn,
             spec,
             key,
-            DKG_CONFIRMATION_NONCES,
-            [0; 32],
+            &DataSpecification {
+              zone: Zone::Dkg,
+              label: DKG_CONFIRMATION_NONCES,
+              id: [0; 32],
+              attempt,
+            },
             spec.n(),
-            attempt,
-            vec![],
-            None,
           ) else {
             panic!("wasn't a participant in DKG confirmation nonces");
           };
@@ -458,7 +432,7 @@ pub async fn handle_application_tx<
 
     Transaction::Batch(_, batch) => {
       // Because this Batch has achieved synchrony, its batch ID should be authorized
-      TributaryDb::<D>::recognize_id(txn, Zone::Batch.label(), genesis, batch);
+      TributaryDb::<D>::recognize_id(txn, genesis, Zone::Batch, batch);
       recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch).await;
     }
 
@@ -469,7 +443,7 @@ pub async fn handle_application_tx<
       );
 
       for id in plan_ids {
-        TributaryDb::<D>::recognize_id(txn, Zone::Sign.label(), genesis, id);
+        TributaryDb::<D>::recognize_id(txn, genesis, Zone::Sign, id);
         recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await;
       }
     }
 
@@ -477,11 +451,12 @@ pub async fn handle_application_tx<
     Transaction::BatchPreprocess(data) => {
       match handle(
         txn,
-        Zone::Batch,
-        b"batch_preprocess",
-        spec.t(),
-        data.plan,
-        data.attempt,
+        &DataSpecification {
+          zone: Zone::Batch,
+          label: "preprocess",
+          id: data.plan,
+          attempt: data.attempt,
+        },
         data.data,
         &data.signed,
       ) {
@@ -503,11 +478,12 @@ pub async fn handle_application_tx<
     Transaction::BatchShare(data) => {
       match handle(
         txn,
-        Zone::Batch,
-        b"batch_share",
-        spec.t(),
-        data.plan,
-        data.attempt,
+        &DataSpecification {
+          zone: Zone::Batch,
+          label: "share",
+          id: data.plan,
+          attempt: data.attempt,
+        },
         data.data,
         &data.signed,
       ) {
@@ -534,11 +510,12 @@ pub async fn handle_application_tx<
       let key_pair = TributaryDb::<D>::key_pair(txn, spec.set());
       match handle(
         txn,
-        Zone::Sign,
-        b"sign_preprocess",
-        spec.t(),
-        data.plan,
-        data.attempt,
+        &DataSpecification {
+          zone: Zone::Sign,
+          label: "preprocess",
+          id: data.plan,
+          attempt: data.attempt,
+        },
         data.data,
         &data.signed,
       ) {
@@ -568,11 +545,12 @@ pub async fn handle_application_tx<
       let key_pair = TributaryDb::<D>::key_pair(txn, spec.set());
       match handle(
         txn,
-        Zone::Sign,
-        b"sign_share",
-        spec.t(),
-        data.plan,
-        data.attempt,
+        &DataSpecification {
+          zone: Zone::Sign,
+          label: "share",
+          id: data.plan,
+          attempt: data.attempt,
+        },
         data.data,
         &data.signed,
       ) {
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index 05df7395..fe8a18d2 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -84,7 +84,6 @@ async fn handle_block<
       spec,
       processors,
       publish_serai_tx.clone(),
-      genesis,
      key,
      recognized_id.clone(),
      &mut txn,
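
A note on the `as_key` TODO above: because `DataSpecification::as_key` concatenates its variable-length labels directly, two distinct specifications could in principle serialize to the same key, which is the conflict the `// TODO: Use a proper transcript here to avoid conflicts?` comment flags. The sketch below is not part of the patch; it only illustrates, using the field types the patch defines, how length-prefixing the variable-length fields (or feeding them to a transcript, as the TODO suggests) removes that ambiguity. The function name `unambiguous_key` is hypothetical.

// Illustration only (not part of the patch): an unambiguous encoding of the
// DataSpecification fields. Fixed-width fields are appended as-is; the
// variable-length labels get a length prefix so e.g. ("ab", "c") and ("a", "bc")
// cannot produce the same key.
fn unambiguous_key(
  genesis: [u8; 32],
  zone_label: &str,
  label: &str,
  id: [u8; 32],
  attempt: u32,
) -> Vec<u8> {
  let mut key = Vec::with_capacity(32 + 8 + zone_label.len() + 8 + label.len() + 32 + 4);
  key.extend_from_slice(&genesis);                                       // fixed 32 bytes
  key.extend_from_slice(&u64::try_from(zone_label.len()).unwrap().to_le_bytes());
  key.extend_from_slice(zone_label.as_bytes());                          // length-prefixed
  key.extend_from_slice(&u64::try_from(label.len()).unwrap().to_le_bytes());
  key.extend_from_slice(label.as_bytes());                               // length-prefixed
  key.extend_from_slice(&id);                                            // fixed 32 bytes
  key.extend_from_slice(&attempt.to_le_bytes());                         // fixed 4 bytes
  key
}

In practice the patch only ever passes the fixed label set shown in the diff ("commitments", "shares", "preprocess", "share", and the DKG confirmation labels), so direct concatenation may well be acceptable; the length prefixes (or a transcript) simply make the non-collision property hold for arbitrary labels.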