diff --git a/Cargo.lock b/Cargo.lock index 09d8e704..d1bb6cd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4884,6 +4884,8 @@ dependencies = [ "group", "hex", "hex-literal", + "hyper", + "hyper-rustls", "modular-frost", "monero-generators", "multiexp", @@ -4892,7 +4894,6 @@ dependencies = [ "rand_chacha", "rand_core", "rand_distr", - "reqwest", "serde", "serde_json", "sha3", diff --git a/coins/monero/Cargo.toml b/coins/monero/Cargo.toml index 982a939e..910b152c 100644 --- a/coins/monero/Cargo.toml +++ b/coins/monero/Cargo.toml @@ -54,12 +54,11 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] } base58-monero = { version = "2", default-features = false, features = ["check"] } -# Used for the provided RPC +# Used for the provided HTTP RPC digest_auth = { version = "0.3", optional = true } -reqwest = { version = "0.11", features = ["json"], optional = true } - -# Used for the binaries -tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } +hyper = { version = "0.14", default-features = false, features = ["http1", "tcp", "client", "backports", "deprecated"], optional = true } +hyper-rustls = { version = "0.24", default-features = false, features = ["http1", "native-tokio"], optional = true } +tokio = { version = "1", default-features = false, optional = true } [build-dependencies] dalek-ff-group = { path = "../../crypto/dalek-ff-group", version = "0.4", default-features = false } @@ -99,9 +98,9 @@ std = [ "base58-monero/std", ] -http_rpc = ["digest_auth", "reqwest"] +http-rpc = ["digest_auth", "hyper", "hyper-rustls", "tokio/time", "tokio/rt"] multisig = ["transcript", "frost", "dleq", "std"] -binaries = ["tokio"] +binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"] experimental = [] -default = ["std", "http_rpc"] +default = ["std", "http-rpc"] diff --git a/coins/monero/src/rpc/http.rs b/coins/monero/src/rpc/http.rs index 8c24762a..b19c1da6 100644 --- a/coins/monero/src/rpc/http.rs +++ b/coins/monero/src/rpc/http.rs @@ -1,22 +1,26 @@ use async_trait::async_trait; use digest_auth::AuthContext; -use reqwest::Client; +use hyper::{header::HeaderValue, Request, service::Service, client::connect::HttpConnector, Client}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use crate::rpc::{RpcError, RpcConnection, Rpc}; #[derive(Clone, Debug)] enum Authentication { // If unauthenticated, reuse a single client - Unauthenticated(Client), + Unauthenticated(Client>), // If authenticated, don't reuse clients so that each connection makes its own connection // This ensures that if a nonce is requested, another caller doesn't make a request invalidating // it // We could acquire a mutex over the client, yet creating a new client is preferred for the // possibility of parallelism - Authenticated(String, String), + Authenticated(HttpsConnector, String, String), } +/// An HTTP(S) transport for the RPC. +/// +/// Requires tokio. #[derive(Clone, Debug)] pub struct HttpRpc { authentication: Authentication, @@ -29,12 +33,15 @@ impl HttpRpc { /// A daemon requiring authentication can be used via including the username and password in the /// URL. pub fn new(mut url: String) -> Result, RpcError> { + let https_builder = + HttpsConnectorBuilder::new().with_native_roots().https_or_http().enable_http1().build(); + let authentication = if url.contains('@') { // Parse out the username and password let url_clone = url; let split_url = url_clone.split('@').collect::>(); if split_url.len() != 2 { - Err(RpcError::InvalidNode)?; + Err(RpcError::ConnectionError)?; } let mut userpass = split_url[0]; url = split_url[1].to_string(); @@ -43,7 +50,7 @@ impl HttpRpc { if userpass.contains("://") { let split_userpass = userpass.split("://").collect::>(); if split_userpass.len() != 2 { - Err(RpcError::InvalidNode)?; + Err(RpcError::ConnectionError)?; } url = split_userpass[0].to_string() + "://" + &url; userpass = split_userpass[1]; @@ -51,61 +58,136 @@ impl HttpRpc { let split_userpass = userpass.split(':').collect::>(); if split_userpass.len() != 2 { - Err(RpcError::InvalidNode)?; + Err(RpcError::ConnectionError)?; } - Authentication::Authenticated(split_userpass[0].to_string(), split_userpass[1].to_string()) + Authentication::Authenticated( + https_builder, + split_userpass[0].to_string(), + split_userpass[1].to_string(), + ) } else { - Authentication::Unauthenticated(Client::new()) + Authentication::Unauthenticated(Client::builder().build(https_builder)) }; Ok(Rpc(HttpRpc { authentication, url })) } } -#[async_trait] -impl RpcConnection for HttpRpc { - async fn post(&self, route: &str, body: Vec) -> Result, RpcError> { - #[allow(unused_assignments)] // False positive - let mut client_storage = None; - let client = match &self.authentication { - Authentication::Unauthenticated(client) => client, - Authentication::Authenticated(_, _) => { - client_storage = Some(Client::new()); - client_storage.as_ref().unwrap() +impl HttpRpc { + async fn inner_post(&self, route: &str, body: Vec) -> Result, RpcError> { + let request = |uri| { + Request::post(uri) + .header(hyper::header::HOST, { + let mut host = self.url.clone(); + if let Some(protocol_pos) = host.find("://") { + host.drain(0 .. (protocol_pos + 3)); + } + host + }) + .body(body.clone().into()) + .unwrap() + }; + + let mut connection_task_handle = None; + let response = match &self.authentication { + Authentication::Unauthenticated(client) => client + .request(request(self.url.clone() + "/" + route)) + .await + .map_err(|_| RpcError::ConnectionError)?, + Authentication::Authenticated(https_builder, user, pass) => { + let connection = https_builder + .clone() + .call(self.url.parse().map_err(|_| RpcError::ConnectionError)?) + .await + .map_err(|_| RpcError::ConnectionError)?; + let (mut requester, connection) = hyper::client::conn::http1::handshake(connection) + .await + .map_err(|_| RpcError::ConnectionError)?; + let connection_task = tokio::spawn(connection); + connection_task_handle = Some(connection_task.abort_handle()); + + let mut response = requester + .send_request(request("/".to_string() + route)) + .await + .map_err(|_| RpcError::ConnectionError)?; + // Only provide authentication if this daemon actually expects it + if let Some(header) = response.headers().get("www-authenticate") { + let mut request = request("/".to_string() + route); + request.headers_mut().insert( + "Authorization", + HeaderValue::from_str( + &digest_auth::parse( + header + .to_str() + .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?, + ) + .map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))? + .respond(&AuthContext::new_post::<_, _, _, &[u8]>( + user, + pass, + "/".to_string() + route, + None, + )) + .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? + .to_header_string(), + ) + .unwrap(), + ); + + // Wait for the connection to be ready again + requester.ready().await.map_err(|_| RpcError::ConnectionError)?; + + // Make the request with the response challenge + response = + requester.send_request(request).await.map_err(|_| RpcError::ConnectionError)?; + } + + response } }; - let mut builder = client.post(self.url.clone() + "/" + route).body(body); - if let Authentication::Authenticated(user, pass) = &self.authentication { - let req = client.post(&self.url).send().await.map_err(|_| RpcError::InvalidNode)?; - // Only provide authentication if this daemon actually expects it - if let Some(header) = req.headers().get("www-authenticate") { - builder = builder.header( - "Authorization", - digest_auth::parse(header.to_str().map_err(|_| RpcError::InvalidNode)?) - .map_err(|_| RpcError::InvalidNode)? - .respond(&AuthContext::new_post::<_, _, _, &[u8]>( - user, - pass, - "/".to_string() + route, - None, - )) - .map_err(|_| RpcError::InvalidNode)? - .to_header_string(), - ); - } + /* + let length = usize::try_from( + response + .headers() + .get("content-length") + .ok_or(RpcError::InvalidNode("no content-length header"))? + .to_str() + .map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))? + .parse::() + .map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?, + ) + .unwrap(); + // Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually + // has to send 1 GB of data to cause a 1 GB allocation + let mut res = Vec::with_capacity(length.max(1024 * 1024)); + let mut body = response.into_body(); + while res.len() < length { + let Some(data) = body.data().await else { break }; + res.extend(data.map_err(|_| RpcError::ConnectionError)?.as_ref()); + } + */ + + let res = hyper::body::to_bytes(response.into_body()) + .await + .map_err(|_| RpcError::ConnectionError)? + .to_vec(); + + if let Some(connection_task) = connection_task_handle { + // Clean up the connection task + connection_task.abort(); } - Ok( - builder - .send() - .await - .map_err(|_| RpcError::ConnectionError)? - .bytes() - .await - .map_err(|_| RpcError::ConnectionError)? - .slice(..) - .to_vec(), - ) + Ok(res) + } +} + +#[async_trait] +impl RpcConnection for HttpRpc { + async fn post(&self, route: &str, body: Vec) -> Result, RpcError> { + // TODO: Make this timeout configurable + tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body)) + .await + .map_err(|_| RpcError::ConnectionError)? } } diff --git a/coins/monero/src/rpc/mod.rs b/coins/monero/src/rpc/mod.rs index cfe4e4b8..0a220984 100644 --- a/coins/monero/src/rpc/mod.rs +++ b/coins/monero/src/rpc/mod.rs @@ -22,9 +22,9 @@ use crate::{ wallet::{FeePriority, Fee}, }; -#[cfg(feature = "http_rpc")] +#[cfg(feature = "http-rpc")] mod http; -#[cfg(feature = "http_rpc")] +#[cfg(feature = "http-rpc")] pub use http::*; // Number of blocks the fee estimate will be valid for @@ -59,8 +59,8 @@ pub enum RpcError { InternalError(&'static str), #[cfg_attr(feature = "std", error("connection error"))] ConnectionError, - #[cfg_attr(feature = "std", error("invalid node"))] - InvalidNode, + #[cfg_attr(feature = "std", error("invalid node ({0})"))] + InvalidNode(&'static str), #[cfg_attr(feature = "std", error("unsupported protocol version ({0})"))] UnsupportedProtocol(usize), #[cfg_attr(feature = "std", error("transactions not found"))] @@ -78,11 +78,11 @@ pub enum RpcError { } fn rpc_hex(value: &str) -> Result, RpcError> { - hex::decode(value).map_err(|_| RpcError::InvalidNode) + hex::decode(value).map_err(|_| RpcError::InvalidNode("expected hex wasn't hex")) } fn hash_hex(hash: &str) -> Result<[u8; 32], RpcError> { - rpc_hex(hash)?.try_into().map_err(|_| RpcError::InvalidNode) + rpc_hex(hash)?.try_into().map_err(|_| RpcError::InvalidNode("hash wasn't 32-bytes")) } fn rpc_point(point: &str) -> Result { @@ -145,9 +145,9 @@ impl Rpc { ) .await?, ) - .map_err(|_| RpcError::InvalidNode)?, + .map_err(|_| RpcError::InvalidNode("response wasn't utf-8"))?, ) - .map_err(|_| RpcError::InvalidNode) + .map_err(|_| RpcError::InvalidNode("response wasn't json")) } /// Perform a JSON-RPC call with the specified method with the provided parameters @@ -256,7 +256,7 @@ impl Rpc { // This does run a few keccak256 hashes, which is pointless if the node is trusted // In exchange, this provides resilience against invalid/malicious nodes if tx.hash() != hashes[i] { - Err(RpcError::InvalidNode)?; + Err(RpcError::InvalidNode("replied with transaction wasn't the requested transaction"))?; } Ok(tx) @@ -282,7 +282,7 @@ impl Rpc { let header: BlockHeaderByHeightResponse = self.json_rpc_call("get_block_header_by_height", Some(json!({ "height": number }))).await?; - rpc_hex(&header.block_header.hash)?.try_into().map_err(|_| RpcError::InvalidNode) + hash_hex(&header.block_header.hash) } /// Get a block from the node by its hash. @@ -296,10 +296,10 @@ impl Rpc { let res: BlockResponse = self.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await?; - let block = - Block::read::<&[u8]>(&mut rpc_hex(&res.blob)?.as_ref()).map_err(|_| RpcError::InvalidNode)?; + let block = Block::read::<&[u8]>(&mut rpc_hex(&res.blob)?.as_ref()) + .map_err(|_| RpcError::InvalidNode("invalid block"))?; if block.hash() != hash { - Err(RpcError::InvalidNode)?; + Err(RpcError::InvalidNode("different block than requested (hash)"))?; } Ok(block) } @@ -313,10 +313,12 @@ impl Rpc { if usize::try_from(*actual).unwrap() == number { Ok(block) } else { - Err(RpcError::InvalidNode) + Err(RpcError::InvalidNode("different block than requested (number)")) } } - _ => Err(RpcError::InvalidNode), + _ => { + Err(RpcError::InvalidNode("block's miner_tx didn't have an input of kind Input::Gen")) + } } } e => e, @@ -496,7 +498,7 @@ impl Rpc { read_object(&mut indexes) })() - .map_err(|_| RpcError::InvalidNode) + .map_err(|_| RpcError::InvalidNode("invalid binary response")) } /// Get the output distribution, from the specified height to the specified height (both @@ -569,11 +571,7 @@ impl Rpc { let txs = self .get_transactions( - &outs - .outs - .iter() - .map(|out| rpc_hex(&out.txid)?.try_into().map_err(|_| RpcError::InvalidNode)) - .collect::, _>>()?, + &outs.outs.iter().map(|out| hash_hex(&out.txid)).collect::, _>>()?, ) .await?; @@ -589,10 +587,7 @@ impl Rpc { // invalid keys may honestly exist on the blockchain // Only a recent hard fork checked output keys were valid points let Some(key) = CompressedEdwardsY( - hex::decode(&out.key) - .map_err(|_| RpcError::InvalidNode)? - .try_into() - .map_err(|_| RpcError::InvalidNode)?, + rpc_hex(&out.key)?.try_into().map_err(|_| RpcError::InvalidNode("non-32-byte point"))?, ) .decompress() else { return Ok(None); @@ -736,7 +731,7 @@ impl Rpc { let mut blocks = Vec::with_capacity(block_strs.len()); for block in block_strs { - blocks.push(rpc_hex(&block)?.try_into().map_err(|_| RpcError::InvalidNode)?); + blocks.push(hash_hex(&block)?); } Ok(blocks) } diff --git a/coins/monero/src/wallet/scan.rs b/coins/monero/src/wallet/scan.rs index 89954b98..aa2bd5c5 100644 --- a/coins/monero/src/wallet/scan.rs +++ b/coins/monero/src/wallet/scan.rs @@ -233,7 +233,9 @@ impl SpendableOutput { .get_o_indexes(self.output.absolute.tx) .await? .get(usize::from(self.output.absolute.o)) - .ok_or(RpcError::InvalidNode)?; + .ok_or(RpcError::InvalidNode( + "node returned output indexes didn't include an index for this output", + ))?; Ok(()) } diff --git a/processor/src/networks/monero.rs b/processor/src/networks/monero.rs index a89f9e0b..f983c524 100644 --- a/processor/src/networks/monero.rs +++ b/processor/src/networks/monero.rs @@ -181,6 +181,13 @@ impl PartialEq for Monero { } impl Eq for Monero {} +fn map_rpc_err(err: RpcError) -> NetworkError { + if let RpcError::InvalidNode(reason) = &err { + log::error!("Monero RpcError::InvalidNode({reason})"); + } + NetworkError::ConnectionError +} + impl Monero { pub fn new(url: String) -> Monero { Monero { rpc: HttpRpc::new(url).unwrap() } @@ -259,7 +266,7 @@ impl Monero { } // Check a fork hasn't occurred which this processor hasn't been updated for - assert_eq!(protocol, self.rpc.get_protocol().await.map_err(|_| NetworkError::ConnectionError)?); + assert_eq!(protocol, self.rpc.get_protocol().await.map_err(map_rpc_err)?); let spendable_outputs = inputs.iter().map(|input| input.0.clone()).collect::>(); @@ -277,7 +284,7 @@ impl Monero { &spendable_outputs, ) .await - .map_err(|_| NetworkError::ConnectionError)?; + .map_err(map_rpc_err)?; let inputs = spendable_outputs.into_iter().zip(decoys).collect::>(); @@ -348,7 +355,7 @@ impl Monero { } TransactionError::RpcError(e) => { log::error!("RpcError when preparing transaction: {e:?}"); - Err(NetworkError::ConnectionError) + Err(map_rpc_err(e)) } }, } @@ -421,18 +428,16 @@ impl Network for Monero { async fn get_latest_block_number(&self) -> Result { // Monero defines height as chain length, so subtract 1 for block number - Ok(self.rpc.get_height().await.map_err(|_| NetworkError::ConnectionError)? - 1) + Ok(self.rpc.get_height().await.map_err(map_rpc_err)? - 1) } async fn get_block(&self, number: usize) -> Result { Ok( self .rpc - .get_block( - self.rpc.get_block_hash(number).await.map_err(|_| NetworkError::ConnectionError)?, - ) + .get_block(self.rpc.get_block_hash(number).await.map_err(map_rpc_err)?) .await - .map_err(|_| NetworkError::ConnectionError)?, + .map_err(map_rpc_err)?, ) } @@ -602,7 +607,7 @@ impl Network for Monero { } async fn get_transaction(&self, id: &[u8; 32]) -> Result { - self.rpc.get_transaction(*id).await.map_err(|_| NetworkError::ConnectionError) + self.rpc.get_transaction(*id).await.map_err(map_rpc_err) } fn confirm_completion(&self, eventuality: &Eventuality, tx: &Transaction) -> bool {