Publish SlashReport onto the Tributary

This commit is contained in:
Luke Parker
2025-01-11 06:51:55 -05:00
parent e731b546ab
commit 74106b025f
7 changed files with 124 additions and 30 deletions

View File

@@ -17,7 +17,7 @@ use message_queue::{Service, client::MessageQueue};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction;
mod db;
@@ -148,6 +148,8 @@ async fn main() {
prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
// Remove the SignSlashReport notification
SignSlashReport::try_recv(&mut txn, to_cleanup);
}
// Remove retired Tributaries from ActiveTributaries
@@ -198,7 +200,6 @@ async fn main() {
};
// Spawn the Substrate scanners
// TODO: SignSlashReport
let (substrate_task_def, substrate_task) = Task::new();
let (substrate_canonical_task_def, substrate_canonical_task) = Task::new();
tokio::spawn(

View File

@@ -2,6 +2,7 @@ use core::{future::Future, time::Duration};
use std::sync::Arc;
use zeroize::Zeroizing;
use rand_core::OsRng;
use blake2::{digest::typenum::U32, Digest, Blake2s};
use ciphersuite::{Ciphersuite, Ristretto};
@@ -12,14 +13,14 @@ use serai_db::{DbTxn, Db as DbTrait};
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{ProvidedError, Tributary};
use tributary_sdk::{TransactionError, ProvidedError, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::Cosigning;
use serai_coordinator_substrate::NewSetInformation;
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask};
use serai_coordinator_p2p::P2p;
@@ -27,9 +28,9 @@ use crate::Db;
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
pub(crate) db: CD,
pub(crate) set: NewSetInformation,
pub(crate) tributary: Tributary<TD, Transaction, P>,
db: CD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
@@ -128,9 +129,9 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
pub(crate) tributary_db: TD,
pub(crate) set: ValidatorSet,
pub(crate) message_queue: Arc<MessageQueue>,
tributary_db: TD,
set: ValidatorSet,
message_queue: Arc<MessageQueue>,
}
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
@@ -155,6 +156,51 @@ impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
}
}
/// Checks for the notification to sign a slash report and does so if present.
pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
tributary: Tributary<TD, Transaction, P>,
set: NewSetInformation,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut txn = self.db.txn();
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
// Fetch the slash report for this Tributary
let mut tx =
serai_coordinator_tributary::slash_report_transaction(&self.tributary_db, &self.set);
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid SlashReport transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => return Ok(false),
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
txn.commit();
Ok(true)
}
}
}
/// Run the scan task whenever the Tributary adds a new block.
async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
db: CD,
@@ -183,8 +229,14 @@ async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
/// Spawn a Tributary.
///
/// This will spawn the Tributary, the Tributary scan task, forward the messages from the scan task
/// to the message queue, provide Cosign/Cosigned transactions, and inform the P2P network.
/// This will:
/// - Spawn the Tributary
/// - Inform the P2P network of the Tributary
/// - Spawn the ScanTributaryTask
/// - Spawn the ProvideCosignCosignedTransactionsTask
/// - Spawn the TributaryProcessorMessagesTask
/// - Spawn the SignSlashReportTask
/// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
pub(crate) async fn spawn_tributary<P: P2p>(
db: Db,
message_queue: Arc<MessageQueue>,
@@ -219,10 +271,16 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the Tributary
let tributary_db = crate::db::tributary_db(set.set);
let tributary =
Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p)
.await
.unwrap();
let tributary = Tributary::new(
tributary_db.clone(),
genesis,
start_time,
serai_key.clone(),
tributary_validators,
p2p,
)
.await
.unwrap();
let reader = tributary.reader();
// Inform the P2P network
@@ -256,12 +314,25 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db, &set, reader)
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
);
// Spawn the sign slash report task
let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
tokio::spawn(
(SignSlashReportTask {
db: db.clone(),
tributary_db,
tributary: tributary.clone(),
set: set.clone(),
key: serai_key,
})
.continually_run(sign_slash_report_task_def, vec![]),
);
// Whenever a new block occurs, immediately run the scan task
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
@@ -271,6 +342,6 @@ pub(crate) async fn spawn_tributary<P: P2p>(
set.set,
tributary,
scan_tributary_task,
vec![provide_cosign_cosigned_transactions_task],
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task],
));
}