From 8c9441a1a5b30435aa35c06b9d12ae4f2a68f1a5 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 31 Dec 2024 10:37:19 -0500 Subject: [PATCH] Redo coordinator's Substrate scanner --- .github/workflows/msrv.yml | 1 + .github/workflows/tests.yml | 1 + Cargo.lock | 2 + Cargo.toml | 1 + coordinator/cosign/Cargo.toml | 9 +- coordinator/cosign/src/evaluator.rs | 8 + coordinator/cosign/src/lib.rs | 150 +++-- coordinator/src/substrate/db.rs | 32 - coordinator/src/substrate/mod.rs | 583 ------------------ coordinator/substrate/Cargo.toml | 35 ++ coordinator/substrate/LICENSE | 15 + coordinator/substrate/README.md | 14 + coordinator/substrate/src/canonical.rs | 216 +++++++ coordinator/substrate/src/ephemeral.rs | 240 +++++++ coordinator/substrate/src/lib.rs | 109 ++++ deny.toml | 1 + processor/bin/src/coordinator.rs | 5 +- processor/bin/src/lib.rs | 9 +- processor/messages/src/lib.rs | 4 +- processor/signers/src/lib.rs | 2 +- substrate/abi/src/in_instructions.rs | 15 +- substrate/client/src/serai/in_instructions.rs | 5 +- substrate/client/src/serai/mod.rs | 6 +- .../client/tests/common/genesis_liquidity.rs | 3 +- substrate/in-instructions/pallet/src/lib.rs | 69 +-- 25 files changed, 792 insertions(+), 743 deletions(-) delete mode 100644 coordinator/src/substrate/db.rs delete mode 100644 coordinator/src/substrate/mod.rs create mode 100644 coordinator/substrate/Cargo.toml create mode 100644 coordinator/substrate/LICENSE create mode 100644 coordinator/substrate/README.md create mode 100644 coordinator/substrate/src/canonical.rs create mode 100644 coordinator/substrate/src/ephemeral.rs create mode 100644 coordinator/substrate/src/lib.rs diff --git a/.github/workflows/msrv.yml b/.github/workflows/msrv.yml index 75fcdd79..4d37fab7 100644 --- a/.github/workflows/msrv.yml +++ b/.github/workflows/msrv.yml @@ -176,6 +176,7 @@ jobs: cargo msrv verify --manifest-path coordinator/tributary/tendermint/Cargo.toml cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml cargo msrv verify --manifest-path coordinator/cosign/Cargo.toml + cargo msrv verify --manifest-path coordinator/substrate/Cargo.toml cargo msrv verify --manifest-path coordinator/Cargo.toml msrv-substrate: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9f1b0a1f..65a35cc3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -62,6 +62,7 @@ jobs: -p tendermint-machine \ -p tributary-chain \ -p serai-cosign \ + -p serai-coordinator-substrate \ -p serai-coordinator \ -p serai-orchestrator \ -p serai-docker-tests diff --git a/Cargo.lock b/Cargo.lock index fe3c80bb..40f0e276 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8346,11 +8346,13 @@ version = "0.1.0" dependencies = [ "blake2", "borsh", + "futures", "log", "parity-scale-codec", "serai-client", "serai-cosign", "serai-db", + "serai-processor-messages", "serai-task", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index eea39f37..688537b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ members = [ "coordinator/tributary/tendermint", "coordinator/tributary", "coordinator/cosign", + "coordinator/substrate", "coordinator", "substrate/primitives", diff --git a/coordinator/cosign/Cargo.toml b/coordinator/cosign/Cargo.toml index bbd96399..fa5bd8ee 100644 --- a/coordinator/cosign/Cargo.toml +++ b/coordinator/cosign/Cargo.toml @@ -14,9 +14,6 @@ rust-version = "1.81" all-features = true rustdoc-args = ["--cfg", "docsrs"] -[package.metadata.cargo-machete] -ignored = ["scale"] - [lints] workspace = true @@ -30,7 +27,7 @@ serai-client = { 
path = "../../substrate/client", default-features = false, feat log = { version = "0.4", default-features = false, features = ["std"] } -tokio = { version = "1", default-features = false, features = [] } +tokio = { version = "1", default-features = false } -serai-db = { path = "../../common/db" } -serai-task = { path = "../../common/task" } +serai-db = { version = "0.1.1", path = "../../common/db" } +serai-task = { version = "0.1", path = "../../common/task" } diff --git a/coordinator/cosign/src/evaluator.rs b/coordinator/cosign/src/evaluator.rs index 856a6e00..fc606ecc 100644 --- a/coordinator/cosign/src/evaluator.rs +++ b/coordinator/cosign/src/evaluator.rs @@ -122,6 +122,8 @@ impl ContinuallyRan for CosignEvaluatorTask ContinuallyRan for CosignEvaluatorTask ContinuallyRan for CosignEvaluatorTask bool { + let Ok(signer) = schnorrkel::PublicKey::from_bytes(&signer.0) else { return false }; + let Ok(signature) = schnorrkel::Signature::from_bytes(&self.signature) else { return false }; + + signer.verify_simple(COSIGN_CONTEXT, &self.cosign.encode(), &signature).is_ok() + } +} + create_db! { Cosign { // The following are populated by the intend task and used throughout the library @@ -97,64 +156,6 @@ create_db! { } } -/// If the block has events. -#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] -enum HasEvents { - /// The block had a notable event. - /// - /// This is a special case as blocks with key gen events change the keys used for cosigning, and - /// accordingly must be cosigned before we advance past them. - Notable, - /// The block had an non-notable event justifying a cosign. - NonNotable, - /// The block didn't have an event justifying a cosign. - No, -} - -/// An intended cosign. -#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] -struct CosignIntent { - /// The global session this cosign is being performed under. - global_session: [u8; 32], - /// The number of the block to cosign. - block_number: u64, - /// The hash of the block to cosign. - block_hash: [u8; 32], - /// If this cosign must be handled before further cosigns are. - notable: bool, -} - -/// A cosign. -#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] -pub struct Cosign { - /// The global session this cosign is being performed under. - pub global_session: [u8; 32], - /// The number of the block to cosign. - pub block_number: u64, - /// The hash of the block to cosign. - pub block_hash: [u8; 32], - /// The actual cosigner. - pub cosigner: NetworkId, -} - -/// A signed cosign. -#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] -pub struct SignedCosign { - /// The cosign. - pub cosign: Cosign, - /// The signature for the cosign. - pub signature: [u8; 64], -} - -impl SignedCosign { - fn verify_signature(&self, signer: serai_client::Public) -> bool { - let Ok(signer) = schnorrkel::PublicKey::from_bytes(&signer.0) else { return false }; - let Ok(signature) = schnorrkel::Signature::from_bytes(&self.signature) else { return false }; - - signer.verify_simple(COSIGN_CONTEXT, &borsh::to_vec(&self.cosign).unwrap(), &signature).is_ok() - } -} - /// Fetch the keys used for cosigning by a specific network. async fn keys_for_network( serai: &TemporalSerai<'_>, @@ -219,6 +220,7 @@ pub trait RequestNotableCosigns: 'static + Send { } /// An error used to indicate the cosigning protocol has faulted. +#[derive(Debug)] pub struct Faulted; /// The interface to manage cosigning with. 
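The relocated `SignedCosign::verify_signature` above now verifies the SCALE encoding of the cosign (`self.cosign.encode()`), where the copy removed further up used `borsh::to_vec`. As a minimal sketch of the matching signing side — assuming the moved `Cosign`/`SignedCosign` definitions keep their public fields and `COSIGN_CONTEXT` remains the signing context — a cosigner would produce the 64-byte signature like so:

```rust
use scale::Encode;
use schnorrkel::{MiniSecretKey, ExpansionMode};

// Illustrative only: derive a schnorrkel keypair and sign the SCALE-encoded cosign
// under the same context `verify_signature` checks against
fn sign_cosign(secret: &[u8; 32], cosign: Cosign) -> SignedCosign {
  let keypair = MiniSecretKey::from_bytes(secret)
    .expect("32-byte secret")
    .expand_to_keypair(ExpansionMode::Ed25519);
  let signature = keypair.sign_simple(COSIGN_CONTEXT, &cosign.encode()).to_bytes();
  SignedCosign { cosign, signature }
}
```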
@@ -255,12 +257,23 @@ impl<D: Db> Cosigning<D> {
   }
 
   /// The latest cosigned block number.
-  pub fn latest_cosigned_block_number(&self) -> Result<u64, Faulted> {
-    if FaultedSession::get(&self.db).is_some() {
+  pub fn latest_cosigned_block_number(getter: &impl Get) -> Result<u64, Faulted> {
+    if FaultedSession::get(getter).is_some() {
       Err(Faulted)?;
     }
 
-    Ok(LatestCosignedBlockNumber::get(&self.db).unwrap_or(0))
+    Ok(LatestCosignedBlockNumber::get(getter).unwrap_or(0))
+  }
+
+  /// Fetch a cosigned Substrate block by its block number.
+  pub fn cosigned_block(getter: &impl Get, block_number: u64) -> Result<Option<[u8; 32]>, Faulted> {
+    if block_number > Self::latest_cosigned_block_number(getter)? {
+      return Ok(None);
+    }
+
+    Ok(Some(
+      SubstrateBlocks::get(getter, block_number).expect("cosigned block but didn't index it"),
+    ))
   }
 
   /// Fetch the notable cosigns for a global session in order to respond to requests.
@@ -422,4 +435,19 @@ impl<D: Db> Cosigning<D> {
     txn.commit();
     Ok(true)
   }
+
+  /// Receive the intended cosigns to produce for this ValidatorSet.
+  ///
+  /// All cosigns intended, up to and including the next notable cosign, are returned.
+  ///
+  /// This will drain the internal channel and not re-yield these intentions again.
+  pub fn intended_cosigns(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec<CosignIntent> {
+    let mut res: Vec<CosignIntent> = vec![];
+    // While we have yet to find a notable cosign...
+    while !res.last().map(|cosign| cosign.notable).unwrap_or(false) {
+      let Some(intent) = intend::IntendedCosigns::try_recv(txn, set) else { break };
+      res.push(intent);
+    }
+    res
+  }
 }
diff --git a/coordinator/src/substrate/db.rs b/coordinator/src/substrate/db.rs
deleted file mode 100644
index 2621e5ef..00000000
--- a/coordinator/src/substrate/db.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use serai_client::primitives::NetworkId;
-
-pub use serai_db::*;
-
-mod inner_db {
-  use super::*;
-
-  create_db!(
-    SubstrateDb {
-      NextBlock: () -> u64,
-      HandledEvent: (block: [u8; 32]) -> u32,
-      BatchInstructionsHashDb: (network: NetworkId, id: u32) -> [u8; 32]
-    }
-  );
-}
-pub(crate) use inner_db::{NextBlock, BatchInstructionsHashDb};
-
-pub struct HandledEvent;
-impl HandledEvent {
-  fn next_to_handle_event(getter: &impl Get, block: [u8; 32]) -> u32 {
-    inner_db::HandledEvent::get(getter, block).map_or(0, |last| last + 1)
-  }
-  pub fn is_unhandled(getter: &impl Get, block: [u8; 32], event_id: u32) -> bool {
-    let next = Self::next_to_handle_event(getter, block);
-    assert!(next >= event_id);
-    next == event_id
-  }
-  pub fn handle_event(txn: &mut impl DbTxn, block: [u8; 32], index: u32) {
-    assert!(Self::next_to_handle_event(txn, block) == index);
-    inner_db::HandledEvent::set(txn, block, &index);
-  }
-}
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
deleted file mode 100644
index d1946b7e..00000000
--- a/coordinator/src/substrate/mod.rs
+++ /dev/null
@@ -1,583 +0,0 @@
-use core::{ops::Deref, time::Duration};
-use std::{
-  sync::Arc,
-  collections::{HashSet, HashMap},
-};
-
-use zeroize::Zeroizing;
-
-use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
-
-use serai_client::{
-  SeraiError, Block, Serai, TemporalSerai,
-  primitives::{BlockHash, EmbeddedEllipticCurve, NetworkId},
-  validator_sets::{primitives::ValidatorSet, ValidatorSetsEvent},
-  in_instructions::InInstructionsEvent,
-  coins::CoinsEvent,
-};
-
-use serai_db::DbTxn;
-
-use processor_messages::SubstrateContext;
-
-use tokio::{sync::mpsc, time::sleep};
-
-use crate::{
-  Db,
-  processors::Processors,
-  tributary::{TributarySpec, SeraiDkgCompleted},
-};
-
-mod db;
-pub use db::*;
-
-mod cosign;
-pub use cosign::*; - -async fn in_set( - key: &Zeroizing<::F>, - serai: &TemporalSerai<'_>, - set: ValidatorSet, -) -> Result, SeraiError> { - let Some(participants) = serai.validator_sets().participants(set.network).await? else { - return Ok(None); - }; - let key = (Ristretto::generator() * key.deref()).to_bytes(); - Ok(Some(participants.iter().any(|(participant, _)| participant.0 == key))) -} - -async fn handle_new_set( - txn: &mut D::Transaction<'_>, - key: &Zeroizing<::F>, - new_tributary_spec: &mpsc::UnboundedSender, - serai: &Serai, - block: &Block, - set: ValidatorSet, -) -> Result<(), SeraiError> { - if in_set(key, &serai.as_of(block.hash()), set) - .await? - .expect("NewSet for set which doesn't exist") - { - log::info!("present in set {:?}", set); - - let validators; - let mut evrf_public_keys = vec![]; - { - let serai = serai.as_of(block.hash()); - let serai = serai.validator_sets(); - let set_participants = - serai.participants(set.network).await?.expect("NewSet for set which doesn't exist"); - - validators = set_participants - .iter() - .map(|(k, w)| { - ( - ::read_G::<&[u8]>(&mut k.0.as_ref()) - .expect("invalid key registered as participant"), - u16::try_from(*w).unwrap(), - ) - }) - .collect::>(); - for (validator, _) in set_participants { - // This is only run for external networks which always do a DKG for Serai - let substrate = serai - .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519) - .await? - .expect("Serai called NewSet on a validator without an Embedwards25519 key"); - // `embedded_elliptic_curves` is documented to have the second entry be the - // network-specific curve (if it exists and is distinct from Embedwards25519) - let network = - if let Some(embedded_elliptic_curve) = set.network.embedded_elliptic_curves().get(1) { - serai.embedded_elliptic_curve_key(validator, *embedded_elliptic_curve).await?.expect( - "Serai called NewSet on a validator without the embedded key required for the network", - ) - } else { - substrate.clone() - }; - evrf_public_keys.push(( - <[u8; 32]>::try_from(substrate) - .expect("validator-sets pallet accepted a key of an invalid length"), - network, - )); - } - }; - - let time = if let Ok(time) = block.time() { - time - } else { - assert_eq!(block.number(), 0); - // Use the next block's time - loop { - let Ok(Some(res)) = serai.finalized_block_by_number(1).await else { - sleep(Duration::from_secs(5)).await; - continue; - }; - break res.time().unwrap(); - } - }; - // The block time is in milliseconds yet the Tributary is in seconds - let time = time / 1000; - // Since this block is in the past, and Tendermint doesn't play nice with starting chains after - // their start time (though it does eventually work), delay the start time by 120 seconds - // This is meant to handle ~20 blocks of lack of finalization for this first block - const SUBSTRATE_TO_TRIBUTARY_TIME_DELAY: u64 = 120; - let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY; - - let spec = TributarySpec::new(block.hash(), time, set, validators, evrf_public_keys); - - log::info!("creating new tributary for {:?}", spec.set()); - - // Save it to the database now, not on the channel receiver's side, so this is safe against - // reboots - // If this txn finishes, and we reboot, then this'll be reloaded from active Tributaries - // If this txn doesn't finish, this will be re-fired - // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the - // prior fired event may have not been received yet - 
crate::ActiveTributaryDb::add_participating_in_tributary(txn, &spec); - - new_tributary_spec.send(spec).unwrap(); - } else { - log::info!("not present in new set {:?}", set); - } - - Ok(()) -} - -async fn handle_batch_and_burns( - txn: &mut impl DbTxn, - processors: &Pro, - serai: &Serai, - block: &Block, -) -> Result<(), SeraiError> { - // Track which networks had events with a Vec in ordr to preserve the insertion order - // While that shouldn't be needed, ensuring order never hurts, and may enable design choices - // with regards to Processor <-> Coordinator message passing - let mut networks_with_event = vec![]; - let mut network_had_event = |burns: &mut HashMap<_, _>, batches: &mut HashMap<_, _>, network| { - // Don't insert this network multiple times - // A Vec is still used in order to maintain the insertion order - if !networks_with_event.contains(&network) { - networks_with_event.push(network); - burns.insert(network, vec![]); - batches.insert(network, vec![]); - } - }; - - let mut batch_block = HashMap::new(); - let mut batches = HashMap::>::new(); - let mut burns = HashMap::new(); - - let serai = serai.as_of(block.hash()); - for batch in serai.in_instructions().batch_events().await? { - if let InInstructionsEvent::Batch { network, id, block: network_block, instructions_hash } = - batch - { - network_had_event(&mut burns, &mut batches, network); - - BatchInstructionsHashDb::set(txn, network, id, &instructions_hash); - - // Make sure this is the only Batch event for this network in this Block - assert!(batch_block.insert(network, network_block).is_none()); - - // Add the batch included by this block - batches.get_mut(&network).unwrap().push(id); - } else { - panic!("Batch event wasn't Batch: {batch:?}"); - } - } - - for burn in serai.coins().burn_with_instruction_events().await? { - if let CoinsEvent::BurnWithInstruction { from: _, instruction } = burn { - let network = instruction.balance.coin.network(); - network_had_event(&mut burns, &mut batches, network); - - // network_had_event should register an entry in burns - burns.get_mut(&network).unwrap().push(instruction); - } else { - panic!("Burn event wasn't Burn: {burn:?}"); - } - } - - assert_eq!(HashSet::<&_>::from_iter(networks_with_event.iter()).len(), networks_with_event.len()); - - for network in networks_with_event { - let network_latest_finalized_block = if let Some(block) = batch_block.remove(&network) { - block - } else { - // If it's had a batch or a burn, it must have had a block acknowledged - serai - .in_instructions() - .latest_block_for_network(network) - .await? - .expect("network had a batch/burn yet never set a latest block") - }; - - processors - .send( - network, - processor_messages::substrate::CoordinatorMessage::SubstrateBlock { - context: SubstrateContext { - serai_time: block.time().unwrap() / 1000, - network_latest_finalized_block, - }, - block: block.number(), - burns: burns.remove(&network).unwrap(), - batches: batches.remove(&network).unwrap(), - }, - ) - .await; - } - - Ok(()) -} - -// Handle a specific Substrate block, returning an error when it fails to get data -// (not blocking / holding) -#[allow(clippy::too_many_arguments)] -async fn handle_block( - db: &mut D, - key: &Zeroizing<::F>, - new_tributary_spec: &mpsc::UnboundedSender, - perform_slash_report: &mpsc::UnboundedSender, - tributary_retired: &mpsc::UnboundedSender, - processors: &Pro, - serai: &Serai, - block: Block, -) -> Result<(), SeraiError> { - let hash = block.hash(); - - // Define an indexed event ID. 
- let mut event_id = 0; - - // If a new validator set was activated, create tributary/inform processor to do a DKG - for new_set in serai.as_of(hash).validator_sets().new_set_events().await? { - // Individually mark each event as handled so on reboot, we minimize duplicates - // Additionally, if the Serai connection also fails 1/100 times, this means a block with 1000 - // events will successfully be incrementally handled - // (though the Serai connection should be stable, making this unnecessary) - let ValidatorSetsEvent::NewSet { set } = new_set else { - panic!("NewSet event wasn't NewSet: {new_set:?}"); - }; - - // If this is Serai, do nothing - // We only coordinate/process external networks - if set.network == NetworkId::Serai { - continue; - } - - if HandledEvent::is_unhandled(db, hash, event_id) { - log::info!("found fresh new set event {:?}", new_set); - let mut txn = db.txn(); - handle_new_set::(&mut txn, key, new_tributary_spec, serai, &block, set).await?; - HandledEvent::handle_event(&mut txn, hash, event_id); - txn.commit(); - } - event_id += 1; - } - - // If a key pair was confirmed, inform the processor - for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? { - if HandledEvent::is_unhandled(db, hash, event_id) { - log::info!("found fresh key gen event {:?}", key_gen); - let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else { - panic!("KeyGen event wasn't KeyGen: {key_gen:?}"); - }; - let substrate_key = key_pair.0 .0; - processors - .send( - set.network, - processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { - context: SubstrateContext { - serai_time: block.time().unwrap() / 1000, - network_latest_finalized_block: serai - .as_of(block.hash()) - .in_instructions() - .latest_block_for_network(set.network) - .await? - // The processor treats this as a magic value which will cause it to find a network - // block which has a time greater than or equal to the Serai time - .unwrap_or(BlockHash([0; 32])), - }, - session: set.session, - key_pair, - }, - ) - .await; - - // TODO: If we were in the set, yet were removed, drop the tributary - - let mut txn = db.txn(); - SeraiDkgCompleted::set(&mut txn, set, &substrate_key); - HandledEvent::handle_event(&mut txn, hash, event_id); - txn.commit(); - } - event_id += 1; - } - - for accepted_handover in serai.as_of(hash).validator_sets().accepted_handover_events().await? { - let ValidatorSetsEvent::AcceptedHandover { set } = accepted_handover else { - panic!("AcceptedHandover event wasn't AcceptedHandover: {accepted_handover:?}"); - }; - - if set.network == NetworkId::Serai { - continue; - } - - if HandledEvent::is_unhandled(db, hash, event_id) { - log::info!("found fresh accepted handover event {:?}", accepted_handover); - // TODO: This isn't atomic with the event handling - // Send a oneshot receiver so we can await the response? - perform_slash_report.send(set).unwrap(); - let mut txn = db.txn(); - HandledEvent::handle_event(&mut txn, hash, event_id); - txn.commit(); - } - event_id += 1; - } - - for retired_set in serai.as_of(hash).validator_sets().set_retired_events().await? 
{ - let ValidatorSetsEvent::SetRetired { set } = retired_set else { - panic!("SetRetired event wasn't SetRetired: {retired_set:?}"); - }; - - if set.network == NetworkId::Serai { - continue; - } - - if HandledEvent::is_unhandled(db, hash, event_id) { - log::info!("found fresh set retired event {:?}", retired_set); - let mut txn = db.txn(); - crate::ActiveTributaryDb::retire_tributary(&mut txn, set); - tributary_retired.send(set).unwrap(); - HandledEvent::handle_event(&mut txn, hash, event_id); - txn.commit(); - } - event_id += 1; - } - - // Finally, tell the processor of acknowledged blocks/burns - // This uses a single event as unlike prior events which individually executed code, all - // following events share data collection - if HandledEvent::is_unhandled(db, hash, event_id) { - let mut txn = db.txn(); - handle_batch_and_burns(&mut txn, processors, serai, &block).await?; - HandledEvent::handle_event(&mut txn, hash, event_id); - txn.commit(); - } - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -async fn handle_new_blocks( - db: &mut D, - key: &Zeroizing<::F>, - new_tributary_spec: &mpsc::UnboundedSender, - perform_slash_report: &mpsc::UnboundedSender, - tributary_retired: &mpsc::UnboundedSender, - processors: &Pro, - serai: &Serai, - next_block: &mut u64, -) -> Result<(), SeraiError> { - // Check if there's been a new Substrate block - let latest_number = serai.latest_finalized_block().await?.number(); - - // Advance the cosigning protocol - advance_cosign_protocol(db, key, serai, latest_number).await?; - - // Reduce to the latest cosigned block - let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db)); - - if latest_number < *next_block { - return Ok(()); - } - - for b in *next_block ..= latest_number { - let block = serai - .finalized_block_by_number(b) - .await? 
- .expect("couldn't get block before the latest finalized block"); - - log::info!("handling substrate block {b}"); - handle_block( - db, - key, - new_tributary_spec, - perform_slash_report, - tributary_retired, - processors, - serai, - block, - ) - .await?; - *next_block += 1; - - let mut txn = db.txn(); - NextBlock::set(&mut txn, next_block); - txn.commit(); - - log::info!("handled substrate block {b}"); - } - - Ok(()) -} - -pub async fn scan_task( - mut db: D, - key: Zeroizing<::F>, - processors: Pro, - serai: Arc, - new_tributary_spec: mpsc::UnboundedSender, - perform_slash_report: mpsc::UnboundedSender, - tributary_retired: mpsc::UnboundedSender, -) { - log::info!("scanning substrate"); - let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default(); - - /* - let new_substrate_block_notifier = { - let serai = &serai; - move || async move { - loop { - match serai.newly_finalized_block().await { - Ok(sub) => return sub, - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; - } - } - } - } - }; - */ - // TODO: Restore the above subscription-based system - // That would require moving serai-client from HTTP to websockets - let new_substrate_block_notifier = { - let serai = &serai; - move |next_substrate_block| async move { - loop { - match serai.latest_finalized_block().await { - Ok(latest) => { - if latest.header.number >= next_substrate_block { - return latest; - } - sleep(Duration::from_secs(3)).await; - } - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; - } - } - } - } - }; - - loop { - // await the next block, yet if our notifier had an error, re-create it - { - let Ok(_) = tokio::time::timeout( - Duration::from_secs(60), - new_substrate_block_notifier(next_substrate_block), - ) - .await - else { - // Timed out, which may be because Serai isn't finalizing or may be some issue with the - // notifier - if serai.latest_finalized_block().await.map(|block| block.number()).ok() == - Some(next_substrate_block.saturating_sub(1)) - { - log::info!("serai hasn't finalized a block in the last 60s..."); - } - continue; - }; - - /* - // next_block is a Option - if next_block.and_then(Result::ok).is_none() { - substrate_block_notifier = new_substrate_block_notifier(next_substrate_block); - continue; - } - */ - } - - match handle_new_blocks( - &mut db, - &key, - &new_tributary_spec, - &perform_slash_report, - &tributary_retired, - &processors, - &serai, - &mut next_substrate_block, - ) - .await - { - Ok(()) => {} - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; - } - } - } -} - -/// Gets the expected ID for the next Batch. -/// -/// Will log an error and apply a slight sleep on error, letting the caller simply immediately -/// retry. -pub(crate) async fn expected_next_batch( - serai: &Serai, - network: NetworkId, -) -> Result { - async fn expected_next_batch_inner(serai: &Serai, network: NetworkId) -> Result { - let serai = serai.as_of_latest_finalized_block().await?; - let last = serai.in_instructions().last_batch_for_network(network).await?; - Ok(if let Some(last) = last { last + 1 } else { 0 }) - } - match expected_next_batch_inner(serai, network).await { - Ok(next) => Ok(next), - Err(e) => { - log::error!("couldn't get the expected next batch from substrate: {e:?}"); - sleep(Duration::from_millis(100)).await; - Err(e) - } - } -} - -/// Verifies `Batch`s which have already been indexed from Substrate. 
-/// -/// Spins if a distinct `Batch` is detected on-chain. -/// -/// This has a slight malleability in that doesn't verify *who* published a `Batch` is as expected. -/// This is deemed fine. -pub(crate) async fn verify_published_batches( - txn: &mut D::Transaction<'_>, - network: NetworkId, - optimistic_up_to: u32, -) -> Option { - // TODO: Localize from MainDb to SubstrateDb - let last = crate::LastVerifiedBatchDb::get(txn, network); - for id in last.map_or(0, |last| last + 1) ..= optimistic_up_to { - let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else { - break; - }; - let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap(); - if on_chain != off_chain { - // Halt operations on this network and spin, as this is a critical fault - loop { - log::error!( - "{}! network: {:?} id: {} off-chain: {} on-chain: {}", - "on-chain batch doesn't match off-chain", - network, - id, - hex::encode(off_chain), - hex::encode(on_chain), - ); - sleep(Duration::from_secs(60)).await; - } - } - crate::LastVerifiedBatchDb::set(txn, network, &id); - } - - crate::LastVerifiedBatchDb::get(txn, network) -} diff --git a/coordinator/substrate/Cargo.toml b/coordinator/substrate/Cargo.toml new file mode 100644 index 00000000..4d66c05e --- /dev/null +++ b/coordinator/substrate/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "serai-coordinator-substrate" +version = "0.1.0" +description = "Serai Coordinator's Substrate Scanner" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/substrate" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false +rust-version = "1.81" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true + +[dependencies] +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } +serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] } + +log = { version = "0.4", default-features = false, features = ["std"] } + +futures = { version = "0.3", default-features = false, features = ["std"] } +tokio = { version = "1", default-features = false } + +serai-db = { version = "0.1.1", path = "../../common/db" } +serai-task = { version = "0.1", path = "../../common/task" } + +serai-cosign = { path = "../cosign" } + +messages = { package = "serai-processor-messages", path = "../../processor/messages" } diff --git a/coordinator/substrate/LICENSE b/coordinator/substrate/LICENSE new file mode 100644 index 00000000..26d57cbb --- /dev/null +++ b/coordinator/substrate/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2023-2024 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . 
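The serai-coordinator-substrate crate introduced below replaces the deleted `scan_task` above with two `ContinuallyRan` tasks feeding database-backed channels (see its README following). As a rough sketch of how a consumer might run and drain the canonical stream — the `Task::new`/`continually_run` calls reflect `serai-task`'s API as assumed here, and the polling loop is purely illustrative, not the coordinator's actual integration:

```rust
use core::time::Duration;

use serai_db::{Db, DbTxn};
use serai_task::{Task, ContinuallyRan};
use serai_client::{primitives::NetworkId, Serai};
use serai_coordinator_substrate::{CanonicalEventStream, Canonical};

async fn drain_canonical<D: Db>(mut db: D, serai: Serai, network: NetworkId) {
  // Spawn the scanner, which advances as blocks are cosigned
  let (task, _handle) = Task::new();
  tokio::spawn(CanonicalEventStream::new(db.clone(), serai).continually_run(task, vec![]));

  loop {
    // Drain any canonical messages for this network, forwarding them to its processor
    let mut txn = db.txn();
    while let Some(msg) = Canonical::try_recv(&mut txn, network) {
      log::info!("canonical message for {network:?}: {msg:?}");
    }
    txn.commit();
    tokio::time::sleep(Duration::from_secs(6)).await;
  }
}
```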
diff --git a/coordinator/substrate/README.md b/coordinator/substrate/README.md
new file mode 100644
index 00000000..83d217aa
--- /dev/null
+++ b/coordinator/substrate/README.md
@@ -0,0 +1,14 @@
+# Serai Coordinator Substrate Scanner
+
+This is the scanner of the Serai blockchain used by Serai's coordinator.
+
+Two event streams are defined:
+
+- Canonical events, which must be handled by every validator, regardless of the sets they're present
+  in. These are represented by `serai_processor_messages::substrate::CoordinatorMessage`.
+- Ephemeral events, which only need to be handled by the validators present within the sets they
+  relate to. These are represented by two channels, `NewSet` and `SignSlashReport`.
+
+The canonical event stream is available without provision of a validator's public key. The ephemeral
+event stream requires provision of a validator's public key. Both are ordered within themselves, yet
+there are no ordering guarantees across the two.
diff --git a/coordinator/substrate/src/canonical.rs b/coordinator/substrate/src/canonical.rs
new file mode 100644
index 00000000..d778bc7c
--- /dev/null
+++ b/coordinator/substrate/src/canonical.rs
@@ -0,0 +1,216 @@
+use std::future::Future;
+
+use futures::stream::{StreamExt, FuturesOrdered};
+
+use serai_client::Serai;
+
+use messages::substrate::{InInstructionResult, ExecutedBatch, CoordinatorMessage};
+
+use serai_db::*;
+use serai_task::ContinuallyRan;
+
+use serai_cosign::Cosigning;
+
+create_db!(
+  CoordinatorSubstrateCanonical {
+    NextBlock: () -> u64,
+  }
+);
+
+/// The event stream for canonical events.
+pub struct CanonicalEventStream<D: Db> {
+  db: D,
+  serai: Serai,
+}
+
+impl<D: Db> CanonicalEventStream<D> {
+  /// Create a new canonical event stream.
+  ///
+  /// Only one of these may exist over the provided database.
+ pub fn new(db: D, serai: Serai) -> Self { + Self { db, serai } + } +} + +impl ContinuallyRan for CanonicalEventStream { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let next_block = NextBlock::get(&self.db).unwrap_or(0); + let latest_finalized_block = + Cosigning::::latest_cosigned_block_number(&self.db).map_err(|e| format!("{e:?}"))?; + + // These are all the events which generate canonical messages + struct CanonicalEvents { + time: u64, + key_gen_events: Vec, + set_retired_events: Vec, + batch_events: Vec, + burn_events: Vec, + } + + // For a cosigned block, fetch all relevant events + let scan = { + let db = self.db.clone(); + let serai = &self.serai; + move |block_number| { + let block_hash = Cosigning::::cosigned_block(&db, block_number); + + async move { + let block_hash = match block_hash { + Ok(Some(block_hash)) => block_hash, + Ok(None) => { + panic!("iterating to latest cosigned block but couldn't get cosigned block") + } + Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()), + }; + let temporal_serai = serai.as_of(block_hash); + let temporal_serai_validators = temporal_serai.validator_sets(); + let temporal_serai_instructions = temporal_serai.in_instructions(); + let temporal_serai_coins = temporal_serai.coins(); + + let (block, key_gen_events, set_retired_events, batch_events, burn_events) = + tokio::try_join!( + serai.block(block_hash), + temporal_serai_validators.key_gen_events(), + temporal_serai_validators.set_retired_events(), + temporal_serai_instructions.batch_events(), + temporal_serai_coins.burn_with_instruction_events(), + ) + .map_err(|e| format!("{e:?}"))?; + let Some(block) = block else { + Err(format!("Serai node didn't have cosigned block #{block_number}"))? + }; + + let time = if block_number == 0 { + block.time().unwrap_or(0) + } else { + // Serai's block time is in milliseconds + block + .time() + .ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? 
/ + 1000 + }; + + Ok(( + block_number, + CanonicalEvents { + time, + key_gen_events, + set_retired_events, + batch_events, + burn_events, + }, + )) + } + } + }; + + // Sync the next set of upcoming blocks all at once to minimize latency + const BLOCKS_TO_SYNC_AT_ONCE: u64 = 10; + let mut set = FuturesOrdered::new(); + for block_number in + next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE) + { + set.push_back(scan(block_number)); + } + + for block_number in next_block ..= latest_finalized_block { + // Get the next block in our queue + let (popped_block_number, block) = set.next().await.unwrap()?; + assert_eq!(block_number, popped_block_number); + // Re-populate the queue + if (block_number + BLOCKS_TO_SYNC_AT_ONCE) <= latest_finalized_block { + set.push_back(scan(block_number + BLOCKS_TO_SYNC_AT_ONCE)); + } + + let mut txn = self.db.txn(); + + for key_gen in block.key_gen_events { + let serai_client::validator_sets::ValidatorSetsEvent::KeyGen { set, key_pair } = &key_gen + else { + panic!("KeyGen event wasn't a KeyGen event: {key_gen:?}"); + }; + crate::Canonical::send( + &mut txn, + set.network, + &CoordinatorMessage::SetKeys { + serai_time: block.time, + session: set.session, + key_pair: key_pair.clone(), + }, + ); + } + + for set_retired in block.set_retired_events { + let serai_client::validator_sets::ValidatorSetsEvent::SetRetired { set } = &set_retired + else { + panic!("SetRetired event wasn't a SetRetired event: {set_retired:?}"); + }; + crate::Canonical::send( + &mut txn, + set.network, + &CoordinatorMessage::SlashesReported { session: set.session }, + ); + } + + for network in serai_client::primitives::NETWORKS { + let mut batch = None; + for this_batch in &block.batch_events { + let serai_client::in_instructions::InInstructionsEvent::Batch { + network: batch_network, + publishing_session, + id, + in_instructions_hash, + in_instruction_results, + } = this_batch + else { + panic!("Batch event wasn't a Batch event: {this_batch:?}"); + }; + if network == *batch_network { + if batch.is_some() { + Err("Serai block had multiple batches for the same network".to_string())?; + } + batch = Some(ExecutedBatch { + id: *id, + publisher: *publishing_session, + in_instructions_hash: *in_instructions_hash, + in_instruction_results: in_instruction_results + .iter() + .map(|bit| { + if *bit { + InInstructionResult::Succeeded + } else { + InInstructionResult::Failed + } + }) + .collect(), + }); + } + } + + let mut burns = vec![]; + for burn in &block.burn_events { + let serai_client::coins::CoinsEvent::BurnWithInstruction { from: _, instruction } = + &burn + else { + panic!("Burn event wasn't a Burn.in event: {burn:?}"); + }; + if instruction.balance.coin.network() == network { + burns.push(instruction.clone()); + } + } + + crate::Canonical::send( + &mut txn, + network, + &CoordinatorMessage::Block { serai_block_number: block_number, batch, burns }, + ); + } + + txn.commit(); + } + + Ok(next_block <= latest_finalized_block) + } + } +} diff --git a/coordinator/substrate/src/ephemeral.rs b/coordinator/substrate/src/ephemeral.rs new file mode 100644 index 00000000..858b5895 --- /dev/null +++ b/coordinator/substrate/src/ephemeral.rs @@ -0,0 +1,240 @@ +use std::future::Future; + +use futures::stream::{StreamExt, FuturesOrdered}; + +use serai_client::{ + primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve}, + validator_sets::primitives::MAX_KEY_SHARES_PER_SET, + Serai, +}; + +use serai_db::*; +use serai_task::ContinuallyRan; + +use serai_cosign::Cosigning; + +use 
crate::NewSetInformation; + +create_db!( + CoordinatorSubstrateEphemeral { + NextBlock: () -> u64, + } +); + +/// The event stream for ephemeral events. +pub struct EphemeralEventStream { + db: D, + serai: Serai, + validator: PublicKey, +} + +impl EphemeralEventStream { + /// Create a new ephemeral event stream. + /// + /// Only one of these may exist over the provided database. + pub fn new(db: D, serai: Serai, validator: PublicKey) -> Self { + Self { db, serai, validator } + } +} + +impl ContinuallyRan for EphemeralEventStream { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let next_block = NextBlock::get(&self.db).unwrap_or(0); + let latest_finalized_block = + Cosigning::::latest_cosigned_block_number(&self.db).map_err(|e| format!("{e:?}"))?; + + // These are all the events which generate canonical messages + struct EphemeralEvents { + block_hash: [u8; 32], + time: u64, + new_set_events: Vec, + accepted_handover_events: Vec, + } + + // For a cosigned block, fetch all relevant events + let scan = { + let db = self.db.clone(); + let serai = &self.serai; + move |block_number| { + let block_hash = Cosigning::::cosigned_block(&db, block_number); + + async move { + let block_hash = match block_hash { + Ok(Some(block_hash)) => block_hash, + Ok(None) => { + panic!("iterating to latest cosigned block but couldn't get cosigned block") + } + Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()), + }; + + let temporal_serai = serai.as_of(block_hash); + let temporal_serai_validators = temporal_serai.validator_sets(); + let (block, new_set_events, accepted_handover_events) = tokio::try_join!( + serai.block(block_hash), + temporal_serai_validators.new_set_events(), + temporal_serai_validators.accepted_handover_events(), + ) + .map_err(|e| format!("{e:?}"))?; + let Some(block) = block else { + Err(format!("Serai node didn't have cosigned block #{block_number}"))? + }; + + let time = if block_number == 0 { + block.time().unwrap_or(0) + } else { + // Serai's block time is in milliseconds + block + .time() + .ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? / + 1000 + }; + + Ok(( + block_number, + EphemeralEvents { block_hash, time, new_set_events, accepted_handover_events }, + )) + } + } + }; + + // Sync the next set of upcoming blocks all at once to minimize latency + const BLOCKS_TO_SYNC_AT_ONCE: u64 = 50; + let mut set = FuturesOrdered::new(); + for block_number in + next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE) + { + set.push_back(scan(block_number)); + } + + for block_number in next_block ..= latest_finalized_block { + // Get the next block in our queue + let (popped_block_number, block) = set.next().await.unwrap()?; + assert_eq!(block_number, popped_block_number); + // Re-populate the queue + if (block_number + BLOCKS_TO_SYNC_AT_ONCE) <= latest_finalized_block { + set.push_back(scan(block_number + BLOCKS_TO_SYNC_AT_ONCE)); + } + + let mut txn = self.db.txn(); + + for new_set in block.new_set_events { + let serai_client::validator_sets::ValidatorSetsEvent::NewSet { set } = &new_set else { + panic!("NewSet event wasn't a NewSet event: {new_set:?}"); + }; + + // We only coordinate over external networks + if set.network == NetworkId::Serai { + continue; + } + + let serai = self.serai.as_of(block.block_hash); + let serai = serai.validator_sets(); + let Some(validators) = + serai.participants(set.network).await.map_err(|e| format!("{e:?}"))? 
+          else {
+            Err(format!(
+              "block #{block_number} declared a new set but didn't have the participants"
+            ))?
+          };
+          let in_set = validators.iter().any(|(validator, _)| *validator == self.validator);
+          if in_set {
+            if u16::try_from(validators.len()).is_err() {
+              Err("more than u16::MAX validators sent")?;
+            }
+
+            let Ok(validators) = validators
+              .into_iter()
+              .map(|(validator, weight)| u16::try_from(weight).map(|weight| (validator, weight)))
+              .collect::<Result<Vec<_>, _>>()
+            else {
+              Err("validator's weight exceeded u16::MAX".to_string())?
+            };
+
+            let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>();
+            if total_weight > MAX_KEY_SHARES_PER_SET {
+              Err(format!(
+                "{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}"
+              ))?;
+            }
+            let total_weight = u16::try_from(total_weight).unwrap();
+
+            // Fetch all of the validators' embedded elliptic curve keys
+            let mut embedded_elliptic_curve_keys = FuturesOrdered::new();
+            for (validator, _) in &validators {
+              let validator = *validator;
+              // try_join doesn't return a future so we need to wrap it in this additional async
+              // block
+              embedded_elliptic_curve_keys.push_back(async move {
+                tokio::try_join!(
+                  // One future to fetch the substrate embedded key
+                  serai
+                    .embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519),
+                  // One future to fetch the external embedded key, if there is a distinct curve
+                  async {
+                    // `embedded_elliptic_curves` is documented to have the second entry be the
+                    // network-specific curve (if it exists and is distinct from Embedwards25519)
+                    if let Some(curve) = set.network.embedded_elliptic_curves().get(1) {
+                      serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some)
+                    } else {
+                      Ok(None)
+                    }
+                  }
+                )
+                .map(|(substrate_embedded_key, external_embedded_key)| {
+                  (validator, substrate_embedded_key, external_embedded_key)
+                })
+              });
+            }
+
+            let mut evrf_public_keys = Vec::with_capacity(usize::from(total_weight));
+            for (validator, weight) in &validators {
+              let (future_validator, substrate_embedded_key, external_embedded_key) =
+                embedded_elliptic_curve_keys.next().await.unwrap().map_err(|e| format!("{e:?}"))?;
+              assert_eq!(*validator, future_validator);
+              let external_embedded_key =
+                external_embedded_key.unwrap_or(substrate_embedded_key.clone());
+              match (substrate_embedded_key, external_embedded_key) {
+                (Some(substrate_embedded_key), Some(external_embedded_key)) => {
+                  let substrate_embedded_key = <[u8; 32]>::try_from(substrate_embedded_key)
+                    .map_err(|_| "Embedwards25519 key wasn't 32 bytes".to_string())?;
+                  for _ in 0 .. *weight {
+                    evrf_public_keys.push((substrate_embedded_key, external_embedded_key.clone()));
+                  }
+                }
+                _ => Err("NewSet with validator missing an embedded key".to_string())?,
+              }
+            }
+
+            crate::NewSet::send(
+              &mut txn,
+              &NewSetInformation {
+                set: *set,
+                serai_block: block.block_hash,
+                start_time: block.time,
+                // TODO: Why do we have this as an explicit field here?
+                // Shouldn't this be inlined into the Processor's key gen code, where it's used?
+                threshold: ((total_weight * 2) / 3) + 1,
+                validators,
+                evrf_public_keys,
+              },
+            );
+          }
+        }
+
+        for accepted_handover in block.accepted_handover_events {
+          let serai_client::validator_sets::ValidatorSetsEvent::AcceptedHandover { set } =
+            &accepted_handover
+          else {
+            panic!("AcceptedHandover event wasn't an AcceptedHandover event: {accepted_handover:?}");
+          };
+          crate::SignSlashReport::send(&mut txn, set);
+        }
+
+        txn.commit();
+      }
+
+      Ok(next_block <= latest_finalized_block)
+    }
+  }
+}
diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs
new file mode 100644
index 00000000..9c3c8863
--- /dev/null
+++ b/coordinator/substrate/src/lib.rs
@@ -0,0 +1,109 @@
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+#![doc = include_str!("../README.md")]
+#![deny(missing_docs)]
+
+use scale::{Encode, Decode};
+use borsh::{io, BorshSerialize, BorshDeserialize};
+
+use serai_client::{
+  primitives::{PublicKey, NetworkId},
+  validator_sets::primitives::ValidatorSet,
+};
+
+use serai_db::*;
+
+mod canonical;
+mod ephemeral;
+
+fn borsh_serialize_validators<W: io::Write>(
+  validators: &Vec<(PublicKey, u16)>,
+  writer: &mut W,
+) -> Result<(), io::Error> {
+  // This doesn't use `encode_to` as `encode_to` panics if the writer returns an error
+  writer.write_all(&validators.encode())
+}
+
+fn borsh_deserialize_validators<R: io::Read>(
+  reader: &mut R,
+) -> Result<Vec<(PublicKey, u16)>, io::Error> {
+  Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other)
+}
+
+/// The information for a new set.
+#[derive(Debug, BorshSerialize, BorshDeserialize)]
+pub struct NewSetInformation {
+  set: ValidatorSet,
+  serai_block: [u8; 32],
+  start_time: u64,
+  threshold: u16,
+  #[borsh(
+    serialize_with = "borsh_serialize_validators",
+    deserialize_with = "borsh_deserialize_validators"
+  )]
+  validators: Vec<(PublicKey, u16)>,
+  evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
+}
+
+mod _public_db {
+  use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
+
+  use serai_db::*;
+
+  use crate::NewSetInformation;
+
+  db_channel!(
+    CoordinatorSubstrate {
+      // Canonical messages to send to the processor
+      Canonical: (network: NetworkId) -> messages::substrate::CoordinatorMessage,
+
+      // Relevant new set, from an ephemeral event stream
+      NewSet: () -> NewSetInformation,
+      // Relevant sign slash report, from an ephemeral event stream
+      SignSlashReport: () -> ValidatorSet,
+    }
+  );
+}
+
+/// The channel for canonical events, fed by the canonical event stream.
+pub struct Canonical;
+impl Canonical {
+  pub(crate) fn send(
+    txn: &mut impl DbTxn,
+    network: NetworkId,
+    msg: &messages::substrate::CoordinatorMessage,
+  ) {
+    _public_db::Canonical::send(txn, network, msg);
+  }
+  /// Try to receive a canonical event, returning `None` if there is none to receive.
+  pub fn try_recv(
+    txn: &mut impl DbTxn,
+    network: NetworkId,
+  ) -> Option<messages::substrate::CoordinatorMessage> {
+    _public_db::Canonical::try_recv(txn, network)
+  }
+}
+
+/// The channel for new set events emitted by an ephemeral event stream.
+pub struct NewSet;
+impl NewSet {
+  pub(crate) fn send(txn: &mut impl DbTxn, msg: &NewSetInformation) {
+    _public_db::NewSet::send(txn, msg);
+  }
+  /// Try to receive a new set's information, returning `None` if there is none to receive.
+  pub fn try_recv(txn: &mut impl DbTxn) -> Option<NewSetInformation> {
+    _public_db::NewSet::try_recv(txn)
+  }
+}
+
+/// The channel for notifications to sign a slash report, as emitted by an ephemeral event stream.
+pub struct SignSlashReport; +impl SignSlashReport { + pub(crate) fn send(txn: &mut impl DbTxn, set: &ValidatorSet) { + _public_db::SignSlashReport::send(txn, set); + } + /// Try to receive a notification to sign a slash report, returning `None` if there is none to + /// receive. + pub fn try_recv(txn: &mut impl DbTxn) -> Option { + _public_db::SignSlashReport::try_recv(txn) + } +} diff --git a/deny.toml b/deny.toml index cc45984a..fa12461c 100644 --- a/deny.toml +++ b/deny.toml @@ -74,6 +74,7 @@ exceptions = [ { allow = ["AGPL-3.0"], name = "tributary-chain" }, { allow = ["AGPL-3.0"], name = "serai-cosign" }, + { allow = ["AGPL-3.0"], name = "serai-coordinator-substrate" }, { allow = ["AGPL-3.0"], name = "serai-coordinator" }, { allow = ["AGPL-3.0"], name = "serai-coins-pallet" }, diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index e5d0e23b..255525a2 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -5,9 +5,8 @@ use tokio::sync::mpsc; use scale::Encode; use serai_client::{ - primitives::Signature, - validator_sets::primitives::Session, - in_instructions::primitives::{Batch, SignedBatch}, + primitives::Signature, validator_sets::primitives::Session, + in_instructions::primitives::SignedBatch, }; use serai_db::{Get, DbTxn, Db, create_db, db_channel}; diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs index 0fc7257e..7dc794bf 100644 --- a/processor/bin/src/lib.rs +++ b/processor/bin/src/lib.rs @@ -272,20 +272,17 @@ pub async fn main_loop< } messages::substrate::CoordinatorMessage::Block { serai_block_number: _, - batches, + batch, mut burns, } => { let scanner = scanner.as_mut().unwrap(); - // Substrate sets this limit to prevent DoSs from malicious validator sets - // That bound lets us consume this txn in the following loop body, as an optimization - assert!(batches.len() <= 1); - for messages::substrate::ExecutedBatch { + if let Some(messages::substrate::ExecutedBatch { id, publisher, in_instructions_hash, in_instruction_results, - } in batches + }) = batch { let key_to_activate = KeyToActivate::>::try_recv(txn.as_mut().unwrap()).map(|key| key.0); diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index bbab3186..1b6e1996 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -145,7 +145,7 @@ pub mod sign { pub mod coordinator { use super::*; - // TODO: Why does this not simply take the block hash? + // TODO: Remove this for the one defined in serai-cosign pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec { const DST: &[u8] = b"Cosign"; let mut res = vec![u8::try_from(DST.len()).unwrap()]; @@ -203,7 +203,7 @@ pub mod substrate { /// A block from Serai with relevance to this processor. 
    Block {
      serai_block_number: u64,
-      batches: Vec<ExecutedBatch>,
+      batch: Option<ExecutedBatch>,
      burns: Vec<OutInstructionWithBalance>,
    },
  }
diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs
index 40e538aa..116f7b9e 100644
--- a/processor/signers/src/lib.rs
+++ b/processor/signers/src/lib.rs
@@ -12,7 +12,7 @@ use frost::dkg::{ThresholdCore, ThresholdKeys};
 
 use serai_primitives::Signature;
 use serai_validator_sets_primitives::{Session, Slash};
-use serai_in_instructions_primitives::{Batch, SignedBatch};
+use serai_in_instructions_primitives::SignedBatch;
 
 use serai_db::{DbTxn, Db};
 
diff --git a/substrate/abi/src/in_instructions.rs b/substrate/abi/src/in_instructions.rs
index d3ab5ca3..89729f7a 100644
--- a/substrate/abi/src/in_instructions.rs
+++ b/substrate/abi/src/in_instructions.rs
@@ -2,6 +2,7 @@ use serai_primitives::*;
 
 pub use serai_in_instructions_primitives as primitives;
 use primitives::SignedBatch;
+use serai_validator_sets_primitives::Session;
 
 #[derive(Clone, PartialEq, Eq, Debug, scale::Encode, scale::Decode, scale_info::TypeInfo)]
 #[cfg_attr(feature = "borsh", derive(borsh::BorshSerialize, borsh::BorshDeserialize))]
@@ -12,11 +13,17 @@ pub enum Call {
 }
 
 #[derive(Clone, PartialEq, Eq, Debug, scale::Encode, scale::Decode, scale_info::TypeInfo)]
-#[cfg_attr(feature = "borsh", derive(borsh::BorshSerialize, borsh::BorshDeserialize))]
 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
 #[cfg_attr(all(feature = "std", feature = "serde"), derive(serde::Deserialize))]
 pub enum Event {
-  Batch { network: NetworkId, id: u32, block: BlockHash, instructions_hash: [u8; 32] },
-  InstructionFailure { network: NetworkId, id: u32, index: u32 },
-  Halt { network: NetworkId },
+  Batch {
+    network: NetworkId,
+    publishing_session: Session,
+    id: u32,
+    in_instructions_hash: [u8; 32],
+    in_instruction_results: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
+  },
+  Halt {
+    network: NetworkId,
+  },
 }
diff --git a/substrate/client/src/serai/in_instructions.rs b/substrate/client/src/serai/in_instructions.rs
index 50c9ed96..29f9b1a2 100644
--- a/substrate/client/src/serai/in_instructions.rs
+++ b/substrate/client/src/serai/in_instructions.rs
@@ -1,10 +1,7 @@
 pub use serai_abi::in_instructions::primitives;
 use primitives::SignedBatch;
 
-use crate::{
-  primitives::{BlockHash, NetworkId},
-  Transaction, SeraiError, Serai, TemporalSerai,
-};
+use crate::{primitives::NetworkId, Transaction, SeraiError, Serai, TemporalSerai};
 
 pub type InInstructionsEvent = serai_abi::in_instructions::Event;
 
diff --git a/substrate/client/src/serai/mod.rs b/substrate/client/src/serai/mod.rs
index 8b17d5d1..f99e9a39 100644
--- a/substrate/client/src/serai/mod.rs
+++ b/substrate/client/src/serai/mod.rs
@@ -45,13 +45,13 @@ impl Block {
   }
 
   /// Returns the time of this block, set by its producer, in milliseconds since the epoch.
-  pub fn time(&self) -> Result<u64, SeraiError> {
+  pub fn time(&self) -> Option<u64> {
     for transaction in &self.transactions {
       if let Call::Timestamp(timestamp::Call::set { now }) = transaction.call() {
-        return Ok(*now);
+        return Some(*now);
       }
     }
-    Err(SeraiError::InvalidNode("no time was present in block".to_string()))
+    None
   }
 }
 
diff --git a/substrate/client/tests/common/genesis_liquidity.rs b/substrate/client/tests/common/genesis_liquidity.rs
index 0c0cd269..c8c613f5 100644
--- a/substrate/client/tests/common/genesis_liquidity.rs
+++ b/substrate/client/tests/common/genesis_liquidity.rs
@@ -65,8 +65,7 @@ pub async fn set_up_genesis(
       })
       .or_insert(0);
 
-    let batch =
-      Batch { network: coin.network(), id: batch_ids[&coin.network()], block, instructions };
+    let batch = Batch { network: coin.network(), id: batch_ids[&coin.network()], instructions };
 
     provide_batch(serai, batch).await;
   }
diff --git a/substrate/in-instructions/pallet/src/lib.rs b/substrate/in-instructions/pallet/src/lib.rs
index 5b394c3d..79d4c717 100644
--- a/substrate/in-instructions/pallet/src/lib.rs
+++ b/substrate/in-instructions/pallet/src/lib.rs
@@ -60,9 +60,16 @@ pub mod pallet {
   #[pallet::event]
   #[pallet::generate_deposit(fn deposit_event)]
   pub enum Event {
-    Batch { network: NetworkId, id: u32, block: BlockHash, instructions_hash: [u8; 32] },
-    InstructionFailure { network: NetworkId, id: u32, index: u32 },
-    Halt { network: NetworkId },
+    Batch {
+      network: NetworkId,
+      publishing_session: Session,
+      id: u32,
+      in_instructions_hash: [u8; 32],
+      in_instruction_results: BitVec<u8, Lsb0>,
+    },
+    Halt {
+      network: NetworkId,
+    },
   }
 
   #[pallet::error]
@@ -254,22 +261,7 @@
     pub fn execute_batch(origin: OriginFor<T>, batch: SignedBatch) -> DispatchResult {
       ensure_none(origin)?;
 
-      let batch = batch.batch;
-
-      Self::deposit_event(Event::Batch {
-        network: batch.network,
-        id: batch.id,
-        instructions_hash: blake2_256(&batch.instructions.encode()),
-      });
-      for (i, instruction) in batch.instructions.into_iter().enumerate() {
-        if Self::execute(instruction).is_err() {
-          Self::deposit_event(Event::InstructionFailure {
-            network: batch.network,
-            id: batch.id,
-            index: u32::try_from(i).unwrap(),
-          });
-        }
-      }
+      // The entire Batch execution is handled in pre_dispatch
 
       Ok(())
     }
@@ -300,6 +292,7 @@
 
       // verify the signature
       let (current_session, prior, current) = keys_for_network::<T>(network)?;
+      let prior_session = Session(current_session.0 - 1);
       let batch_message = batch_message(&batch.batch);
       // Check the prior key first since only a single `Batch` (the last one) will be when prior is
       // Some yet prior wasn't the signing key
@@ -315,6 +308,8 @@
         Err(InvalidTransaction::BadProof)?;
       }
 
+      let batch = batch.batch;
+
      if Halted::<T>::contains_key(network) {
        Err(InvalidTransaction::Custom(1))?;
      }
@@ -323,10 +318,7 @@
      // key is publishing `Batch`s. This should only happen once the current key has verified all
      // `Batch`s published by the prior key, meaning they are accepting the hand-over.
      if prior.is_some() && (!valid_by_prior) {
-        ValidatorSets::<T>::retire_set(ValidatorSet {
-          network,
-          session: Session(current_session.0 - 1),
-        });
+        ValidatorSets::<T>::retire_set(ValidatorSet { network, session: prior_session });
       }
 
       // check that this validator set isn't publishing a batch more than once per block
@@ -335,34 +327,39 @@ pub mod pallet {
       if last_block >= current_block {
         Err(InvalidTransaction::Future)?;
       }
-      LastBatchBlock::<T>::insert(batch.batch.network, frame_system::Pallet::<T>::block_number());
+      LastBatchBlock::<T>::insert(batch.network, frame_system::Pallet::<T>::block_number());
 
       // Verify the batch is sequential
       // LastBatch has the last ID set. The next ID should be it + 1
       // If there's no ID, the next ID should be 0
       let expected = LastBatch::<T>::get(network).map_or(0, |prev| prev + 1);
-      if batch.batch.id < expected {
+      if batch.id < expected {
         Err(InvalidTransaction::Stale)?;
       }
-      if batch.batch.id > expected {
+      if batch.id > expected {
         Err(InvalidTransaction::Future)?;
       }
-      LastBatch::<T>::insert(batch.batch.network, batch.batch.id);
+      LastBatch::<T>::insert(batch.network, batch.id);
 
-      // Verify all Balances in this Batch are for this network
-      for instruction in &batch.batch.instructions {
+      let in_instructions_hash = blake2_256(&batch.instructions.encode());
+      let mut in_instruction_results = BitVec::new();
+      for (i, instruction) in batch.instructions.into_iter().enumerate() {
         // Verify this coin is for this network
-        // If this is ever hit, it means the validator set has turned malicious and should be fully
-        // slashed
-        // Because we have an error here, no validator set which turns malicious should execute
-        // this code path
-        // Accordingly, there's no value in writing code to fully slash the network, when such an
-        // even would require a runtime upgrade to fully resolve anyways
-        if instruction.balance.coin.network() != batch.batch.network {
+        if instruction.balance.coin.network() != batch.network {
           Err(InvalidTransaction::Custom(2))?;
         }
+
+        in_instruction_results.push(Self::execute(instruction).is_ok());
       }
 
+      Self::deposit_event(Event::Batch {
+        network: batch.network,
+        publishing_session: if valid_by_prior { prior_session } else { current_session },
+        id: batch.id,
+        in_instructions_hash,
+        in_instruction_results,
+      });
+
       ValidTransaction::with_tag_prefix("in-instructions")
         .and_provides((batch.batch.network, batch.batch.id))
         // Set a 10 block longevity, though this should be included in the next block
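The `in_instruction_results` bit-vector added above replaces the per-instruction `InstructionFailure` events: bit `i` records whether `batch.instructions[i]` executed successfully, and canonical.rs maps it into `InInstructionResult`s. As a sketch of recovering the failed indexes on the consuming side (the exact `BitVec` type parameters are assumed to match the ABI's):

```rust
use bitvec::{vec::BitVec, order::Lsb0};

// Illustrative helper: collect the indexes of the in instructions which failed to execute
fn failed_indexes(in_instruction_results: &BitVec<u8, Lsb0>) -> Vec<u32> {
  in_instruction_results
    .iter()
    .enumerate()
    .filter(|(_, succeeded)| !**succeeded)
    .map(|(i, _)| u32::try_from(i).unwrap())
    .collect()
}
```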