Dedicated library for intending and evaluating cosigns

Not only cleans the existing cosign code but enables non-Serai-coordinators to
evaluate cosigns if they gain access to a feed of them (such as over an RPC).
This would let centralized services not only track the finalized chain yet the
cosigned chain without directly running a coordinator.

Still being wrapped up.
This commit is contained in:
Luke Parker
2024-12-22 06:41:55 -05:00
parent 147a6e43d0
commit 4de1a5804d
13 changed files with 914 additions and 669 deletions

View File

@@ -1,336 +0,0 @@
use core::time::Duration;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use tokio::{
sync::{mpsc, Mutex, RwLock},
time::sleep,
};
use borsh::BorshSerialize;
use sp_application_crypto::RuntimePublic;
use serai_client::{
primitives::{NETWORKS, NetworkId, Signature},
validator_sets::primitives::{Session, ValidatorSet},
SeraiError, TemporalSerai, Serai,
};
use serai_db::{Get, DbTxn, Db, create_db};
use processor_messages::coordinator::cosign_block_msg;
use crate::{
p2p::{CosignedBlock, GossipMessageKind, P2p},
substrate::LatestCosignedBlock,
};
create_db! {
CosignDb {
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> CosignedBlock,
LatestCosign: (network: NetworkId) -> CosignedBlock,
DistinctChain: (set: ValidatorSet) -> (),
}
}
pub struct CosignEvaluator<D: Db> {
db: Mutex<D>,
serai: Arc<Serai>,
stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
latest_cosigns: RwLock<HashMap<NetworkId, CosignedBlock>>,
}
impl<D: Db> CosignEvaluator<D> {
async fn update_latest_cosign(&self) {
let stakes_lock = self.stakes.read().await;
// If we haven't gotten the stake data yet, return
let Some(stakes) = stakes_lock.as_ref() else { return };
let total_stake = stakes.values().copied().sum::<u64>();
let latest_cosigns = self.latest_cosigns.read().await;
let mut highest_block = 0;
for cosign in latest_cosigns.values() {
let mut networks = HashSet::new();
for (network, sub_cosign) in &*latest_cosigns {
if sub_cosign.block_number >= cosign.block_number {
networks.insert(network);
}
}
let sum_stake =
networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
let needed_stake = ((total_stake * 2) / 3) + 1;
if (total_stake == 0) || (sum_stake > needed_stake) {
highest_block = highest_block.max(cosign.block_number);
}
}
let mut db_lock = self.db.lock().await;
let mut txn = db_lock.txn();
if highest_block > LatestCosignedBlock::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block);
LatestCosignedBlock::set(&mut txn, &highest_block);
}
txn.commit();
}
async fn update_stakes(&self) -> Result<(), SeraiError> {
let serai = self.serai.as_of_latest_finalized_block().await?;
let mut stakes = HashMap::new();
for network in NETWORKS {
// Use if this network has published a Batch for a short-circuit of if they've ever set a key
let set_key = serai.in_instructions().last_batch_for_network(network).await?.is_some();
if set_key {
stakes.insert(
network,
serai
.validator_sets()
.total_allocated_stake(network)
.await?
.expect("network which published a batch didn't have a stake set")
.0,
);
}
}
// Since we've successfully built stakes, set it
*self.stakes.write().await = Some(stakes);
self.update_latest_cosign().await;
Ok(())
}
// Uses Err to signify a message should be retried
async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
// If we already have this cosign or a newer cosign, return
if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
if latest.block_number >= cosign.block_number {
return Ok(());
}
}
// If this an old cosign (older than a day), drop it
let latest_block = self.serai.latest_finalized_block().await?;
if (cosign.block_number + (24 * 60 * 60 / 6)) < latest_block.number() {
log::debug!("received old cosign supposedly signed by {:?}", cosign.network);
return Ok(());
}
let Some(block) = self.serai.finalized_block_by_number(cosign.block_number).await? else {
log::warn!("received cosign with a block number which doesn't map to a block");
return Ok(());
};
async fn set_with_keys_fn(
serai: &TemporalSerai<'_>,
network: NetworkId,
) -> Result<Option<ValidatorSet>, SeraiError> {
let Some(latest_session) = serai.validator_sets().session(network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", network);
return Ok(None);
};
let prior_session = Session(latest_session.0.saturating_sub(1));
Ok(Some(
if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
} else {
ValidatorSet { network, session: latest_session }
},
))
}
// Get the key for this network as of the prior block
// If we have two chains, this value may be different across chains depending on if one chain
// included the set_keys and one didn't
// Because set_keys will force a cosign, it will force detection of distinct blocks
// re: set_keys using keys prior to set_keys (assumed amenable to all)
let serai = self.serai.as_of(block.header.parent_hash.into());
let Some(set_with_keys) = set_with_keys_fn(&serai, cosign.network).await? else {
return Ok(());
};
let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
log::warn!("received cosign for a block we didn't have keys for");
return Ok(());
};
if !keys
.0
.verify(&cosign_block_msg(cosign.block_number, cosign.block), &Signature(cosign.signature))
{
log::warn!("received cosigned block with an invalid signature");
return Ok(());
}
log::info!(
"received cosign for block {} ({}) by {:?}",
block.number(),
hex::encode(cosign.block),
cosign.network
);
// Save this cosign to the DB
{
let mut db = self.db.lock().await;
let mut txn = db.txn();
ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign);
LatestCosign::set(&mut txn, set_with_keys.network, &(cosign));
txn.commit();
}
if cosign.block != block.hash() {
log::error!(
"received cosign for a distinct block at {}. we have {}. cosign had {}",
cosign.block_number,
hex::encode(block.hash()),
hex::encode(cosign.block)
);
let serai = self.serai.as_of(latest_block.hash());
let mut db = self.db.lock().await;
// Save this set as being on a different chain
let mut txn = db.txn();
DistinctChain::set(&mut txn, set_with_keys, &());
txn.commit();
let mut total_stake = 0;
let mut total_on_distinct_chain = 0;
for network in NETWORKS {
if network == NetworkId::Serai {
continue;
}
// Get the current set for this network
let set_with_keys = {
let mut res;
while {
res = set_with_keys_fn(&serai, cosign.network).await;
res.is_err()
} {
log::error!(
"couldn't get the set with keys when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};
// Get its stake
// Doesn't use the stakes inside self to prevent deadlocks re: multi-lock acquisition
if let Some(set_with_keys) = set_with_keys {
let stake = {
let mut res;
while {
res = serai.validator_sets().total_allocated_stake(set_with_keys.network).await;
res.is_err()
} {
log::error!(
"couldn't get total allocated stake when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};
if let Some(stake) = stake {
total_stake += stake.0;
if DistinctChain::get(&*db, set_with_keys).is_some() {
total_on_distinct_chain += stake.0;
}
}
}
}
// See https://github.com/serai-dex/serai/issues/339 for the reasoning on 17%
if (total_stake * 17 / 100) <= total_on_distinct_chain {
panic!("17% of validator sets (by stake) have co-signed a distinct chain");
}
} else {
{
let mut latest_cosigns = self.latest_cosigns.write().await;
latest_cosigns.insert(cosign.network, cosign);
}
self.update_latest_cosign().await;
}
Ok(())
}
#[allow(clippy::new_ret_no_self)]
pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
let mut latest_cosigns = HashMap::new();
for network in NETWORKS {
if let Some(cosign) = LatestCosign::get(&db, network) {
latest_cosigns.insert(network, cosign);
}
}
let evaluator = Arc::new(Self {
db: Mutex::new(db),
serai,
stakes: RwLock::new(None),
latest_cosigns: RwLock::new(latest_cosigns),
});
// Spawn a task to update stakes regularly
tokio::spawn({
let evaluator = evaluator.clone();
async move {
loop {
// Run this until it passes
while evaluator.update_stakes().await.is_err() {
log::warn!("couldn't update stakes in the cosign evaluator");
// Try again in 10 seconds
sleep(Duration::from_secs(10)).await;
}
// Run it every 10 minutes as we don't need the exact stake data for this to be valid
sleep(Duration::from_secs(10 * 60)).await;
}
}
});
// Spawn a task to receive cosigns and handle them
let (send, mut recv) = mpsc::unbounded_channel();
tokio::spawn({
let evaluator = evaluator.clone();
async move {
while let Some(msg) = recv.recv().await {
while evaluator.handle_new_cosign(msg).await.is_err() {
// Try again in 10 seconds
sleep(Duration::from_secs(10)).await;
}
}
}
});
// Spawn a task to rebroadcast the most recent cosigns
tokio::spawn({
async move {
loop {
let cosigns = evaluator.latest_cosigns.read().await.values().copied().collect::<Vec<_>>();
for cosign in cosigns {
let mut buf = vec![];
cosign.serialize(&mut buf).unwrap();
P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await;
}
sleep(Duration::from_secs(60)).await;
}
}
});
// Return the channel to send cosigns
send
}
}

View File

@@ -1,332 +0,0 @@
/*
If:
A) This block has events and it's been at least X blocks since the last cosign or
B) This block doesn't have events but it's been X blocks since a skipped block which did
have events or
C) This block key gens (which changes who the cosigners are)
cosign this block.
This creates both a minimum and maximum delay of X blocks before a block's cosigning begins,
barring key gens which are exceptional. The minimum delay is there to ensure we don't constantly
spawn new protocols every 6 seconds, overwriting the old ones. The maximum delay is there to
ensure any block needing cosigned is consigned within a reasonable amount of time.
*/
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{
SeraiError, Serai,
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
};
use serai_db::*;
use crate::{Db, substrate::in_set, tributary::SeraiBlockNumber};
// 5 minutes, expressed in blocks
// TODO: Pull a constant for block time
const COSIGN_DISTANCE: u64 = 5 * 60 / 6;
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
enum HasEvents {
KeyGen,
Yes,
No,
}
create_db!(
SubstrateCosignDb {
ScanCosignFrom: () -> u64,
IntendedCosign: () -> (u64, Option<u64>),
BlockHasEventsCache: (block: u64) -> HasEvents,
LatestCosignedBlock: () -> u64,
}
);
impl IntendedCosign {
// Sets the intended to cosign block, clearing the prior value entirely.
pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) {
Self::set(txn, &(intended, None::<u64>));
}
// Sets the cosign skipped since the last intended to cosign block.
pub fn set_skipped_cosign(txn: &mut impl DbTxn, skipped: u64) {
let (intended, prior_skipped) = Self::get(txn).unwrap();
assert!(prior_skipped.is_none());
Self::set(txn, &(intended, Some(skipped)));
}
}
impl LatestCosignedBlock {
pub fn latest_cosigned_block(getter: &impl Get) -> u64 {
Self::get(getter).unwrap_or_default().max(1)
}
}
db_channel! {
SubstrateDbChannels {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
}
}
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}
async fn block_has_events(
txn: &mut impl DbTxn,
serai: &Serai,
block: u64,
) -> Result<HasEvents, SeraiError> {
let cached = BlockHasEventsCache::get(txn, block);
match cached {
None => {
let serai = serai.as_of(
serai
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
);
if !serai.validator_sets().key_gen_events().await?.is_empty() {
return Ok(HasEvents::KeyGen);
}
let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() &&
serai.in_instructions().batch_events().await?.is_empty() &&
serai.validator_sets().new_set_events().await?.is_empty() &&
serai.validator_sets().set_retired_events().await?.is_empty();
let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes };
BlockHasEventsCache::set(txn, block, &has_events);
Ok(has_events)
}
Some(code) => Ok(code),
}
}
async fn potentially_cosign_block(
txn: &mut impl DbTxn,
serai: &Serai,
block: u64,
skipped_block: Option<u64>,
window_end_exclusive: u64,
) -> Result<bool, SeraiError> {
// The following code regarding marking cosigned if prior block is cosigned expects this block to
// not be zero
// While we could perform this check there, there's no reason not to optimize the entire function
// as such
if block == 0 {
return Ok(false);
}
let block_has_events = block_has_events(txn, serai, block).await?;
// If this block had no events and immediately follows a cosigned block, mark it as cosigned
if (block_has_events == HasEvents::No) &&
(LatestCosignedBlock::latest_cosigned_block(txn) == (block - 1))
{
log::debug!("automatically co-signing next block ({block}) since it has no events");
LatestCosignedBlock::set(txn, &block);
}
// If we skipped a block, we're supposed to sign it plus the COSIGN_DISTANCE if no other blocks
// trigger a cosigning protocol covering it
// This means there will be the maximum delay allowed from a block needing cosigning occurring
// and a cosign for it triggering
let maximally_latent_cosign_block =
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
// If this block is within the window,
if block < window_end_exclusive {
// and set a key, cosign it
if block_has_events == HasEvents::KeyGen {
IntendedCosign::set_intended_cosign(txn, block);
// Carry skipped if it isn't included by cosigning this block
if let Some(skipped) = skipped_block {
if skipped > block {
IntendedCosign::set_skipped_cosign(txn, block);
}
}
return Ok(true);
}
} else if (Some(block) == maximally_latent_cosign_block) || (block_has_events != HasEvents::No) {
// Since this block was outside the window and had events/was maximally latent, cosign it
IntendedCosign::set_intended_cosign(txn, block);
return Ok(true);
}
Ok(false)
}
/*
Advances the cosign protocol as should be done per the latest block.
A block is considered cosigned if:
A) It was cosigned
B) It's the parent of a cosigned block
C) It immediately follows a cosigned block and has no events requiring cosigning
This only actually performs advancement within a limited bound (generally until it finds a block
which should be cosigned). Accordingly, it is necessary to call multiple times even if
`latest_number` doesn't change.
*/
async fn advance_cosign_protocol_inner(
db: &mut impl Db,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: &Serai,
latest_number: u64,
) -> Result<(), SeraiError> {
let mut txn = db.txn();
const INITIAL_INTENDED_COSIGN: u64 = 1;
let (last_intended_to_cosign_block, mut skipped_block) = {
let intended_cosign = IntendedCosign::get(&txn);
// If we haven't prior intended to cosign a block, set the intended cosign to 1
if let Some(intended_cosign) = intended_cosign {
intended_cosign
} else {
IntendedCosign::set_intended_cosign(&mut txn, INITIAL_INTENDED_COSIGN);
IntendedCosign::get(&txn).unwrap()
}
};
// "windows" refers to the window of blocks where even if there's a block which should be
// cosigned, it won't be due to proximity due to the prior cosign
let mut window_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE;
// If we've never triggered a cosign, don't skip any cosigns based on proximity
if last_intended_to_cosign_block == INITIAL_INTENDED_COSIGN {
window_end_exclusive = 1;
}
// The consensus rules for this are `last_intended_to_cosign_block + 1`
let scan_start_block = last_intended_to_cosign_block + 1;
// As a practical optimization, we don't re-scan old blocks since old blocks are independent to
// new state
let scan_start_block = scan_start_block.max(ScanCosignFrom::get(&txn).unwrap_or(1));
// Check all blocks within the window to see if they should be cosigned
// If so, we're skipping them and need to flag them as skipped so that once the window closes, we
// do cosign them
// We only perform this check if we haven't already marked a block as skipped since the cosign
// the skipped block will cause will cosign all other blocks within this window
if skipped_block.is_none() {
let window_end_inclusive = window_end_exclusive - 1;
for b in scan_start_block ..= window_end_inclusive.min(latest_number) {
if block_has_events(&mut txn, serai, b).await? == HasEvents::Yes {
skipped_block = Some(b);
log::debug!("skipping cosigning {b} due to proximity to prior cosign");
IntendedCosign::set_skipped_cosign(&mut txn, b);
break;
}
}
}
// A block which should be cosigned
let mut to_cosign = None;
// A list of sets which are cosigning, along with a boolean of if we're in the set
let mut cosigning = vec![];
for block in scan_start_block ..= latest_number {
let actual_block = serai
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized");
// Save the block number for this block, as needed by the cosigner to perform cosigning
SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block);
if potentially_cosign_block(&mut txn, serai, block, skipped_block, window_end_exclusive).await?
{
to_cosign = Some((block, actual_block.hash()));
// Get the keys as of the prior block
// If this key sets new keys, the coordinator won't acknowledge so until we process this
// block
// We won't process this block until its co-signed
// Using the keys of the prior block ensures this deadlock isn't reached
let serai = serai.as_of(actual_block.header.parent_hash.into());
for network in serai_client::primitives::NETWORKS {
// Get the latest session to have set keys
let set_with_keys = {
let Some(latest_session) = serai.validator_sets().session(network).await? else {
continue;
};
let prior_session = Session(latest_session.0.saturating_sub(1));
if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
} else {
let set = ValidatorSet { network, session: latest_session };
if serai.validator_sets().keys(set).await?.is_none() {
continue;
}
set
}
};
log::debug!("{:?} will be cosigning {block}", set_with_keys.network);
cosigning.push((set_with_keys, in_set(key, &serai, set_with_keys).await?.unwrap()));
}
break;
}
// If this TX is committed, always start future scanning from the next block
ScanCosignFrom::set(&mut txn, &(block + 1));
// Since we're scanning *from* the next block, tidy the cache
BlockHasEventsCache::del(&mut txn, block);
}
if let Some((number, hash)) = to_cosign {
// If this block doesn't have cosigners, yet does have events, automatically mark it as
// cosigned
if cosigning.is_empty() {
log::debug!("{} had no cosigners available, marking as cosigned", number);
LatestCosignedBlock::set(&mut txn, &number);
} else {
for (set, in_set) in cosigning {
if in_set {
log::debug!("cosigning {number} with {:?} {:?}", set.network, set.session);
CosignTransactions::append_cosign(&mut txn, set, number, hash);
}
}
}
}
txn.commit();
Ok(())
}
pub async fn advance_cosign_protocol(
db: &mut impl Db,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: &Serai,
latest_number: u64,
) -> Result<(), SeraiError> {
loop {
let scan_from = ScanCosignFrom::get(db).unwrap_or(1);
// Only scan 1000 blocks at a time to limit a massive txn from forming
let scan_to = latest_number.min(scan_from + 1000);
advance_cosign_protocol_inner(db, key, serai, scan_to).await?;
// If we didn't limit the scan_to, break
if scan_to == latest_number {
break;
}
}
Ok(())
}