Load/save first_preprocess with RecognizedIdType

Enables their IDs to have conflicts across each other.
This commit is contained in:
Luke Parker
2023-10-14 21:58:10 -04:00
parent 7409d0b3cf
commit a300a1029a
3 changed files with 34 additions and 15 deletions

View File

@@ -15,7 +15,7 @@ use serai_client::{
pub use serai_db::*; pub use serai_db::*;
use ::tributary::ReadWrite; use ::tributary::ReadWrite;
use crate::tributary::{TributarySpec, Transaction}; use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType};
#[derive(Debug)] #[derive(Debug)]
pub struct MainDb<D: Db>(PhantomData<D>); pub struct MainDb<D: Db>(PhantomData<D>);
@@ -106,24 +106,30 @@ impl<D: Db> MainDb<D> {
res res
} }
fn first_preprocess_key(network: NetworkId, id: [u8; 32]) -> Vec<u8> { fn first_preprocess_key(network: NetworkId, id_type: RecognizedIdType, id: [u8; 32]) -> Vec<u8> {
Self::main_key(b"first_preprocess", (network, id).encode()) Self::main_key(b"first_preprocess", (network, id_type, id).encode())
} }
pub fn save_first_preprocess( pub fn save_first_preprocess(
txn: &mut D::Transaction<'_>, txn: &mut D::Transaction<'_>,
network: NetworkId, network: NetworkId,
id_type: RecognizedIdType,
id: [u8; 32], id: [u8; 32],
preprocess: Vec<u8>, preprocess: Vec<u8>,
) { ) {
let key = Self::first_preprocess_key(network, id); let key = Self::first_preprocess_key(network, id_type, id);
if let Some(existing) = txn.get(&key) { if let Some(existing) = txn.get(&key) {
assert_eq!(existing, preprocess, "saved a distinct first preprocess"); assert_eq!(existing, preprocess, "saved a distinct first preprocess");
return; return;
} }
txn.put(key, preprocess); txn.put(key, preprocess);
} }
pub fn first_preprocess<G: Get>(getter: &G, network: NetworkId, id: [u8; 32]) -> Option<Vec<u8>> { pub fn first_preprocess<G: Get>(
getter.get(Self::first_preprocess_key(network, id)) getter: &G,
network: NetworkId,
id_type: RecognizedIdType,
id: [u8; 32],
) -> Option<Vec<u8>> {
getter.get(Self::first_preprocess_key(network, id_type, id))
} }
fn last_received_batch_key(network: NetworkId) -> Vec<u8> { fn last_received_batch_key(network: NetworkId) -> Vec<u8> {

View File

@@ -476,7 +476,13 @@ async fn handle_processor_message<D: Db, P: P2p>(
ProcessorMessage::Sign(msg) => match msg { ProcessorMessage::Sign(msg) => match msg {
sign::ProcessorMessage::Preprocess { id, preprocess } => { sign::ProcessorMessage::Preprocess { id, preprocess } => {
if id.attempt == 0 { if id.attempt == 0 {
MainDb::<D>::save_first_preprocess(&mut txn, network, id.id, preprocess); MainDb::<D>::save_first_preprocess(
&mut txn,
network,
RecognizedIdType::Plan,
id.id,
preprocess,
);
vec![] vec![]
} else { } else {
@@ -527,7 +533,13 @@ async fn handle_processor_message<D: Db, P: P2p>(
// If this is the first attempt instance, wait until we synchronize around the batch // If this is the first attempt instance, wait until we synchronize around the batch
// first // first
if id.attempt == 0 { if id.attempt == 0 {
MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess); MainDb::<D>::save_first_preprocess(
&mut txn,
spec.set().network,
RecognizedIdType::Batch,
id.id,
preprocess,
);
// If this is the new key's first Batch, only create this TX once we verify all // If this is the new key's first Batch, only create this TX once we verify all
// all prior published `Batch`s // all prior published `Batch`s
@@ -860,10 +872,10 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
// received/saved, creating a race between Tributary ack and the availability of all // received/saved, creating a race between Tributary ack and the availability of all
// Preprocesses // Preprocesses
// This waits until the necessary preprocess is available 0, // This waits until the necessary preprocess is available 0,
// TODO: Incorporate RecognizedIdType here? let get_preprocess = |raw_db, id_type, id| async move {
let get_preprocess = |raw_db, id| async move {
loop { loop {
let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id) else { let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id_type, id)
else {
sleep(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
continue; continue;
}; };
@@ -875,14 +887,14 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData { RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData {
plan: id, plan: id,
attempt: 0, attempt: 0,
data: get_preprocess(&raw_db, id).await, data: get_preprocess(&raw_db, id_type, id).await,
signed: Transaction::empty_signed(), signed: Transaction::empty_signed(),
}), }),
RecognizedIdType::Plan => Transaction::SignPreprocess(SignData { RecognizedIdType::Plan => Transaction::SignPreprocess(SignData {
plan: id, plan: id,
attempt: 0, attempt: 0,
data: get_preprocess(&raw_db, id).await, data: get_preprocess(&raw_db, id_type, id).await,
signed: Transaction::empty_signed(), signed: Transaction::empty_signed(),
}), }),
}; };
@@ -909,7 +921,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}; };
// This is safe to perform multiple times and solely needs atomicity with regards to // This is safe to perform multiple times and solely needs atomicity with regards to
// itself // itself
// TODO: Should this not take a TXN accordingly? It's best practice to take a txn, yet // TODO: Should this not take a txn accordingly? It's best practice to take a txn, yet
// taking a txn fails to declare its achieved independence // taking a txn fails to declare its achieved independence
let mut txn = raw_db.txn(); let mut txn = raw_db.txn();
publish_signed_transaction(&mut txn, tributary, tx).await; publish_signed_transaction(&mut txn, tributary, tx).await;

View File

@@ -7,6 +7,7 @@ use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use scale::{Encode, Decode};
use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai}; use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai};
use tributary::{ use tributary::{
@@ -27,7 +28,7 @@ use crate::{
P2p, P2p,
}; };
#[derive(Clone, Copy, PartialEq, Eq, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
pub enum RecognizedIdType { pub enum RecognizedIdType {
Batch, Batch,
Plan, Plan,