Remove serai from the cosign evaluator

This commit is contained in:
Luke Parker
2024-12-25 23:21:25 -05:00
parent 56af6c44eb
commit 2aebfb21af
4 changed files with 146 additions and 176 deletions

View File

@@ -143,6 +143,19 @@ macro_rules! db_channel {
Self::set(txn, $($arg,)* index_to_use, value); Self::set(txn, $($arg,)* index_to_use, value);
} }
pub(crate) fn peek(
txn: &mut impl DbTxn
$(, $arg: $arg_type)*
) -> Option<$field_type> {
let messages_recvd_key = Self::key($($arg,)* 1);
let messages_recvd = txn.get(&messages_recvd_key).map(|counter| {
u32::from_le_bytes(counter.try_into().unwrap())
}).unwrap_or(0);
let index_to_read = messages_recvd + 2;
Self::get(txn, $($arg,)* index_to_read)
}
pub(crate) fn try_recv( pub(crate) fn try_recv(
txn: &mut impl DbTxn txn: &mut impl DbTxn
$(, $arg: $arg_type)* $(, $arg: $arg_type)*

View File

@@ -1,13 +1,11 @@
use core::future::Future; use core::future::Future;
use serai_client::Serai;
use serai_db::*; use serai_db::*;
use serai_task::ContinuallyRan; use serai_task::ContinuallyRan;
use crate::{ use crate::{
*, HasEvents, GlobalSession, NetworksLatestCosignedBlock, RequestNotableCosigns,
intend::{BlockEventData, BlockEvents}, intend::{GlobalSessionsChannel, BlockEventData, BlockEvents},
}; };
create_db!( create_db!(
@@ -15,34 +13,51 @@ create_db!(
// The latest cosigned block number. // The latest cosigned block number.
LatestCosignedBlockNumber: () -> u64, LatestCosignedBlockNumber: () -> u64,
// The latest global session evaluated. // The latest global session evaluated.
LatestGlobalSessionEvaluated: () -> ([u8; 32], Vec<ValidatorSet>), LatestGlobalSessionEvaluated: () -> ([u8; 32], GlobalSession),
} }
); );
/// A task to determine if a block has been cosigned and we should handle it. /// A task to determine if a block has been cosigned and we should handle it.
// TODO: Remove `serai` from this
pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> { pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
pub(crate) db: D, pub(crate) db: D,
pub(crate) serai: Serai,
pub(crate) request: R, pub(crate) request: R,
} }
async fn get_latest_global_session_evaluated( fn get_latest_global_session_evaluated(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
serai: &Serai, block_number: u64,
parent_hash: [u8; 32], ) -> ([u8; 32], GlobalSession) {
) -> Result<([u8; 32], Vec<ValidatorSet>), String> { let mut res = {
Ok(match LatestGlobalSessionEvaluated::get(txn) { let existing = match LatestGlobalSessionEvaluated::get(txn) {
Some(res) => res, Some(existing) => existing,
None => { None => {
// This is the initial global session let first = GlobalSessionsChannel::try_recv(txn)
// Fetch the sets participating and declare it the latest value recognized .expect("fetching latest global session yet none declared");
let sets = cosigning_sets_by_parent_hash(serai, parent_hash).await?; LatestGlobalSessionEvaluated::set(txn, &first);
let initial_global_session = GlobalSession::id(sets.clone()); first
LatestGlobalSessionEvaluated::set(txn, &(initial_global_session, sets.clone()));
(initial_global_session, sets)
} }
}) };
assert!(
existing.1.start_block_number <= block_number,
"candidate's start block number exceeds our block number"
);
existing
};
if let Some(next) = GlobalSessionsChannel::peek(txn) {
assert!(
block_number <= next.1.start_block_number,
"get_latest_global_session_evaluated wasn't called incrementally"
);
// If it's time for this session to activate, take it from the channel and set it
if block_number == next.1.start_block_number {
GlobalSessionsChannel::try_recv(txn).unwrap();
LatestGlobalSessionEvaluated::set(txn, &next);
res = next;
}
}
res
} }
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> { impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
@@ -54,8 +69,7 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
let mut made_progress = false; let mut made_progress = false;
loop { loop {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
let Some(BlockEventData { block_number, parent_hash, block_hash, has_events }) = let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn)
BlockEvents::try_recv(&mut txn)
else { else {
break; break;
}; };
@@ -69,16 +83,11 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
// Because this had notable events, we require an explicit cosign for this block by a // Because this had notable events, we require an explicit cosign for this block by a
// supermajority of the prior block's validator sets // supermajority of the prior block's validator sets
HasEvents::Notable => { HasEvents::Notable => {
let (global_session, sets) = let (global_session, global_session_info) =
get_latest_global_session_evaluated(&mut txn, &self.serai, parent_hash).await?; get_latest_global_session_evaluated(&mut txn, block_number);
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
let global_session_info = for set in global_session_info.sets {
GlobalSessions::get(&txn, global_session).ok_or_else(|| {
"checking if intended cosign was satisfied within an unrecognized global session"
.to_string()
})?;
for set in sets {
// Check if we have the cosign from this set // Check if we have the cosign from this set
if NetworksLatestCosignedBlock::get(&txn, global_session, set.network) if NetworksLatestCosignedBlock::get(&txn, global_session, set.network)
.map(|signed_cosign| signed_cosign.cosign.block_number) == .map(|signed_cosign| signed_cosign.cosign.block_number) ==
@@ -105,14 +114,6 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
"notable block (#{block_number}) wasn't yet cosigned. this should resolve shortly", "notable block (#{block_number}) wasn't yet cosigned. this should resolve shortly",
)); ));
} }
// Since this block changes the global session, update it
{
let sets = cosigning_sets(&self.serai.as_of(block_hash)).await?;
let sets = sets.into_iter().map(|(set, _key)| set).collect::<Vec<_>>();
let global_session = GlobalSession::id(sets.clone());
LatestGlobalSessionEvaluated::set(&mut txn, &(global_session, sets));
}
} }
// Since this block didn't have any notable events, we simply require a cosign for this // Since this block didn't have any notable events, we simply require a cosign for this
// block or a greater block by the current validator sets // block or a greater block by the current validator sets
@@ -135,17 +136,12 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
*/ */
// Get the global session for this block // Get the global session for this block
let (global_session, sets) = let (global_session, global_session_info) =
get_latest_global_session_evaluated(&mut txn, &self.serai, parent_hash).await?; get_latest_global_session_evaluated(&mut txn, block_number);
let global_session_info =
GlobalSessions::get(&txn, global_session).ok_or_else(|| {
"checking if intended cosign was satisfied within an unrecognized global session"
.to_string()
})?;
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
let mut lowest_common_block: Option<u64> = None; let mut lowest_common_block: Option<u64> = None;
for set in sets { for set in global_session_info.sets {
// Check if this set cosigned this block or not // Check if this set cosigned this block or not
let Some(cosign) = let Some(cosign) =
NetworksLatestCosignedBlock::get(&txn, global_session, set.network) NetworksLatestCosignedBlock::get(&txn, global_session, set.network)

View File

@@ -21,13 +21,12 @@ create_db!(
#[derive(Debug, BorshSerialize, BorshDeserialize)] #[derive(Debug, BorshSerialize, BorshDeserialize)]
pub(crate) struct BlockEventData { pub(crate) struct BlockEventData {
pub(crate) block_number: u64, pub(crate) block_number: u64,
pub(crate) parent_hash: [u8; 32],
pub(crate) block_hash: [u8; 32],
pub(crate) has_events: HasEvents, pub(crate) has_events: HasEvents,
} }
db_channel! { db_channel! {
CosignIntendChannels { CosignIntendChannels {
GlobalSessionsChannel: () -> ([u8; 32], GlobalSession),
BlockEvents: () -> BlockEventData, BlockEvents: () -> BlockEventData,
IntendedCosigns: (set: ValidatorSet) -> CosignIntent, IntendedCosigns: (set: ValidatorSet) -> CosignIntent,
} }
@@ -87,32 +86,24 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
block_number - 1 block_number - 1
))?; ))?;
} }
SubstrateBlocks::set(&mut txn, block_number, &block.hash()); SubstrateBlocks::set(&mut txn, block_number, &block.hash());
match has_events { let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn);
HasEvents::Notable | HasEvents::NonNotable => {
// TODO: Replace with LatestGlobalSessionIntended, GlobalSessions
let sets = cosigning_sets_for_block(&self.serai, &block).await?;
// If this block doesn't have any cosigners, meaning it'll never be cosigned, we flag // If this is notable, it creates a new global session, which we index into the database
// it as not having any events requiring cosigning so we don't attempt to sign/require // now
// a cosign for it
if sets.is_empty() {
has_events = HasEvents::No;
}
// If this is notable, it creates a new global session, which we index into the
// database now
if has_events == HasEvents::Notable { if has_events == HasEvents::Notable {
let serai = self.serai.as_of(block.hash()); let serai = self.serai.as_of(block.hash());
let sets = cosigning_sets(&serai).await?; let sets_and_keys = cosigning_sets(&serai).await?;
let global_session = GlobalSession::id(sets.iter().map(|(set, _key)| *set).collect()); let global_session =
GlobalSession::id(sets_and_keys.iter().map(|(set, _key)| *set).collect());
let mut keys = HashMap::new(); let mut sets = Vec::with_capacity(sets_and_keys.len());
let mut stakes = HashMap::new(); let mut keys = HashMap::with_capacity(sets_and_keys.len());
let mut stakes = HashMap::with_capacity(sets_and_keys.len());
let mut total_stake = 0; let mut total_stake = 0;
for (set, key) in &sets { for (set, key) in &sets_and_keys {
sets.push(*set);
keys.insert(set.network, SeraiAddress::from(*key)); keys.insert(set.network, SeraiAddress::from(*key));
let stake = serai let stake = serai
.validator_sets() .validator_sets()
@@ -128,27 +119,45 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
Err(format!("cosigning sets for block #{block_number} had 0 stake in total"))?; Err(format!("cosigning sets for block #{block_number} had 0 stake in total"))?;
} }
GlobalSessions::set( let global_session_info = GlobalSession {
&mut txn, // This session starts cosigning after this block, as this block must be cosigned by
global_session, // the existing validators
&(GlobalSession { start_block_number: block_number, keys, stakes, total_stake }), start_block_number: block_number + 1,
); sets,
if let Some(ending_global_session) = LatestGlobalSessionIntended::get(&txn) { keys,
stakes,
total_stake,
};
GlobalSessions::set(&mut txn, global_session, &global_session_info);
if let Some(ending_global_session) = global_session_for_this_block {
GlobalSessionsLastBlock::set(&mut txn, ending_global_session, &block_number); GlobalSessionsLastBlock::set(&mut txn, ending_global_session, &block_number);
} }
LatestGlobalSessionIntended::set(&mut txn, &global_session); LatestGlobalSessionIntended::set(&mut txn, &global_session);
GlobalSessionsChannel::send(&mut txn, &(global_session, global_session_info));
} }
if has_events != HasEvents::No { // If there isn't anyone available to cosign this block, meaning it'll never be cosigned,
let global_session = GlobalSession::id(sets.clone()); // we flag it as not having any events requiring cosigning so we don't attempt to
// sign/require a cosign for it
if global_session_for_this_block.is_none() {
has_events = HasEvents::No;
}
match has_events {
HasEvents::Notable | HasEvents::NonNotable => {
let global_session_for_this_block = global_session_for_this_block
.expect("global session for this block was None but still attempting to cosign it");
let global_session_info = GlobalSessions::get(&txn, global_session_for_this_block)
.expect("last global session intended wasn't saved to the database");
// Tell each set of their expectation to cosign this block // Tell each set of their expectation to cosign this block
for set in sets { for set in global_session_info.sets {
log::debug!("{:?} will be cosigning block #{block_number}", set); log::debug!("{:?} will be cosigning block #{block_number}", set);
IntendedCosigns::send( IntendedCosigns::send(
&mut txn, &mut txn,
set, set,
&CosignIntent { &CosignIntent {
global_session, global_session: global_session_for_this_block,
block_number, block_number,
block_hash: block.hash(), block_hash: block.hash(),
notable: has_events == HasEvents::Notable, notable: has_events == HasEvents::Notable,
@@ -156,19 +165,11 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
); );
} }
} }
}
HasEvents::No => {} HasEvents::No => {}
} }
// Populate a singular feed with every block's status for the evluator to work off of // Populate a singular feed with every block's status for the evluator to work off of
BlockEvents::send( BlockEvents::send(&mut txn, &(BlockEventData { block_number, has_events }));
&mut txn,
&(BlockEventData {
block_number,
parent_hash: block.header.parent_hash.into(),
block_hash: block.hash(),
has_events,
}),
);
// Mark this block as handled, meaning we should scan from the next block moving on // Mark this block as handled, meaning we should scan from the next block moving on
ScanCosignFrom::set(&mut txn, &(block_number + 1)); ScanCosignFrom::set(&mut txn, &(block_number + 1));
txn.commit(); txn.commit();

View File

@@ -48,6 +48,7 @@ pub const COSIGN_CONTEXT: &[u8] = b"serai-cosign";
#[derive(Debug, BorshSerialize, BorshDeserialize)] #[derive(Debug, BorshSerialize, BorshDeserialize)]
pub(crate) struct GlobalSession { pub(crate) struct GlobalSession {
pub(crate) start_block_number: u64, pub(crate) start_block_number: u64,
pub(crate) sets: Vec<ValidatorSet>,
pub(crate) keys: HashMap<NetworkId, SeraiAddress>, pub(crate) keys: HashMap<NetworkId, SeraiAddress>,
pub(crate) stakes: HashMap<NetworkId, u64>, pub(crate) stakes: HashMap<NetworkId, u64>,
pub(crate) total_stake: u64, pub(crate) total_stake: u64,
@@ -120,15 +121,6 @@ struct CosignIntent {
notable: bool, notable: bool,
} }
/// The identification of a cosigner.
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum Cosigner {
/// The network which produced this cosign.
ValidatorSet(NetworkId),
/// The individual validator which produced this cosign.
Validator(SeraiAddress),
}
/// A cosign. /// A cosign.
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct Cosign { pub struct Cosign {
@@ -139,7 +131,7 @@ pub struct Cosign {
/// The hash of the block to cosign. /// The hash of the block to cosign.
pub block_hash: [u8; 32], pub block_hash: [u8; 32],
/// The actual cosigner. /// The actual cosigner.
pub cosigner: Cosigner, pub cosigner: NetworkId,
} }
/// A signed cosign. /// A signed cosign.
@@ -211,29 +203,6 @@ async fn cosigning_sets(serai: &TemporalSerai<'_>) -> Result<Vec<(ValidatorSet,
Ok(sets) Ok(sets)
} }
/// Fetch the `ValidatorSet`s, and their associated keys, used for cosigning a block by the block's
/// parent hash.
async fn cosigning_sets_by_parent_hash(
serai: &Serai,
parent_hash: [u8; 32],
) -> Result<Vec<ValidatorSet>, String> {
/*
If we're cosigning block `n`, it's cosigned by the sets as of block `n-1` (as block `n` may
update the sets declared but that update shouldn't take effect until block `n` is cosigned).
That's why fetching the cosigning sets for a block by its parent hash is valid.
*/
let sets = cosigning_sets(&serai.as_of(parent_hash)).await?;
Ok(sets.into_iter().map(|(set, _key)| set).collect::<Vec<_>>())
}
/// Fetch the `ValidatorSet`s, and their associated keys, used for cosigning this block.
async fn cosigning_sets_for_block(
serai: &Serai,
block: &Block,
) -> Result<Vec<ValidatorSet>, String> {
cosigning_sets_by_parent_hash(serai, block.header.parent_hash.into()).await
}
/// An object usable to request notable cosigns for a block. /// An object usable to request notable cosigns for a block.
pub trait RequestNotableCosigns: 'static + Send { pub trait RequestNotableCosigns: 'static + Send {
/// The error type which may be encountered when requesting notable cosigns. /// The error type which may be encountered when requesting notable cosigns.
@@ -267,11 +236,11 @@ impl<D: Db> Cosigning<D> {
let (intend_task, _intend_task_handle) = Task::new(); let (intend_task, _intend_task_handle) = Task::new();
let (evaluator_task, evaluator_task_handle) = Task::new(); let (evaluator_task, evaluator_task_handle) = Task::new();
tokio::spawn( tokio::spawn(
(intend::CosignIntendTask { db: db.clone(), serai: serai.clone() }) (intend::CosignIntendTask { db: db.clone(), serai })
.continually_run(intend_task, vec![evaluator_task_handle]), .continually_run(intend_task, vec![evaluator_task_handle]),
); );
tokio::spawn( tokio::spawn(
(evaluator::CosignEvaluatorTask { db: db.clone(), serai, request }) (evaluator::CosignEvaluatorTask { db: db.clone(), request })
.continually_run(evaluator_task, tasks_to_run_upon_cosigning), .continually_run(evaluator_task, tasks_to_run_upon_cosigning),
); );
Self { db } Self { db }
@@ -349,12 +318,7 @@ impl<D: Db> Cosigning<D> {
// TODO: Don't overload bool here // TODO: Don't overload bool here
pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<bool, String> { pub fn intake_cosign(&mut self, signed_cosign: &SignedCosign) -> Result<bool, String> {
let cosign = &signed_cosign.cosign; let cosign = &signed_cosign.cosign;
let network = cosign.cosigner;
let Cosigner::ValidatorSet(network) = cosign.cosigner else {
// TODO
// Individually signed cosign despite that protocol not being implemented
return Ok(false);
};
// Check this isn't a dated cosign // Check this isn't a dated cosign
if let Some(existing) = if let Some(existing) =
@@ -374,7 +338,7 @@ impl<D: Db> Cosigning<D> {
// Unrecognized global session // Unrecognized global session
return Ok(true); return Ok(true);
}; };
if cosign.block_number <= global_session.start_block_number { if cosign.block_number < global_session.start_block_number {
// Cosign is for a block predating the global session // Cosign is for a block predating the global session
return Ok(false); return Ok(false);
} }
@@ -387,14 +351,11 @@ impl<D: Db> Cosigning<D> {
// Check the cosign's signature // Check the cosign's signature
{ {
let key = Public::from(match cosign.cosigner { let key = Public::from({
Cosigner::ValidatorSet(network) => {
let Some(key) = global_session.keys.get(&network) else { let Some(key) = global_session.keys.get(&network) else {
return Ok(false); return Ok(false);
}; };
*key *key
}
Cosigner::Validator(signer) => signer,
}); });
if !signed_cosign.verify_signature(key) { if !signed_cosign.verify_signature(key) {
@@ -409,7 +370,10 @@ impl<D: Db> Cosigning<D> {
if our_block_hash == cosign.block_hash { if our_block_hash == cosign.block_hash {
// If this is for a future global session, we don't acknowledge this cosign at this time // If this is for a future global session, we don't acknowledge this cosign at this time
if global_session.start_block_number > LatestCosignedBlockNumber::get(&txn).unwrap_or(0) { let latest_cosigned_block_number = LatestCosignedBlockNumber::get(&txn).unwrap_or(0);
// This global session starts the block *after* its declaration, so we want to check if the
// block declaring it was cosigned
if (global_session.start_block_number - 1) > latest_cosigned_block_number {
drop(txn); drop(txn);
return Ok(true); return Ok(true);
} }
@@ -418,17 +382,13 @@ impl<D: Db> Cosigning<D> {
} else { } else {
let mut faults = Faults::get(&txn, cosign.global_session).unwrap_or(vec![]); let mut faults = Faults::get(&txn, cosign.global_session).unwrap_or(vec![]);
// Only handle this as a fault if this set wasn't prior faulty // Only handle this as a fault if this set wasn't prior faulty
if !faults.iter().any(|cosign| cosign.cosign.cosigner == Cosigner::ValidatorSet(network)) { if !faults.iter().any(|cosign| cosign.cosign.cosigner == network) {
faults.push(signed_cosign.clone()); faults.push(signed_cosign.clone());
Faults::set(&mut txn, cosign.global_session, &faults); Faults::set(&mut txn, cosign.global_session, &faults);
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
for fault in &faults { for fault in &faults {
let Cosigner::ValidatorSet(network) = fault.cosign.cosigner else { let Some(stake) = global_session.stakes.get(&fault.cosign.cosigner) else {
// TODO when we implement the non-ValidatorSet cosigner protocol
Err("non-ValidatorSet cosigner had a fault".to_string())?
};
let Some(stake) = global_session.stakes.get(&network) else {
Err("cosigner with recognized key didn't have a stake entry saved".to_string())? Err("cosigner with recognized key didn't have a stake entry saved".to_string())?
}; };
weight_cosigned += stake; weight_cosigned += stake;