8 Commits

Author SHA1 Message Date
Luke Parker
ce3b90541e Make transactions undroppable
coordinator/cosign/src/delay.rs literally demonstrates how we'd need to rewrite
our handling of transactions with this change. It can be cleaned up a bit but
already identifies ergonomic issues. It also doesn't model passing an &mut txn
to an async function, which would also require using the droppable wrapper
struct.
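A minimal sketch of the ergonomic issue (with `some_async_fn` as a hypothetical callee), assuming this PR's `Undroppable` semantics: holding a transaction across an `.await` generates drop glue for the future's cancellation path, tripping the compile-time assertion.

async fn example(mut db: impl Db) {
  let mut txn = db.txn();
  // If this future were dropped at this `.await`, `txn` would be dropped with it,
  // so drop glue for `Undroppable` is generated and the build fails
  some_async_fn(&mut txn).await;
  txn.commit();
}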

To locally see this build, run

RUSTFLAGS="-Zpanic_abort_tests -C panic=abort" cargo +nightly build -p serai-cosign --all-targets

To locally see this fail to build, run

cargo build -p serai-cosign --all-targets

While it doesn't say which line causes it to fail to build, the only distinction
is panic=unwind.

For more context, please see #578.
2025-01-15 03:56:59 -05:00
Luke Parker
cb410cc4e0 Correct how we handle rounding errors within the penalty fn
We explicitly no longer slash stake, yet we still set the maximum slash to the
allocated stake + the rewards. Now, the reward slash is bound to the rewards
and the stake slash is bound to the stake. This prevents an improperly rounded
reward slash from effecting a stake slash.
2025-01-15 02:46:31 -05:00
Luke Parker
6c145a5ec3 Disable offline, disruptive slashes
Reasoning commented in codebase
2025-01-14 11:44:13 -05:00
Luke Parker
a7fef2ba7a Redesign Slash/SlashReport types with a function to calculate the penalty 2025-01-14 07:51:39 -05:00
Luke Parker
291ebf5e24 Have serai-task warnings print with the name of the task 2025-01-14 02:52:26 -05:00
Luke Parker
5e0e91c85d Add tasks to publish data onto Serai 2025-01-14 01:58:26 -05:00
Luke Parker
b5a6b0693e Add a proper error type to ContinuallyRan
This isn't strictly necessary. Because we just log the error and never match on
it, we don't need any structure beyond String (or now Debug, which still gives
us a way to print the error). This is for the ergonomics of not having to
constantly write `.map_err(|e| format!("{e:?}"))`.
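As a sketch of the ergonomic difference (with `fallible` as a hypothetical async call returning a `Debug` error):

// Before, with `Result<bool, String>`:
let value = fallible().await.map_err(|e| format!("{e:?}"))?;
// After, with `type Error = SeraiError;` (or any other `Debug` type):
let value = fallible().await?;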
2025-01-12 18:29:08 -05:00
Luke Parker
3cc2abfedc Add a task to publish slash reports 2025-01-12 17:47:48 -05:00
49 changed files with 977 additions and 132 deletions

Cargo.lock generated
View File

@@ -8385,6 +8385,7 @@ dependencies = [
name = "serai-coordinator-substrate"
version = "0.1.0"
dependencies = [
"bitvec",
"borsh",
"futures",
"log",

View File

@@ -30,13 +30,53 @@ pub trait Get {
/// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values
randomly, or take any other action, at time of write or at time of commit.
#[must_use]
pub trait DbTxn: Send + Get {
pub trait DbTxn: Sized + Send + Get {
/// Write a value to this key.
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
/// Delete the value from this key.
fn del(&mut self, key: impl AsRef<[u8]>);
/// Commit this transaction.
fn commit(self);
/// Close this transaction.
///
/// This is equivalent to `Drop` on transactions which can be dropped. This is explicit and works
/// with transactions which can't be dropped.
fn close(self) {
drop(self);
}
}
// Credit for the idea goes to https://jack.wrenn.fyi/blog/undroppable
pub struct Undroppable<T>(Option<T>);
impl<T> Drop for Undroppable<T> {
fn drop(&mut self) {
// A compile-time assertion which fails the build if drop glue for this type is ever generated
#[allow(clippy::assertions_on_constants)]
const {
assert!(false, "Undroppable DbTxn was dropped. Ensure all code paths call commit or close");
}
}
}
impl<T: DbTxn> Get for Undroppable<T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.as_ref().unwrap().get(key)
}
}
impl<T: DbTxn> DbTxn for Undroppable<T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().put(key, value);
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().del(key);
}
fn commit(mut self) {
self.0.take().unwrap().commit();
let _ = core::mem::ManuallyDrop::new(self);
}
fn close(mut self) {
drop(self.0.take().unwrap());
let _ = core::mem::ManuallyDrop::new(self);
}
}
/// A database supporting atomic transaction.
@@ -51,6 +91,10 @@ pub trait Db: 'static + Send + Sync + Clone + Get {
let dst_len = u8::try_from(item_dst.len()).unwrap();
[[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat()
}
/// Open a new transaction.
fn txn(&mut self) -> Self::Transaction<'_>;
/// Open a new transaction which may be dropped.
fn unsafe_txn(&mut self) -> Self::Transaction<'_>;
/// Open a new transaction which must be committed or closed.
fn txn(&mut self) -> Undroppable<Self::Transaction<'_>> {
Undroppable(Some(self.unsafe_txn()))
}
}
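A usage sketch of the resulting API, assuming some mutable `db: impl Db`: a default `txn()` must end in an explicit `commit` or `close`, while `unsafe_txn()` opts back into droppable transactions.

let mut txn = db.txn();
txn.put(b"key", b"value");
// Under panic=abort (see the first commit's message), letting `txn` fall out of
// scope here would fail to compile; it must be explicitly terminated
txn.commit();
let txn = db.unsafe_txn();
drop(txn); // fine: this transaction was explicitly opened as droppable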

View File

@@ -74,7 +74,7 @@ impl Get for MemDb {
}
impl Db for MemDb {
type Transaction<'a> = MemDbTxn<'a>;
fn txn(&mut self) -> MemDbTxn<'_> {
fn unsafe_txn(&mut self) -> MemDbTxn<'_> {
MemDbTxn(self, HashMap::new(), HashSet::new())
}
}

View File

@@ -37,7 +37,7 @@ impl Get for Arc<ParityDb> {
}
impl Db for Arc<ParityDb> {
type Transaction<'a> = Transaction<'a>;
fn txn(&mut self) -> Self::Transaction<'_> {
fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
Transaction(self, vec![])
}
}

View File

@@ -39,7 +39,7 @@ impl<T: ThreadMode> Get for Arc<OptimisticTransactionDB<T>> {
}
impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
type Transaction<'a> = Transaction<'a, T>;
fn txn(&mut self) -> Self::Transaction<'_> {
fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default();
opts.set_sync(true);
Transaction(self.transaction_opt(&opts, &Default::default()), &**self)

View File

@@ -2,10 +2,16 @@
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::{future::Future, time::Duration};
use core::{
fmt::{self, Debug},
future::Future,
time::Duration,
};
use tokio::sync::mpsc;
mod type_name;
/// A handle for a task.
///
/// The task will only stop running once all handles for it are dropped.
@@ -45,8 +51,6 @@ impl Task {
impl TaskHandle {
/// Tell the task to run now (and not whenever its next iteration on a timer is).
///
/// Panics if the task has been dropped.
pub fn run_now(&self) {
#[allow(clippy::match_same_arms)]
match self.run_now.try_send(()) {
@@ -54,12 +58,22 @@ impl TaskHandle {
// NOP on full, as this task will already be run as soon as possible
Err(mpsc::error::TrySendError::Full(())) => {}
Err(mpsc::error::TrySendError::Closed(())) => {
// The task should only be closed if all handles are dropped, and this one hasn't been
panic!("task was unexpectedly closed when calling run_now")
}
}
}
}
/// An enum which can't be constructed, representing that the task does not error.
pub enum DoesNotError {}
impl Debug for DoesNotError {
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// This type can't be constructed so we'll never have a `&self` to call this fn with
unreachable!()
}
}
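// Sketch of intended use: an infallible task declares `type Error = DoesNotError;`,
// and since the enum has no variants, its `run_iteration` can only ever return `Ok`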
/// A task to be continually run.
pub trait ContinuallyRan: Sized + Send {
/// The amount of seconds before this task should be polled again.
@@ -69,11 +83,14 @@ pub trait ContinuallyRan: Sized + Send {
/// Upon error, the amount of time waited will be linearly increased until this limit.
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
/// The error potentially yielded upon running an iteration of this task.
type Error: Debug;
/// Run an iteration of the task.
///
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
/// (without waiting for whatever timer they were already on).
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>>;
/// Continually run the task.
fn continually_run(
@@ -115,12 +132,20 @@ pub trait ContinuallyRan: Sized + Send {
}
}
Err(e) => {
log::warn!("{}", e);
// Get the type name
let type_name = type_name::strip_type_name(core::any::type_name::<Self>());
// Print the error as a warning, prefixed by the task's type
log::warn!("{type_name}: {e:?}");
increase_sleep_before_next_task(&mut current_sleep_before_next_task);
}
}
// Don't run the task again for another few seconds UNLESS told to run now
/*
We could replace tokio::mpsc with async_channel, tokio::time::sleep with
patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or
It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio
*/
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
msg = task.run_now.recv() => {

View File

@@ -0,0 +1,31 @@
/// Strip the modules from a type name.
// This may be of the form `a::b::C`, in which case we only want `C`
pub(crate) fn strip_type_name(full_type_name: &'static str) -> String {
// It also may be `a::b::C<d::e::F>`, in which case, we only attempt to strip `a::b`
let mut by_generics = full_type_name.split('<');
// Strip to just `C`
let full_outer_object_name = by_generics.next().unwrap();
let mut outer_object_name_parts = full_outer_object_name.split("::");
let mut last_part_in_outer_object_name = outer_object_name_parts.next().unwrap();
for part in outer_object_name_parts {
last_part_in_outer_object_name = part;
}
// Push back on the generic terms
let mut type_name = last_part_in_outer_object_name.to_string();
for generic in by_generics {
type_name.push('<');
type_name.push_str(generic);
}
type_name
}
#[test]
fn test_strip_type_name() {
assert_eq!(strip_type_name("core::option::Option"), "Option");
assert_eq!(
strip_type_name("core::option::Option<alloc::string::String>"),
"Option<alloc::string::String>"
);
}

View File

@@ -30,7 +30,7 @@ schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-
frost = { package = "modular-frost", path = "../crypto/frost" }
frost-schnorrkel = { path = "../crypto/schnorrkel" }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db" }

View File

@@ -2,7 +2,7 @@ use core::future::Future;
use std::time::{Duration, SystemTime};
use serai_db::*;
use serai_task::ContinuallyRan;
use serai_task::{DoesNotError, ContinuallyRan};
use crate::evaluator::CosignedBlocks;
@@ -24,8 +24,19 @@ pub(crate) struct CosignDelayTask<D: Db> {
pub(crate) db: D,
}
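// Closes (rather than drops) the inner transaction if the future holding it is
// dropped mid-`.await`, satisfying the compile-time ban on dropping `Undroppable`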
struct AwaitUndroppable<T: DbTxn>(Option<core::mem::ManuallyDrop<Undroppable<T>>>);
impl<T: DbTxn> Drop for AwaitUndroppable<T> {
fn drop(&mut self) {
if let Some(mut txn) = self.0.take() {
(unsafe { core::mem::ManuallyDrop::take(&mut txn) }).close();
}
}
}
impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
@@ -33,14 +44,18 @@ impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
// Receive the next block to mark as cosigned
let Some((block_number, time_evaluated)) = CosignedBlocks::try_recv(&mut txn) else {
txn.close();
break;
};
// Calculate when we should mark it as valid
let time_valid =
SystemTime::UNIX_EPOCH + Duration::from_secs(time_evaluated) + ACKNOWLEDGEMENT_DELAY;
// Sleep until then
let mut txn = AwaitUndroppable(Some(core::mem::ManuallyDrop::new(txn)));
tokio::time::sleep(SystemTime::now().duration_since(time_valid).unwrap_or(Duration::ZERO))
.await;
let mut txn = core::mem::ManuallyDrop::into_inner(txn.0.take().unwrap());
// Set the cosigned block
LatestCosignedBlockNumber::set(&mut txn, &block_number);

View File

@@ -80,12 +80,14 @@ pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
}
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut known_cosign = None;
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let mut txn = self.db.unsafe_txn();
let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn)
else {
break;

View File

@@ -61,14 +61,16 @@ pub(crate) struct CosignIntendTask<D: Db> {
}
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
let latest_block_number =
self.serai.latest_finalized_block().await.map_err(|e| format!("{e:?}"))?.number();
for block_number in start_block_number ..= latest_block_number {
let mut txn = self.db.txn();
let mut txn = self.db.unsafe_txn();
let (block, mut has_events) =
block_has_events_justifying_a_cosign(&self.serai, block_number)

View File

@@ -424,7 +424,7 @@ impl<D: Db> Cosigning<D> {
// Since we verified this cosign's signature, and have a chain sufficiently long, handle the
// cosign
let mut txn = self.db.txn();
let mut txn = self.db.unsafe_txn();
if !faulty {
// If this is for a future global session, we don't acknowledge this cosign at this time
@@ -480,3 +480,30 @@ impl<D: Db> Cosigning<D> {
res
}
}
mod tests {
use super::*;
struct RNC;
impl RequestNotableCosigns for RNC {
/// The error type which may be encountered when requesting notable cosigns.
type Error = ();
/// Request the notable cosigns for this global session.
fn request_notable_cosigns(
&self,
global_session: [u8; 32],
) -> impl Send + Future<Output = Result<(), Self::Error>> {
async move { Ok(()) }
}
}
#[tokio::test]
async fn test() {
let db: serai_db::MemDb = serai_db::MemDb::new();
let serai = unsafe { core::mem::transmute(0u64) };
let request = RNC;
let tasks = vec![];
let _ = Cosigning::spawn(db, serai, request, tasks);
core::future::pending().await
}
}

View File

@@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
use tokio::sync::mpsc;
use serai_client::Serai;
use serai_client::{SeraiError, Serai};
use libp2p::{
core::multiaddr::{Protocol, Multiaddr},
@@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
self.validators.update().await?;
@@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask {
.unwrap_or(0)
.saturating_sub(1))
{
let mut potential_peers =
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
let mut potential_peers = self.serai.p2p_validators(network).await?;
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
if potential_peers.is_empty() {
break;

View File

@@ -4,7 +4,7 @@ use std::{
collections::{HashSet, HashMap},
};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai};
use serai_task::{Task, ContinuallyRan};
@@ -50,9 +50,8 @@ impl Validators {
async fn session_changes(
serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>,
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> {
let temporal_serai =
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
let temporal_serai = temporal_serai.validator_sets();
let mut session_changes = vec![];
@@ -69,7 +68,7 @@ impl Validators {
let session = match temporal_serai.session(network).await {
Ok(Some(session)) => session,
Ok(None) => return Ok(None),
Err(e) => return Err(format!("{e:?}")),
Err(e) => return Err(e),
};
if sessions.get(&network) == Some(&session) {
@@ -81,7 +80,7 @@ impl Validators {
session,
validators.into_iter().map(peer_id_from_public).collect(),
))),
Err(e) => Err(format!("{e:?}")),
Err(e) => Err(e),
}
}
});
@@ -147,7 +146,7 @@ impl Validators {
}
/// Update the view of the validators.
pub(crate) async fn update(&mut self) -> Result<(), String> {
pub(crate) async fn update(&mut self) -> Result<(), SeraiError> {
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes);
Ok(())
@@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let session_changes = {
let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
.await
.map_err(|e| format!("{e:?}"))?
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true)

View File

@@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
}
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);

View File

@@ -225,10 +225,10 @@ async fn handle_processor_messages(
SignedCosigns::send(&mut txn, &cosign);
}
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
todo!("TODO PublishBatchTask")
}
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
todo!("TODO PublishSlashReportTask")
}
},
messages::ProcessorMessage::Substrate(msg) => match msg {

View File

@@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask<P: P2p> {
}
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;

View File

@@ -15,7 +15,7 @@ use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
@@ -76,7 +76,9 @@ pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
@@ -154,7 +156,9 @@ pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p>
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
@@ -212,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
message_queue: Arc<MessageQueue>,
}
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
@@ -242,7 +248,9 @@ pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut txn = self.db.txn();
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };

View File

@@ -18,7 +18,9 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true
[dependencies]
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
bitvec = { version = "1", default-features = false, features = ["std"] }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] }

View File

@@ -1,6 +1,6 @@
# Serai Coordinate Substrate Scanner
# Serai Coordinator Substrate
This is the scanner of the Serai blockchain for the purposes of Serai's coordinator.
This crate manages the Serai coordinator's interactions with Serai's Substrate blockchain.
Two event streams are defined:
@@ -12,3 +12,9 @@ Two event streams are defined:
The canonical event stream is available without provision of a validator's public key. The ephemeral
event stream requires provision of a validator's public key. Both are ordered within themselves, yet
there are no ordering guarantees across the two.
Additionally, a collection of tasks is defined to publish data onto Serai:
- `SetKeysTask`, which sets the keys generated via DKGs onto Serai.
- `PublishBatchTask`, which publishes `Batch`s onto Serai.
- `PublishSlashReportTask`, which publishes `SlashReport`s onto Serai.

View File

@@ -34,7 +34,9 @@ impl<D: Db> CanonicalEventStream<D> {
}
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =

View File

@@ -39,7 +39,9 @@ impl<D: Db> EphemeralEventStream<D> {
}
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =
@@ -157,8 +159,9 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
Err("validator's weight exceeded u16::MAX".to_string())?
};
// Do the summation in u32 so we don't risk a u16 overflow
let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>();
if total_weight > MAX_KEY_SHARES_PER_SET {
if total_weight > u32::from(MAX_KEY_SHARES_PER_SET) {
Err(format!(
"{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}"
))?;

View File

@@ -6,8 +6,10 @@ use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::{PublicKey, NetworkId},
validator_sets::primitives::ValidatorSet,
primitives::{NetworkId, PublicKey, Signature, SeraiAddress},
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
in_instructions::primitives::SignedBatch,
Transaction,
};
use serai_db::*;
@@ -17,6 +19,13 @@ pub use canonical::CanonicalEventStream;
mod ephemeral;
pub use ephemeral::EphemeralEventStream;
mod set_keys;
pub use set_keys::SetKeysTask;
mod publish_batch;
pub use publish_batch::PublishBatchTask;
mod publish_slash_report;
pub use publish_slash_report::PublishSlashReportTask;
fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(PublicKey, u16)>,
writer: &mut W,
@@ -53,11 +62,7 @@ pub struct NewSetInformation {
}
mod _public_db {
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_db::*;
use crate::NewSetInformation;
use super::*;
db_channel!(
CoordinatorSubstrate {
@@ -68,6 +73,18 @@ mod _public_db {
NewSet: () -> NewSetInformation,
// Potentially relevant sign slash report, from an ephemeral event stream
SignSlashReport: (set: ValidatorSet) -> (),
// Signed batches to publish onto the Serai network
SignedBatches: (network: NetworkId) -> SignedBatch,
}
);
create_db!(
CoordinatorSubstrate {
// Keys to set on the Serai network
Keys: (network: NetworkId) -> (Session, Vec<u8>),
// Slash reports to publish onto the Serai network
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
}
);
}
@@ -118,3 +135,94 @@ impl SignSlashReport {
_public_db::SignSlashReport::try_recv(txn, set)
}
}
/// The keys to set on Serai.
pub struct Keys;
impl Keys {
/// Set the keys to report for a validator set.
///
/// This only saves the most recent keys as only a single session is eligible to have its keys
/// reported at once.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
) {
// If we have a more recent pair of keys, don't write this historic one
if let Some((existing_session, _)) = _public_db::Keys::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return;
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::set_keys(
set.network,
key_pair,
signature_participants,
signature,
);
_public_db::Keys::set(txn, set.network, &(set.session, tx.encode()));
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::Keys::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}
/// The signed batches to publish onto Serai.
pub struct SignedBatches;
impl SignedBatches {
/// Send a `SignedBatch` to publish onto Serai.
///
/// These will be published sequentially. Out-of-order sending risks hanging the task.
pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) {
_public_db::SignedBatches::send(txn, batch.batch.network, batch);
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, network: NetworkId) -> Option<SignedBatch> {
_public_db::SignedBatches::try_recv(txn, network)
}
}
/// The slash report was invalid.
#[derive(Debug)]
pub struct InvalidSlashReport;
/// The slash reports to publish onto Serai.
pub struct SlashReports;
impl SlashReports {
/// Set the slashes to report for a validator set.
///
/// This only saves the most recent slashes as only a single session is eligible to have its
/// slashes reported at once.
///
/// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as
/// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected
/// as invalid here.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
slashes: Vec<(SeraiAddress, u32)>,
signature: Signature,
) -> Result<(), InvalidSlashReport> {
// If we have a more recent slash report, don't write this historic one
if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return Ok(());
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
set.network,
slashes.try_into().map_err(|_| InvalidSlashReport)?,
signature,
);
_public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode()));
Ok(())
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::SlashReports::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}

View File

@@ -0,0 +1,66 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, SeraiError, Serai};
use serai_task::ContinuallyRan;
use crate::SignedBatches;
/// Publish `SignedBatch`s from `SignedBatches` onto Serai.
pub struct PublishBatchTask<D: Db> {
db: D,
serai: Arc<Serai>,
network: NetworkId,
}
impl<D: Db> PublishBatchTask<D> {
/// Create a task to publish `SignedBatch`s onto Serai.
///
/// Returns None if `network == NetworkId::Serai`.
// TODO: ExternalNetworkId
pub fn new(db: D, serai: Arc<Serai>, network: NetworkId) -> Option<Self> {
if network == NetworkId::Serai {
None?
};
Some(Self { db, serai, network })
}
}
impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else {
// No batch to publish at this time
break;
};
// Publish this Batch if it hasn't already been published
let serai = self.serai.as_of_latest_finalized_block().await?;
let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?;
if last_batch < Some(batch.batch.id) {
// This stream of Batches *should* be sequential within the larger context of the Serai
// coordinator. In this library, we use a more relaxed definition and don't assert
// sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but
// that is a documented fault of the `SignedBatches` API.
self
.serai
.publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch))
.await?;
}
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,89 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::ContinuallyRan;
use crate::SlashReports;
/// Publish slash reports from `SlashReports` onto Serai.
pub struct PublishSlashReportTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> PublishSlashReportTask<D> {
/// Create a task to publish slash reports onto Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired =
current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this slash report from the database and not try it again later
txn.commit();
continue;
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it previously was
Err("have a slash report for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
continue;
};
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already-in-mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error is ephemeral and that the latency incurred while it resolves is minuscule
// compared to the window available to publish the slash report. That makes this a
// non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,88 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
use serai_task::ContinuallyRan;
use crate::Keys;
/// Set keys from `Keys` on Serai.
pub struct SetKeysTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> SetKeysTask<D> {
/// Create a task to set keys on Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for SetKeysTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, keys)) = Keys::take(&mut txn, network) else {
// No keys to set
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to set these keys if this isn't a retired session
if Some(session.0) < current_session {
// Commit the txn to take these keys from the database and not try it again later
txn.commit();
continue;
}
if Some(session.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session.0));
// This would mean the Serai node is resyncing and is behind where it previously was
Err("have keys for a session Serai has yet to start".to_string())?;
}
// If this session already has had its keys set, move on
if serai
.keys(ValidatorSet { network, session })
.await
.map_err(|e| format!("{e:?}"))?
.is_some()
{
txn.commit();
continue;
};
match self.serai.publish(&keys).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already-in-mempool error) and it may be
// worthwhile to continue iteration with the other pending keys. We assume this error is
// ephemeral and that the latency incurred while it resolves is minuscule compared to
// the window in which it's reasonable to set the keys. That makes this a non-issue.
Err(e) => Err(format!("couldn't publish set keys transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -206,7 +206,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
}
Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager
todo!("TODO")
todo!("TODO: SetKeysTask")
}
Transaction::Cosign { substrate_block_hash } => {
@@ -352,8 +352,11 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Create the resulting slash report
let mut slash_report = vec![];
for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
if points != 0 {
slash_report.push(Slash { key: validator.into(), points });
// TODO: Natively store this as a `Slash`
if points == u32::MAX {
slash_report.push(Slash::Fatal);
} else {
slash_report.push(Slash::Points(points));
}
}
assert!(slash_report.len() <= f);
@@ -512,7 +515,9 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
}
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)

View File

@@ -301,14 +301,14 @@ impl TransactionTrait for Transaction {
Transaction::Batch { .. } => {}
Transaction::Sign { data, .. } => {
if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
if data.len() > usize::from(MAX_KEY_SHARES_PER_SET) {
Err(TransactionError::InvalidContent)?
}
// TODO: MAX_SIGN_LEN
}
Transaction::SlashReport { slash_points, .. } => {
if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
if slash_points.len() > usize::from(MAX_KEY_SHARES_PER_SET) {
Err(TransactionError::InvalidContent)?
}
}

View File

@@ -95,6 +95,7 @@ impl Coordinator {
message_queue.ack(Service::Coordinator, msg.id).await;
// Fire that there's a new message
// This assumes the success path, not the just-rebooted-path
received_message_send
.send(())
.expect("failed to tell the Coordinator there's a new message");

View File

@@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output(
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let latest_block_number = self
.0

View File

@@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db};
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::{EncodableG, task::ContinuallyRan};
use primitives::{
EncodableG,
task::{DoesNotError, ContinuallyRan},
};
use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
index,
@@ -60,7 +63,9 @@ impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let highest_batchable = {
// Fetch the next to scan block

View File

@@ -190,7 +190,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
}
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the highest acknowledged block
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)

View File

@@ -58,7 +58,9 @@ impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the latest finalized block
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)

View File

@@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db};
use serai_validator_sets_primitives::Session;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{
db::{BatchData, BatchToReportDb, BatchesToSign},
substrate, ScannerFeed,
@@ -27,7 +27,9 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {

View File

@@ -98,7 +98,9 @@ impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
// Fetch the safe to scan block
let latest_scannable =

View File

@@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db};
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use messages::substrate::ExecutedBatch;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
index, batch, ScannerFeed, KeyFor,
@@ -50,7 +50,9 @@ impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {

View File

@@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::{BatchesToSign, AcknowledgedBatches};
use frost_attempt_manager::*;
@@ -79,7 +79,9 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
}
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -22,7 +22,9 @@ impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
}
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db};
use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use frost_attempt_manager::*;
@@ -51,7 +51,9 @@ impl<D: Db> CosignerTask<D> {
}
impl<D: Db> ContinuallyRan for CosignerTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, DoesNotError>> {
async move {
let mut iterated = false;

View File

@@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db};
use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan;
use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::ScannerFeed;
use frost_attempt_manager::*;
@@ -52,7 +52,9 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
}
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut iterated = false;

View File

@@ -92,7 +92,9 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
for TransactionSignerTask<D, ST, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
type Error = P::EphemeralError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async {
let mut iterated = false;
@@ -222,11 +224,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty());
self
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
self.publisher.publish(tx).await?;
}
self.last_publication = Instant::now();

View File

@@ -21,7 +21,7 @@ pub enum Call {
},
report_slashes {
network: NetworkId,
slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>,
slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
signature: Signature,
},
allocate {

View File

@@ -238,9 +238,11 @@ impl<'a> SeraiValidatorSets<'a> {
pub fn report_slashes(
network: NetworkId,
// TODO: This bounds a maximum length but takes more space than just publishing all the u32s
// (50 * (32 + 4)) > (150 * 4)
slashes: sp_runtime::BoundedVec<
(SeraiAddress, u32),
sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET / 3 }>,
sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET_U32 / 3 }>,
>,
signature: Signature,
) -> Transaction {

View File

@@ -23,7 +23,7 @@ pub mod pallet {
use economic_security_pallet::{Config as EconomicSecurityConfig, Pallet as EconomicSecurity};
use serai_primitives::*;
use validator_sets_primitives::{MAX_KEY_SHARES_PER_SET, Session};
use validator_sets_primitives::{MAX_KEY_SHARES_PER_SET_U32, Session};
pub use emissions_primitives as primitives;
use primitives::*;
@@ -74,7 +74,7 @@ pub mod pallet {
_,
Identity,
NetworkId,
BoundedVec<(PublicKey, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET }>>,
BoundedVec<(PublicKey, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>,
OptionQuery,
>;

View File

@@ -3,6 +3,7 @@ use crate::BlockNumber;
// 1 MB
pub const BLOCK_SIZE: u32 = 1024 * 1024;
// 6 seconds
// TODO: Use Duration
pub const TARGET_BLOCK_TIME: u64 = 6;
/// Measured in blocks.

View File

@@ -282,7 +282,7 @@ impl pallet_authorship::Config for Runtime {
}
// Maximum number of authorities per session.
pub type MaxAuthorities = ConstU32<{ validator_sets::primitives::MAX_KEY_SHARES_PER_SET }>;
pub type MaxAuthorities = ConstU32<{ validator_sets::primitives::MAX_KEY_SHARES_PER_SET_U32 }>;
/// Longevity of an offence report.
pub type ReportLongevity = <Runtime as pallet_babe::Config>::EpochDuration;

View File

@@ -141,7 +141,7 @@ pub mod pallet {
_,
Identity,
NetworkId,
BoundedVec<(Public, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET }>>,
BoundedVec<(Public, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>,
OptionQuery,
>;
/// The validators selected to be in-set, regardless of if removed.
@@ -402,7 +402,7 @@ pub mod pallet {
// Clear the current InSet
assert_eq!(
InSet::<T>::clear_prefix(network, MAX_KEY_SHARES_PER_SET, None).maybe_cursor,
InSet::<T>::clear_prefix(network, MAX_KEY_SHARES_PER_SET_U32, None).maybe_cursor,
None
);
@@ -412,11 +412,11 @@ pub mod pallet {
{
let mut iter = SortedAllocationsIter::<T>::new(network);
let mut key_shares = 0;
while key_shares < u64::from(MAX_KEY_SHARES_PER_SET) {
while key_shares < u64::from(MAX_KEY_SHARES_PER_SET_U32) {
let Some((key, amount)) = iter.next() else { break };
let these_key_shares =
(amount.0 / allocation_per_key_share).min(u64::from(MAX_KEY_SHARES_PER_SET));
(amount.0 / allocation_per_key_share).min(u64::from(MAX_KEY_SHARES_PER_SET_U32));
participants.push((key, these_key_shares));
key_shares += these_key_shares;
@@ -535,7 +535,7 @@ pub mod pallet {
top = Some(key_shares);
}
if key_shares > u64::from(MAX_KEY_SHARES_PER_SET) {
if key_shares > u64::from(MAX_KEY_SHARES_PER_SET_U32) {
break;
}
}
@@ -547,7 +547,7 @@ pub mod pallet {
// post_amortization_key_shares_for_top_validator yields what the top validator's key shares
// would be after such a reduction, letting us evaluate this correctly
let top = post_amortization_key_shares_for_top_validator(validators_len, top, key_shares);
(top * 3) < key_shares.min(MAX_KEY_SHARES_PER_SET.into())
(top * 3) < key_shares.min(MAX_KEY_SHARES_PER_SET_U32.into())
}
fn increase_allocation(
@@ -586,7 +586,7 @@ pub mod pallet {
// The above is_bft calls are only used to check a BFT net doesn't become non-BFT
// Check here if this call would prevent a non-BFT net from *ever* becoming BFT
if (new_allocation / allocation_per_key_share) >= (MAX_KEY_SHARES_PER_SET / 3).into() {
if (new_allocation / allocation_per_key_share) >= (MAX_KEY_SHARES_PER_SET_U32 / 3).into() {
Err(Error::<T>::AllocationWouldPreventFaultTolerance)?;
}
@@ -1010,7 +1010,7 @@ pub mod pallet {
pub fn report_slashes(
origin: OriginFor<T>,
network: NetworkId,
slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>,
slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
signature: Signature,
) -> DispatchResult {
ensure_none(origin)?;
@@ -1209,7 +1209,7 @@ pub mod pallet {
ValidTransaction::with_tag_prefix("ValidatorSets")
.and_provides((1, set))
.longevity(MAX_KEY_SHARES_PER_SET.into())
.longevity(MAX_KEY_SHARES_PER_SET_U32.into())
.propagate(true)
.build()
}

View File

@@ -1,5 +1,7 @@
#![cfg_attr(not(feature = "std"), no_std)]
use core::time::Duration;
#[cfg(feature = "std")]
use zeroize::Zeroize;
@@ -13,20 +15,30 @@ use borsh::{BorshSerialize, BorshDeserialize};
#[cfg(feature = "serde")]
use serde::{Serialize, Deserialize};
use sp_core::{ConstU32, sr25519::Public, bounded::BoundedVec};
use sp_core::{ConstU32, bounded::BoundedVec, sr25519::Public};
#[cfg(not(feature = "std"))]
use sp_std::vec::Vec;
use serai_primitives::NetworkId;
/// The maximum amount of key shares per set.
pub const MAX_KEY_SHARES_PER_SET: u32 = 150;
mod slash_points;
pub use slash_points::*;
/// The expected duration for a session.
// 1 week
pub const SESSION_LENGTH: Duration = Duration::from_secs(7 * 24 * 60 * 60);
/// The maximum length for a key.
// Support keys up to 96 bytes (BLS12-381 G2).
pub const MAX_KEY_LEN: u32 = 96;
/// The maximum amount of key shares per set.
pub const MAX_KEY_SHARES_PER_SET: u16 = 150;
pub const MAX_KEY_SHARES_PER_SET_U32: u32 = MAX_KEY_SHARES_PER_SET as u32;
/// The type used to identify a specific session of validators.
#[derive(
Clone, Copy, PartialEq, Eq, Hash, Default, Debug, Encode, Decode, TypeInfo, MaxEncodedLen,
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen,
)]
#[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
@@ -34,7 +46,9 @@ pub const MAX_KEY_LEN: u32 = 96;
pub struct Session(pub u32);
/// The type used to identify a specific validator set during a specific session.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[derive(
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen,
)]
#[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -43,13 +57,13 @@ pub struct ValidatorSet {
pub network: NetworkId,
}
type MaxKeyLen = ConstU32<MAX_KEY_LEN>;
/// The type representing a Key from an external network.
pub type ExternalKey = BoundedVec<u8, MaxKeyLen>;
pub type ExternalKey = BoundedVec<u8, ConstU32<MAX_KEY_LEN>>;
/// The key pair for a validator set.
///
/// This is their Ristretto key, used for signing Batches, and their key on the external network.
/// This is their Ristretto key, used for publishing data onto Serai, and their key on the external
/// network.
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -81,12 +95,12 @@ impl Zeroize for KeyPair {
/// The MuSig context for a validator set.
pub fn musig_context(set: ValidatorSet) -> Vec<u8> {
[b"ValidatorSets-musig_key".as_ref(), &set.encode()].concat()
(b"ValidatorSets-musig_key".as_ref(), set).encode()
}
/// The MuSig public key for a validator set.
///
/// This function panics on invalid input.
/// This function panics on invalid input, per the definition of `dkg::musig::musig_key`.
pub fn musig_key(set: ValidatorSet, set_keys: &[Public]) -> Public {
let mut keys = Vec::new();
for key in set_keys {
@@ -98,33 +112,11 @@ pub fn musig_key(set: ValidatorSet, set_keys: &[Public]) -> Public {
Public(dkg::musig::musig_key::<Ristretto>(&musig_context(set), &keys).unwrap().to_bytes())
}
/// The message for the set_keys signature.
/// The message for the `set_keys` signature.
pub fn set_keys_message(set: &ValidatorSet, key_pair: &KeyPair) -> Vec<u8> {
(b"ValidatorSets-set_keys", set, key_pair).encode()
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Slash {
#[cfg_attr(
feature = "borsh",
borsh(
serialize_with = "serai_primitives::borsh_serialize_public",
deserialize_with = "serai_primitives::borsh_deserialize_public"
)
)]
pub key: Public,
pub points: u32,
}
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>);
pub fn report_slashes_message(set: &ValidatorSet, slashes: &SlashReport) -> Vec<u8> {
(b"ValidatorSets-report_slashes", set, slashes).encode()
}
/// For a set of validators whose key shares may exceed the maximum, reduce until they equal the
/// maximum.
///

View File

@@ -0,0 +1,299 @@
use core::{num::NonZero, time::Duration};
#[cfg(feature = "std")]
use zeroize::Zeroize;
use scale::{Encode, Decode, MaxEncodedLen};
use scale_info::TypeInfo;
#[cfg(feature = "borsh")]
use borsh::{BorshSerialize, BorshDeserialize};
#[cfg(feature = "serde")]
use serde::{Serialize, Deserialize};
use sp_core::{ConstU32, bounded::BoundedVec};
#[cfg(not(feature = "std"))]
use sp_std::vec::Vec;
use serai_primitives::{TARGET_BLOCK_TIME, Amount};
use crate::{SESSION_LENGTH, MAX_KEY_SHARES_PER_SET_U32};
/// Each slash point is equivalent to the downtime implied by missing a block proposal.
// Takes a NonZero<u16> so that the result is never 0.
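// e.g. with 150 validators and the 6-second target block time, one slash point
// represents 150 * 6 = 900 seconds (15 minutes) of downtime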
fn downtime_per_slash_point(validators: NonZero<u16>) -> Duration {
Duration::from_secs(TARGET_BLOCK_TIME) * u32::from(u16::from(validators))
}
/// A slash for a validator.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode, MaxEncodedLen, TypeInfo)]
#[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Slash {
/// The slash points accumulated by this validator.
///
/// Each point is considered as `downtime_per_slash_point(validators)` downtime, where
/// `validators` is the amount of validators present in the set.
Points(u32),
/// A fatal slash due to fundamentally faulty behavior.
///
/// This should only be used for misbehavior with explicit evidence of impropriety. This should
/// not be used for liveness failures. The validator will be penalized all allocated stake.
Fatal,
}
impl Slash {
/// Calculate the penalty which should be applied to the validator.
///
/// Does not panic, even due to overflows, if `allocated_stake + session_rewards <= u64::MAX`.
pub fn penalty(
self,
validators: NonZero<u16>,
allocated_stake: Amount,
session_rewards: Amount,
) -> Amount {
match self {
Self::Points(slash_points) => {
let mut slash_points = u64::from(slash_points);
// Do the logic with the stake in u128 to prevent overflow from multiplying u64s
let allocated_stake = u128::from(allocated_stake.0);
let session_rewards = u128::from(session_rewards.0);
// A Serai validator is allowed to be offline for an average of one day every two weeks
// with no additional penalty. They'll solely not earn rewards for the time they were
// offline.
const GRACE_WINDOW: Duration = Duration::from_secs(2 * 7 * 24 * 60 * 60);
const GRACE: Duration = Duration::from_secs(24 * 60 * 60);
// GRACE / GRACE_WINDOW is the fraction of the time a validator is allowed to be offline
// This means we want SESSION_LENGTH * (GRACE / GRACE_WINDOW), but with the parentheses
// moved so we don't incur the floordiv and hit 0
const PENALTY_FREE_DOWNTIME: Duration = Duration::from_secs(
(SESSION_LENGTH.as_secs() * GRACE.as_secs()) / GRACE_WINDOW.as_secs(),
);
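// With the one-week (168-hour) session used in the tests below, this works out to
// (168h * 24h) / 336h = 12 hours of penalty-free downtime, matching the interval boundaries
// described below.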
let downtime_per_slash_point = downtime_per_slash_point(validators);
let penalty_free_slash_points =
PENALTY_FREE_DOWNTIME.as_secs() / downtime_per_slash_point.as_secs();
/*
In practice, the following means:
- Hours 0-12 are penalized as if they're hours 0-12.
- Hours 12-24 are penalized as if they're hours 12-36.
- Hours 24-36 are penalized as if they're hours 36-96.
- Hours 36-48 are penalized as if they're hours 96-168.
/* Commented, see below explanation of why.
- Hours 48-168 are penalized for 0-2% of stake.
- 168-336 hours of slashes, for a session only lasting 168 hours, are penalized for 2-10%
of stake.
This means a validator has to be offline for more than two days to start having their
stake slashed.
*/
This means a validator offline for two days will not earn any rewards for that session.
*/
const MULTIPLIERS: [u64; 4] = [1, 2, 5, 6];
let reward_slash = {
// In intervals of the penalty-free slash points, weight the slash points accrued
// The multiplier for the first interval is 1 as it's penalty-free
let mut weighted_slash_points_for_reward_slash = 0;
let mut total_possible_slash_points_for_rewards_slash = 0;
for mult in MULTIPLIERS {
let slash_points_in_interval = slash_points.min(penalty_free_slash_points);
weighted_slash_points_for_reward_slash += slash_points_in_interval * mult;
total_possible_slash_points_for_rewards_slash += penalty_free_slash_points * mult;
slash_points -= slash_points_in_interval;
}
// If there are no penalty-free slash points, and the validator was slashed, slash the
// entire reward
(u128::from(weighted_slash_points_for_reward_slash) * session_rewards)
.checked_div(u128::from(total_possible_slash_points_for_rewards_slash))
.unwrap_or({
if weighted_slash_points_for_reward_slash == 0 {
0
} else {
session_rewards
}
})
};
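// Worked example: with 12 hours of penalty-free downtime, a validator with 24 hours of
// downtime has 12 hours weighted at 1x and 12 hours at 2x, for 36 of the possible
// 12 * (1 + 2 + 5 + 6) = 168 weighted hours, slashing 36/168ths of the session's rewards.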
// Ensure the slash never exceeds the amount slashable (due to rounding errors)
let reward_slash = reward_slash.min(session_rewards);
/*
let slash_points_for_entire_session =
SESSION_LENGTH.as_secs() / downtime_per_slash_point.as_secs();
let offline_slash = {
// The amount of stake to slash for being offline
const MAX_STAKE_SLASH_PERCENTAGE_OFFLINE: u64 = 2;
let stake_to_slash_for_being_offline =
(allocated_stake * u128::from(MAX_STAKE_SLASH_PERCENTAGE_OFFLINE)) / 100;
// We already removed the slash points for `intervals * penalty_free_slash_points`
let slash_points_for_reward_slash =
penalty_free_slash_points * u64::try_from(MULTIPLIERS.len()).unwrap();
let slash_points_for_offline_stake_slash =
slash_points_for_entire_session.saturating_sub(slash_points_for_reward_slash);
let slash_points_in_interval = slash_points.min(slash_points_for_offline_stake_slash);
slash_points -= slash_points_in_interval;
// If there are no slash points for the entire session, don't slash stake
// That's an extreme edge case which shouldn't start penalizing validators
(u128::from(slash_points_in_interval) * stake_to_slash_for_being_offline)
.checked_div(u128::from(slash_points_for_offline_stake_slash))
.unwrap_or(0)
};
let disruptive_slash = {
/*
A validator may have more slash points than `slash_points_for_offline_stake_slash` if they
didn't just accrue slashes for missing block proposals but also accrued slashes for being
disruptive. In that case, we still want to bound their slash points so they can't somehow be
slashed for 100% of their stake (which should only happen on a fatal slash).
*/
const MAX_STAKE_SLASH_PERCENTAGE_DISRUPTIVE: u64 = 8;
let stake_to_slash_for_being_disruptive =
(allocated_stake * u128::from(MAX_STAKE_SLASH_PERCENTAGE_DISRUPTIVE)) / 100;
// Follows the offline slash's `unwrap_or` policy
(u128::from(slash_points.min(slash_points_for_entire_session)) *
stake_to_slash_for_being_disruptive)
.checked_div(u128::from(slash_points_for_entire_session))
.unwrap_or(0)
};
*/
/*
We do not slash for being offline/disruptive at this time. Doing so would allow an adversary
who DoSes nodes to not just take them offline but also take away their stake. That trade-off
isn't worth the increased incentive to properly maintain a node, as the rewards should
already be sufficient for that purpose.
Validators also shouldn't be able to be so disruptive in the first place, as they're limited
upon disruption *while it's ongoing*. Slashes as a post-hoc response, while an arguably
worthwhile economic penalty, can never be a response in the moment (as necessary to actually
handle the disruption).
If stake slashing was to be re-enabled, the percentage of stake which is eligible for
slashing should be variable with how close we are to losing liveness (a sketch follows
below). This would mean if less than 10% of validators are offline, no stake is slashed.
If 10% are, 2% is eligible. If 20% are, 5% is eligible. If 30% are, 10% is eligible.
(or similar)
This would mean that a DoS is insufficient to cause a validator to lose their stake.
Instead, a coordinated DoS against multiple Serai validators would be needed,
strengthening our assumptions.
*/
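// A hypothetical sketch of such a liveness-variable bound, solely illustrative and unused:
/*
fn eligible_stake_slash_percent(offline_validators_percent: u64) -> u64 {
  match offline_validators_percent {
    0 ..= 9 => 0,
    10 ..= 19 => 2,
    20 ..= 29 => 5,
    _ => 10,
  }
}
*/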
let offline_slash = 0;
let disruptive_slash = 0;
let stake_slash = (offline_slash + disruptive_slash).min(allocated_stake);
let penalty_u128 = reward_slash + stake_slash;
// saturating_into
Amount(u64::try_from(penalty_u128).unwrap_or(u64::MAX))
}
// On fatal slash, their entire stake is removed
Self::Fatal => Amount(allocated_stake.0 + session_rewards.0),
}
}
}
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>);
// This message is assumed to be bound to the ValidatorSet via the key it's signed with
pub fn report_slashes_message(slashes: &SlashReport) -> Vec<u8> {
(b"ValidatorSets-report_slashes", slashes).encode()
}
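// A minimal sketch of the message's shape: the domain separator is a fixed-length byte
// array, which SCALE-encodes to its raw bytes, so the message is the separator followed by
// the encoded report.
#[test]
fn report_slashes_message_prefix() {
  let report = SlashReport(BoundedVec::try_from(vec![Slash::Points(1)]).unwrap());
  assert!(report_slashes_message(&report).starts_with(b"ValidatorSets-report_slashes"));
}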
#[test]
fn test_penalty() {
for validators in [1, 50, 100, crate::MAX_KEY_SHARES_PER_SET] {
let validators = NonZero::new(validators).unwrap();
// 12 hours of slash points should only decrease the rewards proportionately
let twelve_hours_of_slash_points =
u32::try_from((12 * 60 * 60) / downtime_per_slash_point(validators).as_secs()).unwrap();
assert_eq!(
Slash::Points(twelve_hours_of_slash_points).penalty(
validators,
Amount(u64::MAX),
Amount(168)
),
Amount(12)
);
// 24 hours of slash points should be counted as 36 hours
assert_eq!(
Slash::Points(2 * twelve_hours_of_slash_points).penalty(
validators,
Amount(u64::MAX),
Amount(168)
),
Amount(36)
);
// 36 hours of slash points should be counted as 96 hours
assert_eq!(
Slash::Points(3 * twelve_hours_of_slash_points).penalty(
validators,
Amount(u64::MAX),
Amount(168)
),
Amount(96)
);
// 48 hours of slash points should be counted as 168 hours
assert_eq!(
Slash::Points(4 * twelve_hours_of_slash_points).penalty(
validators,
Amount(u64::MAX),
Amount(168)
),
Amount(168)
);
/*
// A full week of slash points should slash 2%
let week_of_slash_points = 14 * twelve_hours_of_slash_points;
assert_eq!(
Slash::Points(week_of_slash_points).penalty(validators, Amount(1000), Amount(168)),
Amount(20 + 168)
);
// Two weeks of slash points should slash 10%
assert_eq!(
Slash::Points(2 * week_of_slash_points).penalty(validators, Amount(1000), Amount(168)),
Amount(100 + 168)
);
// Anything greater should still only slash 10%
assert_eq!(
Slash::Points(u32::MAX).penalty(validators, Amount(1000), Amount(168)),
Amount(100 + 168)
);
*/
// Anything greater should still only slash the rewards
assert_eq!(
Slash::Points(u32::MAX).penalty(validators, Amount(u64::MAX), Amount(168)),
Amount(168)
);
}
}
#[test]
fn no_overflow() {
Slash::Points(u32::MAX).penalty(
NonZero::new(u16::MAX).unwrap(),
Amount(u64::MAX),
Amount(u64::MAX),
);
Slash::Points(u32::MAX).penalty(NonZero::new(1).unwrap(), Amount(u64::MAX), Amount(u64::MAX));
}