Skip to content

Commit

Permalink
feat!(inserter): move under inserter feature
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jan 27, 2024
1 parent 4645a14 commit e445311
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 15 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] - ReleaseDate
### Added
- derive: support `serde::skip_deserializing` ([#83]).
- the `quanta` feature, enabled by default.
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.
- insert: apply options set on the client ([#90]).
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.
- inserter: `Inserter::pending()` to get stats about still being inserted data.

### Changed
- **BREAKING** inserter: move under the `inserter` feature.
- **BREAKING** inserter: there is no default limits anymore.
- **BREAKING** inserter: `Inserter::write` is synchronous now.
- **BREAKING** inserter: rename `entries` to `rows`.
- **BREAKING** drop the `wa-37420` feature.
- **BREAKING** remove deprecated items.
- inserter: increase performance if the `quanta` feature is enabled.
- inserter: increase performance if the time limit isn't used.
- inserter: improve performance of time measurements by using `quanta`.
- inserter: improve performance if the time limit isn't used.
- derive: move to syn v2.

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ required-features = ["test-util"]
debug = true

[features]
default = ["lz4", "tls", "quanta"]
default = ["lz4", "tls"]

test-util = ["hyper/server"]
inserter = ["dep:quanta"]
watch = ["dep:sha-1", "dep:serde_json"]
lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
uuid = ["dep:uuid"]
time = ["dep:time"]
lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"]
tls = ["dep:hyper-tls"]
quanta = ["dep:quanta"]

[dependencies]
clickhouse-derive = { version = "0.1.1", path = "derive" }
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ insert.end().await?;

</summary>

Requires the `inserter` feature.

```rust,ignore
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
Expand All @@ -160,7 +162,8 @@ if stats.rows > 0 {

* `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached.
* The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters.
* `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits.
* `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits if your stream emits items rarely.
* Time thresholds implemented by using [quanta](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests).
* All rows between `commit()` calls are inserted in the same `INSERT` statement.
* Do not forget to flush if you want to terminate inserting:
```rust,ignore
Expand Down Expand Up @@ -215,7 +218,7 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples).
## Feature Flags
* `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`.
* `tls` (enabled by default) — supports urls with the `HTTPS` schema.
* `quanta` (enabled by default) - uses the [quanta](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests).
* `inserter` — enables `client.inserter()`.
* `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`.
* `watch` — enables `client.watch` functionality. See the corresponding section for details.
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate.
Expand Down
5 changes: 5 additions & 0 deletions benches/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async fn run_insert(client: Client, iters: u64) -> Result<Duration> {
Ok(start.elapsed())
}

#[cfg(feature = "inserter")]
async fn run_inserter<const WITH_PERIOD: bool>(client: Client, iters: u64) -> Result<Duration> {
let start = Instant::now();
let mut inserter = client.inserter("table")?.with_max_rows(iters);
Expand Down Expand Up @@ -141,10 +142,14 @@ fn insert(c: &mut Criterion) {
run(c, "insert", 6543, run_insert);
}

#[cfg(feature = "inserter")]
fn inserter(c: &mut Criterion) {
run(c, "inserter", 6544, run_inserter::<false>);
run(c, "inserter-period", 6545, run_inserter::<true>);
}

#[cfg(not(feature = "inserter"))]
criterion_group!(benches, insert);
#[cfg(feature = "inserter")]
criterion_group!(benches, insert, inserter);
criterion_main!(benches);
7 changes: 3 additions & 4 deletions examples/usage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};

use clickhouse::{error::Result, sql, Client, Row};
Expand Down Expand Up @@ -39,11 +37,12 @@ async fn insert(client: &Client) -> Result<()> {
insert.end().await
}

#[cfg(feature = "inserter")]
async fn inserter(client: &Client) -> Result<()> {
let mut inserter = client
.inserter("some")?
.with_max_rows(100_000)
.with_period(Some(Duration::from_secs(15)));
.with_period(Some(std::time::Duration::from_secs(15)));

for i in 0..1000 {
if i == 500 {
Expand Down Expand Up @@ -149,13 +148,13 @@ async fn main() -> Result<()> {

ddl(&client).await?;
insert(&client).await?;
#[cfg(feature = "inserter")]
inserter(&client).await?;
select_count(&client).await?;
fetch(&client).await?;
fetch_all(&client).await?;
delete(&client).await?;
select_count(&client).await?;

#[cfg(feature = "watch")]
watch(&client).await?;

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use self::{error::Result, http_client::HttpClient};

pub mod error;
pub mod insert;
#[cfg(feature = "inserter")]
pub mod inserter;
pub mod query;
pub mod serde;
Expand All @@ -35,6 +36,7 @@ mod http_client;
mod response;
mod row;
mod rowbinary;
#[cfg(feature = "inserter")]
mod ticks;

const TCP_KEEPALIVE: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -177,6 +179,7 @@ impl Client {
}

/// Creates an inserter to perform multiple INSERTs.
#[cfg(feature = "inserter")]
pub fn inserter<T: Row>(&self, table: &str) -> Result<inserter::Inserter<T>> {
inserter::Inserter::new(self, table)
}
Expand Down
4 changes: 2 additions & 2 deletions src/ticks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600);
// === Instant ===

// More efficient `Instant` based on TSC.
#[cfg(all(feature = "quanta", not(feature = "test-util")))]
#[cfg(not(feature = "test-util"))]
type Instant = quanta::Instant;

#[cfg(any(not(feature = "quanta"), feature = "test-util"))]
#[cfg(feature = "test-util")]
type Instant = tokio::time::Instant;

// === Ticks ===
Expand Down
2 changes: 2 additions & 0 deletions tests/it/inserter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(feature = "inserter")]

use serde::Serialize;

use clickhouse::{inserter::Quantities, Client, Row};
Expand Down

0 comments on commit e445311

Please sign in to comment.