diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 9dc47ae1..0858908a 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -25,56 +25,72 @@ impl TributaryDb { self.0.get(Self::block_key(genesis)).unwrap_or(genesis.to_vec()).try_into().unwrap() } - fn dkg_attempt_key(genesis: [u8; 32]) -> Vec { - Self::tributary_key(b"dkg_attempt", genesis) + fn attempt_key(genesis: [u8; 32], id: [u8; 32]) -> Vec { + let genesis_ref: &[u8] = genesis.as_ref(); + Self::tributary_key(b"attempt", [genesis_ref, id.as_ref()].concat()) } - pub fn dkg_attempt(getter: &G, genesis: [u8; 32]) -> u32 { + pub fn attempt(getter: &G, genesis: [u8; 32], id: [u8; 32]) -> u32 { u32::from_le_bytes( - getter.get(Self::dkg_attempt_key(genesis)).unwrap_or(vec![0; 4]).try_into().unwrap(), + getter.get(Self::attempt_key(genesis, id)).unwrap_or(vec![0; 4]).try_into().unwrap(), ) } - fn dkg_data_received_key(label: &'static [u8], genesis: &[u8], attempt: u32) -> Vec { - Self::tributary_key( - b"dkg_data_received", - [label, genesis, attempt.to_le_bytes().as_ref()].concat(), - ) - } - fn dkg_data_key( + fn data_received_key( label: &'static [u8], - genesis: &[u8], - signer: &::G, + genesis: [u8; 32], + id: [u8; 32], attempt: u32, ) -> Vec { Self::tributary_key( - b"dkg_data", - [label, genesis, signer.to_bytes().as_ref(), attempt.to_le_bytes().as_ref()].concat(), + b"data_received", + [label, genesis.as_ref(), id.as_ref(), attempt.to_le_bytes().as_ref()].concat(), ) } - pub fn dkg_data( + fn data_key( + label: &'static [u8], + genesis: [u8; 32], + id: [u8; 32], + attempt: u32, + signer: &::G, + ) -> Vec { + Self::tributary_key( + b"data", + [ + label, + genesis.as_ref(), + id.as_ref(), + attempt.to_le_bytes().as_ref(), + signer.to_bytes().as_ref(), + ] + .concat(), + ) + } + pub fn data( label: &'static [u8], getter: &G, genesis: [u8; 32], - signer: &::G, + id: [u8; 32], attempt: u32, + signer: &::G, ) -> Option> { - getter.get(Self::dkg_data_key(label, &genesis, signer, attempt)) + getter.get(Self::data_key(label, genesis, id, attempt, signer)) } - pub fn set_dkg_data( + pub fn set_data( label: &'static [u8], txn: &mut D::Transaction<'_>, genesis: [u8; 32], - signer: &::G, + id: [u8; 32], attempt: u32, + signer: &::G, data: &[u8], ) -> u16 { - let received_key = Self::dkg_data_received_key(label, &genesis, attempt); + let received_key = Self::data_received_key(label, genesis, id, attempt); let mut received = u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap()); received += 1; txn.put(received_key, received.to_le_bytes()); - txn.put(Self::dkg_data_key(label, &genesis, signer, attempt), data); + txn.put(Self::data_key(label, genesis, id, attempt, signer), data); received } diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 8d879177..dc44351d 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -327,6 +327,13 @@ impl TransactionTrait for Transaction { fn verify(&self) -> Result<(), TransactionError> { // TODO: Augment with checks that the Vecs can be deser'd and are for recognized IDs + + if let Transaction::BatchShare(data) = self { + if data.data.len() != 32 { + Err(TransactionError::InvalidContent)?; + } + } + Ok(()) } } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 1871fc27..abb3aca6 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -9,7 +9,8 @@ use tributary::{Signed, Block, P2p, Tributary}; use processor_messages::{ key_gen::{self, KeyGenId}, - CoordinatorMessage, + sign::{self, SignId}, + coordinator, CoordinatorMessage, }; use serai_db::DbTxn; @@ -36,10 +37,10 @@ async fn handle_block( if !TributaryDb::::handled_event(&db.0, hash, event_id) { let mut txn = db.0.txn(); - let mut handle_dkg = |label, attempt, mut bytes: Vec, signed: Signed| { + let mut handle = |label, needed, id, attempt, mut bytes: Vec, signed: Signed| { // If they've already published a TX for this attempt, slash if let Some(data) = - TributaryDb::::dkg_data(label, &txn, tributary.genesis(), &signed.signer, attempt) + TributaryDb::::data(label, &txn, tributary.genesis(), id, attempt, &signed.signer) { if data != bytes { // TODO: Full slash @@ -51,7 +52,7 @@ async fn handle_block( } // If the attempt is lesser than the blockchain's, slash - let curr_attempt = TributaryDb::::dkg_attempt(&txn, tributary.genesis()); + let curr_attempt = TributaryDb::::attempt(&txn, tributary.genesis(), id); if attempt < curr_attempt { // TODO: Slash for being late return None; @@ -62,46 +63,48 @@ async fn handle_block( } // Store this data - let received = TributaryDb::::set_dkg_data( + let received = TributaryDb::::set_data( label, &mut txn, tributary.genesis(), - &signed.signer, + id, attempt, + &signed.signer, &bytes, ); - // If we have all commitments/shares, tell the processor - if received == spec.n() { + // If we have all the needed commitments/preprocesses/shares, tell the processor + if received == needed { let mut data = HashMap::new(); for validator in spec.validators().keys() { data.insert( spec.i(*validator).unwrap(), if validator == &signed.signer { bytes.split_off(0) + } else if let Some(data) = + TributaryDb::::data(label, &txn, tributary.genesis(), id, attempt, validator) + { + data } else { - TributaryDb::::dkg_data(label, &txn, tributary.genesis(), validator, attempt) - .unwrap_or_else(|| { - panic!( - "received all DKG data yet couldn't load {} for a validator", - std::str::from_utf8(label).unwrap(), - ) - }) + continue; }, ); } + assert_eq!(data.len(), usize::from(needed)); - return Some((KeyGenId { set: spec.set(), attempt }, data)); + return Some(data); } None }; match tx { Transaction::DkgCommitments(attempt, bytes, signed) => { - if let Some((id, commitments)) = handle_dkg(b"commitments", attempt, bytes, signed) { + if let Some(commitments) = + handle(b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, signed) + { processor .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments { - id, + id: KeyGenId { set: spec.set(), attempt }, commitments, })) .await; @@ -122,20 +125,77 @@ async fn handle_block( ) .unwrap(); - if let Some((id, shares)) = handle_dkg(b"shares", attempt, bytes, signed) { + if let Some(shares) = handle(b"dkg_shares", spec.n(), [0; 32], attempt, bytes, signed) { processor - .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { id, shares })) + .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { + id: KeyGenId { set: spec.set(), attempt }, + shares, + })) .await; } } - Transaction::SignPreprocess(..) => todo!(), - Transaction::SignShare(..) => todo!(), + Transaction::SignPreprocess(data) => { + // TODO: Validate data.plan + if let Some(preprocesses) = + handle(b"sign_preprocess", spec.t(), data.plan, data.attempt, data.data, data.signed) + { + processor + .send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses { + id: SignId { key: todo!(), id: data.plan, attempt: data.attempt }, + preprocesses, + })) + .await; + } + } + Transaction::SignShare(data) => { + // TODO: Validate data.plan + if let Some(shares) = + handle(b"sign_share", spec.t(), data.plan, data.attempt, data.data, data.signed) + { + processor + .send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares { + id: SignId { key: todo!(), id: data.plan, attempt: data.attempt }, + shares, + })) + .await; + } + } + // TODO Transaction::FinalizedBlock(..) => todo!(), - Transaction::BatchPreprocess(..) => todo!(), - Transaction::BatchShare(..) => todo!(), + Transaction::BatchPreprocess(data) => { + // TODO: Validate data.plan + if let Some(preprocesses) = + handle(b"batch_preprocess", spec.t(), data.plan, data.attempt, data.data, data.signed) + { + processor + .send(CoordinatorMessage::Coordinator( + coordinator::CoordinatorMessage::BatchPreprocesses { + id: SignId { key: todo!(), id: data.plan, attempt: data.attempt }, + preprocesses, + }, + )) + .await; + } + } + Transaction::BatchShare(data) => { + // TODO: Validate data.plan + if let Some(shares) = + handle(b"batch_share", spec.t(), data.plan, data.attempt, data.data, data.signed) + { + processor + .send(CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares { + id: SignId { key: todo!(), id: data.plan, attempt: data.attempt }, + shares: shares + .drain() + .map(|(validator, share)| (validator, share.try_into().unwrap())) + .collect(), + })) + .await; + } + } } TributaryDb::::handle_event(&mut txn, hash, event_id); @@ -143,6 +203,8 @@ async fn handle_block( } event_id += 1; } + + // TODO: Trigger any necessary re-attempts } pub async fn handle_new_blocks( diff --git a/coordinator/tributary/src/transaction.rs b/coordinator/tributary/src/transaction.rs index 30e07a52..390a031a 100644 --- a/coordinator/tributary/src/transaction.rs +++ b/coordinator/tributary/src/transaction.rs @@ -13,17 +13,20 @@ use crate::{TRANSACTION_SIZE_LIMIT, ReadWrite}; #[derive(Clone, PartialEq, Eq, Debug, Error)] pub enum TransactionError { /// Transaction exceeded the size limit. - #[error("transaction was too large")] + #[error("transaction is too large")] TooLargeTransaction, - /// This transaction's signer isn't a participant. + /// Transaction's signer isn't a participant. #[error("invalid signer")] InvalidSigner, - /// This transaction's nonce isn't the prior nonce plus one. + /// Transaction's nonce isn't the prior nonce plus one. #[error("invalid nonce")] InvalidNonce, - /// This transaction's signature is invalid. + /// Transaction's signature is invalid. #[error("invalid signature")] InvalidSignature, + /// Transaction's content is invalid. + #[error("transaction content is invalid")] + InvalidContent, } /// Data for a signed transaction.