Mirror of https://github.com/serai-dex/serai.git (synced 2025-12-09 04:39:24 +00:00)
Coordinator Cleanup (#481)
* Move logic for evaluating if a cosign should occur to its own file

  Cleans it up and makes it more robust.

* Have expected_next_batch return an error instead of retrying

  While convenient to offer an error-free implementation, it potentially caused very long-lived
  lock acquisitions in handle_processor_message.

* Unify and clean DkgConfirmer and DkgRemoval

  Does so via adding a new file for the common code, SigningProtocol.

  Modifies from_cache to return the preprocess with the machine, as there's no reason not to.
  Also removes an unused Result around the type.

  Clarifies the security around deterministic nonces, removing them for saved-to-disk cached
  preprocesses. The cached preprocesses are encrypted, as the DB is not a proper secret store.

  Moves arguments always present in the protocol from function arguments into the struct itself.

  Removes the horribly ugly code in DkgRemoval, fixing multiple issues present with it which
  would cause it to fail on use.

* Set SeraiBlockNumber in cosign.rs, as it's used by the cosigning protocol

* Remove unnecessary Clone from lambdas in coordinator

* Remove the EventDb from the Tributary scanner

  We used per-Transaction DB TXNs so that, on error, we don't have to rescan the entire block,
  only the rest of it. We prevented scanning transactions multiple times by tracking which we
  had already handled. This is over-engineered and not worth it.

* Implement borsh for HasEvents, removing the manual encoding (see the first sketch after this
  list)

* Merge DkgConfirmer and DkgRemoval into signing_protocol.rs

  Fixes a bug in DkgConfirmer which would cause it to improperly handle indexes if any validator
  had multiple key shares.

* Strictly type DataSpecification's Label

* Correct threshold_i_map_to_keys_and_musig_i_map

  It didn't include the participant's own index and was accordingly offset.

* Create TributaryBlockHandler

  This struct contains all the variables previously passed to handle_block and stops them from
  being passed around again and again. It also ensures fatal_slash is only called while handling
  a block, as needed, as it expects to operate under perfect consensus.

* Inline accumulate, store confirmation nonces with shares

  Inlining accumulate makes sense due to the amount of data accumulate needed passed to it.
  Storing confirmation nonces with shares ensures that both are available or neither is. Prior,
  one could be present while the other wasn't (requiring a runtime assert to ensure we didn't
  bungle it somehow).

* Create helper functions for handling DkgRemoval/SubstrateSign/Sign Tributary TXs

* Move Label into SignData

  All of our transactions which use SignData end up with the same common usage pattern for
  Label, justifying this. Removes 3 transactions, explicitly de-duplicating their handlers.

* Remove CurrentlyCompletingKeyPair for the non-contextual DkgKeyPair

* Remove the manual read/write for TributarySpec in favor of borsh

  This struct doesn't have any optimizations gained by the manual impl. Using borsh reduces our
  scope.

* Use temporary variables to further minimize LoC in the tributary handler

* Remove usage of tuples for non-trivial Tributary transactions

* Remove serde from dkg

  serde could be used to deserialize internally inconsistent objects, which could lead to panics
  or faults. The BorshDeserialize derives have been replaced with a manual implementation which
  won't produce inconsistent objects (see the second sketch after this list).

* Abstract Future generics using new trait definitions in coordinator

* Move published_signed_transaction to tributary/mod.rs to reduce the size of main.rs

* Split coordinator/src/tributary/mod.rs into spec.rs and transaction.rs
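To make the borsh change for HasEvents concrete, here is a minimal sketch. The enum shape matches
the one removed in the diff below; the derive-based encoding is an assumption about the
replacement, illustrative rather than the literal serai code:

use borsh::{BorshSerialize, BorshDeserialize};

// A fieldless enum can derive its one-byte encoding rather than hand-rolling it via scale.
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
enum HasEvents {
  KeyGen,
  Yes,
  No,
}

fn main() {
  // Serialize to a single discriminant byte...
  let mut bytes = Vec::new();
  HasEvents::KeyGen.serialize(&mut bytes).unwrap();
  assert_eq!(bytes.len(), 1);
  // ...and round-trip it back.
  assert_eq!(HasEvents::try_from_slice(&bytes).unwrap(), HasEvents::KeyGen);
}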
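The "Remove serde from dkg" point describes manually implementing BorshDeserialize so internally
inconsistent objects are rejected at deserialization time. A hedged sketch of that pattern,
assuming borsh 1.x's deserialize_reader; the type and its invariant are illustrative, not the dkg
crate's actual definitions:

use std::io::{self, Read};
use borsh::{BorshSerialize, BorshDeserialize};

// Illustrative stand-in for a t-of-n threshold parameters struct.
#[derive(Clone, Copy, Debug, BorshSerialize)]
struct Params {
  t: u16,
  n: u16,
}

impl BorshDeserialize for Params {
  fn deserialize_reader<R: Read>(reader: &mut R) -> io::Result<Self> {
    let t = u16::deserialize_reader(reader)?;
    let n = u16::deserialize_reader(reader)?;
    // Reject values a blind derive (or serde) would happily construct, preventing later panics.
    if (t == 0) || (t > n) {
      Err(io::Error::new(io::ErrorKind::InvalidData, "invalid threshold parameters"))?;
    }
    Ok(Params { t, n })
  }
}

The same idea applies to any combination of fields which must uphold an invariant: validate while
decoding, so the rest of the codebase never sees an inconsistent value.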
@@ -8,12 +8,11 @@ use zeroize::Zeroizing;
 
 use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
 
-use scale::{Encode, Decode};
 use serai_client::{
   SeraiError, Block, Serai, TemporalSerai,
   primitives::{BlockHash, NetworkId},
   validator_sets::{
-    primitives::{Session, ValidatorSet, KeyPair, amortize_excess_key_shares},
+    primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
     ValidatorSetsEvent,
   },
   in_instructions::InInstructionsEvent,
@@ -26,15 +25,14 @@ use processor_messages::SubstrateContext;
 
 use tokio::{sync::mpsc, time::sleep};
 
-use crate::{
-  Db,
-  processors::Processors,
-  tributary::{TributarySpec, SeraiBlockNumber},
-};
+use crate::{Db, processors::Processors, tributary::TributarySpec};
 
 mod db;
 pub use db::*;
 
+mod cosign;
+pub use cosign::*;
+
 async fn in_set(
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   serai: &TemporalSerai<'_>,
@@ -110,7 +108,7 @@ async fn handle_new_set<D: Db>(
 
     new_tributary_spec.send(spec).unwrap();
   } else {
-    log::info!("not present in set {:?}", set);
+    log::info!("not present in new set {:?}", set);
   }
 
   Ok(())
@@ -147,8 +145,8 @@ async fn handle_key_gen<Pro: Processors>(
   Ok(())
 }
 
-async fn handle_batch_and_burns<D: Db, Pro: Processors>(
-  db: &mut D,
+async fn handle_batch_and_burns<Pro: Processors>(
+  txn: &mut impl DbTxn,
   processors: &Pro,
   serai: &Serai,
   block: &Block,
@@ -178,9 +176,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
       {
         network_had_event(&mut burns, &mut batches, network);
 
-        let mut txn = db.txn();
-        BatchInstructionsHashDb::set(&mut txn, network, id, &instructions_hash);
-        txn.commit();
+        BatchInstructionsHashDb::set(txn, network, id, &instructions_hash);
 
         // Make sure this is the only Batch event for this network in this Block
         assert!(batch_block.insert(network, network_block).is_none());
@@ -257,8 +253,8 @@ async fn handle_block<D: Db, Pro: Processors>(
   for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
     // Individually mark each event as handled so on reboot, we minimize duplicates
     // Additionally, if the Serai connection also fails 1/100 times, this means a block with 1000
-    // events will successfully be incrementally handled (though the Serai connection should be
-    // stable)
+    // events will successfully be incrementally handled
+    // (though the Serai connection should be stable, making this unnecessary)
     let ValidatorSetsEvent::NewSet { set } = new_set else {
       panic!("NewSet event wasn't NewSet: {new_set:?}");
     };
@@ -269,11 +265,11 @@ async fn handle_block<D: Db, Pro: Processors>(
       continue;
     }
 
-    if EventDb::is_unhandled(db, &hash, event_id) {
+    if HandledEvent::is_unhandled(db, hash, event_id) {
       log::info!("found fresh new set event {:?}", new_set);
       let mut txn = db.txn();
       handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
-      EventDb::handle_event(&mut txn, &hash, event_id);
+      HandledEvent::handle_event(&mut txn, hash, event_id);
       txn.commit();
     }
     event_id += 1;
@@ -281,7 +277,7 @@ async fn handle_block<D: Db, Pro: Processors>(
 
   // If a key pair was confirmed, inform the processor
   for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
-    if EventDb::is_unhandled(db, &hash, event_id) {
+    if HandledEvent::is_unhandled(db, hash, event_id) {
       log::info!("found fresh key gen event {:?}", key_gen);
       if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
         handle_key_gen(processors, serai, &block, set, key_pair).await?;
@@ -289,7 +285,7 @@ async fn handle_block<D: Db, Pro: Processors>(
         panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
       }
       let mut txn = db.txn();
-      EventDb::handle_event(&mut txn, &hash, event_id);
+      HandledEvent::handle_event(&mut txn, hash, event_id);
       txn.commit();
     }
     event_id += 1;
@@ -304,28 +300,26 @@ async fn handle_block<D: Db, Pro: Processors>(
       continue;
     }
 
-    if EventDb::is_unhandled(db, &hash, event_id) {
+    if HandledEvent::is_unhandled(db, hash, event_id) {
       log::info!("found fresh set retired event {:?}", retired_set);
       let mut txn = db.txn();
       crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
       tributary_retired.send(set).unwrap();
-      EventDb::handle_event(&mut txn, &hash, event_id);
+      HandledEvent::handle_event(&mut txn, hash, event_id);
       txn.commit();
     }
     event_id += 1;
   }
 
   // Finally, tell the processor of acknowledged blocks/burns
-  // This uses a single event as. unlike prior events which individually executed code, all
+  // This uses a single event as unlike prior events which individually executed code, all
   // following events share data collection
   // This does break the uniqueness of (hash, event_id) -> one event, yet
   // (network, (hash, event_id)) remains valid as a unique ID for an event
-  if EventDb::is_unhandled(db, &hash, event_id) {
-    handle_batch_and_burns(db, processors, serai, &block).await?;
+  if HandledEvent::is_unhandled(db, hash, event_id) {
+    let mut txn = db.txn();
+    handle_batch_and_burns(&mut txn, processors, serai, &block).await?;
+    HandledEvent::handle_event(&mut txn, hash, event_id);
+    txn.commit();
   }
-  let mut txn = db.txn();
-  EventDb::handle_event(&mut txn, &hash, event_id);
-  txn.commit();
 
   Ok(())
 }
@@ -342,181 +336,8 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
   // Check if there's been a new Substrate block
   let latest_number = serai.latest_finalized_block().await?.number();
 
-  // TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
-  // cosigned,
-  {
-    // If:
-    // A) This block has events and it's been at least X blocks since the last cosign or
-    // B) This block doesn't have events but it's been X blocks since a skipped block which did
-    //    have events or
-    // C) This block key gens (which changes who the cosigners are)
-    // cosign this block.
-    const COSIGN_DISTANCE: u64 = 5 * 60 / 6; // 5 minutes, expressed in blocks
-
-    #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
-    enum HasEvents {
-      KeyGen,
-      Yes,
-      No,
-    }
-    async fn block_has_events(
-      txn: &mut impl DbTxn,
-      serai: &Serai,
-      block: u64,
-    ) -> Result<HasEvents, SeraiError> {
-      let cached = BlockHasEvents::get(txn, block);
-      match cached {
-        None => {
-          let serai = serai.as_of(
-            serai
-              .finalized_block_by_number(block)
-              .await?
-              .expect("couldn't get block which should've been finalized")
-              .hash(),
-          );
-
-          if !serai.validator_sets().key_gen_events().await?.is_empty() {
-            return Ok(HasEvents::KeyGen);
-          }
-
-          let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() &&
-            serai.in_instructions().batch_events().await?.is_empty() &&
-            serai.validator_sets().new_set_events().await?.is_empty() &&
-            serai.validator_sets().set_retired_events().await?.is_empty();
-
-          let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes };
-
-          let has_events = has_events.encode();
-          assert_eq!(has_events.len(), 1);
-          BlockHasEvents::set(txn, block, &has_events[0]);
-          Ok(HasEvents::Yes)
-        }
-        Some(code) => Ok(HasEvents::decode(&mut [code].as_slice()).unwrap()),
-      }
-    }
-
-    let mut txn = db.txn();
-    let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
-      IntendedCosign::set_intended_cosign(&mut txn, 1);
-      txn.commit();
-      return Ok(());
-    };
-
-    // If we haven't flagged skipped, and a block within the distance had events, flag the first
-    // such block as skipped
-    let mut distance_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE;
-    // If we've never triggered a cosign, don't skip any cosigns
-    if CosignTriggered::get(&txn).is_none() {
-      distance_end_exclusive = 0;
-    }
-    if skipped_block.is_none() {
-      for b in (last_intended_to_cosign_block + 1) .. distance_end_exclusive {
-        if b > latest_number {
-          break;
-        }
-
-        if block_has_events(&mut txn, serai, b).await? == HasEvents::Yes {
-          skipped_block = Some(b);
-          log::debug!("skipping cosigning {b} due to proximity to prior cosign");
-          IntendedCosign::set_skipped_cosign(&mut txn, b);
-          break;
-        }
-      }
-    }
-
-    let mut has_no_cosigners = None;
-    let mut cosign = vec![];
-
-    // Block we should cosign no matter what if no prior blocks qualified for cosigning
-    let maximally_latent_cosign_block =
-      skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
-    for block in (last_intended_to_cosign_block + 1) ..= latest_number {
-      let actual_block = serai
-        .finalized_block_by_number(block)
-        .await?
-        .expect("couldn't get block which should've been finalized");
-      SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block);
-
-      let mut set = false;
-
-      let block_has_events = block_has_events(&mut txn, serai, block).await?;
-      // If this block is within the distance,
-      if block < distance_end_exclusive {
-        // and set a key, cosign it
-        if block_has_events == HasEvents::KeyGen {
-          IntendedCosign::set_intended_cosign(&mut txn, block);
-          set = true;
-          // Carry skipped if it isn't included by cosigning this block
-          if let Some(skipped) = skipped_block {
-            if skipped > block {
-              IntendedCosign::set_skipped_cosign(&mut txn, block);
-            }
-          }
-        }
-      } else if (Some(block) == maximally_latent_cosign_block) ||
-        (block_has_events != HasEvents::No)
-      {
-        // Since this block was outside the distance and had events/was maximally latent, cosign it
-        IntendedCosign::set_intended_cosign(&mut txn, block);
-        set = true;
-      }
-
-      if set {
-        // Get the keys as of the prior block
-        // That means if this block is setting new keys (which won't lock in until we process this
-        // block), we won't freeze up waiting for the yet-to-be-processed keys to sign this block
-        let serai = serai.as_of(actual_block.header.parent_hash.into());
-
-        has_no_cosigners = Some(actual_block.clone());
-
-        for network in serai_client::primitives::NETWORKS {
-          // Get the latest session to have set keys
-          let Some(latest_session) = serai.validator_sets().session(network).await? else {
-            continue;
-          };
-          let prior_session = Session(latest_session.0.saturating_sub(1));
-          let set_with_keys = if serai
-            .validator_sets()
-            .keys(ValidatorSet { network, session: prior_session })
-            .await?
-            .is_some()
-          {
-            ValidatorSet { network, session: prior_session }
-          } else {
-            let set = ValidatorSet { network, session: latest_session };
-            if serai.validator_sets().keys(set).await?.is_none() {
-              continue;
-            }
-            set
-          };
-
-          // Since this is a valid cosigner, don't flag this block as having no cosigners
-          has_no_cosigners = None;
-          log::debug!("{:?} will be cosigning {block}", set_with_keys.network);
-
-          if in_set(key, &serai, set_with_keys).await?.unwrap() {
-            cosign.push((set_with_keys, block, actual_block.hash()));
-          }
-        }
-
-        break;
-      }
-    }
-
-    // If this block doesn't have cosigners, yet does have events, automatically mark it as
-    // cosigned
-    if let Some(has_no_cosigners) = has_no_cosigners {
-      log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
-      LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number());
-    } else {
-      CosignTriggered::set(&mut txn, &());
-      for (set, block, hash) in cosign {
-        log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session);
-        CosignTransactions::append_cosign(&mut txn, set, block, hash);
-      }
-    }
-    txn.commit();
-  }
+  // Advance the cosigning protocol
+  advance_cosign_protocol(db, key, serai, latest_number).await?;
 
   // Reduce to the latest cosigned block
   let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));
@@ -526,24 +347,19 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
   }
 
   for b in *next_block ..= latest_number {
-    log::info!("found substrate block {b}");
-    handle_block(
-      db,
-      key,
-      new_tributary_spec,
-      tributary_retired,
-      processors,
-      serai,
-      serai
-        .finalized_block_by_number(b)
-        .await?
-        .expect("couldn't get block before the latest finalized block"),
-    )
-    .await?;
+    let block = serai
+      .finalized_block_by_number(b)
+      .await?
+      .expect("couldn't get block before the latest finalized block");
+
+    log::info!("handling substrate block {b}");
+    handle_block(db, key, new_tributary_spec, tributary_retired, processors, serai, block).await?;
     *next_block += 1;
 
     let mut txn = db.txn();
     NextBlock::set(&mut txn, next_block);
     txn.commit();
+
+    log::info!("handled substrate block {b}");
   }
 
@@ -578,6 +394,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
   };
   */
   // TODO: Restore the above subscription-based system
+  // That would require moving serai-client from HTTP to websockets
   let new_substrate_block_notifier = {
     let serai = &serai;
     move |next_substrate_block| async move {
@@ -648,22 +465,25 @@ pub async fn scan_task<D: Db, Pro: Processors>(
 }
 
 /// Gets the expected ID for the next Batch.
-pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -> u32 {
-  let mut first = true;
-  loop {
-    if !first {
-      log::error!("{} {network:?}", "couldn't connect to Serai node to get the next batch ID for",);
-      sleep(Duration::from_secs(5)).await;
-    }
-    first = false;
-
-    let Ok(serai) = serai.as_of_latest_finalized_block().await else {
-      continue;
-    };
-    let Ok(last) = serai.in_instructions().last_batch_for_network(network).await else {
-      continue;
-    };
-    break if let Some(last) = last { last + 1 } else { 0 };
-  }
+///
+/// Will log an error and apply a slight sleep on error, letting the caller simply immediately
+/// retry.
+pub(crate) async fn expected_next_batch(
+  serai: &Serai,
+  network: NetworkId,
+) -> Result<u32, SeraiError> {
+  async fn expected_next_batch_inner(serai: &Serai, network: NetworkId) -> Result<u32, SeraiError> {
+    let serai = serai.as_of_latest_finalized_block().await?;
+    let last = serai.in_instructions().last_batch_for_network(network).await?;
+    Ok(if let Some(last) = last { last + 1 } else { 0 })
+  }
+  match expected_next_batch_inner(serai, network).await {
+    Ok(next) => Ok(next),
+    Err(e) => {
+      log::error!("couldn't get the expected next batch from substrate: {e:?}");
+      sleep(Duration::from_millis(100)).await;
+      Err(e)
+    }
+  }
 }