From b65dbacd6abb23279b52d9d40f1c1fad5d1d2774 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 26 Aug 2024 22:57:28 -0400 Subject: [PATCH] Move ContinuallyRan into primitives I'm unsure where else it'll be used within the processor, yet it's generally useful and I don't want to make a dedicated crate yet. --- Cargo.lock | 2 + processor/primitives/Cargo.toml | 3 ++ processor/primitives/src/lib.rs | 3 ++ processor/primitives/src/task.rs | 93 ++++++++++++++++++++++++++++++++ processor/scanner/src/lib.rs | 84 +---------------------------- 5 files changed, 102 insertions(+), 83 deletions(-) create mode 100644 processor/primitives/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index f887bd8c..4cc54e15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8654,8 +8654,10 @@ dependencies = [ "async-trait", "borsh", "group", + "log", "parity-scale-codec", "serai-primitives", + "tokio", ] [[package]] diff --git a/processor/primitives/Cargo.toml b/processor/primitives/Cargo.toml index dd59c0a8..9427a604 100644 --- a/processor/primitives/Cargo.toml +++ b/processor/primitives/Cargo.toml @@ -25,3 +25,6 @@ serai-primitives = { path = "../../substrate/primitives", default-features = fal scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +log = { version = "0.4", default-features = false, features = ["std"] } +tokio = { version = "1", default-features = false, features = ["macros", "sync", "time"] } diff --git a/processor/primitives/src/lib.rs b/processor/primitives/src/lib.rs index f796a13a..b0b7ae04 100644 --- a/processor/primitives/src/lib.rs +++ b/processor/primitives/src/lib.rs @@ -9,6 +9,9 @@ use group::GroupEncoding; use scale::{Encode, Decode}; use borsh::{BorshSerialize, BorshDeserialize}; +/// A module for task-related structs and functionality. +pub mod task; + mod output; pub use output::*; diff --git a/processor/primitives/src/task.rs b/processor/primitives/src/task.rs new file mode 100644 index 00000000..a7d6153c --- /dev/null +++ b/processor/primitives/src/task.rs @@ -0,0 +1,93 @@ +use core::time::Duration; + +use tokio::sync::mpsc; + +/// A handle to immediately run an iteration of a task. +#[derive(Clone)] +pub struct RunNowHandle(mpsc::Sender<()>); +/// An instruction recipient to immediately run an iteration of a task. +pub struct RunNowRecipient(mpsc::Receiver<()>); + +impl RunNowHandle { + /// Create a new run-now handle to be assigned to a task. + pub fn new() -> (Self, RunNowRecipient) { + // Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as + // soon as possible + let (send, recv) = mpsc::channel(1); + (Self(send), RunNowRecipient(recv)) + } + + /// Tell the task to run now (and not whenever its next iteration on a timer is). + /// + /// Panics if the task has been dropped. + pub fn run_now(&self) { + #[allow(clippy::match_same_arms)] + match self.0.try_send(()) { + Ok(()) => {} + // NOP on full, as this task will already be ran as soon as possible + Err(mpsc::error::TrySendError::Full(())) => {} + Err(mpsc::error::TrySendError::Closed(())) => { + panic!("task was unexpectedly closed when calling run_now") + } + } + } +} + +/// A task to be continually ran. +#[async_trait::async_trait] +pub trait ContinuallyRan: Sized { + /// The amount of seconds before this task should be polled again. + const DELAY_BETWEEN_ITERATIONS: u64 = 5; + /// The maximum amount of seconds before this task should be run again. + /// + /// Upon error, the amount of time waited will be linearly increased until this limit. + const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120; + + /// Run an iteration of the task. + /// + /// If this returns `true`, all dependents of the task will immediately have a new iteration ran + /// (without waiting for whatever timer they were already on). + async fn run_iteration(&mut self) -> Result; + + /// Continually run the task. + /// + /// This returns a channel which can have a message set to immediately trigger a new run of an + /// iteration. + async fn continually_run(mut self, mut run_now: RunNowRecipient, dependents: Vec) { + // The default number of seconds to sleep before running the task again + let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS; + // The current number of seconds to sleep before running the task again + // We increment this upon errors in order to not flood the logs with errors + let mut current_sleep_before_next_task = default_sleep_before_next_task; + let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| { + let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task; + // Set a limit of sleeping for two minutes + *current_sleep_before_next_task = new_sleep.max(Self::MAX_DELAY_BETWEEN_ITERATIONS); + }; + + loop { + match self.run_iteration().await { + Ok(run_dependents) => { + // Upon a successful (error-free) loop iteration, reset the amount of time we sleep + current_sleep_before_next_task = default_sleep_before_next_task; + + if run_dependents { + for dependent in &dependents { + dependent.run_now(); + } + } + } + Err(e) => { + log::debug!("{}", e); + increase_sleep_before_next_task(&mut current_sleep_before_next_task); + } + } + + // Don't run the task again for another few seconds UNLESS told to run now + tokio::select! { + () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, + msg = run_now.0.recv() => assert_eq!(msg, Some(()), "run now handle was dropped"), + } + } + } +} diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 7919f006..822acb27 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -7,7 +7,7 @@ use serai_db::DbTxn; use serai_primitives::{NetworkId, Coin, Amount}; -use primitives::Block; +use primitives::{task::*, Block}; // Logic for deciding where in its lifetime a multisig is. mod lifetime; @@ -111,88 +111,6 @@ pub trait Scheduler { ) -> HashMap, Vec>>; } -/// A handle to immediately run an iteration of a task. -#[derive(Clone)] -pub(crate) struct RunNowHandle(mpsc::Sender<()>); -/// An instruction recipient to immediately run an iteration of a task. -pub(crate) struct RunNowRecipient(mpsc::Receiver<()>); - -impl RunNowHandle { - /// Create a new run-now handle to be assigned to a task. - pub(crate) fn new() -> (Self, RunNowRecipient) { - // Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as - // soon as possible - let (send, recv) = mpsc::channel(1); - (Self(send), RunNowRecipient(recv)) - } - - /// Tell the task to run now (and not whenever its next iteration on a timer is). - /// - /// Panics if the task has been dropped. - pub(crate) fn run_now(&self) { - #[allow(clippy::match_same_arms)] - match self.0.try_send(()) { - Ok(()) => {} - // NOP on full, as this task will already be ran as soon as possible - Err(mpsc::error::TrySendError::Full(())) => {} - Err(mpsc::error::TrySendError::Closed(())) => { - panic!("task was unexpectedly closed when calling run_now") - } - } - } -} - -#[async_trait::async_trait] -pub(crate) trait ContinuallyRan: Sized { - /// Run an iteration of the task. - /// - /// If this returns `true`, all dependents of the task will immediately have a new iteration ran - /// (without waiting for whatever timer they were already on). - async fn run_iteration(&mut self) -> Result; - - /// Continually run the task. - /// - /// This returns a channel which can have a message set to immediately trigger a new run of an - /// iteration. - async fn continually_run(mut self, mut run_now: RunNowRecipient, dependents: Vec) { - // The default number of seconds to sleep before running the task again - let default_sleep_before_next_task = 5; - // The current number of seconds to sleep before running the task again - // We increment this upon errors in order to not flood the logs with errors - let mut current_sleep_before_next_task = default_sleep_before_next_task; - let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| { - let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task; - // Set a limit of sleeping for two minutes - *current_sleep_before_next_task = new_sleep.max(120); - }; - - loop { - match self.run_iteration().await { - Ok(run_dependents) => { - // Upon a successful (error-free) loop iteration, reset the amount of time we sleep - current_sleep_before_next_task = default_sleep_before_next_task; - - if run_dependents { - for dependent in &dependents { - dependent.run_now(); - } - } - } - Err(e) => { - log::debug!("{}", e); - increase_sleep_before_next_task(&mut current_sleep_before_next_task); - } - } - - // Don't run the task again for another few seconds UNLESS told to run now - tokio::select! { - () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, - msg = run_now.0.recv() => assert_eq!(msg, Some(()), "run now handle was dropped"), - } - } - } -} - /// A representation of a scanner. pub struct Scanner(PhantomData); impl Scanner {