13 Commits

Author SHA1 Message Date
Luke Parker
be2098d2e1 Remove Serai from the ConfirmDkgTask 2025-01-15 21:00:50 -05:00
Luke Parker
6b41f32371 Correct handling of InvalidNonce within the coordinator 2025-01-15 20:48:54 -05:00
Luke Parker
19b87c7f5a Add the DKG confirmation flow
Finishes the coordinator redo
2025-01-15 20:29:57 -05:00
Luke Parker
505f1b20a4 Correct re-attempts for the DKG Confirmation protocol
Also spawns the SetKeys task.
2025-01-15 17:49:41 -05:00
Luke Parker
8b52b921f3 Have the Tributary scanner yield DKG confirmation signing protocol data 2025-01-15 15:16:30 -05:00
Luke Parker
f36bbcba25 Flatten the map of preprocesses/shares, send Participant index with DkgParticipation 2025-01-15 14:24:51 -05:00
Luke Parker
167826aa88 Implement SeraiAddress <-> Participant mapping and add RemoveParticipant transactions 2025-01-15 12:51:35 -05:00
Luke Parker
bea4f92b7a Fix parity-db builds for the Coordinator 2025-01-15 12:10:11 -05:00
Luke Parker
7312fa8d3c Spawn PublishSlashReportTask
Updates it so that it'll try for every network instead of returning after any
network fails.

Uses the SlashReport type throughout the codebase.
2025-01-15 12:08:28 -05:00
Luke Parker
92a4cceeeb Spawn PublishBatchTask
Also removes the expectation Batches published via it are sent in an ordered
fashion. That won't be true if the signing protocols complete out-of-order (as
possible when we are signing them in parallel).
2025-01-15 11:21:55 -05:00
Luke Parker
3357181fe2 Handle sign::ProcessorMessage::[Preprocesses, Shares] 2025-01-15 10:47:47 -05:00
Luke Parker
7ce5bdad44 Don't add transactions for topics which have yet to be recognized 2025-01-15 07:01:24 -05:00
Luke Parker
0de3fda921 Further space out requests for cosigns from the network 2025-01-15 05:59:56 -05:00
27 changed files with 1386 additions and 383 deletions

Cargo.lock (generated)

@@ -8318,14 +8318,13 @@ dependencies = [
 "blake2",
 "borsh",
 "ciphersuite",
+"dkg",
 "env_logger",
 "frost-schnorrkel",
 "hex",
 "log",
-"modular-frost",
 "parity-scale-codec",
 "rand_core",
-"schnorr-signatures",
 "schnorrkel",
 "serai-client",
 "serai-coordinator-libp2p-p2p",
@@ -8387,6 +8386,7 @@ version = "0.1.0"
 dependencies = [
 "bitvec",
 "borsh",
+"dkg",
 "futures",
 "log",
 "parity-scale-codec",
@@ -8429,6 +8429,7 @@ dependencies = [
 "blake2",
 "borsh",
 "ciphersuite",
+"dkg",
 "log",
 "parity-scale-codec",
 "rand_core",


@@ -25,12 +25,13 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] }
 blake2 = { version = "0.10", default-features = false, features = ["std"] }
 schnorrkel = { version = "0.11", default-features = false, features = ["std"] }
-ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] }
+ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std", "ristretto"] }
-schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-features = false, features = ["std"] }
+dkg = { path = "../crypto/dkg", default-features = false, features = ["std"] }
-frost = { package = "modular-frost", path = "../crypto/frost" }
 frost-schnorrkel = { path = "../crypto/schnorrkel" }
+hex = { version = "0.4", default-features = false, features = ["std"] }
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
+borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
 zalloc = { path = "../common/zalloc" }
 serai-db = { path = "../common/db" }
@@ -43,9 +44,6 @@ tributary-sdk = { path = "./tributary-sdk" }
 serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] }
-hex = { version = "0.4", default-features = false, features = ["std"] }
-borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
 log = { version = "0.4", default-features = false, features = ["std"] }
 env_logger = { version = "0.10", default-features = false, features = ["humantime"] }


@@ -1,5 +1,5 @@
 use core::future::Future;
-use std::time::{Duration, SystemTime};
+use std::time::{Duration, Instant, SystemTime};
 
 use serai_db::*;
 use serai_task::ContinuallyRan;
@@ -77,12 +77,22 @@ pub(crate) fn currently_evaluated_global_session(getter: &impl Get) -> Option<[u
 pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
   pub(crate) db: D,
   pub(crate) request: R,
+  pub(crate) last_request_for_cosigns: Instant,
 }
 
 impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
   type Error = String;
 
   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
+    let should_request_cosigns = |last_request_for_cosigns: &mut Instant| {
+      const REQUEST_COSIGNS_SPACING: Duration = Duration::from_secs(60);
+      if Instant::now() < (*last_request_for_cosigns + REQUEST_COSIGNS_SPACING) {
+        return false;
+      }
+      *last_request_for_cosigns = Instant::now();
+      true
+    };
+
     async move {
       let mut known_cosign = None;
       let mut made_progress = false;
@@ -118,12 +128,13 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
         // Check if the sum weight doesn't cross the required threshold
         if weight_cosigned < (((global_session_info.total_stake * 83) / 100) + 1) {
           // Request the necessary cosigns over the network
-          // TODO: Add a timer to ensure this isn't called too often
-          self
-            .request
-            .request_notable_cosigns(global_session)
-            .await
-            .map_err(|e| format!("{e:?}"))?;
+          if should_request_cosigns(&mut self.last_request_for_cosigns) {
+            self
+              .request
+              .request_notable_cosigns(global_session)
+              .await
+              .map_err(|e| format!("{e:?}"))?;
+          }
           // We return an error so the delay before this task is run again increases
           return Err(format!(
             "notable block (#{block_number}) wasn't yet cosigned. this should resolve shortly",
@@ -180,11 +191,13 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
           // If this session hasn't yet produced notable cosigns, then we presume we'll see
           // the desired non-notable cosigns as part of normal operations, without needing to
           // explicitly request them
-          self
-            .request
-            .request_notable_cosigns(global_session)
-            .await
-            .map_err(|e| format!("{e:?}"))?;
+          if should_request_cosigns(&mut self.last_request_for_cosigns) {
+            self
+              .request
+              .request_notable_cosigns(global_session)
+              .await
+              .map_err(|e| format!("{e:?}"))?;
+          }
           // We return an error so the delay before this task is run again increases
           return Err(format!(
             "block (#{block_number}) wasn't yet cosigned. this should resolve shortly",


@@ -3,7 +3,7 @@
 #![deny(missing_docs)]
 
 use core::{fmt::Debug, future::Future};
-use std::{sync::Arc, collections::HashMap};
+use std::{sync::Arc, collections::HashMap, time::Instant};
 
 use blake2::{Digest, Blake2s256};
@@ -288,8 +288,12 @@ impl<D: Db> Cosigning<D> {
       .continually_run(intend_task, vec![evaluator_task_handle]),
   );
   tokio::spawn(
-    (evaluator::CosignEvaluatorTask { db: db.clone(), request })
-      .continually_run(evaluator_task, vec![delay_task_handle]),
+    (evaluator::CosignEvaluatorTask {
+      db: db.clone(),
+      request,
+      last_request_for_cosigns: Instant::now(),
+    })
+    .continually_run(evaluator_task, vec![delay_task_handle]),
   );
   tokio::spawn(
     (delay::CosignDelayTask { db: db.clone() })


@@ -51,6 +51,14 @@ impl Validators {
     serai: impl Borrow<Serai>,
     sessions: impl Borrow<HashMap<NetworkId, Session>>,
   ) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
+    /*
+      This uses the latest finalized block, not the latest cosigned block, which should be fine as
+      in the worst case, we'd connect to unexpected validators. They still shouldn't be able to
+      bypass the cosign protocol unless a historical global session was malicious, in which case
+      the cosign protocol already breaks.
+
+      Besides, we can't connect to historical validators, only the current validators.
+    */
     let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
     let temporal_serai = temporal_serai.validator_sets();


@@ -3,9 +3,11 @@ use std::{path::Path, fs};
 pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait};
 use serai_db::{create_db, db_channel};
 
+use dkg::Participant;
+
 use serai_client::{
   primitives::NetworkId,
-  validator_sets::primitives::{Session, ValidatorSet},
+  validator_sets::primitives::{Session, ValidatorSet, KeyPair},
 };
 
 use serai_cosign::SignedCosign;
@@ -13,7 +15,7 @@ use serai_coordinator_substrate::NewSetInformation;
 use serai_coordinator_tributary::Transaction;
 
 #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
-pub(crate) type Db = serai_db::ParityDb;
+pub(crate) type Db = std::sync::Arc<serai_db::ParityDb>;
 #[cfg(feature = "rocksdb")]
 pub(crate) type Db = serai_db::RocksDB;
@@ -76,6 +78,10 @@ create_db! {
     LastProcessorMessage: (network: NetworkId) -> u64,
     // Cosigns we produced and tried to intake yet incurred an error while doing so
     ErroneousCosigns: () -> Vec<SignedCosign>,
+    // The keys to confirm and set on the Serai network
+    KeysToConfirm: (set: ValidatorSet) -> KeyPair,
+    // The key was set on the Serai network
+    KeySet: (set: ValidatorSet) -> (),
   }
 }
@@ -93,21 +99,51 @@ mod _internal_db {
   db_channel! {
     Coordinator {
-      // Tributary transactions to publish
-      TributaryTransactions: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from the Processor messages
+      TributaryTransactionsFromProcessorMessages: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from the DKG confirmation task
+      TributaryTransactionsFromDkgConfirmation: (set: ValidatorSet) -> Transaction,
+      // Participants to remove
+      RemoveParticipant: (set: ValidatorSet) -> Participant,
     }
   }
 }
 
-pub(crate) struct TributaryTransactions;
-impl TributaryTransactions {
+pub(crate) struct TributaryTransactionsFromProcessorMessages;
+impl TributaryTransactionsFromProcessorMessages {
   pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
     // If this set has yet to be retired, send this transaction
     if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
-      _internal_db::TributaryTransactions::send(txn, set, tx);
+      _internal_db::TributaryTransactionsFromProcessorMessages::send(txn, set, tx);
     }
   }
   pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
-    _internal_db::TributaryTransactions::try_recv(txn, set)
+    _internal_db::TributaryTransactionsFromProcessorMessages::try_recv(txn, set)
   }
 }
+
+pub(crate) struct TributaryTransactionsFromDkgConfirmation;
+impl TributaryTransactionsFromDkgConfirmation {
+  pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
+    // If this set has yet to be retired, send this transaction
+    if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
+      _internal_db::TributaryTransactionsFromDkgConfirmation::send(txn, set, tx);
+    }
+  }
+  pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
+    _internal_db::TributaryTransactionsFromDkgConfirmation::try_recv(txn, set)
+  }
+}
+
+pub(crate) struct RemoveParticipant;
+impl RemoveParticipant {
+  pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, participant: Participant) {
+    // If this set has yet to be retired, send this transaction
+    if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
+      _internal_db::RemoveParticipant::send(txn, set, &participant);
+    }
+  }
+  pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Participant> {
+    _internal_db::RemoveParticipant::try_recv(txn, set)
+  }
+}
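Each of the wrappers above gates sends on `RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0)`, leaning on `Option`'s derived ordering: `None` sorts before any `Some`, so a network which has never retired a session accepts sends for every session. A quick check of that idiom:

fn main() {
  // No session retired yet: transactions for any session may be sent
  let retired: Option<u16> = None;
  assert!(retired < Some(0));

  // Session 3 retired: sessions 3 and older are dropped, session 4 passes
  let retired = Some(3u16);
  assert!(!(retired < Some(3)));
  assert!(retired < Some(4));
}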


@@ -0,0 +1,434 @@
use core::{ops::Deref, future::Future};
use std::{boxed::Box, collections::HashMap};

use zeroize::Zeroizing;
use rand_core::OsRng;

use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost_schnorrkel::{
  frost::{
    dkg::{Participant, musig::musig},
    FrostError,
    sign::*,
  },
  Schnorrkel,
};

use serai_db::{DbTxn, Db as DbTrait};

use serai_client::{
  primitives::SeraiAddress,
  validator_sets::primitives::{ValidatorSet, musig_context, set_keys_message},
};

use serai_task::{DoesNotError, ContinuallyRan};

use serai_coordinator_substrate::{NewSetInformation, Keys};
use serai_coordinator_tributary::{Transaction, DkgConfirmationMessages};

use crate::{KeysToConfirm, KeySet, TributaryTransactionsFromDkgConfirmation};

fn schnorrkel() -> Schnorrkel {
  Schnorrkel::new(b"substrate") // TODO: Pull the constant for this
}

fn our_i(
  set: &NewSetInformation,
  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
  data: &HashMap<Participant, Vec<u8>>,
) -> Participant {
  let public = SeraiAddress((Ristretto::generator() * key.deref()).to_bytes());
  let mut our_i = None;
  for participant in data.keys() {
    let validator_index = usize::from(u16::from(*participant) - 1);
    let (validator, _weight) = set.validators[validator_index];
    if validator == public {
      our_i = Some(*participant);
    }
  }
  our_i.unwrap()
}

// Take a HashMap of participations with non-contiguous Participants and convert them to a
// contiguous sequence.
//
// The input data is expected to not include our own data, which also won't be in the output data.
//
// Returns the mapping from the contiguous Participants to the original Participants.
fn make_contiguous<T>(
  our_i: Participant,
  mut data: HashMap<Participant, Vec<u8>>,
  transform: impl Fn(Vec<u8>) -> std::io::Result<T>,
) -> Result<HashMap<Participant, T>, Participant> {
  assert!(!data.contains_key(&our_i));

  let mut ordered_participants = data.keys().copied().collect::<Vec<_>>();
  ordered_participants.sort_by_key(|participant| u16::from(*participant));

  let mut our_i = Some(our_i);
  let mut contiguous = HashMap::new();
  let mut i = 1;
  for participant in ordered_participants {
    // If this is the first participant after our own index, increment to account for our index
    if let Some(our_i_value) = our_i {
      if u16::from(participant) > u16::from(our_i_value) {
        i += 1;
        our_i = None;
      }
    }
    let contiguous_index = Participant::new(i).unwrap();
    let data = match transform(data.remove(&participant).unwrap()) {
      Ok(data) => data,
      Err(_) => Err(participant)?,
    };
    contiguous.insert(contiguous_index, data);
    i += 1;
  }
  Ok(contiguous)
}
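To make the renumbering concrete, here is a worked example of the mapping `make_contiguous` produces, modeled with plain `u16` indices in place of `dkg::Participant` (a simplified stand-in, not the crate's API):

// Mirrors make_contiguous's index arithmetic: sort the other participants,
// then skip one slot for our own index the first time we pass it
fn contiguous_mapping(our_i: u16, mut others: Vec<u16>) -> Vec<(u16, u16)> {
  others.sort_unstable();
  let mut mapping = Vec::new();
  let mut i = 1;
  let mut our_slot_pending = true;
  for original in others {
    if our_slot_pending && (original > our_i) {
      i += 1;
      our_slot_pending = false;
    }
    mapping.push((original, i));
    i += 1;
  }
  mapping
}

fn main() {
  // We're participant 3, with participants 1 and 5 in the data:
  // 1 keeps slot 1, we implicitly occupy slot 2, and 5 becomes slot 3
  assert_eq!(contiguous_mapping(3, vec![5, 1]), vec![(1, 1), (5, 3)]);
}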
fn handle_frost_error<T>(result: Result<T, FrostError>) -> Result<T, Participant> {
  match &result {
    Ok(_) => Ok(result.unwrap()),
    Err(FrostError::InvalidPreprocess(participant) | FrostError::InvalidShare(participant)) => {
      Err(*participant)
    }
    // All of these should be unreachable
    Err(
      FrostError::InternalError(_) |
      FrostError::InvalidParticipant(_, _) |
      FrostError::InvalidSigningSet(_) |
      FrostError::InvalidParticipantQuantity(_, _) |
      FrostError::DuplicatedParticipant(_) |
      FrostError::MissingParticipant(_),
    ) => {
      result.unwrap();
      unreachable!("continued execution after unwrapping Result::Err");
    }
  }
}

#[rustfmt::skip]
enum Signer {
  Preprocess { attempt: u32, seed: CachedPreprocess, preprocess: [u8; 64] },
  Share {
    attempt: u32,
    musig_validators: Vec<SeraiAddress>,
    share: [u8; 32],
    machine: Box<AlgorithmSignatureMachine<Ristretto, Schnorrkel>>,
  },
}

/// Performs the DKG Confirmation protocol.
pub(crate) struct ConfirmDkgTask<CD: DbTrait, TD: DbTrait> {
  db: CD,
  set: NewSetInformation,
  tributary_db: TD,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,

  signer: Option<Signer>,
}

impl<CD: DbTrait, TD: DbTrait> ConfirmDkgTask<CD, TD> {
  pub(crate) fn new(
    db: CD,
    set: NewSetInformation,
    tributary_db: TD,
    key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  ) -> Self {
    Self { db, set, tributary_db, key, signer: None }
  }

  fn slash(db: &mut CD, set: ValidatorSet, validator: SeraiAddress) {
    let mut txn = db.txn();
    TributaryTransactionsFromDkgConfirmation::send(
      &mut txn,
      set,
      &Transaction::RemoveParticipant { participant: validator, signed: Default::default() },
    );
    txn.commit();
  }

  fn preprocess(
    db: &mut CD,
    set: ValidatorSet,
    attempt: u32,
    key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
    signer: &mut Option<Signer>,
  ) {
    // Perform the preprocess
    let (machine, preprocess) = AlgorithmMachine::new(
      schnorrkel(),
      // We use a 1-of-1 Musig here as we don't know who will actually be in this Musig yet
      musig(&musig_context(set), key, &[Ristretto::generator() * key.deref()]).unwrap().into(),
    )
    .preprocess(&mut OsRng);
    // We take the preprocess so we can use it in a distinct machine with the actual Musig
    // parameters
    let seed = machine.cache();

    let mut preprocess_bytes = [0u8; 64];
    preprocess_bytes.copy_from_slice(&preprocess.serialize());
    let preprocess = preprocess_bytes;

    let mut txn = db.txn();
    // If this attempt has already been preprocessed for, the Tributary will de-duplicate it
    // This may mean the Tributary preprocess is distinct from ours, but we check for that later
    TributaryTransactionsFromDkgConfirmation::send(
      &mut txn,
      set,
      &Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed: Default::default() },
    );
    txn.commit();

    *signer = Some(Signer::Preprocess { attempt, seed, preprocess });
  }
}
impl<CD: DbTrait, TD: DbTrait> ContinuallyRan for ConfirmDkgTask<CD, TD> {
  type Error = DoesNotError;

  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
    async move {
      let mut made_progress = false;

      // If we were sent a key to set, create the signer for it
      if self.signer.is_none() && KeysToConfirm::get(&self.db, self.set.set).is_some() {
        // Create and publish the initial preprocess
        Self::preprocess(&mut self.db, self.set.set, 0, &self.key, &mut self.signer);
        made_progress = true;
      }

      // If we have keys to confirm, handle all messages from the tributary
      if let Some(key_pair) = KeysToConfirm::get(&self.db, self.set.set) {
        // Handle all messages from the Tributary
        loop {
          let mut tributary_txn = self.tributary_db.txn();
          let Some(msg) = DkgConfirmationMessages::try_recv(&mut tributary_txn, self.set.set)
          else {
            break;
          };

          match msg {
            messages::sign::CoordinatorMessage::Reattempt {
              id: messages::sign::SignId { attempt, .. },
            } => {
              // Create and publish the preprocess for the specified attempt
              Self::preprocess(&mut self.db, self.set.set, attempt, &self.key, &mut self.signer);
            }
            messages::sign::CoordinatorMessage::Preprocesses {
              id: messages::sign::SignId { attempt, .. },
              mut preprocesses,
            } => {
              // Confirm the preprocess we're expected to sign with is the one we locally have
              // It may be different if we rebooted and made a second preprocess for this attempt
              let Some(Signer::Preprocess { attempt: our_attempt, seed, preprocess }) =
                self.signer.take()
              else {
                // If this message is not expected, commit the txn to drop it and move on
                // At some point, we'll get a Reattempt and reset
                tributary_txn.commit();
                break;
              };

              // Determine the MuSig key signed with
              let musig_validators = {
                let mut ordered_participants = preprocesses.keys().copied().collect::<Vec<_>>();
                ordered_participants.sort_by_key(|participant| u16::from(*participant));

                let mut res = vec![];
                for participant in ordered_participants {
                  let (validator, _weight) =
                    self.set.validators[usize::from(u16::from(participant) - 1)];
                  res.push(validator);
                }
                res
              };

              let musig_public_keys = musig_validators
                .iter()
                .map(|key| {
                  Ristretto::read_G(&mut key.0.as_slice())
                    .expect("Serai validator had invalid public key")
                })
                .collect::<Vec<_>>();

              let keys =
                musig(&musig_context(self.set.set), &self.key, &musig_public_keys).unwrap().into();

              // Rebuild the machine
              let (machine, preprocess_from_cache) =
                AlgorithmSignMachine::from_cache(schnorrkel(), keys, seed);
              assert_eq!(preprocess.as_slice(), preprocess_from_cache.serialize().as_slice());

              // Ensure this is a consistent signing session
              let our_i = our_i(&self.set, &self.key, &preprocesses);
              let consistent = (attempt == our_attempt) &&
                (preprocesses.remove(&our_i).unwrap().as_slice() == preprocess.as_slice());
              if !consistent {
                tributary_txn.commit();
                break;
              }

              // Reformat the preprocesses into the expected format for Musig
              let preprocesses = match make_contiguous(our_i, preprocesses, |preprocess| {
                machine.read_preprocess(&mut preprocess.as_slice())
              }) {
                Ok(preprocesses) => preprocesses,
                // This yields the *original participant index*
                Err(participant) => {
                  Self::slash(
                    &mut self.db,
                    self.set.set,
                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
                  );
                  tributary_txn.commit();
                  break;
                }
              };

              // Calculate our share
              let (machine, share) = match handle_frost_error(
                machine.sign(preprocesses, &set_keys_message(&self.set.set, &key_pair)),
              ) {
                Ok((machine, share)) => (machine, share),
                // This yields the *musig participant index*
                Err(participant) => {
                  Self::slash(
                    &mut self.db,
                    self.set.set,
                    musig_validators[usize::from(u16::from(participant) - 1)],
                  );
                  tributary_txn.commit();
                  break;
                }
              };

              // Send our share
              let share = <[u8; 32]>::try_from(share.serialize()).unwrap();
              let mut txn = self.db.txn();
              TributaryTransactionsFromDkgConfirmation::send(
                &mut txn,
                self.set.set,
                &Transaction::DkgConfirmationShare { attempt, share, signed: Default::default() },
              );
              txn.commit();

              self.signer = Some(Signer::Share {
                attempt,
                musig_validators,
                share,
                machine: Box::new(machine),
              });
            }
            messages::sign::CoordinatorMessage::Shares {
              id: messages::sign::SignId { attempt, .. },
              mut shares,
            } => {
              let Some(Signer::Share { attempt: our_attempt, musig_validators, share, machine }) =
                self.signer.take()
              else {
                tributary_txn.commit();
                break;
              };

              // Ensure this is a consistent signing session
              let our_i = our_i(&self.set, &self.key, &shares);
              let consistent = (attempt == our_attempt) &&
                (shares.remove(&our_i).unwrap().as_slice() == share.as_slice());
              if !consistent {
                tributary_txn.commit();
                break;
              }

              // Reformat the shares into the expected format for Musig
              let shares = match make_contiguous(our_i, shares, |share| {
                machine.read_share(&mut share.as_slice())
              }) {
                Ok(shares) => shares,
                // This yields the *original participant index*
                Err(participant) => {
                  Self::slash(
                    &mut self.db,
                    self.set.set,
                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
                  );
                  tributary_txn.commit();
                  break;
                }
              };

              match handle_frost_error(machine.complete(shares)) {
                Ok(signature) => {
                  // Create the bitvec of the participants
                  let mut signature_participants;
                  {
                    use bitvec::prelude::*;
                    signature_participants = bitvec![u8, Lsb0; 0; 0];
                    let mut i = 0;
                    for (validator, _) in &self.set.validators {
                      if Some(validator) == musig_validators.get(i) {
                        signature_participants.push(true);
                        i += 1;
                      } else {
                        signature_participants.push(false);
                      }
                    }
                  }

                  // This is safe to call multiple times as it'll just change which *valid*
                  // signature to publish
                  let mut txn = self.db.txn();
                  Keys::set(
                    &mut txn,
                    self.set.set,
                    key_pair.clone(),
                    signature_participants,
                    signature.into(),
                  );
                  txn.commit();
                }
                // This yields the *musig participant index*
                Err(participant) => {
                  Self::slash(
                    &mut self.db,
                    self.set.set,
                    musig_validators[usize::from(u16::from(participant) - 1)],
                  );
                  tributary_txn.commit();
                  break;
                }
              }
            }
          }

          // Because we successfully handled this message, note we made progress
          made_progress = true;
          tributary_txn.commit();
        }
      }

      // Check if the key has been set on Serai
      if KeysToConfirm::get(&self.db, self.set.set).is_some() &&
        KeySet::get(&self.db, self.set.set).is_some()
      {
        // Take the keys to confirm so we never instantiate the signer again
        let mut txn = self.db.txn();
        KeysToConfirm::take(&mut txn, self.set.set);
        KeySet::take(&mut txn, self.set.set);
        txn.commit();

        // Drop our own signer
        // The task won't die until the Tributary does, but now it'll never do anything again
        self.signer = None;
        made_progress = true;
      }

      Ok(made_progress)
    }
  }
}
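The `signature_participants` bitvec built in the `Shares` arm has one bit per validator, in set order, flagging those whose MuSig participation backs the signature (`musig_validators` is already sorted in set order, so a single cursor suffices). A standalone sketch of that walk, with strings standing in for validator keys:

use bitvec::prelude::*;

fn main() {
  // Validators in set order; only A and C were in the MuSig
  let set_validators = ["A", "B", "C"];
  let musig_validators = ["A", "C"];

  let mut signature_participants = bitvec![u8, Lsb0; 0; 0];
  let mut i = 0;
  for validator in &set_validators {
    if Some(validator) == musig_validators.get(i) {
      signature_participants.push(true);
      i += 1;
    } else {
      signature_participants.push(false);
    }
  }

  let bits = signature_participants.iter().by_vals().collect::<Vec<_>>();
  assert_eq!(bits, vec![true, false, true]);
}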


@@ -14,8 +14,8 @@ use borsh::BorshDeserialize;
 use tokio::sync::mpsc;
 
 use serai_client::{
-  primitives::{NetworkId, PublicKey},
-  validator_sets::primitives::ValidatorSet,
+  primitives::{NetworkId, PublicKey, SeraiAddress, Signature},
+  validator_sets::primitives::{ValidatorSet, KeyPair},
   Serai,
 };
 
 use message_queue::{Service, client::MessageQueue};
@@ -23,13 +23,17 @@ use message_queue::{Service, client::MessageQueue};
 use serai_task::{Task, TaskHandle, ContinuallyRan};
 
 use serai_cosign::{Faulted, SignedCosign, Cosigning};
-use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
-use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans};
+use serai_coordinator_substrate::{
+  CanonicalEventStream, EphemeralEventStream, SignSlashReport, SetKeysTask, SignedBatches,
+  PublishBatchTask, SlashReports, PublishSlashReportTask,
+};
+use serai_coordinator_tributary::{SigningProtocolRound, Signed, Transaction, SubstrateBlockPlans};
 
 mod db;
 use db::*;
 
 mod tributary;
+mod dkg_confirmation;
 
 mod substrate;
 use substrate::SubstrateTask;
@@ -145,11 +149,25 @@
   });
 }
 
-async fn handle_processor_messages(
+async fn handle_network(
   mut db: impl serai_db::Db,
   message_queue: Arc<MessageQueue>,
+  serai: Arc<Serai>,
   network: NetworkId,
 ) {
+  // Spawn the task to publish batches for this network
+  {
+    let (publish_batch_task_def, publish_batch_task) = Task::new();
+    tokio::spawn(
+      PublishBatchTask::new(db.clone(), serai.clone(), network)
+        .unwrap()
+        .continually_run(publish_batch_task_def, vec![]),
+    );
+    // Forget its handle so it always runs in the background
+    core::mem::forget(publish_batch_task);
+  }
+
+  // Handle Processor messages
   loop {
     let (msg_id, msg) = {
       let msg = message_queue.next(Service::Processor(network)).await;
@@ -180,7 +198,7 @@ async fn handle_processor_messages(
       messages::ProcessorMessage::KeyGen(msg) => match msg {
         messages::key_gen::ProcessorMessage::Participation { session, participation } => {
           let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::DkgParticipation { participation, signed: Signed::default() },
@@ -190,45 +208,84 @@
           session,
           substrate_key,
           network_key,
-        } => todo!("TODO Transaction::DkgConfirmationPreprocess"),
+        } => {
+          KeysToConfirm::set(
+            &mut txn,
+            ValidatorSet { network, session },
+            &KeyPair(
+              PublicKey::from_raw(substrate_key),
+              network_key
+                .try_into()
+                .expect("generated a network key which exceeds the maximum key length"),
+            ),
+          );
+        }
         messages::key_gen::ProcessorMessage::Blame { session, participant } => {
-          let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
-            &mut txn,
-            set,
-            &Transaction::RemoveParticipant {
-              participant: todo!("TODO"),
-              signed: Signed::default(),
-            },
-          );
+          RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
       },
       messages::ProcessorMessage::Sign(msg) => match msg {
         messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
-          let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
-            &mut txn,
-            set,
-            &Transaction::RemoveParticipant {
-              participant: todo!("TODO"),
-              signed: Signed::default(),
-            },
-          );
+          RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
         messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
-          todo!("TODO Transaction::Batch + Transaction::Sign")
+          let set = ValidatorSet { network, session: id.session };
+          if id.attempt == 0 {
+            // Batches are declared by their intent to be signed
+            if let messages::sign::VariantSignId::Batch(hash) = id.id {
+              TributaryTransactionsFromProcessorMessages::send(
+                &mut txn,
+                set,
+                &Transaction::Batch { hash },
+              );
+            }
+          }
+          TributaryTransactionsFromProcessorMessages::send(
+            &mut txn,
+            set,
+            &Transaction::Sign {
+              id: id.id,
+              attempt: id.attempt,
+              round: SigningProtocolRound::Preprocess,
+              data: preprocesses,
+              signed: Signed::default(),
+            },
+          );
         }
-        messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"),
+        messages::sign::ProcessorMessage::Shares { id, shares } => {
+          let set = ValidatorSet { network, session: id.session };
+          TributaryTransactionsFromProcessorMessages::send(
+            &mut txn,
+            set,
+            &Transaction::Sign {
+              id: id.id,
+              attempt: id.attempt,
+              round: SigningProtocolRound::Share,
+              data: shares,
+              signed: Signed::default(),
+            },
+          );
+        }
       },
       messages::ProcessorMessage::Coordinator(msg) => match msg {
         messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
          SignedCosigns::send(&mut txn, &cosign);
         }
         messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
-          todo!("TODO PublishBatchTask")
+          SignedBatches::send(&mut txn, &batch);
         }
-        messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
-          todo!("TODO PublishSlashReportTask")
+        messages::coordinator::ProcessorMessage::SignedSlashReport {
+          session,
+          slash_report,
+          signature,
+        } => {
+          SlashReports::set(
+            &mut txn,
+            ValidatorSet { network, session },
+            slash_report,
+            Signature(signature),
+          );
         }
       },
       messages::ProcessorMessage::Substrate(msg) => match msg {
@@ -243,7 +300,7 @@
         for (session, plans) in by_session {
           let set = ValidatorSet { network, session };
           SubstrateBlockPlans::set(&mut txn, set, block, &plans);
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::SubstrateBlock { hash: block },
@@ -309,10 +366,16 @@
     // Cleanup all historic Tributaries
     while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
       prune_tributary_db(to_cleanup);
+      // Remove the keys to confirm for this network
+      KeysToConfirm::take(&mut txn, to_cleanup);
+      KeySet::take(&mut txn, to_cleanup);
       // Drain the cosign intents created for this set
       while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
       // Drain the transactions to publish for this set
-      while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
+      while TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, to_cleanup).is_some() {}
+      while TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, to_cleanup).is_some() {}
+      // Drain the participants to remove for this set
+      while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
       // Remove the SignSlashReport notification
       SignSlashReport::try_recv(&mut txn, to_cleanup);
     }
@@ -376,7 +439,7 @@
     EphemeralEventStream::new(
       db.clone(),
       serai.clone(),
-      PublicKey::from_raw((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
+      SeraiAddress((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
     )
     .continually_run(substrate_ephemeral_task_def, vec![substrate_task]),
   );
@@ -417,12 +480,32 @@
       .continually_run(substrate_task_def, vec![]),
   );
 
-  // Handle all of the Processors' messages
+  // Handle each of the networks
   for network in serai_client::primitives::NETWORKS {
     if network == NetworkId::Serai {
       continue;
     }
-    tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network));
+    tokio::spawn(handle_network(db.clone(), message_queue.clone(), serai.clone(), network));
+  }
+
+  // Spawn the task to set keys
+  {
+    let (set_keys_task_def, set_keys_task) = Task::new();
+    tokio::spawn(
+      SetKeysTask::new(db.clone(), serai.clone()).continually_run(set_keys_task_def, vec![]),
+    );
+    // Forget its handle so it always runs in the background
+    core::mem::forget(set_keys_task);
+  }
+
+  // Spawn the task to publish slash reports
+  {
+    let (publish_slash_report_task_def, publish_slash_report_task) = Task::new();
+    tokio::spawn(
+      PublishSlashReportTask::new(db, serai).continually_run(publish_slash_report_task_def, vec![]),
+    );
+    // Always have this run in the background
+    core::mem::forget(publish_slash_report_task);
   }
 
   // Run the spawned tasks ad-infinitum
View File

@@ -19,7 +19,7 @@ use serai_task::ContinuallyRan;
 use serai_coordinator_tributary::Transaction;
 use serai_coordinator_p2p::P2p;
 
-use crate::Db;
+use crate::{Db, KeySet};
 
 pub(crate) struct SubstrateTask<P: P2p> {
   pub(crate) serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -47,8 +47,9 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
           };
 
           match msg {
-            // TODO: Stop trying to confirm the DKG
-            messages::substrate::CoordinatorMessage::SetKeys { .. } => todo!("TODO"),
+            messages::substrate::CoordinatorMessage::SetKeys { session, .. } => {
+              KeySet::set(&mut txn, ValidatorSet { network, session }, &());
+            }
             messages::substrate::CoordinatorMessage::SlashesReported { session } => {
               let prior_retired = crate::db::RetiredTributary::get(&txn, network);
               let next_to_be_retired =


@@ -21,10 +21,21 @@ use message_queue::{Service, Metadata, client::MessageQueue};
 use serai_cosign::{Faulted, CosignIntent, Cosigning};
 use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
-use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
+use serai_coordinator_tributary::{
+  Topic, Transaction, ProcessorMessages, CosignIntents, RecognizedTopics, ScanTributaryTask,
+};
 use serai_coordinator_p2p::P2p;
 
-use crate::{Db, TributaryTransactions};
+use crate::{
+  Db, TributaryTransactionsFromProcessorMessages, TributaryTransactionsFromDkgConfirmation,
+  RemoveParticipant, dkg_confirmation::ConfirmDkgTask,
+};
+
+create_db! {
+  Coordinator {
+    PublishOnRecognition: (set: ValidatorSet, topic: Topic) -> Transaction,
+  }
+}
 
 db_channel! {
   Coordinator {
@@ -147,12 +158,101 @@
   }
 }
 
-/// Adds all of the transactions sent via `TributaryTransactions`.
+#[must_use]
+async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
+  tributary: &Tributary<TD, Transaction, P>,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  mut tx: Transaction,
+) -> bool {
+  // If this is a signed transaction, sign it
+  if matches!(tx.kind(), TransactionKind::Signed(_, _)) {
+    tx.sign(&mut OsRng, tributary.genesis(), key);
+  }
+
+  let res = tributary.add_transaction(tx.clone()).await;
+  match &res {
+    // Fresh publication, already published
+    Ok(true | false) => {}
+    Err(
+      TransactionError::TooLargeTransaction |
+      TransactionError::InvalidSigner |
+      TransactionError::InvalidSignature |
+      TransactionError::InvalidContent,
+    ) => {
+      panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
+    }
+    // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after
+    // on-chain inclusion of the TX with nonce #n, so it is invalid within our context unless the
+    // issue is this transaction was already included on-chain
+    Err(TransactionError::InvalidNonce) => {
+      let TransactionKind::Signed(order, signed) = tx.kind() else {
+        panic!("non-Signed transaction had InvalidNonce");
+      };
+      let next_nonce = tributary
+        .next_nonce(&signed.signer, &order)
+        .await
+        .expect("signer who is a present validator didn't have a nonce");
+      assert!(next_nonce != signed.nonce);
+      // We're publishing an old transaction
+      if next_nonce > signed.nonce {
+        return true;
+      }
+      panic!("nonce in transaction wasn't contiguous with nonce on-chain");
+    }
+    // We've published too many transactions recently
+    Err(TransactionError::TooManyInMempool) => {
+      return false;
+    }
+    // This isn't a Provided transaction so this should never be hit
+    Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
+  }
+  true
+}
+
+async fn add_with_recognition_check<TD: DbTrait, P: P2p>(
+  set: ValidatorSet,
+  tributary_db: &mut TD,
+  tributary: &Tributary<TD, Transaction, P>,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  tx: Transaction,
+) -> bool {
+  let kind = tx.kind();
+  match kind {
+    TransactionKind::Provided(_) => provide_transaction(set, tributary, tx).await,
+    TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
+      // If this is a transaction with signing data, check the topic is recognized before
+      // publishing
+      let topic = tx.topic();
+      let still_requires_recognition = if let Some(topic) = topic {
+        (topic.requires_recognition() && (!RecognizedTopics::recognized(tributary_db, set, topic)))
+          .then_some(topic)
+      } else {
+        None
+      };
+      if let Some(topic) = still_requires_recognition {
+        // Queue the transaction until the topic is recognized
+        // We use the Tributary DB for this so it's cleaned up when the Tributary DB is
+        let mut tributary_txn = tributary_db.txn();
+        PublishOnRecognition::set(&mut tributary_txn, set, topic, &tx);
+        tributary_txn.commit();
+      } else {
+        // Actually add the transaction
+        if !add_signed_unsigned_transaction(tributary, key, tx).await {
+          return false;
+        }
+      }
+    }
+  }
+  true
+}
+
+/// Adds all of the transactions sent via `TributaryTransactionsFromProcessorMessages`.
 pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
   db: CD,
   tributary_db: TD,
   tributary: Tributary<TD, Transaction, P>,
-  set: ValidatorSet,
+  set: NewSetInformation,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
 }
 
 impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
@@ -161,49 +261,87 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
     async move {
       let mut made_progress = false;
-      loop {
-        let mut txn = self.db.txn();
-        let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
-
-        let kind = tx.kind();
-        match kind {
-          TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
-          TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
-            // If this is a signed transaction, sign it
-            if matches!(kind, TransactionKind::Signed(_, _)) {
-              tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
-            }
-
-            // Actually add the transaction
-            // TODO: If this is a preprocess, make sure the topic has been recognized
-            let res = self.tributary.add_transaction(tx.clone()).await;
-            match &res {
-              // Fresh publication, already published
-              Ok(true | false) => {}
-              Err(
-                TransactionError::TooLargeTransaction |
-                TransactionError::InvalidSigner |
-                TransactionError::InvalidNonce |
-                TransactionError::InvalidSignature |
-                TransactionError::InvalidContent,
-              ) => {
-                panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
-              }
-              // We've published too many transactions recently
-              // Drop this txn to try to publish it again later on a future iteration
-              Err(TransactionError::TooManyInMempool) => {
-                drop(txn);
-                break;
-              }
-              // This isn't a Provided transaction so this should never be hit
-              Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
-            }
-          }
-        }
-
-        made_progress = true;
-        txn.commit();
-      }
+
+      // Provide/add all transactions sent our way
+      loop {
+        let mut txn = self.db.txn();
+        let Some(tx) = TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set)
+        else {
+          break;
+        };
+        if !add_with_recognition_check(
+          self.set.set,
+          &mut self.tributary_db,
+          &self.tributary,
+          &self.key,
+          tx,
+        )
+        .await
+        {
+          break;
+        }
+        made_progress = true;
+        txn.commit();
+      }
+
+      loop {
+        let mut txn = self.db.txn();
+        let Some(tx) = TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set)
+        else {
+          break;
+        };
+        if !add_with_recognition_check(
+          self.set.set,
+          &mut self.tributary_db,
+          &self.tributary,
+          &self.key,
+          tx,
+        )
+        .await
+        {
+          break;
+        }
+        made_progress = true;
+        txn.commit();
+      }
+
+      // Provide/add all transactions due to newly recognized topics
+      loop {
+        let mut tributary_txn = self.tributary_db.txn();
+        let Some(topic) =
+          RecognizedTopics::try_recv_topic_requiring_recognition(&mut tributary_txn, self.set.set)
+        else {
+          break;
+        };
+        if let Some(tx) = PublishOnRecognition::take(&mut tributary_txn, self.set.set, topic) {
+          if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await {
+            break;
+          }
+        }
+        made_progress = true;
+        tributary_txn.commit();
+      }
+
+      // Publish any participant removals
+      loop {
+        let mut txn = self.db.txn();
+        let Some(participant) = RemoveParticipant::try_recv(&mut txn, self.set.set) else { break };
+        let tx = Transaction::RemoveParticipant {
+          participant: self.set.participant_indexes_reverse_lookup[&participant],
+          signed: Default::default(),
+        };
+        if !add_signed_unsigned_transaction(&self.tributary, &self.key, tx).await {
+          break;
+        }
+        made_progress = true;
+        txn.commit();
+      }
 
       Ok(made_progress)
     }
   }
@@ -323,6 +461,8 @@ async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
 /// - Spawn the ScanTributaryTask
 /// - Spawn the ProvideCosignCosignedTransactionsTask
 /// - Spawn the TributaryProcessorMessagesTask
+/// - Spawn the AddTributaryTransactionsTask
+/// - Spawn the ConfirmDkgTask
 /// - Spawn the SignSlashReportTask
 /// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
 pub(crate) async fn spawn_tributary<P: P2p>(
@@ -403,38 +543,45 @@ pub(crate) async fn spawn_tributary<P: P2p>(
   // Spawn the scan task
   let (scan_tributary_task_def, scan_tributary_task) = Task::new();
   tokio::spawn(
-    ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
+    ScanTributaryTask::<_, P>::new(tributary_db.clone(), set.clone(), reader)
       // This is the only handle for this TributaryProcessorMessagesTask, so when this task is
       // dropped, it will be too
       .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
   );
 
-  // Spawn the sign slash report task
-  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
-  tokio::spawn(
-    (SignSlashReportTask {
-      db: db.clone(),
-      tributary_db: tributary_db.clone(),
-      tributary: tributary.clone(),
-      set: set.clone(),
-      key: serai_key.clone(),
-    })
-    .continually_run(sign_slash_report_task_def, vec![]),
-  );
-
   // Spawn the add transactions task
   let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
   tokio::spawn(
     (AddTributaryTransactionsTask {
       db: db.clone(),
-      tributary_db,
+      tributary_db: tributary_db.clone(),
       tributary: tributary.clone(),
-      set: set.set,
-      key: serai_key,
+      set: set.clone(),
+      key: serai_key.clone(),
     })
     .continually_run(add_tributary_transactions_task_def, vec![]),
   );
 
+  // Spawn the task to confirm the DKG result
+  let (confirm_dkg_task_def, confirm_dkg_task) = Task::new();
+  tokio::spawn(
+    ConfirmDkgTask::new(db.clone(), set.clone(), tributary_db.clone(), serai_key.clone())
+      .continually_run(confirm_dkg_task_def, vec![add_tributary_transactions_task]),
+  );
+
+  // Spawn the sign slash report task
+  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
+  tokio::spawn(
+    (SignSlashReportTask {
+      db: db.clone(),
+      tributary_db,
+      tributary: tributary.clone(),
+      set: set.clone(),
+      key: serai_key,
+    })
+    .continually_run(sign_slash_report_task_def, vec![]),
+  );
+
   // Whenever a new block occurs, immediately run the scan task
   // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
   // Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
@@ -444,10 +591,6 @@
     set.set,
     tributary,
     scan_tributary_task,
-    vec![
-      provide_cosign_cosigned_transactions_task,
-      sign_slash_report_task,
-      add_tributary_transactions_task,
-    ],
+    vec![provide_cosign_cosigned_transactions_task, confirm_dkg_task, sign_slash_report_task],
   ));
 }
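The `InvalidNonce` branch above reduces to one comparison: nonce `n + 1` is only created after the transaction with nonce `n` is included on-chain, so a rejected transaction whose nonce is below the chain's next expected nonce was simply already published, and anything else is a logic error. A minimal sketch of that decision:

// Mirrors the nonce check: true if the rejected transaction was already
// included on-chain, panicking on a genuine discontinuity
fn already_included(next_nonce: u32, tx_nonce: u32) -> bool {
  assert!(next_nonce != tx_nonce, "the chain wouldn't have reported InvalidNonce");
  if next_nonce > tx_nonce {
    // We're republishing an old transaction; its nonce was already consumed
    return true;
  }
  panic!("nonce in transaction wasn't contiguous with nonce on-chain");
}

fn main() {
  assert!(already_included(5, 3));
}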


@@ -22,6 +22,9 @@ bitvec = { version = "1", default-features = false, features = ["std"] }
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
 borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
 
+dkg = { path = "../../crypto/dkg", default-features = false, features = ["std"] }
+
 serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] }
 
 log = { version = "0.4", default-features = false, features = ["std"] }


@@ -4,7 +4,7 @@ use std::sync::Arc;
 use futures::stream::{StreamExt, FuturesOrdered};
 
 use serai_client::{
-  primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve},
+  primitives::{NetworkId, SeraiAddress, EmbeddedEllipticCurve},
   validator_sets::primitives::MAX_KEY_SHARES_PER_SET,
   Serai,
 };
@@ -26,14 +26,14 @@ create_db!(
 pub struct EphemeralEventStream<D: Db> {
   db: D,
   serai: Arc<Serai>,
-  validator: PublicKey,
+  validator: SeraiAddress,
 }
 
 impl<D: Db> EphemeralEventStream<D> {
   /// Create a new ephemeral event stream.
   ///
   /// Only one of these may exist over the provided database.
-  pub fn new(db: D, serai: Arc<Serai>, validator: PublicKey) -> Self {
+  pub fn new(db: D, serai: Arc<Serai>, validator: SeraiAddress) -> Self {
     Self { db, serai, validator }
   }
 }
@@ -145,6 +145,10 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
             "block #{block_number} declared a new set but didn't have the participants"
           ))?
         };
+        let validators = validators
+          .into_iter()
+          .map(|(validator, weight)| (SeraiAddress::from(validator), weight))
+          .collect::<Vec<_>>();
         let in_set = validators.iter().any(|(validator, _)| *validator == self.validator);
         if in_set {
           if u16::try_from(validators.len()).is_err() {
@@ -177,14 +181,16 @@
             embedded_elliptic_curve_keys.push_back(async move {
               tokio::try_join!(
                 // One future to fetch the substrate embedded key
-                serai
-                  .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519),
+                serai.embedded_elliptic_curve_key(
+                  validator.into(),
+                  EmbeddedEllipticCurve::Embedwards25519
+                ),
                 // One future to fetch the external embedded key, if there is a distinct curve
                 async {
                   // `embedded_elliptic_curves` is documented to have the second entry be the
                   // network-specific curve (if it exists and is distinct from Embedwards25519)
                   if let Some(curve) = set.network.embedded_elliptic_curves().get(1) {
-                    serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some)
+                    serai.embedded_elliptic_curve_key(validator.into(), *curve).await.map(Some)
                   } else {
                     Ok(None)
                   }
@@ -215,19 +221,22 @@
             }
           }
 
-          crate::NewSet::send(
-            &mut txn,
-            &NewSetInformation {
-              set: *set,
-              serai_block: block.block_hash,
-              declaration_time: block.time,
-              // TODO: Why do we have this as an explicit field here?
-              // Shouldn't thiis be inlined into the Processor's key gen code, where it's used?
-              threshold: ((total_weight * 2) / 3) + 1,
-              validators,
-              evrf_public_keys,
-            },
-          );
+          let mut new_set = NewSetInformation {
+            set: *set,
+            serai_block: block.block_hash,
+            declaration_time: block.time,
+            // TODO: Why do we have this as an explicit field here?
+            // Shouldn't this be inlined into the Processor's key gen code, where it's used?
+            threshold: ((total_weight * 2) / 3) + 1,
+            validators,
+            evrf_public_keys,
+            participant_indexes: Default::default(),
+            participant_indexes_reverse_lookup: Default::default(),
+          };
+          // These aren't serialized, and we immediately serialize and drop this, so this isn't
+          // necessary. It's just good practice not to have this be dirty
+          new_set.init_participant_indexes();
+          crate::NewSet::send(&mut txn, &new_set);
         }
       }


@@ -2,12 +2,16 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use std::collections::HashMap;
use scale::{Encode, Decode}; use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize}; use borsh::{BorshSerialize, BorshDeserialize};
use dkg::Participant;
use serai_client::{ use serai_client::{
primitives::{NetworkId, PublicKey, Signature, SeraiAddress}, primitives::{NetworkId, SeraiAddress, Signature},
validator_sets::primitives::{Session, ValidatorSet, KeyPair}, validator_sets::primitives::{Session, ValidatorSet, KeyPair, SlashReport},
in_instructions::primitives::SignedBatch, in_instructions::primitives::SignedBatch,
Transaction, Transaction,
}; };
@@ -26,22 +30,9 @@ pub use publish_batch::PublishBatchTask;
 mod publish_slash_report;
 pub use publish_slash_report::PublishSlashReportTask;
-fn borsh_serialize_validators<W: io::Write>(
-  validators: &Vec<(PublicKey, u16)>,
-  writer: &mut W,
-) -> Result<(), io::Error> {
-  // This doesn't use `encode_to` as `encode_to` panics if the writer returns an error
-  writer.write_all(&validators.encode())
-}
-fn borsh_deserialize_validators<R: io::Read>(
-  reader: &mut R,
-) -> Result<Vec<(PublicKey, u16)>, io::Error> {
-  Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other)
-}
 /// The information for a new set.
 #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
+#[borsh(init = init_participant_indexes)]
 pub struct NewSetInformation {
   /// The set.
   pub set: ValidatorSet,
@@ -52,13 +43,37 @@ pub struct NewSetInformation {
   /// The threshold to use.
   pub threshold: u16,
   /// The validators, with the amount of key shares they have.
-  #[borsh(
-    serialize_with = "borsh_serialize_validators",
-    deserialize_with = "borsh_deserialize_validators"
-  )]
-  pub validators: Vec<(PublicKey, u16)>,
+  pub validators: Vec<(SeraiAddress, u16)>,
   /// The eVRF public keys.
+  ///
+  /// This will have as many copies of each validator's keys as that validator has weight,
+  /// lining up with `participant_indexes`.
   pub evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
+  /// The participant indexes, indexed by their validator.
+  #[borsh(skip)]
+  pub participant_indexes: HashMap<SeraiAddress, Vec<Participant>>,
+  /// The validators, indexed by their participant indexes.
+  #[borsh(skip)]
+  pub participant_indexes_reverse_lookup: HashMap<Participant, SeraiAddress>,
 }
+
+impl NewSetInformation {
+  fn init_participant_indexes(&mut self) {
+    let mut next_i = 1;
+    self.participant_indexes = HashMap::with_capacity(self.validators.len());
+    self.participant_indexes_reverse_lookup = HashMap::with_capacity(self.validators.len());
+    for (validator, weight) in &self.validators {
+      let mut these_is = Vec::with_capacity((*weight).into());
+      for _ in 0 .. *weight {
+        let this_i = Participant::new(next_i).unwrap();
+        next_i += 1;
+        these_is.push(this_i);
+        self.participant_indexes_reverse_lookup.insert(this_i, *validator);
+      }
+      self.participant_indexes.insert(*validator, these_is);
+    }
+  }
+}
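To make the weight-to-index mapping concrete, here is a minimal standalone sketch of the assignment `init_participant_indexes` performs, using plain `u16`s in place of `dkg::Participant` and string names in place of `SeraiAddress` (both are stand-ins for illustration):

```rust
use std::collections::HashMap;

fn assign_indexes(validators: &[(&'static str, u16)]) -> HashMap<&'static str, Vec<u16>> {
  let mut next_i = 1;
  let mut indexes = HashMap::new();
  for (validator, weight) in validators {
    // One index per key share, handed out in validator order
    let these_is = (0 .. *weight)
      .map(|_| {
        let i = next_i;
        next_i += 1;
        i
      })
      .collect();
    indexes.insert(*validator, these_is);
  }
  indexes
}

fn main() {
  let indexes = assign_indexes(&[("alice", 2), ("bob", 1)]);
  assert_eq!(indexes["alice"], vec![1, 2]);
  assert_eq!(indexes["bob"], vec![3]);
}
```

Since indexes are handed out in validator order, one per key share, the reverse lookup is a well-defined map from every index back to its validator.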
 mod _public_db {
@@ -175,8 +190,6 @@ impl Keys {
 pub struct SignedBatches;
 impl SignedBatches {
   /// Send a `SignedBatch` to publish onto Serai.
-  ///
-  /// These will be published sequentially. Out-of-order sending risks hanging the task.
   pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) {
     _public_db::SignedBatches::send(txn, batch.batch.network, batch);
   }
@@ -185,10 +198,6 @@ impl SignedBatches {
   }
 }
-/// The slash report was invalid.
-#[derive(Debug)]
-pub struct InvalidSlashReport;
 /// The slash reports to publish onto Serai.
 pub struct SlashReports;
 impl SlashReports {
@@ -196,30 +205,25 @@ impl SlashReports {
   ///
   /// This only saves the most recent slashes as only a single session is eligible to have its
   /// slashes reported at once.
-  ///
-  /// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as
-  /// invalid. Slashes may be considered invalid by the Serai blockchain later even if not
-  /// detected as invalid here.
   pub fn set(
     txn: &mut impl DbTxn,
     set: ValidatorSet,
-    slashes: Vec<(SeraiAddress, u32)>,
+    slash_report: SlashReport,
     signature: Signature,
-  ) -> Result<(), InvalidSlashReport> {
+  ) {
     // If we have a more recent slash report, don't write this historic one
     if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) {
       if existing_session.0 >= set.session.0 {
-        return Ok(());
+        return;
       }
     }
     let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
       set.network,
-      slashes.try_into().map_err(|_| InvalidSlashReport)?,
+      slash_report,
       signature,
     );
     _public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode()));
-    Ok(())
   }

   pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
     let (session, tx) = _public_db::SlashReports::take(txn, network)?;

View File

@@ -1,14 +1,21 @@
 use core::future::Future;
 use std::sync::Arc;
-use serai_db::{DbTxn, Db};
-
-use serai_client::{primitives::NetworkId, SeraiError, Serai};
+#[rustfmt::skip]
+use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch, SeraiError, Serai};
+
+use serai_db::{Get, DbTxn, Db, create_db};
 use serai_task::ContinuallyRan;
 use crate::SignedBatches;
+
+create_db!(
+  CoordinatorSubstrate {
+    LastPublishedBatch: (network: NetworkId) -> u32,
+    BatchesToPublish: (network: NetworkId, batch: u32) -> SignedBatch,
+  }
+);
 /// Publish `SignedBatch`s from `SignedBatches` onto Serai.
 pub struct PublishBatchTask<D: Db> {
   db: D,
@@ -34,32 +41,52 @@ impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
     async move {
-      let mut made_progress = false;
+      // Read from SignedBatches, which is sequential, into our own mapping
       loop {
         let mut txn = self.db.txn();
         let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else {
+          // No batch to publish at this time
           break;
         };
-        // Publish this Batch if it hasn't already been published
+        // If this is a Batch not yet published, save it into our unordered mapping
+        if LastPublishedBatch::get(&txn, self.network) < Some(batch.batch.id) {
+          BatchesToPublish::set(&mut txn, self.network, batch.batch.id, &batch);
+        }
+        txn.commit();
+      }
+
+      // Synchronize our last published batch with the Serai network's
+      let next_to_publish = {
+        // This uses the latest finalized block, not the latest cosigned block, which should be
+        // fine as in the worst case, the only impact is no longer attempting TX publication
         let serai = self.serai.as_of_latest_finalized_block().await?;
         let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?;
-        if last_batch < Some(batch.batch.id) {
-          // This stream of Batches *should* be sequential within the larger context of the Serai
-          // coordinator. In this library, we use a more relaxed definition and don't assert
-          // sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but
-          // that is a documented fault of the `SignedBatches` API.
+
+        let mut txn = self.db.txn();
+        let mut our_last_batch = LastPublishedBatch::get(&txn, self.network);
+        while our_last_batch < last_batch {
+          let next_batch = our_last_batch.map(|batch| batch + 1).unwrap_or(0);
+          // Clean up the Batch to publish since it's already been published
+          BatchesToPublish::take(&mut txn, self.network, next_batch);
+          our_last_batch = Some(next_batch);
+        }
+        if let Some(last_batch) = our_last_batch {
+          LastPublishedBatch::set(&mut txn, self.network, &last_batch);
+        }
+        last_batch.map(|batch| batch + 1).unwrap_or(0)
+      };
+
+      let made_progress =
+        if let Some(batch) = BatchesToPublish::get(&self.db, self.network, next_to_publish) {
          self
            .serai
            .publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch))
            .await?;
-        }
-
-        txn.commit();
-        made_progress = true;
-      }
+          true
+        } else {
+          false
+        };
+
       Ok(made_progress)
     }
   }
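The effect of this rewrite is that batches may now arrive in any order without hanging the task. A standalone sketch of the two-phase flow, with in-memory stand-ins for the DB channel and the on-chain view (all names here are illustrative):

```rust
use std::collections::HashMap;

fn next_to_publish(
  incoming: Vec<(u32, &'static str)>,        // batches received, possibly out of order
  on_chain_last: Option<u32>,                // last batch Serai has executed
  pending: &mut HashMap<u32, &'static str>,  // our unordered BatchesToPublish mapping
) -> Option<(u32, &'static str)> {
  // Phase 1: drain the (sequential) channel into the unordered mapping
  for (id, batch) in incoming {
    pending.insert(id, batch);
  }
  // Phase 2: drop anything already on-chain, then yield the next batch if we have it
  let next = on_chain_last.map(|id| id + 1).unwrap_or(0);
  pending.retain(|id, _| *id >= next);
  pending.get(&next).map(|batch| (next, *batch))
}

fn main() {
  let mut pending = HashMap::new();
  // Batch 2 arrives before batch 1; publication still proceeds in order
  let next = next_to_publish(vec![(2, "b2"), (1, "b1")], Some(0), &mut pending);
  assert_eq!(next, Some((1, "b1")));
}
```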

View File

@@ -22,66 +22,82 @@
   }
 }

+impl<D: Db> PublishSlashReportTask<D> {
+  // Returns if a slash report was successfully published
+  async fn publish(&mut self, network: NetworkId) -> Result<bool, String> {
+    let mut txn = self.db.txn();
+    let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
+      // No slash report to publish
+      return Ok(false);
+    };
+
+    // This uses the latest finalized block, not the latest cosigned block, which should be
+    // fine as in the worst case, the only impact is no longer attempting TX publication
+    let serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
+    let serai = serai.validator_sets();
+    let session_after_slash_report = Session(session.0 + 1);
+    let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
+    let current_session = current_session.map(|session| session.0);
+    // Only attempt to publish the slash report for session #n while session #n+1 is still
+    // active
+    let session_after_slash_report_retired = current_session > Some(session_after_slash_report.0);
+    if session_after_slash_report_retired {
+      // Commit the txn to drain this slash report from the database and not try it again later
+      txn.commit();
+      return Ok(false);
+    }
+
+    if Some(session_after_slash_report.0) != current_session {
+      // We already checked the current session wasn't greater, and they're not equal
+      assert!(current_session < Some(session_after_slash_report.0));
+      // This would mean the Serai node is resyncing and is behind where it previously was
+      Err("have a slash report for a session Serai has yet to retire".to_string())?;
+    }
+
+    // If this session which should publish a slash report already has, move on
+    let key_pending_slash_report =
+      serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
+    if key_pending_slash_report.is_none() {
+      txn.commit();
+      return Ok(false);
+    };
+
+    match self.serai.publish(&slash_report).await {
+      Ok(()) => {
+        txn.commit();
+        Ok(true)
+      }
+      // This could be specific to this TX (such as an already-in-mempool error) and it may be
+      // worthwhile to continue iteration with the other pending slash reports. We assume this
+      // error is ephemeral and that the latency incurred for this ephemeral error to resolve is
+      // minuscule compared to the window available to publish the slash report. That makes
+      // this a non-issue.
+      Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}")),
+    }
+  }
+}
 impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
   type Error = String;

   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
     async move {
       let mut made_progress = false;
+      let mut error = None;
       for network in serai_client::primitives::NETWORKS {
         if network == NetworkId::Serai {
           continue;
         };
-        let mut txn = self.db.txn();
-        let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
-          // No slash report to publish
-          continue;
-        };
-
-        let serai =
-          self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
-        let serai = serai.validator_sets();
-        let session_after_slash_report = Session(session.0 + 1);
-        let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
-        let current_session = current_session.map(|session| session.0);
-        // Only attempt to publish the slash report for session #n while session #n+1 is still
-        // active
-        let session_after_slash_report_retired =
-          current_session > Some(session_after_slash_report.0);
-        if session_after_slash_report_retired {
-          // Commit the txn to drain this slash report from the database and not try it again later
-          txn.commit();
-          continue;
-        }
-
-        if Some(session_after_slash_report.0) != current_session {
-          // We already checked the current session wasn't greater, and they're not equal
-          assert!(current_session < Some(session_after_slash_report.0));
-          // This would mean the Serai node is resyncing and is behind where it previously was
-          Err("have a slash report for a session Serai has yet to retire".to_string())?;
-        }
-
-        // If this session which should publish a slash report already has, move on
-        let key_pending_slash_report =
-          serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
-        if key_pending_slash_report.is_none() {
-          txn.commit();
-          continue;
-        };
-
-        match self.serai.publish(&slash_report).await {
-          Ok(()) => {
-            txn.commit();
-            made_progress = true;
-          }
-          // This could be specific to this TX (such as an already-in-mempool error) and it may be
-          // worthwhile to continue iteration with the other pending slash reports. We assume this
-          // error is ephemeral and that the latency incurred for this ephemeral error to resolve is
-          // minuscule compared to the window available to publish the slash report. That makes
-          // this a non-issue.
-          Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
-        }
+        let network_res = self.publish(network).await;
+        // We made progress if any network successfully published their slash report
+        made_progress |= network_res == Ok(true);
+        // We want to yield the first error *after* attempting for every network
+        error = error.or(network_res.err());
       }
+      // Yield the error
+      if let Some(error) = error {
+        Err(error)?
+      }
       Ok(made_progress)
     }
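The session-window rule `publish` enforces can be checked in isolation: the report for session n is publishable only while session n + 1 is current; once a later session is active the report is stale and dropped, and an earlier session indicates a resyncing node. A standalone sketch (types are stand-ins):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Outcome {
  Publish,
  Drop,
  ResyncError,
}

fn window(report_session: u32, current_session: Option<u32>) -> Outcome {
  let session_after = report_session + 1;
  if current_session > Some(session_after) {
    // The window has passed; drain the report without publishing
    Outcome::Drop
  } else if current_session == Some(session_after) {
    Outcome::Publish
  } else {
    // The node reports an older session than the one the report presumes active
    Outcome::ResyncError
  }
}

fn main() {
  assert_eq!(window(4, Some(5)), Outcome::Publish);
  assert_eq!(window(4, Some(6)), Outcome::Drop);
  assert_eq!(window(4, Some(4)), Outcome::ResyncError);
}
```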

View File

@@ -39,6 +39,8 @@ impl<D: Db> ContinuallyRan for SetKeysTask<D> {
         continue;
       };

+      // This uses the latest finalized block, not the latest cosigned block, which should be
+      // fine as in the worst case, the only impact is no longer attempting TX publication
       let serai =
         self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
       let serai = serai.validator_sets();

View File

@@ -21,13 +21,14 @@ workspace = true
 zeroize = { version = "^1.5", default-features = false, features = ["std"] }
 rand_core = { version = "0.6", default-features = false, features = ["std"] }
-blake2 = { version = "0.10", default-features = false, features = ["std"] }
-ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
-schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", default-features = false, features = ["std"] }
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
 borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
+blake2 = { version = "0.10", default-features = false, features = ["std"] }
+ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
+dkg = { path = "../../crypto/dkg", default-features = false, features = ["std"] }
+schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", default-features = false, features = ["std"] }
 serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] }
 serai-db = { path = "../../common/db" }

View File

@@ -15,20 +15,35 @@ use crate::transaction::SigningProtocolRound;
 /// A topic within the database which the group participates in
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
-pub(crate) enum Topic {
+pub enum Topic {
   /// Vote to remove a participant
-  RemoveParticipant { participant: SeraiAddress },
+  RemoveParticipant {
+    /// The participant to remove
+    participant: SeraiAddress,
+  },
   // DkgParticipation isn't represented here as participations are immediately sent to the
   // processor, not accumulated within this database
   /// Participation in the signing protocol to confirm the DKG results on Substrate
-  DkgConfirmation { attempt: u32, round: SigningProtocolRound },
+  DkgConfirmation {
+    /// The attempt number this is for
+    attempt: u32,
+    /// The round of the signing protocol
+    round: SigningProtocolRound,
+  },
   /// The local view of the SlashReport, to be aggregated into the final SlashReport
   SlashReport,
   /// Participation in a signing protocol
-  Sign { id: VariantSignId, attempt: u32, round: SigningProtocolRound },
+  Sign {
+    /// The ID of the signing protocol
+    id: VariantSignId,
+    /// The attempt number this is for
+    attempt: u32,
+    /// The round of the signing protocol
+    round: SigningProtocolRound,
+  },
 }

 enum Participating {
@@ -79,9 +94,9 @@ impl Topic {
     }
   }

-  // The SignId for this topic
-  //
-  // Returns None if Topic isn't Topic::Sign
+  /// The SignId for this topic
+  ///
+  /// Returns None if Topic isn't Topic::Sign
   pub(crate) fn sign_id(self, set: ValidatorSet) -> Option<messages::sign::SignId> {
     #[allow(clippy::match_same_arms)]
     match self {
@@ -92,6 +107,33 @@ impl Topic {
     }
   }

+  /// The SignId for this DKG Confirmation.
+  ///
+  /// This is undefined except for being consistent to the DKG Confirmation signing protocol and
+  /// unique across sets.
+  ///
+  /// Returns None if Topic isn't Topic::DkgConfirmation.
+  pub(crate) fn dkg_confirmation_sign_id(
+    self,
+    set: ValidatorSet,
+  ) -> Option<messages::sign::SignId> {
+    #[allow(clippy::match_same_arms)]
+    match self {
+      Topic::RemoveParticipant { .. } => None,
+      Topic::DkgConfirmation { attempt, round: _ } => Some({
+        let id = {
+          let mut id = [0; 32];
+          let encoded_set = set.encode();
+          id[.. encoded_set.len()].copy_from_slice(&encoded_set);
+          VariantSignId::Batch(id)
+        };
+        SignId { session: set.session, id, attempt }
+      }),
+      Topic::SlashReport { .. } => None,
+      Topic::Sign { .. } => None,
+    }
+  }
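A minimal standalone sketch of the ID derivation used above, assuming the SCALE encoding of a `ValidatorSet` fits within 32 bytes (the `Batch` variant is reused purely as a carrier, since no DKG-confirmation variant of `VariantSignId` exists):

```rust
// Copy the SCALE-encoded set into a zeroed 32-byte array, making the ID unique per set
fn dkg_confirmation_id(encoded_set: &[u8]) -> [u8; 32] {
  assert!(encoded_set.len() <= 32);
  let mut id = [0; 32];
  id[.. encoded_set.len()].copy_from_slice(encoded_set);
  id
}

fn main() {
  // e.g. a (network, session) pair SCALE-encodes to a handful of bytes
  let id = dkg_confirmation_id(&[1, 0, 0, 0, 0]);
  assert_eq!(id[.. 5], [1, 0, 0, 0, 0]);
  assert_eq!(id[5 ..], [0; 27]);
}
```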
   /// The topic which precedes this topic as a prerequisite
   ///
   /// The preceding topic must define this topic as succeeding
@@ -138,21 +180,22 @@ impl Topic {
     }
   }

-  fn requires_whitelisting(&self) -> bool {
+  /// If this topic requires recognition before entries are permitted for it.
+  pub fn requires_recognition(&self) -> bool {
     #[allow(clippy::match_same_arms)]
     match self {
-      // We don't require whitelisting to remove a participant
+      // We don't require recognition to remove a participant
       Topic::RemoveParticipant { .. } => false,
-      // We don't require whitelisting for the first attempt, solely the re-attempts
+      // We don't require recognition for the first attempt, solely the re-attempts
       Topic::DkgConfirmation { attempt, .. } => *attempt != 0,
-      // We don't require whitelisting for the slash report
+      // We don't require recognition for the slash report
       Topic::SlashReport { .. } => false,
-      // We do require whitelisting for every sign protocol
+      // We do require recognition for every sign protocol
       Topic::Sign { .. } => true,
     }
   }

-  fn required_participation(&self, n: u64) -> u64 {
+  fn required_participation(&self, n: u16) -> u16 {
     let _ = self;
     // All of our topics require 2/3rds participation
     ((2 * n) / 3) + 1
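A quick check of this integer threshold formula: for total key-share weights of 9, 10, and 150, the smallest strict-supermajority weights it yields are 7, 7, and 101 respectively.

```rust
// The 2/3rds-plus-one threshold in integer arithmetic, as used above
fn required_participation(n: u16) -> u16 {
  ((2 * n) / 3) + 1
}

fn main() {
  assert_eq!(required_participation(9), 7); // 2/3 of 9 is 6; strictly more requires 7
  assert_eq!(required_participation(10), 7); // 2/3 of 10 is ~6.67; the next weight is 7
  assert_eq!(required_participation(150), 101);
}
```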
@@ -198,11 +241,11 @@ create_db!(
     // If this block has already been cosigned.
     Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),

-    // The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
+    // The plans to recognize upon a `Transaction::SubstrateBlock` being included on-chain.
     SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,

     // The weight accumulated for a topic.
-    AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
+    AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u16,
     // The entries accumulated for a topic, by validator.
     Accumulated: <D: Borshy>(set: ValidatorSet, topic: Topic, validator: SeraiAddress) -> D,
@@ -213,7 +256,12 @@ create_db!(
 db_channel!(
   CoordinatorTributary {
+    // Messages to send to the processor
     ProcessorMessages: (set: ValidatorSet) -> messages::CoordinatorMessage,
+    // Messages for the DKG confirmation
+    DkgConfirmationMessages: (set: ValidatorSet) -> messages::sign::CoordinatorMessage,
+    // Topics which have been explicitly recognized
+    RecognizedTopics: (set: ValidatorSet) -> Topic,
   }
 );
@@ -262,7 +310,7 @@ impl TributaryDb {
     );
     ActivelyCosigning::set(txn, set, &substrate_block_hash);

-    TributaryDb::recognize_topic(
+    Self::recognize_topic(
       txn,
       set,
       Topic::Sign {
@@ -292,6 +340,10 @@ impl TributaryDb {
   pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
     AccumulatedWeight::set(txn, set, topic, &0);
+    RecognizedTopics::send(txn, set, &topic);
+  }
+
+  pub(crate) fn recognized(getter: &impl Get, set: ValidatorSet, topic: Topic) -> bool {
+    AccumulatedWeight::get(getter, set, topic).is_some()
   }
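A standalone sketch of this recognition scheme, with a `HashMap` standing in for the database: recognizing a topic seeds its accumulated weight at 0, so "recognized" is simply the presence of that entry, whether it arose from explicit recognition or from a first accumulation.

```rust
use std::collections::HashMap;

fn main() {
  let mut accumulated_weight: HashMap<&'static str, u16> = HashMap::new();
  // recognize_topic: seed the weight at 0 (and announce it on the channel)
  accumulated_weight.insert("sign(batch, attempt 0)", 0);
  // recognized: a pure presence check
  assert!(accumulated_weight.contains_key("sign(batch, attempt 0)"));
  assert!(!accumulated_weight.contains_key("sign(batch, attempt 1)"));
}
```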
   pub(crate) fn start_of_block(txn: &mut impl DbTxn, set: ValidatorSet, block_number: u64) {
@@ -312,6 +364,12 @@ impl TributaryDb {
       Self::recognize_topic(txn, set, topic);
       if let Some(id) = topic.sign_id(set) {
         Self::send_message(txn, set, messages::sign::CoordinatorMessage::Reattempt { id });
+      } else if let Some(id) = topic.dkg_confirmation_sign_id(set) {
+        DkgConfirmationMessages::send(
+          txn,
+          set,
+          &messages::sign::CoordinatorMessage::Reattempt { id },
+        );
       }
     }
   }
@@ -339,19 +397,24 @@ impl TributaryDb {
     txn: &mut impl DbTxn,
     set: ValidatorSet,
     validators: &[SeraiAddress],
-    total_weight: u64,
+    total_weight: u16,
     block_number: u64,
     topic: Topic,
     validator: SeraiAddress,
-    validator_weight: u64,
+    validator_weight: u16,
     data: &D,
   ) -> DataSet<D> {
     // This function will only be called once for a (validator, topic) tuple due to how we handle
     // nonces on transactions (deterministically to the topic)
     let accumulated_weight = AccumulatedWeight::get(txn, set, topic);
-    if topic.requires_whitelisting() && accumulated_weight.is_none() {
-      Self::fatal_slash(txn, set, validator, "participated in unrecognized topic");
+    if topic.requires_recognition() && accumulated_weight.is_none() {
+      Self::fatal_slash(
+        txn,
+        set,
+        validator,
+        "participated in unrecognized topic which requires recognition",
+      );
       return DataSet::None;
     }
     let mut accumulated_weight = accumulated_weight.unwrap_or(0);

View File

@@ -6,6 +6,7 @@ use core::{marker::PhantomData, future::Future};
 use std::collections::HashMap;

 use ciphersuite::group::GroupEncoding;
+use dkg::Participant;

 use serai_client::{
   primitives::SeraiAddress,
@@ -27,13 +28,14 @@ use tributary_sdk::{
 };

 use serai_cosign::CosignIntent;
 use serai_coordinator_substrate::NewSetInformation;

-use messages::sign::VariantSignId;
+use messages::sign::{VariantSignId, SignId};

 mod transaction;
 pub use transaction::{SigningProtocolRound, Signed, Transaction};

 mod db;
 use db::*;
+pub use db::Topic;

 /// Messages to send to the Processors.
 pub struct ProcessorMessages;
@@ -44,6 +46,24 @@ impl ProcessorMessages {
   }
 }
+/// Messages for the DKG confirmation.
+pub struct DkgConfirmationMessages;
+impl DkgConfirmationMessages {
+  /// Receive a message for the DKG confirmation.
+  ///
+  /// These messages use the ProcessorMessage API as that's what existing flows are designed
+  /// around, enabling their reuse. The ProcessorMessage includes a VariantSignId which isn't
+  /// applicable to the DKG confirmation (as there's no such variant of the VariantSignId). The
+  /// actual ID is undefined other than it will be consistent to the signing protocol and unique
+  /// across validator sets, with no guarantees of uniqueness across contexts.
+  pub fn try_recv(
+    txn: &mut impl DbTxn,
+    set: ValidatorSet,
+  ) -> Option<messages::sign::CoordinatorMessage> {
+    db::DkgConfirmationMessages::try_recv(txn, set)
+  }
+}
 /// The cosign intents.
 pub struct CosignIntents;
 impl CosignIntents {
@@ -62,10 +82,28 @@ impl CosignIntents {
   }
 }

-/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
+/// An interface to the topics recognized on this Tributary.
+pub struct RecognizedTopics;
+impl RecognizedTopics {
+  /// If this topic has been recognized by this Tributary.
+  ///
+  /// This will either be by explicit recognition or participation.
+  pub fn recognized(getter: &impl Get, set: ValidatorSet, topic: Topic) -> bool {
+    TributaryDb::recognized(getter, set, topic)
+  }
+
+  /// The next topic requiring recognition which has been recognized by this Tributary.
+  pub fn try_recv_topic_requiring_recognition(
+    txn: &mut impl DbTxn,
+    set: ValidatorSet,
+  ) -> Option<Topic> {
+    db::RecognizedTopics::try_recv(txn, set)
+  }
+}
+
+/// The plans to recognize upon a `Transaction::SubstrateBlock` being included on-chain.
 pub struct SubstrateBlockPlans;
 impl SubstrateBlockPlans {
-  /// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included
+  /// Set the plans to recognize upon the associated `Transaction::SubstrateBlock` being included
   /// on-chain.
   ///
   /// This must be done before the associated `Transaction::Cosign` is provided.
@@ -75,7 +113,7 @@ impl SubstrateBlockPlans {
     substrate_block_hash: [u8; 32],
     plans: &Vec<[u8; 32]>,
   ) {
-    db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans);
+    db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, plans);
   }

   fn take(
     txn: &mut impl DbTxn,
@@ -90,32 +128,32 @@ struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
   _td: PhantomData<TD>,
   _p2p: PhantomData<P>,
   tributary_txn: &'a mut TDT,
-  set: ValidatorSet,
+  set: &'a NewSetInformation,
   validators: &'a [SeraiAddress],
-  total_weight: u64,
-  validator_weights: &'a HashMap<SeraiAddress, u64>,
+  total_weight: u16,
+  validator_weights: &'a HashMap<SeraiAddress, u16>,
 }
 impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
   fn potentially_start_cosign(&mut self) {
     // Don't start a new cosigning instance if we're actively running one
-    if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
+    if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set).is_some() {
       return;
     }
     // Fetch the latest intended-to-be-cosigned block
     let Some(latest_substrate_block_to_cosign) =
-      TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
+      TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set.set)
     else {
       return;
     };
     // If it was already cosigned, return
-    if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) {
+    if TributaryDb::cosigned(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign) {
       return;
     }

     let intent =
-      CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign)
+      CosignIntents::take(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign)
         .expect("Transaction::Cosign locally provided but CosignIntents wasn't populated");
     assert_eq!(
       intent.block_hash, latest_substrate_block_to_cosign,
@@ -125,20 +163,71 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
     // Mark us as actively cosigning
     TributaryDb::start_cosigning(
       self.tributary_txn,
-      self.set,
+      self.set.set,
       latest_substrate_block_to_cosign,
       intent.block_number,
     );
     // Send the message for the processor to start signing
     TributaryDb::send_message(
       self.tributary_txn,
-      self.set,
+      self.set.set,
       messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
-        session: self.set.session,
+        session: self.set.set.session,
         intent,
       },
     );
   }
+  fn accumulate_dkg_confirmation<D: AsRef<[u8]> + Borshy>(
+    &mut self,
+    block_number: u64,
+    topic: Topic,
+    data: &D,
+    signer: SeraiAddress,
+  ) -> Option<(SignId, HashMap<Participant, Vec<u8>>)> {
+    match TributaryDb::accumulate::<D>(
+      self.tributary_txn,
+      self.set.set,
+      self.validators,
+      self.total_weight,
+      block_number,
+      topic,
+      signer,
+      self.validator_weights[&signer],
+      data,
+    ) {
+      DataSet::None => None,
+      DataSet::Participating(data_set) => {
+        let id = topic.dkg_confirmation_sign_id(self.set.set).unwrap();
+        // This will be used in a MuSig protocol, so the Participant indexes are the validator's
+        // position in the list regardless of their weight
+        let flatten_data_set = |data_set: HashMap<_, D>| {
+          let mut entries = HashMap::with_capacity(usize::from(self.total_weight));
+          for (validator, participation) in data_set {
+            let (index, (_validator, _weight)) = &self
+              .set
+              .validators
+              .iter()
+              .enumerate()
+              .find(|(_i, (validator_i, _weight))| validator == *validator_i)
+              .unwrap();
+            // The index is zero-indexed yet participants are one-indexed
+            let index = index + 1;
+            entries.insert(
+              Participant::new(u16::try_from(index).unwrap()).unwrap(),
+              participation.as_ref().to_vec(),
+            );
+          }
+          entries
+        };
+        let data_set = flatten_data_set(data_set);
+        Some((id, data_set))
+      }
+    }
+  }
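A standalone sketch of the MuSig indexing above, with string names standing in for validator keys: a validator's participant index is simply its one-based position in the validator list, independent of how many key shares it holds, since MuSig gives each signer exactly one slot.

```rust
fn musig_index(validators: &[&'static str], validator: &'static str) -> u16 {
  let (index, _) = validators
    .iter()
    .enumerate()
    .find(|(_i, v)| **v == validator)
    .unwrap();
  // Zero-indexed position, one-indexed participant
  u16::try_from(index + 1).unwrap()
}

fn main() {
  let validators = ["alice", "bob", "carol"];
  // Even if alice has weight 5, she occupies a single MuSig slot
  assert_eq!(musig_index(&validators, "alice"), 1);
  assert_eq!(musig_index(&validators, "carol"), 3);
}
```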
   fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
     let signer = |signed: Signed| SeraiAddress(signed.signer().to_bytes());
@@ -147,13 +236,14 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
       // TODO: The fact they can publish these TXs makes this a notable spam vector
       if TributaryDb::is_fatally_slashed(
         self.tributary_txn,
-        self.set,
+        self.set.set,
         SeraiAddress(signer.to_bytes()),
       ) {
         return;
       }
     }

+    let topic = tx.topic();
     match tx {
       // Accumulate this vote and fatally slash the participant if past the threshold
       Transaction::RemoveParticipant { participant, signed } => {
@@ -163,7 +253,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         if !self.validators.iter().any(|validator| *validator == participant) {
           TributaryDb::fatal_slash(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             signer,
             "voted to remove non-existent participant",
           );
@@ -172,18 +262,23 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         match TributaryDb::accumulate(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           self.validators,
           self.total_weight,
           block_number,
-          Topic::RemoveParticipant { participant },
+          topic.unwrap(),
           signer,
           self.validator_weights[&signer],
           &(),
         ) {
           DataSet::None => {}
           DataSet::Participating(_) => {
-            TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
+            TributaryDb::fatal_slash(
+              self.tributary_txn,
+              self.set.set,
+              participant,
+              "voted to remove",
+            );
           }
         };
       }
@@ -192,28 +287,52 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
       Transaction::DkgParticipation { participation, signed } => {
         TributaryDb::send_message(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           messages::key_gen::CoordinatorMessage::Participation {
-            session: self.set.session,
-            participant: todo!("TODO"),
+            session: self.set.set.session,
+            participant: self.set.participant_indexes[&signer(signed)][0],
             participation,
           },
         );
       }
-      Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => {
-        // Accumulate the preprocesses into our own FROST attempt manager
-        todo!("TODO")
+      Transaction::DkgConfirmationPreprocess { attempt: _, preprocess, signed } => {
+        let topic = topic.unwrap();
+        let signer = signer(signed);
+        let Some((id, data_set)) =
+          self.accumulate_dkg_confirmation(block_number, topic, &preprocess, signer)
+        else {
+          return;
+        };
+        db::DkgConfirmationMessages::send(
+          self.tributary_txn,
+          self.set.set,
+          &messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set },
+        );
       }
-      Transaction::DkgConfirmationShare { attempt, share, signed } => {
-        // Accumulate the shares into our own FROST attempt manager
-        todo!("TODO: SetKeysTask")
+      Transaction::DkgConfirmationShare { attempt: _, share, signed } => {
+        let topic = topic.unwrap();
+        let signer = signer(signed);
+        let Some((id, data_set)) =
+          self.accumulate_dkg_confirmation(block_number, topic, &share, signer)
+        else {
+          return;
+        };
+        db::DkgConfirmationMessages::send(
+          self.tributary_txn,
+          self.set.set,
+          &messages::sign::CoordinatorMessage::Shares { id, shares: data_set },
+        );
       }
       Transaction::Cosign { substrate_block_hash } => {
         // Update the latest intended-to-be-cosigned Substrate block
         TributaryDb::set_latest_substrate_block_to_cosign(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           substrate_block_hash,
         );
         // Start a new cosign if we aren't already working on one
@@ -226,32 +345,32 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
           not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose
           the next block to work on, we won't if it's already been cosigned.
         */
-        TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash);
+        TributaryDb::mark_cosigned(self.tributary_txn, self.set.set, substrate_block_hash);

         // If we aren't actively cosigning this block, return
         // This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C,
         // and then receive Cosigned for B
-        if TributaryDb::actively_cosigning(self.tributary_txn, self.set) !=
+        if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set) !=
           Some(substrate_block_hash)
         {
           return;
         }

         // Since this is the block we were cosigning, mark us as having finished cosigning
-        TributaryDb::finish_cosigning(self.tributary_txn, self.set);
+        TributaryDb::finish_cosigning(self.tributary_txn, self.set.set);

         // Start working on the next cosign
         self.potentially_start_cosign();
       }
       Transaction::SubstrateBlock { hash } => {
-        // Whitelist all of the IDs this Substrate block causes to be signed
-        let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
+        // Recognize all of the IDs this Substrate block causes to be signed
+        let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set.set, hash).expect(
           "Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
         );
         for plan in plans {
           TributaryDb::recognize_topic(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             Topic::Sign {
               id: VariantSignId::Transaction(plan),
               attempt: 0,
@@ -261,10 +380,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         }
       }
       Transaction::Batch { hash } => {
-        // Whitelist the signing of this batch
+        // Recognize the signing of this batch
         TributaryDb::recognize_topic(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           Topic::Sign {
             id: VariantSignId::Batch(hash),
             attempt: 0,
@@ -279,7 +398,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         if slash_points.len() != self.validators.len() {
           TributaryDb::fatal_slash(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             signer,
             "slash report was for a distinct amount of signers",
           );
@@ -289,11 +408,11 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         // Accumulate, and if past the threshold, calculate *the* slash report and start signing it
         match TributaryDb::accumulate(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           self.validators,
           self.total_weight,
           block_number,
-          Topic::SlashReport,
+          topic.unwrap(),
           signer,
           self.validator_weights[&signer],
           &slash_points,
@@ -307,10 +426,6 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
              have a supermajority agree the slash should be fatal. If there isn't a supermajority,
              but the median believes the slash should be fatal, we need to fall back to a large
              constant.
-
-             Also, TODO, each slash point should probably be considered as
-             `MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses
-             various thresholds (1 day, 3 days, etc), a multiplier should be attached.
           */
           let mut median_slash_report = Vec::with_capacity(self.validators.len());
           for i in 0 .. self.validators.len() {
@@ -351,7 +466,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
           // Create the resulting slash report
           let mut slash_report = vec![];
-          for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
+          for points in amortized_slash_report {
             // TODO: Natively store this as a `Slash`
             if points == u32::MAX {
               slash_report.push(Slash::Fatal);
@@ -364,7 +479,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
           // Recognize the topic for signing the slash report
           TributaryDb::recognize_topic(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             Topic::Sign {
               id: VariantSignId::SlashReport,
               attempt: 0,
@@ -374,24 +489,24 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
           // Send the message for the processor to start signing
           TributaryDb::send_message(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             messages::coordinator::CoordinatorMessage::SignSlashReport {
-              session: self.set.session,
-              report: slash_report,
+              session: self.set.set.session,
+              slash_report: slash_report.try_into().unwrap(),
             },
           );
         }
       };
     }
-      Transaction::Sign { id, attempt, round, data, signed } => {
-        let topic = Topic::Sign { id, attempt, round };
+      Transaction::Sign { id: _, attempt: _, round, data, signed } => {
+        let topic = topic.unwrap();
         let signer = signer(signed);
-        if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
+        if data.len() != usize::from(self.validator_weights[&signer]) {
           TributaryDb::fatal_slash(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             signer,
             "signer signed with a distinct amount of key shares than they had key shares",
           );
@@ -400,7 +515,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         match TributaryDb::accumulate(
           self.tributary_txn,
-          self.set,
+          self.set.set,
           self.validators,
           self.total_weight,
           block_number,
@@ -411,12 +526,22 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
         ) {
           DataSet::None => {}
           DataSet::Participating(data_set) => {
-            let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId");
-            let flatten_data_set = |data_set| todo!("TODO");
+            let id = topic.sign_id(self.set.set).expect("Topic::Sign didn't have SignId");
+            let flatten_data_set = |data_set: HashMap<_, Vec<_>>| {
+              let mut entries = HashMap::with_capacity(usize::from(self.total_weight));
+              for (validator, shares) in data_set {
+                let indexes = &self.set.participant_indexes[&validator];
+                assert_eq!(indexes.len(), shares.len());
+                for (index, share) in indexes.iter().zip(shares) {
+                  entries.insert(*index, share);
+                }
+              }
+              entries
+            };
             let data_set = flatten_data_set(data_set);
             TributaryDb::send_message(
               self.tributary_txn,
-              self.set,
+              self.set.set,
               match round {
                 SigningProtocolRound::Preprocess => {
                   messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set }
@@ -427,13 +552,13 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
               },
             )
           }
-        };
+        }
       }
     }
   }
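Contrast the MuSig indexing with the weighted flattening just shown, sketched standalone below (string names and `u16` indexes are stand-ins): each validator contributes one share per key share it holds, and those shares are mapped onto its pre-assigned participant indexes in order.

```rust
use std::collections::HashMap;

fn flatten(
  participant_indexes: &HashMap<&'static str, Vec<u16>>,
  data_set: HashMap<&'static str, Vec<&'static str>>,
) -> HashMap<u16, &'static str> {
  let mut entries = HashMap::new();
  for (validator, shares) in data_set {
    let indexes = &participant_indexes[validator];
    // One share per key share, consumed in index order
    assert_eq!(indexes.len(), shares.len());
    for (index, share) in indexes.iter().zip(shares) {
      entries.insert(*index, share);
    }
  }
  entries
}

fn main() {
  let indexes = HashMap::from([("alice", vec![1, 2]), ("bob", vec![3])]);
  let flattened =
    flatten(&indexes, HashMap::from([("alice", vec!["a1", "a2"]), ("bob", vec!["b1"])]));
  assert_eq!(flattened[&2], "a2");
  assert_eq!(flattened[&3], "b1");
}
```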
   fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
-    TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
+    TributaryDb::start_of_block(self.tributary_txn, self.set.set, block_number);

     for tx in block.transactions {
       match tx {
@@ -460,7 +585,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
           // errors, mark the node as fatally slashed
           TributaryDb::fatal_slash(
             self.tributary_txn,
-            self.set,
+            self.set.set,
             SeraiAddress(msgs.0.msg.sender),
             &format!("invalid tendermint messages: {msgs:?}"),
           );
@@ -476,10 +601,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
 /// The task to scan the Tributary, populating `ProcessorMessages`.
 pub struct ScanTributaryTask<TD: Db, P: P2p> {
   tributary_db: TD,
-  set: ValidatorSet,
+  set: NewSetInformation,
   validators: Vec<SeraiAddress>,
-  total_weight: u64,
-  validator_weights: HashMap<SeraiAddress, u64>,
+  total_weight: u16,
+  validator_weights: HashMap<SeraiAddress, u16>,
   tributary: TributaryReader<TD, Transaction>,
   _p2p: PhantomData<P>,
 }
@@ -488,15 +613,13 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
   /// Create a new instance of this task.
   pub fn new(
     tributary_db: TD,
-    new_set: &NewSetInformation,
+    set: NewSetInformation,
     tributary: TributaryReader<TD, Transaction>,
   ) -> Self {
-    let mut validators = Vec::with_capacity(new_set.validators.len());
+    let mut validators = Vec::with_capacity(set.validators.len());
     let mut total_weight = 0;
-    let mut validator_weights = HashMap::with_capacity(new_set.validators.len());
-    for (validator, weight) in new_set.validators.iter().copied() {
-      let validator = SeraiAddress::from(validator);
-      let weight = u64::from(weight);
+    let mut validator_weights = HashMap::with_capacity(set.validators.len());
+    for (validator, weight) in set.validators.iter().copied() {
       validators.push(validator);
       total_weight += weight;
       validator_weights.insert(validator, weight);
@@ -504,7 +627,7 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
     ScanTributaryTask {
       tributary_db,
-      set: new_set.set,
+      set,
       validators,
       total_weight,
       validator_weights,
@@ -520,7 +643,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
   fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
     async move {
       let (mut last_block_number, mut last_block_hash) =
-        TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
+        TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set.set)
           .unwrap_or((0, self.tributary.genesis()));
       let mut made_progress = false;
@@ -539,7 +662,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
         if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
           return Err(format!(
             "didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
-            self.set
+            self.set.set
           ));
         }
       }
@@ -549,7 +672,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
         _td: PhantomData::<TD>,
         _p2p: PhantomData::<P>,
         tributary_txn: &mut tributary_txn,
-        set: self.set,
+        set: &self.set,
         validators: &self.validators,
         total_weight: self.total_weight,
         validator_weights: &self.validator_weights,
@@ -557,7 +680,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
       .handle_block(block_number, block);
       TributaryDb::set_last_handled_tributary_block(
         &mut tributary_txn,
-        self.set,
+        self.set.set,
         block_number,
         block_hash,
       );
@@ -577,7 +700,6 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
 pub fn slash_report_transaction(getter: &impl Get, set: &NewSetInformation) -> Transaction {
   let mut slash_points = Vec::with_capacity(set.validators.len());
   for (validator, _weight) in set.validators.iter().copied() {
-    let validator = SeraiAddress::from(validator);
     slash_points.push(SlashPoints::get(getter, set.set, validator).unwrap_or(0));
   }
   Transaction::SlashReport { slash_points, signed: Signed::default() }

View File

@@ -25,6 +25,8 @@ use tributary_sdk::{
   },
 };

+use crate::db::Topic;
+
 /// The round this data is for, within a signing protocol.
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
 pub enum SigningProtocolRound {
@@ -180,7 +182,7 @@ pub enum Transaction {
   ///
   /// This is provided after the block has been cosigned.
   ///
-  /// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s
+  /// With the acknowledgement of a Substrate block, we can recognize all the `VariantSignId`s
   /// resulting from its handling.
   SubstrateBlock {
     /// The hash of the Substrate block
@@ -257,9 +259,7 @@ impl TransactionTrait for Transaction {
       Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"),
       Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"),
-      // TODO: Provide this
       Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"),
-      // TODO: Provide this
       Transaction::Batch { .. } => TransactionKind::Provided("Batch"),
       Transaction::Sign { id, attempt, round, signed, .. } => TransactionKind::Signed(
@@ -318,6 +318,36 @@ impl TransactionTrait for Transaction {
 }

 impl Transaction {
+  /// The topic in the database for this transaction.
+  pub fn topic(&self) -> Option<Topic> {
+    #[allow(clippy::match_same_arms)] // This doesn't make semantic sense here
+    match self {
+      Transaction::RemoveParticipant { participant, .. } => {
+        Some(Topic::RemoveParticipant { participant: *participant })
+      }
+      Transaction::DkgParticipation { .. } => None,
+      Transaction::DkgConfirmationPreprocess { attempt, .. } => {
+        Some(Topic::DkgConfirmation { attempt: *attempt, round: SigningProtocolRound::Preprocess })
+      }
+      Transaction::DkgConfirmationShare { attempt, .. } => {
+        Some(Topic::DkgConfirmation { attempt: *attempt, round: SigningProtocolRound::Share })
+      }
+      // Provided TXs
+      Transaction::Cosign { .. } |
+      Transaction::Cosigned { .. } |
+      Transaction::SubstrateBlock { .. } |
+      Transaction::Batch { .. } => None,
+      Transaction::Sign { id, attempt, round, .. } => {
+        Some(Topic::Sign { id: *id, attempt: *attempt, round: *round })
+      }
+      Transaction::SlashReport { .. } => Some(Topic::SlashReport),
+    }
+  }
+
   /// Sign a transaction.
   ///
   /// Panics if signing a transaction whose type isn't `TransactionKind::Signed`.

View File

@@ -29,8 +29,8 @@ pub(crate) fn generators<C: EvrfCurve>() -> &'static EvrfGenerators<C> {
     .or_insert_with(|| {
       // If we haven't previously needed generators for this Ciphersuite, generate new ones
       Box::leak(Box::new(EvrfGenerators::<C>::new(
-        ((MAX_KEY_SHARES_PER_SET * 2 / 3) + 1).try_into().unwrap(),
-        MAX_KEY_SHARES_PER_SET.try_into().unwrap(),
+        (MAX_KEY_SHARES_PER_SET * 2 / 3) + 1,
+        MAX_KEY_SHARES_PER_SET,
       )))
     })
     .downcast_ref()

View File

@@ -7,7 +7,7 @@ use borsh::{BorshSerialize, BorshDeserialize};
 use dkg::Participant;
 
 use serai_primitives::BlockHash;
-use validator_sets_primitives::{Session, KeyPair, Slash};
+use validator_sets_primitives::{Session, KeyPair, SlashReport};
 use coins_primitives::OutInstructionWithBalance;
 use in_instructions_primitives::SignedBatch;
@@ -100,7 +100,9 @@ pub mod sign {
       Self::Cosign(cosign) => {
         f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish()
       }
-      Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(),
+      Self::Batch(batch) => {
+        f.debug_struct("VariantSignId::Batch").field("0", &hex::encode(batch)).finish()
+      }
       Self::SlashReport => f.debug_struct("VariantSignId::SlashReport").finish(),
       Self::Transaction(tx) => {
         f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish()
@@ -168,7 +170,7 @@ pub mod coordinator {
     /// Sign the slash report for this session.
     ///
     /// This is sent by the Coordinator's Tributary scanner.
-    SignSlashReport { session: Session, report: Vec<Slash> },
+    SignSlashReport { session: Session, slash_report: SlashReport },
   }
 
   // This set of messages is sent entirely and solely by serai-processor-bin's implementation of
@@ -178,7 +180,7 @@
   pub enum ProcessorMessage {
     CosignedBlock { cosign: SignedCosign },
     SignedBatch { batch: SignedBatch },
-    SignedSlashReport { session: Session, signature: Vec<u8> },
+    SignedSlashReport { session: Session, slash_report: SlashReport, signature: [u8; 64] },
   }
 }
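Taken together, the slash-report flow now passes the structured `SlashReport` in both directions and fixes the signature's width. A construction sketch, under the assumptions that the request variant lives on `coordinator::CoordinatorMessage` (the enum's name isn't shown in this hunk) and that `session`, `slash_report`, and a 64-byte `signature` are in scope:

// Illustrative only; mirrors the reshaped variants above.
let request = coordinator::CoordinatorMessage::SignSlashReport {
  session,
  slash_report: slash_report.clone(),
};
// ... the processor signs, then replies:
let response = coordinator::ProcessorMessage::SignedSlashReport {
  session,
  slash_report, // echoed back so the coordinator needn't re-derive it
  signature,    // [u8; 64] rather than the old unsized Vec<u8>
};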

View File

@@ -21,7 +21,7 @@ pub enum Call {
   },
   report_slashes {
     network: NetworkId,
-    slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
+    slashes: SlashReport,
     signature: Signature,
   },
   allocate {

View File

@@ -5,10 +5,10 @@ use sp_runtime::BoundedVec;
 
 use serai_abi::primitives::Amount;
 pub use serai_abi::validator_sets::primitives;
-use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair};
+use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair, SlashReport};
 
 use crate::{
-  primitives::{EmbeddedEllipticCurve, NetworkId, SeraiAddress},
+  primitives::{EmbeddedEllipticCurve, NetworkId},
   Transaction, Serai, TemporalSerai, SeraiError,
 };
@@ -238,12 +238,7 @@
   pub fn report_slashes(
     network: NetworkId,
-    // TODO: This bounds a maximum length but takes more space than just publishing all the u32s
-    // (50 * (32 + 4)) > (150 * 4)
-    slashes: sp_runtime::BoundedVec<
-      (SeraiAddress, u32),
-      sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET_U32 / 3 }>,
-    >,
+    slashes: SlashReport,
     signature: Signature,
   ) -> Transaction {
     Serai::unsigned(serai_abi::Call::ValidatorSets(
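The removed TODO records why the keyed, bounded representation was wasteful: up to 50 `(SeraiAddress, u32)` entries cost 50 * (32 + 4) = 1800 bytes, whereas publishing all 150 `u32` amounts costs only 150 * 4 = 600 bytes, presumably the motivation for the flat `SlashReport`. A usage sketch of the simplified constructor, assuming `network`, `slash_vec`, and `signature` are in scope:

// Build the report via the TryFrom impl added later in this diff, then form
// the unsigned transaction.
let slashes = SlashReport::try_from(slash_vec).expect("slash report within bounds");
let tx = SeraiValidatorSets::report_slashes(network, slashes, signature);
// `tx` is an unsigned Transaction, ready to submit through a Serai client.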

View File

@@ -111,13 +111,7 @@ impl From<Call> for RuntimeCall {
       serai_abi::validator_sets::Call::report_slashes { network, slashes, signature } => {
         RuntimeCall::ValidatorSets(validator_sets::Call::report_slashes {
           network,
-          slashes: <_>::try_from(
-            slashes
-              .into_iter()
-              .map(|(addr, slash)| (PublicKey::from(addr), slash))
-              .collect::<Vec<_>>(),
-          )
-          .unwrap(),
+          slashes,
           signature,
         })
       }
@@ -301,17 +295,7 @@
         }
       }
       validator_sets::Call::report_slashes { network, slashes, signature } => {
-        serai_abi::validator_sets::Call::report_slashes {
-          network,
-          slashes: <_>::try_from(
-            slashes
-              .into_iter()
-              .map(|(addr, slash)| (SeraiAddress::from(addr), slash))
-              .collect::<Vec<_>>(),
-          )
-          .unwrap(),
-          signature,
-        }
+        serai_abi::validator_sets::Call::report_slashes { network, slashes, signature }
       }
       validator_sets::Call::allocate { network, amount } => {
         serai_abi::validator_sets::Call::allocate { network, amount }

View File

@@ -1010,7 +1010,7 @@ pub mod pallet {
     pub fn report_slashes(
       origin: OriginFor<T>,
       network: NetworkId,
-      slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
+      slashes: SlashReport,
       signature: Signature,
     ) -> DispatchResult {
       ensure_none(origin)?;

View File

@@ -210,6 +210,30 @@ impl Slash {
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>);
 
+#[cfg(feature = "borsh")]
+impl BorshSerialize for SlashReport {
+  fn serialize<W: borsh::io::Write>(&self, writer: &mut W) -> borsh::io::Result<()> {
+    BorshSerialize::serialize(self.0.as_slice(), writer)
+  }
+}
+#[cfg(feature = "borsh")]
+impl BorshDeserialize for SlashReport {
+  fn deserialize_reader<R: borsh::io::Read>(reader: &mut R) -> borsh::io::Result<Self> {
+    let slashes = Vec::<Slash>::deserialize_reader(reader)?;
+    slashes
+      .try_into()
+      .map(Self)
+      .map_err(|_| borsh::io::Error::other("length of slash report exceeds max validators"))
+  }
+}
+
+impl TryFrom<Vec<Slash>> for SlashReport {
+  type Error = &'static str;
+  fn try_from(slashes: Vec<Slash>) -> Result<SlashReport, &'static str> {
+    slashes.try_into().map(Self).map_err(|_| "length of slash report exceeds max validators")
+  }
+}
+
 // This is assumed binding to the ValidatorSet via the key signed with
 pub fn report_slashes_message(slashes: &SlashReport) -> Vec<u8> {
   (b"ValidatorSets-report_slashes", slashes).encode()