diff --git a/processor/ethereum/router/contracts/Router.sol b/processor/ethereum/router/contracts/Router.sol index d82c0d90..9100f59e 100644 --- a/processor/ethereum/router/contracts/Router.sol +++ b/processor/ethereum/router/contracts/Router.sol @@ -7,9 +7,6 @@ import "Schnorr.sol"; // _ is used as a prefix for internal functions and smart-contract-scoped variables contract Router { - // The block at which this contract was deployed. - uint256 private _deploymentBlock; - // Nonce is incremented for each command executed, preventing replays uint256 private _nonce; @@ -66,8 +63,6 @@ contract Router { } constructor(bytes32 initialSeraiKey) _updateSeraiKeyAtEndOfFn(0, initialSeraiKey) { - _deploymentBlock = block.number; - // We consumed nonce 0 when setting the initial Serai key _nonce = 1; // Nonces are incremented by 1 upon account creation, prior to any code execution, per EIP-161 @@ -235,10 +230,6 @@ contract Router { return _nonce; } - function deploymentBlock() external view returns (uint256) { - return _deploymentBlock; - } - function smartContractNonce() external view returns (uint256) { return _smartContractNonce; } diff --git a/processor/ethereum/router/src/lib.rs b/processor/ethereum/router/src/lib.rs index d78b3218..7a7cffd8 100644 --- a/processor/ethereum/router/src/lib.rs +++ b/processor/ethereum/router/src/lib.rs @@ -11,7 +11,7 @@ use alloy_consensus::TxLegacy; use alloy_sol_types::{SolValue, SolConstructor, SolCall, SolEvent}; -use alloy_rpc_types_eth::{TransactionInput, TransactionRequest, Filter}; +use alloy_rpc_types_eth::Filter; use alloy_transport::{TransportErrorKind, RpcError}; use alloy_simple_request_transport::SimpleRequest; use alloy_provider::{Provider, RootProvider}; @@ -296,23 +296,6 @@ impl Router { self.1 } - /// Fetch the block this contract was deployed at. - pub async fn deployment_block(&self) -> Result> { - let call = TransactionRequest::default() - .to(self.address()) - .input(TransactionInput::new(abi::deploymentBlockCall::new(()).abi_encode().into())); - let bytes = self.0.call(&call).await?; - let deployment_block = abi::deploymentBlockCall::abi_decode_returns(&bytes, true) - .map_err(|e| { - TransportErrorKind::Custom( - format!("node returned a non-u256 for function returning u256: {e:?}").into(), - ) - })? - ._0; - - Ok(deployment_block.try_into().unwrap()) - } - /// Get the message to be signed in order to update the key for Serai. pub fn update_serai_key_message(chain_id: U256, nonce: u64, key: &PublicKey) -> Vec { ( diff --git a/processor/ethereum/src/rpc.rs b/processor/ethereum/src/rpc.rs index a5533484..7f8a422b 100644 --- a/processor/ethereum/src/rpc.rs +++ b/processor/ethereum/src/rpc.rs @@ -2,20 +2,23 @@ use core::future::Future; use std::{sync::Arc, collections::HashSet}; use alloy_core::primitives::B256; -use alloy_rpc_types_eth::{BlockTransactionsKind, BlockNumberOrTag}; +use alloy_rpc_types_eth::{Header, BlockTransactionsKind, BlockNumberOrTag}; use alloy_transport::{RpcError, TransportErrorKind}; use alloy_simple_request_transport::SimpleRequest; use alloy_provider::{Provider, RootProvider}; use serai_client::primitives::{NetworkId, Coin, Amount}; +use tokio::task::JoinSet; + use serai_db::Db; use scanner::ScannerFeed; use ethereum_schnorr::PublicKey; use ethereum_erc20::{TopLevelTransfer, Erc20}; -use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Router}; +#[rustfmt::skip] +use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Executed, Router}; use crate::{ TOKENS, ETHER_DUST, DAI_DUST, InitialSeraiKey, @@ -141,8 +144,6 @@ impl ScannerFeed for Rpc { ) -> impl Send + Future> { async move { let epoch = self.unchecked_block_header_by_number(number).await?; - let mut instructions = vec![]; - let mut executed = vec![]; let Some(router) = Router::new( self.provider.clone(), @@ -153,16 +154,42 @@ impl ScannerFeed for Rpc { ) .await? else { - // The Router wasn't deployed yet so we cannot have any on-chain interactions - // If the Router has been deployed by the block we've synced to, it won't have any events - // for these blocks anways, so this doesn't risk a consensus split - return Ok(FullEpoch { epoch, instructions, executed }); + Err(TransportErrorKind::Custom("router wasn't deployed on-chain yet".to_string().into()))? }; - let router_deployment_block = router.deployment_block().await?; + async fn sync_block( + provider: Arc>, + router: Router, + block: Header, + ) -> Result<(Vec, Vec), RpcError> { + let mut instructions = router.in_instructions(block.number, &HashSet::from(TOKENS)).await?; - // TODO: Use a LocalSet and handle all these in parallel + for token in TOKENS { + for TopLevelTransfer { id, from, amount, data } in Erc20::new(provider.clone(), token) + .top_level_transfers(block.number, router.address()) + .await? + { + instructions.push(EthereumInInstruction { + id, + from, + coin: EthereumCoin::Erc20(token), + amount, + data, + }); + } + } + + let executed = router.executed(block.number).await?; + + Ok((instructions, executed)) + } + + // We use JoinSet here to minimize the latency of the variety of requests we make. For each + // JoinError that may occur, we unwrap it as no underlying tasks should panic + let mut join_set = JoinSet::new(); let mut to_check = epoch.end_hash; + // TODO: This makes 32 sequential requests. We should run them in parallel using block + // nunbers while to_check != epoch.prior_end_hash { let to_check_block = self .provider @@ -179,34 +206,19 @@ impl ScannerFeed for Rpc { })? .header; - // If this is before the Router was deployed, move on - if to_check_block.number < router_deployment_block { - // This is sa - break; - } - - instructions.append( - &mut router.in_instructions(to_check_block.number, &HashSet::from(TOKENS)).await?, - ); - for token in TOKENS { - for TopLevelTransfer { id, from, amount, data } in - Erc20::new(self.provider.clone(), token) - .top_level_transfers(to_check_block.number, router.address()) - .await? - { - instructions.push(EthereumInInstruction { - id, - from, - coin: EthereumCoin::Erc20(token), - amount, - data, - }); - } - } - - executed.append(&mut router.executed(to_check_block.number).await?); - + // Update the next block to check to_check = *to_check_block.parent_hash; + + // Spawn a task to sync this block + join_set.spawn(sync_block(self.provider.clone(), router.clone(), to_check_block)); + } + + let mut instructions = vec![]; + let mut executed = vec![]; + while let Some(instructions_and_executed) = join_set.join_next().await { + let (mut these_instructions, mut these_executed) = instructions_and_executed.unwrap()?; + instructions.append(&mut these_instructions); + executed.append(&mut these_executed); } Ok(FullEpoch { epoch, instructions, executed })