diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index f1090284..fb96dc76 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, ops::Deref, time::Duration}; -use std::{sync::Arc, time::Instant, collections::HashMap}; +use std::{sync::Arc, collections::HashMap, time::Instant, path::Path, fs}; use zeroize::{Zeroize, Zeroizing}; use rand_core::{RngCore, OsRng}; @@ -15,11 +15,13 @@ use tokio::sync::mpsc; use scale::Encode; use serai_client::{ primitives::{NetworkId, PublicKey, SeraiAddress}, - validator_sets::primitives::ValidatorSet, + validator_sets::primitives::{Session, ValidatorSet}, Serai, }; use message_queue::{Service, client::MessageQueue}; +use serai_db::{*, Db as DbTrait}; + use ::tributary::Tributary; use serai_task::{Task, TaskHandle, ContinuallyRan}; @@ -30,6 +32,19 @@ use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, Ephem mod tributary; use tributary::{Transaction, ScanTributaryTask}; +create_db! { + Coordinator { + ActiveTributaries: () -> Vec, + RetiredTributary: (set: ValidatorSet) -> (), + } +} + +db_channel! { + Coordinator { + TributaryCleanup: () -> ValidatorSet, + } +} + mod p2p { pub use serai_coordinator_p2p::*; pub use serai_coordinator_libp2p_p2p::Libp2p; @@ -50,6 +65,14 @@ type Db = serai_db::RocksDB; #[allow(unused_variables, unreachable_code)] fn db(path: &str) -> Db { + { + let path: &Path = path.as_ref(); + // This may error if this path already exists, which we shouldn't propagate/panic on. If this + // is a problem (such as we don't have the necessary permissions to write to this path), we + // expect the following DB opening to error. + let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap()); + } + #[cfg(all(feature = "parity-db", feature = "rocksdb"))] panic!("built with parity-db and rocksdb"); #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] @@ -61,10 +84,10 @@ fn db(path: &str) -> Db { fn coordinator_db() -> Db { let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); - db(&format!("{root_path}/coordinator")) + db(&format!("{root_path}/coordinator/db")) } -fn tributary_db(set: ValidatorSet) -> Db { +fn tributary_db_folder(set: ValidatorSet) -> String { let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); let network = match set.network { NetworkId::Serai => panic!("creating Tributary for the Serai network"), @@ -72,7 +95,11 @@ fn tributary_db(set: ValidatorSet) -> Db { NetworkId::Ethereum => "Ethereum", NetworkId::Monero => "Monero", }; - db(&format!("{root_path}/tributary-{network}-{}", set.session.0)) + format!("{root_path}/tributary-{network}-{}", set.session.0) +} + +fn tributary_db(set: ValidatorSet) -> Db { + db(&format!("{}/db", tributary_db_folder(set))) } async fn serai() -> Arc { @@ -138,12 +165,32 @@ fn spawn_cosigning( /// /// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network. async fn spawn_tributary( - db: Db, + mut db: Db, p2p: P, - p2p_add_tributary: mpsc::UnboundedSender>, + p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, set: NewSetInformation, serai_key: Zeroizing<::F>, ) { + // Don't spawn retired Tributaries + if RetiredTributary::get(&db, set.set).is_some() { + return; + } + + // TODO: Move from spawn_tributary to on NewSet + // Queue the historical Tributary for this network for deletion + // We explicitly don't queue this upon Tributary retire to give time to investigate retired + // Tributaries if questions are raised post-retiry. This gives a week after the Tributary has + // been retired to make a backup of the data directory for any investigations. + if let Some(historic_session) = set.set.session.0.checked_sub(2) { + // This may get fired several times but that isn't an issue + let mut txn = db.txn(); + TributaryCleanup::send( + &mut txn, + &ValidatorSet { network: set.set.network, session: Session(historic_session) }, + ); + txn.commit(); + } + let genesis = <[u8; 32]>::from(Blake2s::::digest((set.serai_block, set.set).encode())); // Since the Serai block will be finalized, then cosigned, before we handle this, this time will @@ -181,12 +228,12 @@ async fn spawn_tributary( .unwrap(); let reader = tributary.reader(); - p2p_add_tributary.send(tributary).expect("p2p's add_tributary channel was closed?"); + p2p_add_tributary.send((set.set, tributary)).expect("p2p's add_tributary channel was closed?"); let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( (ScanTributaryTask { - cosign_db: db, + cosign_db: db.clone(), tributary_db, set: set.set, validators, @@ -200,9 +247,12 @@ async fn spawn_tributary( // TODO^ On Tributary block, drain this task's ProcessorMessages // Have the tributary scanner run as soon as there's a new block - // TODO: Implement retiry, this will hold the tributary/handle indefinitely tokio::spawn(async move { loop { + if RetiredTributary::get(&db, set.set).is_some() { + break; + } + tributary .next_block_notification() .await @@ -254,7 +304,28 @@ async fn main() { }; // Open the database - let db = coordinator_db(); + let mut db = coordinator_db(); + + let existing_tributaries_at_boot = { + let mut txn = db.txn(); + + // Cleanup all historic Tributaries + while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) { + // TributaryCleanup may fire this message multiple times so this may fail if we've already + // performed cleanup + log::info!("pruning data directory for historic tributary {to_cleanup:?}"); + let _: Result<_, _> = fs::remove_dir_all(tributary_db_folder(to_cleanup)); + } + + // Remove retired Tributaries from ActiveTributaries + let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]); + active_tributaries.retain(|tributary| RetiredTributary::get(&txn, tributary.set).is_none()); + ActiveTributaries::set(&mut txn, &active_tributaries); + + txn.commit(); + + active_tributaries + }; // Connect to the message-queue let message_queue = MessageQueue::from_env(Service::Coordinator); @@ -321,9 +392,16 @@ async fn main() { signed_cosigns_recv, ); - // TODO: Reload tributaries from disk, handle processor messages + // Spawn all Tributaries on-disk + for tributary in existing_tributaries_at_boot { + spawn_tributary(db.clone(), p2p.clone(), &p2p_add_tributary_send, tributary, serai_key.clone()) + .await; + } - // TODO: On NewSet, save to DB, send KeyGen, spawn tributary task, inform P2P network + // TODO: Hndle processor messages + + // TODO: On NewSet, queue historical for deletionn, save to DB, send KeyGen, spawn tributary + // task, inform P2P network todo!("TODO") }