diff --git a/processor/src/main.rs b/processor/src/main.rs index 68082946..8cfc4382 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -394,6 +394,7 @@ async fn run(raw_db: D, coin: C, mut coordinato CoordinatorMessage::Substrate(msg) => { match msg { + // TODO: Merge this with Burns so we don't have two distinct scheduling actions messages::substrate::CoordinatorMessage::BlockAcknowledged { context, key: key_vec, diff --git a/processor/src/scheduler.rs b/processor/src/scheduler.rs index 3754aee3..2bbf2af5 100644 --- a/processor/src/scheduler.rs +++ b/processor/src/scheduler.rs @@ -136,21 +136,21 @@ impl Scheduler { } } - // Sort the UTXOs by amount - utxos.sort_by(|a, b| a.amount().cmp(&b.amount()).reverse()); + log::info!("{} planned TXs have had their required inputs confirmed", txs.len()); - // Return the now possible TXs - log::info!("created {} planned TXs to sign from now recived outputs", txs.len()); + // Additionally call schedule in case these outputs enable fulfilling scheduled payments + txs.extend(self.schedule(vec![])); txs } // Schedule a series of payments. This should be called after `add_outputs`. pub fn schedule(&mut self, payments: Vec>) -> Vec> { - log::debug!("scheduling payments"); - assert!(!payments.is_empty(), "tried to schedule zero payments"); + log::info!("scheduling {} new payments", payments.len()); // Add all new payments to the list of pending payments self.payments.extend(payments); + let payments_at_start = self.payments.len(); + log::info!("{} payments are now scheduled", payments_at_start); // If we don't have UTXOs available, don't try to continue if self.utxos.is_empty() { @@ -177,14 +177,9 @@ impl Scheduler { } } - let mut aggregating = vec![]; + let mut txs = vec![]; for chunk in utxo_chunks.drain(..) { - aggregating.push(Plan { - key: self.key, - inputs: chunk, - payments: vec![], - change: Some(self.key), - }) + txs.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) }) } // We want to use all possible UTXOs for all possible payments @@ -204,14 +199,23 @@ impl Scheduler { if balance.checked_sub(amount).is_some() { balance -= amount; executing.push(self.payments.pop_front().unwrap()); + } else { + // TODO: We could continue checking other payments which aren't [0] + break; } } // Now that we have the list of payments we can successfully handle right now, create the TX // for them - let mut txs = vec![self.execute(utxos, executing)]; - txs.append(&mut aggregating); - log::info!("created {} TXs to sign", txs.len()); + if !executing.is_empty() { + txs.push(self.execute(utxos, executing)); + } + + log::info!( + "created {} TXs containing {} payments to sign", + txs.len(), + payments_at_start - self.payments.len(), + ); txs } @@ -260,6 +264,9 @@ impl Scheduler { payments.drain(..).filter(|payment| payment.amount >= C::DUST).collect::>(); // Sanity check this was done properly assert!(actual >= payments.iter().map(|payment| payment.amount).sum::()); + if payments.is_empty() { + return; + } self.plans.entry(actual).or_insert(VecDeque::new()).push_back(payments); }