diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 9a20ab16..00d7bdca 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -42,8 +42,8 @@ pub use p2p::*;
 
 use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};
 
-pub mod processor;
-use processor::Processor;
+pub mod processors;
+use processors::Processors;
 
 mod substrate;
 
@@ -90,10 +90,10 @@ async fn add_tributary<D: Db, P: P2p>(
   reader
 }
 
-pub async fn scan_substrate<D: Db, Pro: Processor>(
+pub async fn scan_substrate<D: Db, Pro: Processors>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  processor: Pro,
+  processors: Pro,
   serai: Serai,
 ) {
   let mut db = substrate::SubstrateDb::new(db);
@@ -114,7 +114,7 @@ pub async fn scan_substrate<D: Db, Pro: Processor>(
             NEW_TRIBUTARIES.write().await.push_back(spec);
           }
         },
-        &processor,
+        &processors,
         &serai,
         &mut last_substrate_block,
       )
@@ -132,12 +132,12 @@ pub async fn scan_substrate<D: Db, Pro: Processor>(
 }
 
 #[allow(clippy::type_complexity)]
-pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
+pub async fn scan_tributaries<D: Db, Pro: Processors, P: P2p>(
   raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   recognized_id_send: UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
   p2p: P,
-  processor: Pro,
+  processors: Pro,
   tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
 ) {
   let mut tributary_readers = vec![];
@@ -174,7 +174,7 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
             &mut tributary_db,
             &key,
             &recognized_id_send,
-            &processor,
+            &processors,
             spec,
             reader,
           )
@@ -359,18 +359,18 @@ pub async fn publish_transaction<D: Db, P: P2p>(
 }
 
 #[allow(clippy::type_complexity)]
-pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
+pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
   mut db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  mut processor: Pro,
+  mut processors: Pro,
   tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
 ) {
   let pub_key = Ristretto::generator() * key.deref();
 
   loop {
-    let msg = processor.recv().await;
+    let msg = processors.recv().await;
 
-    // TODO: We need (ValidatorSet or key) to genesis hash
+    // TODO: Get the genesis hash from msg.network
     let genesis = [0; 32];
 
     let tx = match msg.msg {
@@ -410,9 +410,12 @@
         // TODO
         sign::ProcessorMessage::Completed { .. } => todo!(),
       },
-      ProcessorMessage::Coordinator(msg) => match msg {
-        coordinator::ProcessorMessage::SubstrateBlockAck { network: _, block, plans } => {
-          // TODO2: Check this network aligns with this processor
+      ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
+        coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
+          assert_eq!(
+            msg.network, network,
+            "processor claimed to be a different network than it was",
+          );
 
           // Safe to use its own txn since this is static and just needs to be written before we
           // provide SubstrateBlock
@@ -494,15 +497,15 @@
   }
 }
 
-pub async fn run<D: Db, Pro: Processor, P: P2p>(
+pub async fn run<D: Db, Pro: Processors, P: P2p>(
   mut raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   p2p: P,
-  processor: Pro,
+  processors: Pro,
   serai: Serai,
 ) {
   // Handle new Substrate blocks
-  tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processor.clone(), serai.clone()));
+  tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone()));
 
   // Handle the Tributaries
@@ -531,7 +534,7 @@
       key.clone(),
       recognized_id_send,
       p2p.clone(),
-      processor.clone(),
+      processors.clone(),
       tributaries.clone(),
     ));
   }
@@ -587,7 +590,7 @@
   tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone()));
 
   // Handle all messages from processors
-  handle_processors(raw_db, key, processor, tributaries).await;
+  handle_processors(raw_db, key, processors, tributaries).await;
 }
 
 #[tokio::main]
@@ -597,7 +600,7 @@ async fn main() {
   let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::ZERO); // TODO
   let p2p = LocalP2p::new(1).swap_remove(0); // TODO
 
-  let processor = processor::MemProcessor::new(); // TODO
+  let processors = processors::MemProcessors::new(); // TODO
 
   let serai = || async {
     loop {
@@ -609,5 +612,5 @@ async fn main() {
       return serai;
     }
  };
-  run(db, key, p2p, processor, serai().await).await
+  run(db, key, p2p, processors, serai().await).await
 }
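A note on the TODO left in `handle_processors`: now that `Message` carries its origin network, resolving the Tributary genesis reduces to a lookup keyed by `msg.network`. A minimal sketch of that shape follows; the `GenesisIndex` alias and `genesis_for` helper are hypothetical, not part of this patch, and the map would have to be maintained wherever tributaries are created:

```rust
use std::collections::HashMap;

use serai_client::primitives::NetworkId;

// Hypothetical index from each network to the genesis of its current
// Tributary, updated alongside NEW_TRIBUTARIES as tributaries are added
type GenesisIndex = HashMap<NetworkId, [u8; 32]>;

fn genesis_for(index: &GenesisIndex, network: NetworkId) -> [u8; 32] {
  // A message claiming a network we have no Tributary for can't be handled
  *index.get(&network).expect("processor message for an unrecognized network")
}
```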
diff --git a/coordinator/src/processor.rs b/coordinator/src/processor.rs
deleted file mode 100644
index 29ed753a..00000000
--- a/coordinator/src/processor.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use std::{sync::Arc, collections::VecDeque};
-
-use tokio::sync::RwLock;
-
-use processor_messages::{ProcessorMessage, CoordinatorMessage};
-
-#[derive(Clone, PartialEq, Eq, Debug)]
-pub struct Message {
-  pub id: u64,
-  pub msg: ProcessorMessage,
-}
-
-#[async_trait::async_trait]
-pub trait Processor: 'static + Send + Sync + Clone {
-  async fn send(&self, msg: CoordinatorMessage);
-  async fn recv(&mut self) -> Message;
-  async fn ack(&mut self, msg: Message);
-}
-
-// TODO: Move this to tests
-#[derive(Clone)]
-pub struct MemProcessor(pub Arc<RwLock<VecDeque<CoordinatorMessage>>>);
-impl MemProcessor {
-  #[allow(clippy::new_without_default)]
-  pub fn new() -> MemProcessor {
-    MemProcessor(Arc::new(RwLock::new(VecDeque::new())))
-  }
-}
-
-#[async_trait::async_trait]
-impl Processor for MemProcessor {
-  async fn send(&self, msg: CoordinatorMessage) {
-    self.0.write().await.push_back(msg)
-  }
-  async fn recv(&mut self) -> Message {
-    todo!()
-  }
-  async fn ack(&mut self, _: Message) {
-    todo!()
-  }
-}
diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs
new file mode 100644
index 00000000..3b03cb1d
--- /dev/null
+++ b/coordinator/src/processors.rs
@@ -0,0 +1,49 @@
+use std::{
+  sync::Arc,
+  collections::{VecDeque, HashMap},
+};
+
+use tokio::sync::RwLock;
+
+use serai_client::primitives::NetworkId;
+
+use processor_messages::{ProcessorMessage, CoordinatorMessage};
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub struct Message {
+  pub id: u64,
+  pub network: NetworkId,
+  pub msg: ProcessorMessage,
+}
+
+#[async_trait::async_trait]
+pub trait Processors: 'static + Send + Sync + Clone {
+  async fn send(&self, network: NetworkId, msg: CoordinatorMessage);
+  async fn recv(&mut self) -> Message;
+  async fn ack(&mut self, msg: Message);
+}
+
+// TODO: Move this to tests
+#[derive(Clone)]
+pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
+impl MemProcessors {
+  #[allow(clippy::new_without_default)]
+  pub fn new() -> MemProcessors {
+    MemProcessors(Arc::new(RwLock::new(HashMap::new())))
+  }
+}
+
+#[async_trait::async_trait]
+impl Processors for MemProcessors {
+  async fn send(&self, network: NetworkId, msg: CoordinatorMessage) {
+    let mut processors = self.0.write().await;
+    let processor = processors.entry(network).or_insert_with(VecDeque::new);
+    processor.push_back(msg);
+  }
+  async fn recv(&mut self) -> Message {
+    todo!()
+  }
+  async fn ack(&mut self, _: Message) {
+    todo!()
+  }
+}
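`MemProcessors` only implements `send`; `recv` and `ack` stay `todo!()` since the in-memory stub exists solely so tests can inspect the per-network queues. For a sense of what a transport-backed implementation of the trait might look like, here is a rough sketch over tokio channels. `ChannelProcessors` and its fields are hypothetical, not part of this patch: each network's processor connection would own the receiving half of its `outbound` entry, while all connections would share the sending half of `inbound`.

```rust
use std::{sync::Arc, collections::HashMap};

use tokio::sync::{Mutex, mpsc};

use serai_client::primitives::NetworkId;
use processor_messages::CoordinatorMessage;

use crate::processors::{Message, Processors};

#[derive(Clone)]
pub struct ChannelProcessors {
  // One outbound channel per connected processor
  outbound: HashMap<NetworkId, mpsc::UnboundedSender<CoordinatorMessage>>,
  // All inbound messages, already stamped with their network, multiplexed
  // onto a single queue
  inbound: Arc<Mutex<mpsc::UnboundedReceiver<Message>>>,
}

#[async_trait::async_trait]
impl Processors for ChannelProcessors {
  async fn send(&self, network: NetworkId, msg: CoordinatorMessage) {
    self.outbound[&network].send(msg).expect("processor connection was dropped");
  }
  async fn recv(&mut self) -> Message {
    // A single await suffices, regardless of which network speaks next
    self.inbound.lock().await.recv().await.expect("all processor connections were dropped")
  }
  async fn ack(&mut self, _msg: Message) {
    // A production implementation would durably record msg.id as handled so
    // the message isn't redelivered after a restart; this sketch drops it
  }
}
```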
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index cccc8d66..db00eff6 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -21,7 +21,7 @@ use serai_db::DbTxn;
 
 use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage};
 
-use crate::{Db, processor::Processor, tributary::TributarySpec};
+use crate::{Db, processors::Processors, tributary::TributarySpec};
 
 mod db;
 pub use db::*;
@@ -42,12 +42,12 @@ async fn handle_new_set<
   D: Db,
   Fut: Future<Output = ()>,
   CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
-  Pro: Processor,
+  Pro: Processors,
 >(
   db: &mut D,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   create_new_tributary: CNT,
-  processor: &Pro,
+  processors: &Pro,
   serai: &Serai,
   block: &Block,
   set: ValidatorSet,
@@ -63,9 +63,10 @@
     // We already have a unique event ID based on block, event index (where event index is
     // the one generated in this handle_block function)
     // We could use that on this end and the processor end?
-    processor
-      .send(CoordinatorMessage::KeyGen(
-        processor_messages::key_gen::CoordinatorMessage::GenerateKey {
+    processors
+      .send(
+        set.network,
+        CoordinatorMessage::KeyGen(processor_messages::key_gen::CoordinatorMessage::GenerateKey {
           id: KeyGenId { set, attempt: 0 },
           params: ThresholdParams::new(
             spec.t(),
@@ -75,17 +76,18 @@
               .expect("In set for a set we aren't in set for"),
           )
           .unwrap(),
-        },
-      ))
+        }),
+      )
       .await;
   }
 
   Ok(())
 }
 
-async fn handle_key_gen<Pro: Processor>(
+async fn handle_key_gen<D: Db, Pro: Processors>(
+  db: &mut D,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  processor: &Pro,
+  processors: &Pro,
   serai: &Serai,
   block: &Block,
   set: ValidatorSet,
@@ -93,30 +95,33 @@
 ) -> Result<(), SeraiError> {
   if in_set(key, serai, set).await?.expect("KeyGen occurred for a set which doesn't exist") {
     // TODO: Check how the processor handles this being fired multiple times
-    processor
-      .send(CoordinatorMessage::Substrate(
-        processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
-          context: SubstrateContext {
-            serai_time: block.time().unwrap(),
-            coin_latest_finalized_block: serai
-              .get_latest_block_for_network(block.hash(), set.network)
-              .await?
-              // The processor treats this as a magic value which will cause it to find a network
-              // block which has a time greater than or equal to the Serai time
-              .unwrap_or(BlockHash([0; 32])),
+    processors
+      .send(
+        set.network,
+        CoordinatorMessage::Substrate(
+          processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
+            context: SubstrateContext {
+              serai_time: block.time().unwrap(),
+              coin_latest_finalized_block: serai
+                .get_latest_block_for_network(block.hash(), set.network)
+                .await?
+                // The processor treats this as a magic value which will cause it to find a network
+                // block which has a time greater than or equal to the Serai time
+                .unwrap_or(BlockHash([0; 32])),
+            },
+            set,
+            key_pair,
           },
-          set,
-          key_pair,
-        },
-      ))
+        ),
+      )
       .await;
   }
 
   Ok(())
 }
 
-async fn handle_batch_and_burns<Pro: Processor>(
-  processor: &Pro,
+async fn handle_batch_and_burns<Pro: Processors>(
+  processors: &Pro,
   serai: &Serai,
   block: &Block,
 ) -> Result<(), SeraiError> {
@@ -182,23 +187,26 @@
     };
 
     // TODO: Check how the processor handles this being fired multiple times
-    processor
-      .send(CoordinatorMessage::Substrate(
-        processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
-          context: SubstrateContext {
-            serai_time: block.time().unwrap(),
-            coin_latest_finalized_block,
+    processors
+      .send(
+        network,
+        CoordinatorMessage::Substrate(
+          processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
+            context: SubstrateContext {
+              serai_time: block.time().unwrap(),
+              coin_latest_finalized_block,
+            },
+            network,
+            block: block.number(),
+            key: serai
+              .get_keys(ValidatorSet { network, session: Session(0) }) // TODO2
+              .await?
+              .map(|keys| keys.1.into_inner())
+              .expect("batch/burn for network which never set keys"),
+            burns: burns.remove(&network).unwrap(),
           },
-          network,
-          block: block.number(),
-          key: serai
-            .get_keys(ValidatorSet { network, session: Session(0) }) // TODO2
-            .await?
-            .map(|keys| keys.1.into_inner())
-            .expect("batch/burn for network which never set keys"),
-          burns: burns.remove(&network).unwrap(),
-        },
-      ))
+        ),
+      )
       .await;
   }
@@ -211,12 +219,12 @@ async fn handle_block<
   D: Db,
   Fut: Future<Output = ()>,
   CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
-  Pro: Processor,
+  Pro: Processors,
 >(
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   create_new_tributary: CNT,
-  processor: &Pro,
+  processors: &Pro,
   serai: &Serai,
   block: Block,
 ) -> Result<(), SeraiError> {
@@ -233,8 +241,16 @@
     //  stable)
     if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
       if let ValidatorSetsEvent::NewSet { set } = new_set {
-        handle_new_set(&mut db.0, key, create_new_tributary.clone(), processor, serai, &block, set)
-          .await?;
+        handle_new_set(
+          &mut db.0,
+          key,
+          create_new_tributary.clone(),
+          processors,
+          serai,
+          &block,
+          set,
+        )
+        .await?;
       } else {
         panic!("NewSet event wasn't NewSet: {new_set:?}");
       }
@@ -249,7 +265,7 @@
   for key_gen in serai.get_key_gen_events(hash).await? {
     if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
       if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
-        handle_key_gen(key, processor, serai, &block, set, key_pair).await?;
+        handle_key_gen(&mut db.0, key, processors, serai, &block, set, key_pair).await?;
       } else {
         panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
       }
@@ -266,7 +282,7 @@
     // This does break the uniqueness of (hash, event_id) -> one event, yet
     // (network, (hash, event_id)) remains valid as a unique ID for an event
     if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
-      handle_batch_and_burns(processor, serai, &block).await?;
+      handle_batch_and_burns(processors, serai, &block).await?;
     }
     let mut txn = db.0.txn();
     SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
@@ -279,12 +295,12 @@ pub async fn handle_new_blocks<
   D: Db,
   Fut: Future<Output = ()>,
   CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
-  Pro: Processor,
+  Pro: Processors,
 >(
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   create_new_tributary: CNT,
-  processor: &Pro,
+  processors: &Pro,
   serai: &Serai,
   last_block: &mut u64,
 ) -> Result<(), SeraiError> {
@@ -301,7 +317,7 @@ pub async fn handle_new_blocks<
       db,
       key,
      create_new_tributary.clone(),
-      processor,
+      processors,
       serai,
       if b == latest_number {
         latest.take().unwrap()
diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs
index bb6e49f7..85c085cd 100644
--- a/coordinator/src/tests/tributary/dkg.rs
+++ b/coordinator/src/tests/tributary/dkg.rs
@@ -19,7 +19,7 @@ use processor_messages::{
 use tributary::{Transaction as TransactionTrait, Tributary};
 
 use crate::{
-  processor::MemProcessor,
+  processors::MemProcessors,
   LocalP2p,
   tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks},
   tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
@@ -74,29 +74,29 @@
       .collect(),
   });
 
-  async fn new_processor(
+  async fn new_processors(
     key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
     spec: &TributarySpec,
     tributary: &Tributary<MemDb, Transaction, LocalP2p>,
-  ) -> (TributaryDb<MemDb>, MemProcessor) {
+  ) -> (TributaryDb<MemDb>, MemProcessors) {
     let mut scanner_db = TributaryDb(MemDb::new());
-    let processor = MemProcessor::new();
+    let processors = MemProcessors::new();
     // Uses a brand new channel since this channel won't be used within this test
     handle_new_blocks(
       &mut scanner_db,
       key,
       &mpsc::unbounded_channel().0,
-      &processor,
+      &processors,
       spec,
       &tributary.reader(),
    )
    .await;
-    (scanner_db, processor)
+    (scanner_db, processors)
   }
 
   // Instantiate a scanner and verify it has nothing to report
-  let (mut scanner_db, processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await;
-  assert!(processor.0.read().await.is_empty());
+  let (mut scanner_db, processors) = new_processors(&keys[0], &spec, &tributaries[0].1).await;
+  assert!(processors.0.read().await.is_empty());
 
   // Publish the last commitment
   let block_before_tx = tributaries[0].1.tip().await;
@@ -109,21 +109,25 @@
     &mut scanner_db,
     &keys[0],
     &mpsc::unbounded_channel().0,
-    &processor,
+    &processors,
     &spec,
     &tributaries[0].1.reader(),
   )
   .await;
   {
-    let mut msgs = processor.0.write().await;
+    let mut msgs = processors.0.write().await;
+    assert_eq!(msgs.len(), 1);
+    let msgs = msgs.get_mut(&spec.set().network).unwrap();
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert!(msgs.is_empty());
   }
 
   // Verify all keys exhibit this scanner behavior
   for (i, key) in keys.iter().enumerate() {
-    let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
-    let mut msgs = processor.0.write().await;
+    let (_, processors) = new_processors(key, &spec, &tributaries[i].1).await;
+    let mut msgs = processors.0.write().await;
+    assert_eq!(msgs.len(), 1);
+    let msgs = msgs.get_mut(&spec.set().network).unwrap();
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert!(msgs.is_empty());
   }
@@ -158,12 +162,13 @@
     &mut scanner_db,
     &keys[0],
     &mpsc::unbounded_channel().0,
-    &processor,
+    &processors,
     &spec,
     &tributaries[0].1.reader(),
   )
   .await;
-  assert!(processor.0.write().await.is_empty());
+  assert_eq!(processors.0.read().await.len(), 1);
+  assert!(processors.0.read().await[&spec.set().network].is_empty());
 
   // Publish the final set of shares
   let block_before_tx = tributaries[0].1.tip().await;
@@ -197,21 +202,25 @@
     &mut scanner_db,
     &keys[0],
     &mpsc::unbounded_channel().0,
-    &processor,
+    &processors,
     &spec,
     &tributaries[0].1.reader(),
   )
   .await;
   {
-    let mut msgs = processor.0.write().await;
+    let mut msgs = processors.0.write().await;
+    assert_eq!(msgs.len(), 1);
+    let msgs = msgs.get_mut(&spec.set().network).unwrap();
     assert_eq!(msgs.pop_front().unwrap(), shares_for(0));
     assert!(msgs.is_empty());
   }
 
   // Yet new scanners should emit all events
   for (i, key) in keys.iter().enumerate() {
-    let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
-    let mut msgs = processor.0.write().await;
+    let (_, processors) = new_processors(key, &spec, &tributaries[i].1).await;
+    let mut msgs = processors.0.write().await;
+    assert_eq!(msgs.len(), 1);
+    let msgs = msgs.get_mut(&spec.set().network).unwrap();
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert_eq!(msgs.pop_front().unwrap(), shares_for(i));
     assert!(msgs.is_empty());
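The inspection pattern this test now repeats four times (exactly one network queue, drain it, verify it's empty) could be factored into a helper along these lines; `assert_only_msgs` is hypothetical, not part of this patch:

```rust
use serai_client::primitives::NetworkId;
use processor_messages::CoordinatorMessage;

use crate::processors::MemProcessors;

async fn assert_only_msgs(
  processors: &MemProcessors,
  network: NetworkId,
  expected: Vec<CoordinatorMessage>,
) {
  let mut queues = processors.0.write().await;
  // Nothing should have been routed to any other network
  assert_eq!(queues.len(), 1);
  let queue = queues.get_mut(&network).unwrap();
  for msg in expected {
    assert_eq!(queue.pop_front().unwrap(), msg);
  }
  assert!(queue.is_empty());
}
```

Each block above would then reduce to a single `assert_only_msgs(&processors, spec.set().network, vec![...]).await` call.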
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index 2db25be7..204c2a0d 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -19,7 +19,7 @@ use serai_db::DbTxn;
 
 use crate::{
   Db,
-  processor::Processor,
+  processors::Processors,
   tributary::{TributaryDb, TributarySpec, Transaction},
 };
 
@@ -30,11 +30,11 @@ pub enum RecognizedIdType {
 }
 
 // Handle a specific Tributary block
-async fn handle_block<D: Db, Pro: Processor>(
+async fn handle_block<D: Db, Pro: Processors>(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
-  processor: &Pro,
+  processors: &Pro,
   spec: &TributarySpec,
   block: Block<Transaction>,
 ) {
@@ -144,11 +144,14 @@ async fn handle_block<D: Db, Pro: Processor>(
         if let Some(commitments) =
           handle(Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, signed)
         {
-          processor
-            .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
-              id: KeyGenId { set: spec.set(), attempt },
-              commitments,
-            }))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
+                id: KeyGenId { set: spec.set(), attempt },
+                commitments,
+              }),
+            )
             .await;
         }
       }
@@ -170,11 +173,14 @@ async fn handle_block<D: Db, Pro: Processor>(
         if let Some(shares) =
           handle(Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, signed)
         {
-          processor
-            .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares {
-              id: KeyGenId { set: spec.set(), attempt },
-              shares,
-            }))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares {
+                id: KeyGenId { set: spec.set(), attempt },
+                shares,
+              }),
+            )
             .await;
         }
       }
@@ -211,13 +217,16 @@
           data.data,
           data.signed,
         ) {
-          processor
-            .send(CoordinatorMessage::Coordinator(
-              coordinator::CoordinatorMessage::BatchPreprocesses {
-                id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
-                preprocesses,
-              },
-            ))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::Coordinator(
+                coordinator::CoordinatorMessage::BatchPreprocesses {
+                  id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
+                  preprocesses,
+                },
+              ),
+            )
             .await;
         }
       }
@@ -231,14 +240,17 @@
           data.data,
           data.signed,
         ) {
-          processor
-            .send(CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
-              id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
-              shares: shares
-                .drain()
-                .map(|(validator, share)| (validator, share.try_into().unwrap()))
-                .collect(),
-            }))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
+                id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
+                shares: shares
+                  .drain()
+                  .map(|(validator, share)| (validator, share.try_into().unwrap()))
+                  .collect(),
+              }),
+            )
             .await;
         }
       }
@@ -253,11 +265,14 @@
           data.data,
           data.signed,
         ) {
-          processor
-            .send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses {
-              id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
-              preprocesses,
-            }))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses {
+                id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
+                preprocesses,
+              }),
+            )
             .await;
         }
       }
@@ -271,11 +286,14 @@
           data.data,
           data.signed,
         ) {
-          processor
-            .send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares {
-              id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
-              shares,
-            }))
+          processors
+            .send(
+              spec.set().network,
+              CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares {
+                id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
+                shares,
+              }),
+            )
             .await;
         }
       }
@@ -290,11 +308,11 @@
   // TODO: Trigger any necessary re-attempts
 }
 
-pub async fn handle_new_blocks<D: Db, Pro: Processor>(
+pub async fn handle_new_blocks<D: Db, Pro: Processors>(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
-  processor: &Pro,
+  processors: &Pro,
   spec: &TributarySpec,
   tributary: &TributaryReader<D, Transaction>,
 ) {
@@ -302,7 +320,7 @@
   let mut last_block = db.last_block(genesis);
   while let Some(next) = tributary.block_after(&last_block) {
     let block = tributary.block(&next).unwrap();
-    handle_block(db, key, recognized_id, processor, spec, block).await;
+    handle_block(db, key, recognized_id, processors, spec, block).await;
     last_block = next;
     db.set_last_block(genesis, next);
   }
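Since `Message::network` is what both `handle_processors` and the `SubstrateBlockAck` assert key off of, the tag should be applied by the coordinator's transport rather than trusted from the wire. A sketch of that stamping, complementing the `ChannelProcessors` sketch earlier; `pump_connection` and its channel-based stand-in for the actual transport are hypothetical:

```rust
use tokio::sync::mpsc;

use serai_client::primitives::NetworkId;
use processor_messages::ProcessorMessage;

use crate::processors::Message;

// One task per connected processor. from_conn stands in for whatever
// transport actually carries (message ID, message) pairs off the wire.
async fn pump_connection(
  network: NetworkId,
  mut from_conn: mpsc::UnboundedReceiver<(u64, ProcessorMessage)>,
  to_coordinator: mpsc::UnboundedSender<Message>,
) {
  while let Some((id, msg)) = from_conn.recv().await {
    // Stamp with the connection's network, never anything the message claims;
    // the assert in handle_processors then cross-checks the message body
    if to_coordinator.send(Message { id, network, msg }).is_err() {
      break;
    }
  }
}
```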