Don't add transactions for topics which have yet to be recognized

This commit is contained in:
Luke Parker
2025-01-15 07:01:24 -05:00
parent 0de3fda921
commit 7ce5bdad44
4 changed files with 181 additions and 45 deletions

View File

@@ -21,11 +21,19 @@ use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::{Faulted, CosignIntent, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
use serai_coordinator_tributary::{
Topic, Transaction, ProcessorMessages, CosignIntents, RecognizedTopics, ScanTributaryTask,
};
use serai_coordinator_p2p::P2p;
use crate::{Db, TributaryTransactions};
create_db! {
Coordinator {
PublishOnRecognition: (set: ValidatorSet, topic: Topic) -> Transaction,
}
}
db_channel! {
Coordinator {
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
@@ -147,6 +155,37 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
}
}
#[must_use]
async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
tributary: &Tributary<TD, Transaction, P>,
tx: &Transaction,
) -> bool {
let res = tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
// InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after
// on-chain inclusion of the TX with nonce #n, so it is invalid within our context
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
Err(TransactionError::TooManyInMempool) => {
return false;
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
true
}
/// Adds all of the transactions sent via `TributaryTransactions`.
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
@@ -161,6 +200,8 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
// Provide/add all transactions sent our way
loop {
let mut txn = self.db.txn();
let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
@@ -174,29 +215,27 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
}
// Actually add the transaction
// TODO: If this is a preprocess, make sure the topic has been recognized
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => {
drop(txn);
// If this is a transaction with signing data, check the topic is recognized before
// publishing
let topic = tx.topic();
let still_requires_recognition = if let Some(topic) = topic {
(topic.requires_recognition() &&
(!RecognizedTopics::recognized(&self.tributary_db, self.set, topic)))
.then_some(topic)
} else {
None
};
if let Some(topic) = still_requires_recognition {
// Queue the transaction until the topic is recognized
// We use the Tributary DB for this so it's cleaned up when the Tributary DB is
let mut txn = self.tributary_db.txn();
PublishOnRecognition::set(&mut txn, self.set, topic, &tx);
txn.commit();
} else {
// Actually add the transaction
if !add_signed_unsigned_transaction(&self.tributary, &tx).await {
break;
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
}
}
@@ -204,6 +243,25 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
made_progress = true;
txn.commit();
}
// Provide/add all transactions due to newly recognized topics
loop {
let mut txn = self.tributary_db.txn();
let Some(topic) =
RecognizedTopics::try_recv_topic_requiring_recognition(&mut txn, self.set)
else {
break;
};
if let Some(tx) = PublishOnRecognition::take(&mut txn, self.set, topic) {
if !add_signed_unsigned_transaction(&self.tributary, &tx).await {
break;
}
}
made_progress = true;
txn.commit();
}
Ok(made_progress)
}
}