mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-12 14:09:25 +00:00
Compare commits
2 Commits
0ce9aad9b2
...
b5a6b0693e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5a6b0693e | ||
|
|
3cc2abfedc |
@@ -2,7 +2,11 @@
|
|||||||
#![doc = include_str!("../README.md")]
|
#![doc = include_str!("../README.md")]
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
use core::{future::Future, time::Duration};
|
use core::{
|
||||||
|
fmt::{self, Debug},
|
||||||
|
future::Future,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
@@ -60,6 +64,15 @@ impl TaskHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An enum which can't be constructed, representing that the task does not error.
|
||||||
|
pub enum DoesNotError {}
|
||||||
|
impl Debug for DoesNotError {
|
||||||
|
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
|
// This type can't be constructed so we'll never have a `&self` to call this fn with
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A task to be continually ran.
|
/// A task to be continually ran.
|
||||||
pub trait ContinuallyRan: Sized + Send {
|
pub trait ContinuallyRan: Sized + Send {
|
||||||
/// The amount of seconds before this task should be polled again.
|
/// The amount of seconds before this task should be polled again.
|
||||||
@@ -69,11 +82,14 @@ pub trait ContinuallyRan: Sized + Send {
|
|||||||
/// Upon error, the amount of time waited will be linearly increased until this limit.
|
/// Upon error, the amount of time waited will be linearly increased until this limit.
|
||||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
|
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
|
||||||
|
|
||||||
|
/// The error potentially yielded upon running an iteration of this task.
|
||||||
|
type Error: Debug;
|
||||||
|
|
||||||
/// Run an iteration of the task.
|
/// Run an iteration of the task.
|
||||||
///
|
///
|
||||||
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
|
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
|
||||||
/// (without waiting for whatever timer they were already on).
|
/// (without waiting for whatever timer they were already on).
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>>;
|
||||||
|
|
||||||
/// Continually run the task.
|
/// Continually run the task.
|
||||||
fn continually_run(
|
fn continually_run(
|
||||||
@@ -115,12 +131,17 @@ pub trait ContinuallyRan: Sized + Send {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("{}", e);
|
log::warn!("{e:?}");
|
||||||
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
|
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't run the task again for another few seconds UNLESS told to run now
|
// Don't run the task again for another few seconds UNLESS told to run now
|
||||||
|
/*
|
||||||
|
We could replace tokio::mpsc with async_channel, tokio::time::sleep with
|
||||||
|
patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or
|
||||||
|
It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio
|
||||||
|
*/
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
|
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
|
||||||
msg = task.run_now.recv() => {
|
msg = task.run_now.recv() => {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use core::future::Future;
|
|||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
use serai_db::*;
|
use serai_db::*;
|
||||||
use serai_task::ContinuallyRan;
|
use serai_task::{DoesNotError, ContinuallyRan};
|
||||||
|
|
||||||
use crate::evaluator::CosignedBlocks;
|
use crate::evaluator::CosignedBlocks;
|
||||||
|
|
||||||
@@ -25,7 +25,9 @@ pub(crate) struct CosignDelayTask<D: Db> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
|
impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -80,7 +80,9 @@ pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
|
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut known_cosign = None;
|
let mut known_cosign = None;
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ pub(crate) struct CosignIntendTask<D: Db> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
|
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
|
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
|
||||||
let latest_block_number =
|
let latest_block_number =
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
|
|||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use serai_client::Serai;
|
use serai_client::{SeraiError, Serai};
|
||||||
|
|
||||||
use libp2p::{
|
use libp2p::{
|
||||||
core::multiaddr::{Protocol, Multiaddr},
|
core::multiaddr::{Protocol, Multiaddr},
|
||||||
@@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask {
|
|||||||
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
||||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
|
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
|
||||||
|
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = SeraiError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
self.validators.update().await?;
|
self.validators.update().await?;
|
||||||
|
|
||||||
@@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask {
|
|||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
.saturating_sub(1))
|
.saturating_sub(1))
|
||||||
{
|
{
|
||||||
let mut potential_peers =
|
let mut potential_peers = self.serai.p2p_validators(network).await?;
|
||||||
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
|
|
||||||
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
|
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
|
||||||
if potential_peers.is_empty() {
|
if potential_peers.is_empty() {
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::{
|
|||||||
collections::{HashSet, HashMap},
|
collections::{HashSet, HashMap},
|
||||||
};
|
};
|
||||||
|
|
||||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
|
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai};
|
||||||
|
|
||||||
use serai_task::{Task, ContinuallyRan};
|
use serai_task::{Task, ContinuallyRan};
|
||||||
|
|
||||||
@@ -50,9 +50,8 @@ impl Validators {
|
|||||||
async fn session_changes(
|
async fn session_changes(
|
||||||
serai: impl Borrow<Serai>,
|
serai: impl Borrow<Serai>,
|
||||||
sessions: impl Borrow<HashMap<NetworkId, Session>>,
|
sessions: impl Borrow<HashMap<NetworkId, Session>>,
|
||||||
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
|
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
|
||||||
let temporal_serai =
|
let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
|
||||||
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
|
||||||
let temporal_serai = temporal_serai.validator_sets();
|
let temporal_serai = temporal_serai.validator_sets();
|
||||||
|
|
||||||
let mut session_changes = vec![];
|
let mut session_changes = vec![];
|
||||||
@@ -69,7 +68,7 @@ impl Validators {
|
|||||||
let session = match temporal_serai.session(network).await {
|
let session = match temporal_serai.session(network).await {
|
||||||
Ok(Some(session)) => session,
|
Ok(Some(session)) => session,
|
||||||
Ok(None) => return Ok(None),
|
Ok(None) => return Ok(None),
|
||||||
Err(e) => return Err(format!("{e:?}")),
|
Err(e) => return Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
if sessions.get(&network) == Some(&session) {
|
if sessions.get(&network) == Some(&session) {
|
||||||
@@ -81,7 +80,7 @@ impl Validators {
|
|||||||
session,
|
session,
|
||||||
validators.into_iter().map(peer_id_from_public).collect(),
|
validators.into_iter().map(peer_id_from_public).collect(),
|
||||||
))),
|
))),
|
||||||
Err(e) => Err(format!("{e:?}")),
|
Err(e) => Err(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -147,7 +146,7 @@ impl Validators {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Update the view of the validators.
|
/// Update the view of the validators.
|
||||||
pub(crate) async fn update(&mut self) -> Result<(), String> {
|
pub(crate) async fn update(&mut self) -> Result<(), SeraiError> {
|
||||||
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
|
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
|
||||||
self.incorporate_session_changes(session_changes);
|
self.incorporate_session_changes(session_changes);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask {
|
|||||||
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
|
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
|
||||||
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
|
||||||
|
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = SeraiError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let session_changes = {
|
let session_changes = {
|
||||||
let validators = self.validators.read().await;
|
let validators = self.validators.read().await;
|
||||||
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
|
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{e:?}"))?
|
|
||||||
};
|
};
|
||||||
self.validators.write().await.incorporate_session_changes(session_changes);
|
self.validators.write().await.incorporate_session_changes(session_changes);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
|
|||||||
@@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
|
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
|
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
|
||||||
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);
|
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);
|
||||||
|
|||||||
@@ -39,6 +39,8 @@ mod p2p {
|
|||||||
pub use serai_coordinator_libp2p_p2p::Libp2p;
|
pub use serai_coordinator_libp2p_p2p::Libp2p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mod serai;
|
||||||
|
|
||||||
// Use a zeroizing allocator for this entire application
|
// Use a zeroizing allocator for this entire application
|
||||||
// While secrets should already be zeroized, the presence of secret keys in a networked application
|
// While secrets should already be zeroized, the presence of secret keys in a networked application
|
||||||
// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't
|
// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't
|
||||||
|
|||||||
95
coordinator/src/serai.rs
Normal file
95
coordinator/src/serai.rs
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
use core::future::Future;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use serai_db::{Get, DbTxn, Db as DbTrait, create_db};
|
||||||
|
|
||||||
|
use scale::Decode;
|
||||||
|
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
|
||||||
|
|
||||||
|
use serai_task::ContinuallyRan;
|
||||||
|
|
||||||
|
create_db! {
|
||||||
|
CoordinatorSerai {
|
||||||
|
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish `SlashReport`s from `SlashReports` onto Serai.
|
||||||
|
pub struct PublishSlashReportTask<CD: DbTrait> {
|
||||||
|
db: CD,
|
||||||
|
serai: Arc<Serai>,
|
||||||
|
}
|
||||||
|
impl<CD: DbTrait> ContinuallyRan for PublishSlashReportTask<CD> {
|
||||||
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
|
async move {
|
||||||
|
let mut made_progress = false;
|
||||||
|
for network in serai_client::primitives::NETWORKS {
|
||||||
|
if network == NetworkId::Serai {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut txn = self.db.txn();
|
||||||
|
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
|
||||||
|
// No slash report to publish
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let slash_report = serai_client::Transaction::decode(&mut slash_report.as_slice()).unwrap();
|
||||||
|
|
||||||
|
let serai =
|
||||||
|
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
let serai = serai.validator_sets();
|
||||||
|
let session_after_slash_report = Session(session.0 + 1);
|
||||||
|
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
let current_session = current_session.map(|session| session.0);
|
||||||
|
// Only attempt to publish the slash report for session #n while session #n+1 is still
|
||||||
|
// active
|
||||||
|
let session_after_slash_report_retired =
|
||||||
|
current_session > Some(session_after_slash_report.0);
|
||||||
|
if session_after_slash_report_retired {
|
||||||
|
// Commit the txn to drain this SlashReport from the database and not try it again later
|
||||||
|
txn.commit();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if Some(session_after_slash_report.0) != current_session {
|
||||||
|
// We already checked the current session wasn't greater, and they're not equal
|
||||||
|
assert!(current_session < Some(session_after_slash_report.0));
|
||||||
|
// This would mean the Serai node is resyncing and is behind where it prior was
|
||||||
|
Err("have a SlashReport for a session Serai has yet to retire".to_string())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this session which should publish a slash report already has, move on
|
||||||
|
let key_pending_slash_report =
|
||||||
|
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
|
||||||
|
if key_pending_slash_report.is_none() {
|
||||||
|
txn.commit();
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
let tx = serai_client::SeraiValidatorSets::report_slashes(
|
||||||
|
network,
|
||||||
|
slash_report,
|
||||||
|
signature.clone(),
|
||||||
|
);
|
||||||
|
*/
|
||||||
|
|
||||||
|
match self.serai.publish(&slash_report).await {
|
||||||
|
Ok(()) => {
|
||||||
|
txn.commit();
|
||||||
|
made_progress = true;
|
||||||
|
}
|
||||||
|
// This could be specific to this TX (such as an already in mempool error) and it may be
|
||||||
|
// worthwhile to continue iteration with the other pending slash reports. We assume this
|
||||||
|
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
|
||||||
|
// miniscule compared to the window available to publish the slash report. That makes
|
||||||
|
// this a non-issue.
|
||||||
|
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(made_progress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask<P: P2p> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String; // TODO
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet;
|
|||||||
|
|
||||||
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
|
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
|
||||||
|
|
||||||
use serai_task::{Task, TaskHandle, ContinuallyRan};
|
use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan};
|
||||||
|
|
||||||
use message_queue::{Service, Metadata, client::MessageQueue};
|
use message_queue::{Service, Metadata, client::MessageQueue};
|
||||||
|
|
||||||
@@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait
|
|||||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||||
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
|
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
|
||||||
{
|
{
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
|
|
||||||
@@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p>
|
|||||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
}
|
}
|
||||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
|
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
loop {
|
loop {
|
||||||
@@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
|
|||||||
message_queue: Arc<MessageQueue>,
|
message_queue: Arc<MessageQueue>,
|
||||||
}
|
}
|
||||||
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
|
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String; // TODO
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
loop {
|
loop {
|
||||||
@@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
|
|||||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
}
|
}
|
||||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
|
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut txn = self.db.txn();
|
let mut txn = self.db.txn();
|
||||||
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
|
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
|
||||||
|
|||||||
@@ -34,7 +34,9 @@ impl<D: Db> CanonicalEventStream<D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
|
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
||||||
let latest_finalized_block =
|
let latest_finalized_block =
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ impl<D: Db> EphemeralEventStream<D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
|
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
let next_block = NextBlock::get(&self.db).unwrap_or(0);
|
||||||
let latest_finalized_block =
|
let latest_finalized_block =
|
||||||
|
|||||||
@@ -512,7 +512,9 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
|
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let (mut last_block_number, mut last_block_hash) =
|
let (mut last_block_number, mut last_block_hash) =
|
||||||
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
|
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
|
||||||
|
|||||||
@@ -95,6 +95,7 @@ impl Coordinator {
|
|||||||
message_queue.ack(Service::Coordinator, msg.id).await;
|
message_queue.ack(Service::Coordinator, msg.id).await;
|
||||||
|
|
||||||
// Fire that there's a new message
|
// Fire that there's a new message
|
||||||
|
// This assumes the success path, not the just-rebooted-path
|
||||||
received_message_send
|
received_message_send
|
||||||
.send(())
|
.send(())
|
||||||
.expect("failed to tell the Coordinator there's a new message");
|
.expect("failed to tell the Coordinator there's a new message");
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output(
|
|||||||
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
|
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
|
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let latest_block_number = self
|
let latest_block_number = self
|
||||||
.0
|
.0
|
||||||
|
|||||||
@@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db};
|
|||||||
|
|
||||||
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
|
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
|
||||||
|
|
||||||
use primitives::{EncodableG, task::ContinuallyRan};
|
use primitives::{
|
||||||
|
EncodableG,
|
||||||
|
task::{DoesNotError, ContinuallyRan},
|
||||||
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
|
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
|
||||||
index,
|
index,
|
||||||
@@ -60,7 +63,9 @@ impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let highest_batchable = {
|
let highest_batchable = {
|
||||||
// Fetch the next to scan block
|
// Fetch the next to scan block
|
||||||
|
|||||||
@@ -190,7 +190,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
|
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
// Fetch the highest acknowledged block
|
// Fetch the highest acknowledged block
|
||||||
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
|
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
|
||||||
|
|||||||
@@ -58,7 +58,9 @@ impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
// Fetch the latest finalized block
|
// Fetch the latest finalized block
|
||||||
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)
|
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db};
|
|||||||
|
|
||||||
use serai_validator_sets_primitives::Session;
|
use serai_validator_sets_primitives::Session;
|
||||||
|
|
||||||
use primitives::task::ContinuallyRan;
|
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||||
use crate::{
|
use crate::{
|
||||||
db::{BatchData, BatchToReportDb, BatchesToSign},
|
db::{BatchData, BatchToReportDb, BatchesToSign},
|
||||||
substrate, ScannerFeed,
|
substrate, ScannerFeed,
|
||||||
@@ -27,7 +27,9 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -98,7 +98,9 @@ impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
// Fetch the safe to scan block
|
// Fetch the safe to scan block
|
||||||
let latest_scannable =
|
let latest_scannable =
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db};
|
|||||||
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
|
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
|
||||||
|
|
||||||
use messages::substrate::ExecutedBatch;
|
use messages::substrate::ExecutedBatch;
|
||||||
use primitives::task::ContinuallyRan;
|
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||||
use crate::{
|
use crate::{
|
||||||
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
|
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
|
||||||
index, batch, ScannerFeed, KeyFor,
|
index, batch, ScannerFeed, KeyFor,
|
||||||
@@ -50,7 +50,9 @@ impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut made_progress = false;
|
let mut made_progress = false;
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db};
|
|||||||
|
|
||||||
use messages::sign::VariantSignId;
|
use messages::sign::VariantSignId;
|
||||||
|
|
||||||
use primitives::task::ContinuallyRan;
|
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||||
use scanner::{BatchesToSign, AcknowledgedBatches};
|
use scanner::{BatchesToSign, AcknowledgedBatches};
|
||||||
|
|
||||||
use frost_attempt_manager::*;
|
use frost_attempt_manager::*;
|
||||||
@@ -79,7 +79,9 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
|
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut iterated = false;
|
let mut iterated = false;
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,9 @@ impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
|
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = String;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut iterated = false;
|
let mut iterated = false;
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db};
|
|||||||
|
|
||||||
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
|
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
|
||||||
|
|
||||||
use primitives::task::ContinuallyRan;
|
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||||
|
|
||||||
use frost_attempt_manager::*;
|
use frost_attempt_manager::*;
|
||||||
|
|
||||||
@@ -51,7 +51,9 @@ impl<D: Db> CosignerTask<D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db> ContinuallyRan for CosignerTask<D> {
|
impl<D: Db> ContinuallyRan for CosignerTask<D> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, DoesNotError>> {
|
||||||
async move {
|
async move {
|
||||||
let mut iterated = false;
|
let mut iterated = false;
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db};
|
|||||||
|
|
||||||
use messages::sign::VariantSignId;
|
use messages::sign::VariantSignId;
|
||||||
|
|
||||||
use primitives::task::ContinuallyRan;
|
use primitives::task::{DoesNotError, ContinuallyRan};
|
||||||
use scanner::ScannerFeed;
|
use scanner::ScannerFeed;
|
||||||
|
|
||||||
use frost_attempt_manager::*;
|
use frost_attempt_manager::*;
|
||||||
@@ -52,7 +52,9 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
|
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = DoesNotError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async move {
|
async move {
|
||||||
let mut iterated = false;
|
let mut iterated = false;
|
||||||
|
|
||||||
|
|||||||
@@ -92,7 +92,9 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
|
|||||||
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
|
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
|
||||||
for TransactionSignerTask<D, ST, P>
|
for TransactionSignerTask<D, ST, P>
|
||||||
{
|
{
|
||||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
type Error = P::EphemeralError;
|
||||||
|
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||||
async {
|
async {
|
||||||
let mut iterated = false;
|
let mut iterated = false;
|
||||||
|
|
||||||
@@ -222,11 +224,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
|
|||||||
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
|
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
|
||||||
assert!(tx_buf.is_empty());
|
assert!(tx_buf.is_empty());
|
||||||
|
|
||||||
self
|
self.publisher.publish(tx).await?;
|
||||||
.publisher
|
|
||||||
.publish(tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.last_publication = Instant::now();
|
self.last_publication = Instant::now();
|
||||||
|
|||||||
Reference in New Issue
Block a user