From c05b0c9eba6c677f172f9d004089eac2452ea5d7 Mon Sep 17 00:00:00 2001
From: Luke Parker <lukeparker5132@gmail.com>
Date: Sat, 11 Jan 2025 03:07:15 -0500
Subject: [PATCH] Handle Canonical, NewSet from serai-coordinator-substrate

---
 coordinator/src/main.rs          | 191 ++++++++++++++++++++++++++-----
 coordinator/substrate/src/lib.rs |   2 +-
 processor/messages/src/lib.rs    |   2 +-
 3 files changed, 162 insertions(+), 33 deletions(-)
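
Notes:

  SubstrateTask, introduced below, drains the Canonical event stream (per
  network) and the NewSet event stream written by serai-coordinator-substrate,
  forwarding each event to the relevant processor over the message queue. On
  SlashesReported it marks the session's Tributary retired and informs the P2P
  network; on NewSet it queues the historical Tributary for deletion, persists
  the new set, sends GenerateKey, and spawns the new Tributary.

  A minimal sketch of the intended task wiring, restating what the diff below
  does with serai-task's Task / ContinuallyRan API (substrate_task_value is a
  stand-in for the SubstrateTask struct literal built in main):

    // The handle the Substrate scanners use to wake SubstrateTask.
    let (substrate_task_def, substrate_task) = Task::new();
    // Each scanner wakes SubstrateTask whenever it makes progress.
    tokio::spawn(
      CanonicalEventStream::new(db.clone(), serai.clone())
        .continually_run(substrate_canonical_task_def, vec![substrate_task.clone()]),
    );
    // SubstrateTask itself has no dependents; it communicates over channels.
    tokio::spawn(substrate_task_value.continually_run(substrate_task_def, vec![]));
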
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 842f7f9c..71f73d65 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -1,4 +1,4 @@
-use core::{marker::PhantomData, ops::Deref, time::Duration};
+use core::{marker::PhantomData, ops::Deref, future::Future, time::Duration};
 use std::{sync::Arc, collections::HashMap, time::Instant};
 
 use zeroize::{Zeroize, Zeroizing};
@@ -14,13 +14,11 @@ use tokio::sync::mpsc;
 
 use scale::Encode;
 use serai_client::{
-  primitives::{NetworkId, PublicKey, SeraiAddress},
+  primitives::{PublicKey, SeraiAddress},
   validator_sets::primitives::{Session, ValidatorSet},
   Serai,
 };
 
-use message_queue::{Service, client::MessageQueue};
-
-use ::tributary::ProvidedError;
+use message_queue::{Service, Metadata, client::MessageQueue};
 
 use serai_task::{Task, TaskHandle, ContinuallyRan};
@@ -110,7 +108,7 @@ fn spawn_cosigning(
 ///
 /// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network.
 async fn spawn_tributary<P: P2p>(
-  mut db: Db,
+  db: Db,
   message_queue: Arc<MessageQueue>,
   p2p: P,
   p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<P>)>,
@@ -123,21 +121,6 @@ async fn spawn_tributary<P: P2p>(
     return;
   }
 
-  // TODO: Move from spawn_tributary to on NewSet
-  // Queue the historical Tributary for this network for deletion
-  // We explicitly don't queue this upon Tributary retire to give time to investigate retired
-  // Tributaries if questions are raised post-retiry. This gives a week after the Tributary has
-  // been retired to make a backup of the data directory for any investigations.
-  if let Some(historic_session) = set.set.session.0.checked_sub(2) {
-    // This may get fired several times but that isn't an issue
-    let mut txn = db.txn();
-    TributaryCleanup::send(
-      &mut txn,
-      &ValidatorSet { network: set.set.network, session: Session(historic_session) },
-    );
-    txn.commit();
-  }
-
   let genesis = <[u8; 32]>::from(Blake2s::<U32>::digest((set.serai_block, set.set).encode()));
 
   // Since the Serai block will be finalized, then cosigned, before we handle this, this time will
@@ -163,7 +146,7 @@ async fn spawn_tributary<P: P2p>(
   }
 
   let tributary_db = tributary_db(set.set);
-  let mut tributary =
+  let tributary =
     Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p)
       .await
       .unwrap();
@@ -180,7 +163,7 @@ async fn spawn_tributary<P: P2p>(
       .continually_run(scan_tributary_messages_task_def, vec![]),
   );
 
-  let (scan_tributary_task_def, mut scan_tributary_task) = Task::new();
+  let (scan_tributary_task_def, scan_tributary_task) = Task::new();
   tokio::spawn(
     (ScanTributaryTask {
       cosign_db: db.clone(),
@@ -200,6 +183,143 @@ async fn spawn_tributary<P: P2p>(
   tokio::spawn(tributary::run(db, set, tributary, scan_tributary_task));
 }
 
+struct SubstrateTask<P: P2p> {
+  serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  db: Db,
+  message_queue: Arc<MessageQueue>,
+  p2p: P,
+  p2p_add_tributary: mpsc::UnboundedSender<(ValidatorSet, Tributary<P>)>,
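+  // Note: the receiving halves of this channel and p2p_add_tributary live in the P2P code,
+  // outside this diff; this one asks the P2P network to drop a Tributary once its set retires.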
+  p2p_retire_tributary: mpsc::UnboundedSender<ValidatorSet>,
+}
+
+impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
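+  // Note: a single iteration drains every pending Canonical event (for each network), then every
+  // pending NewSet event, committing one database transaction per event handled.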
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
+    async move {
+      let mut made_progress = false;
+
+      // Handle the Canonical events
+      for network in serai_client::primitives::NETWORKS {
+        loop {
+          let mut txn = self.db.txn();
+          let Some(msg) = serai_coordinator_substrate::Canonical::try_recv(&mut txn, network)
+          else {
+            break;
+          };
+
+          match msg {
+            // TODO: Stop trying to confirm the DKG
+            messages::substrate::CoordinatorMessage::SetKeys { .. } => todo!("TODO"),
+            messages::substrate::CoordinatorMessage::SlashesReported { session } => {
+              let prior_retired = RetiredTributary::get(&txn, network);
+              let next_to_be_retired =
+                prior_retired.map(|session| Session(session.0 + 1)).unwrap_or(Session(0));
+              assert_eq!(session, next_to_be_retired);
+              RetiredTributary::set(&mut txn, network, &session);
+              self
+                .p2p_retire_tributary
+                .send(ValidatorSet { network, session })
+                .expect("p2p retire_tributary channel dropped?");
+            }
+            messages::substrate::CoordinatorMessage::Block { .. } => {}
+          }
+
+          let msg = messages::CoordinatorMessage::from(msg);
+          let metadata = Metadata {
+            from: Service::Coordinator,
+            to: Service::Processor(network),
+            intent: msg.intent(),
+          };
+          let msg = borsh::to_vec(&msg).unwrap();
+          // TODO: Make this fallible
+          self.message_queue.queue(metadata, msg).await;
+          txn.commit();
+          made_progress = true;
+        }
+      }
+
+      // Handle the NewSet events
+      loop {
+        let mut txn = self.db.txn();
+        let Some(new_set) = serai_coordinator_substrate::NewSet::try_recv(&mut txn) else { break };
+
+        if let Some(historic_session) = new_set.set.session.0.checked_sub(2) {
+          // We should have retired this session if we're here
+          if RetiredTributary::get(&txn, new_set.set.network).map(|session| session.0) <
+            Some(historic_session)
+          {
+            /*
+              If we haven't, it's because we're processing the NewSet event before the
+              retirement event from the Canonical event stream. This happens if the Canonical
+              event, and then the NewSet event, are fired while we're already iterating over
+              NewSet events.
+
+              We break, dropping the txn, restoring this NewSet to the database, so we'll only
+              handle it once a future iteration of this loop handles the retirement event.
+            */
+            break;
+          }
+
+          /*
+            Queue this historical Tributary for deletion.
+
+            We explicitly don't queue this upon Tributary retire, instead here, to give time to
+            investigate retired Tributaries if questions are raised post-retirement. This gives
+            a week (the duration of the following session) after the Tributary has been retired
+            to make a backup of the data directory for any investigations.
+          */
+          TributaryCleanup::send(
+            &mut txn,
+            &ValidatorSet { network: new_set.set.network, session: Session(historic_session) },
+          );
+        }
+
+        // Save this Tributary as active to the database
+        {
+          let mut active_tributaries =
+            ActiveTributaries::get(&txn).unwrap_or(Vec::with_capacity(1));
+          active_tributaries.push(new_set.clone());
+          ActiveTributaries::set(&mut txn, &active_tributaries);
+        }
+
+        // Send GenerateKey to the processor
+        let msg = messages::key_gen::CoordinatorMessage::GenerateKey {
+          session: new_set.set.session,
+          threshold: new_set.threshold,
+          evrf_public_keys: new_set.evrf_public_keys.clone(),
+        };
+        let msg = messages::CoordinatorMessage::from(msg);
+        let metadata = Metadata {
+          from: Service::Coordinator,
+          to: Service::Processor(new_set.set.network),
+          intent: msg.intent(),
+        };
+        let msg = borsh::to_vec(&msg).unwrap();
+        // TODO: Make this fallible
+        self.message_queue.queue(metadata, msg).await;
+
+        // Commit the transaction for all of this
+        txn.commit();
+
+        // Now spawn the Tributary
+        // If we reboot after committing the txn, but before this is called, this will be called
+        // on boot
+        spawn_tributary(
+          self.db.clone(),
+          self.message_queue.clone(),
+          self.p2p.clone(),
+          &self.p2p_add_tributary,
+          new_set,
+          self.serai_key.clone(),
+        )
+        .await;
+
+        made_progress = true;
+      }
+
+      Ok(made_progress)
+    }
+  }
+}
+
 #[tokio::main]
 async fn main() {
   // Override the panic handler with one which will panic if any tokio task panics
@@ -298,14 +418,13 @@ async fn main() {
     p2p
   };
 
-  // TODO: p2p_add_tributary_send, p2p_retire_tributary_send
-
   // Spawn the Substrate scanners
-  // TODO: Canonical, NewSet, SignSlashReport
+  // TODO: SignSlashReport
+  let (substrate_task_def, substrate_task) = Task::new();
   let (substrate_canonical_task_def, substrate_canonical_task) = Task::new();
   tokio::spawn(
     CanonicalEventStream::new(db.clone(), serai.clone())
-      .continually_run(substrate_canonical_task_def, todo!("TODO")),
+      .continually_run(substrate_canonical_task_def, vec![substrate_task.clone()]),
   );
   let (substrate_ephemeral_task_def, substrate_ephemeral_task) = Task::new();
   tokio::spawn(
@@ -314,7 +433,7 @@ async fn main() {
       serai.clone(),
       PublicKey::from_raw((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
     )
-    .continually_run(substrate_ephemeral_task_def, todo!("TODO")),
+    .continually_run(substrate_ephemeral_task_def, vec![substrate_task]),
   );
 
   // Spawn the cosign handler
@@ -342,10 +461,20 @@ async fn main() {
     .await;
   }
 
-  // TODO: Hndle processor messages
+  // Handle the events from the Substrate scanner
+  tokio::spawn(
+    (SubstrateTask {
+      serai_key: serai_key.clone(),
+      db: db.clone(),
+      message_queue: message_queue.clone(),
+      p2p: p2p.clone(),
+      p2p_add_tributary: p2p_add_tributary_send.clone(),
+      p2p_retire_tributary: p2p_retire_tributary_send.clone(),
+    })
+    .continually_run(substrate_task_def, vec![]),
+  );
 
-  // TODO: On NewSet, queue historical for deletionn, save to DB, send KeyGen, spawn tributary
-  // task, inform P2P network
+  // TODO: Handle processor messages
 
   todo!("TODO")
 }
diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs
index f723332d..b3f00a5e 100644
--- a/coordinator/substrate/src/lib.rs
+++ b/coordinator/substrate/src/lib.rs
@@ -32,7 +32,7 @@ fn borsh_deserialize_validators(
 }
 
 /// The information for a new set.
-#[derive(Debug, BorshSerialize, BorshDeserialize)]
+#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
 pub struct NewSetInformation {
   /// The set.
   pub set: ValidatorSet,
diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs
index 5b3d325f..ec072fe5 100644
--- a/processor/messages/src/lib.rs
+++ b/processor/messages/src/lib.rs
@@ -24,7 +24,7 @@ pub mod key_gen {
   pub enum CoordinatorMessage {
     /// Instructs the Processor to begin the key generation process.
     ///
-    /// This is sent by the Coordinator when it creates the Tributary (TODO).
+    /// This is sent by the Coordinator when it creates the Tributary.
     GenerateKey { session: Session, threshold: u16, evrf_public_keys: Vec<([u8; 32], Vec<u8>)> },
     /// Received participations for the specified key generation protocol.
     ///