Simplify async code in in_instructions_unordered

Outsources fetching the ERC20 events to top_level_transfers_unordered.
This commit is contained in:
Luke Parker
2025-01-24 05:34:49 -05:00
parent 201b675031
commit f948881eba
6 changed files with 284 additions and 324 deletions

View File

@@ -29,7 +29,7 @@ use serai_client::{
use ethereum_primitives::LogIndex;
use ethereum_schnorr::{PublicKey, Signature};
use ethereum_deployer::Deployer;
use erc20::{Transfer, Erc20};
use erc20::{Transfer, TopLevelTransfer, TopLevelTransfers, Erc20};
use futures_util::stream::{StreamExt, FuturesUnordered};
@@ -451,35 +451,66 @@ impl Router {
}
}
/// Fetch the `InInstruction`s emitted by the Router from this block.
/// Fetch the `InInstruction`s for the Router for the specified inclusive range of blocks.
///
/// This includes all `InInstruction` events from the Router and all top-level transfers to the
/// Router.
///
/// This is not guaranteed to return them in any order.
pub async fn in_instructions_unordered(
&self,
from_block: u64,
to_block: u64,
allowed_tokens: &HashSet<Address>,
allowed_erc20s: &HashSet<Address>,
) -> Result<Vec<InInstruction>, RpcError<TransportErrorKind>> {
// The InInstruction events for this block
let logs = {
let in_instruction_logs = {
let filter = Filter::new().from_block(from_block).to_block(to_block).address(self.address);
let filter = filter.event_signature(InInstructionEvent::SIGNATURE_HASH);
self.provider.get_logs(&filter).await?
};
let mut in_instructions = Vec::with_capacity(logs.len());
/*
We check that for all InInstructions for ERC20s emitted, a corresponding transfer occurred.
On this initial loop, we just queue the ERC20 InInstructions for later verification.
// Define the Vec for the result now that we have the logs as a size hint
let mut in_instructions = Vec::with_capacity(in_instruction_logs.len());
We don't do this for ETH as it'd require tracing the transaction, which is non-trivial. It
also isn't necessary as all of this is solely defense in depth.
*/
let mut erc20s = HashSet::new();
let mut erc20_transfer_logs = FuturesUnordered::new();
let mut erc20_transactions = HashSet::new();
let mut erc20_in_instructions = vec![];
for log in logs {
// Handle the top-level transfers for this block
let mut justifying_erc20_transfer_logs = HashSet::new();
let erc20_transfer_logs = {
let mut transfers = FuturesUnordered::new();
for erc20 in allowed_erc20s {
transfers.push(async move {
(
erc20,
Erc20::top_level_transfers_unordered(
&self.provider,
from_block,
to_block,
*erc20,
self.address,
)
.await,
)
});
}
let mut logs = HashMap::with_capacity(allowed_erc20s.len());
while let Some((token, transfers)) = transfers.next().await {
let TopLevelTransfers { logs: token_logs, transfers } = transfers?;
logs.insert(token, token_logs);
// Map the top-level transfer to an InInstruction
for transfer in transfers {
let TopLevelTransfer { id, transaction_hash, from, amount, data } = transfer;
justifying_erc20_transfer_logs.insert(transfer.id);
let in_instruction =
InInstruction { id, transaction_hash, from, coin: Coin::Erc20(*token), amount, data };
in_instructions.push(in_instruction);
}
}
logs
};
// Now handle the InInstruction events
for log in in_instruction_logs {
// Double check the address which emitted this log
if log.address() != self.address {
Err(TransportErrorKind::Custom(
@@ -491,18 +522,22 @@ impl Router {
continue;
}
let id = LogIndex {
block_hash: log
.block_hash
.ok_or_else(|| {
TransportErrorKind::Custom("log didn't have its block hash set".to_string().into())
})?
.into(),
index_within_block: log.log_index.ok_or_else(|| {
TransportErrorKind::Custom("log didn't have its index set".to_string().into())
})?,
let log_index = |log: &Log| -> Result<LogIndex, TransportErrorKind> {
Ok(LogIndex {
block_hash: log
.block_hash
.ok_or_else(|| {
TransportErrorKind::Custom("log didn't have its block hash set".to_string().into())
})?
.into(),
index_within_block: log.log_index.ok_or_else(|| {
TransportErrorKind::Custom("log didn't have its index set".to_string().into())
})?,
})
};
let id = log_index(&log)?;
let transaction_hash = log.transaction_hash.ok_or_else(|| {
TransportErrorKind::Custom("log didn't have its transaction hash set".to_string().into())
})?;
@@ -530,135 +565,57 @@ impl Router {
};
match coin {
Coin::Ether => in_instructions.push(in_instruction),
Coin::Ether => {}
Coin::Erc20(token) => {
if !allowed_tokens.contains(&token) {
// Check this is an allowed token
if !allowed_erc20s.contains(&token) {
continue;
}
// Fetch the ERC20 transfer events necessary to verify this InInstruction has a matching
// transfer
if !erc20s.contains(&token) {
erc20s.insert(token);
erc20_transfer_logs.push(async move {
let filter = Erc20::transfer_filter(from_block, to_block, token, self.address);
self.provider.get_logs(&filter).await.map(|logs| (token, logs))
});
}
erc20_transactions.insert(transaction_hash);
erc20_in_instructions.push((transaction_hash, in_instruction))
}
}
}
/*
We check that for all InInstructions for ERC20s emitted, a corresponding transfer
occurred.
// Collect the ERC20 transfer logs
let erc20_transfer_logs = {
let mut collected = HashMap::with_capacity(erc20s.len());
while let Some(token_and_logs) = erc20_transfer_logs.next().await {
let (token, logs) = token_and_logs?;
collected.insert(token, logs);
}
collected
};
We don't do this for ETH as it'd require tracing the transaction, which is non-trivial.
It also isn't necessary as all of this is solely defense in depth.
*/
let mut justified = false;
// These logs are returned from `top_level_transfers_unordered` and we don't require any
// ordering of them
for log in erc20_transfer_logs[&token].get(&transaction_hash).unwrap_or(&vec![]) {
let log_index = log_index(log)?;
/*
For each transaction, it may have a top-level ERC20 transfer. That top-level transfer won't
be the transfer caused by the call to `inInstruction`, so we shouldn't consider it
justification for this `InInstruction` event.
Fetch all top-level transfers here so we can ignore them.
*/
let mut erc20_top_level_transfers = FuturesUnordered::new();
let mut transaction_transfer_logs = HashMap::new();
for transaction in erc20_transactions {
// Filter to the logs for this specific transaction
let logs = erc20_transfer_logs
.values()
.flat_map(|logs_per_token| logs_per_token.iter())
.filter_map(|log| {
let log_transaction_hash = log.transaction_hash.ok_or_else(|| {
TransportErrorKind::Custom(
"log didn't have its transaction hash set".to_string().into(),
)
});
match log_transaction_hash {
Ok(log_transaction_hash) => {
if log_transaction_hash == transaction {
Some(Ok(log))
} else {
None
}
// Ensure we didn't already use this transfer to justify a distinct InInstruction
if justifying_erc20_transfer_logs.contains(&log_index) {
continue;
}
// Check if this log is from the token we expected to be transferred
if log.address() != Address::from(in_instruction.coin) {
continue;
}
// Check if this is a transfer log
if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) {
continue;
}
let Ok(transfer) = Transfer::decode_log(&log.inner.clone(), true) else { continue };
// Check if this aligns with the InInstruction
if (transfer.from == in_instruction.from) &&
(transfer.to == self.address) &&
(transfer.value == in_instruction.amount)
{
justifying_erc20_transfer_logs.insert(log_index);
justified = true;
break;
}
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<_>, _>>()?;
// Find the top-level transfer
erc20_top_level_transfers.push(Erc20::top_level_transfer(
&self.provider,
transaction,
logs.clone(),
));
// Keep the transaction-indexed logs for the actual justifying
transaction_transfer_logs.insert(transaction, logs);
}
/*
In order to prevent a single transfer from being used to justify multiple distinct
InInstructions, we insert the transfer's log index into this HashSet.
*/
let mut already_used_to_justify = HashSet::new();
// Collect the top-level transfers
while let Some(erc20_top_level_transfer) = erc20_top_level_transfers.next().await {
let erc20_top_level_transfer = erc20_top_level_transfer?;
// If this transaction had a top-level transfer...
if let Some(erc20_top_level_transfer) = erc20_top_level_transfer {
// Mark this log index as used so it isn't used again
already_used_to_justify.insert(erc20_top_level_transfer.id.index_within_block);
}
}
// Now, for each ERC20 InInstruction, find a justifying transfer log
for (transaction_hash, in_instruction) in erc20_in_instructions {
let mut justified = false;
for log in &transaction_transfer_logs[&transaction_hash] {
let log_index = log.log_index.ok_or_else(|| {
TransportErrorKind::Custom(
"log in transaction receipt didn't have its log index set".to_string().into(),
)
})?;
// Ensure we didn't already use this transfer to check a distinct InInstruction event
if already_used_to_justify.contains(&log_index) {
continue;
if !justified {
// This is an exploit, a non-conforming ERC20, or an invalid connection
Err(TransportErrorKind::Custom(
"ERC20 InInstruction with no matching transfer log".to_string().into(),
))?;
}
}
// Check if this log is from the token we expected to be transferred
if log.address() != Address::from(in_instruction.coin) {
continue;
}
// Check if this is a transfer log
if log.topics().first() != Some(&Transfer::SIGNATURE_HASH) {
continue;
}
let Ok(transfer) = Transfer::decode_log(&log.inner.clone(), true) else { continue };
// Check if this aligns with the InInstruction
if (transfer.from == in_instruction.from) &&
(transfer.to == self.address) &&
(transfer.value == in_instruction.amount)
{
already_used_to_justify.insert(log_index);
justified = true;
break;
}
}
if !justified {
// This is an exploit, a non-conforming ERC20, or an invalid connection
Err(TransportErrorKind::Custom(
"ERC20 InInstruction with no matching transfer log".to_string().into(),
))?;
}
in_instructions.push(in_instruction);
}
@@ -666,7 +623,7 @@ impl Router {
Ok(in_instructions)
}
/// Fetch the executed actions from this block.
/// Fetch the executed actions for the specified range of blocks.
pub async fn executed(
&self,
from_block: u64,