use std::{ sync::{OnceLock, Mutex}, time::Duration, fs, }; use serai_client::{primitives::NetworkId, Serai}; use dockertest::{ LogAction, LogPolicy, LogSource, LogOptions, StartPolicy, Composition, DockerOperations, }; #[cfg(test)] mod tests; static UNIQUE_ID: OnceLock> = OnceLock::new(); use serai_processor_tests::{RPC_USER, RPC_PASS, network_instance, processor_instance}; use serai_message_queue_tests::instance as message_queue_instance; use serai_coordinator_tests::{coordinator_instance, serai_composition}; #[allow(unused)] #[derive(Clone, Debug)] pub struct Handles { bitcoin: (String, u32), bitcoin_processor: String, monero: (String, u32), monero_processor: String, message_queue: String, coordinator: String, serai: String, } pub fn full_stack(name: &str) -> (Handles, Vec) { let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance(); let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin); let bitcoin_processor_composition = processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]); let (monero_composition, monero_port) = network_instance(NetworkId::Monero); let monero_processor_composition = processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]); let coordinator_composition = coordinator_instance(name, coord_key); let serai_composition = serai_composition(name); // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits let (first, unique_id) = { let unique_id_mutex = UNIQUE_ID.get_or_init(|| Mutex::new(0)); let mut unique_id_lock = unique_id_mutex.lock().unwrap(); let first = *unique_id_lock == 0; let unique_id = hex::encode(unique_id_lock.to_be_bytes()); *unique_id_lock += 1; (first, unique_id) }; let logs_path = [std::env::current_dir().unwrap().to_str().unwrap(), ".test-logs", "full-stack"] .iter() .collect::(); if first { let _ = fs::remove_dir_all(&logs_path); fs::create_dir_all(&logs_path).expect("couldn't create logs directory"); assert!( fs::read_dir(&logs_path).expect("couldn't read the logs folder").next().is_none(), "logs folder wasn't empty, despite removing it at the start of the run", ); } let logs_path = logs_path.to_str().unwrap().to_string(); let mut compositions = vec![]; let mut handles = vec![]; for composition in [ message_queue_composition, bitcoin_composition, bitcoin_processor_composition, monero_composition, monero_processor_composition, coordinator_composition, serai_composition, ] { let name = format!("{}-{}", composition.handle(), &unique_id); compositions.push( composition .with_start_policy(StartPolicy::Strict) .with_container_name(name.clone()) .with_log_options(Some(LogOptions { action: if std::env::var("GITHUB_CI") == Ok("true".to_string()) { LogAction::Forward } else { LogAction::ForwardToFile { path: logs_path.clone() } }, policy: LogPolicy::Always, source: LogSource::Both, })), ); handles.push(compositions.last().unwrap().handle()); } let handles = Handles { message_queue: handles.remove(0), bitcoin: (handles.remove(0), bitcoin_port), bitcoin_processor: handles.remove(0), monero: (handles.remove(0), monero_port), monero_processor: handles.remove(0), coordinator: handles.remove(0), serai: handles.remove(0), }; { let bitcoin_processor_composition = compositions.get_mut(2).unwrap(); bitcoin_processor_composition .inject_container_name(handles.message_queue.clone(), "MESSAGE_QUEUE_RPC"); bitcoin_processor_composition .inject_container_name(handles.bitcoin.0.clone(), "NETWORK_RPC_HOSTNAME"); } { let monero_processor_composition = compositions.get_mut(4).unwrap(); monero_processor_composition .inject_container_name(handles.message_queue.clone(), "MESSAGE_QUEUE_RPC"); monero_processor_composition .inject_container_name(handles.monero.0.clone(), "NETWORK_RPC_HOSTNAME"); } let coordinator_composition = compositions.get_mut(5).unwrap(); coordinator_composition.inject_container_name(handles.message_queue.clone(), "MESSAGE_QUEUE_RPC"); coordinator_composition.inject_container_name(handles.serai.clone(), "SERAI_HOSTNAME"); (handles, compositions) } impl Handles { pub async fn serai(&self, ops: &DockerOperations) -> Serai { let serai_rpc = ops.handle(&self.serai).host_port(9944).unwrap(); let serai_rpc = format!("ws://{}:{}", serai_rpc.0, serai_rpc.1); // If the RPC server has yet to start, sleep for up to 60s until it does for _ in 0 .. 60 { tokio::time::sleep(Duration::from_secs(1)).await; let Ok(client) = Serai::new(&serai_rpc).await else { continue }; if client.get_latest_block_hash().await.is_err() { continue; } return client; } panic!("serai RPC server wasn't available after 60s"); } pub async fn bitcoin(&self, ops: &DockerOperations) -> bitcoin_serai::rpc::Rpc { let rpc = ops.handle(&self.bitcoin.0).host_port(self.bitcoin.1).unwrap(); let rpc = format!("http://{RPC_USER}:{RPC_PASS}@{}:{}", rpc.0, rpc.1); // If the RPC server has yet to start, sleep for up to 60s until it does for _ in 0 .. 60 { tokio::time::sleep(Duration::from_secs(1)).await; let Ok(client) = bitcoin_serai::rpc::Rpc::new(rpc.clone()).await else { continue }; return client; } panic!("bitcoin RPC server wasn't available after 60s"); } pub async fn monero( &self, ops: &DockerOperations, ) -> monero_serai::rpc::Rpc { use monero_serai::rpc::HttpRpc; let rpc = ops.handle(&self.monero.0).host_port(self.monero.1).unwrap(); let rpc = format!("http://{RPC_USER}:{RPC_PASS}@{}:{}", rpc.0, rpc.1); // If the RPC server has yet to start, sleep for up to 60s until it does for _ in 0 .. 60 { tokio::time::sleep(Duration::from_secs(1)).await; let Ok(client) = HttpRpc::new(rpc.clone()) else { continue }; if client.get_height().await.is_err() { continue; } return client; } panic!("monero RPC server wasn't available after 60s"); } }