diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index c2831e17..928c2735 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -162,11 +162,15 @@ impl MessageQueue { // Verify the message // Verify the sender is sane if matches!(self.service, Service::Processor(_)) { - assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent processor message"); + assert_eq!( + msg.from, + Service::Coordinator, + "non-coordinator sent us (a processor) a message" + ); } else { assert!( matches!(msg.from, Service::Processor(_)), - "non-processor sent coordinator message" + "non-processor sent us (coordinator) a message" ); } // TODO: Verify the sender's signature diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index d807ddc8..c59abe5b 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -15,6 +15,8 @@ mod binaries { pub(crate) use serai_primitives::NetworkId; + use serai_db::{Get, DbTxn, Db as DbTrait}; + pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder}; pub(crate) use crate::messages::*; @@ -44,7 +46,12 @@ mod binaries { The message will be ordered by this service, with the order having no guarantees other than successful ordering by the time this call returns. */ - pub(crate) fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) { + pub(crate) fn queue_message( + db: &RwLock, + meta: Metadata, + msg: Vec, + sig: SchnorrSignature, + ) { { let from = (*KEYS).read().unwrap()[&meta.from]; assert!( @@ -55,18 +62,45 @@ mod binaries { // Assert one, and only one of these, is the coordinator assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); - // TODO: Verify (from, intent) hasn't been prior seen + // Verify (from, intent) hasn't been prior seen + // At the time of writing, intents should be unique even across `from`. There's a DoS where + // a service sends another service's intent, causing the other service to have their message + // dropped though. + // Including from prevents that DoS, and allows simplifying intents to solely unique within + // a service (not within all of Serai). + fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() + } + fn intent_key(from: Service, intent: &[u8]) -> Vec { + key(b"intent_seen", bincode::serialize(&(from, intent)).unwrap()) + } + let mut db = db.write().unwrap(); + let mut txn = db.txn(); + let intent_key = intent_key(meta.from, &meta.intent); + if Get::get(&txn, &intent_key).is_some() { + log::warn!( + "Prior queued message attempted to be queued again. From: {:?} Intent: {}", + meta.from, + hex::encode(&meta.intent) + ); + return; + } + DbTxn::put(&mut txn, intent_key, []); // Queue it - let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { - from: meta.from, - // Temporary value which queue_message will override - id: u64::MAX, - msg, - sig: sig.serialize(), - }); + let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message( + &mut txn, + QueuedMessage { + from: meta.from, + // Temporary value which queue_message will override + id: u64::MAX, + msg, + sig: sig.serialize(), + }, + ); log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to); + DbTxn::commit(txn); } // next RPC method @@ -180,11 +214,12 @@ async fn main() { let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()]; let server = builder.build(listen_on).await.unwrap(); - let mut module = RpcModule::new(()); + let mut module = RpcModule::new(RwLock::new(db)); module - .register_method("queue", |args, _| { + .register_method("queue", |args, db| { let args = args.parse::<(Metadata, Vec, Vec)>().unwrap(); queue_message( + db, args.0, args.1, SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs index ab9dfc28..28273410 100644 --- a/message-queue/src/queue.rs +++ b/message-queue/src/queue.rs @@ -33,16 +33,20 @@ impl Queue { fn message_key(&self, id: u64) -> Vec { Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) } - pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) -> u64 { + // TODO: This is fine as-used, yet gets from the DB while having a txn. It should get from the + // txn + pub(crate) fn queue_message( + &mut self, + txn: &mut D::Transaction<'_>, + mut msg: QueuedMessage, + ) -> u64 { let id = self.message_count(); msg.id = id; let msg_key = self.message_key(id); let msg_count_key = self.message_count_key(); - let mut txn = self.0.txn(); txn.put(msg_key, serde_json::to_vec(&msg).unwrap()); txn.put(msg_count_key, (id + 1).to_le_bytes()); - txn.commit(); id } diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs index 923eb1e3..f1a54844 100644 --- a/tests/message-queue/src/lib.rs +++ b/tests/message-queue/src/lib.rs @@ -88,16 +88,19 @@ fn basic_functionality() { ) .await; - coordinator - .queue( - Metadata { - from: Service::Coordinator, - to: Service::Processor(NetworkId::Bitcoin), - intent: b"intent 2".to_vec(), - }, - b"Hello, World, again!".to_vec(), - ) - .await; + // Queue this twice, which message-queue should de-duplicate + for _ in 0 .. 2 { + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Bitcoin), + intent: b"intent 2".to_vec(), + }, + b"Hello, World, again!".to_vec(), + ) + .await; + } // Successfully get it let bitcoin = MessageQueue::new( @@ -121,5 +124,9 @@ fn basic_functionality() { assert_eq!(next_msg.from, Service::Coordinator); assert_eq!(next_msg.id, 1); assert_eq!(&next_msg.msg, b"Hello, World, again!"); + bitcoin.ack(1).await; + + // No further messages should be available + tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(2)).await.unwrap_err(); }); }