Add tasks to publish data onto Serai

This commit is contained in:
Luke Parker
2025-01-14 01:58:26 -05:00
parent b5a6b0693e
commit 5e0e91c85d
11 changed files with 303 additions and 38 deletions

View File

@@ -6,8 +6,10 @@ use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::{PublicKey, NetworkId},
validator_sets::primitives::ValidatorSet,
primitives::{NetworkId, PublicKey, Signature, SeraiAddress},
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
in_instructions::primitives::SignedBatch,
Transaction,
};
use serai_db::*;
@@ -17,6 +19,13 @@ pub use canonical::CanonicalEventStream;
mod ephemeral;
pub use ephemeral::EphemeralEventStream;
mod set_keys;
pub use set_keys::SetKeysTask;
mod publish_batch;
pub use publish_batch::PublishBatchTask;
mod publish_slash_report;
pub use publish_slash_report::PublishSlashReportTask;
fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(PublicKey, u16)>,
writer: &mut W,
@@ -53,11 +62,7 @@ pub struct NewSetInformation {
}
mod _public_db {
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_db::*;
use crate::NewSetInformation;
use super::*;
db_channel!(
CoordinatorSubstrate {
@@ -68,6 +73,18 @@ mod _public_db {
NewSet: () -> NewSetInformation,
// Potentially relevant sign slash report, from an ephemeral event stream
SignSlashReport: (set: ValidatorSet) -> (),
// Signed batches to publish onto the Serai network
SignedBatches: (network: NetworkId) -> SignedBatch,
}
);
create_db!(
CoordinatorSubstrate {
// Keys to set on the Serai network
Keys: (network: NetworkId) -> (Session, Vec<u8>),
// Slash reports to publish onto the Serai network
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
}
);
}
@@ -118,3 +135,94 @@ impl SignSlashReport {
_public_db::SignSlashReport::try_recv(txn, set)
}
}
/// The keys to set on Serai.
pub struct Keys;
impl Keys {
/// Set the keys to report for a validator set.
///
/// This only saves the most recent keys as only a single session is eligible to have its keys
/// reported at once.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
) {
// If we have a more recent pair of keys, don't write this historic one
if let Some((existing_session, _)) = _public_db::Keys::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return;
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::set_keys(
set.network,
key_pair,
signature_participants,
signature,
);
_public_db::Keys::set(txn, set.network, &(set.session, tx.encode()));
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::Keys::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}
/// The signed batches to publish onto Serai.
pub struct SignedBatches;
impl SignedBatches {
/// Send a `SignedBatch` to publish onto Serai.
///
/// These will be published sequentially. Out-of-order sending risks hanging the task.
pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) {
_public_db::SignedBatches::send(txn, batch.batch.network, batch);
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, network: NetworkId) -> Option<SignedBatch> {
_public_db::SignedBatches::try_recv(txn, network)
}
}
/// The slash report was invalid.
#[derive(Debug)]
pub struct InvalidSlashReport;
/// The slash reports to publish onto Serai.
pub struct SlashReports;
impl SlashReports {
/// Set the slashes to report for a validator set.
///
/// This only saves the most recent slashes as only a single session is eligible to have its
/// slashes reported at once.
///
/// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as
/// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected
/// as invalid here.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
slashes: Vec<(SeraiAddress, u32)>,
signature: Signature,
) -> Result<(), InvalidSlashReport> {
// If we have a more recent slash report, don't write this historic one
if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return Ok(());
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
set.network,
slashes.try_into().map_err(|_| InvalidSlashReport)?,
signature,
);
_public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode()));
Ok(())
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::SlashReports::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}

View File

@@ -0,0 +1,66 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, SeraiError, Serai};
use serai_task::ContinuallyRan;
use crate::SignedBatches;
/// Publish `SignedBatch`s from `SignedBatches` onto Serai.
pub struct PublishBatchTask<D: Db> {
db: D,
serai: Arc<Serai>,
network: NetworkId,
}
impl<D: Db> PublishBatchTask<D> {
/// Create a task to publish `SignedBatch`s onto Serai.
///
/// Returns None if `network == NetworkId::Serai`.
// TODO: ExternalNetworkId
pub fn new(db: D, serai: Arc<Serai>, network: NetworkId) -> Option<Self> {
if network == NetworkId::Serai {
None?
};
Some(Self { db, serai, network })
}
}
impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else {
// No batch to publish at this time
break;
};
// Publish this Batch if it hasn't already been published
let serai = self.serai.as_of_latest_finalized_block().await?;
let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?;
if last_batch < Some(batch.batch.id) {
// This stream of Batches *should* be sequential within the larger context of the Serai
// coordinator. In this library, we use a more relaxed definition and don't assert
// sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but
// that is a documented fault of the `SignedBatches` API.
self
.serai
.publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch))
.await?;
}
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,89 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::ContinuallyRan;
use crate::SlashReports;
/// Publish slash reports from `SlashReports` onto Serai.
pub struct PublishSlashReportTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> PublishSlashReportTask<D> {
/// Create a task to publish slash reports onto Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired =
current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this slash report from the database and not try it again later
txn.commit();
continue;
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a slash report for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
continue;
};
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window available to publish the slash report. That makes
// this a non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,88 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
use serai_task::ContinuallyRan;
use crate::Keys;
/// Set keys from `Keys` on Serai.
pub struct SetKeysTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> SetKeysTask<D> {
/// Create a task to publish slash reports onto Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for SetKeysTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, keys)) = Keys::take(&mut txn, network) else {
// No keys to set
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to set these keys if this isn't a retired session
if Some(session.0) < current_session {
// Commit the txn to take these keys from the database and not try it again later
txn.commit();
continue;
}
if Some(session.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a keys for a session Serai has yet to start".to_string())?;
}
// If this session already has had its keys set, move on
if serai
.keys(ValidatorSet { network, session })
.await
.map_err(|e| format!("{e:?}"))?
.is_some()
{
txn.commit();
continue;
};
match self.serai.publish(&keys).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window reasonable to set the keys. That makes this a
// non-issue.
Err(e) => Err(format!("couldn't publish set keys transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}