Intent based de-duplication in MessageQueue

This commit is contained in:
Luke Parker
2023-08-29 17:05:01 -04:00
parent 83c25eff03
commit e9fca37181
4 changed files with 76 additions and 26 deletions

View File

@@ -162,11 +162,15 @@ impl MessageQueue {
// Verify the message // Verify the message
// Verify the sender is sane // Verify the sender is sane
if matches!(self.service, Service::Processor(_)) { if matches!(self.service, Service::Processor(_)) {
assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent processor message"); assert_eq!(
msg.from,
Service::Coordinator,
"non-coordinator sent us (a processor) a message"
);
} else { } else {
assert!( assert!(
matches!(msg.from, Service::Processor(_)), matches!(msg.from, Service::Processor(_)),
"non-processor sent coordinator message" "non-processor sent us (coordinator) a message"
); );
} }
// TODO: Verify the sender's signature // TODO: Verify the sender's signature

View File

@@ -15,6 +15,8 @@ mod binaries {
pub(crate) use serai_primitives::NetworkId; pub(crate) use serai_primitives::NetworkId;
use serai_db::{Get, DbTxn, Db as DbTrait};
pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder}; pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder};
pub(crate) use crate::messages::*; pub(crate) use crate::messages::*;
@@ -44,7 +46,12 @@ mod binaries {
The message will be ordered by this service, with the order having no guarantees other than The message will be ordered by this service, with the order having no guarantees other than
successful ordering by the time this call returns. successful ordering by the time this call returns.
*/ */
pub(crate) fn queue_message(meta: Metadata, msg: Vec<u8>, sig: SchnorrSignature<Ristretto>) { pub(crate) fn queue_message(
db: &RwLock<Db>,
meta: Metadata,
msg: Vec<u8>,
sig: SchnorrSignature<Ristretto>,
) {
{ {
let from = (*KEYS).read().unwrap()[&meta.from]; let from = (*KEYS).read().unwrap()[&meta.from];
assert!( assert!(
@@ -55,18 +62,45 @@ mod binaries {
// Assert one, and only one of these, is the coordinator // Assert one, and only one of these, is the coordinator
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));
// TODO: Verify (from, intent) hasn't been prior seen // Verify (from, intent) hasn't been prior seen
// At the time of writing, intents should be unique even across `from`. There's a DoS where
// a service sends another service's intent, causing the other service to have their message
// dropped though.
// Including from prevents that DoS, and allows simplifying intents to solely unique within
// a service (not within all of Serai).
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", bincode::serialize(&(from, intent)).unwrap())
}
let mut db = db.write().unwrap();
let mut txn = db.txn();
let intent_key = intent_key(meta.from, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} Intent: {}",
meta.from,
hex::encode(&meta.intent)
);
return;
}
DbTxn::put(&mut txn, intent_key, []);
// Queue it // Queue it
let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(
&mut txn,
QueuedMessage {
from: meta.from, from: meta.from,
// Temporary value which queue_message will override // Temporary value which queue_message will override
id: u64::MAX, id: u64::MAX,
msg, msg,
sig: sig.serialize(), sig: sig.serialize(),
}); },
);
log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to); log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to);
DbTxn::commit(txn);
} }
// next RPC method // next RPC method
@@ -180,11 +214,12 @@ async fn main() {
let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()]; let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()];
let server = builder.build(listen_on).await.unwrap(); let server = builder.build(listen_on).await.unwrap();
let mut module = RpcModule::new(()); let mut module = RpcModule::new(RwLock::new(db));
module module
.register_method("queue", |args, _| { .register_method("queue", |args, db| {
let args = args.parse::<(Metadata, Vec<u8>, Vec<u8>)>().unwrap(); let args = args.parse::<(Metadata, Vec<u8>, Vec<u8>)>().unwrap();
queue_message( queue_message(
db,
args.0, args.0,
args.1, args.1,
SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(), SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(),

View File

@@ -33,16 +33,20 @@ impl<D: Db> Queue<D> {
fn message_key(&self, id: u64) -> Vec<u8> { fn message_key(&self, id: u64) -> Vec<u8> {
Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap())
} }
pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) -> u64 { // TODO: This is fine as-used, yet gets from the DB while having a txn. It should get from the
// txn
pub(crate) fn queue_message(
&mut self,
txn: &mut D::Transaction<'_>,
mut msg: QueuedMessage,
) -> u64 {
let id = self.message_count(); let id = self.message_count();
msg.id = id; msg.id = id;
let msg_key = self.message_key(id); let msg_key = self.message_key(id);
let msg_count_key = self.message_count_key(); let msg_count_key = self.message_count_key();
let mut txn = self.0.txn();
txn.put(msg_key, serde_json::to_vec(&msg).unwrap()); txn.put(msg_key, serde_json::to_vec(&msg).unwrap());
txn.put(msg_count_key, (id + 1).to_le_bytes()); txn.put(msg_count_key, (id + 1).to_le_bytes());
txn.commit();
id id
} }

View File

@@ -88,6 +88,8 @@ fn basic_functionality() {
) )
.await; .await;
// Queue this twice, which message-queue should de-duplicate
for _ in 0 .. 2 {
coordinator coordinator
.queue( .queue(
Metadata { Metadata {
@@ -98,6 +100,7 @@ fn basic_functionality() {
b"Hello, World, again!".to_vec(), b"Hello, World, again!".to_vec(),
) )
.await; .await;
}
// Successfully get it // Successfully get it
let bitcoin = MessageQueue::new( let bitcoin = MessageQueue::new(
@@ -121,5 +124,9 @@ fn basic_functionality() {
assert_eq!(next_msg.from, Service::Coordinator); assert_eq!(next_msg.from, Service::Coordinator);
assert_eq!(next_msg.id, 1); assert_eq!(next_msg.id, 1);
assert_eq!(&next_msg.msg, b"Hello, World, again!"); assert_eq!(&next_msg.msg, b"Hello, World, again!");
bitcoin.ack(1).await;
// No further messages should be available
tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(2)).await.unwrap_err();
}); });
} }