diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index 84670f79..6db60b71 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -455,7 +455,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
               key.key != keys.last().unwrap().key,
               "key which was forwarding was the last key (which has no key after it to forward to)"
             );
-            Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key);
+            let new_eventualities =
+              Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key);
+            intake_eventualities::<S>(&mut txn, new_eventualities);
           }
 
           // Now that we've intaked any Eventualities caused, check if we're retiring any keys
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 8ecb731f..ecefb9a8 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -259,13 +259,12 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
   ///
   /// If the retiring key has any unfulfilled payments associated with it, those MUST be made
   /// the responsibility of the new key.
-  // TODO: This needs to return a HashMap for the eventualities
   fn flush_key(
     txn: &mut impl DbTxn,
     block: &BlockFor<S>,
     retiring_key: KeyFor<S>,
     new_key: KeyFor<S>,
-  );
+  ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
 
   /// Retire a key as it'll no longer be used.
   ///
diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs
index 7359a87c..321d4b60 100644
--- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs
+++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs
@@ -454,6 +454,42 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
 
     eventualities
   }
+
+  fn flush_outputs(
+    txn: &mut impl DbTxn,
+    eventualities: &mut HashMap<Vec<u8>, Vec<EventualityFor<S>>>,
+    block: &BlockFor<S>,
+    from: KeyFor<S>,
+    to: KeyFor<S>,
+    coin: Coin,
+  ) {
+    let from_bytes = from.to_bytes().as_ref().to_vec();
+    // Ensure our inputs are aggregated
+    eventualities
+      .entry(from_bytes.clone())
+      .or_insert(vec![])
+      .append(&mut Self::aggregate_inputs(txn, block, to, from, coin));
+
+    // Now that our inputs are aggregated, transfer all of them to the new key
+    let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
+    let outputs = Db::<S>::outputs(txn, from, coin).unwrap();
+    if outputs.is_empty() {
+      return;
+    }
+    let planned = P::plan_transaction_with_fee_amortization(
+      &mut operating_costs,
+      P::fee_rate(block, coin),
+      outputs,
+      vec![],
+      Some(to),
+    );
+    Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
+    let Some(planned) = planned else { return };
+
+    TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable);
+    eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality);
+    Self::accumulate_outputs(txn, planned.auxilliary.0, false);
+  }
 }
 
 impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> SchedulerTrait<S>
@@ -470,22 +506,28 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
 
   fn flush_key(
     txn: &mut impl DbTxn,
-    _block: &BlockFor<S>,
+    block: &BlockFor<S>,
     retiring_key: KeyFor<S>,
     new_key: KeyFor<S>,
-  ) {
+  ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
+    let mut eventualities = HashMap::new();
     for coin in S::NETWORK.coins() {
-      let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
-      let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
+      // Move the payments to the new key
+      {
+        let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
+        let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
 
-      let mut queued = still_queued;
-      queued.append(&mut new_queued);
+        let mut queued = still_queued;
+        queued.append(&mut new_queued);
 
-      Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]);
-      Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
+        Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]);
+        Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
+      }
 
-      // TODO: Forward all existing outputs
+      // Move the outputs to the new key
+      Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin);
     }
+    eventualities
   }
 
   fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
@@ -512,7 +554,24 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
         .insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key));
     }
 
-    // TODO: If this key has been flushed, forward all outputs
+    // If this key has been flushed, forward all outputs
+    match active_keys[0].1 {
+      LifetimeStage::ActiveYetNotReporting |
+      LifetimeStage::Active |
+      LifetimeStage::UsingNewForChange => {}
+      LifetimeStage::Forwarding | LifetimeStage::Finishing => {
+        for coin in S::NETWORK.coins() {
+          Self::flush_outputs(
+            txn,
+            &mut eventualities,
+            block,
+            active_keys[0].0,
+            active_keys[1].0,
+            *coin,
+          );
+        }
+      }
+    }
 
     // Create the transactions for the forwards/burns
     {
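Note on the new contract: `flush_key` now reports every Eventuality it creates, keyed by the serialized key they were created under, and the scanner intakes them within the same database transaction via `intake_eventualities`. Below is a minimal, self-contained sketch of that hand-off pattern; the types and bodies are simplified stand-ins (`Eventuality`, `flush_key`, and `intake_eventualities` here are illustrative, not the crate's actual definitions).

use std::collections::HashMap;

// Simplified stand-ins for the crate's generic types (illustrative only).
type KeyBytes = Vec<u8>;

#[derive(Debug)]
struct Eventuality(&'static str);

// The scheduler returns every Eventuality it created, grouped by the key it was created under.
fn flush_key(retiring_key: &KeyBytes, _new_key: &KeyBytes) -> HashMap<KeyBytes, Vec<Eventuality>> {
  let mut eventualities = HashMap::new();
  // e.g. one planned transaction per coin, forwarding the retiring key's outputs to the new key
  eventualities
    .entry(retiring_key.clone())
    .or_insert_with(Vec::new)
    .push(Eventuality("forward retiring key's outputs"));
  eventualities
}

// The scanner folds the reported Eventualities into the set it already tracks per key.
fn intake_eventualities(
  tracked: &mut HashMap<KeyBytes, Vec<Eventuality>>,
  new_eventualities: HashMap<KeyBytes, Vec<Eventuality>>,
) {
  for (key, mut eventualities) in new_eventualities {
    tracked.entry(key).or_insert_with(Vec::new).append(&mut eventualities);
  }
}

fn main() {
  let retiring_key = vec![1u8];
  let new_key = vec![2u8];
  let mut tracked: HashMap<KeyBytes, Vec<Eventuality>> = HashMap::new();

  // Mirrors the eventuality/mod.rs change: capture flush_key's return value and intake it.
  let new_eventualities = flush_key(&retiring_key, &new_key);
  intake_eventualities(&mut tracked, new_eventualities);

  assert_eq!(tracked[&retiring_key].len(), 1);
}

Returning the map, rather than having the scheduler record the Eventualities itself, lets the scanner intake them under the same `txn` that recorded the flush, as the eventuality/mod.rs hunk above shows.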