diff --git a/Cargo.lock b/Cargo.lock index 7e51ec8a..9afcdcc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8334,6 +8334,7 @@ dependencies = [ "alloy-rpc-client", "alloy-rpc-types-eth", "alloy-simple-request-transport", + "alloy-transport", "borsh", "ciphersuite", "const-hex", diff --git a/processor/ethereum/Cargo.toml b/processor/ethereum/Cargo.toml index 9a3b264c..649e3fb8 100644 --- a/processor/ethereum/Cargo.toml +++ b/processor/ethereum/Cargo.toml @@ -35,6 +35,7 @@ alloy-rlp = { version = "0.3", default-features = false } alloy-consensus = { version = "0.3", default-features = false } alloy-rpc-types-eth = { version = "0.3", default-features = false } +alloy-transport = { version = "0.3", default-features = false } alloy-simple-request-transport = { path = "../../networks/ethereum/alloy-simple-request-transport", default-features = false } alloy-rpc-client = { version = "0.3", default-features = false } alloy-provider = { version = "0.3", default-features = false } diff --git a/processor/ethereum/src/main.rs b/processor/ethereum/src/main.rs index 06c0bc98..0ebf0f59 100644 --- a/processor/ethereum/src/main.rs +++ b/processor/ethereum/src/main.rs @@ -30,14 +30,13 @@ use publisher::TransactionPublisher; #[tokio::main] async fn main() { let db = bin::init(); - let feed = { - let provider = Arc::new(RootProvider::new( - ClientBuilder::default().transport(SimpleRequest::new(bin::url()), true), - )); - Rpc { provider } - }; + + let provider = Arc::new(RootProvider::new( + ClientBuilder::default().transport(SimpleRequest::new(bin::url()), true), + )); + let chain_id = loop { - match feed.provider.get_chain_id().await { + match provider.get_chain_id().await { Ok(chain_id) => break U256::try_from(chain_id).unwrap(), Err(e) => { log::error!("couldn't connect to the Ethereum node for the chain ID: {e:?}"); @@ -48,9 +47,9 @@ async fn main() { bin::main_loop::<_, KeyGenParams, _>( db, - feed.clone(), + Rpc { provider: provider.clone() }, Scheduler::new(SmartContract { chain_id }), - TransactionPublisher::new({ + TransactionPublisher::new(provider, { let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") .expect("ethereum relayer hostname wasn't specified") .to_string(); diff --git a/processor/ethereum/src/primitives/output.rs b/processor/ethereum/src/primitives/output.rs index 843f22f6..0f327921 100644 --- a/processor/ethereum/src/primitives/output.rs +++ b/processor/ethereum/src/primitives/output.rs @@ -1,6 +1,6 @@ use std::io; -use ciphersuite::{Ciphersuite, Secp256k1}; +use ciphersuite::{group::GroupEncoding, Ciphersuite, Secp256k1}; use alloy_core::primitives::U256; @@ -59,7 +59,10 @@ impl AsMut<[u8]> for OutputId { } #[derive(Clone, PartialEq, Eq, Debug)] -pub(crate) struct Output(pub(crate) EthereumInInstruction); +pub(crate) struct Output { + pub(crate) key: ::G, + pub(crate) instruction: EthereumInInstruction, +} impl ReceivedOutput<::G, Address> for Output { type Id = OutputId; type TransactionId = [u8; 32]; @@ -71,40 +74,43 @@ impl ReceivedOutput<::G, Address> for Output { fn id(&self) -> Self::Id { let mut id = [0; 40]; - id[.. 32].copy_from_slice(&self.0.id.0); - id[32 ..].copy_from_slice(&self.0.id.1.to_le_bytes()); + id[.. 32].copy_from_slice(&self.instruction.id.0); + id[32 ..].copy_from_slice(&self.instruction.id.1.to_le_bytes()); OutputId(id) } fn transaction_id(&self) -> Self::TransactionId { - self.0.id.0 + self.instruction.id.0 } fn key(&self) -> ::G { - todo!("TODO") + self.key } fn presumed_origin(&self) -> Option
{ - Some(Address::from(self.0.from)) + Some(Address::from(self.instruction.from)) } fn balance(&self) -> Balance { - let coin = coin_to_serai_coin(&self.0.coin).unwrap_or_else(|| { + let coin = coin_to_serai_coin(&self.instruction.coin).unwrap_or_else(|| { panic!( "mapping coin from an EthereumInInstruction with coin {}, which we don't handle.", "this never should have been yielded" ) }); - Balance { coin, amount: amount_to_serai_amount(coin, self.0.amount) } + Balance { coin, amount: amount_to_serai_amount(coin, self.instruction.amount) } } fn data(&self) -> &[u8] { - &self.0.data + &self.instruction.data } fn write(&self, writer: &mut W) -> io::Result<()> { - self.0.write(writer) + writer.write_all(self.key.to_bytes().as_ref())?; + self.instruction.write(writer) } fn read(reader: &mut R) -> io::Result { - EthereumInInstruction::read(reader).map(Self) + let key = Secp256k1::read_G(reader)?; + let instruction = EthereumInInstruction::read(reader)?; + Ok(Self { key, instruction }) } } diff --git a/processor/ethereum/src/publisher.rs b/processor/ethereum/src/publisher.rs index 1874e556..cc9c1f5f 100644 --- a/processor/ethereum/src/publisher.rs +++ b/processor/ethereum/src/publisher.rs @@ -1,34 +1,68 @@ use core::future::Future; +use std::sync::Arc; -use crate::transaction::Transaction; +use alloy_transport::{TransportErrorKind, RpcError}; +use alloy_simple_request_transport::SimpleRequest; +use alloy_provider::RootProvider; + +use tokio::sync::{RwLockReadGuard, RwLock}; + +use ethereum_schnorr::PublicKey; +use ethereum_router::{OutInstructions, Router}; + +use crate::transaction::{Action, Transaction}; #[derive(Clone)] pub(crate) struct TransactionPublisher { + initial_serai_key: PublicKey, + rpc: Arc>, + router: Arc>>, relayer_url: String, } impl TransactionPublisher { - pub(crate) fn new(relayer_url: String) -> Self { - Self { relayer_url } + pub(crate) fn new(rpc: Arc>, relayer_url: String) -> Self { + Self { initial_serai_key: todo!("TODO"), rpc, router: Arc::new(RwLock::new(None)), relayer_url } + } + + // This will always return Ok(Some(_)) or Err(_), never Ok(None) + async fn router(&self) -> Result>, RpcError> { + let router = self.router.read().await; + + // If the router is None, find it on-chain + if router.is_none() { + drop(router); + let mut router = self.router.write().await; + // Check again if it's None in case a different task already did this + if router.is_none() { + let Some(router_actual) = Router::new(self.rpc.clone(), &self.initial_serai_key).await? else { + Err(TransportErrorKind::Custom("publishing transaction yet couldn't find router on chain. was our node reset?".to_string().into()))? + }; + *router = Some(router_actual); + } + return Ok(router.downgrade()); + } + + Ok(router) } } impl signers::TransactionPublisher for TransactionPublisher { - type EphemeralError = (); + type EphemeralError = RpcError; fn publish( &self, tx: Transaction, ) -> impl Send + Future> { - // Convert from an Action (an internal representation of a signable event) to a TxLegacy - /* TODO - match tx.0 { - Action::SetKey { chain_id: _, nonce: _, key } => self.router.update_serai_key(key, tx.1), - Action::Batch { chain_id: _, nonce: _, outs } => self.router.execute(outs, tx.1), - } - */ - async move { + // Convert from an Action (an internal representation of a signable event) to a TxLegacy + let router = self.router().await?; + let router = router.as_ref().unwrap(); + let tx = match tx.0 { + Action::SetKey { chain_id: _, nonce: _, key } => router.update_serai_key(&key, &tx.1), + Action::Batch { chain_id: _, nonce: _, outs } => router.execute(OutInstructions::from(outs.as_ref()), &tx.1), + }; + /* use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, diff --git a/processor/ethereum/src/scheduler.rs b/processor/ethereum/src/scheduler.rs index ca636b5b..6683eeac 100644 --- a/processor/ethereum/src/scheduler.rs +++ b/processor/ethereum/src/scheduler.rs @@ -68,6 +68,7 @@ impl smart_contract_scheduler::SmartContract for SmartContract { // TODO: Per-batch gas limit // TODO: Create several batches + // TODO: Handle fees let action = Action::Batch { chain_id: self.chain_id, nonce, outs }; vec![(action.clone(), action.eventuality())]