Files
serai/message-queue/src/main.rs

282 lines
9.2 KiB
Rust
Raw Normal View History

pub(crate) use std::{
sync::{Arc, RwLock},
collections::HashMap,
};
use dalek_ff_group::Ristretto;
Smash the singular `Ciphersuite` trait into multiple This helps identify where the various functionalities are used, or rather, not used. The `Ciphersuite` trait present in `patches/ciphersuite`, facilitating the entire FCMP++ tree, only requires the markers _and_ canonical point decoding. I've opened a PR to upstream such a trait into `group` (https://github.com/zkcrypto/group/pull/68). `WrappedGroup` is still justified for as long as `Group::generator` exists. Moving `::generator()` to its own trait, on an independent structure (upstream) would be massively appreciated. @tarcieri also wanted to update from `fn generator()` to `const GENERATOR`, which would encourage further discussion on https://github.com/zkcrypto/group/issues/32 and https://github.com/zkcrypto/group/issues/45, which have been stagnant. The `Id` trait is occasionally used yet really should be first off the chopping block. Finally, `WithPreferredHash` is only actually used around a third of the time, which more than justifies it being a separate trait. --- Updates `dalek_ff_group::Scalar` to directly re-export `curve25519_dalek::Scalar`, as without issue. `dalek_ff_group::RistrettoPoint` also could be replaced with an export of `curve25519_dalek::RistrettoPoint`, yet the coordinator relies on how we implemented `Hash` on it for the hell of it so it isn't worth it at this time. `dalek_ff_group::EdwardsPoint` can't be replaced for an re-export of `curve25519_dalek::SubgroupPoint` as it doesn't implement `zeroize`, `subtle` traits within a released, non-yanked version. Relevance to https://github.com/serai-dex/serai/issues/201 and https://github.com/dalek-cryptography/curve25519-dalek/issues/811#issuecomment-3247732746. Also updates the `Ristretto` ciphersuite to prefer `Blake2b-512` over `SHA2-512`. In order to maintain compliance with FROST's IETF standard, `modular-frost` defines its own ciphersuite for Ristretto which still uses `SHA2-512`.
2025-09-03 12:25:37 -04:00
pub(crate) use ciphersuite::{group::GroupEncoding, WrappedGroup, GroupCanonicalEncoding};
pub(crate) use schnorr_signatures::SchnorrSignature;
pub(crate) use serai_primitives::ExternalNetworkId;
pub(crate) use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
};
use serai_db::{Get, DbTxn, Db as DbTrait};
pub(crate) use crate::messages::*;
pub(crate) use crate::queue::Queue;
2023-07-17 00:20:10 -04:00
/// Database backend used by this service, chosen at compile time via cargo features.
///
/// With `parity-db` enabled and `rocksdb` disabled, the backend is ParityDb held behind an `Arc`.
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = Arc<serai_db::ParityDb>;
/// Database backend used by this service, chosen at compile time via cargo features.
///
/// With `rocksdb` enabled, the backend is RocksDB. This alias is also the one selected when both
/// `parity-db` and `rocksdb` are enabled, a combination `main` rejects by panicking when it opens
/// the DB.
#[cfg(feature = "rocksdb")]
pub(crate) type Db = serai_db::RocksDB;
#[allow(clippy::type_complexity)]
mod clippy {
use super::*;
use once_cell::sync::Lazy;
Smash the singular `Ciphersuite` trait into multiple This helps identify where the various functionalities are used, or rather, not used. The `Ciphersuite` trait present in `patches/ciphersuite`, facilitating the entire FCMP++ tree, only requires the markers _and_ canonical point decoding. I've opened a PR to upstream such a trait into `group` (https://github.com/zkcrypto/group/pull/68). `WrappedGroup` is still justified for as long as `Group::generator` exists. Moving `::generator()` to its own trait, on an independent structure (upstream) would be massively appreciated. @tarcieri also wanted to update from `fn generator()` to `const GENERATOR`, which would encourage further discussion on https://github.com/zkcrypto/group/issues/32 and https://github.com/zkcrypto/group/issues/45, which have been stagnant. The `Id` trait is occasionally used yet really should be first off the chopping block. Finally, `WithPreferredHash` is only actually used around a third of the time, which more than justifies it being a separate trait. --- Updates `dalek_ff_group::Scalar` to directly re-export `curve25519_dalek::Scalar`, as without issue. `dalek_ff_group::RistrettoPoint` also could be replaced with an export of `curve25519_dalek::RistrettoPoint`, yet the coordinator relies on how we implemented `Hash` on it for the hell of it so it isn't worth it at this time. `dalek_ff_group::EdwardsPoint` can't be replaced for an re-export of `curve25519_dalek::SubgroupPoint` as it doesn't implement `zeroize`, `subtle` traits within a released, non-yanked version. Relevance to https://github.com/serai-dex/serai/issues/201 and https://github.com/dalek-cryptography/curve25519-dalek/issues/811#issuecomment-3247732746. Also updates the `Ristretto` ciphersuite to prefer `Blake2b-512` over `SHA2-512`. In order to maintain compliance with FROST's IETF standard, `modular-frost` defines its own ciphersuite for Ristretto which still uses `SHA2-512`.
2025-09-03 12:25:37 -04:00
pub(crate) static KEYS: Lazy<Arc<RwLock<HashMap<Service, <Ristretto as WrappedGroup>::G>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
pub(crate) static QUEUES: Lazy<Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
}
pub(crate) use self::clippy::*;
// Submodules. Items from `messages` and the `Queue` type from `queue` are re-exported crate-wide
// by the `pub(crate) use` declarations at the top of this file.
mod messages;
mod queue;
// Install `zalloc::ZeroizingAlloc`, wrapping the system allocator, as the process-wide allocator.
// NOTE(review): presumably this zeroizes memory when it is freed; confirm against zalloc's docs.
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
// queue RPC method
/*
Queues a message to be delivered from a processor to a coordinator, or vice versa.
Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
independently verify signatures.
The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
This allows services to safely send messages multiple times without them being delivered
multiple times.
The message will be ordered by this service, with the order having no guarantees other than
successful ordering by the time this call returns.
*/
pub(crate) fn queue_message(
db: &mut Db,
meta: &Metadata,
msg: Vec<u8>,
sig: SchnorrSignature<Ristretto>,
) {
{
2023-12-08 08:32:15 -05:00
let from = KEYS.read().unwrap()[&meta.from];
assert!(
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
);
}
// Assert one, and only one of these, is the coordinator
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));
// Lock the queue
2024-12-08 18:27:01 -05:00
let queue_lock = QUEUES.read().unwrap();
let mut queue_lock = queue_lock[&(meta.from, meta.to)].write().unwrap();
// Verify (from, to, intent) hasn't been prior seen
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap())
}
let mut txn = db.txn();
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
meta.from,
meta.to,
hex::encode(&meta.intent)
);
return;
}
DbTxn::put(&mut txn, intent_key, []);
// Queue it
let id = queue_lock.queue_message(
&mut txn,
QueuedMessage {
from: meta.from,
// Temporary value which queue_message will override
id: u64::MAX,
msg,
sig: sig.serialize(),
},
);
log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to);
DbTxn::commit(txn);
}
2023-07-16 20:53:58 -04:00
// next RPC method
/*
Gets the next message in queue for the named services.
This is not authenticated due to the fact every nonce would have to be saved to prevent
replays, or a challenge-response protocol implemented. Neither are worth doing when there
should be no sensitive data on this server.
*/
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
2023-12-08 08:32:15 -05:00
let queue_outer = QUEUES.read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map_or(0, |i| i + 1);
queue.get_message(next)
}
// ack RPC method
/*
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
message.
*/
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{
2023-12-08 08:32:15 -05:00
let to_key = KEYS.read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
}
// Is it:
// The acknowledged message should be > last acknowledged OR
// The acknowledged message should be >=
// It's the first if we save messages as acknowledged before acknowledging them
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked
2025-08-25 09:17:29 -04:00
log::info!("Acknowledging From: {from:?} To: {to:?} ID: {id}");
2023-12-08 08:32:15 -05:00
QUEUES.read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Override the panic handler with one which will panic if any tokio task panics
{
let existing = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic| {
existing(panic);
const MSG: &str = "exiting the process due to a task panicking";
println!("{MSG}");
log::error!("{MSG}");
std::process::exit(1);
}));
}
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string()));
}
env_logger::init();
log::info!("Starting message-queue service...");
// Open the DB
#[allow(unused_variables, unreachable_code)]
let db = {
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
let db =
serai_db::new_parity_db(&serai_env::var("DB_PATH").expect("path to DB wasn't specified"));
#[cfg(feature = "rocksdb")]
let db =
serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified"));
db
};
let read_key = |str| {
let key = serai_env::var(str)?;
Smash the singular `Ciphersuite` trait into multiple This helps identify where the various functionalities are used, or rather, not used. The `Ciphersuite` trait present in `patches/ciphersuite`, facilitating the entire FCMP++ tree, only requires the markers _and_ canonical point decoding. I've opened a PR to upstream such a trait into `group` (https://github.com/zkcrypto/group/pull/68). `WrappedGroup` is still justified for as long as `Group::generator` exists. Moving `::generator()` to its own trait, on an independent structure (upstream) would be massively appreciated. @tarcieri also wanted to update from `fn generator()` to `const GENERATOR`, which would encourage further discussion on https://github.com/zkcrypto/group/issues/32 and https://github.com/zkcrypto/group/issues/45, which have been stagnant. The `Id` trait is occasionally used yet really should be first off the chopping block. Finally, `WithPreferredHash` is only actually used around a third of the time, which more than justifies it being a separate trait. --- Updates `dalek_ff_group::Scalar` to directly re-export `curve25519_dalek::Scalar`, as without issue. `dalek_ff_group::RistrettoPoint` also could be replaced with an export of `curve25519_dalek::RistrettoPoint`, yet the coordinator relies on how we implemented `Hash` on it for the hell of it so it isn't worth it at this time. `dalek_ff_group::EdwardsPoint` can't be replaced for an re-export of `curve25519_dalek::SubgroupPoint` as it doesn't implement `zeroize`, `subtle` traits within a released, non-yanked version. Relevance to https://github.com/serai-dex/serai/issues/201 and https://github.com/dalek-cryptography/curve25519-dalek/issues/811#issuecomment-3247732746. Also updates the `Ristretto` ciphersuite to prefer `Blake2b-512` over `SHA2-512`. In order to maintain compliance with FROST's IETF standard, `modular-frost` defines its own ciphersuite for Ristretto which still uses `SHA2-512`.
2025-09-03 12:25:37 -04:00
let mut repr = <<Ristretto as WrappedGroup>::G as GroupEncoding>::Repr::default();
repr.as_mut().copy_from_slice(&hex::decode(key).unwrap());
Smash the singular `Ciphersuite` trait into multiple This helps identify where the various functionalities are used, or rather, not used. The `Ciphersuite` trait present in `patches/ciphersuite`, facilitating the entire FCMP++ tree, only requires the markers _and_ canonical point decoding. I've opened a PR to upstream such a trait into `group` (https://github.com/zkcrypto/group/pull/68). `WrappedGroup` is still justified for as long as `Group::generator` exists. Moving `::generator()` to its own trait, on an independent structure (upstream) would be massively appreciated. @tarcieri also wanted to update from `fn generator()` to `const GENERATOR`, which would encourage further discussion on https://github.com/zkcrypto/group/issues/32 and https://github.com/zkcrypto/group/issues/45, which have been stagnant. The `Id` trait is occasionally used yet really should be first off the chopping block. Finally, `WithPreferredHash` is only actually used around a third of the time, which more than justifies it being a separate trait. --- Updates `dalek_ff_group::Scalar` to directly re-export `curve25519_dalek::Scalar`, as without issue. `dalek_ff_group::RistrettoPoint` also could be replaced with an export of `curve25519_dalek::RistrettoPoint`, yet the coordinator relies on how we implemented `Hash` on it for the hell of it so it isn't worth it at this time. `dalek_ff_group::EdwardsPoint` can't be replaced for an re-export of `curve25519_dalek::SubgroupPoint` as it doesn't implement `zeroize`, `subtle` traits within a released, non-yanked version. Relevance to https://github.com/serai-dex/serai/issues/201 and https://github.com/dalek-cryptography/curve25519-dalek/issues/811#issuecomment-3247732746. Also updates the `Ristretto` ciphersuite to prefer `Blake2b-512` over `SHA2-512`. In order to maintain compliance with FROST's IETF standard, `modular-frost` defines its own ciphersuite for Ristretto which still uses `SHA2-512`.
2025-09-03 12:25:37 -04:00
Some(<Ristretto as GroupCanonicalEncoding>::from_canonical_bytes(&repr).unwrap())
};
let register_service = |service, key| {
2023-12-08 08:32:15 -05:00
KEYS.write().unwrap().insert(service, key);
let mut queues = QUEUES.write().unwrap();
if service == Service::Coordinator {
for network in serai_primitives::EXTERNAL_NETWORKS {
queues.insert(
(service, Service::Processor(network)),
RwLock::new(Queue(db.clone(), service, Service::Processor(network))),
);
}
} else {
queues.insert(
(service, Service::Coordinator),
RwLock::new(Queue(db.clone(), service, Service::Coordinator)),
);
}
};
// Make queues for each ExternalNetworkId
for network in serai_primitives::EXTERNAL_NETWORKS {
// Use a match so we error if the list of NetworkIds changes
let Some(key) = read_key(match network {
ExternalNetworkId::Bitcoin => "BITCOIN_KEY",
ExternalNetworkId::Ethereum => "ETHEREUM_KEY",
ExternalNetworkId::Monero => "MONERO_KEY",
2023-08-01 00:47:36 -04:00
}) else {
continue;
};
register_service(Service::Processor(network), key);
}
// And the coordinator's
register_service(Service::Coordinator, read_key("COORDINATOR_KEY").unwrap());
// Start server
2023-07-17 00:20:10 -04:00
// 5132 ^ ((b'M' << 8) | b'Q')
let server = TcpListener::bind("0.0.0.0:2287").await.unwrap();
loop {
let (mut socket, _) = server.accept().await.unwrap();
// TODO: Add a magic value with a key at the start of the connection to make this authed
let mut db = db.clone();
tokio::spawn(async move {
while let Ok(msg_len) = socket.read_u32_le().await {
let mut buf = vec![0; usize::try_from(msg_len).unwrap()];
let Ok(_) = socket.read_exact(&mut buf).await else { break };
let msg = borsh::from_slice(&buf).unwrap();
match msg {
MessageQueueRequest::Queue { meta, msg, sig } => {
queue_message(
&mut db,
&meta,
msg,
SchnorrSignature::<Ristretto>::read(&mut sig.as_slice()).unwrap(),
);
2023-12-16 20:54:24 -05:00
let Ok(()) = socket.write_all(&[1]).await else { break };
}
MessageQueueRequest::Next { from, to } => match get_next_message(from, to) {
Some(msg) => {
2023-12-16 20:54:24 -05:00
let Ok(()) = socket.write_all(&[1]).await else { break };
let msg = borsh::to_vec(&msg).unwrap();
let len = u32::try_from(msg.len()).unwrap();
2023-12-16 20:54:24 -05:00
let Ok(()) = socket.write_all(&len.to_le_bytes()).await else { break };
let Ok(()) = socket.write_all(&msg).await else { break };
}
None => {
2023-12-16 20:54:24 -05:00
let Ok(()) = socket.write_all(&[0]).await else { break };
}
},
MessageQueueRequest::Ack { from, to, id, sig } => {
ack_message(
from,
to,
id,
SchnorrSignature::<Ristretto>::read(&mut sig.as_slice()).unwrap(),
);
2023-12-16 20:54:24 -05:00
let Ok(()) = socket.write_all(&[1]).await else { break };
}
}
}
});
}
}