Skip to content

Commit

Permalink
Merge branch 'feat/hyper-v1'
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jul 15, 2024
2 parents 24b39c7 + 6bb871d commit e5cce3d
Show file tree
Hide file tree
Showing 16 changed files with 516 additions and 300 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.
- inserter: `Inserter::pending()` to get stats about still being inserted data.
- inserter: `Inserter::force_commit()` to commit and insert immediately.
- mock: impl `Default` instance for `Mock`.

### Changed
- **BREAKING** http: `HttpClient` API is changed due to moving to hyper v1.
- **BREAKING** inserter: move under the `inserter` feature.
- **BREAKING** inserter: there is no default limits anymore.
- **BREAKING** inserter: `Inserter::write` is synchronous now.
- **BREAKING** inserter: rename `entries` to `rows`.
- **BREAKING** drop the `wa-37420` feature.
- **BREAKING** remove deprecated items.
- **BREAKING** mock: `provide()`, `watch()` and `watch_only_events()` now accept iterators instead of streams.
- inserter: improve performance of time measurements by using `quanta`.
- inserter: improve performance if the time limit isn't used.
- derive: move to syn v2.
Expand Down
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ thiserror = "1.0.16"
serde = "1.0.106"
bytes = "1.5.0"
tokio = { version = "1.0.1", features = ["rt", "macros"] }
hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream"] }
hyper-tls = { version = "0.5.0", optional = true }
http-body-util = "0.1.2"
hyper = "1.4"
hyper-util = { version = "0.1.6", features = ["client-legacy", "http1"] }
hyper-tls = { version = "0.6.0", optional = true }
url = "2.1.1"
futures = "0.3.5"
futures-channel = "0.3.30"
static_assertions = "1.1"
sealed = "0.5"
sha-1 = { version = "0.10", optional = true }
Expand All @@ -70,7 +73,7 @@ quanta = { version = "0.12", optional = true }
criterion = "0.5.0"
serde = { version = "1.0.106", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full", "test-util"] }
hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream", "server"] }
hyper = { version = "1.1", features = ["server"] }
serde_bytes = "0.11.4"
serde_repr = "0.1.7"
uuid = { version = "1", features = ["v4"] }
Expand Down
57 changes: 57 additions & 0 deletions benches/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::{convert::Infallible, future::Future, net::SocketAddr, thread};

use bytes::Bytes;
use futures::stream::StreamExt;
use http_body_util::BodyExt;
use hyper::{
body::{Body, Incoming},
server::conn,
service, Request, Response,
};
use hyper_util::rt::{TokioIo, TokioTimer};
use tokio::{net::TcpListener, runtime};

pub struct ServerHandle;

pub fn start_server<S, F, B>(addr: SocketAddr, serve: S) -> ServerHandle
where
S: Fn(Request<Incoming>) -> F + Send + Sync + 'static,
F: Future<Output = Response<B>> + Send,
B: Body<Data = Bytes, Error = Infallible> + Send + 'static,
{
let serving = async move {
let listener = TcpListener::bind(addr).await.unwrap();

loop {
let (stream, _) = listener.accept().await.unwrap();

let service =
service::service_fn(|request| async { Ok::<_, Infallible>(serve(request).await) });

// SELECT benchmark doesn't read the whole body, so ignore possible errors.
let _ = conn::http1::Builder::new()
.timer(TokioTimer::new())
.serve_connection(TokioIo::new(stream), service)
.await;
}
};

thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(serving);
});

ServerHandle
}

pub async fn skip_incoming(request: Request<Incoming>) {
let mut body = request.into_body().into_data_stream();

// Read and skip all frames.
while let Some(result) = body.next().await {
result.unwrap();
}
}
39 changes: 8 additions & 31 deletions benches/insert.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,19 @@
use std::{future::Future, mem, time::Duration};

use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use http_body_util::Empty;
use hyper::{body::Incoming, Request, Response};
use serde::Serialize;
use tokio::{runtime::Runtime, time::Instant};

use clickhouse::{error::Result, Client, Compression, Row};

mod server {
use std::{convert::Infallible, net::SocketAddr, thread};
mod common;

use futures::stream::StreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use tokio::runtime;

async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let mut body = req.into_body();

while let Some(res) = body.next().await {
res.unwrap();
}

Ok(Response::new(Body::empty()))
}

pub fn start(addr: SocketAddr) {
thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let make_svc =
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(handle)) });
Server::bind(&addr).serve(make_svc).await.unwrap();
});
});
}
async fn serve(request: Request<Incoming>) -> Response<Empty<Bytes>> {
common::skip_incoming(request).await;
Response::new(Empty::new())
}

#[derive(Row, Serialize)]
Expand Down Expand Up @@ -102,7 +79,7 @@ where
F: Future<Output = Result<Duration>>,
{
let addr = format!("127.0.0.1:{port}").parse().unwrap();
server::start(addr);
let _server = common::start_server(addr, serve);

let mut group = c.benchmark_group(name);
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
Expand Down
46 changes: 17 additions & 29 deletions benches/select.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,35 @@
use std::convert::Infallible;
use std::mem;

use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::stream::{self, StreamExt as _};
use http_body_util::StreamBody;
use hyper::{
body::{Body, Frame, Incoming},
Request, Response,
};
use serde::Deserialize;
use tokio::{runtime::Runtime, time::Instant};

use clickhouse::{error::Result, Client, Compression, Row};

mod server {
use std::{convert::Infallible, net::SocketAddr, thread};
mod common;

use bytes::Bytes;
use futures::stream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{body, Body, Request, Response, Server};
use tokio::runtime;
async fn serve(
request: Request<Incoming>,
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
common::skip_incoming(request).await;

async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let _ = body::aggregate(req.into_body()).await;
let chunk = Bytes::from_static(&[15; 128 * 1024]);
let stream = stream::repeat(Ok::<Bytes, &'static str>(chunk));
let body = Body::wrap_stream(stream);
let chunk = Bytes::from_static(&[15; 128 * 1024]);
let stream = stream::repeat(chunk).map(|chunk| Ok(Frame::data(chunk)));

Ok(Response::new(body))
}

pub fn start(addr: SocketAddr) {
thread::spawn(move || {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let make_svc =
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(handle)) });
Server::bind(&addr).serve(make_svc).await.unwrap();
});
});
}
Response::new(StreamBody::new(stream))
}

fn select(c: &mut Criterion) {
let addr = "127.0.0.1:6543".parse().unwrap();
server::start(addr);
let _server = common::start_server(addr, serve);

#[allow(dead_code)]
#[derive(Debug, Row, Deserialize)]
Expand Down
9 changes: 3 additions & 6 deletions examples/mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use clickhouse::{error::Result, test, Client, Row};
use futures::stream;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -56,7 +55,7 @@ async fn main() {
assert!(recording.query().await.contains("CREATE TABLE"));

// How to test SELECT.
mock.add(test::handlers::provide(stream::iter(list.clone())));
mock.add(test::handlers::provide(list.clone()));
let rows = make_select(&client).await.unwrap();
assert_eq!(rows, list);

Expand All @@ -76,17 +75,15 @@ async fn main() {
{
// Check `CREATE LIVE VIEW` (for `watch(query)` case only).
let recording = mock.add(test::handlers::record_ddl());
mock.add(test::handlers::watch(stream::iter(
list.into_iter().map(|row| (42, row)),
)));
mock.add(test::handlers::watch(list.into_iter().map(|row| (42, row))));
let (version, row) = make_watch(&client).await.unwrap();
assert!(recording.query().await.contains("CREATE LIVE VIEW"));
assert_eq!(version, 42);
assert_eq!(row, SomeRow { no: 1 });

// `EVENTS`.
let recording = mock.add(test::handlers::record_ddl());
mock.add(test::handlers::watch_only_events(stream::iter(3..5)));
mock.add(test::handlers::watch_only_events(3..5));
let version = make_watch_only_events(&client).await.unwrap();
assert!(recording.query().await.contains("CREATE LIVE VIEW"));
assert_eq!(version, 3);
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ impl From<hyper::Error> for Error {
}
}

impl From<hyper_util::client::legacy::Error> for Error {
fn from(error: hyper_util::client::legacy::Error) -> Self {
Self::Network(Box::new(error))
}
}

impl ser::Error for Error {
fn custom<T: fmt::Display>(msg: T) -> Self {
Self::Custom(msg.to_string())
Expand Down
22 changes: 15 additions & 7 deletions src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
use hyper::{
client::{connect::Connect, ResponseFuture},
Body, Request,
};
use hyper::Request;
use hyper_util::client::legacy::{connect::Connect, Client, ResponseFuture};
use sealed::sealed;

use crate::request_body::RequestBody;

/// A trait for underlying HTTP client.
///
/// Firstly, now it is implemented only for `hyper_util::client::legacy::Client`,
/// it's impossible to use another HTTP client.
///
/// Secondly, although it's stable in terms of semver, it will be changed in the future
/// (e.g. to support more runtimes, not only tokio). Thus, prefer to open a feature
/// request instead of implementing this trait manually.
#[sealed]
pub trait HttpClient: Send + Sync + 'static {
fn _request(&self, req: Request<Body>) -> ResponseFuture;
fn request(&self, req: Request<RequestBody>) -> ResponseFuture;
}

#[sealed]
impl<C> HttpClient for hyper::Client<C>
impl<C> HttpClient for Client<C, RequestBody>
where
C: Connect + Clone + Send + Sync + 'static,
{
fn _request(&self, req: Request<Body>) -> ResponseFuture {
fn request(&self, req: Request<RequestBody>) -> ResponseFuture {
self.request(req)
}
}
16 changes: 9 additions & 7 deletions src/insert.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration};

use bytes::{Bytes, BytesMut};
use hyper::{self, body, Body, Request};
use hyper::{self, Request};
use serde::Serialize;
use tokio::{
task::JoinHandle,
Expand All @@ -11,6 +11,7 @@ use url::Url;

use crate::{
error::{Error, Result},
request_body::{ChunkSender, RequestBody},
response::Response,
row::{self, Row},
rowbinary, Client, Compression,
Expand All @@ -28,7 +29,7 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extr
#[must_use]
pub struct Insert<T> {
buffer: BytesMut,
sender: Option<body::Sender>,
sender: Option<ChunkSender>,
#[cfg(feature = "lz4")]
compression: Compression,
send_timeout: Option<Duration>,
Expand Down Expand Up @@ -95,13 +96,14 @@ impl<T> Insert<T> {
builder = builder.header("X-ClickHouse-Key", password);
}

let (sender, body) = Body::channel();
let (sender, body) = RequestBody::chunked();

let request = builder
.body(body)
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = client.client._request(request);
let future = client.http.request(request);
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
let handle =
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });

Expand Down Expand Up @@ -225,9 +227,9 @@ impl<T> Insert<T> {

let sender = self.sender.as_mut().unwrap(); // checked above

let is_timed_out = match timeout!(self, send_timeout, sender.send_data(chunk)) {
Some(Ok(())) => return Ok(()),
Some(Err(_)) => false, // an actual error will be returned from `wait_handle`
let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
Some(true) => return Ok(()),
Some(false) => false, // an actual error will be returned from `wait_handle`
None => true,
};

Expand Down
Loading

0 comments on commit e5cce3d

Please sign in to comment.