mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 20:29:23 +00:00
Fix the async flow with the Router
It had sequential async calls with complexity O(n), with a variety of redundant calls. There was also a constant of... 4? 5? for each item. Now, the total sequence depth is just 3-4.
This commit is contained in:
@@ -2,20 +2,21 @@
|
||||
#![doc = include_str!("../README.md")]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
use std::{sync::Arc, collections::HashSet};
|
||||
use core::borrow::Borrow;
|
||||
use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use alloy_core::primitives::{Address, B256, U256};
|
||||
use alloy_core::primitives::{Address, U256};
|
||||
|
||||
use alloy_sol_types::{SolInterface, SolEvent};
|
||||
|
||||
use alloy_rpc_types_eth::{Filter, TransactionTrait};
|
||||
use alloy_rpc_types_eth::{Log, Filter, TransactionTrait};
|
||||
use alloy_transport::{TransportErrorKind, RpcError};
|
||||
use alloy_simple_request_transport::SimpleRequest;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
|
||||
use ethereum_primitives::LogIndex;
|
||||
|
||||
use tokio::task::JoinSet;
|
||||
use futures_util::stream::{StreamExt, FuturesUnordered};
|
||||
|
||||
#[rustfmt::skip]
|
||||
#[expect(warnings)]
|
||||
@@ -30,6 +31,9 @@ use abi::IERC20::{IERC20Calls, transferCall, transferFromCall};
|
||||
pub use abi::IERC20::Transfer;
|
||||
|
||||
/// A top-level ERC20 transfer
|
||||
///
|
||||
/// This does not include `token`, `to` fields. Those are assumed contextual to the creation of
|
||||
/// this.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TopLevelTransfer {
|
||||
/// The ID of the event for this transfer.
|
||||
@@ -46,160 +50,175 @@ pub struct TopLevelTransfer {
|
||||
|
||||
/// A view for an ERC20 contract.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Erc20(Arc<RootProvider<SimpleRequest>>, Address);
|
||||
pub struct Erc20 {
|
||||
provider: Arc<RootProvider<SimpleRequest>>,
|
||||
address: Address,
|
||||
}
|
||||
impl Erc20 {
|
||||
/// Construct a new view of the specified ERC20 contract.
|
||||
pub fn new(provider: Arc<RootProvider<SimpleRequest>>, address: [u8; 20]) -> Self {
|
||||
Self(provider, Address::from(&address))
|
||||
pub fn new(provider: Arc<RootProvider<SimpleRequest>>, address: Address) -> Self {
|
||||
Self { provider, address }
|
||||
}
|
||||
|
||||
/// Match a transaction for its top-level transfer to the specified address (if one exists).
|
||||
pub async fn match_top_level_transfer(
|
||||
/// The filter for transfer logs of the specified ERC20, to the specified recipient.
|
||||
pub fn transfer_filter(from_block: u64, to_block: u64, erc20: Address, to: Address) -> Filter {
|
||||
let filter = Filter::new().from_block(from_block).to_block(to_block);
|
||||
filter.address(erc20).event_signature(Transfer::SIGNATURE_HASH).topic2(to.into_word())
|
||||
}
|
||||
|
||||
/// Yield the top-level transfer for the specified transaction (if one exists).
|
||||
///
|
||||
/// The passed-in logs MUST be the logs for this transaction. The logs MUST be filtered to the
|
||||
/// `Transfer` events of the intended token(s) and the intended `to` transferred to. These
|
||||
/// properties are completely unchecked and assumed to be the case.
|
||||
///
|
||||
/// This does NOT yield THE top-level transfer. If multiple `Transfer` events have identical
|
||||
/// structure to the top-level transfer call, the earliest `Transfer` event present in the logs
|
||||
/// is considered the top-level transfer.
|
||||
// Yielding THE top-level transfer would require tracing the transaction execution and isn't
|
||||
// worth the effort.
|
||||
pub async fn top_level_transfer(
|
||||
provider: impl AsRef<RootProvider<SimpleRequest>>,
|
||||
transaction_hash: B256,
|
||||
to: Address,
|
||||
transaction_hash: [u8; 32],
|
||||
mut transfer_logs: Vec<impl Borrow<Log>>,
|
||||
) -> Result<Option<TopLevelTransfer>, RpcError<TransportErrorKind>> {
|
||||
// Fetch the transaction
|
||||
let transaction =
|
||||
provider.as_ref().get_transaction_by_hash(transaction_hash).await?.ok_or_else(|| {
|
||||
TransportErrorKind::Custom(
|
||||
"node didn't have the transaction which emitted a log it had".to_string().into(),
|
||||
)
|
||||
})?;
|
||||
provider.as_ref().get_transaction_by_hash(transaction_hash.into()).await?.ok_or_else(
|
||||
|| {
|
||||
TransportErrorKind::Custom(
|
||||
"node didn't have the transaction which emitted a log it had".to_string().into(),
|
||||
)
|
||||
},
|
||||
)?;
|
||||
|
||||
// If this is a top-level call...
|
||||
// Don't validate the encoding as this can't be re-encoded to an identical bytestring due
|
||||
// to the `InInstruction` appended after the call itself
|
||||
if let Ok(call) = IERC20Calls::abi_decode(transaction.inner.input(), false) {
|
||||
// Extract the top-level call's from/to/value
|
||||
let (from, call_to, value) = match call {
|
||||
IERC20Calls::transfer(transferCall { to, value }) => (transaction.from, to, value),
|
||||
IERC20Calls::transferFrom(transferFromCall { from, to, value }) => (from, to, value),
|
||||
// Treat any other function selectors as unrecognized
|
||||
_ => return Ok(None),
|
||||
};
|
||||
// If this isn't a transfer to the expected address, return None
|
||||
if call_to != to {
|
||||
return Ok(None);
|
||||
let Ok(call) = IERC20Calls::abi_decode(transaction.inner.input(), false) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Extract the top-level call's from/to/value
|
||||
let (from, to, value) = match call {
|
||||
IERC20Calls::transfer(transferCall { to, value }) => (transaction.from, to, value),
|
||||
IERC20Calls::transferFrom(transferFromCall { from, to, value }) => (from, to, value),
|
||||
// Treat any other function selectors as unrecognized
|
||||
_ => return Ok(None),
|
||||
};
|
||||
|
||||
// Sort the logs to ensure the the earliest logs are first
|
||||
transfer_logs.sort_by_key(|log| log.borrow().log_index);
|
||||
// Find the log for this top-level transfer
|
||||
for log in transfer_logs {
|
||||
// Check the log is for the called contract
|
||||
// This handles the edge case where we're checking if transfers of token X were top-level and
|
||||
// a transfer of token Y (with equivalent structure) was top-level
|
||||
if Some(log.borrow().address()) != transaction.inner.to() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Fetch the transaction's logs
|
||||
let receipt =
|
||||
provider.as_ref().get_transaction_receipt(transaction_hash).await?.ok_or_else(|| {
|
||||
TransportErrorKind::Custom(
|
||||
"node didn't have receipt for a transaction we were matching for a top-level transfer"
|
||||
.to_string()
|
||||
.into(),
|
||||
)
|
||||
})?;
|
||||
// Since the caller is responsible for filtering these to `Transfer` events, we can assume
|
||||
// this is a non-compliant ERC20 or an error with the logs fetched. We assume ERC20
|
||||
// compliance here, making this an RPC error
|
||||
let log = log.borrow().log_decode::<Transfer>().map_err(|_| {
|
||||
TransportErrorKind::Custom("log didn't include a valid transfer event".to_string().into())
|
||||
})?;
|
||||
|
||||
// Find the log for this transfer
|
||||
for log in receipt.inner.logs() {
|
||||
// If this log was emitted by a different contract, continue
|
||||
if Some(log.address()) != transaction.inner.to() {
|
||||
continue;
|
||||
}
|
||||
let block_hash = log.block_hash.ok_or_else(|| {
|
||||
TransportErrorKind::Custom("log didn't have its block hash set".to_string().into())
|
||||
})?;
|
||||
let log_index = log.log_index.ok_or_else(|| {
|
||||
TransportErrorKind::Custom("log didn't have its index set".to_string().into())
|
||||
})?;
|
||||
let log = log.inner.data;
|
||||
|
||||
// Check if this is actually a transfer log
|
||||
// https://github.com/alloy-rs/core/issues/589
|
||||
if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let block_hash = log.block_hash.ok_or_else(|| {
|
||||
TransportErrorKind::Custom("log didn't have its block hash set".to_string().into())
|
||||
})?;
|
||||
let log_index = log.log_index.ok_or_else(|| {
|
||||
TransportErrorKind::Custom("log didn't have its index set".to_string().into())
|
||||
})?;
|
||||
let log = log
|
||||
.log_decode::<Transfer>()
|
||||
.map_err(|e| {
|
||||
TransportErrorKind::Custom(format!("failed to decode Transfer log: {e:?}").into())
|
||||
})?
|
||||
.inner
|
||||
.data;
|
||||
|
||||
// Ensure the top-level transfer is equivalent to the transfer this log represents. Since
|
||||
// we can't find the exact top-level transfer without tracing the call, we just rule the
|
||||
// first equivalent transfer as THE top-level transfer
|
||||
if !((log.from == from) && (log.to == to) && (log.value == value)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Read the data appended after
|
||||
let encoded = call.abi_encode();
|
||||
let data = transaction.inner.input().as_ref()[encoded.len() ..].to_vec();
|
||||
|
||||
return Ok(Some(TopLevelTransfer {
|
||||
id: LogIndex { block_hash: *block_hash, index_within_block: log_index },
|
||||
transaction_hash: *transaction_hash,
|
||||
from: log.from,
|
||||
amount: log.value,
|
||||
data,
|
||||
}));
|
||||
// Ensure the top-level transfer is equivalent to the transfer this log represents
|
||||
if !((log.from == from) && (log.to == to) && (log.value == value)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Read the data appended after
|
||||
let encoded = call.abi_encode();
|
||||
let data = transaction.inner.input().as_ref()[encoded.len() ..].to_vec();
|
||||
|
||||
return Ok(Some(TopLevelTransfer {
|
||||
id: LogIndex { block_hash: *block_hash, index_within_block: log_index },
|
||||
transaction_hash,
|
||||
from: log.from,
|
||||
amount: log.value,
|
||||
data,
|
||||
}));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Fetch all top-level transfers to the specified address.
|
||||
/// Fetch all top-level transfers to the specified address for this token.
|
||||
///
|
||||
/// The result of this function is unordered.
|
||||
pub async fn top_level_transfers(
|
||||
pub async fn top_level_transfers_unordered(
|
||||
&self,
|
||||
block: u64,
|
||||
from_block: u64,
|
||||
to_block: u64,
|
||||
to: Address,
|
||||
) -> Result<Vec<TopLevelTransfer>, RpcError<TransportErrorKind>> {
|
||||
// Get all transfers within this block
|
||||
let filter = Filter::new().from_block(block).to_block(block).address(self.1);
|
||||
let filter = filter.event_signature(Transfer::SIGNATURE_HASH);
|
||||
let mut to_topic = [0; 32];
|
||||
to_topic[12 ..].copy_from_slice(to.as_ref());
|
||||
let filter = filter.topic2(B256::from(to_topic));
|
||||
let logs = self.0.get_logs(&filter).await?;
|
||||
// Get all transfers within these blocks
|
||||
let logs = self
|
||||
.provider
|
||||
.get_logs(&Self::transfer_filter(from_block, to_block, self.address, to))
|
||||
.await?;
|
||||
|
||||
// These logs are for all transactions which performed any transfer
|
||||
// We now check each transaction for having a top-level transfer to the specified address
|
||||
let tx_ids = logs
|
||||
.into_iter()
|
||||
.map(|log| {
|
||||
// Double check the address which emitted this log
|
||||
if log.address() != self.1 {
|
||||
Err(TransportErrorKind::Custom(
|
||||
"node returned logs for a different address than requested".to_string().into(),
|
||||
))?;
|
||||
}
|
||||
// The logs, indexed by their transactions
|
||||
let mut transaction_logs = HashMap::new();
|
||||
// Index the logs by their transactions
|
||||
for log in logs {
|
||||
// Double check the address which emitted this log
|
||||
if log.address() != self.address {
|
||||
Err(TransportErrorKind::Custom(
|
||||
"node returned logs for a different address than requested".to_string().into(),
|
||||
))?;
|
||||
}
|
||||
// Double check the event signature for this log
|
||||
if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) {
|
||||
Err(TransportErrorKind::Custom(
|
||||
"node returned a log for a different topic than filtered to".to_string().into(),
|
||||
))?;
|
||||
}
|
||||
// Double check the `to` topic
|
||||
if log.topics().get(2) != Some(&to.into_word()) {
|
||||
Err(TransportErrorKind::Custom(
|
||||
"node returned a transfer for a different `to` than filtered to".to_string().into(),
|
||||
))?;
|
||||
}
|
||||
|
||||
log.transaction_hash.ok_or_else(|| {
|
||||
let tx_id = log
|
||||
.transaction_hash
|
||||
.ok_or_else(|| {
|
||||
TransportErrorKind::Custom("log didn't specify its transaction hash".to_string().into())
|
||||
})
|
||||
})
|
||||
.collect::<Result<HashSet<_>, _>>()?;
|
||||
})?
|
||||
.0;
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
for tx_id in tx_ids {
|
||||
join_set.spawn(Self::match_top_level_transfer(self.0.clone(), tx_id, to));
|
||||
transaction_logs.entry(tx_id).or_insert_with(|| Vec::with_capacity(1)).push(log);
|
||||
}
|
||||
|
||||
// Use `FuturesUnordered` so these RPC calls run in parallel
|
||||
let mut futures = FuturesUnordered::new();
|
||||
for (tx_id, transfer_logs) in transaction_logs {
|
||||
futures.push(Self::top_level_transfer(&self.provider, tx_id, transfer_logs));
|
||||
}
|
||||
|
||||
let mut top_level_transfers = vec![];
|
||||
while let Some(top_level_transfer) = join_set.join_next().await {
|
||||
// This is an error if a task panics or aborts
|
||||
// Panicking on a task panic is desired behavior, and we haven't aborted any tasks
|
||||
match top_level_transfer.unwrap() {
|
||||
while let Some(top_level_transfer) = futures.next().await {
|
||||
match top_level_transfer {
|
||||
// Top-level transfer
|
||||
Ok(Some(top_level_transfer)) => top_level_transfers.push(top_level_transfer),
|
||||
// Not a top-level transfer
|
||||
Ok(None) => continue,
|
||||
// Failed to get this transaction's information so abort
|
||||
Err(e) => {
|
||||
join_set.abort_all();
|
||||
Err(e)?
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(top_level_transfers)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user