Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for async insert #66

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,37 @@ insert.end().await?;
<details>
<summary>

### Async insert

</summary>

```rust,ignore
use serde::Serialize;
use clickhouse::{Row, AsyncInsertOptions};

#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}

let opts = AsyncInsertOptions::builder()
.async_insert(true)
.async_insert_threads(8)
.build();
let mut insert = client.async_insert("some", opts)?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
```

- To enable [async insert](https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts), you can use [`Client::async_insert`] method with [`AsyncInsertOptions`] passed to it.
- You can configure async insert via methods on [`AsyncInsertOptionsBuilder`].

</details>
<details>
<summary>

### Infinite inserting

</summary>
Expand Down
203 changes: 201 additions & 2 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ macro_rules! timeout {
}

impl<T> Insert<T> {
pub(crate) fn new(client: &Client, table: &str) -> Result<Self>
pub(crate) fn new(
client: &Client,
table: &str,
async_insert_options: AsyncInsertOptions,
) -> Result<Self>
where
T: Row,
{
Expand All @@ -70,9 +74,11 @@ impl<T> Insert<T> {
let fields = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");

let settings_clause = async_insert_options.into_query();

// TODO: what about escaping a table name?
// https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers
let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary");
let query = format!("INSERT INTO {table} ({fields}) {settings_clause} FORMAT RowBinary");
pairs.append_pair("query", &query);

if client.compression.is_lz4() {
Expand Down Expand Up @@ -287,3 +293,196 @@ impl<T> Drop for Insert<T> {
self.abort();
}
}

/// The option values to be used when configuring [async insert](https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts).
#[derive(Debug, Default)]
pub struct AsyncInsertOptions {
async_insert: OptionValue<bool>,
async_insert_threads: OptionValue<usize>,
wait_for_async_insert: OptionValue<bool>,
wait_for_async_insert_timeout: OptionValue<usize>,
async_insert_max_data_size: OptionValue<usize>,
async_insert_max_query_number: OptionValue<usize>,
async_insert_busy_timeout_ms: OptionValue<usize>,
async_insert_stale_timeout_ms: OptionValue<usize>,
async_insert_deduplicate: OptionValue<bool>,
}

impl AsyncInsertOptions {
fn into_query(self) -> String {
let mut options = vec![];

if let OptionValue::Specified(async_insert) = self.async_insert {
let value = if async_insert { 1 } else { 0 };
options.push(format!("async_insert={value}"));
}

if let OptionValue::Specified(async_insert_threads) = self.async_insert_threads {
options.push(format!("async_insert_threads={async_insert_threads}"));
}

if let OptionValue::Specified(wait_for_async_insert) = self.wait_for_async_insert {
let value = if wait_for_async_insert { 1 } else { 0 };
options.push(format!("wait_for_async_insert={value}"));
}

if let OptionValue::Specified(wait_for_async_insert_timeout) =
self.wait_for_async_insert_timeout
{
options.push(format!(
"wait_for_async_insert_timeout={wait_for_async_insert_timeout}"
));
}

if let OptionValue::Specified(async_insert_max_data_size) = self.async_insert_max_data_size
{
options.push(format!(
"async_insert_max_data_size={async_insert_max_data_size}"
));
}

if let OptionValue::Specified(async_insert_max_query_number) =
self.async_insert_max_query_number
{
options.push(format!(
"async_insert_max_query_number={async_insert_max_query_number}"
));
}

if let OptionValue::Specified(async_insert_busy_timeout_ms) =
self.async_insert_busy_timeout_ms
{
options.push(format!(
"async_insert_busy_timeout_ms={async_insert_busy_timeout_ms}"
));
}

if let OptionValue::Specified(async_insert_stale_timeout_ms) =
self.async_insert_stale_timeout_ms
{
options.push(format!(
"async_insert_stale_timeout_ms={async_insert_stale_timeout_ms}"
));
}

if let OptionValue::Specified(async_insert_deduplicate) = self.async_insert_deduplicate {
let value = if async_insert_deduplicate { 1 } else { 0 };
options.push(format!("async_insert_deduplicate={value}"));
}

if options.is_empty() {
return "".to_string();
}

format!("SETTINGS {}", options.join(", "))
}
}

#[derive(Debug)]
enum OptionValue<T> {
Unspecified,
Specified(T),
}

impl<T> Default for OptionValue<T> {
fn default() -> Self {
Self::Unspecified
}
}

/// A builder for [`AsyncInsertOptions`].
#[derive(Debug, Default)]
pub struct AsyncInsertOptionsBuilder {
async_insert: OptionValue<bool>,
async_insert_threads: OptionValue<usize>,
wait_for_async_insert: OptionValue<bool>,
wait_for_async_insert_timeout: OptionValue<usize>,
async_insert_max_data_size: OptionValue<usize>,
async_insert_max_query_number: OptionValue<usize>,
async_insert_busy_timeout_ms: OptionValue<usize>,
async_insert_stale_timeout_ms: OptionValue<usize>,
async_insert_deduplicate: OptionValue<bool>,
}

impl AsyncInsertOptionsBuilder {
/// Starts a new builder.
pub fn new() -> Self {
Self::default()
}

/// Builds [`AsyncInsertOptions`] with the specified options.
pub fn build(self) -> AsyncInsertOptions {
AsyncInsertOptions {
async_insert: self.async_insert,
async_insert_threads: self.async_insert_threads,
wait_for_async_insert: self.wait_for_async_insert,
wait_for_async_insert_timeout: self.wait_for_async_insert_timeout,
async_insert_max_data_size: self.async_insert_max_data_size,
async_insert_max_query_number: self.async_insert_max_query_number,
async_insert_busy_timeout_ms: self.async_insert_busy_timeout_ms,
async_insert_stale_timeout_ms: self.async_insert_stale_timeout_ms,
async_insert_deduplicate: self.async_insert_deduplicate,
}
}

/// Specifies whether to enable async insert. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert> for more info.
pub fn async_insert(mut self, async_insert: bool) -> Self {
self.async_insert = OptionValue::Specified(async_insert);
self
}

/// Specifies the number of threads for async insert. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-threads> for more info.
pub fn async_insert_threads(mut self, async_insert_threads: usize) -> Self {
self.async_insert_threads = OptionValue::Specified(async_insert_threads);
self
}

/// Specifies whether to enable waiting for processing of asynchronous insertion. See <https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert> for more info.
pub fn wait_for_async_insert(mut self, wait_for_async_insert: bool) -> Self {
self.wait_for_async_insert = OptionValue::Specified(wait_for_async_insert);
self
}

/// Specifies the timeout in seconds for waiting for processing of asynchronous insertion. See <https://clickhouse.com/docs/en/operations/settings/settings#wait-for-async-insert-timeout> for more info.
pub fn wait_for_async_insert_timeout(mut self, wait_for_async_insert_timeout: usize) -> Self {
self.wait_for_async_insert_timeout = OptionValue::Specified(wait_for_async_insert_timeout);
self
}

/// Specifies the maximum size of unparsed data in bytes collected per query before being inserted. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-max-data-size> for more info.
pub fn async_insert_max_data_size(mut self, async_insert_max_data_size: usize) -> Self {
self.async_insert_max_data_size = OptionValue::Specified(async_insert_max_data_size);
self
}

/// Specifies the maximum number of queries per block before being inserted. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-max-query-number> for more info.
pub fn async_insert_max_query_number(mut self, async_insert_max_query_number: usize) -> Self {
self.async_insert_max_query_number = OptionValue::Specified(async_insert_max_query_number);
self
}

/// Specifies the maximum timeout in milliseconds since the first `INSERT` query before inserting. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-busy-timeout-ms> for more info.
pub fn async_insert_busy_timeout_ms(mut self, async_insert_busy_timeout_ms: usize) -> Self {
self.async_insert_busy_timeout_ms = OptionValue::Specified(async_insert_busy_timeout_ms);
self
}

/// Specifies the maximum timeout in milliseconds since the last `INSERT` query before inserting. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-stale-timeout-ms> for more info.
pub fn async_insert_stale_timeout_ms(mut self, async_insert_stale_timeout_ms: usize) -> Self {
self.async_insert_stale_timeout_ms = OptionValue::Specified(async_insert_stale_timeout_ms);
self
}

/// Specifies whether to enable deduplication of data in async insert. See <https://clickhouse.com/docs/en/operations/settings/settings#async-insert-deduplicate> for more info.
pub fn async_insert_deduplicate(mut self, async_insert_deduplicate: bool) -> Self {
self.async_insert_deduplicate = OptionValue::Specified(async_insert_deduplicate);
self
}
}

impl AsyncInsertOptions {
/// Starts a new builder.
pub fn builder() -> AsyncInsertOptionsBuilder {
AsyncInsertOptionsBuilder::default()
}
}
17 changes: 15 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ use hyper_tls::HttpsConnector;

pub use clickhouse_derive::Row;

pub use self::{compression::Compression, row::Row};
pub use self::{
compression::Compression,
insert::{AsyncInsertOptions, AsyncInsertOptionsBuilder},
row::Row,
};
use self::{error::Result, http_client::HttpClient};

pub mod error;
Expand Down Expand Up @@ -175,7 +179,16 @@ impl Client {
/// # Panics
/// If `T` has unnamed fields, e.g. tuples.
pub fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
insert::Insert::new(self, table)
insert::Insert::new(self, table, AsyncInsertOptions::default())
}

/// Starts a new INSERT statement with async insert options of your choice.
pub fn async_insert<T: Row>(
&self,
table: &str,
async_insert_options: AsyncInsertOptions,
) -> Result<insert::Insert<T>> {
insert::Insert::new(self, table, async_insert_options)
}

/// Creates an inserter to perform multiple INSERTs.
Expand Down
74 changes: 73 additions & 1 deletion tests/it/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use clickhouse::{error::Error, Row};
use clickhouse::{error::Error, AsyncInsertOptions, Row};

#[tokio::test]
async fn smoke() {
Expand Down Expand Up @@ -51,6 +51,78 @@ async fn smoke() {
}
}

#[tokio::test]
async fn async_insert() {
let client = prepare_database!();

#[derive(Debug, Row, Serialize, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
ts: u32,
}

// Create a table.
client
.query(
"
CREATE TABLE test(
no UInt32,
name LowCardinality(String),
ts Datetime
)
ENGINE = MergeTree
ORDER BY no
",
)
.execute()
.await
.unwrap();

// Async-write to the table.
let opts = AsyncInsertOptions::builder()
.async_insert(true)
.async_insert_threads(8)
.wait_for_async_insert(true)
.wait_for_async_insert_timeout(42)
.async_insert_max_data_size(42)
.async_insert_max_query_number(42)
.async_insert_busy_timeout_ms(42)
.async_insert_stale_timeout_ms(42)
.async_insert_deduplicate(true)
.build();
let mut insert = client.async_insert("test", opts).unwrap();
for i in 0..1000 {
insert
.write(&MyRow {
no: i,
name: "foo",
ts: 42,
})
.await
.unwrap();
}

insert.end().await.unwrap();

// Read from the table.
let mut cursor = client
.query("SELECT ?fields FROM test WHERE name = ? AND no BETWEEN ? AND ?.2")
.bind("foo")
.bind(500)
.bind((42, 504))
.fetch::<MyRow<'_>>()
.unwrap();

let mut i = 500;

while let Some(row) = cursor.next().await.unwrap() {
assert_eq!(row.no, i);
assert_eq!(row.name, "foo");
i += 1;
}
}

#[tokio::test]
async fn fetch_one_and_optional() {
let client = prepare_database!();
Expand Down