Publish ExternablBlock/SubstrateBlock, delay *Preprocess until ID acknowledged

Adds a channel for the Tributary scanner to communicate when an ID has been
acknowledged.
This commit is contained in:
Luke Parker
2023-05-08 22:20:51 -04:00
parent a7f2740dfb
commit 964fdee175
7 changed files with 252 additions and 77 deletions

View File

@@ -27,17 +27,17 @@ impl<D: Db> TributaryDb<D> {
self.0.get(Self::block_key(genesis)).map(|last| last.try_into().unwrap()).unwrap_or(genesis)
}
// This shouldn't need genesis? Yet it's saner to have then quibble about.
fn batch_id_key(genesis: &[u8], ext_block: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"batch_id", [genesis, ext_block.as_ref()].concat())
}
pub fn batch_id<G: Get>(getter: &G, genesis: [u8; 32], ext_block: [u8; 32]) -> Option<[u8; 32]> {
getter.get(Self::batch_id_key(&genesis, ext_block)).map(|bytes| bytes.try_into().unwrap())
}
fn plan_ids_key(genesis: &[u8], block: u64) -> Vec<u8> {
Self::tributary_key(b"plan_ids", [genesis, block.to_le_bytes().as_ref()].concat())
}
pub fn set_plan_ids(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
block: u64,
plans: &[[u8; 32]],
) {
txn.put(Self::plan_ids_key(&genesis, block), plans.concat());
}
pub fn plan_ids<G: Get>(getter: &G, genesis: [u8; 32], block: u64) -> Option<Vec<[u8; 32]>> {
getter.get(Self::plan_ids_key(&genesis, block)).map(|bytes| {
let mut res = vec![];

View File

@@ -5,6 +5,8 @@ use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc::UnboundedSender;
use tributary::{Signed, Block, TributaryReader};
use processor_messages::{
@@ -21,10 +23,17 @@ use crate::{
tributary::{TributaryDb, TributarySpec, Transaction},
};
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum RecognizedIdType {
Block,
Plan,
}
// Handle a specific Tributary block
async fn handle_block<D: Db, Pro: Processor>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processor: &Pro,
spec: &TributarySpec,
block: Block<Transaction>,
@@ -172,16 +181,10 @@ async fn handle_block<D: Db, Pro: Processor>(
Transaction::ExternalBlock(block) => {
// Because this external block has been finalized, its batch ID should be authorized
// If we didn't provide this transaction, we should halt until we do
// If we provided a distinct transaction, we should error
// If we did provide this transaction, we should've set the batch ID for the block
let batch_id = TributaryDb::<D>::batch_id(&txn, genesis, block).expect(
"synced a tributary block finalizing a external block in a provided transaction \
despite us not providing that transaction",
);
TributaryDb::<D>::recognize_id(&mut txn, Zone::Batch.label(), genesis, batch_id);
TributaryDb::<D>::recognize_id(&mut txn, Zone::Batch.label(), genesis, block);
recognized_id
.send((genesis, RecognizedIdType::Block, block))
.expect("recognized_id_recv was dropped. are we shutting down?");
}
Transaction::SubstrateBlock(block) => {
@@ -192,6 +195,9 @@ async fn handle_block<D: Db, Pro: Processor>(
for id in plan_ids {
TributaryDb::<D>::recognize_id(&mut txn, Zone::Sign.label(), genesis, id);
recognized_id
.send((genesis, RecognizedIdType::Plan, id))
.expect("recognized_id_recv was dropped. are we shutting down?");
}
}
@@ -287,6 +293,7 @@ async fn handle_block<D: Db, Pro: Processor>(
pub async fn handle_new_blocks<D: Db, Pro: Processor>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processor: &Pro,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
@@ -295,7 +302,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor>(
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
handle_block(db, key, processor, spec, block).await;
handle_block(db, key, recognized_id, processor, spec, block).await;
last_block = next;
db.set_last_block(genesis, next);
}