From 0e05f57d76ac92a29b3898d9779a672085fbdf04 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 21 May 2023 18:00:47 +0900 Subject: [PATCH 1/6] test: add a new test case for async insert --- tests/test_query.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/tests/test_query.rs b/tests/test_query.rs index 04d566f..d580607 100644 --- a/tests/test_query.rs +++ b/tests/test_query.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use clickhouse::{error::Error, Row}; +use clickhouse::{error::Error, insert::AsyncInsertOptions, Row}; mod common; @@ -54,6 +54,67 @@ async fn smoke() { } } +#[common::named] +#[tokio::test] +async fn async_insert() { + let client = common::prepare_database!(); + + #[derive(Debug, Row, Serialize, Deserialize)] + struct MyRow<'a> { + no: u32, + name: &'a str, + } + + // Create a table. + client + .query( + " + CREATE TABLE test(no UInt32, name LowCardinality(String)) + ENGINE = MergeTree + ORDER BY no + ", + ) + .execute() + .await + .unwrap(); + + // Async-write to the table. + let opts = AsyncInsertOptions::builder() + .async_insert(true) + .async_insert_threads(8) + .wait_for_async_insert(true) + .wait_for_async_insert_timeout(42) + .async_insert_max_data_size(42) + .async_insert_max_query_number(42) + .async_insert_busy_timeout_ms(42) + .async_insert_stale_timeout_ms(42) + .async_insert_deduplicate(true) + .build(); + let mut insert = client.async_insert("test", opts).unwrap(); + for i in 0..1000 { + insert.write(&MyRow { no: i, name: "foo" }).await.unwrap(); + } + + insert.end().await.unwrap(); + + // Read from the table. + let mut cursor = client + .query("SELECT ?fields FROM test WHERE name = ? AND no BETWEEN ? 
AND ?.2") + .bind("foo") + .bind(500) + .bind((42, 504)) + .fetch::>() + .unwrap(); + + let mut i = 500; + + while let Some(row) = cursor.next().await.unwrap() { + assert_eq!(row.no, i); + assert_eq!(row.name, "foo"); + i += 1; + } +} + #[common::named] #[tokio::test] async fn fetch_one_and_optional() { From 76de7a4803b60c3b8529b0a5769b1ed960c28f10 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 21 May 2023 18:04:11 +0900 Subject: [PATCH 2/6] feat: implement AsyncInsertOptions to enable async inserts --- src/insert.rs | 185 +++++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 11 ++- 2 files changed, 193 insertions(+), 3 deletions(-) diff --git a/src/insert.rs b/src/insert.rs index f9af43a..123c6c0 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -54,7 +54,11 @@ macro_rules! timeout { } impl Insert { - pub(crate) fn new(client: &Client, table: &str) -> Result + pub(crate) fn new( + client: &Client, + table: &str, + async_insert_options: AsyncInsertOptions, + ) -> Result where T: Row, { @@ -69,9 +73,11 @@ impl Insert { let fields = row::join_column_names::() .expect("the row type must be a struct or a wrapper around it"); + let settings_clause = async_insert_options.into_query(); + // TODO: what about escaping a table name? 
// https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers - let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary"); + let query = format!("INSERT INTO {table} ({fields}) FORMAT RowBinary {settings_clause}"); pairs.append_pair("query", &query); if client.compression.is_lz4() { @@ -274,3 +280,178 @@ impl Drop for Insert { self.abort(); } } + +#[derive(Debug, Default)] +pub struct AsyncInsertOptions { + async_insert: OptionValue, + async_insert_threads: OptionValue, + wait_for_async_insert: OptionValue, + wait_for_async_insert_timeout: OptionValue, + async_insert_max_data_size: OptionValue, + async_insert_max_query_number: OptionValue, + async_insert_busy_timeout_ms: OptionValue, + async_insert_stale_timeout_ms: OptionValue, + async_insert_deduplicate: OptionValue, +} + +impl AsyncInsertOptions { + fn into_query(self) -> String { + let mut options = vec![]; + + if let OptionValue::Specified(async_insert) = self.async_insert { + let value = if async_insert { 1 } else { 0 }; + options.push(format!("async_insert={value}")); + } + + if let OptionValue::Specified(async_insert_threads) = self.async_insert_threads { + options.push(format!("async_insert_threads={async_insert_threads}")); + } + + if let OptionValue::Specified(wait_for_async_insert) = self.wait_for_async_insert { + let value = if wait_for_async_insert { 1 } else { 0 }; + options.push(format!("wait_for_async_insert={value}")); + } + + if let OptionValue::Specified(wait_for_async_insert_timeout) = + self.wait_for_async_insert_timeout + { + options.push(format!( + "wait_for_async_insert_timeout={wait_for_async_insert_timeout}" + )); + } + + if let OptionValue::Specified(async_insert_max_data_size) = self.async_insert_max_data_size + { + options.push(format!( + "async_insert_max_data_size={async_insert_max_data_size}" + )); + } + + if let OptionValue::Specified(async_insert_max_query_number) = + self.async_insert_max_query_number + { + options.push(format!( + 
"async_insert_max_query_number={async_insert_max_query_number}" + )); + } + + if let OptionValue::Specified(async_insert_busy_timeout_ms) = + self.async_insert_busy_timeout_ms + { + options.push(format!( + "async_insert_busy_timeout_ms={async_insert_busy_timeout_ms}" + )); + } + + if let OptionValue::Specified(async_insert_stale_timeout_ms) = + self.async_insert_stale_timeout_ms + { + options.push(format!( + "async_insert_stale_timeout_ms={async_insert_stale_timeout_ms}" + )); + } + + if let OptionValue::Specified(async_insert_deduplicate) = self.async_insert_deduplicate { + let value = if async_insert_deduplicate { 1 } else { 0 }; + options.push(format!("async_insert_deduplicate={value}")); + } + + if options.is_empty() { + return "".to_string(); + } + + format!("SETTINGS {}", options.join(", ")) + } +} + +#[derive(Debug)] +enum OptionValue { + Unspecified, + Specified(T), +} + +impl Default for OptionValue { + fn default() -> Self { + Self::Unspecified + } +} + +#[derive(Debug, Default)] +pub struct AsyncInsertOptionsBuilder { + async_insert: OptionValue, + async_insert_threads: OptionValue, + wait_for_async_insert: OptionValue, + wait_for_async_insert_timeout: OptionValue, + async_insert_max_data_size: OptionValue, + async_insert_max_query_number: OptionValue, + async_insert_busy_timeout_ms: OptionValue, + async_insert_stale_timeout_ms: OptionValue, + async_insert_deduplicate: OptionValue, +} + +impl AsyncInsertOptionsBuilder { + pub fn build(self) -> AsyncInsertOptions { + AsyncInsertOptions { + async_insert: self.async_insert, + async_insert_threads: self.async_insert_threads, + wait_for_async_insert: self.wait_for_async_insert, + wait_for_async_insert_timeout: self.wait_for_async_insert_timeout, + async_insert_max_data_size: self.async_insert_max_data_size, + async_insert_max_query_number: self.async_insert_max_query_number, + async_insert_busy_timeout_ms: self.async_insert_busy_timeout_ms, + async_insert_stale_timeout_ms: self.async_insert_stale_timeout_ms, 
+ async_insert_deduplicate: self.async_insert_deduplicate, + } + } + + pub fn async_insert(mut self, async_insert: bool) -> Self { + self.async_insert = OptionValue::Specified(async_insert); + self + } + + pub fn async_insert_threads(mut self, async_insert_threads: usize) -> Self { + self.async_insert_threads = OptionValue::Specified(async_insert_threads); + self + } + + pub fn wait_for_async_insert(mut self, wait_for_async_insert: bool) -> Self { + self.wait_for_async_insert = OptionValue::Specified(wait_for_async_insert); + self + } + + pub fn wait_for_async_insert_timeout(mut self, wait_for_async_insert_timeout: usize) -> Self { + self.wait_for_async_insert_timeout = OptionValue::Specified(wait_for_async_insert_timeout); + self + } + + pub fn async_insert_max_data_size(mut self, async_insert_max_data_size: usize) -> Self { + self.async_insert_max_data_size = OptionValue::Specified(async_insert_max_data_size); + self + } + + pub fn async_insert_max_query_number(mut self, async_insert_max_query_number: usize) -> Self { + self.async_insert_max_query_number = OptionValue::Specified(async_insert_max_query_number); + self + } + + pub fn async_insert_busy_timeout_ms(mut self, async_insert_busy_timeout_ms: usize) -> Self { + self.async_insert_busy_timeout_ms = OptionValue::Specified(async_insert_busy_timeout_ms); + self + } + + pub fn async_insert_stale_timeout_ms(mut self, async_insert_stale_timeout_ms: usize) -> Self { + self.async_insert_stale_timeout_ms = OptionValue::Specified(async_insert_stale_timeout_ms); + self + } + + pub fn async_insert_deduplicate(mut self, async_insert_deduplicate: bool) -> Self { + self.async_insert_deduplicate = OptionValue::Specified(async_insert_deduplicate); + self + } +} + +impl AsyncInsertOptions { + pub fn builder() -> AsyncInsertOptionsBuilder { + AsyncInsertOptionsBuilder::default() + } +} diff --git a/src/lib.rs b/src/lib.rs index 3af1e77..8cafe3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ use 
hyper::client::connect::HttpConnector; use hyper_tls::HttpsConnector; pub use clickhouse_derive::Row; +use insert::AsyncInsertOptions; pub use self::{compression::Compression, row::Row}; use self::{error::Result, http_client::HttpClient}; @@ -177,7 +178,15 @@ impl Client { /// # Panics /// If `T` has unnamed fields, e.g. tuples. pub fn insert(&self, table: &str) -> Result> { - insert::Insert::new(self, table) + insert::Insert::new(self, table, AsyncInsertOptions::default()) + } + + pub fn async_insert( + &self, + table: &str, + async_insert_options: AsyncInsertOptions, + ) -> Result> { + insert::Insert::new(self, table, async_insert_options) } /// Creates an inserter to perform multiple INSERTs. From ea2eaf67158e06903123caab06271746cbff7c8e Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 21 May 2023 18:19:34 +0900 Subject: [PATCH 3/6] doc: add doc comments for AsyncInsertOptions --- src/insert.rs | 18 ++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 19 insertions(+) diff --git a/src/insert.rs b/src/insert.rs index 123c6c0..a81338a 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -281,6 +281,7 @@ impl Drop for Insert { } } +/// The option values to be used when configuring [async insert](https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts). #[derive(Debug, Default)] pub struct AsyncInsertOptions { async_insert: OptionValue, @@ -376,6 +377,7 @@ impl Default for OptionValue { } } +/// A builder for [`AsyncInsertOptions`]. #[derive(Debug, Default)] pub struct AsyncInsertOptionsBuilder { async_insert: OptionValue, @@ -390,6 +392,12 @@ pub struct AsyncInsertOptionsBuilder { } impl AsyncInsertOptionsBuilder { + /// Starts a new builder. + pub fn new() -> Self { + Self::default() + } + + /// Builds [`AsyncInsertOptions`] with the specified options. 
pub fn build(self) -> AsyncInsertOptions { AsyncInsertOptions { async_insert: self.async_insert, @@ -404,46 +412,55 @@ impl AsyncInsertOptionsBuilder { } } + /// Specifies whether to enable async insert. See the ClickHouse documentation for the `async_insert` setting for more info. pub fn async_insert(mut self, async_insert: bool) -> Self { self.async_insert = OptionValue::Specified(async_insert); self } + /// Specifies the number of threads for async insert. See the ClickHouse documentation for the `async_insert_threads` setting for more info. pub fn async_insert_threads(mut self, async_insert_threads: usize) -> Self { self.async_insert_threads = OptionValue::Specified(async_insert_threads); self } + /// Specifies whether to enable waiting for processing of asynchronous insertion. See the ClickHouse documentation for the `wait_for_async_insert` setting for more info. pub fn wait_for_async_insert(mut self, wait_for_async_insert: bool) -> Self { self.wait_for_async_insert = OptionValue::Specified(wait_for_async_insert); self } + /// Specifies the timeout in seconds for waiting for processing of asynchronous insertion. See the ClickHouse documentation for the `wait_for_async_insert_timeout` setting for more info. pub fn wait_for_async_insert_timeout(mut self, wait_for_async_insert_timeout: usize) -> Self { self.wait_for_async_insert_timeout = OptionValue::Specified(wait_for_async_insert_timeout); self } + /// Specifies the maximum size of unparsed data in bytes collected per query before being inserted. See the ClickHouse documentation for the `async_insert_max_data_size` setting for more info. pub fn async_insert_max_data_size(mut self, async_insert_max_data_size: usize) -> Self { self.async_insert_max_data_size = OptionValue::Specified(async_insert_max_data_size); self } + /// Specifies the maximum number of queries per block before being inserted. See the ClickHouse documentation for the `async_insert_max_query_number` setting for more info. pub fn async_insert_max_query_number(mut self, async_insert_max_query_number: usize) -> Self { self.async_insert_max_query_number = OptionValue::Specified(async_insert_max_query_number); self } + /// Specifies the maximum timeout in milliseconds since the first `INSERT` query before inserting. See the ClickHouse documentation for the `async_insert_busy_timeout_ms` setting for more info. 
pub fn async_insert_busy_timeout_ms(mut self, async_insert_busy_timeout_ms: usize) -> Self { self.async_insert_busy_timeout_ms = OptionValue::Specified(async_insert_busy_timeout_ms); self } + /// Specifies the maximum timeout in milliseconds since the last `INSERT` query before inserting. See the ClickHouse documentation for the `async_insert_stale_timeout_ms` setting for more info. pub fn async_insert_stale_timeout_ms(mut self, async_insert_stale_timeout_ms: usize) -> Self { self.async_insert_stale_timeout_ms = OptionValue::Specified(async_insert_stale_timeout_ms); self } + /// Specifies whether to enable deduplication of data in async insert. See the ClickHouse documentation for the `async_insert_deduplicate` setting for more info. pub fn async_insert_deduplicate(mut self, async_insert_deduplicate: bool) -> Self { self.async_insert_deduplicate = OptionValue::Specified(async_insert_deduplicate); self @@ -451,6 +468,7 @@ impl AsyncInsertOptionsBuilder { } impl AsyncInsertOptions { + /// Starts a new builder. pub fn builder() -> AsyncInsertOptionsBuilder { AsyncInsertOptionsBuilder::default() } diff --git a/src/lib.rs b/src/lib.rs index 8cafe3c..eeca93d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,6 +181,7 @@ impl Client { insert::Insert::new(self, table, AsyncInsertOptions::default()) } + /// Starts a new INSERT statement with async insert options of your choice. 
pub fn async_insert( &self, table: &str, From 8725518d2d677edf19fc4b38364b98f29095108d Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 21 May 2023 18:24:40 +0900 Subject: [PATCH 4/6] chore: export AsyncInsertOptions and AsyncInsertOptionsBuilder from root --- src/lib.rs | 7 +++++-- tests/test_query.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index eeca93d..9266135 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,9 +13,12 @@ use hyper::client::connect::HttpConnector; use hyper_tls::HttpsConnector; pub use clickhouse_derive::Row; -use insert::AsyncInsertOptions; -pub use self::{compression::Compression, row::Row}; +pub use self::{ + compression::Compression, + insert::{AsyncInsertOptions, AsyncInsertOptionsBuilder}, + row::Row, +}; use self::{error::Result, http_client::HttpClient}; pub mod error; diff --git a/tests/test_query.rs b/tests/test_query.rs index d580607..7dbfb0b 100644 --- a/tests/test_query.rs +++ b/tests/test_query.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use clickhouse::{error::Error, insert::AsyncInsertOptions, Row}; +use clickhouse::{error::Error, AsyncInsertOptions, Row}; mod common; From b3cf2b7b55c7251d175eae8b237c31d499991dc0 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Sun, 21 May 2023 18:41:04 +0900 Subject: [PATCH 5/6] doc: add example of async insert to README --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index 690bf4b..ae66a5d 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,37 @@ insert.end().await?;
+### Async insert + + + +```rust,ignore +use serde::Serialize; +use clickhouse::{Row, AsyncInsertOptions}; + +#[derive(Row, Serialize)] +struct MyRow { + no: u32, + name: String, +} + +let opts = AsyncInsertOptions::builder() + .async_insert(true) + .async_insert_threads(8) + .build(); +let mut insert = client.async_insert("some", opts)?; +insert.write(&MyRow { no: 0, name: "foo".into() }).await?; +insert.write(&MyRow { no: 1, name: "bar".into() }).await?; +insert.end().await?; +``` + +- To enable [async insert](https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts), you can use the [`Client::async_insert`] method with [`AsyncInsertOptions`] passed to it. +- You can configure async insert via the methods on [`AsyncInsertOptionsBuilder`]. + +
+
+ + ### Infinite inserting From 465b1cf699b0784b20393b4a48a2402232b7a4a4 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Mon, 29 May 2023 14:54:05 +0900 Subject: [PATCH 6/6] fix: put settings_clause before FORMAT RowBinary --- src/insert.rs | 2 +- tests/test_query.rs | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/insert.rs b/src/insert.rs index a81338a..f0f9984 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -77,7 +77,7 @@ impl Insert { // TODO: what about escaping a table name? // https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers - let query = format!("INSERT INTO {table} ({fields}) FORMAT RowBinary {settings_clause}"); + let query = format!("INSERT INTO {table} ({fields}) {settings_clause} FORMAT RowBinary"); pairs.append_pair("query", &query); if client.compression.is_lz4() { diff --git a/tests/test_query.rs b/tests/test_query.rs index 7dbfb0b..6757cbd 100644 --- a/tests/test_query.rs +++ b/tests/test_query.rs @@ -63,13 +63,18 @@ async fn async_insert() { struct MyRow<'a> { no: u32, name: &'a str, + ts: u32, } // Create a table. client .query( " - CREATE TABLE test(no UInt32, name LowCardinality(String)) + CREATE TABLE test( + no UInt32, + name LowCardinality(String), + ts Datetime + ) ENGINE = MergeTree ORDER BY no ", @@ -92,7 +97,14 @@ async fn async_insert() { .build(); let mut insert = client.async_insert("test", opts).unwrap(); for i in 0..1000 { - insert.write(&MyRow { no: i, name: "foo" }).await.unwrap(); + insert + .write(&MyRow { + no: i, + name: "foo", + ts: 42, + }) + .await + .unwrap(); } insert.end().await.unwrap();