diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0e2db23c..9f2e9e7a 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -4,7 +4,6 @@ use std::{sync::Arc, time::Instant}; use zeroize::{Zeroize, Zeroizing}; use rand_core::{RngCore, OsRng}; -use blake2::{digest::typenum::U32, Digest, Blake2s}; use ciphersuite::{ group::{ff::PrimeField, GroupEncoding}, Ciphersuite, Ristretto, @@ -12,23 +11,19 @@ use ciphersuite::{ use tokio::sync::mpsc; -use scale::Encode; -use serai_client::{primitives::PublicKey, validator_sets::primitives::ValidatorSet, Serai}; +use serai_client::{primitives::PublicKey, Serai}; use message_queue::{Service, client::MessageQueue}; -use tributary_sdk::Tributary; - use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{SignedCosign, Cosigning}; -use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream}; -use serai_coordinator_tributary::{Transaction, ScanTributaryTask}; +use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream}; +use serai_coordinator_tributary::Transaction; mod db; use db::*; mod tributary; -use tributary::ScanTributaryMessagesTask; mod substrate; use substrate::SubstrateTask; @@ -104,69 +99,6 @@ fn spawn_cosigning( }); } -/// Spawn an existing Tributary. -/// -/// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network. -async fn spawn_tributary( - db: Db, - message_queue: Arc, - p2p: P, - p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, - set: NewSetInformation, - serai_key: Zeroizing<::F>, -) { - // Don't spawn retired Tributaries - if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= Some(set.set.session.0) - { - return; - } - - let genesis = <[u8; 32]>::from(Blake2s::::digest((set.serai_block, set.set).encode())); - - // Since the Serai block will be finalized, then cosigned, before we handle this, this time will - // be a couple of minutes stale. 
While the Tributary will still function with a start time in the - // past, the Tributary will immediately incur round timeouts. We reduce these by adding a - // constant delay of a couple of minutes. - const TRIBUTARY_START_TIME_DELAY: u64 = 120; - let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY; - - let mut tributary_validators = Vec::with_capacity(set.validators.len()); - for (validator, weight) in set.validators.iter().copied() { - let validator_key = ::read_G(&mut validator.0.as_slice()) - .expect("Serai validator had an invalid public key"); - let weight = u64::from(weight); - tributary_validators.push((validator_key, weight)); - } - - let tributary_db = tributary_db(set.set); - let tributary = - Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p) - .await - .unwrap(); - let reader = tributary.reader(); - - p2p_add_tributary - .send((set.set, tributary.clone())) - .expect("p2p's add_tributary channel was closed?"); - - // Spawn the task to send all messages from the Tributary scanner to the message-queue - let (scan_tributary_messages_task_def, scan_tributary_messages_task) = Task::new(); - tokio::spawn( - (ScanTributaryMessagesTask { tributary_db: tributary_db.clone(), set: set.set, message_queue }) - .continually_run(scan_tributary_messages_task_def, vec![]), - ); - - let (scan_tributary_task_def, scan_tributary_task) = Task::new(); - tokio::spawn( - ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db, &set, reader) - // This is the only handle for this ScanTributaryMessagesTask, so when this task is dropped, - // it will be too - .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), - ); - - tokio::spawn(tributary::run(db, set, tributary, scan_tributary_task)); -} - #[tokio::main] async fn main() { // Override the panic handler with one which will panic if any tokio task panics @@ -297,7 +229,7 @@ async fn main() { // Spawn all Tributaries on-disk for tributary 
in existing_tributaries_at_boot { - spawn_tributary( + crate::tributary::spawn_tributary( db.clone(), message_queue.clone(), p2p.clone(), diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index 7ea5c257..8b5d2b41 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -141,7 +141,7 @@ impl ContinuallyRan for SubstrateTask

{ // Now spawn the Tributary // If we reboot after committing the txn, but before this is called, this will be called // on boot - crate::spawn_tributary( + crate::tributary::spawn_tributary( self.db.clone(), self.message_queue.clone(), self.p2p.clone(), diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 4fb193b3..56fe8a37 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -1,28 +1,138 @@ use core::{future::Future, time::Duration}; use std::sync::Arc; -use serai_db::{DbTxn, Db}; +use zeroize::Zeroizing; +use blake2::{digest::typenum::U32, Digest, Blake2s}; +use ciphersuite::{Ciphersuite, Ristretto}; +use tokio::sync::mpsc; + +use serai_db::{DbTxn, Db as DbTrait}; + +use scale::Encode; use serai_client::validator_sets::primitives::ValidatorSet; use tributary_sdk::{ProvidedError, Tributary}; -use serai_task::{TaskHandle, ContinuallyRan}; +use serai_task::{Task, TaskHandle, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; use serai_cosign::Cosigning; use serai_coordinator_substrate::NewSetInformation; -use serai_coordinator_tributary::{Transaction, ProcessorMessages}; +use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask}; use serai_coordinator_p2p::P2p; -pub(crate) struct ScanTributaryMessagesTask { +use crate::Db; + +/// Provides Cosign/Cosigned Transactions onto the Tributary. +pub(crate) struct ProvideCosignCosignedTransactionsTask { + pub(crate) db: CD, + pub(crate) set: NewSetInformation, + pub(crate) tributary: Tributary, +} +impl ContinuallyRan + for ProvideCosignCosignedTransactionsTask +{ + fn run_iteration(&mut self) -> impl Send + Future> { + /// Provide a Provided Transaction to the Tributary. + /// + /// This is not a well-designed function. This is specific to the context in which it's called, + /// within this file. It should only be considered an internal helper for this domain alone. 
+ async fn provide_transaction( + set: ValidatorSet, + tributary: &Tributary, + tx: Transaction, + ) { + match tributary.provide_transaction(tx.clone()).await { + // The Tributary uses its own DB, so we may provide this multiple times if we reboot before + // committing the txn which provoked this + Ok(()) | Err(ProvidedError::AlreadyProvided) => {} + Err(ProvidedError::NotProvided) => { + panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); + } + Err(ProvidedError::InvalidProvided(e)) => { + panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") + } + Err(ProvidedError::LocalMismatchesOnChain) => loop { + // The Tributary's scan task won't advance if we don't have the Provided transactions + // present on-chain, and this enters an infinite loop to block the calling task from + // advancing + log::error!( + "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", + set, + tx, + ); + // Print this every five minutes as this does need to be handled + tokio::time::sleep(Duration::from_secs(5 * 60)).await; + }, + } + } + + async move { + let mut made_progress = false; + + // Check if we produced any cosigns we were supposed to + let mut pending_notable_cosign = false; + loop { + let mut txn = self.db.txn(); + + // Fetch the next cosign this tributary should handle + let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break }; + pending_notable_cosign = cosign.notable; + + // If we (Serai) haven't cosigned this block, break as this is still pending + let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; + if latest < cosign.block_number { + break; + } + + // Because we've cosigned it, provide the TX for that + provide_transaction( + self.set.set, + &self.tributary, + Transaction::Cosigned { substrate_block_hash: cosign.block_hash }, + ) + .await; + // Clear pending_notable_cosign since this cosign isn't pending + pending_notable_cosign = 
false; + + // Commit the txn to clear this from PendingCosigns + txn.commit(); + made_progress = true; + } + + // If we don't have any notable cosigns pending, provide the next set of cosign intents + if !pending_notable_cosign { + let mut txn = self.db.txn(); + // intended_cosigns will only yield up to and including the next notable cosign + for cosign in Cosigning::::intended_cosigns(&mut txn, self.set.set) { + // Flag this cosign as pending + crate::PendingCosigns::send(&mut txn, self.set.set, &cosign); + // Provide the transaction to queue it for work + provide_transaction( + self.set.set, + &self.tributary, + Transaction::Cosign { substrate_block_hash: cosign.block_hash }, + ) + .await; + } + txn.commit(); + made_progress = true; + } + + Ok(made_progress) + } + } +} + +/// Takes the messages from ScanTributaryTask and publishes them to the message-queue. +pub(crate) struct TributaryProcessorMessagesTask { pub(crate) tributary_db: TD, pub(crate) set: ValidatorSet, pub(crate) message_queue: Arc, } - -impl ContinuallyRan for ScanTributaryMessagesTask { +impl ContinuallyRan for TributaryProcessorMessagesTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; @@ -45,118 +155,122 @@ impl ContinuallyRan for ScanTributaryMessagesTask { } } -async fn provide_transaction( +/// Run the scan task whenever the Tributary adds a new block. 
+async fn scan_on_new_block( + db: CD, set: ValidatorSet, - tributary: &Tributary, - tx: Transaction, -) { - match tributary.provide_transaction(tx.clone()).await { - // The Tributary uses its own DB, so we may provide this multiple times if we reboot before - // committing the txn which provoked this - Ok(()) | Err(ProvidedError::AlreadyProvided) => {} - Err(ProvidedError::NotProvided) => { - panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); - } - Err(ProvidedError::InvalidProvided(e)) => { - panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") - } - Err(ProvidedError::LocalMismatchesOnChain) => loop { - // The Tributary's scan task won't advance if we don't have the Provided transactions - // present on-chain, and this enters an infinite loop to block the calling task from - // advancing - log::error!( - "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", - set, - tx, - ); - // Print this every five minutes as this does need to be handled - tokio::time::sleep(Duration::from_secs(5 * 60)).await; - }, - } -} - -/// Run a Tributary. -/// -/// The Tributary handle existing causes the Tributary's consensus engine to be run. We distinctly -/// have `ScanTributaryTask` to scan the produced blocks. This function provides Provided -/// transactions onto the Tributary and invokes ScanTributaryTask whenver a new Tributary block is -/// produced (instead of only on the standard interval). 
-pub(crate) async fn run( - mut db: CD, - set: NewSetInformation, tributary: Tributary, scan_tributary_task: TaskHandle, + tasks_to_keep_alive: Vec, ) { loop { // Break once this Tributary is retired - if crate::RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= - Some(set.set.session.0) + if crate::RetiredTributary::get(&db, set.network).map(|session| session.0) >= + Some(set.session.0) { + drop(tasks_to_keep_alive); break; } - // Check if we produced any cosigns we were supposed to - let mut pending_notable_cosign = false; - loop { - let mut txn = db.txn(); - - // Fetch the next cosign this tributary should handle - let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, set.set) else { break }; - pending_notable_cosign = cosign.notable; - - // If we (Serai) haven't cosigned this block, break as this is still pending - let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; - if latest < cosign.block_number { - break; - } - - // Because we've cosigned it, provide the TX for that - provide_transaction( - set.set, - &tributary, - Transaction::Cosigned { substrate_block_hash: cosign.block_hash }, - ) - .await; - // Clear pending_notable_cosign since this cosign isn't pending - pending_notable_cosign = false; - - // Commit the txn to clear this from PendingCosigns - txn.commit(); - } - - // If we don't have any notable cosigns pending, provide the next set of cosign intents - if pending_notable_cosign { - let mut txn = db.txn(); - // intended_cosigns will only yield up to and including the next notable cosign - for cosign in Cosigning::::intended_cosigns(&mut txn, set.set) { - // Flag this cosign as pending - crate::PendingCosigns::send(&mut txn, set.set, &cosign); - // Provide the transaction to queue it for work - provide_transaction( - set.set, - &tributary, - Transaction::Cosign { substrate_block_hash: cosign.block_hash }, - ) - .await; - } - txn.commit(); - } - // Have the tributary scanner run as soon as 
there's a new block - // This is wrapped in a timeout so we don't go too long without running the above code - match tokio::time::timeout( - Duration::from_millis(tributary_sdk::tendermint::TARGET_BLOCK_TIME.into()), - tributary.next_block_notification().await, - ) - .await - { - // Future resolved within the timeout, notification - Ok(Ok(())) => scan_tributary_task.run_now(), - // Future resolved within the timeout, notification failed due to sender being dropped + match tributary.next_block_notification().await.await { + Ok(()) => scan_tributary_task.run_now(), // unreachable since this owns the tributary object and doesn't drop it - Ok(Err(_)) => panic!("tributary was dropped causing notification to error"), - // Future didn't resolve within the timeout - Err(_) => {} + Err(_) => panic!("tributary was dropped causing notification to error"), } } } + +/// Spawn a Tributary. +/// +/// This will spawn the Tributary, the Tributary scan task, forward the messages from the scan task +/// to the message queue, provide Cosign/Cosigned transactions, and inform the P2P network. +pub(crate) async fn spawn_tributary( + db: Db, + message_queue: Arc, + p2p: P, + p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, + set: NewSetInformation, + serai_key: Zeroizing<::F>, +) { + // Don't spawn retired Tributaries + if crate::db::RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= + Some(set.set.session.0) + { + return; + } + + let genesis = <[u8; 32]>::from(Blake2s::::digest((set.serai_block, set.set).encode())); + + // Since the Serai block will be finalized, then cosigned, before we handle this, this time will + // be a couple of minutes stale. While the Tributary will still function with a start time in the + // past, the Tributary will immediately incur round timeouts. We reduce these by adding a + // constant delay of a couple of minutes. 
+ const TRIBUTARY_START_TIME_DELAY: u64 = 120; + let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY; + + let mut tributary_validators = Vec::with_capacity(set.validators.len()); + for (validator, weight) in set.validators.iter().copied() { + let validator_key = ::read_G(&mut validator.0.as_slice()) + .expect("Serai validator had an invalid public key"); + let weight = u64::from(weight); + tributary_validators.push((validator_key, weight)); + } + + // Spawn the Tributary + let tributary_db = crate::db::tributary_db(set.set); + let tributary = + Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p) + .await + .unwrap(); + let reader = tributary.reader(); + + // Inform the P2P network + p2p_add_tributary + .send((set.set, tributary.clone())) + .expect("p2p's add_tributary channel was closed?"); + + // Spawn the task to provide Cosign/Cosigned transactions onto the Tributary + let (provide_cosign_cosigned_transactions_task_def, provide_cosign_cosigned_transactions_task) = + Task::new(); + tokio::spawn( + (ProvideCosignCosignedTransactionsTask { + db: db.clone(), + set: set.clone(), + tributary: tributary.clone(), + }) + .continually_run(provide_cosign_cosigned_transactions_task_def, vec![]), + ); + + // Spawn the task to send all messages from the Tributary scanner to the message-queue + let (scan_tributary_messages_task_def, scan_tributary_messages_task) = Task::new(); + tokio::spawn( + (TributaryProcessorMessagesTask { + tributary_db: tributary_db.clone(), + set: set.set, + message_queue, + }) + .continually_run(scan_tributary_messages_task_def, vec![]), + ); + + // Spawn the scan task + let (scan_tributary_task_def, scan_tributary_task) = Task::new(); + tokio::spawn( + ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db, &set, reader) + // This is the only handle for this TributaryProcessorMessagesTask, so when this task is + // dropped, it will be too + .continually_run(scan_tributary_task_def, 
vec![scan_tributary_messages_task]), + ); + + // Whenever a new block occurs, immediately run the scan task + // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the + // Tributary is retired, ensuring it isn't dropped prematurely and that the task doesn't run ad + // infinitum + tokio::spawn(scan_on_new_block( + db, + set.set, + tributary, + scan_tributary_task, + vec![provide_cosign_cosigned_transactions_task], + )); +}