diff --git a/Cargo.lock b/Cargo.lock index 1f6b372a..b1613420 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9446,8 +9446,8 @@ dependencies = [ "alloy-sol-macro", "alloy-sol-types", "alloy-transport", + "futures-util", "serai-processor-ethereum-primitives", - "tokio", ] [[package]] @@ -9480,6 +9480,7 @@ dependencies = [ "borsh", "build-solidity-contracts", "ethereum-schnorr-contract", + "futures-util", "group", "k256", "rand_core", diff --git a/processor/ethereum/erc20/Cargo.toml b/processor/ethereum/erc20/Cargo.toml index 21be88c5..078192a4 100644 --- a/processor/ethereum/erc20/Cargo.toml +++ b/processor/ethereum/erc20/Cargo.toml @@ -29,5 +29,4 @@ alloy-provider = { version = "0.9", default-features = false } ethereum-primitives = { package = "serai-processor-ethereum-primitives", path = "../primitives", default-features = false } -# TODO futures-util = { version = "0.3", default-features = false, features = ["std"] } -tokio = { version = "1", default-features = false, features = ["rt"] } +futures-util = { version = "0.3", default-features = false, features = ["std"] } diff --git a/processor/ethereum/erc20/src/lib.rs b/processor/ethereum/erc20/src/lib.rs index e72e357b..df0a3922 100644 --- a/processor/ethereum/erc20/src/lib.rs +++ b/processor/ethereum/erc20/src/lib.rs @@ -2,20 +2,21 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use std::{sync::Arc, collections::HashSet}; +use core::borrow::Borrow; +use std::{sync::Arc, collections::HashMap}; -use alloy_core::primitives::{Address, B256, U256}; +use alloy_core::primitives::{Address, U256}; use alloy_sol_types::{SolInterface, SolEvent}; -use alloy_rpc_types_eth::{Filter, TransactionTrait}; +use alloy_rpc_types_eth::{Log, Filter, TransactionTrait}; use alloy_transport::{TransportErrorKind, RpcError}; use alloy_simple_request_transport::SimpleRequest; use alloy_provider::{Provider, RootProvider}; use ethereum_primitives::LogIndex; -use tokio::task::JoinSet; +use futures_util::stream::{StreamExt, FuturesUnordered}; #[rustfmt::skip] #[expect(warnings)] @@ -30,6 +31,9 @@ use abi::IERC20::{IERC20Calls, transferCall, transferFromCall}; pub use abi::IERC20::Transfer; /// A top-level ERC20 transfer +/// +/// This does not include `token`, `to` fields. Those are assumed contextual to the creation of +/// this. #[derive(Clone, Debug)] pub struct TopLevelTransfer { /// The ID of the event for this transfer. @@ -46,160 +50,175 @@ pub struct TopLevelTransfer { /// A view for an ERC20 contract. #[derive(Clone, Debug)] -pub struct Erc20(Arc>, Address); +pub struct Erc20 { + provider: Arc>, + address: Address, +} impl Erc20 { /// Construct a new view of the specified ERC20 contract. - pub fn new(provider: Arc>, address: [u8; 20]) -> Self { - Self(provider, Address::from(&address)) + pub fn new(provider: Arc>, address: Address) -> Self { + Self { provider, address } } - /// Match a transaction for its top-level transfer to the specified address (if one exists). - pub async fn match_top_level_transfer( + /// The filter for transfer logs of the specified ERC20, to the specified recipient. + pub fn transfer_filter(from_block: u64, to_block: u64, erc20: Address, to: Address) -> Filter { + let filter = Filter::new().from_block(from_block).to_block(to_block); + filter.address(erc20).event_signature(Transfer::SIGNATURE_HASH).topic2(to.into_word()) + } + + /// Yield the top-level transfer for the specified transaction (if one exists). + /// + /// The passed-in logs MUST be the logs for this transaction. The logs MUST be filtered to the + /// `Transfer` events of the intended token(s) and the intended `to` transferred to. These + /// properties are completely unchecked and assumed to be the case. + /// + /// This does NOT yield THE top-level transfer. If multiple `Transfer` events have identical + /// structure to the top-level transfer call, the earliest `Transfer` event present in the logs + /// is considered the top-level transfer. + // Yielding THE top-level transfer would require tracing the transaction execution and isn't + // worth the effort. + pub async fn top_level_transfer( provider: impl AsRef>, - transaction_hash: B256, - to: Address, + transaction_hash: [u8; 32], + mut transfer_logs: Vec>, ) -> Result, RpcError> { // Fetch the transaction let transaction = - provider.as_ref().get_transaction_by_hash(transaction_hash).await?.ok_or_else(|| { - TransportErrorKind::Custom( - "node didn't have the transaction which emitted a log it had".to_string().into(), - ) - })?; + provider.as_ref().get_transaction_by_hash(transaction_hash.into()).await?.ok_or_else( + || { + TransportErrorKind::Custom( + "node didn't have the transaction which emitted a log it had".to_string().into(), + ) + }, + )?; // If this is a top-level call... // Don't validate the encoding as this can't be re-encoded to an identical bytestring due // to the `InInstruction` appended after the call itself - if let Ok(call) = IERC20Calls::abi_decode(transaction.inner.input(), false) { - // Extract the top-level call's from/to/value - let (from, call_to, value) = match call { - IERC20Calls::transfer(transferCall { to, value }) => (transaction.from, to, value), - IERC20Calls::transferFrom(transferFromCall { from, to, value }) => (from, to, value), - // Treat any other function selectors as unrecognized - _ => return Ok(None), - }; - // If this isn't a transfer to the expected address, return None - if call_to != to { - return Ok(None); + let Ok(call) = IERC20Calls::abi_decode(transaction.inner.input(), false) else { + return Ok(None); + }; + + // Extract the top-level call's from/to/value + let (from, to, value) = match call { + IERC20Calls::transfer(transferCall { to, value }) => (transaction.from, to, value), + IERC20Calls::transferFrom(transferFromCall { from, to, value }) => (from, to, value), + // Treat any other function selectors as unrecognized + _ => return Ok(None), + }; + + // Sort the logs to ensure the the earliest logs are first + transfer_logs.sort_by_key(|log| log.borrow().log_index); + // Find the log for this top-level transfer + for log in transfer_logs { + // Check the log is for the called contract + // This handles the edge case where we're checking if transfers of token X were top-level and + // a transfer of token Y (with equivalent structure) was top-level + if Some(log.borrow().address()) != transaction.inner.to() { + continue; } - // Fetch the transaction's logs - let receipt = - provider.as_ref().get_transaction_receipt(transaction_hash).await?.ok_or_else(|| { - TransportErrorKind::Custom( - "node didn't have receipt for a transaction we were matching for a top-level transfer" - .to_string() - .into(), - ) - })?; + // Since the caller is responsible for filtering these to `Transfer` events, we can assume + // this is a non-compliant ERC20 or an error with the logs fetched. We assume ERC20 + // compliance here, making this an RPC error + let log = log.borrow().log_decode::().map_err(|_| { + TransportErrorKind::Custom("log didn't include a valid transfer event".to_string().into()) + })?; - // Find the log for this transfer - for log in receipt.inner.logs() { - // If this log was emitted by a different contract, continue - if Some(log.address()) != transaction.inner.to() { - continue; - } + let block_hash = log.block_hash.ok_or_else(|| { + TransportErrorKind::Custom("log didn't have its block hash set".to_string().into()) + })?; + let log_index = log.log_index.ok_or_else(|| { + TransportErrorKind::Custom("log didn't have its index set".to_string().into()) + })?; + let log = log.inner.data; - // Check if this is actually a transfer log - // https://github.com/alloy-rs/core/issues/589 - if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) { - continue; - } - - let block_hash = log.block_hash.ok_or_else(|| { - TransportErrorKind::Custom("log didn't have its block hash set".to_string().into()) - })?; - let log_index = log.log_index.ok_or_else(|| { - TransportErrorKind::Custom("log didn't have its index set".to_string().into()) - })?; - let log = log - .log_decode::() - .map_err(|e| { - TransportErrorKind::Custom(format!("failed to decode Transfer log: {e:?}").into()) - })? - .inner - .data; - - // Ensure the top-level transfer is equivalent to the transfer this log represents. Since - // we can't find the exact top-level transfer without tracing the call, we just rule the - // first equivalent transfer as THE top-level transfer - if !((log.from == from) && (log.to == to) && (log.value == value)) { - continue; - } - - // Read the data appended after - let encoded = call.abi_encode(); - let data = transaction.inner.input().as_ref()[encoded.len() ..].to_vec(); - - return Ok(Some(TopLevelTransfer { - id: LogIndex { block_hash: *block_hash, index_within_block: log_index }, - transaction_hash: *transaction_hash, - from: log.from, - amount: log.value, - data, - })); + // Ensure the top-level transfer is equivalent to the transfer this log represents + if !((log.from == from) && (log.to == to) && (log.value == value)) { + continue; } + + // Read the data appended after + let encoded = call.abi_encode(); + let data = transaction.inner.input().as_ref()[encoded.len() ..].to_vec(); + + return Ok(Some(TopLevelTransfer { + id: LogIndex { block_hash: *block_hash, index_within_block: log_index }, + transaction_hash, + from: log.from, + amount: log.value, + data, + })); } Ok(None) } - /// Fetch all top-level transfers to the specified address. + /// Fetch all top-level transfers to the specified address for this token. /// /// The result of this function is unordered. - pub async fn top_level_transfers( + pub async fn top_level_transfers_unordered( &self, - block: u64, + from_block: u64, + to_block: u64, to: Address, ) -> Result, RpcError> { - // Get all transfers within this block - let filter = Filter::new().from_block(block).to_block(block).address(self.1); - let filter = filter.event_signature(Transfer::SIGNATURE_HASH); - let mut to_topic = [0; 32]; - to_topic[12 ..].copy_from_slice(to.as_ref()); - let filter = filter.topic2(B256::from(to_topic)); - let logs = self.0.get_logs(&filter).await?; + // Get all transfers within these blocks + let logs = self + .provider + .get_logs(&Self::transfer_filter(from_block, to_block, self.address, to)) + .await?; - // These logs are for all transactions which performed any transfer - // We now check each transaction for having a top-level transfer to the specified address - let tx_ids = logs - .into_iter() - .map(|log| { - // Double check the address which emitted this log - if log.address() != self.1 { - Err(TransportErrorKind::Custom( - "node returned logs for a different address than requested".to_string().into(), - ))?; - } + // The logs, indexed by their transactions + let mut transaction_logs = HashMap::new(); + // Index the logs by their transactions + for log in logs { + // Double check the address which emitted this log + if log.address() != self.address { + Err(TransportErrorKind::Custom( + "node returned logs for a different address than requested".to_string().into(), + ))?; + } + // Double check the event signature for this log + if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) { + Err(TransportErrorKind::Custom( + "node returned a log for a different topic than filtered to".to_string().into(), + ))?; + } + // Double check the `to` topic + if log.topics().get(2) != Some(&to.into_word()) { + Err(TransportErrorKind::Custom( + "node returned a transfer for a different `to` than filtered to".to_string().into(), + ))?; + } - log.transaction_hash.ok_or_else(|| { + let tx_id = log + .transaction_hash + .ok_or_else(|| { TransportErrorKind::Custom("log didn't specify its transaction hash".to_string().into()) - }) - }) - .collect::, _>>()?; + })? + .0; - let mut join_set = JoinSet::new(); - for tx_id in tx_ids { - join_set.spawn(Self::match_top_level_transfer(self.0.clone(), tx_id, to)); + transaction_logs.entry(tx_id).or_insert_with(|| Vec::with_capacity(1)).push(log); + } + + // Use `FuturesUnordered` so these RPC calls run in parallel + let mut futures = FuturesUnordered::new(); + for (tx_id, transfer_logs) in transaction_logs { + futures.push(Self::top_level_transfer(&self.provider, tx_id, transfer_logs)); } let mut top_level_transfers = vec![]; - while let Some(top_level_transfer) = join_set.join_next().await { - // This is an error if a task panics or aborts - // Panicking on a task panic is desired behavior, and we haven't aborted any tasks - match top_level_transfer.unwrap() { + while let Some(top_level_transfer) = futures.next().await { + match top_level_transfer { // Top-level transfer Ok(Some(top_level_transfer)) => top_level_transfers.push(top_level_transfer), // Not a top-level transfer Ok(None) => continue, // Failed to get this transaction's information so abort - Err(e) => { - join_set.abort_all(); - Err(e)? - } + Err(e) => Err(e)?, } } - Ok(top_level_transfers) } } diff --git a/processor/ethereum/router/Cargo.toml b/processor/ethereum/router/Cargo.toml index 4b737a00..4078ba0e 100644 --- a/processor/ethereum/router/Cargo.toml +++ b/processor/ethereum/router/Cargo.toml @@ -41,6 +41,8 @@ erc20 = { package = "serai-processor-ethereum-erc20", path = "../erc20", default serai-client = { path = "../../../substrate/client", default-features = false, features = ["ethereum"] } +futures-util = { version = "0.3", default-features = false, features = ["std"] } + [build-dependencies] build-solidity-contracts = { path = "../../../networks/ethereum/build-contracts", default-features = false } diff --git a/processor/ethereum/router/src/lib.rs b/processor/ethereum/router/src/lib.rs index fd88a222..9e15c9f9 100644 --- a/processor/ethereum/router/src/lib.rs +++ b/processor/ethereum/router/src/lib.rs @@ -2,7 +2,10 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use std::{sync::Arc, collections::HashSet}; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; use borsh::{BorshSerialize, BorshDeserialize}; @@ -21,12 +24,14 @@ use alloy_transport::{TransportErrorKind, RpcError}; use alloy_simple_request_transport::SimpleRequest; use alloy_provider::{Provider, RootProvider}; +use serai_client::networks::ethereum::Address as SeraiAddress; + use ethereum_primitives::LogIndex; use ethereum_schnorr::{PublicKey, Signature}; use ethereum_deployer::Deployer; use erc20::{Transfer, Erc20}; -use serai_client::networks::ethereum::Address as SeraiAddress; +use futures_util::stream::{StreamExt, FuturesUnordered}; #[rustfmt::skip] #[expect(warnings)] @@ -397,25 +402,33 @@ impl Router { } /// Fetch the `InInstruction`s emitted by the Router from this block. - pub async fn in_instructions( + /// + /// This is not guaranteed to return them in any order. + pub async fn in_instructions_unordered( &self, - block: u64, + from_block: u64, + to_block: u64, allowed_tokens: &HashSet
, ) -> Result, RpcError> { // The InInstruction events for this block - let filter = Filter::new().from_block(block).to_block(block).address(self.address); - let filter = filter.event_signature(InInstructionEvent::SIGNATURE_HASH); - let mut logs = self.provider.get_logs(&filter).await?; - logs.sort_by_key(|log| (log.block_number, log.log_index)); + let logs = { + let filter = Filter::new().from_block(from_block).to_block(to_block).address(self.address); + let filter = filter.event_signature(InInstructionEvent::SIGNATURE_HASH); + self.provider.get_logs(&filter).await? + }; + let mut in_instructions = Vec::with_capacity(logs.len()); /* We check that for all InInstructions for ERC20s emitted, a corresponding transfer occurred. - In order to prevent a transfer from being used to justify multiple distinct InInstructions, - we insert the transfer's log index into this HashSet. - */ - let mut transfer_check = HashSet::new(); + On this initial loop, we just queue the ERC20 InInstructions for later verification. - let mut in_instructions = vec![]; + We don't do this for ETH as it'd require tracing the transaction, which is non-trivial. It + also isn't necessary as all of this is solely defense in depth. + */ + let mut erc20s = HashSet::new(); + let mut erc20_transfer_logs = FuturesUnordered::new(); + let mut erc20_transactions = HashSet::new(); + let mut erc20_in_instructions = vec![]; for log in logs { // Double check the address which emitted this log if log.address() != self.address { @@ -423,6 +436,10 @@ impl Router { "node returned a log from a different address than requested".to_string().into(), ))?; } + // Double check this is a InInstruction log + if log.topics().first() != Some(&InInstructionEvent::SIGNATURE_HASH) { + continue; + } let id = LogIndex { block_hash: log @@ -439,6 +456,7 @@ impl Router { let transaction_hash = log.transaction_hash.ok_or_else(|| { TransportErrorKind::Custom("log didn't have its transaction hash set".to_string().into()) })?; + let transaction_hash = *transaction_hash; let log = log .log_decode::() @@ -451,82 +469,148 @@ impl Router { .data; let coin = Coin::from(log.coin); - if let Coin::Erc20(token) = coin { - if !allowed_tokens.contains(&token) { - continue; - } - // Get all logs for this TX - let receipt = - self.provider.get_transaction_receipt(transaction_hash).await?.ok_or_else(|| { - TransportErrorKind::Custom( - "node didn't have the receipt for a transaction it had".to_string().into(), - ) - })?; - let tx_logs = receipt.inner.logs(); - - /* - The transfer which causes an InInstruction event won't be a top-level transfer. - Accordingly, when looking for the matching transfer, disregard the top-level transfer (if - one exists). - */ - if let Some(matched) = - Erc20::match_top_level_transfer(&self.provider, transaction_hash, self.address).await? - { - // Mark this log index as used so it isn't used again - transfer_check.insert(matched.id.index_within_block); - } - - // Find a matching transfer log - let mut found_transfer = false; - for tx_log in tx_logs { - let log_index = tx_log.log_index.ok_or_else(|| { - TransportErrorKind::Custom( - "log in transaction receipt didn't have its log index set".to_string().into(), - ) - })?; - - // Ensure we didn't already use this transfer to check a distinct InInstruction event - if transfer_check.contains(&log_index) { - continue; - } - - // Check if this log is from the token we expected to be transferred - if tx_log.address() != token { - continue; - } - // Check if this is a transfer log - // https://github.com/alloy-rs/core/issues/589 - if tx_log.topics().first() != Some(&Transfer::SIGNATURE_HASH) { - continue; - } - let Ok(transfer) = Transfer::decode_log(&tx_log.inner.clone(), true) else { continue }; - // Check if this is a transfer to us for the expected amount - if (transfer.to == self.address) && (transfer.value == log.amount) { - transfer_check.insert(log_index); - found_transfer = true; - break; - } - } - if !found_transfer { - // This shouldn't be a simple error - // This is an exploit, a non-conforming ERC20, or a malicious connection - // This should halt the process. While this is sufficient, it's sub-optimal - // TODO - Err(TransportErrorKind::Custom( - "ERC20 InInstruction with no matching transfer log".to_string().into(), - ))?; - } - }; - - in_instructions.push(InInstruction { + let in_instruction = InInstruction { id, - transaction_hash: *transaction_hash, + transaction_hash, from: log.from, coin, amount: log.amount, data: log.instruction.as_ref().to_vec(), - }); + }; + + match coin { + Coin::Ether => in_instructions.push(in_instruction), + Coin::Erc20(token) => { + if !allowed_tokens.contains(&token) { + continue; + } + + // Fetch the ERC20 transfer events necessary to verify this InInstruction has a matching + // transfer + if !erc20s.contains(&token) { + erc20s.insert(token); + erc20_transfer_logs.push(async move { + let filter = Erc20::transfer_filter(from_block, to_block, token, self.address); + self.provider.get_logs(&filter).await.map(|logs| (token, logs)) + }); + } + erc20_transactions.insert(transaction_hash); + erc20_in_instructions.push((transaction_hash, in_instruction)) + } + } + } + + // Collect the ERC20 transfer logs + let erc20_transfer_logs = { + let mut collected = HashMap::with_capacity(erc20s.len()); + while let Some(token_and_logs) = erc20_transfer_logs.next().await { + let (token, logs) = token_and_logs?; + collected.insert(token, logs); + } + collected + }; + + /* + For each transaction, it may have a top-level ERC20 transfer. That top-level transfer won't + be the transfer caused by the call to `inInstruction`, so we shouldn't consider it + justification for this `InInstruction` event. + + Fetch all top-level transfers here so we can ignore them. + */ + let mut erc20_top_level_transfers = FuturesUnordered::new(); + let mut transaction_transfer_logs = HashMap::new(); + for transaction in erc20_transactions { + // Filter to the logs for this specific transaction + let logs = erc20_transfer_logs + .values() + .flat_map(|logs_per_token| logs_per_token.iter()) + .filter_map(|log| { + let log_transaction_hash = log.transaction_hash.ok_or_else(|| { + TransportErrorKind::Custom( + "log didn't have its transaction hash set".to_string().into(), + ) + }); + match log_transaction_hash { + Ok(log_transaction_hash) => { + if log_transaction_hash == transaction { + Some(Ok(log)) + } else { + None + } + } + Err(e) => Some(Err(e)), + } + }) + .collect::, _>>()?; + + // Find the top-level transfer + erc20_top_level_transfers.push(Erc20::top_level_transfer( + &self.provider, + transaction, + logs.clone(), + )); + // Keep the transaction-indexed logs for the actual justifying + transaction_transfer_logs.insert(transaction, logs); + } + + /* + In order to prevent a single transfer from being used to justify multiple distinct + InInstructions, we insert the transfer's log index into this HashSet. + */ + let mut already_used_to_justify = HashSet::new(); + + // Collect the top-level transfers + while let Some(erc20_top_level_transfer) = erc20_top_level_transfers.next().await { + let erc20_top_level_transfer = erc20_top_level_transfer?; + // If this transaction had a top-level transfer... + if let Some(erc20_top_level_transfer) = erc20_top_level_transfer { + // Mark this log index as used so it isn't used again + already_used_to_justify.insert(erc20_top_level_transfer.id.index_within_block); + } + } + + // Now, for each ERC20 InInstruction, find a justifying transfer log + for (transaction_hash, in_instruction) in erc20_in_instructions { + let mut justified = false; + for log in &transaction_transfer_logs[&transaction_hash] { + let log_index = log.log_index.ok_or_else(|| { + TransportErrorKind::Custom( + "log in transaction receipt didn't have its log index set".to_string().into(), + ) + })?; + + // Ensure we didn't already use this transfer to check a distinct InInstruction event + if already_used_to_justify.contains(&log_index) { + continue; + } + + // Check if this log is from the token we expected to be transferred + if log.address() != Address::from(in_instruction.coin) { + continue; + } + // Check if this is a transfer log + if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) { + continue; + } + let Ok(transfer) = Transfer::decode_log(&log.inner.clone(), true) else { continue }; + // Check if this aligns with the InInstruction + if (transfer.from == in_instruction.from) && + (transfer.to == self.address) && + (transfer.value == in_instruction.amount) + { + already_used_to_justify.insert(log_index); + justified = true; + break; + } + } + if !justified { + // This is an exploit, a non-conforming ERC20, or an invalid connection + Err(TransportErrorKind::Custom( + "ERC20 InInstruction with no matching transfer log".to_string().into(), + ))?; + } + in_instructions.push(in_instruction); } Ok(in_instructions) diff --git a/processor/ethereum/src/primitives/block.rs b/processor/ethereum/src/primitives/block.rs index 5804114f..b01493f0 100644 --- a/processor/ethereum/src/primitives/block.rs +++ b/processor/ethereum/src/primitives/block.rs @@ -32,6 +32,7 @@ impl primitives::BlockHeader for Epoch { #[derive(Clone, PartialEq, Eq, Debug)] pub(crate) struct FullEpoch { pub(crate) epoch: Epoch, + /// The unordered list of `InInstruction`s within this epoch pub(crate) instructions: Vec, pub(crate) executed: Vec, } diff --git a/processor/ethereum/src/rpc.rs b/processor/ethereum/src/rpc.rs index 128db1e4..480c9440 100644 --- a/processor/ethereum/src/rpc.rs +++ b/processor/ethereum/src/rpc.rs @@ -162,12 +162,14 @@ impl ScannerFeed for Rpc { router: Router, block: Header, ) -> Result<(Vec, Vec), RpcError> { - let mut instructions = router.in_instructions(block.number, &HashSet::from(TOKENS)).await?; + let mut instructions = router + .in_instructions_unordered(block.number, block.number, &HashSet::from(TOKENS)) + .await?; for token in TOKENS { for TopLevelTransfer { id, transaction_hash, from, amount, data } in - Erc20::new(provider.clone(), **token) - .top_level_transfers(block.number, router.address()) + Erc20::new(provider.clone(), token) + .top_level_transfers_unordered(block.number, block.number, router.address()) .await? { instructions.push(EthereumInInstruction {