mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 20:29:23 +00:00
Correct processor flow to have the coordinator decide signing set/re-attempts
The signing set should be the first group to submit preprocesses to Tributary. Re-attempts shouldn't be once every 30s, yet n blocks since the last relevant message. Removes the use of an async task/channel in the signer (and Substrate signer). Also removes the need to be able to get the time from a coin's block, which was a fragile system marked with a TODO already.
This commit is contained in:
@@ -1,9 +1,5 @@
|
||||
use core::{marker::PhantomData, fmt};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{SystemTime, Duration},
|
||||
collections::HashMap,
|
||||
};
|
||||
use std::collections::{VecDeque, HashMap};
|
||||
|
||||
use rand_core::OsRng;
|
||||
|
||||
@@ -14,10 +10,6 @@ use frost::{
|
||||
};
|
||||
|
||||
use log::{info, debug, warn, error};
|
||||
use tokio::{
|
||||
sync::{RwLock, mpsc},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
use messages::sign::*;
|
||||
use crate::{
|
||||
@@ -25,16 +17,12 @@ use crate::{
|
||||
coins::{Transaction, Eventuality, Coin},
|
||||
};
|
||||
|
||||
const CHANNEL_MSG: &str = "Signer handler was dropped. Shutting down?";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SignerEvent<C: Coin> {
|
||||
SignedTransaction { id: [u8; 32], tx: <C::Transaction as Transaction<C>>::Id },
|
||||
ProcessorMessage(ProcessorMessage),
|
||||
}
|
||||
|
||||
pub type SignerEventChannel<C> = mpsc::UnboundedReceiver<SignerEvent<C>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SignerDb<C: Coin, D: Db>(D, PhantomData<C>);
|
||||
impl<C: Coin, D: Db> SignerDb<C, D> {
|
||||
@@ -106,7 +94,7 @@ pub struct Signer<C: Coin, D: Db> {
|
||||
|
||||
keys: ThresholdKeys<C::Curve>,
|
||||
|
||||
signable: HashMap<[u8; 32], (SystemTime, C::SignableTransaction)>,
|
||||
signable: HashMap<[u8; 32], C::SignableTransaction>,
|
||||
attempt: HashMap<[u8; 32], u32>,
|
||||
preprocessing: HashMap<[u8; 32], <C::TransactionMachine as PreprocessMachine>::SignMachine>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
@@ -117,7 +105,7 @@ pub struct Signer<C: Coin, D: Db> {
|
||||
>::SignatureMachine,
|
||||
>,
|
||||
|
||||
events: mpsc::UnboundedSender<SignerEvent<C>>,
|
||||
pub events: VecDeque<SignerEvent<C>>,
|
||||
}
|
||||
|
||||
impl<C: Coin, D: Db> fmt::Debug for Signer<C, D> {
|
||||
@@ -131,18 +119,9 @@ impl<C: Coin, D: Db> fmt::Debug for Signer<C, D> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SignerHandle<C: Coin, D: Db> {
|
||||
signer: Arc<RwLock<Signer<C, D>>>,
|
||||
pub events: SignerEventChannel<C>,
|
||||
}
|
||||
|
||||
impl<C: Coin, D: Db> Signer<C, D> {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(db: D, coin: C, keys: ThresholdKeys<C::Curve>) -> SignerHandle<C, D> {
|
||||
let (events_send, events_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let signer = Arc::new(RwLock::new(Signer {
|
||||
pub fn new(db: D, coin: C, keys: ThresholdKeys<C::Curve>) -> Signer<C, D> {
|
||||
Signer {
|
||||
coin,
|
||||
db: SignerDb(db, PhantomData),
|
||||
|
||||
@@ -153,37 +132,35 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
preprocessing: HashMap::new(),
|
||||
signing: HashMap::new(),
|
||||
|
||||
events: events_send,
|
||||
}));
|
||||
events: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
tokio::spawn(Signer::run(signer.clone()));
|
||||
|
||||
SignerHandle { signer, events: events_recv }
|
||||
pub async fn keys(&self) -> ThresholdKeys<C::Curve> {
|
||||
self.keys.clone()
|
||||
}
|
||||
|
||||
fn verify_id(&self, id: &SignId) -> Result<(), ()> {
|
||||
if !id.signing_set(&self.keys.params()).contains(&self.keys.params().i()) {
|
||||
panic!("coordinator sent us preprocesses for a signing attempt we're not participating in");
|
||||
}
|
||||
|
||||
// Check the attempt lines up
|
||||
match self.attempt.get(&id.id) {
|
||||
// If we don't have an attempt logged, it's because the coordinator is faulty OR
|
||||
// because we rebooted
|
||||
// If we don't have an attempt logged, it's because the coordinator is faulty OR because we
|
||||
// rebooted
|
||||
None => {
|
||||
warn!(
|
||||
"not attempting {} #{}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id),
|
||||
id.attempt
|
||||
);
|
||||
// Don't panic on the assumption we rebooted
|
||||
Err(())?;
|
||||
}
|
||||
Some(attempt) => {
|
||||
// This could be an old attempt, or it may be a 'future' attempt if we rebooted and
|
||||
// our SystemTime wasn't monotonic, as it may be
|
||||
if attempt != &id.attempt {
|
||||
debug!("sent signing data for a distinct attempt");
|
||||
warn!(
|
||||
"sent signing data for {} #{} yet we have attempt #{}",
|
||||
hex::encode(id.id),
|
||||
id.attempt,
|
||||
attempt
|
||||
);
|
||||
Err(())?;
|
||||
}
|
||||
}
|
||||
@@ -192,16 +169,7 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit(&mut self, event: SignerEvent<C>) -> bool {
|
||||
if self.events.send(event).is_err() {
|
||||
info!("{}", CHANNEL_MSG);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
async fn eventuality_completion(
|
||||
pub async fn eventuality_completion(
|
||||
&mut self,
|
||||
id: [u8; 32],
|
||||
tx_id: &<C::Transaction as Transaction<C>>::Id,
|
||||
@@ -234,7 +202,7 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
self.preprocessing.remove(&id);
|
||||
self.signing.remove(&id);
|
||||
|
||||
self.emit(SignerEvent::SignedTransaction { id, tx: tx.id() });
|
||||
self.events.push_back(SignerEvent::SignedTransaction { id, tx: tx.id() });
|
||||
} else {
|
||||
warn!(
|
||||
"a validator claimed {} completed {} when it did not",
|
||||
@@ -252,7 +220,140 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle(&mut self, msg: CoordinatorMessage) {
|
||||
async fn check_completion(&mut self, id: [u8; 32]) -> bool {
|
||||
if let Some(txs) = self.db.completed(id) {
|
||||
debug!(
|
||||
"SignTransaction/Reattempt order for {}, which we've already completed signing",
|
||||
hex::encode(id)
|
||||
);
|
||||
|
||||
// Find the first instance we noted as having completed *and can still get from our node*
|
||||
let mut tx = None;
|
||||
let mut buf = <C::Transaction as Transaction<C>>::Id::default();
|
||||
let tx_id_len = buf.as_ref().len();
|
||||
assert_eq!(txs.len() % tx_id_len, 0);
|
||||
for id in 0 .. (txs.len() / tx_id_len) {
|
||||
let start = id * tx_id_len;
|
||||
buf.as_mut().copy_from_slice(&txs[start .. (start + tx_id_len)]);
|
||||
if self.coin.get_transaction(&buf).await.is_ok() {
|
||||
tx = Some(buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Fire the SignedTransaction event again
|
||||
if let Some(tx) = tx {
|
||||
self.events.push_back(SignerEvent::SignedTransaction { id, tx });
|
||||
} else {
|
||||
warn!("completed signing {} yet couldn't get any of the completing TXs", hex::encode(id));
|
||||
}
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
async fn attempt(&mut self, id: [u8; 32], attempt: u32) {
|
||||
if self.check_completion(id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if we're already working on this attempt
|
||||
if let Some(curr_attempt) = self.attempt.get(&id) {
|
||||
if curr_attempt >= &attempt {
|
||||
warn!(
|
||||
"told to attempt {} #{} yet we're already working on {}",
|
||||
hex::encode(id),
|
||||
attempt,
|
||||
curr_attempt
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Start this attempt
|
||||
// Clone the TX so we don't have an immutable borrow preventing the below mutable actions
|
||||
// (also because we do need an owned tx anyways)
|
||||
let Some(tx) = self.signable.get(&id).cloned() else {
|
||||
warn!("told to attempt a TX we aren't currently signing for");
|
||||
return;
|
||||
};
|
||||
|
||||
// Delete any existing machines
|
||||
self.preprocessing.remove(&id);
|
||||
self.signing.remove(&id);
|
||||
|
||||
// Update the attempt number
|
||||
self.attempt.insert(id, attempt);
|
||||
|
||||
let id = SignId { key: self.keys.group_key().to_bytes().as_ref().to_vec(), id, attempt };
|
||||
|
||||
info!("signing for {} #{}", hex::encode(id.id), id.attempt);
|
||||
|
||||
// If we reboot mid-sign, the current design has us abort all signs and wait for latter
|
||||
// attempts/new signing protocols
|
||||
// This is distinct from the DKG which will continue DKG sessions, even on reboot
|
||||
// This is because signing is tolerant of failures of up to 1/3rd of the group
|
||||
// The DKG requires 100% participation
|
||||
// While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
|
||||
// reboots, it's not worth the complexity when messing up here leaks our secret share
|
||||
//
|
||||
// Despite this, on reboot, we'll get told of active signing items, and may be in this
|
||||
// branch again for something we've already attempted
|
||||
//
|
||||
// Only run if this hasn't already been attempted
|
||||
if self.db.has_attempt(&id) {
|
||||
warn!(
|
||||
"already attempted {} #{}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id),
|
||||
id.attempt
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let mut txn = self.db.0.txn();
|
||||
SignerDb::<C, D>::attempt(&mut txn, &id);
|
||||
txn.commit();
|
||||
|
||||
// Attempt to create the TX
|
||||
let machine = match self.coin.attempt_send(tx).await {
|
||||
Err(e) => {
|
||||
error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e);
|
||||
return;
|
||||
}
|
||||
Ok(machine) => machine,
|
||||
};
|
||||
|
||||
let (machine, preprocess) = machine.preprocess(&mut OsRng);
|
||||
self.preprocessing.insert(id.id, machine);
|
||||
|
||||
// Broadcast our preprocess
|
||||
self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess {
|
||||
id,
|
||||
preprocess: preprocess.serialize(),
|
||||
}));
|
||||
}
|
||||
|
||||
pub async fn sign_transaction(
|
||||
&mut self,
|
||||
id: [u8; 32],
|
||||
tx: C::SignableTransaction,
|
||||
eventuality: C::Eventuality,
|
||||
) {
|
||||
if self.check_completion(id).await {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut txn = self.db.0.txn();
|
||||
SignerDb::<C, D>::save_eventuality(&mut txn, id, eventuality);
|
||||
txn.commit();
|
||||
|
||||
self.signable.insert(id, tx);
|
||||
self.attempt(id, 0).await;
|
||||
}
|
||||
|
||||
pub async fn handle(&mut self, msg: CoordinatorMessage) {
|
||||
match msg {
|
||||
CoordinatorMessage::Preprocesses { id, mut preprocesses } => {
|
||||
if self.verify_id(&id).is_err() {
|
||||
@@ -292,7 +393,7 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
self.signing.insert(id.id, machine);
|
||||
|
||||
// Broadcast our share
|
||||
self.emit(SignerEvent::ProcessorMessage(ProcessorMessage::Share {
|
||||
self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Share {
|
||||
id,
|
||||
share: share.serialize(),
|
||||
}));
|
||||
@@ -357,7 +458,11 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
assert!(self.preprocessing.remove(&id.id).is_none());
|
||||
assert!(self.signing.remove(&id.id).is_none());
|
||||
|
||||
self.emit(SignerEvent::SignedTransaction { id: id.id, tx: tx_id });
|
||||
self.events.push_back(SignerEvent::SignedTransaction { id: id.id, tx: tx_id });
|
||||
}
|
||||
|
||||
CoordinatorMessage::Reattempt { id } => {
|
||||
self.attempt(id.id, id.attempt).await;
|
||||
}
|
||||
|
||||
CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => {
|
||||
@@ -377,190 +482,4 @@ impl<C: Coin, D: Db> Signer<C, D> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// An async function, to be spawned on a task, to handle signing
|
||||
async fn run(signer_arc: Arc<RwLock<Self>>) {
|
||||
const SIGN_TIMEOUT: u64 = 30;
|
||||
|
||||
loop {
|
||||
// Sleep until a timeout expires (or five seconds expire)
|
||||
// Since this code start new sessions, it will delay any ordered signing sessions from
|
||||
// starting for up to 5 seconds, hence why this number can't be too high (such as 30 seconds,
|
||||
// the full timeout)
|
||||
// This won't delay re-attempting any signing session however, nor will it block the
|
||||
// sign_transaction function (since this doesn't hold any locks)
|
||||
sleep({
|
||||
let now = SystemTime::now();
|
||||
let mut lowest = Duration::from_secs(5);
|
||||
let signer = signer_arc.read().await;
|
||||
for (id, (start, _)) in &signer.signable {
|
||||
let until = if let Some(attempt) = signer.attempt.get(id) {
|
||||
// Get when this attempt times out
|
||||
(*start + Duration::from_secs(u64::from(attempt + 1) * SIGN_TIMEOUT))
|
||||
.duration_since(now)
|
||||
.unwrap_or(Duration::ZERO)
|
||||
} else {
|
||||
Duration::ZERO
|
||||
};
|
||||
|
||||
if until < lowest {
|
||||
lowest = until;
|
||||
}
|
||||
}
|
||||
lowest
|
||||
})
|
||||
.await;
|
||||
|
||||
// Because a signing attempt has timed out (or five seconds has passed), check all
|
||||
// sessions' timeouts
|
||||
{
|
||||
let mut signer = signer_arc.write().await;
|
||||
let keys = signer.signable.keys().cloned().collect::<Vec<_>>();
|
||||
for id in keys {
|
||||
let (start, tx) = &signer.signable[&id];
|
||||
let start = *start;
|
||||
|
||||
let attempt = u32::try_from(
|
||||
SystemTime::now().duration_since(start).unwrap_or(Duration::ZERO).as_secs() /
|
||||
SIGN_TIMEOUT,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Check if we're already working on this attempt
|
||||
if let Some(curr_attempt) = signer.attempt.get(&id) {
|
||||
if curr_attempt >= &attempt {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Start this attempt
|
||||
// Clone the TX so we don't have an immutable borrow preventing the below mutable actions
|
||||
// (also because we do need an owned tx anyways)
|
||||
let tx = tx.clone();
|
||||
|
||||
// Delete any existing machines
|
||||
signer.preprocessing.remove(&id);
|
||||
signer.signing.remove(&id);
|
||||
|
||||
// Update the attempt number so we don't re-enter this conditional
|
||||
signer.attempt.insert(id, attempt);
|
||||
|
||||
let id =
|
||||
SignId { key: signer.keys.group_key().to_bytes().as_ref().to_vec(), id, attempt };
|
||||
// Only preprocess if we're a signer
|
||||
if !id.signing_set(&signer.keys.params()).contains(&signer.keys.params().i()) {
|
||||
continue;
|
||||
}
|
||||
info!("selected to sign {} #{}", hex::encode(id.id), id.attempt);
|
||||
|
||||
// If we reboot mid-sign, the current design has us abort all signs and wait for latter
|
||||
// attempts/new signing protocols
|
||||
// This is distinct from the DKG which will continue DKG sessions, even on reboot
|
||||
// This is because signing is tolerant of failures of up to 1/3rd of the group
|
||||
// The DKG requires 100% participation
|
||||
// While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
|
||||
// reboots, it's not worth the complexity when messing up here leaks our secret share
|
||||
//
|
||||
// Despite this, on reboot, we'll get told of active signing items, and may be in this
|
||||
// branch again for something we've already attempted
|
||||
//
|
||||
// Only run if this hasn't already been attempted
|
||||
if signer.db.has_attempt(&id) {
|
||||
warn!(
|
||||
"already attempted {} #{}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id),
|
||||
id.attempt
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut txn = signer.db.0.txn();
|
||||
SignerDb::<C, D>::attempt(&mut txn, &id);
|
||||
txn.commit();
|
||||
|
||||
// Attempt to create the TX
|
||||
let machine = match signer.coin.attempt_send(tx).await {
|
||||
Err(e) => {
|
||||
error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e);
|
||||
continue;
|
||||
}
|
||||
Ok(machine) => machine,
|
||||
};
|
||||
|
||||
let (machine, preprocess) = machine.preprocess(&mut OsRng);
|
||||
signer.preprocessing.insert(id.id, machine);
|
||||
|
||||
// Broadcast our preprocess
|
||||
if !signer.emit(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess {
|
||||
id,
|
||||
preprocess: preprocess.serialize(),
|
||||
})) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Coin, D: Db> SignerHandle<C, D> {
|
||||
pub async fn keys(&self) -> ThresholdKeys<C::Curve> {
|
||||
self.signer.read().await.keys.clone()
|
||||
}
|
||||
|
||||
pub async fn sign_transaction(
|
||||
&self,
|
||||
id: [u8; 32],
|
||||
start: SystemTime,
|
||||
tx: C::SignableTransaction,
|
||||
eventuality: C::Eventuality,
|
||||
) {
|
||||
let mut signer = self.signer.write().await;
|
||||
|
||||
if let Some(txs) = signer.db.completed(id) {
|
||||
debug!("SignTransaction order for ID we've already completed signing");
|
||||
|
||||
// Find the first instance we noted as having completed *and can still get from our node*
|
||||
let mut tx = None;
|
||||
let mut buf = <C::Transaction as Transaction<C>>::Id::default();
|
||||
let tx_id_len = buf.as_ref().len();
|
||||
assert_eq!(txs.len() % tx_id_len, 0);
|
||||
for id in 0 .. (txs.len() / tx_id_len) {
|
||||
let start = id * tx_id_len;
|
||||
buf.as_mut().copy_from_slice(&txs[start .. (start + tx_id_len)]);
|
||||
if signer.coin.get_transaction(&buf).await.is_ok() {
|
||||
tx = Some(buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Fire the SignedTransaction event again
|
||||
if let Some(tx) = tx {
|
||||
if !signer.emit(SignerEvent::SignedTransaction { id, tx }) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
warn!("completed signing {} yet couldn't get any of the completing TXs", hex::encode(id));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let mut txn = signer.db.0.txn();
|
||||
SignerDb::<C, D>::save_eventuality(&mut txn, id, eventuality);
|
||||
txn.commit();
|
||||
|
||||
signer.signable.insert(id, (start, tx));
|
||||
}
|
||||
|
||||
pub async fn eventuality_completion(
|
||||
&self,
|
||||
id: [u8; 32],
|
||||
tx: &<C::Transaction as Transaction<C>>::Id,
|
||||
) {
|
||||
self.signer.write().await.eventuality_completion(id, tx).await;
|
||||
}
|
||||
|
||||
pub async fn handle(&self, msg: CoordinatorMessage) {
|
||||
self.signer.write().await.handle(msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user