diff --git a/orchestration/docker-compose.yml b/orchestration/docker-compose.yml
index e37c08b4..094a739e 100644
--- a/orchestration/docker-compose.yml
+++ b/orchestration/docker-compose.yml
@@ -25,7 +25,7 @@ services:
     entrypoint: /scripts/entry-dev.sh
     # TODO: Use expose, not ports
     ports:
-      - "18443"
+      - "18443:18443"

   ethereum:
     profiles:
@@ -50,7 +50,7 @@ services:
     entrypoint: /scripts/entry-dev.sh
     # TODO: Use expose, not ports
    ports:
-      - "18081"
+      - "18081:18081"

   # Infrastructure
diff --git a/processor/src/main.rs b/processor/src/main.rs
index 2cb9f93e..608a708a 100644
--- a/processor/src/main.rs
+++ b/processor/src/main.rs
@@ -218,7 +218,7 @@ async fn sign_plans<C: Coin, D: Db>(
         .schedulers
         .get_mut(key.as_ref())
         .expect("didn't have a scheduler for a key we have a plan for")
-        .created_output(branch.expected, branch.actual);
+        .created_output::<D>(txn, branch.expected, branch.actual);
     }

     if let Some((tx, eventuality)) = tx {
@@ -322,7 +322,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
       // TODO: This assumes the coin has a monotonic clock for its blocks' times, which
      // isn't a viable assumption

-      // If the latest block number is 10, then the block indexd by 1 has 10 confirms
+      // If the latest block number is 10, then the block indexed by 1 has 10 confirms
       // 10 + 1 - 10 = 1
       while get_block(
         coin,
@@ -381,7 +381,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
           substrate_mutable.scanner.rotate_key(txn, activation_number, key).await;
           substrate_mutable
             .schedulers
-            .insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new(key));
+            .insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new::<D>(txn, key));

           tributary_mutable
             .signers
@@ -434,7 +434,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
             .schedulers
             .get_mut(&key_vec)
             .expect("key we don't have a scheduler for acknowledged a block")
-            .schedule(outputs, payments);
+            .schedule::<D>(txn, outputs, payments);

           coordinator
             .send(ProcessorMessage::Coordinator(
@@ -498,14 +498,14 @@ async fn boot<C: Coin, D: Db>(
   // The scanner has no long-standing orders to re-issue
   let (mut scanner, active_keys) = Scanner::new(coin.clone(), raw_db.clone());

-  let schedulers = HashMap::<Vec<u8>, Scheduler<C>>::new();
+  let mut schedulers = HashMap::<Vec<u8>, Scheduler<C>>::new();
   let mut substrate_signers = HashMap::new();
   let mut signers = HashMap::new();

   let main_db = MainDb::new(raw_db.clone());

   for key in &active_keys {
-    // TODO: Load existing schedulers
+    schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::from_db(raw_db, *key).unwrap());

     let (substrate_keys, coin_keys) = key_gen.keys(key);
@@ -589,14 +589,6 @@ async fn run<D: Db, C: Coin, Co: Coordinator>(mut raw_db: D, coin: C, mut coordi
           substrate_mutable.scanner.drop_eventuality(id).await;
           main_db.finish_signing(&mut txn, key, id);
           txn.commit();
-
-          // TODO
-          // 1) We need to stop signing whenever a peer informs us or the chain has an
-          //    eventuality
-          // 2) If a peer informed us of an eventuality without an outbound payment, stop
-          //    scanning the chain for it (or at least ack it's solely for sanity purposes?)
-          // 3) When the chain has an eventuality, if it had an outbound payment, report it up to
-          //    Substrate for logging purposes
         }
       }
     }
diff --git a/processor/src/scheduler.rs b/processor/src/scheduler.rs
index eacd10b8..faa9e44e 100644
--- a/processor/src/scheduler.rs
+++ b/processor/src/scheduler.rs
@@ -1,14 +1,17 @@
-use std::collections::{VecDeque, HashMap};
+use std::{
+  io::{self, Read},
+  collections::{VecDeque, HashMap},
+};

-use frost::curve::Ciphersuite;
+use ciphersuite::{group::GroupEncoding, Ciphersuite};

 use crate::{
   coins::{Output, Coin},
-  Payment, Plan,
+  DbTxn, Db, Payment, Plan,
 };

 /// Stateless, deterministic output/payment manager.
-#[derive(Debug)]
+#[derive(PartialEq, Eq, Debug)]
 pub struct Scheduler<C: Coin> {
   key: <C::Curve as Ciphersuite>::G,
@@ -38,15 +41,115 @@ pub struct Scheduler<C: Coin> {
   payments: VecDeque<Payment<C>>,
 }

+fn scheduler_key<D: Db, G: GroupEncoding>(key: &G) -> Vec<u8> {
+  D::key(b"SCHEDULER", b"scheduler", key.to_bytes())
+}
+
 impl<C: Coin> Scheduler<C> {
-  pub fn new(key: <C::Curve as Ciphersuite>::G) -> Self {
-    Scheduler {
+  fn read<R: Read>(key: <C::Curve as Ciphersuite>::G, reader: &mut R) -> io::Result<Self> {
+    let mut read_plans = || -> io::Result<_> {
+      let mut all_plans = HashMap::new();
+      let mut all_plans_len = [0; 4];
+      reader.read_exact(&mut all_plans_len)?;
+      for _ in 0 .. u32::from_le_bytes(all_plans_len) {
+        let mut amount = [0; 8];
+        reader.read_exact(&mut amount)?;
+        let amount = u64::from_le_bytes(amount);
+
+        let mut plans = VecDeque::new();
+        let mut plans_len = [0; 4];
+        reader.read_exact(&mut plans_len)?;
+        for _ in 0 .. u32::from_le_bytes(plans_len) {
+          let mut payments = vec![];
+          let mut payments_len = [0; 4];
+          reader.read_exact(&mut payments_len)?;
+
+          for _ in 0 .. u32::from_le_bytes(payments_len) {
+            payments.push(Payment::read(reader)?);
+          }
+          plans.push_back(payments);
+        }
+        all_plans.insert(amount, plans);
+      }
+      Ok(all_plans)
+    };
+    let queued_plans = read_plans()?;
+    let plans = read_plans()?;
+
+    let mut utxos = vec![];
+    let mut utxos_len = [0; 4];
+    reader.read_exact(&mut utxos_len)?;
+    for _ in 0 .. u32::from_le_bytes(utxos_len) {
+      utxos.push(C::Output::read(reader)?);
+    }
+
+    let mut payments = VecDeque::new();
+    let mut payments_len = [0; 4];
+    reader.read_exact(&mut payments_len)?;
+    for _ in 0 .. u32::from_le_bytes(payments_len) {
+      payments.push_back(Payment::read(reader)?);
+    }
+
+    Ok(Scheduler { key, queued_plans, plans, utxos, payments })
+  }
+
+  // TODO: Get rid of this
+  // We reserialize the entire scheduler on any mutation to save it to the DB which is horrible
+  // We should have an incremental solution
+  fn serialize(&self) -> Vec<u8> {
+    let mut res = Vec::with_capacity(4096);
+
+    let mut write_plans = |plans: &HashMap<u64, VecDeque<Vec<Payment<C>>>>| {
+      res.extend(u32::try_from(plans.len()).unwrap().to_le_bytes());
+      for (amount, list_of_plans) in plans {
+        res.extend(amount.to_le_bytes());
+        res.extend(u32::try_from(list_of_plans.len()).unwrap().to_le_bytes());
+        for plan in list_of_plans {
+          res.extend(u32::try_from(plan.len()).unwrap().to_le_bytes());
+          for payment in plan {
+            payment.write(&mut res).unwrap();
+          }
+        }
+      }
+    };
+    write_plans(&self.queued_plans);
+    write_plans(&self.plans);
+
+    res.extend(u32::try_from(self.utxos.len()).unwrap().to_le_bytes());
+    for utxo in &self.utxos {
+      utxo.write(&mut res).unwrap();
+    }
+
+    res.extend(u32::try_from(self.payments.len()).unwrap().to_le_bytes());
+    for payment in &self.payments {
+      payment.write(&mut res).unwrap();
+    }
+
+    debug_assert_eq!(&Self::read(self.key, &mut res.as_slice()).unwrap(), self);
+    res
+  }
+
+  pub fn new<D: Db>(txn: &mut D::Transaction<'_>, key: <C::Curve as Ciphersuite>::G) -> Self {
+    let res = Scheduler {
       key,
       queued_plans: HashMap::new(),
       plans: HashMap::new(),
       utxos: vec![],
       payments: VecDeque::new(),
-    }
+    };
+    // Save it to disk so from_db won't panic if we don't mutate it before rebooting
+    txn.put(scheduler_key::<D, _>(&res.key), res.serialize());
+    res
+  }
+
+  pub fn from_db<D: Db>(db: &D, key: <C::Curve as Ciphersuite>::G) -> io::Result<Self> {
+    let scheduler = db.get(scheduler_key::<D, _>(&key)).unwrap_or_else(|| {
+      panic!("loading scheduler from DB without scheduler for {}", hex::encode(key.to_bytes()))
+    });
+    let mut reader_slice = scheduler.as_slice();
+    let reader = &mut reader_slice;
+
+    Self::read(key, reader)
   }

   fn execute(&mut self, inputs: Vec<C::Output>, mut payments: Vec<Payment<C>>) -> Plan<C> {
@@ -141,7 +244,12 @@ impl<C: Coin> Scheduler<C> {
   }

   // Schedule a series of outputs/payments.
-  pub fn schedule(&mut self, utxos: Vec<C::Output>, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
+  pub fn schedule<D: Db>(
+    &mut self,
+    txn: &mut D::Transaction<'_>,
+    utxos: Vec<C::Output>,
+    payments: Vec<Payment<C>>,
+  ) -> Vec<Plan<C>> {
     let mut plans = self.add_outputs(utxos);

     log::info!("scheduling {} new payments", payments.len());
@@ -222,6 +330,8 @@ impl<C: Coin> Scheduler<C> {
       self.utxos.extend(utxos);
     }

+    txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
+
     log::info!(
       "created {} plans containing {} payments to sign",
       plans.len(),
@@ -235,7 +345,12 @@ impl<C: Coin> Scheduler<C> {
   // This can be called whenever, so long as it's properly ordered
   // (it's independent to Serai/the chain we're scheduling over, yet still expects outputs to be
   // created in the same order Plans are returned in)
-  pub fn created_output(&mut self, expected: u64, actual: Option<u64>) {
+  pub fn created_output<D: Db>(
+    &mut self,
+    txn: &mut D::Transaction<'_>,
+    expected: u64,
+    actual: Option<u64>,
+  ) {
     log::debug!("output expected to have {} had {:?} after fees", expected, actual);

     // Get the payments this output is expected to handle
@@ -280,5 +395,8 @@ impl<C: Coin> Scheduler<C> {
     }

     self.plans.entry(actual).or_insert(VecDeque::new()).push_back(payments);
+
+    // TODO: This shows how ridiculous the serialize function is
+    txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
   }
 }
diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs
index 2e064b09..9bd1b157 100644
--- a/processor/src/tests/wallet.rs
+++ b/processor/src/tests/wallet.rs
@@ -49,10 +49,15 @@ pub async fn test_wallet<C: Coin>(coin: C) {
     }
   };

-  let mut scheduler = Scheduler::new(key);
+  let mut txn = db.txn();
+  let mut scheduler = Scheduler::new::<MemDb>(&mut txn, key);
   let amount = 2 * C::DUST;
-  let plans = scheduler
-    .schedule(outputs.clone(), vec![Payment { address: C::address(key), data: None, amount }]);
+  let plans = scheduler.schedule::<MemDb>(
+    &mut txn,
+    outputs.clone(),
+    vec![Payment { address: C::address(key), data: None, amount }],
+  );
+  txn.commit();
   assert_eq!(
     plans,
     vec![Plan {
diff --git a/tests/reproducible-runtime/src/lib.rs b/tests/reproducible-runtime/src/lib.rs
index 72d8f9a7..994cdce4 100644
--- a/tests/reproducible-runtime/src/lib.rs
+++ b/tests/reproducible-runtime/src/lib.rs
@@ -85,7 +85,7 @@ pub fn reproducibly_builds() {
       break;
     }

-    // If we didn't get resuts from all runners, panic
+    // If we didn't get results from all runners, panic
     for item in &res {
      if item.is_none() {
         panic!("couldn't get runtime hashes within allowed time");