Work on the higher-level signers API

This commit is contained in:
Luke Parker
2024-09-07 03:33:26 -04:00
parent 8f848b1abc
commit 59ff944152
5 changed files with 153 additions and 32 deletions

1
Cargo.lock generated
View File

@@ -8735,6 +8735,7 @@ dependencies = [
"serai-processor-scheduler-primitives", "serai-processor-scheduler-primitives",
"serai-validator-sets-primitives", "serai-validator-sets-primitives",
"tokio", "tokio",
"zeroize",
] ]
[[package]] [[package]]

View File

@@ -21,6 +21,7 @@ workspace = true
[dependencies] [dependencies]
async-trait = { version = "0.1", default-features = false } async-trait = { version = "0.1", default-features = false }
zeroize = { version = "1", default-features = false, features = ["std"] }
ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] } ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false } frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false }

View File

@@ -4,6 +4,14 @@ use serai_db::{Get, DbTxn, create_db, db_channel};
use messages::sign::{ProcessorMessage, CoordinatorMessage}; use messages::sign::{ProcessorMessage, CoordinatorMessage};
create_db! {
SignersGlobal {
RegisteredKeys: () -> Vec<Session>,
SerializedKeys: (session: Session) -> Vec<u8>,
LatestRetiredSession: () -> Session,
}
}
db_channel! { db_channel! {
SignersGlobal { SignersGlobal {
// CompletedEventualities needs to be handled by each signer, meaning we need to turn its // CompletedEventualities needs to be handled by each signer, meaning we need to turn its

View File

@@ -2,11 +2,18 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use core::fmt::Debug; use core::{fmt::Debug, marker::PhantomData};
use frost::sign::PreprocessMachine; use zeroize::Zeroizing;
use scheduler::SignableTransaction; use serai_validator_sets_primitives::Session;
use ciphersuite::{group::GroupEncoding, Ristretto};
use frost::dkg::{ThresholdCore, ThresholdKeys};
use serai_db::{DbTxn, Db};
use scheduler::{Transaction, SignableTransaction, TransactionsToSign};
pub(crate) mod db; pub(crate) mod db;
@@ -14,7 +21,7 @@ mod transaction;
/// An object capable of publishing a transaction. /// An object capable of publishing a transaction.
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait TransactionPublisher<S: SignableTransaction>: 'static + Send + Sync { pub trait TransactionPublisher<T: Transaction>: 'static + Send + Sync {
/// An error encountered when publishing a transaction. /// An error encountered when publishing a transaction.
/// ///
/// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
@@ -28,10 +35,124 @@ pub trait TransactionPublisher<S: SignableTransaction>: 'static + Send + Sync {
/// ///
/// The transaction already being present in the mempool/on-chain MUST NOT be considered an /// The transaction already being present in the mempool/on-chain MUST NOT be considered an
/// error. /// error.
async fn publish( async fn publish(&self, tx: T) -> Result<(), Self::EphemeralError>;
&self, }
tx: <S::PreprocessMachine as PreprocessMachine>::Signature,
) -> Result<(), Self::EphemeralError>; /// The signers used by a processor.
pub struct Signers<ST: SignableTransaction>(PhantomData<ST>);
/*
This is completely outside of consensus, so the worst that can happen is:
1) Leakage of a private key, hence the usage of frost-attempt-manager which has an API to ensure
that doesn't happen
2) The database isn't perfectly cleaned up (leaving some bytes on disk wasted)
3) The state isn't perfectly cleaned up (leaving some bytes in RAM wasted)
The last two are notably possible via a series of race conditions. For example, if an Eventuality
completion comes in *before* we registered a key, the signer will hold the signing protocol in
memory until the session is retired entirely.
*/
impl<ST: SignableTransaction> Signers<ST> {
/// Initialize the signers.
///
/// This will spawn tasks for any historically registered keys.
pub fn new(db: impl Db) -> Self {
for session in db::RegisteredKeys::get(&db).unwrap_or(vec![]) {
let buf = db::SerializedKeys::get(&db, session).unwrap();
let mut buf = buf.as_slice();
let mut substrate_keys = vec![];
let mut external_keys = vec![];
while !buf.is_empty() {
substrate_keys
.push(ThresholdKeys::from(ThresholdCore::<Ristretto>::read(&mut buf).unwrap()));
external_keys
.push(ThresholdKeys::from(ThresholdCore::<ST::Ciphersuite>::read(&mut buf).unwrap()));
}
todo!("TODO")
}
todo!("TODO")
}
/// Register a set of keys to sign with.
///
/// If this session (or a session after it) has already been retired, this is a NOP.
pub fn register_keys(
&mut self,
txn: &mut impl DbTxn,
session: Session,
substrate_keys: Vec<ThresholdKeys<Ristretto>>,
network_keys: Vec<ThresholdKeys<ST::Ciphersuite>>,
) {
if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
return;
}
{
let mut sessions = db::RegisteredKeys::get(txn).unwrap_or_else(|| Vec::with_capacity(1));
sessions.push(session);
db::RegisteredKeys::set(txn, &sessions);
}
{
let mut buf = Zeroizing::new(Vec::with_capacity(2 * substrate_keys.len() * 128));
for (substrate_keys, network_keys) in substrate_keys.into_iter().zip(network_keys) {
buf.extend(&*substrate_keys.serialize());
buf.extend(&*network_keys.serialize());
}
db::SerializedKeys::set(txn, session, &buf);
}
}
/// Retire the signers for a session.
///
/// This MUST be called in order, for every session (even if we didn't register keys for this
/// session).
pub fn retire_session(
&mut self,
txn: &mut impl DbTxn,
session: Session,
external_key: &impl GroupEncoding,
) {
// Update the latest retired session
{
let next_to_retire =
db::LatestRetiredSession::get(txn).map_or(Session(0), |session| Session(session.0 + 1));
assert_eq!(session, next_to_retire);
db::LatestRetiredSession::set(txn, &session);
}
// Kill the tasks
todo!("TODO");
// Update RegisteredKeys/SerializedKeys
if let Some(registered) = db::RegisteredKeys::get(txn) {
db::RegisteredKeys::set(
txn,
&registered.into_iter().filter(|session_i| *session_i != session).collect(),
);
}
db::SerializedKeys::del(txn, session);
// Drain the transactions to sign
// Presumably, TransactionsToSign will be fully populated before retiry occurs, making this
// perfect in not leaving any pending blobs behind
while TransactionsToSign::<ST>::try_recv(txn, external_key).is_some() {}
// Drain our DB channels
while db::CompletedEventualitiesForEachKey::try_recv(txn, session).is_some() {}
while db::CoordinatorToTransactionSignerMessages::try_recv(txn, session).is_some() {}
while db::TransactionSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToBatchSignerMessages::try_recv(txn, session).is_some() {}
while db::BatchSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToSlashReportSignerMessages::try_recv(txn, session).is_some() {}
while db::SlashReportSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
while db::CoordinatorToCosignerMessages::try_recv(txn, session).is_some() {}
while db::CosignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
}
} }
/* /*

View File

@@ -11,7 +11,6 @@ use serai_db::{DbTxn, Db};
use primitives::task::ContinuallyRan; use primitives::task::ContinuallyRan;
use scheduler::{Transaction, SignableTransaction, TransactionsToSign}; use scheduler::{Transaction, SignableTransaction, TransactionsToSign};
use scanner::{ScannerFeed, Scheduler};
use frost_attempt_manager::*; use frost_attempt_manager::*;
@@ -26,40 +25,35 @@ use crate::{
mod db; mod db;
use db::*; use db::*;
type TransactionFor<S, Sch> = < type TransactionFor<ST> =
< <<ST as SignableTransaction>::PreprocessMachine as PreprocessMachine>::Signature;
<Sch as Scheduler<S>
>::SignableTransaction as SignableTransaction>::PreprocessMachine as PreprocessMachine
>::Signature;
// Fetches transactions to sign and signs them. // Fetches transactions to sign and signs them.
pub(crate) struct TransactionTask< pub(crate) struct TransactionTask<
D: Db, D: Db,
S: ScannerFeed, ST: SignableTransaction,
Sch: Scheduler<S>, P: TransactionPublisher<TransactionFor<ST>>,
P: TransactionPublisher<Sch::SignableTransaction>,
> { > {
db: D, db: D,
publisher: P, publisher: P,
session: Session, session: Session,
keys: Vec<ThresholdKeys<<Sch::SignableTransaction as SignableTransaction>::Ciphersuite>>, keys: Vec<ThresholdKeys<ST::Ciphersuite>>,
active_signing_protocols: HashSet<[u8; 32]>, active_signing_protocols: HashSet<[u8; 32]>,
attempt_manager: attempt_manager: AttemptManager<D, <ST as SignableTransaction>::PreprocessMachine>,
AttemptManager<D, <Sch::SignableTransaction as SignableTransaction>::PreprocessMachine>,
last_publication: Instant, last_publication: Instant,
} }
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::SignableTransaction>> impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>>
TransactionTask<D, S, Sch, P> TransactionTask<D, ST, P>
{ {
pub(crate) fn new( pub(crate) fn new(
db: D, db: D,
publisher: P, publisher: P,
session: Session, session: Session,
keys: Vec<ThresholdKeys<<Sch::SignableTransaction as SignableTransaction>::Ciphersuite>>, keys: Vec<ThresholdKeys<ST::Ciphersuite>>,
) -> Self { ) -> Self {
let mut active_signing_protocols = HashSet::new(); let mut active_signing_protocols = HashSet::new();
let mut attempt_manager = AttemptManager::new( let mut attempt_manager = AttemptManager::new(
@@ -74,8 +68,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
let signable_transaction_buf = SerializedSignableTransactions::get(&db, tx).unwrap(); let signable_transaction_buf = SerializedSignableTransactions::get(&db, tx).unwrap();
let mut signable_transaction_buf = signable_transaction_buf.as_slice(); let mut signable_transaction_buf = signable_transaction_buf.as_slice();
let signable_transaction = let signable_transaction = ST::read(&mut signable_transaction_buf).unwrap();
<Sch as Scheduler<S>>::SignableTransaction::read(&mut signable_transaction_buf).unwrap();
assert!(signable_transaction_buf.is_empty()); assert!(signable_transaction_buf.is_empty());
assert_eq!(signable_transaction.id(), tx); assert_eq!(signable_transaction.id(), tx);
@@ -99,8 +92,8 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::SignableTransaction>> impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
ContinuallyRan for TransactionTask<D, S, Sch, P> for TransactionTask<D, ST, P>
{ {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
let mut iterated = false; let mut iterated = false;
@@ -108,10 +101,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
// Check for new transactions to sign // Check for new transactions to sign
loop { loop {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
let Some(tx) = TransactionsToSign::<Sch::SignableTransaction>::try_recv( let Some(tx) = TransactionsToSign::<ST>::try_recv(&mut txn, &self.keys[0].group_key()) else {
&mut txn,
&self.keys[0].group_key(),
) else {
break; break;
}; };
iterated = true; iterated = true;
@@ -208,7 +198,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>, P: TransactionPublisher<Sch::Sign
for tx in &self.active_signing_protocols { for tx in &self.active_signing_protocols {
let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue }; let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue };
let mut tx_buf = tx_buf.as_slice(); let mut tx_buf = tx_buf.as_slice();
let tx = TransactionFor::<S, Sch>::read(&mut tx_buf).unwrap(); let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty()); assert!(tx_buf.is_empty());
self self