From 6d5049cab26872e9e3d28acc7dcf2b698822542e Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 11 Jan 2025 02:10:15 -0500 Subject: [PATCH] Move the task providing transactions onto the Tributary to the Tributary module Slims down the main file a bit --- coordinator/src/main.rs | 113 +--------------------------- coordinator/src/tributary/mod.rs | 124 ++++++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 113 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 496d9ac1..842f7f9c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -151,7 +151,7 @@ async fn spawn_tributary( let mut validators = Vec::with_capacity(set.validators.len()); let mut total_weight = 0; let mut validator_weights = HashMap::with_capacity(set.validators.len()); - for (validator, weight) in set.validators { + for (validator, weight) in set.validators.iter().copied() { let validator_key = ::read_G(&mut validator.0.as_slice()) .expect("Serai validator had an invalid public key"); let validator = SeraiAddress::from(validator); @@ -197,116 +197,7 @@ async fn spawn_tributary( .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), ); - tokio::spawn(async move { - loop { - // Break once this Tributary is retired - if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= - Some(set.set.session.0) - { - break; - } - - let provide = |tributary: Tributary<_>, scan_tributary_task, tx: Transaction| async move { - match tributary.provide_transaction(tx.clone()).await { - // The Tributary uses its own DB, so we may provide this multiple times if we reboot - // before committing the txn which provoked this - Ok(()) | Err(ProvidedError::AlreadyProvided) => {} - Err(ProvidedError::NotProvided) => { - panic!("providing a Transaction which wasn't a Provided transaction?"); - } - Err(ProvidedError::InvalidProvided(e)) => { - panic!("providing an invalid Provided transaction: {e:?}") - } - 
Err(ProvidedError::LocalMismatchesOnChain) => { - // Drop the Tributary and scan Tributary task so we don't continue running them here - drop(tributary); - drop(scan_tributary_task); - - loop { - // We're actually only halting the Tributary's scan task (which already only scans - // if all Provided transactions align) as the P2P task is still maintaining a clone - // of the Tributary handle - log::error!( - "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", - set.set, - tx, - ); - // Print this every five minutes as this does need to be handled - tokio::time::sleep(Duration::from_secs(5 * 60)).await; - } - - // Declare this unreachable so Rust will let us perform the above drops - unreachable!(); - } - } - (tributary, scan_tributary_task) - }; - - // Check if we produced any cosigns we were supposed to - let mut pending_notable_cosign = false; - loop { - let mut txn = db.txn(); - - // Fetch the next cosign this tributary should handle - let Some(cosign) = PendingCosigns::try_recv(&mut txn, set.set) else { break }; - pending_notable_cosign = cosign.notable; - - // If we (Serai) haven't cosigned this block, break as this is still pending - let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break }; - if latest < cosign.block_number { - break; - } - - // Because we've cosigned it, provide the TX for that - (tributary, scan_tributary_task) = provide( - tributary, - scan_tributary_task, - Transaction::Cosigned { substrate_block_hash: cosign.block_hash }, - ) - .await; - // Clear pending_notable_cosign since this cosign isn't pending - pending_notable_cosign = false; - - // Commit the txn to clear this from PendingCosigns - txn.commit(); - } - - // If we don't have any notable cosigns pending, provide the next set of cosign intents - if pending_notable_cosign { - let mut txn = db.txn(); - // intended_cosigns will only yield up to and including the next notable cosign - for cosign in Cosigning::::intended_cosigns(&mut 
txn, set.set) { - // Flag this cosign as pending - PendingCosigns::send(&mut txn, set.set, &cosign); - // Provide the transaction to queue it for work - (tributary, scan_tributary_task) = provide( - tributary, - scan_tributary_task, - Transaction::Cosign { substrate_block_hash: cosign.block_hash }, - ) - .await; - } - txn.commit(); - } - - // Have the tributary scanner run as soon as there's a new block - // This is wrapped in a timeout so we don't go too long without running the above code - match tokio::time::timeout( - Duration::from_millis(::tributary::tendermint::TARGET_BLOCK_TIME.into()), - tributary.next_block_notification().await, - ) - .await - { - // Future resolved within the timeout, notification - Ok(Ok(())) => scan_tributary_task.run_now(), - // Future resolved within the timeout, notification failed due to sender being dropped - // unreachable since this owns the tributary object and doesn't drop it - Ok(Err(_)) => panic!("tributary was dropped causing notification to error"), - // Future didn't resolve within the timeout - Err(_) => {} - } - } - }); + tokio::spawn(tributary::run(db, set, tributary, scan_tributary_task)); } #[tokio::main] diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 6b7e3dbb..4ca3cbbe 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -1,14 +1,17 @@ -use core::future::Future; +use core::{future::Future, time::Duration}; use std::sync::Arc; use serai_db::{DbTxn, Db}; use serai_client::validator_sets::primitives::ValidatorSet; -use serai_task::ContinuallyRan; +use ::tributary::{ProvidedError, Tributary}; + +use serai_task::{TaskHandle, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; +use serai_cosign::Cosigning; use serai_coordinator_substrate::NewSetInformation; use serai_coordinator_p2p::P2p; @@ -25,6 +28,7 @@ pub(crate) struct ScanTributaryMessagesTask { pub(crate) set: ValidatorSet, pub(crate) message_queue: Arc, } + impl 
ContinuallyRan for ScanTributaryMessagesTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { @@ -47,3 +51,119 @@ impl ContinuallyRan for ScanTributaryMessagesTask { } } } + +async fn provide_transaction( + set: ValidatorSet, + tributary: &Tributary, + tx: Transaction, +) { + match tributary.provide_transaction(tx.clone()).await { + // The Tributary uses its own DB, so we may provide this multiple times if we reboot before + // committing the txn which provoked this + Ok(()) | Err(ProvidedError::AlreadyProvided) => {} + Err(ProvidedError::NotProvided) => { + panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}"); + } + Err(ProvidedError::InvalidProvided(e)) => { + panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") + } + Err(ProvidedError::LocalMismatchesOnChain) => loop { + // The Tributary's scan task won't advance if we don't have the Provided transactions + // present on-chain, and this enters an infinite loop to block the calling task from + // advancing + log::error!( + "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", + set, + tx, + ); + // Print this every five minutes as this does need to be handled + tokio::time::sleep(Duration::from_secs(5 * 60)).await; + }, + } +} + +/// Run a Tributary. +/// +/// The Tributary handle existing causes the Tributary's consensus engine to be run. We distinctly +/// have `ScanTributaryTask` to scan the produced blocks. This function provides Provided +/// transactions onto the Tributary and invokes ScanTributaryTask whenever a new Tributary block is +/// produced (instead of only on the standard interval). 
+pub(crate) async fn run(
+  mut db: CD,
+  set: NewSetInformation,
+  tributary: Tributary,
+  scan_tributary_task: TaskHandle,
+) {
+  loop {
+    // Break once this Tributary is retired
+    if crate::RetiredTributary::get(&db, set.set.network).map(|session| session.0) >=
+      Some(set.set.session.0)
+    {
+      break;
+    }
+
+    // Check if we produced any cosigns we were supposed to
+    let mut pending_notable_cosign = false;
+    loop {
+      let mut txn = db.txn();
+
+      // Fetch the next cosign this tributary should handle
+      let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, set.set) else { break };
+      pending_notable_cosign = cosign.notable;
+
+      // If we (Serai) haven't cosigned this block, break as this is still pending
+      let Ok(latest) = Cosigning::::latest_cosigned_block_number(&txn) else { break };
+      if latest < cosign.block_number {
+        break;
+      }
+
+      // Because we've cosigned it, provide the TX for that
+      provide_transaction(
+        set.set,
+        &tributary,
+        Transaction::Cosigned { substrate_block_hash: cosign.block_hash },
+      )
+      .await;
+      // Clear pending_notable_cosign since this cosign isn't pending
+      pending_notable_cosign = false;
+
+      // Commit the txn to clear this from PendingCosigns
+      txn.commit();
+    }
+
+    // If we don't have any notable cosigns pending, provide the next set of cosign intents
+    if !pending_notable_cosign {
+      let mut txn = db.txn();
+      // intended_cosigns will only yield up to and including the next notable cosign
+      for cosign in Cosigning::::intended_cosigns(&mut txn, set.set) {
+        // Flag this cosign as pending
+        crate::PendingCosigns::send(&mut txn, set.set, &cosign);
+        // Provide the transaction to queue it for work
+        provide_transaction(
+          set.set,
+          &tributary,
+          Transaction::Cosign { substrate_block_hash: cosign.block_hash },
+        )
+        .await;
+      }
+      txn.commit();
+    }
+
+    // Have the tributary scanner run as soon as there's a new block
+    // This is wrapped in a timeout so we don't go too long without running the above code
+    match tokio::time::timeout(
+      Duration::from_millis(::tributary::tendermint::TARGET_BLOCK_TIME.into()),
+      tributary.next_block_notification().await,
+    )
+    .await
+    {
+      // Future resolved within the timeout, notification
+      Ok(Ok(())) => scan_tributary_task.run_now(),
+      // Future resolved within the timeout, notification failed due to sender being dropped
+      // unreachable since this owns the tributary object and doesn't drop it
+      Ok(Err(_)) => panic!("tributary was dropped causing notification to error"),
+      // Future didn't resolve within the timeout
+      Err(_) => {}
+    }
+  }
+}