1 Commits

Author SHA1 Message Date
Luke Parker
ce3b90541e Make transactions undroppable
coordinator/cosign/src/delay.rs literally demonstrates how we'd need to rewrite
our handling of transactions with this change. It can be cleaned up a bit but
already identifies ergonomic issues. It also doesn't model passing an &mut txn
to an async function, which would also require using the droppable wrapper
struct.

To locally see this build, run

RUSTFLAGS="-Zpanic_abort_tests -C panic=abort" cargo +nightly build -p serai-cosign --all-targets

To locally see this fail to build, run

cargo build -p serai-cosign --all-targets

While it doesn't say which line causes it fail to build, the only distinction
is panic=unwind.

For more context, please see #578.
2025-01-15 03:56:59 -05:00
8 changed files with 93 additions and 9 deletions

View File

@@ -30,13 +30,53 @@ pub trait Get {
/// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values /// is undefined. The transaction may block, deadlock, panic, overwrite one of the two values
/// randomly, or any other action, at time of write or at time of commit. /// randomly, or any other action, at time of write or at time of commit.
#[must_use] #[must_use]
pub trait DbTxn: Send + Get { pub trait DbTxn: Sized + Send + Get {
/// Write a value to this key. /// Write a value to this key.
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>); fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
/// Delete the value from this key. /// Delete the value from this key.
fn del(&mut self, key: impl AsRef<[u8]>); fn del(&mut self, key: impl AsRef<[u8]>);
/// Commit this transaction. /// Commit this transaction.
fn commit(self); fn commit(self);
/// Close this transaction.
///
/// This is equivalent to `Drop` on transactions which can be dropped. This is explicit and works
/// with transactions which can't be dropped.
fn close(self) {
drop(self);
}
}
// Credit for the idea goes to https://jack.wrenn.fyi/blog/undroppable
pub struct Undroppable<T>(Option<T>);
impl<T> Drop for Undroppable<T> {
fn drop(&mut self) {
// Use an assertion at compile time to prevent this code from compiling if generated
#[allow(clippy::assertions_on_constants)]
const {
assert!(false, "Undroppable DbTxn was dropped. Ensure all code paths call commit or close");
}
}
}
impl<T: DbTxn> Get for Undroppable<T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.as_ref().unwrap().get(key)
}
}
impl<T: DbTxn> DbTxn for Undroppable<T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().put(key, value);
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.0.as_mut().unwrap().del(key);
}
fn commit(mut self) {
self.0.take().unwrap().commit();
let _ = core::mem::ManuallyDrop::new(self);
}
fn close(mut self) {
drop(self.0.take().unwrap());
let _ = core::mem::ManuallyDrop::new(self);
}
} }
/// A database supporting atomic transaction. /// A database supporting atomic transaction.
@@ -51,6 +91,10 @@ pub trait Db: 'static + Send + Sync + Clone + Get {
let dst_len = u8::try_from(item_dst.len()).unwrap(); let dst_len = u8::try_from(item_dst.len()).unwrap();
[[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat() [[db_len].as_ref(), db_dst, [dst_len].as_ref(), item_dst, key.as_ref()].concat()
} }
/// Open a new transaction. /// Open a new transaction which may be dropped.
fn txn(&mut self) -> Self::Transaction<'_>; fn unsafe_txn(&mut self) -> Self::Transaction<'_>;
/// Open a new transaction which must be committed or closed.
fn txn(&mut self) -> Undroppable<Self::Transaction<'_>> {
Undroppable(Some(self.unsafe_txn()))
}
} }

View File

@@ -74,7 +74,7 @@ impl Get for MemDb {
} }
impl Db for MemDb { impl Db for MemDb {
type Transaction<'a> = MemDbTxn<'a>; type Transaction<'a> = MemDbTxn<'a>;
fn txn(&mut self) -> MemDbTxn<'_> { fn unsafe_txn(&mut self) -> MemDbTxn<'_> {
MemDbTxn(self, HashMap::new(), HashSet::new()) MemDbTxn(self, HashMap::new(), HashSet::new())
} }
} }

View File

@@ -37,7 +37,7 @@ impl Get for Arc<ParityDb> {
} }
impl Db for Arc<ParityDb> { impl Db for Arc<ParityDb> {
type Transaction<'a> = Transaction<'a>; type Transaction<'a> = Transaction<'a>;
fn txn(&mut self) -> Self::Transaction<'_> { fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
Transaction(self, vec![]) Transaction(self, vec![])
} }
} }

View File

@@ -39,7 +39,7 @@ impl<T: ThreadMode> Get for Arc<OptimisticTransactionDB<T>> {
} }
impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> { impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
type Transaction<'a> = Transaction<'a, T>; type Transaction<'a> = Transaction<'a, T>;
fn txn(&mut self) -> Self::Transaction<'_> { fn unsafe_txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default(); let mut opts = WriteOptions::default();
opts.set_sync(true); opts.set_sync(true);
Transaction(self.transaction_opt(&opts, &Default::default()), &**self) Transaction(self.transaction_opt(&opts, &Default::default()), &**self)

View File

@@ -24,6 +24,15 @@ pub(crate) struct CosignDelayTask<D: Db> {
pub(crate) db: D, pub(crate) db: D,
} }
struct AwaitUndroppable<T: DbTxn>(Option<core::mem::ManuallyDrop<Undroppable<T>>>);
impl<T: DbTxn> Drop for AwaitUndroppable<T> {
fn drop(&mut self) {
if let Some(mut txn) = self.0.take() {
(unsafe { core::mem::ManuallyDrop::take(&mut txn) }).close();
}
}
}
impl<D: Db> ContinuallyRan for CosignDelayTask<D> { impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
type Error = DoesNotError; type Error = DoesNotError;
@@ -35,14 +44,18 @@ impl<D: Db> ContinuallyRan for CosignDelayTask<D> {
// Receive the next block to mark as cosigned // Receive the next block to mark as cosigned
let Some((block_number, time_evaluated)) = CosignedBlocks::try_recv(&mut txn) else { let Some((block_number, time_evaluated)) = CosignedBlocks::try_recv(&mut txn) else {
txn.close();
break; break;
}; };
// Calculate when we should mark it as valid // Calculate when we should mark it as valid
let time_valid = let time_valid =
SystemTime::UNIX_EPOCH + Duration::from_secs(time_evaluated) + ACKNOWLEDGEMENT_DELAY; SystemTime::UNIX_EPOCH + Duration::from_secs(time_evaluated) + ACKNOWLEDGEMENT_DELAY;
// Sleep until then // Sleep until then
let mut txn = AwaitUndroppable(Some(core::mem::ManuallyDrop::new(txn)));
tokio::time::sleep(SystemTime::now().duration_since(time_valid).unwrap_or(Duration::ZERO)) tokio::time::sleep(SystemTime::now().duration_since(time_valid).unwrap_or(Duration::ZERO))
.await; .await;
let mut txn = core::mem::ManuallyDrop::into_inner(txn.0.take().unwrap());
// Set the cosigned block // Set the cosigned block
LatestCosignedBlockNumber::set(&mut txn, &block_number); LatestCosignedBlockNumber::set(&mut txn, &block_number);

View File

@@ -87,7 +87,7 @@ impl<D: Db, R: RequestNotableCosigns> ContinuallyRan for CosignEvaluatorTask<D,
let mut known_cosign = None; let mut known_cosign = None;
let mut made_progress = false; let mut made_progress = false;
loop { loop {
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn) let Some(BlockEventData { block_number, has_events }) = BlockEvents::try_recv(&mut txn)
else { else {
break; break;

View File

@@ -70,7 +70,7 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
self.serai.latest_finalized_block().await.map_err(|e| format!("{e:?}"))?.number(); self.serai.latest_finalized_block().await.map_err(|e| format!("{e:?}"))?.number();
for block_number in start_block_number ..= latest_block_number { for block_number in start_block_number ..= latest_block_number {
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
let (block, mut has_events) = let (block, mut has_events) =
block_has_events_justifying_a_cosign(&self.serai, block_number) block_has_events_justifying_a_cosign(&self.serai, block_number)

View File

@@ -424,7 +424,7 @@ impl<D: Db> Cosigning<D> {
// Since we verified this cosign's signature, and have a chain sufficiently long, handle the // Since we verified this cosign's signature, and have a chain sufficiently long, handle the
// cosign // cosign
let mut txn = self.db.txn(); let mut txn = self.db.unsafe_txn();
if !faulty { if !faulty {
// If this is for a future global session, we don't acknowledge this cosign at this time // If this is for a future global session, we don't acknowledge this cosign at this time
@@ -480,3 +480,30 @@ impl<D: Db> Cosigning<D> {
res res
} }
} }
mod tests {
use super::*;
struct RNC;
impl RequestNotableCosigns for RNC {
/// The error type which may be encountered when requesting notable cosigns.
type Error = ();
/// Request the notable cosigns for this global session.
fn request_notable_cosigns(
&self,
global_session: [u8; 32],
) -> impl Send + Future<Output = Result<(), Self::Error>> {
async move { Ok(()) }
}
}
#[tokio::test]
async fn test() {
let db: serai_db::MemDb = serai_db::MemDb::new();
let serai = unsafe { core::mem::transmute(0u64) };
let request = RNC;
let tasks = vec![];
let _ = Cosigning::spawn(db, serai, request, tasks);
core::future::pending().await
}
}