Make message_queue::client::Client::send fallible

Allows tasks to report the errors themselves and handle retry in our
standardized way.
This commit is contained in:
Luke Parker
2025-01-11 21:57:58 -05:00
parent f501d46d44
commit d854807edd
4 changed files with 35 additions and 29 deletions

View File

@@ -69,8 +69,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
txn.commit(); txn.commit();
made_progress = true; made_progress = true;
} }
@@ -132,8 +131,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
// Commit the transaction for all of this // Commit the transaction for all of this
txn.commit(); txn.commit();

View File

@@ -146,8 +146,7 @@ impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
txn.commit(); txn.commit();
made_progress = true; made_progress = true;
} }

View File

@@ -64,22 +64,20 @@ impl MessageQueue {
Self::new(service, url, priv_key) Self::new(service, url, priv_key)
} }
#[must_use] async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> Result<(), String> {
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { match socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await {
log::warn!("couldn't send the message len"); Ok(()) => {}
return false; Err(e) => Err(format!("couldn't send the message len: {e:?}"))?,
}; };
let Ok(()) = socket.write_all(&msg).await else { match socket.write_all(&msg).await {
log::warn!("couldn't write the message"); Ok(()) => {}
return false; Err(e) => Err(format!("couldn't write the message: {e:?}"))?,
}; }
true Ok(())
} }
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) { pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) -> Result<(), String> {
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)); let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref(); let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign( let sig = SchnorrSignature::<Ristretto>::sign(
@@ -97,6 +95,21 @@ impl MessageQueue {
.serialize(); .serialize();
let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig }; let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig };
let mut socket = match TcpStream::connect(&self.url).await {
Ok(socket) => socket,
Err(e) => Err(format!("failed to connect to the message-queue service: {e:?}"))?,
};
Self::send(&mut socket, msg.clone()).await?;
match socket.read_u8().await {
Ok(1) => {}
Ok(b) => Err(format!("message-queue didn't return for 1 for its ack, recieved: {b}"))?,
Err(e) => Err(format!("failed to read the response from the message-queue service: {e:?}"))?,
}
Ok(())
}
pub async fn queue_with_retry(&self, metadata: Metadata, msg: Vec<u8>) {
let mut first = true; let mut first = true;
loop { loop {
// Sleep, so we don't hammer re-attempts // Sleep, so we don't hammer re-attempts
@@ -105,16 +118,11 @@ impl MessageQueue {
} }
first = false; first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; if self.queue(metadata.clone(), msg.clone()).await.is_ok() {
if !Self::send(&mut socket, msg.clone()).await {
continue;
}
if socket.read_u8().await.ok() != Some(1) {
continue;
}
break; break;
} }
} }
}
pub async fn next(&self, from: Service) -> QueuedMessage { pub async fn next(&self, from: Service) -> QueuedMessage {
let msg = MessageQueueRequest::Next { from, to: self.service }; let msg = MessageQueueRequest::Next { from, to: self.service };
@@ -136,7 +144,7 @@ impl MessageQueue {
log::trace!("opened socket for next"); log::trace!("opened socket for next");
loop { loop {
if !Self::send(&mut socket, msg.clone()).await { if Self::send(&mut socket, msg.clone()).await.is_err() {
continue 'outer; continue 'outer;
} }
let status = match socket.read_u8().await { let status = match socket.read_u8().await {
@@ -224,7 +232,7 @@ impl MessageQueue {
first = false; first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
if !Self::send(&mut socket, msg.clone()).await { if Self::send(&mut socket, msg.clone()).await.is_err() {
continue; continue;
} }
if socket.read_u8().await.ok() != Some(1) { if socket.read_u8().await.ok() != Some(1) {

View File

@@ -103,6 +103,7 @@ impl Coordinator {
}); });
// Spawn a task to send messages to the message-queue // Spawn a task to send messages to the message-queue
// TODO: Define a proper task for this and remove use of queue_with_retry
tokio::spawn({ tokio::spawn({
let mut db = db.clone(); let mut db = db.clone();
async move { async move {
@@ -115,12 +116,12 @@ impl Coordinator {
to: Service::Coordinator, to: Service::Coordinator,
intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(), intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(),
}; };
message_queue.queue(metadata, msg).await; message_queue.queue_with_retry(metadata, msg).await;
txn.commit(); txn.commit();
} }
None => { None => {
let _ = let _ =
tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv()) tokio::time::timeout(core::time::Duration::from_secs(6), sent_message_recv.recv())
.await; .await;
} }
} }