diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index a7edd3cc..6f309d46 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -1,5 +1,8 @@
-use scale::Encode;
-use serai_client::primitives::{NetworkId, BlockHash};
+use scale::{Encode, Decode};
+use serai_client::{
+  primitives::{NetworkId, BlockHash},
+  in_instructions::primitives::SignedBatch,
+};
 
 pub use serai_db::*;
 
@@ -95,4 +98,19 @@ impl<'a, D: Db> MainDb<'a, D> {
   pub fn first_preprocess<G: Get>(getter: &G, id: [u8; 32]) -> Option<Vec<u8>> {
     getter.get(Self::first_preprocess_key(id))
   }
+
+  fn batch_key(network: NetworkId, id: u32) -> Vec<u8> {
+    Self::main_key(b"batch", (network, id).encode())
+  }
+  pub fn save_batch(&mut self, batch: SignedBatch) {
+    let mut txn = self.0.txn();
+    txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode());
+    txn.commit();
+  }
+  pub fn batch(&self, network: NetworkId, id: u32) -> Option<SignedBatch> {
+    self
+      .0
+      .get(Self::batch_key(network, id))
+      .map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap())
+  }
 }
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 6f8c6969..6743acfb 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -332,7 +332,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
     let mut msg = p2p.receive().await;
     // Spawn a dedicated task to handle this message, ensuring any singularly latent message
     // doesn't hold everything up
-    // TODO2: Move to one task per tributary
+    // TODO2: Move to one task per tributary (or two. One for Tendermint, one for Tributary)
     tokio::spawn({
       let p2p = p2p.clone();
       let tributaries = tributaries.clone();
@@ -474,6 +474,8 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
   let pub_key = Ristretto::generator() * key.deref();
 
   loop {
+    // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
+    // processor from holding up all the others
     let msg = processors.recv().await;
 
     // TODO2: This is slow, and only works as long as a network only has a single Tributary
@@ -626,41 +628,68 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
           "processor sent us a batch for a different network than it was for",
         );
         // TODO: Check this key's key pair's substrate key is authorized to publish batches
-        // TODO: Handle the fact batch n+1 can be signed before batch n
-        let tx = Serai::execute_batch(batch.clone());
-        loop {
-          match serai.publish(&tx).await {
-            Ok(_) => {
-              log::info!(
-                "executed batch {:?} {} (block {})",
-                batch.batch.network,
-                batch.batch.id,
-                hex::encode(batch.batch.block),
-              );
-              break;
-            }
-            Err(e) => {
-              if let Ok(latest_block) = serai.get_latest_block().await {
-                if let Ok(Some(last)) =
-                  serai.get_last_batch_for_network(latest_block.hash(), batch.batch.network).await
-                {
-                  if last >= batch.batch.id {
-                    log::info!(
-                      "another coordinator executed batch {:?} {} (block {})",
-                      batch.batch.network,
-                      batch.batch.id,
-                      hex::encode(batch.batch.block),
+        // Save this batch to the disk
+        MainDb::new(&mut db).save_batch(batch);
+
+        /*
+          Use a dedicated task to publish batches due to the latency potentially incurred.
+
+          This does not guarantee the batch has actually been published when the message is
+          `ack`ed to message-queue. Accordingly, if we reboot, these batches would be dropped
+          (as we wouldn't see the `Update` again, triggering our re-attempt to publish).
+
+          The solution to this is to have the task publish not just the batch which caused it to
+          be spawned, but all saved batches which have yet to be published. This does risk having
+          multiple tasks trying to publish all pending batches, yet these aren't notably complex.
+        */
+        tokio::spawn({
+          let mut db = db.clone();
+          let serai = serai.clone();
+          let network = msg.network;
+          async move {
+            // Since we have a new batch, publish all batches yet to be published to Serai
+            // This handles the edge-case where batch n+1 is signed before batch n is
+            while let Some(batch) = {
+              // Get the next-to-execute batch ID
+              let next = {
+                let mut first = true;
+                loop {
+                  if !first {
+                    log::error!(
+                      "couldn't connect to Serai node to get the next batch ID for {network:?}",
                     );
-                    break;
+                    tokio::time::sleep(Duration::from_secs(5)).await;
                   }
+                  first = false;
+
+                  let Ok(latest_block) = serai.get_latest_block().await else { continue };
+                  let Ok(last) =
+                    serai.get_last_batch_for_network(latest_block.hash(), network).await
+                  else {
+                    continue;
+                  };
+                  break if let Some(last) = last { last + 1 } else { 0 };
                 }
+              };
+
+              // If we have this batch, attempt to publish it
+              MainDb::new(&mut db).batch(network, next)
+            } {
+              let id = batch.batch.id;
+              let block = batch.batch.block;
+
+              let tx = Serai::execute_batch(batch);
+              // This publish may fail if this transaction already exists in the mempool, which
+              // is possible, or if this batch was already executed on-chain
+              // Either case will have eventual resolution and be handled by the above check on
+              // if this block should execute
+              if serai.publish(&tx).await.is_ok() {
+                log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
               }
-              log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e);
-              tokio::time::sleep(Duration::from_secs(10)).await;
             }
-          }
-        }
+          }
+        });
 
         None
       }
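A note on the db.rs half: batches are stored under keys built from a b"batch" namespace plus the SCALE encoding of (network, id), with the value being the SCALE-encoded SignedBatch. Below is a minimal, self-contained sketch of that round-trip pattern, not the actual serai_db API. It assumes `scale` is `parity-scale-codec` with its `derive` feature (as in the Serai workspace), and `DummyBatch` plus a plain `HashMap` are stand-ins for `SignedBatch` and the real DB, purely for illustration.

// Sketch only: DummyBatch and the HashMap stand in for SignedBatch and serai_db.
use std::collections::HashMap;

use scale::{Encode, Decode};

#[derive(Clone, PartialEq, Debug, Encode, Decode)]
struct DummyBatch {
  network: u8, // stand-in for NetworkId
  id: u32,
  data: Vec<u8>,
}

// Mirrors batch_key: a namespace prefix followed by the SCALE encoding of (network, id)
fn batch_key(network: u8, id: u32) -> Vec<u8> {
  let mut key = b"batch".to_vec();
  key.extend((network, id).encode());
  key
}

fn main() {
  let mut db = HashMap::new();

  // save_batch: store the SCALE-encoded batch under its (network, id) key
  let batch = DummyBatch { network: 0, id: 5, data: vec![1, 2, 3] };
  db.insert(batch_key(batch.network, batch.id), batch.encode());

  // batch: read the bytes back and decode them, as MainDb::batch does with SignedBatch
  let read =
    db.get(&batch_key(0, 5)).map(|bytes| DummyBatch::decode(&mut bytes.as_ref()).unwrap());
  assert_eq!(read, Some(batch));
}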
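On the main.rs half, the design hinge is that the spawned task re-derives the next batch to publish from on-chain state instead of publishing the batch which woke it. That is what lets batch n+1 be signed and saved before batch n without anything landing out of order, and what makes concurrently spawned tasks harmless. A simplified, synchronous sketch of that loop, where `last_executed_batch`, `pending_batch`, and `publish` are hypothetical stand-ins for the Serai node query, the DB read, and the transaction submission (not the actual API):

// Hypothetical stand-ins for the Serai node query, the DB read, and the publish call.
fn last_executed_batch() -> Option<u32> {
  None // would ask the Serai node for the last executed batch ID
}
fn pending_batch(_id: u32) -> Option<Vec<u8>> {
  None // would read a saved, yet-to-be-published batch from the local DB
}
fn publish(_batch: Vec<u8>) -> Result<(), ()> {
  Ok(()) // would submit the execute_batch transaction
}

fn publish_pending() {
  loop {
    // Re-derive the next-to-execute ID on every iteration; the chain is the source of
    // truth, so multiple tasks running this loop concurrently stay consistent
    let next = last_executed_batch().map_or(0, |last| last + 1);

    // Stop once we don't hold the next batch; a later Update message spawns a new task
    let Some(batch) = pending_batch(next) else { break };

    // A failed publish (the transaction may already be in the mempool, or another
    // coordinator may have already executed this batch) is fine: the next iteration
    // re-checks the chain and advances once the batch actually executes
    let _ = publish(batch);
  }
}

Since `next` comes from the chain rather than task-local state, losing a publish race to another coordinator resolves itself on the following iteration.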