From 0601d477898ca01a4b87027bf9cf1d2bdb434ff2 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 3 Sep 2024 18:51:27 -0400 Subject: [PATCH] Work on the tree logic in the transaction-chaining scheduler --- .../scheduler/utxo/primitives/src/lib.rs | 26 +- .../utxo/transaction-chaining/src/lib.rs | 276 +++++++++++------- 2 files changed, 193 insertions(+), 109 deletions(-) diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index f3e220b0..af8b985f 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -79,7 +79,8 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// /// `operating_costs` is accrued to if Serai faces the burden of a fee or drops inputs not worth /// accumulating. `operating_costs` will be amortized along with this transaction's fee as - /// possible. Please see `spec/processor/UTXO Management.md` for more information. + /// possible, if there is a change output. Please see `spec/processor/UTXO Management.md` for + /// more information. /// /// Returns `None` if the fee exceeded the inputs, or `Some` otherwise. fn plan_transaction_with_fee_amortization( @@ -89,6 +90,12 @@ pub trait TransactionPlanner: 'static + Send + Sync { mut payments: Vec>>, mut change: Option>, ) -> Option> { + // If there's no change output, we can't recoup any operating costs we would amortize + // We also don't have any losses if the inputs are written off/the change output is reduced + let mut operating_costs_if_no_change = 0; + let operating_costs_in_effect = + if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs }; + // Sanity checks { assert!(!inputs.is_empty()); @@ -101,7 +108,8 @@ pub trait TransactionPlanner: 'static + Send + Sync { assert_eq!(coin, payment.balance().coin); } assert!( - (inputs.iter().map(|input| input.balance().amount.0).sum::() + *operating_costs) >= + (inputs.iter().map(|input| input.balance().amount.0).sum::() + + *operating_costs_in_effect) >= payments.iter().map(|payment| payment.balance().amount.0).sum::(), "attempted to fulfill payments without a sufficient input set" ); @@ -119,7 +127,7 @@ pub trait TransactionPlanner: 'static + Send + Sync { while !payments.is_empty() { // We need to pay the fee, and any accrued operating costs, minus what we've already // amortized - let adjusted_fee = (*operating_costs + fee).saturating_sub(amortized); + let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); /* Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could @@ -154,16 +162,16 @@ pub trait TransactionPlanner: 'static + Send + Sync { // dust if inputs < (fee + S::dust(coin).0) { // Write off these inputs - *operating_costs += inputs; + *operating_costs_in_effect += inputs; // Yet also claw back the payments we dropped, as we only lost the change // The dropped payments will be worth less than the inputs + operating_costs we started // with, so this shouldn't use `saturating_sub` - *operating_costs -= amortized; + *operating_costs_in_effect -= amortized; None?; } } else { // Since we have payments which can pay the fee we ended up with, amortize it - let adjusted_fee = (*operating_costs + fee).saturating_sub(amortized); + let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap(); let payments_paying_one_atomic_unit_more = usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap(); @@ -174,7 +182,7 @@ pub trait TransactionPlanner: 'static + Send + Sync { payment.balance().amount.0 -= per_payment_fee; amortized += per_payment_fee; } - assert!(amortized >= (*operating_costs + fee)); + assert!(amortized >= (*operating_costs_in_effect + fee)); // If the change is less than the dust, drop it let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::() - @@ -182,12 +190,12 @@ pub trait TransactionPlanner: 'static + Send + Sync { fee; if would_be_change < S::dust(coin).0 { change = None; - *operating_costs += would_be_change; + *operating_costs_in_effect += would_be_change; } } // Update the amount of operating costs - *operating_costs = (*operating_costs + fee).saturating_sub(amortized); + *operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized); } // Because we amortized, or accrued as operating costs, the fee, make the transaction diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index f74e2c2c..8e567e14 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use group::GroupEncoding; -use serai_primitives::{Coin, Amount}; +use serai_primitives::{Coin, Amount, Balance}; use serai_db::DbTxn; @@ -41,12 +41,56 @@ impl>> Sched ) -> Vec> { let mut eventualities = vec![]; + let mut accumulate_outputs = |txn, outputs: Vec>| { + let mut outputs_by_key = HashMap::new(); + for output in outputs { + Db::::set_already_accumulated_output(txn, output.id()); + let coin = output.balance().coin; + outputs_by_key + .entry((output.key().to_bytes().as_ref().to_vec(), coin)) + .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) + .1 + .push(output); + } + for ((_key_vec, coin), (key, outputs)) in outputs_by_key { + Db::::set_outputs(txn, key, coin, &outputs); + } + }; + for coin in S::NETWORK.coins() { // Fetch our operating costs and all our outputs let mut operating_costs = Db::::operating_costs(txn, *coin).0; let mut outputs = Db::::outputs(txn, key, *coin).unwrap(); - // Fetch the queued payments + // If we have more than the maximum amount of inputs, aggregate until we don't + { + while outputs.len() > MAX_INPUTS { + let Some(planned) = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + fee_rates[coin], + outputs.drain(.. MAX_INPUTS).collect::>(), + vec![], + Some(key_for_change), + ) else { + // We amortized all payments, and even when just trying to make the change output, these + // inputs couldn't afford their own aggregation and were written off + Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); + continue; + }; + + // Send the transactions off for signing + TransactionsToSign::::send(txn, &key, &planned.signable); + // Push the Eventualities onto the result + eventualities.push(planned.eventuality); + // Accumulate the outputs + Db::set_outputs(txn, key, *coin, &outputs); + accumulate_outputs(txn, planned.auxilliary.0); + outputs = Db::outputs(txn, key, *coin).unwrap(); + } + Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); + } + + // Now, handle the payments let mut payments = Db::::queued_payments(txn, key, *coin).unwrap(); if payments.is_empty() { continue; @@ -55,21 +99,24 @@ impl>> Sched // If this is our only key, our outputs and operating costs should be greater than the // payments' value if active_keys.len() == 1 { - // The available amount of fulfill is the amount we have plus the amount we'll reduce by + // The available amount to fulfill is the amount we have plus the amount we'll reduce by // An alternative formulation would be `outputs >= (payments - operating costs)`, but // that'd risk underflow - let available = + let value_available = operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); + assert!( - available >= payments.iter().map(|payment| payment.balance().amount.0).sum::() + value_available >= payments.iter().map(|payment| payment.balance().amount.0).sum::() ); } - let amount_of_payments_that_can_be_handled = - |operating_costs: u64, outputs: &[_], payments: &[_]| { - let value_available = - operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); + // Find the set of payments we should fulfill at this time + loop { + let value_available = + operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); + // Drop to just the payments we currently have the outputs for + { let mut can_handle = 0; let mut value_used = 0; for payment in payments { @@ -80,15 +127,6 @@ impl>> Sched can_handle += 1; } - can_handle - }; - - // Find the set of payments we should fulfill at this time - { - // Drop to just the payments we currently have the outputs for - { - let can_handle = - amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments); let remaining_payments = payments.drain(can_handle ..).collect::>(); // Restore the rest to the database Db::::set_queued_payments(txn, key, *coin, &remaining_payments); @@ -99,96 +137,132 @@ impl>> Sched if payments_value <= operating_costs { operating_costs -= payments_value; Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); - return vec![]; - } - // We explicitly sort AFTER deciding which payments to handle so we always handle the - // oldest queued payments first (preventing any from eternally being shuffled to the back - // of the line) - payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); - } - assert!(!payments.is_empty()); - - // Find the smallest set of outputs usable to fulfill these outputs - // Size is determined by the largest output, not quantity nor aggregate value - { - // We start by sorting low to high - outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); - - let value_needed = - payments.iter().map(|payment| payment.balance().amount.0).sum::() - operating_costs; - - let mut needed = 0; - let mut value_present = 0; - for output in &outputs { - needed += 1; - value_present += output.balance().amount.0; - if value_present >= value_needed { + // Reset payments to the queued payments + payments = Db::::queued_payments(txn, key, *coin).unwrap(); + // If there's no more payments, stop looking for which payments we should fulfill + if payments.is_empty() { break; } - } - // Drain, and save back to the DB, the unnecessary outputs - let remaining_outputs = outputs.drain(needed ..).collect::>(); - Db::::set_outputs(txn, key, *coin, &remaining_outputs); - } - assert!(!outputs.is_empty()); - - // We now have the current operating costs, the outputs we're using, and the payments - // The database has the unused outputs/unfilfillable payments - // Actually plan/send off the transactions - - // While our set of outputs exceed the input limit, aggregate them - while outputs.len() > MAX_INPUTS { - let outputs_chunk = outputs.drain(.. MAX_INPUTS).collect::>(); - - // While we're aggregating these outputs, handle any payments we can - let payments_chunk = loop { - let can_handle = - amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments); - let payments_chunk = payments.drain(.. can_handle.min(MAX_OUTPUTS)).collect::>(); - - let payments_value = - payments_chunk.iter().map(|payment| payment.balance().amount.0).sum::(); - if payments_value <= operating_costs { - operating_costs -= payments_value; - continue; - } - break payments_chunk; - }; - - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - fee_rates[coin], - outputs_chunk, - payments_chunk, - // We always use our key for the change here since we may need this change output to - // finish fulfilling these payments - Some(key), - ) else { - // We amortized all payments, and even when just trying to make the change output, these - // inputs couldn't afford their own aggregation and were written off + // Find which of these we should handle continue; - }; - - // Send the transactions off for signing - TransactionsToSign::::send(txn, &key, &planned.signable); - - // Push the Eventualities onto the result - eventualities.push(planned.eventuality); - - let mut effected_received_outputs = planned.auxilliary.0; - // Only handle Change so if someone burns to an External address, we don't use it here - // when the scanner will tell us to return it (without accumulating it) - effected_received_outputs.retain(|output| output.kind() == OutputType::Change); - for output in &effected_received_outputs { - Db::::set_already_accumulated_output(txn, output.id()); } - outputs.append(&mut effected_received_outputs); + + break; + } + if payments.is_empty() { + continue; } - // Now that we have an aggregated set of inputs, create the tree for payments - todo!("TODO"); + // Create a tree to fulfill all of the payments + struct TreeTransaction { + payments: Vec>>, + children: Vec>, + value: u64, + } + let mut tree_transactions = vec![]; + for payments in payments.chunks(MAX_OUTPUTS) { + let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); + tree_transactions.push(TreeTransaction:: { + payments: payments.to_vec(), + children: vec![], + value, + }); + } + // While we haven't calculated a tree root, or the tree root doesn't support a change output, + // keep working + while (tree_transactions.len() != 1) || (tree_transactions[0].payments.len() == MAX_OUTPUTS) { + let mut next_tree_transactions = vec![]; + for children in tree_transactions.chunks(MAX_OUTPUTS) { + let payments = children + .iter() + .map(|child| { + Payment::new( + P::branch_address(key), + Balance { coin: *coin, amount: Amount(child.value) }, + None, + ) + }) + .collect(); + let value = children.iter().map(|child| child.value).sum(); + next_tree_transactions.push(TreeTransaction { + payments, + children: children.to_vec(), + value, + }); + } + tree_transactions = next_tree_transactions; + } + assert_eq!(tree_transactions.len(), 1); + assert!((tree_transactions.payments.len() + 1) <= MAX_OUTPUTS); + + // Create the transaction for the root of the tree + let Some(planned) = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + fee_rates[coin], + outputs, + tree_transactions.payments, + Some(key_for_change), + ) else { + Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); + continue; + }; + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + + // We accumulate the change output, but consume the branches here + accumulate_outputs( + txn, + planned + .auxilliary + .0 + .iter() + .filter(|output| output.kind() == OutputType::Change) + .cloned() + .collect(), + ); + // Filter the outputs to the change outputs + let mut branch_outputs = planned.auxilliary.0; + branch_outputs.retain(|output| output.kind() == OutputType::Branch); + + // This is recursive, yet only recurses with logarithmic depth + let execute_tree_transaction = |branch_outputs, children| { + assert_eq!(branch_outputs.len(), children.len()); + + // Sort the branch outputs by their value + branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); + // Find the child for each branch output + // This is only done within a transaction, not across the layer, so we don't have branches + // created in transactions with less outputs (and therefore less fees) jump places with + // other branches + children.sort_by(|a, b| a.value.cmp(&b.value)); + + for (branch_output, child) in branch_outputs.into_iter().zip(children) { + assert_eq!(branch_output.kind(), OutputType::Branch); + Db::::set_already_accumulated_output(txn, branch_output.id()); + + let Some(planned) = P::plan_transaction_with_fee_amortization( + // Uses 0 as there's no operating costs to incur/amortize here + &mut 0, + fee_rates[coin], + vec![branch_output], + child.payments, + None, + ) else { + // This Branch isn't viable, so drop it (and its children) + continue; + }; + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + if !child.children.is_empty() { + execute_tree_transaction(planned.auxilliary.0, child.children); + } + } + }; + if !tree_transaction.children.is_empty() { + execute_tree_transaction(branch_outputs, tree_transaction.children); + } } eventualities @@ -288,6 +362,7 @@ impl>> Sched let Some(plan) = P::plan_transaction_with_fee_amortization( // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be forwarded, we simply drop it &mut 0, fee_rates[&forward.balance().coin], vec![forward.clone()], @@ -304,6 +379,7 @@ impl>> Sched Payment::new(to_return.address().clone(), to_return.output().balance(), None); let Some(plan) = P::plan_transaction_with_fee_amortization( // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be returned, we simply drop it &mut 0, fee_rates[&out_instruction.balance().coin], vec![to_return.output().clone()],