Delete old Tributaries on reboot

This commit is contained in:
Luke Parker
2025-01-10 20:10:05 -05:00
parent cbe83956aa
commit 378d6b90cf

View File

@@ -1,5 +1,5 @@
use core::{marker::PhantomData, ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant, collections::HashMap};
use std::{sync::Arc, collections::HashMap, time::Instant, path::Path, fs};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
@@ -15,11 +15,13 @@ use tokio::sync::mpsc;
use scale::Encode;
use serai_client::{
primitives::{NetworkId, PublicKey, SeraiAddress},
validator_sets::primitives::ValidatorSet,
validator_sets::primitives::{Session, ValidatorSet},
Serai,
};
use message_queue::{Service, client::MessageQueue};
use serai_db::{*, Db as DbTrait};
use ::tributary::Tributary;
use serai_task::{Task, TaskHandle, ContinuallyRan};
@@ -30,6 +32,19 @@ use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, Ephem
mod tributary;
use tributary::{Transaction, ScanTributaryTask};
create_db! {
Coordinator {
ActiveTributaries: () -> Vec<NewSetInformation>,
RetiredTributary: (set: ValidatorSet) -> (),
}
}
db_channel! {
Coordinator {
TributaryCleanup: () -> ValidatorSet,
}
}
mod p2p {
pub use serai_coordinator_p2p::*;
pub use serai_coordinator_libp2p_p2p::Libp2p;
@@ -50,6 +65,14 @@ type Db = serai_db::RocksDB;
#[allow(unused_variables, unreachable_code)]
fn db(path: &str) -> Db {
{
let path: &Path = path.as_ref();
// This may error if this path already exists, which we shouldn't propagate/panic on. If this
// is a problem (such as we don't have the necessary permissions to write to this path), we
// expect the following DB opening to error.
let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap());
}
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
@@ -61,10 +84,10 @@ fn db(path: &str) -> Db {
fn coordinator_db() -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
db(&format!("{root_path}/coordinator"))
db(&format!("{root_path}/coordinator/db"))
}
fn tributary_db(set: ValidatorSet) -> Db {
fn tributary_db_folder(set: ValidatorSet) -> String {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
let network = match set.network {
NetworkId::Serai => panic!("creating Tributary for the Serai network"),
@@ -72,7 +95,11 @@ fn tributary_db(set: ValidatorSet) -> Db {
NetworkId::Ethereum => "Ethereum",
NetworkId::Monero => "Monero",
};
db(&format!("{root_path}/tributary-{network}-{}", set.session.0))
format!("{root_path}/tributary-{network}-{}", set.session.0)
}
fn tributary_db(set: ValidatorSet) -> Db {
db(&format!("{}/db", tributary_db_folder(set)))
}
async fn serai() -> Arc<Serai> {
@@ -138,12 +165,32 @@ fn spawn_cosigning(
///
/// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network.
async fn spawn_tributary<P: p2p::P2p>(
db: Db,
mut db: Db,
p2p: P,
p2p_add_tributary: mpsc::UnboundedSender<Tributary<Db, Transaction, P>>,
p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
set: NewSetInformation,
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
// Don't spawn retired Tributaries
if RetiredTributary::get(&db, set.set).is_some() {
return;
}
// TODO: Move from spawn_tributary to on NewSet
// Queue the historical Tributary for this network for deletion
// We explicitly don't queue this upon Tributary retire to give time to investigate retired
// Tributaries if questions are raised post-retiry. This gives a week after the Tributary has
// been retired to make a backup of the data directory for any investigations.
if let Some(historic_session) = set.set.session.0.checked_sub(2) {
// This may get fired several times but that isn't an issue
let mut txn = db.txn();
TributaryCleanup::send(
&mut txn,
&ValidatorSet { network: set.set.network, session: Session(historic_session) },
);
txn.commit();
}
let genesis = <[u8; 32]>::from(Blake2s::<U32>::digest((set.serai_block, set.set).encode()));
// Since the Serai block will be finalized, then cosigned, before we handle this, this time will
@@ -181,12 +228,12 @@ async fn spawn_tributary<P: p2p::P2p>(
.unwrap();
let reader = tributary.reader();
p2p_add_tributary.send(tributary).expect("p2p's add_tributary channel was closed?");
p2p_add_tributary.send((set.set, tributary)).expect("p2p's add_tributary channel was closed?");
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
(ScanTributaryTask {
cosign_db: db,
cosign_db: db.clone(),
tributary_db,
set: set.set,
validators,
@@ -200,9 +247,12 @@ async fn spawn_tributary<P: p2p::P2p>(
// TODO^ On Tributary block, drain this task's ProcessorMessages
// Have the tributary scanner run as soon as there's a new block
// TODO: Implement retiry, this will hold the tributary/handle indefinitely
tokio::spawn(async move {
loop {
if RetiredTributary::get(&db, set.set).is_some() {
break;
}
tributary
.next_block_notification()
.await
@@ -254,7 +304,28 @@ async fn main() {
};
// Open the database
let db = coordinator_db();
let mut db = coordinator_db();
let existing_tributaries_at_boot = {
let mut txn = db.txn();
// Cleanup all historic Tributaries
while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
// TributaryCleanup may fire this message multiple times so this may fail if we've already
// performed cleanup
log::info!("pruning data directory for historic tributary {to_cleanup:?}");
let _: Result<_, _> = fs::remove_dir_all(tributary_db_folder(to_cleanup));
}
// Remove retired Tributaries from ActiveTributaries
let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]);
active_tributaries.retain(|tributary| RetiredTributary::get(&txn, tributary.set).is_none());
ActiveTributaries::set(&mut txn, &active_tributaries);
txn.commit();
active_tributaries
};
// Connect to the message-queue
let message_queue = MessageQueue::from_env(Service::Coordinator);
@@ -321,9 +392,16 @@ async fn main() {
signed_cosigns_recv,
);
// TODO: Reload tributaries from disk, handle processor messages
// Spawn all Tributaries on-disk
for tributary in existing_tributaries_at_boot {
spawn_tributary(db.clone(), p2p.clone(), &p2p_add_tributary_send, tributary, serai_key.clone())
.await;
}
// TODO: On NewSet, save to DB, send KeyGen, spawn tributary task, inform P2P network
// TODO: Hndle processor messages
// TODO: On NewSet, queue historical for deletionn, save to DB, send KeyGen, spawn tributary
// task, inform P2P network
todo!("TODO")
}