From 4b2f257a48ec3e0eb82ece72c02142e483619f3f Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:47:19 +0700 Subject: [PATCH 1/4] update readme --- CHANGELOG.md | 4 + README.md | 224 +++++++++++++-------------------------------------- lib/ch.ex | 10 +-- 3 files changed, 66 insertions(+), 172 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb8dfe8..2904c7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- move rows for INSERT from `params` to `statement` + ## 0.2.8 (2024-09-06) - support named tuples https://github.com/plausible/ch/pull/197 diff --git a/README.md b/README.md index beae3e5..a48199c 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,6 @@ defaults = [ {:ok, %Ch.Result{rows: [[0], [1], [2]]}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {$0:UInt8}", [3]) - {:ok, %Ch.Result{rows: [[0], [1], [2]]}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3}) ``` @@ -75,9 +72,6 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1]) - %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) @@ -85,36 +79,52 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) ``` -#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient) +#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) ```elixir {:ok, pid} = Ch.start_link() 
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null") -types = ["UInt64"] +rows = [ + [0, "a"], + [1, "b"] +] + +types = ["UInt64", "String"] # or -types = [Ch.Types.u64()] +types = [Ch.Types.u64(), Ch.Types.string()] # or -types = [:u64] +types = [:u64, :string] + +rowbinary = Ch.RowBinary.encode_rows(rows, types) %Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types) + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | rowbinary]) ``` -Note that RowBinary format encoding requires `:types` option to be provided. - Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check. ```elixir -sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes" -opts = [names: ["id"], types: ["UInt64"]] -rows = [[0], [1]] +sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n" + +rows = [ + [0, "a"], + [1, "b"] +] -%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts) +types = ["UInt64", "String"] +names = ["id", "text"] + +data = [ + Ch.RowBinary.encode_names_and_types(names, types), + Ch.RowBinary.encode_rows(rows, types) +] + +%Ch.Result{num_rows: 2} = Ch.query!(pid, [sql | data]) ``` -#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) +#### Insert rows in some other [format](https://clickhouse.com/docs/en/interfaces/formats) ```elixir {:ok, pid} = Ch.start_link() @@ -124,26 +134,27 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) %Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false) + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` -#### Insert rows as chunked 
RowBinary stream
+#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
 
 ```elixir
 {:ok, pid} = Ch.start_link()
 
 Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
 
-stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
-chunked = Stream.chunk_every(stream, 100)
-encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
-ten_encoded_chunks = Stream.take(encoded, 10)
-
-%Ch.Result{num_rows: 1000} =
-  Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
+DBConnection.run(pid, fn conn ->
+  Stream.repeatedly(fn -> [:rand.uniform(100)] end)
+  |> Stream.chunk_every(100_000)
+  |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
+  |> Stream.take(10)
+  |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
+  |> Stream.run()
+end)
 ```
 
-This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
+This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) @@ -156,7 +167,7 @@ settings = [async_insert: 1] Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'") %Ch.Result{rows: [["async_insert", "Bool", "1"]]} = - Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings) + Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings) ``` ## Caveats @@ -179,13 +190,13 @@ CREATE TABLE ch_nulls ( """) types = ["Nullable(UInt8)", "UInt8", "UInt8"] -inserted_rows = [[nil, nil, nil]] -selected_rows = [[nil, 0, 0]] +row = [nil, nil, nil] +rowbinary = Ch.RowBinary.encode_row(row, types) %Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types) + Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | rowbinary]) -%Ch.Result{rows: ^selected_rows} = +%Ch.Result{rows: [[nil, _not_10 = 0, 0]]} = Ch.query!(pid, "SELECT * FROM ch_nulls") ``` @@ -197,12 +208,16 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function sql = """ INSERT INTO ch_nulls SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8') - FORMAT RowBinary\ + FORMAT RowBinary """ -Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]) +types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"] +rowbinary = Ch.RowBinary.encode_row(row, types) + +%Ch.Result{num_rows: 1} = + Ch.query!(pid, [sql | rowbinary]) -%Ch.Result{rows: [[0], [10]]} = +%Ch.Result{rows: [_before = [0], _after = [10]]} = Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") ``` @@ -215,26 +230,18 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") -bin = "\x61\xF0\x80\x80\x80b" -utf8 = "a�b" +rowbinary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b") %Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_utf8(str) 
FORMAT RowBinary", [[bin]], types: ["String"]) + Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | rowbinary]) -%Ch.Result{rows: [[^utf8]]} = +%Ch.Result{rows: [["a�b"]]} = Ch.query!(pid, "SELECT * FROM ch_utf8") -%Ch.Result{rows: %{"data" => [[^utf8]]}} = +%Ch.Result{rows: %{"data" => [["a�b"]]}} = pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1) ``` -To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks. - -```elixir -%Ch.Result{rows: [[^bin]]} = - Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary]) -``` - #### Timezones in RowBinary Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database) @@ -268,124 +275,9 @@ utc = DateTime.utc_now() taipei = DateTime.shift_zone!(utc, "Asia/Taipei") # ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei -Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"]) +Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"]) ``` ## Benchmarks -
-INSERT 1 million rows (original) - -

-$ MIX_ENV=bench mix run bench/insert.exs
-
-This benchmark is based on https://github.com/ClickHouse/clickhouse-go#benchmark
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 1_000_000 rows
-Estimated total run time: 28 s
-
-Benchmarking encode with input 1_000_000 rows ...
-Benchmarking encode stream with input 1_000_000 rows ...
-Benchmarking insert with input 1_000_000 rows ...
-Benchmarking insert stream with input 1_000_000 rows ...
-
-##### With input 1_000_000 rows #####
-Name                    ips        average  deviation         median         99th %
-encode stream          1.63      612.96 ms    ±11.30%      583.03 ms      773.01 ms
-insert stream          1.22      819.82 ms     ±9.41%      798.94 ms      973.45 ms
-encode                 1.09      915.75 ms    ±44.13%      750.98 ms     1637.02 ms
-insert                 0.73     1373.84 ms    ±31.01%     1331.86 ms     1915.76 ms
-
-Comparison: 
-encode stream          1.63
-insert stream          1.22 - 1.34x slower +206.87 ms
-encode                 1.09 - 1.49x slower +302.79 ms
-insert                 0.73 - 2.24x slower +760.88 ms
-
- -
- -
-SELECT 500, 500 thousand, and 500 million rows (original) - -

-$ MIX_ENV=bench mix run bench/stream.exs
-
-This benchmark is based on https://github.com/ClickHouse/ch-bench
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 500 rows, 500_000 rows, 500_000_000 rows
-Estimated total run time: 1.05 min
-
-Benchmarking stream with decode with input 500 rows ...
-Benchmarking stream with decode with input 500_000 rows ...
-Benchmarking stream with decode with input 500_000_000 rows ...
-Benchmarking stream with manual decode with input 500 rows ...
-Benchmarking stream with manual decode with input 500_000 rows ...
-Benchmarking stream with manual decode with input 500_000_000 rows ...
-Benchmarking stream without decode with input 500 rows ...
-Benchmarking stream without decode with input 500_000 rows ...
-Benchmarking stream without decode with input 500_000_000 rows ...
-
-##### With input 500 rows #####
-Name                                ips        average  deviation         median         99th %
-stream with decode               4.69 K      213.34 μs    ±12.49%      211.38 μs      290.94 μs
-stream with manual decode        4.69 K      213.43 μs    ±17.40%      210.96 μs      298.75 μs
-stream without decode            4.65 K      215.08 μs    ±10.79%      213.79 μs      284.66 μs
-
-Comparison:
-stream with decode               4.69 K
-stream with manual decode        4.69 K - 1.00x slower +0.0838 μs
-stream without decode            4.65 K - 1.01x slower +1.74 μs
-
-##### With input 500_000 rows #####
-Name                                ips        average  deviation         median         99th %
-stream without decode            234.58        4.26 ms    ±13.99%        4.04 ms        5.95 ms
-stream with manual decode         64.26       15.56 ms     ±8.36%       15.86 ms       17.97 ms
-stream with decode                41.03       24.37 ms     ±6.27%       24.39 ms       26.60 ms
-
-Comparison:
-stream without decode            234.58
-stream with manual decode         64.26 - 3.65x slower +11.30 ms
-stream with decode                41.03 - 5.72x slower +20.11 ms
-
-##### With input 500_000_000 rows #####
-Name                                ips        average  deviation         median         99th %
-stream without decode              0.32         3.17 s     ±0.20%         3.17 s         3.17 s
-stream with manual decode        0.0891        11.23 s     ±0.00%        11.23 s        11.23 s
-stream with decode               0.0462        21.66 s     ±0.00%        21.66 s        21.66 s
-
-Comparison:
-stream without decode              0.32
-stream with manual decode        0.0891 - 3.55x slower +8.06 s
-stream with decode               0.0462 - 6.84x slower +18.50 s
-
- -
-[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts")
+Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :)
diff --git a/lib/ch.ex b/lib/ch.ex
index ad4217f..dcc3002 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -14,7 +14,7 @@ defmodule Ch do
         | {:scheme, String.t()}
         | {:hostname, String.t()}
         | {:port, :inet.port_number()}
-        | {:transport_opts, :gen_tcp.connect_option()}
+        | {:transport_opts, :gen_tcp.connect_option() | :ssl.tls_client_option()}
         | DBConnection.start_option()
 
   @doc """
@@ -29,7 +29,7 @@ defmodule Ch do
     * `:database` - Database, defaults to `"default"`
     * `:username` - Username
     * `:password` - User password
-    * `:settings` - Keyword list of ClickHouse settings
+    * `:settings` - Keyword list of ClickHouse settings to send with every query
     * `:timeout` - HTTP receive timeout in milliseconds
     * `:transport_opts` - options to be given to the transport being used.
See `Mint.HTTP1.connect/4` for more info * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0) @@ -55,8 +55,6 @@ defmodule Ch do | {:command, Ch.Query.command()} | {:headers, [{String.t(), String.t()}]} | {:format, String.t()} - # TODO remove - | {:encode, boolean} | {:decode, boolean} | DBConnection.connection_option() @@ -69,8 +67,8 @@ defmodule Ch do * `:database` - Database * `:username` - Username * `:password` - User password - * `:settings` - Keyword list of settings - * `:timeout` - Query request timeout + * `:settings` - Keyword list of settings to merge with `:settings` from `start_link` and send with this query + * `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster * `:command` - Command tag for the query * `:headers` - Custom HTTP headers for the request * `:format` - Custom response format for the request From 1b7919fbd13b8be0593ddbc7c663f31e320e6c62 Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:48:46 +0700 Subject: [PATCH 2/4] update readme --- README.md | 69 ++++++++++++++++++++++++------- bench/buffer.exs | 0 bench/decode.exs | 0 bench/encode.exs | 55 ++++++++++++++++++++++++ bench/http.exs | 0 bench/insert.exs | 9 +--- bench/native.exs | 0 bench/types.exs | 0 guides/in_memory_insert_buffer.md | 1 + guides/multinode.md | 3 ++ guides/on_disk_insert_buffer.md | 37 +++++++++++++++++ lib/ch.ex | 12 +----- lib/ch/error.ex | 2 +- lib/ch/http.ex | 2 + lib/ch/native.ex | 3 ++ lib/ch/query.ex | 9 ++-- lib/ch/result.ex | 8 ++-- lib/ch/row_binary.ex | 2 + lib/ch/ssl.ex | 3 ++ lib/ch/stream.ex | 1 + lib/ch/types.ex | 2 + mix.exs | 8 ++-- test/test_helper.exs | 4 +- 23 files changed, 179 insertions(+), 51 deletions(-) create mode 100644 bench/buffer.exs create mode 100644 bench/decode.exs create mode 100644 bench/encode.exs create mode 100644 bench/http.exs create mode 100644 
bench/native.exs create mode 100644 bench/types.exs create mode 100644 guides/in_memory_insert_buffer.md create mode 100644 guides/multinode.md create mode 100644 guides/on_disk_insert_buffer.md create mode 100644 lib/ch/http.ex create mode 100644 lib/ch/native.ex create mode 100644 lib/ch/ssl.ex diff --git a/README.md b/README.md index a48199c..0d6a654 100644 --- a/README.md +++ b/README.md @@ -3,25 +3,25 @@ [![Documentation badge](https://img.shields.io/badge/Documentation-ff69b4)](https://hexdocs.pm/ch) [![Hex.pm badge](https://img.shields.io/badge/Package%20on%20hex.pm-informational)](https://hex.pm/packages/ch) -Minimal HTTP ClickHouse client for Elixir. +Minimal ClickHouse client for Elixir. Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch) ### Key features -- RowBinary - Native query parameters - Per query settings - Minimal API - -Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82) +- HTTP or Native +- [Multinode support](./guides/multihost.md) +- [Compression](./guides/compression.md) ## Installation ```elixir defp deps do [ - {:ch, "~> 0.2.0"} + {:ch, "~> 0.3.0"} ] end ``` @@ -60,7 +60,7 @@ Note on datetime encoding in query parameters: - `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone - `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse -- encoding non UTC `%DateTime{}` raises `ArgumentError` +- encoding non-UTC `%DateTime{}` requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database) #### Insert rows @@ -100,10 +100,10 @@ types = [:u64, :string] rowbinary = Ch.RowBinary.encode_rows(rows, types) %Ch.Result{num_rows: 2} = - Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | rowbinary]) + Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary]) ``` -Similarly, you can use 
[`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check. +Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something not quite unlike a type check. ```elixir sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n" @@ -116,12 +116,24 @@ rows = [ types = ["UInt64", "String"] names = ["id", "text"] -data = [ +rowbinary_with_names_and_types = [ Ch.RowBinary.encode_names_and_types(names, types), Ch.RowBinary.encode_rows(rows, types) ] -%Ch.Result{num_rows: 2} = Ch.query!(pid, [sql | data]) +%Ch.Result{num_rows: 2} = + Ch.query!(pid, [sql | rowbinary_with_names_and_types]) +``` + +And you can use buffer helpers too. They are available for RowBinary, RowBinaryWithNamesAndTypes, and Native formats. + +```elixir +buffer = Ch.RowBinary.new_buffer(_types = ["UInt64", "String"]) +buffer = Ch.RowBinary.push_buffer(buffer, [[0, "a"], [1, "b"]]) +rowbinary = Ch.RowBinary.buffer_to_iodata(buffer) + +%Ch.Result{num_rows: 2} = + Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary]) ``` #### Insert rows in some other [format](https://clickhouse.com/docs/en/interfaces/formats) @@ -129,9 +141,13 @@ data = [ ```elixir {:ok, pid} = Ch.start_link() -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null") -csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) +csv = + """ + 0,"a"\n + 1,"b"\ + """ %Ch.Result{num_rows: 2} = Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) @@ -156,6 +172,20 @@ end) This query makes a [`transfer-encoding: chunked`] HTTP request while unfolding the stream resulting in lower memory usage. 
+#### Stream from a file + +```elixir +{:ok, pid} = Ch.start_link() + +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") + +DBConnection.run(pid, fn conn -> + File.stream!("buffer.tmp", _bytes = 2048) + |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) + |> Stream.run() +end) +``` + #### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) ```elixir @@ -265,7 +295,7 @@ Mix.install([:ch, :tz]) "2023-04-26 01:45:12+08:00 CST Asia/Taipei" = to_string(taipei) ``` -Encoding non-UTC datetimes raises an `ArgumentError` +Encoding non-UTC datetimes is possible but slow. ```elixir Ch.query!(pid, "CREATE TABLE ch_datetimes(datetime DateTime) ENGINE Null") @@ -274,10 +304,17 @@ naive = NaiveDateTime.utc_now() utc = DateTime.utc_now() taipei = DateTime.shift_zone!(utc, "Asia/Taipei") -# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei -Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"]) +rows = [ + [naive], + [utc], + [taipei] +] + +types = ["DateTime"] + +Ch.RowBinary.encode_rows(rows, types) ``` ## Benchmarks -Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :) +Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) diff --git a/bench/buffer.exs b/bench/buffer.exs new file mode 100644 index 0000000..e69de29 diff --git a/bench/decode.exs b/bench/decode.exs new file mode 100644 index 0000000..e69de29 diff --git a/bench/encode.exs b/bench/encode.exs new file mode 100644 index 0000000..f6f01c4 --- /dev/null +++ b/bench/encode.exs @@ -0,0 +1,55 @@ +IO.puts("This benchmark is based on 
https://github.com/ClickHouse/clickhouse-go#benchmark\n") + +port = String.to_integer(System.get_env("CH_PORT") || "8123") +hostname = System.get_env("CH_HOSTNAME") || "localhost" +scheme = System.get_env("CH_SCHEME") || "http" +database = System.get_env("CH_DATABASE") || "ch_bench" + +{:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port) +Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database]) + +Ch.query!(conn, """ +CREATE TABLE IF NOT EXISTS #{database}.benchmark ( + col1 UInt64, + col2 String, + col3 Array(UInt8), + col4 DateTime +) Engine Null +""") + +types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()] +statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary" + +rows = fn count -> + Enum.map(1..count, fn i -> + [i, "Golang SQL database driver", [1, 2, 3, 4, 5, 6, 7, 8, 9], NaiveDateTime.utc_now()] + end) +end + +alias Ch.RowBinary + +Benchee.run( + %{ + # "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end, + "encode" => fn rows -> RowBinary.encode_rows(rows, types) end, + "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end, + # "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end, + "encode stream" => fn rows -> + rows + |> Stream.chunk_every(60_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) + |> Stream.run() + end, + "insert stream" => fn rows -> + stream = + rows + |> Stream.chunk_every(60_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) + + Ch.query!(conn, statement, stream, encode: false) + end + }, + inputs: %{ + "1_000_000 rows" => rows.(1_000_000) + } +) diff --git a/bench/http.exs b/bench/http.exs new file mode 100644 index 0000000..e69de29 diff --git a/bench/insert.exs b/bench/insert.exs index f6f01c4..9f06704 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -31,15 +31,10 @@ alias Ch.RowBinary Benchee.run( %{ # "control" => fn rows 
-> Enum.each(rows, fn _row -> :ok end) end, - "encode" => fn rows -> RowBinary.encode_rows(rows, types) end, + "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end, # "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end, - "encode stream" => fn rows -> - rows - |> Stream.chunk_every(60_000) - |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - |> Stream.run() - end, + "insert stream" => fn rows -> stream = rows diff --git a/bench/native.exs b/bench/native.exs new file mode 100644 index 0000000..e69de29 diff --git a/bench/types.exs b/bench/types.exs new file mode 100644 index 0000000..e69de29 diff --git a/guides/in_memory_insert_buffer.md b/guides/in_memory_insert_buffer.md new file mode 100644 index 0000000..3d73602 --- /dev/null +++ b/guides/in_memory_insert_buffer.md @@ -0,0 +1 @@ +# In-memory INSERT buffer diff --git a/guides/multinode.md b/guides/multinode.md new file mode 100644 index 0000000..9d7bff9 --- /dev/null +++ b/guides/multinode.md @@ -0,0 +1,3 @@ +# Connecting to multiple nodes + +Similar to https://clickhouse.com/docs/en/integrations/go#connecting-to-multiple-nodes diff --git a/guides/on_disk_insert_buffer.md b/guides/on_disk_insert_buffer.md new file mode 100644 index 0000000..0301b03 --- /dev/null +++ b/guides/on_disk_insert_buffer.md @@ -0,0 +1,37 @@ +# On-disk INSERT buffer + +Here how you could do it + +```elixir +defmodule WriteBuffer do + use GenServer + + # 5 MB + max_buffer_size = 5_000_000 + + def insert(rows) do + row_binary = Ch.RowBinary.encode_many(rows, unquote(encoding_types)) + GenServer.call(__MODULE__, {:buffer, row_binary}) + end + + def init(opts) do + {:ok, fd} = :file.open() + %{fd: fd, buffer_size: 0} + end + + def handle_call({:buffer, row_binary}, _from, state) do + new_buffer_size = state.buffer_size + IO.iodata_length(row_binary) + :file.write(state.fd, row_binary) + + if new_buffer_size < unquote(max_buffer_size) do + %{state | buffer_size: 
new_buffer_size} + else + flush(state) + end + end +end +``` + +See [tests](../test/ch/on_disk_buffer_test.exs) for more. + +TODO: notes on using it in docker and "surviving" restarts diff --git a/lib/ch.ex b/lib/ch.ex index dcc3002..a26a3fb 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -55,7 +55,6 @@ defmodule Ch do | {:command, Ch.Query.command()} | {:headers, [{String.t(), String.t()}]} | {:format, String.t()} - | {:decode, boolean} | DBConnection.connection_option() @doc """ @@ -71,8 +70,7 @@ defmodule Ch do * `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster * `:command` - Command tag for the query * `:headers` - Custom HTTP headers for the request - * `:format` - Custom response format for the request - * `:decode` - Whether to automatically decode the response + * `:format` - Custom response format for the request, if provided, the response is not decoded automatically * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0) """ @@ -105,13 +103,7 @@ defmodule Ch do %Ch.Stream{conn: conn, query: query, params: params, opts: opts} end - # TODO drop - @doc false - @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any - def run(conn, f, opts \\ []) when is_function(f, 1) do - DBConnection.run(conn, f, opts) - end - + # TODO need it? if Code.ensure_loaded?(Ecto.ParameterizedType) do @behaviour Ecto.ParameterizedType diff --git a/lib/ch/error.ex b/lib/ch/error.ex index 9b427ee..5c2f313 100644 --- a/lib/ch/error.ex +++ b/lib/ch/error.ex @@ -1,5 +1,5 @@ defmodule Ch.Error do @moduledoc "Error struct wrapping ClickHouse error responses." 
defexception [:code, :message] - @type t :: %__MODULE__{code: pos_integer | nil, message: String.t()} + @type t :: %__MODULE__{code: pos_integer | nil, message: binary} end diff --git a/lib/ch/http.ex b/lib/ch/http.ex new file mode 100644 index 0000000..bc5fb4d --- /dev/null +++ b/lib/ch/http.ex @@ -0,0 +1,2 @@ +defmodule Ch.HTTP do +end diff --git a/lib/ch/native.ex b/lib/ch/native.ex new file mode 100644 index 0000000..96f7a2f --- /dev/null +++ b/lib/ch/native.ex @@ -0,0 +1,3 @@ +defmodule Ch.Native do + @moduledoc false +end diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 4aaf5bb..ff3d513 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -1,16 +1,13 @@ defmodule Ch.Query do @moduledoc "Query struct wrapping the SQL statement." - defstruct [:statement, :command, :encode, :decode] - - @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean} + defstruct [:statement, :command] + @type t :: %__MODULE__{statement: iodata, command: command} @doc false @spec build(iodata, [Ch.query_option()]) :: t def build(statement, opts \\ []) do command = Keyword.get(opts, :command) || extract_command(statement) - encode = Keyword.get(opts, :encode, true) - decode = Keyword.get(opts, :decode, true) - %__MODULE__{statement: statement, command: command, encode: encode, decode: decode} + %__MODULE__{statement: statement, command: command} end statements = [ diff --git a/lib/ch/result.ex b/lib/ch/result.ex index db75fc6..86dfd0d 100644 --- a/lib/ch/result.ex +++ b/lib/ch/result.ex @@ -2,10 +2,9 @@ defmodule Ch.Result do @moduledoc """ Result struct returned from any successful query. 
Its fields are: - * `command` - An atom of the query command, for example: `:select`, `:insert`; + * `command` - An atom of the query command, for example: `:select`, `:insert` * `rows` - A list of lists, each inner list corresponding to a row, each element in the inner list corresponds to a column - * `num_rows` - The number of fetched or affected rows; - * `headers` - The HTTP response headers + * `num_rows` - The number of fetched or affected rows * `data` - The raw iodata from the response """ @@ -14,8 +13,7 @@ defmodule Ch.Result do @type t :: %__MODULE__{ command: Ch.Query.command(), num_rows: non_neg_integer | nil, - rows: [[term]] | iodata | nil, - headers: Mint.Types.headers(), + rows: [[term]] | nil, data: iodata } end diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index d2081b7..afa1d05 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -1,6 +1,8 @@ defmodule Ch.RowBinary do @moduledoc "Helpers for working with ClickHouse [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) format." + # TODO cleanup + # @compile {:bin_opt_info, true} @dialyzer :no_improper_lists diff --git a/lib/ch/ssl.ex b/lib/ch/ssl.ex new file mode 100644 index 0000000..bd24c14 --- /dev/null +++ b/lib/ch/ssl.ex @@ -0,0 +1,3 @@ +defmodule Ch.SSL do + @moduledoc false +end diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex index 9ec8b5f..cd5fcd2 100644 --- a/lib/ch/stream.ex +++ b/lib/ch/stream.ex @@ -24,6 +24,7 @@ defmodule Ch.Stream do def slice(_), do: {:error, __MODULE__} end + # TODO optimize defimpl Collectable do def into(stream) do %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream diff --git a/lib/ch/types.ex b/lib/ch/types.ex index c9e7731..a877435 100644 --- a/lib/ch/types.ex +++ b/lib/ch/types.ex @@ -3,6 +3,8 @@ defmodule Ch.Types do Helpers to turn ClickHouse types into Elixir terms for easier processing. 
  """
 
+  # TODO cleanup
+
   types = [
     {_encoded = "String", _decoded = :string, _args = []},
 
diff --git a/mix.exs b/mix.exs
index 1ac899c..5357beb 100644
--- a/mix.exs
+++ b/mix.exs
@@ -2,7 +2,7 @@ defmodule Ch.MixProject do
   use Mix.Project
 
   @source_url "https://github.com/plausible/ch"
-  @version "0.2.8"
+  @version "0.3.0"
 
   def project do
     [
@@ -12,7 +12,7 @@ defmodule Ch.MixProject do
       elixirc_paths: elixirc_paths(Mix.env()),
       deps: deps(),
       name: "Ch",
-      description: "HTTP ClickHouse driver for Elixir",
+      description: "ClickHouse driver for Elixir",
       docs: docs(),
       package: package(),
       source_url: @source_url
@@ -33,9 +33,9 @@ defmodule Ch.MixProject do
   # Run "mix help deps" to learn about dependencies.
   defp deps do
     [
-      {:mint, "~> 1.0"},
+      {:mint, "~> 1.0", optional: true},
       {:db_connection, "~> 2.0"},
-      {:jason, "~> 1.0"},
+      {:jason, "~> 1.0", optional: true},
       {:decimal, "~> 2.0"},
       {:ecto, "~> 3.12", optional: true},
       {:benchee, "~> 1.0", only: [:bench]},
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 5685699..325ab17 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1,8 +1,8 @@
 Calendar.put_time_zone_database(Tz.TimeZoneDatabase)
 
 default_test_db = System.get_env("CH_DATABASE", "ch_elixir_test")
-{:ok, _} = Ch.Test.sql_exec("DROP DATABASE IF EXISTS #{default_test_db}")
-{:ok, _} = Ch.Test.sql_exec("CREATE DATABASE #{default_test_db}")
+Ch.HTTP.query!("DROP DATABASE IF EXISTS {db:Identifier}", %{"db" => default_test_db})
+Ch.HTTP.query!("CREATE DATABASE {db:Identifier}", %{"db" => default_test_db})
 Application.put_env(:ch, :database, default_test_db)
 
 ExUnit.start(exclude: [:slow])

From b1fc1d3f209a7623025e17af56e8ac3d0ee50220 Mon Sep 17 00:00:00 2001
From: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
Date: Fri, 18 Oct 2024 12:02:18 +0700
Subject: [PATCH 3/4] mv rows to query arg

---
 lib/ch.ex            | 24 +++++++++++-------------
 lib/ch/connection.ex |  1 -
 lib/ch/http.ex       |  2 --
 lib/ch/native.ex     |  3 ---
 lib/ch/ssl.ex        |  3 ---
 lib/ch/stream.ex
| 2 +- 6 files changed, 12 insertions(+), 23 deletions(-) delete mode 100644 lib/ch/http.ex delete mode 100644 lib/ch/native.ex delete mode 100644 lib/ch/ssl.ex diff --git a/lib/ch.ex b/lib/ch.ex index a26a3fb..4643fb1 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -7,14 +7,13 @@ defmodule Ch do | {:username, String.t()} | {:password, String.t()} | {:settings, Keyword.t()} - | {:timeout, timeout} @type start_option :: common_option | {:scheme, String.t()} | {:hostname, String.t()} | {:port, :inet.port_number()} - | {:transport_opts, :gen_tcp.connect_option() | :ssl.tls_client_option()} + | {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]} | DBConnection.start_option() @doc """ @@ -30,8 +29,6 @@ defmodule Ch do * `:username` - Username * `:password` - User password * `:settings` - Keyword list of ClickHouse settings to send wtih every query - * `:timeout` - HTTP receive timeout in milliseconds - * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0) """ @@ -50,13 +47,18 @@ defmodule Ch do DBConnection.child_spec(Connection, opts) end + @type query :: iodata + @type query_option :: common_option | {:command, Ch.Query.command()} | {:headers, [{String.t(), String.t()}]} - | {:format, String.t()} | DBConnection.connection_option() + @type query_params :: + %{(name :: String.t()) => value :: term} + | [{name :: String.t(), value :: term}] + @doc """ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or `{:error, Exception.t()}` if there was a database error. 
@@ -67,16 +69,13 @@ defmodule Ch do * `:username` - Username * `:password` - User password * `:settings` - Keyword list of settings to merge with `:settings` from `start_link` and send with this query - * `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster - * `:command` - Command tag for the query + * `:command` - Command tag for the query like `:insert` or `:select`, to avoid extracting it from SQL. Used in some Ecto.Repo `:telemetry` events * `:headers` - Custom HTTP headers for the request - * `:format` - Custom response format for the request, if provided, the response is not decoded automatically * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0) """ - @spec query(DBConnection.conn(), iodata, params, [query_option]) :: + @spec query(DBConnection.conn(), query, query_params, [query_option]) :: {:ok, Result.t()} | {:error, Exception.t()} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) @@ -89,15 +88,14 @@ defmodule Ch do Runs a query and returns the result or raises `Ch.Error` if there was an error. See `query/4`. 
""" - @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t() - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() + @spec query!(DBConnection.conn(), query, query_params, [query_option]) :: Result.t() def query!(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.execute!(conn, query, params, opts) end @doc false - @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t() + @spec stream(DBConnection.t(), query, query_params, [query_option]) :: Ch.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) %Ch.Stream{conn: conn, query: query, params: params, opts: opts} diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 3742f47..e5ddef8 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -20,7 +20,6 @@ defmodule Ch.Connection do with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do conn = conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) |> maybe_put_private(:database, opts[:database]) |> maybe_put_private(:username, opts[:username]) |> maybe_put_private(:password, opts[:password]) diff --git a/lib/ch/http.ex b/lib/ch/http.ex deleted file mode 100644 index bc5fb4d..0000000 --- a/lib/ch/http.ex +++ /dev/null @@ -1,2 +0,0 @@ -defmodule Ch.HTTP do -end diff --git a/lib/ch/native.ex b/lib/ch/native.ex deleted file mode 100644 index 96f7a2f..0000000 --- a/lib/ch/native.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Ch.Native do - @moduledoc false -end diff --git a/lib/ch/ssl.ex b/lib/ch/ssl.ex deleted file mode 100644 index bd24c14..0000000 --- a/lib/ch/ssl.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Ch.SSL do - @moduledoc false -end diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex index cd5fcd2..ae2782f 100644 --- a/lib/ch/stream.ex +++ b/lib/ch/stream.ex @@ -8,7 +8,7 @@ defmodule Ch.Stream do conn: DBConnection.conn(), ref: Mint.Types.request_ref() | 
nil, query: Ch.Query.t(), - params: term, + params: Ch.query_params(), opts: [Ch.query_option()] } From 779e4e1301e612fe355c8881359c5f6f3098804b Mon Sep 17 00:00:00 2001 From: ruslandoga <67764432+ruslandoga@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:28:36 +0700 Subject: [PATCH 4/4] mv rows to query arg --- README.md | 293 +++++------------------------------------------------- 1 file changed, 23 insertions(+), 270 deletions(-) diff --git a/README.md b/README.md index 0d6a654..114aee9 100644 --- a/README.md +++ b/README.md @@ -3,19 +3,10 @@ [![Documentation badge](https://img.shields.io/badge/Documentation-ff69b4)](https://hexdocs.pm/ch) [![Hex.pm badge](https://img.shields.io/badge/Package%20on%20hex.pm-informational)](https://hex.pm/packages/ch) -Minimal ClickHouse client for Elixir. +Minimal HTTP ClickHouse client for Elixir. Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch) -### Key features - -- Native query parameters -- Per query settings -- Minimal API -- HTTP or Native -- [Multinode support](./guides/multihost.md) -- [Compression](./guides/compression.md) - ## Installation ```elixir @@ -28,179 +19,30 @@ end ## Usage -#### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool - -```elixir -defaults = [ - scheme: "http", - hostname: "localhost", - port: 8123, - database: "default", - settings: [], - pool_size: 1, - timeout: :timer.seconds(15) -] - -{:ok, pid} = Ch.start_link(defaults) -``` - -#### Select rows - -```elixir -{:ok, pid} = Ch.start_link() - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3}) -``` - -Note on datetime encoding in query parameters: - -- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone -- `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is 
treated as UTC timestamp by ClickHouse -- encoding non-UTC `%DateTime{}` requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database) - -#### Insert rows - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) -``` - -#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null") +Please see the tests for usage examples: -rows = [ - [0, "a"], - [1, "b"] -] +- [Select rows](./tests/examples/select_rows_test.exs) +- [Insert rows](./tests/examples/insert_rows_test.exs) +- [Insert RowBinary](./tests/examples/insert_rowbinary_test.exs) +- [Insert CSV](./tests/examples/insert_csv_test.exs) +- [Insert compressed RowBinary](./tests/examples/insert_compressed_rowbinary_test.exs) +- [Insert chunked RowBinary](./tests/examples/insert_chunked_rowbinary_test.exs) +- [Insert chunked and compressed RowBinary](./tests/examples/insert_chunked_compressed_rowbinary_test.exs) +- [Insert from a file](./tests/examples/insert_from_file_test.exs) +- [Custom settings](./tests/custom_settings_test.exs) +- [Custom headers](./tests/custom_headers_test.exs) -types = ["UInt64", "String"] -# or -types = [Ch.Types.u64(), Ch.Types.string()] -# or -types = [:u64, :string] - -rowbinary = Ch.RowBinary.encode_rows(rows, types) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary]) -``` - -Similarly, you can use 
[`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something not quite unlike a type check. - -```elixir -sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n" - -rows = [ - [0, "a"], - [1, "b"] -] - -types = ["UInt64", "String"] -names = ["id", "text"] - -rowbinary_with_names_and_types = [ - Ch.RowBinary.encode_names_and_types(names, types), - Ch.RowBinary.encode_rows(rows, types) -] - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, [sql | rowbinary_with_names_and_types]) -``` - -And you can use buffer helpers too. They are available for RowBinary, RowBinaryWithNamesAndTypes, and Native formats. - -```elixir -buffer = Ch.RowBinary.new_buffer(_types = ["UInt64", "String"]) -buffer = Ch.RowBinary.push_buffer(buffer, [[0, "a"], [1, "b"]]) -rowbinary = Ch.RowBinary.buffer_to_iodata(buffer) - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary]) -``` - -#### Insert rows in some other [format](https://clickhouse.com/docs/en/interfaces/formats) - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null") - -csv = - """ - 0,"a"\n - 1,"b"\ - """ - -%Ch.Result{num_rows: 2} = - Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) -``` - -#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -DBConnection.run(pid, fn conn -> - Stream.repeatedly(fn -> [:rand.uniform(100)] end) - |> Stream.chunk_every(100_000) - |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) - |> Stream.take(10) - |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) - |> Stream.run() -end) -``` - -This query makes a [`transfer-encoding: chunked`] HTTP request 
while unfolding the stream resulting in lower memory usage. - -#### Stream from a file - -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") - -DBConnection.run(pid, fn conn -> - File.stream!("buffer.tmp", _bytes = 2048) - |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) - |> Stream.run() -end) -``` - -#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) - -```elixir -{:ok, pid} = Ch.start_link() +## Caveats -settings = [async_insert: 1] +#### Timestamps in query parameters -%Ch.Result{rows: [["async_insert", "Bool", "0"]]} = - Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'") +> [!WARNING] +> - `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone +> - `%DateTime{time_zone: "Etc/UTC"}` is encoded as a unix timestamp and is treated as UTC timestamp by ClickHouse +> - `%DateTime{time_zone: time_zone}` is shifted to `"Etc/UTC"` and then encoded as a unix timestamp, this requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database) to be configured -%Ch.Result{rows: [["async_insert", "Bool", "1"]]} = - Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings) -``` -## Caveats +TODO: See test. #### NULL in RowBinary @@ -208,112 +50,23 @@ It's the same as in [`ch-go`](https://clickhouse.com/docs/en/integrations/go#nul > At insert time, Nil can be passed for both the normal and Nullable version of a column. For the former, the default value for the type will be persisted, e.g., an empty string for string. For the nullable version, a NULL value will be stored in ClickHouse. 
-```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, """ -CREATE TABLE ch_nulls ( - a UInt8 NULL, - b UInt8 DEFAULT 10, - c UInt8 NOT NULL -) ENGINE Memory -""") - -types = ["Nullable(UInt8)", "UInt8", "UInt8"] -row = [nil, nil, nil] -rowbinary = Ch.RowBinary.encode_row(row, types) - -%Ch.Result{num_rows: 1} = - Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | rowbinary]) - -%Ch.Result{rows: [[nil, _not_10 = 0, 0]]} = - Ch.query!(pid, "SELECT * FROM ch_nulls") -``` - -Note that in this example `DEFAULT 10` is ignored and `0` (the default value for `UInt8`) is persisted instead. - -However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-functions/input) can be used as a workaround: - -```elixir -sql = """ -INSERT INTO ch_nulls - SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8') - FORMAT RowBinary -""" - -types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"] -rowbinary = Ch.RowBinary.encode_row(row, types) - -%Ch.Result{num_rows: 1} = - Ch.query!(pid, [sql | rowbinary]) - -%Ch.Result{rows: [_before = [0], _after = [10]]} = - Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") -``` +TODO: See test. #### UTF-8 in RowBinary When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns non UTF-8 characters are replaced with `�` (U+FFFD). 
This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json) -```elixir -{:ok, pid} = Ch.start_link() - -Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") - -rowbinary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b") - -%Ch.Result{num_rows: 1} = - Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | rowbinary]) - -%Ch.Result{rows: [["a�b"]]} = - Ch.query!(pid, "SELECT * FROM ch_utf8") - -%Ch.Result{rows: %{"data" => [["a�b"]]}} = - pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1) -``` +TODO: See test. #### Timezones in RowBinary Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database) -```elixir -Mix.install([:ch, :tz]) - -:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase) - -{:ok, pid} = Ch.start_link() - -%Ch.Result{rows: [[~N[2023-04-25 17:45:09]]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime)") - -%Ch.Result{rows: [[~U[2023-04-25 17:45:11Z]]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime('UTC'))") - -%Ch.Result{rows: [[%DateTime{time_zone: "Asia/Taipei"} = taipei]]} = - Ch.query!(pid, "SELECT CAST(now() as DateTime('Asia/Taipei'))") - -"2023-04-26 01:45:12+08:00 CST Asia/Taipei" = to_string(taipei) -``` +TODO: See test. Encoding non-UTC datetimes is possible but slow. -```elixir -Ch.query!(pid, "CREATE TABLE ch_datetimes(datetime DateTime) ENGINE Null") - -naive = NaiveDateTime.utc_now() -utc = DateTime.utc_now() -taipei = DateTime.shift_zone!(utc, "Asia/Taipei") - -rows = [ - [naive], - [utc], - [taipei] -] - -types = ["DateTime"] - -Ch.RowBinary.encode_rows(rows, types) -``` +TODO: See test. ## Benchmarks