From 6ccac2d0ab7d36091b09d45b8d7fcbdbe0e09977 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 17 Jul 2023 15:49:15 -0400 Subject: [PATCH] Add a message-queue connection to processor Still needs love, yet should get us closer to starting testing. --- Cargo.lock | 68 +---------- message-queue/src/main.rs | 6 +- message-queue/src/messages.rs | 1 + message-queue/src/queue.rs | 9 +- processor/Cargo.toml | 5 +- processor/src/coins/bitcoin.rs | 2 +- processor/src/coins/monero.rs | 2 +- processor/src/coordinator.rs | 154 ++++++++++++++++++++++++ processor/src/key_gen.rs | 2 +- processor/src/main.rs | 2 +- processor/src/plan.rs | 2 +- processor/src/scanner.rs | 2 +- processor/src/signer.rs | 2 +- processor/src/substrate_signer.rs | 2 +- processor/src/tests/key_gen.rs | 2 +- processor/src/tests/signer.rs | 2 +- processor/src/tests/substrate_signer.rs | 2 +- 17 files changed, 184 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db5b15ad..de685b5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3279,26 +3279,6 @@ dependencies = [ "regex", ] -[[package]] -name = "gloo-net" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9902a044653b26b99f7e3693a42f171312d9be8b26b5697bd1e43ad1f8a35e10" -dependencies = [ - "futures-channel", - "futures-core", - "futures-sink", - "gloo-utils", - "js-sys", - "pin-project", - "serde", - "serde_json", - "thiserror", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "gloo-timers" version = "0.2.6" @@ -3311,19 +3291,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "gloo-utils" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037fcb07216cb3a30f7292bd0176b050b7b9a052ba830ef7d5d65f6dc64ba58e" -dependencies = [ - "js-sys", - "serde", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "group" version = "0.12.1" @@ -3954,8 +3921,6 @@ dependencies = [ "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", - "jsonrpsee-wasm-client", - "jsonrpsee-ws-client", "tracing", ] @@ -3965,11 +3930,7 @@ version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "965de52763f2004bc91ac5bcec504192440f0b568a5d621c59d9dbd6f886c3fb" dependencies = [ - "anyhow", - "futures-channel", - "futures-timer", "futures-util", - "gloo-net", "http", "jsonrpsee-core", "jsonrpsee-types", @@ -4010,7 +3971,6 @@ dependencies = [ "thiserror", "tokio", "tracing", - "wasm-bindgen-futures", ] [[package]] @@ -4081,29 +4041,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "jsonrpsee-wasm-client" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77310456f43c6c89bcba1f6b2fc2a28300da7c341f320f5128f8c83cc63232d" -dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", -] - -[[package]] -name = "jsonrpsee-ws-client" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b83daeecfc6517cfe210df24e570fb06213533dfb990318fae781f4c7119dd9" -dependencies = [ - "http", - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", -] - [[package]] name = "k256" version = "0.13.1" @@ -8786,14 +8723,13 @@ dependencies = [ "async-trait", "bincode", "bitcoin-serai", + "ciphersuite", "dalek-ff-group", "env_logger", "flexible-transcript", "frost-schnorrkel", "futures", - "group 0.13.0", "hex", - "jsonrpsee", "k256", "lazy_static", "log", @@ -8802,6 +8738,8 @@ dependencies = [ "parity-scale-codec", "rand_chacha 0.3.1", "rand_core 0.6.4", + "reqwest", + "schnorr-signatures", "secp256k1", "serai-client", "serai-db", diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index c03cefee..96d8079f 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -53,6 +53,8 @@ fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) // Queue it (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { from: meta.from, + // Temporary value which queue_message will override + id: u64::MAX, msg, sig: sig.serialize(), }); @@ -133,6 +135,8 @@ async fn main() { // Start server let builder = ServerBuilder::new(); + // TODO: Add middleware to check some key is present in the header, making this an authed + // connection // TODO: Set max request/response size // 5132 ^ ((b'M' << 8) | b'Q') let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()]; @@ -152,7 +156,7 @@ async fn main() { .unwrap(); module .register_method("next", |args, _| { - let args = args.parse::<(Service, u64, Vec)>().unwrap(); + let args = args.parse::<(Service, u64)>().unwrap(); get_next_message(args.0, args.1); Ok(()) }) diff --git a/message-queue/src/messages.rs b/message-queue/src/messages.rs index ce26aed8..a6a6d0be 100644 --- a/message-queue/src/messages.rs +++ b/message-queue/src/messages.rs @@ -14,6 +14,7 @@ pub enum Service { #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub struct QueuedMessage { pub from: Service, + pub id: u64, pub msg: Vec, pub sig: Vec, } diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs index 76599446..af189f43 100644 --- a/message-queue/src/queue.rs +++ b/message-queue/src/queue.rs @@ -33,8 +33,9 @@ impl Queue { fn message_key(&self, id: u64) -> Vec { Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) } - pub(crate) fn queue_message(&mut self, msg: QueuedMessage) { + pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) { let id = self.message_count(); + msg.id = id; let msg_key = self.message_key(id); let msg_count_key = self.message_count_key(); @@ -45,7 +46,11 @@ impl Queue { } pub(crate) fn get_message(&self, id: u64) -> Option { - self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap()) + let msg = self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap()); + if let Some(msg) = msg.as_ref() { + assert_eq!(msg.id, id, "message stored at {id} has ID {}", msg.id); + } + msg } pub(crate) fn ack_message(&mut self, id: u64) { diff --git a/processor/Cargo.toml b/processor/Cargo.toml index f1c26afc..2d58939a 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -32,7 +32,8 @@ bincode = "1" serde_json = "1" # Cryptography -group = "0.13" +ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] } +schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr" } transcript = { package = "flexible-transcript", path = "../crypto/transcript" } frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] } @@ -60,7 +61,7 @@ serai-client = { path = "../substrate/client", default-features = false } messages = { package = "serai-processor-messages", path = "./messages" } -jsonrpsee = { version = "0.16", features = ["client"] } +reqwest = "0.11" message-queue = { package = "serai-message-queue", path = "../message-queue" } [dev-dependencies] diff --git a/processor/src/coins/bitcoin.rs b/processor/src/coins/bitcoin.rs index 1de34bca..a7e2682a 100644 --- a/processor/src/coins/bitcoin.rs +++ b/processor/src/coins/bitcoin.rs @@ -3,7 +3,7 @@ use std::{time::Duration, io, collections::HashMap}; use async_trait::async_trait; use transcript::RecommendedTranscript; -use group::ff::PrimeField; +use ciphersuite::group::ff::PrimeField; use k256::{ProjectivePoint, Scalar}; use frost::{ curve::{Curve, Secp256k1}, diff --git a/processor/src/coins/monero.rs b/processor/src/coins/monero.rs index 5cdd8163..bb83a58a 100644 --- a/processor/src/coins/monero.rs +++ b/processor/src/coins/monero.rs @@ -6,7 +6,7 @@ use zeroize::Zeroizing; use transcript::RecommendedTranscript; -use group::{ff::Field, Group}; +use ciphersuite::group::{ff::Field, Group}; use dalek_ff_group::{Scalar, EdwardsPoint}; use frost::{curve::Ed25519, ThresholdKeys}; diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index 0acb2541..80c40782 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -1,10 +1,23 @@ +use core::ops::Deref; use std::{ sync::{Arc, RwLock}, collections::VecDeque, }; +use zeroize::Zeroizing; +use rand_core::OsRng; + +use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; +use schnorr::SchnorrSignature; + +use serde::{Serialize, Deserialize}; + use messages::{ProcessorMessage, CoordinatorMessage}; +use serai_client::primitives::NetworkId; +use message_queue::{Service, Metadata, QueuedMessage, message_challenge}; +use reqwest::Client; + #[derive(Clone, PartialEq, Eq, Debug)] pub struct Message { pub id: u64, @@ -18,6 +31,147 @@ pub trait Coordinator { async fn ack(&mut self, msg: Message); } +pub struct MessageQueue { + network: NetworkId, + priv_key: Zeroizing<::F>, + pub_key: ::G, + client: Client, + message_queue_url: String, +} + +impl MessageQueue { + pub fn new( + message_queue_url: String, + network: NetworkId, + priv_key: Zeroizing<::F>, + ) -> MessageQueue { + MessageQueue { + network, + pub_key: Ristretto::generator() * priv_key.deref(), + priv_key, + client: Client::new(), + message_queue_url, + } + } + + async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value { + #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] + struct JsonRpcRequest { + version: &'static str, + method: &'static str, + params: serde_json::Value, + id: u64, + } + + let res = loop { + // Make the request + if let Ok(req) = self + .client + .post(&self.message_queue_url) + .json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 }) + .send() + .await + { + // Get the response + if let Ok(res) = req.text().await { + break res; + } + } + + // Sleep 5s before trying again + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + }; + + let json = + serde_json::from_str::(&res).expect("message-queue returned invalid JSON"); + if json.get("result").is_none() { + panic!("call failed: {json}"); + } + json + } + + async fn queue(&self, metadata: Metadata, msg: Vec, sig: Vec) { + let json = self.json_call("queue", serde_json::json!([metadata, msg, sig])).await; + if json.get("result") != Some(&serde_json::Value::Bool(true)) { + panic!("failed to queue message: {json}"); + } + } + + async fn next(&self) -> Message { + loop { + // TODO: Use a proper expected next ID + let json = + self.json_call("next", serde_json::json!([Service::Processor(self.network), 0])).await; + + // Convert from a Value to a type via reserialization + let msg: Option = serde_json::from_str( + &serde_json::to_string( + &json.get("result").expect("successful JSON RPC call didn't have result"), + ) + .unwrap(), + ) + .expect("next didn't return an Option"); + + // If there wasn't a message, check again in 5s + let Some(msg) = msg else { + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + continue; + }; + + // Verify the message + assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent us message"); + // TODO: Verify the coordinator's signature + // TODO: Check the ID is sane + let id = msg.id; + + // Deserialize it into a CoordinatorMessage + let msg: CoordinatorMessage = serde_json::from_str( + &String::from_utf8(msg.msg).expect("msg wasn't valid UTF-8 (not JSON?)"), + ) + .expect("message wasn't a JSON-encoded CoordinatorMessage"); + return Message { id, msg }; + } + } + + async fn ack(&self, id: u64, sig: Vec) { + let json = self.json_call("ack", serde_json::json!([id, sig])).await; + if json.get("result") != Some(&serde_json::Value::Bool(true)) { + panic!("failed to ack message {id}: {json}"); + } + } +} + +#[async_trait::async_trait] +impl Coordinator for MessageQueue { + async fn send(&mut self, msg: ProcessorMessage) { + let metadata = Metadata { + from: Service::Processor(self.network), + to: Service::Coordinator, + intent: msg.intent(), + }; + let msg = serde_json::to_string(&msg).unwrap(); + + // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. + let nonce = Zeroizing::new(::F::random(&mut OsRng)); + let nonce_pub = Ristretto::generator() * nonce.deref(); + let sig = SchnorrSignature::::sign( + &self.priv_key, + nonce, + message_challenge(self.pub_key, metadata.to, &metadata.intent, msg.as_bytes(), nonce_pub), + ); + self.queue(metadata, msg.into_bytes(), sig.serialize()).await; + } + + async fn recv(&mut self) -> Message { + self.next().await + } + + async fn ack(&mut self, msg: Message) { + // TODO: Use a proper signature once message-queue checks ack signatures + MessageQueue::ack(self, msg.id, vec![0; 64]).await + } +} + // TODO: Move this to tests pub struct MemCoordinator(Arc>>); impl MemCoordinator { diff --git a/processor/src/key_gen.rs b/processor/src/key_gen.rs index ce670db4..9b015d78 100644 --- a/processor/src/key_gen.rs +++ b/processor/src/key_gen.rs @@ -7,7 +7,7 @@ use rand_core::SeedableRng; use rand_chacha::ChaCha20Rng; use transcript::{Transcript, RecommendedTranscript}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{ curve::{Ciphersuite, Ristretto}, dkg::{Participant, ThresholdParams, ThresholdCore, ThresholdKeys, encryption::*, frost::*}, diff --git a/processor/src/main.rs b/processor/src/main.rs index ead6c52b..fac8fc84 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -6,7 +6,7 @@ use std::{ use zeroize::{Zeroize, Zeroizing}; use transcript::{Transcript, RecommendedTranscript}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{curve::Ciphersuite, ThresholdKeys}; use log::{info, warn, error}; diff --git a/processor/src/plan.rs b/processor/src/plan.rs index e705661f..ea90006d 100644 --- a/processor/src/plan.rs +++ b/processor/src/plan.rs @@ -1,7 +1,7 @@ use std::io; use transcript::{Transcript, RecommendedTranscript}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::curve::Ciphersuite; use crate::coins::{Output, Coin}; diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 335bf9f6..fd2fd110 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -5,7 +5,7 @@ use std::{ collections::{HashSet, HashMap}, }; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::curve::Ciphersuite; use log::{info, debug, warn}; diff --git a/processor/src/signer.rs b/processor/src/signer.rs index 44696d34..a5883dca 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -3,7 +3,7 @@ use std::collections::{VecDeque, HashMap}; use rand_core::OsRng; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{ ThresholdKeys, sign::{Writable, PreprocessMachine, SignMachine, SignatureMachine}, diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index f8f00f9b..12290ad5 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -5,7 +5,7 @@ use rand_core::OsRng; use scale::Encode; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{ curve::Ristretto, ThresholdKeys, diff --git a/processor/src/tests/key_gen.rs b/processor/src/tests/key_gen.rs index 4a52f2d1..52cd710e 100644 --- a/processor/src/tests/key_gen.rs +++ b/processor/src/tests/key_gen.rs @@ -4,7 +4,7 @@ use zeroize::Zeroizing; use rand_core::{RngCore, OsRng}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{Participant, ThresholdParams, tests::clone_without}; use serai_db::{DbTxn, Db, MemDb}; diff --git a/processor/src/tests/signer.rs b/processor/src/tests/signer.rs index bcc0a363..168c7a56 100644 --- a/processor/src/tests/signer.rs +++ b/processor/src/tests/signer.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use rand_core::{RngCore, OsRng}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{ Participant, ThresholdKeys, dkg::tests::{key_gen, clone_without}, diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs index a0fdec07..56cdb20f 100644 --- a/processor/src/tests/substrate_signer.rs +++ b/processor/src/tests/substrate_signer.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use rand_core::{RngCore, OsRng}; -use group::GroupEncoding; +use ciphersuite::group::GroupEncoding; use frost::{ curve::Ristretto, Participant,