Skip to content

Commit

Permalink
Upgrade and shrink direct dependencies
Browse files Browse the repository at this point in the history
This is mostly getting rid of the pre-1.0 hyper version, and using only
one http client library.
  • Loading branch information
tomaszklak committed Dec 4, 2024
1 parent f7e52c0 commit 79bdde2
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 295 deletions.
22 changes: 7 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,24 @@ version = "0.12.1"
all-features = true

[dependencies]
attohttpc = {version = "0.16", default-features = false}
bytes = {version = "1", optional = true}
futures = {version = "0.3", optional = true}
http = {version = "0.2", optional = true}
log = "0.4"
rand = "0.8"
reqwest = { version = "0.12.9", features = ["blocking"] }
thiserror = "2.0.4"
tokio = {version = "1", optional = true, features = ["net"]}
url = "2"
xmltree = "0.10"

[dependencies.hyper]
default-features = false
features = ["client", "http1", "http2", "runtime"]
optional = true
version = "0.14"
xmltree = "0.11"

[dev-dependencies]
http-body-util = "0.1.2"
hyper-new = { package = "hyper", version = "1", features = ["server", "http1"] }
hyper-util = { version = "0.1.10", features = ["tokio"] }
http-body-util = "0.1"
hyper = { package = "hyper", version = "1", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio"] }
simplelog = "0.9"
test-log = "0.2"
tokio = {version = "1", features = ["full"]}

[features]
aio = ["futures", "tokio", "hyper", "bytes", "http"]
aio = ["tokio"]
default = []

[[example]]
Expand Down
75 changes: 27 additions & 48 deletions src/aio/search.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::future::Future;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;

use futures::prelude::*;
use hyper::client::Client;
// use futures::prelude::*;
use tokio::net::UdpSocket;
use tokio::time::timeout;

use crate::aio::Gateway;
use crate::common::{messages, parsing, SearchOptions};
use crate::errors::SearchError;
use crate::search::validate_url;

const MAX_RESPONSE_SIZE: usize = 1500;

Expand Down Expand Up @@ -78,7 +79,7 @@ pub async fn search_gateway(options: SearchOptions) -> Result<Gateway, SearchErr
control_schema,
};

let gateway_url = hyper::Uri::from_str(&format!("{gateway}"))?;
let gateway_url = reqwest::Url::from_str(&format!("{gateway}"))?;

validate_url(addr.ip(), &gateway_url)?;

Expand All @@ -102,16 +103,15 @@ async fn send_search_request(socket: &mut UdpSocket, addr: SocketAddr) -> Result
addr,
socket.local_addr()
);
socket
Ok(socket
.send_to(messages::SEARCH_REQUEST.as_bytes(), &addr)
.map_ok(|_| ())
.map_err(SearchError::from)
.await
.map(|_| ())?)
}

async fn receive_search_response(socket: &mut UdpSocket) -> Result<(Vec<u8>, SocketAddr), SearchError> {
let mut buff = [0u8; MAX_RESPONSE_SIZE];
let (n, from) = socket.recv_from(&mut buff).map_err(SearchError::from).await?;
let (n, from) = socket.recv_from(&mut buff).await?;
debug!("received broadcast response from: {}", from);
Ok((buff[..n].to_vec(), from))
}
Expand All @@ -130,60 +130,40 @@ fn handle_broadcast_resp(from: &SocketAddr, data: &[u8]) -> Result<(SocketAddr,
}

async fn get_control_urls(addr: &SocketAddr, path: &str) -> Result<(String, String), SearchError> {
let uri = match format!("http://{}{}", addr, path).parse() {
Ok(uri) => uri,
Err(err) => return Err(SearchError::from(err)),
};
let url: reqwest::Url = format!("http://{}{}", addr, path).parse()?;

debug!("requesting control url from: {}", uri);
let client = Client::new();
let resp = hyper::body::to_bytes(client.get(uri).await?.into_body())
.map_err(SearchError::from)
.await?;
debug!("requesting control url from: {:?}", url);
let client = reqwest::Client::new();
let resp = client.get(url).send().await?;

debug!("handling control response from: {}", addr);
let c = std::io::Cursor::new(&resp);
parsing::parse_control_urls(c)
let body = resp.bytes().await?;
parsing::parse_control_urls(body.as_ref())
}

async fn get_control_schemas(
addr: &SocketAddr,
control_schema_url: &str,
) -> Result<HashMap<String, Vec<String>>, SearchError> {
let uri: hyper::Uri = match format!("http://{}{}", addr, control_schema_url).parse() {
Ok(uri) => uri,
Err(err) => return Err(SearchError::from(err)),
};
let url: reqwest::Url = format!("http://{}{}", addr, control_schema_url).parse()?;

validate_url(addr.ip(), &uri)?;
validate_url(addr.ip(), &url)?;

debug!("requesting control schema from: {}", uri);
let client = Client::new();
let resp = hyper::body::to_bytes(client.get(uri).await?.into_body())
.map_err(SearchError::from)
.await?;
debug!("requesting control schema from: {}", url);
let client = reqwest::Client::new();
let resp = client.get(url).send().await?;

debug!("handling schema response from: {}", addr);
let c = std::io::Cursor::new(&resp);
parsing::parse_schemas(c)
}

fn validate_url(src_ip: IpAddr, url: &hyper::Uri) -> Result<(), SearchError> {
match url.host() {
Some(url_host) if url_host != src_ip.to_string() => Err(SearchError::SpoofedUrl {
src_ip,
url_host: url_host.to_owned(),
}),
None => Err(SearchError::UriMissingHost(url.clone())),
_ => Ok(()),
}
let body = resp.bytes().await?;
parsing::parse_schemas(body.as_ref())
}

#[cfg(test)]
mod tests {
use super::*;
use http_body_util::Full;
use hyper_new::{body::Bytes, service::service_fn, Request, Response};
use hyper::{body::Bytes, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use std::convert::Infallible;
use std::{
Expand Down Expand Up @@ -303,14 +283,13 @@ mod tests {
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);

let hello_fn =
move |r: Request<hyper_new::body::Incoming>| -> Result<Response<Full<Bytes>>, Infallible> {
println!("Request: {r:?}");
Ok(Response::new(Full::new(Bytes::from(resp.clone()))))
};
let hello_fn = move |r: Request<hyper::body::Incoming>| -> Result<Response<Full<Bytes>>, Infallible> {
println!("Request: {r:?}");
Ok(Response::new(Full::new(Bytes::from(resp.clone()))))
};

// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = hyper_new::server::conn::http1::Builder::new()
if let Err(err) = hyper::server::conn::http1::Builder::new()
// `service_fn` converts our function in a `Service`
.serve_connection(io, service_fn(|r| async { hello_fn(r) }))
.await
Expand Down
22 changes: 8 additions & 14 deletions src/aio/soap.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use hyper::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
Body, Client, Request,
};

use crate::errors::RequestError;
use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};

#[derive(Clone, Debug)]
pub struct Action(String);
Expand All @@ -17,18 +13,16 @@ impl Action {
const HEADER_NAME: &str = "SOAPAction";

pub async fn send_async(url: &str, action: Action, body: &str) -> Result<String, RequestError> {
let client = Client::new();
let client = reqwest::Client::new();

let req = Request::builder()
.uri(url)
.method("POST")
let resp = client
.post(url)
.header(HEADER_NAME, action.0)
.header(CONTENT_TYPE, "text/xml")
.header(CONTENT_LENGTH, body.len() as u64)
.body(Body::from(body.to_string()))?;
.body(body.to_owned())
.send()
.await?;

let resp = client.request(req).await?;
let body = hyper::body::to_bytes(resp.into_body()).await?;
let string = String::from_utf8(body.to_vec())?;
Ok(string)
Ok(resp.text().await?)
}
Loading

0 comments on commit 79bdde2

Please sign in to comment.