From 88a1fce15c86ae40f67e1adb5aa78d1a4d7a510f Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 25 Jul 2023 18:09:23 -0400
Subject: [PATCH] Test the processor's batch signing

Updates message-queue to try recv every second, not every 5 seconds.

---
 Cargo.lock                        |   2 +
 message-queue/src/client.rs       |   4 +-
 processor/src/substrate_signer.rs |   4 +-
 substrate/primitives/Cargo.toml   |   1 +
 substrate/primitives/src/lib.rs   |   2 +
 tests/processor/Cargo.toml        |   1 +
 tests/processor/src/lib.rs        |  26 +++----
 tests/processor/src/tests/scan.rs | 118 ++++++++++++++++++++++++++----
 8 files changed, 123 insertions(+), 35 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e05072f9..503527ed 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8785,6 +8785,7 @@ dependencies = [
  "parity-scale-codec",
  "scale-info",
  "serde",
+ "sp-application-crypto",
  "sp-core",
  "sp-runtime",
  "zeroize",
@@ -8854,6 +8855,7 @@ dependencies = [
  "monero-serai",
  "rand_core 0.6.4",
  "serai-docker-tests",
+ "serai-in-instructions-primitives",
  "serai-message-queue",
  "serai-message-queue-tests",
  "serai-primitives",
diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs
index 8296da7d..c2831e17 100644
--- a/message-queue/src/client.rs
+++ b/message-queue/src/client.rs
@@ -153,9 +153,9 @@ impl MessageQueue {
       )
       .expect("next didn't return an Option");
 
-      // If there wasn't a message, check again in 5s
+      // If there wasn't a message, check again in 1s
       let Some(msg) = msg else {
-        tokio::time::sleep(core::time::Duration::from_secs(5)).await;
+        tokio::time::sleep(core::time::Duration::from_secs(1)).await;
         continue;
       };
 
diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs
index 12290ad5..7c23a07a 100644
--- a/processor/src/substrate_signer.rs
+++ b/processor/src/substrate_signer.rs
@@ -162,7 +162,7 @@ impl SubstrateSigner {
     self.attempt.insert(id, attempt);
 
     let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt };
-    info!("signing batch {} #{}", hex::encode(id.id), id.attempt);
+    info!("signing batch {} with attempt #{}", hex::encode(id.id), id.attempt);
 
     // If we reboot mid-sign, the current design has us abort all signs and wait for latter
     // attempts/new signing protocols
@@ -312,6 +312,8 @@ impl SubstrateSigner {
       Err(e) => todo!("malicious signer: {:?}", e),
     };
 
+    info!("signed batch {} with attempt #{}", hex::encode(id.id), id.attempt);
+
     let batch =
       SignedBatch { batch: self.signable.remove(&id.id).unwrap(), signature: sig.into() };
 
diff --git a/substrate/primitives/Cargo.toml b/substrate/primitives/Cargo.toml
index e9aa183d..ebdc6c72 100644
--- a/substrate/primitives/Cargo.toml
+++ b/substrate/primitives/Cargo.toml
@@ -21,6 +21,7 @@ serde = { version = "1", default-features = false, features = ["derive", "alloc"
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["derive"] }
 scale-info = { version = "2", default-features = false, features = ["derive"] }
 
+sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false }
 sp-core = { git = "https://github.com/serai-dex/substrate", default-features = false }
 sp-runtime = { git = "https://github.com/serai-dex/substrate", default-features = false }
 
diff --git a/substrate/primitives/src/lib.rs b/substrate/primitives/src/lib.rs
index a47fb61f..2fc97ff8 100644
--- a/substrate/primitives/src/lib.rs
+++ b/substrate/primitives/src/lib.rs
@@ -12,6 +12,8 @@ use scale_info::TypeInfo;
 
 use sp_core::{ConstU32, bounded::BoundedVec};
 
+pub use sp_application_crypto as crypto;
+
 mod amount;
 pub use amount::*;
 
diff --git a/tests/processor/Cargo.toml b/tests/processor/Cargo.toml
index e041ea82..1121636b 100644
--- a/tests/processor/Cargo.toml
+++ b/tests/processor/Cargo.toml
@@ -30,6 +30,7 @@ messages = { package = "serai-processor-messages", path = "../../processor/messa
 
 serai-primitives = { path = "../../substrate/primitives" }
 serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives" }
+serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives" }
 serai-message-queue = { path = "../../message-queue" }
 
 serde = "1"
diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs
index c7acff5d..8ed76d30 100644
--- a/tests/processor/src/lib.rs
+++ b/tests/processor/src/lib.rs
@@ -146,14 +146,15 @@ impl Coordinator {
   }
 
   /// Send a message to a processor as its coordinator.
-  pub async fn send_message(&mut self, msg: CoordinatorMessage) {
+  pub async fn send_message(&mut self, msg: impl Into<CoordinatorMessage>) {
+    let msg: CoordinatorMessage = msg.into();
     self
       .queue
       .queue(
         Metadata {
           from: Service::Coordinator,
           to: Service::Processor(self.network),
-          intent: self.next_send_id.to_le_bytes().to_vec(),
+          intent: msg.intent(),
         },
         serde_json::to_string(&msg).unwrap().into_bytes(),
       )
@@ -174,7 +175,7 @@ impl Coordinator {
     serde_json::from_slice(&msg.msg).unwrap()
   }
 
-  pub async fn add_block(&self, ops: &DockerOperations) -> Vec<u8> {
+  pub async fn add_block(&self, ops: &DockerOperations) -> ([u8; 32], Vec<u8>) {
     let rpc_url = network_rpc(self.network, ops, &self.network_handle);
     match self.network {
       NetworkId::Bitcoin => {
@@ -193,16 +194,12 @@ impl Coordinator {
           .await
          .unwrap();
 
-        // Get it to return it
-        let block = rpc
-          .get_block(
-            &rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap(),
-          )
-          .await
-          .unwrap();
+        // Get it so we can return it
+        let hash = rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap();
+        let block = rpc.get_block(&hash).await.unwrap();
         let mut block_buf = vec![];
         block.consensus_encode(&mut block_buf).unwrap();
-        block_buf
+        (hash, block_buf)
       }
       NetworkId::Ethereum => todo!(),
       NetworkId::Monero => {
@@ -229,11 +226,8 @@ impl Coordinator {
           )
           .await
           .unwrap();
-        rpc
-          .get_block(rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap())
-          .await
-          .unwrap()
-          .serialize()
+        let hash = rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap();
+        (hash, rpc.get_block(hash).await.unwrap().serialize())
       }
       NetworkId::Serai => panic!("processor tests adding block to Serai"),
     }
diff --git a/tests/processor/src/tests/scan.rs b/tests/processor/src/tests/scan.rs
index b2e7ad6d..cbfc1b7b 100644
--- a/tests/processor/src/tests/scan.rs
+++ b/tests/processor/src/tests/scan.rs
@@ -1,4 +1,9 @@
-use serai_primitives::NetworkId;
+use std::collections::HashMap;
+
+use dkg::{Participant, tests::clone_without};
+
+use serai_primitives::{NetworkId, BlockHash, crypto::RuntimePublic, PublicKey};
+use serai_in_instructions_primitives::batch_message;
 
 use dockertest::DockerTest;
 
@@ -25,7 +30,11 @@ fn scan_test() {
        .map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
        .collect::<Vec<_>>();
 
-      // Start by generating keys
+      // Create a wallet before we start generating keys
+      let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
+      coordinators[0].sync(&ops, &coordinators[1 ..]).await;
+
+      // Generate keys
       let key_pair = key_gen(&mut coordinators, network).await;
 
       // Now we we have to mine blocks to activate the key
@@ -33,43 +42,120 @@ fn scan_test() {
       // confirmed at)
 
       for _ in 0 .. confirmations(network) {
-        let block = coordinators[0].add_block(&ops).await;
-        for coordinator in &coordinators[1 ..] {
-          coordinator.broadcast_block(&ops, &block).await;
-        }
+        coordinators[0].add_block(&ops).await;
       }
+      coordinators[0].sync(&ops, &coordinators[1 ..]).await;
 
       // Send into the processor's wallet
-      let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
-      coordinators[0].sync(&ops, &coordinators[1 ..]).await;
       let tx = wallet.send_to_address(&ops, &key_pair.1).await;
       for coordinator in &coordinators {
         coordinator.publish_transacton(&ops, &tx).await;
       }
 
       // Put the TX past the confirmation depth
+      let mut block_with_tx = None;
       for _ in 0 .. confirmations(network) {
-        let block = coordinators[0].add_block(&ops).await;
-        for coordinator in &coordinators[1 ..] {
-          coordinator.broadcast_block(&ops, &block).await;
+        let (hash, _) = coordinators[0].add_block(&ops).await;
+        if block_with_tx.is_none() {
+          block_with_tx = Some(hash);
        }
       }
+      coordinators[0].sync(&ops, &coordinators[1 ..]).await;
 
+      // Sleep for 10s
+      // The scanner works on a 5s interval, so this leaves a few seconds for any processing/latency
       tokio::time::sleep(core::time::Duration::from_secs(10)).await;
 
-      // Make sure the coordinators picked it up by checking they're trying to sign a batch for it
-      for coordinator in &mut coordinators {
+      // Make sure the processors picked it up by checking they're trying to sign a batch for it
+      let mut id = None;
+      let mut preprocesses = HashMap::new();
+      for (i, coordinator) in coordinators.iter_mut().enumerate() {
+        // Only use their preprocess if they're within the threshold
+        let i = if i < THRESHOLD {
+          Some(Participant::new(u16::try_from(i).unwrap() + 1).unwrap())
+        } else {
+          None
+        };
+
         let msg = coordinator.recv_message().await;
         match msg {
           messages::ProcessorMessage::Coordinator(
-            messages::coordinator::ProcessorMessage::BatchPreprocess { id, .. },
+            messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess },
           ) => {
-            assert_eq!(&id.key, &key_pair.0 .0);
-            assert_eq!(id.attempt, 0);
+            assert_eq!(&this_id.key, &key_pair.0 .0);
+            assert_eq!(this_id.attempt, 0);
+
+            if id.is_none() {
+              id = Some(this_id.clone());
+            }
+            assert_eq!(&this_id, id.as_ref().unwrap());
+
+            if let Some(i) = i {
+              preprocesses.insert(i, preprocess);
+            }
           }
           _ => panic!("processor didn't send batch preprocess"),
         }
       }
+      let id = id.unwrap();
+
+      // Continue with batch signing by sending the preprocesses to selected parties
+      for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
+        let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
+
+        coordinator
+          .send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses {
+            id: id.clone(),
+            preprocesses: clone_without(&preprocesses, &i),
+          })
+          .await;
+      }
+
+      let mut shares = HashMap::new();
+      for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
+        let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
+
+        match coordinator.recv_message().await {
+          messages::ProcessorMessage::Coordinator(
+            messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share },
+          ) => {
+            assert_eq!(&this_id, &id);
+            shares.insert(i, share);
+          }
+          _ => panic!("processor didn't send batch share"),
+        }
+      }
+
+      for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
+        let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
+
+        coordinator
+          .send_message(messages::coordinator::CoordinatorMessage::BatchShares {
+            id: id.clone(),
+            shares: clone_without(&shares, &i),
+          })
+          .await;
+      }
+
+      // The selected processors should yield the batch
+      for coordinator in coordinators.iter_mut().take(THRESHOLD) {
+        match coordinator.recv_message().await {
+          messages::ProcessorMessage::Substrate(
+            messages::substrate::ProcessorMessage::Update { key, batch },
+          ) => {
+            assert_eq!(&key, &key_pair.0 .0);
+
+            assert_eq!(batch.batch.network, network);
+            assert_eq!(batch.batch.id, 0);
+            assert!(PublicKey::from_raw(key_pair.0 .0)
+              .verify(&batch_message(&batch.batch), &batch.signature));
+            assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap()));
+            // This shouldn't have an instruction as we didn't add any data into the TX we sent
+            assert!(batch.batch.instructions.is_empty());
+          }
+          _ => panic!("processor didn't send batch"),
+        }
+      }
     });
   }
 }