Route the coordinator, fix race conditions in the signers library

Luke Parker
2024-09-08 22:13:42 -04:00
parent 7484eadbbb
commit f07ec7bee0
15 changed files with 356 additions and 172 deletions

View File

@@ -0,0 +1,98 @@
use serai_db::{DbTxn, Db};
use primitives::task::ContinuallyRan;
use crate::{
db::{
RegisteredKeys, CosignerToCoordinatorMessages, BatchSignerToCoordinatorMessages,
SlashReportSignerToCoordinatorMessages, TransactionSignerToCoordinatorMessages,
},
Coordinator,
};
// Fetches messages to send to the coordinator and sends them.
pub(crate) struct CoordinatorTask<D: Db, C: Coordinator> {
db: D,
coordinator: C,
}
impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
pub(crate) fn new(db: D, coordinator: C) -> Self {
Self { db, coordinator }
}
}
#[async_trait::async_trait]
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false;
for session in RegisteredKeys::get(&self.db).unwrap_or(vec![]) {
loop {
let mut txn = self.db.txn();
let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) else {
break;
};
iterated = true;
self
.coordinator
.send(msg)
.await
.map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?;
txn.commit();
}
loop {
let mut txn = self.db.txn();
let Some(msg) = BatchSignerToCoordinatorMessages::try_recv(&mut txn, session) else {
break;
};
iterated = true;
self
.coordinator
.send(msg)
.await
.map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?;
txn.commit();
}
loop {
let mut txn = self.db.txn();
let Some(msg) = SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session) else {
break;
};
iterated = true;
self
.coordinator
.send(msg)
.await
.map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?;
txn.commit();
}
loop {
let mut txn = self.db.txn();
let Some(msg) = TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session) else {
break;
};
iterated = true;
self
.coordinator
.send(msg)
.await
.map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?;
txn.commit();
}
}
Ok(iterated)
}
}
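
The four loops above are identical except for the channel drained. A declarative macro could collapse them; the sketch below assumes only the `try_recv(&mut txn, session)` signature visible above, and the macro name is hypothetical, not part of this commit. Note that the send happens before `txn.commit()`, so delivery is at-least-once: a crash between the two causes the message to be re-sent on reboot.

// Hypothetical helper: drain one signer-to-coordinator channel, forwarding each
// message and committing per message so a reboot never loses a queued message.
macro_rules! drain_to_coordinator {
  ($self:ident, $iterated:ident, $session:ident, $channel:ident) => {
    loop {
      let mut txn = $self.db.txn();
      let Some(msg) = $channel::try_recv(&mut txn, $session) else { break };
      $iterated = true;
      $self
        .coordinator
        .send(msg)
        .await
        .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?;
      txn.commit();
    }
  };
}

// Usage inside run_iteration, replacing the four hand-written loops:
// drain_to_coordinator!(self, iterated, session, CosignerToCoordinatorMessages);
// drain_to_coordinator!(self, iterated, session, BatchSignerToCoordinatorMessages);
// drain_to_coordinator!(self, iterated, session, SlashReportSignerToCoordinatorMessages);
// drain_to_coordinator!(self, iterated, session, TransactionSignerToCoordinatorMessages);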

View File

@@ -15,14 +15,8 @@ create_db! {
db_channel! {
SignersGlobal {
// CompletedEventualities needs to be handled by each signer, meaning we need to turn its
// effective SPSC channel into an SPMC one. We do this by duplicating its message for all
// keys we're signing for.
// TODO: Populate from CompletedEventualities
CompletedEventualitiesForEachKey: (session: Session) -> [u8; 32],
CoordinatorToTransactionSignerMessages: (session: Session) -> CoordinatorMessage,
TransactionSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage,
CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
CoordinatorToBatchSignerMessages: (session: Session) -> CoordinatorMessage,
BatchSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
@@ -30,7 +24,7 @@ db_channel! {
CoordinatorToSlashReportSignerMessages: (session: Session) -> CoordinatorMessage,
SlashReportSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage,
CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
CoordinatorToTransactionSignerMessages: (session: Session) -> CoordinatorMessage,
TransactionSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
}
}
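
For context, each entry in these `db_channel!` blocks behaves as a durable FIFO keyed by `Session`. The shape below is inferred from the `send`/`try_recv` calls elsewhere in this commit, not from the macro's definition:

// Producer side (e.g., a signer task); the send is transactional with the
// producer's other writes and invisible to consumers until commit.
let mut txn = db.txn();
CosignerToCoordinatorMessages::send(&mut txn, session, &msg);
txn.commit();

// Consumer side (the coordinator task); dropping the txn instead of
// committing re-queues the message, which the drain loops above rely on.
let mut txn = db.txn();
if let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) {
  // ... forward msg ...
  txn.commit();
}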

View File

@@ -7,23 +7,42 @@ use std::collections::HashMap;
use zeroize::Zeroizing;
use serai_validator_sets_primitives::Session;
use ciphersuite::{group::GroupEncoding, Ristretto};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::dkg::{ThresholdCore, ThresholdKeys};
use serai_validator_sets_primitives::Session;
use serai_db::{DbTxn, Db};
use primitives::task::TaskHandle;
use scheduler::{Transaction, SignableTransaction, TransactionsToSign};
use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage};
use primitives::task::{Task, TaskHandle, ContinuallyRan};
use scheduler::{Transaction, SignableTransaction, TransactionFor};
pub(crate) mod db;
mod coordinator;
use coordinator::CoordinatorTask;
mod transaction;
use transaction::TransactionTask;
/// A connection to the Coordinator with which messages can be published.
#[async_trait::async_trait]
pub trait Coordinator: 'static + Send + Sync {
/// An error encountered when sending a message.
///
/// This MUST be an ephemeral error. Retrying the send MUST eventually resolve without
/// manual intervention or changing the arguments.
type EphemeralError: Debug;
/// Send a `messages::sign::ProcessorMessage`.
async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>;
}
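
A minimal sketch of an implementer, using a hypothetical in-process channel purely for illustration; a real implementation would wrap a network transport whose errors are genuinely ephemeral:

use tokio::sync::mpsc;

struct ChannelCoordinator {
  sender: mpsc::UnboundedSender<ProcessorMessage>,
}

#[async_trait::async_trait]
impl Coordinator for ChannelCoordinator {
  // Caveat: an in-process send only fails if the receiver was dropped, which
  // is not ephemeral. A networked Coordinator would surface a transient
  // transport error here instead, satisfying the trait's retry contract.
  type EphemeralError = mpsc::error::SendError<ProcessorMessage>;

  async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError> {
    self.sender.send(message)
  }
}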
/// An object capable of publishing a transaction.
#[async_trait::async_trait]
pub trait TransactionPublisher<T: Transaction>: 'static + Send + Sync {
pub trait TransactionPublisher<T: Transaction>: 'static + Send + Sync + Clone {
/// An error encountered when publishing a transaction.
///
/// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
@@ -40,9 +59,18 @@ pub trait TransactionPublisher<T: Transaction>: 'static + Send + Sync {
async fn publish(&self, tx: T) -> Result<(), Self::EphemeralError>;
}
struct Tasks {
cosigner: TaskHandle,
batch: TaskHandle,
slash_report: TaskHandle,
transaction: TaskHandle,
}
/// The signers used by a processor.
#[allow(non_snake_case)]
pub struct Signers<ST: SignableTransaction> {
tasks: HashMap<Session, Vec<TaskHandle>>,
coordinator_handle: TaskHandle,
tasks: HashMap<Session, Tasks>,
_ST: PhantomData<ST>,
}
@@ -62,9 +90,57 @@ impl<ST: SignableTransaction> Signers<ST> {
/// Initialize the signers.
///
/// This will spawn tasks for any historically registered keys.
pub fn new(db: impl Db) -> Self {
pub fn new(
mut db: impl Db,
coordinator: impl Coordinator,
publisher: &impl TransactionPublisher<TransactionFor<ST>>,
) -> Self {
/*
On boot, perform any database cleanup which was queued.
We don't do this cleanup when the task is dropped, as we'd have to wait an unbounded
amount of time for the task to stop (requiring an async task), then drain the channels
(which would be in a distinct DB transaction and risk never occurring if we rebooted
while waiting for the task to stop). Doing it on boot is the easiest way to handle this.
*/
{
let mut txn = db.txn();
for (session, external_key_bytes) in db::ToCleanup::get(&txn).unwrap_or(vec![]) {
let mut external_key_bytes = external_key_bytes.as_slice();
let external_key =
<ST::Ciphersuite as Ciphersuite>::read_G(&mut external_key_bytes).unwrap();
assert!(external_key_bytes.is_empty());
// Drain the transactions to sign
// TransactionsToSign will be fully populated by the scheduler before retiring occurs,
// ensuring this doesn't leave any pending blobs behind
while scheduler::TransactionsToSign::<ST>::try_recv(&mut txn, &external_key).is_some() {}
// Drain the completed Eventualities
// This will be fully populated by the scanner before retiring
while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {}
// Drain our DB channels
while db::CoordinatorToCosignerMessages::try_recv(&mut txn, session).is_some() {}
while db::CosignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {}
while db::CoordinatorToBatchSignerMessages::try_recv(&mut txn, session).is_some() {}
while db::BatchSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {}
while db::CoordinatorToSlashReportSignerMessages::try_recv(&mut txn, session).is_some() {}
while db::SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {}
while db::CoordinatorToTransactionSignerMessages::try_recv(&mut txn, session).is_some() {}
while db::TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session).is_some() {}
}
db::ToCleanup::del(&mut txn);
txn.commit();
}
let mut tasks = HashMap::new();
let (coordinator_task, coordinator_handle) = Task::new();
tokio::spawn(
CoordinatorTask::new(db.clone(), coordinator).continually_run(coordinator_task, vec![]),
);
for session in db::RegisteredKeys::get(&db).unwrap_or(vec![]) {
let buf = db::SerializedKeys::get(&db, session).unwrap();
let mut buf = buf.as_slice();
@@ -78,10 +154,23 @@ impl<ST: SignableTransaction> Signers<ST> {
.push(ThresholdKeys::from(ThresholdCore::<ST::Ciphersuite>::read(&mut buf).unwrap()));
}
todo!("TODO")
// TODO: Batch signer, cosigner, slash report signers
let (transaction_task, transaction_handle) = Task::new();
tokio::spawn(
TransactionTask::<_, ST, _>::new(db.clone(), publisher.clone(), session, external_keys)
.continually_run(transaction_task, vec![coordinator_handle.clone()]),
);
tasks.insert(session, Tasks {
cosigner: todo!("TODO"),
batch: todo!("TODO"),
slash_report: todo!("TODO"),
transaction: transaction_handle,
});
}
Self { tasks, _ST: PhantomData }
Self { coordinator_handle, tasks, _ST: PhantomData }
}
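
For readers unfamiliar with the task primitives used here, a toy `ContinuallyRan` wired the same way; this assumes, from the usage in this commit, that the handles passed to `continually_run` are woken after an iteration that did work (the heartbeat task itself is hypothetical):

struct Heartbeat;

#[async_trait::async_trait]
impl ContinuallyRan for Heartbeat {
  async fn run_iteration(&mut self) -> Result<bool, String> {
    // Ok(true) signals progress was made (waking any dependent handles);
    // Ok(false) lets the task idle until a holder of its handle calls run_now.
    Ok(false)
  }
}

let (task, handle) = Task::new();
tokio::spawn(Heartbeat.continually_run(task, vec![]));
// Anything holding the handle can force an immediate iteration, as
// queue_message does below for the relevant signer's task:
handle.run_now();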
/// Register a set of keys to sign with.
@@ -146,82 +235,31 @@ impl<ST: SignableTransaction> Signers<ST> {
let mut to_cleanup = db::ToCleanup::get(txn).unwrap_or(vec![]);
to_cleanup.push((session, external_key.to_bytes().as_ref().to_vec()));
db::ToCleanup::set(txn, &to_cleanup);
}
// TODO: Handle all of the following cleanup on a task
/*
// Kill the tasks
if let Some(tasks) = self.tasks.remove(&session) {
for task in tasks {
task.close().await;
/// Queue handling a message.
///
/// This is a cheap call and can be done inline within a higher-level loop.
pub fn queue_message(&mut self, txn: &mut impl DbTxn, message: &CoordinatorMessage) {
let sign_id = message.sign_id();
let tasks = self.tasks.get(&sign_id.session);
match sign_id.id {
VariantSignId::Cosign(_) => {
db::CoordinatorToCosignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.cosigner.run_now(); }
}
VariantSignId::Batch(_) => {
db::CoordinatorToBatchSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.batch.run_now(); }
}
VariantSignId::SlashReport(_) => {
db::CoordinatorToSlashReportSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.slash_report.run_now(); }
}
VariantSignId::Transaction(_) => {
db::CoordinatorToTransactionSignerMessages::send(txn, sign_id.session, message);
if let Some(tasks) = tasks { tasks.transaction.run_now(); }
}
}
// Drain the transactions to sign
// Presumably, TransactionsToSign will be fully populated before retiring occurs, so this
// doesn't leave any pending blobs behind
while TransactionsToSign::<ST>::try_recv(txn, external_key).is_some() {}
// Drain our DB channels
while db::CompletedEventualitiesForEachKey::try_recv(txn, session).is_some() {}
while db::CoordinatorToTransactionSignerMessages::try_recv(txn, session).is_some() {}
while db::TransactionSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToBatchSignerMessages::try_recv(txn, session).is_some() {}
while db::BatchSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToSlashReportSignerMessages::try_recv(txn, session).is_some() {}
while db::SlashReportSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToCosignerMessages::try_recv(txn, session).is_some() {}
while db::CosignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
*/
}
}
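
A sketch of a higher-level driver using `queue_message`; the surrounding function is hypothetical, but the txn discipline matches the method above:

// Hypothetical driver: persist each incoming CoordinatorMessage and let
// queue_message route it by VariantSignId to the per-session channel.
fn handle_incoming<ST: SignableTransaction>(
  db: &mut impl Db,
  signers: &mut Signers<ST>,
  message: CoordinatorMessage,
) {
  let mut txn = db.txn();
  // If the session's tasks aren't spawned yet, the message simply stays
  // queued in the DB channel until the session's keys are registered.
  signers.queue_message(&mut txn, &message);
  txn.commit();
}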
/*
// The signers used by a Processor, key-scoped.
struct KeySigners<D: Db, T: Clone + PreprocessMachine> {
transaction: AttemptManager<D, T>,
substrate: AttemptManager<D, AlgorithmMachine<Ristretto, Schnorrkel>>,
cosigner: AttemptManager<D, AlgorithmMachine<Ristretto, Schnorrkel>>,
}
/// The signers used by a protocol.
pub struct Signers<D: Db, T: Clone + PreprocessMachine>(HashMap<Vec<u8>, KeySigners<D, T>>);
impl<D: Db, T: Clone + PreprocessMachine> Signers<D, T> {
/// Create a new set of signers.
pub fn new(db: D) -> Self {
// TODO: Load the registered keys
// TODO: Load the transactions being signed
// TODO: Load the batches being signed
todo!("TODO")
}
/// Register a transaction to sign.
pub fn sign_transaction(&mut self) -> Vec<ProcessorMessage> {
todo!("TODO")
}
/// Mark a transaction as signed.
pub fn signed_transaction(&mut self) { todo!("TODO") }
/// Register a batch to sign.
pub fn sign_batch(&mut self, key: KeyFor<S>, batch: Batch) -> Vec<ProcessorMessage> {
todo!("TODO")
}
/// Mark a batch as signed.
pub fn signed_batch(&mut self, batch: u32) { todo!("TODO") }
/// Register a slash report to sign.
pub fn sign_slash_report(&mut self) -> Vec<ProcessorMessage> {
todo!("TODO")
}
/// Mark a slash report as signed.
pub fn signed_slash_report(&mut self) { todo!("TODO") }
/// Start a cosigning protocol.
pub fn cosign(&mut self) { todo!("TODO") }
/// Handle a message for a signing protocol.
pub fn handle(&mut self, msg: CoordinatorMessage) -> Vec<ProcessorMessage> {
todo!("TODO")
}
}
*/

View File

@@ -3,31 +3,28 @@ use std::{
time::{Duration, Instant},
};
use frost::{dkg::ThresholdKeys, sign::PreprocessMachine};
use frost::dkg::ThresholdKeys;
use serai_validator_sets_primitives::Session;
use serai_db::{DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use scheduler::{Transaction, SignableTransaction, TransactionsToSign};
use scheduler::{Transaction, SignableTransaction, TransactionFor, TransactionsToSign};
use scanner::CompletedEventualities;
use frost_attempt_manager::*;
use crate::{
db::{
CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages,
CompletedEventualitiesForEachKey,
},
db::{CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages},
TransactionPublisher,
};
mod db;
use db::*;
type TransactionFor<ST> =
<<ST as SignableTransaction>::PreprocessMachine as PreprocessMachine>::Signature;
// Fetches transactions to sign and signs them.
pub(crate) struct TransactionTask<
D: Db,
@@ -76,7 +73,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
for keys in &keys {
machines.push(signable_transaction.clone().sign(keys.clone()));
}
attempt_manager.register(tx, machines);
attempt_manager.register(VariantSignId::Transaction(tx), machines);
}
Self {
@@ -123,7 +120,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
for keys in &self.keys {
machines.push(tx.clone().sign(keys.clone()));
}
for msg in self.attempt_manager.register(tx.id(), machines) {
for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) {
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
@@ -133,28 +130,42 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
// Check for completed Eventualities (meaning we should no longer sign for these transactions)
loop {
let mut txn = self.db.txn();
let Some(id) = CompletedEventualitiesForEachKey::try_recv(&mut txn, self.session) else {
let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else {
break;
};
/*
We may have yet to register this signing protocol.
While `TransactionsToSign` is populated before `CompletedEventualities`, we could
theoretically have `TransactionsToSign` populated with a new transaction _while iterating
over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that
edge case, we will see the completion notification before we see the transaction.
In such a case, we break (dropping the txn, re-queueing the completion notification). On
the task's next iteration, we'll process the transaction from `TransactionsToSign` and be
able to make progress.
*/
if !self.active_signing_protocols.remove(&id) {
break;
}
iterated = true;
// This may or may not be an ID this key was responsible for
if self.active_signing_protocols.remove(&id) {
// Since it was, remove this as an active signing protocol
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
SerializedSignableTransactions::del(&mut txn, id);
SerializedTransactions::del(&mut txn, id);
// Since it was, remove this as an active signing protocol
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
SerializedSignableTransactions::del(&mut txn, id);
SerializedTransactions::del(&mut txn, id);
// We retire with a txn so either we successfully flag this Eventuality as completed and
// won't re-register it (making this retire safe), or we don't flag it and will
// re-register it, which is also safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id));
// We retire with a txn so either we successfully flag this Eventuality as completed and
// won't re-register it (making this retire safe), or we don't flag it and will
// re-register it, which is also safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, id);
}
txn.commit();
}
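
The edge case the comment block above guards against can be written as an explicit interleaving; this is illustrative only, showing why the completion notification may arrive before its transaction:

// Illustrative event order (not real code):
//   signer:    finishes draining TransactionsToSign (tx not yet sent)
//   scheduler: TransactionsToSign::send(.., tx)
//   scanner:   CompletedEventualities::send(.., tx_id)
//   signer:    receives tx_id; absent from active_signing_protocols
//              -> breaks, dropping the txn, so tx_id stays queued
//   next iteration:
//   signer:    registers tx from TransactionsToSign
//   signer:    receives tx_id again, now active -> retires it cleanly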
@@ -178,7 +189,14 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
{
let mut buf = Vec::with_capacity(256);
signed_tx.write(&mut buf).unwrap();
SerializedTransactions::set(&mut txn, id, &buf);
SerializedTransactions::set(
&mut txn,
match id {
VariantSignId::Transaction(id) => id,
_ => panic!("TransactionTask signed a non-transaction"),
},
&buf,
);
}
self