Files
serai/processor/signers/src/transaction/mod.rs

130 lines
3.6 KiB
Rust
Raw Normal View History

2024-09-06 03:20:38 -04:00
use frost::dkg::ThresholdKeys;
use serai_validator_sets_primitives::Session;
use serai_db::{DbTxn, Db};
use primitives::task::ContinuallyRan;
2024-09-06 03:20:38 -04:00
use scheduler::{SignableTransaction, TransactionsToSign};
use scanner::{ScannerFeed, Scheduler};
use frost_attempt_manager::*;
use crate::{
db::{
CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages,
CompletedEventualitiesForEachKey,
},
TransactionPublisher,
};
mod db;
// Fetches transactions to sign and signs them.
2024-09-06 03:20:38 -04:00
pub(crate) struct TransactionTask<
D: Db,
S: ScannerFeed,
Sch: Scheduler<S>,
P: TransactionPublisher<Sch::SignableTransaction>,
> {
db: D,
2024-09-06 03:20:38 -04:00
session: Session,
keys: Vec<ThresholdKeys<<Sch::SignableTransaction as SignableTransaction>::Ciphersuite>>,
attempt_manager:
AttemptManager<D, <Sch::SignableTransaction as SignableTransaction>::PreprocessMachine>,
2024-09-06 03:20:38 -04:00
publisher: P,
}
2024-09-06 03:20:38 -04:00
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::SignableTransaction>>
TransactionTask<D, S, Sch, P>
{
pub(crate) fn new(
db: D,
2024-09-06 03:20:38 -04:00
session: Session,
keys: Vec<ThresholdKeys<<Sch::SignableTransaction as SignableTransaction>::Ciphersuite>>,
publisher: P,
) -> Self {
2024-09-06 03:20:38 -04:00
let attempt_manager = AttemptManager::new(
db.clone(),
session,
keys.first().expect("creating a transaction signer with 0 keys").params().i(),
);
Self { db, session, keys, attempt_manager, publisher }
}
}
#[async_trait::async_trait]
2024-09-06 03:20:38 -04:00
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::SignableTransaction>>
ContinuallyRan for TransactionTask<D, S, Sch, P>
{
async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false;
// Check for new transactions to sign
loop {
let mut txn = self.db.txn();
2024-09-06 03:20:38 -04:00
let Some(tx) = TransactionsToSign::<Sch::SignableTransaction>::try_recv(
&mut txn,
&self.keys[0].group_key(),
) else {
break;
};
iterated = true;
let mut machines = Vec::with_capacity(self.keys.len());
for keys in &self.keys {
machines.push(tx.clone().sign(keys.clone()));
}
2024-09-06 03:20:38 -04:00
for msg in self.attempt_manager.register(tx.id(), machines) {
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
txn.commit();
}
// Check for completed Eventualities (meaning we should no longer sign for these transactions)
loop {
let mut txn = self.db.txn();
2024-09-06 03:20:38 -04:00
let Some(id) = CompletedEventualitiesForEachKey::try_recv(&mut txn, self.session) else {
break;
};
iterated = true;
2024-09-06 03:20:38 -04:00
self.attempt_manager.retire(id);
// TODO: Stop rebroadcasting this transaction
txn.commit();
}
2024-09-06 03:20:38 -04:00
// Handle any messages sent to us
loop {
let mut txn = self.db.txn();
2024-09-06 03:20:38 -04:00
let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session)
else {
break;
};
iterated = true;
match self.attempt_manager.handle(msg) {
2024-09-06 03:20:38 -04:00
Response::Messages(msgs) => {
for msg in msgs {
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
}
Response::Signature(signed_tx) => {
// TODO: Save this TX to the DB
// TODO: Attempt publication every minute
// TODO: On boot, reload all TXs to rebroadcast
self
.publisher
.publish(signed_tx)
.await
.map_err(|e| format!("couldn't publish transaction: {e:?}"))?;
}
}
2024-09-06 03:20:38 -04:00
txn.commit();
}
Ok(iterated)
}
}