From b5a6b0693e04783283db070a5dcac88ea98fd0b4 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 12 Jan 2025 18:29:08 -0500 Subject: [PATCH] Add a proper error type to ContinuallyRan This isn't necessary. Because we just log the error, we never match off of it, we don't need any structure beyond String (or now Debug, which still gives us a way to print the error). This is for the ergonomics of not having to constantly write `.map_err(|e| format!("{e:?}"))`. --- common/task/src/lib.rs | 27 +++++++++++++++++++++--- coordinator/cosign/src/delay.rs | 6 ++++-- coordinator/cosign/src/evaluator.rs | 4 +++- coordinator/cosign/src/intend.rs | 4 +++- coordinator/p2p/libp2p/src/dial.rs | 9 ++++---- coordinator/p2p/libp2p/src/validators.rs | 21 +++++++++--------- coordinator/p2p/src/heartbeat.rs | 4 +++- coordinator/src/serai.rs | 4 +++- coordinator/src/substrate.rs | 3 ++- coordinator/src/tributary.rs | 18 +++++++++++----- coordinator/substrate/src/canonical.rs | 4 +++- coordinator/substrate/src/ephemeral.rs | 4 +++- coordinator/tributary/src/lib.rs | 4 +++- processor/bin/src/coordinator.rs | 1 + processor/bitcoin/src/txindex.rs | 4 +++- processor/scanner/src/batch/mod.rs | 9 ++++++-- processor/scanner/src/eventuality/mod.rs | 4 +++- processor/scanner/src/index/mod.rs | 4 +++- processor/scanner/src/report/mod.rs | 6 ++++-- processor/scanner/src/scan/mod.rs | 4 +++- processor/scanner/src/substrate/mod.rs | 6 ++++-- processor/signers/src/batch/mod.rs | 6 ++++-- processor/signers/src/coordinator/mod.rs | 4 +++- processor/signers/src/cosign/mod.rs | 6 ++++-- processor/signers/src/slash_report.rs | 6 ++++-- processor/signers/src/transaction/mod.rs | 10 ++++----- 26 files changed, 126 insertions(+), 56 deletions(-) diff --git a/common/task/src/lib.rs b/common/task/src/lib.rs index 2a061c10..64cf9416 100644 --- a/common/task/src/lib.rs +++ b/common/task/src/lib.rs @@ -2,7 +2,11 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::{future::Future, time::Duration}; +use core::{ + fmt::{self, Debug}, + future::Future, + time::Duration, +}; use tokio::sync::mpsc; @@ -60,6 +64,15 @@ impl TaskHandle { } } +/// An enum which can't be constructed, representing that the task does not error. +pub enum DoesNotError {} +impl Debug for DoesNotError { + fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + // This type can't be constructed so we'll never have a `&self` to call this fn with + unreachable!() + } +} + /// A task to be continually ran. pub trait ContinuallyRan: Sized + Send { /// The amount of seconds before this task should be polled again. @@ -69,11 +82,14 @@ pub trait ContinuallyRan: Sized + Send { /// Upon error, the amount of time waited will be linearly increased until this limit. const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120; + /// The error potentially yielded upon running an iteration of this task. + type Error: Debug; + /// Run an iteration of the task. /// /// If this returns `true`, all dependents of the task will immediately have a new iteration ran /// (without waiting for whatever timer they were already on). - fn run_iteration(&mut self) -> impl Send + Future>; + fn run_iteration(&mut self) -> impl Send + Future>; /// Continually run the task. fn continually_run( @@ -115,12 +131,17 @@ pub trait ContinuallyRan: Sized + Send { } } Err(e) => { - log::warn!("{}", e); + log::warn!("{e:?}"); increase_sleep_before_next_task(&mut current_sleep_before_next_task); } } // Don't run the task again for another few seconds UNLESS told to run now + /* + We could replace tokio::mpsc with async_channel, tokio::time::sleep with + patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or + It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio + */ tokio::select! { () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, msg = task.run_now.recv() => { diff --git a/coordinator/cosign/src/delay.rs b/coordinator/cosign/src/delay.rs index 5593eaf7..3439135b 100644 --- a/coordinator/cosign/src/delay.rs +++ b/coordinator/cosign/src/delay.rs @@ -2,7 +2,7 @@ use core::future::Future; use std::time::{Duration, SystemTime}; use serai_db::*; -use serai_task::ContinuallyRan; +use serai_task::{DoesNotError, ContinuallyRan}; use crate::evaluator::CosignedBlocks; @@ -25,7 +25,9 @@ pub(crate) struct CosignDelayTask { } impl ContinuallyRan for CosignDelayTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/coordinator/cosign/src/evaluator.rs b/coordinator/cosign/src/evaluator.rs index db286a4f..4216d5a7 100644 --- a/coordinator/cosign/src/evaluator.rs +++ b/coordinator/cosign/src/evaluator.rs @@ -80,7 +80,9 @@ pub(crate) struct CosignEvaluatorTask { } impl ContinuallyRan for CosignEvaluatorTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut known_cosign = None; let mut made_progress = false; diff --git a/coordinator/cosign/src/intend.rs b/coordinator/cosign/src/intend.rs index ebe3513c..c42c2d12 100644 --- a/coordinator/cosign/src/intend.rs +++ b/coordinator/cosign/src/intend.rs @@ -61,7 +61,9 @@ pub(crate) struct CosignIntendTask { } impl ContinuallyRan for CosignIntendTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1); let latest_block_number = diff --git a/coordinator/p2p/libp2p/src/dial.rs b/coordinator/p2p/libp2p/src/dial.rs index 1530e34b..b001446b 100644 --- a/coordinator/p2p/libp2p/src/dial.rs +++ b/coordinator/p2p/libp2p/src/dial.rs @@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng}; use tokio::sync::mpsc; -use serai_client::Serai; +use serai_client::{SeraiError, Serai}; use libp2p::{ core::multiaddr::{Protocol, Multiaddr}, @@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask { const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60; - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = SeraiError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { self.validators.update().await?; @@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask { .unwrap_or(0) .saturating_sub(1)) { - let mut potential_peers = - self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?; + let mut potential_peers = self.serai.p2p_validators(network).await?; for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) { if potential_peers.is_empty() { break; diff --git a/coordinator/p2p/libp2p/src/validators.rs b/coordinator/p2p/libp2p/src/validators.rs index 951a5e99..0395ff3a 100644 --- a/coordinator/p2p/libp2p/src/validators.rs +++ b/coordinator/p2p/libp2p/src/validators.rs @@ -4,7 +4,7 @@ use std::{ collections::{HashSet, HashMap}, }; -use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; +use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai}; use serai_task::{Task, ContinuallyRan}; @@ -50,9 +50,8 @@ impl Validators { async fn session_changes( serai: impl Borrow, sessions: impl Borrow>, - ) -> Result)>, String> { - let temporal_serai = - serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?; + ) -> Result)>, SeraiError> { + let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?; let temporal_serai = temporal_serai.validator_sets(); let mut session_changes = vec![]; @@ -69,7 +68,7 @@ impl Validators { let session = match temporal_serai.session(network).await { Ok(Some(session)) => session, Ok(None) => return Ok(None), - Err(e) => return Err(format!("{e:?}")), + Err(e) => return Err(e), }; if sessions.get(&network) == Some(&session) { @@ -81,7 +80,7 @@ impl Validators { session, validators.into_iter().map(peer_id_from_public).collect(), ))), - Err(e) => Err(format!("{e:?}")), + Err(e) => Err(e), } } }); @@ -147,7 +146,7 @@ impl Validators { } /// Update the view of the validators. - pub(crate) async fn update(&mut self) -> Result<(), String> { + pub(crate) async fn update(&mut self) -> Result<(), SeraiError> { let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?; self.incorporate_session_changes(session_changes); Ok(()) @@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask { const DELAY_BETWEEN_ITERATIONS: u64 = 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = SeraiError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let session_changes = { let validators = self.validators.read().await; - Validators::session_changes(validators.serai.clone(), validators.sessions.clone()) - .await - .map_err(|e| format!("{e:?}"))? + Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await? }; self.validators.write().await.incorporate_session_changes(session_changes); Ok(true) diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index 8a2f3220..f13a0e5c 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask { } impl ContinuallyRan for HeartbeatTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60); diff --git a/coordinator/src/serai.rs b/coordinator/src/serai.rs index b0093002..20599b3d 100644 --- a/coordinator/src/serai.rs +++ b/coordinator/src/serai.rs @@ -20,7 +20,9 @@ pub struct PublishSlashReportTask { serai: Arc, } impl ContinuallyRan for PublishSlashReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; for network in serai_client::primitives::NETWORKS { diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index 224b6278..7601b2cc 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask { } impl ContinuallyRan for SubstrateTask

{ - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; // TODO + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index a96cf225..a445be16 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet; use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary}; -use serai_task::{Task, TaskHandle, ContinuallyRan}; +use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan}; use message_queue::{Service, Metadata, client::MessageQueue}; @@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask ContinuallyRan for ProvideCosignCosignedTransactionsTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; @@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask key: Zeroizing<::F>, } impl ContinuallyRan for AddTributaryTransactionsTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { @@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask { message_queue: Arc, } impl ContinuallyRan for TributaryProcessorMessagesTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; // TODO + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { @@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask { key: Zeroizing<::F>, } impl ContinuallyRan for SignSlashReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut txn = self.db.txn(); let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) }; diff --git a/coordinator/substrate/src/canonical.rs b/coordinator/substrate/src/canonical.rs index e1bbe6c2..34165774 100644 --- a/coordinator/substrate/src/canonical.rs +++ b/coordinator/substrate/src/canonical.rs @@ -34,7 +34,9 @@ impl CanonicalEventStream { } impl ContinuallyRan for CanonicalEventStream { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let next_block = NextBlock::get(&self.db).unwrap_or(0); let latest_finalized_block = diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs index 54df6b3c..eacfed9d 100644 --- a/coordinator/substrate/src/ephemeral.rs +++ b/coordinator/substrate/src/ephemeral.rs @@ -39,7 +39,9 @@ impl EphemeralEventStream { } impl ContinuallyRan for EphemeralEventStream { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let next_block = NextBlock::get(&self.db).unwrap_or(0); let latest_finalized_block = diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index e897afe5..83300a0d 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -512,7 +512,9 @@ impl ScanTributaryTask { } impl ContinuallyRan for ScanTributaryTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let (mut last_block_number, mut last_block_hash) = TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index ffafd466..591826bd 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -95,6 +95,7 @@ impl Coordinator { message_queue.ack(Service::Coordinator, msg.id).await; // Fire that there's a new message + // This assumes the success path, not the just-rebooted-path received_message_send .send(()) .expect("failed to tell the Coordinator there's a new message"); diff --git a/processor/bitcoin/src/txindex.rs b/processor/bitcoin/src/txindex.rs index 6a55a4c4..2ba40ca8 100644 --- a/processor/bitcoin/src/txindex.rs +++ b/processor/bitcoin/src/txindex.rs @@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output( pub(crate) struct TxIndexTask(pub(crate) Rpc); impl ContinuallyRan for TxIndexTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let latest_block_number = self .0 diff --git a/processor/scanner/src/batch/mod.rs b/processor/scanner/src/batch/mod.rs index 158306ca..736c3ac4 100644 --- a/processor/scanner/src/batch/mod.rs +++ b/processor/scanner/src/batch/mod.rs @@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; -use primitives::{EncodableG, task::ContinuallyRan}; +use primitives::{ + EncodableG, + task::{DoesNotError, ContinuallyRan}, +}; use crate::{ db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb}, index, @@ -60,7 +63,9 @@ impl BatchTask { } impl ContinuallyRan for BatchTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let highest_batchable = { // Fetch the next to scan block diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index bb3e4b7e..8a416903 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -190,7 +190,9 @@ impl> EventualityTask { } impl> ContinuallyRan for EventualityTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the highest acknowledged block let Some(highest_acknowledged) = ScannerGlobalDb::::highest_acknowledged_block(&self.db) diff --git a/processor/scanner/src/index/mod.rs b/processor/scanner/src/index/mod.rs index 03abc8a8..50032bae 100644 --- a/processor/scanner/src/index/mod.rs +++ b/processor/scanner/src/index/mod.rs @@ -58,7 +58,9 @@ impl IndexTask { } impl ContinuallyRan for IndexTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the latest finalized block let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index 2a7ab6a1..9055fcd0 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db}; use serai_validator_sets_primitives::Session; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use crate::{ db::{BatchData, BatchToReportDb, BatchesToSign}, substrate, ScannerFeed, @@ -27,7 +27,9 @@ impl ReportTask { } impl ContinuallyRan for ReportTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 25127ace..24426c62 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -98,7 +98,9 @@ impl ScanTask { } impl ContinuallyRan for ScanTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { // Fetch the safe to scan block let latest_scannable = diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index 6b22a259..4963f66b 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db}; use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; use messages::substrate::ExecutedBatch; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use crate::{ db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches}, index, batch, ScannerFeed, KeyFor, @@ -50,7 +50,9 @@ impl SubstrateTask { } impl ContinuallyRan for SubstrateTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut made_progress = false; loop { diff --git a/processor/signers/src/batch/mod.rs b/processor/signers/src/batch/mod.rs index c791f4e0..2c4fd1f5 100644 --- a/processor/signers/src/batch/mod.rs +++ b/processor/signers/src/batch/mod.rs @@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db}; use messages::sign::VariantSignId; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use scanner::{BatchesToSign, AcknowledgedBatches}; use frost_attempt_manager::*; @@ -79,7 +79,9 @@ impl BatchSignerTask { } impl ContinuallyRan for BatchSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 319f098c..003c14cd 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -22,7 +22,9 @@ impl CoordinatorTask { } impl ContinuallyRan for CoordinatorTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = String; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/cosign/mod.rs b/processor/signers/src/cosign/mod.rs index 2de18e86..dc5de6cd 100644 --- a/processor/signers/src/cosign/mod.rs +++ b/processor/signers/src/cosign/mod.rs @@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db}; use messages::{sign::VariantSignId, coordinator::cosign_block_msg}; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use frost_attempt_manager::*; @@ -51,7 +51,9 @@ impl CosignerTask { } impl ContinuallyRan for CosignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/slash_report.rs b/processor/signers/src/slash_report.rs index 577ec90b..a5d155ef 100644 --- a/processor/signers/src/slash_report.rs +++ b/processor/signers/src/slash_report.rs @@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db}; use messages::sign::VariantSignId; -use primitives::task::ContinuallyRan; +use primitives::task::{DoesNotError, ContinuallyRan}; use scanner::ScannerFeed; use frost_attempt_manager::*; @@ -52,7 +52,9 @@ impl SlashReportSignerTask { } impl ContinuallyRan for SlashReportSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = DoesNotError; + + fn run_iteration(&mut self) -> impl Send + Future> { async move { let mut iterated = false; diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index efb20217..b62e7303 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -92,7 +92,9 @@ impl> impl>> ContinuallyRan for TransactionSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> { + type Error = P::EphemeralError; + + fn run_iteration(&mut self) -> impl Send + Future> { async { let mut iterated = false; @@ -222,11 +224,7 @@ impl> let tx = TransactionFor::::read(&mut tx_buf).unwrap(); assert!(tx_buf.is_empty()); - self - .publisher - .publish(tx) - .await - .map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; + self.publisher.publish(tx).await?; } self.last_publication = Instant::now();