Test the processor's batch signing

Updates message-queue ot try recv every second, not 5.
This commit is contained in:
Luke Parker
2023-07-25 18:09:23 -04:00
parent a2493cfafc
commit 88a1fce15c
8 changed files with 123 additions and 35 deletions

View File

@@ -146,14 +146,15 @@ impl Coordinator {
}
/// Send a message to a processor as its coordinator.
pub async fn send_message(&mut self, msg: CoordinatorMessage) {
pub async fn send_message(&mut self, msg: impl Into<CoordinatorMessage>) {
let msg: CoordinatorMessage = msg.into();
self
.queue
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(self.network),
intent: self.next_send_id.to_le_bytes().to_vec(),
intent: msg.intent(),
},
serde_json::to_string(&msg).unwrap().into_bytes(),
)
@@ -174,7 +175,7 @@ impl Coordinator {
serde_json::from_slice(&msg.msg).unwrap()
}
pub async fn add_block(&self, ops: &DockerOperations) -> Vec<u8> {
pub async fn add_block(&self, ops: &DockerOperations) -> ([u8; 32], Vec<u8>) {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
@@ -193,16 +194,12 @@ impl Coordinator {
.await
.unwrap();
// Get it to return it
let block = rpc
.get_block(
&rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap(),
)
.await
.unwrap();
// Get it so we can return it
let hash = rpc.get_block_hash(rpc.get_latest_block_number().await.unwrap()).await.unwrap();
let block = rpc.get_block(&hash).await.unwrap();
let mut block_buf = vec![];
block.consensus_encode(&mut block_buf).unwrap();
block_buf
(hash, block_buf)
}
NetworkId::Ethereum => todo!(),
NetworkId::Monero => {
@@ -229,11 +226,8 @@ impl Coordinator {
)
.await
.unwrap();
rpc
.get_block(rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap())
.await
.unwrap()
.serialize()
let hash = rpc.get_block_hash(rpc.get_height().await.unwrap() - 1).await.unwrap();
(hash, rpc.get_block(hash).await.unwrap().serialize())
}
NetworkId::Serai => panic!("processor tests adding block to Serai"),
}

View File

@@ -1,4 +1,9 @@
use serai_primitives::NetworkId;
use std::collections::HashMap;
use dkg::{Participant, tests::clone_without};
use serai_primitives::{NetworkId, BlockHash, crypto::RuntimePublic, PublicKey};
use serai_in_instructions_primitives::batch_message;
use dockertest::DockerTest;
@@ -25,7 +30,11 @@ fn scan_test() {
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
// Start by generating keys
// Create a wallet before we start generating keys
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Generate keys
let key_pair = key_gen(&mut coordinators, network).await;
// Now we we have to mine blocks to activate the key
@@ -33,43 +42,120 @@ fn scan_test() {
// confirmed at)
for _ in 0 .. confirmations(network) {
let block = coordinators[0].add_block(&ops).await;
for coordinator in &coordinators[1 ..] {
coordinator.broadcast_block(&ops, &block).await;
}
coordinators[0].add_block(&ops).await;
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Send into the processor's wallet
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
let tx = wallet.send_to_address(&ops, &key_pair.1).await;
for coordinator in &coordinators {
coordinator.publish_transacton(&ops, &tx).await;
}
// Put the TX past the confirmation depth
let mut block_with_tx = None;
for _ in 0 .. confirmations(network) {
let block = coordinators[0].add_block(&ops).await;
for coordinator in &coordinators[1 ..] {
coordinator.broadcast_block(&ops, &block).await;
let (hash, _) = coordinators[0].add_block(&ops).await;
if block_with_tx.is_none() {
block_with_tx = Some(hash);
}
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Sleep for 10s
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
// Make sure the coordinators picked it up by checking they're trying to sign a batch for it
for coordinator in &mut coordinators {
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let mut id = None;
let mut preprocesses = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
// Only use their preprocess if they're within the threshold
let i = if i < THRESHOLD {
Some(Participant::new(u16::try_from(i).unwrap() + 1).unwrap())
} else {
None
};
let msg = coordinator.recv_message().await;
match msg {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchPreprocess { id, .. },
messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess },
) => {
assert_eq!(&id.key, &key_pair.0 .0);
assert_eq!(id.attempt, 0);
assert_eq!(&this_id.key, &key_pair.0 .0);
assert_eq!(this_id.attempt, 0);
if id.is_none() {
id = Some(this_id.clone());
}
assert_eq!(&this_id, id.as_ref().unwrap());
if let Some(i) = i {
preprocesses.insert(i, preprocess);
}
}
_ => panic!("processor didn't send batch preprocess"),
}
}
let id = id.unwrap();
// Continue with batch siging by sending the preprocesses to selected parties
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchPreprocesses {
id: id.clone(),
preprocesses: clone_without(&preprocesses, &i),
})
.await;
}
let mut shares = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
match coordinator.recv_message().await {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::BatchShare { id: this_id, share },
) => {
assert_eq!(&this_id, &id);
shares.insert(i, share);
}
_ => panic!("processor didn't send batch share"),
}
}
for (i, coordinator) in coordinators.iter_mut().enumerate().take(THRESHOLD) {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchShares {
id: id.clone(),
shares: clone_without(&shares, &i),
})
.await;
}
// The selected processors should yield the batch
for coordinator in coordinators.iter_mut().take(THRESHOLD) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::Update { key, batch },
) => {
assert_eq!(&key, &key_pair.0 .0);
assert_eq!(batch.batch.network, network);
assert_eq!(batch.batch.id, 0);
assert!(PublicKey::from_raw(key_pair.0 .0)
.verify(&batch_message(&batch.batch), &batch.signature));
assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap()));
// This shouldn't have an instruction as we didn't add any data into the TX we sent
assert!(batch.batch.instructions.is_empty());
}
_ => panic!("processor didn't send batch"),
}
}
});
}
}