Mirror of https://github.com/serai-dex/serai.git, synced 2025-12-08 12:19:24 +00:00
coordinator/src/db.rs db macro implementation (#431)
* coordinator/src/db.rs db macro implementation
* fixed fmt errors
* converted txn functions to get/set counterparts
* use take_signed_transaction function
* fix for two of the tests
* Misc tweaks
* Minor tweaks

Co-authored-by: Luke Parker <lukeparker5132@gmail.com>
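This PR replaces the hand-written MainDb::<D> helpers with per-entry database types generated by the db macro, each exposing typed get/set (and take) accessors. As orientation for the diff below, here is a minimal sketch of what one such macro-generated entry might expand to; the Get/DbTxn traits, key scheme, and in-memory database are simplified assumptions for illustration, not serai-db's actual API:

use std::collections::HashMap;

// Simplified stand-ins for the database traits (assumptions, not serai-db's).
trait Get {
  fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}
trait DbTxn: Get {
  fn put(&mut self, key: &[u8], value: &[u8]);
}

// What a macro entry like `RetiredTributaryDb: (set) -> ()` might expand to:
// a unit struct namespacing its key and fixing its value type.
struct RetiredTributaryDb;
impl RetiredTributaryDb {
  fn key(set: &str) -> Vec<u8> {
    [b"coordinator_RetiredTributary_".as_slice(), set.as_bytes()].concat()
  }
  fn set(txn: &mut impl DbTxn, set: &str) {
    txn.put(&Self::key(set), &[]);
  }
  // A unit value type makes the entry a pure presence flag, which is why call
  // sites below replace `is_tributary_retired(..)` with `get(..).is_some()`.
  fn get(getter: &impl Get, set: &str) -> Option<()> {
    getter.get(&Self::key(set)).map(|_| ())
  }
}

// In-memory database, sufficient to exercise the accessors.
struct MemDb(HashMap<Vec<u8>, Vec<u8>>);
impl Get for MemDb {
  fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
    self.0.get(key).cloned()
  }
}
impl DbTxn for MemDb {
  fn put(&mut self, key: &[u8], value: &[u8]) {
    self.0.insert(key.to_vec(), value.to_vec());
  }
}

fn main() {
  let mut txn = MemDb(HashMap::new());
  assert!(RetiredTributaryDb::get(&txn, "set-1").is_none());
  RetiredTributaryDb::set(&mut txn, "set-1");
  assert!(RetiredTributaryDb::get(&txn, "set-1").is_some());
}

The win over bespoke txn functions is uniformity: every entry gets the same accessor surface, so call sites read identically regardless of what is stored.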
@@ -39,7 +39,7 @@ mod tributary;
 use crate::tributary::{TributarySpec, SignData, Transaction, scanner::RecognizedIdType, PlanIds};

 mod db;
-use db::MainDb;
+use db::*;

 mod p2p;
 pub use p2p::*;
@@ -83,7 +83,7 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
   tributaries: &broadcast::Sender<TributaryEvent<D, P>>,
   spec: TributarySpec,
 ) {
-  if MainDb::<D>::is_tributary_retired(&db, spec.set()) {
+  if RetiredTributaryDb::get(&db, spec.set()).is_some() {
     log::info!("not adding tributary {:?} since it's been retired", spec.set());
   }

@@ -138,7 +138,7 @@ async fn publish_signed_transaction<D: Db, P: P2p>(

     // Safe as we should deterministically create transactions, meaning if this is already on-disk,
     // it's what we're saving now
-    MainDb::<D>::save_signed_transaction(txn, signed.nonce, tx);
+    SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize());

     (order, signer)
   } else {
@@ -147,8 +147,9 @@ async fn publish_signed_transaction<D: Db, P: P2p>(

   // If we're trying to publish 5, when the last transaction published was 3, this will delay
   // publication until the point in time we publish 4
-  while let Some(tx) = MainDb::<D>::take_signed_transaction(
+  while let Some(tx) = SignedTransactionDb::take_signed_transaction(
     txn,
+    &order,
     tributary
       .next_nonce(&signer, &order)
       .await
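The comment above states the nonce-ordering invariant this change preserves: a signed transaction for nonce n is only published once every lower nonce has been. A standalone sketch of the take-by-next-nonce loop, with a HashMap and a byte-string order key as illustrative stand-ins for SignedTransactionDb:

use std::collections::HashMap;

// Illustrative store: (order, nonce) -> serialized transaction.
struct SignedTxStore(HashMap<(Vec<u8>, u32), Vec<u8>>);

impl SignedTxStore {
  // `take` is get-then-delete, so each transaction publishes at most once.
  fn take(&mut self, order: &[u8], nonce: u32) -> Option<Vec<u8>> {
    self.0.remove(&(order.to_vec(), nonce))
  }
}

fn main() {
  let mut store = SignedTxStore(HashMap::new());
  store.0.insert((b"order".to_vec(), 3), b"tx3".to_vec());
  store.0.insert((b"order".to_vec(), 5), b"tx5".to_vec());

  // With the chain's next nonce at 3, only tx3 is publishable; tx5 remains
  // on disk until nonce 4 is saved, exactly the delay the comment describes.
  let mut next_nonce = 3;
  while let Some(tx) = store.take(b"order", next_nonce) {
    println!("publishing nonce {next_nonce}: {}", String::from_utf8(tx).unwrap());
    next_nonce += 1;
  }
  assert_eq!(next_nonce, 4); // stopped at the gap before nonce 5
}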
@@ -181,8 +182,13 @@ async fn handle_processor_message<D: Db, P: P2p>(
   network: NetworkId,
   msg: &processors::Message,
 ) -> bool {
-  if MainDb::<D>::handled_message(db, msg.network, msg.id) {
-    return true;
+  #[allow(clippy::nonminimal_bool)]
+  if let Some(already_handled) = HandledMessageDb::get(db, msg.network) {
+    assert!(!(already_handled > msg.id));
+    assert!((already_handled == msg.id) || (already_handled == msg.id - 1));
+    if already_handled == msg.id {
+      return true;
+    }
   }

   let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await;
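The rewritten guard stores only the highest handled message ID per network and derives "already handled" from it, while the asserts enforce that IDs arrive in order without gaps. A minimal sketch of that idempotency check, assuming a HashMap in place of HandledMessageDb and folding the post-processing write into the same function:

use std::collections::HashMap;

struct HandledMessages(HashMap<&'static str, u64>);

impl HandledMessages {
  // Returns true if `id` was already handled. Panics on out-of-order IDs,
  // mirroring the asserts in the diff; `||` short-circuits before `id - 1`
  // can underflow when `id` is 0.
  fn check_and_record(&mut self, network: &'static str, id: u64) -> bool {
    if let Some(&already) = self.0.get(network) {
      assert!(already <= id, "regressed past an already-handled message");
      assert!((already == id) || (already == id - 1), "skipped a message ID");
      if already == id {
        return true;
      }
    }
    // The coordinator performs this write at the end of message handling.
    self.0.insert(network, id);
    false
  }
}

fn main() {
  let mut handled = HandledMessages(HashMap::new());
  assert!(!handled.check_and_record("bitcoin", 0)); // first delivery: process
  assert!(handled.check_and_record("bitcoin", 0)); // redelivery: skip
  assert!(!handled.check_and_record("bitcoin", 1)); // next ID: process
}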
@@ -219,7 +225,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
         .iter()
         .map(|plan| plan.session)
         .filter(|session| {
-          !MainDb::<D>::is_tributary_retired(&txn, ValidatorSet { network, session: *session })
+          RetiredTributaryDb::get(&txn, ValidatorSet { network, session: *session }).is_none()
         })
         .collect::<HashSet<_>>();

@@ -293,7 +299,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
             batch.network, msg.network,
             "processor sent us a batch for a different network than it was for",
           );
-          MainDb::<D>::save_expected_batch(&mut txn, batch);
+          ExpectedBatchDb::save_expected_batch(&mut txn, batch);
           None
         }
         // If this is a new Batch, immediately publish it (if we can)
@@ -306,7 +312,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
           log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id);

           // Save this batch to the disk
-          MainDb::<D>::save_batch(&mut txn, batch.clone());
+          BatchDb::set(&mut txn, batch.batch.network, batch.batch.id, &batch.clone());

           // Get the next-to-execute batch ID
           let mut next = substrate::get_expected_next_batch(serai, network).await;
@@ -314,7 +320,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
           // Since we have a new batch, publish all batches yet to be published to Serai
           // This handles the edge-case where batch n+1 is signed before batch n is
           let mut batches = VecDeque::new();
-          while let Some(batch) = MainDb::<D>::batch(&txn, network, next) {
+          while let Some(batch) = BatchDb::get(&txn, network, next) {
            batches.push_back(batch);
             next += 1;
           }
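This drain resolves the edge case named in the comment: if batch n+1 is signed before batch n, it waits in the database until n arrives, after which both publish in order. A self-contained sketch of the contiguous-drain pattern, with a HashMap standing in for BatchDb:

use std::collections::{HashMap, VecDeque};

fn main() {
  // Batches keyed by ID; batch 7 was signed before batch 6 arrived.
  let mut batch_db: HashMap<u32, &str> = HashMap::new();
  batch_db.insert(7, "batch 7");

  // Nothing publishes yet: the chain expects 6 next and 6 is absent.
  let mut to_publish = VecDeque::new();
  let mut next = 6;
  while let Some(batch) = batch_db.get(&next) {
    to_publish.push_back(*batch);
    next += 1;
  }
  assert!(to_publish.is_empty());

  // Once batch 6 lands, the same loop yields 6 and then 7, in order.
  batch_db.insert(6, "batch 6");
  next = 6;
  while let Some(batch) = batch_db.get(&next) {
    to_publish.push_back(*batch);
    next += 1;
  }
  assert_eq!(to_publish, VecDeque::from(["batch 6", "batch 7"]));
}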
@@ -359,10 +365,12 @@ async fn handle_processor_message<D: Db, P: P2p>(

   // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired
   if let Some(relevant_tributary_value) = relevant_tributary {
-    if MainDb::<D>::is_tributary_retired(
+    if RetiredTributaryDb::get(
       &txn,
       ValidatorSet { network: msg.network, session: relevant_tributary_value },
-    ) {
+    )
+    .is_some()
+    {
       relevant_tributary = None;
     }
   }
@@ -491,7 +499,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
         }
         sign::ProcessorMessage::Preprocess { id, preprocesses } => {
           if id.attempt == 0 {
-            MainDb::<D>::save_first_preprocess(
+            FirstPreprocessDb::save_first_preprocess(
               &mut txn,
               network,
               RecognizedIdType::Plan,
@@ -563,7 +571,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
           // If this is the first attempt instance, wait until we synchronize around the batch
           // first
           if id.attempt == 0 {
-            MainDb::<D>::save_first_preprocess(
+            FirstPreprocessDb::save_first_preprocess(
               &mut txn,
               spec.set().network,
               RecognizedIdType::Batch,
@@ -588,8 +596,8 @@ async fn handle_processor_message<D: Db, P: P2p>(
             // all prior published `Batch`s
             // TODO: This assumes BatchPreprocess is immediately after Batch
             // Ensure that assumption
-            let last_received = MainDb::<D>::last_received_batch(&txn, msg.network).unwrap();
-            let handover_batch = MainDb::<D>::handover_batch(&txn, spec.set());
+            let last_received = LastReceivedBatchDb::get(&txn, msg.network).unwrap();
+            let handover_batch = HandoverBatchDb::get(&txn, spec.set());
             let mut queue = false;
             if let Some(handover_batch) = handover_batch {
               // There is a race condition here. We may verify all `Batch`s from the prior set,
@@ -604,7 +612,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
               // To fix this, if this is after the handover `Batch` and we have yet to verify
               // publication of the handover `Batch`, don't yet yield the provided.
               if last_received > handover_batch {
-                if let Some(last_verified) = MainDb::<D>::last_verified_batch(&txn, msg.network) {
+                if let Some(last_verified) = LastVerifiedBatchDb::get(&txn, msg.network) {
                   if last_verified < handover_batch {
                     queue = true;
                   }
@@ -613,11 +621,11 @@ async fn handle_processor_message<D: Db, P: P2p>(
                 }
               }
             } else {
-              MainDb::<D>::set_handover_batch(&mut txn, spec.set(), last_received);
+              HandoverBatchDb::set_handover_batch(&mut txn, spec.set(), last_received);
               // If this isn't the first batch, meaning we do have to verify all prior batches, and
               // the prior Batch hasn't been verified yet...
               if (last_received != 0) &&
-                MainDb::<D>::last_verified_batch(&txn, msg.network)
+                LastVerifiedBatchDb::get(&txn, msg.network)
                   .map(|last_verified| last_verified < (last_received - 1))
                   .unwrap_or(true)
               {
@@ -627,14 +635,14 @@ async fn handle_processor_message<D: Db, P: P2p>(
               }
             }

             if queue {
-              MainDb::<D>::queue_batch(&mut txn, spec.set(), intended);
+              QueuedBatchesDb::queue(&mut txn, spec.set(), intended);
               vec![]
             } else {
               // Because this is post-verification of the handover batch, take all queued `Batch`s
               // now to ensure we don't provide this before an already queued Batch
               // This *may* be an unreachable case due to how last_verified_batch is set, yet it
               // doesn't hurt to have as a defensive pattern
-              let mut res = MainDb::<D>::take_queued_batches(&mut txn, spec.set());
+              let mut res = QueuedBatchesDb::take(&mut txn, spec.set());
               res.push(intended);
               res
             }
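The queueing logic across the last few hunks reduces to a small predicate: a provided Batch is withheld while its set's handover Batch, or any batch before it, is still unverified. A sketch of just that decision, with u32 batch IDs and Options standing in for the database reads (illustrative, not the coordinator's API):

// Decide whether a newly signed `Batch` must wait in the queue.
// `handover` is the recorded handover batch for this validator set, if any;
// `last_verified` is the highest batch verified as published on Serai.
fn must_queue(last_received: u32, handover: Option<u32>, last_verified: Option<u32>) -> bool {
  match handover {
    // Past the handover batch: wait until the handover itself is verified.
    Some(handover) => {
      (last_received > handover) && last_verified.map_or(true, |v| v < handover)
    }
    // First batch from this set, which becomes the handover batch: queue it
    // if prior batches exist (last_received != 0) and aren't all verified.
    None => (last_received != 0) && last_verified.map_or(true, |v| v < last_received - 1),
  }
}

fn main() {
  // Handover batch is 4, nothing verified yet: batch 5 must wait.
  assert!(must_queue(5, Some(4), None));
  // Handover batch 4 now verified: batch 5 may be provided immediately.
  assert!(!must_queue(5, Some(4), Some(4)));
  // The very first batch ever (ID 0, no handover recorded) never waits.
  assert!(!must_queue(0, None, None));
}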
@@ -702,7 +710,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
       }
     }

-    MainDb::<D>::save_handled_message(&mut txn, msg.network, msg.id);
+    HandledMessageDb::set(&mut txn, msg.network, &msg.id);
     txn.commit();

     true
@@ -828,7 +836,7 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
       let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await;
       let mut txn = db.txn();
       let mut to_publish = vec![];
-      let start_id = MainDb::<D>::last_verified_batch(&txn, network)
+      let start_id = LastVerifiedBatchDb::get(&txn, network)
         .map(|already_verified| already_verified + 1)
         .unwrap_or(0);
       if let Some(last_id) =
@@ -838,9 +846,10 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
         // `Batch`
         // If so, we need to publish queued provided `Batch` transactions
         for batch in start_id ..= last_id {
-          let is_pre_handover = MainDb::<D>::is_handover_batch(&txn, network, batch + 1);
-          if let Some(set) = is_pre_handover {
-            let mut queued = MainDb::<D>::take_queued_batches(&mut txn, set);
+          let is_pre_handover = LookupHandoverBatchDb::get(&txn, network, batch + 1);
+          if let Some(session) = is_pre_handover {
+            let set = ValidatorSet { network, session };
+            let mut queued = QueuedBatchesDb::take(&mut txn, set);
             // is_handover_batch is only set for handover `Batch`s we're participating in, making
             // this safe
             if queued.is_empty() {
@@ -851,14 +860,14 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
            to_publish.push((set.session, queued.remove(0)));
             // Re-queue the remaining batches
             for remaining in queued {
-              MainDb::<D>::queue_batch(&mut txn, set, remaining);
+              QueuedBatchesDb::queue(&mut txn, set, remaining);
             }
           }

-          let is_handover = MainDb::<D>::is_handover_batch(&txn, network, batch);
-          if let Some(set) = is_handover {
-            for queued in MainDb::<D>::take_queued_batches(&mut txn, set) {
-              to_publish.push((set.session, queued));
+          let is_handover = LookupHandoverBatchDb::get(&txn, network, batch);
+          if let Some(session) = is_handover {
+            for queued in QueuedBatchesDb::take(&mut txn, ValidatorSet { network, session }) {
+              to_publish.push((session, queued));
             }
           }
         }
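The renames here make the bidirectional mapping explicit: HandoverBatchDb maps a validator set to its handover batch ID, while LookupHandoverBatchDb maps (network, batch ID) back to a session, which is what lets this loop flush queued Batches when verification crosses a handover boundary. A sketch of how such paired writes might stay in sync (key and value shapes are assumptions):

use std::collections::HashMap;

type Session = u32;

// Forward: session -> its handover batch ID.
// Reverse: batch ID -> session, serving the verification loop above.
struct HandoverIndex {
  by_session: HashMap<Session, u32>,
  by_batch: HashMap<u32, Session>,
}

impl HandoverIndex {
  // Plausibly what `HandoverBatchDb::set_handover_batch` does: write both
  // directions under one transaction so neither map can go stale.
  fn set_handover_batch(&mut self, session: Session, batch: u32) {
    self.by_session.insert(session, batch);
    self.by_batch.insert(batch, session);
  }
}

fn main() {
  let mut index = HandoverIndex { by_session: HashMap::new(), by_batch: HashMap::new() };
  index.set_handover_batch(2, 14); // session 2 hands over at batch 14

  // Verifying batch 13, one before a handover: flush session 2's queue.
  assert_eq!(index.by_batch.get(&(13 + 1)), Some(&2));
}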
@@ -952,7 +961,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(

   let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel();
   // Reload active tributaries from the database
-  for spec in MainDb::<D>::active_tributaries(&raw_db).1 {
+  for spec in ActiveTributaryDb::active_tributaries(&raw_db).1 {
     new_tributary_spec_send.send(spec).unwrap();
   }

@@ -1058,8 +1067,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       // This waits until the necessary preprocess is available
       let get_preprocess = |raw_db, id_type, id| async move {
         loop {
-          let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id_type, id)
-          else {
+          let Some(preprocess) = FirstPreprocessDb::get(raw_db, set.network, id_type, id) else {
            log::warn!("waiting for preprocess for recognized ID");
             sleep(Duration::from_millis(100)).await;
             continue;
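get_preprocess polls rather than subscribes: it rereads the database every 100ms until the message handler writes the first preprocess. A standalone tokio sketch of that poll-until-present loop, with a shared map standing in for FirstPreprocessDb:

use std::{
  collections::HashMap,
  sync::{Arc, Mutex},
  time::Duration,
};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
  // Shared stand-in for the database: recognized ID -> preprocess bytes.
  let db: Arc<Mutex<HashMap<u32, Vec<u8>>>> = Arc::new(Mutex::new(HashMap::new()));

  // Another task saves the preprocess a bit later, as the handler would.
  let writer = db.clone();
  tokio::spawn(async move {
    sleep(Duration::from_millis(250)).await;
    writer.lock().unwrap().insert(1, b"preprocess".to_vec());
  });

  // The same shape as `get_preprocess`: read, and if absent, sleep and retry.
  let preprocess = loop {
    if let Some(preprocess) = db.lock().unwrap().get(&1).cloned() {
      break preprocess;
    }
    // The lock is released before awaiting, so the writer can make progress.
    sleep(Duration::from_millis(100)).await;
  };
  assert_eq!(preprocess, b"preprocess");
}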
@@ -1096,7 +1104,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
           let tributaries = tributaries.read().await;
           let Some(tributary) = tributaries.get(&genesis) else {
             // If we don't have this Tributary because it's retired, break and move on
-            if MainDb::<D>::is_tributary_retired(&raw_db, set) {
+            if RetiredTributaryDb::get(&raw_db, set).is_some() {
               break;
             }
