From f2d9d70068f43b31f16a745d0b584716ad85cbe1 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 23 Apr 2023 04:31:00 -0400 Subject: [PATCH] Reload Tributaries add_active_tributary writes the spec to disk before it returns, so even if the VecDeque it pushes to isn't popped, the tributary will still be loaded on boot. --- coordinator/src/db.rs | 44 +++++++++++++++ coordinator/src/main.rs | 15 +++-- coordinator/src/substrate/mod.rs | 12 ++-- coordinator/src/tests/tributary/chain.rs | 4 +- coordinator/src/tributary/mod.rs | 70 ++++++++++++++++++++++-- 5 files changed, 130 insertions(+), 15 deletions(-) create mode 100644 coordinator/src/db.rs diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs new file mode 100644 index 00000000..f1fd2f7a --- /dev/null +++ b/coordinator/src/db.rs @@ -0,0 +1,44 @@ +pub use serai_db::*; + +use crate::tributary::TributarySpec; + +#[derive(Debug)] +pub struct MainDb(pub D); +impl MainDb { + pub fn new(db: D) -> Self { + Self(db) + } + + fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + D::key(b"MAIN", dst, key) + } + + fn acive_tributaries_key() -> Vec { + Self::main_key(b"active_tributaries", []) + } + pub fn active_tributaries(&self) -> (Vec, Vec) { + let bytes = self.0.get(Self::acive_tributaries_key()).unwrap_or(vec![]); + let mut bytes_ref: &[u8] = bytes.as_ref(); + + let mut tributaries = vec![]; + while !bytes_ref.is_empty() { + tributaries.push(TributarySpec::read(&mut bytes_ref).unwrap()); + } + + (bytes, tributaries) + } + pub fn add_active_tributary(&mut self, spec: &TributarySpec) { + let key = Self::acive_tributaries_key(); + let (mut existing_bytes, existing) = self.active_tributaries(); + for tributary in &existing { + if tributary == spec { + return; + } + } + + spec.write(&mut existing_bytes).unwrap(); + let mut txn = self.0.txn(); + txn.put(key, existing_bytes); + txn.commit(); + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 50aebb48..0b079681 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -23,6 +23,9 @@ use ::tributary::Tributary; mod tributary; use crate::tributary::{TributarySpec, Transaction}; +mod db; +use db::MainDb; + mod p2p; pub use p2p::*; @@ -48,9 +51,9 @@ async fn run( mut processor: Pro, serai: Serai, ) { - let add_new_tributary = |spec: TributarySpec| async { + let add_new_tributary = |db, spec: TributarySpec| async { + MainDb(db).add_active_tributary(&spec); NEW_TRIBUTARIES.write().await.push_back(spec); - // TODO: Save this tributary's information to the databae before returning }; { @@ -92,6 +95,7 @@ async fn run( let mut tributaries = HashMap::<[u8; 32], ActiveTributary>::new(); + // TODO: Use a db on a distinct volume async fn add_tributary( db: D, key: Zeroizing<::F>, @@ -113,7 +117,10 @@ async fn run( tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary }); } - // TODO: Reload tributaries + // TODO: Can MainDb take a borrow? + for spec in MainDb(raw_db.clone()).active_tributaries().1 { + add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await; + } let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); tokio::spawn(async move { @@ -130,7 +137,7 @@ async fn run( } } - for (genesis, ActiveTributary { spec, tributary }) in tributaries.iter_mut() { + for ActiveTributary { spec, tributary } in tributaries.values() { tributary::scanner::handle_new_blocks::<_, _, P>( &mut tributary_db, &key, diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index d7e56ef1..fb9a2791 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -41,10 +41,10 @@ async fn in_set( async fn handle_new_set< D: Db, Fut: Future, - ANT: Clone + Fn(TributarySpec) -> Fut, + ANT: Clone + Fn(D, TributarySpec) -> Fut, Pro: Processor, >( - db: D, + db: &D, key: &Zeroizing<::F>, add_new_tributary: ANT, processor: &mut Pro, @@ -56,7 +56,7 @@ async fn handle_new_set< let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist"); let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data); - add_new_tributary(spec.clone()); + add_new_tributary(db.clone(), spec.clone()); // Trigger a DKG // TODO: Check how the processor handles this being fired multiple times @@ -210,7 +210,7 @@ async fn handle_batch_and_burns( async fn handle_block< D: Db, Fut: Future, - ANT: Clone + Fn(TributarySpec) -> Fut, + ANT: Clone + Fn(D, TributarySpec) -> Fut, Pro: Processor, P: P2p, >( @@ -236,7 +236,7 @@ async fn handle_block< if !SubstrateDb::::handled_event(&db.0, hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { // TODO2: Use a DB on a dedicated volume - handle_new_set(db.0.clone(), key, add_new_tributary.clone(), processor, serai, &block, set) + handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set) .await?; } else { panic!("NewSet event wasn't NewSet: {new_set:?}"); @@ -281,7 +281,7 @@ async fn handle_block< pub async fn handle_new_blocks< D: Db, Fut: Future, - ANT: Clone + Fn(TributarySpec) -> Fut, + ANT: Clone + Fn(D, TributarySpec) -> Fut, Pro: Processor, P: P2p, >( diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index ec6cb69f..f6fb19cf 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -61,7 +61,9 @@ pub fn new_spec( .unwrap(), }; - TributarySpec::new(serai_block, start_time, set, set_data) + let res = TributarySpec::new(serai_block, start_time, set, set_data); + assert_eq!(TributarySpec::read::<&[u8]>(&mut res.serialize().as_ref()).unwrap(), res); + res } pub async fn new_tributaries( diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 4f23276d..433ed4be 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -1,5 +1,8 @@ use core::ops::Deref; -use std::{io, collections::HashMap}; +use std::{ + io::{self, Read, Write}, + collections::HashMap, +}; use zeroize::Zeroizing; use rand_core::{RngCore, CryptoRng}; @@ -7,13 +10,19 @@ use rand_core::{RngCore, CryptoRng}; use blake2::{Digest, Blake2s256}; use transcript::{Transcript, RecommendedTranscript}; -use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; +use ciphersuite::{ + group::{ff::Field, GroupEncoding}, + Ciphersuite, Ristretto, +}; use schnorr::SchnorrSignature; use frost::Participant; -use scale::Encode; +use scale::{Encode, Decode}; -use serai_client::validator_sets::primitives::{ValidatorSet, ValidatorSetData}; +use serai_client::{ + primitives::NetworkId, + validator_sets::primitives::{Session, ValidatorSet, ValidatorSetData}, +}; #[rustfmt::skip] use tributary::{ @@ -99,6 +108,59 @@ impl TributarySpec { pub fn validators(&self) -> Vec<(::G, u64)> { self.validators.clone() } + + pub fn write(&self, writer: &mut W) -> io::Result<()> { + writer.write_all(&self.serai_block)?; + writer.write_all(&self.start_time.to_le_bytes())?; + writer.write_all(&self.set.session.0.to_le_bytes())?; + let network_encoded = self.set.network.encode(); + assert_eq!(network_encoded.len(), 1); + writer.write_all(&network_encoded)?; + writer.write_all(&u32::try_from(self.validators.len()).unwrap().to_le_bytes())?; + for validator in &self.validators { + writer.write_all(&validator.0.to_bytes())?; + writer.write_all(&validator.1.to_le_bytes())?; + } + Ok(()) + } + + pub fn serialize(&self) -> Vec { + let mut res = vec![]; + self.write(&mut res).unwrap(); + res + } + + pub fn read(reader: &mut R) -> io::Result { + let mut serai_block = [0; 32]; + reader.read_exact(&mut serai_block)?; + + let mut start_time = [0; 8]; + reader.read_exact(&mut start_time)?; + let start_time = u64::from_le_bytes(start_time); + + let mut session = [0; 4]; + reader.read_exact(&mut session)?; + let session = Session(u32::from_le_bytes(session)); + + let mut network = [0; 1]; + reader.read_exact(&mut network)?; + let network = NetworkId::decode(&mut &network[..]) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid network"))?; + + let mut validators_len = [0; 4]; + reader.read_exact(&mut validators_len)?; + let validators_len = usize::try_from(u32::from_le_bytes(validators_len)).unwrap(); + + let mut validators = Vec::with_capacity(validators_len); + for _ in 0 .. validators_len { + let key = Ristretto::read_G(reader)?; + let mut bond = [0; 8]; + reader.read_exact(&mut bond)?; + validators.push((key, u64::from_le_bytes(bond))); + } + + Ok(Self { serai_block, start_time, set: ValidatorSet { session, network }, validators }) + } } #[derive(Clone, PartialEq, Eq, Debug)]