Skip to content

Commit

Permalink
Merge branch 'master' into async_insert
Browse files Browse the repository at this point in the history
  • Loading branch information
magurotuna committed Nov 23, 2023
2 parents 465b1cf + 8c1cf9f commit 3e0ca8c
Show file tree
Hide file tree
Showing 28 changed files with 693 additions and 499 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
- run: cargo test
- run: cargo test --no-default-features
- run: cargo test --features uuid,time
- run: cargo test --all-features

services:
clickhouse:
Expand Down
35 changes: 34 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- derive: support `serde::skip_deserializing` ([#83]).
- the `quanta` feature, enabled by default.
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.

### Changed
- **BREAKING** inserter: `Inserter::write` is synchronous now.
- **BREAKING** inserter: rename `entries` to `rows`.
- **BREAKING** drop the `wa-37420` feature.
- **BREAKING** remove deprecated items.
- inserter: increase performance if the `quanta` feature is enabled.
- inserter: increase performance if the time limit isn't used.
- derive: move to syn v2.

### Fixed
- watch: support a new syntax.

[#83]: https://github.com/loyd/clickhouse.rs/pull/83

## [0.11.6] - 2023-09-27
### Fixed
- client: accept HTTPS URLs if the `tls` feature is enabled ([#58]).

[#58]: https://github.com/loyd/clickhouse.rs/issues/56

## [0.11.5] - 2023-06-12
### Changed
- inserter: start new insert only when the first row is provided ([#68], [#70]).

[#70]: https://github.com/loyd/clickhouse.rs/pull/70
[#68]: https://github.com/loyd/clickhouse.rs/pull/68

## [0.11.4] - 2023-05-14
### Added
Expand Down Expand Up @@ -241,7 +272,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Client::query()` for selecting from tables and DDL statements.

<!-- next-url -->
[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...HEAD
[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.6...HEAD
[0.11.6]: https://github.com/loyd/clickhouse.rs/compare/v0.11.5...v0.11.6
[0.11.5]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...v0.11.5
[0.11.4]: https://github.com/loyd/clickhouse.rs/compare/v0.11.3...v0.11.4
[0.11.3]: https://github.com/loyd/clickhouse.rs/compare/v0.11.2...v0.11.3
[0.11.2]: https://github.com/loyd/clickhouse.rs/compare/v0.11.1...v0.11.2
Expand Down
16 changes: 7 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "clickhouse"
version = "0.11.4"
version = "0.11.6"
description = "A typed client for ClickHouse with killer features"
keywords = ["clickhouse", "database", "driver", "tokio", "hyper"]
authors = ["Paul Loyd <[email protected]>"]
repository = "https://github.com/loyd/clickhouse.rs"
license = "MIT OR Apache-2.0"
readme = "README.md"
edition = "2018"
edition = "2021"
rust-version = "1.60"

[package.metadata.docs.rs]
Expand All @@ -34,17 +34,15 @@ required-features = ["test-util"]
debug = true

[features]
default = ["lz4", "tls"]
default = ["lz4", "tls", "quanta"]

test-util = ["hyper/server"]
watch = ["dep:sha-1", "dep:serde_json"]
lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
uuid = ["dep:uuid"]
time = ["dep:time"]
tls = ["dep:hyper-tls"]

# Temporary workaround for https://github.com/ClickHouse/ClickHouse/issues/37420
wa-37420 = []
quanta = ["dep:quanta"]

[dependencies]
clickhouse-derive = { version = "0.1.1", path = "derive" }
Expand All @@ -58,23 +56,23 @@ hyper-tls = { version = "0.5.0", optional = true }
url = "2.1.1"
futures = "0.3.5"
static_assertions = "1.1"
sealed = "0.4"
sealed = "0.5"
sha-1 = { version = "0.10", optional = true }
serde_json = { version = "1.0.68", optional = true }
lz4 = { version = "1.23.3", optional = true }
clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true }
uuid = { version = "1", optional = true }
time = { version = "0.3", optional = true }
bstr = { version = "1.2", default-features = false }
quanta = { version = "0.12", optional = true }

[dev-dependencies]
criterion = "0.4.0"
criterion = "0.5.0"
serde = { version = "1.0.106", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full", "test-util"] }
hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream", "server"] }
serde_bytes = "0.11.4"
serde_repr = "0.1.7"
uuid = { version = "1", features = ["v4"] }
function_name = "0.3"
time = { version = "0.3.17", features = ["macros", "rand"] }
rand = "0.8.5"
44 changes: 25 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,36 @@ A typed client for ClickHouse.

[crates-badge]: https://img.shields.io/crates/v/clickhouse.svg
[crates-url]: https://crates.io/crates/clickhouse
[docs-badge]: https://docs.rs/clickhouse/badge.svg
[docs-badge]: https://img.shields.io/docsrs/clickhouse
[docs-url]: https://docs.rs/clickhouse
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: https://github.com/loyd/clickhouse.rs/blob/master/LICENSE
[actions-badge]: https://github.com/loyd/clickhouse.rs/actions/workflows/ci.yml/badge.svg
[actions-url]: https://github.com/loyd/clickhouse.rs/actions/workflows/ci.yml

* Uses `serde` for encoding/decoding rows.
* Uses `RowBinary` encoding.
* Supports HTTP and HTTPS.
* Supports `serde` attributes: `skip_serializing`, `skip_deserializing`, `rename`.
* Uses `RowBinary` encoding over HTTP transport.
* There are plans to switch to `Native` over TCP.
* Supports TLS.
* Supports compression and decompression (LZ4 and LZ4HC).
* Provides API for selecting.
* Provides API for inserting.
* Provides API for infinite transactional (see below) inserting.
* Provides API for watching live views.
* Compression and decompression (LZ4).
* Provides mocks for unit testing.

Note: [ch2rs](https://github.com/loyd/ch2rs) is useful to generate a row type from ClickHouse.

## Usage

To use the crate, add this to your `Cargo.toml`:
```toml
[dependencies]
clickhouse = "0.11.4"
clickhouse = "0.11.6"

[dev-dependencies]
clickhouse = { version = "0.11.4", features = ["test-util"] }
clickhouse = { version = "0.11.6", features = ["test-util"] }
```

<details>
Expand All @@ -43,7 +48,7 @@ clickhouse = { version = "0.11.4", features = ["test-util"] }

</summary>

CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Enable `wa-37420` feature to solve this problem. Don't use it for newer versions.
CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Use version 0.11 of this crate and enable the `wa-37420` feature to solve this problem. Don't use it for newer versions.

</details>
<details>
Expand Down Expand Up @@ -126,7 +131,6 @@ insert.end().await?;
* If `end()` isn't called, the `INSERT` is aborted.
* Rows are being sent progressively to spread network load.
* ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size).
* [ch2rs](https://github.com/loyd/ch2rs) is useful to generate a row type from ClickHouse.

</details>
<details>
Expand Down Expand Up @@ -170,22 +174,24 @@ insert.end().await?;
```rust,ignore
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_entries(750_000)
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() }).await?;
inserter.write(&MyRow { no: 1, name: "bar".into() }).await?;
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.entries > 0 {
if stats.rows > 0 {
println!(
"{} entries ({} transactions) have been inserted",
stats.entries, stats.transactions,
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
```

* `Inserter` ends an active insert in `commit()` if thresholds (`max_entries`, `period`) are reached.
* `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached.
* The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters.
* `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits.
* All rows between `commit()` calls are inserted in the same `INSERT` statement.
* Do not forget to flush if you want to terminate inserting:
```rust,ignore
Expand Down Expand Up @@ -240,11 +246,11 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples).
## Feature Flags
* `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`.
* `tls` (enabled by default) — supports URLs with the `HTTPS` scheme.
* `quanta` (enabled by default) — uses the [quanta](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests).
* `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`.
* `watch` — enables `client.watch` functionality. See the corresponding section for details.
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate.
* `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate.
* `wa-37420` — implements a workaround for CH versions prior to v22.6. See the corresponding section for details.
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate.
* `time` — adds `serde::time` to work with [time](https://docs.rs/time) crate.

## Data Types
* `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them.
Expand All @@ -257,7 +263,7 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples).
<summary>Example</summary>

```rust,ignore
#[derive(Debug, Serialize, Deserialize)]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
Expand Down
91 changes: 60 additions & 31 deletions benches/insert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{mem, time::Duration};
use std::{future::Future, mem, time::Duration};

use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use serde::Serialize;
Expand Down Expand Up @@ -39,46 +39,67 @@ mod server {
}
}

fn insert(c: &mut Criterion) {
let addr = "127.0.0.1:6543".parse().unwrap();
server::start(addr);
/// Row type that the benchmark helpers serialize and write on every iteration.
///
/// Its in-memory size (`mem::size_of::<SomeRow>()`) is also what the benchmark
/// group reports as the per-iteration throughput in bytes.
#[derive(Row, Serialize)]
struct SomeRow {
a: u64,
b: i64,
c: i32,
d: u32,
}

async fn run_insert(client: Client, iters: u64) -> Result<Duration> {
let start = Instant::now();
let mut insert = client.insert("table")?;

#[derive(Row, Serialize)]
struct SomeRow {
a: u64,
b: i64,
c: i32,
d: u32,
for _ in 0..iters {
insert
.write(&black_box(SomeRow {
a: 42,
b: 42,
c: 42,
d: 42,
}))
.await?;
}

async fn run(client: Client, iters: u64) -> Result<Duration> {
let start = Instant::now();
let mut insert = client.insert("table")?;

for _ in 0..iters {
insert
.write(&black_box(SomeRow {
a: 42,
b: 42,
c: 42,
d: 42,
}))
.await?;
}
insert.end().await?;
Ok(start.elapsed())
}

insert.end().await?;
Ok(start.elapsed())
async fn run_inserter(client: Client, iters: u64) -> Result<Duration> {
let start = Instant::now();
let mut inserter = client.inserter("table")?.with_max_rows(iters);

for _ in 0..iters {
inserter.write(&black_box(SomeRow {
a: 42,
b: 42,
c: 42,
d: 42,
}))?;
inserter.commit().await?;
}

let mut group = c.benchmark_group("insert");
inserter.end().await?;
Ok(start.elapsed())
}

fn run<F>(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, u64) -> F)
where
F: Future<Output = Result<Duration>>,
{
let addr = format!("127.0.0.1:{port}").parse().unwrap();
server::start(addr);

let mut group = c.benchmark_group(name);
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
group.bench_function("no compression", |b| {
b.iter_custom(|iters| {
let rt = Runtime::new().unwrap();
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::None);
rt.block_on(run(client, iters)).unwrap()
rt.block_on((f)(client, iters)).unwrap()
})
});
#[cfg(feature = "lz4")]
Expand All @@ -88,7 +109,7 @@ fn insert(c: &mut Criterion) {
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::Lz4);
rt.block_on(run(client, iters)).unwrap()
rt.block_on((f)(client, iters)).unwrap()
})
});
#[cfg(feature = "lz4")]
Expand All @@ -98,11 +119,19 @@ fn insert(c: &mut Criterion) {
let client = Client::default()
.with_url(format!("http://{addr}"))
.with_compression(Compression::Lz4Hc(4));
rt.block_on(run(client, iters)).unwrap()
rt.block_on((f)(client, iters)).unwrap()
})
});
group.finish();
}

criterion_group!(benches, insert);
/// Criterion benchmark target for the plain `Insert` API.
///
/// Delegates to `run`, registering the benchmark group named "insert" and
/// driving `run_insert` against a local server bound to port 6543.
fn insert(c: &mut Criterion) {
run(c, "insert", 6543, run_insert);
}

/// Criterion benchmark target for the `Inserter` API.
///
/// Delegates to `run`, registering the benchmark group named "inserter" and
/// driving `run_inserter` against a local server bound to port 6544.
// NOTE(review): the port differs from the one used by `insert`, presumably so
// the two groups' servers do not collide — confirm against `server::start`.
fn inserter(c: &mut Criterion) {
run(c, "inserter", 6544, run_inserter);
}

criterion_group!(benches, insert, inserter);
criterion_main!(benches);
4 changes: 2 additions & 2 deletions derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ proc-macro = true

[dependencies]
proc-macro2 = "1.0"
syn = "1.0"
syn = "2.0"
quote = "1.0"
serde_derive_internals = "0.26"
serde_derive_internals = "0.29"
Loading

0 comments on commit 3e0ca8c

Please sign in to comment.