diff --git a/Cargo.lock b/Cargo.lock index 1f67f626..ede4518f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8337,8 +8337,6 @@ dependencies = [ "serai-message-queue", "serai-processor-messages", "serai-task", - "sp-application-crypto", - "sp-runtime", "tokio", "tributary-chain", "zalloc", diff --git a/coordinator/cosign/src/lib.rs b/coordinator/cosign/src/lib.rs index aa2883aa..c4428a39 100644 --- a/coordinator/cosign/src/lib.rs +++ b/coordinator/cosign/src/lib.rs @@ -82,13 +82,13 @@ enum HasEvents { #[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct CosignIntent { /// The global session this cosign is being performed under. - global_session: [u8; 32], + pub global_session: [u8; 32], /// The number of the block to cosign. - block_number: u64, + pub block_number: u64, /// The hash of the block to cosign. - block_hash: [u8; 32], + pub block_hash: [u8; 32], /// If this cosign must be handled before further cosigns are. - notable: bool, + pub notable: bool, } /// A cosign. diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs new file mode 100644 index 00000000..4e932306 --- /dev/null +++ b/coordinator/src/db.rs @@ -0,0 +1,79 @@ +use std::{path::Path, fs}; + +pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait}; +use serai_db::{create_db, db_channel}; + +use serai_client::{ + primitives::NetworkId, + validator_sets::primitives::{Session, ValidatorSet}, +}; + +use serai_cosign::CosignIntent; + +use serai_coordinator_substrate::NewSetInformation; + +#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] +pub(crate) type Db = serai_db::ParityDb; +#[cfg(feature = "rocksdb")] +pub(crate) type Db = serai_db::RocksDB; + +#[allow(unused_variables, unreachable_code)] +fn db(path: &str) -> Db { + { + let path: &Path = path.as_ref(); + // This may error if this path already exists, which we shouldn't propagate/panic on. If this + // is a problem (such as we don't have the necessary permissions to write to this path), we + // expect the following DB opening to error. + let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap()); + } + + #[cfg(all(feature = "parity-db", feature = "rocksdb"))] + panic!("built with parity-db and rocksdb"); + #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] + let db = serai_db::new_parity_db(path); + #[cfg(feature = "rocksdb")] + let db = serai_db::new_rocksdb(path); + db +} + +pub(crate) fn coordinator_db() -> Db { + let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); + db(&format!("{root_path}/coordinator/db")) +} + +fn tributary_db_folder(set: ValidatorSet) -> String { + let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); + let network = match set.network { + NetworkId::Serai => panic!("creating Tributary for the Serai network"), + NetworkId::Bitcoin => "Bitcoin", + NetworkId::Ethereum => "Ethereum", + NetworkId::Monero => "Monero", + }; + format!("{root_path}/tributary-{network}-{}", set.session.0) +} + +pub(crate) fn tributary_db(set: ValidatorSet) -> Db { + db(&format!("{}/db", tributary_db_folder(set))) +} + +pub(crate) fn prune_tributary_db(set: ValidatorSet) { + log::info!("pruning data directory for tributary {set:?}"); + let db = tributary_db_folder(set); + if fs::exists(&db).expect("couldn't check if tributary DB exists") { + fs::remove_dir_all(db).unwrap(); + } +} + +create_db! { + Coordinator { + ActiveTributaries: () -> Vec, + RetiredTributary: (network: NetworkId) -> Session, + } +} + +db_channel! { + Coordinator { + TributaryCleanup: () -> ValidatorSet, + PendingCosigns: (set: ValidatorSet) -> CosignIntent, + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index fb96dc76..dbe185fb 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, ops::Deref, time::Duration}; -use std::{sync::Arc, collections::HashMap, time::Instant, path::Path, fs}; +use std::{sync::Arc, collections::HashMap, time::Instant}; use zeroize::{Zeroize, Zeroizing}; use rand_core::{RngCore, OsRng}; @@ -20,31 +20,19 @@ use serai_client::{ }; use message_queue::{Service, client::MessageQueue}; -use serai_db::{*, Db as DbTrait}; - -use ::tributary::Tributary; +use ::tributary::ProvidedError; use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{SignedCosign, Cosigning}; use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream}; +mod db; +use db::*; + mod tributary; use tributary::{Transaction, ScanTributaryTask}; -create_db! { - Coordinator { - ActiveTributaries: () -> Vec, - RetiredTributary: (set: ValidatorSet) -> (), - } -} - -db_channel! { - Coordinator { - TributaryCleanup: () -> ValidatorSet, - } -} - mod p2p { pub use serai_coordinator_p2p::*; pub use serai_coordinator_libp2p_p2p::Libp2p; @@ -58,49 +46,7 @@ mod p2p { static ALLOCATOR: zalloc::ZeroizingAlloc = zalloc::ZeroizingAlloc(std::alloc::System); -#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] -type Db = serai_db::ParityDb; -#[cfg(feature = "rocksdb")] -type Db = serai_db::RocksDB; - -#[allow(unused_variables, unreachable_code)] -fn db(path: &str) -> Db { - { - let path: &Path = path.as_ref(); - // This may error if this path already exists, which we shouldn't propagate/panic on. If this - // is a problem (such as we don't have the necessary permissions to write to this path), we - // expect the following DB opening to error. - let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap()); - } - - #[cfg(all(feature = "parity-db", feature = "rocksdb"))] - panic!("built with parity-db and rocksdb"); - #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] - let db = serai_db::new_parity_db(path); - #[cfg(feature = "rocksdb")] - let db = serai_db::new_rocksdb(path); - db -} - -fn coordinator_db() -> Db { - let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); - db(&format!("{root_path}/coordinator/db")) -} - -fn tributary_db_folder(set: ValidatorSet) -> String { - let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified"); - let network = match set.network { - NetworkId::Serai => panic!("creating Tributary for the Serai network"), - NetworkId::Bitcoin => "Bitcoin", - NetworkId::Ethereum => "Ethereum", - NetworkId::Monero => "Monero", - }; - format!("{root_path}/tributary-{network}-{}", set.session.0) -} - -fn tributary_db(set: ValidatorSet) -> Db { - db(&format!("{}/db", tributary_db_folder(set))) -} +type Tributary

= ::tributary::Tributary; async fn serai() -> Arc { const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10); @@ -124,7 +70,6 @@ async fn serai() -> Arc { } } -// TODO: intended_cosigns fn spawn_cosigning( db: impl serai_db::Db, serai: Arc, @@ -167,12 +112,13 @@ fn spawn_cosigning( async fn spawn_tributary( mut db: Db, p2p: P, - p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, + p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary

)>, set: NewSetInformation, serai_key: Zeroizing<::F>, ) { // Don't spawn retired Tributaries - if RetiredTributary::get(&db, set.set).is_some() { + if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= Some(set.set.session.0) + { return; } @@ -216,21 +162,15 @@ async fn spawn_tributary( } let tributary_db = tributary_db(set.set); - let tributary = Tributary::<_, Transaction, _>::new( - tributary_db.clone(), - genesis, - start_time, - serai_key, - tributary_validators, - p2p, - ) - .await - .unwrap(); + let mut tributary = + Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p) + .await + .unwrap(); let reader = tributary.reader(); p2p_add_tributary.send((set.set, tributary)).expect("p2p's add_tributary channel was closed?"); - let (scan_tributary_task_def, scan_tributary_task) = Task::new(); + let (scan_tributary_task_def, mut scan_tributary_task) = Task::new(); tokio::spawn( (ScanTributaryTask { cosign_db: db.clone(), @@ -246,21 +186,114 @@ async fn spawn_tributary( ); // TODO^ On Tributary block, drain this task's ProcessorMessages - // Have the tributary scanner run as soon as there's a new block tokio::spawn(async move { loop { - if RetiredTributary::get(&db, set.set).is_some() { + // Break once this Tributary is retired + if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= + Some(set.set.session.0) + { break; } - tributary - .next_block_notification() - .await - .await - .map_err(|_| ()) + let provide = |tributary: Tributary<_>, scan_tributary_task, tx: Transaction| async move { + match tributary.provide_transaction(tx.clone()).await { + // The Tributary uses its own DB, so we may provide this multiple times if we reboot + // before committing the txn which provoked this + Ok(()) | Err(ProvidedError::AlreadyProvided) => {} + Err(ProvidedError::NotProvided) => { + panic!("providing a Transaction which wasn't a Provided transaction?"); + } + Err(ProvidedError::InvalidProvided(e)) => { + panic!("providing an invalid Provided transaction: {e:?}") + } + Err(ProvidedError::LocalMismatchesOnChain) => { + // Drop the Tributary and scan Tributary task so we don't continue running them here + drop(tributary); + drop(scan_tributary_task); + + loop { + // We're actually only halting the Tributary's scan task (which already only scans + // if all Provided transactions align) as the P2P task is still maintaining a clone + // of the Tributary handle + log::error!( + "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", + set.set, + tx, + ); + // Print this every five minutes as this does need to be handled + tokio::time::sleep(Duration::from_secs(5 * 60)).await; + } + + // Declare this unreachable so Rust will let us perform the above drops + unreachable!(); + } + } + (tributary, scan_tributary_task) + }; + + // Check if we produced any cosigns we were supposed to + let mut pending_notable_cosign = false; + loop { + let mut txn = db.txn(); + + // Fetch the next cosign this tributary should handle + let Some(cosign) = PendingCosigns::try_recv(&mut txn, set.set) else { break }; + pending_notable_cosign = cosign.notable; + + // If we (Serai) haven't cosigned this block, break as this is still pending + let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; + if latest < cosign.block_number { + break; + } + + // Because we've cosigned it, provide the TX for that + (tributary, scan_tributary_task) = provide( + tributary, + scan_tributary_task, + Transaction::Cosigned { substrate_block_hash: cosign.block_hash }, + ) + .await; + // Clear pending_notable_cosign since this cosign isn't pending + pending_notable_cosign = false; + + // Commit the txn to clear this from PendingCosigns + txn.commit(); + } + + // If we don't have any notable cosigns pending, provide the next set of cosign intents + if pending_notable_cosign { + let mut txn = db.txn(); + // intended_cosigns will only yield up to and including the next notable cosign + for cosign in Cosigning::::intended_cosigns(&mut txn, set.set) { + // Flag this cosign as pending + PendingCosigns::send(&mut txn, set.set, &cosign); + // Provide the transaction to queue it for work + (tributary, scan_tributary_task) = provide( + tributary, + scan_tributary_task, + Transaction::Cosign { substrate_block_hash: cosign.block_hash }, + ) + .await; + } + txn.commit(); + } + + // Have the tributary scanner run as soon as there's a new block + // This is wrapped in a timeout so we don't go too long without running the above code + match tokio::time::timeout( + Duration::from_millis(::tributary::tendermint::TARGET_BLOCK_TIME.into()), + tributary.next_block_notification().await, + ) + .await + { + // Future resolved within the timeout, notification + Ok(Ok(())) => scan_tributary_task.run_now(), + // Future resolved within the timeout, notification failed due to sender being dropped // unreachable since this owns the tributary object and doesn't drop it - .expect("tributary was dropped causing notification to error"); - scan_tributary_task.run_now(); + Ok(Err(_)) => panic!("tributary was dropped causing notification to error"), + // Future didn't resolve within the timeout + Err(_) => {} + } } }); } @@ -311,15 +344,17 @@ async fn main() { // Cleanup all historic Tributaries while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) { - // TributaryCleanup may fire this message multiple times so this may fail if we've already - // performed cleanup - log::info!("pruning data directory for historic tributary {to_cleanup:?}"); - let _: Result<_, _> = fs::remove_dir_all(tributary_db_folder(to_cleanup)); + prune_tributary_db(to_cleanup); + // Drain the cosign intents created for this set + while !Cosigning::::intended_cosigns(&mut txn, to_cleanup).is_empty() {} } // Remove retired Tributaries from ActiveTributaries let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]); - active_tributaries.retain(|tributary| RetiredTributary::get(&txn, tributary.set).is_none()); + active_tributaries.retain(|tributary| { + RetiredTributary::get(&txn, tributary.set.network).map(|session| session.0) < + Some(tributary.set.session.0) + }); ActiveTributaries::set(&mut txn, &active_tributaries); txn.commit(); diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 0d85110d..a3eab8db 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -189,8 +189,10 @@ create_db!( // The latest Substrate block to cosign. LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32], - // If we're actively cosigning or not. - ActivelyCosigning: (set: ValidatorSet) -> (), + // The hash of the block we're actively cosigning. + ActivelyCosigning: (set: ValidatorSet) -> [u8; 32], + // If this block has already been cosigned. + Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (), // The weight accumulated for a topic. AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64, @@ -238,19 +240,20 @@ impl TributaryDb { ) { LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash); } - pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> bool { - ActivelyCosigning::get(txn, set).is_some() + pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<[u8; 32]> { + ActivelyCosigning::get(txn, set) } pub(crate) fn start_cosigning( txn: &mut impl DbTxn, set: ValidatorSet, + substrate_block_hash: [u8; 32], substrate_block_number: u64, ) { assert!( ActivelyCosigning::get(txn, set).is_none(), "starting cosigning while already cosigning" ); - ActivelyCosigning::set(txn, set, &()); + ActivelyCosigning::set(txn, set, &substrate_block_hash); TributaryDb::recognize_topic( txn, @@ -265,6 +268,20 @@ impl TributaryDb { pub(crate) fn finish_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) { assert!(ActivelyCosigning::take(txn, set).is_some(), "finished cosigning but not cosigning"); } + pub(crate) fn mark_cosigned( + txn: &mut impl DbTxn, + set: ValidatorSet, + substrate_block_hash: [u8; 32], + ) { + Cosigned::set(txn, set, substrate_block_hash, &()); + } + pub(crate) fn cosigned( + txn: &mut impl DbTxn, + set: ValidatorSet, + substrate_block_hash: [u8; 32], + ) -> bool { + Cosigned::get(txn, set, substrate_block_hash).is_some() + } pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) { AccumulatedWeight::set(txn, set, topic, &0); diff --git a/coordinator/src/tributary/scan.rs b/coordinator/src/tributary/scan.rs index 9da982e5..6e4d8d3f 100644 --- a/coordinator/src/tributary/scan.rs +++ b/coordinator/src/tributary/scan.rs @@ -45,17 +45,22 @@ struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { fn potentially_start_cosign(&mut self) { // Don't start a new cosigning instance if we're actively running one - if TributaryDb::actively_cosigning(self.tributary_txn, self.set) { + if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { return; } - // Start cosigning the latest intended-to-be-cosigned block + // Fetch the latest intended-to-be-cosigned block let Some(latest_substrate_block_to_cosign) = TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) else { return; }; + // If it was already cosigned, return + if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) { + return; + } + let Some(substrate_block_number) = Cosigning::::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) else { @@ -65,7 +70,12 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { }; // Mark us as actively cosigning - TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number); + TributaryDb::start_cosigning( + self.tributary_txn, + self.set, + latest_substrate_block_to_cosign, + substrate_block_number, + ); // Send the message for the processor to start signing TributaryDb::send_message( self.tributary_txn, @@ -154,24 +164,31 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { self.set, substrate_block_hash, ); - // Start a new cosign if we weren't already working on one + // Start a new cosign if we aren't already working on one self.potentially_start_cosign(); } Transaction::Cosigned { substrate_block_hash } => { - TributaryDb::finish_cosigning(self.tributary_txn, self.set); + /* + We provide one Cosigned per Cosign transaction, but they have independent orders. This + means we may receive Cosigned before Cosign. In order to ensure we only start work on + not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose + the next block to work on, we won't if it's already been cosigned. + */ + TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash); - // Fetch the latest intended-to-be-cosigned block - let Some(latest_substrate_block_to_cosign) = - TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) - else { - return; - }; - // If this is the block we just cosigned, return, preventing us from signing it again - if latest_substrate_block_to_cosign == substrate_block_hash { + // If we aren't actively cosigning this block, return + // This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C, + // and then receive Cosigned for B + if TributaryDb::actively_cosigning(self.tributary_txn, self.set) != + Some(substrate_block_hash) + { return; } - // Since we do have a new cosign to work on, start it + // Since this is the block we were cosigning, mark us as having finished cosigning + TributaryDb::finish_cosigning(self.tributary_txn, self.set); + + // Start working on the next cosign self.potentially_start_cosign(); } Transaction::SubstrateBlock { hash } => { diff --git a/coordinator/src/tributary/transaction.rs b/coordinator/src/tributary/transaction.rs index 0befbf36..34528cb9 100644 --- a/coordinator/src/tributary/transaction.rs +++ b/coordinator/src/tributary/transaction.rs @@ -231,9 +231,11 @@ impl TransactionTrait for Transaction { TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(1)) } - Transaction::Cosign { .. } => TransactionKind::Provided("CosignSubstrateBlock"), + Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"), Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"), + // TODO: Provide this Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"), + // TODO: Provide this Transaction::Batch { .. } => TransactionKind::Provided("Batch"), Transaction::Sign { id, attempt, round, signed, .. } => { diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index ee6ed8ac..5b3d325f 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -213,17 +213,17 @@ pub mod substrate { pub enum CoordinatorMessage { /// Keys set on the Serai blockchain. /// - /// This is set by the Coordinator's Substrate canonical event stream. + /// This is sent by the Coordinator's Substrate canonical event stream. SetKeys { serai_time: u64, session: Session, key_pair: KeyPair }, /// Slashes reported on the Serai blockchain OR the process timed out. /// /// This is the final message for a session, /// - /// This is set by the Coordinator's Substrate canonical event stream. + /// This is sent by the Coordinator's Substrate canonical event stream. SlashesReported { session: Session }, /// A block from Serai with relevance to this processor. /// - /// This is set by the Coordinator's Substrate canonical event stream. + /// This is sent by the Coordinator's Substrate canonical event stream. Block { serai_block_number: u64, batch: Option,