6 Commits

Author SHA1 Message Date
Luke Parker
0ce9aad9b2 Add flow to add transactions onto Tributaries 2025-01-12 07:32:45 -05:00
Luke Parker
e35aa04afb Start handling messages from the processor
Does route ProcessorMessage::CosignedBlock. Rest are stubbed with TODO.
2025-01-12 06:07:55 -05:00
Luke Parker
e7de5125a2 Have processor-messages use CosignIntent/SignedCosign, not the historic cosign format
Has yet to update the processor accordingly.
2025-01-12 05:52:33 -05:00
Luke Parker
158140c3a7 Add a proper error for intake_cosign 2025-01-12 05:49:17 -05:00
Luke Parker
df9a9adaa8 Remove direct dependencies of void, async-trait 2025-01-12 03:48:43 -05:00
Luke Parker
d854807edd Make message_queue::client::Client::send fallible
Allows tasks to report the errors themselves and handle retry in our
standardized way.
2025-01-11 21:57:58 -05:00
17 changed files with 589 additions and 205 deletions

4
Cargo.lock generated
View File

@@ -8363,7 +8363,6 @@ dependencies = [
"serai-task",
"tokio",
"tributary-sdk",
"void",
"zeroize",
]
@@ -8402,7 +8401,6 @@ dependencies = [
name = "serai-coordinator-tests"
version = "0.1.0"
dependencies = [
"async-trait",
"blake2",
"borsh",
"ciphersuite",
@@ -8605,7 +8603,6 @@ dependencies = [
name = "serai-full-stack-tests"
version = "0.1.0"
dependencies = [
"async-trait",
"bitcoin-serai",
"curve25519-dalek",
"dockertest",
@@ -9001,6 +8998,7 @@ dependencies = [
"hex",
"parity-scale-codec",
"serai-coins-primitives",
"serai-cosign",
"serai-in-instructions-primitives",
"serai-primitives",
"serai-validator-sets-primitives",

View File

@@ -88,7 +88,6 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
}
let block_hash = block.hash();
SubstrateBlockHash::set(&mut txn, block_number, &block_hash);
SubstrateBlockNumber::set(&mut txn, block_hash, &block_number);
let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn);

View File

@@ -128,7 +128,6 @@ create_db! {
// An index of Substrate blocks
SubstrateBlockHash: (block_number: u64) -> [u8; 32],
SubstrateBlockNumber: (block_hash: [u8; 32]) -> u64,
// A mapping from a global session's ID to its relevant information.
GlobalSessions: (global_session: [u8; 32]) -> GlobalSession,
// The last block to be cosigned by a global session.
@@ -229,6 +228,43 @@ pub trait RequestNotableCosigns: 'static + Send {
#[derive(Debug)]
pub struct Faulted;
/// An error incurred while intaking a cosign.
#[derive(Debug)]
pub enum IntakeCosignError {
/// Cosign is for a not-yet-indexed block
NotYetIndexedBlock,
/// A later cosign for this cosigner has already been handled
StaleCosign,
/// The cosign's global session isn't recognized
UnrecognizedGlobalSession,
/// The cosign is for a block before its global session starts
BeforeGlobalSessionStart,
/// The cosign is for a block after its global session ends
AfterGlobalSessionEnd,
/// The cosign's signing network wasn't a participant in this global session
NonParticipatingNetwork,
/// The cosign had an invalid signature
InvalidSignature,
/// The cosign is for a global session which has yet to have its declaration block cosigned
FutureGlobalSession,
}
impl IntakeCosignError {
/// If this error is temporal to the local view
pub fn temporal(&self) -> bool {
match self {
IntakeCosignError::NotYetIndexedBlock |
IntakeCosignError::StaleCosign |
IntakeCosignError::UnrecognizedGlobalSession |
IntakeCosignError::FutureGlobalSession => true,
IntakeCosignError::BeforeGlobalSessionStart |
IntakeCosignError::AfterGlobalSessionEnd |
IntakeCosignError::NonParticipatingNetwork |
IntakeCosignError::InvalidSignature => false,
}
}
}
/// The interface to manage cosigning with.
pub struct Cosigning<D: Db> {
db: D,
@@ -282,13 +318,6 @@ impl<D: Db> Cosigning<D> {
))
}
/// Fetch a finalized block's number by its hash.
///
/// This block is not guaranteed to be cosigned.
pub fn finalized_block_number(getter: &impl Get, block_hash: [u8; 32]) -> Option<u64> {
SubstrateBlockNumber::get(getter, block_hash)
}
/// Fetch the notable cosigns for a global session in order to respond to requests.
///
/// If this global session hasn't produced any notable cosigns, this will return the latest
@@ -335,25 +364,15 @@ impl<D: Db> Cosigning<D> {
}
/// Intake a cosign.
///
/// - Returns Err(_) if there was an error trying to validate the cosign.
/// - Returns Ok(true) if the cosign was successfully handled or could not be handled at this
/// time.
/// - Returns Ok(false) if the cosign was invalid.
//
// We collapse a cosign which shouldn't be handled yet into a valid cosign (`Ok(true)`) as we
// assume we'll either explicitly request it if we need it or we'll naturally see it (or a later,
// more relevant, cosign) again.
//
// Takes `&mut self` as this should only be called once at any given moment.
// TODO: Don't overload bool here
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<bool, String> {
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<(), IntakeCosignError> {
let cosign = &signed_cosign.cosign;
let network = cosign.cosigner;
// Check our indexed blockchain includes a block with this block number
let Some(our_block_hash) = SubstrateBlockHash::get(&self.db, cosign.block_number) else {
return Ok(true);
Err(IntakeCosignError::NotYetIndexedBlock)?
};
let faulty = cosign.block_hash != our_block_hash;
@@ -363,20 +382,19 @@ impl<D: Db> Cosigning<D> {
NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network)
{
if existing.cosign.block_number >= cosign.block_number {
return Ok(true);
Err(IntakeCosignError::StaleCosign)?;
}
}
}
let Some(global_session) = GlobalSessions::get(&self.db, cosign.global_session) else {
// Unrecognized global session
return Ok(true);
Err(IntakeCosignError::UnrecognizedGlobalSession)?
};
// Check the cosigned block number is in range to the global session
if cosign.block_number < global_session.start_block_number {
// Cosign is for a block predating the global session
return Ok(false);
Err(IntakeCosignError::BeforeGlobalSessionStart)?;
}
if !faulty {
// This prevents a malicious validator set, on the same chain, from producing a cosign after
@@ -384,7 +402,7 @@ impl<D: Db> Cosigning<D> {
if let Some(last_block) = GlobalSessionsLastBlock::get(&self.db, cosign.global_session) {
if cosign.block_number > last_block {
// Cosign is for a block after the last block this global session should have signed
return Ok(false);
Err(IntakeCosignError::AfterGlobalSessionEnd)?;
}
}
}
@@ -393,13 +411,13 @@ impl<D: Db> Cosigning<D> {
{
let key = Public::from({
let Some(key) = global_session.keys.get(&network) else {
return Ok(false);
Err(IntakeCosignError::NonParticipatingNetwork)?
};
*key
});
if !signed_cosign.verify_signature(key) {
return Ok(false);
Err(IntakeCosignError::InvalidSignature)?;
}
}
@@ -415,7 +433,7 @@ impl<D: Db> Cosigning<D> {
// block declaring it was cosigned
if (global_session.start_block_number - 1) > latest_cosigned_block_number {
drop(txn);
return Ok(true);
return Err(IntakeCosignError::FutureGlobalSession);
}
// This is safe as it's in-range and newer, as prior checked since it isn't faulty
@@ -429,9 +447,10 @@ impl<D: Db> Cosigning<D> {
let mut weight_cosigned = 0;
for fault in &faults {
let Some(stake) = global_session.stakes.get(&fault.cosign.cosigner) else {
Err("cosigner with recognized key didn't have a stake entry saved".to_string())?
};
let stake = global_session
.stakes
.get(&fault.cosign.cosigner)
.expect("cosigner with recognized key didn't have a stake entry saved");
weight_cosigned += stake;
}
@@ -443,7 +462,7 @@ impl<D: Db> Cosigning<D> {
}
txn.commit();
Ok(true)
Ok(())
}
/// Receive intended cosigns to produce for this ValidatorSet.

View File

@@ -33,7 +33,6 @@ serai-client = { path = "../../../substrate/client", default-features = false, f
serai-cosign = { path = "../../cosign" }
tributary-sdk = { path = "../../tributary-sdk" }
void = { version = "1", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["sync"] }
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] }

View File

@@ -225,8 +225,8 @@ impl SwarmTask {
SwarmEvent::Behaviour(
BehaviorEvent::AllowList(event) | BehaviorEvent::ConnectionLimits(event)
) => {
// Ensure these are unreachable cases, not actual events
let _: void::Void = event;
// This *is* an exhaustive match as these events are empty enums
match event {}
}
SwarmEvent::Behaviour(
BehaviorEvent::Ping(ping::Event { peer: _, connection, result, })

View File

@@ -8,9 +8,9 @@ use serai_client::{
validator_sets::primitives::{Session, ValidatorSet},
};
use serai_cosign::CosignIntent;
use serai_cosign::SignedCosign;
use serai_coordinator_substrate::NewSetInformation;
use serai_coordinator_tributary::Transaction;
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = serai_db::ParityDb;
@@ -66,14 +66,48 @@ pub(crate) fn prune_tributary_db(set: ValidatorSet) {
create_db! {
Coordinator {
// The currently active Tributaries
ActiveTributaries: () -> Vec<NewSetInformation>,
// The latest Tributary to have been retired for a network
// Since Tributaries are retired sequentially, this is informative as to whether any prior
// Tributary has been retired
RetiredTributary: (network: NetworkId) -> Session,
// The last handled message from a Processor
LastProcessorMessage: (network: NetworkId) -> u64,
// Cosigns we produced and tried to intake yet incurred an error while doing so
ErroneousCosigns: () -> Vec<SignedCosign>,
}
}
db_channel! {
Coordinator {
// Cosigns we produced
SignedCosigns: () -> SignedCosign,
// Tributaries to clean up upon reboot
TributaryCleanup: () -> ValidatorSet,
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
}
}
mod _internal_db {
use super::*;
db_channel! {
Coordinator {
// Tributary transactions to publish
TributaryTransactions: (set: ValidatorSet) -> Transaction,
}
}
}
pub(crate) struct TributaryTransactions;
impl TributaryTransactions {
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
// If this set has yet to be retired, send this transaction
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
_internal_db::TributaryTransactions::send(txn, set, tx);
}
}
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
_internal_db::TributaryTransactions::try_recv(txn, set)
}
}

View File

@@ -1,5 +1,5 @@
use core::{ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant};
use std::{sync::Arc, collections::HashMap, time::Instant};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
@@ -9,16 +9,22 @@ use ciphersuite::{
Ciphersuite, Ristretto,
};
use borsh::BorshDeserialize;
use tokio::sync::mpsc;
use serai_client::{primitives::PublicKey, Serai};
use serai_client::{
primitives::{NetworkId, PublicKey},
validator_sets::primitives::ValidatorSet,
Serai,
};
use message_queue::{Service, client::MessageQueue};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction;
use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans};
mod db;
use db::*;
@@ -63,18 +69,60 @@ async fn serai() -> Arc<Serai> {
}
}
fn spawn_cosigning(
db: impl serai_db::Db,
fn spawn_cosigning<D: serai_db::Db>(
mut db: D,
serai: Arc<Serai>,
p2p: impl p2p::P2p,
tasks_to_run_upon_cosigning: Vec<TaskHandle>,
mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
mut signed_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
) {
let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning);
let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning);
tokio::spawn(async move {
const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5);
let last_cosign_rebroadcast = Instant::now();
loop {
// Intake our own cosigns
match Cosigning::<D>::latest_cosigned_block_number(&db) {
Ok(latest_cosigned_block_number) => {
let mut txn = db.txn();
// The cosigns we prior tried to intake yet failed to
let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]);
// The cosigns we have yet to intake
while let Some(cosign) = SignedCosigns::try_recv(&mut txn) {
cosigns.push(cosign);
}
let mut erroneous = vec![];
for cosign in cosigns {
// If this cosign is stale, move on
if cosign.cosign.block_number <= latest_cosigned_block_number {
continue;
}
match cosigning.intake_cosign(&cosign) {
// Publish this cosign
Ok(()) => p2p.publish_cosign(cosign).await,
Err(e) => {
assert!(e.temporal(), "signed an invalid cosign: {e:?}");
// Since this had a temporal error, queue it to try again later
erroneous.push(cosign);
}
};
}
// Save the cosigns with temporal errors to the database
ErroneousCosigns::set(&mut txn, &erroneous);
txn.commit();
}
Err(Faulted) => {
// We don't panic here as the following code rebroadcasts our cosigns which is
// necessary to inform other coordinators of the faulty cosigns
log::error!("cosigning faulted");
}
}
let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
serai_cosign::BROADCAST_FREQUENCY)
.saturating_duration_since(Instant::now());
@@ -86,19 +134,134 @@ fn spawn_cosigning(
}
cosign = p2p_cosigns.recv() => {
let cosign = cosign.expect("p2p cosigns channel was dropped?");
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
}
cosign = signed_cosigns.recv() => {
let cosign = cosign.expect("signed cosigns channel was dropped?");
// TODO: Handle this error
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
p2p.publish_cosign(cosign).await;
if cosigning.intake_cosign(&cosign).is_ok() {
p2p.publish_cosign(cosign).await;
}
}
// Make sure this loop runs at least this often
() = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {}
}
}
});
}
async fn handle_processor_messages(
mut db: impl serai_db::Db,
message_queue: Arc<MessageQueue>,
network: NetworkId,
) {
loop {
let (msg_id, msg) = {
let msg = message_queue.next(Service::Processor(network)).await;
// Check this message's sender is as expected
assert_eq!(msg.from, Service::Processor(network));
// Check this message's ID is as expected
let last = LastProcessorMessage::get(&db, network);
let next = last.map(|id| id + 1).unwrap_or(0);
// This should either be the last message's ID, if we committed but didn't send our ACK, or
// the expected next message's ID
assert!((Some(msg.id) == last) || (msg.id == next));
// TODO: Check msg.sig
// If this is the message we already handled, and just failed to ACK, ACK it now and move on
if Some(msg.id) == last {
message_queue.ack(Service::Processor(network), msg.id).await;
continue;
}
(msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap())
};
let mut txn = db.txn();
match msg {
messages::ProcessorMessage::KeyGen(msg) => match msg {
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::DkgParticipation { participation, signed: Signed::default() },
);
}
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
session,
substrate_key,
network_key,
} => todo!("TODO Transaction::DkgConfirmationPreprocess"),
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
},
messages::ProcessorMessage::Sign(msg) => match msg {
messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
let set = ValidatorSet { network, session };
TributaryTransactions::send(
&mut txn,
set,
&Transaction::RemoveParticipant {
participant: todo!("TODO"),
signed: Signed::default(),
},
);
}
messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
todo!("TODO Transaction::Batch + Transaction::Sign")
}
messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"),
},
messages::ProcessorMessage::Coordinator(msg) => match msg {
messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
SignedCosigns::send(&mut txn, &cosign);
}
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
}
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
}
},
messages::ProcessorMessage::Substrate(msg) => match msg {
messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
let mut by_session = HashMap::new();
for plan in plans {
by_session
.entry(plan.session)
.or_insert_with(|| Vec::with_capacity(1))
.push(plan.transaction_plan_id);
}
for (session, plans) in by_session {
let set = ValidatorSet { network, session };
SubstrateBlockPlans::set(&mut txn, set, block, &plans);
TributaryTransactions::send(
&mut txn,
set,
&Transaction::SubstrateBlock { hash: block },
);
}
}
},
}
// Mark this as the last handled message
LastProcessorMessage::set(&mut txn, network, &msg_id);
// Commit the txn
txn.commit();
// Now that we won't handle this message again, acknowledge it so we won't see it again
message_queue.ack(Service::Processor(network), msg_id).await;
}
}
#[tokio::main]
async fn main() {
// Override the panic handler with one which will panic if any tokio task panics
@@ -148,6 +311,8 @@ async fn main() {
prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
// Drain the transactions to publish for this set
while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
// Remove the SignSlashReport notification
SignSlashReport::try_recv(&mut txn, to_cleanup);
}
@@ -217,7 +382,6 @@ async fn main() {
);
// Spawn the cosign handler
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
spawn_cosigning(
db.clone(),
serai.clone(),
@@ -225,7 +389,6 @@ async fn main() {
// Run the Substrate scanners once we cosign new blocks
vec![substrate_canonical_task, substrate_ephemeral_task],
p2p_cosigns_recv,
signed_cosigns_recv,
);
// Spawn all Tributaries on-disk
@@ -254,7 +417,14 @@ async fn main() {
.continually_run(substrate_task_def, vec![]),
);
// TODO: Handle processor messages
// Handle all of the Processors' messages
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network));
}
todo!("TODO")
// Run the spawned tasks ad-infinitum
core::future::pending().await
}

View File

@@ -69,8 +69,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
self.message_queue.queue(metadata, msg).await?;
txn.commit();
made_progress = true;
}
@@ -132,8 +131,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
self.message_queue.queue(metadata, msg).await?;
// Commit the transaction for all of this
txn.commit();

View File

@@ -8,27 +8,68 @@ use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc;
use serai_db::{DbTxn, Db as DbTrait};
use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionError, ProvidedError, Tributary};
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::Cosigning;
use serai_cosign::{Faulted, CosignIntent, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
use serai_coordinator_p2p::P2p;
use crate::Db;
use crate::{Db, TributaryTransactions};
db_channel! {
Coordinator {
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
}
}
/// Provide a Provided Transaction to the Tributary.
///
/// This is not a well-designed function. This is specific to the context in which it's called,
/// within this file. It should only be considered an internal helper for this domain alone.
async fn provide_transaction<TD: DbTrait, P: P2p>(
set: ValidatorSet,
tributary: &Tributary<TD, Transaction, P>,
tx: Transaction,
) {
match tributary.provide_transaction(tx.clone()).await {
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before
// committing the txn which provoked this
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
Err(ProvidedError::NotProvided) => {
panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}");
}
Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
}
// The Tributary's scan task won't advance if we don't have the Provided transactions
// present on-chain, and this enters an infinite loop to block the calling task from
// advancing
Err(ProvidedError::LocalMismatchesOnChain) => loop {
log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set,
tx,
);
// Print this every five minutes as this does need to be handled
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
},
}
}
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
@@ -36,40 +77,6 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
/// Provide a Provided Transaction to the Tributary.
///
/// This is not a well-designed function. This is specific to the context in which it's called,
/// within this file. It should only be considered an internal helper for this domain alone.
async fn provide_transaction<TD: DbTrait, P: P2p>(
set: ValidatorSet,
tributary: &Tributary<TD, Transaction, P>,
tx: Transaction,
) {
match tributary.provide_transaction(tx.clone()).await {
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before
// committing the txn which provoked this
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
Err(ProvidedError::NotProvided) => {
panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}");
}
Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
}
Err(ProvidedError::LocalMismatchesOnChain) => loop {
// The Tributary's scan task won't advance if we don't have the Provided transactions
// present on-chain, and this enters an infinite loop to block the calling task from
// advancing
log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set,
tx,
);
// Print this every five minutes as this does need to be handled
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
},
}
}
async move {
let mut made_progress = false;
@@ -79,16 +86,27 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
let mut txn = self.db.txn();
// Fetch the next cosign this tributary should handle
let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
let Some(cosign) = PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
pending_notable_cosign = cosign.notable;
// If we (Serai) haven't cosigned this block, break as this is still pending
let Ok(latest) = Cosigning::<CD>::latest_cosigned_block_number(&txn) else { break };
let latest = match Cosigning::<CD>::latest_cosigned_block_number(&txn) {
Ok(latest) => latest,
Err(Faulted) => {
log::error!("cosigning faulted");
Err("cosigning faulted")?
}
};
if latest < cosign.block_number {
break;
}
// Because we've cosigned it, provide the TX for that
{
let mut txn = self.tributary_db.txn();
CosignIntents::provide(&mut txn, self.set.set, &cosign);
txn.commit();
}
provide_transaction(
self.set.set,
&self.tributary,
@@ -109,7 +127,7 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
// intended_cosigns will only yield up to and including the next notable cosign
for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) {
// Flag this cosign as pending
crate::PendingCosigns::send(&mut txn, self.set.set, &cosign);
PendingCosigns::send(&mut txn, self.set.set, &cosign);
// Provide the transaction to queue it for work
provide_transaction(
self.set.set,
@@ -127,6 +145,66 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
}
}
/// Adds all of the transactions sent via `TributaryTransactions`.
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
tributary: Tributary<TD, Transaction, P>,
set: ValidatorSet,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.db.txn();
let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
let kind = tx.kind();
match kind {
TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
// If this is a signed transaction, sign it
if matches!(kind, TransactionKind::Signed(_, _)) {
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
}
// Actually add the transaction
// TODO: If this is a preprocess, make sure the topic has been recognized
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => {
drop(txn);
break;
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
}
}
made_progress = true;
txn.commit();
}
Ok(made_progress)
}
}
}
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
tributary_db: TD,
@@ -146,8 +224,7 @@ impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
self.message_queue.queue(metadata, msg).await?;
txn.commit();
made_progress = true;
}
@@ -190,7 +267,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => return Ok(false),
Err(TransactionError::TooManyInMempool) => {
drop(txn);
return Ok(false);
}
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
@@ -294,6 +374,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn(
(ProvideCosignCosignedTransactionsTask {
db: db.clone(),
tributary_db: tributary_db.clone(),
set: set.clone(),
tributary: tributary.clone(),
})
@@ -314,7 +395,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader)
ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
@@ -325,14 +406,27 @@ pub(crate) async fn spawn_tributary<P: P2p>(
tokio::spawn(
(SignSlashReportTask {
db: db.clone(),
tributary_db,
tributary_db: tributary_db.clone(),
tributary: tributary.clone(),
set: set.clone(),
key: serai_key,
key: serai_key.clone(),
})
.continually_run(sign_slash_report_task_def, vec![]),
);
// Spawn the add transactions task
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
tokio::spawn(
(AddTributaryTransactionsTask {
db: db.clone(),
tributary_db,
tributary: tributary.clone(),
set: set.set,
key: serai_key,
})
.continually_run(add_tributary_transactions_task_def, vec![]),
);
// Whenever a new block occurs, immediately run the scan task
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
@@ -342,6 +436,10 @@ pub(crate) async fn spawn_tributary<P: P2p>(
set.set,
tributary,
scan_tributary_task,
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task],
vec![
provide_cosign_cosigned_transactions_task,
sign_slash_report_task,
add_tributary_transactions_task,
],
));
}

View File

@@ -9,6 +9,8 @@ use messages::sign::{VariantSignId, SignId};
use serai_db::*;
use serai_cosign::CosignIntent;
use crate::transaction::SigningProtocolRound;
/// A topic within the database which the group participates in
@@ -187,6 +189,8 @@ create_db!(
// The slash points a validator has accrued, with u32::MAX representing a fatal slash.
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32,
// The cosign intent for a Substrate block
CosignIntents: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> CosignIntent,
// The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
// The hash of the block we're actively cosigning.
@@ -194,6 +198,9 @@ create_db!(
// If this block has already been cosigned.
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
// The entries accumulated for a topic, by validator.

View File

@@ -24,14 +24,13 @@ use tributary_sdk::{
Transaction as TributaryTransaction, Block, TributaryReader, P2p,
};
use serai_cosign::Cosigning;
use serai_cosign::CosignIntent;
use serai_coordinator_substrate::NewSetInformation;
use messages::sign::VariantSignId;
mod transaction;
pub(crate) use transaction::{SigningProtocolRound, Signed};
pub use transaction::Transaction;
pub use transaction::{SigningProtocolRound, Signed, Transaction};
mod db;
use db::*;
@@ -45,17 +44,58 @@ impl ProcessorMessages {
}
}
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
/// The cosign intents.
pub struct CosignIntents;
impl CosignIntents {
/// Provide a CosignIntent for this Tributary.
///
/// This must be done before the associated `Transaction::Cosign` is provided.
pub fn provide(txn: &mut impl DbTxn, set: ValidatorSet, intent: &CosignIntent) {
db::CosignIntents::set(txn, set, intent.block_hash, intent);
}
fn take(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> Option<CosignIntent> {
db::CosignIntents::take(txn, set, substrate_block_hash)
}
}
/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
pub struct SubstrateBlockPlans;
impl SubstrateBlockPlans {
  /// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included
  /// on-chain.
  ///
  /// This must be done before the associated `Transaction::SubstrateBlock` is provided.
  pub fn set(
    txn: &mut impl DbTxn,
    set: ValidatorSet,
    substrate_block_hash: [u8; 32],
    plans: &Vec<[u8; 32]>,
  ) {
    // `plans` is already a reference; don't take a redundant second borrow
    db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, plans);
  }

  // Remove and return the plans set for this Substrate block, if any were set.
  fn take(
    txn: &mut impl DbTxn,
    set: ValidatorSet,
    substrate_block_hash: [u8; 32],
  ) -> Option<Vec<[u8; 32]>> {
    db::SubstrateBlockPlans::take(txn, set, substrate_block_hash)
  }
}
struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
_td: PhantomData<TD>,
_p2p: PhantomData<P>,
cosign_db: &'a CD,
tributary_txn: &'a mut TDT,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
}
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
@@ -74,20 +114,20 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
return;
}
let Some(substrate_block_number) =
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else {
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
// Provided transactions within it, and the block to cosign is a Provided transaction
panic!("cosigning a block our cosigner didn't index")
};
let intent =
CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign)
.expect("Transaction::Cosign locally provided but CosignIntents wasn't populated");
assert_eq!(
intent.block_hash, latest_substrate_block_to_cosign,
"provided CosignIntent wasn't saved by its block hash"
);
// Mark us as actively cosigning
TributaryDb::start_cosigning(
self.tributary_txn,
self.set,
latest_substrate_block_to_cosign,
substrate_block_number,
intent.block_number,
);
// Send the message for the processor to start signing
TributaryDb::send_message(
@@ -95,8 +135,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: latest_substrate_block_to_cosign,
intent,
},
);
}
@@ -206,11 +245,32 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
"Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
);
for plan in plans {
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Transaction(plan),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
// Whitelist the signing of this batch
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::Batch(hash),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
Transaction::SlashReport { slash_points, signed } => {
@@ -411,8 +471,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
}
/// The task to scan the Tributary, populating `ProcessorMessages`.
pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
cosign_db: CD,
pub struct ScanTributaryTask<TD: Db, P: P2p> {
tributary_db: TD,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
@@ -422,10 +481,9 @@ pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
_p2p: PhantomData<P>,
}
impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
/// Create a new instance of this task.
pub fn new(
cosign_db: CD,
tributary_db: TD,
new_set: &NewSetInformation,
tributary: TributaryReader<TD, Transaction>,
@@ -442,7 +500,6 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
}
ScanTributaryTask {
cosign_db,
tributary_db,
set: new_set.set,
validators,
@@ -454,7 +511,7 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
}
}
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
@@ -486,7 +543,6 @@ impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
(ScanBlock {
_td: PhantomData::<TD>,
_p2p: PhantomData::<P>,
cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn,
set: self.set,
validators: &self.validators,

View File

@@ -64,22 +64,20 @@ impl MessageQueue {
Self::new(service, url, priv_key)
}
#[must_use]
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> Result<(), String> {
let msg = borsh::to_vec(&msg).unwrap();
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else {
log::warn!("couldn't send the message len");
return false;
match socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await {
Ok(()) => {}
Err(e) => Err(format!("couldn't send the message len: {e:?}"))?,
};
let Ok(()) = socket.write_all(&msg).await else {
log::warn!("couldn't write the message");
return false;
};
true
match socket.write_all(&msg).await {
Ok(()) => {}
Err(e) => Err(format!("couldn't write the message: {e:?}"))?,
}
Ok(())
}
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) {
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) -> Result<(), String> {
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign(
@@ -97,6 +95,21 @@ impl MessageQueue {
.serialize();
let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig };
let mut socket = match TcpStream::connect(&self.url).await {
Ok(socket) => socket,
Err(e) => Err(format!("failed to connect to the message-queue service: {e:?}"))?,
};
Self::send(&mut socket, msg.clone()).await?;
match socket.read_u8().await {
Ok(1) => {}
Ok(b) => Err(format!("message-queue didn't return 1 for its ack, received: {b}"))?,
Err(e) => Err(format!("failed to read the response from the message-queue service: {e:?}"))?,
}
Ok(())
}
pub async fn queue_with_retry(&self, metadata: Metadata, msg: Vec<u8>) {
let mut first = true;
loop {
// Sleep, so we don't hammer re-attempts
@@ -105,14 +118,9 @@ impl MessageQueue {
}
first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
if !Self::send(&mut socket, msg.clone()).await {
continue;
if self.queue(metadata.clone(), msg.clone()).await.is_ok() {
break;
}
if socket.read_u8().await.ok() != Some(1) {
continue;
}
break;
}
}
@@ -136,7 +144,7 @@ impl MessageQueue {
log::trace!("opened socket for next");
loop {
if !Self::send(&mut socket, msg.clone()).await {
if Self::send(&mut socket, msg.clone()).await.is_err() {
continue 'outer;
}
let status = match socket.read_u8().await {
@@ -224,7 +232,7 @@ impl MessageQueue {
first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
if !Self::send(&mut socket, msg.clone()).await {
if Self::send(&mut socket, msg.clone()).await.is_err() {
continue;
}
if socket.read_u8().await.ok() != Some(1) {

View File

@@ -103,6 +103,7 @@ impl Coordinator {
});
// Spawn a task to send messages to the message-queue
// TODO: Define a proper task for this and remove use of queue_with_retry
tokio::spawn({
let mut db = db.clone();
async move {
@@ -115,12 +116,12 @@ impl Coordinator {
to: Service::Coordinator,
intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(),
};
message_queue.queue(metadata, msg).await;
message_queue.queue_with_retry(metadata, msg).await;
txn.commit();
}
None => {
let _ =
tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv())
tokio::time::timeout(core::time::Duration::from_secs(6), sent_message_recv.recv())
.await;
}
}

View File

@@ -29,3 +29,5 @@ serai-primitives = { path = "../../substrate/primitives", default-features = fal
in-instructions-primitives = { package = "serai-in-instructions-primitives", path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
coins-primitives = { package = "serai-coins-primitives", path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }
validator-sets-primitives = { package = "serai-validator-sets-primitives", path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] }
serai-cosign = { path = "../../coordinator/cosign", default-features = false }

View File

@@ -11,6 +11,8 @@ use validator_sets_primitives::{Session, KeyPair, Slash};
use coins_primitives::OutInstructionWithBalance;
use in_instructions_primitives::SignedBatch;
use serai_cosign::{CosignIntent, SignedCosign};
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct SubstrateContext {
pub serai_time: u64,
@@ -50,7 +52,8 @@ pub mod key_gen {
}
}
#[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
// This set of messages is sent entirely and solely by serai-processor-key-gen.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
// Participated in the specified key generation protocol.
Participation { session: Session, participation: Vec<u8> },
@@ -141,7 +144,8 @@ pub mod sign {
}
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
// This set of messages is sent entirely and solely by serai-processor-frost-attempt-manager.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
// Participant sent an invalid message during the sign protocol.
InvalidParticipant { session: Session, participant: Participant },
@@ -155,39 +159,25 @@ pub mod sign {
pub mod coordinator {
use super::*;
// TODO: Remove this for the one defined in serai-cosign
/// Serialize the message to sign when cosigning a Substrate block.
///
/// Layout: one byte for the domain-separation tag's length, the tag itself ("Cosign"), the
/// little-endian block number, then the block hash.
pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> {
  const DST: &[u8] = b"Cosign";
  // 1 byte of DST length + the DST + an 8-byte block number + a 32-byte block hash
  let mut res = Vec::with_capacity(1 + DST.len() + 8 + 32);
  res.push(u8::try_from(DST.len()).unwrap());
  res.extend_from_slice(DST);
  res.extend_from_slice(&block_number.to_le_bytes());
  res.extend_from_slice(&block);
  res
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum CoordinatorMessage {
/// Cosign the specified Substrate block.
///
/// This is sent by the Coordinator's Tributary scanner.
CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] },
CosignSubstrateBlock { session: Session, intent: CosignIntent },
/// Sign the slash report for this session.
///
/// This is sent by the Coordinator's Tributary scanner.
SignSlashReport { session: Session, report: Vec<Slash> },
}
/// Metadata identifying a plan: the session it's associated with and its ID.
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct PlanMeta {
// The session associated with this plan
pub session: Session,
// The plan's 32-byte identifier
pub id: [u8; 32],
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
// This set of messages is sent entirely and solely by serai-processor-bin's implementation of
// the signers::Coordinator trait.
// TODO: Move message creation into serai-processor-signers
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> },
CosignedBlock { cosign: SignedCosign },
SignedBatch { batch: SignedBatch },
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
SignedSlashReport { session: Session, signature: Vec<u8> },
}
}
@@ -231,17 +221,16 @@ pub mod substrate {
},
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum ProcessorMessage {}
impl BorshSerialize for ProcessorMessage {
fn serialize<W: borsh::io::Write>(&self, _writer: &mut W) -> borsh::io::Result<()> {
unimplemented!()
}
/// Metadata identifying a transaction plan: the session it's associated with and the plan's ID.
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct PlanMeta {
// The session associated with this plan
pub session: Session,
// The 32-byte ID of the transaction plan
pub transaction_plan_id: [u8; 32],
}
impl BorshDeserialize for ProcessorMessage {
fn deserialize_reader<R: borsh::io::Read>(_reader: &mut R) -> borsh::io::Result<Self> {
unimplemented!()
}
/// Messages sent by the processor in response to Substrate events.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
// TODO: Have the processor send this
// Acknowledge a Substrate block, by its hash, along with the plans it caused — presumably the
// plans whose signing the coordinator should now whitelist (NOTE(review): confirm against the
// coordinator's Tributary scanner once the processor sends this)
SubstrateBlockAck { block: [u8; 32], plans: Vec<PlanMeta> },
}
}
@@ -268,7 +257,7 @@ impl_from!(sign, CoordinatorMessage, Sign);
impl_from!(coordinator, CoordinatorMessage, Coordinator);
impl_from!(substrate, CoordinatorMessage, Substrate);
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ProcessorMessage {
KeyGen(key_gen::ProcessorMessage),
Sign(sign::ProcessorMessage),
@@ -331,8 +320,8 @@ impl CoordinatorMessage {
CoordinatorMessage::Coordinator(msg) => {
let (sub, id) = match msg {
// We only cosign a block once, and Reattempt is a separate message
coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => {
(0, block_number.encode())
coordinator::CoordinatorMessage::CosignSubstrateBlock { intent, .. } => {
(0, intent.block_number.encode())
}
// We only sign one slash report, and Reattempt is a separate message
coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()),
@@ -404,17 +393,26 @@ impl ProcessorMessage {
}
ProcessorMessage::Coordinator(msg) => {
let (sub, id) = match msg {
coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()),
coordinator::ProcessorMessage::CosignedBlock { cosign } => {
(0, cosign.cosign.block_hash.encode())
}
coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()),
coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()),
coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()),
coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (2, session.encode()),
};
let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub];
res.extend(&id);
res
}
ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"),
ProcessorMessage::Substrate(msg) => {
let (sub, id) = match msg {
substrate::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()),
};
let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub];
res.extend(&id);
res
}
}
}
}

View File

@@ -19,7 +19,6 @@ workspace = true
[dependencies]
hex = "0.4"
async-trait = "0.1"
zeroize = { version = "1", default-features = false }
rand_core = { version = "0.6", default-features = false }

View File

@@ -19,8 +19,6 @@ workspace = true
[dependencies]
hex = "0.4"
async-trait = "0.1"
zeroize = { version = "1", default-features = false }
rand_core = { version = "0.6", default-features = false }