diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c797706..4bead9d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,7 @@ jobs: - run: cargo test - run: cargo test --no-default-features - run: cargo test --features uuid,time + - run: cargo test --all-features services: clickhouse: diff --git a/CHANGELOG.md b/CHANGELOG.md index bc84b72..bc5c2ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added +- derive: support `serde::skip_deserializing` ([#83]). +- the `quanta` feature, enabled by default. +- inserter: can be limited by size, see `Inserter::with_max_bytes()`. + +### Changed +- **BREAKING** inserter: `Inserter::write` is synchronous now. +- **BREAKING** inserter: rename `entries` to `rows`. +- **BREAKING** drop the `wa-37420` feature. +- **BREAKING** remove deprecated items. +- inserter: increase performance if the `quanta` feature is enabled. +- inserter: increase performance if the time limit isn't used. +- derive: move to syn v2. + +### Fixed +- watch: support a new syntax. + +[#83]: https://github.com/loyd/clickhouse.rs/pull/83 + +## [0.11.6] - 2023-09-27 +### Fixed +- client: accept HTTPS URLs if the `tls` feature is enabled ([#58]). + +[#58]: https://github.com/loyd/clickhouse.rs/issues/58 + +## [0.11.5] - 2023-06-12 +### Changed +- inserter: start a new insert only when the first row is provided ([#68], [#70]). + +[#70]: https://github.com/loyd/clickhouse.rs/pull/70 +[#68]: https://github.com/loyd/clickhouse.rs/pull/68 ## [0.11.4] - 2023-05-14 ### Added @@ -241,7 +272,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Client::query()` for selecting from tables and DDL statements.
-[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...HEAD +[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.6...HEAD +[0.11.6]: https://github.com/loyd/clickhouse.rs/compare/v0.11.5...v0.11.6 +[0.11.5]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...v0.11.5 [0.11.4]: https://github.com/loyd/clickhouse.rs/compare/v0.11.3...v0.11.4 [0.11.3]: https://github.com/loyd/clickhouse.rs/compare/v0.11.2...v0.11.3 [0.11.2]: https://github.com/loyd/clickhouse.rs/compare/v0.11.1...v0.11.2 diff --git a/Cargo.toml b/Cargo.toml index 6d83f87..7517d85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "clickhouse" -version = "0.11.4" +version = "0.11.6" description = "A typed client for ClickHouse with killer features" keywords = ["clickhouse", "database", "driver", "tokio", "hyper"] authors = ["Paul Loyd "] repository = "https://github.com/loyd/clickhouse.rs" license = "MIT OR Apache-2.0" readme = "README.md" -edition = "2018" +edition = "2021" rust-version = "1.60" [package.metadata.docs.rs] @@ -34,7 +34,7 @@ required-features = ["test-util"] debug = true [features] -default = ["lz4", "tls"] +default = ["lz4", "tls", "quanta"] test-util = ["hyper/server"] watch = ["dep:sha-1", "dep:serde_json"] @@ -42,9 +42,7 @@ lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"] uuid = ["dep:uuid"] time = ["dep:time"] tls = ["dep:hyper-tls"] - -# Temporary workaround for https://github.com/ClickHouse/ClickHouse/issues/37420 -wa-37420 = [] +quanta = ["dep:quanta"] [dependencies] clickhouse-derive = { version = "0.1.1", path = "derive" } @@ -58,7 +56,7 @@ hyper-tls = { version = "0.5.0", optional = true } url = "2.1.1" futures = "0.3.5" static_assertions = "1.1" -sealed = "0.4" +sealed = "0.5" sha-1 = { version = "0.10", optional = true } serde_json = { version = "1.0.68", optional = true } lz4 = { version = "1.23.3", optional = true } @@ -66,15 +64,15 @@ clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true } uuid = { version = "1", optional = true } time = { version = "0.3", optional = true } bstr = { version = "1.2", default-features = false } +quanta = { version = "0.12", optional = true } [dev-dependencies] -criterion = "0.4.0" +criterion = "0.5.0" serde = { version = "1.0.106", features = ["derive"] } tokio = { version = "1.0.1", features = ["full", "test-util"] } hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream", "server"] } serde_bytes = "0.11.4" serde_repr = "0.1.7" uuid = { version = "1", features = ["v4"] } -function_name = "0.3" time = { version = "0.3.17", features = ["macros", "rand"] } rand = "0.8.5" diff --git a/README.md b/README.md index ae66a5d..74d6238 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ A typed client for ClickHouse. [crates-badge]: https://img.shields.io/crates/v/clickhouse.svg [crates-url]: https://crates.io/crates/clickhouse -[docs-badge]: https://docs.rs/clickhouse/badge.svg +[docs-badge]: https://img.shields.io/docsrs/clickhouse [docs-url]: https://docs.rs/clickhouse [mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg [mit-url]: https://github.com/loyd/clickhouse.rs/blob/master/LICENSE @@ -17,23 +17,28 @@ A typed client for ClickHouse. [actions-url]: https://github.com/loyd/clickhouse.rs/actions/workflows/ci.yml * Uses `serde` for encoding/decoding rows. -* Uses `RowBinary` encoding. -* Supports HTTP and HTTPS. +* Supports `serde` attributes: `skip_serializing`, `skip_deserializing`, `rename`. +* Uses `RowBinary` encoding over HTTP transport. 
+ * There are plans to switch to `Native` over TCP. +* Supports TLS. +* Supports compression and decompression (LZ4 and LZ4HC). * Provides API for selecting. * Provides API for inserting. * Provides API for infinite transactional (see below) inserting. * Provides API for watching live views. -* Compression and decompression (LZ4). * Provides mocks for unit testing. +Note: [ch2rs](https://github.com/loyd/ch2rs) is useful for generating a row type from a ClickHouse table. + ## Usage + To use the crate, add this to your `Cargo.toml`: ```toml [dependencies] -clickhouse = "0.11.4" +clickhouse = "0.11.6" [dev-dependencies] -clickhouse = { version = "0.11.4", features = ["test-util"] } +clickhouse = { version = "0.11.6", features = ["test-util"] } ```
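For orientation, a minimal query with this crate might look as follows. This is an illustrative sketch rather than part of the diff: the table, row type, and URL are assumptions, and the snippet is presumed to live in an `async` function returning `clickhouse::error::Result<()>`.

```rust,ignore
use clickhouse::{Client, Row};
use serde::Deserialize;

#[derive(Row, Deserialize)]
struct MyRow {
    no: u32,
    name: String,
}

let client = Client::default().with_url("http://localhost:8123");

// `?fields` expands into the column list derived from `MyRow`,
// and each `?` binds the corresponding value.
let rows = client
    .query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
    .bind(0)
    .bind(100)
    .fetch_all::<MyRow>()
    .await?;
```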
@@ -43,7 +48,7 @@ clickhouse = { version = "0.11.6", features = ["test-util"] } -CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Enable `wa-37420` feature to solve this problem. Don't use it for newer versions. +CH servers older than v22.6 (2022-06-16) handle `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Use the 0.11 release and enable the `wa-37420` feature to work around this problem. Don't enable it for newer versions.
@@ -126,7 +131,6 @@ insert.end().await?; * If `end()` isn't called, the `INSERT` is aborted (see the sketch below). * Rows are sent progressively to spread the network load. * ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). -* [ch2rs](https://github.com/loyd/ch2rs) is useful to generate a row type from ClickHouse.
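The lifecycle above can be sketched as follows (the table and `MyRow` are the same illustrative names used elsewhere in this README; the snippet is not part of the diff):

```rust,ignore
let mut insert = client.insert("some")?;
for i in 0..1000 {
    // Rows are buffered and sent progressively while writing.
    insert.write(&MyRow { no: i, name: "foo" }).await?;
}
// Finalizes the INSERT; if this call is skipped, the INSERT is aborted.
insert.end().await?;
```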
@@ -170,22 +174,24 @@ insert.end().await?; ```rust,ignore let mut inserter = client.inserter("some")? .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20))) - .with_max_entries(750_000) + .with_max_bytes(50_000_000) + .with_max_rows(750_000) .with_period(Some(Duration::from_secs(15))); -inserter.write(&MyRow { no: 0, name: "foo".into() }).await?; -inserter.write(&MyRow { no: 1, name: "bar".into() }).await?; +inserter.write(&MyRow { no: 0, name: "foo".into() })?; +inserter.write(&MyRow { no: 1, name: "bar".into() })?; let stats = inserter.commit().await?; -if stats.entries > 0 { +if stats.rows > 0 { println!( - "{} entries ({} transactions) have been inserted", - stats.entries, stats.transactions, + "{} bytes, {} rows, {} transactions have been inserted", + stats.bytes, stats.rows, stats.transactions, ); } ``` -* `Inserter` ends an active insert in `commit()` if thresholds (`max_entries`, `period`) are reached. +* `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached. * The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters. +* `Inserter::time_left()` can be used to detect when the current period ends; call `Inserter::commit()` at that point to check the limits again. * All rows between `commit()` calls are inserted in the same `INSERT` statement. * Do not forget to flush if you want to terminate inserting: ```rust,ignore @@ -240,11 +246,11 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). ## Feature Flags * `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`. * `tls` (enabled by default) — supports URLs with the `https` scheme. +* `quanta` (enabled by default) — uses the [quanta](https://docs.rs/quanta) crate to speed up the inserter. Not used if `test-util` is enabled (so time can be managed by `tokio::time::advance()` in custom tests). * `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`. * `watch` — enables `client.watch` functionality. See the corresponding section for details. -* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate. -* `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate. -* `wa-37420` — implements a workaround for CH versions prior to v22.6. See the corresponding section for details. +* `uuid` — adds `serde::uuid` to work with the [uuid](https://docs.rs/uuid) crate. +* `time` — adds `serde::time` to work with the [time](https://docs.rs/time) crate. ## Data Types * `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them. @@ -257,7 +263,7 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples).
Example ```rust,ignore - #[derive(Debug, Serialize, Deserialize)] + #[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, diff --git a/benches/insert.rs b/benches/insert.rs index 059635e..47260b1 100644 --- a/benches/insert.rs +++ b/benches/insert.rs @@ -1,4 +1,4 @@ -use std::{mem, time::Duration}; +use std::{future::Future, mem, time::Duration}; use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use serde::Serialize; @@ -39,38 +39,59 @@ mod server { } } -fn insert(c: &mut Criterion) { - let addr = "127.0.0.1:6543".parse().unwrap(); - server::start(addr); +#[derive(Row, Serialize)] +struct SomeRow { + a: u64, + b: i64, + c: i32, + d: u32, +} + +async fn run_insert(client: Client, iters: u64) -> Result<Duration> { + let start = Instant::now(); + let mut insert = client.insert("table")?; - #[derive(Row, Serialize)] - struct SomeRow { - a: u64, - b: i64, - c: i32, - d: u32, + for _ in 0..iters { + insert + .write(&black_box(SomeRow { + a: 42, + b: 42, + c: 42, + d: 42, + })) + .await?; } - async fn run(client: Client, iters: u64) -> Result<Duration> { - let start = Instant::now(); - let mut insert = client.insert("table")?; - - for _ in 0..iters { - insert - .write(&black_box(SomeRow { - a: 42, - b: 42, - c: 42, - d: 42, - })) - .await?; - } + insert.end().await?; + Ok(start.elapsed()) +} - insert.end().await?; - Ok(start.elapsed()) +async fn run_inserter(client: Client, iters: u64) -> Result<Duration> { + let start = Instant::now(); + let mut inserter = client.inserter("table")?.with_max_rows(iters); + + for _ in 0..iters { + inserter.write(&black_box(SomeRow { + a: 42, + b: 42, + c: 42, + d: 42, + }))?; + inserter.commit().await?; } - let mut group = c.benchmark_group("insert"); + inserter.end().await?; + Ok(start.elapsed()) +} + +fn run(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, u64) -> F) +where + F: Future<Output = Result<Duration>>, +{ + let addr = format!("127.0.0.1:{port}").parse().unwrap(); + server::start(addr); + + let mut group = c.benchmark_group(name); group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64)); group.bench_function("no compression", |b| { b.iter_custom(|iters| { @@ -78,7 +99,7 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::None); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); #[cfg(feature = "lz4")] @@ -88,7 +109,7 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); #[cfg(feature = "lz4")] @@ -98,11 +119,19 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4Hc(4)); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); group.finish(); } -criterion_group!(benches, insert); +fn insert(c: &mut Criterion) { + run(c, "insert", 6543, run_insert); +} + +fn inserter(c: &mut Criterion) { + run(c, "inserter", 6544, run_inserter); +} + +criterion_group!(benches, insert, inserter); criterion_main!(benches); diff --git a/derive/Cargo.toml b/derive/Cargo.toml index efef66a..57f9fa8 100644 --- a/derive/Cargo.toml +++ b/derive/Cargo.toml @@ -11,6 +11,6 @@ proc-macro = true [dependencies] proc-macro2 = "1.0" -syn = "1.0" +syn = "2.0" quote = "1.0" -serde_derive_internals = "0.26" +serde_derive_internals = "0.29"
diff --git a/derive/src/lib.rs b/derive/src/lib.rs index eb8937e..51425bf 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -1,59 +1,10 @@ use proc_macro2::TokenStream; use quote::quote; -use serde_derive_internals::{attr::get_serde_meta_items, Ctxt}; -use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, Ident, Lit, Meta, NestedMeta}; - -/// Parses `#[serde(skip_serializing)]` -fn serde_skipped(cx: &Ctxt, attrs: &[syn::Attribute]) -> bool { - for meta_items in attrs - .iter() - .filter_map(|attr| get_serde_meta_items(cx, attr).ok()) - { - for meta_item in meta_items { - match meta_item { - NestedMeta::Meta(Meta::Path(path)) - if path - .get_ident() - .map_or(false, |i| *i == "skip_serializing") => - { - return true - } - _ => continue, - } - } - } - false -} - -/// Parses `#[serde(rename = "..")]` -fn serde_rename(cx: &Ctxt, field: &syn::Field) -> Option<String> { - for meta_items in field - .attrs - .iter() - .filter_map(|attr| get_serde_meta_items(cx, attr).ok()) - { - for meta_item in meta_items { - match meta_item { - NestedMeta::Meta(Meta::NameValue(nv)) - if nv - .path - .get_ident() - .map_or(false, |i| *i == "rename") => - { - if let Lit::Str(lit) = nv.lit { - return Some(lit.value()); - } - } - _ => continue, - } - } - } - None -} - -fn unraw(ident: &Ident) -> String { - ident.to_string().trim_start_matches("r#").to_owned() -} +use serde_derive_internals::{ + attr::{Default as SerdeDefault, Field}, + Ctxt, +}; +use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields}; fn column_names(data: &DataStruct) -> TokenStream { match &data.fields { @@ -62,11 +13,10 @@ fn column_names(data: &DataStruct) -> TokenStream { let column_names_iter = fields .named .iter() - .filter(|f| !serde_skipped(&cx, &f.attrs)) - .map(|f| match serde_rename(&cx, f) { - Some(name) => name, - None => unraw(f.ident.as_ref().unwrap()), - }); + .enumerate() + .map(|(index, field)| Field::from_ast(&cx, index, field, None, &SerdeDefault::None)) + .filter(|field| !field.skip_serializing() && !field.skip_deserializing()) + .map(|field| field.name().serialize_name().to_string()); let tokens = quote! { &[#( #column_names_iter,)*] @@ -100,6 +50,7 @@ pub fn row(input: proc_macro::TokenStream) -> proc_macro::TokenStream { // TODO: replace `clickhouse` with `::clickhouse` here. let expanded = quote! { + #[automatically_derived] impl #impl_generics clickhouse::Row for #name #ty_generics #where_clause { const COLUMN_NAMES: &'static [&'static str] = #column_names; } diff --git a/examples/usage.rs b/examples/usage.rs index a5d386a..c99a064 100644 --- a/examples/usage.rs +++ b/examples/usage.rs @@ -42,15 +42,15 @@ async fn insert(client: &Client) -> Result<()> { async fn inserter(client: &Client) -> Result<()> { let mut inserter = client .inserter("some")?
- .with_max_entries(100_000) + .with_max_rows(100_000) .with_period(Some(Duration::from_secs(15))); for i in 0..1000 { if i == 500 { - inserter.set_max_entries(300); + inserter.set_max_rows(300); } - inserter.write(&MyRow { no: i, name: "foo" }).await?; + inserter.write(&MyRow { no: i, name: "foo" })?; inserter.commit().await?; } diff --git a/rustfmt.toml b/rustfmt.toml index f39ac2a..9abe81d 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,3 @@ -edition = "2018" +edition = "2021" merge_derives = false diff --git a/src/insert.rs b/src/insert.rs index f0f9984..0b7ea69 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -19,14 +19,15 @@ use crate::{ const BUFFER_SIZE: usize = 128 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extra reallocations -/// Performs only one `INSERT`. +/// Performs one `INSERT`. +/// +/// [`Insert::end`] must be called to finalize the `INSERT`. +/// Otherwise, the whole `INSERT` will be aborted. /// /// Rows are sent progressively to spread the network load. #[must_use] pub struct Insert<T> { buffer: BytesMut, - #[cfg(feature = "wa-37420")] - chunk_count: usize, sender: Option, #[cfg(feature = "lz4")] compression: Compression, @@ -108,8 +109,6 @@ impl<T> Insert<T> { Ok(Self { buffer: BytesMut::with_capacity(BUFFER_SIZE), - #[cfg(feature = "wa-37420")] - chunk_count: 0, sender: Some(sender), #[cfg(feature = "lz4")] compression: client.compression, @@ -153,7 +152,18 @@ impl<T> Insert<T> { self.end_timeout = end_timeout; } - /// Serializes and writes to the socket a provided row. + /// Serializes the provided row into an internal buffer. + /// Once the buffer is full, it's sent to a background task writing to the socket. + /// + /// Close to: + /// ```ignore + /// async fn write(&self, row: &T) -> Result<()>; + /// ``` + /// + /// The returned future doesn't depend on the row's lifetime. + /// + /// Returns an error if the row cannot be serialized or the background task failed. + /// Once failed, the whole `INSERT` is aborted and cannot be used anymore. /// /// # Panics /// If called after previous call returned an error. @@ -161,12 +171,7 @@ where T: Serialize, { - assert!(self.sender.is_some(), "write() after error"); - - let result = rowbinary::serialize_into(&mut self.buffer, row); - if result.is_err() { - self.abort(); - } + let result = self.do_write(row); async move { result?; @@ -177,10 +182,30 @@ } } - /// Ends `INSERT`. - /// Succeeds if the server returns 200. + #[inline(always)] + pub(crate) fn do_write(&mut self, row: &T) -> Result<usize> + where + T: Serialize, + { + assert!(self.sender.is_some(), "write() after error"); + + let old_buf_size = self.buffer.len(); + let result = rowbinary::serialize_into(&mut self.buffer, row); + let written = self.buffer.len() - old_buf_size; + + if result.is_err() { + self.abort(); + } + + result.and(Ok(written)) + } + + /// Ends the `INSERT`; the server starts processing the data. + /// + /// Succeeds if the server returns 200, which means the `INSERT` was handled successfully, + /// including all materialized views and quorum writes. - /// - /// If it isn't called, the whole `INSERT` is aborted. + /// NOTE: If it isn't called, the whole `INSERT` is aborted. pub async fn end(mut self) -> Result<()> { if !self.buffer.is_empty() { self.send_chunk().await?; @@ -195,10 +220,6 @@ return Ok(()); } - // A temporary workaround for https://github.com/ClickHouse/ClickHouse/issues/37420.
- #[cfg(feature = "wa-37420")] - self.prepend_bom(); - // Hyper uses non-trivial and inefficient scheme of buffering chunks. // It's difficult to determine when allocations occur. // So, instead we control it manually here and rely on the system allocator. @@ -256,18 +277,6 @@ impl<T> Insert<T> { Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze()) } - #[cfg(feature = "wa-37420")] - fn prepend_bom(&mut self) { - if self.chunk_count == 0 && self.buffer.starts_with(&[0xef, 0xbb, 0xbf]) { - let mut new_chunk = BytesMut::with_capacity(self.buffer.len() + 3); - new_chunk.extend_from_slice(&[0xef, 0xbb, 0xbf]); - new_chunk.extend_from_slice(&self.buffer); - self.buffer = new_chunk; - } - - self.chunk_count += 1; - } - fn abort(&mut self) { if let Some(sender) = self.sender.take() { sender.abort(); diff --git a/src/inserter.rs b/src/inserter.rs index 186f3ff..20611cc 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -1,41 +1,51 @@ -use std::{future::Future, mem}; +use std::mem; use serde::Serialize; -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client}; -const DEFAULT_MAX_ENTRIES: u64 = 500_000; +const DEFAULT_MAX_ROWS: u64 = 500_000; /// Performs multiple consecutive `INSERT`s. /// +/// By default, it ends the current active `INSERT` every 500_000 rows. +/// Use `with_max_bytes`, `with_max_rows`, and `with_period` to modify this behaviour. +/// /// Rows are sent progressively to spread the network load. +/// +/// All rows written by [`Inserter::write()`] between [`Inserter::commit()`] calls +/// are sent in one `INSERT` statement. #[must_use] pub struct Inserter<T> { client: Client, table: String, - max_entries: u64, + max_bytes: u64, + max_rows: u64, send_timeout: Option<Duration>, end_timeout: Option<Duration>, - insert: Insert<T>, + insert: Option<Insert<T>>, ticks: Ticks, - committed: Quantities, - uncommitted_entries: u64, + pending: Quantities, + in_transaction: bool, } -/// Statistics about inserted rows. +/// Statistics about pending or inserted data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Quantities { - /// How many rows ([`Inserter::write`]) have been inserted. - pub entries: u64, - /// How many nonempty transactions ([`Inserter::commit`]) have been inserted. + /// The number of uncompressed bytes. + pub bytes: u64, + /// The number of rows (calls of [`Inserter::write`]). + pub rows: u64, + /// The number of nonempty transactions (calls of [`Inserter::commit`]). pub transactions: u64, } impl Quantities { /// Just zero quantities, nothing special. pub const ZERO: Quantities = Quantities { - entries: 0, + bytes: 0, + rows: 0, transactions: 0, }; } @@ -48,17 +58,18 @@ where Ok(Self { client: client.clone(), table: table.into(), - max_entries: DEFAULT_MAX_ENTRIES, + max_bytes: u64::MAX, + max_rows: DEFAULT_MAX_ROWS, send_timeout: None, end_timeout: None, - insert: client.insert(table)?, + insert: None, ticks: Ticks::default(), - committed: Quantities::ZERO, - uncommitted_entries: 0, + pending: Quantities::ZERO, + in_transaction: false, }) } - /// See [`Insert::with_max_entries()`]. + /// See [`Insert::with_timeouts()`]. /// /// Note that [`Inserter::commit()`] can call [`Insert::end()`] inside, /// so `end_timeout` is also applied to the `commit()` method. @@ -71,14 +82,25 @@ where self } + /// The maximum number of uncompressed bytes in one `INSERT` statement.
+ /// + /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition + /// and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). + /// + /// Unlimited by default. + pub fn with_max_bytes(mut self, threshold: u64) -> Self { + self.set_max_bytes(threshold); + self + } + /// The maximum number of rows in one `INSERT` statement. /// /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition /// and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). /// /// `500_000` by default. - pub fn with_max_entries(mut self, threshold: u64) -> Self { - self.set_max_entries(threshold); + pub fn with_max_rows(mut self, threshold: u64) -> Self { + self.set_max_rows(threshold); self } @@ -89,6 +111,12 @@ /// However, it's possible to use [`Inserter::time_left()`] and set up a timer /// to call [`Inserter::commit()`] to check the passed time again. /// + /// Extra ticks are skipped if the previous `INSERT` is still in progress: + /// ```text + /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | + /// Actual ticks: | work -----| delay | work ---| work -----| work -----| + /// ``` + /// /// `None` by default. pub fn with_period(mut self, period: Option<Duration>) -> Self { self.set_period(period); @@ -96,32 +124,35 @@ } /// Adds a bias to the period. The actual period will be in the following range: - /// ```ignore + /// ```text /// [period * (1 - bias), period * (1 + bias)] /// ``` /// + /// The `bias` parameter is clamped to the range `[0, 1]`. + /// /// It helps to avoid producing a lot of `INSERT`s at the same time by multiple inserters. pub fn with_period_bias(mut self, bias: f64) -> Self { self.set_period_bias(bias); self } - #[deprecated(note = "use `with_period()` instead")] - pub fn with_max_duration(mut self, threshold: Duration) -> Self { - self.set_period(Some(threshold)); - self - } /// See [`Inserter::with_timeouts()`]. pub fn set_timeouts(&mut self, send_timeout: Option<Duration>, end_timeout: Option<Duration>) { self.send_timeout = send_timeout; self.end_timeout = end_timeout; - self.insert.set_timeouts(send_timeout, end_timeout); + if let Some(insert) = &mut self.insert { + insert.set_timeouts(self.send_timeout, self.end_timeout); + } + } + + /// See [`Inserter::with_max_bytes()`]. + pub fn set_max_bytes(&mut self, threshold: u64) { + self.max_bytes = threshold; } - /// See [`Inserter::with_max_entries()`]. - pub fn set_max_entries(&mut self, threshold: u64) { - self.max_entries = threshold; + /// See [`Inserter::with_max_rows()`]. + pub fn set_max_rows(&mut self, threshold: u64) { + self.max_rows = threshold; } /// See [`Inserter::with_period()`]. @@ -136,47 +167,58 @@ self.ticks.reschedule(); } - #[deprecated(note = "use `set_period()` instead")] - pub fn set_max_duration(&mut self, threshold: Duration) { - self.ticks.set_period(Some(threshold)); - } - /// How much time we have until the next tick. /// /// `None` if the period isn't configured. pub fn time_left(&mut self) -> Option<Duration> { - Some( - self.ticks - .next_at()? - .saturating_duration_since(Instant::now()), - ) + self.ticks.time_left() + } + + /// Returns statistics about data not yet inserted into ClickHouse. + pub fn pending(&self) -> &Quantities { + &self.pending } - /// Serializes and writes to the socket a provided row. + /// Serializes the provided row into an internal buffer.
+ /// + /// To check the limits and send the data to ClickHouse, call [`Inserter::commit()`]. /// /// # Panics /// If called after previous call returned an error. #[inline] - pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send + pub fn write(&mut self, row: &T) -> Result<()> where T: Serialize, { - self.uncommitted_entries += 1; - self.insert.write(row) - } + if self.insert.is_none() { + self.init_insert()?; + } - /// Checks limits and ends a current `INSERT` if they are reached. - pub async fn commit(&mut self) -> Result<Quantities> { - if self.uncommitted_entries > 0 { - self.committed.entries += self.uncommitted_entries; - self.committed.transactions += 1; - self.uncommitted_entries = 0; + match self.insert.as_mut().unwrap().do_write(row) { + Ok(bytes) => { + self.pending.bytes += bytes as u64; + self.pending.rows += 1; + + if !self.in_transaction { + self.pending.transactions += 1; + self.in_transaction = true; + } + + Ok(()) + } + Err(err) => { + self.pending = Quantities::ZERO; + Err(err) + } } + } - let now = Instant::now(); + /// Checks limits and ends the current `INSERT` if they are reached. + pub async fn commit(&mut self) -> Result<Quantities> { + self.in_transaction = false; - Ok(if self.is_threshold_reached(now) { - let quantities = mem::replace(&mut self.committed, Quantities::ZERO); + Ok(if self.limits_reached() { + let quantities = mem::replace(&mut self.pending, Quantities::ZERO); let result = self.insert().await; self.ticks.reschedule(); result?; @@ -186,23 +228,36 @@ where }) } - /// Ends a current `INSERT` and whole `Inserter` unconditionally. + /// Ends the current `INSERT` and the whole `Inserter` unconditionally. /// /// If it isn't called, the current `INSERT` is aborted. - pub async fn end(self) -> Result<Quantities> { - self.insert.end().await?; - Ok(self.committed) + pub async fn end(mut self) -> Result<Quantities> { + self.insert().await?; + Ok(self.pending) } - fn is_threshold_reached(&self, now: Instant) -> bool { - self.committed.entries >= self.max_entries - || self.ticks.next_at().map_or(false, |next_at| now >= next_at) + fn limits_reached(&self) -> bool { + self.pending.rows >= self.max_rows + || self.pending.bytes >= self.max_bytes + || self.ticks.reached() } async fn insert(&mut self) -> Result<()> { - let mut new_insert = self.client.insert(&self.table)?; // Actually it mustn't fail.
+ if let Some(insert) = self.insert.take() { + insert.end().await?; + } + Ok(()) + } + + #[cold] + #[inline(never)] + fn init_insert(&mut self) -> Result<()> { + debug_assert!(self.insert.is_none()); + debug_assert_eq!(self.pending, Quantities::ZERO); + + let mut new_insert: Insert<T> = self.client.insert(&self.table)?; new_insert.set_timeouts(self.send_timeout, self.end_timeout); - let insert = mem::replace(&mut self.insert, new_insert); - insert.end().await + self.insert = Some(new_insert); + Ok(()) + } } diff --git a/src/lib.rs b/src/lib.rs index 9266135..5e9660b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,13 +32,6 @@ pub mod test; #[cfg(feature = "watch")] pub mod watch; -#[cfg(feature = "uuid")] -#[doc(hidden)] -#[deprecated(since = "0.11.1", note = "use `clickhouse::serde::uuid` instead")] -pub mod uuid { - pub use crate::serde::uuid::*; -} - mod buflist; mod compression; mod cursor; @@ -77,7 +70,10 @@ impl Default for Client { connector.set_keepalive(Some(TCP_KEEPALIVE)); #[cfg(feature = "tls")] - let connector = HttpsConnector::new_with_connector(connector); + let connector = HttpsConnector::new_with_connector({ + connector.enforce_http(false); + connector + }); let client = hyper::Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) @@ -204,6 +200,9 @@ impl Client { } /// Starts a new WATCH query. + /// + /// The `query` can be either the table name or a SELECT query. + /// In the second case, a new live view (LV) table is created. #[cfg(feature = "watch")] pub fn watch(&self, query: &str) -> watch::Watch { watch::Watch::new(self, query) diff --git a/src/row.rs b/src/row.rs index 11361ff..c46fc55 100644 --- a/src/row.rs +++ b/src/row.rs @@ -4,6 +4,7 @@ pub trait Row { const COLUMN_NAMES: &'static [&'static str]; // TODO: count + // TODO: different list for SELECT/INSERT (de/ser) } // Actually, it's not public now. @@ -138,6 +139,21 @@ mod tests { assert_eq!(join_column_names::<TopLevel>().unwrap(), "`one`"); } + #[test] + fn it_skips_deserializing() { + use serde::Deserialize; + + #[derive(Row, Deserialize)] + #[allow(dead_code)] + struct TopLevel { + one: u32, + #[serde(skip_deserializing)] + two: u32, + } + + assert_eq!(join_column_names::<TopLevel>().unwrap(), "`one`"); + } + #[test] fn it_rejects_other() { #[derive(Row)] diff --git a/src/sql/escape.rs b/src/sql/escape.rs index 8178153..e43cde6 100644 --- a/src/sql/escape.rs +++ b/src/sql/escape.rs @@ -35,13 +35,13 @@ fn escape(src: &str, mut dst: impl fmt::Write, ch: char) -> fmt::Result { #[test] fn it_escapes_string() { let mut actual = String::new(); - string(r#"f\o'o '' b\'ar'"#, &mut actual).unwrap(); - assert_eq!(actual, r#"'f\\o\'o \'\' b\\\'ar\''"#); + string(r"f\o'o '' b\'ar'", &mut actual).unwrap(); + assert_eq!(actual, r"'f\\o\'o \'\' b\\\'ar\''"); } #[test] fn it_escapes_identifier() { let mut actual = String::new(); - identifier(r#"f\o`o `` b\`ar`"#, &mut actual).unwrap(); - assert_eq!(actual, r#"`f\\o\`o \`\` b\\\`ar\``"#); + identifier(r"f\o`o `` b\`ar`", &mut actual).unwrap(); + assert_eq!(actual, r"`f\\o\`o \`\` b\\\`ar\``"); } diff --git a/src/ticks.rs b/src/ticks.rs index 37d2157..d6ea56a 100644 --- a/src/ticks.rs +++ b/src/ticks.rs @@ -1,7 +1,18 @@ -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600); +// === Instant === + +// More efficient `Instant` based on TSC.
+#[cfg(all(feature = "quanta", not(feature = "test-util")))] +type Instant = quanta::Instant; + +#[cfg(any(not(feature = "quanta"), feature = "test-util"))] +type Instant = tokio::time::Instant; + +// === Ticks === + pub(crate) struct Ticks { period: Duration, max_bias: f64, @@ -29,8 +40,13 @@ impl Ticks { self.max_bias = max_bias.clamp(0., 1.); } - pub(crate) fn next_at(&self) -> Option<Instant> { + pub(crate) fn time_left(&self) -> Option<Duration> { self.next_at + .map(|n| n.saturating_duration_since(Instant::now())) + } + + pub(crate) fn reached(&self) -> bool { + self.next_at.map_or(false, |n| Instant::now() >= n) } pub(crate) fn reschedule(&mut self) { @@ -62,91 +78,113 @@ } } -#[tokio::test] -async fn it_works() { - tokio::time::pause(); - let origin = Instant::now(); - - // No bias. - let mut ticks = Ticks::default(); - ticks.set_period(Some(Duration::from_secs(10))); - ticks.reschedule(); - - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(10)); - tokio::time::advance(Duration::from_secs(3)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(10)); - tokio::time::advance(Duration::from_secs(7)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(20)); - - // Up to 10% bias. - ticks.set_period_bias(0.1); - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(19)); - tokio::time::advance(Duration::from_secs(12)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(29)); - - // Try other seeds. - tokio::time::advance(Duration::from_nanos(32768)).await; - ticks.reschedule(); - assert_eq!( - (ticks.next_at().unwrap() - origin).as_secs_f64().round(), - 30. - ); - - tokio::time::advance(Duration::from_nanos(32767)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(31)); -} +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "test-util")] // only with `tokio::time::Instant` + #[tokio::test(start_paused = true)] + async fn smoke() { + // No bias. + let mut ticks = Ticks::default(); + ticks.set_period(Some(Duration::from_secs(10))); + ticks.reschedule(); + + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(3)).await; + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(7))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(7)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + + // Up to 10% bias. + ticks.set_period_bias(0.1); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(12)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(7))); + assert!(!ticks.reached()); + + // Try other seeds.
+ tokio::time::advance(Duration::from_nanos(32768)).await; + ticks.reschedule(); + assert_eq!( + ticks.time_left(), + Some(Duration::from_secs_f64(7.999982492)) + ); + + tokio::time::advance(Duration::from_nanos(32767)).await; + ticks.reschedule(); + assert_eq!( + ticks.time_left(), + Some(Duration::from_secs_f64(8.999934465)) + ); + } -#[tokio::test] -async fn it_skips_extra_ticks() { - tokio::time::pause(); - let origin = Instant::now(); - - let mut ticks = Ticks::default(); - ticks.set_period(Some(Duration::from_secs(10))); - ticks.set_period_bias(0.1); - ticks.reschedule(); - - // Trivial case, just skip several ticks. - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(9)); - tokio::time::advance(Duration::from_secs(30)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(39)); - - // Hit biased zone. - tokio::time::advance(Duration::from_secs(19)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(59)); -} + #[cfg(feature = "test-util")] // only with `tokio::time::Instant` + #[tokio::test(start_paused = true)] + async fn skip_extra_ticks() { + let mut ticks = Ticks::default(); + ticks.set_period(Some(Duration::from_secs(10))); + ticks.set_period_bias(0.1); + ticks.reschedule(); + + // Trivial case, just skip several ticks. + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(30)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + + // Hit biased zone. + tokio::time::advance(Duration::from_secs(19)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + } -#[tokio::test] -async fn it_is_disabled() { - let mut ticks = Ticks::default(); - assert!(ticks.next_at().is_none()); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Not disabled. - ticks.set_period(Some(Duration::from_secs(10))); - ticks.reschedule(); - assert!(ticks.next_at().is_some()); - - // Explicitly. - ticks.set_period(None); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Zero duration. - ticks.set_period(Some(Duration::from_secs(0))); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Too big duration. - ticks.set_period(Some(PERIOD_THRESHOLD)); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); + #[tokio::test] + async fn disabled() { + let mut ticks = Ticks::default(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Not disabled. + ticks.set_period(Some(Duration::from_secs(10))); + ticks.reschedule(); + assert!(ticks.time_left().unwrap() < Duration::from_secs(10)); + assert!(!ticks.reached()); + + // Explicitly. + ticks.set_period(None); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Zero duration. + ticks.set_period(Some(Duration::from_secs(0))); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Too big duration. 
+ ticks.set_period(Some(PERIOD_THRESHOLD)); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + } } diff --git a/src/watch.rs b/src/watch.rs index 113c883..aacadd9 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -29,15 +29,15 @@ impl Watch { self } - // TODO: `timeout()`. - /// Limits the number of updates after the initial one. pub fn limit(mut self, limit: impl Into<Option<usize>>) -> Self { self.limit = limit.into(); self } - /// See [docs](https://clickhouse.tech/docs/en/sql-reference/statements/create/view/#live-view-with-refresh) + /// See [docs](https://clickhouse.com/docs/en/sql-reference/statements/create/view#with-refresh-clause). + /// + /// Makes sense only for SQL queries (`client.watch("SELECT X")`). pub fn refresh(mut self, interval: impl Into<Option<Duration>>) -> Self { self.refresh = interval.into(); self @@ -227,10 +227,10 @@ async fn init_cursor(client: &Client, params: &WatchParams) -> Result { diff --git a/tests/common.rs b/tests/common.rs deleted file mode 100644 --- a/tests/common.rs +++ /dev/null -macro_rules! prepare_database { - () => { - common::_priv::prepare_database(file!(), function_name!()).await - }; -} - -pub(crate) use {function_name::named, prepare_database}; -pub(crate) mod _priv { - use super::*; - - pub async fn prepare_database(file_path: &str, fn_name: &str) -> Client { - let name = make_db_name(file_path, fn_name); - let client = Client::default().with_url(format!("http://{HOST}")); - - client - .query("DROP DATABASE IF EXISTS ?") - .bind(sql::Identifier(&name)) - .execute() - .await - .expect("cannot drop db"); - - client - .query("CREATE DATABASE ?") - .bind(sql::Identifier(&name)) - .execute() - .await - .expect("cannot create db"); - - client.with_database(name) - } - - fn make_db_name(file_path: &str, fn_name: &str) -> String { - let (_, basename) = file_path.rsplit_once('/').expect("invalid file's path"); - let prefix = basename.strip_suffix(".rs").expect("invalid file's path"); - format!("{prefix}__{fn_name}") - } -} diff --git a/tests/test_compression.rs b/tests/it/compression.rs similarity index 65% rename from tests/test_compression.rs rename to tests/it/compression.rs index 206c7f5..8994b7e 100644 --- a/tests/test_compression.rs +++ b/tests/it/compression.rs @@ -2,8 +2,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::{Client, Compression, Row}; -mod common; - async fn check(client: Client) { #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -23,14 +21,11 @@ .await .unwrap(); - let mut inserter = client.inserter("test").unwrap(); - + let mut insert = client.insert("test").unwrap(); for i in 0..200_000 { - inserter.write(&MyRow { no: i, name: "foo" }).await.unwrap(); - inserter.commit().await.unwrap(); + insert.write(&MyRow { no: i, name: "foo" }).await.unwrap(); } - - inserter.end().await.unwrap(); + insert.end().await.unwrap(); // Check data.
@@ -44,25 +39,22 @@ async fn check(client: Client) { assert_eq!(sum_len, 600_000); } -#[common::named] #[tokio::test] async fn none() { - let client = common::prepare_database!().with_compression(Compression::None); + let client = prepare_database!().with_compression(Compression::None); check(client).await; } #[cfg(feature = "lz4")] -#[common::named] #[tokio::test] async fn lz4() { - let client = common::prepare_database!().with_compression(Compression::Lz4); + let client = prepare_database!().with_compression(Compression::Lz4); check(client).await; } #[cfg(feature = "lz4")] -#[common::named] #[tokio::test] async fn lz4_hc() { - let client = common::prepare_database!().with_compression(Compression::Lz4Hc(4)); + let client = prepare_database!().with_compression(Compression::Lz4Hc(4)); check(client).await; } diff --git a/tests/test_cursor_error.rs b/tests/it/cursor_error.rs similarity index 91% rename from tests/test_cursor_error.rs rename to tests/it/cursor_error.rs index 2f1b85a..21d1bad 100644 --- a/tests/test_cursor_error.rs +++ b/tests/it/cursor_error.rs @@ -1,18 +1,14 @@ use clickhouse::{Client, Compression}; -mod common; - -#[common::named] #[tokio::test] async fn deferred() { - let client = common::prepare_database!(); + let client = prepare_database!(); max_execution_time(client, false).await; } -#[common::named] #[tokio::test] async fn wait_end_of_query() { - let client = common::prepare_database!(); + let client = prepare_database!(); max_execution_time(client, true).await; } @@ -48,10 +44,9 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) { } #[cfg(feature = "lz4")] -#[common::named] #[tokio::test] async fn deferred_lz4() { - let client = common::prepare_database!().with_compression(Compression::Lz4); + let client = prepare_database!().with_compression(Compression::Lz4); client .query("CREATE TABLE test(no UInt32) ENGINE = MergeTree ORDER BY no") diff --git a/tests/it/inserter.rs b/tests/it/inserter.rs new file mode 100644 index 0000000..a6d35aa --- /dev/null +++ b/tests/it/inserter.rs @@ -0,0 +1,152 @@ +use serde::Serialize; + +use clickhouse::{inserter::Quantities, Client, Row}; + +#[derive(Debug, Row, Serialize)] +struct MyRow { + data: String, +} + +async fn create_table(client: &Client) { + client + .query("CREATE TABLE test(data String) ENGINE = MergeTree ORDER BY data") + .execute() + .await + .unwrap(); +} + +#[tokio::test] +async fn limited_by_rows() { + let client = prepare_database!(); + create_table(&client).await; + + let mut inserter = client.inserter("test").unwrap().with_max_rows(10); + let rows = 100; + + for i in (2..=rows).step_by(2) { + let row = MyRow { + data: (i - 1).to_string(), + }; + inserter.write(&row).unwrap(); + let row = MyRow { + data: i.to_string(), + }; + inserter.write(&row).unwrap(); + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_ne!(inserted.bytes, 0); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 5); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_ne!(pending.bytes, 0); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, (i % 10) / 2); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let (count, sum) = client + .query("SELECT count(), sum(toUInt64(data)) FROM test") + .fetch_one::<(u64, u64)>() + .await + .unwrap(); + + assert_eq!(count, rows); + assert_eq!(sum, (1..=rows).sum::<u64>()); +} + +#[tokio::test]
+async fn limited_by_bytes() { + let client = prepare_database!(); + create_table(&client).await; + + let mut inserter = client.inserter("test").unwrap().with_max_bytes(100); + let rows = 100; + + let row = MyRow { + data: "x".repeat(9), // +1 for length + }; + + for i in 1..=rows { + inserter.write(&row).unwrap(); + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_eq!(inserted.bytes, 100); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 10); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_eq!(pending.bytes, (i % 10) * 10); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, i % 10); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let count = client + .query("SELECT count() FROM test") + .fetch_one::<u64>() + .await + .unwrap(); + + assert_eq!(count, rows); +} + +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] +async fn limited_by_time() { + use std::time::Duration; + + let client = prepare_database!(); + create_table(&client).await; + + let period = Duration::from_secs(1); + let mut inserter = client.inserter("test").unwrap().with_period(Some(period)); + let rows = 100; + + for i in 1..=rows { + let row = MyRow { + data: i.to_string(), + }; + inserter.write(&row).unwrap(); + + tokio::time::sleep(period / 10).await; + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_ne!(inserted.bytes, 0); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 10); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_ne!(pending.bytes, 0); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, i % 10); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let (count, sum) = client + .query("SELECT count(), sum(toUInt64(data)) FROM test") + .fetch_one::<(u64, u64)>() + .await + .unwrap(); + + assert_eq!(count, rows); + assert_eq!(sum, (1..=rows).sum::<u64>()); +} diff --git a/tests/test_ip.rs b/tests/it/ip.rs similarity index 95% rename from tests/test_ip.rs rename to tests/it/ip.rs index 966ca09..3476dd0 100644 --- a/tests/test_ip.rs +++ b/tests/it/ip.rs @@ -4,12 +4,9 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -mod common; - -#[common::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/it/main.rs b/tests/it/main.rs new file mode 100644 index 0000000..5e5b952 --- /dev/null +++ b/tests/it/main.rs @@ -0,0 +1,60 @@ +use clickhouse::{sql, Client}; +
+macro_rules! prepare_database { + () => { + crate::_priv::prepare_database({ + fn f() {} + fn type_name_of_val<T>(_: T) -> &'static str { + std::any::type_name::<T>() + } + type_name_of_val(f) + }) + .await + }; +} + +mod compression; +mod cursor_error; +mod inserter; +mod ip; +mod nested; +mod query; +mod time; +mod uuid; +mod watch; + +const HOST: &str = "localhost:8123"; + +mod _priv { + use super::*; + + pub(crate) async fn prepare_database(fn_path: &str) -> Client { + let name = make_db_name(fn_path); + let client = Client::default().with_url(format!("http://{HOST}")); + + client + .query("DROP DATABASE IF EXISTS ?") + .bind(sql::Identifier(&name)) + .execute() + .await + .expect("cannot drop db"); + + client + .query("CREATE DATABASE ?") + .bind(sql::Identifier(&name)) + .execute() + .await + .expect("cannot create db"); + + client.with_database(name) + } + + // `it::compression::lz4::{{closure}}::f` -> `chrs__compression__lz4` + fn make_db_name(fn_path: &str) -> String { + assert!(fn_path.starts_with("it::")); + let mut iter = fn_path.split("::").skip(1); + let module = iter.next().unwrap(); + let test = iter.next().unwrap(); + format!("chrs__{module}__{test}") + } +} diff --git a/tests/test_nested.rs b/tests/it/nested.rs similarity index 93% rename from tests/test_nested.rs rename to tests/it/nested.rs index daf75b1..928ac72 100644 --- a/tests/test_nested.rs +++ b/tests/it/nested.rs @@ -2,12 +2,9 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -mod common; - -#[common::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_query.rs b/tests/it/query.rs similarity index 94% rename from tests/test_query.rs rename to tests/it/query.rs index 6757cbd..85c6ce3 100644 --- a/tests/test_query.rs +++ b/tests/it/query.rs @@ -2,12 +2,9 @@ use serde::{Deserialize, Serialize}; use clickhouse::{error::Error, AsyncInsertOptions, Row}; -mod common; - -#[common::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -54,7 +51,6 @@ } } -#[common::named] #[tokio::test] async fn async_insert() { - let client = common::prepare_database!(); + let client = prepare_database!(); @@ -130,7 +126,7 @@ async fn async_insert() { -#[common::named] #[tokio::test] async fn fetch_one_and_optional() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n") @@ -163,10 +159,9 @@ } // See #19. -#[common::named] #[tokio::test] async fn long_query() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n") @@ -187,10 +182,9 @@ } // See #22. -#[common::named] #[tokio::test] async fn big_borrowed_str() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -226,10 +220,9 @@ } // See #31.
-#[common::named] #[tokio::test] async fn all_floats() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(no UInt32, f Float64) ENGINE = MergeTree ORDER BY no") diff --git a/tests/test_time.rs b/tests/it/time.rs similarity index 97% rename from tests/test_time.rs rename to tests/it/time.rs index 6459f98..9a73653 100644 --- a/tests/test_time.rs +++ b/tests/it/time.rs @@ -8,12 +8,9 @@ use time::{macros::datetime, Date, OffsetDateTime}; use clickhouse::Row; -mod common; - -#[common::named] #[tokio::test] async fn datetime() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { @@ -116,10 +113,9 @@ async fn datetime() { assert_eq!(row_str.dt64ns, &original_row.dt64ns.to_string()[..29]); } -#[common::named] #[tokio::test] async fn date() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Serialize, Deserialize, Row)] struct MyRow { @@ -170,10 +166,9 @@ async fn date() { } } -#[common::named] #[tokio::test] async fn date32() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_uuid.rs b/tests/it/uuid.rs similarity index 94% rename from tests/test_uuid.rs rename to tests/it/uuid.rs index eb12483..14f57cc 100644 --- a/tests/test_uuid.rs +++ b/tests/it/uuid.rs @@ -5,12 +5,9 @@ use uuid::Uuid; use clickhouse::Row; -mod common; - -#[common::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_watch.rs b/tests/it/watch.rs similarity index 95% rename from tests/test_watch.rs rename to tests/it/watch.rs index e42d843..560bac8 100644 --- a/tests/test_watch.rs +++ b/tests/it/watch.rs @@ -4,8 +4,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::{Client, Row}; -mod common; - #[derive(Debug, PartialEq, Row, Serialize, Deserialize)] struct MyRow { num: u32, @@ -33,10 +31,9 @@ async fn insert_into_table(client: &Client, rows: &[MyRow]) { insert.end().await.unwrap(); } -#[common::named] #[tokio::test] async fn changes() { - let client = common::prepare_database!(); + let client = prepare_database!(); create_table(&client).await; @@ -71,10 +68,9 @@ async fn changes() { assert_eq!(cursor2.next().await.unwrap(), Some((3, MyRow { num: 21 }))); } -#[common::named] #[tokio::test] async fn events() { - let client = common::prepare_database!(); + let client = prepare_database!(); create_table(&client).await; diff --git a/tests/test_wa_37420.rs b/tests/test_wa_37420.rs deleted file mode 100644 index 9851b8f..0000000 --- a/tests/test_wa_37420.rs +++ /dev/null @@ -1,72 +0,0 @@ -#![cfg(feature = "wa-37420")] - -use serde::{Deserialize, Serialize}; - -use clickhouse::Row; - -mod common; - -#[common::named] -#[tokio::test] -async fn smoke() { - let client = common::prepare_database!(); - - #[derive(Debug, Row, Serialize, Deserialize)] - pub struct Row { - time: u64, - name: String, - } - - #[derive(Debug, Row, Serialize, Deserialize)] - pub struct Row2 { - name: String, - time: u64, - } - - // Create a table. 
- client - .query("CREATE TABLE test(name String, time UInt64) ENGINE = MergeTree ORDER BY time") - .execute() - .await - .unwrap(); - - let row = Row { - time: 1651760768976141295, - name: "first".into(), - }; - - // Write to the table. - let mut insert = client.insert("test").unwrap(); - insert.write(&row).await.unwrap(); - insert.end().await.unwrap(); - - // Read from the table. - let inserted = client - .query("SELECT ?fields FROM test WHERE name = 'first'") - .fetch_one::<Row>() - .await - .unwrap(); - - assert_eq!(inserted.time, row.time); - - // ------- - - let row = Row2 { - name: "second".into(), - time: 1651760768976141295, - }; - - // Write to the table. - let mut insert = client.insert("test").unwrap(); - insert.write(&row).await.unwrap(); - insert.end().await.unwrap(); - - // Read from the table. - let inserted = client - .query("SELECT ?fields FROM test WHERE name = 'second'") - .fetch_one::<Row2>() - .await - .unwrap(); - - assert_eq!(inserted.time, row.time); -}
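Taken together, the inserter changes in this diff imply the following migration for downstream code. This is a sketch based only on the renames and signature changes above; the client setup and the `row` value are illustrative:

```rust,ignore
// Before (0.11): asynchronous `write`, a single `entries` threshold.
let mut inserter = client.inserter("some")?
    .with_max_entries(500_000);
inserter.write(&row).await?;
let stats = inserter.commit().await?;
println!("{} entries inserted", stats.entries);

// After (this diff): synchronous `write`, separate byte and row thresholds.
let mut inserter = client.inserter("some")?
    .with_max_bytes(50_000_000)
    .with_max_rows(500_000);
inserter.write(&row)?;
let stats = inserter.commit().await?;
println!("{} bytes, {} rows inserted", stats.bytes, stats.rows);
```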