From d854807eddb942e3a0c6a17c773fcd321d583b5f Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 11 Jan 2025 21:57:58 -0500 Subject: [PATCH] Make message_queue::client::Client::send fallible Allows tasks to report the errors themselves and handle retry in our standardized way. --- coordinator/src/substrate.rs | 6 ++-- coordinator/src/tributary.rs | 3 +- message-queue/src/client.rs | 50 ++++++++++++++++++-------------- processor/bin/src/coordinator.rs | 5 ++-- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index 8b5d2b41..224b6278 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -69,8 +69,7 @@ impl ContinuallyRan for SubstrateTask

{ intent: msg.intent(), }; let msg = borsh::to_vec(&msg).unwrap(); - // TODO: Make this fallible - self.message_queue.queue(metadata, msg).await; + self.message_queue.queue(metadata, msg).await?; txn.commit(); made_progress = true; } @@ -132,8 +131,7 @@ impl ContinuallyRan for SubstrateTask

{ intent: msg.intent(), }; let msg = borsh::to_vec(&msg).unwrap(); - // TODO: Make this fallible - self.message_queue.queue(metadata, msg).await; + self.message_queue.queue(metadata, msg).await?; // Commit the transaction for all of this txn.commit(); diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 55fae37c..76a034d5 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -146,8 +146,7 @@ impl ContinuallyRan for TributaryProcessorMessagesTask { intent: msg.intent(), }; let msg = borsh::to_vec(&msg).unwrap(); - // TODO: Make this fallible - self.message_queue.queue(metadata, msg).await; + self.message_queue.queue(metadata, msg).await?; txn.commit(); made_progress = true; } diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index 3aaf5a24..b503c232 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -64,22 +64,20 @@ impl MessageQueue { Self::new(service, url, priv_key) } - #[must_use] - async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool { + async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> Result<(), String> { let msg = borsh::to_vec(&msg).unwrap(); - let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { - log::warn!("couldn't send the message len"); - return false; + match socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await { + Ok(()) => {} + Err(e) => Err(format!("couldn't send the message len: {e:?}"))?, }; - let Ok(()) = socket.write_all(&msg).await else { - log::warn!("couldn't write the message"); - return false; - }; - true + match socket.write_all(&msg).await { + Ok(()) => {} + Err(e) => Err(format!("couldn't write the message: {e:?}"))?, + } + Ok(()) } - pub async fn queue(&self, metadata: Metadata, msg: Vec) { - // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. 
+ pub async fn queue(&self, metadata: Metadata, msg: Vec) -> Result<(), String> { let nonce = Zeroizing::new(::F::random(&mut OsRng)); let nonce_pub = Ristretto::generator() * nonce.deref(); let sig = SchnorrSignature::::sign( @@ -97,6 +95,21 @@ impl MessageQueue { .serialize(); let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig }; + + let mut socket = match TcpStream::connect(&self.url).await { + Ok(socket) => socket, + Err(e) => Err(format!("failed to connect to the message-queue service: {e:?}"))?, + }; + Self::send(&mut socket, msg.clone()).await?; + match socket.read_u8().await { + Ok(1) => {} + Ok(b) => Err(format!("message-queue didn't return 1 for its ack, received: {b}"))?, + Err(e) => Err(format!("failed to read the response from the message-queue service: {e:?}"))?, + } + Ok(()) + } + + pub async fn queue_with_retry(&self, metadata: Metadata, msg: Vec) { let mut first = true; loop { // Sleep, so we don't hammer re-attempts @@ -105,14 +118,9 @@ impl MessageQueue { } first = false; - let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; - if !Self::send(&mut socket, msg.clone()).await { - continue; + if self.queue(metadata.clone(), msg.clone()).await.is_ok() { + break; } - if socket.read_u8().await.ok() != Some(1) { - continue; - } - break; } } @@ -136,7 +144,7 @@ impl MessageQueue { log::trace!("opened socket for next"); loop { - if !Self::send(&mut socket, msg.clone()).await { + if Self::send(&mut socket, msg.clone()).await.is_err() { continue 'outer; } let status = match socket.read_u8().await { @@ -224,7 +232,7 @@ impl MessageQueue { first = false; let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; - if !Self::send(&mut socket, msg.clone()).await { + if Self::send(&mut socket, msg.clone()).await.is_err() { continue; } if socket.read_u8().await.ok() != Some(1) { diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index 255525a2..ffafd466 100644 --- 
a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -103,6 +103,7 @@ impl Coordinator { }); // Spawn a task to send messages to the message-queue + // TODO: Define a proper task for this and remove use of queue_with_retry tokio::spawn({ let mut db = db.clone(); async move { @@ -115,12 +116,12 @@ impl Coordinator { to: Service::Coordinator, intent: borsh::from_slice::(&msg).unwrap().intent(), }; - message_queue.queue(metadata, msg).await; + message_queue.queue_with_retry(metadata, msg).await; txn.commit(); } None => { let _ = - tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv()) + tokio::time::timeout(core::time::Duration::from_secs(6), sent_message_recv.recv()) .await; } }