Don't track deployment block in the Router

This technically has a TOCTOU where we sync an Epoch's metadata (signifying we
did sync to that point), then check if the Router was deployed, yet at that
very moment the node resets to genesis. By ensuring the Router is deployed, we
avoid this (and don't need to track the deployment block in-contract).

Also uses a JoinSet to sync the 32 blocks in parallel.
This commit is contained in:
Luke Parker
2024-09-20 02:06:35 -04:00
parent 7e4c59a0a3
commit 554c5778e4
3 changed files with 50 additions and 64 deletions

View File

@@ -2,20 +2,23 @@ use core::future::Future;
use std::{sync::Arc, collections::HashSet};
use alloy_core::primitives::B256;
use alloy_rpc_types_eth::{BlockTransactionsKind, BlockNumberOrTag};
use alloy_rpc_types_eth::{Header, BlockTransactionsKind, BlockNumberOrTag};
use alloy_transport::{RpcError, TransportErrorKind};
use alloy_simple_request_transport::SimpleRequest;
use alloy_provider::{Provider, RootProvider};
use serai_client::primitives::{NetworkId, Coin, Amount};
use tokio::task::JoinSet;
use serai_db::Db;
use scanner::ScannerFeed;
use ethereum_schnorr::PublicKey;
use ethereum_erc20::{TopLevelTransfer, Erc20};
use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Router};
#[rustfmt::skip]
use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Executed, Router};
use crate::{
TOKENS, ETHER_DUST, DAI_DUST, InitialSeraiKey,
@@ -141,8 +144,6 @@ impl<D: Db> ScannerFeed for Rpc<D> {
) -> impl Send + Future<Output = Result<Self::Block, Self::EphemeralError>> {
async move {
let epoch = self.unchecked_block_header_by_number(number).await?;
let mut instructions = vec![];
let mut executed = vec![];
let Some(router) = Router::new(
self.provider.clone(),
@@ -153,16 +154,42 @@ impl<D: Db> ScannerFeed for Rpc<D> {
)
.await?
else {
// The Router wasn't deployed yet so we cannot have any on-chain interactions
// If the Router has been deployed by the block we've synced to, it won't have any events
// for these blocks anways, so this doesn't risk a consensus split
return Ok(FullEpoch { epoch, instructions, executed });
Err(TransportErrorKind::Custom("router wasn't deployed on-chain yet".to_string().into()))?
};
let router_deployment_block = router.deployment_block().await?;
async fn sync_block(
provider: Arc<RootProvider<SimpleRequest>>,
router: Router,
block: Header,
) -> Result<(Vec<EthereumInInstruction>, Vec<Executed>), RpcError<TransportErrorKind>> {
let mut instructions = router.in_instructions(block.number, &HashSet::from(TOKENS)).await?;
// TODO: Use a LocalSet and handle all these in parallel
for token in TOKENS {
for TopLevelTransfer { id, from, amount, data } in Erc20::new(provider.clone(), token)
.top_level_transfers(block.number, router.address())
.await?
{
instructions.push(EthereumInInstruction {
id,
from,
coin: EthereumCoin::Erc20(token),
amount,
data,
});
}
}
let executed = router.executed(block.number).await?;
Ok((instructions, executed))
}
// We use JoinSet here to minimize the latency of the variety of requests we make. For each
// JoinError that may occur, we unwrap it as no underlying tasks should panic
let mut join_set = JoinSet::new();
let mut to_check = epoch.end_hash;
// TODO: This makes 32 sequential requests. We should run them in parallel using block
// nunbers
while to_check != epoch.prior_end_hash {
let to_check_block = self
.provider
@@ -179,34 +206,19 @@ impl<D: Db> ScannerFeed for Rpc<D> {
})?
.header;
// If this is before the Router was deployed, move on
if to_check_block.number < router_deployment_block {
// This is sa
break;
}
instructions.append(
&mut router.in_instructions(to_check_block.number, &HashSet::from(TOKENS)).await?,
);
for token in TOKENS {
for TopLevelTransfer { id, from, amount, data } in
Erc20::new(self.provider.clone(), token)
.top_level_transfers(to_check_block.number, router.address())
.await?
{
instructions.push(EthereumInInstruction {
id,
from,
coin: EthereumCoin::Erc20(token),
amount,
data,
});
}
}
executed.append(&mut router.executed(to_check_block.number).await?);
// Update the next block to check
to_check = *to_check_block.parent_hash;
// Spawn a task to sync this block
join_set.spawn(sync_block(self.provider.clone(), router.clone(), to_check_block));
}
let mut instructions = vec![];
let mut executed = vec![];
while let Some(instructions_and_executed) = join_set.join_next().await {
let (mut these_instructions, mut these_executed) = instructions_and_executed.unwrap()?;
instructions.append(&mut these_instructions);
executed.append(&mut these_executed);
}
Ok(FullEpoch { epoch, instructions, executed })