mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-12 05:59:23 +00:00
Compare commits
2 Commits
0ce9aad9b2
...
b5a6b0693e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5a6b0693e | ||
|
|
3cc2abfedc |
@@ -2,7 +2,11 @@
|
||||
#![doc = include_str!("../README.md")]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
use core::{future::Future, time::Duration};
|
||||
use core::{
|
||||
fmt::{self, Debug},
|
||||
future::Future,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
@@ -60,6 +64,15 @@ impl TaskHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// An enum which can't be constructed, representing that the task does not error.
|
||||
pub enum DoesNotError {}
|
||||
impl Debug for DoesNotError {
|
||||
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||
// This type can't be constructed so we'll never have a `&self` to call this fn with
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
/// A task to be continually ran.
|
||||
pub trait ContinuallyRan: Sized + Send {
|
||||
/// The amount of seconds before this task should be polled again.
|
||||
@@ -69,11 +82,14 @@ pub trait ContinuallyRan: Sized + Send {
|
||||
/// Upon error, the amount of time waited will be linearly increased until this limit.
|
||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
|
||||
|
||||
/// The error potentially yielded upon running an iteration of this task.
|
||||
type Error: Debug;
|
||||
|
||||
/// Run an iteration of the task.
|
||||
///
|
||||
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
|
||||
/// (without waiting for whatever timer they were already on).
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>>;
|
||||
|
||||
/// Continually run the task.
|
||||
fn continually_run(
|
||||
@@ -115,12 +131,17 @@ pub trait ContinuallyRan: Sized + Send {
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("{}", e);
|
||||
log::warn!("{e:?}");
|
||||
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
|
||||
}
|
||||
}
|
||||
|
||||
// Don't run the task again for another few seconds UNLESS told to run now
|
||||
/*
|
||||
We could replace tokio::mpsc with async_channel, tokio::time::sleep with
|
||||
patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or
|
||||
It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio
|
||||
*/
|
||||
tokio::select! {
|
||||
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
|
||||
msg = task.run_now.recv() => {
|
||||
|
||||
@@ -2,7 +2,7 @@ use core::future::Future;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use serai_db::*;
|
||||
use serai_task::ContinuallyRan;
|
||||
use serai_task::{DoesNotError, ContinuallyRan};
|
||||
|
||||
use crate::evaluator::CosignedBlocks;
|
||||
|
||||
@@ -25,7 +25,9 @@ pub(crate) struct CosignDelayTask<D: Db> {
|
||||
}
|
||||
|
||||
impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
|
||||
@@ -80,7 +80,9 @@ pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
|
||||
}
|
||||
|
||||
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut known_cosign = None;
|
||||
let mut made_progress = false;
|
||||
|
||||
@@ -61,7 +61,9 @@ pub(crate) struct CosignIntendTask<D: Db> {
|
||||
}
|
||||
|
||||
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
|
||||
let latest_block_number =
|
||||
|
||||
@@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use serai_client::Serai;
|
||||
use serai_client::{SeraiError, Serai};
|
||||
|
||||
use libp2p::{
|
||||
core::multiaddr::{Protocol, Multiaddr},
|
||||
@@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask {
|
||||
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = SeraiError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
self.validators.update().await?;
|
||||
|
||||
@@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask {
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(1))
|
||||
{
|
||||
let mut potential_peers =
|
||||
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
|
||||
let mut potential_peers = self.serai.p2p_validators(network).await?;
|
||||
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
|
||||
if potential_peers.is_empty() {
|
||||
break;
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::{
|
||||
collections::{HashSet, HashMap},
|
||||
};
|
||||
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai};
|
||||
|
||||
use serai_task::{Task, ContinuallyRan};
|
||||
|
||||
@@ -50,9 +50,8 @@ impl Validators {
|
||||
async fn session_changes(
|
||||
serai: impl Borrow<Serai>,
|
||||
sessions: impl Borrow<HashMap<NetworkId, Session>>,
|
||||
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
|
||||
let temporal_serai =
|
||||
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
||||
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
|
||||
let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
|
||||
let temporal_serai = temporal_serai.validator_sets();
|
||||
|
||||
let mut session_changes = vec![];
|
||||
@@ -69,7 +68,7 @@ impl Validators {
|
||||
let session = match temporal_serai.session(network).await {
|
||||
Ok(Some(session)) => session,
|
||||
Ok(None) => return Ok(None),
|
||||
Err(e) => return Err(format!("{e:?}")),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
if sessions.get(&network) == Some(&session) {
|
||||
@@ -81,7 +80,7 @@ impl Validators {
|
||||
session,
|
||||
validators.into_iter().map(peer_id_from_public).collect(),
|
||||
))),
|
||||
Err(e) => Err(format!("{e:?}")),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -147,7 +146,7 @@ impl Validators {
|
||||
}
|
||||
|
||||
/// Update the view of the validators.
|
||||
pub(crate) async fn update(&mut self) -> Result<(), String> {
|
||||
pub(crate) async fn update(&mut self) -> Result<(), SeraiError> {
|
||||
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
|
||||
self.incorporate_session_changes(session_changes);
|
||||
Ok(())
|
||||
@@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask {
|
||||
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
|
||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = SeraiError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let session_changes = {
|
||||
let validators = self.validators.read().await;
|
||||
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?
|
||||
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
|
||||
};
|
||||
self.validators.write().await.incorporate_session_changes(session_changes);
|
||||
Ok(true)
|
||||
|
||||
@@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
|
||||
}
|
||||
|
||||
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
|
||||
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);
|
||||
|
||||
@@ -39,6 +39,8 @@ mod p2p {
|
||||
pub use serai_coordinator_libp2p_p2p::Libp2p;
|
||||
}
|
||||
|
||||
mod serai;
|
||||
|
||||
// Use a zeroizing allocator for this entire application
|
||||
// While secrets should already be zeroized, the presence of secret keys in a networked application
|
||||
// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't
|
||||
|
||||
95
coordinator/src/serai.rs
Normal file
95
coordinator/src/serai.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use core::future::Future;
|
||||
use std::sync::Arc;
|
||||
|
||||
use serai_db::{Get, DbTxn, Db as DbTrait, create_db};
|
||||
|
||||
use scale::Decode;
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
|
||||
|
||||
use serai_task::ContinuallyRan;
|
||||
|
||||
create_db! {
|
||||
CoordinatorSerai {
|
||||
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
|
||||
}
|
||||
}
|
||||
|
||||
/// Publish `SlashReport`s from `SlashReports` onto Serai.
|
||||
pub struct PublishSlashReportTask<CD: DbTrait> {
|
||||
db: CD,
|
||||
serai: Arc<Serai>,
|
||||
}
|
||||
impl<CD: DbTrait> ContinuallyRan for PublishSlashReportTask<CD> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
for network in serai_client::primitives::NETWORKS {
|
||||
if network == NetworkId::Serai {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut txn = self.db.txn();
|
||||
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
|
||||
// No slash report to publish
|
||||
continue;
|
||||
};
|
||||
let slash_report = serai_client::Transaction::decode(&mut slash_report.as_slice()).unwrap();
|
||||
|
||||
let serai =
|
||||
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
||||
let serai = serai.validator_sets();
|
||||
let session_after_slash_report = Session(session.0 + 1);
|
||||
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
|
||||
let current_session = current_session.map(|session| session.0);
|
||||
// Only attempt to publish the slash report for session #n while session #n+1 is still
|
||||
// active
|
||||
let session_after_slash_report_retired =
|
||||
current_session > Some(session_after_slash_report.0);
|
||||
if session_after_slash_report_retired {
|
||||
// Commit the txn to drain this SlashReport from the database and not try it again later
|
||||
txn.commit();
|
||||
continue;
|
||||
}
|
||||
|
||||
if Some(session_after_slash_report.0) != current_session {
|
||||
// We already checked the current session wasn't greater, and they're not equal
|
||||
assert!(current_session < Some(session_after_slash_report.0));
|
||||
// This would mean the Serai node is resyncing and is behind where it prior was
|
||||
Err("have a SlashReport for a session Serai has yet to retire".to_string())?;
|
||||
}
|
||||
|
||||
// If this session which should publish a slash report already has, move on
|
||||
let key_pending_slash_report =
|
||||
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
|
||||
if key_pending_slash_report.is_none() {
|
||||
txn.commit();
|
||||
continue;
|
||||
};
|
||||
|
||||
/*
|
||||
let tx = serai_client::SeraiValidatorSets::report_slashes(
|
||||
network,
|
||||
slash_report,
|
||||
signature.clone(),
|
||||
);
|
||||
*/
|
||||
|
||||
match self.serai.publish(&slash_report).await {
|
||||
Ok(()) => {
|
||||
txn.commit();
|
||||
made_progress = true;
|
||||
}
|
||||
// This could be specific to this TX (such as an already in mempool error) and it may be
|
||||
// worthwhile to continue iteration with the other pending slash reports. We assume this
|
||||
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
|
||||
// miniscule compared to the window available to publish the slash report. That makes
|
||||
// this a non-issue.
|
||||
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
|
||||
}
|
||||
}
|
||||
Ok(made_progress)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask<P: P2p> {
|
||||
}
|
||||
|
||||
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String; // TODO
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
|
||||
|
||||
use serai_task::{Task, TaskHandle, ContinuallyRan};
|
||||
use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan};
|
||||
|
||||
use message_queue::{Service, Metadata, client::MessageQueue};
|
||||
|
||||
@@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait
|
||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
|
||||
{
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
|
||||
@@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p>
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
}
|
||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
@@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
|
||||
message_queue: Arc<MessageQueue>,
|
||||
}
|
||||
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String; // TODO
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
@@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
}
|
||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut txn = self.db.txn();
|
||||
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
|
||||
|
||||
@@ -34,7 +34,9 @@ impl<D: Db> CanonicalEventStream<D> {
|
||||
}
|
||||
|
||||
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
||||
let latest_finalized_block =
|
||||
|
||||
@@ -39,7 +39,9 @@ impl<D: Db> EphemeralEventStream<D> {
|
||||
}
|
||||
|
||||
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
||||
let latest_finalized_block =
|
||||
|
||||
@@ -512,7 +512,9 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
|
||||
}
|
||||
|
||||
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let (mut last_block_number, mut last_block_hash) =
|
||||
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
|
||||
|
||||
@@ -95,6 +95,7 @@ impl Coordinator {
|
||||
message_queue.ack(Service::Coordinator, msg.id).await;
|
||||
|
||||
// Fire that there's a new message
|
||||
// This assumes the success path, not the just-rebooted-path
|
||||
received_message_send
|
||||
.send(())
|
||||
.expect("failed to tell the Coordinator there's a new message");
|
||||
|
||||
@@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output(
|
||||
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
|
||||
|
||||
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let latest_block_number = self
|
||||
.0
|
||||
|
||||
@@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db};
|
||||
|
||||
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
|
||||
|
||||
use primitives::{EncodableG, task::ContinuallyRan};
|
||||
use primitives::{
|
||||
EncodableG,
|
||||
task::{DoesNotError, ContinuallyRan},
|
||||
};
|
||||
use crate::{
|
||||
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
|
||||
index,
|
||||
@@ -60,7 +63,9 @@ impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let highest_batchable = {
|
||||
// Fetch the next to scan block
|
||||
|
||||
@@ -190,7 +190,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
// Fetch the highest acknowledged block
|
||||
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
|
||||
|
||||
@@ -58,7 +58,9 @@ impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
// Fetch the latest finalized block
|
||||
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)
|
||||
|
||||
@@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db};
|
||||
|
||||
use serai_validator_sets_primitives::Session;
|
||||
|
||||
use primitives::task::ContinuallyRan;
|
||||
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||
use crate::{
|
||||
db::{BatchData, BatchToReportDb, BatchesToSign},
|
||||
substrate, ScannerFeed,
|
||||
@@ -27,7 +27,9 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
|
||||
@@ -98,7 +98,9 @@ impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
// Fetch the safe to scan block
|
||||
let latest_scannable =
|
||||
|
||||
@@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db};
|
||||
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
|
||||
|
||||
use messages::substrate::ExecutedBatch;
|
||||
use primitives::task::ContinuallyRan;
|
||||
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||
use crate::{
|
||||
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
|
||||
index, batch, ScannerFeed, KeyFor,
|
||||
@@ -50,7 +50,9 @@ impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
|
||||
@@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db};
|
||||
|
||||
use messages::sign::VariantSignId;
|
||||
|
||||
use primitives::task::ContinuallyRan;
|
||||
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||
use scanner::{BatchesToSign, AcknowledgedBatches};
|
||||
|
||||
use frost_attempt_manager::*;
|
||||
@@ -79,7 +79,9 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
|
||||
}
|
||||
|
||||
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut iterated = false;
|
||||
|
||||
|
||||
@@ -22,7 +22,9 @@ impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
|
||||
}
|
||||
|
||||
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = String;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut iterated = false;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db};
|
||||
|
||||
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
|
||||
|
||||
use primitives::task::ContinuallyRan;
|
||||
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||
|
||||
use frost_attempt_manager::*;
|
||||
|
||||
@@ -51,7 +51,9 @@ impl<D: Db> CosignerTask<D> {
|
||||
}
|
||||
|
||||
impl<D: Db> ContinuallyRan for CosignerTask<D> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, DoesNotError>> {
|
||||
async move {
|
||||
let mut iterated = false;
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db};
|
||||
|
||||
use messages::sign::VariantSignId;
|
||||
|
||||
use primitives::task::ContinuallyRan;
|
||||
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||
use scanner::ScannerFeed;
|
||||
|
||||
use frost_attempt_manager::*;
|
||||
@@ -52,7 +52,9 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = DoesNotError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut iterated = false;
|
||||
|
||||
|
||||
@@ -92,7 +92,9 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
|
||||
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
|
||||
for TransactionSignerTask<D, ST, P>
|
||||
{
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
type Error = P::EphemeralError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async {
|
||||
let mut iterated = false;
|
||||
|
||||
@@ -222,11 +224,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
|
||||
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
|
||||
assert!(tx_buf.is_empty());
|
||||
|
||||
self
|
||||
.publisher
|
||||
.publish(tx)
|
||||
.await
|
||||
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
|
||||
self.publisher.publish(tx).await?;
|
||||
}
|
||||
|
||||
self.last_publication = Instant::now();
|
||||
|
||||
Reference in New Issue
Block a user