diff --git a/Cargo.lock b/Cargo.lock index 03c83dfa..9aab1e10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8191,14 +8191,12 @@ dependencies = [ "monero-serai", "parity-scale-codec", "rand_core 0.6.4", + "serai-client", "serai-docker-tests", - "serai-in-instructions-primitives", "serai-message-queue", "serai-message-queue-tests", - "serai-primitives", "serai-processor", "serai-processor-messages", - "serai-validator-sets-primitives", "serde", "serde_json", "tokio", diff --git a/tests/processor/Cargo.toml b/tests/processor/Cargo.toml index 2563de23..39cd0dca 100644 --- a/tests/processor/Cargo.toml +++ b/tests/processor/Cargo.toml @@ -29,9 +29,7 @@ monero-serai = { path = "../../coins/monero" } messages = { package = "serai-processor-messages", path = "../../processor/messages" } scale = { package = "parity-scale-codec", version = "3" } -serai-primitives = { path = "../../substrate/primitives" } -serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives" } -serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives" } +serai-client = { path = "../../substrate/client" } serai-message-queue = { path = "../../message-queue" } serde = { version = "1", default-features = false } diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index d6f0a4ac..070ea2fb 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng}; use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto}; -use serai_primitives::NetworkId; +use serai_client::primitives::NetworkId; use messages::{ProcessorMessage, CoordinatorMessage}; use serai_message_queue::{Service, Metadata, client::MessageQueue}; @@ -57,9 +57,10 @@ pub fn processor_instance( ) } +pub type Handles = (String, String, String); pub fn processor_stack( network: NetworkId, -) -> ((String, String, String), ::F, Vec) { +) -> (Handles, ::F, Vec) { let (network_composition, network_rpc_port) = network_instance(network); let (coord_key, message_queue_keys, message_queue_composition) = diff --git a/tests/processor/src/networks.rs b/tests/processor/src/networks.rs index 57ccdec2..47514109 100644 --- a/tests/processor/src/networks.rs +++ b/tests/processor/src/networks.rs @@ -5,9 +5,11 @@ use rand_core::{RngCore, OsRng}; use scale::Encode; -use serai_primitives::{NetworkId, Amount}; -use serai_validator_sets_primitives::ExternalKey; -use serai_in_instructions_primitives::{InInstruction, RefundableInInstruction, Shorthand}; +use serai_client::{ + primitives::{Amount, NetworkId, Coin, Balance, ExternalAddress}, + validator_sets::primitives::ExternalKey, + in_instructions::primitives::{InInstruction, RefundableInInstruction, Shorthand}, +}; use dockertest::{PullPolicy, Image, StartPolicy, Composition, DockerOperations}; @@ -221,7 +223,7 @@ impl Wallet { ops: &DockerOperations, to: &ExternalKey, instruction: Option, - ) -> (Vec, Amount) { + ) -> (Vec, Balance) { match self { Wallet::Bitcoin { private_key, public_key, ref mut input_tx } => { use bitcoin_serai::bitcoin::{ @@ -298,7 +300,7 @@ impl Wallet { let mut buf = vec![]; tx.consensus_encode(&mut buf).unwrap(); *input_tx = tx; - (buf, Amount(AMOUNT)) + (buf, Balance { coin: Coin::Bitcoin, amount: Amount(AMOUNT) }) } Wallet::Monero { handle, ref spend_key, ref view_pair, ref mut inputs } => { @@ -376,7 +378,31 @@ impl Wallet { .remove(0), ); - (tx.serialize(), Amount(AMOUNT)) + (tx.serialize(), Balance { coin: Coin::Monero, amount: Amount(AMOUNT) }) + } + } + } + + pub fn address(&self) -> ExternalAddress { + use serai_client::coins; + + match self { + Wallet::Bitcoin { public_key, .. } => { + use bitcoin_serai::bitcoin::{Network, Address}; + ExternalAddress::new( + coins::bitcoin::Address(Address::p2pkh(public_key, Network::Regtest)).try_into().unwrap(), + ) + .unwrap() + } + Wallet::Monero { view_pair, .. } => { + use monero_serai::wallet::address::{Network, AddressSpec}; + ExternalAddress::new( + coins::monero::Address::new(view_pair.address(Network::Mainnet, AddressSpec::Standard)) + .unwrap() + .try_into() + .unwrap(), + ) + .unwrap() } } } diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index 77b798f4..e697e017 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -1,17 +1,18 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + time::{SystemTime, Duration}, +}; use dkg::{Participant, tests::clone_without}; -use messages::sign::SignId; +use messages::{sign::SignId, SubstrateContext}; -use serai_primitives::{ - BlockHash, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId, Coin, Balance, +use serai_client::{ + primitives::{BlockHash, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId}, + in_instructions::primitives::{ + InInstruction, InInstructionWithBalance, SignedBatch, batch_message, + }, }; -use serai_in_instructions_primitives::{ - InInstruction, InInstructionWithBalance, SignedBatch, batch_message, -}; - -use dockertest::DockerTest; use crate::{*, tests::*}; @@ -135,21 +136,46 @@ pub(crate) async fn sign_batch( batch.unwrap() } +pub(crate) async fn substrate_block( + coordinator: &mut Coordinator, + block: messages::substrate::CoordinatorMessage, +) { + match block.clone() { + messages::substrate::CoordinatorMessage::SubstrateBlock { + context: _, + network: sent_network, + block: sent_block, + key: _, + burns, + } => { + coordinator.send_message(block).await; + match coordinator.recv_message().await { + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SubstrateBlockAck { + network: recvd_network, + block: recvd_block, + plans, + }, + ) => { + assert_eq!(recvd_network, sent_network); + assert_eq!(recvd_block, sent_block); + // TODO: This isn't the correct formula at all + assert_eq!(plans.len(), if burns.is_empty() { 0 } else { 1 }); + } + _ => panic!("coordinator didn't respond to SubstrateBlock with SubstrateBlockAck"), + } + } + _ => panic!("substrate_block message wasn't a SubstrateBlock"), + } +} + #[test] fn batch_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { - let mut coordinators = vec![]; - let mut test = DockerTest::new(); - for _ in 0 .. COORDINATORS { - let (handles, coord_key, compositions) = processor_stack(network); - coordinators.push((handles, coord_key)); - for composition in compositions { - test.add_composition(composition); - } - } + let (coordinators, test) = new_test(network); test.run(|ops| async move { - tokio::time::sleep(core::time::Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; let mut coordinators = coordinators .into_iter() @@ -173,6 +199,7 @@ fn batch_test() { coordinators[0].sync(&ops, &coordinators[1 ..]).await; // Run twice, once with an instruction and once without + let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; for i in 0 .. 2 { let mut serai_address = [0; 32]; OsRng.fill_bytes(&mut serai_address); @@ -180,7 +207,7 @@ fn batch_test() { if i == 1 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None }; // Send into the processor's wallet - let (tx, amount_sent) = + let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await; for coordinator in &mut coordinators { coordinator.publish_transacton(&ops, &tx).await; @@ -198,7 +225,7 @@ fn batch_test() { // Sleep for 10s // The scanner works on a 5s interval, so this leaves a few s for any processing/latency - tokio::time::sleep(core::time::Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; // Make sure the proceessors picked it up by checking they're trying to sign a batch for it let (mut id, mut preprocesses) = @@ -229,18 +256,7 @@ fn batch_test() { if let Some(instruction) = instruction { assert_eq!( batch.batch.instructions, - vec![InInstructionWithBalance { - instruction, - balance: Balance { - coin: match network { - NetworkId::Bitcoin => Coin::Bitcoin, - NetworkId::Ethereum => todo!(), - NetworkId::Monero => Coin::Monero, - NetworkId::Serai => panic!("running processor tests on Serai"), - }, - amount: amount_sent, - } - }] + vec![InInstructionWithBalance { instruction, balance: balance_sent }] ); } else { // This shouldn't have an instruction as we didn't add any data into the TX we sent @@ -248,6 +264,27 @@ fn batch_test() { // contained outputs assert!(batch.batch.instructions.is_empty()); } + + // Fire a SubstrateBlock + let serai_time = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + for coordinator in &mut coordinators { + substrate_block( + coordinator, + messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { + serai_time, + coin_latest_finalized_block: batch.batch.block, + }, + network, + block: substrate_block_num + u64::from(i), + // TODO: Should we use the network key here? Or should we only use the Ristretto key? + key: key_pair.1.to_vec(), + burns: vec![], + }, + ) + .await; + } } }); } diff --git a/tests/processor/src/tests/key_gen.rs b/tests/processor/src/tests/key_gen.rs index 1a97c3e7..447d91cd 100644 --- a/tests/processor/src/tests/key_gen.rs +++ b/tests/processor/src/tests/key_gen.rs @@ -2,13 +2,13 @@ use std::{collections::HashMap, time::SystemTime}; use dkg::{Participant, ThresholdParams, tests::clone_without}; -use serai_primitives::{NetworkId, BlockHash, PublicKey}; -use serai_validator_sets_primitives::{Session, KeyPair, ValidatorSet}; +use serai_client::{ + primitives::{NetworkId, BlockHash, PublicKey}, + validator_sets::primitives::{Session, KeyPair, ValidatorSet}, +}; use messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage, ProcessorMessage}; -use dockertest::DockerTest; - use crate::{*, tests::*}; pub(crate) async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId) -> KeyPair { @@ -140,15 +140,7 @@ pub(crate) async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId #[test] fn key_gen_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { - let mut coordinators = vec![]; - let mut test = DockerTest::new(); - for _ in 0 .. COORDINATORS { - let (handles, coord_key, compositions) = processor_stack(network); - coordinators.push((handles, coord_key)); - for composition in compositions { - test.add_composition(composition); - } - } + let (coordinators, test) = new_test(network); test.run(|ops| async move { // Sleep for a second for the message-queue to boot diff --git a/tests/processor/src/tests/mod.rs b/tests/processor/src/tests/mod.rs index 28805c4d..ffca6061 100644 --- a/tests/processor/src/tests/mod.rs +++ b/tests/processor/src/tests/mod.rs @@ -1,7 +1,31 @@ +use ciphersuite::{Ciphersuite, Ristretto}; + +use serai_client::primitives::NetworkId; + +use dockertest::DockerTest; + +use crate::*; + mod key_gen; pub(crate) use key_gen::key_gen; mod batch; +pub(crate) use batch::{recv_batch_preprocesses, sign_batch, substrate_block}; + +mod send; pub(crate) const COORDINATORS: usize = 4; pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; + +fn new_test(network: NetworkId) -> (Vec<(Handles, ::F)>, DockerTest) { + let mut coordinators = vec![]; + let mut test = DockerTest::new(); + for _ in 0 .. COORDINATORS { + let (handles, coord_key, compositions) = processor_stack(network); + coordinators.push((handles, coord_key)); + for composition in compositions { + test.add_composition(composition); + } + } + (coordinators, test) +} diff --git a/tests/processor/src/tests/send.rs b/tests/processor/src/tests/send.rs new file mode 100644 index 00000000..858a8634 --- /dev/null +++ b/tests/processor/src/tests/send.rs @@ -0,0 +1,245 @@ +use std::{ + collections::HashMap, + time::{SystemTime, Duration}, +}; + +use dkg::{Participant, tests::clone_without}; + +use messages::{sign::SignId, SubstrateContext}; + +use serai_client::{ + primitives::{BlockHash, NetworkId}, + tokens::primitives::{OutInstruction, OutInstructionWithBalance}, +}; + +use crate::{*, tests::*}; + +#[allow(unused)] +pub(crate) async fn recv_sign_preprocesses( + coordinators: &mut [Coordinator], + key: [u8; 32], + attempt: u32, +) -> (SignId, HashMap>) { + let mut id = None; + let mut preprocesses = HashMap::new(); + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + let msg = coordinator.recv_message().await; + match msg { + messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess { + id: this_id, + preprocess, + }) => { + if id.is_none() { + assert_eq!(&this_id.key, &key); + assert_eq!(this_id.attempt, attempt); + id = Some(this_id.clone()); + } + assert_eq!(&this_id, id.as_ref().unwrap()); + + preprocesses.insert(i, preprocess); + } + _ => panic!("processor didn't send sign preprocess"), + } + } + + // Reduce the preprocesses down to the threshold + while preprocesses.len() > THRESHOLD { + preprocesses.remove( + &Participant::new( + u16::try_from(OsRng.next_u64() % u64::try_from(COORDINATORS).unwrap()).unwrap() + 1, + ) + .unwrap(), + ); + } + + (id.unwrap(), preprocesses) +} + +#[allow(unused)] +pub(crate) async fn sign_tx( + coordinators: &mut [Coordinator], + id: SignId, + preprocesses: HashMap>, +) -> Vec { + assert_eq!(preprocesses.len(), THRESHOLD); + + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + coordinator + .send_message(messages::sign::CoordinatorMessage::Preprocesses { + id: id.clone(), + preprocesses: clone_without(&preprocesses, &i), + }) + .await; + } + } + + let mut shares = HashMap::new(); + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + match coordinator.recv_message().await { + messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Share { + id: this_id, + share, + }) => { + assert_eq!(&this_id, &id); + shares.insert(i, share); + } + _ => panic!("processor didn't send TX shares"), + } + } + } + + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + coordinator + .send_message(messages::sign::CoordinatorMessage::Shares { + id: id.clone(), + shares: clone_without(&shares, &i), + }) + .await; + } + } + + // The selected processors should yield Completed + let mut tx = None; + for (i, coordinator) in coordinators.iter_mut().enumerate() { + let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); + + if preprocesses.contains_key(&i) { + match coordinator.recv_message().await { + messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { + key, + id: this_id, + tx: this_tx, + }) => { + assert_eq!(&key, &id.key); + assert_eq!(&this_id, &id.id); + + if tx.is_none() { + tx = Some(this_tx.clone()); + } + + assert_eq!(tx.as_ref().unwrap(), &this_tx); + } + _ => panic!("processor didn't send Completed"), + } + } + } + tx.unwrap() +} + +#[test] +fn send_test() { + for network in [NetworkId::Bitcoin, NetworkId::Monero] { + let (coordinators, test) = new_test(network); + + test.run(|ops| async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + let mut coordinators = coordinators + .into_iter() + .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) + .collect::>(); + + // Create a wallet before we start generating keys + let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await; + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Generate keys + let key_pair = key_gen(&mut coordinators, network).await; + + // Now we we have to mine blocks to activate the key + // (the first key is activated when the coin's block time exceeds the Serai time it was + // confirmed at) + + for _ in 0 .. confirmations(network) { + coordinators[0].add_block(&ops).await; + } + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Send into the processor's wallet + let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await; + for coordinator in &mut coordinators { + coordinator.publish_transacton(&ops, &tx).await; + } + + // Put the TX past the confirmation depth + let mut block_with_tx = None; + for _ in 0 .. confirmations(network) { + let (hash, _) = coordinators[0].add_block(&ops).await; + if block_with_tx.is_none() { + block_with_tx = Some(hash); + } + } + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Sleep for 10s + // The scanner works on a 5s interval, so this leaves a few s for any processing/latency + tokio::time::sleep(Duration::from_secs(10)).await; + + // Make sure the proceessors picked it up by checking they're trying to sign a batch for it + let (id, preprocesses) = recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, 0).await; + + // Continue with signing the batch + let batch = sign_batch(&mut coordinators, id, preprocesses).await; + + // Check it + assert_eq!(batch.batch.network, network); + assert_eq!(batch.batch.id, 0); + assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap())); + assert!(batch.batch.instructions.is_empty()); + + // Fire a SubstrateBlock with a burn + let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; + let serai_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + for coordinator in &mut coordinators { + substrate_block( + coordinator, + messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { + serai_time, + coin_latest_finalized_block: batch.batch.block, + }, + network, + block: substrate_block_num, + // TODO: Should we use the network key here? Or should we only use the Ristretto key? + key: key_pair.1.to_vec(), + burns: vec![OutInstructionWithBalance { + instruction: OutInstruction { address: wallet.address(), data: None }, + balance: balance_sent, + }], + }, + ) + .await; + } + + /* + // Trigger a random amount of re-attempts + for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { + // TODO: Double check how the processor handles this ID field + // It should be able to assert its perfectly sequential + id.attempt = attempt; + for coordinator in coordinators.iter_mut() { + coordinator + .send_message(messages::sign::CoordinatorMessage::Reattempt { + id: id.clone(), + }) + .await; + } + (id, preprocesses) = + recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, attempt).await; + } + */ + }); + } +}