From 6deb60513c8b352b2c9a2830ecca241edbeff02d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 1 Sep 2024 00:05:08 -0400 Subject: [PATCH] Expand primitives/scanner with niceties needed for the scheduler --- processor/primitives/src/output.rs | 2 +- processor/scanner/src/eventuality/mod.rs | 10 ++-- processor/scanner/src/lib.rs | 67 +++++++++++++++++++++--- processor/scanner/src/scan/mod.rs | 4 +- processor/scanner/src/substrate/mod.rs | 5 ++ 5 files changed, 75 insertions(+), 13 deletions(-) diff --git a/processor/primitives/src/output.rs b/processor/primitives/src/output.rs index 9a300940..d59b4fd0 100644 --- a/processor/primitives/src/output.rs +++ b/processor/primitives/src/output.rs @@ -8,7 +8,7 @@ use serai_primitives::{ExternalAddress, Balance}; use crate::Id; /// An address on the external network. -pub trait Address: Send + Sync + Into + TryFrom { +pub trait Address: Send + Sync + Clone + Into + TryFrom { /// Write this address. fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; /// Read an address. diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 3be7f3ce..bfc879ea 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -12,7 +12,7 @@ use crate::{ SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb, ScanToEventualityDb, }, - BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler, + BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, Payment, SchedulerUpdate, Scheduler, sort_outputs, scan::{next_to_scan_for_outputs_block, queue_output_until_block}, }; @@ -165,7 +165,11 @@ impl> EventualityTask { { intaked_any = true; - let new_eventualities = self.scheduler.fulfill(&mut txn, &keys_with_stages, burns); + let new_eventualities = self.scheduler.fulfill( + &mut txn, + &keys_with_stages, + burns.into_iter().filter_map(|burn| Payment::try_from(burn).ok()).collect(), + ); intake_eventualities::(&mut txn, new_eventualities); } txn.commit(); @@ -291,7 +295,7 @@ impl> ContinuallyRan for EventualityTas // Drop any outputs less than the dust limit non_external_outputs.retain(|output| { let balance = output.balance(); - balance.amount.0 >= self.feed.dust(balance.coin).0 + balance.amount.0 >= S::dust(balance.coin).0 }); /* diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 53bb9030..80cf96be 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -5,7 +5,7 @@ use group::GroupEncoding; use serai_db::{Get, DbTxn, Db}; -use serai_primitives::{NetworkId, Coin, Amount}; +use serai_primitives::{NetworkId, Coin, Amount, Balance, Data}; use serai_in_instructions_primitives::Batch; use serai_coins_primitives::OutInstructionWithBalance; @@ -143,7 +143,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { /// /// This MUST be constant. Serai MUST NOT create internal outputs worth less than this. This /// SHOULD be a value worth handling at a human level. - fn dust(&self, coin: Coin) -> Amount; + fn dust(coin: Coin) -> Amount; /// The cost to aggregate an input as of the specified block. /// @@ -155,10 +155,14 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { ) -> Result; } -type KeyFor = <::Block as Block>::Key; -type AddressFor = <::Block as Block>::Address; -type OutputFor = <::Block as Block>::Output; -type EventualityFor = <::Block as Block>::Eventuality; +/// The key type for this ScannerFeed. +pub type KeyFor = <::Block as Block>::Key; +/// The address type for this ScannerFeed. +pub type AddressFor = <::Block as Block>::Address; +/// The output type for this ScannerFeed. +pub type OutputFor = <::Block as Block>::Output; +/// The eventuality type for this ScannerFeed. +pub type EventualityFor = <::Block as Block>::Eventuality; #[async_trait::async_trait] pub trait BatchPublisher: 'static + Send + Sync { @@ -200,6 +204,55 @@ pub struct SchedulerUpdate { returns: Vec>, } +impl SchedulerUpdate { + /// The outputs to accumulate. + pub fn outputs(&self) -> &[OutputFor] { + &self.outputs + } + /// The outputs to forward to the latest multisig. + pub fn forwards(&self) -> &[OutputFor] { + &self.forwards + } + /// The outputs to return. + pub fn returns(&self) -> &[Return] { + &self.returns + } +} + +/// A payment to fulfill. +#[derive(Clone)] +pub struct Payment { + address: AddressFor, + balance: Balance, + data: Option>, +} + +impl TryFrom for Payment { + type Error = (); + fn try_from(out_instruction_with_balance: OutInstructionWithBalance) -> Result { + Ok(Payment { + address: out_instruction_with_balance.instruction.address.try_into().map_err(|_| ())?, + balance: out_instruction_with_balance.balance, + data: out_instruction_with_balance.instruction.data.map(Data::consume), + }) + } +} + +impl Payment { + /// The address to pay. + pub fn address(&self) -> &AddressFor { + &self.address + } + /// The balance to transfer. + pub fn balance(&self) -> Balance { + self.balance + } + /// The data to associate with this payment. + pub fn data(&self) -> &Option> { + &self.data + } +} + /// The object responsible for accumulating outputs and planning new transactions. pub trait Scheduler: 'static + Send { /// Activate a key. @@ -274,7 +327,7 @@ pub trait Scheduler: 'static + Send { &mut self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], - payments: Vec, + payments: Vec>, ) -> HashMap, Vec>>; } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 4d6ca16e..51671dc6 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -215,7 +215,7 @@ impl ContinuallyRan for ScanTask { let balance = output.balance(); // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing // an invocation of a consensus/signing protocol - if balance.amount.0 >= self.feed.dust(balance.coin).0 { + if balance.amount.0 >= S::dust(balance.coin).0 { ScannerGlobalDb::::flag_notable_due_to_non_external_output(&mut txn, b); } continue; @@ -243,7 +243,7 @@ impl ContinuallyRan for ScanTask { balance.amount.0 -= 2 * cost_to_aggregate.0; // Now, check it's still past the dust threshold - if balance.amount.0 < self.feed.dust(balance.coin).0 { + if balance.amount.0 < S::dust(balance.coin).0 { continue; } diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index 4feb85d5..d67be9dc 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -138,6 +138,11 @@ impl ContinuallyRan for SubstrateTask { } } + // Drop burns less than the dust + let burns = burns + .into_iter() + .filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0) + .collect::>(); if !burns.is_empty() { // We send these Burns as stemming from this block we just acknowledged // This causes them to be acted on after we accumulate the outputs from this block