diff --git a/processor/scheduler/utxo/transaction-chaining/src/db.rs b/processor/scheduler/utxo/transaction-chaining/src/db.rs index 7d800718..d629480f 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/db.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/db.rs @@ -13,6 +13,7 @@ create_db! { TransactionChainingScheduler { OperatingCosts: (coin: Coin) -> Amount, SerializedOutputs: (key: &[u8], coin: Coin) -> Vec, + AlreadyAccumulatedOutput: (id: &[u8]) -> (), // We should be immediately able to schedule the fulfillment of payments, yet this may not be // possible if we're in the middle of a multisig rotation (as our output set will be split) SerializedQueuedPayments: (key: &[u8], coin: Coin) -> Vec, @@ -58,6 +59,21 @@ impl Db { SerializedOutputs::del(txn, key.to_bytes().as_ref(), coin); } + pub(crate) fn set_already_accumulated_output( + txn: &mut impl DbTxn, + output: as ReceivedOutput, AddressFor>>::Id, + ) { + AlreadyAccumulatedOutput::set(txn, output.as_ref(), &()); + } + pub(crate) fn take_if_already_accumulated_output( + txn: &mut impl DbTxn, + output: as ReceivedOutput, AddressFor>>::Id, + ) -> bool { + let res = AlreadyAccumulatedOutput::get(txn, output.as_ref()).is_some(); + AlreadyAccumulatedOutput::del(txn, output.as_ref()); + res + } + pub(crate) fn queued_payments( getter: &impl Get, key: KeyFor, diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index 9e552c13..f74e2c2c 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -60,7 +60,9 @@ impl>> Sched // that'd risk underflow let available = operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); - assert!(available >= payments.iter().map(|payment| payment.balance().amount.0).sum::()); + assert!( + available >= payments.iter().map(|payment| payment.balance().amount.0).sum::() + ); } let amount_of_payments_that_can_be_handled = @@ -179,6 +181,9 @@ impl>> Sched // Only handle Change so if someone burns to an External address, we don't use it here // when the scanner will tell us to return it (without accumulating it) effected_received_outputs.retain(|output| output.kind() == OutputType::Change); + for output in &effected_received_outputs { + Db::::set_already_accumulated_output(txn, output.id()); + } outputs.append(&mut effected_received_outputs); } @@ -236,9 +241,13 @@ impl>> Sched let mut outputs_by_coin = HashMap::with_capacity(1); for output in update.outputs().iter().filter(|output| output.key() == *key) { match output.kind() { - OutputType::External | OutputType::Forwarded => {}, - // TODO: Only accumulate these if we haven't already, but do accumulate if not - OutputType::Branch | OutputType::Change => todo!("TODO"), + OutputType::External | OutputType::Forwarded => {} + // Only accumulate these if we haven't already + OutputType::Branch | OutputType::Change => { + if Db::::take_if_already_accumulated_output(txn, output.id()) { + continue; + } + } } let coin = output.balance().coin; if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) {