diff --git a/Cargo.lock b/Cargo.lock index 77934820..9b349d36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7273,6 +7273,7 @@ dependencies = [ name = "serai-client" version = "0.1.0" dependencies = [ + "async-lock", "bitcoin", "blake2", "ciphersuite", diff --git a/coordinator/src/substrate/cosign.rs b/coordinator/src/substrate/cosign.rs new file mode 100644 index 00000000..ffb5d202 --- /dev/null +++ b/coordinator/src/substrate/cosign.rs @@ -0,0 +1,264 @@ +/* + If: + A) This block has events and it's been at least X blocks since the last cosign or + B) This block doesn't have events but it's been X blocks since a skipped block which did + have events or + C) This block key gens (which changes who the cosigners are) + cosign this block. + + This creates both a minimum and maximum delay of X blocks before a block's cosigning begins, + barring key gens which are exceptional. The minimum delay is there to ensure we don't constantly + spawn new protocols every 6 seconds, overwriting the old ones. The maximum delay is there to + ensure any block needing cosigned is consigned within a reasonable amount of time. +*/ + +use core::{ops::Deref, time::Duration}; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; + +use zeroize::Zeroizing; + +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + +use scale::{Encode, Decode}; +use serai_client::{ + SeraiError, Block, Serai, TemporalSerai, + primitives::{BlockHash, NetworkId}, + validator_sets::{ + primitives::{Session, ValidatorSet, KeyPair, amortize_excess_key_shares}, + ValidatorSetsEvent, + }, + in_instructions::InInstructionsEvent, + coins::CoinsEvent, +}; + +use serai_db::*; + +use processor_messages::SubstrateContext; + +use tokio::{sync::mpsc, time::sleep}; + +use crate::{ + Db, + processors::Processors, + tributary::{TributarySpec, SeraiBlockNumber}, +}; + +// 5 minutes, expressed in blocks +// TODO: Pull a constant for block time +const COSIGN_DISTANCE: u64 = 5 * 60 / 6; + +create_db!( + SubstrateCosignDb { + CosignTriggered: () -> (), + IntendedCosign: () -> (u64, Option), + BlockHasEvents: (block: u64) -> u8, + LatestCosignedBlock: () -> u64, + } +); + +impl IntendedCosign { + pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) { + Self::set(txn, &(intended, None::)); + } + pub fn set_skipped_cosign(txn: &mut impl DbTxn, skipped: u64) { + let (intended, prior_skipped) = Self::get(txn).unwrap(); + assert!(prior_skipped.is_none()); + Self::set(txn, &(intended, Some(skipped))); + } +} + +impl LatestCosignedBlock { + pub fn latest_cosigned_block(getter: &impl Get) -> u64 { + Self::get(getter).unwrap_or_default().max(1) + } +} + +db_channel! { + SubstrateDbChannels { + CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]), + } +} + +impl CosignTransactions { + // Append a cosign transaction. + pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) { + CosignTransactions::send(txn, set.network, &(set.session, number, hash)) + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] +enum HasEvents { + KeyGen, + Yes, + No, +} +async fn block_has_events( + txn: &mut impl DbTxn, + serai: &Serai, + block: u64, +) -> Result { + let cached = BlockHasEvents::get(txn, block); + match cached { + None => { + let serai = serai.as_of( + serai + .finalized_block_by_number(block) + .await? + .expect("couldn't get block which should've been finalized") + .hash(), + ); + + if !serai.validator_sets().key_gen_events().await?.is_empty() { + return Ok(HasEvents::KeyGen); + } + + let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() && + serai.in_instructions().batch_events().await?.is_empty() && + serai.validator_sets().new_set_events().await?.is_empty() && + serai.validator_sets().set_retired_events().await?.is_empty(); + + let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes }; + + let has_events = has_events.encode(); + assert_eq!(has_events.len(), 1); + BlockHasEvents::set(txn, block, &has_events[0]); + Ok(HasEvents::Yes) + } + Some(code) => Ok(HasEvents::decode(&mut [code].as_slice()).unwrap()), + } +} + +/* + Advances the cosign protocol as should be done per the latest block. + + A block is considered cosigned if: + A) It was cosigned + B) It's the parent of a cosigned block + C) It immediately follows a cosigned block and has no events requiring cosigning (TODO) +*/ +async fn advance_cosign_protocol(db: &mut impl Db, serai: &Serai, latest_number: u64) -> Result<(), ()> { + let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else { + let mut txn = db.txn(); + IntendedCosign::set_intended_cosign(&mut txn, 1); + txn.commit(); + return Ok(()); + }; +} + +// If we haven't flagged skipped, and a block within the distance had events, flag the first +// such block as skipped +let mut distance_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE; +// If we've never triggered a cosign, don't skip any cosigns +if CosignTriggered::get(&txn).is_none() { + distance_end_exclusive = 0; +} +if skipped_block.is_none() { + for b in (last_intended_to_cosign_block + 1) .. distance_end_exclusive { + if b > latest_number { + break; + } + + if block_has_events(&mut txn, serai, b).await? == HasEvents::Yes { + skipped_block = Some(b); + log::debug!("skipping cosigning {b} due to proximity to prior cosign"); + IntendedCosign::set_skipped_cosign(&mut txn, b); + break; + } + } +} + +let mut has_no_cosigners = None; +let mut cosign = vec![]; + +// Block we should cosign no matter what if no prior blocks qualified for cosigning +let maximally_latent_cosign_block = + skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE); +for block in (last_intended_to_cosign_block + 1) ..= latest_number { + let actual_block = serai + .finalized_block_by_number(block) + .await? + .expect("couldn't get block which should've been finalized"); + SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block); + + let mut set = false; + + let block_has_events = block_has_events(&mut txn, serai, block).await?; + // If this block is within the distance, + if block < distance_end_exclusive { + // and set a key, cosign it + if block_has_events == HasEvents::KeyGen { + IntendedCosign::set_intended_cosign(&mut txn, block); + set = true; + // Carry skipped if it isn't included by cosigning this block + if let Some(skipped) = skipped_block { + if skipped > block { + IntendedCosign::set_skipped_cosign(&mut txn, block); + } + } + } + } else if (Some(block) == maximally_latent_cosign_block) || + (block_has_events != HasEvents::No) + { + // Since this block was outside the distance and had events/was maximally latent, cosign it + IntendedCosign::set_intended_cosign(&mut txn, block); + set = true; + } + + if set { + // Get the keys as of the prior block + // That means if this block is setting new keys (which won't lock in until we process this + // block), we won't freeze up waiting for the yet-to-be-processed keys to sign this block + let serai = serai.as_of(actual_block.header.parent_hash.into()); + + has_no_cosigners = Some(actual_block.clone()); + + for network in serai_client::primitives::NETWORKS { + // Get the latest session to have set keys + let Some(latest_session) = serai.validator_sets().session(network).await? else { + continue; + }; + let prior_session = Session(latest_session.0.saturating_sub(1)); + let set_with_keys = if serai + .validator_sets() + .keys(ValidatorSet { network, session: prior_session }) + .await? + .is_some() + { + ValidatorSet { network, session: prior_session } + } else { + let set = ValidatorSet { network, session: latest_session }; + if serai.validator_sets().keys(set).await?.is_none() { + continue; + } + set + }; + + // Since this is a valid cosigner, don't flag this block as having no cosigners + has_no_cosigners = None; + log::debug!("{:?} will be cosigning {block}", set_with_keys.network); + + if in_set(key, &serai, set_with_keys).await?.unwrap() { + cosign.push((set_with_keys, block, actual_block.hash())); + } + } + + break; + } +} + +// If this block doesn't have cosigners, yet does have events, automatically mark it as +// cosigned +if let Some(has_no_cosigners) = has_no_cosigners { + log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number()); + LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number()); +} else { + CosignTriggered::set(&mut txn, &()); + for (set, block, hash) in cosign { + log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session); + CosignTransactions::append_cosign(&mut txn, set, block, hash); + } +} +txn.commit(); diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 65af3fcb..b5c58f2b 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -62,7 +62,8 @@ async fn handle_new_set( log::info!("present in set {:?}", set); let set_data = { - let serai = serai.as_of(block.hash()).validator_sets(); + let serai = serai.as_of(block.hash()); + let serai = serai.validator_sets(); let set_participants = serai.participants(set.network).await?.expect("NewSet for set which doesn't exist"); diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index fcf9ac12..6901a83f 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -28,6 +28,8 @@ sp-core = { git = "https://github.com/serai-dex/substrate", optional = true } sp-runtime = { git = "https://github.com/serai-dex/substrate", optional = true } frame-system = { git = "https://github.com/serai-dex/substrate", optional = true } +async-lock = "3" + simple-request = { path = "../../common/request", version = "0.1", optional = true } bitcoin = { version = "0.31", optional = true } diff --git a/substrate/client/src/serai/coins.rs b/substrate/client/src/serai/coins.rs index d13a711c..c5bef95d 100644 --- a/substrate/client/src/serai/coins.rs +++ b/substrate/client/src/serai/coins.rs @@ -11,18 +11,18 @@ const PALLET: &str = "Coins"; pub type CoinsEvent = serai_abi::coins::Event; #[derive(Clone, Copy)] -pub struct SeraiCoins<'a>(pub(crate) TemporalSerai<'a>); +pub struct SeraiCoins<'a>(pub(crate) &'a TemporalSerai<'a>); impl<'a> SeraiCoins<'a> { - pub fn into_inner(self) -> TemporalSerai<'a> { - self.0 - } - pub async fn mint_events(&self) -> Result, SeraiError> { self .0 .events(|event| { if let serai_abi::Event::Coins(event) = event { - Some(event).filter(|event| matches!(event, CoinsEvent::Mint { .. })) + if matches!(event, CoinsEvent::Mint { .. }) { + Some(event.clone()) + } else { + None + } } else { None } @@ -35,7 +35,11 @@ impl<'a> SeraiCoins<'a> { .0 .events(|event| { if let serai_abi::Event::Coins(event) = event { - Some(event).filter(|event| matches!(event, CoinsEvent::BurnWithInstruction { .. })) + if matches!(event, CoinsEvent::BurnWithInstruction { .. }) { + Some(event.clone()) + } else { + None + } } else { None } diff --git a/substrate/client/src/serai/dex.rs b/substrate/client/src/serai/dex.rs index 5e7043c3..00108dfe 100644 --- a/substrate/client/src/serai/dex.rs +++ b/substrate/client/src/serai/dex.rs @@ -6,12 +6,14 @@ use crate::{SeraiError, TemporalSerai}; pub type DexEvent = serai_abi::dex::Event; #[derive(Clone, Copy)] -pub struct SeraiDex<'a>(pub(crate) TemporalSerai<'a>); +pub struct SeraiDex<'a>(pub(crate) &'a TemporalSerai<'a>); impl<'a> SeraiDex<'a> { pub async fn events(&self) -> Result, SeraiError> { self .0 - .events(|event| if let serai_abi::Event::Dex(event) = event { Some(event) } else { None }) + .events( + |event| if let serai_abi::Event::Dex(event) = event { Some(event.clone()) } else { None }, + ) .await } diff --git a/substrate/client/src/serai/in_instructions.rs b/substrate/client/src/serai/in_instructions.rs index 3ba92e43..a8b47bfc 100644 --- a/substrate/client/src/serai/in_instructions.rs +++ b/substrate/client/src/serai/in_instructions.rs @@ -11,12 +11,8 @@ pub type InInstructionsEvent = serai_abi::in_instructions::Event; const PALLET: &str = "InInstructions"; #[derive(Clone, Copy)] -pub struct SeraiInInstructions<'a>(pub(crate) TemporalSerai<'a>); +pub struct SeraiInInstructions<'a>(pub(crate) &'a TemporalSerai<'a>); impl<'a> SeraiInInstructions<'a> { - pub fn into_inner(self) -> TemporalSerai<'a> { - self.0 - } - pub async fn latest_block_for_network( &self, network: NetworkId, @@ -36,7 +32,11 @@ impl<'a> SeraiInInstructions<'a> { .0 .events(|event| { if let serai_abi::Event::InInstructions(event) = event { - Some(event).filter(|event| matches!(event, InInstructionsEvent::Batch { .. })) + if matches!(event, InInstructionsEvent::Batch { .. }) { + Some(event.clone()) + } else { + None + } } else { None } diff --git a/substrate/client/src/serai/mod.rs b/substrate/client/src/serai/mod.rs index 1d6f29bb..be9b0cff 100644 --- a/substrate/client/src/serai/mod.rs +++ b/substrate/client/src/serai/mod.rs @@ -1,5 +1,6 @@ use thiserror::Error; +use async_lock::RwLock; use simple_request::{hyper, Request, Client}; use scale::{Encode, Decode, Compact}; @@ -69,8 +70,17 @@ pub struct Serai { genesis: [u8; 32], } -#[derive(Clone, Copy)] -pub struct TemporalSerai<'a>(pub(crate) &'a Serai, pub(crate) [u8; 32]); +type EventsInBlock = Vec>; +pub struct TemporalSerai<'a> { + serai: &'a Serai, + block: [u8; 32], + events: RwLock>, +} +impl<'a> Clone for TemporalSerai<'a> { + fn clone(&self) -> Self { + Self { serai: self.serai, block: self.block, events: RwLock::new(None) } + } +} impl Serai { pub async fn call( @@ -289,27 +299,35 @@ impl Serai { /// itself. pub async fn as_of_latest_finalized_block(&self) -> Result { let latest = self.latest_finalized_block_hash().await?; - Ok(TemporalSerai(self, latest)) + Ok(TemporalSerai { serai: self, block: latest, events: RwLock::new(None) }) } /// Returns a TemporalSerai able to retrieve state as of the specified block. pub fn as_of(&self, block: [u8; 32]) -> TemporalSerai { - TemporalSerai(self, block) + TemporalSerai { serai: self, block, events: RwLock::new(None) } } } impl<'a> TemporalSerai<'a> { - pub fn into_inner(&self) -> &Serai { - self.0 - } + async fn events( + &self, + filter_map: impl Fn(&Event) -> Option, + ) -> Result, SeraiError> { + let mut events = self.events.read().await; + if events.is_none() { + drop(events); + let mut events_write = self.events.write().await; + #[allow(clippy::unwrap_or_default)] + if events_write.is_none() { + *events_write = Some(self.storage("System", "Events", ()).await?.unwrap_or(vec![])); + } + drop(events_write); + events = self.events.read().await; + } - async fn events(&self, filter_map: impl Fn(Event) -> Option) -> Result, SeraiError> { let mut res = vec![]; - let all_events: Option>> = - self.storage("System", "Events", ()).await?; - #[allow(clippy::unwrap_or_default)] - for event in all_events.unwrap_or(vec![]) { - if let Some(event) = filter_map(event.event) { + for event in events.as_ref().unwrap() { + if let Some(event) = filter_map(&event.event) { res.push(event); } } @@ -328,7 +346,7 @@ impl<'a> TemporalSerai<'a> { full_key.extend(key.encode()); let res: Option = - self.0.call("state_getStorage", [hex::encode(full_key), hex::encode(self.1)]).await?; + self.serai.call("state_getStorage", [hex::encode(full_key), hex::encode(self.block)]).await?; let Some(res) = res else { return Ok(None) }; let res = Serai::hex_decode(res)?; Ok(Some(R::decode(&mut res.as_slice()).map_err(|_| { @@ -336,19 +354,19 @@ impl<'a> TemporalSerai<'a> { })?)) } - pub fn coins(self) -> SeraiCoins<'a> { + pub fn coins(&'a self) -> SeraiCoins<'a> { SeraiCoins(self) } - pub fn dex(self) -> SeraiDex<'a> { + pub fn dex(&'a self) -> SeraiDex<'a> { SeraiDex(self) } - pub fn in_instructions(self) -> SeraiInInstructions<'a> { + pub fn in_instructions(&'a self) -> SeraiInInstructions<'a> { SeraiInInstructions(self) } - pub fn validator_sets(self) -> SeraiValidatorSets<'a> { + pub fn validator_sets(&'a self) -> SeraiValidatorSets<'a> { SeraiValidatorSets(self) } } diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs index cb6ea81b..be9b64b0 100644 --- a/substrate/client/src/serai/validator_sets.rs +++ b/substrate/client/src/serai/validator_sets.rs @@ -16,18 +16,18 @@ const PALLET: &str = "ValidatorSets"; pub type ValidatorSetsEvent = serai_abi::validator_sets::Event; #[derive(Clone, Copy)] -pub struct SeraiValidatorSets<'a>(pub(crate) TemporalSerai<'a>); +pub struct SeraiValidatorSets<'a>(pub(crate) &'a TemporalSerai<'a>); impl<'a> SeraiValidatorSets<'a> { - pub fn into_inner(self) -> TemporalSerai<'a> { - self.0 - } - pub async fn new_set_events(&self) -> Result, SeraiError> { self .0 .events(|event| { if let serai_abi::Event::ValidatorSets(event) = event { - Some(event).filter(|event| matches!(event, ValidatorSetsEvent::NewSet { .. })) + if matches!(event, ValidatorSetsEvent::NewSet { .. }) { + Some(event.clone()) + } else { + None + } } else { None } @@ -40,7 +40,11 @@ impl<'a> SeraiValidatorSets<'a> { .0 .events(|event| { if let serai_abi::Event::ValidatorSets(event) = event { - Some(event).filter(|event| matches!(event, ValidatorSetsEvent::KeyGen { .. })) + if matches!(event, ValidatorSetsEvent::KeyGen { .. }) { + Some(event.clone()) + } else { + None + } } else { None } @@ -53,7 +57,11 @@ impl<'a> SeraiValidatorSets<'a> { .0 .events(|event| { if let serai_abi::Event::ValidatorSets(event) = event { - Some(event).filter(|event| matches!(event, ValidatorSetsEvent::SetRetired { .. })) + if matches!(event, ValidatorSetsEvent::SetRetired { .. }) { + Some(event.clone()) + } else { + None + } } else { None } diff --git a/substrate/client/tests/burn.rs b/substrate/client/tests/burn.rs index 67a8359c..a30dabec 100644 --- a/substrate/client/tests/burn.rs +++ b/substrate/client/tests/burn.rs @@ -54,6 +54,7 @@ serai_test!( let block = provide_batch(&serai, batch.clone()).await; + let instruction = { let serai = serai.as_of(block); let batches = serai.in_instructions().batch_events().await.unwrap(); assert_eq!( @@ -82,19 +83,20 @@ serai_test!( OsRng.fill_bytes(&mut rand_bytes); let data = Data::new(rand_bytes).unwrap(); - let instruction = OutInstructionWithBalance { + OutInstructionWithBalance { balance, instruction: OutInstruction { address: external_address, data: Some(data) }, - }; + } +}; - let serai = serai.into_inner(); let block = publish_tx( - serai, + &serai, &serai.sign(&pair, SeraiCoins::burn_with_instruction(instruction.clone()), 0, 0), ) .await; - let serai = serai.as_of(block).coins(); + let serai = serai.as_of(block); + let serai = serai.coins(); let events = serai.burn_with_instruction_events().await.unwrap(); assert_eq!(events, vec![CoinsEvent::BurnWithInstruction { from: address, instruction }]); assert_eq!(serai.coin_supply(coin).await.unwrap(), Amount(0)); diff --git a/substrate/client/tests/validator_sets.rs b/substrate/client/tests/validator_sets.rs index 7f66cf11..d66b68fb 100644 --- a/substrate/client/tests/validator_sets.rs +++ b/substrate/client/tests/validator_sets.rs @@ -48,7 +48,8 @@ serai_test!( ); { - let vs_serai = serai.as_of_latest_finalized_block().await.unwrap().validator_sets(); + let vs_serai = serai.as_of_latest_finalized_block().await.unwrap(); + let vs_serai = vs_serai.validator_sets(); let participants = vs_serai.participants(set.network).await .unwrap() .unwrap() @@ -64,7 +65,8 @@ serai_test!( // While the set_keys function should handle this, it's beneficial to // independently test it - let serai = serai.as_of(block).validator_sets(); + let serai = serai.as_of(block); + let serai = serai.validator_sets(); assert_eq!( serai.key_gen_events().await.unwrap(), vec![ValidatorSetsEvent::KeyGen { set, key_pair: key_pair.clone() }] diff --git a/tests/coordinator/src/tests/sign.rs b/tests/coordinator/src/tests/sign.rs index 6f56572e..f2cea1df 100644 --- a/tests/coordinator/src/tests/sign.rs +++ b/tests/coordinator/src/tests/sign.rs @@ -243,7 +243,8 @@ async fn sign_test() { let block_included_in_hash = serai.finalized_block_by_number(block_included_in).await.unwrap().unwrap().hash(); - let serai = serai.as_of(block_included_in_hash).coins(); + let serai = serai.as_of(block_included_in_hash); + let serai = serai.coins(); assert_eq!( serai.coin_balance(Coin::Serai, serai_addr).await.unwrap(), Amount(1_000_000_000) @@ -310,7 +311,8 @@ async fn sign_test() { let last_serai_block = serai.finalized_block_by_number(last_serai_block).await.unwrap().unwrap(); let last_serai_block_hash = last_serai_block.hash(); - let serai = serai.as_of(last_serai_block_hash).coins(); + let serai = serai.as_of(last_serai_block_hash); + let serai = serai.coins(); assert_eq!(serai.coin_supply(Coin::Bitcoin).await.unwrap(), Amount(0)); assert_eq!(serai.coin_balance(Coin::Bitcoin, serai_addr).await.unwrap(), Amount(0));