From 79655672ef869109ba035ec0b22792ccdaab0703 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 16 Apr 2023 00:51:56 -0400 Subject: [PATCH] Make progres on handling NewSet events Further bones out the coordinator. --- Cargo.lock | 4 ++ coordinator/Cargo.toml | 7 ++- coordinator/src/db.rs | 25 ++++++++++ coordinator/src/main.rs | 31 ++++++++++--- coordinator/src/p2p.rs | 25 ++++++++++ coordinator/src/substrate.rs | 90 +++++++++++++++++++++++++++++++----- 6 files changed, 164 insertions(+), 18 deletions(-) create mode 100644 coordinator/src/db.rs create mode 100644 coordinator/src/p2p.rs diff --git a/Cargo.lock b/Cargo.lock index 5615734a..37698a73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,7 +1304,10 @@ checksum = "13418e745008f7349ec7e449155f419a61b92b58a99cc3616942b926825ec76b" name = "coordinator" version = "0.1.0" dependencies = [ + "async-trait", "blake2", + "ciphersuite", + "flexible-transcript", "log", "modular-frost", "processor-messages", @@ -1313,6 +1316,7 @@ dependencies = [ "serai-db", "tokio", "tributary-chain", + "zeroize", ] [[package]] diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 63cb3061..4a30b5a5 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -14,10 +14,14 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -log = "0.4" +async-trait = "0.1" + +zeroize = "^1.5" blake2 = "0.10" +transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] } +ciphersuite = { path = "../crypto/ciphersuite" } frost = { package = "modular-frost", path = "../crypto/frost" } serai-db = { path = "../common/db" } @@ -27,6 +31,7 @@ tributary = { package = "tributary-chain", path = "./tributary" } serai-client = { path = "../substrate/client", features = ["serai"] } +log = "0.4" tokio = { version = "1", features = ["full"] } [dev-dependencies] diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs new file mode 100644 index 00000000..54de4ecb --- /dev/null +++ b/coordinator/src/db.rs @@ -0,0 +1,25 @@ +pub use serai_db::*; + +#[derive(Debug)] +pub struct MainDb(pub D); +impl MainDb { + pub fn new(db: D) -> Self { + Self(db) + } + + fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + D::key(b"MAIN", dst, key) + } + + fn event_key(id: &[u8], index: u32) -> Vec { + Self::main_key(b"event", [id, index.to_le_bytes().as_ref()].concat()) + } + pub fn handle_event(&mut self, id: [u8; 32], index: u32) { + let mut txn = self.0.txn(); + txn.put(Self::event_key(&id, index), []); + txn.commit(); + } + pub fn handled_event(&self, id: [u8; 32], index: u32) -> bool { + self.0.get(Self::event_key(&id, index)).is_some() + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 516c1999..7240c698 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,22 +1,30 @@ -#![allow(dead_code)] #![allow(unused_variables)] -#![allow(unused_mut)] -use serai_db::Db; +use serai_db::{Db, MemDb}; use serai_client::Serai; +mod db; +pub use db::*; + mod transaction; +pub use transaction::Transaction as TributaryTransaction; + +mod p2p; +pub use p2p::*; + mod substrate; #[cfg(test)] mod tests; -async fn run(db: D, serai: Serai) { +async fn run(db: D, p2p: P, serai: Serai) { + let mut db = MainDb::new(db); + let mut last_substrate_block = 0; // TODO: Load from DB loop { - match substrate::handle_new_blocks(&serai, &mut last_substrate_block).await { + match substrate::handle_new_blocks(&mut db, &p2p, &serai, &mut last_substrate_block).await { Ok(()) => {} Err(e) => log::error!("couldn't communicate with serai node: {e}"), } @@ -29,5 +37,16 @@ async fn run(db: D, serai: Serai) { #[tokio::main] async fn main() { - // Open the database + let db = MemDb::new(); // TODO + let p2p = LocalP2p {}; // TODO + let serai = || async { + loop { + let Ok(serai) = Serai::new("ws://127.0.0.1:9944").await else { + log::error!("couldn't connect to the Serai node"); + continue + }; + return serai; + } + }; + run(db, p2p, serai().await).await } diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs new file mode 100644 index 00000000..918a8c1f --- /dev/null +++ b/coordinator/src/p2p.rs @@ -0,0 +1,25 @@ +use core::fmt::Debug; + +use async_trait::async_trait; + +use tributary::P2p as TributaryP2p; + +// TODO +#[async_trait] +pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {} + +// TODO +#[derive(Clone, Debug)] +pub struct LocalP2p {} + +#[async_trait] +impl TributaryP2p for LocalP2p { + async fn broadcast(&self, msg: Vec) { + // TODO + todo!() + } +} + +// TODO +#[async_trait] +impl P2p for LocalP2p {} diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index ec0b8bac..48e34685 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -1,28 +1,94 @@ -use serai_client::{SeraiError, Block, Serai}; +use std::collections::HashMap; -async fn handle_block(serai: &Serai, block: Block) -> Result, SeraiError> { +use zeroize::Zeroizing; + +use transcript::{Transcript, RecommendedTranscript}; +use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; + +use serai_client::{SeraiError, Block, Serai, validator_sets::ValidatorSetsEvent}; + +use tributary::Tributary; + +use crate::{Db, MainDb, TributaryTransaction, P2p}; + +async fn handle_block( + db: &mut MainDb, + p2p: &P, + serai: &Serai, + block: Block, +) -> Result<(), SeraiError> { let hash = block.hash(); - let mut actions = vec![]; + + let mut event_id = 0; // If a new validator set was activated, create tributary/inform processor to do a DKG for new_set in serai.get_new_set_events(hash).await? { - todo!() + if !db.handled_event(hash, event_id) { + if let ValidatorSetsEvent::NewSet { set } = new_set { + let set_data = serai.get_validator_set(set).await?.unwrap(); + + let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis"); + genesis.append_message(b"serai_block", hash); + genesis.append_message(b"session", set.session.0.to_le_bytes()); + genesis.append_message(b"network", set.network.0.to_le_bytes()); + let genesis = genesis.challenge(b"genesis"); + let genesis_ref: &[u8] = genesis.as_ref(); + let genesis = genesis_ref[.. 32].try_into().unwrap(); + + let mut validators = HashMap::new(); + for (participant, amount) in &set_data.participants { + validators.insert( + // TODO2: Ensure an invalid public key can't be a validator + ::read_G::<&[u8]>(&mut participant.0.as_ref()).unwrap(), + // Give one weight on Tributary per bond instance + amount.0 / set_data.bond.0, + ); + } + + // TODO: Do something with this + let tributary = Tributary::<_, TributaryTransaction, _>::new( + // TODO2: Use a DB on a dedicated volume + db.0.clone(), + genesis, + block.time().unwrap(), + Zeroizing::new(::F::ZERO), // TODO + validators, + p2p.clone(), + ) + .await + .unwrap(); + } else { + panic!("NewSet event wasn't NewSet: {new_set:?}"); + } + db.handle_event(hash, event_id); + } + event_id += 1; } // If a key pair was confirmed, inform the processor for key_gen in serai.get_key_gen_events(hash).await? { - todo!() + if !db.handled_event(hash, event_id) { + // TODO: Handle key_gen + db.handle_event(hash, event_id); + } + event_id += 1; } // If batch, tell processor of block acknowledged/burns - for new_set in serai.get_batch_events(hash).await? { - todo!() + for batch in serai.get_batch_events(hash).await? { + if !db.handled_event(hash, event_id) { + // TODO: Handle batch + db.handle_event(hash, event_id); + } + event_id += 1; } - Ok(actions) + Ok(()) } -pub(crate) async fn handle_new_blocks( +pub async fn handle_new_blocks( + db: &mut MainDb, + p2p: &P, serai: &Serai, last_substrate_block: &mut u64, ) -> Result<(), SeraiError> { @@ -35,7 +101,9 @@ pub(crate) async fn handle_new_blocks( let mut latest = Some(latest); for b in (*last_substrate_block + 1) ..= latest_number { - let actions = handle_block( + handle_block( + db, + p2p, serai, if b == latest_number { latest.take().unwrap() @@ -44,7 +112,7 @@ pub(crate) async fn handle_new_blocks( }, ) .await?; - // TODO: Handle actions, update the DB + // TODO: Update the DB *last_substrate_block += 1; }