From cee788eac39d985df240e732aca5c9693067e0d4 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 6 Aug 2023 12:38:44 -0400 Subject: [PATCH] Test the Coordinator emits KeyGen Mainly just a test that the full stack is properly set up and we've hit basic functioning for further testing. --- Cargo.lock | 2 + coordinator/src/main.rs | 24 ++++-- coordinator/src/substrate/mod.rs | 25 +++++- processor/src/main.rs | 7 +- tests/coordinator/Cargo.toml | 8 +- tests/coordinator/src/lib.rs | 132 ++++++++++++++++++++++++++--- tests/coordinator/src/tests/mod.rs | 61 ++++++++++--- 7 files changed, 224 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73cd816c..ed3f73fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7975,6 +7975,7 @@ name = "serai-coordinator-tests" version = "0.1.0" dependencies = [ "ciphersuite", + "dkg", "dockertest", "hex", "serai-client", @@ -7985,6 +7986,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "zeroize", ] [[package]] diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index d04bd059..fee2eb1b 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -9,10 +9,10 @@ use std::{ collections::{VecDeque, HashMap}, }; -use zeroize::Zeroizing; +use zeroize::{Zeroize, Zeroizing}; use rand_core::OsRng; -use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; +use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto}; use serai_db::{DbTxn, Db}; use serai_env as env; @@ -684,17 +684,31 @@ async fn main() { let db = serai_db::new_rocksdb(&env::var("DB_PATH").expect("path to DB wasn't specified")); - let key = Zeroizing::new(::F::ZERO); // TODO + let key = { + let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided"); + let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded"); + key_hex.zeroize(); + if key_vec.len() != 32 { + key_vec.zeroize(); + panic!("Serai key had an invalid length"); + } + let mut key_bytes = [0; 32]; + key_bytes.copy_from_slice(&key_vec); + key_vec.zeroize(); + let key = Zeroizing::new(::F::from_repr(key_bytes).unwrap()); + key_bytes.zeroize(); + key + }; let p2p = LocalP2p::new(1).swap_remove(0); // TODO let processors = Arc::new(MessageQueue::from_env(Service::Coordinator)); let serai = || async { loop { - let Ok(serai) = Serai::new(&dbg!(format!( + let Ok(serai) = Serai::new(&format!( "ws://{}:9944", serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided") - ))) + )) .await else { log::error!("couldn't connect to the Serai node"); diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 819d9616..32689fdf 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -1,4 +1,4 @@ -use core::{ops::Deref, future::Future}; +use core::{ops::Deref, time::Duration, future::Future}; use std::collections::{HashSet, HashMap}; use zeroize::Zeroizing; @@ -21,6 +21,8 @@ use serai_db::DbTxn; use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage}; +use tokio::time::sleep; + use crate::{Db, processors::Processors, tributary::TributarySpec}; mod db; @@ -53,9 +55,24 @@ async fn handle_new_set< set: ValidatorSet, ) -> Result<(), SeraiError> { if in_set(key, serai, set).await?.expect("NewSet for set which doesn't exist") { + log::info!("present in set {:?}", set); + let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist"); - let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data); + let time = if let Ok(time) = block.time() { + time + } else { + assert_eq!(block.number(), 0); + // Use the next block's time + loop { + let Ok(Some(res)) = serai.get_block_by_number(1).await else { + sleep(Duration::from_secs(5)).await; + continue; + }; + break res.time().unwrap(); + } + }; + let spec = TributarySpec::new(block.hash(), time, set, set_data); create_new_tributary(db, spec.clone()); // Trigger a DKG @@ -79,6 +96,8 @@ async fn handle_new_set< }), ) .await; + } else { + log::info!("not present in set {:?}", set); } Ok(()) @@ -241,6 +260,7 @@ async fn handle_block< // stable) if !SubstrateDb::::handled_event(&db.0, hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { + log::info!("found fresh new set event {:?}", new_set); handle_new_set( &mut db.0, key, @@ -264,6 +284,7 @@ async fn handle_block< // If a key pair was confirmed, inform the processor for key_gen in serai.get_key_gen_events(hash).await? { if !SubstrateDb::::handled_event(&db.0, hash, event_id) { + log::info!("found fresh key gen event {:?}", key_gen); if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen { handle_key_gen(key, processors, serai, &block, set, key_pair).await?; } else { diff --git a/processor/src/main.rs b/processor/src/main.rs index e78fc9ff..37cc7ce0 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -471,7 +471,12 @@ async fn boot( if entropy.len() != 64 { panic!("entropy isn't the right length"); } - let bytes = Zeroizing::new(hex::decode(entropy).expect("entropy wasn't hex-formatted")); + let mut bytes = + Zeroizing::new(hex::decode(entropy).map_err(|_| ()).expect("entropy wasn't hex-formatted")); + if bytes.len() != 32 { + bytes.zeroize(); + panic!("entropy wasn't 32 bytes"); + } let mut entropy = Zeroizing::new([0; 32]); let entropy_mut: &mut [u8] = entropy.as_mut(); entropy_mut.copy_from_slice(bytes.as_ref()); diff --git a/tests/coordinator/Cargo.toml b/tests/coordinator/Cargo.toml index 45317670..0e94e9aa 100644 --- a/tests/coordinator/Cargo.toml +++ b/tests/coordinator/Cargo.toml @@ -16,13 +16,19 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] hex = "0.4" +zeroize = { version = "1", default-features = false } + ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["ristretto"] } +dkg = { path = "../../crypto/dkg", default-features = false, features = ["tests"] } messages = { package = "serai-processor-messages", path = "../../processor/messages" } -serai-client = { path = "../../substrate/client" } +serai-client = { path = "../../substrate/client", features = ["serai"] } serai-message-queue = { path = "../../message-queue" } +serde = { version = "1", default-features = false } +serde_json = { version = "1", default-features = false } + tokio = { version = "1", features = ["time"] } dockertest = "0.3" diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index 52b6b815..49d0ff26 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -2,12 +2,18 @@ use std::sync::{OnceLock, Mutex}; +use zeroize::Zeroizing; + use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto}; use serai_client::primitives::NetworkId; +use messages::{CoordinatorMessage, ProcessorMessage}; +use serai_message_queue::{Service, Metadata, client::MessageQueue}; + use dockertest::{ PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition, + DockerOperations, }; #[cfg(test)] @@ -15,7 +21,10 @@ mod tests; static UNIQUE_ID: OnceLock> = OnceLock::new(); -pub fn coordinator_instance(message_queue_key: ::F) -> Composition { +pub fn coordinator_instance( + name: &str, + message_queue_key: ::F, +) -> Composition { serai_docker_tests::build("coordinator".to_string()); Composition::with_image( @@ -25,6 +34,10 @@ pub fn coordinator_instance(message_queue_key: ::F) -> [ ("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())), ("DB_PATH".to_string(), "./coordinator-db".to_string()), + ("SERAI_KEY".to_string(), { + use serai_client::primitives::insecure_pair_from_name; + hex::encode(insecure_pair_from_name(name).as_ref().secret.to_bytes()[.. 32].as_ref()) + }), ] .into(), ) @@ -33,16 +46,20 @@ pub fn coordinator_instance(message_queue_key: ::F) -> pub fn serai_composition(name: &str) -> Composition { serai_docker_tests::build("serai".to_string()); - Composition::with_image(Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never)) - .with_cmd(vec![ - "serai-node".to_string(), - "--unsafe-rpc-external".to_string(), - "--rpc-cors".to_string(), - "all".to_string(), - "--chain".to_string(), - "devnet".to_string(), - format!("--{name}"), - ]) + let mut composition = Composition::with_image( + Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never), + ) + .with_cmd(vec![ + "serai-node".to_string(), + "--unsafe-rpc-external".to_string(), + "--rpc-cors".to_string(), + "all".to_string(), + "--chain".to_string(), + "local".to_string(), + format!("--{}", name.to_lowercase()), + ]); + composition.publish_all_ports(); + composition } pub type Handles = (String, String, String); @@ -52,7 +69,7 @@ pub fn coordinator_stack(name: &str) -> (Handles, ::F, let (coord_key, message_queue_keys, message_queue_composition) = serai_message_queue_tests::instance(); - let coordinator_composition = coordinator_instance(message_queue_keys[&NetworkId::Bitcoin]); + let coordinator_composition = coordinator_instance(name, coord_key); // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits @@ -91,7 +108,96 @@ pub fn coordinator_stack(name: &str) -> (Handles, ::F, ( (compositions[0].handle(), compositions[1].handle(), compositions[2].handle()), - coord_key, + message_queue_keys[&NetworkId::Bitcoin], compositions, ) } + +pub struct Processor { + network: NetworkId, + + #[allow(unused)] + serai_handle: String, + #[allow(unused)] + message_queue_handle: String, + #[allow(unused)] + coordinator_handle: String, + + next_send_id: u64, + next_recv_id: u64, + queue: MessageQueue, +} + +impl Processor { + pub async fn new( + network: NetworkId, + ops: &DockerOperations, + handles: (String, String, String), + processor_key: ::F, + ) -> Processor { + let message_queue_rpc = ops.handle(&handles.1).host_port(2287).unwrap(); + let message_queue_rpc = format!("{}:{}", message_queue_rpc.0, message_queue_rpc.1); + + // Sleep until the Substrate RPC starts + let serai_rpc = ops.handle(&handles.0).host_port(9944).unwrap(); + let serai_rpc = format!("ws://{}:{}", serai_rpc.0, serai_rpc.1); + // Bound execution to 60 seconds + for _ in 0 .. 60 { + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + let Ok(client) = serai_client::Serai::new(&serai_rpc).await else { continue }; + if client.get_latest_block_hash().await.is_err() { + continue; + } + break; + } + + // The Serai RPC may or may not be started + // Assume it is and continue, so if it's a few seconds late, it's still within tolerance + + Processor { + network, + + serai_handle: handles.0, + message_queue_handle: handles.1, + coordinator_handle: handles.2, + + next_send_id: 0, + next_recv_id: 0, + queue: MessageQueue::new( + Service::Processor(network), + message_queue_rpc, + Zeroizing::new(processor_key), + ), + } + } + + /// Send a message to a processor as its coordinator. + pub async fn send_message(&mut self, msg: impl Into) { + let msg: ProcessorMessage = msg.into(); + self + .queue + .queue( + Metadata { + from: Service::Processor(self.network), + to: Service::Coordinator, + intent: msg.intent(), + }, + serde_json::to_string(&msg).unwrap().into_bytes(), + ) + .await; + self.next_send_id += 1; + } + + /// Receive a message from a processor as its coordinator. + pub async fn recv_message(&mut self) -> CoordinatorMessage { + let msg = + tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id)) + .await + .unwrap(); + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, self.next_recv_id); + self.queue.ack(self.next_recv_id).await; + self.next_recv_id += 1; + serde_json::from_slice(&msg.msg).unwrap() + } +} diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index ce57fe73..57e3d7e9 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -1,12 +1,19 @@ use std::time::Duration; use ciphersuite::{Ciphersuite, Ristretto}; +use dkg::{Participant, ThresholdParams}; + +use serai_client::{ + primitives::NetworkId, + validator_sets::primitives::{Session, ValidatorSet}, +}; +use messages::{key_gen::KeyGenId, CoordinatorMessage}; use dockertest::DockerTest; use crate::*; -pub(crate) const COORDINATORS: usize = 4; +pub(crate) const COORDINATORS: usize = 3; // pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { @@ -14,12 +21,12 @@ fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { let mut test = DockerTest::new(); for i in 0 .. COORDINATORS { let (handles, coord_key, compositions) = coordinator_stack(match i { - 0 => "alice", - 1 => "bob", - 2 => "charlie", - 3 => "dave", - 4 => "eve", - 5 => "ferdie", + 0 => "Alice", + 1 => "Bob", + 2 => "Charlie", + 3 => "Dave", + 4 => "Eve", + 5 => "Ferdie", _ => panic!("needed a 6th name for a serai node"), }); coordinators.push((handles, coord_key)); @@ -30,11 +37,39 @@ fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { (coordinators, test) } -#[test] -fn stack_test() { - let (_coordinators, test) = new_test(); +#[tokio::test] +async fn stack_test() { + let (processors, test) = new_test(); - test.run(|_ops| async move { - tokio::time::sleep(Duration::from_secs(30)).await; - }); + test + .run_async(|ops| async move { + // Wait for the Serai node to boot + tokio::time::sleep(Duration::from_secs(30)).await; + + // Connect to the Message Queues as the processor + let mut new_processors: Vec = vec![]; + for (handles, key) in processors { + new_processors.push(Processor::new(NetworkId::Bitcoin, &ops, handles, key).await); + } + let mut processors = new_processors; + + for (i, processor) in processors.iter_mut().enumerate() { + assert_eq!( + processor.recv_message().await, + CoordinatorMessage::KeyGen(messages::key_gen::CoordinatorMessage::GenerateKey { + id: KeyGenId { + set: ValidatorSet { session: Session(0), network: NetworkId::Bitcoin }, + attempt: 0 + }, + params: ThresholdParams::new( + 3, + 3, + Participant::new(u16::try_from(i).unwrap() + 1).unwrap() + ) + .unwrap() + }) + ); + } + }) + .await; }