diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 096fddb9..27d75d2e 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -102,10 +102,6 @@ pub mod sign { Shares { id: SignId, shares: HashMap> }, // Re-attempt a signing protocol. Reattempt { id: SignId }, - /* TODO - // Completed a signing protocol already. - Completed { session: Session, id: [u8; 32], tx: Vec }, - */ } impl CoordinatorMessage { @@ -118,7 +114,6 @@ pub mod sign { CoordinatorMessage::Preprocesses { id, .. } | CoordinatorMessage::Shares { id, .. } | CoordinatorMessage::Reattempt { id, .. } => id.session, - // TODO CoordinatorMessage::Completed { session, .. } => *session, } } } @@ -131,8 +126,6 @@ pub mod sign { Preprocesses { id: SignId, preprocesses: Vec> }, // Signed shares for the specified signing protocol. Shares { id: SignId, shares: Vec> }, - // Completed a signing protocol already. - // TODO Completed { session: Session, id: [u8; 32], tx: Vec }, } } @@ -330,11 +323,6 @@ impl CoordinatorMessage { sign::CoordinatorMessage::Preprocesses { id, .. } => (0, id), sign::CoordinatorMessage::Shares { id, .. } => (1, id), sign::CoordinatorMessage::Reattempt { id, .. } => (2, id), - // The coordinator should report all reported completions to the processor - // Accordingly, the intent is a combination of plan ID and actual TX - // While transaction alone may suffice, that doesn't cover cross-chain TX ID conflicts, - // which are possible - // TODO sign::CoordinatorMessage::Completed { id, tx, .. } => (3, (id, tx).encode()), }; let mut res = vec![COORDINATOR_UID, TYPE_SIGN_UID, sub]; @@ -406,8 +394,6 @@ impl ProcessorMessage { // Unique since SignId sign::ProcessorMessage::Preprocesses { id, .. } => (1, id.encode()), sign::ProcessorMessage::Shares { id, .. } => (2, id.encode()), - // Unique since a processor will only sign a TX once - // TODO sign::ProcessorMessage::Completed { id, .. 
} => (3, id.to_vec()), }; let mut res = vec![PROCESSOR_UID, TYPE_SIGN_UID, sub]; diff --git a/processor/src/multisigs/db.rs b/processor/src/multisigs/db.rs deleted file mode 100644 index 3d1d13bd..00000000 --- a/processor/src/multisigs/db.rs +++ /dev/null @@ -1,260 +0,0 @@ -use std::io; - -use ciphersuite::Ciphersuite; -pub use serai_db::*; - -use scale::{Encode, Decode}; -use serai_client::{primitives::Balance, in_instructions::primitives::InInstructionWithBalance}; - -use crate::{ - Get, Plan, - networks::{Output, Transaction, Network}, -}; - -#[derive(Clone, PartialEq, Eq, Debug)] -pub enum PlanFromScanning { - Refund(N::Output, N::Address), - Forward(N::Output), -} - -impl PlanFromScanning { - fn read(reader: &mut R) -> io::Result { - let mut kind = [0xff]; - reader.read_exact(&mut kind)?; - match kind[0] { - 0 => { - let output = N::Output::read(reader)?; - - let mut address_vec_len = [0; 4]; - reader.read_exact(&mut address_vec_len)?; - let mut address_vec = - vec![0; usize::try_from(u32::from_le_bytes(address_vec_len)).unwrap()]; - reader.read_exact(&mut address_vec)?; - let address = - N::Address::try_from(address_vec).map_err(|_| "invalid address saved to disk").unwrap(); - - Ok(PlanFromScanning::Refund(output, address)) - } - 1 => { - let output = N::Output::read(reader)?; - Ok(PlanFromScanning::Forward(output)) - } - _ => panic!("reading unrecognized PlanFromScanning"), - } - } - fn write(&self, writer: &mut W) -> io::Result<()> { - match self { - PlanFromScanning::Refund(output, address) => { - writer.write_all(&[0])?; - output.write(writer)?; - - let address_vec: Vec = - address.clone().try_into().map_err(|_| "invalid address being refunded to").unwrap(); - writer.write_all(&u32::try_from(address_vec.len()).unwrap().to_le_bytes())?; - writer.write_all(&address_vec) - } - PlanFromScanning::Forward(output) => { - writer.write_all(&[1])?; - output.write(writer) - } - } - } -} - -create_db!( - MultisigsDb { - NextBatchDb: () -> u32, - PlanDb: (id: &[u8]) -> Vec, - PlansFromScanningDb: (block_number: u64) -> Vec, - OperatingCostsDb: () -> u64, - ResolvedDb: (tx: &[u8]) -> [u8; 32], - SigningDb: (key: &[u8]) -> Vec, - ForwardedOutputDb: (balance: Balance) -> Vec, - DelayedOutputDb: () -> Vec - } -); - -impl PlanDb { - pub fn save_active_plan( - txn: &mut impl DbTxn, - key: &[u8], - block_number: usize, - plan: &Plan, - operating_costs_at_time: u64, - ) { - let id = plan.id(); - - { - let mut signing = SigningDb::get(txn, key).unwrap_or_default(); - - // If we've already noted we're signing this, return - assert_eq!(signing.len() % 32, 0); - for i in 0 .. (signing.len() / 32) { - if signing[(i * 32) .. ((i + 1) * 32)] == id { - return; - } - } - - signing.extend(&id); - SigningDb::set(txn, key, &signing); - } - - { - let mut buf = block_number.to_le_bytes().to_vec(); - plan.write(&mut buf).unwrap(); - buf.extend(&operating_costs_at_time.to_le_bytes()); - Self::set(txn, &id, &buf); - } - } - - pub fn active_plans(getter: &impl Get, key: &[u8]) -> Vec<(u64, Plan, u64)> { - let signing = SigningDb::get(getter, key).unwrap_or_default(); - let mut res = vec![]; - - assert_eq!(signing.len() % 32, 0); - for i in 0 .. (signing.len() / 32) { - let id = &signing[(i * 32) .. ((i + 1) * 32)]; - let buf = Self::get(getter, id).unwrap(); - - let block_number = u64::from_le_bytes(buf[.. 
8].try_into().unwrap()); - let plan = Plan::::read::<&[u8]>(&mut &buf[8 ..]).unwrap(); - assert_eq!(id, &plan.id()); - let operating_costs = u64::from_le_bytes(buf[(buf.len() - 8) ..].try_into().unwrap()); - res.push((block_number, plan, operating_costs)); - } - res - } - - pub fn plan_by_key_with_self_change( - getter: &impl Get, - key: ::G, - id: [u8; 32], - ) -> bool { - let plan = Plan::::read::<&[u8]>(&mut &Self::get(getter, &id).unwrap()[8 ..]).unwrap(); - assert_eq!(plan.id(), id); - if let Some(change) = N::change_address(plan.key) { - (key == plan.key) && (Some(change) == plan.change) - } else { - false - } - } -} - -impl OperatingCostsDb { - pub fn take_operating_costs(txn: &mut impl DbTxn) -> u64 { - let existing = Self::get(txn).unwrap_or_default(); - txn.del(Self::key()); - existing - } - pub fn set_operating_costs(txn: &mut impl DbTxn, amount: u64) { - if amount != 0 { - Self::set(txn, &amount); - } - } -} - -impl ResolvedDb { - pub fn resolve_plan( - txn: &mut impl DbTxn, - key: &[u8], - plan: [u8; 32], - resolution: &>::Id, - ) { - let mut signing = SigningDb::get(txn, key).unwrap_or_default(); - assert_eq!(signing.len() % 32, 0); - - let mut found = false; - for i in 0 .. (signing.len() / 32) { - let start = i * 32; - let end = i + 32; - if signing[start .. end] == plan { - found = true; - signing = [&signing[.. start], &signing[end ..]].concat(); - break; - } - } - - if !found { - log::warn!("told to finish signing {} yet wasn't actively signing it", hex::encode(plan)); - } - SigningDb::set(txn, key, &signing); - Self::set(txn, resolution.as_ref(), &plan); - } -} - -impl PlansFromScanningDb { - pub fn set_plans_from_scanning( - txn: &mut impl DbTxn, - block_number: usize, - plans: Vec>, - ) { - let mut buf = vec![]; - for plan in plans { - plan.write(&mut buf).unwrap(); - } - Self::set(txn, block_number.try_into().unwrap(), &buf); - } - - pub fn take_plans_from_scanning( - txn: &mut impl DbTxn, - block_number: usize, - ) -> Option>> { - let block_number = u64::try_from(block_number).unwrap(); - let res = Self::get(txn, block_number).map(|plans| { - let mut plans_ref = plans.as_slice(); - let mut res = vec![]; - while !plans_ref.is_empty() { - res.push(PlanFromScanning::::read(&mut plans_ref).unwrap()); - } - res - }); - if res.is_some() { - txn.del(Self::key(block_number)); - } - res - } -} - -impl ForwardedOutputDb { - pub fn save_forwarded_output(txn: &mut impl DbTxn, instruction: &InInstructionWithBalance) { - let mut existing = Self::get(txn, instruction.balance).unwrap_or_default(); - existing.extend(instruction.encode()); - Self::set(txn, instruction.balance, &existing); - } - - pub fn take_forwarded_output( - txn: &mut impl DbTxn, - balance: Balance, - ) -> Option { - let outputs = Self::get(txn, balance)?; - let mut outputs_ref = outputs.as_slice(); - let res = InInstructionWithBalance::decode(&mut outputs_ref).unwrap(); - assert!(outputs_ref.len() < outputs.len()); - if outputs_ref.is_empty() { - txn.del(Self::key(balance)); - } else { - Self::set(txn, balance, &outputs); - } - Some(res) - } -} - -impl DelayedOutputDb { - pub fn save_delayed_output(txn: &mut impl DbTxn, instruction: &InInstructionWithBalance) { - let mut existing = Self::get(txn).unwrap_or_default(); - existing.extend(instruction.encode()); - Self::set(txn, &existing); - } - - pub fn take_delayed_outputs(txn: &mut impl DbTxn) -> Vec { - let Some(outputs) = Self::get(txn) else { return vec![] }; - txn.del(Self::key()); - - let mut outputs_ref = outputs.as_slice(); - let mut res = vec![]; - while 
!outputs_ref.is_empty() { - res.push(InInstructionWithBalance::decode(&mut outputs_ref).unwrap()); - } - res - } -} diff --git a/processor/src/multisigs/mod.rs b/processor/src/multisigs/mod.rs index 92ea0271..c20a922c 100644 --- a/processor/src/multisigs/mod.rs +++ b/processor/src/multisigs/mod.rs @@ -1,1070 +1,8 @@ -use core::time::Duration; -use std::collections::HashSet; - -use ciphersuite::{group::GroupEncoding, Ciphersuite}; - -use scale::{Encode, Decode}; -use messages::SubstrateContext; - -use serai_client::{ - primitives::{MAX_DATA_LEN, ExternalAddress, BlockHash, Data}, - in_instructions::primitives::{ - InInstructionWithBalance, Batch, RefundableInInstruction, Shorthand, MAX_BATCH_SIZE, - }, - coins::primitives::{OutInstruction, OutInstructionWithBalance}, -}; - -use log::{info, error}; - -use tokio::time::sleep; - -/* TODO -#[cfg(not(test))] -mod scanner; -#[cfg(test)] -pub mod scanner; -*/ - -use scanner::{ScannerEvent, ScannerHandle, Scanner}; - -mod db; -use db::*; - -pub(crate) mod scheduler; -use scheduler::Scheduler; - -use crate::{ - Get, Db, Payment, Plan, - networks::{OutputType, Output, SignableTransaction, Eventuality, Block, PreparedSend, Network}, -}; - -// InInstructionWithBalance from an external output -fn instruction_from_output( - output: &N::Output, -) -> (Option, Option) { - assert_eq!(output.kind(), OutputType::External); - - let presumed_origin = output.presumed_origin().map(|address| { - ExternalAddress::new( - address - .try_into() - .map_err(|_| ()) - .expect("presumed origin couldn't be converted to a Vec"), - ) - .expect("presumed origin exceeded address limits") - }); - - let mut data = output.data(); - let max_data_len = usize::try_from(MAX_DATA_LEN).unwrap(); - if data.len() > max_data_len { - error!( - "data in output {} exceeded MAX_DATA_LEN ({MAX_DATA_LEN}): {}. 
skipping", - hex::encode(output.id()), - data.len(), - ); - return (presumed_origin, None); - } - - let shorthand = match Shorthand::decode(&mut data) { - Ok(shorthand) => shorthand, - Err(e) => { - info!("data in output {} wasn't valid shorthand: {e:?}", hex::encode(output.id())); - return (presumed_origin, None); - } - }; - let instruction = match RefundableInInstruction::try_from(shorthand) { - Ok(instruction) => instruction, - Err(e) => { - info!( - "shorthand in output {} wasn't convertible to a RefundableInInstruction: {e:?}", - hex::encode(output.id()) - ); - return (presumed_origin, None); - } - }; - - let mut balance = output.balance(); - // Deduct twice the cost to aggregate to prevent economic attacks by malicious miners against - // other users - balance.amount.0 -= 2 * N::COST_TO_AGGREGATE; - - ( - instruction.origin.or(presumed_origin), - Some(InInstructionWithBalance { instruction: instruction.instruction, balance }), - ) -} - -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum RotationStep { - // Use the existing multisig for all actions (steps 1-3) - UseExisting, - // Use the new multisig as change (step 4) - NewAsChange, - // The existing multisig is expected to solely forward transactions at this point (step 5) - ForwardFromExisting, - // The existing multisig is expected to finish its own transactions and do nothing more - // (step 6) - ClosingExisting, -} - -// This explicitly shouldn't take the database as we prepare Plans we won't execute for fee -// estimates -async fn prepare_send( - network: &N, - block_number: usize, - plan: Plan, - operating_costs: u64, -) -> PreparedSend { - loop { - match network.prepare_send(block_number, plan.clone(), operating_costs).await { - Ok(prepared) => { - return prepared; - } - Err(e) => { - error!("couldn't prepare a send for plan {}: {e}", hex::encode(plan.id())); - // The processor is either trying to create an invalid TX (fatal) or the node went - // offline - // The former requires a patch, the latter is a connection issue - // If the latter, this is an appropriate sleep. 
If the former, we should panic, yet - // this won't flood the console ad infinitum - sleep(Duration::from_secs(60)).await; - } - } - } -} - -pub struct MultisigViewer { - activation_block: usize, - key: ::G, - scheduler: N::Scheduler, -} - #[allow(clippy::type_complexity)] #[derive(Clone, Debug)] pub enum MultisigEvent { // Batches to publish Batches(Option<(::G, ::G)>, Vec), // Eventuality completion found on-chain - Completed(Vec, [u8; 32], ::Completion), -} - -pub struct MultisigManager { - scanner: ScannerHandle, - existing: Option>, - new: Option>, -} - -impl MultisigManager { - pub async fn new( - raw_db: &D, - network: &N, - ) -> ( - Self, - Vec<::G>, - Vec<(Plan, N::SignableTransaction, N::Eventuality)>, - ) { - // The scanner has no long-standing orders to re-issue - let (mut scanner, current_keys) = Scanner::new(network.clone(), raw_db.clone()); - - let mut schedulers = vec![]; - - assert!(current_keys.len() <= 2); - let mut actively_signing = vec![]; - for (_, key) in &current_keys { - schedulers.push(N::Scheduler::from_db(raw_db, *key, N::NETWORK).unwrap()); - - // Load any TXs being actively signed - let key = key.to_bytes(); - for (block_number, plan, operating_costs) in PlanDb::active_plans::(raw_db, key.as_ref()) { - let block_number = block_number.try_into().unwrap(); - - let id = plan.id(); - info!("reloading plan {}: {:?}", hex::encode(id), plan); - - let key_bytes = plan.key.to_bytes(); - - let Some((tx, eventuality)) = - prepare_send(network, block_number, plan.clone(), operating_costs).await.tx - else { - panic!("previously created transaction is no longer being created") - }; - - scanner - .register_eventuality(key_bytes.as_ref(), block_number, id, eventuality.clone()) - .await; - actively_signing.push((plan, tx, eventuality)); - } - } - - ( - MultisigManager { - scanner, - existing: current_keys.first().copied().map(|(activation_block, key)| MultisigViewer { - activation_block, - key, - scheduler: schedulers.remove(0), - }), - new: current_keys.get(1).copied().map(|(activation_block, key)| MultisigViewer { - activation_block, - key, - scheduler: schedulers.remove(0), - }), - }, - current_keys.into_iter().map(|(_, key)| key).collect(), - actively_signing, - ) - } - - /// Returns the block number for a block hash, if it's known and all keys have scanned the block. - // This is guaranteed to atomically increment so long as no new keys are added to the scanner - // which activate at a block before the currently highest scanned block. This is prevented by - // the processor waiting for `Batch` inclusion before scanning too far ahead, and activation only - // happening after the "too far ahead" window.
- pub async fn block_number( - &self, - getter: &G, - hash: &>::Id, - ) -> Option { - let latest = ScannerHandle::::block_number(getter, hash)?; - - // While the scanner has cemented this block, that doesn't mean it's been scanned for all - // keys - // ram_scanned will return the lowest scanned block number out of all keys - if latest > self.scanner.ram_scanned().await { - return None; - } - Some(latest) - } - - pub async fn add_key( - &mut self, - txn: &mut D::Transaction<'_>, - activation_block: usize, - external_key: ::G, - ) { - self.scanner.register_key(txn, activation_block, external_key).await; - let viewer = Some(MultisigViewer { - activation_block, - key: external_key, - scheduler: N::Scheduler::new::(txn, external_key, N::NETWORK), - }); - - if self.existing.is_none() { - self.existing = viewer; - return; - } - self.new = viewer; - } - - fn current_rotation_step(&self, block_number: usize) -> RotationStep { - let Some(new) = self.new.as_ref() else { return RotationStep::UseExisting }; - - // Period numbering here has no meaning other than these are the time values useful here, and - // the order they're calculated in. They have no reference/shared marker with anything else - - // ESTIMATED_BLOCK_TIME_IN_SECONDS is fine to use here. While inaccurate, it shouldn't be - // drastically off, and even if it is, it's a hiccup to latency handling only possible when - // rotating. The error rate wouldn't be acceptable if it was allowed to accumulate over time, - // yet rotation occurs on Serai's clock, disconnecting any errors here from any prior. - - // N::CONFIRMATIONS + 10 minutes - let period_1_start = new.activation_block + - N::CONFIRMATIONS + - (10usize * 60).div_ceil(N::ESTIMATED_BLOCK_TIME_IN_SECONDS); - - // N::CONFIRMATIONS - let period_2_start = period_1_start + N::CONFIRMATIONS; - - // 6 hours after period 2 - // Also ensure 6 hours is greater than the amount of CONFIRMATIONS, for sanity purposes - let period_3_start = - period_2_start + ((6 * 60 * 60) / N::ESTIMATED_BLOCK_TIME_IN_SECONDS).max(N::CONFIRMATIONS); - - if block_number < period_1_start { - RotationStep::UseExisting - } else if block_number < period_2_start { - RotationStep::NewAsChange - } else if block_number < period_3_start { - RotationStep::ForwardFromExisting - } else { - RotationStep::ClosingExisting - } - } - - // Convert new Burns to Payments. - // - // Also moves payments from the old Scheduler to the new multisig if the step calls for it. 
- fn burns_to_payments( - &mut self, - txn: &mut D::Transaction<'_>, - step: RotationStep, - burns: Vec, - ) -> (Vec>, Vec>) { - let mut payments = vec![]; - for out in burns { - let OutInstructionWithBalance { instruction: OutInstruction { address, data }, balance } = - out; - assert_eq!(balance.coin.network(), N::NETWORK); - - if let Ok(address) = N::Address::try_from(address.consume()) { - payments.push(Payment { address, data: data.map(Data::consume), balance }); - } - } - - let payments = payments; - match step { - RotationStep::UseExisting | RotationStep::NewAsChange => (payments, vec![]), - RotationStep::ForwardFromExisting | RotationStep::ClosingExisting => { - // Consume any payments the prior scheduler was unable to complete - // This should only actually matter once - let mut new_payments = self.existing.as_mut().unwrap().scheduler.consume_payments::(txn); - // Add the new payments - new_payments.extend(payments); - (vec![], new_payments) - } - } - } - - fn split_outputs_by_key(&self, outputs: Vec) -> (Vec, Vec) { - let mut existing_outputs = Vec::with_capacity(outputs.len()); - let mut new_outputs = vec![]; - - let existing_key = self.existing.as_ref().unwrap().key; - let new_key = self.new.as_ref().map(|new| new.key); - for output in outputs { - if output.key() == existing_key { - existing_outputs.push(output); - } else { - assert_eq!(Some(output.key()), new_key); - new_outputs.push(output); - } - } - - (existing_outputs, new_outputs) - } - - fn refund_plan( - scheduler: &mut N::Scheduler, - txn: &mut D::Transaction<'_>, - output: N::Output, - refund_to: N::Address, - ) -> Plan { - log::info!("creating refund plan for {}", hex::encode(output.id())); - assert_eq!(output.kind(), OutputType::External); - scheduler.refund_plan::(txn, output, refund_to) - } - - // Returns the plan for forwarding if one is needed. - // Returns None if one is not needed to forward this output. - fn forward_plan(&mut self, txn: &mut D::Transaction<'_>, output: &N::Output) -> Option> { - log::info!("creating forwarding plan for {}", hex::encode(output.id())); - let res = self.existing.as_mut().unwrap().scheduler.forward_plan::( - txn, - output.clone(), - self.new.as_ref().expect("forwarding plan yet no new multisig").key, - ); - if res.is_none() { - log::info!("no forwarding plan was necessary for {}", hex::encode(output.id())); - } - res - } - - // Filter newly received outputs due to the step being RotationStep::ClosingExisting. - // - // Returns the Plans for the `Branch`s which should be created off outputs which passed the - // filter. - fn filter_outputs_due_to_closing( - &mut self, - txn: &mut D::Transaction<'_>, - existing_outputs: &mut Vec, - ) -> Vec> { - /* - The document says to only handle outputs we created. We don't know what outputs we - created. We do have an ordered view of equivalent outputs however, and can assume the - first (and likely only) ones are the ones we created. - - Accordingly, only handling outputs we created should be definable as only handling - outputs from the resolution of Eventualities. - - This isn't feasible. It requires knowing what Eventualities were completed in this block, - when we handle this block, which we don't know without fully serialized scanning + Batch - publication. - - Take the following scenario: - 1) A network uses 10 confirmations. Block x is scanned, meaning x+9a exists. - 2) 67% of nodes process x, create, sign, and publish a TX, creating an Eventuality. - 3) A reorganization to a shorter chain occurs, including the published TX in x+1b. 
- 4) The 33% of nodes which are latent will be allowed to scan x+1b as soon as x+10b - exists. They won't wait for Serai to include the Batch for x until they try to scan - x+10b. - 5) These latent nodes will handle x+1b, post-create an Eventuality, post-learn x+1b - contained resolutions, changing how x+1b should've been interpreted. - - We either have to: - A) Fully serialize scanning (removing the ability to utilize throughput to allow higher - latency, at least while the step is `ClosingExisting`). - B) Create Eventualities immediately, which we can't do as then both the external - network's clock AND Serai's clock can trigger Eventualities, removing ordering. - We'd need to shift entirely to the external network's clock, only handling Burns - outside the parallelization window (which would be extremely latent). - C) Use a different mechanism to determine if we created an output. - D) Re-define which outputs are still to be handled after the 6 hour period expires, such - that the multisig's lifetime cannot be further extended yet it does fulfill its - responsibility. - - External outputs to the existing multisig will be: - - Scanned before the rotation and unused (as used External outputs become Change) - - Forwarded immediately upon scanning - - Not scanned before the cut off time (and accordingly dropped) - - For the first case, since they're scanned before the rotation and unused, they'll be - forwarded with all other available outputs (since they'll be available when scanned). - - Change outputs will be: - - Scanned before the rotation and forwarded with all other available outputs - - Forwarded immediately upon scanning - - Not scanned before the cut off time, requiring an extension exclusive to these outputs - - The important thing to note about honest Change outputs to the existing multisig is that - they'll only be created within `CONFIRMATIONS+1` blocks of the activation block. Also - important to note is that there's another explicit window of `CONFIRMATIONS` before the - 6 hour window. - - Eventualities are not guaranteed to be known before we scan the block containing their - resolution. They are guaranteed to be known within `CONFIRMATIONS-1` blocks however, due - to the limitation on how far we'll scan ahead. - - This means we will know of all Eventualities related to Change outputs we need to forward - before the 6 hour period begins (as forwarding outputs will not create any Change outputs - to the existing multisig). - - This means a definition of complete can be defined as: - 1) Handled all Branch outputs - 2) Forwarded all External outputs received before the end of 6 hour window - 3) Forwarded the results of all Eventualities with Change, which will have been created - before the 6 hour window - - How can we track and ensure this without needing to check if an output is from the - resolution of an Eventuality? - - 1) We only create Branch outputs before the 6 hour window starts. These are guaranteed - to appear within `CONFIRMATIONS` blocks. They will exist with arbitrary depth however, - meaning that upon completion they will spawn several more Eventualities. The further - created Eventualities re-risk being present after the 6 hour period ends. - - We can: - 1) Build a queue for Branch outputs, delaying their handling until relevant - Eventualities are guaranteed to be present. 
- - This solution would theoretically work for all outputs and allow collapsing this - problem to simply: - - > Accordingly, only handling outputs we created should be definable as only - handling outputs from the resolution of Eventualities. - - 2) Create all Eventualities under a Branch at time of Branch creation. - This idea fails as Plans are tightly bound to outputs. - - 3) Don't track Branch outputs by Eventualities, yet by the amount of Branch outputs - remaining. Any Branch output received, of a useful amount, is assumed to be our - own and handled. All other Branch outputs, even if they're the completion of some - Eventuality, are dropped. - - This avoids needing any additional queue, avoiding additional pipelining/latency. - - 2) External outputs are self-evident. We simply stop handling them at the cut-off point, - and only start checking after `CONFIRMATIONS` blocks if all Eventualities are - complete. - - 3) Since all Change Eventualities will be known prior to the 6 hour window's beginning, - we can safely check if a received Change output is the resolution of an Eventuality. - We only need to forward it if so. Forwarding it simply requires only checking if - Eventualities are complete after `CONFIRMATIONS` blocks, same as for straggling - External outputs. - */ - - let mut plans = vec![]; - existing_outputs.retain(|output| { - match output.kind() { - OutputType::External | OutputType::Forwarded => false, - OutputType::Branch => { - let scheduler = &mut self.existing.as_mut().unwrap().scheduler; - // There *would* be a race condition here due to the fact we only mark a `Branch` output - // as needed when we process the block (and handle scheduling), yet actual `Branch` - // outputs may appear as soon as the next block (and we scan the next block before we - // process the prior block) - // - // Unlike Eventuality checking, which happens on scanning and is therefore asynchronous, - // all scheduling (and this check against the scheduler) happens on processing, which is - // synchronous - // - // While we could move Eventuality checking into the block processing, removing its - // asynchronicity, we could only check data the Scanner deems important. The Scanner won't - // deem important Eventuality resolutions which don't create an output to Serai unless - // it knows of the Eventuality. Accordingly, at best we could have a split role (the - // Scanner noting completion of Eventualities which don't have relevant outputs, the - // processing noting completion of ones which do) - // - // This is unnecessary, due to the current flow around Eventuality resolutions and the - // current bounds naturally found being sufficiently amenable, yet notable for the future - if scheduler.can_use_branch(output.balance()) { - // We could simply call can_use_branch, yet it'd have an edge case where if we receive - // two outputs for 100, and we could use one such output, we'd handle both. - // - // Individually schedule each output once confirming they're usable in order to avoid - // this. - let mut plan = scheduler.schedule::( - txn, - vec![output.clone()], - vec![], - self.new.as_ref().unwrap().key, - false, - ); - assert_eq!(plan.len(), 1); - let plan = plan.remove(0); - plans.push(plan); - } - false - } - OutputType::Change => { - // If the TX containing this output resolved an Eventuality... - if let Some(plan) = ResolvedDb::get(txn, output.tx_id().as_ref()) { - // And the Eventuality had change...
- // We need this check as Eventualities have a race condition and can't be relied - // on, as extensively detailed above. Eventualities explicitly with change do have - // a safe timing window however - if PlanDb::plan_by_key_with_self_change::( - txn, - // Pass the key so the DB checks the Plan's key is this multisig's, preventing a - // potential issue where the new multisig creates a Plan with change *and a - // payment to the existing multisig's change address* - self.existing.as_ref().unwrap().key, - plan, - ) { - // Then this is an honest change output we need to forward - // (or it's a payment to the change address in the same transaction as an honest - // change output, which is fine to let slip in) - return true; - } - } - false - } - } - }); - plans - } - - // Returns the Plans caused from a block being acknowledged. - // - // Will rotate keys if the block acknowledged is the retirement block. - async fn plans_from_block( - &mut self, - txn: &mut D::Transaction<'_>, - block_number: usize, - block_id: >::Id, - step: &mut RotationStep, - burns: Vec, - ) -> (bool, Vec>, HashSet<[u8; 32]>) { - let (mut existing_payments, mut new_payments) = self.burns_to_payments(txn, *step, burns); - - let mut plans = vec![]; - let mut plans_from_scanning = HashSet::new(); - - // We now have to acknowledge the acknowledged block, if it's new - // It won't be if this block's `InInstruction`s were split into multiple `Batch`s - let (acquired_lock, (mut existing_outputs, new_outputs)) = { - let (acquired_lock, mut outputs) = if ScannerHandle::::db_scanned(txn) - .expect("published a Batch despite never scanning a block") < - block_number - { - // Load plans created when we scanned the block - let scanning_plans = - PlansFromScanningDb::take_plans_from_scanning::(txn, block_number).unwrap(); - // Expand into actual plans - plans = scanning_plans - .into_iter() - .map(|plan| match plan { - PlanFromScanning::Refund(output, refund_to) => { - let existing = self.existing.as_mut().unwrap(); - if output.key() == existing.key { - Self::refund_plan(&mut existing.scheduler, txn, output, refund_to) - } else { - let new = self - .new - .as_mut() - .expect("new multisig didn't expect yet output wasn't for existing multisig"); - assert_eq!(output.key(), new.key, "output wasn't for existing nor new multisig"); - Self::refund_plan(&mut new.scheduler, txn, output, refund_to) - } - } - PlanFromScanning::Forward(output) => self - .forward_plan(txn, &output) - .expect("supposed to forward an output yet no forwarding plan"), - }) - .collect(); - - for plan in &plans { - plans_from_scanning.insert(plan.id()); - } - - let (is_retirement_block, outputs) = self.scanner.ack_block(txn, block_id.clone()).await; - if is_retirement_block { - let existing = self.existing.take().unwrap(); - assert!(existing.scheduler.empty()); - self.existing = self.new.take(); - *step = RotationStep::UseExisting; - assert!(existing_payments.is_empty()); - existing_payments = new_payments; - new_payments = vec![]; - } - (true, outputs) - } else { - (false, vec![]) - }; - - // Remove all outputs already present in plans - let mut output_set = HashSet::new(); - for plan in &plans { - for input in &plan.inputs { - output_set.insert(input.id().as_ref().to_vec()); - } - } - outputs.retain(|output| !output_set.remove(output.id().as_ref())); - assert_eq!(output_set.len(), 0); - - (acquired_lock, self.split_outputs_by_key(outputs)) - }; - - // If we're closing the existing multisig, filter its outputs down - if *step == RotationStep::ClosingExisting { 
plans.extend(self.filter_outputs_due_to_closing(txn, &mut existing_outputs)); - } - - // Now that we've done all our filtering, schedule the existing multisig's outputs - plans.extend({ - let existing = self.existing.as_mut().unwrap(); - let existing_key = existing.key; - self.existing.as_mut().unwrap().scheduler.schedule::( - txn, - existing_outputs, - existing_payments, - match *step { - RotationStep::UseExisting => existing_key, - RotationStep::NewAsChange | - RotationStep::ForwardFromExisting | - RotationStep::ClosingExisting => self.new.as_ref().unwrap().key, - }, - match *step { - RotationStep::UseExisting | RotationStep::NewAsChange => false, - RotationStep::ForwardFromExisting | RotationStep::ClosingExisting => true, - }, - ) - }); - - for plan in &plans { - // This first equality should 'never meaningfully' be false - // All created plans so far are by the existing multisig EXCEPT: - // A) If we created a refund plan from the new multisig (yet that wouldn't have change) - // B) The existing Scheduler returned a Plan for the new key (yet that happens with the SC - // scheduler, yet that doesn't have change) - // Despite being 'unnecessary' now, it's better to explicitly ensure and be robust - if plan.key == self.existing.as_ref().unwrap().key { - if let Some(change) = N::change_address(plan.key) { - if plan.change == Some(change) { - // Assert these (self-change) are only created during the expected step - match *step { - RotationStep::UseExisting => {} - RotationStep::NewAsChange | - RotationStep::ForwardFromExisting | - RotationStep::ClosingExisting => panic!("change was set to self despite rotating"), - } - } - } - } - } - - // Schedule the new multisig's outputs too - if let Some(new) = self.new.as_mut() { - plans.extend(new.scheduler.schedule::(txn, new_outputs, new_payments, new.key, false)); - } - - (acquired_lock, plans, plans_from_scanning) - } - - /// Handle a SubstrateBlock event, building the relevant Plans. 
- pub async fn substrate_block( - &mut self, - txn: &mut D::Transaction<'_>, - network: &N, - context: SubstrateContext, - burns: Vec, - ) -> (bool, Vec<(::G, [u8; 32], N::SignableTransaction, N::Eventuality)>) - { - let mut block_id = >::Id::default(); - block_id.as_mut().copy_from_slice(context.network_latest_finalized_block.as_ref()); - let block_number = ScannerHandle::::block_number(txn, &block_id) - .expect("SubstrateBlock with context we haven't synced"); - - // Determine what step of rotation we're currently in - let mut step = self.current_rotation_step(block_number); - - // Get the Plans from this block - let (acquired_lock, plans, plans_from_scanning) = - self.plans_from_block(txn, block_number, block_id, &mut step, burns).await; - - let res = { - let mut res = Vec::with_capacity(plans.len()); - - for plan in plans { - let id = plan.id(); - info!("preparing plan {}: {:?}", hex::encode(id), plan); - - let key = plan.key; - let key_bytes = key.to_bytes(); - - let (tx, post_fee_branches) = { - let running_operating_costs = OperatingCostsDb::take_operating_costs(txn); - - PlanDb::save_active_plan::( - txn, - key_bytes.as_ref(), - block_number, - &plan, - running_operating_costs, - ); - - // If this Plan is from the scanner handler below, don't take the opportunity to amortize - // operating costs - // It operates with limited context, and on a different clock, making it unable to react - // to operating costs - // Despite this, in order to properly save forwarded outputs' instructions, it needs to - // know the actual value forwarded outputs will be created with - // Including operating costs prevents that - let from_scanning = plans_from_scanning.contains(&plan.id()); - let to_use_operating_costs = if from_scanning { 0 } else { running_operating_costs }; - - let PreparedSend { tx, post_fee_branches, mut operating_costs } = - prepare_send(network, block_number, plan, to_use_operating_costs).await; - - // Restore running_operating_costs to operating_costs - if from_scanning { - // If we're forwarding (or refunding) this output, operating_costs should still be 0 - // Either this TX wasn't created, causing no operating costs, or it was yet it'd be - // amortized - assert_eq!(operating_costs, 0); - - operating_costs += running_operating_costs; - } - - OperatingCostsDb::set_operating_costs(txn, operating_costs); - - (tx, post_fee_branches) - }; - - for branch in post_fee_branches { - let existing = self.existing.as_mut().unwrap(); - let to_use = if key == existing.key { - existing - } else { - let new = self - .new - .as_mut() - .expect("plan wasn't for existing multisig yet there wasn't a new multisig"); - assert_eq!(key, new.key); - new - }; - - to_use.scheduler.created_output::(txn, branch.expected, branch.actual); - } - - if let Some((tx, eventuality)) = tx { - // The main function we return to will send an event to the coordinator which must be - // fired before these registered Eventualities have their Completions fired - // Safety is derived from a mutable lock on the Scanner being preserved, preventing - // scanning (and detection of Eventuality resolutions) before it's released - // It's only released by the main function after it does what it will - self - .scanner - .register_eventuality(key_bytes.as_ref(), block_number, id, eventuality.clone()) - .await; - - res.push((key, id, tx, eventuality)); - } - - // TODO: If the TX is None, restore its inputs to the scheduler for efficiency's sake - // If this TODO is removed, also reduce the operating costs - } - res - }; - 
(acquired_lock, res) - } - - pub async fn release_scanner_lock(&mut self) { - self.scanner.release_lock().await; - } - - pub async fn scanner_event_to_multisig_event( - &self, - txn: &mut D::Transaction<'_>, - network: &N, - msg: ScannerEvent, - ) -> MultisigEvent { - let (block_number, event) = match msg { - ScannerEvent::Block { is_retirement_block, block, mut outputs } => { - // Since the Scanner is asynchronous, the following is a concern for race conditions - // We safely know the step of a block since keys are declared, and the Scanner is safe - // with respect to the declaration of keys - // Accordingly, the following calls regarding new keys and step should be safe - let block_number = ScannerHandle::::block_number(txn, &block) - .expect("didn't have the block number for a block we just scanned"); - let step = self.current_rotation_step(block_number); - - // Instructions created from this block - let mut instructions = vec![]; - - // If any of these outputs were forwarded, create their instruction now - for output in &outputs { - if output.kind() != OutputType::Forwarded { - continue; - } - - if let Some(instruction) = ForwardedOutputDb::take_forwarded_output(txn, output.balance()) - { - instructions.push(instruction); - } - } - - // If the remaining outputs aren't externally received funds, don't handle them as - // instructions - outputs.retain(|output| output.kind() == OutputType::External); - - // These plans are of limited context. They're only allowed the outputs newly received - // within this block and are intended to handle forwarding transactions/refunds - let mut plans = vec![]; - - // If the old multisig is explicitly only supposed to forward, create all such plans now - if step == RotationStep::ForwardFromExisting { - let mut i = 0; - while i < outputs.len() { - let output = &outputs[i]; - let plans = &mut plans; - let txn = &mut *txn; - - #[allow(clippy::redundant_closure_call)] - let should_retain = (|| async move { - // If this output doesn't belong to the existing multisig, it shouldn't be forwarded - if output.key() != self.existing.as_ref().unwrap().key { - return true; - } - - let plans_at_start = plans.len(); - let (refund_to, instruction) = instruction_from_output::(output); - if let Some(mut instruction) = instruction { - let Some(shimmed_plan) = N::Scheduler::shim_forward_plan( - output.clone(), - self.new.as_ref().expect("forwarding from existing yet no new multisig").key, - ) else { - // If this network doesn't need forwarding, report the output now - return true; - }; - plans.push(PlanFromScanning::::Forward(output.clone())); - - // Set the instruction for this output to be returned - // We need to set it under the amount it's forwarded with, so prepare its forwarding - // TX to determine the fees involved - let PreparedSend { tx, post_fee_branches: _, operating_costs } = - prepare_send(network, block_number, shimmed_plan, 0).await; - // operating_costs should not increase in a forwarding TX - assert_eq!(operating_costs, 0); - - // If this actually forwarded any coins, save the output as forwarded - // If this didn't create a TX, we don't bother saving the output as forwarded - // The fact we already created and pushed a plan still using this output will cause - // it to not be retained here, and later the plan will be dropped as this did here, - // letting it die out - if let Some(tx) = &tx { - instruction.balance.amount.0 -= tx.0.fee(); - - /* - Sending a Plan, with arbitrary data proxying the InInstruction, would require - adding a flow for networks 
which drop their data to still embed arbitrary data. - It'd also have edge cases causing failures (we'd need to manually provide the - origin if it was implied, which may exceed the encoding limit). - - Instead, we save the InInstruction as we scan this output. Then, when the - output is successfully forwarded, we simply read it from the local database. - This also saves the costs of embedding arbitrary data. - - Since we can't rely on the Eventuality system to detect if it's a forwarded - transaction, due to the asynchronicity of the Eventuality system, we instead - interpret a Forwarded output which has an amount associated with an - InInstruction which was forwarded as having been forwarded. - */ - ForwardedOutputDb::save_forwarded_output(txn, &instruction); - } - } else if let Some(refund_to) = refund_to { - if let Ok(refund_to) = refund_to.consume().try_into() { - // Build a dedicated Plan refunding this - plans.push(PlanFromScanning::Refund(output.clone(), refund_to)); - } - } - - // Only keep if we didn't make a Plan consuming it - plans_at_start == plans.len() - })() - .await; - if should_retain { - i += 1; - continue; - } - outputs.remove(i); - } - } - - for output in outputs { - // If this is an External transaction to the existing multisig, and we're either solely - // forwarding or closing the existing multisig, drop it - // In the forwarding case, we'll report it once it hits the new multisig - if (match step { - RotationStep::UseExisting | RotationStep::NewAsChange => false, - RotationStep::ForwardFromExisting | RotationStep::ClosingExisting => true, - }) && (output.key() == self.existing.as_ref().unwrap().key) - { - continue; - } - - let (refund_to, instruction) = instruction_from_output::(&output); - let Some(instruction) = instruction else { - if let Some(refund_to) = refund_to { - if let Ok(refund_to) = refund_to.consume().try_into() { - plans.push(PlanFromScanning::Refund(output.clone(), refund_to)); - } - } - continue; - }; - - // Delay External outputs received to new multisig earlier than expected - if Some(output.key()) == self.new.as_ref().map(|new| new.key) { - match step { - RotationStep::UseExisting => { - DelayedOutputDb::save_delayed_output(txn, &instruction); - continue; - } - RotationStep::NewAsChange | - RotationStep::ForwardFromExisting | - RotationStep::ClosingExisting => {} - } - } - - instructions.push(instruction); - } - - // Save the plans created while scanning - // TODO: Should we combine all of these plans to reduce the fees incurred from their - // execution? They're refunds and forwards. Neither should need isolated Plan/Eventualities. 
- PlansFromScanningDb::set_plans_from_scanning(txn, block_number, plans); - - // If any outputs were delayed, append them into this block - match step { - RotationStep::UseExisting => {} - RotationStep::NewAsChange | - RotationStep::ForwardFromExisting | - RotationStep::ClosingExisting => { - instructions.extend(DelayedOutputDb::take_delayed_outputs(txn)); - } - } - - let mut block_hash = [0; 32]; - block_hash.copy_from_slice(block.as_ref()); - let mut batch_id = NextBatchDb::get(txn).unwrap_or_default(); - - // start with empty batch - let mut batches = vec![Batch { - network: N::NETWORK, - id: batch_id, - block: BlockHash(block_hash), - instructions: vec![], - }]; - - for instruction in instructions { - let batch = batches.last_mut().unwrap(); - batch.instructions.push(instruction); - - // check if batch is over-size - if batch.encode().len() > MAX_BATCH_SIZE { - // pop the last instruction so it's back in size - let instruction = batch.instructions.pop().unwrap(); - - // bump the id for the new batch - batch_id += 1; - - // make a new batch with this instruction included - batches.push(Batch { - network: N::NETWORK, - id: batch_id, - block: BlockHash(block_hash), - instructions: vec![instruction], - }); - } - } - - // Save the next batch ID - NextBatchDb::set(txn, &(batch_id + 1)); - - ( - block_number, - MultisigEvent::Batches( - if is_retirement_block { - Some((self.existing.as_ref().unwrap().key, self.new.as_ref().unwrap().key)) - } else { - None - }, - batches, - ), - ) - } - - // This must be emitted before ScannerEvent::Block for all completions of known Eventualities - // within the block. Unknown Eventualities may have their Completed events emitted after - // ScannerEvent::Block however. - ScannerEvent::Completed(key, block_number, id, tx_id, completion) => { - ResolvedDb::resolve_plan::(txn, &key, id, &tx_id); - (block_number, MultisigEvent::Completed(key, id, completion)) - } - }; - - // If we either received a Block event (which will be the trigger when we have no - // Plans/Eventualities leading into ClosingExisting), or we received the last Completed for - // this multisig, set its retirement block - let existing = self.existing.as_ref().unwrap(); - - // This multisig is closing - let closing = self.current_rotation_step(block_number) == RotationStep::ClosingExisting; - // There's nothing left in its Scheduler. This call is safe as: - // 1) When ClosingExisting, all outputs should've been already forwarded, preventing - // new UTXOs from accumulating. - // 2) No new payments should be issued. - // 3) While there may be plans, they'll be dropped to create Eventualities. - // If this Eventuality is resolved, the Plan has already been dropped. - // 4) If this Eventuality will trigger a Plan, it'll still be in the plans HashMap. - let scheduler_is_empty = closing && existing.scheduler.empty(); - // Nothing is still being signed - let no_active_plans = scheduler_is_empty && - PlanDb::active_plans::(txn, existing.key.to_bytes().as_ref()).is_empty(); - - self - .scanner - .multisig_completed - // The above explicitly included their predecessor to ensure short-circuiting, yet their - // names aren't defined as an aggregate check. 
Still including all three here ensures all are - // used in the final value - .send(closing && scheduler_is_empty && no_active_plans) - .unwrap(); - - event - } - - pub async fn next_scanner_event(&mut self) -> ScannerEvent { - self.scanner.events.recv().await.unwrap() - } + Completed(Vec, [u8; 32], (reader: &mut R) -> io::Result; - fn write(&self, writer: &mut W) -> io::Result<()>; -} - -impl SchedulerAddendum for () { - fn read(_: &mut R) -> io::Result { - Ok(()) - } - fn write(&self, _: &mut W) -> io::Result<()> { - Ok(()) - } -} - -pub trait Scheduler: Sized + Clone + PartialEq + Debug { - type Addendum: SchedulerAddendum; - - /// Check if this Scheduler is empty. - fn empty(&self) -> bool; - - /// Create a new Scheduler. - fn new( - txn: &mut D::Transaction<'_>, - key: ::G, - network: NetworkId, - ) -> Self; - - /// Load a Scheduler from the DB. - fn from_db( - db: &D, - key: ::G, - network: NetworkId, - ) -> io::Result; - - /// Check if a branch is usable. - fn can_use_branch(&self, balance: Balance) -> bool; - - /// Schedule a series of outputs/payments. - fn schedule( - &mut self, - txn: &mut D::Transaction<'_>, - utxos: Vec, - payments: Vec>, - // TODO: Tighten this to multisig_for_any_change - key_for_any_change: ::G, - force_spend: bool, - ) -> Vec>; - - /// Consume all payments still pending within this Scheduler, without scheduling them. - fn consume_payments(&mut self, txn: &mut D::Transaction<'_>) -> Vec>; - - /// Note a branch output as having been created, with the amount it was actually created with, - /// or not having been created due to being too small. - fn created_output( - &mut self, - txn: &mut D::Transaction<'_>, - expected: u64, - actual: Option, - ); - - /// Refund a specific output. - fn refund_plan( - &mut self, - txn: &mut D::Transaction<'_>, - output: N::Output, - refund_to: N::Address, - ) -> Plan; - - /// Shim the forwarding Plan as necessary to obtain a fee estimate. - /// - /// If this Scheduler is for a Network which requires forwarding, this must return Some with a - /// plan with identical fee behavior. If forwarding isn't necessary, returns None. - fn shim_forward_plan(output: N::Output, to: ::G) -> Option>; - - /// Forward a specific output to the new multisig. - /// - /// Returns None if no forwarding is necessary. Must return Some if forwarding is necessary. - fn forward_plan( - &mut self, - txn: &mut D::Transaction<'_>, - output: N::Output, - to: ::G, - ) -> Option>; -} diff --git a/processor/src/multisigs/scheduler/utxo.rs b/processor/src/multisigs/scheduler/utxo.rs deleted file mode 100644 index 1865cab9..00000000 --- a/processor/src/multisigs/scheduler/utxo.rs +++ /dev/null @@ -1,631 +0,0 @@ -use std::{ - io::{self, Read}, - collections::{VecDeque, HashMap}, -}; - -use ciphersuite::{group::GroupEncoding, Ciphersuite}; - -use serai_client::primitives::{NetworkId, Coin, Amount, Balance}; - -use crate::{ - DbTxn, Db, Payment, Plan, - networks::{OutputType, Output, Network, UtxoNetwork}, - multisigs::scheduler::Scheduler as SchedulerTrait, -}; - -/// Deterministic output/payment manager. -#[derive(Clone, PartialEq, Eq, Debug)] -pub struct Scheduler { - key: ::G, - coin: Coin, - - // Serai, when it has more outputs expected than it can handle in a single transaction, will - // schedule the outputs to be handled later. 
Immediately, it just creates additional outputs - which will eventually handle those outputs - // - // These maps map output amounts, which we'll receive in the future, to the payments they should - // be used on - // - // When those output amounts appear, their payments should be scheduled - // The Vec is for all payments that should be done per output instance - // The VecDeque allows multiple sets of payments with the same sum amount to properly co-exist - // - // queued_plans are for outputs which we will create, yet when created, will have their amount - // reduced by the fee it cost to be created. The Scheduler will then be told what amount the - // output actually has, and it'll be moved into plans - queued_plans: HashMap>>>, - plans: HashMap>>>, - - // UTXOs available - utxos: Vec, - - // Payments awaiting scheduling due to the output availability problem - payments: VecDeque>, -} - -fn scheduler_key(key: &G) -> Vec { - D::key(b"SCHEDULER", b"scheduler", key.to_bytes()) -} - -impl> Scheduler { - pub fn empty(&self) -> bool { - self.queued_plans.is_empty() && - self.plans.is_empty() && - self.utxos.is_empty() && - self.payments.is_empty() - } - - fn read( - key: ::G, - coin: Coin, - reader: &mut R, - ) -> io::Result { - let mut read_plans = || -> io::Result<_> { - let mut all_plans = HashMap::new(); - let mut all_plans_len = [0; 4]; - reader.read_exact(&mut all_plans_len)?; - for _ in 0 .. u32::from_le_bytes(all_plans_len) { - let mut amount = [0; 8]; - reader.read_exact(&mut amount)?; - let amount = u64::from_le_bytes(amount); - - let mut plans = VecDeque::new(); - let mut plans_len = [0; 4]; - reader.read_exact(&mut plans_len)?; - for _ in 0 .. u32::from_le_bytes(plans_len) { - let mut payments = vec![]; - let mut payments_len = [0; 4]; - reader.read_exact(&mut payments_len)?; - - for _ in 0 .. u32::from_le_bytes(payments_len) { - payments.push(Payment::read(reader)?); - } - plans.push_back(payments); - } - all_plans.insert(amount, plans); - } - Ok(all_plans) - }; - let queued_plans = read_plans()?; - let plans = read_plans()?; - - let mut utxos = vec![]; - let mut utxos_len = [0; 4]; - reader.read_exact(&mut utxos_len)?; - for _ in 0 .. u32::from_le_bytes(utxos_len) { - utxos.push(N::Output::read(reader)?); - } - - let mut payments = VecDeque::new(); - let mut payments_len = [0; 4]; - reader.read_exact(&mut payments_len)?; - for _ in 0 .. 
u32::from_le_bytes(payments_len) { - payments.push_back(Payment::read(reader)?); - } - - Ok(Scheduler { key, coin, queued_plans, plans, utxos, payments }) - } - - // TODO2: Get rid of this - // We reserialize the entire scheduler on any mutation to save it to the DB which is horrible - // We should have an incremental solution - fn serialize(&self) -> Vec { - let mut res = Vec::with_capacity(4096); - - let mut write_plans = |plans: &HashMap>>>| { - res.extend(u32::try_from(plans.len()).unwrap().to_le_bytes()); - for (amount, list_of_plans) in plans { - res.extend(amount.to_le_bytes()); - res.extend(u32::try_from(list_of_plans.len()).unwrap().to_le_bytes()); - for plan in list_of_plans { - res.extend(u32::try_from(plan.len()).unwrap().to_le_bytes()); - for payment in plan { - payment.write(&mut res).unwrap(); - } - } - } - }; - write_plans(&self.queued_plans); - write_plans(&self.plans); - - res.extend(u32::try_from(self.utxos.len()).unwrap().to_le_bytes()); - for utxo in &self.utxos { - utxo.write(&mut res).unwrap(); - } - - res.extend(u32::try_from(self.payments.len()).unwrap().to_le_bytes()); - for payment in &self.payments { - payment.write(&mut res).unwrap(); - } - - debug_assert_eq!(&Self::read(self.key, self.coin, &mut res.as_slice()).unwrap(), self); - res - } - - pub fn new( - txn: &mut D::Transaction<'_>, - key: ::G, - network: NetworkId, - ) -> Self { - assert!(N::branch_address(key).is_some()); - assert!(N::change_address(key).is_some()); - assert!(N::forward_address(key).is_some()); - - let coin = { - let coins = network.coins(); - assert_eq!(coins.len(), 1); - coins[0] - }; - - let res = Scheduler { - key, - coin, - queued_plans: HashMap::new(), - plans: HashMap::new(), - utxos: vec![], - payments: VecDeque::new(), - }; - // Save it to disk so from_db won't panic if we don't mutate it before rebooting - txn.put(scheduler_key::(&res.key), res.serialize()); - res - } - - pub fn from_db( - db: &D, - key: ::G, - network: NetworkId, - ) -> io::Result { - let coin = { - let coins = network.coins(); - assert_eq!(coins.len(), 1); - coins[0] - }; - - let scheduler = db.get(scheduler_key::(&key)).unwrap_or_else(|| { - panic!("loading scheduler from DB without scheduler for {}", hex::encode(key.to_bytes())) - }); - let mut reader_slice = scheduler.as_slice(); - let reader = &mut reader_slice; - - Self::read(key, coin, reader) - } - - pub fn can_use_branch(&self, balance: Balance) -> bool { - assert_eq!(balance.coin, self.coin); - self.plans.contains_key(&balance.amount.0) - } - - fn execute( - &mut self, - inputs: Vec, - mut payments: Vec>, - key_for_any_change: ::G, - ) -> Plan { - let mut change = false; - let mut max = N::MAX_OUTPUTS; - - let payment_amounts = |payments: &Vec>| { - payments.iter().map(|payment| payment.balance.amount.0).sum::() - }; - - // Requires a change output - if inputs.iter().map(|output| output.balance().amount.0).sum::() != - payment_amounts(&payments) - { - change = true; - max -= 1; - } - - let mut add_plan = |payments| { - let amount = payment_amounts(&payments); - self.queued_plans.entry(amount).or_insert(VecDeque::new()).push_back(payments); - amount - }; - - let branch_address = N::branch_address(self.key).unwrap(); - - // If we have more payments than we can handle in a single TX, create plans for them - // TODO2: This isn't perfect. 
For 258 outputs, and a MAX_OUTPUTS of 16, this will create: - // 15 branches of 16 leaves - // 1 branch of: - // - 1 branch of 16 leaves - // - 2 leaves - // If this was perfect, the heaviest branch would have 1 branch of 3 leaves and 15 leaves - while payments.len() > max { - // The resulting TX will have the remaining payments and a new branch payment - let to_remove = (payments.len() + 1) - N::MAX_OUTPUTS; - // Don't remove more than possible - let to_remove = to_remove.min(N::MAX_OUTPUTS); - - // Create the plan - let removed = payments.drain((payments.len() - to_remove) ..).collect::>(); - assert_eq!(removed.len(), to_remove); - let amount = add_plan(removed); - - // Create the payment for the plan - // Push it to the front so it's not moved into a branch until all lower-depth items are - payments.insert( - 0, - Payment { - address: branch_address.clone(), - data: None, - balance: Balance { coin: self.coin, amount: Amount(amount) }, - }, - ); - } - - Plan { - key: self.key, - inputs, - payments, - change: Some(N::change_address(key_for_any_change).unwrap()).filter(|_| change), - scheduler_addendum: (), - } - } - - fn add_outputs( - &mut self, - mut utxos: Vec, - key_for_any_change: ::G, - ) -> Vec> { - log::info!("adding {} outputs", utxos.len()); - - let mut txs = vec![]; - - for utxo in utxos.drain(..) { - if utxo.kind() == OutputType::Branch { - let amount = utxo.balance().amount.0; - if let Some(plans) = self.plans.get_mut(&amount) { - // Execute the first set of payments possible with an output of this amount - let payments = plans.pop_front().unwrap(); - // They won't be equal if we dropped payments due to being dust - assert!(amount >= payments.iter().map(|payment| payment.balance.amount.0).sum::()); - - // If we've grabbed the last plan for this output amount, remove it from the map - if plans.is_empty() { - self.plans.remove(&amount); - } - - // Create a TX for these payments - txs.push(self.execute(vec![utxo], payments, key_for_any_change)); - continue; - } - } - - self.utxos.push(utxo); - } - - log::info!("{} planned TXs have had their required inputs confirmed", txs.len()); - txs - } - - // Schedule a series of outputs/payments. - pub fn schedule( - &mut self, - txn: &mut D::Transaction<'_>, - utxos: Vec, - mut payments: Vec>, - key_for_any_change: ::G, - force_spend: bool, - ) -> Vec> { - for utxo in &utxos { - assert_eq!(utxo.balance().coin, self.coin); - } - for payment in &payments { - assert_eq!(payment.balance.coin, self.coin); - } - - // Drop payments to our own branch address - /* - created_output will be called any time we send to a branch address. If it's called, and it - wasn't expecting to be called, that's almost certainly an error. The only way to guarantee - this however is to only have us send to a branch address when creating a branch, hence the - dropping of pointless payments. - - This is not comprehensive as a payment may still be made to another active multisig's branch - address, depending on timing. This is safe as the issue only occurs when a multisig sends to - its *own* branch address, since created_output is called on the signer's Scheduler. 
- */ - { - let branch_address = N::branch_address(self.key).unwrap(); - payments = - payments.drain(..).filter(|payment| payment.address != branch_address).collect::>(); - } - - let mut plans = self.add_outputs(utxos, key_for_any_change); - - log::info!("scheduling {} new payments", payments.len()); - - // Add all new payments to the list of pending payments - self.payments.extend(payments); - let payments_at_start = self.payments.len(); - log::info!("{} payments are now scheduled", payments_at_start); - - // If we don't have UTXOs available, don't try to continue - if self.utxos.is_empty() { - log::info!("no utxos currently available"); - return plans; - } - - // Sort UTXOs so the highest valued ones are first - self.utxos.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse()); - - // We always want to aggregate our UTXOs into a single UTXO in the name of simplicity - // We may have more UTXOs than will fit into a TX though - // We use the most valuable UTXOs to handle our current payments, and we return aggregation TXs - // for the rest of the inputs - // Since we do multiple aggregation TXs at once, this will execute in logarithmic time - let utxos = self.utxos.drain(..).collect::>(); - let mut utxo_chunks = - utxos.chunks(N::MAX_INPUTS).map(<[::Output]>::to_vec).collect::>(); - - // Use the first chunk for any scheduled payments, since it has the most value - let utxos = utxo_chunks.remove(0); - - // If the last chunk exists and only has one output, don't try aggregating it - // Set it to be restored to UTXO set - let mut to_restore = None; - if let Some(mut chunk) = utxo_chunks.pop() { - if chunk.len() == 1 { - to_restore = Some(chunk.pop().unwrap()); - } else { - utxo_chunks.push(chunk); - } - } - - for chunk in utxo_chunks.drain(..) { - log::debug!("aggregating a chunk of {} inputs", chunk.len()); - plans.push(Plan { - key: self.key, - inputs: chunk, - payments: vec![], - change: Some(N::change_address(key_for_any_change).unwrap()), - scheduler_addendum: (), - }) - } - - // We want to use all possible UTXOs for all possible payments - let mut balance = utxos.iter().map(|output| output.balance().amount.0).sum::(); - - // If we can't fulfill the next payment, we have encountered an instance of the UTXO - // availability problem - // This shows up in networks like Monero, where because we spent outputs, our change has yet to - // re-appear. 
Since it has yet to re-appear, we only operate with a balance which is a subset - // of our total balance - // Despite this, we may be ordered to fulfill a payment which is our total balance - // The solution is to wait for the temporarily unavailable change outputs to re-appear, - // granting us access to our full balance - let mut executing = vec![]; - while !self.payments.is_empty() { - let amount = self.payments[0].balance.amount.0; - if balance.checked_sub(amount).is_some() { - balance -= amount; - executing.push(self.payments.pop_front().unwrap()); - } else { - // Doesn't check if other payments would fit into the current batch as doing so may never - // let enough inputs become simultaneously available to enable handling of payments[0] - break; - } - } - - // Now that we have the list of payments we can successfully handle right now, create the TX - // for them - if !executing.is_empty() { - plans.push(self.execute(utxos, executing, key_for_any_change)); - } else { - // If we don't have any payments to execute, save these UTXOs for later - self.utxos.extend(utxos); - } - - // If we're instructed to force a spend, do so - // This is used when an old multisig is retiring and we want to always transfer outputs to the - // new one, regardless of whether we currently have payments - if force_spend && (!self.utxos.is_empty()) { - assert!(self.utxos.len() <= N::MAX_INPUTS); - plans.push(Plan { - key: self.key, - inputs: self.utxos.drain(..).collect::>(), - payments: vec![], - change: Some(N::change_address(key_for_any_change).unwrap()), - scheduler_addendum: (), - }); - } - - // If there's a UTXO to restore, restore it - // This is done now as, if there is a to_restore output and it was inserted into self.utxos - // earlier, self.utxos.len() may become `N::MAX_INPUTS + 1` - // The prior block requires the len to be `<= N::MAX_INPUTS` - if let Some(to_restore) = to_restore { - self.utxos.push(to_restore); - } - - txn.put(scheduler_key::(&self.key), self.serialize()); - - log::info!( - "created {} plans containing {} payments to sign, with {} payments pending scheduling", - plans.len(), - payments_at_start - self.payments.len(), - self.payments.len(), - ); - plans - } - - pub fn consume_payments(&mut self, txn: &mut D::Transaction<'_>) -> Vec> { - let res: Vec<_> = self.payments.drain(..).collect(); - if !res.is_empty() { - txn.put(scheduler_key::(&self.key), self.serialize()); - } - res - } - - // Note a branch output as having been created, with the amount it was actually created with, - // or not having been created due to being too small - pub fn created_output( - &mut self, - txn: &mut D::Transaction<'_>, - expected: u64, - actual: Option, - ) { - log::debug!("output expected to have {} had {:?} after fees", expected, actual); - - // Get the payments this output is expected to handle - let queued = self.queued_plans.get_mut(&expected).unwrap(); - let mut payments = queued.pop_front().unwrap(); - assert_eq!(expected, payments.iter().map(|payment| payment.balance.amount.0).sum::()); - // If this was the last set of payments at this amount, remove it - if queued.is_empty() { - self.queued_plans.remove(&expected); - } - - // If we didn't actually create this output, return, dropping the child payments - let Some(actual) = actual else { return }; - - // Amortize the fee amongst all payments underneath this branch - { - let mut to_amortize = expected - actual; - // If the payments are worth less than this fee we need to amortize, return, dropping them - if payments.iter().map(|payment|
payment.balance.amount.0).sum::() < to_amortize { - return; - } - while to_amortize != 0 { - let payments_len = u64::try_from(payments.len()).unwrap(); - let per_payment = to_amortize / payments_len; - let mut overage = to_amortize % payments_len; - - for payment in &mut payments { - let to_subtract = per_payment + overage; - // Only subtract the overage once - overage = 0; - - let subtractable = payment.balance.amount.0.min(to_subtract); - to_amortize -= subtractable; - payment.balance.amount.0 -= subtractable; - } - } - } - - // Drop payments now below the dust threshold - let payments = payments - .into_iter() - .filter(|payment| payment.balance.amount.0 >= N::DUST) - .collect::>(); - // Sanity check this was done properly - assert!(actual >= payments.iter().map(|payment| payment.balance.amount.0).sum::()); - - // If there's no payments left, return - if payments.is_empty() { - return; - } - - self.plans.entry(actual).or_insert(VecDeque::new()).push_back(payments); - - // TODO2: This shows how ridiculous the serialize function is - txn.put(scheduler_key::(&self.key), self.serialize()); - } -} - -impl> SchedulerTrait for Scheduler { - type Addendum = (); - - /// Check if this Scheduler is empty. - fn empty(&self) -> bool { - Scheduler::empty(self) - } - - /// Create a new Scheduler. - fn new( - txn: &mut D::Transaction<'_>, - key: ::G, - network: NetworkId, - ) -> Self { - Scheduler::new::(txn, key, network) - } - - /// Load a Scheduler from the DB. - fn from_db( - db: &D, - key: ::G, - network: NetworkId, - ) -> io::Result { - Scheduler::from_db::(db, key, network) - } - - /// Check if a branch is usable. - fn can_use_branch(&self, balance: Balance) -> bool { - Scheduler::can_use_branch(self, balance) - } - - /// Schedule a series of outputs/payments. - fn schedule( - &mut self, - txn: &mut D::Transaction<'_>, - utxos: Vec, - payments: Vec>, - key_for_any_change: ::G, - force_spend: bool, - ) -> Vec> { - Scheduler::schedule::(self, txn, utxos, payments, key_for_any_change, force_spend) - } - - /// Consume all payments still pending within this Scheduler, without scheduling them. - fn consume_payments(&mut self, txn: &mut D::Transaction<'_>) -> Vec> { - Scheduler::consume_payments::(self, txn) - } - - /// Note a branch output as having been created, with the amount it was actually created with, - /// or not having been created due to being too small. - // TODO: Move this to Balance. 
- fn created_output( - &mut self, - txn: &mut D::Transaction<'_>, - expected: u64, - actual: Option, - ) { - Scheduler::created_output::(self, txn, expected, actual) - } - - fn refund_plan( - &mut self, - _: &mut D::Transaction<'_>, - output: N::Output, - refund_to: N::Address, - ) -> Plan { - let output_id = output.id().as_ref().to_vec(); - let res = Plan { - key: output.key(), - // Uses a payment as this will still be successfully sent due to fee amortization, - // and because change is currently always a Serai key - payments: vec![Payment { address: refund_to, data: None, balance: output.balance() }], - inputs: vec![output], - change: None, - scheduler_addendum: (), - }; - log::info!("refund plan for {} has ID {}", hex::encode(output_id), hex::encode(res.id())); - res - } - - fn shim_forward_plan(output: N::Output, to: ::G) -> Option> { - Some(Plan { - key: output.key(), - payments: vec![Payment { - address: N::forward_address(to).unwrap(), - data: None, - balance: output.balance(), - }], - inputs: vec![output], - change: None, - scheduler_addendum: (), - }) - } - - fn forward_plan( - &mut self, - _: &mut D::Transaction<'_>, - output: N::Output, - to: ::G, - ) -> Option> { - assert_eq!(self.key, output.key()); - // Call shim as shim returns the actual - Self::shim_forward_plan(output, to) - } -} diff --git a/processor/src/plan.rs b/processor/src/plan.rs deleted file mode 100644 index 58a8a5e1..00000000 --- a/processor/src/plan.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::io; - -use scale::{Encode, Decode}; - -use transcript::{Transcript, RecommendedTranscript}; -use ciphersuite::group::GroupEncoding; -use frost::curve::Ciphersuite; - -use serai_client::primitives::Balance; - -use crate::{ - networks::{Output, Network}, - multisigs::scheduler::{SchedulerAddendum, Scheduler}, -}; - -#[derive(Clone, PartialEq, Eq, Debug)] -pub struct Payment { - pub address: N::Address, - pub data: Option>, - pub balance: Balance, -} - -impl Payment { - pub fn transcript(&self, transcript: &mut T) { - transcript.domain_separate(b"payment"); - transcript.append_message(b"address", self.address.to_string().as_bytes()); - if let Some(data) = self.data.as_ref() { - transcript.append_message(b"data", data); - } - transcript.append_message(b"coin", self.balance.coin.encode()); - transcript.append_message(b"amount", self.balance.amount.0.to_le_bytes()); - } - - pub fn write(&self, writer: &mut W) -> io::Result<()> { - // TODO: Don't allow creating Payments with an Address which can't be serialized - let address: Vec = self - .address - .clone() - .try_into() - .map_err(|_| io::Error::other("address couldn't be serialized"))?; - writer.write_all(&u32::try_from(address.len()).unwrap().to_le_bytes())?; - writer.write_all(&address)?; - - writer.write_all(&[u8::from(self.data.is_some())])?; - if let Some(data) = &self.data { - writer.write_all(&u32::try_from(data.len()).unwrap().to_le_bytes())?; - writer.write_all(data)?; - } - - writer.write_all(&self.balance.encode()) - } - - pub fn read(reader: &mut R) -> io::Result { - let mut buf = [0; 4]; - reader.read_exact(&mut buf)?; - let mut address = vec![0; usize::try_from(u32::from_le_bytes(buf)).unwrap()]; - reader.read_exact(&mut address)?; - let address = N::Address::try_from(address).map_err(|_| io::Error::other("invalid address"))?; - - let mut buf = [0; 1]; - reader.read_exact(&mut buf)?; - let data = if buf[0] == 1 { - let mut buf = [0; 4]; - reader.read_exact(&mut buf)?; - let mut data = vec![0; usize::try_from(u32::from_le_bytes(buf)).unwrap()]; - 
reader.read_exact(&mut data)?; - Some(data) - } else { - None - }; - - let balance = Balance::decode(&mut scale::IoReader(reader)) - .map_err(|_| io::Error::other("invalid balance"))?; - - Ok(Payment { address, data, balance }) - } -} - -#[derive(Clone, PartialEq)] -pub struct Plan { - pub key: ::G, - pub inputs: Vec, - /// The payments this Plan is intended to create. - /// - /// This should only contain payments leaving Serai. While it is acceptable for users to enter - /// Serai's address(es) as the payment address, as that'll be handled by anything which expects - /// certain properties, Serai as a system MUST NOT use payments for internal transfers. Doing - /// so will cause a reduction in their value by the TX fee/operating costs, creating an - /// incomplete transfer. - pub payments: Vec>, - /// The change this Plan should use. - /// - /// This MUST contain a Serai address. Operating costs may be deducted from the payments in this - /// Plan on the premise that the change address is Serai's, and accordingly, Serai will recoup - /// the operating costs. - // - // TODO: Consider moving to ::G? - pub change: Option, - /// The scheduler's additional data. - pub scheduler_addendum: >::Addendum, -} -impl core::fmt::Debug for Plan { - fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { - fmt - .debug_struct("Plan") - .field("key", &hex::encode(self.key.to_bytes())) - .field("inputs", &self.inputs) - .field("payments", &self.payments) - .field("change", &self.change.as_ref().map(ToString::to_string)) - .field("scheduler_addendum", &self.scheduler_addendum) - .finish() - } -} - -impl Plan { - pub fn transcript(&self) -> RecommendedTranscript { - let mut transcript = RecommendedTranscript::new(b"Serai Processor Plan ID"); - transcript.domain_separate(b"meta"); - transcript.append_message(b"network", N::ID); - transcript.append_message(b"key", self.key.to_bytes()); - - transcript.domain_separate(b"inputs"); - for input in &self.inputs { - transcript.append_message(b"input", input.id()); - } - - transcript.domain_separate(b"payments"); - for payment in &self.payments { - payment.transcript(&mut transcript); - } - - if let Some(change) = &self.change { - transcript.append_message(b"change", change.to_string()); - } - - let mut addendum_bytes = vec![]; - self.scheduler_addendum.write(&mut addendum_bytes).unwrap(); - transcript.append_message(b"scheduler_addendum", addendum_bytes); - - transcript - } - - pub fn id(&self) -> [u8; 32] { - let challenge = self.transcript().challenge(b"id"); - let mut res = [0; 32]; - res.copy_from_slice(&challenge[.. 32]); - res - } - - pub fn write(&self, writer: &mut W) -> io::Result<()> { - writer.write_all(self.key.to_bytes().as_ref())?; - - writer.write_all(&u32::try_from(self.inputs.len()).unwrap().to_le_bytes())?; - for input in &self.inputs { - input.write(writer)?; - } - - writer.write_all(&u32::try_from(self.payments.len()).unwrap().to_le_bytes())?; - for payment in &self.payments { - payment.write(writer)?; - } - - // TODO: Have Plan construction fail if change cannot be serialized - let change = if let Some(change) = &self.change { - change.clone().try_into().map_err(|_| { - io::Error::other(format!( - "an address we said to use as change couldn't be converted to a Vec: {}", - change.to_string(), - )) - })? 
- } else { - vec![] - }; - assert!(serai_client::primitives::MAX_ADDRESS_LEN <= u8::MAX.into()); - writer.write_all(&[u8::try_from(change.len()).unwrap()])?; - writer.write_all(&change)?; - self.scheduler_addendum.write(writer) - } - - pub fn read(reader: &mut R) -> io::Result { - let key = N::Curve::read_G(reader)?; - - let mut inputs = vec![]; - let mut buf = [0; 4]; - reader.read_exact(&mut buf)?; - for _ in 0 .. u32::from_le_bytes(buf) { - inputs.push(N::Output::read(reader)?); - } - - let mut payments = vec![]; - reader.read_exact(&mut buf)?; - for _ in 0 .. u32::from_le_bytes(buf) { - payments.push(Payment::::read(reader)?); - } - - let mut len = [0; 1]; - reader.read_exact(&mut len)?; - let mut change = vec![0; usize::from(len[0])]; - reader.read_exact(&mut change)?; - let change = - if change.is_empty() { - None - } else { - Some(N::Address::try_from(change).map_err(|_| { - io::Error::other("couldn't deserialize an Address serialized into a Plan") - })?) - }; - - let scheduler_addendum = >::Addendum::read(reader)?; - Ok(Plan { key, inputs, payments, change, scheduler_addendum }) - } -}
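
A minimal standalone sketch (not part of the deleted files above) of the payment-chunking loop in the deleted `Scheduler::execute`, using a toy `Node` type in place of `Payment<N>` and a hard-coded `max_outputs` in place of `N::MAX_OUTPUTS`. It only checks the tree shape the TODO2 comment claims for 258 payments with a 16-output limit and no change output; the real code links a branch payment to its queued plan by amount rather than by containment.

// Toy stand-in for a payment: either a leaf payment or a branch payment funding a queued plan.
#[derive(Clone)]
enum Node {
  Leaf,
  Branch(Vec<Node>),
}

// Mirrors the `while payments.len() > max` loop: move the tail payments into a queued plan and
// push a branch payment for that plan to the front of the list.
fn chunk(mut payments: Vec<Node>, max_outputs: usize) -> Vec<Node> {
  while payments.len() > max_outputs {
    let to_remove = ((payments.len() + 1) - max_outputs).min(max_outputs);
    let removed = payments.split_off(payments.len() - to_remove);
    payments.insert(0, Node::Branch(removed));
  }
  payments
}

fn main() {
  // 258 payments, a MAX_OUTPUTS of 16, and no change output
  let root = chunk(vec![Node::Leaf; 258], 16);
  assert_eq!(root.len(), 16);

  let mut branches_of_16_leaves = 0;
  let mut deeper_branches = 0;
  for node in &root {
    if let Node::Branch(children) = node {
      let leaves = children.iter().filter(|c| matches!(c, Node::Leaf)).count();
      if (leaves == children.len()) && (leaves == 16) {
        branches_of_16_leaves += 1;
      } else {
        // The remaining branch holds 2 leaves plus one nested branch of 16 leaves
        assert_eq!(children.len(), 3);
        assert_eq!(leaves, 2);
        deeper_branches += 1;
      }
    }
  }
  // Matches the TODO2 comment: 15 branches of 16 leaves, and 1 branch of a branch and 2 leaves
  assert_eq!(branches_of_16_leaves, 15);
  assert_eq!(deeper_branches, 1);
}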
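
A similarly hedged, standalone sketch of the fee-amortization pass in the deleted `created_output`, over bare `u64` amounts rather than `Payment<N>`s. `expected` is the amount the branch output was supposed to carry and `actual` what it was actually created with after fees; the real code additionally drops any payment that ends up below `N::DUST`.

// Spread the branch's fee shortfall across its child payment amounts.
fn amortize(mut amounts: Vec<u64>, expected: u64, actual: u64) -> Vec<u64> {
  // The shortfall, after fees, which has to be taken out of the child payments
  let mut to_amortize = expected - actual;
  // If the payments are worth less than the fee, they'd be consumed entirely; drop them
  if amounts.iter().sum::<u64>() < to_amortize {
    return vec![];
  }
  while to_amortize != 0 {
    let len = u64::try_from(amounts.len()).unwrap();
    let per_payment = to_amortize / len;
    // The division's remainder is charged to the first payment only
    let mut overage = to_amortize % len;
    for amount in &mut amounts {
      let to_subtract = (per_payment + overage).min(*amount);
      overage = 0;
      to_amortize -= to_subtract;
      *amount -= to_subtract;
    }
  }
  amounts
}

fn main() {
  // A 7-unit fee over three 10-unit payments: the first pays 2 plus the 1-unit remainder
  assert_eq!(amortize(vec![10, 10, 10], 30, 23), vec![7, 8, 8]);
  // The surviving payments always sum to the amount actually received
  assert_eq!(amortize(vec![5, 5], 10, 4).iter().sum::<u64>(), 4);
}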