From e35aa04afbd524771e8174dc3b10333906f07f16 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 12 Jan 2025 05:53:43 -0500 Subject: [PATCH] Start handling messages from the processor Does route ProcessorMessage::CosignedBlock. Rest are stubbed with TODO. --- coordinator/src/main.rs | 165 +++++++++++++++++++++++++++++++++++----- 1 file changed, 148 insertions(+), 17 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0dae9b40..f549378d 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -9,14 +9,19 @@ use ciphersuite::{ Ciphersuite, Ristretto, }; +use borsh::BorshDeserialize; + use tokio::sync::mpsc; -use serai_client::{primitives::PublicKey, Serai}; +use serai_client::{ + primitives::{NetworkId, PublicKey}, + Serai, +}; use message_queue::{Service, client::MessageQueue}; use serai_task::{Task, TaskHandle, ContinuallyRan}; -use serai_cosign::{SignedCosign, Cosigning}; +use serai_cosign::{Faulted, SignedCosign, Cosigning}; use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport}; use serai_coordinator_tributary::Transaction; @@ -63,18 +68,60 @@ async fn serai() -> Arc { } } -fn spawn_cosigning( - db: impl serai_db::Db, +fn spawn_cosigning( + mut db: D, serai: Arc, p2p: impl p2p::P2p, tasks_to_run_upon_cosigning: Vec, mut p2p_cosigns: mpsc::UnboundedReceiver, - mut signed_cosigns: mpsc::UnboundedReceiver, ) { - let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning); + let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning); tokio::spawn(async move { + const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5); + let last_cosign_rebroadcast = Instant::now(); loop { + // Intake our own cosigns + match Cosigning::::latest_cosigned_block_number(&db) { + Ok(latest_cosigned_block_number) => { + let mut txn = db.txn(); + // The cosigns we prior tried to intake yet failed to + let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]); + // The cosigns we have yet to intake + while let Some(cosign) = SignedCosigns::try_recv(&mut txn) { + cosigns.push(cosign); + } + + let mut erroneous = vec![]; + for cosign in cosigns { + // If this cosign is stale, move on + if cosign.cosign.block_number <= latest_cosigned_block_number { + continue; + } + + match cosigning.intake_cosign(&cosign) { + // Publish this cosign + Ok(()) => p2p.publish_cosign(cosign).await, + Err(e) => { + assert!(e.temporal(), "signed an invalid cosign: {e:?}"); + // Since this had a temporal error, queue it to try again later + erroneous.push(cosign); + } + }; + } + + // Save the cosigns with temporal errors to the database + ErroneousCosigns::set(&mut txn, &erroneous); + + txn.commit(); + } + Err(Faulted) => { + // We don't panic here as the following code rebroadcasts our cosigns which is + // necessary to inform other coordinators of the faulty cosigns + log::error!("cosigning faulted"); + } + } + let time_till_cosign_rebroadcast = (last_cosign_rebroadcast + serai_cosign::BROADCAST_FREQUENCY) .saturating_duration_since(Instant::now()); @@ -86,19 +133,98 @@ fn spawn_cosigning( } cosign = p2p_cosigns.recv() => { let cosign = cosign.expect("p2p cosigns channel was dropped?"); - let _: Result<_, _> = cosigning.intake_cosign(&cosign); - } - cosign = signed_cosigns.recv() => { - let cosign = cosign.expect("signed cosigns channel was dropped?"); - // TODO: Handle this error - let _: Result<_, _> = cosigning.intake_cosign(&cosign); - p2p.publish_cosign(cosign).await; + if cosigning.intake_cosign(&cosign).is_ok() { + p2p.publish_cosign(cosign).await; + } } + // Make sure this loop runs at least this often + () = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {} } } }); } +async fn handle_processor_messages( + mut db: impl serai_db::Db, + message_queue: Arc, + network: NetworkId, +) { + loop { + let (msg_id, msg) = { + let msg = message_queue.next(Service::Processor(network)).await; + // Check this message's sender is as expected + assert_eq!(msg.from, Service::Processor(network)); + + // Check this message's ID is as expected + let last = LastProcessorMessage::get(&db, network); + let next = last.map(|id| id + 1).unwrap_or(0); + // This should either be the last message's ID, if we committed but didn't send our ACK, or + // the expected next message's ID + assert!((Some(msg.id) == last) || (msg.id == next)); + + // TODO: Check msg.sig + + // If this is the message we already handled, and just failed to ACK, ACK it now and move on + if Some(msg.id) == last { + message_queue.ack(Service::Processor(network), msg.id).await; + continue; + } + + (msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap()) + }; + + let mut txn = db.txn(); + + match msg { + messages::ProcessorMessage::KeyGen(msg) => match msg { + messages::key_gen::ProcessorMessage::Participation { session, participation } => { + todo!("TODO Transaction::DkgParticipation") + } + messages::key_gen::ProcessorMessage::GeneratedKeyPair { + session, + substrate_key, + network_key, + } => todo!("TODO Transaction::DkgConfirmationPreprocess"), + messages::key_gen::ProcessorMessage::Blame { session, participant } => { + todo!("TODO Transaction::RemoveParticipant") + } + }, + messages::ProcessorMessage::Sign(msg) => match msg { + messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => { + todo!("TODO Transaction::RemoveParticipant") + } + messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => { + todo!("TODO Transaction::Batch + Transaction::Sign") + } + messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"), + }, + messages::ProcessorMessage::Coordinator(msg) => match msg { + messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => { + SignedCosigns::send(&mut txn, &cosign); + } + messages::coordinator::ProcessorMessage::SignedBatch { batch } => { + todo!("TODO Save to DB, have task read from DB and publish to Serai") + } + messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => { + todo!("TODO Save to DB, have task read from DB and publish to Serai") + } + }, + messages::ProcessorMessage::Substrate(msg) => match msg { + messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => { + todo!("TODO Transaction::SubstrateBlock") + } + }, + } + + // Mark this as the last handled message + LastProcessorMessage::set(&mut txn, network, &msg_id); + // Commit the txn + txn.commit(); + // Now that we won't handle this message again, acknowledge it so we won't see it again + message_queue.ack(Service::Processor(network), msg_id).await; + } +} + #[tokio::main] async fn main() { // Override the panic handler with one which will panic if any tokio task panics @@ -217,7 +343,6 @@ async fn main() { ); // Spawn the cosign handler - let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel(); spawn_cosigning( db.clone(), serai.clone(), @@ -225,7 +350,6 @@ async fn main() { // Run the Substrate scanners once we cosign new blocks vec![substrate_canonical_task, substrate_ephemeral_task], p2p_cosigns_recv, - signed_cosigns_recv, ); // Spawn all Tributaries on-disk @@ -254,7 +378,14 @@ async fn main() { .continually_run(substrate_task_def, vec![]), ); - // TODO: Handle processor messages + // Handle all of the Processors' messages + for network in serai_client::primitives::NETWORKS { + if network == NetworkId::Serai { + continue; + } + tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network)); + } - todo!("TODO") + // Run the spawned tasks ad-infinitum + core::future::pending().await }