Ethereum relayer server

Causes send test to pass for the processor.
This commit is contained in:
Luke Parker
2024-05-22 18:50:11 -04:00
parent ac709b2945
commit 1d2beb3ee4
20 changed files with 416 additions and 44 deletions

View File

@@ -28,7 +28,7 @@ pub fn processor_instance(
network: NetworkId,
port: u32,
message_queue_key: <Ristretto as Ciphersuite>::F,
) -> TestBodySpecification {
) -> Vec<TestBodySpecification> {
let mut entropy = [0; 32];
OsRng.fill_bytes(&mut entropy);
@@ -41,7 +41,7 @@ pub fn processor_instance(
let image = format!("{network_str}-processor");
serai_docker_tests::build(image.clone());
TestBodySpecification::with_image(
let mut res = vec![TestBodySpecification::with_image(
Image::with_repository(format!("serai-dev-{image}")).pull_policy(PullPolicy::Never),
)
.replace_env(
@@ -55,10 +55,30 @@ pub fn processor_instance(
("RUST_LOG".to_string(), "serai_processor=trace,".to_string()),
]
.into(),
)
)];
if network == NetworkId::Ethereum {
serai_docker_tests::build("ethereum-relayer".to_string());
res.push(
TestBodySpecification::with_image(
Image::with_repository("serai-dev-ethereum-relayer".to_string())
.pull_policy(PullPolicy::Never),
)
.replace_env(
[
("DB_PATH".to_string(), "./ethereum-relayer-db".to_string()),
("RUST_LOG".to_string(), "serai_ethereum_relayer=trace,".to_string()),
]
.into(),
)
.set_publish_all_ports(true),
);
}
res
}
pub type Handles = (String, String, String);
pub type Handles = (String, String, String, String);
pub fn processor_stack(
network: NetworkId,
network_hostname_override: Option<String>,
@@ -68,7 +88,7 @@ pub fn processor_stack(
let (coord_key, message_queue_keys, message_queue_composition) =
serai_message_queue_tests::instance();
let processor_composition =
let mut processor_compositions =
processor_instance(network, network_rpc_port, message_queue_keys[&network]);
// Give every item in this stack a unique ID
@@ -84,7 +104,7 @@ pub fn processor_stack(
let mut compositions = vec![];
let mut handles = vec![];
for (name, composition) in [
(
Some((
match network {
NetworkId::Serai => unreachable!(),
NetworkId::Bitcoin => "bitcoin",
@@ -92,10 +112,14 @@ pub fn processor_stack(
NetworkId::Monero => "monero",
},
network_composition,
),
("message_queue", message_queue_composition),
("processor", processor_composition),
] {
)),
Some(("message_queue", message_queue_composition)),
Some(("processor", processor_compositions.remove(0))),
processor_compositions.pop().map(|composition| ("relayer", composition)),
]
.into_iter()
.flatten()
{
let handle = format!("processor-{name}-{unique_id}");
compositions.push(
composition.set_start_policy(StartPolicy::Strict).set_handle(handle.clone()).set_log_options(
@@ -113,14 +137,27 @@ pub fn processor_stack(
handles.push(handle);
}
let processor_composition = compositions.last_mut().unwrap();
let processor_composition = compositions.get_mut(2).unwrap();
processor_composition.inject_container_name(
network_hostname_override.unwrap_or_else(|| handles[0].clone()),
"NETWORK_RPC_HOSTNAME",
);
if let Some(hostname) = handles.get(3) {
processor_composition.inject_container_name(hostname, "ETHEREUM_RELAYER_HOSTNAME");
processor_composition.modify_env("ETHEREUM_RELAYER_PORT", "20830");
}
processor_composition.inject_container_name(handles[1].clone(), "MESSAGE_QUEUE_RPC");
((handles[0].clone(), handles[1].clone(), handles[2].clone()), coord_key, compositions)
(
(
handles[0].clone(),
handles[1].clone(),
handles[2].clone(),
handles.get(3).cloned().unwrap_or(String::new()),
),
coord_key,
compositions,
)
}
#[derive(serde::Deserialize, Debug)]
@@ -134,6 +171,7 @@ pub struct Coordinator {
message_queue_handle: String,
#[allow(unused)]
processor_handle: String,
relayer_handle: String,
next_send_id: u64,
next_recv_id: u64,
@@ -144,7 +182,7 @@ impl Coordinator {
pub fn new(
network: NetworkId,
ops: &DockerOperations,
handles: (String, String, String),
handles: Handles,
coord_key: <Ristretto as Ciphersuite>::F,
) -> Coordinator {
let rpc = ops.handle(&handles.1).host_port(2287).unwrap();
@@ -156,6 +194,7 @@ impl Coordinator {
network_handle: handles.0,
message_queue_handle: handles.1,
processor_handle: handles.2,
relayer_handle: handles.3,
next_send_id: 0,
next_recv_id: 0,
@@ -508,7 +547,7 @@ impl Coordinator {
}
}
pub async fn publish_transacton(&self, ops: &DockerOperations, tx: &[u8]) {
pub async fn publish_transaction(&self, ops: &DockerOperations, tx: &[u8]) {
let rpc_url = network_rpc(self.network, ops, &self.network_handle);
match self.network {
NetworkId::Bitcoin => {
@@ -545,6 +584,14 @@ impl Coordinator {
}
}
pub async fn publish_eventuality_completion(&self, ops: &DockerOperations, tx: &[u8]) {
match self.network {
NetworkId::Bitcoin | NetworkId::Monero => self.publish_transaction(ops, tx).await,
NetworkId::Ethereum => (),
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
}
}
pub async fn get_published_transaction(
&self,
ops: &DockerOperations,
@@ -575,14 +622,7 @@ impl Coordinator {
}
}
NetworkId::Ethereum => {
use ethereum_serai::alloy::{
consensus::{TxLegacy, Signed},
simple_request_transport::SimpleRequest,
rpc_client::ClientBuilder,
provider::{Provider, RootProvider},
network::Ethereum,
};
/*
let provider = RootProvider::<_, Ethereum>::new(
ClientBuilder::default().transport(SimpleRequest::new(rpc_url.clone()), true),
);
@@ -593,6 +633,43 @@ impl Coordinator {
let mut bytes = vec![];
tx.encode_with_signature_fields(&sig, &mut bytes);
Some(bytes)
*/
// This is being passed a signature. We need to check the relayer has a TX with this
// signature
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let (ip, port) = ops.handle(&self.relayer_handle).host_port(20831).unwrap();
let relayer_url = format!("{ip}:{port}");
let mut socket = TcpStream::connect(&relayer_url).await.unwrap();
// Iterate over every published command
for i in 1 .. u32::MAX {
socket.write_all(&i.to_le_bytes()).await.unwrap();
let mut recvd_len = [0; 4];
socket.read_exact(&mut recvd_len).await.unwrap();
if recvd_len == [0; 4] {
break;
}
let mut msg = vec![0; usize::try_from(u32::from_le_bytes(recvd_len)).unwrap()];
socket.read_exact(&mut msg).await.unwrap();
for start_pos in 0 .. msg.len() {
if (start_pos + tx.len()) > msg.len() {
break;
}
if &msg[start_pos .. (start_pos + tx.len())] == tx {
return Some(msg);
}
}
}
None
}
NetworkId::Monero => {
use monero_serai::rpc::HttpRpc;