From bd0c38104dba45fbc20be14304d92435565942dc Mon Sep 17 00:00:00 2001 From: Nikita Lapkov <5737185+laplab@users.noreply.github.com> Date: Fri, 9 Jun 2023 16:46:50 +0100 Subject: [PATCH 01/24] feat(inserter): start new insert only when the first row is provided (#68) feat(inserter): start new insert only when the first row is provided --- CHANGELOG.md | 4 ++++ src/inserter.rs | 42 ++++++++++++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc84b72..6dd9c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Changed +- inserter: start new insert only when the first row is provided ([#68]). + +[#68]: https://github.com/loyd/clickhouse.rs/pull/68 ## [0.11.4] - 2023-05-14 ### Added diff --git a/src/inserter.rs b/src/inserter.rs index 186f3ff..6833386 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -1,5 +1,9 @@ -use std::{future::Future, mem}; +use std::mem; +use futures::{ + future::{self, Either}, + Future, +}; use serde::Serialize; use tokio::time::{Duration, Instant}; @@ -17,7 +21,7 @@ pub struct Inserter { max_entries: u64, send_timeout: Option, end_timeout: Option, - insert: Insert, + insert: Option>, ticks: Ticks, committed: Quantities, uncommitted_entries: u64, @@ -51,7 +55,7 @@ where max_entries: DEFAULT_MAX_ENTRIES, send_timeout: None, end_timeout: None, - insert: client.insert(table)?, + insert: None, ticks: Ticks::default(), committed: Quantities::ZERO, uncommitted_entries: 0, @@ -116,7 +120,9 @@ where pub fn set_timeouts(&mut self, send_timeout: Option, end_timeout: Option) { self.send_timeout = send_timeout; self.end_timeout = end_timeout; - self.insert.set_timeouts(send_timeout, end_timeout); + if let Some(insert) = &mut self.insert { + insert.set_timeouts(self.send_timeout, self.end_timeout); + } } /// See [`Inserter::with_max_entries()`]. @@ -162,7 +168,12 @@ where T: Serialize, { self.uncommitted_entries += 1; - self.insert.write(row) + if self.insert.is_none() { + if let Err(e) = self.init_insert() { + return Either::Right(future::ready(Result::<()>::Err(e))); + } + } + Either::Left(self.insert.as_mut().unwrap().write(row)) } /// Checks limits and ends a current `INSERT` if they are reached. @@ -189,8 +200,10 @@ where /// Ends a current `INSERT` and whole `Inserter` unconditionally. /// /// If it isn't called, the current `INSERT` is aborted. - pub async fn end(self) -> Result { - self.insert.end().await?; + pub async fn end(mut self) -> Result { + if let Some(insert) = self.insert.take() { + insert.end().await?; + } Ok(self.committed) } @@ -200,9 +213,18 @@ where } async fn insert(&mut self) -> Result<()> { - let mut new_insert = self.client.insert(&self.table)?; // Actually it mustn't fail. + if let Some(insert) = self.insert.take() { + insert.end().await?; + } + Ok(()) + } + + #[cold] + #[inline(never)] + fn init_insert(&mut self) -> Result<()> { + debug_assert!(self.insert.is_none()); + let mut new_insert: Insert = self.client.insert(&self.table)?; new_insert.set_timeouts(self.send_timeout, self.end_timeout); - let insert = mem::replace(&mut self.insert, new_insert); - insert.end().await + Ok(()) } } From d9a4ee3abad94f68755025351d4af0cff4622f6a Mon Sep 17 00:00:00 2001 From: Nikita Lapkov <5737185+laplab@users.noreply.github.com> Date: Mon, 12 Jun 2023 15:10:45 +0100 Subject: [PATCH 02/24] fix(inserter): make init_insert actually init the insert (#70) --- src/inserter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/inserter.rs b/src/inserter.rs index 6833386..7cd13e2 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -225,6 +225,7 @@ where debug_assert!(self.insert.is_none()); let mut new_insert: Insert = self.client.insert(&self.table)?; new_insert.set_timeouts(self.send_timeout, self.end_timeout); + self.insert = Some(new_insert); Ok(()) } } From 6750c4ea377715bfa01df31c03a54d80a07a27b1 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Mon, 12 Jun 2023 18:12:43 +0400 Subject: [PATCH 03/24] docs(CHANGELOG): note about #70 --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6dd9c98..7eca6b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate ### Changed -- inserter: start new insert only when the first row is provided ([#68]). +- inserter: start new insert only when the first row is provided ([#68], [#70]). +[#70]: https://github.com/loyd/clickhouse.rs/pull/70 [#68]: https://github.com/loyd/clickhouse.rs/pull/68 ## [0.11.4] - 2023-05-14 From d4ecc76a7911deb52304264564108c1aa6af0e06 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Mon, 12 Jun 2023 18:16:49 +0400 Subject: [PATCH 04/24] chore: release 0.11.5 --- CHANGELOG.md | 5 ++++- Cargo.toml | 2 +- README.md | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7eca6b2..27fb8a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate + +## [0.11.5] - 2023-06-12 ### Changed - inserter: start new insert only when the first row is provided ([#68], [#70]). @@ -246,7 +248,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Client::query()` for selecting from tables and DDL statements. -[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...HEAD +[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.5...HEAD +[0.11.5]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...v0.11.5 [0.11.4]: https://github.com/loyd/clickhouse.rs/compare/v0.11.3...v0.11.4 [0.11.3]: https://github.com/loyd/clickhouse.rs/compare/v0.11.2...v0.11.3 [0.11.2]: https://github.com/loyd/clickhouse.rs/compare/v0.11.1...v0.11.2 diff --git a/Cargo.toml b/Cargo.toml index 6d83f87..faf8ed4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "clickhouse" -version = "0.11.4" +version = "0.11.5" description = "A typed client for ClickHouse with killer features" keywords = ["clickhouse", "database", "driver", "tokio", "hyper"] authors = ["Paul Loyd "] diff --git a/README.md b/README.md index 690bf4b..0ef7630 100644 --- a/README.md +++ b/README.md @@ -30,10 +30,10 @@ A typed client for ClickHouse. To use the crate, add this to your `Cargo.toml`: ```toml [dependencies] -clickhouse = "0.11.4" +clickhouse = "0.11.5" [dev-dependencies] -clickhouse = { version = "0.11.4", features = ["test-util"] } +clickhouse = { version = "0.11.5", features = ["test-util"] } ```
From 3fc12f6ae074b4c4875bb27e9f382dd1b0bfb862 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 27 Sep 2023 10:53:35 +0400 Subject: [PATCH 05/24] fix(client): don't fail on HTTPs URLs Closes #58 --- src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 3af1e77..7afda14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,10 @@ impl Default for Client { connector.set_keepalive(Some(TCP_KEEPALIVE)); #[cfg(feature = "tls")] - let connector = HttpsConnector::new_with_connector(connector); + let connector = HttpsConnector::new_with_connector({ + connector.enforce_http(false); + connector + }); let client = hyper::Client::builder() .pool_idle_timeout(POOL_IDLE_TIMEOUT) From b35d31395dc2c41d0dfc423ce366d13ee165920b Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 27 Sep 2023 11:09:38 +0400 Subject: [PATCH 06/24] style(sql): satisfy clippy --- src/sql/escape.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sql/escape.rs b/src/sql/escape.rs index 8178153..e43cde6 100644 --- a/src/sql/escape.rs +++ b/src/sql/escape.rs @@ -35,13 +35,13 @@ fn escape(src: &str, mut dst: impl fmt::Write, ch: char) -> fmt::Result { #[test] fn it_escapes_string() { let mut actual = String::new(); - string(r#"f\o'o '' b\'ar'"#, &mut actual).unwrap(); - assert_eq!(actual, r#"'f\\o\'o \'\' b\\\'ar\''"#); + string(r"f\o'o '' b\'ar'", &mut actual).unwrap(); + assert_eq!(actual, r"'f\\o\'o \'\' b\\\'ar\''"); } #[test] fn it_escapes_identifier() { let mut actual = String::new(); - identifier(r#"f\o`o `` b\`ar`"#, &mut actual).unwrap(); - assert_eq!(actual, r#"`f\\o\`o \`\` b\\\`ar\``"#); + identifier(r"f\o`o `` b\`ar`", &mut actual).unwrap(); + assert_eq!(actual, r"`f\\o\`o \`\` b\\\`ar\``"); } From d6232f14181cd4dac597141027134e33333c5b01 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 27 Sep 2023 11:19:48 +0400 Subject: [PATCH 07/24] refactor(test): reorganize integration tests --- .../compression.rs} | 14 +++++------ .../cursor_error.rs} | 14 +++++------ tests/{test_ip.rs => it/ip.rs} | 6 ++--- tests/{common.rs => it/main.rs} | 24 ++++++++++++------- tests/{test_nested.rs => it/nested.rs} | 6 ++--- tests/{test_query.rs => it/query.rs} | 22 ++++++++--------- tests/{test_time.rs => it/time.rs} | 14 +++++------ tests/{test_uuid.rs => it/uuid.rs} | 6 ++--- tests/{test_wa_37420.rs => it/wa_37420.rs} | 6 ++--- tests/{test_watch.rs => it/watch.rs} | 10 ++++---- 10 files changed, 56 insertions(+), 66 deletions(-) rename tests/{test_compression.rs => it/compression.rs} (79%) rename tests/{test_cursor_error.rs => it/cursor_error.rs} (91%) rename tests/{test_ip.rs => it/ip.rs} (95%) rename tests/{common.rs => it/main.rs} (75%) rename tests/{test_nested.rs => it/nested.rs} (93%) rename tests/{test_query.rs => it/query.rs} (92%) rename tests/{test_time.rs => it/time.rs} (97%) rename tests/{test_uuid.rs => it/uuid.rs} (94%) rename tests/{test_wa_37420.rs => it/wa_37420.rs} (95%) rename tests/{test_watch.rs => it/watch.rs} (95%) diff --git a/tests/test_compression.rs b/tests/it/compression.rs similarity index 79% rename from tests/test_compression.rs rename to tests/it/compression.rs index 206c7f5..e9b248e 100644 --- a/tests/test_compression.rs +++ b/tests/it/compression.rs @@ -2,8 +2,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::{Client, Compression, Row}; -mod common; - async fn check(client: Client) { #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -44,25 +42,25 @@ async fn check(client: Client) { assert_eq!(sum_len, 600_000); } -#[common::named] +#[crate::named] #[tokio::test] async fn none() { - let client = common::prepare_database!().with_compression(Compression::None); + let client = prepare_database!().with_compression(Compression::None); check(client).await; } #[cfg(feature = "lz4")] -#[common::named] +#[crate::named] #[tokio::test] async fn lz4() { - let client = common::prepare_database!().with_compression(Compression::Lz4); + let client = prepare_database!().with_compression(Compression::Lz4); check(client).await; } #[cfg(feature = "lz4")] -#[common::named] +#[crate::named] #[tokio::test] async fn lz4_hc() { - let client = common::prepare_database!().with_compression(Compression::Lz4Hc(4)); + let client = prepare_database!().with_compression(Compression::Lz4Hc(4)); check(client).await; } diff --git a/tests/test_cursor_error.rs b/tests/it/cursor_error.rs similarity index 91% rename from tests/test_cursor_error.rs rename to tests/it/cursor_error.rs index 2f1b85a..a26e5a8 100644 --- a/tests/test_cursor_error.rs +++ b/tests/it/cursor_error.rs @@ -1,18 +1,16 @@ use clickhouse::{Client, Compression}; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn deferred() { - let client = common::prepare_database!(); + let client = prepare_database!(); max_execution_time(client, false).await; } -#[common::named] +#[crate::named] #[tokio::test] async fn wait_end_of_query() { - let client = common::prepare_database!(); + let client = prepare_database!(); max_execution_time(client, true).await; } @@ -48,10 +46,10 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) { } #[cfg(feature = "lz4")] -#[common::named] +#[crate::named] #[tokio::test] async fn deferred_lz4() { - let client = common::prepare_database!().with_compression(Compression::Lz4); + let client = prepare_database!().with_compression(Compression::Lz4); client .query("CREATE TABLE test(no UInt32) ENGINE = MergeTree ORDER BY no") diff --git a/tests/test_ip.rs b/tests/it/ip.rs similarity index 95% rename from tests/test_ip.rs rename to tests/it/ip.rs index 966ca09..21efb84 100644 --- a/tests/test_ip.rs +++ b/tests/it/ip.rs @@ -4,12 +4,10 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/common.rs b/tests/it/main.rs similarity index 75% rename from tests/common.rs rename to tests/it/main.rs index 26c1b0a..6a24da0 100644 --- a/tests/common.rs +++ b/tests/it/main.rs @@ -1,20 +1,28 @@ -#![allow(dead_code, unused_imports, unused_macros)] - use clickhouse::{sql, Client}; - -const HOST: &str = "localhost:8123"; +use function_name::named; macro_rules! prepare_database { () => { - common::_priv::prepare_database(file!(), function_name!()).await + crate::_priv::prepare_database(file!(), function_name!()).await }; } -pub(crate) use {function_name::named, prepare_database}; -pub(crate) mod _priv { +mod compression; +mod cursor_error; +mod ip; +mod nested; +mod query; +mod time; +mod uuid; +mod wa_37420; +mod watch; + +const HOST: &str = "localhost:8123"; + +mod _priv { use super::*; - pub async fn prepare_database(file_path: &str, fn_name: &str) -> Client { + pub(crate) async fn prepare_database(file_path: &str, fn_name: &str) -> Client { let name = make_db_name(file_path, fn_name); let client = Client::default().with_url(format!("http://{HOST}")); diff --git a/tests/test_nested.rs b/tests/it/nested.rs similarity index 93% rename from tests/test_nested.rs rename to tests/it/nested.rs index daf75b1..9b2fde7 100644 --- a/tests/test_nested.rs +++ b/tests/it/nested.rs @@ -2,12 +2,10 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_query.rs b/tests/it/query.rs similarity index 92% rename from tests/test_query.rs rename to tests/it/query.rs index 04d566f..4a8e4a9 100644 --- a/tests/test_query.rs +++ b/tests/it/query.rs @@ -2,12 +2,10 @@ use serde::{Deserialize, Serialize}; use clickhouse::{error::Error, Row}; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -54,10 +52,10 @@ async fn smoke() { } } -#[common::named] +#[crate::named] #[tokio::test] async fn fetch_one_and_optional() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n") @@ -90,10 +88,10 @@ async fn fetch_one_and_optional() { } // See #19. -#[common::named] +#[crate::named] #[tokio::test] async fn long_query() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(n String) ENGINE = MergeTree ORDER BY n") @@ -114,10 +112,10 @@ async fn long_query() { } // See #22. -#[common::named] +#[crate::named] #[tokio::test] async fn big_borrowed_str() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Row, Serialize, Deserialize)] struct MyRow<'a> { @@ -153,10 +151,10 @@ async fn big_borrowed_str() { } // See #31. -#[common::named] +#[crate::named] #[tokio::test] async fn all_floats() { - let client = common::prepare_database!(); + let client = prepare_database!(); client .query("CREATE TABLE test(no UInt32, f Float64) ENGINE = MergeTree ORDER BY no") diff --git a/tests/test_time.rs b/tests/it/time.rs similarity index 97% rename from tests/test_time.rs rename to tests/it/time.rs index 6459f98..1a7597b 100644 --- a/tests/test_time.rs +++ b/tests/it/time.rs @@ -8,12 +8,10 @@ use time::{macros::datetime, Date, OffsetDateTime}; use clickhouse::Row; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn datetime() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { @@ -116,10 +114,10 @@ async fn datetime() { assert_eq!(row_str.dt64ns, &original_row.dt64ns.to_string()[..29]); } -#[common::named] +#[crate::named] #[tokio::test] async fn date() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Serialize, Deserialize, Row)] struct MyRow { @@ -170,10 +168,10 @@ async fn date() { } } -#[common::named] +#[crate::named] #[tokio::test] async fn date32() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_uuid.rs b/tests/it/uuid.rs similarity index 94% rename from tests/test_uuid.rs rename to tests/it/uuid.rs index eb12483..fbc196b 100644 --- a/tests/test_uuid.rs +++ b/tests/it/uuid.rs @@ -5,12 +5,10 @@ use uuid::Uuid; use clickhouse::Row; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Row)] struct MyRow { diff --git a/tests/test_wa_37420.rs b/tests/it/wa_37420.rs similarity index 95% rename from tests/test_wa_37420.rs rename to tests/it/wa_37420.rs index 9851b8f..26d92e6 100644 --- a/tests/test_wa_37420.rs +++ b/tests/it/wa_37420.rs @@ -4,12 +4,10 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -mod common; - -#[common::named] +#[crate::named] #[tokio::test] async fn smoke() { - let client = common::prepare_database!(); + let client = prepare_database!(); #[derive(Debug, Row, Serialize, Deserialize)] pub struct Row { diff --git a/tests/test_watch.rs b/tests/it/watch.rs similarity index 95% rename from tests/test_watch.rs rename to tests/it/watch.rs index e42d843..ffb139b 100644 --- a/tests/test_watch.rs +++ b/tests/it/watch.rs @@ -4,8 +4,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::{Client, Row}; -mod common; - #[derive(Debug, PartialEq, Row, Serialize, Deserialize)] struct MyRow { num: u32, @@ -33,10 +31,10 @@ async fn insert_into_table(client: &Client, rows: &[MyRow]) { insert.end().await.unwrap(); } -#[common::named] +#[crate::named] #[tokio::test] async fn changes() { - let client = common::prepare_database!(); + let client = prepare_database!(); create_table(&client).await; @@ -71,10 +69,10 @@ async fn changes() { assert_eq!(cursor2.next().await.unwrap(), Some((3, MyRow { num: 21 }))); } -#[common::named] +#[crate::named] #[tokio::test] async fn events() { - let client = common::prepare_database!(); + let client = prepare_database!(); create_table(&client).await; From 912af3ff1ad160c8ab6466887723dda7dad85e95 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 27 Sep 2023 11:21:29 +0400 Subject: [PATCH 08/24] docs(CHANGELOG): note about #58 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27fb8a6..a0f3d12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Fixed +- client: accept HTTPs urls if `tls` feature is enabled ([#58]). + +[#58]: https://github.com/loyd/clickhouse.rs/issues/56 ## [0.11.5] - 2023-06-12 ### Changed From f038043e1cd7238bbebf7db44fe1ed026ebb237e Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 27 Sep 2023 11:22:18 +0400 Subject: [PATCH 09/24] chore: release 0.11.6 --- CHANGELOG.md | 5 ++++- Cargo.toml | 2 +- README.md | 4 ++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0f3d12..2dd370c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate + +## [0.11.6] - 2023-09-27 ### Fixed - client: accept HTTPs urls if `tls` feature is enabled ([#58]). @@ -252,7 +254,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Client::query()` for selecting from tables and DDL statements. -[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.5...HEAD +[Unreleased]: https://github.com/loyd/clickhouse.rs/compare/v0.11.6...HEAD +[0.11.6]: https://github.com/loyd/clickhouse.rs/compare/v0.11.5...v0.11.6 [0.11.5]: https://github.com/loyd/clickhouse.rs/compare/v0.11.4...v0.11.5 [0.11.4]: https://github.com/loyd/clickhouse.rs/compare/v0.11.3...v0.11.4 [0.11.3]: https://github.com/loyd/clickhouse.rs/compare/v0.11.2...v0.11.3 diff --git a/Cargo.toml b/Cargo.toml index faf8ed4..506ea1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "clickhouse" -version = "0.11.5" +version = "0.11.6" description = "A typed client for ClickHouse with killer features" keywords = ["clickhouse", "database", "driver", "tokio", "hyper"] authors = ["Paul Loyd "] diff --git a/README.md b/README.md index 0ef7630..c0f3484 100644 --- a/README.md +++ b/README.md @@ -30,10 +30,10 @@ A typed client for ClickHouse. To use the crate, add this to your `Cargo.toml`: ```toml [dependencies] -clickhouse = "0.11.5" +clickhouse = "0.11.6" [dev-dependencies] -clickhouse = { version = "0.11.5", features = ["test-util"] } +clickhouse = { version = "0.11.6", features = ["test-util"] } ```
From 58d70939b6c0bc3f8eb9f3cd96c7c3fcdaf1f1f4 Mon Sep 17 00:00:00 2001 From: William Speirs Date: Mon, 9 Oct 2023 02:09:32 -0400 Subject: [PATCH 10/24] feat: support serde::skip_deserializing --- derive/src/lib.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/derive/src/lib.rs b/derive/src/lib.rs index eb8937e..ec17f1b 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -3,7 +3,7 @@ use quote::quote; use serde_derive_internals::{attr::get_serde_meta_items, Ctxt}; use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, Ident, Lit, Meta, NestedMeta}; -/// Parses `#[serde(skip_serializing)]` +/// Parses `#[serde(skip_serializing)]` and `#[serde(skip_deserializing)]` fn serde_skipped(cx: &Ctxt, attrs: &[syn::Attribute]) -> bool { for meta_items in attrs .iter() @@ -12,12 +12,13 @@ fn serde_skipped(cx: &Ctxt, attrs: &[syn::Attribute]) -> bool { for meta_item in meta_items { match meta_item { NestedMeta::Meta(Meta::Path(path)) - if path - .get_ident() - .map_or(false, |i| *i == "skip_serializing") => - { - return true - } + if path + .get_ident() + // will not work with skip_serializing_if + .map_or(false, |i| *i == "skip_serializing" || *i == "skip_deserializing") => + { + return true + } _ => continue, } } From cef34f73e71d4cd25446475390e9b3b3803f0a40 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Mon, 9 Oct 2023 10:12:12 +0400 Subject: [PATCH 11/24] docs(CHANGELOG): note about #83 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dd370c..833f993 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Added +- derive: support `serde::skip_deserializing` ([#83]). + +[#83]: https://github.com/loyd/clickhouse.rs/pull/83 ## [0.11.6] - 2023-09-27 ### Fixed From e71f9d6a3cbfabb986291b65ce1b10e64eddece0 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 18:51:56 +0400 Subject: [PATCH 12/24] test(insert): bench the inserter --- benches/insert.rs | 93 +++++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/benches/insert.rs b/benches/insert.rs index 059635e..a7e486a 100644 --- a/benches/insert.rs +++ b/benches/insert.rs @@ -1,4 +1,4 @@ -use std::{mem, time::Duration}; +use std::{future::Future, mem, time::Duration}; use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use serde::Serialize; @@ -39,38 +39,61 @@ mod server { } } -fn insert(c: &mut Criterion) { - let addr = "127.0.0.1:6543".parse().unwrap(); - server::start(addr); +#[derive(Row, Serialize)] +struct SomeRow { + a: u64, + b: i64, + c: i32, + d: u32, +} + +async fn run_insert(client: Client, iters: u64) -> Result { + let start = Instant::now(); + let mut insert = client.insert("table")?; - #[derive(Row, Serialize)] - struct SomeRow { - a: u64, - b: i64, - c: i32, - d: u32, + for _ in 0..iters { + insert + .write(&black_box(SomeRow { + a: 42, + b: 42, + c: 42, + d: 42, + })) + .await?; } - async fn run(client: Client, iters: u64) -> Result { - let start = Instant::now(); - let mut insert = client.insert("table")?; - - for _ in 0..iters { - insert - .write(&black_box(SomeRow { - a: 42, - b: 42, - c: 42, - d: 42, - })) - .await?; - } + insert.end().await?; + Ok(start.elapsed()) +} - insert.end().await?; - Ok(start.elapsed()) +async fn run_inserter(client: Client, iters: u64) -> Result { + let start = Instant::now(); + let mut inserter = client.inserter("table")?.with_max_entries(iters); + + for _ in 0..iters { + inserter + .write(&black_box(SomeRow { + a: 42, + b: 42, + c: 42, + d: 42, + })) + .await?; + inserter.commit().await?; } - let mut group = c.benchmark_group("insert"); + inserter.end().await?; + Ok(start.elapsed()) +} + +fn run(c: &mut Criterion, name: &str, f: impl Fn(Client, u64) -> F) +where + F: Future>, +{ + let addr = "127.0.0.1:6543".parse().unwrap(); + server::start(addr); + + let mut group = c.benchmark_group(name); group.throughput(Throughput::Bytes(mem::size_of::() as u64)); group.bench_function("no compression", |b| { b.iter_custom(|iters| { @@ -78,7 +101,7 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::None); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); #[cfg(feature = "lz4")] @@ -88,7 +111,7 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); #[cfg(feature = "lz4")] @@ -98,11 +121,19 @@ fn insert(c: &mut Criterion) { let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4Hc(4)); - rt.block_on(run(client, iters)).unwrap() + rt.block_on((f)(client, iters)).unwrap() }) }); group.finish(); } -criterion_group!(benches, insert); +fn insert(c: &mut Criterion) { + run(c, "insert", run_insert); +} + +fn inserter(c: &mut Criterion) { + run(c, "inserter", run_inserter); +} + +criterion_group!(benches, insert, inserter); criterion_main!(benches); From 3dedcc1470d036f81c5e0bf15d65fa9ecc0d3d99 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 19:49:14 +0400 Subject: [PATCH 13/24] feat: update deps, move to syn v2 --- CHANGELOG.md | 3 +++ Cargo.toml | 4 +-- derive/Cargo.toml | 4 +-- derive/src/lib.rs | 69 +++++++---------------------------------------- src/row.rs | 16 +++++++++++ 5 files changed, 32 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 833f993..ed9144f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - derive: support `serde::skip_deserializing` ([#83]). +### Changed +- derive: move to syn v2. + [#83]: https://github.com/loyd/clickhouse.rs/pull/83 ## [0.11.6] - 2023-09-27 diff --git a/Cargo.toml b/Cargo.toml index 506ea1a..66230e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ hyper-tls = { version = "0.5.0", optional = true } url = "2.1.1" futures = "0.3.5" static_assertions = "1.1" -sealed = "0.4" +sealed = "0.5" sha-1 = { version = "0.10", optional = true } serde_json = { version = "1.0.68", optional = true } lz4 = { version = "1.23.3", optional = true } @@ -68,7 +68,7 @@ time = { version = "0.3", optional = true } bstr = { version = "1.2", default-features = false } [dev-dependencies] -criterion = "0.4.0" +criterion = "0.5.0" serde = { version = "1.0.106", features = ["derive"] } tokio = { version = "1.0.1", features = ["full", "test-util"] } hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream", "server"] } diff --git a/derive/Cargo.toml b/derive/Cargo.toml index efef66a..57f9fa8 100644 --- a/derive/Cargo.toml +++ b/derive/Cargo.toml @@ -11,6 +11,6 @@ proc-macro = true [dependencies] proc-macro2 = "1.0" -syn = "1.0" +syn = "2.0" quote = "1.0" -serde_derive_internals = "0.26" +serde_derive_internals = "0.29" diff --git a/derive/src/lib.rs b/derive/src/lib.rs index ec17f1b..409dad8 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -1,60 +1,10 @@ use proc_macro2::TokenStream; use quote::quote; -use serde_derive_internals::{attr::get_serde_meta_items, Ctxt}; -use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, Ident, Lit, Meta, NestedMeta}; - -/// Parses `#[serde(skip_serializing)]` and `#[serde(skip_deserializing)]` -fn serde_skipped(cx: &Ctxt, attrs: &[syn::Attribute]) -> bool { - for meta_items in attrs - .iter() - .filter_map(|attr| get_serde_meta_items(cx, attr).ok()) - { - for meta_item in meta_items { - match meta_item { - NestedMeta::Meta(Meta::Path(path)) - if path - .get_ident() - // will not work with skip_serializing_if - .map_or(false, |i| *i == "skip_serializing" || *i == "skip_deserializing") => - { - return true - } - _ => continue, - } - } - } - false -} - -/// Parses `#[serde(rename = "..")]` -fn serde_rename(cx: &Ctxt, field: &syn::Field) -> Option { - for meta_items in field - .attrs - .iter() - .filter_map(|attr| get_serde_meta_items(cx, attr).ok()) - { - for meta_item in meta_items { - match meta_item { - NestedMeta::Meta(Meta::NameValue(nv)) - if nv - .path - .get_ident() - .map_or(false, |i| *i == "rename") => - { - if let Lit::Str(lit) = nv.lit { - return Some(lit.value()); - } - } - _ => continue, - } - } - } - None -} - -fn unraw(ident: &Ident) -> String { - ident.to_string().trim_start_matches("r#").to_owned() -} +use serde_derive_internals::{ + attr::{Default as SerdeDefault, Field}, + Ctxt, +}; +use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields}; fn column_names(data: &DataStruct) -> TokenStream { match &data.fields { @@ -63,11 +13,10 @@ fn column_names(data: &DataStruct) -> TokenStream { let column_names_iter = fields .named .iter() - .filter(|f| !serde_skipped(&cx, &f.attrs)) - .map(|f| match serde_rename(&cx, f) { - Some(name) => name, - None => unraw(f.ident.as_ref().unwrap()), - }); + .enumerate() + .map(|(index, field)| Field::from_ast(&cx, index, field, None, &SerdeDefault::None)) + .filter(|field| !field.skip_serializing() && !field.skip_deserializing()) + .map(|field| field.name().serialize_name().to_string()); let tokens = quote! { &[#( #column_names_iter,)*] diff --git a/src/row.rs b/src/row.rs index 11361ff..c46fc55 100644 --- a/src/row.rs +++ b/src/row.rs @@ -4,6 +4,7 @@ pub trait Row { const COLUMN_NAMES: &'static [&'static str]; // TODO: count + // TODO: different list for SELECT/INSERT (de/ser) } // Actually, it's not public now. @@ -138,6 +139,21 @@ mod tests { assert_eq!(join_column_names::().unwrap(), "`one`"); } + #[test] + fn it_skips_deserializing() { + use serde::Deserialize; + + #[derive(Row, Deserialize)] + #[allow(dead_code)] + struct TopLevel { + one: u32, + #[serde(skip_deserializing)] + two: u32, + } + + assert_eq!(join_column_names::().unwrap(), "`one`"); + } + #[test] fn it_rejects_other() { #[derive(Row)] From d678b15a5535dffaa30273b98a2411a7d777d559 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 20:29:15 +0400 Subject: [PATCH 14/24] fix(watch): support a new syntax --- CHANGELOG.md | 3 +++ src/lib.rs | 3 +++ src/watch.rs | 10 +++++----- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed9144f..256ebe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - derive: move to syn v2. +### Fixed +- watch: support a new syntax. + [#83]: https://github.com/loyd/clickhouse.rs/pull/83 ## [0.11.6] - 2023-09-27 diff --git a/src/lib.rs b/src/lib.rs index 7afda14..1533371 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -194,6 +194,9 @@ impl Client { } /// Starts a new WATCH query. + /// + /// The `query` can be either the table name or a SELECT query. + /// In the second case, a new LV table is created. #[cfg(feature = "watch")] pub fn watch(&self, query: &str) -> watch::Watch { watch::Watch::new(self, query) diff --git a/src/watch.rs b/src/watch.rs index 113c883..aacadd9 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -29,15 +29,15 @@ impl Watch { self } - // TODO: `timeout()`. - /// Limits the number of updates after initial one. pub fn limit(mut self, limit: impl Into>) -> Self { self.limit = limit.into(); self } - /// See [docs](https://clickhouse.tech/docs/en/sql-reference/statements/create/view/#live-view-with-refresh) + /// See [docs](https://clickhouse.com/docs/en/sql-reference/statements/create/view#with-refresh-clause). + /// + /// Makes sense only for SQL queries (`client.watch("SELECT X")`). pub fn refresh(mut self, interval: impl Into>) -> Self { self.refresh = interval.into(); self @@ -227,10 +227,10 @@ async fn init_cursor(client: &Client, params: &WatchParams) -> Result Date: Sun, 12 Nov 2023 20:35:00 +0400 Subject: [PATCH 15/24] feat!: drop the `wa-37420` feature --- .github/workflows/ci.yml | 1 + CHANGELOG.md | 1 + Cargo.toml | 3 -- README.md | 2 +- src/insert.rs | 20 ------------ tests/it/main.rs | 1 - tests/it/wa_37420.rs | 70 ---------------------------------------- 7 files changed, 3 insertions(+), 95 deletions(-) delete mode 100644 tests/it/wa_37420.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c797706..4bead9d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,7 @@ jobs: - run: cargo test - run: cargo test --no-default-features - run: cargo test --features uuid,time + - run: cargo test --all-features services: clickhouse: diff --git a/CHANGELOG.md b/CHANGELOG.md index 256ebe4..787ca72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - derive: support `serde::skip_deserializing` ([#83]). ### Changed +- **BREAKING** drop the `wa-37420` feature. - derive: move to syn v2. ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 66230e8..5233f23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,9 +43,6 @@ uuid = ["dep:uuid"] time = ["dep:time"] tls = ["dep:hyper-tls"] -# Temporary workaround for https://github.com/ClickHouse/ClickHouse/issues/37420 -wa-37420 = [] - [dependencies] clickhouse-derive = { version = "0.1.1", path = "derive" } diff --git a/README.md b/README.md index c0f3484..db37fa1 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ clickhouse = { version = "0.11.6", features = ["test-util"] } -CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Enable `wa-37420` feature to solve this problem. Don't use it for newer versions. +CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Use 0.11 and enable `wa-37420` feature to solve this problem. Don't use it for newer versions.
diff --git a/src/insert.rs b/src/insert.rs index f9af43a..c0dd3cf 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -25,8 +25,6 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extr #[must_use] pub struct Insert { buffer: BytesMut, - #[cfg(feature = "wa-37420")] - chunk_count: usize, sender: Option, #[cfg(feature = "lz4")] compression: Compression, @@ -102,8 +100,6 @@ impl Insert { Ok(Self { buffer: BytesMut::with_capacity(BUFFER_SIZE), - #[cfg(feature = "wa-37420")] - chunk_count: 0, sender: Some(sender), #[cfg(feature = "lz4")] compression: client.compression, @@ -189,10 +185,6 @@ impl Insert { return Ok(()); } - // A temporary workaround for https://github.com/ClickHouse/ClickHouse/issues/37420. - #[cfg(feature = "wa-37420")] - self.prepend_bom(); - // Hyper uses non-trivial and inefficient schema of buffering chunks. // It's difficult to determine when allocations occur. // So, instead we control it manually here and rely on the system allocator. @@ -250,18 +242,6 @@ impl Insert { Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze()) } - #[cfg(feature = "wa-37420")] - fn prepend_bom(&mut self) { - if self.chunk_count == 0 && self.buffer.starts_with(&[0xef, 0xbb, 0xbf]) { - let mut new_chunk = BytesMut::with_capacity(self.buffer.len() + 3); - new_chunk.extend_from_slice(&[0xef, 0xbb, 0xbf]); - new_chunk.extend_from_slice(&self.buffer); - self.buffer = new_chunk; - } - - self.chunk_count += 1; - } - fn abort(&mut self) { if let Some(sender) = self.sender.take() { sender.abort(); diff --git a/tests/it/main.rs b/tests/it/main.rs index 6a24da0..1f83475 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -14,7 +14,6 @@ mod nested; mod query; mod time; mod uuid; -mod wa_37420; mod watch; const HOST: &str = "localhost:8123"; diff --git a/tests/it/wa_37420.rs b/tests/it/wa_37420.rs deleted file mode 100644 index 26d92e6..0000000 --- a/tests/it/wa_37420.rs +++ /dev/null @@ -1,70 +0,0 @@ -#![cfg(feature = "wa-37420")] - -use serde::{Deserialize, Serialize}; - -use clickhouse::Row; - -#[crate::named] -#[tokio::test] -async fn smoke() { - let client = prepare_database!(); - - #[derive(Debug, Row, Serialize, Deserialize)] - pub struct Row { - time: u64, - name: String, - } - - #[derive(Debug, Row, Serialize, Deserialize)] - pub struct Row2 { - name: String, - time: u64, - } - - // Create a table. - client - .query("CREATE TABLE test(name String, time UInt64) ENGINE = MergeTree ORDER BY time") - .execute() - .await - .unwrap(); - - let row = Row { - time: 1651760768976141295, - name: "first".into(), - }; - - // Write to the table. - let mut insert = client.insert("test").unwrap(); - insert.write(&row).await.unwrap(); - insert.end().await.unwrap(); - - // Read from the table. - let inserted = client - .query("SELECT ?fields FROM test WHERE name = 'first'") - .fetch_one::() - .await - .unwrap(); - - assert_eq!(inserted.time, row.time); - - // ------- - - let row = Row2 { - name: "second".into(), - time: 1651760768976141295, - }; - - // Write to the table. - let mut insert = client.insert("test").unwrap(); - insert.write(&row).await.unwrap(); - insert.end().await.unwrap(); - - // Read from the table. - let inserted = client - .query("SELECT ?fields FROM test WHERE name = 'second'") - .fetch_one::() - .await - .unwrap(); - - assert_eq!(inserted.time, row.time); -} From 843ca7e38da953be2f9938cb43252458de23f652 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:02:20 +0400 Subject: [PATCH 16/24] refactor(tests): get rid of `function_name` --- Cargo.toml | 1 - tests/it/compression.rs | 3 --- tests/it/cursor_error.rs | 3 --- tests/it/ip.rs | 1 - tests/it/main.rs | 25 +++++++++++++++++-------- tests/it/nested.rs | 1 - tests/it/query.rs | 5 ----- tests/it/time.rs | 3 --- tests/it/uuid.rs | 1 - tests/it/watch.rs | 2 -- 10 files changed, 17 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5233f23..68382cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,5 @@ hyper = { version = "0.14", features = ["client", "tcp", "http1", "stream", "ser serde_bytes = "0.11.4" serde_repr = "0.1.7" uuid = { version = "1", features = ["v4"] } -function_name = "0.3" time = { version = "0.3.17", features = ["macros", "rand"] } rand = "0.8.5" diff --git a/tests/it/compression.rs b/tests/it/compression.rs index e9b248e..1ade29a 100644 --- a/tests/it/compression.rs +++ b/tests/it/compression.rs @@ -42,7 +42,6 @@ async fn check(client: Client) { assert_eq!(sum_len, 600_000); } -#[crate::named] #[tokio::test] async fn none() { let client = prepare_database!().with_compression(Compression::None); @@ -50,7 +49,6 @@ async fn none() { } #[cfg(feature = "lz4")] -#[crate::named] #[tokio::test] async fn lz4() { let client = prepare_database!().with_compression(Compression::Lz4); @@ -58,7 +56,6 @@ async fn lz4() { } #[cfg(feature = "lz4")] -#[crate::named] #[tokio::test] async fn lz4_hc() { let client = prepare_database!().with_compression(Compression::Lz4Hc(4)); diff --git a/tests/it/cursor_error.rs b/tests/it/cursor_error.rs index a26e5a8..21d1bad 100644 --- a/tests/it/cursor_error.rs +++ b/tests/it/cursor_error.rs @@ -1,13 +1,11 @@ use clickhouse::{Client, Compression}; -#[crate::named] #[tokio::test] async fn deferred() { let client = prepare_database!(); max_execution_time(client, false).await; } -#[crate::named] #[tokio::test] async fn wait_end_of_query() { let client = prepare_database!(); @@ -46,7 +44,6 @@ async fn max_execution_time(mut client: Client, wait_end_of_query: bool) { } #[cfg(feature = "lz4")] -#[crate::named] #[tokio::test] async fn deferred_lz4() { let client = prepare_database!().with_compression(Compression::Lz4); diff --git a/tests/it/ip.rs b/tests/it/ip.rs index 21efb84..3476dd0 100644 --- a/tests/it/ip.rs +++ b/tests/it/ip.rs @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -#[crate::named] #[tokio::test] async fn smoke() { let client = prepare_database!(); diff --git a/tests/it/main.rs b/tests/it/main.rs index 1f83475..7a1958a 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -1,9 +1,15 @@ use clickhouse::{sql, Client}; -use function_name::named; macro_rules! prepare_database { () => { - crate::_priv::prepare_database(file!(), function_name!()).await + crate::_priv::prepare_database({ + fn f() {} + fn type_name_of_val(_: T) -> &'static str { + std::any::type_name::() + } + type_name_of_val(f) + }) + .await }; } @@ -21,8 +27,8 @@ const HOST: &str = "localhost:8123"; mod _priv { use super::*; - pub(crate) async fn prepare_database(file_path: &str, fn_name: &str) -> Client { - let name = make_db_name(file_path, fn_name); + pub(crate) async fn prepare_database(fn_path: &str) -> Client { + let name = make_db_name(fn_path); let client = Client::default().with_url(format!("http://{HOST}")); client @@ -42,9 +48,12 @@ mod _priv { client.with_database(name) } - fn make_db_name(file_path: &str, fn_name: &str) -> String { - let (_, basename) = file_path.rsplit_once('/').expect("invalid file's path"); - let prefix = basename.strip_suffix(".rs").expect("invalid file's path"); - format!("{prefix}__{fn_name}") + // `it::compression::lz4::{{closure}}::f` -> `chrs__compression__lz4` + fn make_db_name(fn_path: &str) -> String { + assert!(fn_path.starts_with("it::")); + let mut iter = fn_path.split("::").skip(1); + let module = iter.next().unwrap(); + let test = iter.next().unwrap(); + format!("chrs__{module}__{test}") } } diff --git a/tests/it/nested.rs b/tests/it/nested.rs index 9b2fde7..928ac72 100644 --- a/tests/it/nested.rs +++ b/tests/it/nested.rs @@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::Row; -#[crate::named] #[tokio::test] async fn smoke() { let client = prepare_database!(); diff --git a/tests/it/query.rs b/tests/it/query.rs index 4a8e4a9..1ed81d7 100644 --- a/tests/it/query.rs +++ b/tests/it/query.rs @@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize}; use clickhouse::{error::Error, Row}; -#[crate::named] #[tokio::test] async fn smoke() { let client = prepare_database!(); @@ -52,7 +51,6 @@ async fn smoke() { } } -#[crate::named] #[tokio::test] async fn fetch_one_and_optional() { let client = prepare_database!(); @@ -88,7 +86,6 @@ async fn fetch_one_and_optional() { } // See #19. -#[crate::named] #[tokio::test] async fn long_query() { let client = prepare_database!(); @@ -112,7 +109,6 @@ async fn long_query() { } // See #22. -#[crate::named] #[tokio::test] async fn big_borrowed_str() { let client = prepare_database!(); @@ -151,7 +147,6 @@ async fn big_borrowed_str() { } // See #31. -#[crate::named] #[tokio::test] async fn all_floats() { let client = prepare_database!(); diff --git a/tests/it/time.rs b/tests/it/time.rs index 1a7597b..9a73653 100644 --- a/tests/it/time.rs +++ b/tests/it/time.rs @@ -8,7 +8,6 @@ use time::{macros::datetime, Date, OffsetDateTime}; use clickhouse::Row; -#[crate::named] #[tokio::test] async fn datetime() { let client = prepare_database!(); @@ -114,7 +113,6 @@ async fn datetime() { assert_eq!(row_str.dt64ns, &original_row.dt64ns.to_string()[..29]); } -#[crate::named] #[tokio::test] async fn date() { let client = prepare_database!(); @@ -168,7 +166,6 @@ async fn date() { } } -#[crate::named] #[tokio::test] async fn date32() { let client = prepare_database!(); diff --git a/tests/it/uuid.rs b/tests/it/uuid.rs index fbc196b..14f57cc 100644 --- a/tests/it/uuid.rs +++ b/tests/it/uuid.rs @@ -5,7 +5,6 @@ use uuid::Uuid; use clickhouse::Row; -#[crate::named] #[tokio::test] async fn smoke() { let client = prepare_database!(); diff --git a/tests/it/watch.rs b/tests/it/watch.rs index ffb139b..560bac8 100644 --- a/tests/it/watch.rs +++ b/tests/it/watch.rs @@ -31,7 +31,6 @@ async fn insert_into_table(client: &Client, rows: &[MyRow]) { insert.end().await.unwrap(); } -#[crate::named] #[tokio::test] async fn changes() { let client = prepare_database!(); @@ -69,7 +68,6 @@ async fn changes() { assert_eq!(cursor2.next().await.unwrap(), Some((3, MyRow { num: 21 }))); } -#[crate::named] #[tokio::test] async fn events() { let client = prepare_database!(); From 87bd9e259acffc573dd1673c0ee1201f34423963 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:05:32 +0400 Subject: [PATCH 17/24] feat(derive): add `automatically_derived` annotation --- derive/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/derive/src/lib.rs b/derive/src/lib.rs index 409dad8..51425bf 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -50,6 +50,7 @@ pub fn row(input: proc_macro::TokenStream) -> proc_macro::TokenStream { // TODO: replace `clickhouse` with `::clickhouse` here. let expanded = quote! { + #[automatically_derived] impl #impl_generics clickhouse::Row for #name #ty_generics #where_clause { const COLUMN_NAMES: &'static [&'static str] = #column_names; } From 88702a7976a7d598ea81deb9bc45777d3775de07 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:06:45 +0400 Subject: [PATCH 18/24] docs(README): update docsrs badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index db37fa1..c9f2171 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ A typed client for ClickHouse. [crates-badge]: https://img.shields.io/crates/v/clickhouse.svg [crates-url]: https://crates.io/crates/clickhouse -[docs-badge]: https://docs.rs/clickhouse/badge.svg +[docs-badge]: https://img.shields.io/docsrs/clickhouse [docs-url]: https://docs.rs/clickhouse [mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg [mit-url]: https://github.com/loyd/clickhouse.rs/blob/master/LICENSE From c26868ef655d866c8fcdbb754ed7b8ee56c9a29e Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:07:46 +0400 Subject: [PATCH 19/24] docs(README): remove `wa-37420` from the feature list --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index c9f2171..dc35100 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,6 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). * `watch` — enables `client.watch` functionality. See the corresponding section for details. * `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate. * `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate. -* `wa-37420` — implements a workaround for CH versions prior to v22.6. See the corresponding section for details. ## Data Types * `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them. From c8eddbf195addf043127fda02339e644edbd9a9e Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:36:53 +0400 Subject: [PATCH 20/24] feat(inserter): use `quanta::Instant` --- CHANGELOG.md | 2 ++ Cargo.toml | 4 +++- README.md | 5 +++-- src/inserter.rs | 18 ++++++++++++------ src/ticks.rs | 21 ++++++++++++++++----- 5 files changed, 36 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 787ca72..82916ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate ### Added - derive: support `serde::skip_deserializing` ([#83]). +- the `quanta` feature, enabled by default. ### Changed - **BREAKING** drop the `wa-37420` feature. +- inserter: increase performance if the `quanta` feature is enabled. - derive: move to syn v2. ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 68382cd..55e4dc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ required-features = ["test-util"] debug = true [features] -default = ["lz4", "tls"] +default = ["lz4", "tls", "quanta"] test-util = ["hyper/server"] watch = ["dep:sha-1", "dep:serde_json"] @@ -42,6 +42,7 @@ lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"] uuid = ["dep:uuid"] time = ["dep:time"] tls = ["dep:hyper-tls"] +quanta = ["dep:quanta"] [dependencies] clickhouse-derive = { version = "0.1.1", path = "derive" } @@ -63,6 +64,7 @@ clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true } uuid = { version = "1", optional = true } time = { version = "0.3", optional = true } bstr = { version = "1.2", default-features = false } +quanta = { version = "0.12", optional = true } [dev-dependencies] criterion = "0.5.0" diff --git a/README.md b/README.md index dc35100..46fe75e 100644 --- a/README.md +++ b/README.md @@ -209,10 +209,11 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). ## Feature Flags * `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`. * `tls` (enabled by default) — supports urls with the `HTTPS` schema. +* `quanta` (enabled by default) - uses the [`quanta`](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests). * `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`. * `watch` — enables `client.watch` functionality. See the corresponding section for details. -* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate. -* `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate. +* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate. +* `time` — adds `serde::time` to work with [time](https://docs.rs/time) crate. ## Data Types * `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them. diff --git a/src/inserter.rs b/src/inserter.rs index 7cd13e2..d89231c 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -5,9 +5,15 @@ use futures::{ Future, }; use serde::Serialize; -use tokio::time::{Duration, Instant}; - -use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client}; +use tokio::time::Duration; + +use crate::{ + error::Result, + insert::Insert, + row::Row, + ticks::{self, Ticks}, + Client, +}; const DEFAULT_MAX_ENTRIES: u64 = 500_000; @@ -154,7 +160,7 @@ where Some( self.ticks .next_at()? - .saturating_duration_since(Instant::now()), + .saturating_duration_since(ticks::Instant::now()), ) } @@ -184,7 +190,7 @@ where self.uncommitted_entries = 0; } - let now = Instant::now(); + let now = ticks::Instant::now(); Ok(if self.is_threshold_reached(now) { let quantities = mem::replace(&mut self.committed, Quantities::ZERO); @@ -207,7 +213,7 @@ where Ok(self.committed) } - fn is_threshold_reached(&self, now: Instant) -> bool { + fn is_threshold_reached(&self, now: ticks::Instant) -> bool { self.committed.entries >= self.max_entries || self.ticks.next_at().map_or(false, |next_at| now >= next_at) } diff --git a/src/ticks.rs b/src/ticks.rs index 37d2157..e8756ae 100644 --- a/src/ticks.rs +++ b/src/ticks.rs @@ -1,7 +1,18 @@ -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600); +// === Instant === + +// More efficient `Instant` based on TSC. +#[cfg(all(feature = "quanta", not(feature = "test-util")))] +pub(crate) type Instant = quanta::Instant; + +#[cfg(any(not(feature = "quanta"), feature = "test-util"))] +pub(crate) type Instant = tokio::time::Instant; + +// === Ticks === + pub(crate) struct Ticks { period: Duration, max_bias: f64, @@ -62,9 +73,9 @@ impl Ticks { } } -#[tokio::test] +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] async fn it_works() { - tokio::time::pause(); let origin = Instant::now(); // No bias. @@ -101,9 +112,9 @@ async fn it_works() { assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(31)); } -#[tokio::test] +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] async fn it_skips_extra_ticks() { - tokio::time::pause(); let origin = Instant::now(); let mut ticks = Ticks::default(); From a505f9bc16bc8c674b512b5e1b929b3e69cdd52a Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:43:44 +0400 Subject: [PATCH 21/24] chore(benches): use dedicated ports --- README.md | 2 +- benches/insert.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 46fe75e..93f051b 100644 --- a/README.md +++ b/README.md @@ -209,7 +209,7 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). ## Feature Flags * `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`. * `tls` (enabled by default) — supports urls with the `HTTPS` schema. -* `quanta` (enabled by default) - uses the [`quanta`](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests). +* `quanta` (enabled by default) - uses the [quanta](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests). * `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`. * `watch` — enables `client.watch` functionality. See the corresponding section for details. * `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate. diff --git a/benches/insert.rs b/benches/insert.rs index a7e486a..4389439 100644 --- a/benches/insert.rs +++ b/benches/insert.rs @@ -86,11 +86,11 @@ async fn run_inserter(client: Client, iters: u64) -> Result { Ok(start.elapsed()) } -fn run(c: &mut Criterion, name: &str, f: impl Fn(Client, u64) -> F) +fn run(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, u64) -> F) where F: Future>, { - let addr = "127.0.0.1:6543".parse().unwrap(); + let addr = format!("127.0.0.1:{port}").parse().unwrap(); server::start(addr); let mut group = c.benchmark_group(name); @@ -128,11 +128,11 @@ where } fn insert(c: &mut Criterion) { - run(c, "insert", run_insert); + run(c, "insert", 6543, run_insert); } fn inserter(c: &mut Criterion) { - run(c, "inserter", run_inserter); + run(c, "inserter", 6544, run_inserter); } criterion_group!(benches, insert, inserter); From c4f71f55fb97cba84f9bbcd881dc3460f7abc879 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:58:39 +0400 Subject: [PATCH 22/24] docs(README): small improvements --- README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 93f051b..0cb5062 100644 --- a/README.md +++ b/README.md @@ -17,16 +17,21 @@ A typed client for ClickHouse. [actions-url]: https://github.com/loyd/clickhouse.rs/actions/workflows/ci.yml * Uses `serde` for encoding/decoding rows. -* Uses `RowBinary` encoding. -* Supports HTTP and HTTPS. +* Supports `serde` attributes: `skip_serializing`, `skip_deserializing`, `rename`. +* Uses `RowBinary` encoding over HTTP transport. + * There are plans to switch to `Native` over TCP. +* Supports TLS. +* Supports compression and decompression (LZ4 and LZ4HC). * Provides API for selecting. * Provides API for inserting. * Provides API for infinite transactional (see below) inserting. * Provides API for watching live views. -* Compression and decompression (LZ4). * Provides mocks for unit testing. +Note: [ch2rs](https://github.com/loyd/ch2rs) is useful to generate a row type from ClickHouse. + ## Usage + To use the crate, add this to your `Cargo.toml`: ```toml [dependencies] @@ -126,7 +131,6 @@ insert.end().await?; * If `end()` isn't called, the `INSERT` is aborted. * Rows are being sent progressively to spread network load. * ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). -* [ch2rs](https://github.com/loyd/ch2rs) is useful to generate a row type from ClickHouse.
@@ -226,7 +230,7 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). Example ```rust,ignore - #[derive(Debug, Serialize, Deserialize)] + #[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, From a99e3046ba488a86be96a683b7c32d60ca89b06f Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 12 Nov 2023 21:59:38 +0400 Subject: [PATCH 23/24] chore: move to 2021 edition --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 55e4dc2..7517d85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ authors = ["Paul Loyd "] repository = "https://github.com/loyd/clickhouse.rs" license = "MIT OR Apache-2.0" readme = "README.md" -edition = "2018" +edition = "2021" rust-version = "1.60" [package.metadata.docs.rs] From 8c1cf9f973665fd9cbec764ede6bb8b3af4e01ab Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 15 Nov 2023 11:33:54 +0400 Subject: [PATCH 24/24] feat!(inserter): limit by size and make `write()` sync --- CHANGELOG.md | 5 + README.md | 16 ++-- benches/insert.rs | 16 ++-- examples/usage.rs | 6 +- rustfmt.toml | 2 +- src/insert.rs | 51 +++++++--- src/inserter.rs | 166 ++++++++++++++++++-------------- src/lib.rs | 7 -- src/ticks.rs | 203 +++++++++++++++++++++++----------------- tests/it/compression.rs | 9 +- tests/it/inserter.rs | 152 ++++++++++++++++++++++++++++++ tests/it/main.rs | 1 + 12 files changed, 432 insertions(+), 202 deletions(-) create mode 100644 tests/it/inserter.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 82916ae..bc5c2ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - derive: support `serde::skip_deserializing` ([#83]). - the `quanta` feature, enabled by default. +- inserter: can be limited by size, see `Inserter::with_max_bytes()`. ### Changed +- **BREAKING** inserter: `Inserter::write` is synchronous now. +- **BREAKING** inserter: rename `entries` to `rows`. - **BREAKING** drop the `wa-37420` feature. +- **BREAKING** remove deprecated items. - inserter: increase performance if the `quanta` feature is enabled. +- inserter: increase performance if the time limit isn't used. - derive: move to syn v2. ### Fixed diff --git a/README.md b/README.md index 0cb5062..b80ef65 100644 --- a/README.md +++ b/README.md @@ -143,22 +143,24 @@ insert.end().await?; ```rust,ignore let mut inserter = client.inserter("some")? .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20))) - .with_max_entries(750_000) + .with_max_bytes(50_000_000) + .with_max_rows(750_000) .with_period(Some(Duration::from_secs(15))); -inserter.write(&MyRow { no: 0, name: "foo".into() }).await?; -inserter.write(&MyRow { no: 1, name: "bar".into() }).await?; +inserter.write(&MyRow { no: 0, name: "foo".into() })?; +inserter.write(&MyRow { no: 1, name: "bar".into() })?; let stats = inserter.commit().await?; -if stats.entries > 0 { +if stats.rows > 0 { println!( - "{} entries ({} transactions) have been inserted", - stats.entries, stats.transactions, + "{} bytes, {} rows, {} transactions have been inserted", + stats.bytes, stats.rows, stats.transactions, ); } ``` -* `Inserter` ends an active insert in `commit()` if thresholds (`max_entries`, `period`) are reached. +* `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached. * The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters. +* `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits. * All rows between `commit()` calls are inserted in the same `INSERT` statement. * Do not forget to flush if you want to terminate inserting: ```rust,ignore diff --git a/benches/insert.rs b/benches/insert.rs index 4389439..47260b1 100644 --- a/benches/insert.rs +++ b/benches/insert.rs @@ -68,17 +68,15 @@ async fn run_insert(client: Client, iters: u64) -> Result { async fn run_inserter(client: Client, iters: u64) -> Result { let start = Instant::now(); - let mut inserter = client.inserter("table")?.with_max_entries(iters); + let mut inserter = client.inserter("table")?.with_max_rows(iters); for _ in 0..iters { - inserter - .write(&black_box(SomeRow { - a: 42, - b: 42, - c: 42, - d: 42, - })) - .await?; + inserter.write(&black_box(SomeRow { + a: 42, + b: 42, + c: 42, + d: 42, + }))?; inserter.commit().await?; } diff --git a/examples/usage.rs b/examples/usage.rs index a5d386a..c99a064 100644 --- a/examples/usage.rs +++ b/examples/usage.rs @@ -42,15 +42,15 @@ async fn insert(client: &Client) -> Result<()> { async fn inserter(client: &Client) -> Result<()> { let mut inserter = client .inserter("some")? - .with_max_entries(100_000) + .with_max_rows(100_000) .with_period(Some(Duration::from_secs(15))); for i in 0..1000 { if i == 500 { - inserter.set_max_entries(300); + inserter.set_max_rows(300); } - inserter.write(&MyRow { no: i, name: "foo" }).await?; + inserter.write(&MyRow { no: i, name: "foo" })?; inserter.commit().await?; } diff --git a/rustfmt.toml b/rustfmt.toml index f39ac2a..9abe81d 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,3 @@ -edition = "2018" +edition = "2021" merge_derives = false diff --git a/src/insert.rs b/src/insert.rs index c0dd3cf..01334d9 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -19,7 +19,10 @@ use crate::{ const BUFFER_SIZE: usize = 128 * 1024; const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extra reallocations -/// Performs only one `INSERT`. +/// Performs one `INSERT`. +/// +/// The [`Insert::end`] must be called to finalize the `INSERT`. +/// Otherwise, the whole `INSERT` will be aborted. /// /// Rows are being sent progressively to spread network load. #[must_use] @@ -143,7 +146,18 @@ impl Insert { self.end_timeout = end_timeout; } - /// Serializes and writes to the socket a provided row. + /// Serializes the provided row into an internal buffer. + /// Once the buffer is full, it's sent to a background task writing to the socket. + /// + /// Close to: + /// ```ignore + /// async fn write(&self, row: &T) -> Result; + /// ``` + /// + /// A returned future doesn't depend on the row's lifetime. + /// + /// Returns an error if the row cannot be serialized or the background task failed. + /// Once failed, the whole `INSERT` is aborted and cannot be used anymore. /// /// # Panics /// If called after previous call returned an error. @@ -151,12 +165,7 @@ impl Insert { where T: Serialize, { - assert!(self.sender.is_some(), "write() after error"); - - let result = rowbinary::serialize_into(&mut self.buffer, row); - if result.is_err() { - self.abort(); - } + let result = self.do_write(row); async move { result?; @@ -167,10 +176,30 @@ impl Insert { } } - /// Ends `INSERT`. - /// Succeeds if the server returns 200. + #[inline(always)] + pub(crate) fn do_write(&mut self, row: &T) -> Result + where + T: Serialize, + { + assert!(self.sender.is_some(), "write() after error"); + + let old_buf_size = self.buffer.len(); + let result = rowbinary::serialize_into(&mut self.buffer, row); + let written = self.buffer.len() - old_buf_size; + + if result.is_err() { + self.abort(); + } + + result.and(Ok(written)) + } + + /// Ends `INSERT`, the server starts processing the data. + /// + /// Succeeds if the server returns 200, that means the `INSERT` was handled successfully, + /// including all materialized views and quorum writes. /// - /// If it isn't called, the whole `INSERT` is aborted. + /// NOTE: If it isn't called, the whole `INSERT` is aborted. pub async fn end(mut self) -> Result<()> { if !self.buffer.is_empty() { self.send_chunk().await?; diff --git a/src/inserter.rs b/src/inserter.rs index d89231c..20611cc 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -1,51 +1,51 @@ use std::mem; -use futures::{ - future::{self, Either}, - Future, -}; use serde::Serialize; use tokio::time::Duration; -use crate::{ - error::Result, - insert::Insert, - row::Row, - ticks::{self, Ticks}, - Client, -}; +use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client}; -const DEFAULT_MAX_ENTRIES: u64 = 500_000; +const DEFAULT_MAX_ROWS: u64 = 500_000; /// Performs multiple consecutive `INSERT`s. /// +/// By default, it ends the current active `INSERT` every 500_000 rows. +/// Use `with_max_bytes`,`with_max_rows` and `with_period` to modify this behaviour. +/// /// Rows are being sent progressively to spread network load. +/// +/// All rows written by [`Inserter::write()`] between [`Inserter::commit()`] calls +/// are sent in one `INSERT` statement. #[must_use] pub struct Inserter { client: Client, table: String, - max_entries: u64, + max_bytes: u64, + max_rows: u64, send_timeout: Option, end_timeout: Option, insert: Option>, ticks: Ticks, - committed: Quantities, - uncommitted_entries: u64, + pending: Quantities, + in_transaction: bool, } -/// Statistics about inserted rows. +/// Statistics about pending or inserted data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Quantities { - /// How many rows ([`Inserter::write`]) have been inserted. - pub entries: u64, - /// How many nonempty transactions ([`Inserter::commit`]) have been inserted. + /// The number of uncompressed bytes. + pub bytes: u64, + /// The number for rows (calls of [`Inserter::write`]). + pub rows: u64, + /// The number of nonempty transactions (calls of [`Inserter::commit`]). pub transactions: u64, } impl Quantities { /// Just zero quantities, nothing special. pub const ZERO: Quantities = Quantities { - entries: 0, + bytes: 0, + rows: 0, transactions: 0, }; } @@ -58,17 +58,18 @@ where Ok(Self { client: client.clone(), table: table.into(), - max_entries: DEFAULT_MAX_ENTRIES, + max_bytes: u64::MAX, + max_rows: DEFAULT_MAX_ROWS, send_timeout: None, end_timeout: None, insert: None, ticks: Ticks::default(), - committed: Quantities::ZERO, - uncommitted_entries: 0, + pending: Quantities::ZERO, + in_transaction: false, }) } - /// See [`Insert::with_max_entries()`]. + /// See [`Insert::with_timeouts()`]. /// /// Note that [`Inserter::commit()`] can call [`Insert::end()`] inside, /// so `end_timeout` is also applied to `commit()` method. @@ -81,14 +82,25 @@ where self } + /// The maximum number of uncompressed bytes in one `INSERT` statement. + /// + /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition + /// and their number is less [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). + /// + /// Unlimited by default. + pub fn with_max_bytes(mut self, threshold: u64) -> Self { + self.set_max_bytes(threshold); + self + } + /// The maximum number of rows in one `INSERT` statement. /// /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition /// and their number is less [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size). /// /// `500_000` by default. - pub fn with_max_entries(mut self, threshold: u64) -> Self { - self.set_max_entries(threshold); + pub fn with_max_rows(mut self, threshold: u64) -> Self { + self.set_max_rows(threshold); self } @@ -99,6 +111,12 @@ where /// However, it's possible to use [`Inserter::time_left()`] and set a timer up /// to call [`Inserter::commit()`] to check passed time again. /// + /// Extra ticks are skipped if the previous `INSERT` is still in progress: + /// ```text + /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | + /// Actual ticks: | work -----| delay | work ---| work -----| work -----| + /// ``` + /// /// `None` by default. pub fn with_period(mut self, period: Option) -> Self { self.set_period(period); @@ -106,22 +124,18 @@ where } /// Adds a bias to the period. The actual period will be in the following range: - /// ```ignore + /// ```text /// [period * (1 - bias), period * (1 + bias)] /// ``` /// + /// The `bias` parameter is clamped to the range `[0, 1]`. + /// /// It helps to avoid producing a lot of `INSERT`s at the same time by multiple inserters. pub fn with_period_bias(mut self, bias: f64) -> Self { self.set_period_bias(bias); self } - #[deprecated(note = "use `with_period()` instead")] - pub fn with_max_duration(mut self, threshold: Duration) -> Self { - self.set_period(Some(threshold)); - self - } - /// See [`Inserter::with_timeouts()`]. pub fn set_timeouts(&mut self, send_timeout: Option, end_timeout: Option) { self.send_timeout = send_timeout; @@ -131,9 +145,14 @@ where } } - /// See [`Inserter::with_max_entries()`]. - pub fn set_max_entries(&mut self, threshold: u64) { - self.max_entries = threshold; + /// See [`Inserter::with_max_bytes()`]. + pub fn set_max_bytes(&mut self, threshold: u64) { + self.max_bytes = threshold; + } + + /// See [`Inserter::with_max_rows()`]. + pub fn set_max_rows(&mut self, threshold: u64) { + self.max_rows = threshold; } /// See [`Inserter::with_period()`]. @@ -148,52 +167,58 @@ where self.ticks.reschedule(); } - #[deprecated(note = "use `set_period()` instead")] - pub fn set_max_duration(&mut self, threshold: Duration) { - self.ticks.set_period(Some(threshold)); - } - /// How much time we have until the next tick. /// /// `None` if the period isn't configured. pub fn time_left(&mut self) -> Option { - Some( - self.ticks - .next_at()? - .saturating_duration_since(ticks::Instant::now()), - ) + self.ticks.time_left() + } + + /// Returns statistics about data not yet inserted into ClickHouse. + pub fn pending(&self) -> &Quantities { + &self.pending } - /// Serializes and writes to the socket a provided row. + /// Serializes the provided row into an internal buffer. + /// + /// To check limits and sent to ClickHouse, call [`Inserter::commit()`]. /// /// # Panics /// If called after previous call returned an error. #[inline] - pub fn write<'a>(&'a mut self, row: &T) -> impl Future> + 'a + Send + pub fn write(&mut self, row: &T) -> Result<()> where T: Serialize, { - self.uncommitted_entries += 1; if self.insert.is_none() { - if let Err(e) = self.init_insert() { - return Either::Right(future::ready(Result::<()>::Err(e))); + self.init_insert()?; + } + + match self.insert.as_mut().unwrap().do_write(row) { + Ok(bytes) => { + self.pending.bytes += bytes as u64; + self.pending.rows += 1; + + if !self.in_transaction { + self.pending.transactions += 1; + self.in_transaction = true; + } + + Ok(()) + } + Err(err) => { + self.pending = Quantities::ZERO; + Err(err) } } - Either::Left(self.insert.as_mut().unwrap().write(row)) } - /// Checks limits and ends a current `INSERT` if they are reached. + /// Checks limits and ends the current `INSERT` if they are reached. pub async fn commit(&mut self) -> Result { - if self.uncommitted_entries > 0 { - self.committed.entries += self.uncommitted_entries; - self.committed.transactions += 1; - self.uncommitted_entries = 0; - } - - let now = ticks::Instant::now(); + self.in_transaction = false; - Ok(if self.is_threshold_reached(now) { - let quantities = mem::replace(&mut self.committed, Quantities::ZERO); + Ok(if self.limits_reached() { + let quantities = mem::replace(&mut self.pending, Quantities::ZERO); let result = self.insert().await; self.ticks.reschedule(); result?; @@ -203,19 +228,18 @@ where }) } - /// Ends a current `INSERT` and whole `Inserter` unconditionally. + /// Ends the current `INSERT` and whole `Inserter` unconditionally. /// /// If it isn't called, the current `INSERT` is aborted. pub async fn end(mut self) -> Result { - if let Some(insert) = self.insert.take() { - insert.end().await?; - } - Ok(self.committed) + self.insert().await?; + Ok(self.pending) } - fn is_threshold_reached(&self, now: ticks::Instant) -> bool { - self.committed.entries >= self.max_entries - || self.ticks.next_at().map_or(false, |next_at| now >= next_at) + fn limits_reached(&self) -> bool { + self.pending.rows >= self.max_rows + || self.pending.bytes >= self.max_bytes + || self.ticks.reached() } async fn insert(&mut self) -> Result<()> { @@ -229,6 +253,8 @@ where #[inline(never)] fn init_insert(&mut self) -> Result<()> { debug_assert!(self.insert.is_none()); + debug_assert_eq!(self.pending, Quantities::ZERO); + let mut new_insert: Insert = self.client.insert(&self.table)?; new_insert.set_timeouts(self.send_timeout, self.end_timeout); self.insert = Some(new_insert); diff --git a/src/lib.rs b/src/lib.rs index 1533371..f767058 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,13 +28,6 @@ pub mod test; #[cfg(feature = "watch")] pub mod watch; -#[cfg(feature = "uuid")] -#[doc(hidden)] -#[deprecated(since = "0.11.1", note = "use `clickhouse::serde::uuid` instead")] -pub mod uuid { - pub use crate::serde::uuid::*; -} - mod buflist; mod compression; mod cursor; diff --git a/src/ticks.rs b/src/ticks.rs index e8756ae..d6ea56a 100644 --- a/src/ticks.rs +++ b/src/ticks.rs @@ -6,10 +6,10 @@ const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600); // More efficient `Instant` based on TSC. #[cfg(all(feature = "quanta", not(feature = "test-util")))] -pub(crate) type Instant = quanta::Instant; +type Instant = quanta::Instant; #[cfg(any(not(feature = "quanta"), feature = "test-util"))] -pub(crate) type Instant = tokio::time::Instant; +type Instant = tokio::time::Instant; // === Ticks === @@ -40,8 +40,13 @@ impl Ticks { self.max_bias = max_bias.clamp(0., 1.); } - pub(crate) fn next_at(&self) -> Option { + pub(crate) fn time_left(&self) -> Option { self.next_at + .map(|n| n.saturating_duration_since(Instant::now())) + } + + pub(crate) fn reached(&self) -> bool { + self.next_at.map_or(false, |n| Instant::now() >= n) } pub(crate) fn reschedule(&mut self) { @@ -73,91 +78,113 @@ impl Ticks { } } -#[cfg(feature = "test-util")] // only with `tokio::time::Instant` -#[tokio::test(start_paused = true)] -async fn it_works() { - let origin = Instant::now(); - - // No bias. - let mut ticks = Ticks::default(); - ticks.set_period(Some(Duration::from_secs(10))); - ticks.reschedule(); - - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(10)); - tokio::time::advance(Duration::from_secs(3)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(10)); - tokio::time::advance(Duration::from_secs(7)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(20)); - - // Up to 10% bias. - ticks.set_period_bias(0.1); - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(19)); - tokio::time::advance(Duration::from_secs(12)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(29)); - - // Try other seeds. - tokio::time::advance(Duration::from_nanos(32768)).await; - ticks.reschedule(); - assert_eq!( - (ticks.next_at().unwrap() - origin).as_secs_f64().round(), - 30. - ); - - tokio::time::advance(Duration::from_nanos(32767)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(31)); -} +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "test-util")] // only with `tokio::time::Instant` + #[tokio::test(start_paused = true)] + async fn smoke() { + // No bias. + let mut ticks = Ticks::default(); + ticks.set_period(Some(Duration::from_secs(10))); + ticks.reschedule(); + + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(3)).await; + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(7))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(7)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + + // Up to 10% bias. + ticks.set_period_bias(0.1); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(12)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(7))); + assert!(!ticks.reached()); + + // Try other seeds. + tokio::time::advance(Duration::from_nanos(32768)).await; + ticks.reschedule(); + assert_eq!( + ticks.time_left(), + Some(Duration::from_secs_f64(7.999982492)) + ); + + tokio::time::advance(Duration::from_nanos(32767)).await; + ticks.reschedule(); + assert_eq!( + ticks.time_left(), + Some(Duration::from_secs_f64(8.999934465)) + ); + } -#[cfg(feature = "test-util")] // only with `tokio::time::Instant` -#[tokio::test(start_paused = true)] -async fn it_skips_extra_ticks() { - let origin = Instant::now(); - - let mut ticks = Ticks::default(); - ticks.set_period(Some(Duration::from_secs(10))); - ticks.set_period_bias(0.1); - ticks.reschedule(); - - // Trivial case, just skip several ticks. - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(9)); - tokio::time::advance(Duration::from_secs(30)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(39)); - - // Hit biased zone. - tokio::time::advance(Duration::from_secs(19)).await; - ticks.reschedule(); - assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(59)); -} + #[cfg(feature = "test-util")] // only with `tokio::time::Instant` + #[tokio::test(start_paused = true)] + async fn skip_extra_ticks() { + let mut ticks = Ticks::default(); + ticks.set_period(Some(Duration::from_secs(10))); + ticks.set_period_bias(0.1); + ticks.reschedule(); + + // Trivial case, just skip several ticks. + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + tokio::time::advance(Duration::from_secs(30)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(9))); + assert!(!ticks.reached()); + + // Hit biased zone. + tokio::time::advance(Duration::from_secs(19)).await; + assert!(ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), Some(Duration::from_secs(10))); + assert!(!ticks.reached()); + } -#[tokio::test] -async fn it_is_disabled() { - let mut ticks = Ticks::default(); - assert!(ticks.next_at().is_none()); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Not disabled. - ticks.set_period(Some(Duration::from_secs(10))); - ticks.reschedule(); - assert!(ticks.next_at().is_some()); - - // Explicitly. - ticks.set_period(None); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Zero duration. - ticks.set_period(Some(Duration::from_secs(0))); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); - - // Too big duration. - ticks.set_period(Some(PERIOD_THRESHOLD)); - ticks.reschedule(); - assert!(ticks.next_at().is_none()); + #[tokio::test] + async fn disabled() { + let mut ticks = Ticks::default(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Not disabled. + ticks.set_period(Some(Duration::from_secs(10))); + ticks.reschedule(); + assert!(ticks.time_left().unwrap() < Duration::from_secs(10)); + assert!(!ticks.reached()); + + // Explicitly. + ticks.set_period(None); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Zero duration. + ticks.set_period(Some(Duration::from_secs(0))); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + + // Too big duration. + ticks.set_period(Some(PERIOD_THRESHOLD)); + ticks.reschedule(); + assert_eq!(ticks.time_left(), None); + assert!(!ticks.reached()); + } } diff --git a/tests/it/compression.rs b/tests/it/compression.rs index 1ade29a..8994b7e 100644 --- a/tests/it/compression.rs +++ b/tests/it/compression.rs @@ -21,14 +21,11 @@ async fn check(client: Client) { .await .unwrap(); - let mut inserter = client.inserter("test").unwrap(); - + let mut insert = client.insert("test").unwrap(); for i in 0..200_000 { - inserter.write(&MyRow { no: i, name: "foo" }).await.unwrap(); - inserter.commit().await.unwrap(); + insert.write(&MyRow { no: i, name: "foo" }).await.unwrap(); } - - inserter.end().await.unwrap(); + insert.end().await.unwrap(); // Check data. diff --git a/tests/it/inserter.rs b/tests/it/inserter.rs new file mode 100644 index 0000000..a6d35aa --- /dev/null +++ b/tests/it/inserter.rs @@ -0,0 +1,152 @@ +use serde::Serialize; + +use clickhouse::{inserter::Quantities, Client, Row}; + +#[derive(Debug, Row, Serialize)] +struct MyRow { + data: String, +} + +async fn create_table(client: &Client) { + client + .query("CREATE TABLE test(data String) ENGINE = MergeTree ORDER BY data") + .execute() + .await + .unwrap(); +} + +#[tokio::test] +async fn limited_by_rows() { + let client = prepare_database!(); + create_table(&client).await; + + let mut inserter = client.inserter("test").unwrap().with_max_rows(10); + let rows = 100; + + for i in (2..=rows).step_by(2) { + let row = MyRow { + data: (i - 1).to_string(), + }; + inserter.write(&row).unwrap(); + let row = MyRow { + data: i.to_string(), + }; + inserter.write(&row).unwrap(); + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_ne!(inserted.bytes, 0); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 5); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_ne!(pending.bytes, 0); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, (i % 10) / 2); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let (count, sum) = client + .query("SELECT count(), sum(toUInt64(data)) FROM test") + .fetch_one::<(u64, u64)>() + .await + .unwrap(); + + assert_eq!(count, rows); + assert_eq!(sum, (1..=rows).sum::()); +} + +#[tokio::test] +async fn limited_by_bytes() { + let client = prepare_database!(); + create_table(&client).await; + + let mut inserter = client.inserter("test").unwrap().with_max_bytes(100); + let rows = 100; + + let row = MyRow { + data: "x".repeat(9), // +1 for length + }; + + for i in 1..=rows { + inserter.write(&row).unwrap(); + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_eq!(inserted.bytes, 100); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 10); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_eq!(pending.bytes, (i % 10) * 10); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, i % 10); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let count = client + .query("SELECT count() FROM test") + .fetch_one::() + .await + .unwrap(); + + assert_eq!(count, rows); +} + +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] +async fn limited_by_time() { + use std::time::Duration; + + let client = prepare_database!(); + create_table(&client).await; + + let period = Duration::from_secs(1); + let mut inserter = client.inserter("test").unwrap().with_period(Some(period)); + let rows = 100; + + for i in 1..=rows { + let row = MyRow { + data: i.to_string(), + }; + inserter.write(&row).unwrap(); + + tokio::time::sleep(period / 10).await; + + let inserted = inserter.commit().await.unwrap(); + let pending = inserter.pending(); + + if i % 10 == 0 { + assert_ne!(inserted.bytes, 0); + assert_eq!(inserted.rows, 10); + assert_eq!(inserted.transactions, 10); + assert_eq!(pending, &Quantities::ZERO); + } else { + assert_eq!(inserted, Quantities::ZERO); + assert_ne!(pending.bytes, 0); + assert_eq!(pending.rows, i % 10); + assert_eq!(pending.transactions, i % 10); + } + } + + assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO); + + let (count, sum) = client + .query("SELECT count(), sum(toUInt64(data)) FROM test") + .fetch_one::<(u64, u64)>() + .await + .unwrap(); + + assert_eq!(count, rows); + assert_eq!(sum, (1..=rows).sum::()); +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 7a1958a..5e5b952 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -15,6 +15,7 @@ macro_rules! prepare_database { mod compression; mod cursor_error; +mod inserter; mod ip; mod nested; mod query;