Add input aggregation in the transaction-chaining scheduler

Also handles some other misc in it.
This commit is contained in:
Luke Parker
2024-09-03 01:41:51 -04:00
parent 3c787e005f
commit 75b4707002
6 changed files with 268 additions and 70 deletions

1
Cargo.lock generated
View File

@@ -8741,6 +8741,7 @@ dependencies = [
"serai-primitives", "serai-primitives",
"serai-processor-primitives", "serai-processor-primitives",
"serai-processor-scanner", "serai-processor-scanner",
"serai-processor-scheduler-primitives",
] ]
[[package]] [[package]]

View File

@@ -23,3 +23,4 @@ serai-primitives = { path = "../../../../substrate/primitives", default-features
primitives = { package = "serai-processor-primitives", path = "../../../primitives" } primitives = { package = "serai-processor-primitives", path = "../../../primitives" }
scanner = { package = "serai-processor-scanner", path = "../../../scanner" } scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" }

View File

@@ -7,11 +7,22 @@ use core::fmt::Debug;
use serai_primitives::{Coin, Amount}; use serai_primitives::{Coin, Amount};
use primitives::{ReceivedOutput, Payment}; use primitives::{ReceivedOutput, Payment};
use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor}; use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor};
use scheduler_primitives::*;
/// A planned transaction.
pub struct PlannedTransaction<S: ScannerFeed, ST: SignableTransaction, A> {
/// The signable transaction.
pub signable: ST,
/// The Eventuality to watch for.
pub eventuality: EventualityFor<S>,
  /// The auxiliary data for this transaction.
pub auxilliary: A,
}
/// An object able to plan a transaction. /// An object able to plan a transaction.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync { pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
/// An error encountered when determining the fee rate. /// An error encountered when determining the fee rate.
/// ///
/// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually
@@ -21,8 +32,8 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
/// The type representing a fee rate to use for transactions. /// The type representing a fee rate to use for transactions.
type FeeRate: Clone + Copy; type FeeRate: Clone + Copy;
/// The type representing a planned transaction. /// The type representing a signable transaction.
type PlannedTransaction; type SignableTransaction: SignableTransaction;
/// Obtain the fee rate to pay. /// Obtain the fee rate to pay.
/// ///
@@ -62,7 +73,7 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
inputs: Vec<OutputFor<S>>, inputs: Vec<OutputFor<S>>,
payments: Vec<Payment<AddressFor<S>>>, payments: Vec<Payment<AddressFor<S>>>,
change: Option<KeyFor<S>>, change: Option<KeyFor<S>>,
) -> Self::PlannedTransaction; ) -> PlannedTransaction<S, Self::SignableTransaction, A>;
/// Obtain a PlannedTransaction via amortizing the fee over the payments. /// Obtain a PlannedTransaction via amortizing the fee over the payments.
/// ///
@@ -77,7 +88,7 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
inputs: Vec<OutputFor<S>>, inputs: Vec<OutputFor<S>>,
mut payments: Vec<Payment<AddressFor<S>>>, mut payments: Vec<Payment<AddressFor<S>>>,
mut change: Option<KeyFor<S>>, mut change: Option<KeyFor<S>>,
) -> Option<Self::PlannedTransaction> { ) -> Option<PlannedTransaction<S, Self::SignableTransaction, A>> {
// Sanity checks // Sanity checks
{ {
assert!(!inputs.is_empty()); assert!(!inputs.is_empty());

View File

@@ -30,6 +30,6 @@ serai-primitives = { path = "../../../../substrate/primitives", default-features
serai-db = { path = "../../../../common/db" } serai-db = { path = "../../../../common/db" }
primitives = { package = "serai-processor-primitives", path = "../../../primitives" } primitives = { package = "serai-processor-primitives", path = "../../../primitives" }
scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" } scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" }
utxo-scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" } utxo-scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" }
scanner = { package = "serai-processor-scanner", path = "../../../scanner" }

View File

@@ -6,8 +6,8 @@ use serai_primitives::{Coin, Amount};
use serai_db::{Get, DbTxn, create_db}; use serai_db::{Get, DbTxn, create_db};
use primitives::ReceivedOutput; use primitives::{Payment, ReceivedOutput};
use scanner::{ScannerFeed, KeyFor, OutputFor}; use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor};
create_db! { create_db! {
TransactionChainingScheduler { TransactionChainingScheduler {
@@ -15,7 +15,7 @@ create_db! {
SerializedOutputs: (key: &[u8], coin: Coin) -> Vec<u8>, SerializedOutputs: (key: &[u8], coin: Coin) -> Vec<u8>,
// We should be immediately able to schedule the fulfillment of payments, yet this may not be // We should be immediately able to schedule the fulfillment of payments, yet this may not be
// possible if we're in the middle of a multisig rotation (as our output set will be split) // possible if we're in the middle of a multisig rotation (as our output set will be split)
    SerializedQueuedPayments: (key: &[u8]) -> Vec<u8>,	    SerializedQueuedPayments: (key: &[u8], coin: Coin) -> Vec<u8>,
} }
} }
@@ -61,13 +61,19 @@ impl<S: ScannerFeed> Db<S> {
pub(crate) fn queued_payments( pub(crate) fn queued_payments(
getter: &impl Get, getter: &impl Get,
key: KeyFor<S>, key: KeyFor<S>,
) -> Option<Vec<Payment<S>>> { coin: Coin,
) -> Option<Vec<Payment<AddressFor<S>>>> {
todo!("TODO") todo!("TODO")
} }
pub(crate) fn set_queued_payments(txn: &mut impl DbTxn, key: KeyFor<S>, queued: Vec<Payment<S>>) { pub(crate) fn set_queued_payments(
txn: &mut impl DbTxn,
key: KeyFor<S>,
coin: Coin,
queued: &Vec<Payment<AddressFor<S>>>,
) {
todo!("TODO") todo!("TODO")
} }
pub(crate) fn del_outputs(txn: &mut impl DbTxn, key: KeyFor<S>) { pub(crate) fn del_queued_payments(txn: &mut impl DbTxn, key: KeyFor<S>, coin: Coin) {
SerializedQueuedPayments::del(txn, key.to_bytes().as_ref()); SerializedQueuedPayments::del(txn, key.to_bytes().as_ref(), coin);
} }
} }

View File

@@ -7,11 +7,11 @@ use std::collections::HashMap;
use group::GroupEncoding; use group::GroupEncoding;
use serai_primitives::Coin; use serai_primitives::{Coin, Amount};
use serai_db::DbTxn; use serai_db::DbTxn;
use primitives::{ReceivedOutput, Payment}; use primitives::{OutputType, ReceivedOutput, Payment};
use scanner::{ use scanner::{
LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate,
Scheduler as SchedulerTrait, Scheduler as SchedulerTrait,
@@ -22,65 +22,205 @@ use utxo_scheduler_primitives::*;
mod db; mod db;
use db::Db; use db::Db;
/// A planned transaction. /// The outputs which will be effected by a PlannedTransaction and received by Serai.
pub struct PlannedTransaction<S: ScannerFeed, T> { pub struct EffectedReceivedOutputs<S: ScannerFeed>(Vec<OutputFor<S>>);
/// The signable transaction.
signable: T,
/// The outputs we'll receive from this.
effected_received_outputs: OutputFor<S>,
/// The Eventuality to watch for.
eventuality: EventualityFor<S>,
}
/// A scheduler of transactions for networks premised on the UTXO model which support /// A scheduler of transactions for networks premised on the UTXO model which support
/// transaction chaining. /// transaction chaining.
pub struct Scheduler< pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>>(
S: ScannerFeed, PhantomData<S>,
T, PhantomData<P>,
P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>, );
>(PhantomData<S>, PhantomData<T>, PhantomData<P>);
impl<S: ScannerFeed, T, P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>> impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Scheduler<S, P> {
Scheduler<S, T, P> fn handle_queued_payments(
&mut self,
txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)],
key: KeyFor<S>,
) -> Vec<EventualityFor<S>> {
let mut eventualities = vec![];
for coin in S::NETWORK.coins() {
// Fetch our operating costs and all our outputs
let mut operating_costs = Db::<S>::operating_costs(txn, *coin).0;
let mut outputs = Db::<S>::outputs(txn, key, *coin).unwrap();
// Fetch the queued payments
let mut payments = Db::<S>::queued_payments(txn, key, *coin).unwrap();
if payments.is_empty() {
continue;
}
// If this is our only key, our outputs and operating costs should be greater than the
// payments' value
if active_keys.len() == 1 {
// The available amount of fulfill is the amount we have plus the amount we'll reduce by
// An alternative formulation would be `outputs >= (payments - operating costs)`, but
// that'd risk underflow
let available =
operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
assert!(available >= payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>());
}
let amount_of_payments_that_can_be_handled =
|operating_costs: u64, outputs: &[_], payments: &[_]| {
let value_available =
operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
let mut can_handle = 0;
let mut value_used = 0;
for payment in payments {
value_used += payment.balance().amount.0;
if value_available < value_used {
break;
}
can_handle += 1;
}
can_handle
};
// Find the set of payments we should fulfill at this time
{ {
fn accumulate_outputs(txn: &mut impl DbTxn, key: KeyFor<S>, outputs: &[OutputFor<S>]) { // Drop to just the payments we currently have the outputs for
// Accumulate them in memory {
let mut outputs_by_coin = HashMap::with_capacity(1); let can_handle =
for output in outputs.iter().filter(|output| output.key() == key) { amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
let coin = output.balance().coin; let remaining_payments = payments.drain(can_handle ..).collect::<Vec<_>>();
if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) { // Restore the rest to the database
e.insert(Db::<S>::outputs(txn, key, coin).unwrap()); Db::<S>::set_queued_payments(txn, key, *coin, &remaining_payments);
} }
outputs_by_coin.get_mut(&coin).unwrap().push(output.clone()); let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
// If these payments are worth less than the operating costs, immediately drop them
if payments_value <= operating_costs {
operating_costs -= payments_value;
Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
return vec![];
} }
// Flush them to the database // We explicitly sort AFTER deciding which payments to handle so we always handle the
for (coin, outputs) in outputs_by_coin { // oldest queued payments first (preventing any from eternally being shuffled to the back
Db::<S>::set_outputs(txn, key, coin, &outputs); // of the line)
payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
} }
assert!(!payments.is_empty());
      // Find the smallest set of outputs usable to fulfill these payments
// Size is determined by the largest output, not quantity nor aggregate value
{
// We start by sorting low to high
outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
let value_needed =
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>() - operating_costs;
let mut needed = 0;
let mut value_present = 0;
for output in &outputs {
needed += 1;
value_present += output.balance().amount.0;
if value_present >= value_needed {
break;
} }
} }
impl< // Drain, and save back to the DB, the unnecessary outputs
S: ScannerFeed, let remaining_outputs = outputs.drain(needed ..).collect::<Vec<_>>();
T: 'static + Send + Sync + SignableTransaction, Db::<S>::set_outputs(txn, key, *coin, &remaining_outputs);
P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>, }
> SchedulerTrait<S> for Scheduler<S, T, P> assert!(!outputs.is_empty());
// We now have the current operating costs, the outputs we're using, and the payments
      // The database has the unused outputs/unfulfillable payments
// Actually plan/send off the transactions
// While our set of outputs exceed the input limit, aggregate them
while outputs.len() > MAX_INPUTS {
let outputs_chunk = outputs.drain(.. MAX_INPUTS).collect::<Vec<_>>();
// While we're aggregating these outputs, handle any payments we can
let payments_chunk = loop {
let can_handle =
amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
let payments_chunk = payments.drain(.. can_handle.min(MAX_OUTPUTS)).collect::<Vec<_>>();
let payments_value =
payments_chunk.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
if payments_value <= operating_costs {
operating_costs -= payments_value;
continue;
}
break payments_chunk;
};
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
fee_rates[coin],
outputs_chunk,
payments_chunk,
// We always use our key for the change here since we may need this change output to
// finish fulfilling these payments
Some(key),
) else {
// We amortized all payments, and even when just trying to make the change output, these
// inputs couldn't afford their own aggregation and were written off
continue;
};
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
// Push the Eventualities onto the result
eventualities.push(planned.eventuality);
let mut effected_received_outputs = planned.auxilliary.0;
// Only handle Change so if someone burns to an External address, we don't use it here
// when the scanner will tell us to return it (without accumulating it)
effected_received_outputs.retain(|output| output.kind() == OutputType::Change);
outputs.append(&mut effected_received_outputs);
}
// Now that we have an aggregated set of inputs, create the tree for payments
todo!("TODO");
}
eventualities
}
}
impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> SchedulerTrait<S>
for Scheduler<S, P>
{ {
fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) { fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) {
for coin in S::NETWORK.coins() { for coin in S::NETWORK.coins() {
assert!(Db::<S>::outputs(txn, key, *coin).is_none());
Db::<S>::set_outputs(txn, key, *coin, &[]); Db::<S>::set_outputs(txn, key, *coin, &[]);
assert!(Db::<S>::queued_payments(txn, key, *coin).is_none());
Db::<S>::set_queued_payments(txn, key, *coin, &vec![]);
} }
} }
fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor<S>, new_key: KeyFor<S>) { fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor<S>, new_key: KeyFor<S>) {
todo!("TODO") for coin in S::NETWORK.coins() {
let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
let mut queued = still_queued;
queued.append(&mut new_queued);
Db::<S>::set_queued_payments(txn, retiring_key, *coin, &vec![]);
Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
}
} }
fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) { fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) {
for coin in S::NETWORK.coins() { for coin in S::NETWORK.coins() {
assert!(Db::<S>::outputs(txn, key, *coin).is_none()); assert!(Db::<S>::outputs(txn, key, *coin).unwrap().is_empty());
Db::<S>::del_outputs(txn, key, *coin); Db::<S>::del_outputs(txn, key, *coin);
assert!(Db::<S>::queued_payments(txn, key, *coin).unwrap().is_empty());
Db::<S>::del_queued_payments(txn, key, *coin);
} }
} }
@@ -91,12 +231,41 @@ impl<
update: SchedulerUpdate<S>, update: SchedulerUpdate<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
// Accumulate all the outputs // Accumulate all the outputs
for key in active_keys { for (key, _) in active_keys {
Self::accumulate_outputs(txn, key.0, update.outputs()); // Accumulate them in memory
let mut outputs_by_coin = HashMap::with_capacity(1);
for output in update.outputs().iter().filter(|output| output.key() == *key) {
match output.kind() {
OutputType::External | OutputType::Forwarded => {},
// TODO: Only accumulate these if we haven't already, but do accumulate if not
OutputType::Branch | OutputType::Change => todo!("TODO"),
}
let coin = output.balance().coin;
if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) {
e.insert(Db::<S>::outputs(txn, *key, coin).unwrap());
}
outputs_by_coin.get_mut(&coin).unwrap().push(output.clone());
}
// Flush them to the database
for (coin, outputs) in outputs_by_coin {
Db::<S>::set_outputs(txn, *key, coin, &outputs);
}
} }
let mut fee_rates: HashMap<Coin, _> = todo!("TODO"); let mut fee_rates: HashMap<Coin, _> = todo!("TODO");
// Fulfill the payments we prior couldn't
let mut eventualities = HashMap::new();
for (key, _stage) in active_keys {
eventualities.insert(
key.to_bytes().as_ref().to_vec(),
self.handle_queued_payments(txn, active_keys, *key),
);
}
// TODO: If this key has been flushed, forward all outputs
// Create the transactions for the forwards/burns // Create the transactions for the forwards/burns
{ {
let mut planned_txs = vec![]; let mut planned_txs = vec![];
@@ -137,20 +306,14 @@ impl<
planned_txs.push((key, plan)); planned_txs.push((key, plan));
} }
let mut eventualities = HashMap::new();
for (key, planned_tx) in planned_txs { for (key, planned_tx) in planned_txs {
// Send the transactions off for signing // Send the transactions off for signing
TransactionsToSign::<T>::send(txn, &key, &planned_tx.signable); TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the eventualities into the result // Insert the Eventualities into the result
eventualities eventualities[key.to_bytes().as_ref()].push(planned_tx.eventuality);
.entry(key.to_bytes().as_ref().to_vec())
.or_insert(Vec::with_capacity(1))
.push(planned_tx.eventuality);
} }
// TODO: Fulfill any payments we prior couldn't
eventualities eventualities
} }
} }
@@ -159,13 +322,29 @@ impl<
&mut self, &mut self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<Payment<AddressFor<S>>>, mut payments: Vec<Payment<AddressFor<S>>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
    // TODO: Find the key to use for fulfillment	    // Find the key to fulfill these payments with
// TODO: Sort outputs and payments by amount let fulfillment_key = match active_keys[0].1 {
// TODO: For as long as we don't have sufficiently aggregated inputs to handle all payments, LifetimeStage::ActiveYetNotReporting => {
// aggregate panic!("expected to fulfill payments despite not reporting for the oldest key")
// TODO: Create the tree for the payments }
todo!("TODO") LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key
for coin in S::NETWORK.coins() {
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap();
queued_payments
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned());
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
}
// Handle the queued payments
HashMap::from([(
fulfillment_key.to_bytes().as_ref().to_vec(),
self.handle_queued_payments(txn, active_keys, fulfillment_key),
)])
} }
} }