diff --git a/networks/ethereum/relayer/README.md b/networks/ethereum/relayer/README.md index beed4b72..fc2d36fd 100644 --- a/networks/ethereum/relayer/README.md +++ b/networks/ethereum/relayer/README.md @@ -1,4 +1,4 @@ # Ethereum Transaction Relayer -This server collects Ethereum router commands to be published, offering an RPC -to fetch them. +This server collects Ethereum transactions to be published, offering an RPC to +fetch them. diff --git a/networks/ethereum/relayer/src/main.rs b/networks/ethereum/relayer/src/main.rs index 54593004..f5a7e0f9 100644 --- a/networks/ethereum/relayer/src/main.rs +++ b/networks/ethereum/relayer/src/main.rs @@ -40,8 +40,8 @@ async fn main() { db }; - // Start command recipience server - // This should not be publicly exposed + // Start transaction recipience server + // This MUST NOT be publicly exposed // TODO: Add auth tokio::spawn({ let db = db.clone(); @@ -58,25 +58,27 @@ async fn main() { let mut buf = vec![0; usize::try_from(msg_len).unwrap()]; let Ok(_) = socket.read_exact(&mut buf).await else { break }; - if buf.len() < 5 { + if buf.len() < (4 + 1) { break; } let nonce = u32::from_le_bytes(buf[.. 4].try_into().unwrap()); let mut txn = db.txn(); + // Save the transaction txn.put(nonce.to_le_bytes(), &buf[4 ..]); txn.commit(); let Ok(()) = socket.write_all(&[1]).await else { break }; - log::info!("received signed command #{nonce}"); + log::info!("received transaction to publish (nonce {nonce})"); } }); } } }); - // Start command fetch server + // Start transaction fetch server // 5132 ^ ((b'E' << 8) | b'R') + 1 + // TODO: JSON-RPC server which returns this as JSON? let server = TcpListener::bind("0.0.0.0:20831").await.unwrap(); loop { let (mut socket, _) = server.accept().await.unwrap(); @@ -84,16 +86,16 @@ async fn main() { tokio::spawn(async move { let db = db.clone(); loop { - // Nonce to get the router comamnd for + // Nonce to get the unsigned transaction for let mut buf = vec![0; 4]; let Ok(_) = socket.read_exact(&mut buf).await else { break }; - let command = db.get(&buf[.. 4]).unwrap_or(vec![]); - let Ok(()) = socket.write_all(&u32::try_from(command.len()).unwrap().to_le_bytes()).await + let transaction = db.get(&buf[.. 4]).unwrap_or(vec![]); + let Ok(()) = socket.write_all(&u32::try_from(transaction.len()).unwrap().to_le_bytes()).await else { break; }; - let Ok(()) = socket.write_all(&command).await else { break }; + let Ok(()) = socket.write_all(&transaction).await else { break }; } }); } diff --git a/processor/ethereum/src/publisher.rs b/processor/ethereum/src/publisher.rs index d133768b..03b1d24c 100644 --- a/processor/ethereum/src/publisher.rs +++ b/processor/ethereum/src/publisher.rs @@ -1,11 +1,17 @@ use core::future::Future; use std::sync::Arc; +use alloy_rlp::Encodable; + use alloy_transport::{TransportErrorKind, RpcError}; use alloy_simple_request_transport::SimpleRequest; use alloy_provider::RootProvider; -use tokio::sync::{RwLockReadGuard, RwLock}; +use tokio::{ + sync::{RwLockReadGuard, RwLock}, + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, +}; use ethereum_schnorr::PublicKey; use ethereum_router::{OutInstructions, Router}; @@ -62,9 +68,11 @@ impl signers::TransactionPublisher for TransactionPublisher { tx: Transaction, ) -> impl Send + Future> { async move { - // Convert from an Action (an internal representation of a signable event) to a TxLegacy let router = self.router().await?; let router = router.as_ref().unwrap(); + + let nonce = tx.0.nonce(); + // Convert from an Action (an internal representation of a signable event) to a TxLegacy let tx = match tx.0 { Action::SetKey { chain_id: _, nonce: _, key } => router.update_serai_key(&key, &tx.1), Action::Batch { chain_id: _, nonce: _, outs } => { @@ -72,40 +80,33 @@ impl signers::TransactionPublisher for TransactionPublisher { } }; - /* - use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, - }; - - let mut msg = vec![]; - match completion.command() { - RouterCommand::UpdateSeraiKey { nonce, .. } | RouterCommand::Execute { nonce, .. } => { - msg.extend(&u32::try_from(nonce).unwrap().to_le_bytes()); - } - } - completion.write(&mut msg).unwrap(); + // Nonce + let mut msg = nonce.to_le_bytes().to_vec(); + // Transaction + tx.encode(&mut msg); let Ok(mut socket) = TcpStream::connect(&self.relayer_url).await else { - log::warn!("couldn't connect to the relayer server"); - Err(NetworkError::ConnectionError)? + Err(TransportErrorKind::Custom( + "couldn't connect to the relayer server".to_string().into(), + ))? }; let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { - log::warn!("couldn't send the message's len to the relayer server"); - Err(NetworkError::ConnectionError)? + Err(TransportErrorKind::Custom( + "couldn't send the message's len to the relayer server".to_string().into(), + ))? }; let Ok(()) = socket.write_all(&msg).await else { - log::warn!("couldn't write the message to the relayer server"); - Err(NetworkError::ConnectionError)? + Err(TransportErrorKind::Custom( + "couldn't write the message to the relayer server".to_string().into(), + ))? }; if socket.read_u8().await.ok() != Some(1) { - log::warn!("didn't get the ack from the relayer server"); - Err(NetworkError::ConnectionError)?; + Err(TransportErrorKind::Custom( + "didn't get the ack from the relayer server".to_string().into(), + ))?; } Ok(()) - */ - todo!("TODO") } } }