Tidy transaction signing task

This commit is contained in:
Luke Parker
2024-09-06 17:33:02 -04:00
parent 100c80be9f
commit 8f848b1abc

View File

@@ -1,4 +1,7 @@
use std::{collections::HashSet, time::{Duration, Instant}}; use std::{
collections::HashSet,
time::{Duration, Instant},
};
use frost::{dkg::ThresholdKeys, sign::PreprocessMachine}; use frost::{dkg::ThresholdKeys, sign::PreprocessMachine};
@@ -71,7 +74,8 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
let signable_transaction_buf = SerializedSignableTransactions::get(&db, tx).unwrap(); let signable_transaction_buf = SerializedSignableTransactions::get(&db, tx).unwrap();
let mut signable_transaction_buf = signable_transaction_buf.as_slice(); let mut signable_transaction_buf = signable_transaction_buf.as_slice();
let signable_transaction = <Sch as Scheduler<S>>::SignableTransaction::read(&mut signable_transaction_buf).unwrap(); let signable_transaction =
<Sch as Scheduler<S>>::SignableTransaction::read(&mut signable_transaction_buf).unwrap();
assert!(signable_transaction_buf.is_empty()); assert!(signable_transaction_buf.is_empty());
assert_eq!(signable_transaction.id(), tx); assert_eq!(signable_transaction.id(), tx);
@@ -82,7 +86,15 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
attempt_manager.register(tx, machines); attempt_manager.register(tx, machines);
} }
Self { db, publisher, session, keys, active_signing_protocols, attempt_manager, last_publication: Instant::now() } Self {
db,
publisher,
session,
keys,
active_signing_protocols,
attempt_manager,
last_publication: Instant::now(),
}
} }
} }
@@ -106,7 +118,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
// Save this to the database as a transaction to sign // Save this to the database as a transaction to sign
self.active_signing_protocols.insert(tx.id()); self.active_signing_protocols.insert(tx.id());
ActiveSigningProtocols::set(&mut txn, self.session, &self.active_signing_protocols.iter().copied().collect()); ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
{ {
let mut buf = Vec::with_capacity(256); let mut buf = Vec::with_capacity(256);
tx.write(&mut buf).unwrap(); tx.write(&mut buf).unwrap();
@@ -132,18 +148,23 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
}; };
iterated = true; iterated = true;
// Remove this as an active signing protocol // This may or may not be an ID this key was responsible for
self.active_signing_protocols.remove(&id); if self.active_signing_protocols.remove(&id) {
ActiveSigningProtocols::set(&mut txn, self.session, &self.active_signing_protocols.iter().copied().collect()); // Since it was, remove this as an active signing protocol
// Clean up the database ActiveSigningProtocols::set(
SerializedSignableTransactions::del(&mut txn, id); &mut txn,
SerializedTransactions::del(&mut txn, id); self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
SerializedSignableTransactions::del(&mut txn, id);
SerializedTransactions::del(&mut txn, id);
// We retire with a txn so we either successfully flag this Eventuality as completed, and // We retire with a txn so we either successfully flag this Eventuality as completed, and
// won't re-register it (making this retire safe), or we don't flag it, meaning we will // won't re-register it (making this retire safe), or we don't flag it, meaning we will
// re-register it, yet that's safe as we have yet to retire it // re-register it, yet that's safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, id); self.attempt_manager.retire(&mut txn, id);
// TODO: Stop rebroadcasting this transaction }
txn.commit(); txn.commit();
} }
@@ -170,7 +191,6 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
SerializedTransactions::set(&mut txn, id, &buf); SerializedTransactions::set(&mut txn, id, &buf);
} }
// TODO: Attempt publication every minute
self self
.publisher .publisher
.publish(signed_tx) .publish(signed_tx)
@@ -191,7 +211,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
let tx = TransactionFor::<S, Sch>::read(&mut tx_buf).unwrap(); let tx = TransactionFor::<S, Sch>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty()); assert!(tx_buf.is_empty());
self.publisher.publish(tx).await.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; self
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
} }
self.last_publication = Instant::now(); self.last_publication = Instant::now();