Remove GlobalSessions DB entry

If we read the currently-being-evaluated session from the evaluator, we can
avoid paying the storage costs on all sessions ad-infinitum.
This commit is contained in:
Luke Parker
2024-12-25 23:51:24 -05:00
parent 2aebfb21af
commit f336ab1ece
4 changed files with 79 additions and 53 deletions

View File

@@ -144,17 +144,17 @@ macro_rules! db_channel {
Self::set(txn, $($arg,)* index_to_use, value); Self::set(txn, $($arg,)* index_to_use, value);
} }
pub(crate) fn peek( pub(crate) fn peek(
txn: &mut impl DbTxn getter: &impl Get
$(, $arg: $arg_type)* $(, $arg: $arg_type)*
) -> Option<$field_type> { ) -> Option<$field_type> {
let messages_recvd_key = Self::key($($arg,)* 1); let messages_recvd_key = Self::key($($arg,)* 1);
let messages_recvd = txn.get(&messages_recvd_key).map(|counter| { let messages_recvd = getter.get(&messages_recvd_key).map(|counter| {
u32::from_le_bytes(counter.try_into().unwrap()) u32::from_le_bytes(counter.try_into().unwrap())
}).unwrap_or(0); }).unwrap_or(0);
let index_to_read = messages_recvd + 2; let index_to_read = messages_recvd + 2;
Self::get(txn, $($arg,)* index_to_read) Self::get(getter, $($arg,)* index_to_read)
} }
pub(crate) fn try_recv( pub(crate) fn try_recv(
txn: &mut impl DbTxn txn: &mut impl DbTxn

View File

@@ -5,35 +5,38 @@ use serai_task::ContinuallyRan;
use crate::{ use crate::{
HasEvents, GlobalSession, NetworksLatestCosignedBlock, RequestNotableCosigns, HasEvents, GlobalSession, NetworksLatestCosignedBlock, RequestNotableCosigns,
intend::{GlobalSessionsChannel, BlockEventData, BlockEvents}, intend::{GlobalSessions, BlockEventData, BlockEvents},
}; };
create_db!( create_db!(
SubstrateCosignEvaluator { SubstrateCosignEvaluator {
// The latest cosigned block number. // The latest cosigned block number.
LatestCosignedBlockNumber: () -> u64, LatestCosignedBlockNumber: () -> u64,
// The latest global session evaluated. // The global session currently being evaluated.
LatestGlobalSessionEvaluated: () -> ([u8; 32], GlobalSession), CurrentlyEvaluatedGlobalSession: () -> ([u8; 32], GlobalSession),
} }
); );
/// A task to determine if a block has been cosigned and we should handle it. // This is a strict function which won't panic, even with a malicious Serai node, so long as:
pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> { // - It's called incrementally
pub(crate) db: D, // - It's only called for block numbers we've completed indexing on within the intend task
pub(crate) request: R, // - It's only called for block numbers after a global session has started
} // - The global sessions channel is populated as the block declaring the session is indexed
// Which all hold true within the context of this task and the intend task.
fn get_latest_global_session_evaluated( //
// This function will also ensure the currently evaluated global session is incremented once we
// finish evaluation of the prior session.
fn currently_evaluated_global_session_strict(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block_number: u64, block_number: u64,
) -> ([u8; 32], GlobalSession) { ) -> ([u8; 32], GlobalSession) {
let mut res = { let mut res = {
let existing = match LatestGlobalSessionEvaluated::get(txn) { let existing = match CurrentlyEvaluatedGlobalSession::get(txn) {
Some(existing) => existing, Some(existing) => existing,
None => { None => {
let first = GlobalSessionsChannel::try_recv(txn) let first =
.expect("fetching latest global session yet none declared"); GlobalSessions::try_recv(txn).expect("fetching latest global session yet none declared");
LatestGlobalSessionEvaluated::set(txn, &first); CurrentlyEvaluatedGlobalSession::set(txn, &first);
first first
} }
}; };
@@ -44,15 +47,15 @@ fn get_latest_global_session_evaluated(
existing existing
}; };
if let Some(next) = GlobalSessionsChannel::peek(txn) { if let Some(next) = GlobalSessions::peek(txn) {
assert!( assert!(
block_number <= next.1.start_block_number, block_number <= next.1.start_block_number,
"get_latest_global_session_evaluated wasn't called incrementally" "currently_evaluated_global_session_strict wasn't called incrementally"
); );
// If it's time for this session to activate, take it from the channel and set it // If it's time for this session to activate, take it from the channel and set it
if block_number == next.1.start_block_number { if block_number == next.1.start_block_number {
GlobalSessionsChannel::try_recv(txn).unwrap(); GlobalSessions::try_recv(txn).unwrap();
LatestGlobalSessionEvaluated::set(txn, &next); CurrentlyEvaluatedGlobalSession::set(txn, &next);
res = next; res = next;
} }
} }
@@ -60,6 +63,29 @@ fn get_latest_global_session_evaluated(
res res
} }
// This is a non-strict function which won't panic, and also won't increment the session as needed.
pub(crate) fn currently_evaluated_global_session(
getter: &impl Get,
) -> Option<([u8; 32], GlobalSession)> {
// If there's a next session...
if let Some(next_global_session) = GlobalSessions::peek(getter) {
// and we've already evaluated the cosigns for the block declaring it...
if LatestCosignedBlockNumber::get(getter) == Some(next_global_session.1.start_block_number - 1)
{
// return it as the current session.
return Some(next_global_session);
}
}
// Else, return the current session
CurrentlyEvaluatedGlobalSession::get(getter)
}
/// A task to determine if a block has been cosigned and we should handle it.
pub(crate) struct CosignEvaluatorTask<D: Db, R: RequestNotableCosigns> {
pub(crate) db: D,
pub(crate) request: R,
}
impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> { impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D, R> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> { fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move { async move {
@@ -84,7 +110,7 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
// supermajority of the prior block's validator sets // supermajority of the prior block's validator sets
HasEvents::Notable => { HasEvents::Notable => {
let (global_session, global_session_info) = let (global_session, global_session_info) =
get_latest_global_session_evaluated(&mut txn, block_number); currently_evaluated_global_session_strict(&mut txn, block_number);
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
for set in global_session_info.sets { for set in global_session_info.sets {
@@ -137,7 +163,7 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
// Get the global session for this block // Get the global session for this block
let (global_session, global_session_info) = let (global_session, global_session_info) =
get_latest_global_session_evaluated(&mut txn, block_number); currently_evaluated_global_session_strict(&mut txn, block_number);
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
let mut lowest_common_block: Option<u64> = None; let mut lowest_common_block: Option<u64> = None;

View File

@@ -26,7 +26,7 @@ pub(crate) struct BlockEventData {
db_channel! { db_channel! {
CosignIntendChannels { CosignIntendChannels {
GlobalSessionsChannel: () -> ([u8; 32], GlobalSession), GlobalSessions: () -> ([u8; 32], GlobalSession),
BlockEvents: () -> BlockEventData, BlockEvents: () -> BlockEventData,
IntendedCosigns: (set: ValidatorSet) -> CosignIntent, IntendedCosigns: (set: ValidatorSet) -> CosignIntent,
} }
@@ -128,12 +128,14 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
stakes, stakes,
total_stake, total_stake,
}; };
GlobalSessions::set(&mut txn, global_session, &global_session_info); if let Some((ending_global_session, _ending_global_session_info)) = global_session_for_this_block {
if let Some(ending_global_session) = global_session_for_this_block {
GlobalSessionsLastBlock::set(&mut txn, ending_global_session, &block_number); GlobalSessionsLastBlock::set(&mut txn, ending_global_session, &block_number);
} }
LatestGlobalSessionIntended::set(&mut txn, &global_session); LatestGlobalSessionIntended::set(
GlobalSessionsChannel::send(&mut txn, &(global_session, global_session_info)); &mut txn,
&(global_session, global_session_info.clone()),
);
GlobalSessions::send(&mut txn, &(global_session, global_session_info));
} }
// If there isn't anyone available to cosign this block, meaning it'll never be cosigned, // If there isn't anyone available to cosign this block, meaning it'll never be cosigned,
@@ -145,10 +147,9 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
match has_events { match has_events {
HasEvents::Notable | HasEvents::NonNotable => { HasEvents::Notable | HasEvents::NonNotable => {
let global_session_for_this_block = global_session_for_this_block let (global_session_for_this_block, global_session_info) =
.expect("global session for this block was None but still attempting to cosign it"); global_session_for_this_block
let global_session_info = GlobalSessions::get(&txn, global_session_for_this_block) .expect("global session for this block was None but still attempting to cosign it");
.expect("last global session intended wasn't saved to the database");
// Tell each set of their expectation to cosign this block // Tell each set of their expectation to cosign this block
for set in global_session_info.sets { for set in global_session_info.sets {

View File

@@ -45,7 +45,7 @@ pub const COSIGN_CONTEXT: &[u8] = b"serai-cosign";
have validator sets follow two distinct global sessions without breaking the bounds of the have validator sets follow two distinct global sessions without breaking the bounds of the
cosigning protocol. cosigning protocol.
*/ */
#[derive(Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub(crate) struct GlobalSession { pub(crate) struct GlobalSession {
pub(crate) start_block_number: u64, pub(crate) start_block_number: u64,
pub(crate) sets: Vec<ValidatorSet>, pub(crate) sets: Vec<ValidatorSet>,
@@ -66,14 +66,12 @@ create_db! {
// An index of Substrate blocks // An index of Substrate blocks
SubstrateBlocks: (block_number: u64) -> [u8; 32], SubstrateBlocks: (block_number: u64) -> [u8; 32],
// A mapping from a global session's ID to its relevant information.
GlobalSessions: (global_session: [u8; 32]) -> GlobalSession,
// The last block to be cosigned by a global session. // The last block to be cosigned by a global session.
GlobalSessionsLastBlock: (global_session: [u8; 32]) -> u64, GlobalSessionsLastBlock: (global_session: [u8; 32]) -> u64,
// The latest global session intended. // The latest global session intended.
// //
// This is distinct from the latest global session for which we've evaluated the cosigns for. // This is distinct from the latest global session for which we've evaluated the cosigns for.
LatestGlobalSessionIntended: () -> [u8; 32], LatestGlobalSessionIntended: () -> ([u8; 32], GlobalSession),
// The following are managed by the `intake_cosign` function present in this file // The following are managed by the `intake_cosign` function present in this file
@@ -287,7 +285,9 @@ impl<D: Db> Cosigning<D> {
} }
cosigns cosigns
} else { } else {
let Some(latest_global_session) = LatestGlobalSessionIntended::get(&self.db) else { let Some((latest_global_session, _latest_global_session_info)) =
LatestGlobalSessionIntended::get(&self.db)
else {
return vec![]; return vec![];
}; };
let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len()); let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len());
@@ -320,7 +320,7 @@ impl<D: Db> Cosigning<D> {
let cosign = &signed_cosign.cosign; let cosign = &signed_cosign.cosign;
let network = cosign.cosigner; let network = cosign.cosigner;
// Check this isn't a dated cosign // Check this isn't a dated cosign within its global session (as it would be if rebroadcasted)
if let Some(existing) = if let Some(existing) =
NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network) NetworksLatestCosignedBlock::get(&self.db, cosign.global_session, network)
{ {
@@ -334,11 +334,19 @@ impl<D: Db> Cosigning<D> {
return Ok(true); return Ok(true);
}; };
let Some(global_session) = GlobalSessions::get(&self.db, cosign.global_session) else { // Check the cosign aligns with the global session we're currently working on
// Unrecognized global session let Some((global_session, global_session_info)) =
evaluator::currently_evaluated_global_session(&self.db)
else {
// We haven't recognized any global sessions yet
return Ok(true); return Ok(true);
}; };
if cosign.block_number < global_session.start_block_number { if cosign.global_session != global_session {
return Ok(true);
}
// Check the cosigned block number is in range to the global session
if cosign.block_number < global_session_info.start_block_number {
// Cosign is for a block predating the global session // Cosign is for a block predating the global session
return Ok(false); return Ok(false);
} }
@@ -352,7 +360,7 @@ impl<D: Db> Cosigning<D> {
// Check the cosign's signature // Check the cosign's signature
{ {
let key = Public::from({ let key = Public::from({
let Some(key) = global_session.keys.get(&network) else { let Some(key) = global_session_info.keys.get(&network) else {
return Ok(false); return Ok(false);
}; };
*key *key
@@ -369,15 +377,6 @@ impl<D: Db> Cosigning<D> {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
if our_block_hash == cosign.block_hash { if our_block_hash == cosign.block_hash {
// If this is for a future global session, we don't acknowledge this cosign at this time
let latest_cosigned_block_number = LatestCosignedBlockNumber::get(&txn).unwrap_or(0);
// This global session starts the block *after* its declaration, so we want to check if the
// block declaring it was cosigned
if (global_session.start_block_number - 1) > latest_cosigned_block_number {
drop(txn);
return Ok(true);
}
NetworksLatestCosignedBlock::set(&mut txn, cosign.global_session, network, signed_cosign); NetworksLatestCosignedBlock::set(&mut txn, cosign.global_session, network, signed_cosign);
} else { } else {
let mut faults = Faults::get(&txn, cosign.global_session).unwrap_or(vec![]); let mut faults = Faults::get(&txn, cosign.global_session).unwrap_or(vec![]);
@@ -388,14 +387,14 @@ impl<D: Db> Cosigning<D> {
let mut weight_cosigned = 0; let mut weight_cosigned = 0;
for fault in &faults { for fault in &faults {
let Some(stake) = global_session.stakes.get(&fault.cosign.cosigner) else { let Some(stake) = global_session_info.stakes.get(&fault.cosign.cosigner) else {
Err("cosigner with recognized key didn't have a stake entry saved".to_string())? Err("cosigner with recognized key didn't have a stake entry saved".to_string())?
}; };
weight_cosigned += stake; weight_cosigned += stake;
} }
// Check if the sum weight means a fault has occurred // Check if the sum weight means a fault has occurred
if weight_cosigned >= ((global_session.total_stake * 17) / 100) { if weight_cosigned >= ((global_session_info.total_stake * 17) / 100) {
FaultedSession::set(&mut txn, &cosign.global_session); FaultedSession::set(&mut txn, &cosign.global_session);
} }
} }