Replace reqwest with hyper in monero-serai

Ensures a connection pool isn't used behind-the-scenes, as necessitated by
authenticated connections.
This commit is contained in:
Luke Parker
2023-10-27 16:37:58 -04:00
parent 87fdc8ce35
commit 052ef39a25
6 changed files with 177 additions and 93 deletions

3
Cargo.lock generated
View File

@@ -4884,6 +4884,8 @@ dependencies = [
"group", "group",
"hex", "hex",
"hex-literal", "hex-literal",
"hyper",
"hyper-rustls",
"modular-frost", "modular-frost",
"monero-generators", "monero-generators",
"multiexp", "multiexp",
@@ -4892,7 +4894,6 @@ dependencies = [
"rand_chacha", "rand_chacha",
"rand_core", "rand_core",
"rand_distr", "rand_distr",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"sha3", "sha3",

View File

@@ -54,12 +54,11 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] }
base58-monero = { version = "2", default-features = false, features = ["check"] } base58-monero = { version = "2", default-features = false, features = ["check"] }
# Used for the provided RPC # Used for the provided HTTP RPC
digest_auth = { version = "0.3", optional = true } digest_auth = { version = "0.3", optional = true }
reqwest = { version = "0.11", features = ["json"], optional = true } hyper = { version = "0.14", default-features = false, features = ["http1", "tcp", "client", "backports", "deprecated"], optional = true }
hyper-rustls = { version = "0.24", default-features = false, features = ["http1", "native-tokio"], optional = true }
# Used for the binaries tokio = { version = "1", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
[build-dependencies] [build-dependencies]
dalek-ff-group = { path = "../../crypto/dalek-ff-group", version = "0.4", default-features = false } dalek-ff-group = { path = "../../crypto/dalek-ff-group", version = "0.4", default-features = false }
@@ -99,9 +98,9 @@ std = [
"base58-monero/std", "base58-monero/std",
] ]
http_rpc = ["digest_auth", "reqwest"] http-rpc = ["digest_auth", "hyper", "hyper-rustls", "tokio/time", "tokio/rt"]
multisig = ["transcript", "frost", "dleq", "std"] multisig = ["transcript", "frost", "dleq", "std"]
binaries = ["tokio"] binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"]
experimental = [] experimental = []
default = ["std", "http_rpc"] default = ["std", "http-rpc"]

View File

@@ -1,22 +1,26 @@
use async_trait::async_trait; use async_trait::async_trait;
use digest_auth::AuthContext; use digest_auth::AuthContext;
use reqwest::Client; use hyper::{header::HeaderValue, Request, service::Service, client::connect::HttpConnector, Client};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use crate::rpc::{RpcError, RpcConnection, Rpc}; use crate::rpc::{RpcError, RpcConnection, Rpc};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum Authentication { enum Authentication {
// If unauthenticated, reuse a single client // If unauthenticated, reuse a single client
Unauthenticated(Client), Unauthenticated(Client<HttpsConnector<HttpConnector>>),
// If authenticated, don't reuse clients so that each connection makes its own connection // If authenticated, don't reuse clients so that each connection makes its own connection
// This ensures that if a nonce is requested, another caller doesn't make a request invalidating // This ensures that if a nonce is requested, another caller doesn't make a request invalidating
// it // it
// We could acquire a mutex over the client, yet creating a new client is preferred for the // We could acquire a mutex over the client, yet creating a new client is preferred for the
// possibility of parallelism // possibility of parallelism
Authenticated(String, String), Authenticated(HttpsConnector<HttpConnector>, String, String),
} }
/// An HTTP(S) transport for the RPC.
///
/// Requires tokio.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct HttpRpc { pub struct HttpRpc {
authentication: Authentication, authentication: Authentication,
@@ -29,12 +33,15 @@ impl HttpRpc {
/// A daemon requiring authentication can be used via including the username and password in the /// A daemon requiring authentication can be used via including the username and password in the
/// URL. /// URL.
pub fn new(mut url: String) -> Result<Rpc<HttpRpc>, RpcError> { pub fn new(mut url: String) -> Result<Rpc<HttpRpc>, RpcError> {
let https_builder =
HttpsConnectorBuilder::new().with_native_roots().https_or_http().enable_http1().build();
let authentication = if url.contains('@') { let authentication = if url.contains('@') {
// Parse out the username and password // Parse out the username and password
let url_clone = url; let url_clone = url;
let split_url = url_clone.split('@').collect::<Vec<_>>(); let split_url = url_clone.split('@').collect::<Vec<_>>();
if split_url.len() != 2 { if split_url.len() != 2 {
Err(RpcError::InvalidNode)?; Err(RpcError::ConnectionError)?;
} }
let mut userpass = split_url[0]; let mut userpass = split_url[0];
url = split_url[1].to_string(); url = split_url[1].to_string();
@@ -43,7 +50,7 @@ impl HttpRpc {
if userpass.contains("://") { if userpass.contains("://") {
let split_userpass = userpass.split("://").collect::<Vec<_>>(); let split_userpass = userpass.split("://").collect::<Vec<_>>();
if split_userpass.len() != 2 { if split_userpass.len() != 2 {
Err(RpcError::InvalidNode)?; Err(RpcError::ConnectionError)?;
} }
url = split_userpass[0].to_string() + "://" + &url; url = split_userpass[0].to_string() + "://" + &url;
userpass = split_userpass[1]; userpass = split_userpass[1];
@@ -51,61 +58,136 @@ impl HttpRpc {
let split_userpass = userpass.split(':').collect::<Vec<_>>(); let split_userpass = userpass.split(':').collect::<Vec<_>>();
if split_userpass.len() != 2 { if split_userpass.len() != 2 {
Err(RpcError::InvalidNode)?; Err(RpcError::ConnectionError)?;
} }
Authentication::Authenticated(split_userpass[0].to_string(), split_userpass[1].to_string()) Authentication::Authenticated(
https_builder,
split_userpass[0].to_string(),
split_userpass[1].to_string(),
)
} else { } else {
Authentication::Unauthenticated(Client::new()) Authentication::Unauthenticated(Client::builder().build(https_builder))
}; };
Ok(Rpc(HttpRpc { authentication, url })) Ok(Rpc(HttpRpc { authentication, url }))
} }
} }
#[async_trait] impl HttpRpc {
impl RpcConnection for HttpRpc { async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> { let request = |uri| {
#[allow(unused_assignments)] // False positive Request::post(uri)
let mut client_storage = None; .header(hyper::header::HOST, {
let client = match &self.authentication { let mut host = self.url.clone();
Authentication::Unauthenticated(client) => client, if let Some(protocol_pos) = host.find("://") {
Authentication::Authenticated(_, _) => { host.drain(0 .. (protocol_pos + 3));
client_storage = Some(Client::new()); }
client_storage.as_ref().unwrap() host
})
.body(body.clone().into())
.unwrap()
};
let mut connection_task_handle = None;
let response = match &self.authentication {
Authentication::Unauthenticated(client) => client
.request(request(self.url.clone() + "/" + route))
.await
.map_err(|_| RpcError::ConnectionError)?,
Authentication::Authenticated(https_builder, user, pass) => {
let connection = https_builder
.clone()
.call(self.url.parse().map_err(|_| RpcError::ConnectionError)?)
.await
.map_err(|_| RpcError::ConnectionError)?;
let (mut requester, connection) = hyper::client::conn::http1::handshake(connection)
.await
.map_err(|_| RpcError::ConnectionError)?;
let connection_task = tokio::spawn(connection);
connection_task_handle = Some(connection_task.abort_handle());
let mut response = requester
.send_request(request("/".to_string() + route))
.await
.map_err(|_| RpcError::ConnectionError)?;
// Only provide authentication if this daemon actually expects it
if let Some(header) = response.headers().get("www-authenticate") {
let mut request = request("/".to_string() + route);
request.headers_mut().insert(
"Authorization",
HeaderValue::from_str(
&digest_auth::parse(
header
.to_str()
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?,
)
.map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))?
.respond(&AuthContext::new_post::<_, _, _, &[u8]>(
user,
pass,
"/".to_string() + route,
None,
))
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
.to_header_string(),
)
.unwrap(),
);
// Wait for the connection to be ready again
requester.ready().await.map_err(|_| RpcError::ConnectionError)?;
// Make the request with the response challenge
response =
requester.send_request(request).await.map_err(|_| RpcError::ConnectionError)?;
}
response
} }
}; };
let mut builder = client.post(self.url.clone() + "/" + route).body(body); /*
if let Authentication::Authenticated(user, pass) = &self.authentication { let length = usize::try_from(
let req = client.post(&self.url).send().await.map_err(|_| RpcError::InvalidNode)?; response
// Only provide authentication if this daemon actually expects it .headers()
if let Some(header) = req.headers().get("www-authenticate") { .get("content-length")
builder = builder.header( .ok_or(RpcError::InvalidNode("no content-length header"))?
"Authorization", .to_str()
digest_auth::parse(header.to_str().map_err(|_| RpcError::InvalidNode)?) .map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
.map_err(|_| RpcError::InvalidNode)? .parse::<u32>()
.respond(&AuthContext::new_post::<_, _, _, &[u8]>( .map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
user, )
pass, .unwrap();
"/".to_string() + route, // Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
None, // has to send 1 GB of data to cause a 1 GB allocation
)) let mut res = Vec::with_capacity(length.max(1024 * 1024));
.map_err(|_| RpcError::InvalidNode)? let mut body = response.into_body();
.to_header_string(), while res.len() < length {
); let Some(data) = body.data().await else { break };
} res.extend(data.map_err(|_| RpcError::ConnectionError)?.as_ref());
}
*/
let res = hyper::body::to_bytes(response.into_body())
.await
.map_err(|_| RpcError::ConnectionError)?
.to_vec();
if let Some(connection_task) = connection_task_handle {
// Clean up the connection task
connection_task.abort();
} }
Ok( Ok(res)
builder }
.send() }
.await
.map_err(|_| RpcError::ConnectionError)? #[async_trait]
.bytes() impl RpcConnection for HttpRpc {
.await async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
.map_err(|_| RpcError::ConnectionError)? // TODO: Make this timeout configurable
.slice(..) tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body))
.to_vec(), .await
) .map_err(|_| RpcError::ConnectionError)?
} }
} }

View File

@@ -22,9 +22,9 @@ use crate::{
wallet::{FeePriority, Fee}, wallet::{FeePriority, Fee},
}; };
#[cfg(feature = "http_rpc")] #[cfg(feature = "http-rpc")]
mod http; mod http;
#[cfg(feature = "http_rpc")] #[cfg(feature = "http-rpc")]
pub use http::*; pub use http::*;
// Number of blocks the fee estimate will be valid for // Number of blocks the fee estimate will be valid for
@@ -59,8 +59,8 @@ pub enum RpcError {
InternalError(&'static str), InternalError(&'static str),
#[cfg_attr(feature = "std", error("connection error"))] #[cfg_attr(feature = "std", error("connection error"))]
ConnectionError, ConnectionError,
#[cfg_attr(feature = "std", error("invalid node"))] #[cfg_attr(feature = "std", error("invalid node ({0})"))]
InvalidNode, InvalidNode(&'static str),
#[cfg_attr(feature = "std", error("unsupported protocol version ({0})"))] #[cfg_attr(feature = "std", error("unsupported protocol version ({0})"))]
UnsupportedProtocol(usize), UnsupportedProtocol(usize),
#[cfg_attr(feature = "std", error("transactions not found"))] #[cfg_attr(feature = "std", error("transactions not found"))]
@@ -78,11 +78,11 @@ pub enum RpcError {
} }
fn rpc_hex(value: &str) -> Result<Vec<u8>, RpcError> { fn rpc_hex(value: &str) -> Result<Vec<u8>, RpcError> {
hex::decode(value).map_err(|_| RpcError::InvalidNode) hex::decode(value).map_err(|_| RpcError::InvalidNode("expected hex wasn't hex"))
} }
fn hash_hex(hash: &str) -> Result<[u8; 32], RpcError> { fn hash_hex(hash: &str) -> Result<[u8; 32], RpcError> {
rpc_hex(hash)?.try_into().map_err(|_| RpcError::InvalidNode) rpc_hex(hash)?.try_into().map_err(|_| RpcError::InvalidNode("hash wasn't 32-bytes"))
} }
fn rpc_point(point: &str) -> Result<EdwardsPoint, RpcError> { fn rpc_point(point: &str) -> Result<EdwardsPoint, RpcError> {
@@ -145,9 +145,9 @@ impl<R: RpcConnection> Rpc<R> {
) )
.await?, .await?,
) )
.map_err(|_| RpcError::InvalidNode)?, .map_err(|_| RpcError::InvalidNode("response wasn't utf-8"))?,
) )
.map_err(|_| RpcError::InvalidNode) .map_err(|_| RpcError::InvalidNode("response wasn't json"))
} }
/// Perform a JSON-RPC call with the specified method with the provided parameters /// Perform a JSON-RPC call with the specified method with the provided parameters
@@ -256,7 +256,7 @@ impl<R: RpcConnection> Rpc<R> {
// This does run a few keccak256 hashes, which is pointless if the node is trusted // This does run a few keccak256 hashes, which is pointless if the node is trusted
// In exchange, this provides resilience against invalid/malicious nodes // In exchange, this provides resilience against invalid/malicious nodes
if tx.hash() != hashes[i] { if tx.hash() != hashes[i] {
Err(RpcError::InvalidNode)?; Err(RpcError::InvalidNode("replied with transaction wasn't the requested transaction"))?;
} }
Ok(tx) Ok(tx)
@@ -282,7 +282,7 @@ impl<R: RpcConnection> Rpc<R> {
let header: BlockHeaderByHeightResponse = let header: BlockHeaderByHeightResponse =
self.json_rpc_call("get_block_header_by_height", Some(json!({ "height": number }))).await?; self.json_rpc_call("get_block_header_by_height", Some(json!({ "height": number }))).await?;
rpc_hex(&header.block_header.hash)?.try_into().map_err(|_| RpcError::InvalidNode) hash_hex(&header.block_header.hash)
} }
/// Get a block from the node by its hash. /// Get a block from the node by its hash.
@@ -296,10 +296,10 @@ impl<R: RpcConnection> Rpc<R> {
let res: BlockResponse = let res: BlockResponse =
self.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await?; self.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await?;
let block = let block = Block::read::<&[u8]>(&mut rpc_hex(&res.blob)?.as_ref())
Block::read::<&[u8]>(&mut rpc_hex(&res.blob)?.as_ref()).map_err(|_| RpcError::InvalidNode)?; .map_err(|_| RpcError::InvalidNode("invalid block"))?;
if block.hash() != hash { if block.hash() != hash {
Err(RpcError::InvalidNode)?; Err(RpcError::InvalidNode("different block than requested (hash)"))?;
} }
Ok(block) Ok(block)
} }
@@ -313,10 +313,12 @@ impl<R: RpcConnection> Rpc<R> {
if usize::try_from(*actual).unwrap() == number { if usize::try_from(*actual).unwrap() == number {
Ok(block) Ok(block)
} else { } else {
Err(RpcError::InvalidNode) Err(RpcError::InvalidNode("different block than requested (number)"))
} }
} }
_ => Err(RpcError::InvalidNode), _ => {
Err(RpcError::InvalidNode("block's miner_tx didn't have an input of kind Input::Gen"))
}
} }
} }
e => e, e => e,
@@ -496,7 +498,7 @@ impl<R: RpcConnection> Rpc<R> {
read_object(&mut indexes) read_object(&mut indexes)
})() })()
.map_err(|_| RpcError::InvalidNode) .map_err(|_| RpcError::InvalidNode("invalid binary response"))
} }
/// Get the output distribution, from the specified height to the specified height (both /// Get the output distribution, from the specified height to the specified height (both
@@ -569,11 +571,7 @@ impl<R: RpcConnection> Rpc<R> {
let txs = self let txs = self
.get_transactions( .get_transactions(
&outs &outs.outs.iter().map(|out| hash_hex(&out.txid)).collect::<Result<Vec<_>, _>>()?,
.outs
.iter()
.map(|out| rpc_hex(&out.txid)?.try_into().map_err(|_| RpcError::InvalidNode))
.collect::<Result<Vec<_>, _>>()?,
) )
.await?; .await?;
@@ -589,10 +587,7 @@ impl<R: RpcConnection> Rpc<R> {
// invalid keys may honestly exist on the blockchain // invalid keys may honestly exist on the blockchain
// Only a recent hard fork checked output keys were valid points // Only a recent hard fork checked output keys were valid points
let Some(key) = CompressedEdwardsY( let Some(key) = CompressedEdwardsY(
hex::decode(&out.key) rpc_hex(&out.key)?.try_into().map_err(|_| RpcError::InvalidNode("non-32-byte point"))?,
.map_err(|_| RpcError::InvalidNode)?
.try_into()
.map_err(|_| RpcError::InvalidNode)?,
) )
.decompress() else { .decompress() else {
return Ok(None); return Ok(None);
@@ -736,7 +731,7 @@ impl<R: RpcConnection> Rpc<R> {
let mut blocks = Vec::with_capacity(block_strs.len()); let mut blocks = Vec::with_capacity(block_strs.len());
for block in block_strs { for block in block_strs {
blocks.push(rpc_hex(&block)?.try_into().map_err(|_| RpcError::InvalidNode)?); blocks.push(hash_hex(&block)?);
} }
Ok(blocks) Ok(blocks)
} }

View File

@@ -233,7 +233,9 @@ impl SpendableOutput {
.get_o_indexes(self.output.absolute.tx) .get_o_indexes(self.output.absolute.tx)
.await? .await?
.get(usize::from(self.output.absolute.o)) .get(usize::from(self.output.absolute.o))
.ok_or(RpcError::InvalidNode)?; .ok_or(RpcError::InvalidNode(
"node returned output indexes didn't include an index for this output",
))?;
Ok(()) Ok(())
} }

View File

@@ -181,6 +181,13 @@ impl PartialEq for Monero {
} }
impl Eq for Monero {} impl Eq for Monero {}
fn map_rpc_err(err: RpcError) -> NetworkError {
if let RpcError::InvalidNode(reason) = &err {
log::error!("Monero RpcError::InvalidNode({reason})");
}
NetworkError::ConnectionError
}
impl Monero { impl Monero {
pub fn new(url: String) -> Monero { pub fn new(url: String) -> Monero {
Monero { rpc: HttpRpc::new(url).unwrap() } Monero { rpc: HttpRpc::new(url).unwrap() }
@@ -259,7 +266,7 @@ impl Monero {
} }
// Check a fork hasn't occurred which this processor hasn't been updated for // Check a fork hasn't occurred which this processor hasn't been updated for
assert_eq!(protocol, self.rpc.get_protocol().await.map_err(|_| NetworkError::ConnectionError)?); assert_eq!(protocol, self.rpc.get_protocol().await.map_err(map_rpc_err)?);
let spendable_outputs = inputs.iter().map(|input| input.0.clone()).collect::<Vec<_>>(); let spendable_outputs = inputs.iter().map(|input| input.0.clone()).collect::<Vec<_>>();
@@ -277,7 +284,7 @@ impl Monero {
&spendable_outputs, &spendable_outputs,
) )
.await .await
.map_err(|_| NetworkError::ConnectionError)?; .map_err(map_rpc_err)?;
let inputs = spendable_outputs.into_iter().zip(decoys).collect::<Vec<_>>(); let inputs = spendable_outputs.into_iter().zip(decoys).collect::<Vec<_>>();
@@ -348,7 +355,7 @@ impl Monero {
} }
TransactionError::RpcError(e) => { TransactionError::RpcError(e) => {
log::error!("RpcError when preparing transaction: {e:?}"); log::error!("RpcError when preparing transaction: {e:?}");
Err(NetworkError::ConnectionError) Err(map_rpc_err(e))
} }
}, },
} }
@@ -421,18 +428,16 @@ impl Network for Monero {
async fn get_latest_block_number(&self) -> Result<usize, NetworkError> { async fn get_latest_block_number(&self) -> Result<usize, NetworkError> {
// Monero defines height as chain length, so subtract 1 for block number // Monero defines height as chain length, so subtract 1 for block number
Ok(self.rpc.get_height().await.map_err(|_| NetworkError::ConnectionError)? - 1) Ok(self.rpc.get_height().await.map_err(map_rpc_err)? - 1)
} }
async fn get_block(&self, number: usize) -> Result<Self::Block, NetworkError> { async fn get_block(&self, number: usize) -> Result<Self::Block, NetworkError> {
Ok( Ok(
self self
.rpc .rpc
.get_block( .get_block(self.rpc.get_block_hash(number).await.map_err(map_rpc_err)?)
self.rpc.get_block_hash(number).await.map_err(|_| NetworkError::ConnectionError)?,
)
.await .await
.map_err(|_| NetworkError::ConnectionError)?, .map_err(map_rpc_err)?,
) )
} }
@@ -602,7 +607,7 @@ impl Network for Monero {
} }
async fn get_transaction(&self, id: &[u8; 32]) -> Result<Transaction, NetworkError> { async fn get_transaction(&self, id: &[u8; 32]) -> Result<Transaction, NetworkError> {
self.rpc.get_transaction(*id).await.map_err(|_| NetworkError::ConnectionError) self.rpc.get_transaction(*id).await.map_err(map_rpc_err)
} }
fn confirm_completion(&self, eventuality: &Eventuality, tx: &Transaction) -> bool { fn confirm_completion(&self, eventuality: &Eventuality, tx: &Transaction) -> bool {