From d07447fe9704b9ceb2e8416c4b6080b658ac4c3c Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Sat, 22 Jul 2023 04:04:17 -0400
Subject: [PATCH] Implement an (almost) full Key Gen test for processor's
 Docker tests

It doesn't confirm the key pair yet.

Adds the infra needed to test processors against each other.
---
 Cargo.lock                       |   5 +
 tests/docker/src/lib.rs          |   2 +
 tests/message-queue/Cargo.toml   |   1 +
 tests/message-queue/src/lib.rs   |  18 +--
 tests/processor/Cargo.toml       |   3 +-
 tests/processor/src/lib.rs       | 216 +++++++++++++++++--------------
 tests/processor/src/tests/mod.rs | 190 +++++++++++++++++++++++++++
 7 files changed, 325 insertions(+), 110 deletions(-)
 create mode 100644 tests/processor/src/tests/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0795297f..1ed302be 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8739,6 +8739,9 @@ dependencies = [
 [[package]]
 name = "serai-docker-tests"
 version = "0.1.0"
+dependencies = [
+ "chrono",
+]
 
 [[package]]
 name = "serai-env"
@@ -8813,6 +8816,7 @@ dependencies = [
  "serai-message-queue",
  "serai-primitives",
  "tokio",
+ "zeroize",
 ]
 
 [[package]]
@@ -8953,6 +8957,7 @@ dependencies = [
  "serai-validator-sets-primitives",
  "serde_json",
  "tokio",
+ "zeroize",
 ]
 
 [[package]]
diff --git a/tests/docker/src/lib.rs b/tests/docker/src/lib.rs
index c6fb6fc0..1c9eb04c 100644
--- a/tests/docker/src/lib.rs
+++ b/tests/docker/src/lib.rs
@@ -92,6 +92,8 @@ pub fn build(name: String) {
         }
       }
     } else {
+      // Recursively crawl since we care when the folder's contents were edited, not the folder
+      // itself
       for entry in fs::read_dir(path.clone()).expect("couldn't read directory") {
         metadatas
           .push(meta(path.join(entry.expect("couldn't access item in directory").file_name())));
diff --git a/tests/message-queue/Cargo.toml b/tests/message-queue/Cargo.toml
index 71bb7ffa..c07b7c3c 100644
--- a/tests/message-queue/Cargo.toml
+++ b/tests/message-queue/Cargo.toml
@@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
 
 [dependencies]
 hex = "0.4"
+zeroize = "1"
 rand_core = "0.6"
 
 ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }
diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs
index 2b1a03a4..923eb1e3 100644
--- a/tests/message-queue/src/lib.rs
+++ b/tests/message-queue/src/lib.rs
@@ -57,9 +57,7 @@ pub fn instance(
 
 #[test]
 fn basic_functionality() {
-  use std::env;
-
-  use ciphersuite::group::ff::PrimeField;
+  use zeroize::Zeroizing;
 
   use dockertest::DockerTest;
 
@@ -74,12 +72,11 @@ fn basic_functionality() {
     tokio::time::sleep(core::time::Duration::from_secs(1)).await;
 
     let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
-    // TODO: Add new to MessageQueue to avoid needing to use set_var
-    env::set_var("MESSAGE_QUEUE_RPC", rpc.0.to_string() + ":" + &rpc.1.to_string());
-    env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr()));
+    let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
 
     // Queue some messages
-    let coordinator = MessageQueue::from_env(Service::Coordinator);
+    let coordinator =
+      MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key));
     coordinator
       .queue(
         Metadata {
@@ -103,8 +100,11 @@ fn basic_functionality() {
       .await;
 
     // Successfully get it
-    env::set_var("MESSAGE_QUEUE_KEY", hex::encode(priv_keys[&NetworkId::Bitcoin].to_repr()));
-    let bitcoin = MessageQueue::from_env(Service::Processor(NetworkId::Bitcoin));
+    let bitcoin = MessageQueue::new(
+      Service::Processor(NetworkId::Bitcoin),
+      rpc,
+      Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
+    );
     let msg = bitcoin.next(0).await;
     assert_eq!(msg.from, Service::Coordinator);
     assert_eq!(msg.id, 0);
diff --git a/tests/processor/Cargo.toml b/tests/processor/Cargo.toml
index a42457c8..af2cd7bb 100644
--- a/tests/processor/Cargo.toml
+++ b/tests/processor/Cargo.toml
@@ -16,10 +16,11 @@ rustdoc-args = ["--cfg", "docsrs"]
 
 [dependencies]
 hex = "0.4"
+zeroize = "1"
 rand_core = "0.6"
 
 ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }
-dkg = { path = "../../crypto/dkg" }
+dkg = { path = "../../crypto/dkg", features = ["tests"] }
 
 messages = { package = "serai-processor-messages", path = "../../processor/messages" }
diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs
index 71dfa7b9..6fc9d677 100644
--- a/tests/processor/src/lib.rs
+++ b/tests/processor/src/lib.rs
@@ -1,36 +1,88 @@
+use std::sync::{OnceLock, Mutex};
+
 use rand_core::{RngCore, OsRng};
 
 use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
 
+use serai_primitives::NetworkId;
+
 use dockertest::{
   PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition,
 };
 
-pub fn bitcoin_instance() -> Composition {
-  serai_docker_tests::build("bitcoin".to_string());
+const RPC_USER: &str = "serai";
+const RPC_PASS: &str = "seraidex";
 
-  Composition::with_image(
-    Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
-  )
-  .with_log_options(Some(LogOptions {
+static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
+
+fn log_options() -> Option<LogOptions> {
+  Some(LogOptions {
     action: LogAction::Forward,
     policy: LogPolicy::Always,
     source: LogSource::Both,
-  }))
-  .with_cmd(vec![
-    "bitcoind".to_string(),
-    "-txindex".to_string(),
-    "-regtest".to_string(),
-    "-rpcuser=serai".to_string(),
-    "-rpcpassword=seraidex".to_string(),
-    "-rpcbind=0.0.0.0".to_string(),
-    "-rpcallowip=0.0.0.0/0".to_string(),
-    "-rpcport=8332".to_string(),
-  ])
-  .with_start_policy(StartPolicy::Strict)
+  })
 }
 
-pub fn instance(message_queue_key: <Ristretto as Ciphersuite>::F) -> Composition {
+pub fn bitcoin_instance() -> (Composition, u16) {
+  serai_docker_tests::build("bitcoin".to_string());
+
+  (
+    Composition::with_image(
+      Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
+    )
+    .with_cmd(vec![
+      "bitcoind".to_string(),
+      "-txindex".to_string(),
+      "-regtest".to_string(),
+      format!("-rpcuser={RPC_USER}"),
+      format!("-rpcpassword={RPC_PASS}"),
+      "-rpcbind=0.0.0.0".to_string(),
+      "-rpcallowip=0.0.0.0/0".to_string(),
+      "-rpcport=8332".to_string(),
+    ]),
+    8332,
+  )
+}
+
+pub fn monero_instance() -> (Composition, u16) {
+  serai_docker_tests::build("monero".to_string());
+
+  (
+    Composition::with_image(
+      Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
+    )
+    .with_cmd(vec![
+      "monerod".to_string(),
+      "--regtest".to_string(),
+      "--offline".to_string(),
+      "--fixed-difficulty=1".to_string(),
+      "--rpc-bind-ip=0.0.0.0".to_string(),
+      format!("--rpc-login={RPC_USER}:{RPC_PASS}"),
+      "--rpc-access-control-origins=*".to_string(),
+      "--confirm-external-bind".to_string(),
+      "--non-interactive".to_string(),
+    ])
+    .with_start_policy(StartPolicy::Strict),
+    18081,
+  )
+}
+
+pub fn network_instance(network: NetworkId) -> (Composition, u16) {
+  match network {
+    NetworkId::Bitcoin => bitcoin_instance(),
+    NetworkId::Ethereum => todo!(),
+    NetworkId::Monero => monero_instance(),
+    NetworkId::Serai => {
+      panic!("Serai is not a valid network to spawn an instance of for a processor")
+    }
+  }
+}
+
+pub fn processor_instance(
+  network: NetworkId,
+  port: u16,
+  message_queue_key: <Ristretto as Ciphersuite>::F,
+) -> Composition {
   serai_docker_tests::build("processor".to_string());
   let mut entropy = [0; 32];
   OsRng.fill_bytes(&mut entropy);
@@ -39,104 +91,68 @@ pub fn instance(message_queue_key: <Ristretto as Ciphersuite>::F) -> Composition
   Composition::with_image(
     Image::with_repository("serai-dev-processor").pull_policy(PullPolicy::Never),
   )
-  .with_log_options(Some(LogOptions {
-    action: LogAction::Forward,
-    policy: LogPolicy::Always,
-    source: LogSource::Both,
-  }))
   .with_env(
     [
       ("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())),
       ("ENTROPY".to_string(), hex::encode(entropy)),
-      ("NETWORK".to_string(), "bitcoin".to_string()),
-      ("NETWORK_RPC_LOGIN".to_string(), "serai:seraidex".to_string()),
-      ("NETWORK_RPC_PORT".to_string(), "8332".to_string()),
+      (
+        "NETWORK".to_string(),
+        (match network {
+          NetworkId::Serai => panic!("starting a processor for Serai"),
+          NetworkId::Bitcoin => "bitcoin",
+          NetworkId::Ethereum => "ethereum",
+          NetworkId::Monero => "monero",
+        })
+        .to_string(),
+      ),
+      ("NETWORK_RPC_LOGIN".to_string(), format!("{RPC_USER}:{RPC_PASS}")),
+      ("NETWORK_RPC_PORT".to_string(), port.to_string()),
       ("DB_PATH".to_string(), "./processor-db".to_string()),
     ]
     .into(),
   )
-  .with_start_policy(StartPolicy::Strict)
 }
 
-#[test]
-fn basic_functionality() {
-  use std::env;
-
-  use serai_primitives::NetworkId;
-  use serai_validator_sets_primitives::{Session, ValidatorSet};
-
-  use serai_message_queue::{Service, Metadata, client::MessageQueue};
-
-  use dockertest::DockerTest;
-
-  let bitcoin_composition = bitcoin_instance();
+pub fn processor_stack(
+  network: NetworkId,
+) -> (String, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
+  let (network_composition, network_rpc_port) = network_instance(network);
 
   let (coord_key, message_queue_keys, message_queue_composition) =
     serai_message_queue_tests::instance();
-  let message_queue_composition = message_queue_composition.with_start_policy(StartPolicy::Strict);
-
-  let mut processor_composition = instance(message_queue_keys[&NetworkId::Bitcoin]);
-  processor_composition.inject_container_name(bitcoin_composition.handle(), "NETWORK_RPC_HOSTNAME");
-  processor_composition
-    .inject_container_name(message_queue_composition.handle(), "MESSAGE_QUEUE_RPC");
+  let processor_composition =
+    processor_instance(network, network_rpc_port, message_queue_keys[&network]);
 
-  let mut test = DockerTest::new();
-  test.add_composition(bitcoin_composition);
-  test.add_composition(message_queue_composition);
-  test.add_composition(processor_composition);
+  // Give every item in this stack a unique ID
+  // Uses a Mutex as we can't generate an 8-byte random ID without hitting hostname length limits
+  let unique_id = {
+    let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0));
+    let mut unique_id_lock = unique_id_mutex.lock().unwrap();
+    let unique_id = hex::encode(unique_id_lock.to_be_bytes());
+    *unique_id_lock += 1;
+    unique_id
+  };
 
-  test.run(|ops| async move {
-    // Sleep for 10 seconds to be polite and let things boot
-    tokio::time::sleep(core::time::Duration::from_secs(10)).await;
-
-    // Connect to the Message Queue as the coordinator
-    let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
-    // TODO: MessageQueue::new
-    env::set_var(
-      "MESSAGE_QUEUE_RPC",
-      "http://".to_string() + &rpc.0.to_string() + ":" + &rpc.1.to_string(),
+  let mut compositions = vec![];
+  let mut handles = vec![];
+  for composition in [network_composition, message_queue_composition, processor_composition] {
+    let handle = composition.handle();
+    compositions.push(
+      composition
+        .with_start_policy(StartPolicy::Strict)
+        .with_container_name(format!("{handle}-{}", &unique_id))
+        .with_log_options(log_options()),
     );
-    env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr()));
-    let coordinator = MessageQueue::from_env(Service::Coordinator);
+    handles.push(compositions.last().unwrap().handle());
+  }
 
-    // Order a key gen
-    let id = messages::key_gen::KeyGenId {
-      set: ValidatorSet { session: Session(0), network: NetworkId::Bitcoin },
-      attempt: 0,
-    };
+  let processor_composition = compositions.last_mut().unwrap();
+  processor_composition.inject_container_name(handles.remove(0), "NETWORK_RPC_HOSTNAME");
+  processor_composition.inject_container_name(handles.remove(0), "MESSAGE_QUEUE_RPC");
 
-    coordinator
-      .queue(
-        Metadata {
-          from: Service::Coordinator,
-          to: Service::Processor(NetworkId::Bitcoin),
-          intent: b"key_gen_0".to_vec(),
-        },
-        serde_json::to_string(&messages::CoordinatorMessage::KeyGen(
-          messages::key_gen::CoordinatorMessage::GenerateKey {
-            id,
-            params: dkg::ThresholdParams::new(3, 4, dkg::Participant::new(1).unwrap()).unwrap(),
-          },
-        ))
-        .unwrap()
-        .into_bytes(),
-      )
-      .await;
-
-    // Read the created commitments
-    let msg = coordinator.next(0).await;
-    assert_eq!(msg.from, Service::Processor(NetworkId::Bitcoin));
-    assert_eq!(msg.id, 0);
-    let msg: messages::ProcessorMessage = serde_json::from_slice(&msg.msg).unwrap();
-    match msg {
-      messages::ProcessorMessage::KeyGen(messages::key_gen::ProcessorMessage::Commitments {
-        id: this_id,
-        commitments: _,
-      }) => {
-        assert_eq!(this_id, id);
-      }
-      _ => panic!("processor didn't return Commitments in response to GenerateKey"),
-    }
-    coordinator.ack(0).await;
-  });
+  (compositions[1].handle(), coord_key, compositions)
 }
+
+#[cfg(test)]
+mod tests;
diff --git a/tests/processor/src/tests/mod.rs b/tests/processor/src/tests/mod.rs
new file mode 100644
index 00000000..d5be3bb2
--- /dev/null
+++ b/tests/processor/src/tests/mod.rs
@@ -0,0 +1,190 @@
+use std::collections::HashMap;
+
+use zeroize::Zeroizing;
+
+use ciphersuite::{Ciphersuite, Ristretto};
+use dkg::{Participant, ThresholdParams, tests::clone_without};
+
+use serai_primitives::NetworkId;
+use serai_validator_sets_primitives::{Session, ValidatorSet};
+
+use serai_message_queue::{Service, Metadata, client::MessageQueue};
+
+use dockertest::{DockerOperations, DockerTest};
+
+use crate::*;
+
+const COORDINATORS: usize = 4;
+const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
+
+fn coordinator_queue(
+  ops: &DockerOperations,
+  handle: String,
+  coord_key: <Ristretto as Ciphersuite>::F,
+) -> MessageQueue {
+  let rpc = ops.handle(&handle).host_port(2287).unwrap();
+  let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
+  MessageQueue::new(Service::Coordinator, rpc, Zeroizing::new(coord_key))
+}
+
+// Receive a message from a processor via its coordinator
+async fn recv_message(
+  coordinator: &MessageQueue,
+  from: NetworkId,
+  id: u64,
+) -> messages::ProcessorMessage {
+  let msg =
+    tokio::time::timeout(core::time::Duration::from_secs(10), coordinator.next(id)).await.unwrap();
+  assert_eq!(msg.from, Service::Processor(from));
+  assert_eq!(msg.id, id);
+  coordinator.ack(id).await;
+  serde_json::from_slice(&msg.msg).unwrap()
+}
+
+// Perform an interaction with all processors via their coordinators
+async fn interact_with_all<
+  FS: Fn(Participant) -> messages::key_gen::CoordinatorMessage,
+  FR: FnMut(Participant, messages::key_gen::ProcessorMessage),
+>(
+  id: u64,
+  coordinators: &[MessageQueue],
+  network: NetworkId,
+  message: FS,
+  mut recv: FR,
+) {
+  for (i, coordinator) in coordinators.iter().enumerate() {
+    let participant = Participant::new(u16::try_from(i + 1).unwrap()).unwrap();
+    coordinator
+      .queue(
+        Metadata {
+          from: Service::Coordinator,
+          to: Service::Processor(network),
+          intent: id.to_le_bytes().to_vec(),
+        },
+        serde_json::to_string(&messages::CoordinatorMessage::KeyGen(message(participant)))
+          .unwrap()
+          .into_bytes(),
+      )
+      .await;
+
+    match recv_message(coordinator, network, id).await {
+      messages::ProcessorMessage::KeyGen(msg) => recv(participant, msg),
+      _ => panic!("processor didn't return KeyGen message"),
+    }
+  }
+}
+
+#[test]
+fn key_gen() {
+  for network in [NetworkId::Bitcoin, NetworkId::Monero] {
+    let mut coordinators = vec![];
+    let mut test = DockerTest::new();
+    for _ in 0 .. COORDINATORS {
+      let (coord_handle, coord_key, compositions) = processor_stack(network);
+      coordinators.push((coord_handle, coord_key));
+      for composition in compositions {
+        test.add_composition(composition);
+      }
+    }
+
+    test.run(|ops| async move {
+      // Sleep for a second for the message-queue to boot
+      // It isn't an error to start immediately, it just silences an error
+      tokio::time::sleep(core::time::Duration::from_secs(1)).await;
+
+      // Connect to the Message Queues as the coordinator
+      let coordinators = coordinators
+        .into_iter()
+        .map(|(handle, key)| coordinator_queue(&ops, handle, key))
+        .collect::<Vec<_>>();
+
+      // Order a key gen
+      let id = messages::key_gen::KeyGenId {
+        set: ValidatorSet { session: Session(0), network },
+        attempt: 0,
+      };
+
+      let mut commitments = HashMap::new();
+      interact_with_all(
+        0,
+        &coordinators,
+        network,
+        |participant| messages::key_gen::CoordinatorMessage::GenerateKey {
+          id,
+          params: ThresholdParams::new(
+            u16::try_from(THRESHOLD).unwrap(),
+            u16::try_from(COORDINATORS).unwrap(),
+            participant,
+          )
+          .unwrap(),
+        },
+        |participant, msg| match msg {
+          messages::key_gen::ProcessorMessage::Commitments {
+            id: this_id,
+            commitments: these_commitments,
+          } => {
+            assert_eq!(this_id, id);
+            commitments.insert(participant, these_commitments);
+          }
+          _ => panic!("processor didn't return Commitments in response to GenerateKey"),
+        },
+      )
+      .await;
+
+      // Send the commitments to all parties
+      let mut shares = HashMap::new();
+      interact_with_all(
+        1,
+        &coordinators,
+        network,
+        |participant| messages::key_gen::CoordinatorMessage::Commitments {
+          id,
+          commitments: clone_without(&commitments, &participant),
+        },
+        |participant, msg| match msg {
+          messages::key_gen::ProcessorMessage::Shares { id: this_id, shares: these_shares } => {
+            assert_eq!(this_id, id);
+            shares.insert(participant, these_shares);
+          }
+          _ => panic!("processor didn't return Shares in response to Commitments"),
+        },
+      )
+      .await;
+
+      // Send the shares
+      let mut substrate_key = None;
+      let mut coin_key = None;
+      interact_with_all(
+        2,
+        &coordinators,
+        network,
+        |participant| messages::key_gen::CoordinatorMessage::Shares {
+          id,
+          shares: shares
+            .iter()
+            .filter_map(|(this_participant, shares)| {
+              shares.get(&participant).cloned().map(|share| (*this_participant, share))
+            })
+            .collect(),
+        },
+        |_, msg| match msg {
+          messages::key_gen::ProcessorMessage::GeneratedKeyPair {
+            id: this_id,
+            substrate_key: this_substrate_key,
+            coin_key: this_coin_key,
+          } => {
+            assert_eq!(this_id, id);
+            if substrate_key.is_none() {
+              substrate_key = Some(this_substrate_key);
+              coin_key = Some(this_coin_key.clone());
+            }
+            assert_eq!(substrate_key.unwrap(), this_substrate_key);
+            assert_eq!(coin_key.as_ref().unwrap(), &this_coin_key);
+          }
+          _ => panic!("processor didn't return GeneratedKeyPair in response to Shares"),
+        },
+      )
+      .await;
+    });
+  }
+}
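
The infra this patch adds (processor_stack, coordinator_queue) is intended to be reused by future processor tests. As a minimal sketch of that reuse, not part of the patch itself and assuming it sits alongside key_gen in tests/processor/src/tests/mod.rs (the test name stack_boots is hypothetical), a test can spin up one network/message-queue/processor stack and connect to it as the coordinator:

#[test]
fn stack_boots() {
  // One network instance + message-queue + processor, namespaced by a unique ID
  let (coord_handle, coord_key, compositions) = processor_stack(NetworkId::Bitcoin);

  let mut test = DockerTest::new();
  for composition in compositions {
    test.add_composition(composition);
  }

  test.run(|ops| async move {
    // Give the message-queue a moment to boot before connecting
    tokio::time::sleep(core::time::Duration::from_secs(1)).await;
    // Connecting as the coordinator confirms the stack came up
    let _coordinator = coordinator_queue(&ops, coord_handle, coord_key);
  });
}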