mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Use parity-db in current Dockerfiles (#455)
* Use redb and in Dockerfiles The motivation for redb was to remove the multiple rocksdb compile times from CI. * Correct feature flagging of coordinator and message-queue in Dockerfiles * Correct message-queue DB type alias * Use consistent table typing in redb * Correct rebase artifacts * Correct removal of binaries feature from message-queue * Correct processor feature flagging * Replace redb with parity-db It still has much better compile times yet doesn't block when creating multiple transactions. It also is actively maintained and doesn't grow our tree. The MPT aspects are irrelevant. * Correct stray Redb * clippy warning * Correct txn get
This commit is contained in:
@@ -1,154 +1,149 @@
|
||||
#[cfg(feature = "binaries")]
|
||||
mod messages;
|
||||
#[cfg(feature = "binaries")]
|
||||
mod queue;
|
||||
|
||||
#[cfg(feature = "binaries")]
|
||||
mod binaries {
|
||||
pub(crate) use std::{
|
||||
sync::{Arc, RwLock},
|
||||
collections::HashMap,
|
||||
};
|
||||
pub(crate) use std::{
|
||||
sync::{Arc, RwLock},
|
||||
collections::HashMap,
|
||||
};
|
||||
|
||||
pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
|
||||
pub(crate) use schnorr_signatures::SchnorrSignature;
|
||||
pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
|
||||
pub(crate) use schnorr_signatures::SchnorrSignature;
|
||||
|
||||
pub(crate) use serai_primitives::NetworkId;
|
||||
pub(crate) use serai_primitives::NetworkId;
|
||||
|
||||
pub(crate) use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpListener,
|
||||
};
|
||||
pub(crate) use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpListener,
|
||||
};
|
||||
|
||||
use serai_db::{Get, DbTxn, Db as DbTrait};
|
||||
use serai_db::{Get, DbTxn, Db as DbTrait};
|
||||
|
||||
pub(crate) use crate::messages::*;
|
||||
pub(crate) use crate::messages::*;
|
||||
|
||||
pub(crate) use crate::queue::Queue;
|
||||
pub(crate) use crate::queue::Queue;
|
||||
|
||||
pub(crate) type Db = serai_db::RocksDB;
|
||||
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
|
||||
pub(crate) type Db = Arc<serai_db::ParityDb>;
|
||||
#[cfg(feature = "rocksdb")]
|
||||
pub(crate) type Db = serai_db::RocksDB;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
mod clippy {
|
||||
use super::*;
|
||||
use once_cell::sync::Lazy;
|
||||
pub(crate) static KEYS: Lazy<Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>>> =
|
||||
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
|
||||
pub(crate) static QUEUES: Lazy<Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>>> =
|
||||
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
|
||||
}
|
||||
pub(crate) use self::clippy::*;
|
||||
#[allow(clippy::type_complexity)]
|
||||
mod clippy {
|
||||
use super::*;
|
||||
use once_cell::sync::Lazy;
|
||||
pub(crate) static KEYS: Lazy<Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>>> =
|
||||
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
|
||||
pub(crate) static QUEUES: Lazy<Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>>> =
|
||||
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
|
||||
}
|
||||
pub(crate) use self::clippy::*;
|
||||
|
||||
// queue RPC method
|
||||
/*
|
||||
Queues a message to be delivered from a processor to a coordinator, or vice versa.
|
||||
// queue RPC method
|
||||
/*
|
||||
Queues a message to be delivered from a processor to a coordinator, or vice versa.
|
||||
|
||||
Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
|
||||
independently verify signatures.
|
||||
Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
|
||||
independently verify signatures.
|
||||
|
||||
The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
|
||||
This allows services to safely send messages multiple times without them being delivered
|
||||
multiple times.
|
||||
The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
|
||||
This allows services to safely send messages multiple times without them being delivered
|
||||
multiple times.
|
||||
|
||||
The message will be ordered by this service, with the order having no guarantees other than
|
||||
successful ordering by the time this call returns.
|
||||
*/
|
||||
pub(crate) fn queue_message(
|
||||
db: &mut Db,
|
||||
meta: Metadata,
|
||||
msg: Vec<u8>,
|
||||
sig: SchnorrSignature<Ristretto>,
|
||||
) {
|
||||
{
|
||||
let from = (*KEYS).read().unwrap()[&meta.from];
|
||||
assert!(
|
||||
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
|
||||
);
|
||||
}
|
||||
|
||||
// Assert one, and only one of these, is the coordinator
|
||||
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));
|
||||
|
||||
// Verify (from, to, intent) hasn't been prior seen
|
||||
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
|
||||
}
|
||||
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
|
||||
key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap())
|
||||
}
|
||||
let mut txn = db.txn();
|
||||
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
|
||||
if Get::get(&txn, &intent_key).is_some() {
|
||||
log::warn!(
|
||||
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
|
||||
meta.from,
|
||||
meta.to,
|
||||
hex::encode(&meta.intent)
|
||||
);
|
||||
return;
|
||||
}
|
||||
DbTxn::put(&mut txn, intent_key, []);
|
||||
|
||||
// Queue it
|
||||
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
|
||||
&mut txn,
|
||||
QueuedMessage {
|
||||
from: meta.from,
|
||||
// Temporary value which queue_message will override
|
||||
id: u64::MAX,
|
||||
msg,
|
||||
sig: sig.serialize(),
|
||||
},
|
||||
The message will be ordered by this service, with the order having no guarantees other than
|
||||
successful ordering by the time this call returns.
|
||||
*/
|
||||
pub(crate) fn queue_message(
|
||||
db: &mut Db,
|
||||
meta: Metadata,
|
||||
msg: Vec<u8>,
|
||||
sig: SchnorrSignature<Ristretto>,
|
||||
) {
|
||||
{
|
||||
let from = (*KEYS).read().unwrap()[&meta.from];
|
||||
assert!(
|
||||
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
|
||||
);
|
||||
|
||||
log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to);
|
||||
DbTxn::commit(txn);
|
||||
}
|
||||
|
||||
// next RPC method
|
||||
/*
|
||||
Gets the next message in queue for the named services.
|
||||
// Assert one, and only one of these, is the coordinator
|
||||
assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));
|
||||
|
||||
This is not authenticated due to the fact every nonce would have to be saved to prevent
|
||||
replays, or a challenge-response protocol implemented. Neither are worth doing when there
|
||||
should be no sensitive data on this server.
|
||||
*/
|
||||
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
|
||||
let queue_outer = (*QUEUES).read().unwrap();
|
||||
let queue = queue_outer[&(from, to)].read().unwrap();
|
||||
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
|
||||
queue.get_message(next)
|
||||
// Verify (from, to, intent) hasn't been prior seen
|
||||
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
|
||||
}
|
||||
|
||||
// ack RPC method
|
||||
/*
|
||||
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
|
||||
message.
|
||||
*/
|
||||
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
|
||||
{
|
||||
let to_key = (*KEYS).read().unwrap()[&to];
|
||||
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
|
||||
}
|
||||
|
||||
// Is it:
|
||||
// The acknowledged message should be > last acknowledged OR
|
||||
// The acknowledged message should be >=
|
||||
// It's the first if we save messages as acknowledged before acknowledging them
|
||||
// It's the second if we acknowledge messages before saving them as acknowledged
|
||||
// TODO: Check only a proper message is being acked
|
||||
|
||||
log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id);
|
||||
|
||||
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
|
||||
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
|
||||
key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap())
|
||||
}
|
||||
let mut txn = db.txn();
|
||||
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
|
||||
if Get::get(&txn, &intent_key).is_some() {
|
||||
log::warn!(
|
||||
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
|
||||
meta.from,
|
||||
meta.to,
|
||||
hex::encode(&meta.intent)
|
||||
);
|
||||
return;
|
||||
}
|
||||
DbTxn::put(&mut txn, intent_key, []);
|
||||
|
||||
// Queue it
|
||||
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
|
||||
&mut txn,
|
||||
QueuedMessage {
|
||||
from: meta.from,
|
||||
// Temporary value which queue_message will override
|
||||
id: u64::MAX,
|
||||
msg,
|
||||
sig: sig.serialize(),
|
||||
},
|
||||
);
|
||||
|
||||
log::info!("Queued message. From: {:?} To: {:?} ID: {id}", meta.from, meta.to);
|
||||
DbTxn::commit(txn);
|
||||
}
|
||||
|
||||
// next RPC method
|
||||
/*
|
||||
Gets the next message in queue for the named services.
|
||||
|
||||
This is not authenticated due to the fact every nonce would have to be saved to prevent
|
||||
replays, or a challenge-response protocol implemented. Neither are worth doing when there
|
||||
should be no sensitive data on this server.
|
||||
*/
|
||||
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
|
||||
let queue_outer = (*QUEUES).read().unwrap();
|
||||
let queue = queue_outer[&(from, to)].read().unwrap();
|
||||
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
|
||||
queue.get_message(next)
|
||||
}
|
||||
|
||||
// ack RPC method
|
||||
/*
|
||||
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
|
||||
message.
|
||||
*/
|
||||
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
|
||||
{
|
||||
let to_key = (*KEYS).read().unwrap()[&to];
|
||||
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
|
||||
}
|
||||
|
||||
// Is it:
|
||||
// The acknowledged message should be > last acknowledged OR
|
||||
// The acknowledged message should be >=
|
||||
// It's the first if we save messages as acknowledged before acknowledging them
|
||||
// It's the second if we acknowledge messages before saving them as acknowledged
|
||||
// TODO: Check only a proper message is being acked
|
||||
|
||||
log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id);
|
||||
|
||||
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
|
||||
}
|
||||
|
||||
#[cfg(feature = "binaries")]
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() {
|
||||
use binaries::*;
|
||||
|
||||
// Override the panic handler with one which will panic if any tokio task panics
|
||||
{
|
||||
let existing = std::panic::take_hook();
|
||||
@@ -169,7 +164,18 @@ async fn main() {
|
||||
log::info!("Starting message-queue service...");
|
||||
|
||||
// Open the DB
|
||||
let db = serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified"));
|
||||
#[allow(unused_variables, unreachable_code)]
|
||||
let db = {
|
||||
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
|
||||
panic!("built with parity-db and rocksdb");
|
||||
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
|
||||
let db =
|
||||
serai_db::new_parity_db(&serai_env::var("DB_PATH").expect("path to DB wasn't specified"));
|
||||
#[cfg(feature = "rocksdb")]
|
||||
let db =
|
||||
serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified"));
|
||||
db
|
||||
};
|
||||
|
||||
let read_key = |str| {
|
||||
let key = serai_env::var(str)?;
|
||||
@@ -272,8 +278,3 @@ async fn main() {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "binaries"))]
|
||||
fn main() {
|
||||
panic!("To run binaries, please build with `--feature binaries`.");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user