Merge AckBlock with Burns

Offers greater efficiency while reducing concerns re: atomicity.
This commit is contained in:
Luke Parker
2023-04-15 18:38:40 -04:00
parent eafd054296
commit e21fc5ff3c
6 changed files with 51 additions and 66 deletions

View File

@@ -173,20 +173,13 @@ pub mod substrate {
#[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)]
pub enum CoordinatorMessage { pub enum CoordinatorMessage {
// Substrate acknwoledged the block, meaning it should be acted upon. SubstrateBlock { context: SubstrateContext, key: Vec<u8>, burns: Vec<OutInstructionWithBalance> },
//
// This still needs to come from Substrate, not from the validator-chain, due to it mutating
// the scheduler, which the Substrate chain primarily does. To have two causes of mutation
// requires a definitive ordering, which isn't achievable when we have distinct consensus.
BlockAcknowledged { context: SubstrateContext, key: Vec<u8>, block: BlockHash },
Burns { context: SubstrateContext, burns: Vec<OutInstructionWithBalance> },
} }
impl CoordinatorMessage { impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> { pub fn required_block(&self) -> Option<BlockHash> {
let context = match self { let context = match self {
CoordinatorMessage::BlockAcknowledged { context, .. } => context, CoordinatorMessage::SubstrateBlock { context, .. } => context,
CoordinatorMessage::Burns { context, .. } => context,
}; };
Some(context.coin_latest_finalized_block) Some(context.coin_latest_finalized_block)
} }

View File

@@ -394,40 +394,22 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
CoordinatorMessage::Substrate(msg) => { CoordinatorMessage::Substrate(msg) => {
match msg { match msg {
// TODO: Merge this with Burns so we don't have two distinct scheduling actions messages::substrate::CoordinatorMessage::SubstrateBlock {
messages::substrate::CoordinatorMessage::BlockAcknowledged {
context, context,
key: key_vec, key: key_vec,
block burns,
} => { } => {
let mut block_id = <C::Block as Block<C>>::Id::default();
block_id.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0);
let key = let key =
<C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); <C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
let mut block_id = <C::Block as Block<C>>::Id::default();
block_id.as_mut().copy_from_slice(&block.0);
let plans = schedulers // We now have to acknowledge every block for this key up to the acknowledged block
.get_mut(&key_vec) let outputs = scanner.ack_up_to_block(key, block_id).await;
.expect("key we don't have a scheduler for acknowledged a block")
.add_outputs(scanner.ack_block(key, block_id).await);
sign_plans(
&mut main_db,
&coin,
&scanner,
&mut schedulers,
&signers,
context,
plans
).await;
}
messages::substrate::CoordinatorMessage::Burns { context, burns } => {
// TODO2: Rewrite rotation documentation
let schedule_key = active_keys.last().expect("burn event despite no keys");
let scheduler = schedulers.get_mut(schedule_key.to_bytes().as_ref()).unwrap();
let mut payments = vec![]; let mut payments = vec![];
for out in burns.clone() { for out in burns {
let OutInstructionWithBalance { let OutInstructionWithBalance {
instruction: OutInstruction { address, data }, instruction: OutInstruction { address, data },
balance, balance,
@@ -441,7 +423,11 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
} }
} }
let plans = scheduler.schedule(payments); let plans = schedulers
.get_mut(&key_vec)
.expect("key we don't have a scheduler for acknowledged a block")
.schedule(outputs, payments);
sign_plans( sign_plans(
&mut main_db, &mut main_db,
&coin, &coin,

View File

@@ -181,11 +181,7 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
key: &<C::Curve as Ciphersuite>::G, key: &<C::Curve as Ciphersuite>::G,
block: usize, block: usize,
) -> Vec<C::Output> { ) -> Vec<C::Output> {
let new_key = txn.get(Self::scanned_block_key(key)).is_none();
let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id)); let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id));
// Either this is a new key, with no outputs, or we're acknowledging this block
// If we're acknowledging it, we should have outputs available
assert_eq!(new_key, outputs.is_none());
let outputs = outputs.unwrap_or(vec![]); let outputs = outputs.unwrap_or(vec![]);
// Mark all the outputs from this block as seen // Mark all the outputs from this block as seen
@@ -199,7 +195,10 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
outputs outputs
} }
fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize { fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize {
let bytes = self.0.get(Self::scanned_block_key(&key)).unwrap_or(vec![0; 8]); let bytes = self
.0
.get(Self::scanned_block_key(&key))
.expect("asking for latest scanned block of key which wasn't rotated to");
u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap() u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()
} }
} }
@@ -282,22 +281,30 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
} }
/// Acknowledge having handled a block for a key. /// Acknowledge having handled a block for a key.
pub async fn ack_block( pub async fn ack_up_to_block(
&self, &self,
key: <C::Curve as Ciphersuite>::G, key: <C::Curve as Ciphersuite>::G,
id: <C::Block as Block<C>>::Id, id: <C::Block as Block<C>>::Id,
) -> Vec<C::Output> { ) -> Vec<C::Output> {
let mut scanner = self.scanner.write().await; let mut scanner = self.scanner.write().await;
debug!("Block {} acknowledged", hex::encode(&id)); debug!("Block {} acknowledged", hex::encode(&id));
// Get the number for this block
let number = let number =
scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned"); scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned");
// Get the number of the last block we acknowledged
let prior = scanner.db.latest_scanned_block(key);
let mut outputs = vec![];
let mut txn = scanner.db.0.txn(); let mut txn = scanner.db.0.txn();
let outputs = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number); for number in (prior + 1) ..= number {
outputs.extend(ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number));
}
// TODO: This likely needs to be atomic with the scheduler?
txn.commit(); txn.commit();
for output in &outputs { for output in &outputs {
scanner.ram_outputs.remove(output.id().as_ref()); assert!(scanner.ram_outputs.remove(output.id().as_ref()));
} }
outputs outputs

View File

@@ -108,9 +108,7 @@ impl<C: Coin> Scheduler<C> {
Plan { key: self.key, inputs, payments, change: Some(self.key).filter(|_| change) } Plan { key: self.key, inputs, payments, change: Some(self.key).filter(|_| change) }
} }
// When Substrate emits `Updates` for a coin, all outputs should be added up to the fn add_outputs(&mut self, mut utxos: Vec<C::Output>) -> Vec<Plan<C>> {
// acknowledged block.
pub fn add_outputs(&mut self, mut utxos: Vec<C::Output>) -> Vec<Plan<C>> {
log::info!("adding {} outputs", utxos.len()); log::info!("adding {} outputs", utxos.len());
let mut txs = vec![]; let mut txs = vec![];
@@ -139,14 +137,13 @@ impl<C: Coin> Scheduler<C> {
} }
log::info!("{} planned TXs have had their required inputs confirmed", txs.len()); log::info!("{} planned TXs have had their required inputs confirmed", txs.len());
// Additionally call schedule in case these outputs enable fulfilling scheduled payments
txs.extend(self.schedule(vec![]));
txs txs
} }
// Schedule a series of payments. This should be called after `add_outputs`. // Schedule a series of outputs/payments.
pub fn schedule(&mut self, payments: Vec<Payment<C>>) -> Vec<Plan<C>> { pub fn schedule(&mut self, utxos: Vec<C::Output>, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
let mut plans = self.add_outputs(utxos);
log::info!("scheduling {} new payments", payments.len()); log::info!("scheduling {} new payments", payments.len());
// Add all new payments to the list of pending payments // Add all new payments to the list of pending payments
@@ -157,7 +154,7 @@ impl<C: Coin> Scheduler<C> {
// If we don't have UTXOs available, don't try to continue // If we don't have UTXOs available, don't try to continue
if self.utxos.is_empty() { if self.utxos.is_empty() {
log::info!("no utxos currently avilable"); log::info!("no utxos currently avilable");
return vec![]; return plans;
} }
// Sort UTXOs so the highest valued ones are first // Sort UTXOs so the highest valued ones are first
@@ -185,13 +182,12 @@ impl<C: Coin> Scheduler<C> {
} }
} }
let mut txs = vec![];
for chunk in utxo_chunks.drain(..) { for chunk in utxo_chunks.drain(..) {
// TODO: While payments have their TXs' fees deducted from themselves, that doesn't hold here // TODO: While payments have their TXs' fees deducted from themselves, that doesn't hold here
// We need to charge a fee before reporting incoming UTXOs to Substrate to cover aggregation // We need to charge a fee before reporting incoming UTXOs to Substrate to cover aggregation
// TXs // TXs
log::debug!("aggregating a chunk of {} inputs", C::MAX_INPUTS); log::debug!("aggregating a chunk of {} inputs", C::MAX_INPUTS);
txs.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) }) plans.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) })
} }
// We want to use all possible UTXOs for all possible payments // We want to use all possible UTXOs for all possible payments
@@ -220,17 +216,18 @@ impl<C: Coin> Scheduler<C> {
// Now that we have the list of payments we can successfully handle right now, create the TX // Now that we have the list of payments we can successfully handle right now, create the TX
// for them // for them
if !executing.is_empty() { if !executing.is_empty() {
txs.push(self.execute(utxos, executing)); plans.push(self.execute(utxos, executing));
} else { } else {
// If we don't have any payments to execute, save these UTXOs for later
self.utxos.extend(utxos); self.utxos.extend(utxos);
} }
log::info!( log::info!(
"created {} TXs containing {} payments to sign", "created {} plans containing {} payments to sign",
txs.len(), plans.len(),
payments_at_start - self.payments.len(), payments_at_start - self.payments.len(),
); );
txs plans
} }
// Note a branch output as having been created, with the amount it was actually created with, // Note a branch output as having been created, with the amount it was actually created with,

View File

@@ -70,7 +70,7 @@ pub async fn test_scanner<C: Coin>(coin: C) {
verify_event(new_scanner().await).await; verify_event(new_scanner().await).await;
// Acknowledge the block // Acknowledge the block
assert_eq!(scanner.ack_block(keys.group_key(), block_id.clone()).await, outputs); assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id.clone()).await, outputs);
// There should be no more events // There should be no more events
assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err()); assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err());

View File

@@ -49,16 +49,14 @@ pub async fn test_wallet<C: Coin>(coin: C) {
}; };
let mut scheduler = Scheduler::new(key); let mut scheduler = Scheduler::new(key);
// Add these outputs, which should return no plans
assert!(scheduler.add_outputs(outputs.clone()).is_empty());
let amount = 2 * C::DUST; let amount = 2 * C::DUST;
let plans = scheduler.schedule(vec![Payment { address: C::address(key), data: None, amount }]); let plans = scheduler
.schedule(outputs.clone(), vec![Payment { address: C::address(key), data: None, amount }]);
assert_eq!( assert_eq!(
plans, plans,
vec![Plan { vec![Plan {
key, key,
inputs: outputs, inputs: outputs.clone(),
payments: vec![Payment { address: C::address(key), data: None, amount }], payments: vec![Payment { address: C::address(key), data: None, amount }],
change: Some(key), change: Some(key),
}] }]
@@ -91,6 +89,7 @@ pub async fn test_wallet<C: Coin>(coin: C) {
coin.mine_block().await; coin.mine_block().await;
let block_number = coin.get_latest_block_number().await.unwrap(); let block_number = coin.get_latest_block_number().await.unwrap();
let block = coin.get_block(block_number).await.unwrap(); let block = coin.get_block(block_number).await.unwrap();
let first_outputs = outputs;
let outputs = coin.get_outputs(&block, key).await.unwrap(); let outputs = coin.get_outputs(&block, key).await.unwrap();
assert_eq!(outputs.len(), 2); assert_eq!(outputs.len(), 2);
let amount = amount - tx.fee(&coin).await; let amount = amount - tx.fee(&coin).await;
@@ -118,5 +117,8 @@ pub async fn test_wallet<C: Coin>(coin: C) {
} }
// Check the Scanner DB can reload the outputs // Check the Scanner DB can reload the outputs
assert_eq!(scanner.ack_block(key, block.id()).await, outputs); assert_eq!(
scanner.ack_up_to_block(key, block.id()).await,
[first_outputs, outputs].concat().to_vec()
);
} }