14 Commits

Author SHA1 Message Date
Luke Parker
ce3b90541e Make transactions undroppable
coordinator/cosign/src/delay.rs literally demonstrates how we'd need to rewrite
our handling of transactions with this change. It can be cleaned up a bit but
already identifies ergonomic issues. It also doesn't model passing an &mut txn
to an async function, which would also require using the droppable wrapper
struct.

To locally see this build, run

RUSTFLAGS="-Zpanic_abort_tests -C panic=abort" cargo +nightly build -p serai-cosign --all-targets

To locally see this fail to build, run

cargo build -p serai-cosign --all-targets

While it doesn't say which line causes it to fail to build, the only distinction
is panic=unwind.

For more context, please see #578.
2025-01-15 03:56:59 -05:00
Luke Parker
cb410cc4e0 Correct how we handle rounding errors within the penalty fn
We explicitly no longer slash stakes but we still set the maximum slash to the
allocated stake + the rewards. Now, the reward slash is bound to the rewards
and the stake slash is bound to the stake. This prevents an improperly rounded
reward slash from affecting a stake slash.
2025-01-15 02:46:31 -05:00
Luke Parker
6c145a5ec3 Disable offline, disruptive slashes
Reasoning commented in codebase
2025-01-14 11:44:13 -05:00
Luke Parker
a7fef2ba7a Redesign Slash/SlashReport types with a function to calculate the penalty 2025-01-14 07:51:39 -05:00
Luke Parker
291ebf5e24 Have serai-task warnings print with the name of the task 2025-01-14 02:52:26 -05:00
Luke Parker
5e0e91c85d Add tasks to publish data onto Serai 2025-01-14 01:58:26 -05:00
Luke Parker
b5a6b0693e Add a proper error type to ContinuallyRan
This isn't necessary. Because we just log the error and never match on it,
we don't need any structure beyond String (or now Debug, which still gives us
a way to print the error). This is for the ergonomics of not having to
constantly write `.map_err(|e| format!("{e:?}"))`.
2025-01-12 18:29:08 -05:00
Luke Parker
3cc2abfedc Add a task to publish slash reports 2025-01-12 17:47:48 -05:00
Luke Parker
0ce9aad9b2 Add flow to add transactions onto Tributaries 2025-01-12 07:32:45 -05:00
Luke Parker
e35aa04afb Start handling messages from the processor
Does route ProcessorMessage::CosignedBlock. Rest are stubbed with TODO.
2025-01-12 06:07:55 -05:00
Luke Parker
e7de5125a2 Have processor-messages use CosignIntent/SignedCosign, not the historic cosign format
Has yet to update the processor accordingly.
2025-01-12 05:52:33 -05:00
Luke Parker
158140c3a7 Add a proper error for intake_cosign 2025-01-12 05:49:17 -05:00
Luke Parker
df9a9adaa8 Remove direct dependencies of void, async-trait 2025-01-12 03:48:43 -05:00
Luke Parker
d854807edd Make message_queue::client::Client::send fallible
Allows tasks to report the errors themselves and handle retry in our
standardized way.
2025-01-11 21:57:58 -05:00
58 changed files with 1562 additions and 333 deletions

5
Cargo.lock generated
View File

@@ -8363,7 +8363,6 @@ dependencies = [
"serai-task", "serai-task",
"tokio", "tokio",
"tributary-sdk", "tributary-sdk",
"void",
"zeroize", "zeroize",
] ]
@@ -8386,6 +8385,7 @@ dependencies = [
name = "serai-coordinator-substrate" name = "serai-coordinator-substrate"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bitvec",
"borsh", "borsh",
"futures", "futures",
"log", "log",
@@ -8402,7 +8402,6 @@ dependencies = [
name = "serai-coordinator-tests" name = "serai-coordinator-tests"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"blake2", "blake2",
"borsh", "borsh",
"ciphersuite", "ciphersuite",
@@ -8605,7 +8604,6 @@ dependencies = [
name = "serai-full-stack-tests" name = "serai-full-stack-tests"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"bitcoin-serai", "bitcoin-serai",
"curve25519-dalek", "curve25519-dalek",
"dockertest", "dockertest",
@@ -9001,6 +8999,7 @@ dependencies = [
"hex", "hex",
"parity-scale-codec", "parity-scale-codec",
"serai-coins-primitives", "serai-coins-primitives",
"serai-cosign",
"serai-in-instructions-primitives", "serai-in-instructions-primitives",
"serai-primitives", "serai-primitives",
"serai-validator-sets-primitives", "serai-validator-sets-primitives",

View File

@@ -30,13 +30,53 @@ pub trait Get {
/// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values /// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values
/// randomly, or any other action, at time of write or at time of commit. /// randomly, or any other action, at time of write or at time of commit.
#[must_use] #[must_use]
pub trait DbTxn: Send + Get { pub trait DbTxn: Sized + Send + Get {
/// Write a value to this key. /// Write a value to this key.
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>); fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
/// Delete the value from this key. /// Delete the value from this key.
fn del(&mut self, key: impl AsRef<[u8]>); fn del(&mut self, key: impl AsRef<[u8]>);
/// Commit this transaction. /// Commit this transaction.
fn commit(self); fn commit(self);
/// Close this transaction.
///
/// This is equivalent to `Drop` on transactions which can be dropped. This is explicit and works
/// with transactions which can't be dropped.
fn close(self) {
drop(self);
}
}
// Credit for the idea goes to https://jack.wrenn.fyi/blog/undroppable
pub struct Undroppable<T>(Option<T>);
impl<T> Drop for Undroppable<T> {
fn drop(&mut self) {
// Use an assertion at compile time to prevent this code from compiling if generated
#[allow(clippy::assertions_on_constants)]
const {
assert!(false, "Undroppable DbTxn was dropped. Ensure all code paths call commit or close");
}
}
}
impl<T: DbTxn> Get for Undroppable<T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.as_ref().unwrap().get(key)
}
}
impl<T: DbTxn> DbTxn for Undroppable<T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().put(key, value);
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().del(key);
}
fn commit(mut self) {
self.0.take().unwrap().commit();
let _ = core::mem::ManuallyDrop::new(self);
}
fn close(mut self) {
drop(self.0.take().unwrap());
let _ = core::mem::ManuallyDrop::new(self);
}
} }
/// A database supporting atomic transaction. /// A database supporting atomic transaction.
@@ -51,6 +91,10 @@ pub trait Db: 'static + Send + Sync + Clone + Get {
let dst_len = u8::try_from(item_dst.len()).unwrap(); let dst_len = u8::try_from(item_dst.len()).unwrap();
[[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat() [[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat()
} }
/// Open a new transaction. /// Open a new transaction which may be dropped.
fn txn(&mut self) -> Self::Transaction<'_>; fn unsafe_txn(&mut self) -> Self::Transaction<'_>;
/// Open a new transaction which must be committed or closed.
fn txn(&mut self) -> Undroppable<Self::Transaction<'_>> {
Undroppable(Some(self.unsafe_txn()))
}
} }

View File

@@ -74,7 +74,7 @@ impl Get for MemDb {
} }
impl Db for MemDb { impl Db for MemDb {
type Transaction<'a> = MemDbTxn<'a>; type Transaction<'a> = MemDbTxn<'a>;
fn txn(&mut self) -> MemDbTxn<'_> { fn unsafe_txn(&mut self) -> MemDbTxn<'_> {
MemDbTxn(self, HashMap::new(), HashSet::new()) MemDbTxn(self, HashMap::new(), HashSet::new())
} }
} }

View File

@@ -37,7 +37,7 @@ impl Get for Arc<ParityDb> {
} }
impl Db for Arc<ParityDb> { impl Db for Arc<ParityDb> {
type Transaction<'a> = Transaction<'a>; type Transaction<'a> = Transaction<'a>;
fn txn(&mut self) -> Self::Transaction<'_> { fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
Transaction(self, vec![]) Transaction(self, vec![])
} }
} }

View File

@@ -39,7 +39,7 @@ impl<T: ThreadMode> Get for Arc<OptimisticTransactionDB<T>> {
} }
impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> { impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
type Transaction<'a> = Transaction<'a, T>; type Transaction<'a> = Transaction<'a, T>;
fn txn(&mut self) -> Self::Transaction<'_> { fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default(); let mut opts = WriteOptions::default();
opts.set_sync(true); opts.set_sync(true);
Transaction(self.transaction_opt(&opts, &Default::default()), &**self) Transaction(self.transaction_opt(&opts, &Default::default()), &**self)

View File

@@ -2,10 +2,16 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use core::{future::Future, time::Duration}; use core::{
fmt::{self, Debug},
future::Future,
time::Duration,
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
mod type_name;
/// A handle for a task. /// A handle for a task.
/// ///
/// The task will only stop running once all handles for it are dropped. /// The task will only stop running once all handles for it are dropped.
@@ -45,8 +51,6 @@ impl Task {
impl TaskHandle { impl TaskHandle {
/// Tell the task to run now (and not whenever its next iteration on a timer is). /// Tell the task to run now (and not whenever its next iteration on a timer is).
///
/// Panics if the task has been dropped.
pub fn run_now(&self) { pub fn run_now(&self) {
#[allow(clippy::match_same_arms)] #[allow(clippy::match_same_arms)]
match self.run_now.try_send(()) { match self.run_now.try_send(()) {
@@ -54,12 +58,22 @@ impl TaskHandle {
// NOP on full, as this task will already be ran as soon as possible // NOP on full, as this task will already be ran as soon as possible
Err(mpsc::error::TrySendError::Full(())) => {} Err(mpsc::error::TrySendError::Full(())) => {}
Err(mpsc::error::TrySendError::Closed(())) => { Err(mpsc::error::TrySendError::Closed(())) => {
// The task should only be closed if all handles are dropped, and this one hasn't been
panic!("task was unexpectedly closed when calling run_now") panic!("task was unexpectedly closed when calling run_now")
} }
} }
} }
} }
/// An enum which can't be constructed, representing that the task does not error.
pub enum DoesNotError {}
impl Debug for DoesNotError {
fn fmt(&self, _: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// This type can't be constructed so we'll never have a `&self` to call this fn with
unreachable!()
}
}
/// A task to be continually ran. /// A task to be continually ran.
pub trait ContinuallyRan: Sized + Send { pub trait ContinuallyRan: Sized + Send {
/// The amount of seconds before this task should be polled again. /// The amount of seconds before this task should be polled again.
@@ -69,11 +83,14 @@ pub trait ContinuallyRan: Sized + Send {
/// Upon error, the amount of time waited will be linearly increased until this limit. /// Upon error, the amount of time waited will be linearly increased until this limit.
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 120;
/// The error potentially yielded upon running an iteration of this task.
type Error: Debug;
/// Run an iteration of the task. /// Run an iteration of the task.
/// ///
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran /// If this returns `true`, all dependents of the task will immediately have a new iteration ran
/// (without waiting for whatever timer they were already on). /// (without waiting for whatever timer they were already on).
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>>; fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>>;
/// Continually run the task. /// Continually run the task.
fn continually_run( fn continually_run(
@@ -115,12 +132,20 @@ pub trait ContinuallyRan: Sized + Send {
} }
} }
Err(e) => { Err(e) => {
log::warn!("{}", e); // Get the type name
let type_name = type_name::strip_type_name(core::any::type_name::<Self>());
// Print the error as a warning, prefixed by the task's type
log::warn!("{type_name}: {e:?}");
increase_sleep_before_next_task(&mut current_sleep_before_next_task); increase_sleep_before_next_task(&mut current_sleep_before_next_task);
} }
} }
// Don't run the task again for another few seconds UNLESS told to run now // Don't run the task again for another few seconds UNLESS told to run now
/*
We could replace tokio::mpsc with async_channel, tokio::time::sleep with
patchable_async_sleep::sleep, and tokio::select with futures_lite::future::or
It isn't worth the effort when patchable_async_sleep::sleep will still resolve to tokio
*/
tokio::select! { tokio::select! {
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
msg = task.run_now.recv() => { msg = task.run_now.recv() => {

View File

@@ -0,0 +1,31 @@
/// Strip the modules from a type name.
// This may be of the form `a::b::C`, in which case we only want `C`
pub(crate) fn strip_type_name(full_type_name: &'static str) -> String {
// It also may be `a::b::C<d::e::F>`, in which case, we only attempt to strip `a::b`
let mut by_generics = full_type_name.split('<');
// Strip to just `C`
let full_outer_object_name = by_generics.next().unwrap();
let mut outer_object_name_parts = full_outer_object_name.split("::");
let mut last_part_in_outer_object_name = outer_object_name_parts.next().unwrap();
for part in outer_object_name_parts {
last_part_in_outer_object_name = part;
}
// Push back on the generic terms
let mut type_name = last_part_in_outer_object_name.to_string();
for generic in by_generics {
type_name.push('<');
type_name.push_str(generic);
}
type_name
}
#[test]
fn test_strip_type_name() {
assert_eq!(strip_type_name("core::option::Option"), "Option");
assert_eq!(
strip_type_name("core::option::Option<alloc::string::String>"),
"Option<alloc::string::String>"
);
}

View File

@@ -30,7 +30,7 @@ schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-
frost = { package = "modular-frost", path = "../crypto/frost" } frost = { package = "modular-frost", path = "../crypto/frost" }
frost-schnorrkel = { path = "../crypto/schnorrkel" } frost-schnorrkel = { path = "../crypto/schnorrkel" }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
zalloc = { path = "../common/zalloc" } zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db" } serai-db = { path = "../common/db" }

View File

@@ -2,7 +2,7 @@ use core::future::Future;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use serai_db::*; use serai_db::*;
use serai_task::ContinuallyRan; use serai_task::{DoesNotError, ContinuallyRan};
use crate::evaluator::CosignedBlocks; use crate::evaluator::CosignedBlocks;
@@ -24,8 +24,19 @@ pub(crate) struct CosignDelayTask<D: Db> {
pub(crate) db: D, pub(crate) db: D,
} }
struct AwaitUndroppable<T: DbTxn>(Option<core::mem::ManuallyDrop<Undroppable<T>>>);
impl<T: DbTxn> Drop for AwaitUndroppable<T> {
fn drop(&mut self) {
if let Some(mut txn) = self.0.take() {
(unsafe { core::mem::ManuallyDrop::take(&mut txn) }).close();
}
}
}
impl<D: Db> ContinuallyRan for CosignDelayTask<D> { impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
loop { loop {
@@ -33,14 +44,18 @@ impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
// Receive the next block to mark as cosigned // Receive the next block to mark as cosigned
let Some((block_number, time_evaluated)) = CosignedBlocks::try_recv(&mut txn) else { let Some((block_number, time_evaluated)) = CosignedBlocks::try_recv(&mut txn) else {
txn.close();
break; break;
}; };
// Calculate when we should mark it as valid // Calculate when we should mark it as valid
let time_valid = let time_valid =
SystemTime::UNIX_EPOCH + Duration::from_secs(time_evaluated) + ACKNOWLEDGEMENT_DELAY; SystemTime::UNIX_EPOCH + Duration::from_secs(time_evaluated) + ACKNOWLEDGEMENT_DELAY;
// Sleep until then // Sleep until then
let mut txn = AwaitUndroppable(Some(core::mem::ManuallyDrop::new(txn)));
tokio::time::sleep(SystemTime::now().duration_since(time_valid).unwrap_or(Duration::ZERO)) tokio::time::sleep(SystemTime::now().duration_since(time_valid).unwrap_or(Duration::ZERO))
.await; .await;
let mut txn = core::mem::ManuallyDrop::into_inner(txn.0.take().unwrap());
// Set the cosigned block // Set the cosigned block
LatestCosignedBlockNumber::set(&mut txn, &block_number); LatestCosignedBlockNumber::set(&mut txn, &block_number);

View File

@@ -80,12 +80,14 @@ pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
} }
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> { impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut known_cosign = None; let mut known_cosign = None;
let mut made_progress = false; let mut made_progress = false;
loop { loop {
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn) let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn)
else { else {
break; break;

View File

@@ -61,14 +61,16 @@ pub(crate) struct CosignIntendTask<D: Db> {
} }
impl<D: Db> ContinuallyRan for CosignIntendTask<D> { impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1); let start_block_number = ScanCosignFrom::get(&self.db).unwrap_or(1);
let latest_block_number = let latest_block_number =
self.serai.latest_finalized_block().await.map_err(|e| format!("{e:?}"))?.number(); self.serai.latest_finalized_block().await.map_err(|e| format!("{e:?}"))?.number();
for block_number in start_block_number ..= latest_block_number { for block_number in start_block_number ..= latest_block_number {
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
let (block, mut has_events) = let (block, mut has_events) =
block_has_events_justifying_a_cosign(&self.serai, block_number) block_has_events_justifying_a_cosign(&self.serai, block_number)
@@ -88,7 +90,6 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
} }
let block_hash = block.hash(); let block_hash = block.hash();
SubstrateBlockHash::set(&mut txn, block_number, &block_hash); SubstrateBlockHash::set(&mut txn, block_number, &block_hash);
SubstrateBlockNumber::set(&mut txn, block_hash, &block_number);
let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn); let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn);

View File

@@ -128,7 +128,6 @@ create_db! {
// An index of Substrate blocks // An index of Substrate blocks
SubstrateBlockHash: (block_number: u64) -> [u8; 32], SubstrateBlockHash: (block_number: u64) -> [u8; 32],
SubstrateBlockNumber: (block_hash: [u8; 32]) -> u64,
// A mapping from a global session's ID to its relevant information. // A mapping from a global session's ID to its relevant information.
GlobalSessions: (global_session: [u8; 32]) -> GlobalSession, GlobalSessions: (global_session: [u8; 32]) -> GlobalSession,
// The last block to be cosigned by a global session. // The last block to be cosigned by a global session.
@@ -229,6 +228,43 @@ pub trait RequestNotableCosigns: 'static + Send {
#[derive(Debug)] #[derive(Debug)]
pub struct Faulted; pub struct Faulted;
/// An error incurred while intaking a cosign.
#[derive(Debug)]
pub enum IntakeCosignError {
/// Cosign is for a not-yet-indexed block
NotYetIndexedBlock,
/// A later cosign for this cosigner has already been handled
StaleCosign,
/// The cosign's global session isn't recognized
UnrecognizedGlobalSession,
/// The cosign is for a block before its global session starts
BeforeGlobalSessionStart,
/// The cosign is for a block after its global session ends
AfterGlobalSessionEnd,
/// The cosign's signing network wasn't a participant in this global session
NonParticipatingNetwork,
/// The cosign had an invalid signature
InvalidSignature,
/// The cosign is for a global session which has yet to have its declaration block cosigned
FutureGlobalSession,
}
impl IntakeCosignError {
/// If this error is temporal to the local view
pub fn temporal(&self) -> bool {
match self {
IntakeCosignError::NotYetIndexedBlock |
IntakeCosignError::StaleCosign |
IntakeCosignError::UnrecognizedGlobalSession |
IntakeCosignError::FutureGlobalSession => true,
IntakeCosignError::BeforeGlobalSessionStart |
IntakeCosignError::AfterGlobalSessionEnd |
IntakeCosignError::NonParticipatingNetwork |
IntakeCosignError::InvalidSignature => false,
}
}
}
/// The interface to manage cosigning with. /// The interface to manage cosigning with.
pub struct Cosigning<D: Db> { pub struct Cosigning<D: Db> {
db: D, db: D,
@@ -282,13 +318,6 @@ impl<D: Db> Cosigning<D> {
)) ))
} }
/// Fetch a finalized block's number by its hash.
///
/// This block is not guaranteed to be cosigned.
pub fn finalized_block_number(getter: &impl Get, block_hash: [u8; 32]) -> Option<u64> {
SubstrateBlockNumber::get(getter, block_hash)
}
/// Fetch the notable cosigns for a global session in order to respond to requests. /// Fetch the notable cosigns for a global session in order to respond to requests.
/// ///
/// If this global session hasn't produced any notable cosigns, this will return the latest /// If this global session hasn't produced any notable cosigns, this will return the latest
@@ -335,25 +364,15 @@ impl<D: Db> Cosigning<D> {
} }
/// Intake a cosign. /// Intake a cosign.
///
/// - Returns Err(_) if there was an error trying to validate the cosign.
/// - Returns Ok(true) if the cosign was successfully handled or could not be handled at this
/// time.
/// - Returns Ok(false) if the cosign was invalid.
//
// We collapse a cosign which shouldn't be handled yet into a valid cosign (`Ok(true)`) as we
// assume we'll either explicitly request it if we need it or we'll naturally see it (or a later,
// more relevant, cosign) again.
// //
// Takes `&mut self` as this should only be called once at any given moment. // Takes `&mut self` as this should only be called once at any given moment.
// TODO: Don't overload bool here pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<(), IntakeCosignError> {
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<bool, String> {
let cosign = &signed_cosign.cosign; let cosign = &signed_cosign.cosign;
let network = cosign.cosigner; let network = cosign.cosigner;
// Check our indexed blockchain includes a block with this block number // Check our indexed blockchain includes a block with this block number
let Some(our_block_hash) = SubstrateBlockHash::get(&self.db, cosign.block_number) else { let Some(our_block_hash) = SubstrateBlockHash::get(&self.db, cosign.block_number) else {
return Ok(true); Err(IntakeCosignError::NotYetIndexedBlock)?
}; };
let faulty = cosign.block_hash != our_block_hash; let faulty = cosign.block_hash != our_block_hash;
@@ -363,20 +382,19 @@ impl<D: Db> Cosigning<D> {
NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network) NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network)
{ {
if existing.cosign.block_number >= cosign.block_number { if existing.cosign.block_number >= cosign.block_number {
return Ok(true); Err(IntakeCosignError::StaleCosign)?;
} }
} }
} }
let Some(global_session) = GlobalSessions::get(&self.db, cosign.global_session) else { let Some(global_session) = GlobalSessions::get(&self.db, cosign.global_session) else {
// Unrecognized global session Err(IntakeCosignError::UnrecognizedGlobalSession)?
return Ok(true);
}; };
// Check the cosigned block number is in range to the global session // Check the cosigned block number is in range to the global session
if cosign.block_number < global_session.start_block_number { if cosign.block_number < global_session.start_block_number {
// Cosign is for a block predating the global session // Cosign is for a block predating the global session
return Ok(false); Err(IntakeCosignError::BeforeGlobalSessionStart)?;
} }
if !faulty { if !faulty {
// This prevents a malicious validator set, on the same chain, from producing a cosign after // This prevents a malicious validator set, on the same chain, from producing a cosign after
@@ -384,7 +402,7 @@ impl<D: Db> Cosigning<D> {
if let Some(last_block) = GlobalSessionsLastBlock::get(&self.db, cosign.global_session) { if let Some(last_block) = GlobalSessionsLastBlock::get(&self.db, cosign.global_session) {
if cosign.block_number > last_block { if cosign.block_number > last_block {
// Cosign is for a block after the last block this global session should have signed // Cosign is for a block after the last block this global session should have signed
return Ok(false); Err(IntakeCosignError::AfterGlobalSessionEnd)?;
} }
} }
} }
@@ -393,20 +411,20 @@ impl<D: Db> Cosigning<D> {
{ {
let key = Public::from({ let key = Public::from({
let Some(key) = global_session.keys.get(&network) else { let Some(key) = global_session.keys.get(&network) else {
return Ok(false); Err(IntakeCosignError::NonParticipatingNetwork)?
}; };
*key *key
}); });
if !signed_cosign.verify_signature(key) { if !signed_cosign.verify_signature(key) {
return Ok(false); Err(IntakeCosignError::InvalidSignature)?;
} }
} }
// Since we verified this cosign's signature, and have a chain sufficiently long, handle the // Since we verified this cosign's signature, and have a chain sufficiently long, handle the
// cosign // cosign
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
if !faulty { if !faulty {
// If this is for a future global session, we don't acknowledge this cosign at this time // If this is for a future global session, we don't acknowledge this cosign at this time
@@ -415,7 +433,7 @@ impl<D: Db> Cosigning<D> {
// block declaring it was cosigned // block declaring it was cosigned
if (global_session.start_block_number - 1) > latest_cosigned_block_number { if (global_session.start_block_number - 1) > latest_cosigned_block_number {
drop(txn); drop(txn);
return Ok(true); return Err(IntakeCosignError::FutureGlobalSession);
} }
// This is safe as it's in-range and newer, as prior checked since it isn't faulty // This is safe as it's in-range and newer, as prior checked since it isn't faulty
@@ -429,9 +447,10 @@ impl<D: Db> Cosigning<D> {
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
for fault in &faults { for fault in &faults {
let Some(stake) = global_session.stakes.get(&fault.cosign.cosigner) else { let stake = global_session
Err("cosigner with recognized key didn't have a stake entry saved".to_string())? .stakes
}; .get(&fault.cosign.cosigner)
.expect("cosigner with recognized key didn't have a stake entry saved");
weight_cosigned += stake; weight_cosigned += stake;
} }
@@ -443,7 +462,7 @@ impl<D: Db> Cosigning<D> {
} }
txn.commit(); txn.commit();
Ok(true) Ok(())
} }
/// Receive intended cosigns to produce for this ValidatorSet. /// Receive intended cosigns to produce for this ValidatorSet.
@@ -461,3 +480,30 @@ impl<D: Db> Cosigning<D> {
res res
} }
} }
mod tests {
use super::*;
struct RNC;
impl RequestNotableCosigns for RNC {
/// The error type which may be encountered when requesting notable cosigns.
type Error = ();
/// Request the notable cosigns for this global session.
fn request_notable_cosigns(
&self,
global_session: [u8; 32],
) -> impl Send + Future<Output = Result<(), Self::Error>> {
async move { Ok(()) }
}
}
#[tokio::test]
async fn test() {
let db: serai_db::MemDb = serai_db::MemDb::new();
let serai = unsafe { core::mem::transmute(0u64) };
let request = RNC;
let tasks = vec![];
let _ = Cosigning::spawn(db, serai, request, tasks);
core::future::pending().await
}
}

View File

@@ -33,7 +33,6 @@ serai-client = { path = "../../../substrate/client", default-features = false, f
serai-cosign = { path = "../../cosign" } serai-cosign = { path = "../../cosign" }
tributary-sdk = { path = "../../tributary-sdk" } tributary-sdk = { path = "../../tributary-sdk" }
void = { version = "1", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["sync"] } tokio = { version = "1", default-features = false, features = ["sync"] }
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] } libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] }

View File

@@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use serai_client::Serai; use serai_client::{SeraiError, Serai};
use libp2p::{ use libp2p::{
core::multiaddr::{Protocol, Multiaddr}, core::multiaddr::{Protocol, Multiaddr},
@@ -50,7 +50,9 @@ impl ContinuallyRan for DialTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; const DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 10 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
self.validators.update().await?; self.validators.update().await?;
@@ -83,8 +85,7 @@ impl ContinuallyRan for DialTask {
.unwrap_or(0) .unwrap_or(0)
.saturating_sub(1)) .saturating_sub(1))
{ {
let mut potential_peers = let mut potential_peers = self.serai.p2p_validators(network).await?;
self.serai.p2p_validators(network).await.map_err(|e| format!("{e:?}"))?;
for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) { for _ in 0 .. (TARGET_PEERS_PER_NETWORK - peer_count) {
if potential_peers.is_empty() { if potential_peers.is_empty() {
break; break;

View File

@@ -225,8 +225,8 @@ impl SwarmTask {
SwarmEvent::Behaviour( SwarmEvent::Behaviour(
BehaviorEvent::AllowList(event) | BehaviorEvent::ConnectionLimits(event) BehaviorEvent::AllowList(event) | BehaviorEvent::ConnectionLimits(event)
) => { ) => {
// Ensure these are unreachable cases, not actual events // This *is* an exhaustive match as these events are empty enums
let _: void::Void = event; match event {}
} }
SwarmEvent::Behaviour( SwarmEvent::Behaviour(
BehaviorEvent::Ping(ping::Event { peer: _, connection, result, }) BehaviorEvent::Ping(ping::Event { peer: _, connection, result, })

View File

@@ -4,7 +4,7 @@ use std::{
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
}; };
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai}; use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, SeraiError, Serai};
use serai_task::{Task, ContinuallyRan}; use serai_task::{Task, ContinuallyRan};
@@ -50,9 +50,8 @@ impl Validators {
async fn session_changes( async fn session_changes(
serai: impl Borrow<Serai>, serai: impl Borrow<Serai>,
sessions: impl Borrow<HashMap<NetworkId, Session>>, sessions: impl Borrow<HashMap<NetworkId, Session>>,
) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, String> { ) -> Result<Vec<(NetworkId, Session, HashSet<PeerId>)>, SeraiError> {
let temporal_serai = let temporal_serai = serai.borrow().as_of_latest_finalized_block().await?;
serai.borrow().as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let temporal_serai = temporal_serai.validator_sets(); let temporal_serai = temporal_serai.validator_sets();
let mut session_changes = vec![]; let mut session_changes = vec![];
@@ -69,7 +68,7 @@ impl Validators {
let session = match temporal_serai.session(network).await { let session = match temporal_serai.session(network).await {
Ok(Some(session)) => session, Ok(Some(session)) => session,
Ok(None) => return Ok(None), Ok(None) => return Ok(None),
Err(e) => return Err(format!("{e:?}")), Err(e) => return Err(e),
}; };
if sessions.get(&network) == Some(&session) { if sessions.get(&network) == Some(&session) {
@@ -81,7 +80,7 @@ impl Validators {
session, session,
validators.into_iter().map(peer_id_from_public).collect(), validators.into_iter().map(peer_id_from_public).collect(),
))), ))),
Err(e) => Err(format!("{e:?}")), Err(e) => Err(e),
} }
} }
}); });
@@ -147,7 +146,7 @@ impl Validators {
} }
/// Update the view of the validators. /// Update the view of the validators.
pub(crate) async fn update(&mut self) -> Result<(), String> { pub(crate) async fn update(&mut self) -> Result<(), SeraiError> {
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?; let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes); self.incorporate_session_changes(session_changes);
Ok(()) Ok(())
@@ -200,13 +199,13 @@ impl ContinuallyRan for UpdateValidatorsTask {
const DELAY_BETWEEN_ITERATIONS: u64 = 60; const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60; const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let session_changes = { let session_changes = {
let validators = self.validators.read().await; let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()) Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
.await
.map_err(|e| format!("{e:?}"))?
}; };
self.validators.write().await.incorporate_session_changes(session_changes); self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true) Ok(true)

View File

@@ -45,7 +45,9 @@ pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
} }
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> { impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol // If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60); const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);

View File

@@ -8,9 +8,9 @@ use serai_client::{
validator_sets::primitives::{Session, ValidatorSet}, validator_sets::primitives::{Session, ValidatorSet},
}; };
use serai_cosign::CosignIntent; use serai_cosign::SignedCosign;
use serai_coordinator_substrate::NewSetInformation; use serai_coordinator_substrate::NewSetInformation;
use serai_coordinator_tributary::Transaction;
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = serai_db::ParityDb; pub(crate) type Db = serai_db::ParityDb;
@@ -66,14 +66,48 @@ pub(crate) fn prune_tributary_db(set: ValidatorSet) {
create_db! { create_db! {
Coordinator { Coordinator {
// The currently active Tributaries
ActiveTributaries: () -> Vec<NewSetInformation>, ActiveTributaries: () -> Vec<NewSetInformation>,
// The latest Tributary to have been retired for a network
// Since Tributaries are retired sequentially, this is informative to if any Tributary has been
// retired
RetiredTributary: (network: NetworkId) -> Session, RetiredTributary: (network: NetworkId) -> Session,
// The last handled message from a Processor
LastProcessorMessage: (network: NetworkId) -> u64,
// Cosigns we produced and tried to intake yet incurred an error while doing so
ErroneousCosigns: () -> Vec<SignedCosign>,
} }
} }
db_channel! { db_channel! {
Coordinator { Coordinator {
// Cosigns we produced
SignedCosigns: () -> SignedCosign,
// Tributaries to clean up upon reboot
TributaryCleanup: () -> ValidatorSet, TributaryCleanup: () -> ValidatorSet,
PendingCosigns: (set: ValidatorSet) -> CosignIntent, }
}
mod _internal_db {
use super::*;
db_channel! {
Coordinator {
// Tributary transactions to publish
TributaryTransactions: (set: ValidatorSet) -> Transaction,
}
}
}
pub(crate) struct TributaryTransactions;
impl TributaryTransactions {
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
// If this set has yet to be retired, send this transaction
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
_internal_db::TributaryTransactions::send(txn, set, tx);
}
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
_internal_db::TributaryTransactions::try_recv(txn, set)
} }
} }

View File

@@ -1,5 +1,5 @@
use core::{ops::Deref, time::Duration}; use core::{ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, collections::HashMap, time::Instant};
use zeroize::{Zeroize, Zeroizing}; use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
@@ -9,16 +9,22 @@ use ciphersuite::{
Ciphersuite, Ristretto, Ciphersuite, Ristretto,
}; };
use borsh::BorshDeserialize;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use serai_client::{primitives::PublicKey, Serai}; use serai_client::{
primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai,
};
use message_queue::{Service, client::MessageQueue}; use message_queue::{Service, client::MessageQueue};
use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning}; use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport}; use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction; use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans};
mod db; mod db;
use db::*; use db::*;
@@ -63,18 +69,60 @@ async fn serai() -> Arc<Serai> {
} }
} }
fn spawn_cosigning( fn spawn_cosigning<D: serai_db::Db>(
db: impl serai_db::Db, mut db: D,
serai: Arc<Serai>, serai: Arc<Serai>,
p2p: impl p2p::P2p, p2p: impl p2p::P2p,
tasks_to_run_upon_cosigning: Vec<TaskHandle>, tasks_to_run_upon_cosigning: Vec<TaskHandle>,
mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>, mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
mut signed_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
) { ) {
let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning); let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning);
tokio::spawn(async move { tokio::spawn(async move {
const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5);
let last_cosign_rebroadcast = Instant::now(); let last_cosign_rebroadcast = Instant::now();
loop { loop {
// Intake our own cosigns
match Cosigning::<D>::latest_cosigned_block_number(&db) {
Ok(latest_cosigned_block_number) => {
let mut txn = db.txn();
// The cosigns we prior tried to intake yet failed to
let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]);
// The cosigns we have yet to intake
while let Some(cosign) = SignedCosigns::try_recv(&mut txn) {
cosigns.push(cosign);
}
let mut erroneous = vec![];
for cosign in cosigns {
// If this cosign is stale, move on
if cosign.cosign.block_number <= latest_cosigned_block_number {
continue;
}
match cosigning.intake_cosign(&cosign) {
// Publish this cosign
Ok(()) => p2p.publish_cosign(cosign).await,
Err(e) => {
assert!(e.temporal(), "signed an invalid cosign: {e:?}");
// Since this had a temporal error, queue it to try again later
erroneous.push(cosign);
}
};
}
// Save the cosigns with temporal errors to the database
ErroneousCosigns::set(&mut txn, &erroneous);
txn.commit();
}
Err(Faulted) => {
// We don't panic here as the following code rebroadcasts our cosigns which is
// necessary to inform other coordinators of the faulty cosigns
log::error!("cosigning faulted");
}
}
let time_till_cosign_rebroadcast = (last_cosign_rebroadcast + let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
serai_cosign::BROADCAST_FREQUENCY) serai_cosign::BROADCAST_FREQUENCY)
.saturating_duration_since(Instant::now()); .saturating_duration_since(Instant::now());
@@ -86,19 +134,134 @@ fn spawn_cosigning(
} }
cosign = p2p_cosigns.recv() => { cosign = p2p_cosigns.recv() => {
let cosign = cosign.expect("p2p cosigns channel was dropped?"); let cosign = cosign.expect("p2p cosigns channel was dropped?");
let _: Result<_, _> = cosigning.intake_cosign(&cosign); if cosigning.intake_cosign(&cosign).is_ok() {
}
cosign = signed_cosigns.recv() => {
let cosign = cosign.expect("signed cosigns channel was dropped?");
// TODO: Handle this error
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
p2p.publish_cosign(cosign).await; p2p.publish_cosign(cosign).await;
} }
} }
// Make sure this loop runs at least this often
() = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {}
}
} }
}); });
} }
async fn handle_processor_messages(
mut db: impl serai_db::Db,
message_queue: Arc<MessageQueue>,
network: NetworkId,
) {
loop {
let (msg_id, msg) = {
let msg = message_queue.next(Service::Processor(network)).await;
// Check this message's sender is as expected
assert_eq!(msg.from, Service::Processor(network));
// Check this message's ID is as expected
let last = LastProcessorMessage::get(&db, network);
let next = last.map(|id| id + 1).unwrap_or(0);
// This should either be the last message's ID, if we committed but didn't send our ACK, or
// the expected next message's ID
assert!((Some(msg.id) == last) || (msg.id == next));
// TODO: Check msg.sig
// If this is the message we already handled, and just failed to ACK, ACK it now and move on
if Some(msg.id) == last {
message_queue.ack(Service::Processor(network), msg.id).await;
continue;
}
(msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap())
};
let mut txn = db.txn();
match msg {
messages::ProcessorMessage::KeyGen(msg) => match msg {
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::DkgParticipation { participation, signed: Signed::default() },
);
}
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
session,
substrate_key,
network_key,
} => todo!("TODO Transaction::DkgConfirmationPreprocess"),
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
},
messages::ProcessorMessage::Sign(msg) => match msg {
messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
todo!("TODO Transaction::Batch + Transaction::Sign")
}
messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"),
},
messages::ProcessorMessage::Coordinator(msg) => match msg {
messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
SignedCosigns::send(&mut txn, &cosign);
}
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
todo!("TODO PublishBatchTask")
}
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
todo!("TODO PublishSlashReportTask")
}
},
messages::ProcessorMessage::Substrate(msg) => match msg {
messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
let mut by_session = HashMap::new();
for plan in plans {
by_session
.entry(plan.session)
.or_insert_with(|| Vec::with_capacity(1))
.push(plan.transaction_plan_id);
}
for (session, plans) in by_session {
let set = ValidatorSet { network, session };
SubstrateBlockPlans::set(&mut txn, set, block, &plans);
TributaryTransactions::send(
&mut txn,
set,
&Transaction::SubstrateBlock { hash: block },
);
}
}
},
}
// Mark this as the last handled message
LastProcessorMessage::set(&mut txn, network, &msg_id);
// Commit the txn
txn.commit();
// Now that we won't handle this message again, acknowledge it so we won't see it again
message_queue.ack(Service::Processor(network), msg_id).await;
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Override the panic handler with one which will panic if any tokio task panics // Override the panic handler with one which will panic if any tokio task panics
@@ -148,6 +311,8 @@ async fn main() {
prune_tributary_db(to_cleanup); prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set // Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {} while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
// Drain the transactions to publish for this set
while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
// Remove the SignSlashReport notification // Remove the SignSlashReport notification
SignSlashReport::try_recv(&mut txn, to_cleanup); SignSlashReport::try_recv(&mut txn, to_cleanup);
} }
@@ -217,7 +382,6 @@ async fn main() {
); );
// Spawn the cosign handler // Spawn the cosign handler
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
spawn_cosigning( spawn_cosigning(
db.clone(), db.clone(),
serai.clone(), serai.clone(),
@@ -225,7 +389,6 @@ async fn main() {
// Run the Substrate scanners once we cosign new blocks // Run the Substrate scanners once we cosign new blocks
vec![substrate_canonical_task, substrate_ephemeral_task], vec![substrate_canonical_task, substrate_ephemeral_task],
p2p_cosigns_recv, p2p_cosigns_recv,
signed_cosigns_recv,
); );
// Spawn all Tributaries on-disk // Spawn all Tributaries on-disk
@@ -254,7 +417,14 @@ async fn main() {
.continually_run(substrate_task_def, vec![]), .continually_run(substrate_task_def, vec![]),
); );
// TODO: Handle processor messages // Handle all of the Processors' messages
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network));
}
todo!("TODO") // Run the spawned tasks ad-infinitum
core::future::pending().await
} }

View File

@@ -32,7 +32,8 @@ pub(crate) struct SubstrateTask<P: P2p> {
} }
impl<P: P2p> ContinuallyRan for SubstrateTask<P> { impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
@@ -69,8 +70,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
txn.commit(); txn.commit();
made_progress = true; made_progress = true;
} }
@@ -132,8 +132,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
// Commit the transaction for all of this // Commit the transaction for all of this
txn.commit(); txn.commit();

View File

@@ -8,43 +8,39 @@ use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use serai_db::{DbTxn, Db as DbTrait}; use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
use scale::Encode; use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet; use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionError, ProvidedError, Tributary}; use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_task::{Task, TaskHandle, DoesNotError, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue}; use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::Cosigning; use serai_cosign::{Faulted, CosignIntent, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport}; use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask}; use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
use serai_coordinator_p2p::P2p; use serai_coordinator_p2p::P2p;
use crate::Db; use crate::{Db, TributaryTransactions};
/// Provides Cosign/Cosigned Transactions onto the Tributary. db_channel! {
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> { Coordinator {
db: CD, PendingCosigns: (set: ValidatorSet) -> CosignIntent,
set: NewSetInformation, }
tributary: Tributary<TD, Transaction, P>,
} }
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P> /// Provide a Provided Transaction to the Tributary.
{ ///
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { /// This is not a well-designed function. This is specific to the context in which its called,
/// Provide a Provided Transaction to the Tributary. /// within this file. It should only be considered an internal helper for this domain alone.
/// async fn provide_transaction<TD: DbTrait, P: P2p>(
/// This is not a well-designed function. This is specific to the context in which its called,
/// within this file. It should only be considered an internal helper for this domain alone.
async fn provide_transaction<TD: DbTrait, P: P2p>(
set: ValidatorSet, set: ValidatorSet,
tributary: &Tributary<TD, Transaction, P>, tributary: &Tributary<TD, Transaction, P>,
tx: Transaction, tx: Transaction,
) { ) {
match tributary.provide_transaction(tx.clone()).await { match tributary.provide_transaction(tx.clone()).await {
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before // The Tributary uses its own DB, so we may provide this multiple times if we reboot before
// committing the txn which provoked this // committing the txn which provoked this
@@ -55,10 +51,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
Err(ProvidedError::InvalidProvided(e)) => { Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}") panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
} }
Err(ProvidedError::LocalMismatchesOnChain) => loop {
// The Tributary's scan task won't advance if we don't have the Provided transactions // The Tributary's scan task won't advance if we don't have the Provided transactions
// present on-chain, and this enters an infinite loop to block the calling task from // present on-chain, and this enters an infinite loop to block the calling task from
// advancing // advancing
Err(ProvidedError::LocalMismatchesOnChain) => loop {
log::error!( log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary", "Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set, set,
@@ -68,8 +64,21 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
tokio::time::sleep(Duration::from_secs(5 * 60)).await; tokio::time::sleep(Duration::from_secs(5 * 60)).await;
}, },
} }
} }
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
@@ -79,16 +88,27 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
let mut txn = self.db.txn(); let mut txn = self.db.txn();
// Fetch the next cosign this tributary should handle // Fetch the next cosign this tributary should handle
let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break }; let Some(cosign) = PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
pending_notable_cosign = cosign.notable; pending_notable_cosign = cosign.notable;
// If we (Serai) haven't cosigned this block, break as this is still pending // If we (Serai) haven't cosigned this block, break as this is still pending
let Ok(latest) = Cosigning::<CD>::latest_cosigned_block_number(&txn) else { break }; let latest = match Cosigning::<CD>::latest_cosigned_block_number(&txn) {
Ok(latest) => latest,
Err(Faulted) => {
log::error!("cosigning faulted");
Err("cosigning faulted")?
}
};
if latest < cosign.block_number { if latest < cosign.block_number {
break; break;
} }
// Because we've cosigned it, provide the TX for that // Because we've cosigned it, provide the TX for that
{
let mut txn = self.tributary_db.txn();
CosignIntents::provide(&mut txn, self.set.set, &cosign);
txn.commit();
}
provide_transaction( provide_transaction(
self.set.set, self.set.set,
&self.tributary, &self.tributary,
@@ -109,7 +129,7 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
// intended_cosigns will only yield up to and including the next notable cosign // intended_cosigns will only yield up to and including the next notable cosign
for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) { for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) {
// Flag this cosign as pending // Flag this cosign as pending
crate::PendingCosigns::send(&mut txn, self.set.set, &cosign); PendingCosigns::send(&mut txn, self.set.set, &cosign);
// Provide the transaction to queue it for work // Provide the transaction to queue it for work
provide_transaction( provide_transaction(
self.set.set, self.set.set,
@@ -127,6 +147,68 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
} }
} }
/// Adds all of the transactions sent via `TributaryTransactions`.
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
tributary: Tributary<TD, Transaction, P>,
set: ValidatorSet,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
let kind = tx.kind();
match kind {
TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
// If this is a signed transaction, sign it
if matches!(kind, TransactionKind::Signed(_, _)) {
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
}
// Actually add the transaction
// TODO: If this is a preprocess, make sure the topic has been recognized
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => {
drop(txn);
break;
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
}
}
made_progress = true;
txn.commit();
}
Ok(made_progress)
}
}
}
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue. /// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> { pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
tributary_db: TD, tributary_db: TD,
@@ -134,7 +216,9 @@ pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
message_queue: Arc<MessageQueue>, message_queue: Arc<MessageQueue>,
} }
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> { impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String; // TODO
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
loop { loop {
@@ -146,8 +230,7 @@ impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
intent: msg.intent(), intent: msg.intent(),
}; };
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible self.message_queue.queue(metadata, msg).await?;
self.message_queue.queue(metadata, msg).await;
txn.commit(); txn.commit();
made_progress = true; made_progress = true;
} }
@@ -165,7 +248,9 @@ pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
} }
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> { impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) }; let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
@@ -190,7 +275,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD
} }
// We've published too many transactions recently // We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration // Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => return Ok(false), Err(TransactionError::TooManyInMempool) => {
drop(txn);
return Ok(false);
}
// This isn't a Provided transaction so this should never be hit // This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(), Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
} }
@@ -294,6 +382,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn( tokio::spawn(
(ProvideCosignCosignedTransactionsTask { (ProvideCosignCosignedTransactionsTask {
db: db.clone(), db: db.clone(),
tributary_db: tributary_db.clone(),
set: set.clone(), set: set.clone(),
tributary: tributary.clone(), tributary: tributary.clone(),
}) })
@@ -314,7 +403,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the scan task // Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new(); let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn( tokio::spawn(
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader) ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is // This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too // dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
@@ -325,14 +414,27 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn( tokio::spawn(
(SignSlashReportTask { (SignSlashReportTask {
db: db.clone(), db: db.clone(),
tributary_db, tributary_db: tributary_db.clone(),
tributary: tributary.clone(), tributary: tributary.clone(),
set: set.clone(), set: set.clone(),
key: serai_key, key: serai_key.clone(),
}) })
.continually_run(sign_slash_report_task_def, vec![]), .continually_run(sign_slash_report_task_def, vec![]),
); );
// Spawn the add transactions task
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
tokio::spawn(
(AddTributaryTransactionsTask {
db: db.clone(),
tributary_db,
tributary: tributary.clone(),
set: set.set,
key: serai_key,
})
.continually_run(add_tributary_transactions_task_def, vec![]),
);
// Whenever a new block occurs, immediately run the scan task // Whenever a new block occurs, immediately run the scan task
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the // This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad // Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
@@ -342,6 +444,10 @@ pub(crate) async fn spawn_tributary<P: P2p>(
set.set, set.set,
tributary, tributary,
scan_tributary_task, scan_tributary_task,
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task], vec![
provide_cosign_cosigned_transactions_task,
sign_slash_report_task,
add_tributary_transactions_task,
],
)); ));
} }

View File

@@ -18,7 +18,9 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true workspace = true
[dependencies] [dependencies]
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } bitvec = { version = "1", default-features = false, features = ["std"] }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive", "bit-vec"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] } serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai", "borsh"] }

View File

@@ -1,6 +1,6 @@
# Serai Coordinate Substrate Scanner # Serai Coordinator Substrate
This is the scanner of the Serai blockchain for the purposes of Serai's coordinator. This crate manages the Serai coordinators's interactions with Serai's Substrate blockchain.
Two event streams are defined: Two event streams are defined:
@@ -12,3 +12,9 @@ Two event streams are defined:
The canonical event stream is available without provision of a validator's public key. The ephemeral The canonical event stream is available without provision of a validator's public key. The ephemeral
event stream requires provision of a validator's public key. Both are ordered within themselves, yet event stream requires provision of a validator's public key. Both are ordered within themselves, yet
there are no ordering guarantees across the two. there are no ordering guarantees across the two.
Additionally, a collection of tasks are defined to publish data onto Serai:
- `SetKeysTask`, which sets the keys generated via DKGs onto Serai.
- `PublishBatchTask`, which publishes `Batch`s onto Serai.
- `PublishSlashReportTask`, which publishes `SlashReport`s onto Serai.

View File

@@ -34,7 +34,9 @@ impl<D: Db> CanonicalEventStream<D> {
} }
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> { impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0); let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block = let latest_finalized_block =

View File

@@ -39,7 +39,9 @@ impl<D: Db> EphemeralEventStream<D> {
} }
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> { impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0); let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block = let latest_finalized_block =
@@ -157,8 +159,9 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
Err("validator's weight exceeded u16::MAX".to_string())? Err("validator's weight exceeded u16::MAX".to_string())?
}; };
// Do the summation in u32 so we don't risk a u16 overflow
let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>(); let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>();
if total_weight > MAX_KEY_SHARES_PER_SET { if total_weight > u32::from(MAX_KEY_SHARES_PER_SET) {
Err(format!( Err(format!(
"{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}" "{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}"
))?; ))?;

View File

@@ -6,8 +6,10 @@ use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize}; use borsh::{io, BorshSerialize, BorshDeserialize};
use serai_client::{ use serai_client::{
primitives::{PublicKey, NetworkId}, primitives::{NetworkId, PublicKey, Signature, SeraiAddress},
validator_sets::primitives::ValidatorSet, validator_sets::primitives::{Session, ValidatorSet, KeyPair},
in_instructions::primitives::SignedBatch,
Transaction,
}; };
use serai_db::*; use serai_db::*;
@@ -17,6 +19,13 @@ pub use canonical::CanonicalEventStream;
mod ephemeral; mod ephemeral;
pub use ephemeral::EphemeralEventStream; pub use ephemeral::EphemeralEventStream;
mod set_keys;
pub use set_keys::SetKeysTask;
mod publish_batch;
pub use publish_batch::PublishBatchTask;
mod publish_slash_report;
pub use publish_slash_report::PublishSlashReportTask;
fn borsh_serialize_validators<W: io::Write>( fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(PublicKey, u16)>, validators: &Vec<(PublicKey, u16)>,
writer: &mut W, writer: &mut W,
@@ -53,11 +62,7 @@ pub struct NewSetInformation {
} }
mod _public_db { mod _public_db {
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; use super::*;
use serai_db::*;
use crate::NewSetInformation;
db_channel!( db_channel!(
CoordinatorSubstrate { CoordinatorSubstrate {
@@ -68,6 +73,18 @@ mod _public_db {
NewSet: () -> NewSetInformation, NewSet: () -> NewSetInformation,
// Potentially relevant sign slash report, from an ephemeral event stream // Potentially relevant sign slash report, from an ephemeral event stream
SignSlashReport: (set: ValidatorSet) -> (), SignSlashReport: (set: ValidatorSet) -> (),
// Signed batches to publish onto the Serai network
SignedBatches: (network: NetworkId) -> SignedBatch,
}
);
create_db!(
CoordinatorSubstrate {
// Keys to set on the Serai network
Keys: (network: NetworkId) -> (Session, Vec<u8>),
// Slash reports to publish onto the Serai network
SlashReports: (network: NetworkId) -> (Session, Vec<u8>),
} }
); );
} }
@@ -118,3 +135,94 @@ impl SignSlashReport {
_public_db::SignSlashReport::try_recv(txn, set) _public_db::SignSlashReport::try_recv(txn, set)
} }
} }
/// The keys to set on Serai.
pub struct Keys;
impl Keys {
/// Set the keys to report for a validator set.
///
/// This only saves the most recent keys as only a single session is eligible to have its keys
/// reported at once.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
) {
// If we have a more recent pair of keys, don't write this historic one
if let Some((existing_session, _)) = _public_db::Keys::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return;
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::set_keys(
set.network,
key_pair,
signature_participants,
signature,
);
_public_db::Keys::set(txn, set.network, &(set.session, tx.encode()));
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::Keys::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}
/// The signed batches to publish onto Serai.
pub struct SignedBatches;
impl SignedBatches {
/// Send a `SignedBatch` to publish onto Serai.
///
/// These will be published sequentially. Out-of-order sending risks hanging the task.
pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) {
_public_db::SignedBatches::send(txn, batch.batch.network, batch);
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, network: NetworkId) -> Option<SignedBatch> {
_public_db::SignedBatches::try_recv(txn, network)
}
}
/// The slash report was invalid.
#[derive(Debug)]
pub struct InvalidSlashReport;
/// The slash reports to publish onto Serai.
pub struct SlashReports;
impl SlashReports {
/// Set the slashes to report for a validator set.
///
/// This only saves the most recent slashes as only a single session is eligible to have its
/// slashes reported at once.
///
/// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as
/// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected
/// as invalid here.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
slashes: Vec<(SeraiAddress, u32)>,
signature: Signature,
) -> Result<(), InvalidSlashReport> {
// If we have a more recent slash report, don't write this historic one
if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return Ok(());
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
set.network,
slashes.try_into().map_err(|_| InvalidSlashReport)?,
signature,
);
_public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode()));
Ok(())
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::SlashReports::take(txn, network)?;
Some((session, <_>::decode(&mut tx.as_slice()).unwrap()))
}
}

View File

@@ -0,0 +1,66 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, SeraiError, Serai};
use serai_task::ContinuallyRan;
use crate::SignedBatches;
/// Publish `SignedBatch`s from `SignedBatches` onto Serai.
pub struct PublishBatchTask<D: Db> {
db: D,
serai: Arc<Serai>,
network: NetworkId,
}
impl<D: Db> PublishBatchTask<D> {
/// Create a task to publish `SignedBatch`s onto Serai.
///
/// Returns None if `network == NetworkId::Serai`.
// TODO: ExternalNetworkId
pub fn new(db: D, serai: Arc<Serai>, network: NetworkId) -> Option<Self> {
if network == NetworkId::Serai {
None?
};
Some(Self { db, serai, network })
}
}
impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
type Error = SeraiError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(batch) = SignedBatches::try_recv(&mut txn, self.network) else {
// No batch to publish at this time
break;
};
// Publish this Batch if it hasn't already been published
let serai = self.serai.as_of_latest_finalized_block().await?;
let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?;
if last_batch < Some(batch.batch.id) {
// This stream of Batches *should* be sequential within the larger context of the Serai
// coordinator. In this library, we use a more relaxed definition and don't assert
// sequence. This does risk hanging the task, if Batch #n+1 is sent before Batch #n, but
// that is a documented fault of the `SignedBatches` API.
self
.serai
.publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch))
.await?;
}
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,89 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::ContinuallyRan;
use crate::SlashReports;
/// Publish slash reports from `SlashReports` onto Serai.
pub struct PublishSlashReportTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> PublishSlashReportTask<D> {
/// Create a task to publish slash reports onto Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired =
current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this slash report from the database and not try it again later
txn.commit();
continue;
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a slash report for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
continue;
};
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window available to publish the slash report. That makes
// this a non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,88 @@
use core::future::Future;
use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
use serai_task::ContinuallyRan;
use crate::Keys;
/// Set keys from `Keys` on Serai.
pub struct SetKeysTask<D: Db> {
db: D,
serai: Arc<Serai>,
}
impl<D: Db> SetKeysTask<D> {
/// Create a task to publish slash reports onto Serai.
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for SetKeysTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, keys)) = Keys::take(&mut txn, network) else {
// No keys to set
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to set these keys if this isn't a retired session
if Some(session.0) < current_session {
// Commit the txn to take these keys from the database and not try it again later
txn.commit();
continue;
}
if Some(session.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a keys for a session Serai has yet to start".to_string())?;
}
// If this session already has had its keys set, move on
if serai
.keys(ValidatorSet { network, session })
.await
.map_err(|e| format!("{e:?}"))?
.is_some()
{
txn.commit();
continue;
};
match self.serai.publish(&keys).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window reasonable to set the keys. That makes this a
// non-issue.
Err(e) => Err(format!("couldn't publish set keys transaction: {e:?}"))?,
}
}
Ok(made_progress)
}
}
}

View File

@@ -9,6 +9,8 @@ use messages::sign::{VariantSignId, SignId};
use serai_db::*; use serai_db::*;
use serai_cosign::CosignIntent;
use crate::transaction::SigningProtocolRound; use crate::transaction::SigningProtocolRound;
/// A topic within the database which the group participates in /// A topic within the database which the group participates in
@@ -187,6 +189,8 @@ create_db!(
// The slash points a validator has accrued, with u32::MAX representing a fatal slash. // The slash points a validator has accrued, with u32::MAX representing a fatal slash.
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32, SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32,
// The cosign intent for a Substrate block
CosignIntents: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> CosignIntent,
// The latest Substrate block to cosign. // The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32], LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
// The hash of the block we're actively cosigning. // The hash of the block we're actively cosigning.
@@ -194,6 +198,9 @@ create_db!(
// If this block has already been cosigned. // If this block has already been cosigned.
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (), Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,
// The weight accumulated for a topic. // The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64, AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
// The entries accumulated for a topic, by validator. // The entries accumulated for a topic, by validator.

View File

@@ -24,14 +24,13 @@ use tributary_sdk::{
Transaction as TributaryTransaction, Block, TributaryReader, P2p, Transaction as TributaryTransaction, Block, TributaryReader, P2p,
}; };
use serai_cosign::Cosigning; use serai_cosign::CosignIntent;
use serai_coordinator_substrate::NewSetInformation; use serai_coordinator_substrate::NewSetInformation;
use messages::sign::VariantSignId; use messages::sign::VariantSignId;
mod transaction; mod transaction;
pub(crate) use transaction::{SigningProtocolRound, Signed}; pub use transaction::{SigningProtocolRound, Signed, Transaction};
pub use transaction::Transaction;
mod db; mod db;
use db::*; use db::*;
@@ -45,17 +44,58 @@ impl ProcessorMessages {
} }
} }
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { /// The cosign intents.
pub struct CosignIntents;
impl CosignIntents {
/// Provide a CosignIntent for this Tributary.
///
/// This must be done before the associated `Transaction::Cosign` is provided.
pub fn provide(txn: &mut impl DbTxn, set: ValidatorSet, intent: &CosignIntent) {
db::CosignIntents::set(txn, set, intent.block_hash, intent);
}
fn take(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> Option<CosignIntent> {
db::CosignIntents::take(txn, set, substrate_block_hash)
}
}
/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
pub struct SubstrateBlockPlans;
impl SubstrateBlockPlans {
/// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included
/// on-chain.
///
/// This must be done before the associated `Transaction::Cosign` is provided.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
plans: &Vec<[u8; 32]>,
) {
db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans);
}
fn take(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> Option<Vec<[u8; 32]>> {
db::SubstrateBlockPlans::take(txn, set, substrate_block_hash)
}
}
struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
_td: PhantomData<TD>, _td: PhantomData<TD>,
_p2p: PhantomData<P>, _p2p: PhantomData<P>,
cosign_db: &'a CD,
tributary_txn: &'a mut TDT, tributary_txn: &'a mut TDT,
set: ValidatorSet, set: ValidatorSet,
validators: &'a [SeraiAddress], validators: &'a [SeraiAddress],
total_weight: u64, total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>, validator_weights: &'a HashMap<SeraiAddress, u64>,
} }
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
fn potentially_start_cosign(&mut self) { fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one // Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
@@ -74,20 +114,20 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
return; return;
} }
let Some(substrate_block_number) = let intent =
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign)
else { .expect("Transaction::Cosign locally provided but CosignIntents wasn't populated");
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all assert_eq!(
// Provided transactions within it, and the block to cosign is a Provided transaction intent.block_hash, latest_substrate_block_to_cosign,
panic!("cosigning a block our cosigner didn't index") "provided CosignIntent wasn't saved by its block hash"
}; );
// Mark us as actively cosigning // Mark us as actively cosigning
TributaryDb::start_cosigning( TributaryDb::start_cosigning(
self.tributary_txn, self.tributary_txn,
self.set, self.set,
latest_substrate_block_to_cosign, latest_substrate_block_to_cosign,
substrate_block_number, intent.block_number,
); );
// Send the message for the processor to start signing // Send the message for the processor to start signing
TributaryDb::send_message( TributaryDb::send_message(
@@ -95,8 +135,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
self.set, self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session, session: self.set.session,
block_number: substrate_block_number, intent,
block: latest_substrate_block_to_cosign,
}, },
); );
} }
@@ -167,7 +206,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
} }
Transaction::DkgConfirmationShare { attempt, share, signed } => { Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager // Accumulate the shares into our own FROST attempt manager
todo!("TODO") todo!("TODO: SetKeysTask")
} }
Transaction::Cosign { substrate_block_hash } => { Transaction::Cosign { substrate_block_hash } => {
@@ -206,11 +245,32 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
} }
Transaction::SubstrateBlock { hash } => { Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed // Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO") let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
"Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
);
for plan in plans {
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Transaction(plan),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
} }
Transaction::Batch { hash } => { Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess // Whitelist the signing of this batch
todo!("TODO") TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Batch(hash),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
} }
Transaction::SlashReport { slash_points, signed } => { Transaction::SlashReport { slash_points, signed } => {
@@ -292,8 +352,11 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
// Create the resulting slash report // Create the resulting slash report
let mut slash_report = vec![]; let mut slash_report = vec![];
for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) { for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
if points != 0 { // TODO: Natively store this as a `Slash`
slash_report.push(Slash { key: validator.into(), points }); if points == u32::MAX {
slash_report.push(Slash::Fatal);
} else {
slash_report.push(Slash::Points(points));
} }
} }
assert!(slash_report.len() <= f); assert!(slash_report.len() <= f);
@@ -411,8 +474,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
} }
/// The task to scan the Tributary, populating `ProcessorMessages`. /// The task to scan the Tributary, populating `ProcessorMessages`.
pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> { pub struct ScanTributaryTask<TD: Db, P: P2p> {
cosign_db: CD,
tributary_db: TD, tributary_db: TD,
set: ValidatorSet, set: ValidatorSet,
validators: Vec<SeraiAddress>, validators: Vec<SeraiAddress>,
@@ -422,10 +484,9 @@ pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
_p2p: PhantomData<P>, _p2p: PhantomData<P>,
} }
impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> { impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
/// Create a new instance of this task. /// Create a new instance of this task.
pub fn new( pub fn new(
cosign_db: CD,
tributary_db: TD, tributary_db: TD,
new_set: &NewSetInformation, new_set: &NewSetInformation,
tributary: TributaryReader<TD, Transaction>, tributary: TributaryReader<TD, Transaction>,
@@ -442,7 +503,6 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
} }
ScanTributaryTask { ScanTributaryTask {
cosign_db,
tributary_db, tributary_db,
set: new_set.set, set: new_set.set,
validators, validators,
@@ -454,8 +514,10 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
} }
} }
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> { impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let (mut last_block_number, mut last_block_hash) = let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
@@ -486,7 +548,6 @@ impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
(ScanBlock { (ScanBlock {
_td: PhantomData::<TD>, _td: PhantomData::<TD>,
_p2p: PhantomData::<P>, _p2p: PhantomData::<P>,
cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn, tributary_txn: &mut tributary_txn,
set: self.set, set: self.set,
validators: &self.validators, validators: &self.validators,

View File

@@ -301,14 +301,14 @@ impl TransactionTrait for Transaction {
Transaction::Batch { .. } => {} Transaction::Batch { .. } => {}
Transaction::Sign { data, .. } => { Transaction::Sign { data, .. } => {
if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { if data.len() > usize::from(MAX_KEY_SHARES_PER_SET) {
Err(TransactionError::InvalidContent)? Err(TransactionError::InvalidContent)?
} }
// TODO: MAX_SIGN_LEN // TODO: MAX_SIGN_LEN
} }
Transaction::SlashReport { slash_points, .. } => { Transaction::SlashReport { slash_points, .. } => {
if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { if slash_points.len() > usize::from(MAX_KEY_SHARES_PER_SET) {
Err(TransactionError::InvalidContent)? Err(TransactionError::InvalidContent)?
} }
} }

View File

@@ -64,22 +64,20 @@ impl MessageQueue {
Self::new(service, url, priv_key) Self::new(service, url, priv_key)
} }
#[must_use] async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> Result<(), String> {
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { match socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await {
log::warn!("couldn't send the message len"); Ok(()) => {}
return false; Err(e) => Err(format!("couldn't send the message len: {e:?}"))?,
}; };
let Ok(()) = socket.write_all(&msg).await else { match socket.write_all(&msg).await {
log::warn!("couldn't write the message"); Ok(()) => {}
return false; Err(e) => Err(format!("couldn't write the message: {e:?}"))?,
}; }
true Ok(())
} }
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) { pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) -> Result<(), String> {
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)); let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref(); let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign( let sig = SchnorrSignature::<Ristretto>::sign(
@@ -97,6 +95,21 @@ impl MessageQueue {
.serialize(); .serialize();
let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig }; let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig };
let mut socket = match TcpStream::connect(&self.url).await {
Ok(socket) => socket,
Err(e) => Err(format!("failed to connect to the message-queue service: {e:?}"))?,
};
Self::send(&mut socket, msg.clone()).await?;
match socket.read_u8().await {
Ok(1) => {}
Ok(b) => Err(format!("message-queue didn't return for 1 for its ack, recieved: {b}"))?,
Err(e) => Err(format!("failed to read the response from the message-queue service: {e:?}"))?,
}
Ok(())
}
pub async fn queue_with_retry(&self, metadata: Metadata, msg: Vec<u8>) {
let mut first = true; let mut first = true;
loop { loop {
// Sleep, so we don't hammer re-attempts // Sleep, so we don't hammer re-attempts
@@ -105,16 +118,11 @@ impl MessageQueue {
} }
first = false; first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; if self.queue(metadata.clone(), msg.clone()).await.is_ok() {
if !Self::send(&mut socket, msg.clone()).await {
continue;
}
if socket.read_u8().await.ok() != Some(1) {
continue;
}
break; break;
} }
} }
}
pub async fn next(&self, from: Service) -> QueuedMessage { pub async fn next(&self, from: Service) -> QueuedMessage {
let msg = MessageQueueRequest::Next { from, to: self.service }; let msg = MessageQueueRequest::Next { from, to: self.service };
@@ -136,7 +144,7 @@ impl MessageQueue {
log::trace!("opened socket for next"); log::trace!("opened socket for next");
loop { loop {
if !Self::send(&mut socket, msg.clone()).await { if Self::send(&mut socket, msg.clone()).await.is_err() {
continue 'outer; continue 'outer;
} }
let status = match socket.read_u8().await { let status = match socket.read_u8().await {
@@ -224,7 +232,7 @@ impl MessageQueue {
first = false; first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
if !Self::send(&mut socket, msg.clone()).await { if Self::send(&mut socket, msg.clone()).await.is_err() {
continue; continue;
} }
if socket.read_u8().await.ok() != Some(1) { if socket.read_u8().await.ok() != Some(1) {

View File

@@ -95,6 +95,7 @@ impl Coordinator {
message_queue.ack(Service::Coordinator, msg.id).await; message_queue.ack(Service::Coordinator, msg.id).await;
// Fire that there's a new message // Fire that there's a new message
// This assumes the success path, not the just-rebooted-path
received_message_send received_message_send
.send(()) .send(())
.expect("failed to tell the Coordinator there's a new message"); .expect("failed to tell the Coordinator there's a new message");
@@ -103,6 +104,7 @@ impl Coordinator {
}); });
// Spawn a task to send messages to the message-queue // Spawn a task to send messages to the message-queue
// TODO: Define a proper task for this and remove use of queue_with_retry
tokio::spawn({ tokio::spawn({
let mut db = db.clone(); let mut db = db.clone();
async move { async move {
@@ -115,12 +117,12 @@ impl Coordinator {
to: Service::Coordinator, to: Service::Coordinator,
intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(), intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(),
}; };
message_queue.queue(metadata, msg).await; message_queue.queue_with_retry(metadata, msg).await;
txn.commit(); txn.commit();
} }
None => { None => {
let _ = let _ =
tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv()) tokio::time::timeout(core::time::Duration::from_secs(6), sent_message_recv.recv())
.await; .await;
} }
} }

View File

@@ -39,7 +39,9 @@ pub(crate) fn script_pubkey_for_on_chain_output(
pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>); pub(crate) struct TxIndexTask<D: Db>(pub(crate) Rpc<D>);
impl<D: Db> ContinuallyRan for TxIndexTask<D> { impl<D: Db> ContinuallyRan for TxIndexTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let latest_block_number = self let latest_block_number = self
.0 .0

View File

@@ -29,3 +29,5 @@ serai-primitives = { path = "../../substrate/primitives", default-features = fal
in-instructions-primitives = { package = "serai-in-instructions-primitives", path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] } in-instructions-primitives = { package = "serai-in-instructions-primitives", path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
coins-primitives = { package = "serai-coins-primitives", path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] } coins-primitives = { package = "serai-coins-primitives", path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }
validator-sets-primitives = { package = "serai-validator-sets-primitives", path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] } validator-sets-primitives = { package = "serai-validator-sets-primitives", path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] }
serai-cosign = { path = "../../coordinator/cosign", default-features = false }

View File

@@ -11,6 +11,8 @@ use validator_sets_primitives::{Session, KeyPair, Slash};
use coins_primitives::OutInstructionWithBalance; use coins_primitives::OutInstructionWithBalance;
use in_instructions_primitives::SignedBatch; use in_instructions_primitives::SignedBatch;
use serai_cosign::{CosignIntent, SignedCosign};
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct SubstrateContext { pub struct SubstrateContext {
pub serai_time: u64, pub serai_time: u64,
@@ -50,7 +52,8 @@ pub mod key_gen {
} }
} }
#[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] // This set of messages is sent entirely and solely by serai-processor-key-gen.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
// Participated in the specified key generation protocol. // Participated in the specified key generation protocol.
Participation { session: Session, participation: Vec<u8> }, Participation { session: Session, participation: Vec<u8> },
@@ -141,7 +144,8 @@ pub mod sign {
} }
} }
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] // This set of messages is sent entirely and solely by serai-processor-frost-attempt-manager.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
// Participant sent an invalid message during the sign protocol. // Participant sent an invalid message during the sign protocol.
InvalidParticipant { session: Session, participant: Participant }, InvalidParticipant { session: Session, participant: Participant },
@@ -155,39 +159,25 @@ pub mod sign {
pub mod coordinator { pub mod coordinator {
use super::*; use super::*;
// TODO: Remove this for the one defined in serai-cosign
pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> {
const DST: &[u8] = b"Cosign";
let mut res = vec![u8::try_from(DST.len()).unwrap()];
res.extend(DST);
res.extend(block_number.to_le_bytes());
res.extend(block);
res
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage { pub enum CoordinatorMessage {
/// Cosign the specified Substrate block. /// Cosign the specified Substrate block.
/// ///
/// This is sent by the Coordinator's Tributary scanner. /// This is sent by the Coordinator's Tributary scanner.
CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] }, CosignSubstrateBlock { session: Session, intent: CosignIntent },
/// Sign the slash report for this session. /// Sign the slash report for this session.
/// ///
/// This is sent by the Coordinator's Tributary scanner. /// This is sent by the Coordinator's Tributary scanner.
SignSlashReport { session: Session, report: Vec<Slash> }, SignSlashReport { session: Session, report: Vec<Slash> },
} }
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] // This set of messages is sent entirely and solely by serai-processor-bin's implementation of
pub struct PlanMeta { // the signers::Coordinator trait.
pub session: Session, // TODO: Move message creation into serai-processor-signers
pub id: [u8; 32], #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> }, CosignedBlock { cosign: SignedCosign },
SignedBatch { batch: SignedBatch }, SignedBatch { batch: SignedBatch },
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
SignedSlashReport { session: Session, signature: Vec<u8> }, SignedSlashReport { session: Session, signature: Vec<u8> },
} }
} }
@@ -231,17 +221,16 @@ pub mod substrate {
}, },
} }
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {} pub struct PlanMeta {
impl BorshSerialize for ProcessorMessage { pub session: Session,
fn serialize<W: borsh::io::Write>(&self, _writer: &mut W) -> borsh::io::Result<()> { pub transaction_plan_id: [u8; 32],
unimplemented!()
}
}
impl BorshDeserialize for ProcessorMessage {
fn deserialize_reader<R: borsh::io::Read>(_reader: &mut R) -> borsh::io::Result<Self> {
unimplemented!()
} }
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
// TODO: Have the processor send this
SubstrateBlockAck { block: [u8; 32], plans: Vec<PlanMeta> },
} }
} }
@@ -268,7 +257,7 @@ impl_from!(sign, CoordinatorMessage, Sign);
impl_from!(coordinator, CoordinatorMessage, Coordinator); impl_from!(coordinator, CoordinatorMessage, Coordinator);
impl_from!(substrate, CoordinatorMessage, Substrate); impl_from!(substrate, CoordinatorMessage, Substrate);
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage { pub enum ProcessorMessage {
KeyGen(key_gen::ProcessorMessage), KeyGen(key_gen::ProcessorMessage),
Sign(sign::ProcessorMessage), Sign(sign::ProcessorMessage),
@@ -331,8 +320,8 @@ impl CoordinatorMessage {
CoordinatorMessage::Coordinator(msg) => { CoordinatorMessage::Coordinator(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
// We only cosign a block once, and Reattempt is a separate message // We only cosign a block once, and Reattempt is a separate message
coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => { coordinator::CoordinatorMessage::CosignSubstrateBlock { intent, .. } => {
(0, block_number.encode()) (0, intent.block_number.encode())
} }
// We only sign one slash report, and Reattempt is a separate message // We only sign one slash report, and Reattempt is a separate message
coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()), coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()),
@@ -404,17 +393,26 @@ impl ProcessorMessage {
} }
ProcessorMessage::Coordinator(msg) => { ProcessorMessage::Coordinator(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()), coordinator::ProcessorMessage::CosignedBlock { cosign } => {
(0, cosign.cosign.block_hash.encode())
}
coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()), coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()),
coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()), coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (2, session.encode()),
coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()),
}; };
let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub]; let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub];
res.extend(&id); res.extend(&id);
res res
} }
ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"), ProcessorMessage::Substrate(msg) => {
let (sub, id) = match msg {
substrate::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()),
};
let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub];
res.extend(&id);
res
}
} }
} }
} }

View File

@@ -7,7 +7,10 @@ use serai_db::{DbTxn, Db};
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::{EncodableG, task::ContinuallyRan}; use primitives::{
EncodableG,
task::{DoesNotError, ContinuallyRan},
};
use crate::{ use crate::{
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb}, db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToBatchDb, BatchData, BatchToReportDb},
index, index,
@@ -60,7 +63,9 @@ impl<D: Db, S: ScannerFeed> BatchTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for BatchTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let highest_batchable = { let highest_batchable = {
// Fetch the next to scan block // Fetch the next to scan block

View File

@@ -190,7 +190,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
} }
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> { impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
// Fetch the highest acknowledged block // Fetch the highest acknowledged block
let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db) let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)

View File

@@ -58,7 +58,9 @@ impl<D: Db, S: ScannerFeed> IndexTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
// Fetch the latest finalized block // Fetch the latest finalized block
let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) let our_latest_finalized = IndexDb::latest_finalized_block(&self.db)

View File

@@ -4,7 +4,7 @@ use serai_db::{DbTxn, Db};
use serai_validator_sets_primitives::Session; use serai_validator_sets_primitives::Session;
use primitives::task::ContinuallyRan; use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{ use crate::{
db::{BatchData, BatchToReportDb, BatchesToSign}, db::{BatchData, BatchToReportDb, BatchesToSign},
substrate, ScannerFeed, substrate, ScannerFeed,
@@ -27,7 +27,9 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
loop { loop {

View File

@@ -98,7 +98,9 @@ impl<D: Db, S: ScannerFeed> ScanTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
// Fetch the safe to scan block // Fetch the safe to scan block
let latest_scannable = let latest_scannable =

View File

@@ -5,7 +5,7 @@ use serai_db::{Get, DbTxn, Db};
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use messages::substrate::ExecutedBatch; use messages::substrate::ExecutedBatch;
use primitives::task::ContinuallyRan; use primitives::task::{DoesNotError, ContinuallyRan};
use crate::{ use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches}, db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches},
index, batch, ScannerFeed, KeyFor, index, batch, ScannerFeed, KeyFor,
@@ -50,7 +50,9 @@ impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut made_progress = false; let mut made_progress = false;
loop { loop {

View File

@@ -14,7 +14,7 @@ use serai_db::{Get, DbTxn, Db};
use messages::sign::VariantSignId; use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan; use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::{BatchesToSign, AcknowledgedBatches}; use scanner::{BatchesToSign, AcknowledgedBatches};
use frost_attempt_manager::*; use frost_attempt_manager::*;
@@ -79,7 +79,9 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
} }
impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> { impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut iterated = false; let mut iterated = false;

View File

@@ -22,7 +22,9 @@ impl<D: Db, C: Coordinator> CoordinatorTask<D, C> {
} }
impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> { impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut iterated = false; let mut iterated = false;

View File

@@ -11,7 +11,7 @@ use serai_db::{DbTxn, Db};
use messages::{sign::VariantSignId, coordinator::cosign_block_msg}; use messages::{sign::VariantSignId, coordinator::cosign_block_msg};
use primitives::task::ContinuallyRan; use primitives::task::{DoesNotError, ContinuallyRan};
use frost_attempt_manager::*; use frost_attempt_manager::*;
@@ -51,7 +51,9 @@ impl<D: Db> CosignerTask<D> {
} }
impl<D: Db> ContinuallyRan for CosignerTask<D> { impl<D: Db> ContinuallyRan for CosignerTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, DoesNotError>> {
async move { async move {
let mut iterated = false; let mut iterated = false;

View File

@@ -13,7 +13,7 @@ use serai_db::{DbTxn, Db};
use messages::sign::VariantSignId; use messages::sign::VariantSignId;
use primitives::task::ContinuallyRan; use primitives::task::{DoesNotError, ContinuallyRan};
use scanner::ScannerFeed; use scanner::ScannerFeed;
use frost_attempt_manager::*; use frost_attempt_manager::*;
@@ -52,7 +52,9 @@ impl<D: Db, S: ScannerFeed> SlashReportSignerTask<D, S> {
} }
impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for SlashReportSignerTask<D, S> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = DoesNotError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move { async move {
let mut iterated = false; let mut iterated = false;

View File

@@ -92,7 +92,9 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
for TransactionSignerTask<D, ST, P> for TransactionSignerTask<D, ST, P>
{ {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { type Error = P::EphemeralError;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async { async {
let mut iterated = false; let mut iterated = false;
@@ -222,11 +224,7 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap(); let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty()); assert!(tx_buf.is_empty());
self self.publisher.publish(tx).await?;
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
} }
self.last_publication = Instant::now(); self.last_publication = Instant::now();

View File

@@ -21,7 +21,7 @@ pub enum Call {
}, },
report_slashes { report_slashes {
network: NetworkId, network: NetworkId,
slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>, slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
signature: Signature, signature: Signature,
}, },
allocate { allocate {

View File

@@ -238,9 +238,11 @@ impl<'a> SeraiValidatorSets<'a> {
pub fn report_slashes( pub fn report_slashes(
network: NetworkId, network: NetworkId,
// TODO: This bounds a maximum length but takes more space than just publishing all the u32s
// (50 * (32 + 4)) > (150 * 4)
slashes: sp_runtime::BoundedVec< slashes: sp_runtime::BoundedVec<
(SeraiAddress, u32), (SeraiAddress, u32),
sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET / 3 }>, sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET_U32 / 3 }>,
>, >,
signature: Signature, signature: Signature,
) -> Transaction { ) -> Transaction {

View File

@@ -23,7 +23,7 @@ pub mod pallet {
use economic_security_pallet::{Config as EconomicSecurityConfig, Pallet as EconomicSecurity}; use economic_security_pallet::{Config as EconomicSecurityConfig, Pallet as EconomicSecurity};
use serai_primitives::*; use serai_primitives::*;
use validator_sets_primitives::{MAX_KEY_SHARES_PER_SET, Session}; use validator_sets_primitives::{MAX_KEY_SHARES_PER_SET_U32, Session};
pub use emissions_primitives as primitives; pub use emissions_primitives as primitives;
use primitives::*; use primitives::*;
@@ -74,7 +74,7 @@ pub mod pallet {
_, _,
Identity, Identity,
NetworkId, NetworkId,
BoundedVec<(PublicKey, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET }>>, BoundedVec<(PublicKey, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>,
OptionQuery, OptionQuery,
>; >;

View File

@@ -3,6 +3,7 @@ use crate::BlockNumber;
// 1 MB // 1 MB
pub const BLOCK_SIZE: u32 = 1024 * 1024; pub const BLOCK_SIZE: u32 = 1024 * 1024;
// 6 seconds // 6 seconds
// TODO: Use Duration
pub const TARGET_BLOCK_TIME: u64 = 6; pub const TARGET_BLOCK_TIME: u64 = 6;
/// Measured in blocks. /// Measured in blocks.

View File

@@ -282,7 +282,7 @@ impl pallet_authorship::Config for Runtime {
} }
// Maximum number of authorities per session. // Maximum number of authorities per session.
pub type MaxAuthorities = ConstU32<{ validator_sets::primitives::MAX_KEY_SHARES_PER_SET }>; pub type MaxAuthorities = ConstU32<{ validator_sets::primitives::MAX_KEY_SHARES_PER_SET_U32 }>;
/// Longevity of an offence report. /// Longevity of an offence report.
pub type ReportLongevity = <Runtime as pallet_babe::Config>::EpochDuration; pub type ReportLongevity = <Runtime as pallet_babe::Config>::EpochDuration;

View File

@@ -141,7 +141,7 @@ pub mod pallet {
_, _,
Identity, Identity,
NetworkId, NetworkId,
BoundedVec<(Public, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET }>>, BoundedVec<(Public, u64), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>,
OptionQuery, OptionQuery,
>; >;
/// The validators selected to be in-set, regardless of if removed. /// The validators selected to be in-set, regardless of if removed.
@@ -402,7 +402,7 @@ pub mod pallet {
// Clear the current InSet // Clear the current InSet
assert_eq!( assert_eq!(
InSet::<T>::clear_prefix(network, MAX_KEY_SHARES_PER_SET, None).maybe_cursor, InSet::<T>::clear_prefix(network, MAX_KEY_SHARES_PER_SET_U32, None).maybe_cursor,
None None
); );
@@ -412,11 +412,11 @@ pub mod pallet {
{ {
let mut iter = SortedAllocationsIter::<T>::new(network); let mut iter = SortedAllocationsIter::<T>::new(network);
let mut key_shares = 0; let mut key_shares = 0;
while key_shares < u64::from(MAX_KEY_SHARES_PER_SET) { while key_shares < u64::from(MAX_KEY_SHARES_PER_SET_U32) {
let Some((key, amount)) = iter.next() else { break }; let Some((key, amount)) = iter.next() else { break };
let these_key_shares = let these_key_shares =
(amount.0 / allocation_per_key_share).min(u64::from(MAX_KEY_SHARES_PER_SET)); (amount.0 / allocation_per_key_share).min(u64::from(MAX_KEY_SHARES_PER_SET_U32));
participants.push((key, these_key_shares)); participants.push((key, these_key_shares));
key_shares += these_key_shares; key_shares += these_key_shares;
@@ -535,7 +535,7 @@ pub mod pallet {
top = Some(key_shares); top = Some(key_shares);
} }
if key_shares > u64::from(MAX_KEY_SHARES_PER_SET) { if key_shares > u64::from(MAX_KEY_SHARES_PER_SET_U32) {
break; break;
} }
} }
@@ -547,7 +547,7 @@ pub mod pallet {
// post_amortization_key_shares_for_top_validator yields what the top validator's key shares // post_amortization_key_shares_for_top_validator yields what the top validator's key shares
// would be after such a reduction, letting us evaluate this correctly // would be after such a reduction, letting us evaluate this correctly
let top = post_amortization_key_shares_for_top_validator(validators_len, top, key_shares); let top = post_amortization_key_shares_for_top_validator(validators_len, top, key_shares);
(top * 3) < key_shares.min(MAX_KEY_SHARES_PER_SET.into()) (top * 3) < key_shares.min(MAX_KEY_SHARES_PER_SET_U32.into())
} }
fn increase_allocation( fn increase_allocation(
@@ -586,7 +586,7 @@ pub mod pallet {
// The above is_bft calls are only used to check a BFT net doesn't become non-BFT // The above is_bft calls are only used to check a BFT net doesn't become non-BFT
// Check here if this call would prevent a non-BFT net from *ever* becoming BFT // Check here if this call would prevent a non-BFT net from *ever* becoming BFT
if (new_allocation / allocation_per_key_share) >= (MAX_KEY_SHARES_PER_SET / 3).into() { if (new_allocation / allocation_per_key_share) >= (MAX_KEY_SHARES_PER_SET_U32 / 3).into() {
Err(Error::<T>::AllocationWouldPreventFaultTolerance)?; Err(Error::<T>::AllocationWouldPreventFaultTolerance)?;
} }
@@ -1010,7 +1010,7 @@ pub mod pallet {
pub fn report_slashes( pub fn report_slashes(
origin: OriginFor<T>, origin: OriginFor<T>,
network: NetworkId, network: NetworkId,
slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>, slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
signature: Signature, signature: Signature,
) -> DispatchResult { ) -> DispatchResult {
ensure_none(origin)?; ensure_none(origin)?;
@@ -1209,7 +1209,7 @@ pub mod pallet {
ValidTransaction::with_tag_prefix("ValidatorSets") ValidTransaction::with_tag_prefix("ValidatorSets")
.and_provides((1, set)) .and_provides((1, set))
.longevity(MAX_KEY_SHARES_PER_SET.into()) .longevity(MAX_KEY_SHARES_PER_SET_U32.into())
.propagate(true) .propagate(true)
.build() .build()
} }

View File

@@ -1,5 +1,7 @@
#![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(not(feature = "std"), no_std)]
use core::time::Duration;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use zeroize::Zeroize; use zeroize::Zeroize;
@@ -13,20 +15,30 @@ use borsh::{BorshSerialize, BorshDeserialize};
#[cfg(feature = "serde")] #[cfg(feature = "serde")]
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use sp_core::{ConstU32, sr25519::Public, bounded::BoundedVec}; use sp_core::{ConstU32, bounded::BoundedVec, sr25519::Public};
#[cfg(not(feature = "std"))] #[cfg(not(feature = "std"))]
use sp_std::vec::Vec; use sp_std::vec::Vec;
use serai_primitives::NetworkId; use serai_primitives::NetworkId;
/// The maximum amount of key shares per set. mod slash_points;
pub const MAX_KEY_SHARES_PER_SET: u32 = 150; pub use slash_points::*;
/// The expected duration for a session.
// 1 week
pub const SESSION_LENGTH: Duration = Duration::from_secs(7 * 24 * 60 * 60);
/// The maximum length for a key.
// Support keys up to 96 bytes (BLS12-381 G2). // Support keys up to 96 bytes (BLS12-381 G2).
pub const MAX_KEY_LEN: u32 = 96; pub const MAX_KEY_LEN: u32 = 96;
/// The maximum amount of key shares per set.
pub const MAX_KEY_SHARES_PER_SET: u16 = 150;
pub const MAX_KEY_SHARES_PER_SET_U32: u32 = MAX_KEY_SHARES_PER_SET as u32;
/// The type used to identify a specific session of validators. /// The type used to identify a specific session of validators.
#[derive( #[derive(
Clone, Copy, PartialEq, Eq, Hash, Default, Debug, Encode, Decode, TypeInfo, MaxEncodedLen, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen,
)] )]
#[cfg_attr(feature = "std", derive(Zeroize))] #[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
@@ -34,7 +46,9 @@ pub const MAX_KEY_LEN: u32 = 96;
pub struct Session(pub u32); pub struct Session(pub u32);
/// The type used to identify a specific validator set during a specific session. /// The type used to identify a specific validator set during a specific session.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)] #[derive(
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Encode, Decode, TypeInfo, MaxEncodedLen,
)]
#[cfg_attr(feature = "std", derive(Zeroize))] #[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -43,13 +57,13 @@ pub struct ValidatorSet {
pub network: NetworkId, pub network: NetworkId,
} }
type MaxKeyLen = ConstU32<MAX_KEY_LEN>;
/// The type representing a Key from an external network. /// The type representing a Key from an external network.
pub type ExternalKey = BoundedVec<u8, MaxKeyLen>; pub type ExternalKey = BoundedVec<u8, ConstU32<MAX_KEY_LEN>>;
/// The key pair for a validator set. /// The key pair for a validator set.
/// ///
/// This is their Ristretto key, used for signing Batches, and their key on the external network. /// This is their Ristretto key, used for publishing data onto Serai, and their key on the external
/// network.
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)] #[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -81,12 +95,12 @@ impl Zeroize for KeyPair {
/// The MuSig context for a validator set. /// The MuSig context for a validator set.
pub fn musig_context(set: ValidatorSet) -> Vec<u8> { pub fn musig_context(set: ValidatorSet) -> Vec<u8> {
[b"ValidatorSets-musig_key".as_ref(), &set.encode()].concat() (b"ValidatorSets-musig_key".as_ref(), set).encode()
} }
/// The MuSig public key for a validator set. /// The MuSig public key for a validator set.
/// ///
/// This function panics on invalid input. /// This function panics on invalid input, per the definition of `dkg::musig::musig_key`.
pub fn musig_key(set: ValidatorSet, set_keys: &[Public]) -> Public { pub fn musig_key(set: ValidatorSet, set_keys: &[Public]) -> Public {
let mut keys = Vec::new(); let mut keys = Vec::new();
for key in set_keys { for key in set_keys {
@@ -98,33 +112,11 @@ pub fn musig_key(set: ValidatorSet, set_keys: &[Public]) -> Public {
Public(dkg::musig::musig_key::<Ristretto>(&musig_context(set), &keys).unwrap().to_bytes()) Public(dkg::musig::musig_key::<Ristretto>(&musig_context(set), &keys).unwrap().to_bytes())
} }
/// The message for the set_keys signature. /// The message for the `set_keys` signature.
pub fn set_keys_message(set: &ValidatorSet, key_pair: &KeyPair) -> Vec<u8> { pub fn set_keys_message(set: &ValidatorSet, key_pair: &KeyPair) -> Vec<u8> {
(b"ValidatorSets-set_keys", set, key_pair).encode() (b"ValidatorSets-set_keys", set, key_pair).encode()
} }
/// A slash, in points, accrued by a validator.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Slash {
  /// The public key of the validator being slashed.
  #[cfg_attr(
    feature = "borsh",
    borsh(
      serialize_with = "serai_primitives::borsh_serialize_public",
      deserialize_with = "serai_primitives::borsh_deserialize_public"
    )
  )]
  pub key: Public,
  /// The amount of slash points accrued by this validator.
  pub points: u32,
}
/// A report of the slashes accrued by a validator set's validators.
///
/// Bounded to a third of `MAX_KEY_SHARES_PER_SET` entries.
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET / 3 }>>);
/// The message for the `report_slashes` signature.
pub fn report_slashes_message(set: &ValidatorSet, slashes: &SlashReport) -> Vec<u8> {
  (b"ValidatorSets-report_slashes", set, slashes).encode()
}
/// For a set of validators whose key shares may exceed the maximum, reduce until they equal the /// For a set of validators whose key shares may exceed the maximum, reduce until they equal the
/// maximum. /// maximum.
/// ///

View File

@@ -0,0 +1,299 @@
use core::{num::NonZero, time::Duration};
#[cfg(feature = "std")]
use zeroize::Zeroize;
use scale::{Encode, Decode, MaxEncodedLen};
use scale_info::TypeInfo;
#[cfg(feature = "borsh")]
use borsh::{BorshSerialize, BorshDeserialize};
#[cfg(feature = "serde")]
use serde::{Serialize, Deserialize};
use sp_core::{ConstU32, bounded::BoundedVec};
#[cfg(not(feature = "std"))]
use sp_std::vec::Vec;
use serai_primitives::{TARGET_BLOCK_TIME, Amount};
use crate::{SESSION_LENGTH, MAX_KEY_SHARES_PER_SET_U32};
/// Each slash point is equivalent to the downtime implied by missing a block proposal.
///
/// One slash point corresponds to `validators` target block times.
// Takes a NonZero<u16> so that the result is never 0.
fn downtime_per_slash_point(validators: NonZero<u16>) -> Duration {
  let validator_count = u32::from(u16::from(validators));
  Duration::from_secs(TARGET_BLOCK_TIME) * validator_count
}
/// A slash for a validator.
///
/// Converted to an economic penalty via [`Slash::penalty`].
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode, MaxEncodedLen, TypeInfo)]
#[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Slash {
  /// The slash points accumulated by this validator.
  ///
  /// Each point is considered as `downtime_per_slash_point(validators)` downtime, where
  /// `validators` is the amount of validators present in the set.
  Points(u32),
  /// A fatal slash due to fundamentally faulty behavior.
  ///
  /// This should only be used for misbehavior with explicit evidence of impropriety. This should
  /// not be used for liveness failures. The validator will be penalized all allocated stake.
  Fatal,
}
impl Slash {
  /// Calculate the penalty which should be applied to the validator.
  ///
  /// `validators` is the amount of validators present in the set, `allocated_stake` the stake
  /// this validator allocated, and `session_rewards` the rewards they'd have earned this session.
  ///
  /// Does not panic, even due to overflows, if `allocated_stake + session_rewards <= u64::MAX`.
  pub fn penalty(
    self,
    validators: NonZero<u16>,
    allocated_stake: Amount,
    session_rewards: Amount,
  ) -> Amount {
    match self {
      Self::Points(slash_points) => {
        let mut slash_points = u64::from(slash_points);
        // Do the logic with the stake in u128 to prevent overflow from multiplying u64s
        let allocated_stake = u128::from(allocated_stake.0);
        let session_rewards = u128::from(session_rewards.0);

        // A Serai validator is allowed to be offline for an average of one day every two weeks
        // with no additional penalty. They'll solely not earn rewards for the time they were
        // offline.
        const GRACE_WINDOW: Duration = Duration::from_secs(2 * 7 * 24 * 60 * 60);
        const GRACE: Duration = Duration::from_secs(24 * 60 * 60);
        // GRACE / GRACE_WINDOW is the fraction of the time a validator is allowed to be offline
        // This means we want SESSION_LENGTH * (GRACE / GRACE_WINDOW), but with the parentheses
        // moved so we don't incur the floordiv and hit 0
        const PENALTY_FREE_DOWNTIME: Duration = Duration::from_secs(
          (SESSION_LENGTH.as_secs() * GRACE.as_secs()) / GRACE_WINDOW.as_secs(),
        );

        let downtime_per_slash_point = downtime_per_slash_point(validators);
        let penalty_free_slash_points =
          PENALTY_FREE_DOWNTIME.as_secs() / downtime_per_slash_point.as_secs();

        /*
          In practice, the following means:

          - Hours 0-12 are penalized as if they're hours 0-12.
          - Hours 12-24 are penalized as if they're hours 12-36.
          - Hours 24-36 are penalized as if they're hours 36-96.
          - Hours 36-48 are penalized as if they're hours 96-168.

          /* Commented, see below explanation of why.
          - Hours 48-168 are penalized for 0-2% of stake.
          - 168-336 hours of slashes, for a session only lasting 168 hours, is penalized for 2-10%
            of stake.

          This means a validator offline has to be offline for more than two days to start having
          their stake slashed.
          */

          This means a validator offline for two days will not earn any rewards for that session.
        */
        const MULTIPLIERS: [u64; 4] = [1, 2, 5, 6];
        let reward_slash = {
          // In intervals of the penalty-free slash points, weight the slash points accrued
          // The multiplier for the first interval is 1 as it's penalty-free
          let mut weighted_slash_points_for_reward_slash = 0;
          let mut total_possible_slash_points_for_rewards_slash = 0;
          for mult in MULTIPLIERS {
            let slash_points_in_interval = slash_points.min(penalty_free_slash_points);
            weighted_slash_points_for_reward_slash += slash_points_in_interval * mult;
            total_possible_slash_points_for_rewards_slash += penalty_free_slash_points * mult;
            slash_points -= slash_points_in_interval;
          }
          /*
            The divisor is zero iff there are no penalty-free slash points at all. In that case,
            every interval above was empty, leaving `slash_points` undecremented, so it still
            holds the full amount of slash points accrued. If the validator was slashed at all,
            slash the entire reward.

            Note this must check `slash_points`, not `weighted_slash_points_for_reward_slash`,
            as the latter is always zero whenever the divisor is zero (each interval was capped
            to zero points), which would make the `session_rewards` arm unreachable.
          */
          (u128::from(weighted_slash_points_for_reward_slash) * session_rewards)
            .checked_div(u128::from(total_possible_slash_points_for_rewards_slash))
            .unwrap_or(if slash_points == 0 { 0 } else { session_rewards })
        };
        // Ensure the slash never exceeds the amount slashable (due to rounding errors)
        let reward_slash = reward_slash.min(session_rewards);

        /*
        let slash_points_for_entire_session =
          SESSION_LENGTH.as_secs() / downtime_per_slash_point.as_secs();
        let offline_slash = {
          // The amount of stake to slash for being offline
          const MAX_STAKE_SLASH_PERCENTAGE_OFFLINE: u64 = 2;
          let stake_to_slash_for_being_offline =
            (allocated_stake * u128::from(MAX_STAKE_SLASH_PERCENTAGE_OFFLINE)) / 100;

          // We already removed the slash points for `intervals * penalty_free_slash_points`
          let slash_points_for_reward_slash =
            penalty_free_slash_points * u64::try_from(MULTIPLIERS.len()).unwrap();
          let slash_points_for_offline_stake_slash =
            slash_points_for_entire_session.saturating_sub(slash_points_for_reward_slash);

          let slash_points_in_interval = slash_points.min(slash_points_for_offline_stake_slash);
          slash_points -= slash_points_in_interval;

          // If there are no slash points for the entire session, don't slash stake
          // That's an extreme edge case which shouldn't start penalizing validators
          (u128::from(slash_points_in_interval) * stake_to_slash_for_being_offline)
            .checked_div(u128::from(slash_points_for_offline_stake_slash))
            .unwrap_or(0)
        };

        let disruptive_slash = {
          /*
            A validator may have more slash points than `slash_points_for_stake_slash` if they
            didn't just accrue slashes for missing block proposals, yet also accrued slashes for
            being disruptive. In that case, we still want to bound their slash points so they
            can't somehow be slashed for 100% of their stake (which should only happen on a fatal
            slash).
          */
          const MAX_STAKE_SLASH_PERCENTAGE_DISRUPTIVE: u64 = 8;
          let stake_to_slash_for_being_disruptive =
            (allocated_stake * u128::from(MAX_STAKE_SLASH_PERCENTAGE_DISRUPTIVE)) / 100;
          // Follows the offline slash for `unwrap_or` policy
          (u128::from(slash_points.min(slash_points_for_entire_session)) *
            stake_to_slash_for_being_disruptive)
            .checked_div(u128::from(slash_points_for_entire_session))
            .unwrap_or(0)
        };
        */

        /*
          We do not slash for being offline/disruptive at this time. Doing so allows an adversary
          to DoS nodes to not just take them offline, yet also take away their stake. This isn't
          preferable to the increased incentive to properly maintain a node when the rewards
          should already be sufficient for that purpose.

          Validators also shouldn't be able to be so disruptive due to their limiting upon
          disruption *while it's ongoing*. Slashes as a post-response, while an arguably
          worthwhile economic penalty, can never be a response in the moment (as necessary to
          actually handle the disruption).

          If stake slashing was to be re-enabled, the percentage of stake which is eligible for
          slashing should be variable to how close we are to losing liveness. This would mean if
          less than 10% of validators are offline, no stake is slashed. If 10% are, 2% is
          eligible. If 20% are, 5% is eligible. If 30% are, 10% is eligible.

          (or similar)

          This would mean that a DoS is insufficient to cause a validator to lose their stake.
          Instead, a coordinated DoS against multiple Serai validators would be needed,
          strengthening our assumptions.
        */
        let offline_slash = 0;
        let disruptive_slash = 0;
        // The stake slash is bound to the allocated stake, as the reward slash is bound to the
        // rewards, so a rounding error in one can never bleed into the other
        let stake_slash = (offline_slash + disruptive_slash).min(allocated_stake);

        let penalty_u128 = reward_slash + stake_slash;
        // saturating_into
        Amount(u64::try_from(penalty_u128).unwrap_or(u64::MAX))
      }
      // On fatal slash, their entire stake, and their rewards, are removed
      Self::Fatal => Amount(allocated_stake.0 + session_rewards.0),
    }
  }
}
/// A report of the slashes accrued by a validator set's validators.
///
/// Bounded to `MAX_KEY_SHARES_PER_SET_U32` entries.
#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode, TypeInfo, MaxEncodedLen)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>);
/// The message to sign when reporting slashes.
///
/// This does not include the `ValidatorSet` itself. It's assumed binding to the `ValidatorSet`
/// via the key the message is signed with.
pub fn report_slashes_message(slashes: &SlashReport) -> Vec<u8> {
  let domain_separated_message = (b"ValidatorSets-report_slashes", slashes);
  domain_separated_message.encode()
}
#[test]
fn test_penalty() {
  for validators in [1, 50, 100, crate::MAX_KEY_SHARES_PER_SET] {
    let validators = NonZero::new(validators).unwrap();

    // The amount of slash points accrued over twelve hours of downtime
    let twelve_hours_of_slash_points =
      u32::try_from((12 * 60 * 60) / downtime_per_slash_point(validators).as_secs()).unwrap();

    // The penalty for a given amount of slash points, with an effectively-unbounded stake and a
    // reward of 168 (one unit per hour of a week-long session)
    let penalty_for = |slash_points: u32| {
      Slash::Points(slash_points).penalty(validators, Amount(u64::MAX), Amount(168))
    };

    // 12 hours of slash points should only decrease the rewards proportionately
    assert_eq!(penalty_for(twelve_hours_of_slash_points), Amount(12));
    // 24 hours of slash points should be counted as 36 hours
    assert_eq!(penalty_for(2 * twelve_hours_of_slash_points), Amount(36));
    // 36 hours of slash points should be counted as 96 hours
    assert_eq!(penalty_for(3 * twelve_hours_of_slash_points), Amount(96));
    // 48 hours of slash points should be counted as 168 hours
    assert_eq!(penalty_for(4 * twelve_hours_of_slash_points), Amount(168));

    /*
    // A full week of slash points should slash 2%
    let week_of_slash_points = 14 * twelve_hours_of_slash_points;
    assert_eq!(
      Slash::Points(week_of_slash_points).penalty(validators, Amount(1000), Amount(168)),
      Amount(20 + 168)
    );
    // Two weeks of slash points should slash 10%
    assert_eq!(
      Slash::Points(2 * week_of_slash_points).penalty(validators, Amount(1000), Amount(168)),
      Amount(100 + 168)
    );
    // Anything greater should still only slash 10%
    assert_eq!(
      Slash::Points(u32::MAX).penalty(validators, Amount(1000), Amount(168)),
      Amount(100 + 168)
    );
    */

    // Anything greater should still only slash the rewards
    assert_eq!(penalty_for(u32::MAX), Amount(168));
  }
}
// Check `Slash::Points::penalty` doesn't panic due to overflow at the extremes of its argument
// space (it internally widens to u128 and saturates back to u64).
#[test]
fn no_overflow() {
  // Maximal validator count, slash points, stake, and rewards
  Slash::Points(u32::MAX).penalty(
    NonZero::new(u16::MAX).unwrap(),
    Amount(u64::MAX),
    Amount(u64::MAX),
  );
  // Minimal validator count, otherwise maximal arguments
  Slash::Points(u32::MAX).penalty(NonZero::new(1).unwrap(), Amount(u64::MAX), Amount(u64::MAX));
}

View File

@@ -19,7 +19,6 @@ workspace = true
[dependencies] [dependencies]
hex = "0.4" hex = "0.4"
async-trait = "0.1"
zeroize = { version = "1", default-features = false } zeroize = { version = "1", default-features = false }
rand_core = { version = "0.6", default-features = false } rand_core = { version = "0.6", default-features = false }

View File

@@ -19,8 +19,6 @@ workspace = true
[dependencies] [dependencies]
hex = "0.4" hex = "0.4"
async-trait = "0.1"
zeroize = { version = "1", default-features = false } zeroize = { version = "1", default-features = false }
rand_core = { version = "0.6", default-features = false } rand_core = { version = "0.6", default-features = false }