Add further logs to message-queue

This commit is contained in:
Luke Parker
2024-01-01 08:53:49 -05:00
parent 8bd2a0fc56
commit fffe89765e
2 changed files with 20 additions and 5 deletions

View File

@@ -68,9 +68,13 @@ impl MessageQueue {
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool { async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else {
log::warn!("couldn't send the message len");
return false;
};
let Ok(()) = socket.write_all(&msg).await else {
log::warn!("couldn't write the message");
return false; return false;
}; };
let Ok(()) = socket.write_all(&msg).await else { return false };
true true
} }
@@ -134,8 +138,12 @@ impl MessageQueue {
if !Self::send(&mut socket, msg.clone()).await { if !Self::send(&mut socket, msg.clone()).await {
continue 'outer; continue 'outer;
} }
let Ok(status) = socket.read_u8().await else { let status = match socket.read_u8().await {
Ok(status) => status,
Err(e) => {
log::warn!("couldn't read status u8: {e:?}");
continue 'outer; continue 'outer;
}
}; };
// If there wasn't a message, check again in 1s // If there wasn't a message, check again in 1s
// TODO: Use a notification system here // TODO: Use a notification system here
@@ -150,12 +158,17 @@ impl MessageQueue {
// Timeout after 5 seconds in case there's an issue with the length handling // Timeout after 5 seconds in case there's an issue with the length handling
let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async { let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async {
// Read the message length // Read the message length
let Ok(len) = socket.read_u32_le().await else { let len = match socket.read_u32_le().await {
Ok(len) => len,
Err(e) => {
log::warn!("couldn't read len: {e:?}");
return vec![]; return vec![];
}
}; };
let mut buf = vec![0; usize::try_from(len).unwrap()]; let mut buf = vec![0; usize::try_from(len).unwrap()];
// Read the message // Read the message
let Ok(_) = socket.read_exact(&mut buf).await else { let Ok(_) = socket.read_exact(&mut buf).await else {
log::warn!("couldn't read the message");
return vec![]; return vec![];
}; };
buf buf

View File

@@ -563,6 +563,8 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
loop { loop {
let mut txn = raw_db.txn(); let mut txn = raw_db.txn();
log::trace!("new db txn in run");
let mut outer_msg = None; let mut outer_msg = None;
tokio::select! { tokio::select! {