diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index d9534293..998c7cea 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -82,7 +82,7 @@ pub mod sign { #[derive(Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, BorshSerialize, BorshDeserialize)] pub enum VariantSignId { - Cosign([u8; 32]), + Cosign(u64), Batch(u32), SlashReport(Session), Transaction([u8; 32]), @@ -91,7 +91,7 @@ pub mod sign { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match self { Self::Cosign(cosign) => { - f.debug_struct("VariantSignId::Cosign").field("0", &hex::encode(cosign)).finish() + f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish() } Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(), Self::SlashReport(session) => { diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 0b1ee467..a3163922 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -90,6 +90,21 @@ impl ContinuallyRan for CoordinatorTask { txn.commit(); } + // Publish the cosigns from this session + { + let mut txn = self.db.txn(); + while let Some(((block_number, block_id), signature)) = Cosign::try_recv(&mut txn, session) + { + iterated = true; + self + .coordinator + .publish_cosign(block_number, block_id, <_>::decode(&mut signature.as_slice()).unwrap()) + .await + .map_err(|e| format!("couldn't publish Cosign: {e:?}"))?; + } + txn.commit(); + } + // If this session signed its slash report, publish its signature { let mut txn = self.db.txn(); diff --git a/processor/signers/src/cosign/db.rs b/processor/signers/src/cosign/db.rs new file mode 100644 index 00000000..01a42446 --- /dev/null +++ b/processor/signers/src/cosign/db.rs @@ -0,0 +1,9 @@ +use serai_validator_sets_primitives::Session; + +use serai_db::{Get, DbTxn, create_db}; + +create_db! { + SignersCosigner { + LatestCosigned: (session: Session) -> u64, + } +} diff --git a/processor/signers/src/cosign/mod.rs b/processor/signers/src/cosign/mod.rs new file mode 100644 index 00000000..41db8050 --- /dev/null +++ b/processor/signers/src/cosign/mod.rs @@ -0,0 +1,122 @@ +use ciphersuite::Ristretto; +use frost::dkg::ThresholdKeys; + +use scale::Encode; +use serai_primitives::Signature; +use serai_validator_sets_primitives::Session; + +use serai_db::{DbTxn, Db}; + +use messages::{sign::VariantSignId, coordinator::cosign_block_msg}; + +use primitives::task::ContinuallyRan; + +use frost_attempt_manager::*; + +use crate::{ + db::{ToCosign, Cosign, CoordinatorToCosignerMessages, CosignerToCoordinatorMessages}, + WrappedSchnorrkelMachine, +}; + +mod db; +use db::LatestCosigned; + +/// Fetches the latest cosign information and works on it. +/// +/// Only the latest cosign attempt is kept. We don't work on historical attempts as later cosigns +/// supersede them. +#[allow(non_snake_case)] +pub(crate) struct CosignerTask { + db: D, + + session: Session, + keys: Vec>, + + current_cosign: Option<(u64, [u8; 32])>, + attempt_manager: AttemptManager, +} + +impl CosignerTask { + pub(crate) fn new(db: D, session: Session, keys: Vec>) -> Self { + let attempt_manager = AttemptManager::new( + db.clone(), + session, + keys.first().expect("creating a cosigner with 0 keys").params().i(), + ); + + Self { db, session, keys, current_cosign: None, attempt_manager } + } +} + +#[async_trait::async_trait] +impl ContinuallyRan for CosignerTask { + async fn run_iteration(&mut self) -> Result { + let mut iterated = false; + + // Check the cosign to work on + { + let mut txn = self.db.txn(); + if let Some(cosign) = ToCosign::get(&txn, self.session) { + // If this wasn't already signed for... + if LatestCosigned::get(&txn, self.session) < Some(cosign.0) { + // If this isn't the cosign we're currently working on, meaning it's fresh + if self.current_cosign != Some(cosign) { + // Retire the current cosign + if let Some(current_cosign) = self.current_cosign { + assert!(current_cosign.0 < cosign.0); + self.attempt_manager.retire(&mut txn, VariantSignId::Cosign(current_cosign.0)); + } + + // Set the cosign being worked on + self.current_cosign = Some(cosign); + + let mut machines = Vec::with_capacity(self.keys.len()); + { + let message = cosign_block_msg(cosign.0, cosign.1); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); + } + } + for msg in self.attempt_manager.register(VariantSignId::Cosign(cosign.0), machines) { + CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + + txn.commit(); + } + } + } + } + + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToCosignerMessages::try_recv(&mut txn, self.session) else { + break; + }; + iterated = true; + + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature } => { + let VariantSignId::Cosign(block_number) = id else { + panic!("CosignerTask signed a non-Cosign") + }; + assert_eq!(Some(block_number), self.current_cosign.map(|cosign| cosign.0)); + + let cosign = self.current_cosign.take().unwrap(); + LatestCosigned::set(&mut txn, self.session, &cosign.0); + // Send the cosign + Cosign::send(&mut txn, self.session, &(cosign, Signature::from(signature).encode())); + } + } + + txn.commit(); + } + + Ok(iterated) + } +} diff --git a/processor/signers/src/db.rs b/processor/signers/src/db.rs index ea022fca..b4de78d9 100644 --- a/processor/signers/src/db.rs +++ b/processor/signers/src/db.rs @@ -10,12 +10,15 @@ create_db! { SerializedKeys: (session: Session) -> Vec, LatestRetiredSession: () -> Session, ToCleanup: () -> Vec<(Session, Vec)>, + + ToCosign: (session: Session) -> (u64, [u8; 32]), } } db_channel! { SignersGlobal { - Cosign: (session: Session) -> (u64, [u8; 32]), + Cosign: (session: Session) -> ((u64, [u8; 32]), Vec), + SlashReport: (session: Session) -> Vec, SlashReportSignature: (session: Session) -> Vec, diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index cc40ce25..881205f8 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -30,6 +30,9 @@ pub(crate) mod db; mod coordinator; use coordinator::CoordinatorTask; +mod cosign; +use cosign::CosignerTask; + mod batch; use batch::BatchSignerTask; @@ -51,6 +54,14 @@ pub trait Coordinator: 'static + Send + Sync { /// Send a `messages::sign::ProcessorMessage`. async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; + /// Publish a cosign. + async fn publish_cosign( + &mut self, + block_number: u64, + block_id: [u8; 32], + signature: Signature, + ) -> Result<(), Self::EphemeralError>; + /// Publish a `Batch`. async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>; @@ -92,7 +103,14 @@ struct Tasks { /// The signers used by a processor. #[allow(non_snake_case)] -pub struct Signers> { +pub struct Signers< + D: Db, + S: ScannerFeed, + Sch: Scheduler, + P: TransactionPublisher>>, +> { + db: D, + publisher: P, coordinator_handle: TaskHandle, tasks: HashMap, _Sch: PhantomData, @@ -115,15 +133,66 @@ type SignableTransactionFor = >::SignableTransaction completion comes in *before* we registered a key, the signer will hold the signing protocol in memory until the session is retired entirely. */ -impl> Signers { +impl< + D: Db, + S: ScannerFeed, + Sch: Scheduler, + P: TransactionPublisher>>, + > Signers +{ + fn tasks( + db: D, + publisher: P, + coordinator_handle: TaskHandle, + session: Session, + substrate_keys: Vec>, + external_keys: Vec>>, + ) -> Tasks { + let (cosign_task, cosign_handle) = Task::new(); + tokio::spawn( + CosignerTask::new(db.clone(), session, substrate_keys.clone()) + .continually_run(cosign_task, vec![coordinator_handle.clone()]), + ); + + let (batch_task, batch_handle) = Task::new(); + tokio::spawn( + BatchSignerTask::new( + db.clone(), + session, + external_keys[0].group_key(), + substrate_keys.clone(), + ) + .continually_run(batch_task, vec![coordinator_handle.clone()]), + ); + + let (slash_report_task, slash_report_handle) = Task::new(); + tokio::spawn( + SlashReportSignerTask::<_, S>::new(db.clone(), session, substrate_keys) + .continually_run(slash_report_task, vec![coordinator_handle.clone()]), + ); + + let (transaction_task, transaction_handle) = Task::new(); + tokio::spawn( + TransactionSignerTask::<_, SignableTransactionFor, _>::new( + db, + publisher, + session, + external_keys, + ) + .continually_run(transaction_task, vec![coordinator_handle]), + ); + + Tasks { + cosigner: cosign_handle, + batch: batch_handle, + slash_report: slash_report_handle, + transaction: transaction_handle, + } + } /// Initialize the signers. /// /// This will spawn tasks for any historically registered keys. - pub fn new( - mut db: impl Db, - coordinator: impl Coordinator, - publisher: &impl TransactionPublisher>>, - ) -> Self { + pub fn new(mut db: D, coordinator: impl Coordinator, publisher: P) -> Self { /* On boot, perform any database cleanup which was queued. @@ -158,6 +227,8 @@ impl> Signers { // Drain the completed Eventualities while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {} + // Delete the cosign this session should be working on + db::ToCosign::del(&mut txn, session); // Drain our DB channels while db::Cosign::try_recv(&mut txn, session).is_some() {} while db::SlashReport::try_recv(&mut txn, session).is_some() {} @@ -195,48 +266,20 @@ impl> Signers { )); } - // TODO: Cosigner - - let (batch_task, batch_handle) = Task::new(); - tokio::spawn( - BatchSignerTask::new( - db.clone(), - session, - external_keys[0].group_key(), - substrate_keys.clone(), - ) - .continually_run(batch_task, vec![coordinator_handle.clone()]), - ); - - let (slash_report_task, slash_report_handle) = Task::new(); - tokio::spawn( - SlashReportSignerTask::<_, S>::new(db.clone(), session, substrate_keys.clone()) - .continually_run(slash_report_task, vec![coordinator_handle.clone()]), - ); - - let (transaction_task, transaction_handle) = Task::new(); - tokio::spawn( - TransactionSignerTask::<_, SignableTransactionFor, _>::new( - db.clone(), - publisher.clone(), - session, - external_keys, - ) - .continually_run(transaction_task, vec![coordinator_handle.clone()]), - ); - tasks.insert( session, - Tasks { - cosigner: todo!("TODO"), - batch: batch_handle, - slash_report: slash_report_handle, - transaction: transaction_handle, - }, + Self::tasks( + db.clone(), + publisher.clone(), + coordinator_handle.clone(), + session, + substrate_keys, + external_keys, + ), ); } - Self { coordinator_handle, tasks, _Sch: PhantomData, _S: PhantomData } + Self { db, publisher, coordinator_handle, tasks, _Sch: PhantomData, _S: PhantomData } } /// Register a set of keys to sign with. @@ -247,7 +290,7 @@ impl> Signers { txn: &mut impl DbTxn, session: Session, substrate_keys: Vec>, - network_keys: Vec>>, + external_keys: Vec>>, ) { // Don't register already retired keys if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) { @@ -262,12 +305,25 @@ impl> Signers { { let mut buf = Zeroizing::new(Vec::with_capacity(2 * substrate_keys.len() * 128)); - for (substrate_keys, network_keys) in substrate_keys.into_iter().zip(network_keys) { + for (substrate_keys, external_keys) in substrate_keys.iter().zip(&external_keys) { buf.extend(&*substrate_keys.serialize()); - buf.extend(&*network_keys.serialize()); + buf.extend(&*external_keys.serialize()); } db::SerializedKeys::set(txn, session, &buf); } + + // Spawn the tasks + self.tasks.insert( + session, + Self::tasks( + self.db.clone(), + self.publisher.clone(), + self.coordinator_handle.clone(), + session, + substrate_keys, + external_keys, + ), + ); } /// Retire the signers for a session. @@ -302,6 +358,9 @@ impl> Signers { let mut to_cleanup = db::ToCleanup::get(txn).unwrap_or(vec![]); to_cleanup.push((session, external_key.to_bytes().as_ref().to_vec())); db::ToCleanup::set(txn, &to_cleanup); + + // Drop the task handles, which will cause the tasks to close + self.tasks.remove(&session); } /// Queue handling a message. @@ -348,7 +407,7 @@ impl> Signers { block_number: u64, block: [u8; 32], ) { - db::Cosign::send(&mut txn, session, &(block_number, block)); + db::ToCosign::set(&mut txn, session, &(block_number, block)); txn.commit(); if let Some(tasks) = self.tasks.get(&session) { diff --git a/processor/signers/src/slash_report.rs b/processor/signers/src/slash_report.rs index bdb6cdba..19a2523b 100644 --- a/processor/signers/src/slash_report.rs +++ b/processor/signers/src/slash_report.rs @@ -26,7 +26,7 @@ use crate::{ WrappedSchnorrkelMachine, }; -// Fetches slash_reportes to sign and signs them. +// Fetches slash reports to sign and signs them. #[allow(non_snake_case)] pub(crate) struct SlashReportSignerTask { db: D, @@ -44,7 +44,7 @@ impl SlashReportSignerTask { let attempt_manager = AttemptManager::new( db.clone(), session, - keys.first().expect("creating a slash_report signer with 0 keys").params().i(), + keys.first().expect("creating a slash report signer with 0 keys").params().i(), ); Self { db, _S: PhantomData, session, keys, has_slash_report: false, attempt_manager }