use core::{future::Future, time::Duration}; use std::sync::Arc; use zeroize::Zeroizing; use rand_core::OsRng; use blake2::{digest::typenum::U32, Digest, Blake2s}; use ciphersuite::{Ciphersuite, Ristretto}; use tokio::sync::mpsc; use serai_db::{DbTxn, Db as DbTrait}; use scale::Encode; use serai_client::validator_sets::primitives::ValidatorSet; use tributary_sdk::{TransactionError, ProvidedError, Tributary}; use serai_task::{Task, TaskHandle, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; use serai_cosign::Cosigning; use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask}; use serai_coordinator_p2p::P2p; use crate::Db; /// Provides Cosign/Cosigned Transactions onto the Tributary. pub(crate) struct ProvideCosignCosignedTransactionsTask { db: CD, set: NewSetInformation, tributary: Tributary, } impl ContinuallyRan for ProvideCosignCosignedTransactionsTask { fn run_iteration(&mut self) -> impl Send + Future> { /// Provide a Provided Transaction to the Tributary. /// /// This is not a well-designed function. This is specific to the context in which its called, /// within this file. It should only be considered an internal helper for this domain alone. async fn provide_transaction( set: ValidatorSet, tributary: &Tributary, tx: Transaction, ) { match tributary.provide_transaction(tx.clone()).await { // The Tributary uses its own DB, so we may provide this multiple times if we reboot before // committing the txn which provoked this Ok(()) | Err(ProvidedError::AlreadyProvided) => {} Err(ProvidedError::NotProvided) => { panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); } Err(ProvidedError::InvalidProvided(e)) => { panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") } Err(ProvidedError::LocalMismatchesOnChain) => loop { // The Tributary's scan task won't advance if we don't have the Provided transactions // present on-chain, and this enters an infinite loop to block the calling task from // advancing log::error!( "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", set, tx, ); // Print this every five minutes as this does need to be handled tokio::time::sleep(Duration::from_secs(5 * 60)).await; }, } } async move { let mut made_progress = false; // Check if we produced any cosigns we were supposed to let mut pending_notable_cosign = false; loop { let mut txn = self.db.txn(); // Fetch the next cosign this tributary should handle let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break }; pending_notable_cosign = cosign.notable; // If we (Serai) haven't cosigned this block, break as this is still pending let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; if latest < cosign.block_number { break; } // Because we've cosigned it, provide the TX for that provide_transaction( self.set.set, &self.tributary, Transaction::Cosigned { substrate_block_hash: cosign.block_hash }, ) .await; // Clear pending_notable_cosign since this cosign isn't pending pending_notable_cosign = false; // Commit the txn to clear this from PendingCosigns txn.commit(); made_progress = true; } // If we don't have any notable cosigns pending, provide the next set of cosign intents if !pending_notable_cosign { let mut txn = self.db.txn(); // intended_cosigns will only yield up to and including the next notable cosign for cosign in Cosigning::::intended_cosigns(&mut txn, self.set.set) { // Flag this cosign as pending crate::PendingCosigns::send(&mut txn, self.set.set, &cosign); // Provide the transaction to queue it for work provide_transaction( self.set.set, &self.tributary, Transaction::Cosign { substrate_block_hash: cosign.block_hash }, ) .await; } txn.commit(); made_progress = true; } Ok(made_progress) } } } /// Takes the messages from ScanTributaryTask and publishes them to the message-queue. pub(crate) struct TributaryProcessorMessagesTask { tributary_db: TD, set: ValidatorSet, message_queue: Arc, } impl ContinuallyRan for TributaryProcessorMessagesTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { let mut txn = self.tributary_db.txn(); let Some(msg) = ProcessorMessages::try_recv(&mut txn, self.set) else { break }; let metadata = Metadata { from: Service::Coordinator, to: Service::Processor(self.set.network), intent: msg.intent(), }; let msg = borsh::to_vec(&msg).unwrap(); self.message_queue.queue(metadata, msg).await?; txn.commit(); made_progress = true; } Ok(made_progress) } } } /// Checks for the notification to sign a slash report and does so if present. pub(crate) struct SignSlashReportTask { db: CD, tributary_db: TD, tributary: Tributary, set: NewSetInformation, key: Zeroizing<::F>, } impl ContinuallyRan for SignSlashReportTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut txn = self.db.txn(); let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) }; // Fetch the slash report for this Tributary let mut tx = serai_coordinator_tributary::slash_report_transaction(&self.tributary_db, &self.set); tx.sign(&mut OsRng, self.tributary.genesis(), &self.key); let res = self.tributary.add_transaction(tx.clone()).await; match &res { // Fresh publication, already published Ok(true | false) => {} Err( TransactionError::TooLargeTransaction | TransactionError::InvalidSigner | TransactionError::InvalidNonce | TransactionError::InvalidSignature | TransactionError::InvalidContent, ) => { panic!("created an invalid SlashReport transaction, tx: {tx:?}, err: {res:?}"); } // We've published too many transactions recently // Drop this txn to try to publish it again later on a future iteration Err(TransactionError::TooManyInMempool) => return Ok(false), // This isn't a Provided transaction so this should never be hit Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), } txn.commit(); Ok(true) } } } /// Run the scan task whenever the Tributary adds a new block. async fn scan_on_new_block( db: CD, set: ValidatorSet, tributary: Tributary, scan_tributary_task: TaskHandle, tasks_to_keep_alive: Vec, ) { loop { // Break once this Tributary is retired if crate::RetiredTributary::get(&db, set.network).map(|session| session.0) >= Some(set.session.0) { drop(tasks_to_keep_alive); break; } // Have the tributary scanner run as soon as there's a new block match tributary.next_block_notification().await.await { Ok(()) => scan_tributary_task.run_now(), // unreachable since this owns the tributary object and doesn't drop it Err(_) => panic!("tributary was dropped causing notification to error"), } } } /// Spawn a Tributary. /// /// This will: /// - Spawn the Tributary /// - Inform the P2P network of the Tributary /// - Spawn the ScanTributaryTask /// - Spawn the ProvideCosignCosignedTransactionsTask /// - Spawn the TributaryProcessorMessagesTask /// - Spawn the SignSlashReportTask /// - Iterate the scan task whenever a new block occurs (not just on the standard interval) pub(crate) async fn spawn_tributary( db: Db, message_queue: Arc, p2p: P, p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, set: NewSetInformation, serai_key: Zeroizing<::F>, ) { // Don't spawn retired Tributaries if crate::db::RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= Some(set.set.session.0) { return; } let genesis = <[u8; 32]>::from(Blake2s::::digest((set.serai_block, set.set).encode())); // Since the Serai block will be finalized, then cosigned, before we handle this, this time will // be a couple of minutes stale. While the Tributary will still function with a start time in the // past, the Tributary will immediately incur round timeouts. We reduce these by adding a // constant delay of a couple of minutes. const TRIBUTARY_START_TIME_DELAY: u64 = 120; let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY; let mut tributary_validators = Vec::with_capacity(set.validators.len()); for (validator, weight) in set.validators.iter().copied() { let validator_key = ::read_G(&mut validator.0.as_slice()) .expect("Serai validator had an invalid public key"); let weight = u64::from(weight); tributary_validators.push((validator_key, weight)); } // Spawn the Tributary let tributary_db = crate::db::tributary_db(set.set); let tributary = Tributary::new( tributary_db.clone(), genesis, start_time, serai_key.clone(), tributary_validators, p2p, ) .await .unwrap(); let reader = tributary.reader(); // Inform the P2P network p2p_add_tributary .send((set.set, tributary.clone())) .expect("p2p's add_tributary channel was closed?"); // Spawn the task to provide Cosign/Cosigned transactions onto the Tributary let (provide_cosign_cosigned_transactions_task_def, provide_cosign_cosigned_transactions_task) = Task::new(); tokio::spawn( (ProvideCosignCosignedTransactionsTask { db: db.clone(), set: set.clone(), tributary: tributary.clone(), }) .continually_run(provide_cosign_cosigned_transactions_task_def, vec![]), ); // Spawn the task to send all messages from the Tributary scanner to the message-queue let (scan_tributary_messages_task_def, scan_tributary_messages_task) = Task::new(); tokio::spawn( (TributaryProcessorMessagesTask { tributary_db: tributary_db.clone(), set: set.set, message_queue, }) .continually_run(scan_tributary_messages_task_def, vec![]), ); // Spawn the scan task let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader) // This is the only handle for this TributaryProcessorMessagesTask, so when this task is // dropped, it will be too .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), ); // Spawn the sign slash report task let (sign_slash_report_task_def, sign_slash_report_task) = Task::new(); tokio::spawn( (SignSlashReportTask { db: db.clone(), tributary_db, tributary: tributary.clone(), set: set.clone(), key: serai_key, }) .continually_run(sign_slash_report_task_def, vec![]), ); // Whenever a new block occurs, immediately run the scan task // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the // Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad // infinitum tokio::spawn(scan_on_new_block( db, set.set, tributary, scan_tributary_task, vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task], )); }