From 78d5372fb7c7c6e245593d36ff912237b770d70b Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 25 Apr 2023 03:14:42 -0400
Subject: [PATCH] Initial code to handle messages from processors

---
 coordinator/src/main.rs                | 136 ++++++++++++++++++++++---
 coordinator/src/processor.rs           |   4 +-
 coordinator/src/substrate/mod.rs       |  11 +-
 coordinator/src/tests/tributary/dkg.rs |  15 ++-
 coordinator/src/tributary/scanner.rs   |   4 +-
 coordinator/tributary/src/lib.rs       |   2 +-
 6 files changed, 137 insertions(+), 35 deletions(-)

diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 65162963..c80ba71d 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -11,6 +11,7 @@ use std::{
 };
 
 use zeroize::Zeroizing;
+use rand_core::OsRng;
 
 use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
 
@@ -22,7 +23,7 @@ use tokio::{sync::RwLock, time::sleep};
 use ::tributary::{ReadWrite, Block, Tributary, TributaryReader};
 
 mod tributary;
-use crate::tributary::{TributarySpec, Transaction};
+use crate::tributary::{TributarySpec, SignData, Transaction};
 
 mod db;
 use db::MainDb;
@@ -30,6 +31,8 @@ use db::MainDb;
 mod p2p;
 pub use p2p::*;
 
+use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};
+
 pub mod processor;
 use processor::Processor;
 
@@ -67,7 +70,7 @@ async fn add_tributary<D: Db, P: P2p>(
   spec: TributarySpec,
 ) -> TributaryReader<D, Transaction> {
   let tributary = Tributary::<_, Transaction, _>::new(
-    // TODO: Use a db on a distinct volume
+    // TODO2: Use a db on a distinct volume
     db,
     spec.genesis(),
     spec.start_time(),
@@ -91,7 +94,7 @@
 pub async fn scan_substrate<D: Db, Pro: Processor>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  mut processor: Pro,
+  processor: Pro,
   serai: Serai,
 ) {
   let mut db = substrate::SubstrateDb::new(db);
@@ -102,13 +105,13 @@
       &mut db,
       &key,
       create_new_tributary,
-      &mut processor,
+      &processor,
       &serai,
       &mut last_substrate_block,
     )
     .await
     {
-      // TODO: Should this use a notification system for new blocks?
+      // TODO2: Should this use a notification system for new blocks?
       // Right now it's sleeping for half the block time.
       Ok(()) => sleep(Duration::from_secs(3)).await,
       Err(e) => {
@@ -124,7 +127,7 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
   raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   p2p: P,
-  mut processor: Pro,
+  processor: Pro,
   tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
 ) {
   let mut tributary_readers = vec![];
@@ -160,7 +163,7 @@
         tributary::scanner::handle_new_blocks::<_, _>(
           &mut tributary_db,
           &key,
-          &mut processor,
+          &processor,
           spec,
           reader,
         )
@@ -168,7 +171,7 @@
     }
 
     // Sleep for half the block time
-    // TODO: Should we define a notification system for when a new block occurs?
+    // TODO2: Should we define a notification system for when a new block occurs?
     sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await;
   }
 }
@@ -221,7 +224,7 @@
         }
       }
 
-      // TODO: Rate limit this
+      // TODO2: Rate limit this
       P2pMessageKind::Heartbeat(genesis) => {
         let tributaries = tributaries.read().await;
         let Some(tributary) = tributaries.get(&genesis) else {
@@ -310,6 +313,110 @@
   }
 }
 
+#[allow(clippy::type_complexity)]
+pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  mut processor: Pro,
+  tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
+) {
+  let pub_key = Ristretto::generator() * key.deref();
+
+  loop {
+    let msg = processor.recv().await;
+
+    // TODO: We need (ValidatorSet or key) to genesis hash
+    let genesis = [0; 32];
+
+    let tx = match msg.msg {
+      ProcessorMessage::KeyGen(msg) => match msg {
+        key_gen::ProcessorMessage::Commitments { id, commitments } => {
+          Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
+        }
+        key_gen::ProcessorMessage::Shares { id, shares } => {
+          Some(Transaction::DkgShares(id.attempt, shares, Transaction::empty_signed()))
+        }
+        // TODO
+        key_gen::ProcessorMessage::GeneratedKeyPair { .. } => todo!(),
+      },
+      ProcessorMessage::Sign(msg) => match msg {
+        sign::ProcessorMessage::Preprocess { id, preprocess } => {
+          Some(Transaction::SignPreprocess(SignData {
+            plan: id.id,
+            attempt: id.attempt,
+            data: preprocess,
+            signed: Transaction::empty_signed(),
+          }))
+        }
+        sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
+          plan: id.id,
+          attempt: id.attempt,
+          data: share,
+          signed: Transaction::empty_signed(),
+        })),
+        // TODO
+        sign::ProcessorMessage::Completed { .. } => todo!(),
+      },
+      ProcessorMessage::Coordinator(msg) => match msg {
+        // TODO
+        coordinator::ProcessorMessage::SubstrateBlockAck { .. } => todo!(),
+        coordinator::ProcessorMessage::BatchPreprocess { id, preprocess } => {
+          Some(Transaction::BatchPreprocess(SignData {
+            plan: id.id,
+            attempt: id.attempt,
+            data: preprocess,
+            signed: Transaction::empty_signed(),
+          }))
+        }
+        coordinator::ProcessorMessage::BatchShare { id, share } => {
+          Some(Transaction::BatchShare(SignData {
+            plan: id.id,
+            attempt: id.attempt,
+            data: share.to_vec(),
+            signed: Transaction::empty_signed(),
+          }))
+        }
+      },
+      ProcessorMessage::Substrate(msg) => match msg {
+        // TODO
+        processor_messages::substrate::ProcessorMessage::Update { .. } => todo!(),
+      },
+    };
+
+    // If this created a transaction, publish it
+    if let Some(mut tx) = tx {
+      // Get the next nonce
+      // let mut txn = db.txn();
+      // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary);
+
+      let nonce = 0; // TODO
+      tx.sign(&mut OsRng, genesis, &key, nonce);
+
+      let tributaries = tributaries.read().await;
+      let Some(tributary) = tributaries.get(&genesis) else {
+        // TODO: This can happen since Substrate tells the Processor to generate commitments
+        // at the same time it tells the Tributary to be created
+        // There's no guarantee the Tributary will have been created though
+        panic!("processor is operating on tributary we don't have");
+      };
+
+      let tributary = tributary.tributary.read().await;
+      if tributary
+        .next_nonce(pub_key)
+        .await
+        .expect("we don't have a nonce, meaning we aren't a participant on this tributary") >
+        nonce
+      {
+        log::warn!("we've already published this transaction. this should only appear on reboot");
+      } else {
+        // We should've created a valid transaction
+        assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
+      }
+
+      // txn.commit();
+    }
+  }
+}
+
 pub async fn run<D: Db, Pro: Processor, P: P2p>(
   raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -344,7 +451,7 @@
     raw_db.clone(),
     key.clone(),
     p2p.clone(),
-    processor,
+    processor.clone(),
     tributaries.clone(),
   ));
 
@@ -353,13 +460,10 @@
   tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone()));
 
   // Handle P2P messages
-  // TODO: We also have to broadcast blocks once they're added
-  tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries));
+  tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone()));
 
-  loop {
-    // Handle all messages from processors
-    todo!()
-  }
+  // Handle all messages from processors
+  handle_processors(key, processor, tributaries).await;
 }
 
 #[tokio::main]
diff --git a/coordinator/src/processor.rs b/coordinator/src/processor.rs
index c27f2681..29ed753a 100644
--- a/coordinator/src/processor.rs
+++ b/coordinator/src/processor.rs
@@ -12,7 +12,7 @@ pub struct Message {
 
 #[async_trait::async_trait]
 pub trait Processor: 'static + Send + Sync + Clone {
-  async fn send(&mut self, msg: CoordinatorMessage);
+  async fn send(&self, msg: CoordinatorMessage);
   async fn recv(&mut self) -> Message;
   async fn ack(&mut self, msg: Message);
 }
@@ -29,7 +29,7 @@ impl MemProcessor {
 
 #[async_trait::async_trait]
 impl Processor for MemProcessor {
-  async fn send(&mut self, msg: CoordinatorMessage) {
+  async fn send(&self, msg: CoordinatorMessage) {
     self.0.write().await.push_back(msg)
   }
   async fn recv(&mut self) -> Message {
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index 5ddf4c74..7045f522 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -47,7 +47,7 @@ async fn handle_new_set<
   db: &D,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   add_new_tributary: ANT,
-  processor: &mut Pro,
+  processor: &Pro,
   serai: &Serai,
   block: &Block,
   set: ValidatorSet,
@@ -85,7 +85,7 @@
 
 async fn handle_key_gen<Pro: Processor>(
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  processor: &mut Pro,
+  processor: &Pro,
   serai: &Serai,
   block: &Block,
   set: ValidatorSet,
@@ -116,7 +116,7 @@
 }
 
 async fn handle_batch_and_burns<Pro: Processor>(
-  processor: &mut Pro,
+  processor: &Pro,
   serai: &Serai,
   block: &Block,
 ) -> Result<(), SeraiError> {
@@ -189,6 +189,7 @@
           serai_time: block.time().unwrap(),
           coin_latest_finalized_block,
         },
+        network,
         block: block.number(),
         key: serai
           .get_keys(ValidatorSet { network, session: Session(0) }) // TODO2
@@ -215,7 +216,7 @@ async fn handle_block<
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   add_new_tributary: ANT,
-  processor: &mut Pro,
+  processor: &Pro,
   serai: &Serai,
   block: Block,
 ) -> Result<(), SeraiError> {
@@ -283,7 +284,7 @@ pub async fn handle_new_blocks<
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   add_new_tributary: ANT,
-  processor: &mut Pro,
+  processor: &Pro,
   serai: &Serai,
   last_block: &mut u64,
 ) -> Result<(), SeraiError> {
diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs
index 230bfbf7..0c4e8a99 100644
--- a/coordinator/src/tests/tributary/dkg.rs
+++ b/coordinator/src/tests/tributary/dkg.rs
@@ -80,13 +80,13 @@
     tributary: &Tributary<MemDb, Transaction, LocalP2p>,
   ) -> (TributaryDb<MemDb>, MemProcessor) {
     let mut scanner_db = TributaryDb(MemDb::new());
-    let mut processor = MemProcessor::new();
-    handle_new_blocks(&mut scanner_db, key, &mut processor, spec, &tributary.reader()).await;
+    let processor = MemProcessor::new();
+    handle_new_blocks(&mut scanner_db, key, &processor, spec, &tributary.reader()).await;
     (scanner_db, processor)
   }
 
   // Instantiate a scanner and verify it has nothing to report
-  let (mut scanner_db, mut processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await;
+  let (mut scanner_db, processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await;
   assert!(processor.0.read().await.is_empty());
 
   // Publish the last commitment
@@ -96,8 +96,7 @@
   sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
 
   // Verify the scanner emits a KeyGen::Commitments message
-  handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1.reader())
-    .await;
+  handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await;
   {
     let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
@@ -138,8 +137,7 @@
   }
 
   // With just 4 sets of shares, nothing should happen yet
-  handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1.reader())
-    .await;
+  handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await;
   assert!(processor.0.write().await.is_empty());
 
   // Publish the final set of shares
@@ -170,8 +168,7 @@
   };
 
   // Any scanner which has handled the prior blocks should only emit the new event
-  handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1.reader())
-    .await;
+  handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await;
   {
     let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), shares_for(0));
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index 2d7877ec..f5cab36d 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -25,7 +25,7 @@ use crate::{
 async fn handle_block<D: Db, Pro: Processor>(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  processor: &mut Pro,
+  processor: &Pro,
   spec: &TributarySpec,
   block: Block<Transaction>,
 ) {
@@ -285,7 +285,7 @@
 pub async fn handle_new_blocks<D: Db, Pro: Processor>(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  processor: &mut Pro,
+  processor: &Pro,
   spec: &TributarySpec,
   tributary: &TributaryReader<D, Transaction>,
 ) {
diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs
index 7b1e5bff..9a82aa4f 100644
--- a/coordinator/tributary/src/lib.rs
+++ b/coordinator/tributary/src/lib.rs
@@ -164,7 +164,7 @@ impl Tributary {
 
   // Returns if the transaction was valid.
   // Safe to be &self since the only meaningful usage of self is self.network.blockchain which
-  // successfully acquires its own write lock.
+  // successfully acquires its own write lock
   pub async fn add_transaction(&self, tx: T) -> bool {
     let mut to_broadcast = vec![TRANSACTION_MESSAGE];
     tx.write(&mut to_broadcast).unwrap();
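
The heart of handle_processors above is its nonce gate: a transaction built from a processor message is signed with an expected nonce, and it is only published if the Tributary's next nonce for our key has not already moved past that value; otherwise it must have been published on a prior run (for example, before a reboot) and is skipped. Below is a minimal, self-contained sketch of that rule only. ChainView, publish_once, and PublishOutcome are hypothetical stand-ins, not the real Tributary API; only the control flow mirrors the patch.

// Sketch of the "publish only if the chain hasn't consumed this nonce yet" rule from
// handle_processors. ChainView is a hypothetical stand-in for the Tributary handle.

#[derive(Debug, PartialEq)]
enum PublishOutcome {
  // The chain's next nonce is already past ours: this tx was published on a prior run.
  AlreadyPublished,
  // The chain expected exactly this nonce: the transaction was added.
  Published,
}

struct ChainView {
  // The next nonce the chain expects from our key, i.e. how many of our txs it has.
  next_nonce: u64,
}

impl ChainView {
  fn next_nonce(&self) -> u64 {
    self.next_nonce
  }

  // Accepts a transaction and advances the expected nonce; returns whether it was valid.
  fn add_transaction(&mut self, _tx: &[u8]) -> bool {
    self.next_nonce += 1;
    true
  }
}

fn publish_once(chain: &mut ChainView, tx: &[u8], signed_with_nonce: u64) -> PublishOutcome {
  if chain.next_nonce() > signed_with_nonce {
    // Mirrors the log::warn! branch: already on-chain, so don't re-publish.
    PublishOutcome::AlreadyPublished
  } else {
    // Mirrors the assert! branch: we should've created a valid transaction.
    assert!(chain.add_transaction(tx), "created an invalid transaction");
    PublishOutcome::Published
  }
}

fn main() {
  let mut chain = ChainView { next_nonce: 0 };
  assert_eq!(publish_once(&mut chain, b"dkg commitments", 0), PublishOutcome::Published);
  // Replaying the same message (same nonce) after a reboot is detected and skipped.
  assert_eq!(publish_once(&mut chain, b"dkg commitments", 0), PublishOutcome::AlreadyPublished);
}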