Start handling messages from the processor

Does route ProcessorMessage::CosignedBlock. Rest are stubbed with TODO.
This commit is contained in:
Luke Parker
2025-01-12 05:53:43 -05:00
parent e7de5125a2
commit e35aa04afb

View File

@@ -9,14 +9,19 @@ use ciphersuite::{
Ciphersuite, Ristretto,
};
use borsh::BorshDeserialize;
use tokio::sync::mpsc;
use serai_client::{primitives::PublicKey, Serai};
use serai_client::{
primitives::{NetworkId, PublicKey},
Serai,
};
use message_queue::{Service, client::MessageQueue};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction;
@@ -63,18 +68,60 @@ async fn serai() -> Arc<Serai> {
}
}
fn spawn_cosigning(
db: impl serai_db::Db,
fn spawn_cosigning<D: serai_db::Db>(
mut db: D,
serai: Arc<Serai>,
p2p: impl p2p::P2p,
tasks_to_run_upon_cosigning: Vec<TaskHandle>,
mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
mut signed_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
) {
let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning);
let mut cosigning = Cosigning::spawn(db.clone(), serai, p2p.clone(), tasks_to_run_upon_cosigning);
tokio::spawn(async move {
const COSIGN_LOOP_INTERVAL: Duration = Duration::from_secs(5);
let last_cosign_rebroadcast = Instant::now();
loop {
// Intake our own cosigns
match Cosigning::<D>::latest_cosigned_block_number(&db) {
Ok(latest_cosigned_block_number) => {
let mut txn = db.txn();
// The cosigns we prior tried to intake yet failed to
let mut cosigns = ErroneousCosigns::get(&txn).unwrap_or(vec![]);
// The cosigns we have yet to intake
while let Some(cosign) = SignedCosigns::try_recv(&mut txn) {
cosigns.push(cosign);
}
let mut erroneous = vec![];
for cosign in cosigns {
// If this cosign is stale, move on
if cosign.cosign.block_number <= latest_cosigned_block_number {
continue;
}
match cosigning.intake_cosign(&cosign) {
// Publish this cosign
Ok(()) => p2p.publish_cosign(cosign).await,
Err(e) => {
assert!(e.temporal(), "signed an invalid cosign: {e:?}");
// Since this had a temporal error, queue it to try again later
erroneous.push(cosign);
}
};
}
// Save the cosigns with temporal errors to the database
ErroneousCosigns::set(&mut txn, &erroneous);
txn.commit();
}
Err(Faulted) => {
// We don't panic here as the following code rebroadcasts our cosigns which is
// necessary to inform other coordinators of the faulty cosigns
log::error!("cosigning faulted");
}
}
let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
serai_cosign::BROADCAST_FREQUENCY)
.saturating_duration_since(Instant::now());
@@ -86,19 +133,98 @@ fn spawn_cosigning(
}
cosign = p2p_cosigns.recv() => {
let cosign = cosign.expect("p2p cosigns channel was dropped?");
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
}
cosign = signed_cosigns.recv() => {
let cosign = cosign.expect("signed cosigns channel was dropped?");
// TODO: Handle this error
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
p2p.publish_cosign(cosign).await;
if cosigning.intake_cosign(&cosign).is_ok() {
p2p.publish_cosign(cosign).await;
}
}
// Make sure this loop runs at least this often
() = tokio::time::sleep(COSIGN_LOOP_INTERVAL) => {}
}
}
});
}
async fn handle_processor_messages(
mut db: impl serai_db::Db,
message_queue: Arc<MessageQueue>,
network: NetworkId,
) {
loop {
let (msg_id, msg) = {
let msg = message_queue.next(Service::Processor(network)).await;
// Check this message's sender is as expected
assert_eq!(msg.from, Service::Processor(network));
// Check this message's ID is as expected
let last = LastProcessorMessage::get(&db, network);
let next = last.map(|id| id + 1).unwrap_or(0);
// This should either be the last message's ID, if we committed but didn't send our ACK, or
// the expected next message's ID
assert!((Some(msg.id) == last) || (msg.id == next));
// TODO: Check msg.sig
// If this is the message we already handled, and just failed to ACK, ACK it now and move on
if Some(msg.id) == last {
message_queue.ack(Service::Processor(network), msg.id).await;
continue;
}
(msg.id, messages::ProcessorMessage::deserialize(&mut msg.msg.as_slice()).unwrap())
};
let mut txn = db.txn();
match msg {
messages::ProcessorMessage::KeyGen(msg) => match msg {
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
todo!("TODO Transaction::DkgParticipation")
}
messages::key_gen::ProcessorMessage::GeneratedKeyPair {
session,
substrate_key,
network_key,
} => todo!("TODO Transaction::DkgConfirmationPreprocess"),
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
todo!("TODO Transaction::RemoveParticipant")
}
},
messages::ProcessorMessage::Sign(msg) => match msg {
messages::sign::ProcessorMessage::InvalidParticipant { session, participant } => {
todo!("TODO Transaction::RemoveParticipant")
}
messages::sign::ProcessorMessage::Preprocesses { id, preprocesses } => {
todo!("TODO Transaction::Batch + Transaction::Sign")
}
messages::sign::ProcessorMessage::Shares { id, shares } => todo!("TODO Transaction::Sign"),
},
messages::ProcessorMessage::Coordinator(msg) => match msg {
messages::coordinator::ProcessorMessage::CosignedBlock { cosign } => {
SignedCosigns::send(&mut txn, &cosign);
}
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
}
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
todo!("TODO Save to DB, have task read from DB and publish to Serai")
}
},
messages::ProcessorMessage::Substrate(msg) => match msg {
messages::substrate::ProcessorMessage::SubstrateBlockAck { block, plans } => {
todo!("TODO Transaction::SubstrateBlock")
}
},
}
// Mark this as the last handled message
LastProcessorMessage::set(&mut txn, network, &msg_id);
// Commit the txn
txn.commit();
// Now that we won't handle this message again, acknowledge it so we won't see it again
message_queue.ack(Service::Processor(network), msg_id).await;
}
}
#[tokio::main]
async fn main() {
// Override the panic handler with one which will panic if any tokio task panics
@@ -217,7 +343,6 @@ async fn main() {
);
// Spawn the cosign handler
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
spawn_cosigning(
db.clone(),
serai.clone(),
@@ -225,7 +350,6 @@ async fn main() {
// Run the Substrate scanners once we cosign new blocks
vec![substrate_canonical_task, substrate_ephemeral_task],
p2p_cosigns_recv,
signed_cosigns_recv,
);
// Spawn all Tributaries on-disk
@@ -254,7 +378,14 @@ async fn main() {
.continually_run(substrate_task_def, vec![]),
);
// TODO: Handle processor messages
// Handle all of the Processors' messages
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
tokio::spawn(handle_processor_messages(db.clone(), message_queue.clone(), network));
}
todo!("TODO")
// Run the spawned tasks ad-infinitum
core::future::pending().await
}