3 Commits

Author SHA1 Message Date
Luke Parker
aa666afc08 Don't clear cache within a batch build
A caller *can* call batch from a threaded environment and still trigger this at
this time. I'm unsure that use case exists/matters.

If GITHUB_CI is set, build in two batches to try and avoid storage limits.
2023-11-27 03:25:25 -05:00
Luke Parker
9da1d714b3 Fixes to name handling 2023-11-27 02:00:16 -05:00
Luke Parker
292263b21e Simultaneously build Docker images used in tests 2023-11-27 01:27:04 -05:00
23 changed files with 679 additions and 544 deletions

2
Cargo.lock generated
View File

@@ -7646,6 +7646,7 @@ name = "serai-docker-tests"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"tokio",
] ]
[[package]] [[package]]
@@ -7665,6 +7666,7 @@ dependencies = [
"rand_core", "rand_core",
"serai-client", "serai-client",
"serai-coordinator-tests", "serai-coordinator-tests",
"serai-docker-tests",
"serai-message-queue-tests", "serai-message-queue-tests",
"serai-processor", "serai-processor",
"serai-processor-tests", "serai-processor-tests",

View File

@@ -19,8 +19,8 @@ mod bitcoin {
check::<IsTrue<{ Bitcoin::DUST >= bitcoin_serai::wallet::DUST }>>(); check::<IsTrue<{ Bitcoin::DUST >= bitcoin_serai::wallet::DUST }>>();
} }
fn spawn_bitcoin() -> DockerTest { async fn spawn_bitcoin() -> DockerTest {
serai_docker_tests::build("bitcoin".to_string()); serai_docker_tests::build("bitcoin".to_string()).await;
let composition = TestBodySpecification::with_image( let composition = TestBodySpecification::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never), Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
@@ -73,8 +73,8 @@ mod monero {
use super::*; use super::*;
use crate::networks::{Network, Monero}; use crate::networks::{Network, Monero};
fn spawn_monero() -> DockerTest { async fn spawn_monero() -> DockerTest {
serai_docker_tests::build("monero".to_string()); serai_docker_tests::build("monero".to_string()).await;
let composition = TestBodySpecification::with_image( let composition = TestBodySpecification::with_image(
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never), Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),

View File

@@ -44,49 +44,59 @@ macro_rules! test_network {
test_key_gen::<$N>().await; test_key_gen::<$N>().await;
} }
#[test] #[tokio::test]
fn $scanner() { async fn $scanner() {
*INIT_LOGGER; *INIT_LOGGER;
let docker = $docker(); let docker = $docker().await;
docker.run(|ops| async move { docker
test_scanner($network(&ops).await).await; .run_async(|ops| async move {
}); test_scanner($network(&ops).await).await;
})
.await;
} }
#[test] #[tokio::test]
fn $signer() { async fn $signer() {
*INIT_LOGGER; *INIT_LOGGER;
let docker = $docker(); let docker = $docker().await;
docker.run(|ops| async move { docker
test_signer($network(&ops).await).await; .run_async(|ops| async move {
}); test_signer($network(&ops).await).await;
})
.await;
} }
#[test] #[tokio::test]
fn $wallet() { async fn $wallet() {
*INIT_LOGGER; *INIT_LOGGER;
let docker = $docker(); let docker = $docker().await;
docker.run(|ops| async move { docker
test_wallet($network(&ops).await).await; .run_async(|ops| async move {
}); test_wallet($network(&ops).await).await;
})
.await;
} }
#[test] #[tokio::test]
fn $addresses() { async fn $addresses() {
*INIT_LOGGER; *INIT_LOGGER;
let docker = $docker(); let docker = $docker().await;
docker.run(|ops| async move { docker
test_addresses($network(&ops).await).await; .run_async(|ops| async move {
}); test_addresses($network(&ops).await).await;
})
.await;
} }
#[test] #[tokio::test]
fn $no_deadlock_in_multisig_completed() { async fn $no_deadlock_in_multisig_completed() {
*INIT_LOGGER; *INIT_LOGGER;
let docker = $docker(); let docker = $docker().await;
docker.run(|ops| async move { docker
test_no_deadlock_in_multisig_completed($network(&ops).await).await; .run_async(|ops| async move {
}); test_no_deadlock_in_multisig_completed($network(&ops).await).await;
})
.await;
} }
}; };
} }

View File

@@ -14,7 +14,7 @@ macro_rules! serai_test {
TestBodySpecification, DockerTest, TestBodySpecification, DockerTest,
}; };
serai_docker_tests::build("serai".to_string()); serai_docker_tests::build("serai".to_string()).await;
let handle = concat!("serai_client-serai_node-", stringify!($name)); let handle = concat!("serai_client-serai_node-", stringify!($name));

View File

@@ -37,14 +37,18 @@ mod tests;
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new(); static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
pub fn coordinator_instance( pub fn coordinator_docker_name() -> String {
"serai-dev-coordinator".to_string()
}
pub async fn coordinator_instance(
name: &str, name: &str,
message_queue_key: <Ristretto as Ciphersuite>::F, message_queue_key: <Ristretto as Ciphersuite>::F,
) -> TestBodySpecification { ) -> TestBodySpecification {
serai_docker_tests::build("coordinator".to_string()); serai_docker_tests::build("coordinator".to_string()).await;
TestBodySpecification::with_image( TestBodySpecification::with_image(
Image::with_repository("serai-dev-coordinator").pull_policy(PullPolicy::Never), Image::with_repository(coordinator_docker_name()).pull_policy(PullPolicy::Never),
) )
.replace_env( .replace_env(
[ [
@@ -63,11 +67,15 @@ pub fn coordinator_instance(
) )
} }
pub fn serai_composition(name: &str) -> TestBodySpecification { pub fn serai_docker_name() -> String {
serai_docker_tests::build("serai".to_string()); "serai-dev-serai".to_string()
}
pub async fn serai_composition(name: &str) -> TestBodySpecification {
serai_docker_tests::build("serai".to_string()).await;
TestBodySpecification::with_image( TestBodySpecification::with_image(
Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never), Image::with_repository(serai_docker_name()).pull_policy(PullPolicy::Never),
) )
.replace_cmd(vec![ .replace_cmd(vec![
"serai-node".to_string(), "serai-node".to_string(),
@@ -82,15 +90,22 @@ pub fn serai_composition(name: &str) -> TestBodySpecification {
} }
pub type Handles = (String, String, String); pub type Handles = (String, String, String);
pub fn coordinator_stack( pub async fn coordinator_stack(
name: &str, name: &str,
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) { ) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) {
let serai_composition = serai_composition(name); serai_docker_tests::build_batch(vec![
serai_docker_name(),
serai_message_queue_tests::docker_name(),
coordinator_docker_name(),
])
.await;
let serai_composition = serai_composition(name).await;
let (coord_key, message_queue_keys, message_queue_composition) = let (coord_key, message_queue_keys, message_queue_composition) =
serai_message_queue_tests::instance(); serai_message_queue_tests::instance().await;
let coordinator_composition = coordinator_instance(name, coord_key); let coordinator_composition = coordinator_instance(name, coord_key).await;
// Give every item in this stack a unique ID // Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits

View File

@@ -253,7 +253,7 @@ pub async fn batch(
#[tokio::test] #[tokio::test]
async fn batch_test() { async fn batch_test() {
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
let (processors, test) = new_test(); let (processors, test) = new_test().await;
test test
.run_async(|ops| async move { .run_async(|ops| async move {

View File

@@ -222,7 +222,7 @@ pub async fn key_gen<C: Ciphersuite>(
#[tokio::test] #[tokio::test]
async fn key_gen_test() { async fn key_gen_test() {
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
let (processors, test) = new_test(); let (processors, test) = new_test().await;
test test
.run_async(|ops| async move { .run_async(|ops| async move {

View File

@@ -21,7 +21,7 @@ pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new(); pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new();
pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) { pub(crate) async fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
let mut coordinators = vec![]; let mut coordinators = vec![];
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
for i in 0 .. COORDINATORS { for i in 0 .. COORDINATORS {
@@ -33,7 +33,8 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
4 => "Eve", 4 => "Eve",
5 => "Ferdie", 5 => "Ferdie",
_ => panic!("needed a 7th name for a serai node"), _ => panic!("needed a 7th name for a serai node"),
}); })
.await;
coordinators.push((handles, coord_key)); coordinators.push((handles, coord_key));
for composition in compositions { for composition in compositions {
test.provide_container(composition); test.provide_container(composition);

View File

@@ -170,7 +170,7 @@ pub async fn sign(
#[tokio::test] #[tokio::test]
async fn sign_test() { async fn sign_test() {
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
let (processors, test) = new_test(); let (processors, test) = new_test().await;
test test
.run_async(|ops| async move { .run_async(|ops| async move {

View File

@@ -15,3 +15,5 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies] [dependencies]
chrono = "0.4" chrono = "0.4"
tokio = { version = "1", default-features = false, features = ["sync"] }

View File

@@ -1,19 +1,27 @@
use std::{ use std::{
sync::{Mutex, OnceLock}, sync::{OnceLock, Arc},
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
time::SystemTime, time::SystemTime,
path::PathBuf, path::PathBuf,
fs, env, fs, env,
process::Command,
}; };
static BUILT: OnceLock<Mutex<HashMap<String, bool>>> = OnceLock::new(); use tokio::{sync::Mutex, process::Command};
pub fn build(name: String) {
static BUILT: OnceLock<Mutex<HashMap<String, Arc<Mutex<bool>>>>> = OnceLock::new();
async fn build_inner(name: String) {
let built = BUILT.get_or_init(|| Mutex::new(HashMap::new())); let built = BUILT.get_or_init(|| Mutex::new(HashMap::new()));
// Only one call to build will acquire this lock // Only one call to build will acquire this lock
let mut built_lock = built.lock().unwrap(); let mut built_lock = built.lock().await;
if built_lock.contains_key(&name) { if !built_lock.contains_key(&name) {
// If it was built, return built_lock.insert(name.clone(), Arc::new(Mutex::new(false)));
}
let this_lock = built_lock[&name].clone();
drop(built_lock);
let mut built_lock = this_lock.lock().await;
// Already built
if *built_lock {
return; return;
} }
@@ -30,14 +38,17 @@ pub fn build(name: String) {
let mut orchestration_path = repo_path.clone(); let mut orchestration_path = repo_path.clone();
orchestration_path.push("orchestration"); orchestration_path.push("orchestration");
let name_without_serai_dev = name.split("serai-dev-").nth(1).unwrap_or(&name);
// If this Docker image was created after this repo was last edited, return here // If this Docker image was created after this repo was last edited, return here
// This should have better performance than Docker and allows running while offline // This should have better performance than Docker and allows running while offline
if let Ok(res) = Command::new("docker") if let Ok(res) = Command::new("docker")
.arg("inspect") .arg("inspect")
.arg("-f") .arg("-f")
.arg("{{ .Metadata.LastTagTime }}") .arg("{{ .Metadata.LastTagTime }}")
.arg(format!("serai-dev-{name}")) .arg(name.clone())
.output() .output()
.await
{ {
let last_tag_time_buf = String::from_utf8(res.stdout).expect("docker had non-utf8 output"); let last_tag_time_buf = String::from_utf8(res.stdout).expect("docker had non-utf8 output");
let last_tag_time = last_tag_time_buf.trim(); let last_tag_time = last_tag_time_buf.trim();
@@ -51,16 +62,19 @@ pub fn build(name: String) {
); );
let mut dockerfile_path = orchestration_path.clone(); let mut dockerfile_path = orchestration_path.clone();
if HashSet::from(["bitcoin", "ethereum", "monero"]).contains(name.as_str()) { {
dockerfile_path = dockerfile_path.join("coins"); let name = name_without_serai_dev;
} if HashSet::from(["bitcoin", "ethereum", "monero"]).contains(&name) {
if name.contains("-processor") { dockerfile_path = dockerfile_path.join("coins");
dockerfile_path = dockerfile_path }
.join("processor") if name.contains("-processor") {
.join(name.split('-').next().unwrap()) dockerfile_path = dockerfile_path
.join("Dockerfile"); .join("processor")
} else { .join(name.split('-').next().unwrap())
dockerfile_path = dockerfile_path.join(&name).join("Dockerfile"); .join("Dockerfile");
} else {
dockerfile_path = dockerfile_path.join(name).join("Dockerfile");
}
} }
// For all services, if the Dockerfile was edited after the image was built we should rebuild // For all services, if the Dockerfile was edited after the image was built we should rebuild
@@ -69,7 +83,7 @@ pub fn build(name: String) {
// Check any additionally specified paths // Check any additionally specified paths
let meta = |path: PathBuf| (path.clone(), fs::metadata(path)); let meta = |path: PathBuf| (path.clone(), fs::metadata(path));
let mut metadatas = match name.as_str() { let mut metadatas = match name_without_serai_dev {
"bitcoin" => vec![], "bitcoin" => vec![],
"monero" => vec![], "monero" => vec![],
"message-queue" => vec![ "message-queue" => vec![
@@ -133,7 +147,7 @@ pub fn build(name: String) {
if let Some(last_modified) = last_modified { if let Some(last_modified) = last_modified {
if last_modified < created_time { if last_modified < created_time {
println!("{} was built after the most recent source code edits, assuming built.", name); println!("{} was built after the most recent source code edits, assuming built.", name);
built_lock.insert(name, true); *built_lock = true;
return; return;
} }
} }
@@ -143,6 +157,7 @@ pub fn build(name: String) {
println!("Building {}...", &name); println!("Building {}...", &name);
// Version which always prints // Version which always prints
/*
if !Command::new("docker") if !Command::new("docker")
.current_dir(orchestration_path) .current_dir(orchestration_path)
.arg("compose") .arg("compose")
@@ -151,20 +166,22 @@ pub fn build(name: String) {
.spawn() .spawn()
.unwrap() .unwrap()
.wait() .wait()
.await
.unwrap() .unwrap()
.success() .success()
{ {
panic!("failed to build {name}"); panic!("failed to build {name}");
} }
*/
// Version which only prints on error // Version which only prints on error
/*
let res = Command::new("docker") let res = Command::new("docker")
.current_dir(orchestration_path) .current_dir(orchestration_path)
.arg("compose") .arg("compose")
.arg("build") .arg("build")
.arg(&name) .arg(name_without_serai_dev)
.output() .output()
.await
.unwrap(); .unwrap();
if !res.status.success() { if !res.status.success() {
println!("failed to build {name}\n"); println!("failed to build {name}\n");
@@ -182,10 +199,14 @@ pub fn build(name: String) {
); );
panic!("failed to build {name}"); panic!("failed to build {name}");
} }
*/
println!("Built!"); println!("Built!");
// Set built
*built_lock = true;
}
async fn clear_cache_if_github() {
if std::env::var("GITHUB_CI").is_ok() { if std::env::var("GITHUB_CI").is_ok() {
println!("In CI, so clearing cache to prevent hitting the storage limits."); println!("In CI, so clearing cache to prevent hitting the storage limits.");
if !Command::new("docker") if !Command::new("docker")
@@ -194,14 +215,28 @@ pub fn build(name: String) {
.arg("--all") .arg("--all")
.arg("--force") .arg("--force")
.output() .output()
.await
.unwrap() .unwrap()
.status .status
.success() .success()
{ {
println!("failed to clear cache after building {name}\n"); println!("failed to clear cache\n");
} }
} }
}
// Set built
built_lock.insert(name, true); pub async fn build(name: String) {
build_inner(name).await;
clear_cache_if_github().await;
}
pub async fn build_batch(names: Vec<String>) {
let mut handles = vec![];
for name in names.into_iter().collect::<HashSet<_>>() {
handles.push(tokio::spawn(build_inner(name)));
}
for handle in handles {
handle.await.unwrap();
}
clear_cache_if_github().await;
} }

View File

@@ -35,6 +35,7 @@ serai-client = { path = "../../substrate/client", features = ["serai"] }
tokio = { version = "1", features = ["time"] } tokio = { version = "1", features = ["time"] }
dockertest = "0.4" dockertest = "0.4"
serai-docker-tests = { path = "../docker" }
serai-message-queue-tests = { path = "../message-queue" } serai-message-queue-tests = { path = "../message-queue" }
serai-processor-tests = { path = "../processor" } serai-processor-tests = { path = "../processor" }
serai-coordinator-tests = { path = "../coordinator" } serai-coordinator-tests = { path = "../coordinator" }

View File

@@ -31,19 +31,38 @@ pub struct Handles {
serai: String, serai: String,
} }
pub fn full_stack(name: &str) -> (Handles, Vec<TestBodySpecification>) { pub async fn full_stack(name: &str) -> (Handles, Vec<TestBodySpecification>) {
let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance(); let mut processor_docker_names = serai_processor_tests::docker_names(NetworkId::Bitcoin);
processor_docker_names.extend(serai_processor_tests::docker_names(NetworkId::Monero));
let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin); let mut docker_names = vec![
serai_message_queue_tests::docker_name(),
serai_coordinator_tests::serai_docker_name(),
serai_coordinator_tests::coordinator_docker_name(),
];
// If this is in the GH CI, build in two stages so we don't hit storage limits
if std::env::var("GITHUB_CI").is_ok() {
serai_docker_tests::build_batch(processor_docker_names).await;
} else {
docker_names.extend(processor_docker_names);
}
serai_docker_tests::build_batch(docker_names).await;
let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance().await;
let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin).await;
let bitcoin_processor_composition = let bitcoin_processor_composition =
processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]); processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin])
.await;
let (monero_composition, monero_port) = network_instance(NetworkId::Monero); let (monero_composition, monero_port) = network_instance(NetworkId::Monero).await;
let monero_processor_composition = let monero_processor_composition =
processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]); processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero])
.await;
let coordinator_composition = coordinator_instance(name, coord_key); let coordinator_composition = coordinator_instance(name, coord_key).await;
let serai_composition = serai_composition(name); let serai_composition = serai_composition(name).await;
// Give every item in this stack a unique ID // Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits

View File

@@ -26,7 +26,7 @@ use crate::tests::*;
#[tokio::test] #[tokio::test]
async fn mint_and_burn_test() { async fn mint_and_burn_test() {
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
let (handles, test) = new_test(); let (handles, test) = new_test().await;
test test
.run_async(|ops| async move { .run_async(|ops| async move {

View File

@@ -11,7 +11,7 @@ pub(crate) const VALIDATORS: usize = 4;
pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new(); pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new();
pub(crate) fn new_test() -> (Vec<Handles>, DockerTest) { pub(crate) async fn new_test() -> (Vec<Handles>, DockerTest) {
let mut validators = vec![]; let mut validators = vec![];
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
for i in 0 .. VALIDATORS { for i in 0 .. VALIDATORS {
@@ -23,7 +23,8 @@ pub(crate) fn new_test() -> (Vec<Handles>, DockerTest) {
4 => "Eve", 4 => "Eve",
5 => "Ferdie", 5 => "Ferdie",
_ => panic!("needed a 7th name for a serai node"), _ => panic!("needed a 7th name for a serai node"),
}); })
.await;
validators.push(handles); validators.push(handles);
for composition in compositions { for composition in compositions {
test.provide_container(composition); test.provide_container(composition);

View File

@@ -13,10 +13,14 @@ use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, TestBodySpecification, PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, TestBodySpecification,
}; };
pub fn docker_name() -> String {
"serai-dev-message-queue".to_string()
}
pub type MessageQueuePrivateKey = <Ristretto as Ciphersuite>::F; pub type MessageQueuePrivateKey = <Ristretto as Ciphersuite>::F;
pub fn instance( pub async fn instance(
) -> (MessageQueuePrivateKey, HashMap<NetworkId, MessageQueuePrivateKey>, TestBodySpecification) { ) -> (MessageQueuePrivateKey, HashMap<NetworkId, MessageQueuePrivateKey>, TestBodySpecification) {
serai_docker_tests::build("message-queue".to_string()); serai_docker_tests::build("message-queue".to_string()).await;
let coord_key = <Ristretto as Ciphersuite>::F::random(&mut OsRng); let coord_key = <Ristretto as Ciphersuite>::F::random(&mut OsRng);
let priv_keys = HashMap::from([ let priv_keys = HashMap::from([
@@ -26,7 +30,7 @@ pub fn instance(
]); ]);
let composition = TestBodySpecification::with_image( let composition = TestBodySpecification::with_image(
Image::with_repository("serai-dev-message-queue").pull_policy(PullPolicy::Never), Image::with_repository(docker_name()).pull_policy(PullPolicy::Never),
) )
.set_log_options(Some(LogOptions { .set_log_options(Some(LogOptions {
action: LogAction::Forward, action: LogAction::Forward,
@@ -58,8 +62,8 @@ pub fn instance(
(coord_key, priv_keys, composition) (coord_key, priv_keys, composition)
} }
#[test] #[tokio::test]
fn basic_functionality() { async fn basic_functionality() {
use zeroize::Zeroizing; use zeroize::Zeroizing;
use dockertest::DockerTest; use dockertest::DockerTest;
@@ -67,99 +71,107 @@ fn basic_functionality() {
use serai_message_queue::{Service, Metadata, client::MessageQueue}; use serai_message_queue::{Service, Metadata, client::MessageQueue};
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
let (coord_key, priv_keys, composition) = instance(); let (coord_key, priv_keys, composition) = instance().await;
test.provide_container(composition); test.provide_container(composition);
test.run(|ops| async move { test
tokio::time::timeout(core::time::Duration::from_secs(60), async move { .run_async(|ops| async move {
// Sleep for a second for the message-queue to boot tokio::time::timeout(core::time::Duration::from_secs(60), async move {
// It isn't an error to start immediately, it just silences an error // Sleep for a second for the message-queue to boot
tokio::time::sleep(core::time::Duration::from_secs(1)).await; // It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
// Queue some messages // Queue some messages
let coordinator = let coordinator =
MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key)); MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key));
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Bitcoin),
intent: b"intent".to_vec(),
},
b"Hello, World!".to_vec(),
)
.await;
// Queue this twice, which message-queue should de-duplicate
for _ in 0 .. 2 {
coordinator coordinator
.queue( .queue(
Metadata { Metadata {
from: Service::Coordinator, from: Service::Coordinator,
to: Service::Processor(NetworkId::Bitcoin), to: Service::Processor(NetworkId::Bitcoin),
intent: b"intent 2".to_vec(), intent: b"intent".to_vec(),
}, },
b"Hello, World, again!".to_vec(), b"Hello, World!".to_vec(),
) )
.await; .await;
}
// Successfully get it // Queue this twice, which message-queue should de-duplicate
let bitcoin = MessageQueue::new( for _ in 0 .. 2 {
Service::Processor(NetworkId::Bitcoin), coordinator
rpc.clone(), .queue(
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), Metadata {
); from: Service::Coordinator,
let msg = bitcoin.next(Service::Coordinator).await; to: Service::Processor(NetworkId::Bitcoin),
assert_eq!(msg.from, Service::Coordinator); intent: b"intent 2".to_vec(),
assert_eq!(msg.id, 0); },
assert_eq!(&msg.msg, b"Hello, World!"); b"Hello, World, again!".to_vec(),
)
.await;
}
// If we don't ack it, it should continue to be returned // Successfully get it
assert_eq!(msg, bitcoin.next(Service::Coordinator).await); let bitcoin = MessageQueue::new(
Service::Processor(NetworkId::Bitcoin),
rpc.clone(),
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
);
let msg = bitcoin.next(Service::Coordinator).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, 0);
assert_eq!(&msg.msg, b"Hello, World!");
// Acknowledging it should yield the next message // If we don't ack it, it should continue to be returned
bitcoin.ack(Service::Coordinator, 0).await; assert_eq!(msg, bitcoin.next(Service::Coordinator).await);
let next_msg = bitcoin.next(Service::Coordinator).await; // Acknowledging it should yield the next message
assert!(msg != next_msg); bitcoin.ack(Service::Coordinator, 0).await;
assert_eq!(next_msg.from, Service::Coordinator);
assert_eq!(next_msg.id, 1);
assert_eq!(&next_msg.msg, b"Hello, World, again!");
bitcoin.ack(Service::Coordinator, 1).await;
// No further messages should be available let next_msg = bitcoin.next(Service::Coordinator).await;
tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator)) assert!(msg != next_msg);
.await assert_eq!(next_msg.from, Service::Coordinator);
.unwrap_err(); assert_eq!(next_msg.id, 1);
assert_eq!(&next_msg.msg, b"Hello, World, again!");
bitcoin.ack(Service::Coordinator, 1).await;
// Queueing to a distinct processor should work, with a unique ID // No further messages should be available
coordinator tokio::time::timeout(
.queue( core::time::Duration::from_secs(10),
Metadata { bitcoin.next(Service::Coordinator),
from: Service::Coordinator,
to: Service::Processor(NetworkId::Monero),
// Intents should be per-from-to, making this valid
intent: b"intent".to_vec(),
},
b"Hello, World!".to_vec(),
) )
.await;
let monero = MessageQueue::new(
Service::Processor(NetworkId::Monero),
rpc,
Zeroizing::new(priv_keys[&NetworkId::Monero]),
);
assert_eq!(monero.next(Service::Coordinator).await.id, 0);
monero.ack(Service::Coordinator, 0).await;
tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator))
.await .await
.unwrap_err(); .unwrap_err();
// Queueing to a distinct processor should work, with a unique ID
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Monero),
// Intents should be per-from-to, making this valid
intent: b"intent".to_vec(),
},
b"Hello, World!".to_vec(),
)
.await;
let monero = MessageQueue::new(
Service::Processor(NetworkId::Monero),
rpc,
Zeroizing::new(priv_keys[&NetworkId::Monero]),
);
assert_eq!(monero.next(Service::Coordinator).await.id, 0);
monero.ack(Service::Coordinator, 0).await;
tokio::time::timeout(
core::time::Duration::from_secs(10),
monero.next(Service::Coordinator),
)
.await
.unwrap_err();
})
.await
.unwrap();
}) })
.await .await;
.unwrap();
});
} }

View File

@@ -24,7 +24,20 @@ mod tests;
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new(); static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
pub fn processor_instance( fn network_str(network: NetworkId) -> &'static str {
match network {
NetworkId::Serai => panic!("starting a processor for Serai"),
NetworkId::Bitcoin => "bitcoin",
NetworkId::Ethereum => "ethereum",
NetworkId::Monero => "monero",
}
}
pub fn processor_docker_name(network: NetworkId) -> String {
format!("{}-processor", network_str(network))
}
pub async fn processor_instance(
network: NetworkId, network: NetworkId,
port: u32, port: u32,
message_queue_key: <Ristretto as Ciphersuite>::F, message_queue_key: <Ristretto as Ciphersuite>::F,
@@ -32,17 +45,12 @@ pub fn processor_instance(
let mut entropy = [0; 32]; let mut entropy = [0; 32];
OsRng.fill_bytes(&mut entropy); OsRng.fill_bytes(&mut entropy);
let network_str = match network { let network_str = network_str(network);
NetworkId::Serai => panic!("starting a processor for Serai"), serai_docker_tests::build(processor_docker_name(network)).await;
NetworkId::Bitcoin => "bitcoin",
NetworkId::Ethereum => "ethereum",
NetworkId::Monero => "monero",
};
let image = format!("{network_str}-processor");
serai_docker_tests::build(image.clone());
TestBodySpecification::with_image( TestBodySpecification::with_image(
Image::with_repository(format!("serai-dev-{image}")).pull_policy(PullPolicy::Never), Image::with_repository(format!("serai-dev-{}", processor_docker_name(network)))
.pull_policy(PullPolicy::Never),
) )
.replace_env( .replace_env(
[ [
@@ -58,17 +66,23 @@ pub fn processor_instance(
) )
} }
pub fn docker_names(network: NetworkId) -> Vec<String> {
vec![network_docker_name(network), processor_docker_name(network)]
}
pub type Handles = (String, String, String); pub type Handles = (String, String, String);
pub fn processor_stack( pub async fn processor_stack(
network: NetworkId, network: NetworkId,
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) { ) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) {
let (network_composition, network_rpc_port) = network_instance(network); serai_docker_tests::build_batch(docker_names(network)).await;
let (network_composition, network_rpc_port) = network_instance(network).await;
let (coord_key, message_queue_keys, message_queue_composition) = let (coord_key, message_queue_keys, message_queue_composition) =
serai_message_queue_tests::instance(); serai_message_queue_tests::instance().await;
let processor_composition = let processor_composition =
processor_instance(network, network_rpc_port, message_queue_keys[&network]); processor_instance(network, network_rpc_port, message_queue_keys[&network]).await;
// Give every item in this stack a unique ID // Give every item in this stack a unique ID
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits

View File

@@ -21,8 +21,19 @@ pub const RPC_PASS: &str = "seraidex";
pub const BTC_PORT: u32 = 8332; pub const BTC_PORT: u32 = 8332;
pub const XMR_PORT: u32 = 18081; pub const XMR_PORT: u32 = 18081;
pub fn bitcoin_instance() -> (TestBodySpecification, u32) { pub fn network_docker_name(network: NetworkId) -> String {
serai_docker_tests::build("bitcoin".to_string()); match network {
NetworkId::Serai => {
panic!("asking for docker name for external network Serai, which isn't external")
}
NetworkId::Bitcoin => "bitcoin".to_string(),
NetworkId::Ethereum => todo!(),
NetworkId::Monero => "monero".to_string(),
}
}
pub async fn bitcoin_instance() -> (TestBodySpecification, u32) {
serai_docker_tests::build(network_docker_name(NetworkId::Bitcoin)).await;
let composition = TestBodySpecification::with_image( let composition = TestBodySpecification::with_image(
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never), Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
@@ -41,8 +52,8 @@ pub fn bitcoin_instance() -> (TestBodySpecification, u32) {
(composition, BTC_PORT) (composition, BTC_PORT)
} }
pub fn monero_instance() -> (TestBodySpecification, u32) { pub async fn monero_instance() -> (TestBodySpecification, u32) {
serai_docker_tests::build("monero".to_string()); serai_docker_tests::build(network_docker_name(NetworkId::Monero)).await;
let composition = TestBodySpecification::with_image( let composition = TestBodySpecification::with_image(
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never), Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
@@ -63,11 +74,11 @@ pub fn monero_instance() -> (TestBodySpecification, u32) {
(composition, XMR_PORT) (composition, XMR_PORT)
} }
pub fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) { pub async fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) {
match network { match network {
NetworkId::Bitcoin => bitcoin_instance(), NetworkId::Bitcoin => bitcoin_instance().await,
NetworkId::Ethereum => todo!(), NetworkId::Ethereum => todo!(),
NetworkId::Monero => monero_instance(), NetworkId::Monero => monero_instance().await,
NetworkId::Serai => { NetworkId::Serai => {
panic!("Serai is not a valid network to spawn an instance of for a processor") panic!("Serai is not a valid network to spawn an instance of for a processor")
} }

View File

@@ -191,164 +191,167 @@ pub(crate) async fn substrate_block(
} }
} }
#[test] #[tokio::test]
fn batch_test() { async fn batch_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] { for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let (coordinators, test) = new_test(network); let (coordinators, test) = new_test(network).await;
test.run(|ops| async move { test
tokio::time::sleep(Duration::from_secs(1)).await; .run_async(|ops| async move {
let mut coordinators = coordinators
.into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
// Create a wallet before we start generating keys
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Generate keys
let key_pair = key_gen(&mut coordinators).await;
// Now we we have to mine blocks to activate the key
// (the first key is activated when the network's time as of a block exceeds the Serai time
// it was confirmed at)
// Mine multiple sets of medians to ensure the median is sufficiently advanced
for _ in 0 .. (10 * confirmations(network)) {
coordinators[0].add_block(&ops).await;
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Run twice, once with an instruction and once without let mut coordinators = coordinators
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; .into_iter()
for i in 0 .. 2 { .map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
let mut serai_address = [0; 32]; .collect::<Vec<_>>();
OsRng.fill_bytes(&mut serai_address);
let instruction =
if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None };
// Send into the processor's wallet // Create a wallet before we start generating keys
let (tx, balance_sent) = let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await; coordinators[0].sync(&ops, &coordinators[1 ..]).await;
for coordinator in &mut coordinators {
coordinator.publish_transacton(&ops, &tx).await;
}
// Put the TX past the confirmation depth // Generate keys
let mut block_with_tx = None; let key_pair = key_gen(&mut coordinators).await;
for _ in 0 .. confirmations(network) {
let (hash, _) = coordinators[0].add_block(&ops).await; // Now we we have to mine blocks to activate the key
if block_with_tx.is_none() { // (the first key is activated when the network's time as of a block exceeds the Serai time
block_with_tx = Some(hash); // it was confirmed at)
} // Mine multiple sets of medians to ensure the median is sufficiently advanced
for _ in 0 .. (10 * confirmations(network)) {
coordinators[0].add_block(&ops).await;
tokio::time::sleep(Duration::from_secs(1)).await;
} }
coordinators[0].sync(&ops, &coordinators[1 ..]).await; coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Sleep for 10s // Run twice, once with an instruction and once without
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
tokio::time::sleep(Duration::from_secs(10)).await; for i in 0 .. 2 {
let mut serai_address = [0; 32];
OsRng.fill_bytes(&mut serai_address);
let instruction =
if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None };
let expected_batch = Batch { // Send into the processor's wallet
network, let (tx, balance_sent) =
id: i, wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await;
block: BlockHash(block_with_tx.unwrap()), for coordinator in &mut coordinators {
instructions: if let Some(instruction) = &instruction { coordinator.publish_transacton(&ops, &tx).await;
vec![InInstructionWithBalance {
instruction: instruction.clone(),
balance: Balance {
coin: balance_sent.coin,
amount: Amount(
balance_sent.amount.0 -
(2 * if network == NetworkId::Bitcoin {
Bitcoin::COST_TO_AGGREGATE
} else {
Monero::COST_TO_AGGREGATE
}),
),
},
}]
} else {
// This shouldn't have an instruction as we didn't add any data into the TX we sent
// Empty batches remain valuable as they let us achieve consensus on the block and spend
// contained outputs
vec![]
},
};
// Make sure the processors picked it up by checking they're trying to sign a batch for it
let (mut id, mut preprocesses) =
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
// Trigger a random amount of re-attempts
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
// TODO: Double check how the processor handles this ID field
// It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchReattempt {
id: id.clone(),
})
.await;
} }
(id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt).await;
}
// Continue with signing the batch // Put the TX past the confirmation depth
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await; let mut block_with_tx = None;
for _ in 0 .. confirmations(network) {
// Check it let (hash, _) = coordinators[0].add_block(&ops).await;
assert_eq!(batch.batch, expected_batch); if block_with_tx.is_none() {
block_with_tx = Some(hash);
// Fire a SubstrateBlock }
let serai_time = }
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); coordinators[0].sync(&ops, &coordinators[1 ..]).await;
for coordinator in &mut coordinators {
let plans = substrate_block( // Sleep for 10s
coordinator, // The scanner works on a 5s interval, so this leaves a few s for any processing/latency
messages::substrate::CoordinatorMessage::SubstrateBlock { tokio::time::sleep(Duration::from_secs(10)).await;
context: SubstrateContext {
serai_time, let expected_batch = Batch {
network_latest_finalized_block: batch.batch.block, network,
}, id: i,
block: substrate_block_num + u64::from(i), block: BlockHash(block_with_tx.unwrap()),
burns: vec![], instructions: if let Some(instruction) = &instruction {
batches: vec![batch.batch.id], vec![InInstructionWithBalance {
}, instruction: instruction.clone(),
) balance: Balance {
.await; coin: balance_sent.coin,
if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) { amount: Amount(
assert!(plans.is_empty()); balance_sent.amount.0 -
} else { (2 * if network == NetworkId::Bitcoin {
// If no instruction was used, and the processor csn presume the origin, it'd have Bitcoin::COST_TO_AGGREGATE
// created a refund Plan } else {
assert_eq!(plans.len(), 1); Monero::COST_TO_AGGREGATE
} }),
} ),
} },
}]
// With the latter InInstruction not existing, we should've triggered a refund if the origin } else {
// was detectable // This shouldn't have an instruction as we didn't add any data into the TX we sent
// Check this is trying to sign a Plan // Empty batches remain valuable as they let us achieve consensus on the block and
if network != NetworkId::Monero { // spend contained outputs
let mut refund_id = None; vec![]
for coordinator in &mut coordinators { },
match coordinator.recv_message().await { };
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess {
id, // Make sure the processors picked it up by checking they're trying to sign a batch for it
.. let (mut id, mut preprocesses) =
}) => { recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
if refund_id.is_none() { // Trigger a random amount of re-attempts
refund_id = Some(id.clone()); for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
} // TODO: Double check how the processor handles this ID field
assert_eq!(refund_id.as_ref().unwrap(), &id); // It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::coordinator::CoordinatorMessage::BatchReattempt {
id: id.clone(),
})
.await;
}
(id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt)
.await;
}
// Continue with signing the batch
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
// Check it
assert_eq!(batch.batch, expected_batch);
// Fire a SubstrateBlock
let serai_time =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
for coordinator in &mut coordinators {
let plans = substrate_block(
coordinator,
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time,
network_latest_finalized_block: batch.batch.block,
},
block: substrate_block_num + u64::from(i),
burns: vec![],
batches: vec![batch.batch.id],
},
)
.await;
if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) {
assert!(plans.is_empty());
} else {
// If no instruction was used, and the processor csn presume the origin, it'd have
// created a refund Plan
assert_eq!(plans.len(), 1);
} }
_ => panic!("processor didn't send preprocess for expected refund transaction"),
} }
} }
}
}); // With the latter InInstruction not existing, we should've triggered a refund if the origin
// was detectable
// Check this is trying to sign a Plan
if network != NetworkId::Monero {
let mut refund_id = None;
for coordinator in &mut coordinators {
match coordinator.recv_message().await {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess {
id,
..
}) => {
if refund_id.is_none() {
refund_id = Some(id.clone());
}
assert_eq!(refund_id.as_ref().unwrap(), &id);
}
_ => panic!("processor didn't send preprocess for expected refund transaction"),
}
}
}
})
.await;
} }
} }

View File

@@ -142,23 +142,25 @@ pub(crate) async fn key_gen(coordinators: &mut [Coordinator]) -> KeyPair {
key_pair key_pair
} }
#[test] #[tokio::test]
fn key_gen_test() { async fn key_gen_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] { for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let (coordinators, test) = new_test(network); let (coordinators, test) = new_test(network).await;
test.run(|ops| async move { test
// Sleep for a second for the message-queue to boot .run_async(|ops| async move {
// It isn't an error to start immediately, it just silences an error // Sleep for a second for the message-queue to boot
tokio::time::sleep(core::time::Duration::from_secs(1)).await; // It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
// Connect to the Message Queues as the coordinator // Connect to the Message Queues as the coordinator
let mut coordinators = coordinators let mut coordinators = coordinators
.into_iter() .into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) .map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
key_gen(&mut coordinators).await; key_gen(&mut coordinators).await;
}); })
.await;
} }
} }

View File

@@ -17,11 +17,13 @@ mod send;
pub(crate) const COORDINATORS: usize = 4; pub(crate) const COORDINATORS: usize = 4;
pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
fn new_test(network: NetworkId) -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) { pub(crate) async fn new_test(
network: NetworkId,
) -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
let mut coordinators = vec![]; let mut coordinators = vec![];
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
for _ in 0 .. COORDINATORS { for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network); let (handles, coord_key, compositions) = processor_stack(network).await;
coordinators.push((handles, coord_key)); coordinators.push((handles, coord_key));
for composition in compositions { for composition in compositions {
test.provide_container(composition); test.provide_container(composition);

View File

@@ -142,163 +142,166 @@ pub(crate) async fn sign_tx(
tx.unwrap() tx.unwrap()
} }
#[test] #[tokio::test]
fn send_test() { async fn send_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] { for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let (coordinators, test) = new_test(network); let (coordinators, test) = new_test(network).await;
test.run(|ops| async move { test
tokio::time::sleep(Duration::from_secs(1)).await; .run_async(|ops| async move {
let mut coordinators = coordinators
.into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
// Create a wallet before we start generating keys
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Generate keys
let key_pair = key_gen(&mut coordinators).await;
// Now we we have to mine blocks to activate the key
// (the first key is activated when the network's time as of a block exceeds the Serai time
// it was confirmed at)
// Mine multiple sets of medians to ensure the median is sufficiently advanced
for _ in 0 .. (10 * confirmations(network)) {
coordinators[0].add_block(&ops).await;
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Send into the processor's wallet let mut coordinators = coordinators
let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await; .into_iter()
for coordinator in &mut coordinators { .map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
coordinator.publish_transacton(&ops, &tx).await; .collect::<Vec<_>>();
}
// Put the TX past the confirmation depth // Create a wallet before we start generating keys
let mut block_with_tx = None; let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
for _ in 0 .. confirmations(network) { coordinators[0].sync(&ops, &coordinators[1 ..]).await;
let (hash, _) = coordinators[0].add_block(&ops).await;
if block_with_tx.is_none() { // Generate keys
block_with_tx = Some(hash); let key_pair = key_gen(&mut coordinators).await;
// Now we we have to mine blocks to activate the key
// (the first key is activated when the network's time as of a block exceeds the Serai time
// it was confirmed at)
// Mine multiple sets of medians to ensure the median is sufficiently advanced
for _ in 0 .. (10 * confirmations(network)) {
coordinators[0].add_block(&ops).await;
tokio::time::sleep(Duration::from_secs(1)).await;
} }
} coordinators[0].sync(&ops, &coordinators[1 ..]).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Sleep for 10s // Send into the processor's wallet
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await;
tokio::time::sleep(Duration::from_secs(10)).await; for coordinator in &mut coordinators {
let expected_batch =
Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] };
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let (id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
// Continue with signing the batch
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
// Check it
assert_eq!(batch.batch, expected_batch);
// Fire a SubstrateBlock with a burn
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
let serai_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let mut plans = vec![];
for coordinator in &mut coordinators {
let these_plans = substrate_block(
coordinator,
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time,
network_latest_finalized_block: batch.batch.block,
},
block: substrate_block_num,
burns: vec![OutInstructionWithBalance {
instruction: OutInstruction { address: wallet.address(), data: None },
balance: balance_sent,
}],
batches: vec![batch.batch.id],
},
)
.await;
if plans.is_empty() {
plans = these_plans;
} else {
assert_eq!(plans, these_plans);
}
}
assert_eq!(plans.len(), 1);
// Start signing the TX
let (mut id, mut preprocesses) =
recv_sign_preprocesses(&mut coordinators, Session(0), 0).await;
assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 });
// Trigger a random amount of re-attempts
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
// TODO: Double check how the processor handles this ID field
// It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() })
.await;
}
(id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await;
}
let participating = preprocesses.keys().cloned().collect::<Vec<_>>();
let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await;
// Make sure all participating nodes published the TX
let participating =
participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::<HashSet<_>>();
for participant in &participating {
assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some());
}
// Publish this transaction to the left out nodes
let tx = coordinators[*participating.iter().next().unwrap()]
.get_transaction(&ops, &tx_id)
.await
.unwrap();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
if !participating.contains(&i) {
coordinator.publish_transacton(&ops, &tx).await; coordinator.publish_transacton(&ops, &tx).await;
// Tell them of it as a completion of the relevant signing nodess }
coordinator
.send_message(messages::sign::CoordinatorMessage::Completed { // Put the TX past the confirmation depth
session: Session(0), let mut block_with_tx = None;
id: id.id, for _ in 0 .. confirmations(network) {
tx: tx_id.clone(), let (hash, _) = coordinators[0].add_block(&ops).await;
}) if block_with_tx.is_none() {
.await; block_with_tx = Some(hash);
// Verify they send Completed back
match coordinator.recv_message().await {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
session,
id: this_id,
tx: this_tx,
}) => {
assert_eq!(session, Session(0));
assert_eq!(&this_id, &id.id);
assert_eq!(this_tx, tx_id);
}
_ => panic!("processor didn't send Completed"),
} }
} }
} coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// TODO: Test the Eventuality from the blockchain, instead of from the coordinator // Sleep for 10s
// TODO: Test what happenns when Completed is sent with a non-existent TX ID // The scanner works on a 5s interval, so this leaves a few s for any processing/latency
// TODO: Test what happenns when Completed is sent with a non-completing TX ID tokio::time::sleep(Duration::from_secs(10)).await;
});
let expected_batch =
Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] };
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let (id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
// Continue with signing the batch
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
// Check it
assert_eq!(batch.batch, expected_batch);
// Fire a SubstrateBlock with a burn
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
let serai_time =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let mut plans = vec![];
for coordinator in &mut coordinators {
let these_plans = substrate_block(
coordinator,
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time,
network_latest_finalized_block: batch.batch.block,
},
block: substrate_block_num,
burns: vec![OutInstructionWithBalance {
instruction: OutInstruction { address: wallet.address(), data: None },
balance: balance_sent,
}],
batches: vec![batch.batch.id],
},
)
.await;
if plans.is_empty() {
plans = these_plans;
} else {
assert_eq!(plans, these_plans);
}
}
assert_eq!(plans.len(), 1);
// Start signing the TX
let (mut id, mut preprocesses) =
recv_sign_preprocesses(&mut coordinators, Session(0), 0).await;
assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 });
// Trigger a random amount of re-attempts
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
// TODO: Double check how the processor handles this ID field
// It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() })
.await;
}
(id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await;
}
let participating = preprocesses.keys().cloned().collect::<Vec<_>>();
let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await;
// Make sure all participating nodes published the TX
let participating =
participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::<HashSet<_>>();
for participant in &participating {
assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some());
}
// Publish this transaction to the left out nodes
let tx = coordinators[*participating.iter().next().unwrap()]
.get_transaction(&ops, &tx_id)
.await
.unwrap();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
if !participating.contains(&i) {
coordinator.publish_transacton(&ops, &tx).await;
// Tell them of it as a completion of the relevant signing nodess
coordinator
.send_message(messages::sign::CoordinatorMessage::Completed {
session: Session(0),
id: id.id,
tx: tx_id.clone(),
})
.await;
// Verify they send Completed back
match coordinator.recv_message().await {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
session,
id: this_id,
tx: this_tx,
}) => {
assert_eq!(session, Session(0));
assert_eq!(&this_id, &id.id);
assert_eq!(this_tx, tx_id);
}
_ => panic!("processor didn't send Completed"),
}
}
}
// TODO: Test the Eventuality from the blockchain, instead of from the coordinator
// TODO: Test what happenns when Completed is sent with a non-existent TX ID
// TODO: Test what happenns when Completed is sent with a non-completing TX ID
})
.await;
} }
} }

View File

@@ -1,5 +1,5 @@
#[test] #[tokio::test]
pub fn reproducibly_builds() { pub async fn reproducibly_builds() {
use std::{collections::HashSet, process::Command}; use std::{collections::HashSet, process::Command};
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
@@ -9,7 +9,7 @@ pub fn reproducibly_builds() {
const RUNS: usize = 3; const RUNS: usize = 3;
const TIMEOUT: u16 = 180 * 60; // 3 hours const TIMEOUT: u16 = 180 * 60; // 3 hours
serai_docker_tests::build("runtime".to_string()); serai_docker_tests::build("runtime".to_string()).await;
let mut ids = vec![[0; 8]; RUNS]; let mut ids = vec![[0; 8]; RUNS];
for id in &mut ids { for id in &mut ids {
@@ -38,64 +38,66 @@ pub fn reproducibly_builds() {
); );
} }
test.run(|_| async { test
let ids = ids; .run_async(|_| async {
let mut containers = vec![]; let ids = ids;
for container in String::from_utf8( let mut containers = vec![];
Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout, for container in String::from_utf8(
) Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout,
.expect("output wasn't utf-8") )
.lines() .expect("output wasn't utf-8")
{ .lines()
for id in &ids { {
if container.contains(&hex::encode(id)) { for id in &ids {
containers.push(container.trim().to_string()); if container.contains(&hex::encode(id)) {
containers.push(container.trim().to_string());
}
} }
} }
} assert_eq!(containers.len(), RUNS, "couldn't find all containers");
assert_eq!(containers.len(), RUNS, "couldn't find all containers");
let mut res = vec![None; RUNS]; let mut res = vec![None; RUNS];
'attempt: for _ in 0 .. (TIMEOUT / 10) { 'attempt: for _ in 0 .. (TIMEOUT / 10) {
tokio::time::sleep(core::time::Duration::from_secs(10)).await; tokio::time::sleep(core::time::Duration::from_secs(10)).await;
'runner: for (i, container) in containers.iter().enumerate() { 'runner: for (i, container) in containers.iter().enumerate() {
if res[i].is_some() { if res[i].is_some() {
continue; continue;
}
let logs = Command::new("docker").arg("logs").arg(container).output().unwrap();
let Some(last_log) =
std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last()
else {
continue 'runner;
};
let split = last_log.split("Runtime hash: ").collect::<Vec<_>>();
if split.len() == 2 {
res[i] = Some(split[1].to_string());
continue 'runner;
}
} }
let logs = Command::new("docker").arg("logs").arg(container).output().unwrap(); for item in &res {
let Some(last_log) = if item.is_none() {
std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last() continue 'attempt;
else { }
continue 'runner;
};
let split = last_log.split("Runtime hash: ").collect::<Vec<_>>();
if split.len() == 2 {
res[i] = Some(split[1].to_string());
continue 'runner;
} }
break;
} }
// If we didn't get results from all runners, panic
for item in &res { for item in &res {
if item.is_none() { if item.is_none() {
continue 'attempt; panic!("couldn't get runtime hashes within allowed time");
} }
} }
break; let mut identical = HashSet::new();
} for res in res.clone() {
identical.insert(res.unwrap());
// If we didn't get results from all runners, panic
for item in &res {
if item.is_none() {
panic!("couldn't get runtime hashes within allowed time");
} }
} assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res);
let mut identical = HashSet::new(); })
for res in res.clone() { .await;
identical.insert(res.unwrap());
}
assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res);
});
} }