Remove events from SubstrateSigner

Same vibes as prior commit.
This commit is contained in:
Luke Parker
2023-11-09 01:56:07 -05:00
parent 2eb155753a
commit 978134a9d1
3 changed files with 106 additions and 122 deletions

View File

@@ -45,7 +45,7 @@ mod signer;
use signer::Signer; use signer::Signer;
mod substrate_signer; mod substrate_signer;
use substrate_signer::{SubstrateSignerEvent, SubstrateSigner}; use substrate_signer::SubstrateSigner;
mod multisigs; mod multisigs;
use multisigs::{MultisigEvent, MultisigManager}; use multisigs::{MultisigEvent, MultisigManager};
@@ -218,14 +218,17 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
} }
CoordinatorMessage::Coordinator(msg) => { CoordinatorMessage::Coordinator(msg) => {
tributary_mutable if let Some(msg) = tributary_mutable
.substrate_signer .substrate_signer
.as_mut() .as_mut()
.expect( .expect(
"coordinator told us to sign a batch when we don't have a Substrate signer at this time", "coordinator told us to sign a batch when we don't have a Substrate signer at this time",
) )
.handle(txn, msg) .handle(txn, msg)
.await; .await
{
coordinator.send(msg).await;
}
} }
CoordinatorMessage::Substrate(msg) => { CoordinatorMessage::Substrate(msg) => {
@@ -552,7 +555,9 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
).await; ).await;
if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() {
substrate_signer.sign(txn, batch).await; if let Some(msg) = substrate_signer.sign(txn, batch).await {
coordinator.send(msg).await;
}
} }
} }
@@ -577,23 +582,6 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
}, },
} }
// Check if the signers have events
// The signers will only have events after the above select executes, so having no timeout on
// the above is fine
// TODO: Have the Signers return these events, allowing removing these channels?
if let Some(signer) = tributary_mutable.substrate_signer.as_mut() {
while let Some(msg) = signer.events.pop_front() {
match msg {
SubstrateSignerEvent::ProcessorMessage(msg) => {
coordinator.send(msg).await;
}
SubstrateSignerEvent::SignedBatch(batch) => {
coordinator.send(messages::substrate::ProcessorMessage::SignedBatch { batch }).await;
}
}
}
}
txn.into_inner().unwrap().commit(); txn.into_inner().unwrap().commit();
if let Some(msg) = outer_msg { if let Some(msg) = outer_msg {
coordinator.ack(msg).await; coordinator.ack(msg).await;

View File

@@ -1,5 +1,5 @@
use core::{marker::PhantomData, fmt}; use core::{marker::PhantomData, fmt};
use std::collections::{VecDeque, HashMap}; use std::collections::HashMap;
use rand_core::OsRng; use rand_core::OsRng;
@@ -31,12 +31,6 @@ fn batch_sign_id(network: NetworkId, id: u32) -> [u8; 5] {
(network, id).encode().try_into().unwrap() (network, id).encode().try_into().unwrap()
} }
#[derive(Debug)]
pub enum SubstrateSignerEvent {
ProcessorMessage(ProcessorMessage),
SignedBatch(SignedBatch),
}
#[derive(Debug)] #[derive(Debug)]
struct SubstrateSignerDb<D: Db>(D); struct SubstrateSignerDb<D: Db>(D);
impl<D: Db> SubstrateSignerDb<D> { impl<D: Db> SubstrateSignerDb<D> {
@@ -88,8 +82,6 @@ pub struct SubstrateSigner<D: Db> {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
signing: signing:
HashMap<[u8; 5], (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare>)>, HashMap<[u8; 5], (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare>)>,
pub events: VecDeque<SubstrateSignerEvent>,
} }
impl<D: Db> fmt::Debug for SubstrateSigner<D> { impl<D: Db> fmt::Debug for SubstrateSigner<D> {
@@ -115,8 +107,6 @@ impl<D: Db> SubstrateSigner<D> {
attempt: HashMap::new(), attempt: HashMap::new(),
preprocessing: HashMap::new(), preprocessing: HashMap::new(),
signing: HashMap::new(), signing: HashMap::new(),
events: VecDeque::new(),
} }
} }
@@ -146,10 +136,16 @@ impl<D: Db> SubstrateSigner<D> {
Ok(()) Ok(())
} }
async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 5], attempt: u32) { #[must_use]
async fn attempt(
&mut self,
txn: &mut D::Transaction<'_>,
id: [u8; 5],
attempt: u32,
) -> Option<ProcessorMessage> {
// See above commentary for why this doesn't emit SignedBatch // See above commentary for why this doesn't emit SignedBatch
if SubstrateSignerDb::<D>::completed(txn, id) { if SubstrateSignerDb::<D>::completed(txn, id) {
return; return None;
} }
// Check if we're already working on this attempt // Check if we're already working on this attempt
@@ -161,7 +157,7 @@ impl<D: Db> SubstrateSigner<D> {
attempt, attempt,
curr_attempt curr_attempt
); );
return; return None;
} }
} }
@@ -170,7 +166,7 @@ impl<D: Db> SubstrateSigner<D> {
batch.block batch.block
} else { } else {
warn!("told to attempt signing a batch we aren't currently signing for"); warn!("told to attempt signing a batch we aren't currently signing for");
return; return None;
}; };
// Delete any existing machines // Delete any existing machines
@@ -201,7 +197,7 @@ impl<D: Db> SubstrateSigner<D> {
hex::encode(id.id), hex::encode(id.id),
id.attempt id.attempt
); );
return; return None;
} }
SubstrateSignerDb::<D>::attempt(txn, &id); SubstrateSignerDb::<D>::attempt(txn, &id);
@@ -225,29 +221,37 @@ impl<D: Db> SubstrateSigner<D> {
self.preprocessing.insert(id.id, (machines, preprocesses)); self.preprocessing.insert(id.id, (machines, preprocesses));
// Broadcast our preprocesses // Broadcast our preprocesses
self.events.push_back(SubstrateSignerEvent::ProcessorMessage( Some(ProcessorMessage::BatchPreprocess { id, block, preprocesses: serialized_preprocesses })
ProcessorMessage::BatchPreprocess { id, block, preprocesses: serialized_preprocesses },
));
} }
pub async fn sign(&mut self, txn: &mut D::Transaction<'_>, batch: Batch) { #[must_use]
pub async fn sign(
&mut self,
txn: &mut D::Transaction<'_>,
batch: Batch,
) -> Option<ProcessorMessage> {
debug_assert_eq!(self.network, batch.network); debug_assert_eq!(self.network, batch.network);
let id = batch_sign_id(batch.network, batch.id); let id = batch_sign_id(batch.network, batch.id);
if SubstrateSignerDb::<D>::completed(txn, id) { if SubstrateSignerDb::<D>::completed(txn, id) {
debug!("Sign batch order for ID we've already completed signing"); debug!("Sign batch order for ID we've already completed signing");
// See batch_signed for commentary on why this simply returns // See batch_signed for commentary on why this simply returns
return; return None;
} }
self.signable.insert(id, batch); self.signable.insert(id, batch);
self.attempt(txn, id, 0).await; self.attempt(txn, id, 0).await
} }
pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) { #[must_use]
pub async fn handle(
&mut self,
txn: &mut D::Transaction<'_>,
msg: CoordinatorMessage,
) -> Option<messages::ProcessorMessage> {
match msg { match msg {
CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => { CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => {
if self.verify_id(&id).is_err() { if self.verify_id(&id).is_err() {
return; return None;
} }
let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) { let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) {
@@ -257,7 +261,7 @@ impl<D: Db> SubstrateSigner<D> {
"not preprocessing for {}. this is an error if we didn't reboot", "not preprocessing for {}. this is an error if we didn't reboot",
hex::encode(id.id), hex::encode(id.id),
); );
return; return None;
} }
Some(preprocess) => preprocess, Some(preprocess) => preprocess,
}; };
@@ -310,14 +314,12 @@ impl<D: Db> SubstrateSigner<D> {
self.signing.insert(id.id, (signature_machine.unwrap(), shares)); self.signing.insert(id.id, (signature_machine.unwrap(), shares));
// Broadcast our shares // Broadcast our shares
self.events.push_back(SubstrateSignerEvent::ProcessorMessage( Some((ProcessorMessage::BatchShare { id, shares: serialized_shares }).into())
ProcessorMessage::BatchShare { id, shares: serialized_shares },
));
} }
CoordinatorMessage::BatchShares { id, mut shares } => { CoordinatorMessage::BatchShares { id, mut shares } => {
if self.verify_id(&id).is_err() { if self.verify_id(&id).is_err() {
return; return None;
} }
let (machine, our_shares) = match self.signing.remove(&id.id) { let (machine, our_shares) = match self.signing.remove(&id.id) {
@@ -333,7 +335,7 @@ impl<D: Db> SubstrateSigner<D> {
"not preprocessing for {}. this is an error if we didn't reboot", "not preprocessing for {}. this is an error if we didn't reboot",
hex::encode(id.id) hex::encode(id.id)
); );
return; return None;
} }
Some(signing) => signing, Some(signing) => signing,
}; };
@@ -377,11 +379,11 @@ impl<D: Db> SubstrateSigner<D> {
assert!(self.preprocessing.remove(&id.id).is_none()); assert!(self.preprocessing.remove(&id.id).is_none());
assert!(self.signing.remove(&id.id).is_none()); assert!(self.signing.remove(&id.id).is_none());
self.events.push_back(SubstrateSignerEvent::SignedBatch(batch)); Some((messages::substrate::ProcessorMessage::SignedBatch { batch }).into())
} }
CoordinatorMessage::BatchReattempt { id } => { CoordinatorMessage::BatchReattempt { id } => {
self.attempt(txn, id.id, id.attempt).await; self.attempt(txn, id.id, id.attempt).await.map(Into::into)
} }
} }
} }
@@ -407,8 +409,8 @@ impl<D: Db> SubstrateSigner<D> {
// While a successive batch's signing would also cause this block to be acknowledged, Substrate // While a successive batch's signing would also cause this block to be acknowledged, Substrate
// guarantees a batch's ordered inclusion // guarantees a batch's ordered inclusion
// This also doesn't emit any further events since all mutation from the Batch being signed // This also doesn't return any messages since all mutation from the Batch being signed happens
// happens on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is // on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is meant to
// meant to end up triggering) // end up triggering)
} }
} }

View File

@@ -16,8 +16,12 @@ use serai_db::{DbTxn, Db, MemDb};
use scale::Encode; use scale::Encode;
use serai_client::{primitives::*, in_instructions::primitives::*}; use serai_client::{primitives::*, in_instructions::primitives::*};
use messages::coordinator::*; use messages::{
use crate::substrate_signer::{SubstrateSignerEvent, SubstrateSigner}; substrate,
coordinator::{self, BatchSignId, CoordinatorMessage},
ProcessorMessage,
};
use crate::substrate_signer::SubstrateSigner;
#[tokio::test] #[tokio::test]
async fn test_substrate_signer() { async fn test_substrate_signer() {
@@ -50,28 +54,10 @@ async fn test_substrate_signer() {
attempt: 0, attempt: 0,
}; };
let mut signers = HashMap::new();
let mut dbs = HashMap::new();
let mut t = 0;
for i in 1 ..= keys.len() {
let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
let keys = keys.get(&i).unwrap().clone();
t = keys.params().t();
let mut signer = SubstrateSigner::<MemDb>::new(NetworkId::Monero, vec![keys]);
let mut db = MemDb::new();
let mut txn = db.txn();
signer.sign(&mut txn, batch.clone()).await;
txn.commit();
signers.insert(i, signer);
dbs.insert(i, db);
}
let mut signing_set = vec![]; let mut signing_set = vec![];
while signing_set.len() < usize::from(t) { while signing_set.len() < usize::from(keys.values().next().unwrap().params().t()) {
let candidate = Participant::new( let candidate = Participant::new(
u16::try_from((OsRng.next_u64() % u64::try_from(signers.len()).unwrap()) + 1).unwrap(), u16::try_from((OsRng.next_u64() % u64::try_from(keys.len()).unwrap()) + 1).unwrap(),
) )
.unwrap(); .unwrap();
if signing_set.contains(&candidate) { if signing_set.contains(&candidate) {
@@ -80,31 +66,43 @@ async fn test_substrate_signer() {
signing_set.push(candidate); signing_set.push(candidate);
} }
// All participants should emit a preprocess let mut signers = HashMap::new();
let mut dbs = HashMap::new();
let mut preprocesses = HashMap::new(); let mut preprocesses = HashMap::new();
for i in 1 ..= signers.len() { for i in 1 ..= keys.len() {
let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess { let keys = keys.get(&i).unwrap().clone();
let mut signer = SubstrateSigner::<MemDb>::new(NetworkId::Monero, vec![keys]);
let mut db = MemDb::new();
let mut txn = db.txn();
match signer.sign(&mut txn, batch.clone()).await.unwrap() {
// All participants should emit a preprocess
coordinator::ProcessorMessage::BatchPreprocess {
id, id,
block: batch_block, block: batch_block,
preprocesses: mut these_preprocesses, preprocesses: mut these_preprocesses,
}) = signers.get_mut(&i).unwrap().events.pop_front().unwrap() } => {
{
assert_eq!(id, actual_id); assert_eq!(id, actual_id);
assert_eq!(batch_block, block); assert_eq!(batch_block, block);
assert_eq!(these_preprocesses.len(), 1); assert_eq!(these_preprocesses.len(), 1);
if signing_set.contains(&i) { if signing_set.contains(&i) {
preprocesses.insert(i, these_preprocesses.swap_remove(0)); preprocesses.insert(i, these_preprocesses.swap_remove(0));
} }
} else {
panic!("didn't get preprocess back");
} }
_ => panic!("didn't get preprocess back"),
}
txn.commit();
signers.insert(i, signer);
dbs.insert(i, db);
} }
let mut shares = HashMap::new(); let mut shares = HashMap::new();
for i in &signing_set { for i in &signing_set {
let mut txn = dbs.get_mut(i).unwrap().txn(); let mut txn = dbs.get_mut(i).unwrap().txn();
signers match signers
.get_mut(i) .get_mut(i)
.unwrap() .unwrap()
.handle( .handle(
@@ -114,25 +112,25 @@ async fn test_substrate_signer() {
preprocesses: clone_without(&preprocesses, i), preprocesses: clone_without(&preprocesses, i),
}, },
) )
.await; .await
txn.commit(); .unwrap()
{
if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare { ProcessorMessage::Coordinator(coordinator::ProcessorMessage::BatchShare {
id, id,
shares: mut these_shares, shares: mut these_shares,
}) = signers.get_mut(i).unwrap().events.pop_front().unwrap() }) => {
{
assert_eq!(id, actual_id); assert_eq!(id, actual_id);
assert_eq!(these_shares.len(), 1); assert_eq!(these_shares.len(), 1);
shares.insert(*i, these_shares.swap_remove(0)); shares.insert(*i, these_shares.swap_remove(0));
} else {
panic!("didn't get share back");
} }
_ => panic!("didn't get share back"),
}
txn.commit();
} }
for i in &signing_set { for i in &signing_set {
let mut txn = dbs.get_mut(i).unwrap().txn(); let mut txn = dbs.get_mut(i).unwrap().txn();
signers match signers
.get_mut(i) .get_mut(i)
.unwrap() .unwrap()
.handle( .handle(
@@ -142,22 +140,18 @@ async fn test_substrate_signer() {
shares: clone_without(&shares, i), shares: clone_without(&shares, i),
}, },
) )
.await; .await
txn.commit(); .unwrap()
if let SubstrateSignerEvent::SignedBatch(signed_batch) =
signers.get_mut(i).unwrap().events.pop_front().unwrap()
{ {
ProcessorMessage::Substrate(substrate::ProcessorMessage::SignedBatch {
batch: signed_batch,
}) => {
assert_eq!(signed_batch.batch, batch); assert_eq!(signed_batch.batch, batch);
assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes())
.verify(&batch_message(&batch), &signed_batch.signature)); .verify(&batch_message(&batch), &signed_batch.signature));
} else {
panic!("didn't get signed batch back");
} }
_ => panic!("didn't get signed batch back"),
} }
txn.commit();
// Make sure there's no events left
for (_, mut signer) in signers.drain() {
assert!(signer.events.pop_front().is_none());
} }
} }