Have processor-messages use CosignIntent/SignedCosign, not the historic cosign format

Has yet to update the processor accordingly.
This commit is contained in:
Luke Parker
2025-01-12 05:52:33 -05:00
parent 158140c3a7
commit e7de5125a2
8 changed files with 115 additions and 70 deletions

View File

@@ -8,7 +8,7 @@ use serai_client::{
validator_sets::primitives::{Session, ValidatorSet},
};
use serai_cosign::CosignIntent;
use serai_cosign::SignedCosign;
use serai_coordinator_substrate::NewSetInformation;
@@ -66,14 +66,24 @@ pub(crate) fn prune_tributary_db(set: ValidatorSet) {
create_db! {
Coordinator {
// The currently active Tributaries
ActiveTributaries: () -> Vec<NewSetInformation>,
// The latest Tributary to have been retired for a network
// Since Tributaries are retired sequentially, this is informative to if any Tributary has been
// retired
RetiredTributary: (network: NetworkId) -> Session,
// The last handled message from a Processor
LastProcessorMessage: (network: NetworkId) -> u64,
// Cosigns we produced and tried to intake yet incurred an error while doing so
ErroneousCosigns: () -> Vec<SignedCosign>,
}
}
db_channel! {
Coordinator {
// Tributaries to clean up upon reboot
TributaryCleanup: () -> ValidatorSet,
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
// Cosigns we produced
SignedCosigns: () -> SignedCosign,
}
}

View File

@@ -8,7 +8,7 @@ use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc;
use serai_db::{DbTxn, Db as DbTrait};
use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
@@ -19,16 +19,23 @@ use serai_task::{Task, TaskHandle, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::Cosigning;
use serai_cosign::{Faulted, CosignIntent, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
use serai_coordinator_p2p::P2p;
use crate::Db;
db_channel! {
Coordinator {
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
}
}
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
@@ -79,16 +86,27 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
let mut txn = self.db.txn();
// Fetch the next cosign this tributary should handle
let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
let Some(cosign) = PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
pending_notable_cosign = cosign.notable;
// If we (Serai) haven't cosigned this block, break as this is still pending
let Ok(latest) = Cosigning::<CD>::latest_cosigned_block_number(&txn) else { break };
let latest = match Cosigning::<CD>::latest_cosigned_block_number(&txn) {
Ok(latest) => latest,
Err(Faulted) => {
log::error!("cosigning faulted");
Err("cosigning faulted")?
}
};
if latest < cosign.block_number {
break;
}
// Because we've cosigned it, provide the TX for that
{
let mut txn = self.tributary_db.txn();
CosignIntents::provide(&mut txn, self.set.set, &cosign);
txn.commit();
}
provide_transaction(
self.set.set,
&self.tributary,
@@ -109,7 +127,7 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
// intended_cosigns will only yield up to and including the next notable cosign
for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) {
// Flag this cosign as pending
crate::PendingCosigns::send(&mut txn, self.set.set, &cosign);
PendingCosigns::send(&mut txn, self.set.set, &cosign);
// Provide the transaction to queue it for work
provide_transaction(
self.set.set,
@@ -293,6 +311,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn(
(ProvideCosignCosignedTransactionsTask {
db: db.clone(),
tributary_db: tributary_db.clone(),
set: set.clone(),
tributary: tributary.clone(),
})
@@ -313,7 +332,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader)
ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),