mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-12 22:19:26 +00:00
Remove the Signer events pseudo-channel for a returned message
Also replaces SignerEvent with usage of ProcessorMessage directly.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use core::{marker::PhantomData, fmt};
|
||||
use std::collections::{VecDeque, HashMap};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use rand_core::OsRng;
|
||||
|
||||
@@ -18,12 +18,6 @@ use crate::{
|
||||
networks::{Transaction, Eventuality, Network},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SignerEvent<N: Network> {
|
||||
SignedTransaction { id: [u8; 32], tx: <N::Transaction as Transaction<N>>::Id },
|
||||
ProcessorMessage(ProcessorMessage),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SignerDb<N: Network, D: Db>(D, PhantomData<N>);
|
||||
impl<N: Network, D: Db> SignerDb<N, D> {
|
||||
@@ -162,8 +156,6 @@ pub struct Signer<N: Network, D: Db> {
|
||||
preprocessing: HashMap<[u8; 32], (Vec<SignMachineFor<N>>, Vec<PreprocessFor<N>>)>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
signing: HashMap<[u8; 32], (SignatureMachineFor<N>, Vec<SignatureShareFor<N>>)>,
|
||||
|
||||
pub events: VecDeque<SignerEvent<N>>,
|
||||
}
|
||||
|
||||
impl<N: Network, D: Db> fmt::Debug for Signer<N, D> {
|
||||
@@ -210,8 +202,6 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
attempt: HashMap::new(),
|
||||
preprocessing: HashMap::new(),
|
||||
signing: HashMap::new(),
|
||||
|
||||
events: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,6 +235,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool {
|
||||
if !SignerDb::<N, D>::completions(txn, id).is_empty() {
|
||||
debug!(
|
||||
@@ -258,7 +249,12 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
}
|
||||
}
|
||||
|
||||
fn complete(&mut self, id: [u8; 32], tx_id: <N::Transaction as Transaction<N>>::Id) {
|
||||
#[must_use]
|
||||
fn complete(
|
||||
&mut self,
|
||||
id: [u8; 32],
|
||||
tx_id: <N::Transaction as Transaction<N>>::Id,
|
||||
) -> ProcessorMessage {
|
||||
// Assert we're actively signing for this TX
|
||||
assert!(self.signable.remove(&id).is_some(), "completed a TX we weren't signing for");
|
||||
assert!(self.attempt.remove(&id).is_some(), "attempt had an ID signable didn't have");
|
||||
@@ -271,10 +267,20 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
self.signing.remove(&id);
|
||||
|
||||
// Emit the event for it
|
||||
self.events.push_back(SignerEvent::SignedTransaction { id, tx: tx_id });
|
||||
ProcessorMessage::Completed {
|
||||
key: self.keys[0].group_key().to_bytes().as_ref().to_vec(),
|
||||
id,
|
||||
tx: tx_id.as_ref().to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn completed(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], tx: N::Transaction) {
|
||||
#[must_use]
|
||||
pub fn completed(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
id: [u8; 32],
|
||||
tx: N::Transaction,
|
||||
) -> Option<ProcessorMessage> {
|
||||
let first_completion = !self.already_completed(txn, id);
|
||||
|
||||
// Save this completion to the DB
|
||||
@@ -282,17 +288,21 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||
|
||||
if first_completion {
|
||||
self.complete(id, tx.id());
|
||||
Some(self.complete(id, tx.id()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns Some if the first completion.
|
||||
// Doesn't use any loops/retries since we'll eventually get this from the Scanner anyways
|
||||
#[must_use]
|
||||
async fn claimed_eventuality_completion(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
id: [u8; 32],
|
||||
tx_id: &<N::Transaction as Transaction<N>>::Id,
|
||||
) -> bool {
|
||||
) -> Option<ProcessorMessage> {
|
||||
if let Some(eventuality) = SignerDb::<N, D>::eventuality(txn, id) {
|
||||
// Transaction hasn't hit our mempool/was dropped for a different signature
|
||||
// The latter can happen given certain latency conditions/a single malicious signer
|
||||
@@ -305,7 +315,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
hex::encode(id),
|
||||
"(or had another connectivity issue)",
|
||||
);
|
||||
return false;
|
||||
return None;
|
||||
};
|
||||
|
||||
if self.network.confirm_completion(&eventuality, &tx) {
|
||||
@@ -317,8 +327,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||
|
||||
if first_completion {
|
||||
self.complete(id, tx.id());
|
||||
return true;
|
||||
return Some(self.complete(id, tx.id()));
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
@@ -337,12 +346,18 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
"which we already marked as completed",
|
||||
);
|
||||
}
|
||||
false
|
||||
None
|
||||
}
|
||||
|
||||
async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], attempt: u32) {
|
||||
#[must_use]
|
||||
async fn attempt(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
id: [u8; 32],
|
||||
attempt: u32,
|
||||
) -> Option<ProcessorMessage> {
|
||||
if self.already_completed(txn, id) {
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
// Check if we're already working on this attempt
|
||||
@@ -354,7 +369,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
attempt,
|
||||
curr_attempt
|
||||
);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,7 +378,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
// (also because we do need an owned tx anyways)
|
||||
let Some(tx) = self.signable.get(&id).cloned() else {
|
||||
warn!("told to attempt a TX we aren't currently signing for");
|
||||
return;
|
||||
return None;
|
||||
};
|
||||
|
||||
// Delete any existing machines
|
||||
@@ -395,7 +410,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
hex::encode(id.id),
|
||||
id.attempt
|
||||
);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
SignerDb::<N, D>::attempt(txn, &id);
|
||||
@@ -408,7 +423,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
let machine = match self.network.attempt_send(keys.clone(), tx.clone()).await {
|
||||
Err(e) => {
|
||||
error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
Ok(machine) => machine,
|
||||
};
|
||||
@@ -426,38 +441,41 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
self.preprocessing.insert(id.id, (machines, preprocesses));
|
||||
|
||||
// Broadcast our preprocess
|
||||
self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess {
|
||||
id,
|
||||
preprocesses: serialized_preprocesses,
|
||||
}));
|
||||
Some(ProcessorMessage::Preprocess { id, preprocesses: serialized_preprocesses })
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub async fn sign_transaction(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
id: [u8; 32],
|
||||
tx: N::SignableTransaction,
|
||||
eventuality: N::Eventuality,
|
||||
) {
|
||||
) -> Option<ProcessorMessage> {
|
||||
// The caller is expected to re-issue sign orders on reboot
|
||||
// This is solely used by the rebroadcast task
|
||||
SignerDb::<N, D>::add_active_sign(txn, &id);
|
||||
|
||||
if self.already_completed(txn, id) {
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
SignerDb::<N, D>::save_eventuality(txn, id, eventuality);
|
||||
|
||||
self.signable.insert(id, tx);
|
||||
self.attempt(txn, id, 0).await;
|
||||
self.attempt(txn, id, 0).await
|
||||
}
|
||||
|
||||
pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) {
|
||||
#[must_use]
|
||||
pub async fn handle(
|
||||
&mut self,
|
||||
txn: &mut D::Transaction<'_>,
|
||||
msg: CoordinatorMessage,
|
||||
) -> Option<ProcessorMessage> {
|
||||
match msg {
|
||||
CoordinatorMessage::Preprocesses { id, mut preprocesses } => {
|
||||
if self.verify_id(&id).is_err() {
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) {
|
||||
@@ -467,7 +485,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
"not preprocessing for {}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id)
|
||||
);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
Some(machine) => machine,
|
||||
};
|
||||
@@ -516,15 +534,12 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
self.signing.insert(id.id, (signature_machine.unwrap(), shares));
|
||||
|
||||
// Broadcast our shares
|
||||
self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Share {
|
||||
id,
|
||||
shares: serialized_shares,
|
||||
}));
|
||||
Some(ProcessorMessage::Share { id, shares: serialized_shares })
|
||||
}
|
||||
|
||||
CoordinatorMessage::Shares { id, mut shares } => {
|
||||
if self.verify_id(&id).is_err() {
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
let (machine, our_shares) = match self.signing.remove(&id.id) {
|
||||
@@ -540,7 +555,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
"not preprocessing for {}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id)
|
||||
);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
Some(machine) => machine,
|
||||
};
|
||||
@@ -582,12 +597,10 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
}
|
||||
|
||||
// Stop trying to sign for this TX
|
||||
self.complete(id.id, tx_id);
|
||||
Some(self.complete(id.id, tx_id))
|
||||
}
|
||||
|
||||
CoordinatorMessage::Reattempt { id } => {
|
||||
self.attempt(txn, id.id, id.attempt).await;
|
||||
}
|
||||
CoordinatorMessage::Reattempt { id } => self.attempt(txn, id.id, id.attempt).await,
|
||||
|
||||
CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => {
|
||||
let mut tx = <N::Transaction as Transaction<N>>::Id::default();
|
||||
@@ -601,11 +614,11 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||
hex::encode(id),
|
||||
"that's not a valid TX ID",
|
||||
);
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
tx.as_mut().copy_from_slice(&tx_vec);
|
||||
|
||||
self.claimed_eventuality_completion(txn, id, &tx).await;
|
||||
self.claimed_eventuality_completion(txn, id, &tx).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user