Move recognized_id from a channel to an async lambda

Fixes a race condition. Also fixes recognizing batch IDs.
This commit is contained in:
Luke Parker
2023-08-24 21:55:59 -04:00
parent ea8e26eca3
commit 32df302cc4
5 changed files with 128 additions and 110 deletions

View File

@@ -8,8 +8,6 @@ use serai_client::{
primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded,
};
use tokio::sync::mpsc::UnboundedSender;
use tributary::{
Transaction as TributaryTransaction, Block, TributaryReader,
tendermint::{
@@ -39,13 +37,15 @@ pub enum RecognizedIdType {
async fn handle_block<
D: Db,
Pro: Processors,
F: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> F,
FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = Vec<[u8; 32]>>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid,
P: P2p,
>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
recognized_id: RID,
processors: &Pro,
publish_serai_tx: PST,
spec: &TributarySpec,
@@ -79,14 +79,14 @@ async fn handle_block<
// TODO: disconnect the node from network/ban from further participation in Tributary
}
TributaryTransaction::Application(tx) => {
handle_application_tx::<D, _, _, _>(
handle_application_tx::<D, _, _, _, _, _>(
tx,
spec,
processors,
publish_serai_tx.clone(),
genesis,
key,
recognized_id,
recognized_id.clone(),
&mut txn,
)
.await;
@@ -105,13 +105,15 @@ async fn handle_block<
pub async fn handle_new_blocks<
D: Db,
Pro: Processors,
F: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> F,
FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = Vec<[u8; 32]>>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid,
P: P2p,
>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
recognized_id: RID,
processors: &Pro,
publish_serai_tx: PST,
spec: &TributarySpec,
@@ -121,10 +123,10 @@ pub async fn handle_new_blocks<
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
handle_block::<_, _, _, _, P>(
handle_block::<_, _, _, _, _, _, P>(
db,
key,
recognized_id,
recognized_id.clone(),
processors,
publish_serai_tx.clone(),
spec,