From cbe83956aadcd477b67d1c2ea6e624cf749ca50c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 10 Jan 2025 02:24:24 -0500 Subject: [PATCH] Flesh out Coordinator main Lot of TODOs as the APIs are all being routed together. --- Cargo.lock | 2 +- coordinator/Cargo.toml | 11 +- coordinator/src/main.rs | 321 ++++++++++++++++++++++++- coordinator/substrate/src/canonical.rs | 7 +- coordinator/substrate/src/ephemeral.rs | 9 +- coordinator/substrate/src/lib.rs | 20 +- 6 files changed, 347 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 840a6c53..1f67f626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8319,7 +8319,6 @@ dependencies = [ "borsh", "ciphersuite", "env_logger", - "flexible-transcript", "frost-schnorrkel", "hex", "log", @@ -8340,6 +8339,7 @@ dependencies = [ "serai-task", "sp-application-crypto", "sp-runtime", + "tokio", "tributary-chain", "zalloc", "zeroize", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 3ecce4be..38adbf15 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -25,7 +25,6 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } blake2 = { version = "0.10", default-features = false, features = ["std"] } schnorrkel = { version = "0.11", default-features = false, features = ["std"] } -transcript = { package = "flexible-transcript", path = "../crypto/transcript", default-features = false, features = ["std", "recommended"] } ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] } schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-features = false, features = ["std"] } frost = { package = "modular-frost", path = "../crypto/frost" } @@ -42,7 +41,6 @@ messages = { package = "serai-processor-messages", path = "../processor/messages message-queue = { package = "serai-message-queue", path = "../message-queue" } tributary = { package = "tributary-chain", path = "./tributary" } -sp-application-crypto = { 
git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] } serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] } hex = { version = "0.4", default-features = false, features = ["std"] } @@ -51,17 +49,14 @@ borsh = { version = "1", default-features = false, features = ["std", "derive", log = { version = "0.4", default-features = false, features = ["std"] } env_logger = { version = "0.10", default-features = false, features = ["humantime"] } +tokio = { version = "1", default-features = false, features = ["time", "sync", "macros", "rt-multi-thread"] } + serai-cosign = { path = "./cosign" } serai-coordinator-substrate = { path = "./substrate" } serai-coordinator-p2p = { path = "./p2p" } serai-coordinator-libp2p-p2p = { path = "./p2p/libp2p" } -[dev-dependencies] -tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } -sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] } -sp-runtime = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] } - [features] -longer-reattempts = [] +longer-reattempts = [] # TODO parity-db = ["serai-db/parity-db"] rocksdb = ["serai-db/rocksdb"] diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 8af0f824..f1090284 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,10 +1,329 @@ +use core::{marker::PhantomData, ops::Deref, time::Duration}; +use std::{sync::Arc, time::Instant, collections::HashMap}; + +use zeroize::{Zeroize, Zeroizing}; +use rand_core::{RngCore, OsRng}; + +use blake2::{digest::typenum::U32, Digest, Blake2s}; +use ciphersuite::{ + group::{ff::PrimeField, GroupEncoding}, + Ciphersuite, Ristretto, +}; + +use tokio::sync::mpsc; + +use scale::Encode; +use serai_client::{ + primitives::{NetworkId, PublicKey, SeraiAddress}, + validator_sets::primitives::ValidatorSet, + 
Serai, +}; +use message_queue::{Service, client::MessageQueue}; + +use ::tributary::Tributary; + +use serai_task::{Task, TaskHandle, ContinuallyRan}; + +use serai_cosign::{SignedCosign, Cosigning}; +use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream}; + mod tributary; +use tributary::{Transaction, ScanTributaryTask}; mod p2p { pub use serai_coordinator_p2p::*; pub use serai_coordinator_libp2p_p2p::Libp2p; } -fn main() { +// Use a zeroizing allocator for this entire application +// While secrets should already be zeroized, the presence of secret keys in a networked application +// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't +// already +#[global_allocator] +static ALLOCATOR: zalloc::ZeroizingAlloc = + zalloc::ZeroizingAlloc(std::alloc::System); + +#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] +type Db = serai_db::ParityDb; +#[cfg(feature = "rocksdb")] +type Db = serai_db::RocksDB; + +#[allow(unused_variables, unreachable_code)] +fn db(path: &str) -> Db { + #[cfg(all(feature = "parity-db", feature = "rocksdb"))] + panic!("built with parity-db and rocksdb"); + #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] + let db = serai_db::new_parity_db(path); + #[cfg(feature = "rocksdb")] + let db = serai_db::new_rocksdb(path); + db +} + +fn coordinator_db() -> Db { + let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); + db(&format!("{root_path}/coordinator")) +} + +fn tributary_db(set: ValidatorSet) -> Db { + let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); + let network = match set.network { + NetworkId::Serai => panic!("creating Tributary for the Serai network"), + NetworkId::Bitcoin => "Bitcoin", + NetworkId::Ethereum => "Ethereum", + NetworkId::Monero => "Monero", + }; + db(&format!("{root_path}/tributary-{network}-{}", set.session.0)) +} + +async fn serai() -> Arc { + const 
SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10); + const MAX_SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(300); + + let mut delay = SERAI_CONNECTION_DELAY; + loop { + let Ok(serai) = Serai::new(format!( + "http://{}:9944", + serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided") + )) + .await + else { + log::error!("couldn't connect to the Serai node"); + tokio::time::sleep(delay).await; + delay = (delay + SERAI_CONNECTION_DELAY).min(MAX_SERAI_CONNECTION_DELAY); + continue; + }; + log::info!("made initial connection to Serai node"); + return Arc::new(serai); + } +} + +// TODO: intended_cosigns +fn spawn_cosigning( + db: impl serai_db::Db, + serai: Arc, + p2p: impl p2p::P2p, + tasks_to_run_upon_cosigning: Vec, + mut p2p_cosigns: mpsc::UnboundedReceiver, + mut signed_cosigns: mpsc::UnboundedReceiver, +) { + let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning); + tokio::spawn(async move { + let last_cosign_rebroadcast = Instant::now(); + loop { + let time_till_cosign_rebroadcast = (last_cosign_rebroadcast + + serai_cosign::BROADCAST_FREQUENCY) + .saturating_duration_since(Instant::now()); + tokio::select! { + () = tokio::time::sleep(time_till_cosign_rebroadcast) => { + for cosign in cosigning.cosigns_to_rebroadcast() { + p2p.publish_cosign(cosign).await; + } + } + cosign = p2p_cosigns.recv() => { + let cosign = cosign.expect("p2p cosigns channel was dropped?"); + let _: Result<_, _> = cosigning.intake_cosign(&cosign); + } + cosign = signed_cosigns.recv() => { + let cosign = cosign.expect("signed cosigns channel was dropped?"); + // TODO: Handle this error + let _: Result<_, _> = cosigning.intake_cosign(&cosign); + p2p.publish_cosign(cosign).await; + } + } + } + }); +} + +/// Spawn an existing Tributary. +/// +/// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network. 
+async fn spawn_tributary( + db: Db, + p2p: P, + p2p_add_tributary: mpsc::UnboundedSender>, + set: NewSetInformation, + serai_key: Zeroizing<::F>, +) { + let genesis = <[u8; 32]>::from(Blake2s::::digest((set.serai_block, set.set).encode())); + + // Since the Serai block will be finalized, then cosigned, before we handle this, this time will + // be a couple of minutes stale. While the Tributary will still function with a start time in the + // past, the Tributary will immediately incur round timeouts. We reduce these by adding a + // constant delay of a couple of minutes. + const TRIBUTARY_START_TIME_DELAY: u64 = 120; + let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY; + + let mut tributary_validators = Vec::with_capacity(set.validators.len()); + let mut validators = Vec::with_capacity(set.validators.len()); + let mut total_weight = 0; + let mut validator_weights = HashMap::with_capacity(set.validators.len()); + for (validator, weight) in set.validators { + let validator_key = ::read_G(&mut validator.0.as_slice()) + .expect("Serai validator had an invalid public key"); + let validator = SeraiAddress::from(validator); + let weight = u64::from(weight); + tributary_validators.push((validator_key, weight)); + validators.push(validator); + total_weight += weight; + validator_weights.insert(validator, weight); + } + + let tributary_db = tributary_db(set.set); + let tributary = Tributary::<_, Transaction, _>::new( + tributary_db.clone(), + genesis, + start_time, + serai_key, + tributary_validators, + p2p, + ) + .await + .unwrap(); + let reader = tributary.reader(); + + p2p_add_tributary.send(tributary).expect("p2p's add_tributary channel was closed?"); + + let (scan_tributary_task_def, scan_tributary_task) = Task::new(); + tokio::spawn( + (ScanTributaryTask { + cosign_db: db, + tributary_db, + set: set.set, + validators, + total_weight, + validator_weights, + tributary: reader, + _p2p: PhantomData::

, + }) + .continually_run(scan_tributary_task_def, vec![todo!("TODO")]), + ); + // TODO^ On Tributary block, drain this task's ProcessorMessages + + // Have the tributary scanner run as soon as there's a new block + // TODO: Implement retirement, this will hold the tributary/handle indefinitely + tokio::spawn(async move { + loop { + tributary + .next_block_notification() + .await + .await + .map_err(|_| ()) + // unreachable since this owns the tributary object and doesn't drop it + .expect("tributary was dropped causing notification to error"); + scan_tributary_task.run_now(); + } + }); +} + +#[tokio::main] +async fn main() { + // Override the panic handler with one which will exit the process if any tokio task panics + { + let existing = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic| { + existing(panic); + const MSG: &str = "exiting the process due to a task panicking"; + println!("{MSG}"); + log::error!("{MSG}"); + std::process::exit(1); + })); + } + + // Initialize the logger + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); + } + env_logger::init(); + log::info!("starting coordinator service..."); + + // Read the Serai key from the env + let serai_key = { + let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided"); + let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded"); + key_hex.zeroize(); + if key_vec.len() != 32 { + key_vec.zeroize(); + panic!("Serai key had an invalid length"); + } + let mut key_bytes = [0; 32]; + key_bytes.copy_from_slice(&key_vec); + key_vec.zeroize(); + let key = Zeroizing::new(::F::from_repr(key_bytes).unwrap()); + key_bytes.zeroize(); + key + }; + + // Open the database + let db = coordinator_db(); + + // Connect to the message-queue + let message_queue = MessageQueue::from_env(Service::Coordinator); + + // Connect to the Serai node + let serai = serai().await; + + let 
(p2p_add_tributary_send, p2p_add_tributary_recv) = mpsc::unbounded_channel(); + let (p2p_retire_tributary_send, p2p_retire_tributary_recv) = mpsc::unbounded_channel(); + let (p2p_cosigns_send, p2p_cosigns_recv) = mpsc::unbounded_channel(); + + // Spawn the P2P network + let p2p = { + let serai_keypair = { + let mut key_bytes = serai_key.to_bytes(); + // Schnorrkel SecretKey is the key followed by 32 bytes of entropy for nonces + let mut expanded_key = Zeroizing::new([0; 64]); + expanded_key.as_mut_slice()[.. 32].copy_from_slice(&key_bytes); + OsRng.fill_bytes(&mut expanded_key.as_mut_slice()[32 ..]); + key_bytes.zeroize(); + Zeroizing::new( + schnorrkel::SecretKey::from_bytes(expanded_key.as_slice()).unwrap().to_keypair(), + ) + }; + let p2p = p2p::Libp2p::new(&serai_keypair, serai.clone()); + tokio::spawn(p2p::run::( + db.clone(), + p2p.clone(), + p2p_add_tributary_recv, + p2p_retire_tributary_recv, + p2p_cosigns_send, + )); + p2p + }; + + // TODO: p2p_add_tributary_send, p2p_retire_tributary_send + + // Spawn the Substrate scanners + // TODO: Canonical, NewSet, SignSlashReport + let (substrate_canonical_task_def, substrate_canonical_task) = Task::new(); + tokio::spawn( + CanonicalEventStream::new(db.clone(), serai.clone()) + .continually_run(substrate_canonical_task_def, todo!("TODO")), + ); + let (substrate_ephemeral_task_def, substrate_ephemeral_task) = Task::new(); + tokio::spawn( + EphemeralEventStream::new( + db.clone(), + serai.clone(), + PublicKey::from_raw((::generator() * serai_key.deref()).to_bytes()), + ) + .continually_run(substrate_ephemeral_task_def, todo!("TODO")), + ); + + // Spawn the cosign handler + let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel(); + spawn_cosigning( + db.clone(), + serai.clone(), + p2p.clone(), + // Run the Substrate scanners once we cosign new blocks + vec![substrate_canonical_task, substrate_ephemeral_task], + p2p_cosigns_recv, + signed_cosigns_recv, + ); + + // TODO: Reload tributaries from disk, 
handle processor messages + + // TODO: On NewSet, save to DB, send KeyGen, spawn tributary task, inform P2P network + todo!("TODO") } diff --git a/coordinator/substrate/src/canonical.rs b/coordinator/substrate/src/canonical.rs index f333e11f..e1bbe6c2 100644 --- a/coordinator/substrate/src/canonical.rs +++ b/coordinator/substrate/src/canonical.rs @@ -1,4 +1,5 @@ -use std::future::Future; +use core::future::Future; +use std::sync::Arc; use futures::stream::{StreamExt, FuturesOrdered}; @@ -20,14 +21,14 @@ create_db!( /// The event stream for canonical events. pub struct CanonicalEventStream { db: D, - serai: Serai, + serai: Arc, } impl CanonicalEventStream { /// Create a new canonical event stream. /// /// Only one of these may exist over the provided database. - pub fn new(db: D, serai: Serai) -> Self { + pub fn new(db: D, serai: Arc) -> Self { Self { db, serai } } } diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs index 703d5b3a..d889d59f 100644 --- a/coordinator/substrate/src/ephemeral.rs +++ b/coordinator/substrate/src/ephemeral.rs @@ -1,4 +1,5 @@ -use std::future::Future; +use core::future::Future; +use std::sync::Arc; use futures::stream::{StreamExt, FuturesOrdered}; @@ -24,7 +25,7 @@ create_db!( /// The event stream for ephemeral events. pub struct EphemeralEventStream { db: D, - serai: Serai, + serai: Arc, validator: PublicKey, } @@ -32,7 +33,7 @@ impl EphemeralEventStream { /// Create a new ephemeral event stream. /// /// Only one of these may exist over the provided database. - pub fn new(db: D, serai: Serai, validator: PublicKey) -> Self { + pub fn new(db: D, serai: Arc, validator: PublicKey) -> Self { Self { db, serai, validator } } } @@ -216,7 +217,7 @@ impl ContinuallyRan for EphemeralEventStream { &NewSetInformation { set: *set, serai_block: block.block_hash, - start_time: block.time, + declaration_time: block.time, // TODO: Why do we have this as an explicit field here? 
// Shouldn't thiis be inlined into the Processor's key gen code, where it's used? threshold: ((total_weight * 2) / 3) + 1, diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index 41378508..f723332d 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -13,7 +13,9 @@ use serai_client::{ use serai_db::*; mod canonical; +pub use canonical::CanonicalEventStream; mod ephemeral; +pub use ephemeral::EphemeralEventStream; fn borsh_serialize_validators( validators: &Vec<(PublicKey, u16)>, @@ -32,16 +34,22 @@ fn borsh_deserialize_validators( /// The information for a new set. #[derive(Debug, BorshSerialize, BorshDeserialize)] pub struct NewSetInformation { - set: ValidatorSet, - serai_block: [u8; 32], - start_time: u64, - threshold: u16, + /// The set. + pub set: ValidatorSet, + /// The Serai block which declared it. + pub serai_block: [u8; 32], + /// The time of the block which declared it, in seconds. + pub declaration_time: u64, + /// The threshold to use. + pub threshold: u16, + /// The validators, with the amount of key shares they have. #[borsh( serialize_with = "borsh_serialize_validators", deserialize_with = "borsh_deserialize_validators" )] - validators: Vec<(PublicKey, u16)>, - evrf_public_keys: Vec<([u8; 32], Vec)>, + pub validators: Vec<(PublicKey, u16)>, + /// The eVRF public keys. + pub evrf_public_keys: Vec<([u8; 32], Vec)>, } mod _public_db {