message-queue RocksDB + fix listening

This commit is contained in:
Luke Parker
2023-07-17 00:20:10 -04:00
parent 8543487db2
commit 845c2842b5
3 changed files with 13 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -8696,6 +8696,7 @@ dependencies = [
"jsonrpsee", "jsonrpsee",
"lazy_static", "lazy_static",
"log", "log",
"rocksdb",
"schnorr-signatures", "schnorr-signatures",
"serai-db", "serai-db",
"serai-primitives", "serai-primitives",

View File

@@ -32,7 +32,8 @@ schnorr-signatures = { path = "../crypto/schnorr" }
log = "0.4" log = "0.4"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
serai-db = { path = "../common/db" } serai-db = { path = "../common/db", features = ["rocksdb"] }
rocksdb = "0.21"
serai-primitives = { path = "../substrate/primitives" } serai-primitives = { path = "../substrate/primitives" }

View File

@@ -16,10 +16,12 @@ use messages::*;
mod queue; mod queue;
use queue::Queue; use queue::Queue;
type Db = Arc<rocksdb::TransactionDB>;
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> = static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> =
Arc::new(RwLock::new(HashMap::new())); Arc::new(RwLock::new(HashMap::new()));
static ref QUEUES: Arc<RwLock<HashMap<Service, RwLock<Queue<serai_db::MemDb>>>>> = static ref QUEUES: Arc<RwLock<HashMap<Service, RwLock<Queue<Db>>>>> =
Arc::new(RwLock::new(HashMap::new())); Arc::new(RwLock::new(HashMap::new()));
} }
@@ -97,8 +99,8 @@ fn ack_message(service: Service, id: u64, _signature: SchnorrSignature<Ristretto
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Open the DB // Open the DB
// TODO let db =
let db = serai_db::MemDb::new(); Arc::new(rocksdb::TransactionDB::open_default(std::env::var("DB_PATH").unwrap()).unwrap());
let read_key = |str| { let read_key = |str| {
let Ok(key) = std::env::var(str) else { None? }; let Ok(key) = std::env::var(str) else { None? };
@@ -132,7 +134,8 @@ async fn main() {
// Start server // Start server
let builder = ServerBuilder::new(); let builder = ServerBuilder::new();
// TODO: Set max request/response size // TODO: Set max request/response size
let listen_on: &[std::net::SocketAddr] = &["0.0.0.0".parse().unwrap()]; // 5132 ^ ((b'M' << 8) | b'Q')
let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()];
let server = builder.build(listen_on).await.unwrap(); let server = builder.build(listen_on).await.unwrap();
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
@@ -165,5 +168,7 @@ async fn main() {
Ok(()) Ok(())
}) })
.unwrap(); .unwrap();
server.start(module).unwrap();
// Run until stopped, which it never will
server.start(module).unwrap().stopped().await;
} }