Add flow to add transactions onto Tributaries

This commit is contained in:
Luke Parker
2025-01-12 07:32:45 -05:00
parent e35aa04afb
commit 0ce9aad9b2
6 changed files with 246 additions and 56 deletions

View File

@@ -9,8 +9,8 @@ use serai_client::{
};
use serai_cosign::SignedCosign;
use serai_coordinator_substrate::NewSetInformation;
use serai_coordinator_tributary::Transaction;
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = serai_db::ParityDb;
@@ -81,9 +81,33 @@ create_db! {
db_channel! {
Coordinator {
// Tributaries to clean up upon reboot
TributaryCleanup: () -> ValidatorSet,
// Cosigns we produced
SignedCosigns: () -> SignedCosign,
// Tributaries to clean up upon reboot
TributaryCleanup: () -> ValidatorSet,
}
}
mod _internal_db {
use super::*;
db_channel! {
Coordinator {
// Tributary transactions to publish
TributaryTransactions: (set: ValidatorSet) -> Transaction,
}
}
}
pub(crate) struct TributaryTransactions;
impl TributaryTransactions {
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
// If this set has yet to be retired, send this transaction
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
_internal_db::TributaryTransactions::send(txn, set, tx);
}
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
_internal_db::TributaryTransactions::try_recv(txn, set)
}
}

View File

@@ -1,5 +1,5 @@
use core::{ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant};
use std::{sync::Arc, collections::HashMap, time::Instant};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
@@ -15,6 +15,7 @@ use tokio::sync::mpsc;
use serai_client::{
primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai,
};
use message_queue::{Service, client::MessageQueue};
@@ -23,7 +24,7 @@ use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction;
use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans};
mod db;
use db::*;
@@ -178,7 +179,12 @@ async fn handle_processor_messages(
match msg {
messages::ProcessorMessage::KeyGen(msg) => match msg {
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
todo!("TODO Transaction::DkgParticipation")
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::DkgParticipation { participation, signed: Signed::default() },
);
}
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
session,
@@ -186,12 +192,28 @@ async fn handle_processor_messages(
network_key,
} => todo!("TODO Transaction::DkgConfirmationPreprocess"),
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
todo!("TODO Transaction::RemoveParticipant")
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
},
messages::ProcessorMessage::Sign(msg) => match msg {
messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
todo!("TODO Transaction::RemoveParticipant")
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
todo!("TODO Transaction::Batch + Transaction::Sign")
@@ -211,7 +233,22 @@ async fn handle_processor_messages(
},
messages::ProcessorMessage::Substrate(msg) => match msg {
messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
todo!("TODO Transaction::SubstrateBlock")
let mut by_session = HashMap::new();
for plan in plans {
by_session
.entry(plan.session)
.or_insert_with(|| Vec::with_capacity(1))
.push(plan.transaction_plan_id);
}
for (session, plans) in by_session {
let set = ValidatorSet { network, session };
SubstrateBlockPlans::set(&mut txn, set, block, &plans);
TributaryTransactions::send(
&mut txn,
set,
&Transaction::SubstrateBlock { hash: block },
);
}
}
},
}
@@ -274,6 +311,8 @@ async fn main() {
prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
// Drain the transactions to publish for this set
while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
// Remove the SignSlashReport notification
SignSlashReport::try_recv(&mut txn, to_cleanup);
}

View File

@@ -13,7 +13,7 @@ use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionError, ProvidedError, Tributary};
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
@@ -24,7 +24,7 @@ use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
use serai_coordinator_p2p::P2p;
use crate::Db;
use crate::{Db, TributaryTransactions};
db_channel! {
Coordinator {
@@ -32,17 +32,6 @@ db_channel! {
}
}
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
/// Provide a Provided Transaction to the Tributary.
///
/// This is not a well-designed function. This is specific to the context in which its called,
@@ -62,10 +51,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
}
Err(ProvidedError::LocalMismatchesOnChain) => loop {
// The Tributary's scan task won't advance if we don't have the Provided transactions
// present on-chain, and this enters an infinite loop to block the calling task from
// advancing
Err(ProvidedError::LocalMismatchesOnChain) => loop {
log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set,
@@ -77,6 +66,17 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
}
}
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut made_progress = false;
@@ -145,6 +145,66 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
}
}
/// Adds all of the transactions sent via `TributaryTransactions`.
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
tributary: Tributary<TD, Transaction, P>,
set: ValidatorSet,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
let kind = tx.kind();
match kind {
TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
// If this is a signed transaction, sign it
if matches!(kind, TransactionKind::Signed(_, _)) {
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
}
// Actually add the transaction
// TODO: If this is a preprocess, make sure the topic has been recognized
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => {
drop(txn);
break;
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
}
}
made_progress = true;
txn.commit();
}
Ok(made_progress)
}
}
}
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
tributary_db: TD,
@@ -207,7 +267,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => return Ok(false),
Err(TransactionError::TooManyInMempool) => {
drop(txn);
return Ok(false);
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
@@ -343,14 +406,27 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn(
(SignSlashReportTask {
db: db.clone(),
tributary_db,
tributary_db: tributary_db.clone(),
tributary: tributary.clone(),
set: set.clone(),
key: serai_key,
key: serai_key.clone(),
})
.continually_run(sign_slash_report_task_def, vec![]),
);
// Spawn the add transactions task
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
tokio::spawn(
(AddTributaryTransactionsTask {
db: db.clone(),
tributary_db,
tributary: tributary.clone(),
set: set.set,
key: serai_key,
})
.continually_run(add_tributary_transactions_task_def, vec![]),
);
// Whenever a new block occurs, immediately run the scan task
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
@@ -360,6 +436,10 @@ pub(crate) async fn spawn_tributary<P: P2p>(
set.set,
tributary,
scan_tributary_task,
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task],
vec![
provide_cosign_cosigned_transactions_task,
sign_slash_report_task,
add_tributary_transactions_task,
],
));
}

View File

@@ -198,6 +198,9 @@ create_db!(
// If this block has already been cosigned.
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
// The entries accumulated for a topic, by validator.

View File

@@ -30,8 +30,7 @@ use serai_coordinator_substrate::NewSetInformation;
use messages::sign::VariantSignId;
mod transaction;
pub(crate) use transaction::{SigningProtocolRound, Signed};
pub use transaction::Transaction;
pub use transaction::{SigningProtocolRound, Signed, Transaction};
mod db;
use db::*;
@@ -63,6 +62,30 @@ impl CosignIntents {
}
}
/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
pub struct SubstrateBlockPlans;
impl SubstrateBlockPlans {
/// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included
/// on-chain.
///
/// This must be done before the associated `Transaction::Cosign` is provided.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
plans: &Vec<[u8; 32]>,
) {
db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans);
}
fn take(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> Option<Vec<[u8; 32]>> {
db::SubstrateBlockPlans::take(txn, set, substrate_block_hash)
}
}
struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
_td: PhantomData<TD>,
_p2p: PhantomData<P>,
@@ -222,11 +245,32 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
"Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
);
for plan in plans {
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Transaction(plan),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
// Whitelist the signing of this batch
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Batch(hash),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
Transaction::SlashReport { slash_points, signed } => {

View File

@@ -224,13 +224,13 @@ pub mod substrate {
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct PlanMeta {
pub session: Session,
pub transaction: [u8; 32],
pub transaction_plan_id: [u8; 32],
}
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
// TODO: Have the processor send this
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
SubstrateBlockAck { block: [u8; 32], plans: Vec<PlanMeta> },
}
}