From 92a4cceeebac52e3d572dfe018243b853cdfd4c6 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 15 Jan 2025 11:21:55 -0500 Subject: [PATCH] Spawn PublishBatchTask Also removes the expectation Batches published via it are sent in an ordered fashion. That won't be true if the signing protocols complete out-of-order (as possible when we are signing them in parallel). --- coordinator/src/main.rs | 25 +++++++-- coordinator/substrate/src/lib.rs | 2 - coordinator/substrate/src/publish_batch.rs | 59 +++++++++++++++------- 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index f189ffad..5895f74c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -23,7 +23,9 @@ use message_queue::{Service, client::MessageQueue}; use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{Faulted, SignedCosign, Cosigning}; -use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport}; +use serai_coordinator_substrate::{ + CanonicalEventStream, EphemeralEventStream, SignSlashReport, SignedBatches, PublishBatchTask, +}; use serai_coordinator_tributary::{SigningProtocolRound, Signed, Transaction, SubstrateBlockPlans}; mod db; @@ -145,11 +147,24 @@ fn spawn_cosigning( }); } -async fn handle_processor_messages( +async fn handle_network( mut db: impl serai_db::Db, message_queue: Arc, + serai: Arc, network: NetworkId, ) { + // Spawn the task to publish batches for this network + { + let (publish_batch_task_def, publish_batch_task) = Task::new(); + tokio::spawn( + PublishBatchTask::new(db.clone(), serai.clone(), network) + .unwrap() + .continually_run(publish_batch_task_def, vec![]), + ); + core::mem::forget(publish_batch_task); + } + + // Handle Processor messages loop { let (msg_id, msg) = { let msg = message_queue.next(Service::Processor(network)).await; @@ -257,7 +272,7 @@ async fn handle_processor_messages( SignedCosigns::send(&mut txn, &cosign); } messages::coordinator::ProcessorMessage::SignedBatch { batch } => { - todo!("TODO PublishBatchTask") + SignedBatches::send(&mut txn, &batch); } messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => { todo!("TODO PublishSlashReportTask") @@ -449,12 +464,12 @@ async fn main() { .continually_run(substrate_task_def, vec![]), ); - // Handle all of the Processors' messages + // Handle each of the networks for network in serai_client::primitives::NETWORKS { if network == NetworkId::Serai { continue; } - tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network)); + tokio::spawn(handle_network(db.clone(), message_queue.clone(), serai.clone(), network)); } // Run the spawned tasks ad-infinitum diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index dc8056a7..a03c05dd 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -175,8 +175,6 @@ impl Keys { pub struct SignedBatches; impl SignedBatches { /// Send a `SignedBatch` to publish onto Serai. - /// - /// These will be published sequentially. Out-of-order sending risks hanging the task. pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) { _public_db::SignedBatches::send(txn, batch.batch.network, batch); } diff --git a/coordinator/substrate/src/publish_batch.rs b/coordinator/substrate/src/publish_batch.rs index 6d186266..e9038d87 100644 --- a/coordinator/substrate/src/publish_batch.rs +++ b/coordinator/substrate/src/publish_batch.rs @@ -1,14 +1,21 @@ use core::future::Future; use std::sync::Arc; -use serai_db::{DbTxn, Db}; - -use serai_client::{primitives::NetworkId, SeraiError, Serai}; +#[rustfmt::skip] +use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch, SeraiError, Serai}; +use serai_db::{Get, DbTxn, Db, create_db}; use serai_task::ContinuallyRan; use crate::SignedBatches; +create_db!( + CoordinatorSubstrate { + LastPublishedBatch: (network: NetworkId) -> u32, + BatchesToPublish: (network: NetworkId, batch: u32) -> SignedBatch, + } +); + /// Publish `SignedBatch`s from `SignedBatches` onto Serai. pub struct PublishBatchTask { db: D, @@ -34,32 +41,50 @@ impl ContinuallyRan for PublishBatchTask { fn run_iteration(&mut self) -> impl Send + Future> { async move { - let mut made_progress = false; - + // Read from SignedBatches, which is sequential, into our own mapping loop { let mut txn = self.db.txn(); let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else { - // No batch to publish at this time break; }; - // Publish this Batch if it hasn't already been published + // If this is a Batch not yet published, save it into our unordered mapping + if LastPublishedBatch::get(&txn, self.network) < Some(batch.batch.id) { + BatchesToPublish::set(&mut txn, self.network, batch.batch.id, &batch); + } + + txn.commit(); + } + + // Synchronize our last published batch with the Serai network's + let next_to_publish = { let serai = self.serai.as_of_latest_finalized_block().await?; let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?; - if last_batch < Some(batch.batch.id) { - // This stream of Batches *should* be sequential within the larger context of the Serai - // coordinator. In this library, we use a more relaxed definition and don't assert - // sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but - // that is a documented fault of the `SignedBatches` API. + + let mut txn = self.db.txn(); + let mut our_last_batch = LastPublishedBatch::get(&txn, self.network); + while our_last_batch < last_batch { + let next_batch = our_last_batch.map(|batch| batch + 1).unwrap_or(0); + // Clean up the Batch to publish since it's already been published + BatchesToPublish::take(&mut txn, self.network, next_batch); + our_last_batch = Some(next_batch); + } + if let Some(last_batch) = our_last_batch { + LastPublishedBatch::set(&mut txn, self.network, &last_batch); + } + last_batch.map(|batch| batch + 1).unwrap_or(0) + }; + + let made_progress = + if let Some(batch) = BatchesToPublish::get(&self.db, self.network, next_to_publish) { self .serai .publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch)) .await?; - } - - txn.commit(); - made_progress = true; - } + true + } else { + false + }; Ok(made_progress) } }