2 Commits

Author SHA1 Message Date
Luke Parker
b5a6b0693e Add a proper error type to ContinuallyRan
This isn't necessary. Because we just log the error, we never match off of it,
we don't need any structure beyond String (or now Debug, which still gives us
a way to print the error). This is for the ergonomics of not having to
constantly write `.map_err(|e| format!("{e:?}"))`.
2025-01-12 18:29:08 -05:00
Luke Parker
3cc2abfedc Add a task to publish slash reports 2025-01-12 17:47:48 -05:00
27 changed files with 220 additions and 55 deletions

View File

@@ -2,7 +2,11 @@
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::{future::Future, time::Duration};
use core::{
fmt::{self, Debug},
future::Future,
time::Duration,
};
use tokio::sync::mpsc;
@@ -60,6 +64,15 @@ impl TaskHandle {
}
}
/// An enum which can't be constructed, representing that the task does not error.
pub enum DoesNotError {}
impl Debug for DoesNotError {
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// This type can't be constructed so we'll never have a `&self` to call this fn with
unreachable!()
}
}
/// A task to be continually ran.
pub trait ContinuallyRan: Sized + Send {
/// The amount of seconds before this task should be polled again.
@@ -69,11 +82,14 @@ pub trait ContinuallyRan: Sized + Send {
/// Upon error, the amount of time waited will be linearly increased until this limit.
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
/// The error potentially yielded upon running an iteration of this task.
type Error: Debug;
/// Run an iteration of the task.
///
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
/// (without waiting for whatever timer they were already on).
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>>;
/// Continually run the task.
fn continually_run(
@@ -115,12 +131,17 @@ pub trait ContinuallyRan: Sized + Send {
}
}
Err(e) => {
log::warn!("{}", e);
log::warn!("{e:?}");
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
}
}
// Don't run the task again for another few seconds UNLESS told to run now
/*
We could replace tokio::mpsc with async_channel, tokio::time::sleep with
patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or
It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio
*/
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
msg = task.run_now.recv() => {

View File

@@ -2,7 +2,7 @@ use core::future::Future;
use std::time::{Duration, SystemTime};
use serai_db::*;
use serai_task::ContinuallyRan;
use serai_task::{DoesNotError, ContinuallyRan};
use crate::evaluator::CosignedBlocks;
@@ -25,7 +25,9 @@ pub(crate) struct CosignDelayTask<D: Db> {
}
impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {

View File

@@ -80,7 +80,9 @@ pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
}
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut known_cosign = None;
let mut made_progress = false;

View File

@@ -61,7 +61,9 @@ pub(crate) struct CosignIntendTask<D: Db> {
}
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
let latest_block_number =

View File

@@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
use tokio::sync::mpsc;
use serai_client::Serai;
use serai_client::{SeraiError, Serai};
use libp2p::{
core::multiaddr::{Protocol, Multiaddr},
@@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
self.validators.update().await?;
@@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask {
.unwrap_or(0)
.saturating_sub(1))
{
let mut potential_peers =
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
let mut potential_peers = self.serai.p2p_validators(network).await?;
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
if potential_peers.is_empty() {
break;

View File

@@ -4,7 +4,7 @@ use std::{
collections::{HashSet, HashMap},
};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai};
use serai_task::{Task, ContinuallyRan};
@@ -50,9 +50,8 @@ impl Validators {
async fn session_changes(
serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>,
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
let temporal_serai =
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
let temporal_serai = temporal_serai.validator_sets();
let mut session_changes = vec![];
@@ -69,7 +68,7 @@ impl Validators {
let session = match temporal_serai.session(network).await {
Ok(Some(session)) => session,
Ok(None) => return Ok(None),
Err(e) => return Err(format!("{e:?}")),
Err(e) => return Err(e),
};
if sessions.get(&network) == Some(&session) {
@@ -81,7 +80,7 @@ impl Validators {
session,
validators.into_iter().map(peer_id_from_public).collect(),
))),
Err(e) => Err(format!("{e:?}")),
Err(e) => Err(e),
}
}
});
@@ -147,7 +146,7 @@ impl Validators {
}
/// Update the view of the validators.
pub(crate) async fn update(&mut self) -> Result<(), String> {
pub(crate) async fn update(&mut self) -> Result<(), SeraiError> {
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes);
Ok(())
@@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let session_changes = {
let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
.await
.map_err(|e| format!("{e:?}"))?
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true)

View File

@@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
}
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);

View File

@@ -39,6 +39,8 @@ mod p2p {
pub use serai_coordinator_libp2p_p2p::Libp2p;
}
mod serai;
// Use a zeroizing allocator for this entire application
// While secrets should already be zeroized, the presence of secret keys in a networked application
// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't

95
coordinator/src/serai.rs Normal file
View File

@@ -0,0 +1,95 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{Get, DbTxn, Db as DbTrait, create_db};
use scale::Decode;
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::ContinuallyRan;
create_db! {
CoordinatorSerai {
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
}
}
/// Publish `SlashReport`s from `SlashReports` onto Serai.
pub struct PublishSlashReportTask<CD: DbTrait> {
db: CD,
serai: Arc<Serai>,
}
impl<CD: DbTrait> ContinuallyRan for PublishSlashReportTask<CD> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
continue;
};
let slash_report = serai_client::Transaction::decode(&mut slash_report.as_slice()).unwrap();
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired =
current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this SlashReport from the database and not try it again later
txn.commit();
continue;
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a SlashReport for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
continue;
};
/*
let tx = serai_client::SeraiValidatorSets::report_slashes(
network,
slash_report,
signature.clone(),
);
*/
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window available to publish the slash report. That makes
// this a non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask<P: P2p> {
}
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;

View File

@@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
@@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
@@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p>
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
@@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
message_queue: Arc<MessageQueue>,
}
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
@@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut txn = self.db.txn();
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };

View File

@@ -34,7 +34,9 @@ impl<D: Db> CanonicalEventStream<D> {
}
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =

View File

@@ -39,7 +39,9 @@ impl<D: Db> EphemeralEventStream<D> {
}
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =

View File

@@ -512,7 +512,9 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
}
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)

View File

@@ -95,6 +95,7 @@ impl Coordinator {
message_queue.ack(Service::Coordinator, msg.id).await;
// Fire that there's a new message
// This assumes the success path, not the just-rebooted-path
received_message_send
.send(())
.expect("failed to tell the Coordinator there's a new message");

View File

@@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output(
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let latest_block_number = self
.0

View File

@@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db};
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::{EncodableG, task::ContinuallyRan};
use primitives::{
EncodableG,
task::{DoesNotError, ContinuallyRan},
};
use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
index,
@@ -60,7 +63,9 @@ impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let highest_batchable = {
// Fetch the next to scan block

View File

@@ -190,7 +190,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
}
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the highest acknowledged block
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)

View File

@@ -58,7 +58,9 @@ impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the latest finalized block
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)

View File

@@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db};
use serai_validator_sets_primitives::Session;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{
db::{BatchData, BatchToReportDb, BatchesToSign},
substrate, ScannerFeed,
@@ -27,7 +27,9 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {

View File

@@ -98,7 +98,9 @@ impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the safe to scan block
let latest_scannable =

View File

@@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db};
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use messages::substrate::ExecutedBatch;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
index, batch, ScannerFeed, KeyFor,
@@ -50,7 +50,9 @@ impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {

View File

@@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::{BatchesToSign, AcknowledgedBatches};
use frost_attempt_manager::*;
@@ -79,7 +79,9 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
}
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -22,7 +22,9 @@ impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
}
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db};
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use frost_attempt_manager::*;
@@ -51,7 +51,9 @@ impl<D: Db> CosignerTask<D> {
}
impl<D: Db> ContinuallyRan for CosignerTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, DoesNotError>> {
async move {
let mut iterated = false;

View File

@@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::ScannerFeed;
use frost_attempt_manager::*;
@@ -52,7 +52,9 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -92,7 +92,9 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
for TransactionSignerTask<D, ST, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = P::EphemeralError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async {
let mut iterated = false;
@@ -222,11 +224,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty());
self
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
self.publisher.publish(tx).await?;
}
self.last_publication = Instant::now();