use core::{ops::Deref, time::Duration};
use std::{sync::Arc, collections::HashMap, time::Instant};

use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};

use dalek_ff_group::Ristretto;
use ciphersuite::{
  group::{ff::PrimeField, GroupEncoding},
  Ciphersuite,
};

use borsh::BorshDeserialize;

use tokio::sync::mpsc;

use serai_client::{
  primitives::{ExternalNetworkId, PublicKey, SeraiAddress, Signature},
  validator_sets::primitives::{ExternalValidatorSet, KeyPair},
  Serai,
};

use message_queue::{Service, client::MessageQueue};

use serai_task::{Task, TaskHandle, ContinuallyRan};

use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{
  CanonicalEventStream, EphemeralEventStream, SignSlashReport, SetKeysTask, SignedBatches,
  PublishBatchTask, SlashReports, PublishSlashReportTask,
};
use serai_coordinator_tributary::{SigningProtocolRound, Signed, Transaction, SubstrateBlockPlans};

mod db;
use db::*;

mod tributary;
mod dkg_confirmation;

mod substrate;
use substrate::SubstrateTask;

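// The P2P abstraction, re-exporting the libp2p-backed implementation alongside it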
mod p2p {
  pub use serai_coordinator_p2p::*;
  pub use serai_coordinator_libp2p_p2p::Libp2p;
}

// Use a zeroizing allocator for this entire application
//
// While secrets should already be zeroized, the presence of secret keys in a networked
// application (at increased risk of OOB reads) justifies the performance hit in case any secrets
// weren't already zeroized
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
  zalloc::ZeroizingAlloc(std::alloc::System);
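
/// Connect to the Serai node over HTTP, retrying with a linearly increasing delay (capped at five
/// minutes) until the initial connection succeeds.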
async fn serai() -> Arc<Serai> {
  const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10);
  const MAX_SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(300);

  let mut delay = SERAI_CONNECTION_DELAY;
  loop {
    let Ok(serai) = Serai::new(format!(
      "http://{}:9944",
      serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided")
    ))
    .await
    else {
      log::error!("couldn't connect to the Serai node");
      tokio::time::sleep(delay).await;
      delay = (delay + SERAI_CONNECTION_DELAY).min(MAX_SERAI_CONNECTION_DELAY);
      continue;
    };
    log::info!("made initial connection to Serai node");
    return Arc::new(serai);
  }
}
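
/// Spawn the cosigning task.
///
/// This intakes the cosigns we produce, intakes the cosigns received over the P2P network, and
/// periodically rebroadcasts our own cosigns.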
fn spawn_cosigning<D: serai_db::Db>(
  mut db: D,
  serai: Arc<Serai>,
  p2p: impl p2p::P2p,
  tasks_to_run_upon_cosigning: Vec<TaskHandle>,
  mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
) {
  let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning);
  tokio::spawn(async move {
    const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5);

    let mut last_cosign_rebroadcast = Instant::now();
    loop {
      // Intake our own cosigns
      match Cosigning::<D>::latest_cosigned_block_number(&db) {
        Ok(latest_cosigned_block_number) => {
          let mut txn = db.txn();
          // The cosigns we previously tried to intake but failed to
          let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]);
          // The cosigns we have yet to intake
          while let Some(cosign) = SignedCosigns::try_recv(&mut txn) {
            cosigns.push(cosign);
          }

          let mut erroneous = vec![];
          for cosign in cosigns {
            // If this cosign is stale, move on
            if cosign.cosign.block_number <= latest_cosigned_block_number {
              continue;
            }

            match cosigning.intake_cosign(&cosign) {
              // Publish this cosign
              Ok(()) => p2p.publish_cosign(cosign).await,
              Err(e) => {
                assert!(e.temporal(), "signed an invalid cosign: {e:?}");
                // Since this had a temporal error, queue it to try again later
                erroneous.push(cosign);
              }
            };
          }

          // Save the cosigns with temporal errors to the database
          ErroneousCosigns::set(&mut txn, &erroneous);

          txn.commit();
        }
        Err(Faulted) => {
          // We don't panic here as the following code rebroadcasts our cosigns, which is
          // necessary to inform other coordinators of the faulty cosigns
          log::error!("cosigning faulted");
        }
      }

      let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
        serai_cosign::BROADCAST_FREQUENCY)
        .saturating_duration_since(Instant::now());
      tokio::select! {
        () = tokio::time::sleep(time_till_cosign_rebroadcast) => {
          // Reset the timer so we only rebroadcast once per BROADCAST_FREQUENCY
          last_cosign_rebroadcast = Instant::now();
          for cosign in cosigning.cosigns_to_rebroadcast() {
            p2p.publish_cosign(cosign).await;
          }
        }
        cosign = p2p_cosigns.recv() => {
          let cosign = cosign.expect("p2p cosigns channel was dropped?");
          if cosigning.intake_cosign(&cosign).is_ok() {
            p2p.publish_cosign(cosign).await;
          }
        }
        // Make sure this loop runs at least this often
        () = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {}
      }
    }
  });
}
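
/// Handle a single external network.
///
/// This spawns the task publishing this network's batches to Serai, then continually handles
/// messages from this network's processor.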
async fn handle_network(
  mut db: impl serai_db::Db,
  message_queue: Arc<MessageQueue>,
  serai: Arc<Serai>,
  network: ExternalNetworkId,
) {
  // Spawn the task to publish batches for this network
  {
    let (publish_batch_task_def, publish_batch_task) = Task::new();
    tokio::spawn(
      PublishBatchTask::new(db.clone(), serai.clone(), network)
        .continually_run(publish_batch_task_def, vec![]),
    );
    // Forget its handle so it always runs in the background
    core::mem::forget(publish_batch_task);
  }

  // Handle Processor messages
  loop {
    let (msg_id, msg) = {
      let msg = message_queue.next(Service::Processor(network)).await;
      // Check this message's sender is as expected
      assert_eq!(msg.from, Service::Processor(network));

      // Check this message's ID is as expected
      let last = LastProcessorMessage::get(&db, network);
      let next = last.map(|id| id + 1).unwrap_or(0);
      // This should either be the last message's ID, if we committed but didn't send our ACK, or
      // the expected next message's ID
      assert!((Some(msg.id) == last) || (msg.id == next));

      // TODO: Check msg.sig

      // If this is the message we already handled, and just failed to ACK, ACK it now and move on
      if Some(msg.id) == last {
        message_queue.ack(Service::Processor(network), msg.id).await;
        continue;
      }

      (msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap())
    };

    let mut txn = db.txn();

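    // Handle the message by writing what it requires to the database channels consumed by the
    // other tasks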
    match msg {
      messages::ProcessorMessage::KeyGen(msg) => match msg {
        messages::key_gen::ProcessorMessage::Participation { session, participation } => {
          let set = ExternalValidatorSet { network, session };
          TributaryTransactionsFromProcessorMessages::send(
            &mut txn,
            set,
            &Transaction::DkgParticipation { participation, signed: Signed::default() },
          );
        }
        messages::key_gen::ProcessorMessage::GeneratedKeyPair {
          session,
          substrate_key,
          network_key,
        } => {
          KeysToConfirm::set(
            &mut txn,
            ExternalValidatorSet { network, session },
            &KeyPair(
              PublicKey::from_raw(substrate_key),
              network_key
                .try_into()
                .expect("generated a network key which exceeds the maximum key length"),
            ),
          );
        }
        messages::key_gen::ProcessorMessage::Blame { session, participant } => {
          RemoveParticipant::send(&mut txn, ExternalValidatorSet { network, session }, participant);
        }
      },
      messages::ProcessorMessage::Sign(msg) => match msg {
        messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
          RemoveParticipant::send(&mut txn, ExternalValidatorSet { network, session }, participant);
        }
        messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
          let set = ExternalValidatorSet { network, session: id.session };
          if id.attempt == 0 {
            // Batches are declared by their intent to be signed
            if let messages::sign::VariantSignId::Batch(hash) = id.id {
              TributaryTransactionsFromProcessorMessages::send(
                &mut txn,
                set,
                &Transaction::Batch { hash },
              );
            }
          }

          TributaryTransactionsFromProcessorMessages::send(
            &mut txn,
            set,
            &Transaction::Sign {
              id: id.id,
              attempt: id.attempt,
              round: SigningProtocolRound::Preprocess,
              data: preprocesses,
              signed: Signed::default(),
            },
          );
        }
        messages::sign::ProcessorMessage::Shares { id, shares } => {
          let set = ExternalValidatorSet { network, session: id.session };
          TributaryTransactionsFromProcessorMessages::send(
            &mut txn,
            set,
            &Transaction::Sign {
              id: id.id,
              attempt: id.attempt,
              round: SigningProtocolRound::Share,
              data: shares,
              signed: Signed::default(),
            },
          );
        }
      },
      messages::ProcessorMessage::Coordinator(msg) => match msg {
        messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
          SignedCosigns::send(&mut txn, &cosign);
        }
        messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
          SignedBatches::send(&mut txn, &batch);
        }
        messages::coordinator::ProcessorMessage::SignedSlashReport {
          session,
          slash_report,
          signature,
        } => {
          SlashReports::set(
            &mut txn,
            ExternalValidatorSet { network, session },
            slash_report,
            Signature(signature),
          );
        }
      },
      messages::ProcessorMessage::Substrate(msg) => match msg {
        messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
          let mut by_session = HashMap::new();
          for plan in plans {
            by_session
              .entry(plan.session)
              .or_insert_with(|| Vec::with_capacity(1))
              .push(plan.transaction_plan_id);
          }
          for (session, plans) in by_session {
            let set = ExternalValidatorSet { network, session };
            SubstrateBlockPlans::set(&mut txn, set, block, &plans);
            TributaryTransactionsFromProcessorMessages::send(
              &mut txn,
              set,
              &Transaction::SubstrateBlock { hash: block },
            );
          }
        }
      },
    }

    // Mark this as the last handled message
    LastProcessorMessage::set(&mut txn, network, &msg_id);
    // Commit the txn
    txn.commit();
    // Now that the txn is committed, acknowledge the message so the message-queue won't
    // redeliver it
    message_queue.ack(Service::Processor(network), msg_id).await;
  }
}

#[tokio::main]
async fn main() {
  // Override the panic handler with one which exits the process if any tokio task panics
  {
    let existing = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic| {
      existing(panic);
      const MSG: &str = "exiting the process due to a task panicking";
      println!("{MSG}");
      log::error!("{MSG}");
      std::process::exit(1);
    }));
  }

  // Initialize the logger
  if std::env::var("RUST_LOG").is_err() {
    std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string()));
  }
  env_logger::init();
  log::info!("starting coordinator service...");

  // Read the Serai key from the env
  let serai_key = {
    let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided");
    let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded");
    key_hex.zeroize();
    if key_vec.len() != 32 {
      key_vec.zeroize();
      panic!("Serai key had an invalid length");
    }
    let mut key_bytes = [0; 32];
    key_bytes.copy_from_slice(&key_vec);
    key_vec.zeroize();
    let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::from_repr(key_bytes).unwrap());
    key_bytes.zeroize();
    key
  };

  // Open the database
  let mut db = coordinator_db();

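  // Prune any Tributaries pending cleanup, and collect the Tributaries still active at boot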
  let existing_tributaries_at_boot = {
    let mut txn = db.txn();

    // Clean up all historic Tributaries
    while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
      prune_tributary_db(to_cleanup);
      // Remove the keys to confirm for this set
      KeysToConfirm::take(&mut txn, to_cleanup);
      KeySet::take(&mut txn, to_cleanup);
      // Drain the cosign intents created for this set
      while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
      // Drain the transactions to publish for this set
      while TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, to_cleanup).is_some() {}
      while TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, to_cleanup).is_some() {}
      // Drain the participants to remove for this set
      while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
      // Remove the SignSlashReport notification
      SignSlashReport::try_recv(&mut txn, to_cleanup);
    }

    // Remove retired Tributaries from ActiveTributaries
    let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]);
    active_tributaries.retain(|tributary| {
      RetiredTributary::get(&txn, tributary.set.network).map(|session| session.0) <
        Some(tributary.set.session.0)
    });
    ActiveTributaries::set(&mut txn, &active_tributaries);

    txn.commit();

    active_tributaries
  };

  // Connect to the message-queue
  let message_queue = Arc::new(MessageQueue::from_env(Service::Coordinator));

  // Connect to the Serai node
  let serai = serai().await;

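  // Channels used to tell the P2P task which Tributaries to track/retire, and to receive the
  // cosigns it hears about over the network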
  let (p2p_add_tributary_send, p2p_add_tributary_recv) = mpsc::unbounded_channel();
  let (p2p_retire_tributary_send, p2p_retire_tributary_recv) = mpsc::unbounded_channel();
  let (p2p_cosigns_send, p2p_cosigns_recv) = mpsc::unbounded_channel();

  // Spawn the P2P network
  let p2p = {
    let serai_keypair = {
      let mut key_bytes = serai_key.to_bytes();
      // Schnorrkel SecretKey is the key followed by 32 bytes of entropy for nonces
      let mut expanded_key = Zeroizing::new([0; 64]);
      expanded_key.as_mut_slice()[.. 32].copy_from_slice(&key_bytes);
      OsRng.fill_bytes(&mut expanded_key.as_mut_slice()[32 ..]);
      key_bytes.zeroize();
      Zeroizing::new(
        schnorrkel::SecretKey::from_bytes(expanded_key.as_slice()).unwrap().to_keypair(),
      )
    };
    let p2p = p2p::Libp2p::new(&serai_keypair, serai.clone());
    tokio::spawn(p2p::run::<Db, Transaction, _>(
      db.clone(),
      p2p.clone(),
      p2p_add_tributary_recv,
      p2p_retire_tributary_recv,
      p2p_cosigns_send,
    ));
    p2p
  };

  // Spawn the Substrate scanners
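  // Both scanners take `substrate_task` as a follow-up task, so the `SubstrateTask` spawned below
  // is re-run whenever they yield new events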
  let (substrate_task_def, substrate_task) = Task::new();
  let (substrate_canonical_task_def, substrate_canonical_task) = Task::new();
  tokio::spawn(
    CanonicalEventStream::new(db.clone(), serai.clone())
      .continually_run(substrate_canonical_task_def, vec![substrate_task.clone()]),
  );
  let (substrate_ephemeral_task_def, substrate_ephemeral_task) = Task::new();
  tokio::spawn(
    EphemeralEventStream::new(
      db.clone(),
      serai.clone(),
      SeraiAddress((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
    )
    .continually_run(substrate_ephemeral_task_def, vec![substrate_task]),
  );

  // Spawn the cosign handler
  spawn_cosigning(
    db.clone(),
    serai.clone(),
    p2p.clone(),
    // Run the Substrate scanners once we cosign new blocks
    vec![substrate_canonical_task, substrate_ephemeral_task],
    p2p_cosigns_recv,
  );

  // Spawn all Tributaries present on-disk
  for tributary in existing_tributaries_at_boot {
    crate::tributary::spawn_tributary(
      db.clone(),
      message_queue.clone(),
      p2p.clone(),
      &p2p_add_tributary_send,
      tributary,
      serai_key.clone(),
    )
    .await;
  }

  // Handle the events from the Substrate scanner
  tokio::spawn(
    (SubstrateTask {
      serai_key: serai_key.clone(),
      db: db.clone(),
      message_queue: message_queue.clone(),
      p2p: p2p.clone(),
      p2p_add_tributary: p2p_add_tributary_send.clone(),
      p2p_retire_tributary: p2p_retire_tributary_send.clone(),
    })
    .continually_run(substrate_task_def, vec![]),
  );

  // Handle each of the networks
  for network in serai_client::primitives::EXTERNAL_NETWORKS {
    tokio::spawn(handle_network(db.clone(), message_queue.clone(), serai.clone(), network));
  }

  // Spawn the task to set keys
  {
    let (set_keys_task_def, set_keys_task) = Task::new();
    tokio::spawn(
      SetKeysTask::new(db.clone(), serai.clone()).continually_run(set_keys_task_def, vec![]),
    );
    // Forget its handle so it always runs in the background
    core::mem::forget(set_keys_task);
  }

  // Spawn the task to publish slash reports
  {
    let (publish_slash_report_task_def, publish_slash_report_task) = Task::new();
    tokio::spawn(
      PublishSlashReportTask::new(db, serai).continually_run(publish_slash_report_task_def, vec![]),
    );
    // Always have this run in the background
    core::mem::forget(publish_slash_report_task);
  }

  // Run the spawned tasks ad-infinitum
  core::future::pending().await
}