mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-12 05:59:23 +00:00
Compare commits
6 Commits
f501d46d44
...
0ce9aad9b2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ce9aad9b2 | ||
|
|
e35aa04afb | ||
|
|
e7de5125a2 | ||
|
|
158140c3a7 | ||
|
|
df9a9adaa8 | ||
|
|
d854807edd |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -8363,7 +8363,6 @@ dependencies = [
|
||||
"serai-task",
|
||||
"tokio",
|
||||
"tributary-sdk",
|
||||
"void",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
@@ -8402,7 +8401,6 @@ dependencies = [
|
||||
name = "serai-coordinator-tests"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"blake2",
|
||||
"borsh",
|
||||
"ciphersuite",
|
||||
@@ -8605,7 +8603,6 @@ dependencies = [
|
||||
name = "serai-full-stack-tests"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bitcoin-serai",
|
||||
"curve25519-dalek",
|
||||
"dockertest",
|
||||
@@ -9001,6 +8998,7 @@ dependencies = [
|
||||
"hex",
|
||||
"parity-scale-codec",
|
||||
"serai-coins-primitives",
|
||||
"serai-cosign",
|
||||
"serai-in-instructions-primitives",
|
||||
"serai-primitives",
|
||||
"serai-validator-sets-primitives",
|
||||
|
||||
@@ -88,7 +88,6 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
|
||||
}
|
||||
let block_hash = block.hash();
|
||||
SubstrateBlockHash::set(&mut txn, block_number, &block_hash);
|
||||
SubstrateBlockNumber::set(&mut txn, block_hash, &block_number);
|
||||
|
||||
let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn);
|
||||
|
||||
|
||||
@@ -128,7 +128,6 @@ create_db! {
|
||||
|
||||
// An index of Substrate blocks
|
||||
SubstrateBlockHash: (block_number: u64) -> [u8; 32],
|
||||
SubstrateBlockNumber: (block_hash: [u8; 32]) -> u64,
|
||||
// A mapping from a global session's ID to its relevant information.
|
||||
GlobalSessions: (global_session: [u8; 32]) -> GlobalSession,
|
||||
// The last block to be cosigned by a global session.
|
||||
@@ -229,6 +228,43 @@ pub trait RequestNotableCosigns: 'static + Send {
|
||||
#[derive(Debug)]
|
||||
pub struct Faulted;
|
||||
|
||||
/// An error incurred while intaking a cosign.
|
||||
#[derive(Debug)]
|
||||
pub enum IntakeCosignError {
|
||||
/// Cosign is for a not-yet-indexed block
|
||||
NotYetIndexedBlock,
|
||||
/// A later cosign for this cosigner has already been handled
|
||||
StaleCosign,
|
||||
/// The cosign's global session isn't recognized
|
||||
UnrecognizedGlobalSession,
|
||||
/// The cosign is for a block before its global session starts
|
||||
BeforeGlobalSessionStart,
|
||||
/// The cosign is for a block after its global session ends
|
||||
AfterGlobalSessionEnd,
|
||||
/// The cosign's signing network wasn't a participant in this global session
|
||||
NonParticipatingNetwork,
|
||||
/// The cosign had an invalid signature
|
||||
InvalidSignature,
|
||||
/// The cosign is for a global session which has yet to have its declaration block cosigned
|
||||
FutureGlobalSession,
|
||||
}
|
||||
|
||||
impl IntakeCosignError {
|
||||
/// If this error is temporal to the local view
|
||||
pub fn temporal(&self) -> bool {
|
||||
match self {
|
||||
IntakeCosignError::NotYetIndexedBlock |
|
||||
IntakeCosignError::StaleCosign |
|
||||
IntakeCosignError::UnrecognizedGlobalSession |
|
||||
IntakeCosignError::FutureGlobalSession => true,
|
||||
IntakeCosignError::BeforeGlobalSessionStart |
|
||||
IntakeCosignError::AfterGlobalSessionEnd |
|
||||
IntakeCosignError::NonParticipatingNetwork |
|
||||
IntakeCosignError::InvalidSignature => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The interface to manage cosigning with.
|
||||
pub struct Cosigning<D: Db> {
|
||||
db: D,
|
||||
@@ -282,13 +318,6 @@ impl<D: Db> Cosigning<D> {
|
||||
))
|
||||
}
|
||||
|
||||
/// Fetch a finalized block's number by its hash.
|
||||
///
|
||||
/// This block is not guaranteed to be cosigned.
|
||||
pub fn finalized_block_number(getter: &impl Get, block_hash: [u8; 32]) -> Option<u64> {
|
||||
SubstrateBlockNumber::get(getter, block_hash)
|
||||
}
|
||||
|
||||
/// Fetch the notable cosigns for a global session in order to respond to requests.
|
||||
///
|
||||
/// If this global session hasn't produced any notable cosigns, this will return the latest
|
||||
@@ -335,25 +364,15 @@ impl<D: Db> Cosigning<D> {
|
||||
}
|
||||
|
||||
/// Intake a cosign.
|
||||
///
|
||||
/// - Returns Err(_) if there was an error trying to validate the cosign.
|
||||
/// - Returns Ok(true) if the cosign was successfully handled or could not be handled at this
|
||||
/// time.
|
||||
/// - Returns Ok(false) if the cosign was invalid.
|
||||
//
|
||||
// We collapse a cosign which shouldn't be handled yet into a valid cosign (`Ok(true)`) as we
|
||||
// assume we'll either explicitly request it if we need it or we'll naturally see it (or a later,
|
||||
// more relevant, cosign) again.
|
||||
//
|
||||
// Takes `&mut self` as this should only be called once at any given moment.
|
||||
// TODO: Don't overload bool here
|
||||
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<bool, String> {
|
||||
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<(), IntakeCosignError> {
|
||||
let cosign = &signed_cosign.cosign;
|
||||
let network = cosign.cosigner;
|
||||
|
||||
// Check our indexed blockchain includes a block with this block number
|
||||
let Some(our_block_hash) = SubstrateBlockHash::get(&self.db, cosign.block_number) else {
|
||||
return Ok(true);
|
||||
Err(IntakeCosignError::NotYetIndexedBlock)?
|
||||
};
|
||||
let faulty = cosign.block_hash != our_block_hash;
|
||||
|
||||
@@ -363,20 +382,19 @@ impl<D: Db> Cosigning<D> {
|
||||
NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network)
|
||||
{
|
||||
if existing.cosign.block_number >= cosign.block_number {
|
||||
return Ok(true);
|
||||
Err(IntakeCosignError::StaleCosign)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(global_session) = GlobalSessions::get(&self.db, cosign.global_session) else {
|
||||
// Unrecognized global session
|
||||
return Ok(true);
|
||||
Err(IntakeCosignError::UnrecognizedGlobalSession)?
|
||||
};
|
||||
|
||||
// Check the cosigned block number is in range to the global session
|
||||
if cosign.block_number < global_session.start_block_number {
|
||||
// Cosign is for a block predating the global session
|
||||
return Ok(false);
|
||||
Err(IntakeCosignError::BeforeGlobalSessionStart)?;
|
||||
}
|
||||
if !faulty {
|
||||
// This prevents a malicious validator set, on the same chain, from producing a cosign after
|
||||
@@ -384,7 +402,7 @@ impl<D: Db> Cosigning<D> {
|
||||
if let Some(last_block) = GlobalSessionsLastBlock::get(&self.db, cosign.global_session) {
|
||||
if cosign.block_number > last_block {
|
||||
// Cosign is for a block after the last block this global session should have signed
|
||||
return Ok(false);
|
||||
Err(IntakeCosignError::AfterGlobalSessionEnd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -393,13 +411,13 @@ impl<D: Db> Cosigning<D> {
|
||||
{
|
||||
let key = Public::from({
|
||||
let Some(key) = global_session.keys.get(&network) else {
|
||||
return Ok(false);
|
||||
Err(IntakeCosignError::NonParticipatingNetwork)?
|
||||
};
|
||||
*key
|
||||
});
|
||||
|
||||
if !signed_cosign.verify_signature(key) {
|
||||
return Ok(false);
|
||||
Err(IntakeCosignError::InvalidSignature)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,7 +433,7 @@ impl<D: Db> Cosigning<D> {
|
||||
// block declaring it was cosigned
|
||||
if (global_session.start_block_number - 1) > latest_cosigned_block_number {
|
||||
drop(txn);
|
||||
return Ok(true);
|
||||
return Err(IntakeCosignError::FutureGlobalSession);
|
||||
}
|
||||
|
||||
// This is safe as it's in-range and newer, as prior checked since it isn't faulty
|
||||
@@ -429,9 +447,10 @@ impl<D: Db> Cosigning<D> {
|
||||
|
||||
let mut weight_cosigned = 0;
|
||||
for fault in &faults {
|
||||
let Some(stake) = global_session.stakes.get(&fault.cosign.cosigner) else {
|
||||
Err("cosigner with recognized key didn't have a stake entry saved".to_string())?
|
||||
};
|
||||
let stake = global_session
|
||||
.stakes
|
||||
.get(&fault.cosign.cosigner)
|
||||
.expect("cosigner with recognized key didn't have a stake entry saved");
|
||||
weight_cosigned += stake;
|
||||
}
|
||||
|
||||
@@ -443,7 +462,7 @@ impl<D: Db> Cosigning<D> {
|
||||
}
|
||||
|
||||
txn.commit();
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receive intended cosigns to produce for this ValidatorSet.
|
||||
|
||||
@@ -33,7 +33,6 @@ serai-client = { path = "../../../substrate/client", default-features = false, f
|
||||
serai-cosign = { path = "../../cosign" }
|
||||
tributary-sdk = { path = "../../tributary-sdk" }
|
||||
|
||||
void = { version = "1", default-features = false }
|
||||
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] }
|
||||
|
||||
@@ -225,8 +225,8 @@ impl SwarmTask {
|
||||
SwarmEvent::Behaviour(
|
||||
BehaviorEvent::AllowList(event) | BehaviorEvent::ConnectionLimits(event)
|
||||
) => {
|
||||
// Ensure these are unreachable cases, not actual events
|
||||
let _: void::Void = event;
|
||||
// This *is* an exhaustive match as these events are empty enums
|
||||
match event {}
|
||||
}
|
||||
SwarmEvent::Behaviour(
|
||||
BehaviorEvent::Ping(ping::Event { peer: _, connection, result, })
|
||||
|
||||
@@ -8,9 +8,9 @@ use serai_client::{
|
||||
validator_sets::primitives::{Session, ValidatorSet},
|
||||
};
|
||||
|
||||
use serai_cosign::CosignIntent;
|
||||
|
||||
use serai_cosign::SignedCosign;
|
||||
use serai_coordinator_substrate::NewSetInformation;
|
||||
use serai_coordinator_tributary::Transaction;
|
||||
|
||||
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
|
||||
pub(crate) type Db = serai_db::ParityDb;
|
||||
@@ -66,14 +66,48 @@ pub(crate) fn prune_tributary_db(set: ValidatorSet) {
|
||||
|
||||
create_db! {
|
||||
Coordinator {
|
||||
// The currently active Tributaries
|
||||
ActiveTributaries: () -> Vec<NewSetInformation>,
|
||||
// The latest Tributary to have been retired for a network
|
||||
// Since Tributaries are retired sequentially, this is informative to if any Tributary has been
|
||||
// retired
|
||||
RetiredTributary: (network: NetworkId) -> Session,
|
||||
// The last handled message from a Processor
|
||||
LastProcessorMessage: (network: NetworkId) -> u64,
|
||||
// Cosigns we produced and tried to intake yet incurred an error while doing so
|
||||
ErroneousCosigns: () -> Vec<SignedCosign>,
|
||||
}
|
||||
}
|
||||
|
||||
db_channel! {
|
||||
Coordinator {
|
||||
// Cosigns we produced
|
||||
SignedCosigns: () -> SignedCosign,
|
||||
// Tributaries to clean up upon reboot
|
||||
TributaryCleanup: () -> ValidatorSet,
|
||||
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
|
||||
}
|
||||
}
|
||||
|
||||
mod _internal_db {
|
||||
use super::*;
|
||||
|
||||
db_channel! {
|
||||
Coordinator {
|
||||
// Tributary transactions to publish
|
||||
TributaryTransactions: (set: ValidatorSet) -> Transaction,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TributaryTransactions;
|
||||
impl TributaryTransactions {
|
||||
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
|
||||
// If this set has yet to be retired, send this transaction
|
||||
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
|
||||
_internal_db::TributaryTransactions::send(txn, set, tx);
|
||||
}
|
||||
}
|
||||
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
|
||||
_internal_db::TributaryTransactions::try_recv(txn, set)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use core::{ops::Deref, time::Duration};
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use std::{sync::Arc, collections::HashMap, time::Instant};
|
||||
|
||||
use zeroize::{Zeroize, Zeroizing};
|
||||
use rand_core::{RngCore, OsRng};
|
||||
@@ -9,16 +9,22 @@ use ciphersuite::{
|
||||
Ciphersuite, Ristretto,
|
||||
};
|
||||
|
||||
use borsh::BorshDeserialize;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use serai_client::{primitives::PublicKey, Serai};
|
||||
use serai_client::{
|
||||
primitives::{NetworkId, PublicKey},
|
||||
validator_sets::primitives::ValidatorSet,
|
||||
Serai,
|
||||
};
|
||||
use message_queue::{Service, client::MessageQueue};
|
||||
|
||||
use serai_task::{Task, TaskHandle, ContinuallyRan};
|
||||
|
||||
use serai_cosign::{SignedCosign, Cosigning};
|
||||
use serai_cosign::{Faulted, SignedCosign, Cosigning};
|
||||
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
|
||||
use serai_coordinator_tributary::Transaction;
|
||||
use serai_coordinator_tributary::{Signed, Transaction, SubstrateBlockPlans};
|
||||
|
||||
mod db;
|
||||
use db::*;
|
||||
@@ -63,18 +69,60 @@ async fn serai() -> Arc<Serai> {
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_cosigning(
|
||||
db: impl serai_db::Db,
|
||||
fn spawn_cosigning<D: serai_db::Db>(
|
||||
mut db: D,
|
||||
serai: Arc<Serai>,
|
||||
p2p: impl p2p::P2p,
|
||||
tasks_to_run_upon_cosigning: Vec<TaskHandle>,
|
||||
mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
|
||||
mut signed_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
|
||||
) {
|
||||
let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning);
|
||||
let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning);
|
||||
tokio::spawn(async move {
|
||||
const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
let last_cosign_rebroadcast = Instant::now();
|
||||
loop {
|
||||
// Intake our own cosigns
|
||||
match Cosigning::<D>::latest_cosigned_block_number(&db) {
|
||||
Ok(latest_cosigned_block_number) => {
|
||||
let mut txn = db.txn();
|
||||
// The cosigns we prior tried to intake yet failed to
|
||||
let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]);
|
||||
// The cosigns we have yet to intake
|
||||
while let Some(cosign) = SignedCosigns::try_recv(&mut txn) {
|
||||
cosigns.push(cosign);
|
||||
}
|
||||
|
||||
let mut erroneous = vec![];
|
||||
for cosign in cosigns {
|
||||
// If this cosign is stale, move on
|
||||
if cosign.cosign.block_number <= latest_cosigned_block_number {
|
||||
continue;
|
||||
}
|
||||
|
||||
match cosigning.intake_cosign(&cosign) {
|
||||
// Publish this cosign
|
||||
Ok(()) => p2p.publish_cosign(cosign).await,
|
||||
Err(e) => {
|
||||
assert!(e.temporal(), "signed an invalid cosign: {e:?}");
|
||||
// Since this had a temporal error, queue it to try again later
|
||||
erroneous.push(cosign);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Save the cosigns with temporal errors to the database
|
||||
ErroneousCosigns::set(&mut txn, &erroneous);
|
||||
|
||||
txn.commit();
|
||||
}
|
||||
Err(Faulted) => {
|
||||
// We don't panic here as the following code rebroadcasts our cosigns which is
|
||||
// necessary to inform other coordinators of the faulty cosigns
|
||||
log::error!("cosigning faulted");
|
||||
}
|
||||
}
|
||||
|
||||
let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
|
||||
serai_cosign::BROADCAST_FREQUENCY)
|
||||
.saturating_duration_since(Instant::now());
|
||||
@@ -86,19 +134,134 @@ fn spawn_cosigning(
|
||||
}
|
||||
cosign = p2p_cosigns.recv() => {
|
||||
let cosign = cosign.expect("p2p cosigns channel was dropped?");
|
||||
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
|
||||
}
|
||||
cosign = signed_cosigns.recv() => {
|
||||
let cosign = cosign.expect("signed cosigns channel was dropped?");
|
||||
// TODO: Handle this error
|
||||
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
|
||||
p2p.publish_cosign(cosign).await;
|
||||
if cosigning.intake_cosign(&cosign).is_ok() {
|
||||
p2p.publish_cosign(cosign).await;
|
||||
}
|
||||
}
|
||||
// Make sure this loop runs at least this often
|
||||
() = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_processor_messages(
|
||||
mut db: impl serai_db::Db,
|
||||
message_queue: Arc<MessageQueue>,
|
||||
network: NetworkId,
|
||||
) {
|
||||
loop {
|
||||
let (msg_id, msg) = {
|
||||
let msg = message_queue.next(Service::Processor(network)).await;
|
||||
// Check this message's sender is as expected
|
||||
assert_eq!(msg.from, Service::Processor(network));
|
||||
|
||||
// Check this message's ID is as expected
|
||||
let last = LastProcessorMessage::get(&db, network);
|
||||
let next = last.map(|id| id + 1).unwrap_or(0);
|
||||
// This should either be the last message's ID, if we committed but didn't send our ACK, or
|
||||
// the expected next message's ID
|
||||
assert!((Some(msg.id) == last) || (msg.id == next));
|
||||
|
||||
// TODO: Check msg.sig
|
||||
|
||||
// If this is the message we already handled, and just failed to ACK, ACK it now and move on
|
||||
if Some(msg.id) == last {
|
||||
message_queue.ack(Service::Processor(network), msg.id).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
(msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap())
|
||||
};
|
||||
|
||||
let mut txn = db.txn();
|
||||
|
||||
match msg {
|
||||
messages::ProcessorMessage::KeyGen(msg) => match msg {
|
||||
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
|
||||
let set = ValidatorSet { network, session };
|
||||
TributaryTransactions::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::DkgParticipation { participation, signed: Signed::default() },
|
||||
);
|
||||
}
|
||||
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
|
||||
session,
|
||||
substrate_key,
|
||||
network_key,
|
||||
} => todo!("TODO Transaction::DkgConfirmationPreprocess"),
|
||||
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
|
||||
let set = ValidatorSet { network, session };
|
||||
TributaryTransactions::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::RemoveParticipant {
|
||||
participant: todo!("TODO"),
|
||||
signed: Signed::default(),
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
messages::ProcessorMessage::Sign(msg) => match msg {
|
||||
messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
|
||||
let set = ValidatorSet { network, session };
|
||||
TributaryTransactions::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::RemoveParticipant {
|
||||
participant: todo!("TODO"),
|
||||
signed: Signed::default(),
|
||||
},
|
||||
);
|
||||
}
|
||||
messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
|
||||
todo!("TODO Transaction::Batch + Transaction::Sign")
|
||||
}
|
||||
messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"),
|
||||
},
|
||||
messages::ProcessorMessage::Coordinator(msg) => match msg {
|
||||
messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
|
||||
SignedCosigns::send(&mut txn, &cosign);
|
||||
}
|
||||
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
|
||||
todo!("TODO Save to DB, have task read from DB and publish to Serai")
|
||||
}
|
||||
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
|
||||
todo!("TODO Save to DB, have task read from DB and publish to Serai")
|
||||
}
|
||||
},
|
||||
messages::ProcessorMessage::Substrate(msg) => match msg {
|
||||
messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
|
||||
let mut by_session = HashMap::new();
|
||||
for plan in plans {
|
||||
by_session
|
||||
.entry(plan.session)
|
||||
.or_insert_with(|| Vec::with_capacity(1))
|
||||
.push(plan.transaction_plan_id);
|
||||
}
|
||||
for (session, plans) in by_session {
|
||||
let set = ValidatorSet { network, session };
|
||||
SubstrateBlockPlans::set(&mut txn, set, block, &plans);
|
||||
TributaryTransactions::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::SubstrateBlock { hash: block },
|
||||
);
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// Mark this as the last handled message
|
||||
LastProcessorMessage::set(&mut txn, network, &msg_id);
|
||||
// Commit the txn
|
||||
txn.commit();
|
||||
// Now that we won't handle this message again, acknowledge it so we won't see it again
|
||||
message_queue.ack(Service::Processor(network), msg_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Override the panic handler with one which will panic if any tokio task panics
|
||||
@@ -148,6 +311,8 @@ async fn main() {
|
||||
prune_tributary_db(to_cleanup);
|
||||
// Drain the cosign intents created for this set
|
||||
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
|
||||
// Drain the transactions to publish for this set
|
||||
while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
|
||||
// Remove the SignSlashReport notification
|
||||
SignSlashReport::try_recv(&mut txn, to_cleanup);
|
||||
}
|
||||
@@ -217,7 +382,6 @@ async fn main() {
|
||||
);
|
||||
|
||||
// Spawn the cosign handler
|
||||
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
|
||||
spawn_cosigning(
|
||||
db.clone(),
|
||||
serai.clone(),
|
||||
@@ -225,7 +389,6 @@ async fn main() {
|
||||
// Run the Substrate scanners once we cosign new blocks
|
||||
vec![substrate_canonical_task, substrate_ephemeral_task],
|
||||
p2p_cosigns_recv,
|
||||
signed_cosigns_recv,
|
||||
);
|
||||
|
||||
// Spawn all Tributaries on-disk
|
||||
@@ -254,7 +417,14 @@ async fn main() {
|
||||
.continually_run(substrate_task_def, vec![]),
|
||||
);
|
||||
|
||||
// TODO: Handle processor messages
|
||||
// Handle all of the Processors' messages
|
||||
for network in serai_client::primitives::NETWORKS {
|
||||
if network == NetworkId::Serai {
|
||||
continue;
|
||||
}
|
||||
tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network));
|
||||
}
|
||||
|
||||
todo!("TODO")
|
||||
// Run the spawned tasks ad-infinitum
|
||||
core::future::pending().await
|
||||
}
|
||||
|
||||
@@ -69,8 +69,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
||||
intent: msg.intent(),
|
||||
};
|
||||
let msg = borsh::to_vec(&msg).unwrap();
|
||||
// TODO: Make this fallible
|
||||
self.message_queue.queue(metadata, msg).await;
|
||||
self.message_queue.queue(metadata, msg).await?;
|
||||
txn.commit();
|
||||
made_progress = true;
|
||||
}
|
||||
@@ -132,8 +131,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
||||
intent: msg.intent(),
|
||||
};
|
||||
let msg = borsh::to_vec(&msg).unwrap();
|
||||
// TODO: Make this fallible
|
||||
self.message_queue.queue(metadata, msg).await;
|
||||
self.message_queue.queue(metadata, msg).await?;
|
||||
|
||||
// Commit the transaction for all of this
|
||||
txn.commit();
|
||||
|
||||
@@ -8,27 +8,68 @@ use ciphersuite::{Ciphersuite, Ristretto};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use serai_db::{DbTxn, Db as DbTrait};
|
||||
use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
|
||||
|
||||
use scale::Encode;
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use tributary_sdk::{TransactionError, ProvidedError, Tributary};
|
||||
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
|
||||
|
||||
use serai_task::{Task, TaskHandle, ContinuallyRan};
|
||||
|
||||
use message_queue::{Service, Metadata, client::MessageQueue};
|
||||
|
||||
use serai_cosign::Cosigning;
|
||||
use serai_cosign::{Faulted, CosignIntent, Cosigning};
|
||||
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
|
||||
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask};
|
||||
use serai_coordinator_tributary::{Transaction, ProcessorMessages, CosignIntents, ScanTributaryTask};
|
||||
use serai_coordinator_p2p::P2p;
|
||||
|
||||
use crate::Db;
|
||||
use crate::{Db, TributaryTransactions};
|
||||
|
||||
db_channel! {
|
||||
Coordinator {
|
||||
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide a Provided Transaction to the Tributary.
|
||||
///
|
||||
/// This is not a well-designed function. This is specific to the context in which its called,
|
||||
/// within this file. It should only be considered an internal helper for this domain alone.
|
||||
async fn provide_transaction<TD: DbTrait, P: P2p>(
|
||||
set: ValidatorSet,
|
||||
tributary: &Tributary<TD, Transaction, P>,
|
||||
tx: Transaction,
|
||||
) {
|
||||
match tributary.provide_transaction(tx.clone()).await {
|
||||
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before
|
||||
// committing the txn which provoked this
|
||||
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
|
||||
Err(ProvidedError::NotProvided) => {
|
||||
panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}");
|
||||
}
|
||||
Err(ProvidedError::InvalidProvided(e)) => {
|
||||
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
|
||||
}
|
||||
// The Tributary's scan task won't advance if we don't have the Provided transactions
|
||||
// present on-chain, and this enters an infinite loop to block the calling task from
|
||||
// advancing
|
||||
Err(ProvidedError::LocalMismatchesOnChain) => loop {
|
||||
log::error!(
|
||||
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
|
||||
set,
|
||||
tx,
|
||||
);
|
||||
// Print this every five minutes as this does need to be handled
|
||||
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides Cosign/Cosigned Transactions onto the Tributary.
|
||||
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
|
||||
db: CD,
|
||||
tributary_db: TD,
|
||||
set: NewSetInformation,
|
||||
tributary: Tributary<TD, Transaction, P>,
|
||||
}
|
||||
@@ -36,40 +77,6 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
|
||||
{
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
/// Provide a Provided Transaction to the Tributary.
|
||||
///
|
||||
/// This is not a well-designed function. This is specific to the context in which its called,
|
||||
/// within this file. It should only be considered an internal helper for this domain alone.
|
||||
async fn provide_transaction<TD: DbTrait, P: P2p>(
|
||||
set: ValidatorSet,
|
||||
tributary: &Tributary<TD, Transaction, P>,
|
||||
tx: Transaction,
|
||||
) {
|
||||
match tributary.provide_transaction(tx.clone()).await {
|
||||
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before
|
||||
// committing the txn which provoked this
|
||||
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
|
||||
Err(ProvidedError::NotProvided) => {
|
||||
panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}");
|
||||
}
|
||||
Err(ProvidedError::InvalidProvided(e)) => {
|
||||
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
|
||||
}
|
||||
Err(ProvidedError::LocalMismatchesOnChain) => loop {
|
||||
// The Tributary's scan task won't advance if we don't have the Provided transactions
|
||||
// present on-chain, and this enters an infinite loop to block the calling task from
|
||||
// advancing
|
||||
log::error!(
|
||||
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
|
||||
set,
|
||||
tx,
|
||||
);
|
||||
// Print this every five minutes as this does need to be handled
|
||||
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
|
||||
@@ -79,16 +86,27 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||
let mut txn = self.db.txn();
|
||||
|
||||
// Fetch the next cosign this tributary should handle
|
||||
let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
|
||||
let Some(cosign) = PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
|
||||
pending_notable_cosign = cosign.notable;
|
||||
|
||||
// If we (Serai) haven't cosigned this block, break as this is still pending
|
||||
let Ok(latest) = Cosigning::<CD>::latest_cosigned_block_number(&txn) else { break };
|
||||
let latest = match Cosigning::<CD>::latest_cosigned_block_number(&txn) {
|
||||
Ok(latest) => latest,
|
||||
Err(Faulted) => {
|
||||
log::error!("cosigning faulted");
|
||||
Err("cosigning faulted")?
|
||||
}
|
||||
};
|
||||
if latest < cosign.block_number {
|
||||
break;
|
||||
}
|
||||
|
||||
// Because we've cosigned it, provide the TX for that
|
||||
{
|
||||
let mut txn = self.tributary_db.txn();
|
||||
CosignIntents::provide(&mut txn, self.set.set, &cosign);
|
||||
txn.commit();
|
||||
}
|
||||
provide_transaction(
|
||||
self.set.set,
|
||||
&self.tributary,
|
||||
@@ -109,7 +127,7 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||
// intended_cosigns will only yield up to and including the next notable cosign
|
||||
for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) {
|
||||
// Flag this cosign as pending
|
||||
crate::PendingCosigns::send(&mut txn, self.set.set, &cosign);
|
||||
PendingCosigns::send(&mut txn, self.set.set, &cosign);
|
||||
// Provide the transaction to queue it for work
|
||||
provide_transaction(
|
||||
self.set.set,
|
||||
@@ -127,6 +145,66 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds all of the transactions sent via `TributaryTransactions`.
|
||||
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
|
||||
db: CD,
|
||||
tributary_db: TD,
|
||||
tributary: Tributary<TD, Transaction, P>,
|
||||
set: ValidatorSet,
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
}
|
||||
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactionsTask<CD, TD, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
let mut txn = self.db.txn();
|
||||
let Some(mut tx) = TributaryTransactions::try_recv(&mut txn, self.set) else { break };
|
||||
|
||||
let kind = tx.kind();
|
||||
match kind {
|
||||
TransactionKind::Provided(_) => provide_transaction(self.set, &self.tributary, tx).await,
|
||||
TransactionKind::Unsigned | TransactionKind::Signed(_, _) => {
|
||||
// If this is a signed transaction, sign it
|
||||
if matches!(kind, TransactionKind::Signed(_, _)) {
|
||||
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
|
||||
}
|
||||
|
||||
// Actually add the transaction
|
||||
// TODO: If this is a preprocess, make sure the topic has been recognized
|
||||
let res = self.tributary.add_transaction(tx.clone()).await;
|
||||
match &res {
|
||||
// Fresh publication, already published
|
||||
Ok(true | false) => {}
|
||||
Err(
|
||||
TransactionError::TooLargeTransaction |
|
||||
TransactionError::InvalidSigner |
|
||||
TransactionError::InvalidNonce |
|
||||
TransactionError::InvalidSignature |
|
||||
TransactionError::InvalidContent,
|
||||
) => {
|
||||
panic!("created an invalid transaction, tx: {tx:?}, err: {res:?}");
|
||||
}
|
||||
// We've published too many transactions recently
|
||||
// Drop this txn to try to publish it again later on a future iteration
|
||||
Err(TransactionError::TooManyInMempool) => {
|
||||
drop(txn);
|
||||
break;
|
||||
}
|
||||
// This isn't a Provided transaction so this should never be hit
|
||||
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
made_progress = true;
|
||||
txn.commit();
|
||||
}
|
||||
Ok(made_progress)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
|
||||
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
|
||||
tributary_db: TD,
|
||||
@@ -146,8 +224,7 @@ impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
|
||||
intent: msg.intent(),
|
||||
};
|
||||
let msg = borsh::to_vec(&msg).unwrap();
|
||||
// TODO: Make this fallible
|
||||
self.message_queue.queue(metadata, msg).await;
|
||||
self.message_queue.queue(metadata, msg).await?;
|
||||
txn.commit();
|
||||
made_progress = true;
|
||||
}
|
||||
@@ -190,7 +267,10 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD
|
||||
}
|
||||
// We've published too many transactions recently
|
||||
// Drop this txn to try to publish it again later on a future iteration
|
||||
Err(TransactionError::TooManyInMempool) => return Ok(false),
|
||||
Err(TransactionError::TooManyInMempool) => {
|
||||
drop(txn);
|
||||
return Ok(false);
|
||||
}
|
||||
// This isn't a Provided transaction so this should never be hit
|
||||
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
|
||||
}
|
||||
@@ -294,6 +374,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
||||
tokio::spawn(
|
||||
(ProvideCosignCosignedTransactionsTask {
|
||||
db: db.clone(),
|
||||
tributary_db: tributary_db.clone(),
|
||||
set: set.clone(),
|
||||
tributary: tributary.clone(),
|
||||
})
|
||||
@@ -314,7 +395,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
||||
// Spawn the scan task
|
||||
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
|
||||
tokio::spawn(
|
||||
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader)
|
||||
ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
|
||||
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
|
||||
// dropped, it will be too
|
||||
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
|
||||
@@ -325,14 +406,27 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
||||
tokio::spawn(
|
||||
(SignSlashReportTask {
|
||||
db: db.clone(),
|
||||
tributary_db,
|
||||
tributary_db: tributary_db.clone(),
|
||||
tributary: tributary.clone(),
|
||||
set: set.clone(),
|
||||
key: serai_key,
|
||||
key: serai_key.clone(),
|
||||
})
|
||||
.continually_run(sign_slash_report_task_def, vec![]),
|
||||
);
|
||||
|
||||
// Spawn the add transactions task
|
||||
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
|
||||
tokio::spawn(
|
||||
(AddTributaryTransactionsTask {
|
||||
db: db.clone(),
|
||||
tributary_db,
|
||||
tributary: tributary.clone(),
|
||||
set: set.set,
|
||||
key: serai_key,
|
||||
})
|
||||
.continually_run(add_tributary_transactions_task_def, vec![]),
|
||||
);
|
||||
|
||||
// Whenever a new block occurs, immediately run the scan task
|
||||
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
|
||||
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
|
||||
@@ -342,6 +436,10 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
||||
set.set,
|
||||
tributary,
|
||||
scan_tributary_task,
|
||||
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task],
|
||||
vec![
|
||||
provide_cosign_cosigned_transactions_task,
|
||||
sign_slash_report_task,
|
||||
add_tributary_transactions_task,
|
||||
],
|
||||
));
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ use messages::sign::{VariantSignId, SignId};
|
||||
|
||||
use serai_db::*;
|
||||
|
||||
use serai_cosign::CosignIntent;
|
||||
|
||||
use crate::transaction::SigningProtocolRound;
|
||||
|
||||
/// A topic within the database which the group participates in
|
||||
@@ -187,6 +189,8 @@ create_db!(
|
||||
// The slash points a validator has accrued, with u32::MAX representing a fatal slash.
|
||||
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32,
|
||||
|
||||
// The cosign intent for a Substrate block
|
||||
CosignIntents: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> CosignIntent,
|
||||
// The latest Substrate block to cosign.
|
||||
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
|
||||
// The hash of the block we're actively cosigning.
|
||||
@@ -194,6 +198,9 @@ create_db!(
|
||||
// If this block has already been cosigned.
|
||||
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
|
||||
|
||||
// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
|
||||
SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,
|
||||
|
||||
// The weight accumulated for a topic.
|
||||
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
|
||||
// The entries accumulated for a topic, by validator.
|
||||
|
||||
@@ -24,14 +24,13 @@ use tributary_sdk::{
|
||||
Transaction as TributaryTransaction, Block, TributaryReader, P2p,
|
||||
};
|
||||
|
||||
use serai_cosign::Cosigning;
|
||||
use serai_cosign::CosignIntent;
|
||||
use serai_coordinator_substrate::NewSetInformation;
|
||||
|
||||
use messages::sign::VariantSignId;
|
||||
|
||||
mod transaction;
|
||||
pub(crate) use transaction::{SigningProtocolRound, Signed};
|
||||
pub use transaction::Transaction;
|
||||
pub use transaction::{SigningProtocolRound, Signed, Transaction};
|
||||
|
||||
mod db;
|
||||
use db::*;
|
||||
@@ -45,17 +44,58 @@ impl ProcessorMessages {
|
||||
}
|
||||
}
|
||||
|
||||
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
|
||||
/// The cosign intents.
|
||||
pub struct CosignIntents;
|
||||
impl CosignIntents {
|
||||
/// Provide a CosignIntent for this Tributary.
|
||||
///
|
||||
/// This must be done before the associated `Transaction::Cosign` is provided.
|
||||
pub fn provide(txn: &mut impl DbTxn, set: ValidatorSet, intent: &CosignIntent) {
|
||||
db::CosignIntents::set(txn, set, intent.block_hash, intent);
|
||||
}
|
||||
fn take(
|
||||
txn: &mut impl DbTxn,
|
||||
set: ValidatorSet,
|
||||
substrate_block_hash: [u8; 32],
|
||||
) -> Option<CosignIntent> {
|
||||
db::CosignIntents::take(txn, set, substrate_block_hash)
|
||||
}
|
||||
}
|
||||
|
||||
/// The plans to whitelist upon a `Transaction::SubstrateBlock` being included on-chain.
|
||||
pub struct SubstrateBlockPlans;
|
||||
impl SubstrateBlockPlans {
|
||||
/// Set the plans to whitelist upon the associated `Transaction::SubstrateBlock` being included
|
||||
/// on-chain.
|
||||
///
|
||||
/// This must be done before the associated `Transaction::Cosign` is provided.
|
||||
pub fn set(
|
||||
txn: &mut impl DbTxn,
|
||||
set: ValidatorSet,
|
||||
substrate_block_hash: [u8; 32],
|
||||
plans: &Vec<[u8; 32]>,
|
||||
) {
|
||||
db::SubstrateBlockPlans::set(txn, set, substrate_block_hash, &plans);
|
||||
}
|
||||
fn take(
|
||||
txn: &mut impl DbTxn,
|
||||
set: ValidatorSet,
|
||||
substrate_block_hash: [u8; 32],
|
||||
) -> Option<Vec<[u8; 32]>> {
|
||||
db::SubstrateBlockPlans::take(txn, set, substrate_block_hash)
|
||||
}
|
||||
}
|
||||
|
||||
struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
|
||||
_td: PhantomData<TD>,
|
||||
_p2p: PhantomData<P>,
|
||||
cosign_db: &'a CD,
|
||||
tributary_txn: &'a mut TDT,
|
||||
set: ValidatorSet,
|
||||
validators: &'a [SeraiAddress],
|
||||
total_weight: u64,
|
||||
validator_weights: &'a HashMap<SeraiAddress, u64>,
|
||||
}
|
||||
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
|
||||
impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
|
||||
fn potentially_start_cosign(&mut self) {
|
||||
// Don't start a new cosigning instance if we're actively running one
|
||||
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
|
||||
@@ -74,20 +114,20 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(substrate_block_number) =
|
||||
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
|
||||
else {
|
||||
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
|
||||
// Provided transactions within it, and the block to cosign is a Provided transaction
|
||||
panic!("cosigning a block our cosigner didn't index")
|
||||
};
|
||||
let intent =
|
||||
CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign)
|
||||
.expect("Transaction::Cosign locally provided but CosignIntents wasn't populated");
|
||||
assert_eq!(
|
||||
intent.block_hash, latest_substrate_block_to_cosign,
|
||||
"provided CosignIntent wasn't saved by its block hash"
|
||||
);
|
||||
|
||||
// Mark us as actively cosigning
|
||||
TributaryDb::start_cosigning(
|
||||
self.tributary_txn,
|
||||
self.set,
|
||||
latest_substrate_block_to_cosign,
|
||||
substrate_block_number,
|
||||
intent.block_number,
|
||||
);
|
||||
// Send the message for the processor to start signing
|
||||
TributaryDb::send_message(
|
||||
@@ -95,8 +135,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
|
||||
self.set,
|
||||
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
|
||||
session: self.set.session,
|
||||
block_number: substrate_block_number,
|
||||
block: latest_substrate_block_to_cosign,
|
||||
intent,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -206,11 +245,32 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
|
||||
}
|
||||
Transaction::SubstrateBlock { hash } => {
|
||||
// Whitelist all of the IDs this Substrate block causes to be signed
|
||||
todo!("TODO")
|
||||
let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
|
||||
"Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
|
||||
);
|
||||
for plan in plans {
|
||||
TributaryDb::recognize_topic(
|
||||
self.tributary_txn,
|
||||
self.set,
|
||||
Topic::Sign {
|
||||
id: VariantSignId::Transaction(plan),
|
||||
attempt: 0,
|
||||
round: SigningProtocolRound::Preprocess,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
Transaction::Batch { hash } => {
|
||||
// Whitelist the signing of this batch, publishing our own preprocess
|
||||
todo!("TODO")
|
||||
// Whitelist the signing of this batch
|
||||
TributaryDb::recognize_topic(
|
||||
self.tributary_txn,
|
||||
self.set,
|
||||
Topic::Sign {
|
||||
id: VariantSignId::Batch(hash),
|
||||
attempt: 0,
|
||||
round: SigningProtocolRound::Preprocess,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Transaction::SlashReport { slash_points, signed } => {
|
||||
@@ -411,8 +471,7 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
|
||||
}
|
||||
|
||||
/// The task to scan the Tributary, populating `ProcessorMessages`.
|
||||
pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
|
||||
cosign_db: CD,
|
||||
pub struct ScanTributaryTask<TD: Db, P: P2p> {
|
||||
tributary_db: TD,
|
||||
set: ValidatorSet,
|
||||
validators: Vec<SeraiAddress>,
|
||||
@@ -422,10 +481,9 @@ pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
|
||||
_p2p: PhantomData<P>,
|
||||
}
|
||||
|
||||
impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
|
||||
impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
|
||||
/// Create a new instance of this task.
|
||||
pub fn new(
|
||||
cosign_db: CD,
|
||||
tributary_db: TD,
|
||||
new_set: &NewSetInformation,
|
||||
tributary: TributaryReader<TD, Transaction>,
|
||||
@@ -442,7 +500,6 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
|
||||
}
|
||||
|
||||
ScanTributaryTask {
|
||||
cosign_db,
|
||||
tributary_db,
|
||||
set: new_set.set,
|
||||
validators,
|
||||
@@ -454,7 +511,7 @@ impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
|
||||
impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
async move {
|
||||
let (mut last_block_number, mut last_block_hash) =
|
||||
@@ -486,7 +543,6 @@ impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
|
||||
(ScanBlock {
|
||||
_td: PhantomData::<TD>,
|
||||
_p2p: PhantomData::<P>,
|
||||
cosign_db: &self.cosign_db,
|
||||
tributary_txn: &mut tributary_txn,
|
||||
set: self.set,
|
||||
validators: &self.validators,
|
||||
|
||||
@@ -64,22 +64,20 @@ impl MessageQueue {
|
||||
Self::new(service, url, priv_key)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
|
||||
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> Result<(), String> {
|
||||
let msg = borsh::to_vec(&msg).unwrap();
|
||||
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else {
|
||||
log::warn!("couldn't send the message len");
|
||||
return false;
|
||||
match socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => Err(format!("couldn't send the message len: {e:?}"))?,
|
||||
};
|
||||
let Ok(()) = socket.write_all(&msg).await else {
|
||||
log::warn!("couldn't write the message");
|
||||
return false;
|
||||
};
|
||||
true
|
||||
match socket.write_all(&msg).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => Err(format!("couldn't write the message: {e:?}"))?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) {
|
||||
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
|
||||
pub async fn queue(&self, metadata: Metadata, msg: Vec<u8>) -> Result<(), String> {
|
||||
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
|
||||
let nonce_pub = Ristretto::generator() * nonce.deref();
|
||||
let sig = SchnorrSignature::<Ristretto>::sign(
|
||||
@@ -97,6 +95,21 @@ impl MessageQueue {
|
||||
.serialize();
|
||||
|
||||
let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig };
|
||||
|
||||
let mut socket = match TcpStream::connect(&self.url).await {
|
||||
Ok(socket) => socket,
|
||||
Err(e) => Err(format!("failed to connect to the message-queue service: {e:?}"))?,
|
||||
};
|
||||
Self::send(&mut socket, msg.clone()).await?;
|
||||
match socket.read_u8().await {
|
||||
Ok(1) => {}
|
||||
Ok(b) => Err(format!("message-queue didn't return for 1 for its ack, recieved: {b}"))?,
|
||||
Err(e) => Err(format!("failed to read the response from the message-queue service: {e:?}"))?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn queue_with_retry(&self, metadata: Metadata, msg: Vec<u8>) {
|
||||
let mut first = true;
|
||||
loop {
|
||||
// Sleep, so we don't hammer re-attempts
|
||||
@@ -105,14 +118,9 @@ impl MessageQueue {
|
||||
}
|
||||
first = false;
|
||||
|
||||
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
|
||||
if !Self::send(&mut socket, msg.clone()).await {
|
||||
continue;
|
||||
if self.queue(metadata.clone(), msg.clone()).await.is_ok() {
|
||||
break;
|
||||
}
|
||||
if socket.read_u8().await.ok() != Some(1) {
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +144,7 @@ impl MessageQueue {
|
||||
log::trace!("opened socket for next");
|
||||
|
||||
loop {
|
||||
if !Self::send(&mut socket, msg.clone()).await {
|
||||
if Self::send(&mut socket, msg.clone()).await.is_err() {
|
||||
continue 'outer;
|
||||
}
|
||||
let status = match socket.read_u8().await {
|
||||
@@ -224,7 +232,7 @@ impl MessageQueue {
|
||||
first = false;
|
||||
|
||||
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue };
|
||||
if !Self::send(&mut socket, msg.clone()).await {
|
||||
if Self::send(&mut socket, msg.clone()).await.is_err() {
|
||||
continue;
|
||||
}
|
||||
if socket.read_u8().await.ok() != Some(1) {
|
||||
|
||||
@@ -103,6 +103,7 @@ impl Coordinator {
|
||||
});
|
||||
|
||||
// Spawn a task to send messages to the message-queue
|
||||
// TODO: Define a proper task for this and remove use of queue_with_retry
|
||||
tokio::spawn({
|
||||
let mut db = db.clone();
|
||||
async move {
|
||||
@@ -115,12 +116,12 @@ impl Coordinator {
|
||||
to: Service::Coordinator,
|
||||
intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(),
|
||||
};
|
||||
message_queue.queue(metadata, msg).await;
|
||||
message_queue.queue_with_retry(metadata, msg).await;
|
||||
txn.commit();
|
||||
}
|
||||
None => {
|
||||
let _ =
|
||||
tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv())
|
||||
tokio::time::timeout(core::time::Duration::from_secs(6), sent_message_recv.recv())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,3 +29,5 @@ serai-primitives = { path = "../../substrate/primitives", default-features = fal
|
||||
in-instructions-primitives = { package = "serai-in-instructions-primitives", path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
|
||||
coins-primitives = { package = "serai-coins-primitives", path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }
|
||||
validator-sets-primitives = { package = "serai-validator-sets-primitives", path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std", "borsh"] }
|
||||
|
||||
serai-cosign = { path = "../../coordinator/cosign", default-features = false }
|
||||
|
||||
@@ -11,6 +11,8 @@ use validator_sets_primitives::{Session, KeyPair, Slash};
|
||||
use coins_primitives::OutInstructionWithBalance;
|
||||
use in_instructions_primitives::SignedBatch;
|
||||
|
||||
use serai_cosign::{CosignIntent, SignedCosign};
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub struct SubstrateContext {
|
||||
pub serai_time: u64,
|
||||
@@ -50,7 +52,8 @@ pub mod key_gen {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
|
||||
// This set of messages is sent entirely and solely by serai-processor-key-gen.
|
||||
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||
pub enum ProcessorMessage {
|
||||
// Participated in the specified key generation protocol.
|
||||
Participation { session: Session, participation: Vec<u8> },
|
||||
@@ -141,7 +144,8 @@ pub mod sign {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
// This set of messages is sent entirely and solely by serai-processor-frost-attempt-manager.
|
||||
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub enum ProcessorMessage {
|
||||
// Participant sent an invalid message during the sign protocol.
|
||||
InvalidParticipant { session: Session, participant: Participant },
|
||||
@@ -155,39 +159,25 @@ pub mod sign {
|
||||
pub mod coordinator {
|
||||
use super::*;
|
||||
|
||||
// TODO: Remove this for the one defined in serai-cosign
|
||||
pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> {
|
||||
const DST: &[u8] = b"Cosign";
|
||||
let mut res = vec![u8::try_from(DST.len()).unwrap()];
|
||||
res.extend(DST);
|
||||
res.extend(block_number.to_le_bytes());
|
||||
res.extend(block);
|
||||
res
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub enum CoordinatorMessage {
|
||||
/// Cosign the specified Substrate block.
|
||||
///
|
||||
/// This is sent by the Coordinator's Tributary scanner.
|
||||
CosignSubstrateBlock { session: Session, block_number: u64, block: [u8; 32] },
|
||||
CosignSubstrateBlock { session: Session, intent: CosignIntent },
|
||||
/// Sign the slash report for this session.
|
||||
///
|
||||
/// This is sent by the Coordinator's Tributary scanner.
|
||||
SignSlashReport { session: Session, report: Vec<Slash> },
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub struct PlanMeta {
|
||||
pub session: Session,
|
||||
pub id: [u8; 32],
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
// This set of messages is sent entirely and solely by serai-processor-bin's implementation of
|
||||
// the signers::Coordinator trait.
|
||||
// TODO: Move message creation into serai-processor-signers
|
||||
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub enum ProcessorMessage {
|
||||
CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> },
|
||||
CosignedBlock { cosign: SignedCosign },
|
||||
SignedBatch { batch: SignedBatch },
|
||||
SubstrateBlockAck { block: u64, plans: Vec<PlanMeta> },
|
||||
SignedSlashReport { session: Session, signature: Vec<u8> },
|
||||
}
|
||||
}
|
||||
@@ -231,17 +221,16 @@ pub mod substrate {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub enum ProcessorMessage {}
|
||||
impl BorshSerialize for ProcessorMessage {
|
||||
fn serialize<W: borsh::io::Write>(&self, _writer: &mut W) -> borsh::io::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub struct PlanMeta {
|
||||
pub session: Session,
|
||||
pub transaction_plan_id: [u8; 32],
|
||||
}
|
||||
impl BorshDeserialize for ProcessorMessage {
|
||||
fn deserialize_reader<R: borsh::io::Read>(_reader: &mut R) -> borsh::io::Result<Self> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub enum ProcessorMessage {
|
||||
// TODO: Have the processor send this
|
||||
SubstrateBlockAck { block: [u8; 32], plans: Vec<PlanMeta> },
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,7 +257,7 @@ impl_from!(sign, CoordinatorMessage, Sign);
|
||||
impl_from!(coordinator, CoordinatorMessage, Coordinator);
|
||||
impl_from!(substrate, CoordinatorMessage, Substrate);
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
|
||||
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub enum ProcessorMessage {
|
||||
KeyGen(key_gen::ProcessorMessage),
|
||||
Sign(sign::ProcessorMessage),
|
||||
@@ -331,8 +320,8 @@ impl CoordinatorMessage {
|
||||
CoordinatorMessage::Coordinator(msg) => {
|
||||
let (sub, id) = match msg {
|
||||
// We only cosign a block once, and Reattempt is a separate message
|
||||
coordinator::CoordinatorMessage::CosignSubstrateBlock { block_number, .. } => {
|
||||
(0, block_number.encode())
|
||||
coordinator::CoordinatorMessage::CosignSubstrateBlock { intent, .. } => {
|
||||
(0, intent.block_number.encode())
|
||||
}
|
||||
// We only sign one slash report, and Reattempt is a separate message
|
||||
coordinator::CoordinatorMessage::SignSlashReport { session, .. } => (1, session.encode()),
|
||||
@@ -404,17 +393,26 @@ impl ProcessorMessage {
|
||||
}
|
||||
ProcessorMessage::Coordinator(msg) => {
|
||||
let (sub, id) = match msg {
|
||||
coordinator::ProcessorMessage::CosignedBlock { block, .. } => (0, block.encode()),
|
||||
coordinator::ProcessorMessage::CosignedBlock { cosign } => {
|
||||
(0, cosign.cosign.block_hash.encode())
|
||||
}
|
||||
coordinator::ProcessorMessage::SignedBatch { batch, .. } => (1, batch.batch.id.encode()),
|
||||
coordinator::ProcessorMessage::SubstrateBlockAck { block, .. } => (2, block.encode()),
|
||||
coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (3, session.encode()),
|
||||
coordinator::ProcessorMessage::SignedSlashReport { session, .. } => (2, session.encode()),
|
||||
};
|
||||
|
||||
let mut res = vec![PROCESSOR_UID, TYPE_COORDINATOR_UID, sub];
|
||||
res.extend(&id);
|
||||
res
|
||||
}
|
||||
ProcessorMessage::Substrate(_) => panic!("requesting intent for empty message type"),
|
||||
ProcessorMessage::Substrate(msg) => {
|
||||
let (sub, id) = match msg {
|
||||
substrate::ProcessorMessage::SubstrateBlockAck { block, .. } => (0, block.encode()),
|
||||
};
|
||||
|
||||
let mut res = vec![PROCESSOR_UID, TYPE_SUBSTRATE_UID, sub];
|
||||
res.extend(&id);
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ workspace = true
|
||||
[dependencies]
|
||||
hex = "0.4"
|
||||
|
||||
async-trait = "0.1"
|
||||
zeroize = { version = "1", default-features = false }
|
||||
rand_core = { version = "0.6", default-features = false }
|
||||
|
||||
|
||||
@@ -19,8 +19,6 @@ workspace = true
|
||||
[dependencies]
|
||||
hex = "0.4"
|
||||
|
||||
async-trait = "0.1"
|
||||
|
||||
zeroize = { version = "1", default-features = false }
|
||||
rand_core = { version = "0.6", default-features = false }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user