mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 04:09:23 +00:00
Add a new primitive of a DB-backed channel
The coordinator already had one of these, albeit implemented much worse than the one now properly introduced. It had to either be sending or receiving, whereas the new one can do both at the same time. This replaces said instance and enables pleasant patterns when implementing the processor/coordinator.
This commit is contained in:
@@ -29,7 +29,7 @@ pub fn serai_db_key(
|
|||||||
///
|
///
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// create_db!(
|
/// create_db!(
|
||||||
/// TrubutariesDb {
|
/// TributariesDb {
|
||||||
/// AttemptsDb: (key_bytes: &[u8], attempt_id: u32) -> u64,
|
/// AttemptsDb: (key_bytes: &[u8], attempt_id: u32) -> u64,
|
||||||
/// ExpiredDb: (genesis: [u8; 32]) -> Vec<u8>
|
/// ExpiredDb: (genesis: [u8; 32]) -> Vec<u8>
|
||||||
/// }
|
/// }
|
||||||
@@ -70,3 +70,51 @@ macro_rules! create_db {
|
|||||||
)*
|
)*
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! db_channel {
|
||||||
|
($db_name: ident {
|
||||||
|
$($field_name: ident: ($($arg: ident: $arg_type: ty),*) -> $field_type: ty$(,)?)*
|
||||||
|
}) => {
|
||||||
|
$(
|
||||||
|
create_db! {
|
||||||
|
$db_name {
|
||||||
|
$field_name: ($($arg: $arg_type,)* index: u32) -> $field_type,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl $field_name {
|
||||||
|
pub fn send(txn: &mut impl DbTxn $(, $arg: $arg_type)*, value: &$field_type) {
|
||||||
|
// Use index 0 to store the amount of messages
|
||||||
|
let messages_sent_key = $field_name::key($($arg),*, 0);
|
||||||
|
let messages_sent = txn.get(&messages_sent_key).map(|counter| {
|
||||||
|
u32::from_le_bytes(counter.try_into().unwrap())
|
||||||
|
}).unwrap_or(0);
|
||||||
|
txn.put(&messages_sent_key, (messages_sent + 1).to_le_bytes());
|
||||||
|
|
||||||
|
// + 2 as index 1 is used for the amount of messages read
|
||||||
|
// Using distinct counters enables send to be called without mutating anything recv may
|
||||||
|
// at the same time
|
||||||
|
let index_to_use = messages_sent + 2;
|
||||||
|
|
||||||
|
$field_name::set(txn, $($arg),*, index_to_use, value);
|
||||||
|
}
|
||||||
|
pub fn try_recv(txn: &mut impl DbTxn $(, $arg: $arg_type)*) -> Option<$field_type> {
|
||||||
|
let messages_recvd_key = $field_name::key($($arg),*, 1);
|
||||||
|
let messages_recvd = txn.get(&messages_recvd_key).map(|counter| {
|
||||||
|
u32::from_le_bytes(counter.try_into().unwrap())
|
||||||
|
}).unwrap_or(0);
|
||||||
|
|
||||||
|
let index_to_read = messages_recvd + 2;
|
||||||
|
|
||||||
|
let res = $field_name::get(txn, $($arg),*, index_to_read);
|
||||||
|
if res.is_some() {
|
||||||
|
$field_name::del(txn, $($arg),*, index_to_read);
|
||||||
|
txn.put(&messages_recvd_key, (messages_recvd + 1).to_le_bytes());
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)*
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|||||||
@@ -847,33 +847,36 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handle pending cosigns
|
// Handle pending cosigns
|
||||||
while let Some((session, block, hash)) = CosignTransactions::peek_cosign(&db, network) {
|
{
|
||||||
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else {
|
let mut txn = db.txn();
|
||||||
log::warn!("didn't yet have tributary we're supposed to cosign with");
|
while let Some((session, block, hash)) = CosignTransactions::try_recv(&mut txn, network) {
|
||||||
break;
|
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else {
|
||||||
};
|
log::warn!("didn't yet have tributary we're supposed to cosign with");
|
||||||
log::info!(
|
break;
|
||||||
"{network:?} {session:?} cosigning block #{block} (hash {}...)",
|
};
|
||||||
hex::encode(&hash[.. 8])
|
log::info!(
|
||||||
);
|
"{network:?} {session:?} cosigning block #{block} (hash {}...)",
|
||||||
let tx = Transaction::CosignSubstrateBlock(hash);
|
hex::encode(&hash[.. 8])
|
||||||
let res = tributary.provide_transaction(tx.clone()).await;
|
);
|
||||||
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
let tx = Transaction::CosignSubstrateBlock(hash);
|
||||||
if res == Err(ProvidedError::LocalMismatchesOnChain) {
|
let res = tributary.provide_transaction(tx.clone()).await;
|
||||||
// Spin, since this is a crit for this Tributary
|
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
||||||
loop {
|
if res == Err(ProvidedError::LocalMismatchesOnChain) {
|
||||||
log::error!(
|
// Spin, since this is a crit for this Tributary
|
||||||
"{}. tributary: {}, provided: {:?}",
|
loop {
|
||||||
"tributary added distinct CosignSubstrateBlock",
|
log::error!(
|
||||||
hex::encode(spec.genesis()),
|
"{}. tributary: {}, provided: {:?}",
|
||||||
&tx,
|
"tributary added distinct CosignSubstrateBlock",
|
||||||
);
|
hex::encode(spec.genesis()),
|
||||||
sleep(Duration::from_secs(60)).await;
|
&tx,
|
||||||
|
);
|
||||||
|
sleep(Duration::from_secs(60)).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
panic!("provided an invalid CosignSubstrateBlock: {res:?}");
|
||||||
}
|
}
|
||||||
panic!("provided an invalid CosignSubstrateBlock: {res:?}");
|
|
||||||
}
|
}
|
||||||
CosignTransactions::take_cosign(db.txn(), network);
|
txn.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify any publifshed `Batch`s
|
// Verify any publifshed `Batch`s
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
|
|||||||
|
|
||||||
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
|
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
|
||||||
|
|
||||||
// TODO: Use distinct topics
|
|
||||||
const LIBP2P_TOPIC: &str = "serai-coordinator";
|
const LIBP2P_TOPIC: &str = "serai-coordinator";
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
|
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
use std::sync::{OnceLock, MutexGuard, Mutex};
|
|
||||||
|
|
||||||
use scale::{Encode, Decode};
|
use scale::{Encode, Decode};
|
||||||
|
|
||||||
pub use serai_db::*;
|
pub use serai_db::*;
|
||||||
@@ -10,11 +8,10 @@ use serai_client::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
create_db! {
|
create_db! {
|
||||||
NewSubstrateDb {
|
SubstrateDb {
|
||||||
CosignTriggered: () -> (),
|
CosignTriggered: () -> (),
|
||||||
IntendedCosign: () -> (u64, Option<u64>),
|
IntendedCosign: () -> (u64, Option<u64>),
|
||||||
BlockHasEvents: (block: u64) -> u8,
|
BlockHasEvents: (block: u64) -> u8,
|
||||||
CosignTransactions: (network: NetworkId) -> Vec<(Session, u64, [u8; 32])>,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,48 +26,15 @@ impl IntendedCosign {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This guarantees:
|
db_channel! {
|
||||||
// 1) Appended transactions are appended
|
SubstrateDb {
|
||||||
// 2) Taking cosigns does not clear any TXs which weren't taken
|
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
|
||||||
// 3) Taking does actually clear the set
|
|
||||||
static COSIGN_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
|
|
||||||
pub struct CosignTxn<T: DbTxn>(T, MutexGuard<'static, ()>);
|
|
||||||
impl<T: DbTxn> CosignTxn<T> {
|
|
||||||
pub fn new(txn: T) -> Self {
|
|
||||||
Self(txn, COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap())
|
|
||||||
}
|
|
||||||
pub fn commit(self) {
|
|
||||||
self.0.commit();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl CosignTransactions {
|
impl CosignTransactions {
|
||||||
// Append a cosign transaction.
|
// Append a cosign transaction.
|
||||||
pub fn append_cosign<T: DbTxn>(
|
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
|
||||||
txn: &mut CosignTxn<T>,
|
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
|
||||||
set: ValidatorSet,
|
|
||||||
number: u64,
|
|
||||||
hash: [u8; 32],
|
|
||||||
) {
|
|
||||||
#[allow(clippy::unwrap_or_default)]
|
|
||||||
let mut txs = CosignTransactions::get(&txn.0, set.network).unwrap_or(vec![]);
|
|
||||||
txs.push((set.session, number, hash));
|
|
||||||
CosignTransactions::set(&mut txn.0, set.network, &txs);
|
|
||||||
}
|
|
||||||
// Peek at the next cosign transaction.
|
|
||||||
pub fn peek_cosign(getter: &impl Get, network: NetworkId) -> Option<(Session, u64, [u8; 32])> {
|
|
||||||
let mut to_cosign = CosignTransactions::get(getter, network)?;
|
|
||||||
if to_cosign.is_empty() {
|
|
||||||
None?
|
|
||||||
}
|
|
||||||
Some(to_cosign.swap_remove(0))
|
|
||||||
}
|
|
||||||
// Take the next transaction, panicking if it doesn't exist.
|
|
||||||
pub fn take_cosign(mut txn: impl DbTxn, network: NetworkId) {
|
|
||||||
let _lock = COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap();
|
|
||||||
let mut txs = CosignTransactions::get(&txn, network).unwrap();
|
|
||||||
txs.remove(0);
|
|
||||||
CosignTransactions::set(&mut txn, network, &txs);
|
|
||||||
txn.commit();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -368,7 +368,6 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
|||||||
|
|
||||||
// TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
|
// TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
|
||||||
// cosigned,
|
// cosigned,
|
||||||
// TODO: Can we remove any of these events while maintaining security?
|
|
||||||
{
|
{
|
||||||
// If:
|
// If:
|
||||||
// A) This block has events and it's been at least X blocks since the last cosign or
|
// A) This block has events and it's been at least X blocks since the last cosign or
|
||||||
@@ -533,16 +532,14 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
|
|||||||
if let Some(has_no_cosigners) = has_no_cosigners {
|
if let Some(has_no_cosigners) = has_no_cosigners {
|
||||||
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
|
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
|
||||||
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
|
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
|
||||||
txn.commit();
|
|
||||||
} else {
|
} else {
|
||||||
CosignTriggered::set(&mut txn, &());
|
CosignTriggered::set(&mut txn, &());
|
||||||
let mut txn = CosignTxn::new(txn);
|
|
||||||
for (set, block, hash) in cosign {
|
for (set, block, hash) in cosign {
|
||||||
log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session);
|
log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session);
|
||||||
CosignTransactions::append_cosign(&mut txn, set, block, hash);
|
CosignTransactions::append_cosign(&mut txn, set, block, hash);
|
||||||
}
|
}
|
||||||
txn.commit();
|
|
||||||
}
|
}
|
||||||
|
txn.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reduce to the latest cosigned block
|
// Reduce to the latest cosigned block
|
||||||
|
|||||||
Reference in New Issue
Block a user