DKG Removals (#467)

* Update ValidatorSets with a remove_participant call

* Add DkgRemoval, a sign machine for producing the relevant MuSig signatures

* Don't use position-dependent u8s yet Public when removing validators from the DKG

* Add DkgRemovalPreprocess, DkgRemovalShares

Implementation is via a new publish_tributary_tx lambda.

This is code is a copy-pasted mess which will need to be cleaned up.

* Only allow non-removed validators to vote for removals

Otherwise, it's risked that the remaining validators fall below 67% of the
original set.

* Correct publish_serai_tx, which was prior publish_set_keys in practice
This commit is contained in:
Luke Parker
2023-12-04 07:04:44 -05:00
committed by GitHub
parent 99c6375605
commit 797ed49e7b
11 changed files with 1022 additions and 170 deletions

View File

@@ -199,7 +199,6 @@ async fn handle_processor_message<D: Db, P: P2p>(
key_gen::ProcessorMessage::GeneratedKeyPair { id, .. } => Some(id.session),
key_gen::ProcessorMessage::Blame { id, .. } => Some(id.session),
},
// TODO: Review replacing key with Session in messages?
ProcessorMessage::Sign(inner_msg) => match inner_msg {
// We'll only receive InvalidParticipant/Preprocess/Share if we're actively signing
sign::ProcessorMessage::InvalidParticipant { id, .. } => Some(id.session),

View File

@@ -24,7 +24,10 @@ use processor_messages::{
use tributary::{TransactionTrait, Tributary};
use crate::{
tributary::{Transaction, TributarySpec, scanner::handle_new_blocks},
tributary::{
Transaction, TributarySpec,
scanner::{PstTxType, handle_new_blocks},
},
tests::{
MemProcessors, LocalP2p,
tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
@@ -85,14 +88,19 @@ async fn dkg_test() {
) -> (MemDb, MemProcessors) {
let mut scanner_db = MemDb::new();
let processors = MemProcessors::new();
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
key,
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called in new_processors")
},
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX in new_processors") },
|_, _, _| async { panic!("test tried to publish a new Serai TX in new_processors") },
&|_| async {
panic!(
"test tried to publish a new Tributary TX from handle_application_tx in new_processors"
)
},
spec,
&tributary.reader(),
)
@@ -111,14 +119,19 @@ async fn dkg_test() {
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
// Verify the scanner emits a KeyGen::Commitments message
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after Commitments")
},
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX after Commitments") },
|_, _, _| async { panic!("test tried to publish a new Serai TX after Commitments") },
&|_| async {
panic!(
"test tried to publish a new Tributary TX from handle_application_tx after Commitments"
)
},
&spec,
&tributaries[0].1.reader(),
)
@@ -190,14 +203,19 @@ async fn dkg_test() {
}
// With just 4 sets of shares, nothing should happen yet
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after some shares")
},
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX after some shares") },
|_, _, _| async { panic!("test tried to publish a new Serai TX after some shares") },
&|_| async {
panic!(
"test tried to publish a new Tributary TX from handle_application_tx after some shares"
)
},
&spec,
&tributaries[0].1.reader(),
)
@@ -238,12 +256,13 @@ async fn dkg_test() {
};
// Any scanner which has handled the prior blocks should only emit the new event
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") },
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX") },
|_, _, _| async { panic!("test tried to publish a new Serai TX") },
&|_| async { panic!("test tried to publish a new Tributary TX from handle_application_tx") },
&spec,
&tributaries[0].1.reader(),
)
@@ -308,14 +327,16 @@ async fn dkg_test() {
}
// The scanner should successfully try to publish a transaction with a validly signed signature
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
&mut scanner_db,
&keys[0],
|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DKG confirmation")
},
&processors,
|set, tx| {
|set, tx_type, tx| {
assert_eq!(tx_type, PstTxType::SetKeys);
let spec = spec.clone();
let key_pair = key_pair.clone();
async move {
@@ -368,6 +389,7 @@ async fn dkg_test() {
}
}
},
&|_| async { panic!("test tried to publish a new Tributary TX from handle_application_tx") },
&spec,
&tributaries[0].1.reader(),
)

View File

@@ -202,6 +202,17 @@ fn serialize_transaction() {
random_signed_with_nonce(&mut OsRng, 2),
));
{
let mut key = [0; 32];
OsRng.fill_bytes(&mut key);
test_read_write(Transaction::DkgRemovalPreprocess(random_sign_data(&mut OsRng, key, 0)));
}
{
let mut key = [0; 32];
OsRng.fill_bytes(&mut key);
test_read_write(Transaction::DkgRemovalShare(random_sign_data(&mut OsRng, key, 1)));
}
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);

View File

@@ -18,6 +18,7 @@ use crate::tributary::TributarySpec;
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
pub enum Topic {
Dkg,
DkgRemoval([u8; 32]),
SubstrateSign(SubstrateSignableId),
Sign([u8; 32]),
}
@@ -49,7 +50,10 @@ create_db!(
DkgShare: (genesis: [u8; 32], from: u16, to: u16) -> Vec<u8>,
PlanIds: (genesis: &[u8], block: u64) -> Vec<[u8; 32]>,
ConfirmationNonces: (genesis: [u8; 32], attempt: u32) -> HashMap<Participant, Vec<u8>>,
RemovalNonces:
(genesis: [u8; 32], removing: [u8; 32], attempt: u32) -> HashMap<Participant, Vec<u8>>,
CurrentlyCompletingKeyPair: (genesis: [u8; 32]) -> KeyPair,
DkgCompleted: (genesis: [u8; 32]) -> (),
AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32,
DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16,
DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec<u8>,

View File

@@ -0,0 +1,241 @@
use core::ops::Deref;
use std::collections::HashMap;
use zeroize::Zeroizing;
use rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;
use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::{
group::{Group, GroupEncoding},
Ciphersuite, Ristretto,
};
use frost::{
FrostError,
dkg::{Participant, musig::musig},
sign::*,
};
use frost_schnorrkel::Schnorrkel;
use serai_client::{
Public,
validator_sets::primitives::{musig_context, remove_participant_message},
};
use crate::tributary::TributarySpec;
/*
The following is a clone of DkgConfirmer modified for DKG removals.
The notable difference is this uses a MuSig key of the first `t` participants to respond with
preprocesses, not all `n` participants.
TODO: Exact same commentary on seeded RNGs. The following can drop its seeded RNG if cached
preprocesses are used to carry the preprocess between machines
*/
pub(crate) struct DkgRemoval;
impl DkgRemoval {
// Convert the passed in HashMap, which uses the validators' start index for their `s` threshold
// shares, to the indexes needed for MuSig
fn from_threshold_i_to_musig_i(
mut old_map: HashMap<[u8; 32], Vec<u8>>,
) -> HashMap<Participant, Vec<u8>> {
let mut new_map = HashMap::new();
let mut participating = old_map.keys().cloned().collect::<Vec<_>>();
participating.sort();
for (i, participating) in participating.into_iter().enumerate() {
new_map.insert(
Participant::new(u16::try_from(i + 1).unwrap()).unwrap(),
old_map.remove(&participating).unwrap(),
);
}
new_map
}
fn preprocess_rng(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
) -> ChaCha20Rng {
ChaCha20Rng::from_seed({
let mut entropy_transcript = RecommendedTranscript::new(b"DkgRemoval Entropy");
entropy_transcript.append_message(b"spec", spec.serialize());
entropy_transcript.append_message(b"key", Zeroizing::new(key.to_bytes()));
entropy_transcript.append_message(b"attempt", attempt.to_le_bytes());
Zeroizing::new(entropy_transcript).rng_seed(b"preprocess")
})
}
fn preprocess_internal(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
participants: Option<&[<Ristretto as Ciphersuite>::G]>,
) -> (Option<AlgorithmSignMachine<Ristretto, Schnorrkel>>, [u8; 64]) {
// TODO: Diversify this among DkgConfirmer/DkgRemoval?
let context = musig_context(spec.set());
let (_, preprocess) = AlgorithmMachine::new(
Schnorrkel::new(b"substrate"),
// Preprocess with our key alone as we don't know the signing set yet
musig(&context, key, &[<Ristretto as Ciphersuite>::G::generator() * key.deref()])
.expect("couldn't get the MuSig key of our key alone")
.into(),
)
.preprocess(&mut Self::preprocess_rng(spec, key, attempt));
let machine = if let Some(participants) = participants {
let (machine, actual_preprocess) = AlgorithmMachine::new(
Schnorrkel::new(b"substrate"),
// Preprocess with our key alone as we don't know the signing set yet
musig(&context, key, participants)
.expect(
"couldn't create a MuSig key for the DKG removal we're supposedly participating in",
)
.into(),
)
.preprocess(&mut Self::preprocess_rng(spec, key, attempt));
// Doesn't use assert_eq due to lack of Debug
assert!(preprocess == actual_preprocess);
Some(machine)
} else {
None
};
(machine, preprocess.serialize().try_into().unwrap())
}
// Get the preprocess for this confirmation.
pub(crate) fn preprocess(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
) -> [u8; 64] {
Self::preprocess_internal(spec, key, attempt, None).1
}
fn share_internal(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
mut preprocesses: HashMap<Participant, Vec<u8>>,
removed: [u8; 32],
) -> Result<(AlgorithmSignatureMachine<Ristretto, Schnorrkel>, [u8; 32]), Participant> {
// TODO: Remove this ugly blob
let preprocesses = {
let mut preprocesses_participants = preprocesses.keys().cloned().collect::<Vec<_>>();
preprocesses_participants.sort();
let mut actual_keys = vec![];
let spec_validators = spec.validators();
for participant in &preprocesses_participants {
for (validator, _) in &spec_validators {
if participant == &spec.i(*validator).unwrap().start {
actual_keys.push(*validator);
}
}
}
let mut new_preprocesses = HashMap::new();
for (participant, actual_key) in
preprocesses_participants.into_iter().zip(actual_keys.into_iter())
{
new_preprocesses.insert(actual_key, preprocesses.remove(&participant).unwrap());
}
new_preprocesses
};
let participants = preprocesses.keys().cloned().collect::<Vec<_>>();
let preprocesses = Self::from_threshold_i_to_musig_i(
preprocesses.into_iter().map(|(key, preprocess)| (key.to_bytes(), preprocess)).collect(),
);
let machine = Self::preprocess_internal(spec, key, attempt, Some(&participants)).0.unwrap();
let preprocesses = preprocesses
.into_iter()
.map(|(p, preprocess)| {
machine
.read_preprocess(&mut preprocess.as_slice())
.map(|preprocess| (p, preprocess))
.map_err(|_| p)
})
.collect::<Result<HashMap<_, _>, _>>()?;
let (machine, share) = machine
.sign(preprocesses, &remove_participant_message(&spec.set(), Public(removed)))
.map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => unreachable!("{e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok((machine, share.serialize().try_into().unwrap()))
}
// Get the share for this confirmation, if the preprocesses are valid.
pub(crate) fn share(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
preprocesses: HashMap<Participant, Vec<u8>>,
removed: [u8; 32],
) -> Result<[u8; 32], Participant> {
Self::share_internal(spec, key, attempt, preprocesses, removed).map(|(_, share)| share)
}
pub(crate) fn complete(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
attempt: u32,
preprocesses: HashMap<Participant, Vec<u8>>,
removed: [u8; 32],
mut shares: HashMap<Participant, Vec<u8>>,
) -> Result<(Vec<Public>, [u8; 64]), Participant> {
// TODO: Remove this ugly blob
let shares = {
let mut shares_participants = shares.keys().cloned().collect::<Vec<_>>();
shares_participants.sort();
let mut actual_keys = vec![];
let spec_validators = spec.validators();
for participant in &shares_participants {
for (validator, _) in &spec_validators {
if participant == &spec.i(*validator).unwrap().start {
actual_keys.push(*validator);
}
}
}
let mut new_shares = HashMap::new();
for (participant, actual_key) in shares_participants.into_iter().zip(actual_keys.into_iter())
{
new_shares.insert(actual_key.to_bytes(), shares.remove(&participant).unwrap());
}
new_shares
};
let mut signers = shares.keys().cloned().map(Public).collect::<Vec<_>>();
signers.sort();
let machine = Self::share_internal(spec, key, attempt, preprocesses, removed)
.expect("trying to complete a machine which failed to preprocess")
.0;
let shares = Self::from_threshold_i_to_musig_i(shares)
.into_iter()
.map(|(p, share)| {
machine.read_share(&mut share.as_slice()).map(|share| (p, share)).map_err(|_| p)
})
.collect::<Result<HashMap<_, _>, _>>()?;
let signature = machine.complete(shares).map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => unreachable!("{e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok((signers, signature.to_bytes()))
}
}

View File

@@ -1,6 +1,8 @@
use core::{ops::Deref, future::Future};
use std::collections::HashMap;
use rand_core::OsRng;
use zeroize::{Zeroize, Zeroizing};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
@@ -26,10 +28,13 @@ use serai_db::{Get, Db};
use crate::{
processors::Processors,
tributary::{
Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, DataSet, Accumulation,
SignData, Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, DataSet,
Accumulation,
dkg_confirmer::DkgConfirmer,
scanner::{RecognizedIdType, RIDTrait},
FatallySlashed, DkgShare, PlanIds, ConfirmationNonces, AttemptDb, DataDb,
dkg_removal::DkgRemoval,
scanner::{RecognizedIdType, RIDTrait, PstTxType},
FatallySlashed, DkgShare, DkgCompleted, PlanIds, ConfirmationNonces, RemovalNonces, AttemptDb,
DataDb,
},
};
@@ -40,8 +45,11 @@ const DKG_SHARES: &str = "shares";
const DKG_CONFIRMATION_NONCES: &str = "confirmation_nonces";
const DKG_CONFIRMATION_SHARES: &str = "confirmation_shares";
// These s/b prefixes between Batch and Sign should be unnecessary, as Batch/Share entries
// themselves should already be domain separated
// These d/s/b prefixes between DKG Removal, Batch, and Sign should be unnecessary, as Batch/Share
// entries themselves should already be domain separated
const DKG_REMOVAL_PREPROCESS: &str = "d_preprocess";
const DKG_REMOVAL_SHARE: &str = "d_share";
const BATCH_PREPROCESS: &str = "b_preprocess";
const BATCH_SHARE: &str = "b_share";
@@ -94,26 +102,54 @@ pub fn generated_key_pair<D: Db>(
DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
}
pub(super) fn fatal_slash<D: Db>(
pub(super) async fn fatal_slash<
D: Db,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
>(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
account: [u8; 32],
spec: &TributarySpec,
publish_tributary_tx: &PTT,
our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
slashing: [u8; 32],
reason: &str,
) {
log::warn!("fatally slashing {}. reason: {}", hex::encode(account), reason);
FatallySlashed::set_fatally_slashed(txn, genesis, account);
let genesis = spec.genesis();
log::warn!("fatally slashing {}. reason: {}", hex::encode(slashing), reason);
FatallySlashed::set_fatally_slashed(txn, genesis, slashing);
// TODO: disconnect the node from network/ban from further participation in all Tributaries
// TODO: If during DKG, trigger a re-attempt
// Despite triggering a re-attempt, this DKG may still complete and may become in-use
// If during a DKG, remove the participant
if DkgCompleted::get(txn, genesis).is_none() {
let preprocess = DkgRemoval::preprocess(spec, our_key, 0);
let mut tx = Transaction::DkgRemovalPreprocess(SignData {
plan: slashing,
attempt: 0,
data: vec![preprocess.to_vec()],
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, genesis, our_key);
publish_tributary_tx(tx).await;
}
}
// TODO: Once Substrate confirms a key, we need to rotate our validator set OR form a second
// Tributary post-DKG
// https://github.com/serai-dex/serai/issues/426
fn fatal_slash_with_participant_index<D: Db>(
spec: &TributarySpec,
async fn fatal_slash_with_participant_index<
D: Db,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
>(
txn: &mut <D as Db>::Transaction<'_>,
spec: &TributarySpec,
publish_tributary_tx: &PTT,
our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
i: Participant,
reason: &str,
) {
@@ -129,14 +165,18 @@ fn fatal_slash_with_participant_index<D: Db>(
}
let validator = validator.unwrap();
fatal_slash::<D>(txn, spec.genesis(), validator.to_bytes(), reason);
fatal_slash::<D, _, _>(txn, spec, publish_tributary_tx, our_key, validator.to_bytes(), reason)
.await;
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_application_tx<
D: Db,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Vec<u8>) -> FPst,
PST: Clone + Fn(ValidatorSet, PstTxType, Vec<u8>) -> FPst,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
>(
@@ -144,6 +184,7 @@ pub(crate) async fn handle_application_tx<
spec: &TributarySpec,
processors: &Pro,
publish_serai_tx: PST,
publish_tributary_tx: &PTT,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: RID,
txn: &mut <D as Db>::Transaction<'_>,
@@ -159,18 +200,28 @@ pub(crate) async fn handle_application_tx<
}
}
let handle = |txn: &mut <D as Db>::Transaction<'_>,
data_spec: &DataSpecification,
bytes: Vec<u8>,
signed: &Signed| {
async fn handle<D: Db, FPtt: Future<Output = ()>, PTT: Clone + Fn(Transaction) -> FPtt>(
txn: &mut <D as Db>::Transaction<'_>,
spec: &TributarySpec,
publish_tributary_tx: &PTT,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
data_spec: &DataSpecification,
bytes: Vec<u8>,
signed: &Signed,
) -> Accumulation {
let genesis = spec.genesis();
let Some(curr_attempt) = AttemptDb::attempt(txn, genesis, data_spec.topic) else {
// Premature publication of a valid ID/publication of an invalid ID
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"published data for ID without an attempt",
);
)
.await;
return Accumulation::NotReady;
};
@@ -178,7 +229,15 @@ pub(crate) async fn handle_application_tx<
// This shouldn't be reachable since nonces were made inserted by the coordinator, yet it's a
// cheap check to leave in for safety
if DataDb::get(txn, genesis, data_spec, &signed.signer.to_bytes()).is_some() {
fatal_slash::<D>(txn, genesis, signed.signer.to_bytes(), "published data multiple times");
fatal_slash::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"published data multiple times",
)
.await;
return Accumulation::NotReady;
}
@@ -189,12 +248,15 @@ pub(crate) async fn handle_application_tx<
}
// If the attempt is greater, this is a premature publication, full slash
if data_spec.attempt > curr_attempt {
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"published data with an attempt which hasn't started",
);
)
.await;
return Accumulation::NotReady;
}
@@ -205,22 +267,31 @@ pub(crate) async fn handle_application_tx<
// Accumulate this data
DataDb::accumulate(txn, key, spec, data_spec, signed.signer, &bytes)
};
}
fn check_sign_data_len<D: Db>(
async fn check_sign_data_len<
D: Db,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
>(
txn: &mut D::Transaction<'_>,
spec: &TributarySpec,
publish_tributary_tx: &PTT,
our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
signer: <Ristretto as Ciphersuite>::G,
len: usize,
) -> Result<(), ()> {
let signer_i = spec.i(signer).unwrap();
if len != usize::from(u16::from(signer_i.end) - u16::from(signer_i.start)) {
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
spec.genesis(),
spec,
publish_tributary_tx,
our_key,
signer.to_bytes(),
"signer published a distinct amount of sign data than they had shares",
);
)
.await;
Err(())?;
}
Ok(())
@@ -242,18 +313,40 @@ pub(crate) async fn handle_application_tx<
match tx {
Transaction::RemoveParticipant(i) => {
fatal_slash_with_participant_index::<D>(spec, txn, i, "RemoveParticipant Provided TX")
fatal_slash_with_participant_index::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
i,
"RemoveParticipant Provided TX",
)
.await
}
Transaction::DkgCommitments(attempt, commitments, signed) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, signed.signer, commitments.len()) else {
let Ok(_) = check_sign_data_len::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signed.signer,
commitments.len(),
)
.await
else {
return;
};
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification { topic: Topic::Dkg, label: DKG_COMMITMENTS, attempt },
commitments.encode(),
&signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(mut commitments)) => {
log::info!("got all DkgCommitments for {}", hex::encode(genesis));
unflatten(spec, &mut commitments);
@@ -281,17 +374,28 @@ pub(crate) async fn handle_application_tx<
let sender_is_len = u16::from(sender_i.end) - u16::from(sender_i.start);
if shares.len() != usize::from(sender_is_len) {
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"invalid amount of DKG shares by key shares",
);
)
.await;
return;
}
for shares in &shares {
if shares.len() != (usize::from(spec.n() - sender_is_len)) {
fatal_slash::<D>(txn, genesis, signed.signer.to_bytes(), "invalid amount of DKG shares");
fatal_slash::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"invalid amount of DKG shares",
)
.await;
return;
}
}
@@ -348,18 +452,27 @@ pub(crate) async fn handle_application_tx<
// Drop shares as it's been mutated into invalidity
drop(shares);
let confirmation_nonces = handle(
let confirmation_nonces = handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_NONCES, attempt },
confirmation_nonces.to_vec(),
&signed,
);
match handle(
)
.await;
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification { topic: Topic::Dkg, label: DKG_SHARES, attempt },
our_shares.encode(),
&signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(shares)) => {
log::info!("got all DkgShares for {}", hex::encode(genesis));
@@ -417,24 +530,30 @@ pub(crate) async fn handle_application_tx<
if (u16::from(accuser) < u16::from(range.start)) ||
(u16::from(range.end) <= u16::from(accuser))
{
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"accused with a Participant index which wasn't theirs",
);
)
.await;
return;
}
if !((u16::from(range.start) <= u16::from(faulty)) &&
(u16::from(faulty) < u16::from(range.end)))
{
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
signed.signer.to_bytes(),
"accused self of having an InvalidDkgShare",
);
)
.await;
return;
}
@@ -454,12 +573,17 @@ pub(crate) async fn handle_application_tx<
}
Transaction::DkgConfirmed(attempt, shares, signed) => {
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_SHARES, attempt },
shares.to_vec(),
&signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(shares)) => {
log::info!("got all DkgConfirmed for {}", hex::encode(genesis));
@@ -474,13 +598,24 @@ pub(crate) async fn handle_application_tx<
match DkgConfirmer::complete(spec, key, attempt, preprocesses, &key_pair, shares) {
Ok(sig) => sig,
Err(p) => {
fatal_slash_with_participant_index::<D>(spec, txn, p, "invalid DkgConfirmer share");
fatal_slash_with_participant_index::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
p,
"invalid DkgConfirmer share",
)
.await;
return;
}
};
DkgCompleted::set(txn, genesis, &());
publish_serai_tx(
spec.set(),
PstTxType::SetKeys,
SeraiValidatorSets::set_keys(spec.set().network, key_pair, Signature(sig)),
)
.await;
@@ -492,6 +627,124 @@ pub(crate) async fn handle_application_tx<
}
}
Transaction::DkgRemovalPreprocess(data) => {
let signer = data.signed.signer;
// TODO: Only handle this if we're not actively removing this validator
if (data.data.len() != 1) || (data.data[0].len() != 64) {
fatal_slash::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signer.to_bytes(),
"non-64-byte DKG removal preprocess",
)
.await;
return;
}
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::DkgRemoval(data.plan),
label: DKG_REMOVAL_PREPROCESS,
attempt: data.attempt,
},
data.data.encode(),
&data.signed,
)
.await
{
Accumulation::Ready(DataSet::Participating(preprocesses)) => {
RemovalNonces::set(txn, genesis, data.plan, data.attempt, &preprocesses);
let Ok(share) = DkgRemoval::share(spec, key, data.attempt, preprocesses, data.plan)
else {
// TODO: Locally increase slash points to maximum (distinct from an explicitly fatal
// slash) and censor transactions (yet don't explicitly ban)
return;
};
let mut tx = Transaction::DkgRemovalPreprocess(SignData {
plan: data.plan,
attempt: data.attempt,
data: vec![share.to_vec()],
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, genesis, key);
publish_tributary_tx(tx).await;
}
Accumulation::Ready(DataSet::NotParticipating) => {}
Accumulation::NotReady => {}
}
}
Transaction::DkgRemovalShare(data) => {
let signer = data.signed.signer;
if (data.data.len() != 1) || (data.data[0].len() != 32) {
fatal_slash::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signer.to_bytes(),
"non-32-byte DKG removal share",
)
.await;
return;
}
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::DkgRemoval(data.plan),
label: DKG_REMOVAL_SHARE,
attempt: data.attempt,
},
data.data.encode(),
&data.signed,
)
.await
{
Accumulation::Ready(DataSet::Participating(shares)) => {
let preprocesses = RemovalNonces::get(txn, genesis, data.plan, data.attempt).unwrap();
let Ok((signers, signature)) =
DkgRemoval::complete(spec, key, data.attempt, preprocesses, data.plan, shares)
else {
// TODO: Locally increase slash points to maximum (distinct from an explicitly fatal
// slash) and censor transactions (yet don't explicitly ban)
return;
};
// TODO: Only handle this if we're not actively removing any of the signers
// The created Substrate call will fail if a removed validator was one of the signers
// Since:
// 1) publish_serai_tx will block this task until the TX is published
// 2) We won't scan any more TXs/blocks until we handle this TX
// The TX *must* be successfully published *before* we start removing any more signers
// Accordingly, if the signers aren't currently being removed, they won't be removed
// by the time this transaction is successfully published *unless* a malicious 34%
// participates with the non-participating 33% to continue operation and produce a
// distinct removal (since the non-participating won't block in this block)
// This breaks BFT and is accordingly within bounds
let tx = serai_client::SeraiValidatorSets::remove_participant(
spec.set().network,
Public(data.plan),
signers,
Signature(signature),
);
publish_serai_tx(spec.set(), PstTxType::RemoveParticipant(data.plan), tx).await;
}
Accumulation::Ready(DataSet::NotParticipating) => {}
Accumulation::NotReady => {}
}
}
Transaction::CosignSubstrateBlock(hash) => {
AttemptDb::recognize_topic(
txn,
@@ -540,17 +793,37 @@ pub(crate) async fn handle_application_tx<
Transaction::SubstratePreprocess(data) => {
let signer = data.signed.signer;
let Ok(_) = check_sign_data_len::<D>(txn, spec, signer, data.data.len()) else {
let Ok(_) = check_sign_data_len::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signer,
data.data.len(),
)
.await
else {
return;
};
for data in &data.data {
if data.len() != 64 {
fatal_slash::<D>(txn, genesis, signer.to_bytes(), "non-64-byte Substrate preprocess");
fatal_slash::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
signer.to_bytes(),
"non-64-byte Substrate preprocess",
)
.await;
return;
}
}
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::SubstrateSign(data.plan),
label: BATCH_PREPROCESS,
@@ -558,7 +831,9 @@ pub(crate) async fn handle_application_tx<
},
data.data.encode(),
&data.signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(mut preprocesses)) => {
unflatten(spec, &mut preprocesses);
processors
@@ -583,11 +858,23 @@ pub(crate) async fn handle_application_tx<
}
}
Transaction::SubstrateShare(data) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, data.signed.signer, data.data.len()) else {
let Ok(_) = check_sign_data_len::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
data.signed.signer,
data.data.len(),
)
.await
else {
return;
};
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::SubstrateSign(data.plan),
label: BATCH_SHARE,
@@ -595,7 +882,9 @@ pub(crate) async fn handle_application_tx<
},
data.data.encode(),
&data.signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(mut shares)) => {
unflatten(spec, &mut shares);
processors
@@ -621,11 +910,23 @@ pub(crate) async fn handle_application_tx<
}
Transaction::SignPreprocess(data) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, data.signed.signer, data.data.len()) else {
let Ok(_) = check_sign_data_len::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
data.signed.signer,
data.data.len(),
)
.await
else {
return;
};
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::Sign(data.plan),
label: SIGN_PREPROCESS,
@@ -633,7 +934,9 @@ pub(crate) async fn handle_application_tx<
},
data.data.encode(),
&data.signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(mut preprocesses)) => {
unflatten(spec, &mut preprocesses);
processors
@@ -651,11 +954,23 @@ pub(crate) async fn handle_application_tx<
}
}
Transaction::SignShare(data) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, data.signed.signer, data.data.len()) else {
let Ok(_) = check_sign_data_len::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
data.signed.signer,
data.data.len(),
)
.await
else {
return;
};
match handle(
match handle::<D, _, _>(
txn,
spec,
publish_tributary_tx,
key,
&DataSpecification {
topic: Topic::Sign(data.plan),
label: SIGN_SHARE,
@@ -663,7 +978,9 @@ pub(crate) async fn handle_application_tx<
},
data.data.encode(),
&data.signed,
) {
)
.await
{
Accumulation::Ready(DataSet::Participating(mut shares)) => {
unflatten(spec, &mut shares);
processors
@@ -688,12 +1005,15 @@ pub(crate) async fn handle_application_tx<
);
if AttemptDb::attempt(txn, genesis, Topic::Sign(plan)).is_none() {
fatal_slash::<D>(
fatal_slash::<D, _, _>(
txn,
genesis,
spec,
publish_tributary_tx,
key,
first_signer.to_bytes(),
"claimed an unrecognized plan was completed",
);
)
.await;
return;
};

View File

@@ -36,6 +36,7 @@ mod db;
pub use db::*;
mod dkg_confirmer;
mod dkg_removal;
mod handle;
pub use handle::*;
@@ -226,7 +227,7 @@ impl<Id: Clone + PartialEq + Eq + Debug + Encode + Decode> SignData<Id> {
writer.write_all(&[u8::try_from(self.data.len()).unwrap()])?;
for data in &self.data {
if data.len() > u16::MAX.into() {
// Currently, the largest individual preproces is a Monero transaction
// Currently, the largest individual preprocess is a Monero transaction
// It provides 4 commitments per input (128 bytes), a 64-byte proof for them, along with a
// key image and proof (96 bytes)
// Even with all of that, we could support 227 inputs in a single TX
@@ -265,6 +266,9 @@ pub enum Transaction {
},
DkgConfirmed(u32, [u8; 32], Signed),
DkgRemovalPreprocess(SignData<[u8; 32]>),
DkgRemovalShare(SignData<[u8; 32]>),
// Co-sign a Substrate block.
CosignSubstrateBlock([u8; 32]),
@@ -325,6 +329,12 @@ impl Debug for Transaction {
.field("attempt", attempt)
.field("signer", &hex::encode(signed.signer.to_bytes()))
.finish_non_exhaustive(),
Transaction::DkgRemovalPreprocess(sign_data) => {
fmt.debug_struct("Transaction::DkgRemovalPreprocess").field("sign_data", sign_data).finish()
}
Transaction::DkgRemovalShare(sign_data) => {
fmt.debug_struct("Transaction::DkgRemovalShare").field("sign_data", sign_data).finish()
}
Transaction::CosignSubstrateBlock(block) => fmt
.debug_struct("Transaction::CosignSubstrateBlock")
.field("block", &hex::encode(block))
@@ -487,13 +497,16 @@ impl ReadWrite for Transaction {
Ok(Transaction::DkgConfirmed(attempt, confirmation_share, signed))
}
5 => {
5 => SignData::read(reader, 0).map(Transaction::DkgRemovalPreprocess),
6 => SignData::read(reader, 1).map(Transaction::DkgRemovalShare),
7 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
Ok(Transaction::CosignSubstrateBlock(block))
}
6 => {
8 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
let mut batch = [0; 5];
@@ -501,19 +514,19 @@ impl ReadWrite for Transaction {
Ok(Transaction::Batch(block, batch))
}
7 => {
9 => {
let mut block = [0; 8];
reader.read_exact(&mut block)?;
Ok(Transaction::SubstrateBlock(u64::from_le_bytes(block)))
}
8 => SignData::read(reader, 0).map(Transaction::SubstratePreprocess),
9 => SignData::read(reader, 1).map(Transaction::SubstrateShare),
10 => SignData::read(reader, 0).map(Transaction::SubstratePreprocess),
11 => SignData::read(reader, 1).map(Transaction::SubstrateShare),
10 => SignData::read(reader, 0).map(Transaction::SignPreprocess),
11 => SignData::read(reader, 1).map(Transaction::SignShare),
12 => SignData::read(reader, 0).map(Transaction::SignPreprocess),
13 => SignData::read(reader, 1).map(Transaction::SignShare),
12 => {
14 => {
let mut plan = [0; 32];
reader.read_exact(&mut plan)?;
@@ -610,41 +623,50 @@ impl ReadWrite for Transaction {
signed.write_without_nonce(writer)
}
Transaction::CosignSubstrateBlock(block) => {
Transaction::DkgRemovalPreprocess(data) => {
writer.write_all(&[5])?;
data.write(writer)
}
Transaction::DkgRemovalShare(data) => {
writer.write_all(&[6])?;
data.write(writer)
}
Transaction::CosignSubstrateBlock(block) => {
writer.write_all(&[7])?;
writer.write_all(block)
}
Transaction::Batch(block, batch) => {
writer.write_all(&[6])?;
writer.write_all(&[8])?;
writer.write_all(block)?;
writer.write_all(batch)
}
Transaction::SubstrateBlock(block) => {
writer.write_all(&[7])?;
writer.write_all(&[9])?;
writer.write_all(&block.to_le_bytes())
}
Transaction::SubstratePreprocess(data) => {
writer.write_all(&[8])?;
writer.write_all(&[10])?;
data.write(writer)
}
Transaction::SubstrateShare(data) => {
writer.write_all(&[9])?;
writer.write_all(&[11])?;
data.write(writer)
}
Transaction::SignPreprocess(data) => {
writer.write_all(&[10])?;
writer.write_all(&[12])?;
data.write(writer)
}
Transaction::SignShare(data) => {
writer.write_all(&[11])?;
writer.write_all(&[13])?;
data.write(writer)
}
Transaction::SignCompleted { plan, tx_hash, first_signer, signature } => {
writer.write_all(&[12])?;
writer.write_all(&[14])?;
writer.write_all(plan)?;
writer
.write_all(&[u8::try_from(tx_hash.len()).expect("tx hash length exceed 255 bytes")])?;
@@ -674,6 +696,13 @@ impl TransactionTrait for Transaction {
TransactionKind::Signed((b"dkg", attempt).encode(), signed)
}
Transaction::DkgRemovalPreprocess(data) => {
TransactionKind::Signed((b"dkg_removal", data.plan, data.attempt).encode(), &data.signed)
}
Transaction::DkgRemovalShare(data) => {
TransactionKind::Signed((b"dkg_removal", data.plan, data.attempt).encode(), &data.signed)
}
Transaction::CosignSubstrateBlock(_) => TransactionKind::Provided("cosign"),
Transaction::Batch(_, _) => TransactionKind::Provided("batch"),
@@ -753,6 +782,9 @@ impl Transaction {
Transaction::InvalidDkgShare { .. } => 2,
Transaction::DkgConfirmed(_, _, _) => 2,
Transaction::DkgRemovalPreprocess(_) => 0,
Transaction::DkgRemovalShare(_) => 1,
Transaction::CosignSubstrateBlock(_) => panic!("signing CosignSubstrateBlock"),
Transaction::Batch(_, _) => panic!("signing Batch"),
@@ -776,6 +808,9 @@ impl Transaction {
Transaction::InvalidDkgShare { ref mut signed, .. } => signed,
Transaction::DkgConfirmed(_, _, ref mut signed) => signed,
Transaction::DkgRemovalPreprocess(ref mut data) => &mut data.signed,
Transaction::DkgRemovalShare(ref mut data) => &mut data.signed,
Transaction::CosignSubstrateBlock(_) => panic!("signing CosignSubstrateBlock"),
Transaction::Batch(_, _) => panic!("signing Batch"),

View File

@@ -13,7 +13,7 @@ use serai_client::{validator_sets::primitives::ValidatorSet, Serai};
use serai_db::DbTxn;
use tributary::{
TransactionKind, Transaction as TributaryTransaction, Block, TributaryReader,
TransactionKind, Transaction as TributaryTransaction, TransactionError, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
@@ -43,12 +43,21 @@ impl<FRid, F: Clone + Fn(ValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> F
{
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum PstTxType {
SetKeys,
RemoveParticipant([u8; 32]),
}
// Handle a specific Tributary block
#[allow(clippy::too_many_arguments)]
async fn handle_block<
D: Db,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Vec<u8>) -> FPst,
PST: Clone + Fn(ValidatorSet, PstTxType, Vec<u8>) -> FPst,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
P: P2p,
@@ -58,12 +67,12 @@ async fn handle_block<
recognized_id: RID,
processors: &Pro,
publish_serai_tx: PST,
publish_tributary_tx: &PTT,
spec: &TributarySpec,
block: Block<Transaction>,
) {
log::info!("found block for Tributary {:?}", spec.set());
let genesis = spec.genesis();
let hash = block.hash();
let mut event_id = 0;
@@ -100,19 +109,23 @@ async fn handle_block<
// Since anything with evidence is fundamentally faulty behavior, not just temporal errors,
// mark the node as fatally slashed
fatal_slash::<D>(
fatal_slash::<D, _, _>(
&mut txn,
genesis,
spec,
publish_tributary_tx,
key,
msgs.0.msg.sender,
&format!("invalid tendermint messages: {:?}", msgs),
);
)
.await;
}
TributaryTransaction::Application(tx) => {
handle_application_tx::<D, _, _, _, _, _>(
handle_application_tx::<D, _, _, _, _, _, _, _>(
tx,
spec,
processors,
publish_serai_tx.clone(),
publish_tributary_tx,
key,
recognized_id.clone(),
&mut txn,
@@ -130,11 +143,14 @@ async fn handle_block<
// TODO: Trigger any necessary re-attempts
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_new_blocks<
D: Db,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Vec<u8>) -> FPst,
PST: Clone + Fn(ValidatorSet, PstTxType, Vec<u8>) -> FPst,
FPtt: Future<Output = ()>,
PTT: Clone + Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
P: P2p,
@@ -144,6 +160,7 @@ pub(crate) async fn handle_new_blocks<
recognized_id: RID,
processors: &Pro,
publish_serai_tx: PST,
publish_tributary_tx: &PTT,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
) {
@@ -165,12 +182,13 @@ pub(crate) async fn handle_new_blocks<
}
}
handle_block::<_, _, _, _, _, _, P>(
handle_block::<_, _, _, _, _, _, _, _, P>(
db,
key,
recognized_id.clone(),
processors,
publish_serai_tx.clone(),
publish_tributary_tx,
spec,
block,
)
@@ -222,12 +240,12 @@ pub(crate) async fn scan_tributaries_task<
// the next block occurs
let next_block_notification = tributary.next_block_notification().await;
handle_new_blocks::<_, _, _, _, _, _, P>(
handle_new_blocks::<_, _, _, _, _, _, _, _, P>(
&mut tributary_db,
&key,
recognized_id.clone(),
&processors,
|set, tx| {
|set, tx_type, tx| {
let serai = serai.clone();
async move {
loop {
@@ -242,33 +260,52 @@ pub(crate) async fn scan_tributaries_task<
Err(e) => {
if let Ok(serai) = serai.as_of_latest_finalized_block().await {
let serai = serai.validator_sets();
// Check if this failed because the keys were already set by someone
// else
if matches!(serai.keys(spec.set()).await, Ok(Some(_))) {
log::info!("another coordinator set key pair for {:?}", set);
break;
}
// The above block may return false if the keys have been pruned from
// the state
// Check if this session is no longer the latest session, meaning it at
// some point did set keys, and we're just operating off very
// historical data
// The following block is irrelevant, and can/likely will fail, if
// we're publishing a TX for an old session
// If we're on a newer session, move on
if let Ok(Some(current_session)) =
serai.session(spec.set().network).await
{
if current_session.0 > spec.set().session.0 {
log::warn!(
"trying to set keys for a set which isn't the latest {:?}",
"trying to publish a TX relevant to a set {} {:?}",
"which isn't the latest",
set
);
break;
}
}
// Check if someone else published the TX in question
match tx_type {
PstTxType::SetKeys => {
if matches!(serai.keys(spec.set()).await, Ok(Some(_))) {
log::info!("another coordinator set key pair for {:?}", set);
break;
}
}
PstTxType::RemoveParticipant(removed) => {
if let Ok(Some(participants)) =
serai.participants(spec.set().network).await
{
if !participants
.iter()
.any(|(participant, _)| participant.0 == removed)
{
log::info!(
"another coordinator published removal for {:?}",
hex::encode(removed)
);
break;
}
}
}
}
}
log::error!(
"couldn't connect to Serai node to publish set_keys TX: {:?}",
"couldn't connect to Serai node to publish {tx_type:?} TX: {:?}",
e
);
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
@@ -277,6 +314,21 @@ pub(crate) async fn scan_tributaries_task<
}
}
},
&|tx| {
let tributary = tributary.clone();
async move {
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
// Can happen as this occurs on a distinct DB TXN
Err(TransactionError::InvalidNonce) => {
log::warn!(
"publishing TX {tx:?} returned InvalidNonce. was it already added?"
)
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
},
spec,
&reader,
)