From 7823ece4fe2470b07bae1d854025a4d7aec8756b Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 26 Jul 2023 05:55:47 -0400 Subject: [PATCH] Test multiple batches, re-attempts, randomized selected signers --- tests/processor/src/lib.rs | 6 +- tests/processor/src/networks.rs | 21 ++- tests/processor/src/tests/batch.rs | 262 ++++++++++++++++++----------- 3 files changed, 181 insertions(+), 108 deletions(-) diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index 8ed76d30..d6f0a4ac 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -252,10 +252,14 @@ impl Coordinator { use monero_serai::rpc::HttpRpc; let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); - let _: EmptyResponse = rpc + let res: serde_json::Value = rpc .json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)]))) .await .unwrap(); + let err = res.get("error"); + if err.is_some() && (err.unwrap() != &serde_json::Value::Null) { + panic!("failed to submit Monero block: {res}"); + } } NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), } diff --git a/tests/processor/src/networks.rs b/tests/processor/src/networks.rs index 4f6d6545..4e4b5d75 100644 --- a/tests/processor/src/networks.rs +++ b/tests/processor/src/networks.rs @@ -228,6 +228,7 @@ impl Wallet { Transaction, }; + const AMOUNT: u64 = 100000000; let mut tx = Transaction { version: 2, lock_time: LockTime::ZERO, @@ -237,13 +238,19 @@ impl Wallet { sequence: Sequence(u32::MAX), witness: Witness::default(), }], - output: vec![TxOut { - value: input_tx.output[0].value - 10000, - script_pubkey: Payload::p2tr_tweaked(TweakedPublicKey::dangerous_assume_tweaked( - XOnlyPublicKey::from_slice(&to[1 ..]).unwrap(), - )) - .script_pubkey(), - }], + output: vec![ + TxOut { + value: input_tx.output[0].value - AMOUNT - 10000, + script_pubkey: input_tx.output[0].script_pubkey.clone(), + }, + TxOut { + value: AMOUNT, + script_pubkey: Payload::p2tr_tweaked(TweakedPublicKey::dangerous_assume_tweaked( + XOnlyPublicKey::from_slice(&to[1 ..]).unwrap(), + )) + .script_pubkey(), + }, + ], }; let mut der = SECP256K1 diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index 4bbd2b42..23006a53 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -2,13 +2,135 @@ use std::collections::HashMap; use dkg::{Participant, tests::clone_without}; +use messages::sign::SignId; + use serai_primitives::{NetworkId, BlockHash, crypto::RuntimePublic, PublicKey}; -use serai_in_instructions_primitives::batch_message; +use serai_in_instructions_primitives::{SignedBatch, batch_message}; use dockertest::DockerTest; use crate::{*, tests::*}; +pub(crate) async fn recv_batch_preprocesses( + coordinators: &mut [Coordinator], + key: [u8; 32], + attempt: u32, +) -> (SignId, HashMap>) { + let mut id = None; + let mut preprocesses = HashMap::new(); + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + let msg = coordinator.recv_message().await; + match msg { + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess }, + ) => { + if id.is_none() { + assert_eq!(&this_id.key, &key); + assert_eq!(this_id.attempt, attempt); + id = Some(this_id.clone()); + } + assert_eq!(&this_id, id.as_ref().unwrap()); + + preprocesses.insert(i, preprocess); + } + _ => panic!("processor didn't send batch preprocess"), + } + } + + // Reduce the preprocesses down to the threshold + while preprocesses.len() > THRESHOLD { + preprocesses.remove( + &Participant::new( + u16::try_from(OsRng.next_u64() % u64::try_from(COORDINATORS).unwrap()).unwrap() + 1, + ) + .unwrap(), + ); + } + + (id.unwrap(), preprocesses) +} + +pub(crate) async fn sign_batch( + coordinators: &mut [Coordinator], + id: SignId, + preprocesses: HashMap>, +) -> SignedBatch { + assert_eq!(preprocesses.len(), THRESHOLD); + + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + coordinator + .send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses { + id: id.clone(), + preprocesses: clone_without(&preprocesses, &i), + }) + .await; + } + } + + let mut shares = HashMap::new(); + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + match coordinator.recv_message().await { + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share }, + ) => { + assert_eq!(&this_id, &id); + shares.insert(i, share); + } + _ => panic!("processor didn't send batch share"), + } + } + } + + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + coordinator + .send_message(messages::coordinator::CoordinatorMessage::BatchShares { + id: id.clone(), + shares: clone_without(&shares, &i), + }) + .await; + } + } + + // The selected processors should yield the batch + let mut batch = None; + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + match coordinator.recv_message().await { + messages::ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Update { + key, + batch: this_batch, + }) => { + assert_eq!(&key, &id.key); + + if batch.is_none() { + assert!(PublicKey::from_raw(id.key.clone().try_into().unwrap()) + .verify(&batch_message(&this_batch.batch), &this_batch.signature)); + + batch = Some(this_batch.clone()); + } + + assert_eq!(batch.as_ref().unwrap(), &this_batch); + } + _ => panic!("processor didn't send batch"), + } + } + } + batch.unwrap() +} + #[test] fn batch_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { @@ -46,115 +168,55 @@ fn batch_test() { } coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Send into the processor's wallet - let tx = wallet.send_to_address(&ops, &key_pair.1).await; - for coordinator in &coordinators { - coordinator.publish_transacton(&ops, &tx).await; - } - - // Put the TX past the confirmation depth - let mut block_with_tx = None; - for _ in 0 .. confirmations(network) { - let (hash, _) = coordinators[0].add_block(&ops).await; - if block_with_tx.is_none() { - block_with_tx = Some(hash); + for i in 0 .. 2 { + // Send into the processor's wallet + let tx = wallet.send_to_address(&ops, &key_pair.1).await; + for coordinator in &mut coordinators { + coordinator.publish_transacton(&ops, &tx).await; } - } - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Sleep for 10s - // The scanner works on a 5s interval, so this leaves a few s for any processing/latency - tokio::time::sleep(core::time::Duration::from_secs(10)).await; - - // Make sure the proceessors picked it up by checking they're trying to sign a batch for it - let mut id = None; - let mut preprocesses = HashMap::new(); - for (i, coordinator) in coordinators.iter_mut().enumerate() { - // Only use their preprocess if they're within the threshold - let i = if i < THRESHOLD { - Some(Participant::new(u16::try_from(i).unwrap() + 1).unwrap()) - } else { - None - }; - - let msg = coordinator.recv_message().await; - match msg { - messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess }, - ) => { - assert_eq!(&this_id.key, &key_pair.0 .0); - assert_eq!(this_id.attempt, 0); - - if id.is_none() { - id = Some(this_id.clone()); - } - assert_eq!(&this_id, id.as_ref().unwrap()); - - if let Some(i) = i { - preprocesses.insert(i, preprocess); - } + // Put the TX past the confirmation depth + let mut block_with_tx = None; + for _ in 0 .. confirmations(network) { + let (hash, _) = coordinators[0].add_block(&ops).await; + if block_with_tx.is_none() { + block_with_tx = Some(hash); } - _ => panic!("processor didn't send batch preprocess"), } - } - let id = id.unwrap(); + coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Continue with batch siging by sending the preprocesses to selected parties - for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) { - let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + // Sleep for 10s + // The scanner works on a 5s interval, so this leaves a few s for any processing/latency + tokio::time::sleep(core::time::Duration::from_secs(10)).await; - coordinator - .send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses { - id: id.clone(), - preprocesses: clone_without(&preprocesses, &i), - }) - .await; - } - - let mut shares = HashMap::new(); - for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) { - let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); - - match coordinator.recv_message().await { - messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share }, - ) => { - assert_eq!(&this_id, &id); - shares.insert(i, share); + // Make sure the proceessors picked it up by checking they're trying to sign a batch for it + let (mut id, mut preprocesses) = + recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, 0).await; + // Trigger a random amount of re-attempts + for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { + // TODO: Double check how the processor handles this ID field + // It should be able to assert its perfectly sequential + id.attempt = attempt; + for coordinator in coordinators.iter_mut() { + coordinator + .send_message(messages::coordinator::CoordinatorMessage::BatchReattempt { + id: id.clone(), + }) + .await; } - _ => panic!("processor didn't send batch share"), + (id, preprocesses) = + recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, attempt).await; } - } - for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) { - let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + // Continue with signing the batch + let batch = sign_batch(&mut coordinators, id, preprocesses).await; - coordinator - .send_message(messages::coordinator::CoordinatorMessage::BatchShares { - id: id.clone(), - shares: clone_without(&shares, &i), - }) - .await; - } - - // The selected processors should yield the batch - for coordinator in coordinators.iter_mut().take(THRESHOLD) { - match coordinator.recv_message().await { - messages::ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::Update { key, batch }, - ) => { - assert_eq!(&key, &key_pair.0 .0); - - assert_eq!(batch.batch.network, network); - assert_eq!(batch.batch.id, 0); - assert!(PublicKey::from_raw(key_pair.0 .0) - .verify(&batch_message(&batch.batch), &batch.signature)); - assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap())); - // This shouldn't have an instruction as we didn't add any data into the TX we sent - assert!(batch.batch.instructions.is_empty()); - } - _ => panic!("processor didn't send batch"), - } + // Check it + assert_eq!(batch.batch.network, network); + assert_eq!(batch.batch.id, i); + assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap())); + // This shouldn't have an instruction as we didn't add any data into the TX we sent + assert!(batch.batch.instructions.is_empty()); } }); }