diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 7898b619..391ecef1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,4 +1,4 @@ -use core::{ops::Deref, future::Future}; +use core::ops::Deref; use std::{ sync::Arc, time::{SystemTime, Duration}, @@ -191,125 +191,6 @@ pub async fn scan_substrate( } } -pub(crate) trait RIDTrait: - Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid -{ -} -impl FRid> - RIDTrait for F -{ -} - -pub(crate) async fn scan_tributaries< - D: Db, - Pro: Processors, - P: P2p, - FRid: Send + Future, - RID: 'static + Send + Sync + RIDTrait, ->( - raw_db: D, - key: Zeroizing<::F>, - recognized_id: RID, - processors: Pro, - serai: Arc, - mut new_tributary: broadcast::Receiver>, -) { - log::info!("scanning tributaries"); - - loop { - match new_tributary.recv().await { - Ok(ActiveTributary { spec, tributary }) => { - // For each Tributary, spawn a dedicated scanner task - tokio::spawn({ - let raw_db = raw_db.clone(); - let key = key.clone(); - let recognized_id = recognized_id.clone(); - let processors = processors.clone(); - let serai = serai.clone(); - async move { - let spec = &spec; - let reader = tributary.reader(); - let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); - loop { - // Obtain the next block notification now to prevent obtaining it immediately after - // the next block occurs - let next_block_notification = tributary.next_block_notification().await; - - tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( - &mut tributary_db, - &key, - recognized_id.clone(), - &processors, - |set, tx| { - let serai = serai.clone(); - async move { - loop { - match serai.publish(&tx).await { - Ok(_) => { - log::info!("set key pair for {set:?}"); - break; - } - // This is assumed to be some ephemeral error due to the assumed fault-free - // creation - // TODO2: Differentiate connection errors from invariants - Err(e) => { - if let Ok(latest) = serai.get_latest_block_hash().await { - // Check if this failed because the keys were already set by someone - // else - if matches!(serai.get_keys(spec.set(), latest).await, Ok(Some(_))) { - log::info!("another coordinator set key pair for {:?}", set); - break; - } - - // The above block may return false if the keys have been pruned from - // the state - // Check if this session is no longer the latest session, meaning it at - // some point did set keys, and we're just operating off very - // historical data - if let Ok(Some(current_session)) = - serai.get_session(spec.set().network, latest).await - { - if current_session.0 > spec.set().session.0 { - log::warn!( - "trying to set keys for a set which isn't the latest {:?}", - set - ); - break; - } - } - } - - log::error!( - "couldn't connect to Serai node to publish set_keys TX: {:?}", - e - ); - sleep(Duration::from_secs(10)).await; - } - } - } - } - }, - spec, - &reader, - ) - .await; - - next_block_notification - .await - .map_err(|_| "") - .expect("tributary dropped its notifications?"); - } - } - }); - } - Err(broadcast::error::RecvError::Lagged(_)) => { - panic!("scan_tributaries lagged to handle new_tributary") - } - Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"), - } - } -} - pub async fn heartbeat_tributaries( p2p: P, mut new_tributary: broadcast::Receiver>, @@ -638,7 +519,7 @@ async fn handle_processor_messages( ); // TODO: Find all Tributaries active at this Substrate block, and make sure we have - // them all + // them all (if we were present in them) for tributary in tributaries.values() { // TODO: This needs to be scoped per multisig @@ -1266,7 +1147,7 @@ pub async fn run( // Handle new blocks for each Tributary { let raw_db = raw_db.clone(); - tokio::spawn(scan_tributaries( + tokio::spawn(tributary::scanner::scan_tributaries( raw_db, key.clone(), recognized_id, diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index 1ec83a97..8b4ff9df 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -35,8 +35,9 @@ use serai_db::{Get, Db}; use crate::{ processors::Processors, tributary::{ - Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, nonce_decider::NonceDecider, - scanner::RecognizedIdType, + Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, + nonce_decider::NonceDecider, + scanner::{RecognizedIdType, RIDTrait}, }, }; @@ -240,7 +241,7 @@ pub(crate) async fn handle_application_tx< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: crate::RIDTrait, + RID: RIDTrait, >( tx: Transaction, spec: &TributarySpec, diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index ee254ff4..20cdea6d 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -1,10 +1,15 @@ use core::future::Future; +use std::sync::Arc; use zeroize::Zeroizing; use ciphersuite::{Ciphersuite, Ristretto}; -use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded}; +use tokio::sync::broadcast; + +use serai_client::{ + primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai, +}; use tributary::{ TransactionKind, Transaction as TributaryTransaction, Block, TributaryReader, @@ -30,6 +35,15 @@ pub enum RecognizedIdType { Plan, } +pub(crate) trait RIDTrait: + Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid +{ +} +impl FRid> + RIDTrait for F +{ +} + // Handle a specific Tributary block #[allow(clippy::needless_pass_by_ref_mut)] // False positive? async fn handle_block< @@ -38,7 +52,7 @@ async fn handle_block< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: crate::RIDTrait, + RID: RIDTrait, P: P2p, >( db: &mut TributaryDb, @@ -105,7 +119,7 @@ pub(crate) async fn handle_new_blocks< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: crate::RIDTrait, + RID: RIDTrait, P: P2p, >( db: &mut TributaryDb, @@ -148,3 +162,113 @@ pub(crate) async fn handle_new_blocks< db.set_last_block(genesis, next); } } + +pub(crate) async fn scan_tributaries< + D: Db, + Pro: Processors, + P: P2p, + FRid: Send + Future, + RID: 'static + Send + Sync + RIDTrait, +>( + raw_db: D, + key: Zeroizing<::F>, + recognized_id: RID, + processors: Pro, + serai: Arc, + mut new_tributary: broadcast::Receiver>, +) { + log::info!("scanning tributaries"); + + loop { + match new_tributary.recv().await { + Ok(crate::ActiveTributary { spec, tributary }) => { + // For each Tributary, spawn a dedicated scanner task + tokio::spawn({ + let raw_db = raw_db.clone(); + let key = key.clone(); + let recognized_id = recognized_id.clone(); + let processors = processors.clone(); + let serai = serai.clone(); + async move { + let spec = &spec; + let reader = tributary.reader(); + let mut tributary_db = TributaryDb::new(raw_db.clone()); + loop { + // Obtain the next block notification now to prevent obtaining it immediately after + // the next block occurs + let next_block_notification = tributary.next_block_notification().await; + + handle_new_blocks::<_, _, _, _, _, _, P>( + &mut tributary_db, + &key, + recognized_id.clone(), + &processors, + |set, tx| { + let serai = serai.clone(); + async move { + loop { + match serai.publish(&tx).await { + Ok(_) => { + log::info!("set key pair for {set:?}"); + break; + } + // This is assumed to be some ephemeral error due to the assumed fault-free + // creation + // TODO2: Differentiate connection errors from invariants + Err(e) => { + if let Ok(latest) = serai.get_latest_block_hash().await { + // Check if this failed because the keys were already set by someone + // else + if matches!(serai.get_keys(spec.set(), latest).await, Ok(Some(_))) { + log::info!("another coordinator set key pair for {:?}", set); + break; + } + + // The above block may return false if the keys have been pruned from + // the state + // Check if this session is no longer the latest session, meaning it at + // some point did set keys, and we're just operating off very + // historical data + if let Ok(Some(current_session)) = + serai.get_session(spec.set().network, latest).await + { + if current_session.0 > spec.set().session.0 { + log::warn!( + "trying to set keys for a set which isn't the latest {:?}", + set + ); + break; + } + } + } + + log::error!( + "couldn't connect to Serai node to publish set_keys TX: {:?}", + e + ); + tokio::time::sleep(core::time::Duration::from_secs(10)).await; + } + } + } + } + }, + spec, + &reader, + ) + .await; + + next_block_notification + .await + .map_err(|_| "") + .expect("tributary dropped its notifications?"); + } + } + }); + } + Err(broadcast::error::RecvError::Lagged(_)) => { + panic!("scan_tributaries lagged to handle new_tributary") + } + Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"), + } + } +}