diff --git a/common/task/src/lib.rs b/common/task/src/lib.rs index 2a061c10..64cf9416 100644 --- a/common/task/src/lib.rs +++ b/common/task/src/lib.rs @@ -2,7 +2,11 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::{future::Future, time::Duration}; +use core::{ + fmt::{self, Debug}, + future::Future, + time::Duration, +}; use tokio::sync::mpsc; @@ -60,6 +64,15 @@ impl TaskHandle { } } +/// An enum which can't be constructed, representing that the task does not error. +pub enum DoesNotError {} +impl Debug for DoesNotError { + fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + // This type can't be constructed so we'll never have a `&self` to call this fn with + unreachable!() + } +} + /// A task to be continually ran. pub trait ContinuallyRan: Sized + Send { /// The amount of seconds before this task should be polled again. @@ -69,11 +82,14 @@ pub trait ContinuallyRan: Sized + Send { /// Upon error, the amount of time waited will be linearly increased until this limit. const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120; + /// The error potentially yielded upon running an iteration of this task. + type Error: Debug; + /// Run an iteration of the task. /// /// If this returns `true`, all dependents of the task will immediately have a new iteration ran /// (without waiting for whatever timer they were already on). - fn run_iteration(&mut self) -> impl Send + Future>; + fn run_iteration(&mut self) -> impl Send + Future>; /// Continually run the task. fn continually_run( @@ -115,12 +131,17 @@ pub trait ContinuallyRan: Sized + Send { } } Err(e) => { - log::warn!("{}", e); + log::warn!("{e:?}"); increase_sleep_before_next_task(&mut current_sleep_before_next_task); } } // Don't run the task again for another few seconds UNLESS told to run now + /* + We could replace tokio::mpsc with async_channel, tokio::time::sleep with + patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or + It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio + */ tokio::select! { () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, msg = task.run_now.recv() => { diff --git a/coordinator/cosign/src/delay.rs b/coordinator/cosign/src/delay.rs index 5593eaf7..3439135b 100644 --- a/coordinator/cosign/src/delay.rs +++ b/coordinator/cosign/src/delay.rs @@ -2,7 +2,7 @@ use core::future::Future; use std::time::{Duration, SystemTime}; use serai_db::*; -use serai_task::ContinuallyRan; +use serai_task::{DoesNotError, ContinuallyRan}; use crate::evaluator::CosignedBlocks; @@ -25,7 +25,9 @@ pub(crate) struct CosignDelayTask { } impl ContinuallyRan for CosignDelayTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/coordinator/cosign/src/evaluator.rs b/coordinator/cosign/src/evaluator.rs index db286a4f..4216d5a7 100644 --- a/coordinator/cosign/src/evaluator.rs +++ b/coordinator/cosign/src/evaluator.rs @@ -80,7 +80,9 @@ pub(crate) struct CosignEvaluatorTask { } impl ContinuallyRan for CosignEvaluatorTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut known_cosign = None; let mut made_progress = false; diff --git a/coordinator/cosign/src/intend.rs b/coordinator/cosign/src/intend.rs index ebe3513c..c42c2d12 100644 --- a/coordinator/cosign/src/intend.rs +++ b/coordinator/cosign/src/intend.rs @@ -61,7 +61,9 @@ pub(crate) struct CosignIntendTask { } impl ContinuallyRan for CosignIntendTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1); let latest_block_number = diff --git a/coordinator/p2p/libp2p/src/dial.rs b/coordinator/p2p/libp2p/src/dial.rs index 1530e34b..b001446b 100644 --- a/coordinator/p2p/libp2p/src/dial.rs +++ b/coordinator/p2p/libp2p/src/dial.rs @@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng}; use tokio::sync::mpsc; -use serai_client::Serai; +use serai_client::{SeraiError, Serai}; use libp2p::{ core::multiaddr::{Protocol, Multiaddr}, @@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask { const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60; - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = SeraiError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { self.validators.update().await?; @@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask { .unwrap_or(0) .saturating_sub(1)) { - let mut potential_peers = - self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?; + let mut potential_peers = self.serai.p2p_validators(network).await?; for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) { if potential_peers.is_empty() { break; diff --git a/coordinator/p2p/libp2p/src/validators.rs b/coordinator/p2p/libp2p/src/validators.rs index 951a5e99..0395ff3a 100644 --- a/coordinator/p2p/libp2p/src/validators.rs +++ b/coordinator/p2p/libp2p/src/validators.rs @@ -4,7 +4,7 @@ use std::{ collections::{HashSet, HashMap}, }; -use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; +use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai}; use serai_task::{Task, ContinuallyRan}; @@ -50,9 +50,8 @@ impl Validators { async fn session_changes( serai: impl Borrow, sessions: impl Borrow>, - ) -> Result)>, String> { - let temporal_serai = - serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + ) -> Result)>, SeraiError> { + let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?; let temporal_serai = temporal_serai.validator_sets(); let mut session_changes = vec![]; @@ -69,7 +68,7 @@ impl Validators { let session = match temporal_serai.session(network).await { Ok(Some(session)) => session, Ok(None) => return Ok(None), - Err(e) => return Err(format!("{e:?}")), + Err(e) => return Err(e), }; if sessions.get(&network) == Some(&session) { @@ -81,7 +80,7 @@ impl Validators { session, validators.into_iter().map(peer_id_from_public).collect(), ))), - Err(e) => Err(format!("{e:?}")), + Err(e) => Err(e), } } }); @@ -147,7 +146,7 @@ impl Validators { } /// Update the view of the validators. - pub(crate) async fn update(&mut self) -> Result<(), String> { + pub(crate) async fn update(&mut self) -> Result<(), SeraiError> { let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?; self.incorporate_session_changes(session_changes); Ok(()) @@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask { const DELAY_BETWEEN_ITERATIONS: u64 = 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = SeraiError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let session_changes = { let validators = self.validators.read().await; - Validators::session_changes(validators.serai.clone(), validators.sessions.clone()) - .await - .map_err(|e| format!("{e:?}"))? + Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await? }; self.validators.write().await.incorporate_session_changes(session_changes); Ok(true) diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index 8a2f3220..f13a0e5c 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask { } impl ContinuallyRan for HeartbeatTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60); diff --git a/coordinator/src/serai.rs b/coordinator/src/serai.rs index b0093002..20599b3d 100644 --- a/coordinator/src/serai.rs +++ b/coordinator/src/serai.rs @@ -20,7 +20,9 @@ pub struct PublishSlashReportTask { serai: Arc, } impl ContinuallyRan for PublishSlashReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; for network in serai_client::primitives::NETWORKS { diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index 224b6278..7601b2cc 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask { } impl ContinuallyRan for SubstrateTask

{ - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; // TODO + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index a96cf225..a445be16 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet; use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary}; -use serai_task::{Task, TaskHandle, ContinuallyRan}; +use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; @@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask ContinuallyRan for ProvideCosignCosignedTransactionsTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; @@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask key: Zeroizing<::F>, } impl ContinuallyRan for AddTributaryTransactionsTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { @@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask { message_queue: Arc, } impl ContinuallyRan for TributaryProcessorMessagesTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; // TODO + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { @@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask { key: Zeroizing<::F>, } impl ContinuallyRan for SignSlashReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut txn = self.db.txn(); let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) }; diff --git a/coordinator/substrate/src/canonical.rs b/coordinator/substrate/src/canonical.rs index e1bbe6c2..34165774 100644 --- a/coordinator/substrate/src/canonical.rs +++ b/coordinator/substrate/src/canonical.rs @@ -34,7 +34,9 @@ impl CanonicalEventStream { } impl ContinuallyRan for CanonicalEventStream { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let next_block = NextBlock::get(&self.db).unwrap_or(0); let latest_finalized_block = diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs index 54df6b3c..eacfed9d 100644 --- a/coordinator/substrate/src/ephemeral.rs +++ b/coordinator/substrate/src/ephemeral.rs @@ -39,7 +39,9 @@ impl EphemeralEventStream { } impl ContinuallyRan for EphemeralEventStream { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let next_block = NextBlock::get(&self.db).unwrap_or(0); let latest_finalized_block = diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index e897afe5..83300a0d 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -512,7 +512,9 @@ impl ScanTributaryTask { } impl ContinuallyRan for ScanTributaryTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let (mut last_block_number, mut last_block_hash) = TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index ffafd466..591826bd 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -95,6 +95,7 @@ impl Coordinator { message_queue.ack(Service::Coordinator, msg.id).await; // Fire that there's a new message + // This assumes the success path, not the just-rebooted-path received_message_send .send(()) .expect("failed to tell the Coordinator there's a new message"); diff --git a/processor/bitcoin/src/txindex.rs b/processor/bitcoin/src/txindex.rs index 6a55a4c4..2ba40ca8 100644 --- a/processor/bitcoin/src/txindex.rs +++ b/processor/bitcoin/src/txindex.rs @@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output( pub(crate) struct TxIndexTask(pub(crate) Rpc); impl ContinuallyRan for TxIndexTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let latest_block_number = self .0 diff --git a/processor/scanner/src/batch/mod.rs b/processor/scanner/src/batch/mod.rs index 158306ca..736c3ac4 100644 --- a/processor/scanner/src/batch/mod.rs +++ b/processor/scanner/src/batch/mod.rs @@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; -use primitives::{EncodableG, task::ContinuallyRan}; +use primitives::{ + EncodableG, + task::{DoesNotError, ContinuallyRan}, +}; use crate::{ db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb}, index, @@ -60,7 +63,9 @@ impl BatchTask { } impl ContinuallyRan for BatchTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let highest_batchable = { // Fetch the next to scan block diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index bb3e4b7e..8a416903 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -190,7 +190,9 @@ impl> EventualityTask { } impl> ContinuallyRan for EventualityTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the highest acknowledged block let Some(highest_acknowledged) = ScannerGlobalDb::::highest_acknowledged_block(&self.db) diff --git a/processor/scanner/src/index/mod.rs b/processor/scanner/src/index/mod.rs index 03abc8a8..50032bae 100644 --- a/processor/scanner/src/index/mod.rs +++ b/processor/scanner/src/index/mod.rs @@ -58,7 +58,9 @@ impl IndexTask { } impl ContinuallyRan for IndexTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the latest finalized block let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index 2a7ab6a1..9055fcd0 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db}; use serai_validator_sets_primitives::Session; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use crate::{ db::{BatchData, BatchToReportDb, BatchesToSign}, substrate, ScannerFeed, @@ -27,7 +27,9 @@ impl ReportTask { } impl ContinuallyRan for ReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 25127ace..24426c62 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -98,7 +98,9 @@ impl ScanTask { } impl ContinuallyRan for ScanTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the safe to scan block let latest_scannable = diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index 6b22a259..4963f66b 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db}; use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; use messages::substrate::ExecutedBatch; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use crate::{ db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches}, index, batch, ScannerFeed, KeyFor, @@ -50,7 +50,9 @@ impl SubstrateTask { } impl ContinuallyRan for SubstrateTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/processor/signers/src/batch/mod.rs b/processor/signers/src/batch/mod.rs index c791f4e0..2c4fd1f5 100644 --- a/processor/signers/src/batch/mod.rs +++ b/processor/signers/src/batch/mod.rs @@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db}; use messages::sign::VariantSignId; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use scanner::{BatchesToSign, AcknowledgedBatches}; use frost_attempt_manager::*; @@ -79,7 +79,9 @@ impl BatchSignerTask { } impl ContinuallyRan for BatchSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 319f098c..003c14cd 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -22,7 +22,9 @@ impl CoordinatorTask { } impl ContinuallyRan for CoordinatorTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/cosign/mod.rs b/processor/signers/src/cosign/mod.rs index 2de18e86..dc5de6cd 100644 --- a/processor/signers/src/cosign/mod.rs +++ b/processor/signers/src/cosign/mod.rs @@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db}; use messages::{sign::VariantSignId, coordinator::cosign_block_msg}; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use frost_attempt_manager::*; @@ -51,7 +51,9 @@ impl CosignerTask { } impl ContinuallyRan for CosignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/slash_report.rs b/processor/signers/src/slash_report.rs index 577ec90b..a5d155ef 100644 --- a/processor/signers/src/slash_report.rs +++ b/processor/signers/src/slash_report.rs @@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db}; use messages::sign::VariantSignId; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use scanner::ScannerFeed; use frost_attempt_manager::*; @@ -52,7 +52,9 @@ impl SlashReportSignerTask { } impl ContinuallyRan for SlashReportSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index efb20217..b62e7303 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -92,7 +92,9 @@ impl> impl>> ContinuallyRan for TransactionSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = P::EphemeralError; + + fn run_iteration(&mut self) -> impl Send + Future> { async { let mut iterated = false; @@ -222,11 +224,7 @@ impl> let tx = TransactionFor::::read(&mut tx_buf).unwrap(); assert!(tx_buf.is_empty()); - self - .publisher - .publish(tx) - .await - .map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; + self.publisher.publish(tx).await?; } self.last_publication = Instant::now();