use std::future::Future; use futures::stream::{StreamExt, FuturesOrdered}; use serai_client::{ primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve}, validator_sets::primitives::MAX_KEY_SHARES_PER_SET, Serai, }; use serai_db::*; use serai_task::ContinuallyRan; use serai_cosign::Cosigning; use crate::NewSetInformation; create_db!( CoordinatorSubstrateEphemeral { NextBlock: () -> u64, } ); /// The event stream for ephemeral events. pub struct EphemeralEventStream { db: D, serai: Serai, validator: PublicKey, } impl EphemeralEventStream { /// Create a new ephemeral event stream. /// /// Only one of these may exist over the provided database. pub fn new(db: D, serai: Serai, validator: PublicKey) -> Self { Self { db, serai, validator } } } impl ContinuallyRan for EphemeralEventStream { fn run_iteration(&mut self) -> impl Send + Future> { async move { let next_block = NextBlock::get(&self.db).unwrap_or(0); let latest_finalized_block = Cosigning::::latest_cosigned_block_number(&self.db).map_err(|e| format!("{e:?}"))?; // These are all the events which generate canonical messages struct EphemeralEvents { block_hash: [u8; 32], time: u64, new_set_events: Vec, accepted_handover_events: Vec, } // For a cosigned block, fetch all relevant events let scan = { let db = self.db.clone(); let serai = &self.serai; move |block_number| { let block_hash = Cosigning::::cosigned_block(&db, block_number); async move { let block_hash = match block_hash { Ok(Some(block_hash)) => block_hash, Ok(None) => { panic!("iterating to latest cosigned block but couldn't get cosigned block") } Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()), }; let temporal_serai = serai.as_of(block_hash); let temporal_serai_validators = temporal_serai.validator_sets(); let (block, new_set_events, accepted_handover_events) = tokio::try_join!( serai.block(block_hash), temporal_serai_validators.new_set_events(), temporal_serai_validators.accepted_handover_events(), ) .map_err(|e| format!("{e:?}"))?; let Some(block) = block else { Err(format!("Serai node didn't have cosigned block #{block_number}"))? }; let time = if block_number == 0 { block.time().unwrap_or(0) } else { // Serai's block time is in milliseconds block .time() .ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? / 1000 }; Ok(( block_number, EphemeralEvents { block_hash, time, new_set_events, accepted_handover_events }, )) } } }; // Sync the next set of upcoming blocks all at once to minimize latency const BLOCKS_TO_SYNC_AT_ONCE: u64 = 50; let mut set = FuturesOrdered::new(); for block_number in next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE) { set.push_back(scan(block_number)); } for block_number in next_block ..= latest_finalized_block { // Get the next block in our queue let (popped_block_number, block) = set.next().await.unwrap()?; assert_eq!(block_number, popped_block_number); // Re-populate the queue if (block_number + BLOCKS_TO_SYNC_AT_ONCE) <= latest_finalized_block { set.push_back(scan(block_number + BLOCKS_TO_SYNC_AT_ONCE)); } let mut txn = self.db.txn(); for new_set in block.new_set_events { let serai_client::validator_sets::ValidatorSetsEvent::NewSet { set } = &new_set else { panic!("NewSet event wasn't a NewSet event: {new_set:?}"); }; // We only coordinate over external networks if set.network == NetworkId::Serai { continue; } let serai = self.serai.as_of(block.block_hash); let serai = serai.validator_sets(); let Some(validators) = serai.participants(set.network).await.map_err(|e| format!("{e:?}"))? else { Err(format!( "block #{block_number} declared a new set but didn't have the participants" ))? }; let in_set = validators.iter().any(|(validator, _)| *validator == self.validator); if in_set { if u16::try_from(validators.len()).is_err() { Err("more than u16::MAX validators sent")?; } let Ok(validators) = validators .into_iter() .map(|(validator, weight)| u16::try_from(weight).map(|weight| (validator, weight))) .collect::, _>>() else { Err("validator's weight exceeded u16::MAX".to_string())? }; let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::(); if total_weight > MAX_KEY_SHARES_PER_SET { Err(format!( "{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}" ))?; } let total_weight = u16::try_from(total_weight).unwrap(); // Fetch all of the validators' embedded elliptic curve keys let mut embedded_elliptic_curve_keys = FuturesOrdered::new(); for (validator, _) in &validators { let validator = *validator; // try_join doesn't return a future so we need to wrap it in this additional async // block embedded_elliptic_curve_keys.push_back(async move { tokio::try_join!( // One future to fetch the substrate embedded key serai .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519), // One future to fetch the external embedded key, if there is a distinct curve async { // `embedded_elliptic_curves` is documented to have the second entry be the // network-specific curve (if it exists and is distinct from Embedwards25519) if let Some(curve) = set.network.embedded_elliptic_curves().get(1) { serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some) } else { Ok(None) } } ) .map(|(substrate_embedded_key, external_embedded_key)| { (validator, substrate_embedded_key, external_embedded_key) }) }); } let mut evrf_public_keys = Vec::with_capacity(usize::from(total_weight)); for (validator, weight) in &validators { let (future_validator, substrate_embedded_key, external_embedded_key) = embedded_elliptic_curve_keys.next().await.unwrap().map_err(|e| format!("{e:?}"))?; assert_eq!(*validator, future_validator); let external_embedded_key = external_embedded_key.unwrap_or(substrate_embedded_key.clone()); match (substrate_embedded_key, external_embedded_key) { (Some(substrate_embedded_key), Some(external_embedded_key)) => { let substrate_embedded_key = <[u8; 32]>::try_from(substrate_embedded_key) .map_err(|_| "Embedwards25519 key wasn't 32 bytes".to_string())?; for _ in 0 .. *weight { evrf_public_keys.push((substrate_embedded_key, external_embedded_key.clone())); } } _ => Err("NewSet with validator missing an embedded key".to_string())?, } } crate::NewSet::send( &mut txn, &NewSetInformation { set: *set, serai_block: block.block_hash, start_time: block.time, // TODO: Why do we have this as an explicit field here? // Shouldn't thiis be inlined into the Processor's key gen code, where it's used? threshold: ((total_weight * 2) / 3) + 1, validators, evrf_public_keys, }, ); } } for accepted_handover in block.accepted_handover_events { let serai_client::validator_sets::ValidatorSetsEvent::AcceptedHandover { set } = &accepted_handover else { panic!("AcceptedHandover event wasn't a AcceptedHandover event: {accepted_handover:?}"); }; crate::SignSlashReport::send(&mut txn, set); } txn.commit(); } Ok(next_block <= latest_finalized_block) } } }