Integrate coordinator with MessageQueue and RocksDB

Also resolves a couple TODOs.
This commit is contained in:
Luke Parker
2023-07-18 01:53:51 -04:00
parent a05961974a
commit a7c9c1ef55
12 changed files with 309 additions and 237 deletions

View File

@@ -14,10 +14,13 @@ use rand_core::OsRng;
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use serai_db::{DbTxn, Db, MemDb};
use serai_db::{DbTxn, Db};
use serai_env as env;
use serai_client::{Public, Signature, Serai};
use message_queue::{Service, client::MessageQueue};
use tokio::{
sync::{
mpsc::{self, UnboundedSender},
@@ -322,7 +325,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
// connection
// In order to reduce congestion though, we should at least check if we take value from
// this message before running spawn
// TODO
// TODO2
tokio::spawn({
let tributaries = tributaries.clone();
async move {
@@ -398,7 +401,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, coin_key } => {
assert_eq!(
id.set.network, msg.network,
"processor claimed to be a different network than it was for SubstrateBlockAck",
"processor claimed to be a different network than it was for GeneratedKeyPair",
);
// TODO: Also check the other KeyGenId fields
@@ -415,13 +418,15 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
Signature([0; 64]), // TODO
);
match serai.publish(&tx).await {
Ok(hash) => {
log::info!("voted on key pair for {:?} in TX {}", id.set, hex::encode(hash))
}
Err(e) => {
log::error!("couldn't connect to Serai node to publish vote TX: {:?}", e);
todo!(); // TODO
loop {
match serai.publish(&tx).await {
Ok(hash) => {
log::info!("voted on key pair for {:?} in TX {}", id.set, hex::encode(hash))
}
Err(e) => {
log::error!("couldn't connect to Serai node to publish vote TX: {:?}", e);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@@ -507,19 +512,22 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// TODO: Check this key's key pair's substrate key is authorized to publish batches
// TODO: Check the batch ID is an atomic increment
match serai.publish(&Serai::execute_batch(batch.clone())).await {
Ok(hash) => {
log::info!(
"executed batch {:?} {} (block {}) in TX {}",
batch.batch.network,
batch.batch.id,
hex::encode(batch.batch.block),
hex::encode(hash),
)
}
Err(e) => {
log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e);
todo!(); // TODO
loop {
match serai.publish(&Serai::execute_batch(batch.clone())).await {
Ok(hash) => {
log::info!(
"executed batch {:?} {} (block {}) in TX {}",
batch.batch.network,
batch.batch.id,
hex::encode(batch.batch.block),
hex::encode(hash),
);
break;
}
Err(e) => {
log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@@ -661,12 +669,17 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
#[tokio::main]
async fn main() {
let db = MemDb::new(); // TODO
let db = Arc::new(
rocksdb::TransactionDB::<rocksdb::SingleThreaded>::open_default(
env::var("DB_PATH").expect("path to DB wasn't specified"),
)
.unwrap(),
);
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::ZERO); // TODO
let p2p = LocalP2p::new(1).swap_remove(0); // TODO
let processors = processors::MemProcessors::new(); // TODO
let processors = Arc::new(MessageQueue::new(Service::Coordinator));
let serai = || async {
loop {

View File

@@ -1,14 +1,10 @@
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use tokio::sync::RwLock;
use std::sync::Arc;
use serai_client::primitives::NetworkId;
use processor_messages::{ProcessorMessage, CoordinatorMessage};
use message_queue::{Service, Metadata, client::MessageQueue};
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Message {
pub id: u64,
@@ -23,27 +19,31 @@ pub trait Processors: 'static + Send + Sync + Clone {
async fn ack(&mut self, msg: Message);
}
// TODO: Move this to tests
#[derive(Clone)]
pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
impl MemProcessors {
#[allow(clippy::new_without_default)]
pub fn new() -> MemProcessors {
MemProcessors(Arc::new(RwLock::new(HashMap::new())))
}
}
#[async_trait::async_trait]
impl Processors for MemProcessors {
impl Processors for Arc<MessageQueue> {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage) {
let mut processors = self.0.write().await;
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg);
let metadata =
Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() };
let msg = serde_json::to_string(&msg).unwrap();
self.queue(metadata, msg.into_bytes()).await;
}
async fn recv(&mut self) -> Message {
todo!()
// TODO: Use a proper expected next ID
let msg = self.next(0).await;
let network = match msg.from {
Service::Processor(network) => network,
Service::Coordinator => panic!("coordinator sent coordinator message"),
};
let id = msg.id;
// Deserialize it into a ProcessorMessage
let msg: ProcessorMessage =
serde_json::from_slice(&msg.msg).expect("message wasn't a JSON-encoded ProcessorMessage");
return Message { id, network, msg };
}
async fn ack(&mut self, _: Message) {
todo!()
async fn ack(&mut self, msg: Message) {
MessageQueue::ack(self, msg.id).await
}
}

View File

@@ -1 +1,38 @@
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use serai_client::primitives::NetworkId;
use processor_messages::CoordinatorMessage;
use tokio::sync::RwLock;
use crate::processors::{Message, Processors};
pub mod tributary;
#[derive(Clone)]
pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
impl MemProcessors {
#[allow(clippy::new_without_default)]
pub fn new() -> MemProcessors {
MemProcessors(Arc::new(RwLock::new(HashMap::new())))
}
}
#[async_trait::async_trait]
impl Processors for MemProcessors {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage) {
let mut processors = self.0.write().await;
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg);
}
async fn recv(&mut self) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {
todo!()
}
}

View File

@@ -19,10 +19,12 @@ use processor_messages::{
use tributary::{Transaction as TransactionTrait, Tributary};
use crate::{
processors::MemProcessors,
LocalP2p,
tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks},
tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
tests::{
MemProcessors,
tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
},
};
#[tokio::test]