diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index dbe185fb..496d9ac1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -31,7 +31,7 @@ mod db; use db::*; mod tributary; -use tributary::{Transaction, ScanTributaryTask}; +use tributary::{Transaction, ScanTributaryTask, ScanTributaryMessagesTask}; mod p2p { pub use serai_coordinator_p2p::*; @@ -111,6 +111,7 @@ fn spawn_cosigning( /// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network. async fn spawn_tributary( mut db: Db, + message_queue: Arc, p2p: P, p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary

)>, set: NewSetInformation, @@ -168,7 +169,16 @@ async fn spawn_tributary( .unwrap(); let reader = tributary.reader(); - p2p_add_tributary.send((set.set, tributary)).expect("p2p's add_tributary channel was closed?"); + p2p_add_tributary + .send((set.set, tributary.clone())) + .expect("p2p's add_tributary channel was closed?"); + + // Spawn the task to send all messages from the Tributary scanner to the message-queue + let (scan_tributary_messages_task_def, scan_tributary_messages_task) = Task::new(); + tokio::spawn( + (ScanTributaryMessagesTask { tributary_db: tributary_db.clone(), set: set.set, message_queue }) + .continually_run(scan_tributary_messages_task_def, vec![]), + ); let (scan_tributary_task_def, mut scan_tributary_task) = Task::new(); tokio::spawn( @@ -182,9 +192,10 @@ async fn spawn_tributary( tributary: reader, _p2p: PhantomData::

, }) - .continually_run(scan_tributary_task_def, vec![todo!("TODO")]), + // This is the only handle for this ScanTributaryMessagesTask, so when this task is dropped, it + // will be too + .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), ); - // TODO^ On Tributary block, drain this task's ProcessorMessages tokio::spawn(async move { loop { @@ -363,7 +374,7 @@ async fn main() { }; // Connect to the message-queue - let message_queue = MessageQueue::from_env(Service::Coordinator); + let message_queue = Arc::new(MessageQueue::from_env(Service::Coordinator)); // Connect to the Serai node let serai = serai().await; @@ -429,8 +440,15 @@ async fn main() { // Spawn all Tributaries on-disk for tributary in existing_tributaries_at_boot { - spawn_tributary(db.clone(), p2p.clone(), &p2p_add_tributary_send, tributary, serai_key.clone()) - .await; + spawn_tributary( + db.clone(), + message_queue.clone(), + p2p.clone(), + &p2p_add_tributary_send, + tributary, + serai_key.clone(), + ) + .await; } // TODO: Hndle processor messages diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index a3eab8db..99fbe69a 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -446,4 +446,11 @@ impl TributaryDb { ) { ProcessorMessages::send(txn, set, &message.into()); } + + pub(crate) fn try_recv_message( + txn: &mut impl DbTxn, + set: ValidatorSet, + ) -> Option { + ProcessorMessages::try_recv(txn, set) + } } diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 60f005e3..6b7e3dbb 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -1,3 +1,17 @@ +use core::future::Future; +use std::sync::Arc; + +use serai_db::{DbTxn, Db}; + +use serai_client::validator_sets::primitives::ValidatorSet; + +use serai_task::ContinuallyRan; + +use message_queue::{Service, Metadata, client::MessageQueue}; + +use serai_coordinator_substrate::NewSetInformation; +use serai_coordinator_p2p::P2p; + mod transaction; pub use transaction::Transaction; @@ -5,3 +19,31 @@ mod db; mod scan; pub(crate) use scan::ScanTributaryTask; + +pub(crate) struct ScanTributaryMessagesTask { + pub(crate) tributary_db: TD, + pub(crate) set: ValidatorSet, + pub(crate) message_queue: Arc, +} +impl ContinuallyRan for ScanTributaryMessagesTask { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + loop { + let mut txn = self.tributary_db.txn(); + let Some(msg) = db::TributaryDb::try_recv_message(&mut txn, self.set) else { break }; + let metadata = Metadata { + from: Service::Coordinator, + to: Service::Processor(self.set.network), + intent: msg.intent(), + }; + let msg = borsh::to_vec(&msg).unwrap(); + // TODO: Make this fallible + self.message_queue.queue(metadata, msg).await; + txn.commit(); + made_progress = true; + } + Ok(made_progress) + } + } +} diff --git a/coordinator/src/tributary/scan.rs b/coordinator/src/tributary/scan.rs index 6e4d8d3f..ac7fd43b 100644 --- a/coordinator/src/tributary/scan.rs +++ b/coordinator/src/tributary/scan.rs @@ -414,7 +414,7 @@ impl ContinuallyRan for ScanTributaryTask { TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) .unwrap_or((0, self.tributary.genesis())); - let mut made_progess = false; + let mut made_progress = false; while let Some(next) = self.tributary.block_after(&last_block_hash) { let block = self.tributary.block(&next).unwrap(); let block_number = last_block_number + 1; @@ -457,10 +457,10 @@ impl ContinuallyRan for ScanTributaryTask { last_block_hash = block_hash; tributary_txn.commit(); - made_progess = true; + made_progress = true; } - Ok(made_progess) + Ok(made_progress) } } }