Implement SeraiAddress <-> Participant mapping and add RemoveParticipant transactions

Luke Parker
2025-01-15 12:51:35 -05:00
parent bea4f92b7a
commit 167826aa88
9 changed files with 125 additions and 79 deletions

Cargo.lock (generated)
View File

@@ -8318,6 +8318,7 @@ dependencies = [
  "blake2",
  "borsh",
  "ciphersuite",
+ "dkg",
  "env_logger",
  "frost-schnorrkel",
  "hex",
@@ -8387,6 +8388,7 @@ version = "0.1.0"
 dependencies = [
  "bitvec",
  "borsh",
+ "dkg",
  "futures",
  "log",
  "parity-scale-codec",

View File

@@ -26,6 +26,7 @@ blake2 = { version = "0.10", default-features = false, features = ["std"] }
 schnorrkel = { version = "0.11", default-features = false, features = ["std"] }
 ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] }
+dkg = { path = "../crypto/dkg", default-features = false, features = ["std"] }
 schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-features = false, features = ["std"] }
 frost = { package = "modular-frost", path = "../crypto/frost" }
 frost-schnorrkel = { path = "../crypto/schnorrkel" }

View File

@@ -3,6 +3,8 @@ use std::{path::Path, fs};
 pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait};
 use serai_db::{create_db, db_channel};
 
+use dkg::Participant;
+
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet},
@@ -95,6 +97,8 @@ mod _internal_db {
     Coordinator {
       // Tributary transactions to publish
       TributaryTransactions: (set: ValidatorSet) -> Transaction,
+      // Participants to remove
+      RemoveParticipant: (set: ValidatorSet) -> Participant,
     }
   }
 }
@@ -111,3 +115,16 @@ impl TributaryTransactions {
     _internal_db::TributaryTransactions::try_recv(txn, set)
   }
 }
+
+pub(crate) struct RemoveParticipant;
+impl RemoveParticipant {
+  pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, participant: Participant) {
+    // If this set has yet to be retired, send this transaction
+    if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
+      _internal_db::RemoveParticipant::send(txn, set, &participant);
+    }
+  }
+  pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Participant> {
+    _internal_db::RemoveParticipant::try_recv(txn, set)
+  }
+}
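For readers unfamiliar with serai-db, `db_channel!` declares a durable, per-key FIFO: `send` enqueues under one transaction and `try_recv` dequeues under another. A minimal sketch of the flow this channel enables, assuming serai-db's in-memory `MemDb` for illustration:

    // Producer side (main.rs, on a processor Blame/InvalidParticipant message)
    let mut db = serai_db::MemDb::new();
    let mut txn = db.txn();
    RemoveParticipant::send(&mut txn, set, participant);
    txn.commit();

    // Consumer side (the Tributary task), draining in FIFO order
    let mut txn = db.txn();
    while let Some(participant) = RemoveParticipant::try_recv(&mut txn, set) {
      // map the Participant to a SeraiAddress and publish the removal transaction
    }
    txn.commit();

The retirement check in `send` means removals for an already-retired set are silently dropped rather than queued forever.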

View File

@@ -14,7 +14,7 @@ use borsh::BorshDeserialize;
 use tokio::sync::mpsc;
 
 use serai_client::{
-  primitives::{NetworkId, PublicKey, Signature},
+  primitives::{NetworkId, SeraiAddress, Signature},
   validator_sets::primitives::ValidatorSet,
   Serai,
 };
@@ -209,28 +209,12 @@ async fn handle_network(
           network_key,
         } => todo!("TODO Transaction::DkgConfirmationPreprocess"),
         messages::key_gen::ProcessorMessage::Blame { session, participant } => {
-          let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
-            &mut txn,
-            set,
-            &Transaction::RemoveParticipant {
-              participant: todo!("TODO"),
-              signed: Signed::default(),
-            },
-          );
+          RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
       },
       messages::ProcessorMessage::Sign(msg) => match msg {
         messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
-          let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
-            &mut txn,
-            set,
-            &Transaction::RemoveParticipant {
-              participant: todo!("TODO"),
-              signed: Signed::default(),
-            },
-          );
+          RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
         messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
           let set = ValidatorSet { network, session: id.session };
@@ -371,6 +355,8 @@ async fn main() {
         while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
         // Drain the transactions to publish for this set
         while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
+        // Drain the participants to remove for this set
+        while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
         // Remove the SignSlashReport notification
         SignSlashReport::try_recv(&mut txn, to_cleanup);
       }
@@ -434,7 +420,7 @@ async fn main() {
       EphemeralEventStream::new(
         db.clone(),
         serai.clone(),
-        PublicKey::from_raw((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
+        SeraiAddress((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
       )
       .continually_run(substrate_ephemeral_task_def, vec![substrate_task]),
     );
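Note the constructor change: `SeraiAddress` is serai-client's 32-byte address newtype, so the validator's Ristretto public key is wrapped directly rather than routed through `PublicKey::from_raw`. A rough sketch of the relationship, using the two conversions this diff itself relies on (`SeraiAddress::from(validator)` and `validator.into()`):

    // SeraiAddress wraps the raw 32 bytes of a Ristretto point
    let key_bytes: [u8; 32] =
      (<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes();
    let address = SeraiAddress(key_bytes);
    // Conversions are lossless in both directions
    let as_public_key: PublicKey = address.into();
    assert_eq!(SeraiAddress::from(as_public_key), address);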

View File

@@ -26,7 +26,7 @@ use serai_coordinator_tributary::{
 };
 use serai_coordinator_p2p::P2p;
 
-use crate::{Db, TributaryTransactions};
+use crate::{Db, TributaryTransactions, RemoveParticipant};
 
 create_db! {
   Coordinator {
@@ -158,8 +158,14 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
 #[must_use]
 async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
   tributary: &Tributary<TD, Transaction, P>,
-  tx: &Transaction,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  mut tx: Transaction,
 ) -> bool {
+  // If this is a signed transaction, sign it
+  if matches!(tx.kind(), TransactionKind::Signed(_, _)) {
+    tx.sign(&mut OsRng, tributary.genesis(), key);
+  }
+
   let res = tributary.add_transaction(tx.clone()).await;
   match &res {
     // Fresh publication, already published
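This centralizes signing at the point of publication: previously only the `TributaryTransactions` loop signed, so transactions queued in `PublishOnRecognition` and the new participant-removal path would each have needed their own copy of the logic. A sketch of the call shape after the change:

    // Callers now pass the transaction by value plus the validator's key; only
    // TransactionKind::Signed transactions are actually signed, with fresh nonces
    // drawn from OsRng at publication time
    if !add_signed_unsigned_transaction(&tributary, &key, tx).await {
      // Treated as "no progress"; the surrounding task loop breaks and retries later
    }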
@@ -191,7 +197,7 @@ pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p>
   db: CD,
   tributary_db: TD,
   tributary: Tributary<TD, Transaction, P>,
-  set: ValidatorSet,
+  set: NewSetInformation,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
 }
 impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
@@ -204,23 +210,20 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
     // Provide/add all transactions sent our way
     loop {
       let mut txn = self.db.txn();
-      let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
+      let Some(tx) = TributaryTransactions::try_recv(&mut txn, self.set.set) else { break };
       let kind = tx.kind();
       match kind {
-        TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
+        TransactionKind::Provided(_) => {
+          provide_transaction(self.set.set, &self.tributary, tx).await
+        }
         TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
-          // If this is a signed transaction, sign it
-          if matches!(kind, TransactionKind::Signed(_, _)) {
-            tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
-          }
-
           // If this is a transaction with signing data, check the topic is recognized before
           // publishing
           let topic = tx.topic();
           let still_requires_recognition = if let Some(topic) = topic {
             (topic.requires_recognition() &&
-              (!RecognizedTopics::recognized(&self.tributary_db, self.set, topic)))
+              (!RecognizedTopics::recognized(&self.tributary_db, self.set.set, topic)))
               .then_some(topic)
           } else {
             None
@@ -229,11 +232,11 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
             // Queue the transaction until the topic is recognized
             // We use the Tributary DB for this so it's cleaned up when the Tributary DB is
             let mut txn = self.tributary_db.txn();
-            PublishOnRecognition::set(&mut txn, self.set, topic, &tx);
+            PublishOnRecognition::set(&mut txn, self.set.set, topic, &tx);
             txn.commit();
           } else {
             // Actually add the transaction
-            if !add_signed_unsigned_transaction(&self.tributary, &tx).await {
+            if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await {
               break;
             }
           }
@@ -248,12 +251,12 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
     loop {
       let mut txn = self.tributary_db.txn();
       let Some(topic) =
-        RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set)
+        RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set.set)
       else {
         break;
       };
-      if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set, topic) {
-        if !add_signed_unsigned_transaction(&self.tributary, &tx).await {
+      if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set.set, topic) {
+        if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await {
           break;
         }
       }
@@ -262,6 +265,21 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
       txn.commit();
     }
 
+    // Publish any participant removals
+    loop {
+      let mut txn = self.db.txn();
+      let Some(participant) = RemoveParticipant::try_recv(&mut txn, self.set.set) else { break };
+      let tx = Transaction::RemoveParticipant {
+        participant: self.set.participant_indexes_reverse_lookup[&participant],
+        signed: Default::default(),
+      };
+      if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await {
+        break;
+      }
+      made_progress = true;
+      txn.commit();
+    }
+
     Ok(made_progress)
   }
 }
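The processor identifies misbehaving validators by DKG `Participant` index, while `Transaction::RemoveParticipant` names them by `SeraiAddress`; the reverse-lookup map built into `NewSetInformation` (see the final file below) bridges the two. A hedged sketch with hypothetical values, where `set_info` stands in for `self.set`:

    // Hypothetical: the processor blames key share 3, held by a validator with two shares
    let blamed = Participant::new(3).unwrap();
    let address = set_info.participant_indexes_reverse_lookup[&blamed];
    // All of that validator's shares map back to the same address, so one removal
    // transaction covers every index they hold
    assert!(set_info.participant_indexes[&address].contains(&blamed));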
@@ -487,7 +505,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
       db: db.clone(),
       tributary_db,
       tributary: tributary.clone(),
-      set: set.set,
+      set: set.clone(),
       key: serai_key,
     })
     .continually_run(add_tributary_transactions_task_def, vec![]),

View File

@@ -22,6 +22,9 @@ bitvec = { version = "1", default-features = false, features = ["std"] }
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
 borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
 
+dkg = { path = "../../crypto/dkg", default-features = false, features = ["std"] }
+
 serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] }
 
 log = { version = "0.4", default-features = false, features = ["std"] }

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
 use futures::stream::{StreamExt, FuturesOrdered};
 
 use serai_client::{
-  primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve},
+  primitives::{NetworkId, SeraiAddress, EmbeddedEllipticCurve},
   validator_sets::primitives::MAX_KEY_SHARES_PER_SET,
   Serai,
 };
@@ -26,14 +26,14 @@ create_db!(
 pub struct EphemeralEventStream<D: Db> {
   db: D,
   serai: Arc<Serai>,
-  validator: PublicKey,
+  validator: SeraiAddress,
 }
 
 impl<D: Db> EphemeralEventStream<D> {
   /// Create a new ephemeral event stream.
   ///
   /// Only one of these may exist over the provided database.
-  pub fn new(db: D, serai: Arc<Serai>, validator: PublicKey) -> Self {
+  pub fn new(db: D, serai: Arc<Serai>, validator: SeraiAddress) -> Self {
     Self { db, serai, validator }
   }
 }
@@ -145,6 +145,10 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
             "block #{block_number} declared a new set but didn't have the participants"
           ))?
         };
+        let validators = validators
+          .into_iter()
+          .map(|(validator, weight)| (SeraiAddress::from(validator), weight))
+          .collect::<Vec<_>>();
         let in_set = validators.iter().any(|(validator, _)| *validator == self.validator);
         if in_set {
           if u16::try_from(validators.len()).is_err() {
@@ -177,14 +181,16 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
           embedded_elliptic_curve_keys.push_back(async move {
             tokio::try_join!(
               // One future to fetch the substrate embedded key
-              serai
-                .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519),
+              serai.embedded_elliptic_curve_key(
+                validator.into(),
+                EmbeddedEllipticCurve::Embedwards25519
+              ),
               // One future to fetch the external embedded key, if there is a distinct curve
               async {
                 // `embedded_elliptic_curves` is documented to have the second entry be the
                 // network-specific curve (if it exists and is distinct from Embedwards25519)
                 if let Some(curve) = set.network.embedded_elliptic_curves().get(1) {
-                  serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some)
+                  serai.embedded_elliptic_curve_key(validator.into(), *curve).await.map(Some)
                 } else {
                   Ok(None)
                 }
@@ -215,19 +221,22 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
             }
          }
 
-          crate::NewSet::send(
-            &mut txn,
-            &NewSetInformation {
-              set: *set,
-              serai_block: block.block_hash,
-              declaration_time: block.time,
-              // TODO: Why do we have this as an explicit field here?
-              // Shouldn't thiis be inlined into the Processor's key gen code, where it's used?
-              threshold: ((total_weight * 2) / 3) + 1,
-              validators,
-              evrf_public_keys,
-            },
-          );
+          let mut new_set = NewSetInformation {
+            set: *set,
+            serai_block: block.block_hash,
+            declaration_time: block.time,
+            // TODO: Why do we have this as an explicit field here?
+            // Shouldn't this be inlined into the Processor's key gen code, where it's used?
+            threshold: ((total_weight * 2) / 3) + 1,
+            validators,
+            evrf_public_keys,
+            participant_indexes: Default::default(),
+            participant_indexes_reverse_lookup: Default::default(),
+          };
+          // These aren't serialized, and we immediately serialize and drop this, so this isn't
+          // necessary. It's just good practice to not have this be dirty
+          new_set.init_participant_indexes();
+          crate::NewSet::send(&mut txn, &new_set);
         }
       }
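The threshold here is the usual BFT supermajority, computed over key shares rather than validators. A quick sanity check of `((total_weight * 2) / 3) + 1`, with hypothetical helper names:

    fn threshold(total_weight: u16) -> u16 {
      ((total_weight * 2) / 3) + 1
    }

    fn main() {
      assert_eq!(threshold(3), 3); // 3 shares: unanimity required
      assert_eq!(threshold(4), 3); // 4 shares: tolerates 1 faulty share
      assert_eq!(threshold(6), 5); // 6 shares: still tolerates only 1 faulty share
    }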

View File

@@ -2,11 +2,15 @@
 #![doc = include_str!("../README.md")]
 #![deny(missing_docs)]
 
+use std::collections::HashMap;
+
 use scale::{Encode, Decode};
-use borsh::{io, BorshSerialize, BorshDeserialize};
+use borsh::{BorshSerialize, BorshDeserialize};
+
+use dkg::Participant;
 
 use serai_client::{
-  primitives::{NetworkId, PublicKey, Signature},
+  primitives::{NetworkId, SeraiAddress, Signature},
   validator_sets::primitives::{Session, ValidatorSet, KeyPair, SlashReport},
   in_instructions::primitives::SignedBatch,
   Transaction,
@@ -26,22 +30,9 @@ pub use publish_batch::PublishBatchTask;
 mod publish_slash_report;
 pub use publish_slash_report::PublishSlashReportTask;
 
-fn borsh_serialize_validators<W: io::Write>(
-  validators: &Vec<(PublicKey, u16)>,
-  writer: &mut W,
-) -> Result<(), io::Error> {
-  // This doesn't use `encode_to` as `encode_to` panics if the writer returns an error
-  writer.write_all(&validators.encode())
-}
-
-fn borsh_deserialize_validators<R: io::Read>(
-  reader: &mut R,
-) -> Result<Vec<(PublicKey, u16)>, io::Error> {
-  Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other)
-}
-
 /// The information for a new set.
 #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
+#[borsh(init = init_participant_indexes)]
 pub struct NewSetInformation {
   /// The set.
   pub set: ValidatorSet,
@@ -52,13 +43,34 @@ pub struct NewSetInformation {
   /// The threshold to use.
   pub threshold: u16,
   /// The validators, with the amount of key shares they have.
-  #[borsh(
-    serialize_with = "borsh_serialize_validators",
-    deserialize_with = "borsh_deserialize_validators"
-  )]
-  pub validators: Vec<(PublicKey, u16)>,
+  pub validators: Vec<(SeraiAddress, u16)>,
   /// The eVRF public keys.
   pub evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
+  /// The participant indexes, indexed by their validator.
+  #[borsh(skip)]
+  pub participant_indexes: HashMap<SeraiAddress, Vec<Participant>>,
+  /// The validators, indexed by their participant indexes.
+  #[borsh(skip)]
+  pub participant_indexes_reverse_lookup: HashMap<Participant, SeraiAddress>,
 }
+
+impl NewSetInformation {
+  fn init_participant_indexes(&mut self) {
+    let mut next_i = 1;
+    self.participant_indexes = HashMap::with_capacity(self.validators.len());
+    self.participant_indexes_reverse_lookup = HashMap::with_capacity(self.validators.len());
+    for (validator, weight) in &self.validators {
+      let mut these_is = Vec::with_capacity((*weight).into());
+      for _ in 0 .. *weight {
+        let this_i = Participant::new(next_i).unwrap();
+        next_i += 1;
+        these_is.push(this_i);
+        self.participant_indexes_reverse_lookup.insert(this_i, *validator);
+      }
+      self.participant_indexes.insert(*validator, these_is);
+    }
+  }
+}
 
 mod _public_db {
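To make the index assignment concrete: indexes are one-based, assigned contiguously in validator order, one per key share. A self-contained sketch of the same expansion, using raw byte arrays as hypothetical stand-ins for `SeraiAddress`:

    use std::collections::HashMap;
    use dkg::Participant;

    fn main() {
      // alice has weight (key shares) 2, bob has weight 1
      let validators: Vec<([u8; 32], u16)> = vec![([1; 32], 2), ([2; 32], 1)];

      let mut next_i = 1;
      let mut reverse = HashMap::new();
      for (validator, weight) in &validators {
        for _ in 0 .. *weight {
          reverse.insert(Participant::new(next_i).unwrap(), *validator);
          next_i += 1;
        }
      }

      // alice holds Participants 1 and 2, bob holds Participant 3
      assert_eq!(reverse[&Participant::new(1).unwrap()], [1; 32]);
      assert_eq!(reverse[&Participant::new(3).unwrap()], [2; 32]);
    }

Since both maps are `#[borsh(skip)]`, the `#[borsh(init = init_participant_indexes)]` attribute rebuilds them immediately after deserialization rather than storing them.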

View File

@@ -515,7 +515,6 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
     let mut total_weight = 0;
     let mut validator_weights = HashMap::with_capacity(new_set.validators.len());
     for (validator, weight) in new_set.validators.iter().copied() {
-      let validator = SeraiAddress::from(validator);
       let weight = u64::from(weight);
       validators.push(validator);
       total_weight += weight;
@@ -597,7 +596,6 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
 pub fn slash_report_transaction(getter: &impl Get, set: &NewSetInformation) -> Transaction {
   let mut slash_points = Vec::with_capacity(set.validators.len());
   for (validator, _weight) in set.validators.iter().copied() {
-    let validator = SeraiAddress::from(validator);
     slash_points.push(SlashPoints::get(getter, set.set, validator).unwrap_or(0));
   }
   Transaction::SlashReport { slash_points, signed: Signed::default() }