diff --git a/Cargo.lock b/Cargo.lock index bb132e4c..a3bb7df3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8166,7 +8166,6 @@ dependencies = [ "frost-schnorrkel", "futures", "hex", - "lazy_static", "libp2p", "log", "modular-frost", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 810d12a8..d00e9abe 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -15,7 +15,6 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] async-trait = "0.1" -lazy_static = "1" zeroize = "^1.5" rand_core = "0.6" diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index ec9651f4..ea039bf6 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -6,7 +6,7 @@ use core::{ops::Deref, future::Future}; use std::{ sync::Arc, time::{SystemTime, Duration}, - collections::{VecDeque, HashMap}, + collections::HashMap, }; use zeroize::{Zeroize, Zeroizing}; @@ -27,7 +27,10 @@ use serai_client::{primitives::NetworkId, Public, Serai}; use message_queue::{Service, client::MessageQueue}; use futures::stream::StreamExt; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{RwLock, mpsc}, + time::sleep, +}; use ::tributary::{ ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader, @@ -54,11 +57,6 @@ mod substrate; #[cfg(test)] pub mod tests; -lazy_static::lazy_static! { - // This is a static to satisfy lifetime expectations - static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); -} - pub struct ActiveTributary { pub spec: TributarySpec, pub tributary: Arc>>, @@ -103,6 +101,7 @@ pub async fn scan_substrate( key: Zeroizing<::F>, processors: Pro, serai: Arc, + new_tributary_channel: mpsc::UnboundedSender, ) { log::info!("scanning substrate"); @@ -162,9 +161,7 @@ pub async fn scan_substrate( // Add it to the queue // If we reboot before this is read from the queue, the fact it was saved to the database // means it'll be handled on reboot - async { - NEW_TRIBUTARIES.write().await.push_back(spec); - } + new_tributary_channel.send(spec).unwrap(); }, &processors, &serai, @@ -181,7 +178,7 @@ pub async fn scan_substrate( } } -#[allow(clippy::type_complexity)] +#[allow(clippy::too_many_arguments, clippy::type_complexity)] pub async fn scan_tributaries< D: Db, Pro: Processors, @@ -196,6 +193,7 @@ pub async fn scan_tributaries< processors: Pro, serai: Arc, tributaries: Arc>>, + mut new_tributary_channel: mpsc::UnboundedReceiver, ) { log::info!("scanning tributaries"); @@ -209,11 +207,10 @@ pub async fn scan_tributaries< loop { // The following handle_new_blocks function may take an arbitrary amount of time // Accordingly, it may take a long time to acquire a write lock on the tributaries table - // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately, - // meaning the Substrate scanner won't become blocked on this + // By definition of new_tributary_channel, we allow tributaries to be 'added' almost + // immediately, meaning the Substrate scanner won't become blocked on this { - let mut new_tributaries = NEW_TRIBUTARIES.write().await; - while let Some(spec) = new_tributaries.pop_front() { + while let Ok(spec) = new_tributary_channel.try_recv() { let reader = add_tributary( raw_db.clone(), key.clone(), @@ -225,6 +222,7 @@ pub async fn scan_tributaries< .await; // Trigger a DKG for the newly added Tributary + // TODO: This needs to moved into add_tributary, or else we may never emit GenerateKey let set = spec.set(); processors .send( @@ -799,8 +797,16 @@ pub async fn run( ) { let serai = Arc::new(serai); + let (new_tributary_channel_send, new_tributary_channel_recv) = mpsc::unbounded_channel(); + // Handle new Substrate blocks - tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone())); + tokio::spawn(scan_substrate( + raw_db.clone(), + key.clone(), + processors.clone(), + serai.clone(), + new_tributary_channel_send, + )); // Handle the Tributaries @@ -883,6 +889,7 @@ pub async fn run( processors.clone(), serai.clone(), tributaries.clone(), + new_tributary_channel_recv, )); } diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 8d8cf68d..bc252d50 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -149,10 +149,6 @@ struct Behavior { mdns: libp2p::mdns::tokio::Behaviour, } -lazy_static::lazy_static! { - static ref TIME_OF_LAST_P2P_MESSAGE: Mutex = Mutex::new(Instant::now()); -} - #[allow(clippy::type_complexity)] #[derive(Clone)] pub struct LibP2p( @@ -246,10 +242,16 @@ impl LibP2p { let (receive_send, receive_recv) = mpsc::unbounded_channel(); tokio::spawn({ + let mut time_of_last_p2p_message = Instant::now(); + #[allow(clippy::needless_pass_by_ref_mut)] // False positive - async fn broadcast_raw(p2p: &mut Swarm, msg: Vec) { + async fn broadcast_raw( + p2p: &mut Swarm, + time_of_last_p2p_message: &mut Instant, + msg: Vec, + ) { // Update the time of last message - *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now(); + *time_of_last_p2p_message = Instant::now(); match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) { Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"), @@ -267,8 +269,7 @@ impl LibP2p { async move { // Run this task ad-infinitum loop { - let time_since_last = - Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await); + let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); tokio::select! { biased; @@ -276,6 +277,7 @@ impl LibP2p { msg = broadcast_recv.recv() => { broadcast_raw( &mut swarm, + &mut time_of_last_p2p_message, msg.expect("broadcast_recv closed. are we shutting down?") ).await; } @@ -324,7 +326,11 @@ impl LibP2p { // (where a finalized block only occurs due to network activity), meaning this won't be // run _ = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => { - broadcast_raw(&mut swarm, P2pMessageKind::KeepAlive.serialize()).await; + broadcast_raw( + &mut swarm, + &mut time_of_last_p2p_message, + P2pMessageKind::KeepAlive.serialize() + ).await; } } } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index b4073a88..5ed4f7ac 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -1,4 +1,4 @@ -use core::{ops::Deref, time::Duration, future::Future}; +use core::{ops::Deref, time::Duration}; use std::collections::{HashSet, HashMap}; use zeroize::Zeroizing; @@ -43,12 +43,7 @@ async fn in_set( Ok(Some(data.participants.iter().any(|(participant, _)| participant.0 == key))) } -async fn handle_new_set< - D: Db, - Fut: Future, - CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, - Pro: Processors, ->( +async fn handle_new_set( db: &mut D, key: &Zeroizing<::F>, create_new_tributary: CNT, @@ -84,7 +79,7 @@ async fn handle_new_set< let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY; let spec = TributarySpec::new(block.hash(), time, set, set_data); - create_new_tributary(db, spec.clone()).await; + create_new_tributary(db, spec.clone()); } else { log::info!("not present in set {:?}", set); } @@ -215,12 +210,7 @@ async fn handle_batch_and_burns( // Handle a specific Substrate block, returning an error when it fails to get data // (not blocking / holding) #[allow(clippy::needless_pass_by_ref_mut)] // False positive? -async fn handle_block< - D: Db, - Fut: Future, - CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, - Pro: Processors, ->( +async fn handle_block( db: &mut SubstrateDb, key: &Zeroizing<::F>, create_new_tributary: CNT, @@ -295,12 +285,7 @@ async fn handle_block< Ok(()) } -pub async fn handle_new_blocks< - D: Db, - Fut: Future, - CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, - Pro: Processors, ->( +pub async fn handle_new_blocks( db: &mut SubstrateDb, key: &Zeroizing<::F>, create_new_tributary: CNT,