Files
serai/coordinator/src/tributary/scan.rs
Luke Parker 0a611cb155 Further flesh out tributary scanning
Renames `label` to `round` since `Label` was renamed to `SigningProtocolRound`.

Adds some more context-less validation to transactions which used to be done
within the custom decode function which was simplified via the usage of borsh.

Documents in processor-messages where the Coordinator sends each of its
messages.
2025-01-03 06:57:28 -05:00

397 lines
14 KiB
Rust

use core::future::Future;
use std::collections::HashMap;
use ciphersuite::group::GroupEncoding;
use serai_client::{
primitives::SeraiAddress,
validator_sets::primitives::{ValidatorSet, Slash},
};
use tributary::{
Signed as TributarySigned, TransactionKind, TransactionTrait,
Transaction as TributaryTransaction, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
};
use serai_db::*;
use serai_task::ContinuallyRan;
use messages::sign::VariantSignId;
use crate::tributary::{
db::*,
transaction::{SigningProtocolRound, Signed, Transaction},
};
struct ScanBlock<'a, D: DbTxn, TD: Db> {
txn: &'a mut D,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>,
}
impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(self.txn, self.set, SeraiAddress(signer.to_bytes())) {
return;
}
}
match tx {
// Accumulate this vote and fatally slash the participant if past the threshold
Transaction::RemoveParticipant { participant, signed } => {
let signer = signer(signed);
// Check the participant voted to be removed actually exists
if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash(
self.txn,
self.set,
signer,
"voted to remove non-existent participant",
);
return;
}
match TributaryDb::accumulate(
self.txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::RemoveParticipant { participant },
signer,
self.validator_weights[&signer],
&(),
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.txn, self.set, participant, "voted to remove");
}
};
}
// Send the participation to the processor
Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message(
self.txn,
self.set,
messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session,
participant: todo!("TODO"),
participation,
},
);
}
Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => {
// Accumulate the preprocesses into our own FROST attempt manager
todo!("TODO")
}
Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager
todo!("TODO")
}
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash);
// TODO: If we aren't currently cosigning a block, start cosigning this one
}
Transaction::Cosigned { substrate_block_hash } => {
// Start cosigning the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
else {
return;
};
// If this is the block we just cosigned, return
if latest_substrate_block_to_cosign == substrate_block_hash {
return;
}
let substrate_block_number = todo!("TODO");
// Whitelist the topic
TributaryDb::recognize_topic(
self.txn,
self.set,
Topic::Sign {
id: VariantSignId::Cosign(substrate_block_number),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: substrate_block_hash,
},
);
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
}
Transaction::SlashReport { slash_points, signed } => {
let signer = signer(signed);
if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash(
self.txn,
self.set,
signer,
"slash report was for a distinct amount of signers",
);
return;
}
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate(
self.txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::SlashReport,
signer,
self.validator_weights[&signer],
&slash_points,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
// Find the median reported slashes for this validator
// TODO: This lets 34% perform a fatal slash. Should that be allowed?
let mut median_slash_report = Vec::with_capacity(self.validators.len());
for i in 0 .. self.validators.len() {
let mut this_validator =
data_set.values().map(|report| report[i]).collect::<Vec<_>>();
this_validator.sort_unstable();
// Choose the median, where if there are two median values, the lower one is chosen
let median_index = if (this_validator.len() % 2) == 1 {
this_validator.len() / 2
} else {
(this_validator.len() / 2) - 1
};
median_slash_report.push(this_validator[median_index]);
}
// We only publish slashes for the `f` worst performers to:
// 1) Effect amnesty if there were network disruptions which affected everyone
// 2) Ensure the signing threshold doesn't have a disincentive to do their job
// Find the worst performer within the signing threshold's slash points
let f = (self.validators.len() - 1) / 3;
let worst_validator_in_supermajority_slash_points = {
let mut sorted_slash_points = median_slash_report.clone();
sorted_slash_points.sort_unstable();
// This won't be a valid index if `f == 0`, which means we don't have any validators
// to slash
let index_of_first_validator_to_slash = self.validators.len() - f;
let index_of_worst_validator_in_supermajority = index_of_first_validator_to_slash - 1;
sorted_slash_points[index_of_worst_validator_in_supermajority]
};
// Perform the amortization
for slash_points in &mut median_slash_report {
*slash_points =
slash_points.saturating_sub(worst_validator_in_supermajority_slash_points)
}
let amortized_slash_report = median_slash_report;
// Create the resulting slash report
let mut slash_report = vec![];
for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
if points != 0 {
slash_report.push(Slash { key: validator.into(), points });
}
}
assert!(slash_report.len() <= f);
// Recognize the topic for signing the slash report
TributaryDb::recognize_topic(
self.txn,
self.set,
Topic::Sign {
id: VariantSignId::SlashReport,
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
report: slash_report,
},
);
}
};
}
Transaction::Sign { id, attempt, round, data, signed } => {
let topic = Topic::Sign { id, attempt, round };
let signer = signer(signed);
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
TributaryDb::fatal_slash(
self.txn,
self.set,
signer,
"signer signed with a distinct amount of key shares than they had key shares",
);
return;
}
match TributaryDb::accumulate(
self.txn,
self.set,
self.validators,
self.total_weight,
block_number,
topic,
signer,
self.validator_weights[&signer],
&data,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId");
let flatten_data_set = |data_set| todo!("TODO");
let data_set = flatten_data_set(data_set);
TributaryDb::send_message(
self.txn,
self.set,
match round {
SigningProtocolRound::Preprocess => {
messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set }
}
SigningProtocolRound::Share => {
messages::sign::CoordinatorMessage::Shares { id, shares: data_set }
}
},
)
}
};
}
}
}
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.txn, self.set, block_number);
for tx in block.transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
// Since the evidence is on the chain, it will have already been validated
// We can just punish the signer
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
/* TODO
let msgs = (
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
None
},
);
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.txn, msgs.0.msg.sender, &format!("invalid tendermint messages: {msgs:?}"));
*/
todo!("TODO")
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(block_number, tx);
}
}
}
}
}
struct ScanTributaryTask<D: Db, TD: Db> {
db: D,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
tributary: TributaryReader<TD, Transaction>,
}
impl<D: Db, TD: Db> ContinuallyRan for ScanTributaryTask<D, TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.db, self.set)
.unwrap_or((0, self.tributary.genesis()));
let mut made_progess = false;
while let Some(next) = self.tributary.block_after(&last_block_hash) {
let block = self.tributary.block(&next).unwrap();
let block_number = last_block_number + 1;
let block_hash = block.hash();
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
let TransactionKind::Provided(order) = tx.kind() else {
continue;
};
// make sure we have all the provided txs in this block locally
if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
return Err(format!(
"didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
self.set
));
}
}
let mut txn = self.db.txn();
(ScanBlock {
txn: &mut txn,
set: self.set,
validators: &self.validators,
total_weight: self.total_weight,
validator_weights: &self.validator_weights,
tributary: &self.tributary,
})
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(&mut txn, self.set, block_number, block_hash);
last_block_number = block_number;
last_block_hash = block_hash;
txn.commit();
made_progess = true;
}
Ok(made_progess)
}
}
}