-SELECT 500, 500 thousand, and 500 million rows (original)
-
-
-$ MIX_ENV=bench mix run bench/stream.exs
-
-This benchmark is based on https://github.com/ClickHouse/ch-bench
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 500 rows, 500_000 rows, 500_000_000 rows
-Estimated total run time: 1.05 min
-
-Benchmarking stream with decode with input 500 rows ...
-Benchmarking stream with decode with input 500_000 rows ...
-Benchmarking stream with decode with input 500_000_000 rows ...
-Benchmarking stream with manual decode with input 500 rows ...
-Benchmarking stream with manual decode with input 500_000 rows ...
-Benchmarking stream with manual decode with input 500_000_000 rows ...
-Benchmarking stream without decode with input 500 rows ...
-Benchmarking stream without decode with input 500_000 rows ...
-Benchmarking stream without decode with input 500_000_000 rows ...
-
-##### With input 500 rows #####
-Name                               ips       average  deviation        median        99th %
-stream with decode              4.69 K     213.34 μs    ±12.49%     211.38 μs     290.94 μs
-stream with manual decode       4.69 K     213.43 μs    ±17.40%     210.96 μs     298.75 μs
-stream without decode           4.65 K     215.08 μs    ±10.79%     213.79 μs     284.66 μs
-
-Comparison:
-stream with decode 4.69 K
-stream with manual decode 4.69 K - 1.00x slower +0.0838 μs
-stream without decode 4.65 K - 1.01x slower +1.74 μs
-
-##### With input 500_000 rows #####
-Name                               ips       average  deviation        median        99th %
-stream without decode           234.58       4.26 ms    ±13.99%       4.04 ms       5.95 ms
-stream with manual decode        64.26      15.56 ms     ±8.36%      15.86 ms      17.97 ms
-stream with decode               41.03      24.37 ms     ±6.27%      24.39 ms      26.60 ms
-
-Comparison:
-stream without decode 234.58
-stream with manual decode 64.26 - 3.65x slower +11.30 ms
-stream with decode 41.03 - 5.72x slower +20.11 ms
-
-##### With input 500_000_000 rows #####
-Name                               ips       average  deviation        median        99th %
-stream without decode             0.32        3.17 s     ±0.20%        3.17 s        3.17 s
-stream with manual decode       0.0891       11.23 s     ±0.00%       11.23 s       11.23 s
-stream with decode              0.0462       21.66 s     ±0.00%       21.66 s       21.66 s
-
-Comparison:
-stream without decode 0.32
-stream with manual decode 0.0891 - 3.55x slower +8.06 s
-stream with decode 0.0462 - 6.84x slower +18.50 s
-
-
-
-
-[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts")
+Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :)
diff --git a/lib/ch.ex b/lib/ch.ex
index ad4217f..dcc3002 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -14,7 +14,7 @@ defmodule Ch do
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
- | {:transport_opts, :gen_tcp.connect_option()}
+ | {:transport_opts, :gen_tcp.connect_option() | :ssl.tls_client_option()}
| DBConnection.start_option()
@doc """
@@ -29,7 +29,7 @@ defmodule Ch do
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
- * `:settings` - Keyword list of ClickHouse settings
+ * `:settings` - Keyword list of ClickHouse settings to send with every query
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
@@ -55,8 +55,6 @@ defmodule Ch do
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
- # TODO remove
- | {:encode, boolean}
| {:decode, boolean}
| DBConnection.connection_option()
@@ -69,8 +67,8 @@ defmodule Ch do
* `:database` - Database
* `:username` - Username
* `:password` - User password
- * `:settings` - Keyword list of settings
- * `:timeout` - Query request timeout
+ * `:settings` - Keyword list of settings to merge with `:settings` from `start_link` and send with this query
+ * `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
From 1b7919fbd13b8be0593ddbc7c663f31e320e6c62 Mon Sep 17 00:00:00 2001
From: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
Date: Fri, 18 Oct 2024 11:48:46 +0700
Subject: [PATCH 2/4] update readme
---
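Note on `guides/on_disk_insert_buffer.md`: the `WriteBuffer` example added in this patch calls `flush/1` without defining it. Below is a minimal sketch of what such a flush could look like. It is an illustration only, not code from this repository: the module name `OnDiskBufferSketch` and the `insert` callback are made up for the example, and it reads the whole file into memory for brevity, whereas a real buffer would more likely stream the file in chunks.

```elixir
defmodule OnDiskBufferSketch do
  @moduledoc "Illustrative only, not part of Ch."

  # Opens (or re-opens) the buffer file without truncating it, so rows
  # written before a restart are still counted in `size`.
  def open(path) do
    {:ok, fd} = :file.open(path, [:raw, :binary, :read, :write])
    # seeking to the end returns the current file size and makes writes append
    {:ok, size} = :file.position(fd, :eof)
    %{fd: fd, path: path, size: size}
  end

  # Appends already encoded iodata (e.g. RowBinary) and tracks the new size.
  def write(buffer, iodata) do
    :ok = :file.write(buffer.fd, iodata)
    %{buffer | size: buffer.size + IO.iodata_length(iodata)}
  end

  # Nothing buffered, nothing to send.
  def flush(%{size: 0} = buffer, _insert), do: buffer

  # Hands the buffered bytes to `insert` (a hypothetical 1-arity callback
  # that must return `:ok`) and truncates the file only after it succeeded.
  def flush(buffer, insert) when is_function(insert, 1) do
    {:ok, 0} = :file.position(buffer.fd, :bof)
    {:ok, data} = :file.read(buffer.fd, buffer.size)
    :ok = insert.(data)
    {:ok, 0} = :file.position(buffer.fd, :bof)
    :ok = :file.truncate(buffer.fd)
    %{buffer | size: 0}
  end
end
```

With Ch, the callback could wrap the RowBinary insert from the README, for example `fn data -> Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | data]); :ok end`. Because the file is truncated only after the callback returns `:ok`, a failed insert leaves the buffered rows on disk.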
README.md | 69 ++++++++++++++++++++++++-------
bench/buffer.exs | 0
bench/decode.exs | 0
bench/encode.exs | 55 ++++++++++++++++++++++++
bench/http.exs | 0
bench/insert.exs | 9 +---
bench/native.exs | 0
bench/types.exs | 0
guides/in_memory_insert_buffer.md | 1 +
guides/multinode.md | 3 ++
guides/on_disk_insert_buffer.md | 37 +++++++++++++++++
lib/ch.ex | 12 +-----
lib/ch/error.ex | 2 +-
lib/ch/http.ex | 2 +
lib/ch/native.ex | 3 ++
lib/ch/query.ex | 9 ++--
lib/ch/result.ex | 8 ++--
lib/ch/row_binary.ex | 2 +
lib/ch/ssl.ex | 3 ++
lib/ch/stream.ex | 1 +
lib/ch/types.ex | 2 +
mix.exs | 8 ++--
test/test_helper.exs | 4 +-
23 files changed, 179 insertions(+), 51 deletions(-)
create mode 100644 bench/buffer.exs
create mode 100644 bench/decode.exs
create mode 100644 bench/encode.exs
create mode 100644 bench/http.exs
create mode 100644 bench/native.exs
create mode 100644 bench/types.exs
create mode 100644 guides/in_memory_insert_buffer.md
create mode 100644 guides/multinode.md
create mode 100644 guides/on_disk_insert_buffer.md
create mode 100644 lib/ch/http.ex
create mode 100644 lib/ch/native.ex
create mode 100644 lib/ch/ssl.ex
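Note on `guides/multinode.md`: the guide only links to the clickhouse-go documentation for now. As a rough sketch of the in-order and round-robin node selection described there, something like the following could work. Everything here is hypothetical: `MultinodeSketch` is not a Ch module, and `connect` stands for whatever function actually verifies that a node is reachable. `Ch.start_link/1` on its own is probably not a suitable `connect`, since a DBConnection pool starts successfully even when the server is down.

```elixir
defmodule MultinodeSketch do
  @moduledoc "Illustrative only, not part of Ch."

  # Rotates the endpoint list so that attempt `n` starts at a different node
  # (a round-robin strategy).
  def round_robin(endpoints, n) when is_list(endpoints) and endpoints != [] and n >= 0 do
    {head, tail} = Enum.split(endpoints, rem(n, length(endpoints)))
    tail ++ head
  end

  # Tries the endpoints in the given order and returns the first successful
  # connection together with the endpoint it came from. If every attempt
  # fails, the last error is returned.
  def connect_in_order(endpoints, connect) when is_function(connect, 1) do
    Enum.reduce_while(endpoints, {:error, :no_endpoints}, fn endpoint, _last_error ->
      case connect.(endpoint) do
        {:ok, conn} -> {:halt, {:ok, conn, endpoint}}
        {:error, _reason} = error -> {:cont, error}
      end
    end)
  end
end
```

Each endpoint can be a keyword list such as `[hostname: "ch-1", port: 8123]`, which matches the `:hostname` and `:port` start options, so a working `connect` could pass it on to a pool after a probe query succeeds.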
diff --git a/README.md b/README.md
index a48199c..0d6a654 100644
--- a/README.md
+++ b/README.md
@@ -3,25 +3,25 @@
[![Documentation badge](https://img.shields.io/badge/Documentation-ff69b4)](https://hexdocs.pm/ch)
[![Hex.pm badge](https://img.shields.io/badge/Package%20on%20hex.pm-informational)](https://hex.pm/packages/ch)
-Minimal HTTP ClickHouse client for Elixir.
+Minimal ClickHouse client for Elixir.
Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch)
### Key features
-- RowBinary
- Native query parameters
- Per query settings
- Minimal API
-
-Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82)
+- HTTP or Native
+- [Multinode support](./guides/multinode.md)
+- [Compression](./guides/compression.md)
## Installation
```elixir
defp deps do
[
- {:ch, "~> 0.2.0"}
+ {:ch, "~> 0.3.0"}
]
end
```
@@ -60,7 +60,7 @@ Note on datetime encoding in query parameters:
- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone
- `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse
-- encoding non UTC `%DateTime{}` raises `ArgumentError`
+- encoding non-UTC `%DateTime{}` requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database)
#### Insert rows
@@ -100,10 +100,10 @@ types = [:u64, :string]
rowbinary = Ch.RowBinary.encode_rows(rows, types)
%Ch.Result{num_rows: 2} =
- Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | rowbinary])
+ Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary])
```
-Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.
+Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something not quite unlike a type check.
```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n"
@@ -116,12 +116,24 @@ rows = [
types = ["UInt64", "String"]
names = ["id", "text"]
-data = [
+rowbinary_with_names_and_types = [
Ch.RowBinary.encode_names_and_types(names, types),
Ch.RowBinary.encode_rows(rows, types)
]
-%Ch.Result{num_rows: 2} = Ch.query!(pid, [sql | data])
+%Ch.Result{num_rows: 2} =
+ Ch.query!(pid, [sql | rowbinary_with_names_and_types])
+```
+
+And you can use buffer helpers too. They are available for RowBinary, RowBinaryWithNamesAndTypes, and Native formats.
+
+```elixir
+buffer = Ch.RowBinary.new_buffer(_types = ["UInt64", "String"])
+buffer = Ch.RowBinary.push_buffer(buffer, [[0, "a"], [1, "b"]])
+rowbinary = Ch.RowBinary.buffer_to_iodata(buffer)
+
+%Ch.Result{num_rows: 2} =
+ Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary])
```
#### Insert rows in some other [format](https://clickhouse.com/docs/en/interfaces/formats)
@@ -129,9 +141,13 @@ data = [
```elixir
{:ok, pid} = Ch.start_link()
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
+Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null")
-csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
+csv =
+ """
+ 0,"a"
+ 1,"b"
+ """
%Ch.Result{num_rows: 2} =
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
@@ -156,6 +172,20 @@ end)
This query makes a [`transfer-encoding: chunked`] HTTP request while unfolding the stream resulting in lower memory usage.
+#### Stream from a file
+
+```elixir
+{:ok, pid} = Ch.start_link()
+
+Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
+
+DBConnection.run(pid, fn conn ->
+ File.stream!("buffer.tmp", _bytes = 2048)
+ |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
+ |> Stream.run()
+end)
+```
+
#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)
```elixir
@@ -265,7 +295,7 @@ Mix.install([:ch, :tz])
"2023-04-26 01:45:12+08:00 CST Asia/Taipei" = to_string(taipei)
```
-Encoding non-UTC datetimes raises an `ArgumentError`
+Encoding non-UTC datetimes is possible but slow.
```elixir
Ch.query!(pid, "CREATE TABLE ch_datetimes(datetime DateTime) ENGINE Null")
@@ -274,10 +304,17 @@ naive = NaiveDateTime.utc_now()
utc = DateTime.utc_now()
taipei = DateTime.shift_zone!(utc, "Asia/Taipei")
-# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei
-Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"])
+rows = [
+ [naive],
+ [utc],
+ [taipei]
+]
+
+types = ["DateTime"]
+
+Ch.RowBinary.encode_rows(rows, types)
```
## Benchmarks
-Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/) :)
+Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/)
diff --git a/bench/buffer.exs b/bench/buffer.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/decode.exs b/bench/decode.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/encode.exs b/bench/encode.exs
new file mode 100644
index 0000000..f6f01c4
--- /dev/null
+++ b/bench/encode.exs
@@ -0,0 +1,55 @@
+IO.puts("This benchmark is based on https://github.com/ClickHouse/clickhouse-go#benchmark\n")
+
+port = String.to_integer(System.get_env("CH_PORT") || "8123")
+hostname = System.get_env("CH_HOSTNAME") || "localhost"
+scheme = System.get_env("CH_SCHEME") || "http"
+database = System.get_env("CH_DATABASE") || "ch_bench"
+
+{:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port)
+Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database])
+
+Ch.query!(conn, """
+CREATE TABLE IF NOT EXISTS #{database}.benchmark (
+ col1 UInt64,
+ col2 String,
+ col3 Array(UInt8),
+ col4 DateTime
+) Engine Null
+""")
+
+types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()]
+statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary"
+
+rows = fn count ->
+ Enum.map(1..count, fn i ->
+ [i, "Golang SQL database driver", [1, 2, 3, 4, 5, 6, 7, 8, 9], NaiveDateTime.utc_now()]
+ end)
+end
+
+alias Ch.RowBinary
+
+Benchee.run(
+ %{
+ # "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
+ "encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
+ "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
+ # "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
+ "encode stream" => fn rows ->
+ rows
+ |> Stream.chunk_every(60_000)
+ |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+ |> Stream.run()
+ end,
+ "insert stream" => fn rows ->
+ stream =
+ rows
+ |> Stream.chunk_every(60_000)
+ |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+
+ Ch.query!(conn, statement, stream, encode: false)
+ end
+ },
+ inputs: %{
+ "1_000_000 rows" => rows.(1_000_000)
+ }
+)
diff --git a/bench/http.exs b/bench/http.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/insert.exs b/bench/insert.exs
index f6f01c4..9f06704 100644
--- a/bench/insert.exs
+++ b/bench/insert.exs
@@ -31,15 +31,10 @@ alias Ch.RowBinary
Benchee.run(
%{
# "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
- "encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
+
"insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
# "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
- "encode stream" => fn rows ->
- rows
- |> Stream.chunk_every(60_000)
- |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
- |> Stream.run()
- end,
+
"insert stream" => fn rows ->
stream =
rows
diff --git a/bench/native.exs b/bench/native.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/types.exs b/bench/types.exs
new file mode 100644
index 0000000..e69de29
diff --git a/guides/in_memory_insert_buffer.md b/guides/in_memory_insert_buffer.md
new file mode 100644
index 0000000..3d73602
--- /dev/null
+++ b/guides/in_memory_insert_buffer.md
@@ -0,0 +1 @@
+# In-memory INSERT buffer
diff --git a/guides/multinode.md b/guides/multinode.md
new file mode 100644
index 0000000..9d7bff9
--- /dev/null
+++ b/guides/multinode.md
@@ -0,0 +1,3 @@
+# Connecting to multiple nodes
+
+Similar to https://clickhouse.com/docs/en/integrations/go#connecting-to-multiple-nodes
diff --git a/guides/on_disk_insert_buffer.md b/guides/on_disk_insert_buffer.md
new file mode 100644
index 0000000..0301b03
--- /dev/null
+++ b/guides/on_disk_insert_buffer.md
@@ -0,0 +1,37 @@
+# On-disk INSERT buffer
+
+Here is how you could do it:
+
+```elixir
+defmodule WriteBuffer do
+  use GenServer
+
+  @max_buffer_size 5_000_000 # 5 MB
+  @encoding_types ["UInt64", "String"] # placeholder, use your table's column types
+
+  def insert(rows) do
+    row_binary = Ch.RowBinary.encode_rows(rows, @encoding_types)
+    GenServer.call(__MODULE__, {:buffer, row_binary})
+  end
+
+  def init(opts) do
+    {:ok, fd} = :file.open(Keyword.fetch!(opts, :path), [:raw, :binary, :append]) # `:path` is an assumed option
+    {:ok, %{fd: fd, buffer_size: 0}}
+  end
+
+  def handle_call({:buffer, row_binary}, _from, state) do
+    new_buffer_size = state.buffer_size + IO.iodata_length(row_binary)
+    :ok = :file.write(state.fd, row_binary)
+
+    if new_buffer_size < @max_buffer_size do
+      {:reply, :ok, %{state | buffer_size: new_buffer_size}}
+    else
+      {:reply, :ok, flush(state)} # flush/1 (send the file to ClickHouse, then truncate it) is left out here
+    end
+  end
+end
+```
+
+See [tests](../test/ch/on_disk_buffer_test.exs) for more.
+
+TODO: notes on using it in docker and "surviving" restarts
diff --git a/lib/ch.ex b/lib/ch.ex
index dcc3002..a26a3fb 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -55,7 +55,6 @@ defmodule Ch do
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
- | {:decode, boolean}
| DBConnection.connection_option()
@doc """
@@ -71,8 +70,7 @@ defmodule Ch do
* `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
- * `:format` - Custom response format for the request
- * `:decode` - Whether to automatically decode the response
+ * `:format` - Custom response format for the request, if provided, the response is not decoded automatically
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
"""
@@ -105,13 +103,7 @@ defmodule Ch do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end
- # TODO drop
- @doc false
- @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
- def run(conn, f, opts \\ []) when is_function(f, 1) do
- DBConnection.run(conn, f, opts)
- end
-
+ # TODO need it?
if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType
diff --git a/lib/ch/error.ex b/lib/ch/error.ex
index 9b427ee..5c2f313 100644
--- a/lib/ch/error.ex
+++ b/lib/ch/error.ex
@@ -1,5 +1,5 @@
defmodule Ch.Error do
@moduledoc "Error struct wrapping ClickHouse error responses."
defexception [:code, :message]
- @type t :: %__MODULE__{code: pos_integer | nil, message: String.t()}
+ @type t :: %__MODULE__{code: pos_integer | nil, message: binary}
end
diff --git a/lib/ch/http.ex b/lib/ch/http.ex
new file mode 100644
index 0000000..bc5fb4d
--- /dev/null
+++ b/lib/ch/http.ex
@@ -0,0 +1,2 @@
+defmodule Ch.HTTP do
+end
diff --git a/lib/ch/native.ex b/lib/ch/native.ex
new file mode 100644
index 0000000..96f7a2f
--- /dev/null
+++ b/lib/ch/native.ex
@@ -0,0 +1,3 @@
+defmodule Ch.Native do
+ @moduledoc false
+end
diff --git a/lib/ch/query.ex b/lib/ch/query.ex
index 4aaf5bb..ff3d513 100644
--- a/lib/ch/query.ex
+++ b/lib/ch/query.ex
@@ -1,16 +1,13 @@
defmodule Ch.Query do
@moduledoc "Query struct wrapping the SQL statement."
- defstruct [:statement, :command, :encode, :decode]
-
- @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean}
+ defstruct [:statement, :command]
+ @type t :: %__MODULE__{statement: iodata, command: command}
@doc false
@spec build(iodata, [Ch.query_option()]) :: t
def build(statement, opts \\ []) do
command = Keyword.get(opts, :command) || extract_command(statement)
- encode = Keyword.get(opts, :encode, true)
- decode = Keyword.get(opts, :decode, true)
- %__MODULE__{statement: statement, command: command, encode: encode, decode: decode}
+ %__MODULE__{statement: statement, command: command}
end
statements = [
diff --git a/lib/ch/result.ex b/lib/ch/result.ex
index db75fc6..86dfd0d 100644
--- a/lib/ch/result.ex
+++ b/lib/ch/result.ex
@@ -2,10 +2,9 @@ defmodule Ch.Result do
@moduledoc """
Result struct returned from any successful query. Its fields are:
- * `command` - An atom of the query command, for example: `:select`, `:insert`;
+ * `command` - An atom of the query command, for example: `:select`, `:insert`
* `rows` - A list of lists, each inner list corresponding to a row, each element in the inner list corresponds to a column
- * `num_rows` - The number of fetched or affected rows;
- * `headers` - The HTTP response headers
+ * `num_rows` - The number of fetched or affected rows
* `data` - The raw iodata from the response
"""
@@ -14,8 +13,7 @@ defmodule Ch.Result do
@type t :: %__MODULE__{
command: Ch.Query.command(),
num_rows: non_neg_integer | nil,
- rows: [[term]] | iodata | nil,
- headers: Mint.Types.headers(),
+ rows: [[term]] | nil,
data: iodata
}
end
diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex
index d2081b7..afa1d05 100644
--- a/lib/ch/row_binary.ex
+++ b/lib/ch/row_binary.ex
@@ -1,6 +1,8 @@
defmodule Ch.RowBinary do
@moduledoc "Helpers for working with ClickHouse [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) format."
+ # TODO cleanup
+
# @compile {:bin_opt_info, true}
@dialyzer :no_improper_lists
diff --git a/lib/ch/ssl.ex b/lib/ch/ssl.ex
new file mode 100644
index 0000000..bd24c14
--- /dev/null
+++ b/lib/ch/ssl.ex
@@ -0,0 +1,3 @@
+defmodule Ch.SSL do
+ @moduledoc false
+end
diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex
index 9ec8b5f..cd5fcd2 100644
--- a/lib/ch/stream.ex
+++ b/lib/ch/stream.ex
@@ -24,6 +24,7 @@ defmodule Ch.Stream do
def slice(_), do: {:error, __MODULE__}
end
+ # TODO optimize
defimpl Collectable do
def into(stream) do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
diff --git a/lib/ch/types.ex b/lib/ch/types.ex
index c9e7731..a877435 100644
--- a/lib/ch/types.ex
+++ b/lib/ch/types.ex
@@ -3,6 +3,8 @@ defmodule Ch.Types do
Helpers to turn ClickHouse types into Elixir terms for easier processing.
"""
+ # TODO cleanup
+
types =
[
{_encoded = "String", _decoded = :string, _args = []},
diff --git a/mix.exs b/mix.exs
index 1ac899c..5357beb 100644
--- a/mix.exs
+++ b/mix.exs
@@ -2,7 +2,7 @@ defmodule Ch.MixProject do
use Mix.Project
@source_url "https://github.com/plausible/ch"
- @version "0.2.8"
+ @version "0.3.0"
def project do
[
@@ -12,7 +12,7 @@ defmodule Ch.MixProject do
elixirc_paths: elixirc_paths(Mix.env()),
deps: deps(),
name: "Ch",
- description: "HTTP ClickHouse driver for Elixir",
+ description: "ClickHouse driver for Elixir",
docs: docs(),
package: package(),
source_url: @source_url
@@ -33,9 +33,9 @@ defmodule Ch.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
- {:mint, "~> 1.0"},
+ {:mint, "~> 1.0", optional: true},
{:db_connection, "~> 2.0"},
- {:jason, "~> 1.0"},
+ {:jason, "~> 1.0", optional: true},
{:decimal, "~> 2.0"},
{:ecto, "~> 3.12", optional: true},
{:benchee, "~> 1.0", only: [:bench]},
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 5685699..325ab17 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1,8 +1,8 @@
Calendar.put_time_zone_database(Tz.TimeZoneDatabase)
default_test_db = System.get_env("CH_DATABASE", "ch_elixir_test")
-{:ok, _} = Ch.Test.sql_exec("DROP DATABASE IF EXISTS #{default_test_db}")
-{:ok, _} = Ch.Test.sql_exec("CREATE DATABASE #{default_test_db}")
+Ch.HTTP.query!("DROP DATABASE IF EXISTS {db:Identifier}", %{"db" => default_test_db})
+Ch.HTTP.query!("CREATE DATABASE {db:Identifier}", %{"db" => default_test_db})
Application.put_env(:ch, :database, default_test_db)
ExUnit.start(exclude: [:slow])
From b1fc1d3f209a7623025e17af56e8ac3d0ee50220 Mon Sep 17 00:00:00 2001
From: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
Date: Fri, 18 Oct 2024 12:02:18 +0700
Subject: [PATCH 3/4] mv rows to query arg
---
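Note on the new `query_params` type: after this patch, params are either a map or a list of `{name, value}` pairs with string names, which lines up with ClickHouse's HTTP interface where each query parameter is sent as a `param_<name>` URL parameter. The sketch below shows that mapping in isolation. It is an assumption-laden illustration, not the encoder Ch uses: `QueryParamsSketch` is a made-up module, and values are simply passed through `to_string/1`, which is not sufficient for arrays, datetimes, and other types that need ClickHouse-specific escaping.

```elixir
defmodule QueryParamsSketch do
  @moduledoc "Illustrative only, not part of Ch."

  # Accepts both shapes allowed by `query_params` (a map or a list of
  # `{name, value}` tuples) and returns `param_`-prefixed string pairs.
  @spec to_http_params(%{String.t() => term} | [{String.t(), term}]) ::
          [{String.t(), String.t()}]
  def to_http_params(params) when is_map(params) or is_list(params) do
    for {name, value} <- params, do: {"param_" <> name, encode(value)}
  end

  # Naive text encoding, enough for integers, floats, booleans and strings.
  defp encode(value) when is_binary(value), do: value
  defp encode(value), do: to_string(value)
end
```

For example, `URI.encode_query(QueryParamsSketch.to_http_params(%{"limit" => 3}))` produces the query string for a statement like `SELECT * FROM system.numbers LIMIT {limit:UInt8}`.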
lib/ch.ex | 24 +++++++++++-------------
lib/ch/connection.ex | 1 -
lib/ch/http.ex | 2 --
lib/ch/native.ex | 3 ---
lib/ch/ssl.ex | 3 ---
lib/ch/stream.ex | 2 +-
6 files changed, 12 insertions(+), 23 deletions(-)
delete mode 100644 lib/ch/http.ex
delete mode 100644 lib/ch/native.ex
delete mode 100644 lib/ch/ssl.ex
diff --git a/lib/ch.ex b/lib/ch.ex
index a26a3fb..4643fb1 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -7,14 +7,13 @@ defmodule Ch do
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
- | {:timeout, timeout}
@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
- | {:transport_opts, :gen_tcp.connect_option() | :ssl.tls_client_option()}
+ | {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]}
| DBConnection.start_option()
@doc """
@@ -30,8 +29,6 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings to send with every query
- * `:timeout` - HTTP receive timeout in milliseconds
- * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
"""
@@ -50,13 +47,18 @@ defmodule Ch do
DBConnection.child_spec(Connection, opts)
end
+ @type query :: iodata
+
@type query_option ::
common_option
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
- | {:format, String.t()}
| DBConnection.connection_option()
+ @type query_params ::
+ %{(name :: String.t()) => value :: term}
+ | [{name :: String.t(), value :: term}]
+
@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
@@ -67,16 +69,13 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings to merge with `:settings` from `start_link` and send with this query
- * `:timeout` - Configures both query request timeout and HTTP receive timeout in milliseconds, whichever happens faster
- * `:command` - Command tag for the query
+ * `:command` - Command tag for the query like `:insert` or `:select`, to avoid extracting it from SQL. Used in some Ecto.Repo `:telemetry` events
* `:headers` - Custom HTTP headers for the request
- * `:format` - Custom response format for the request, if provided, the response is not decoded automatically
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
"""
- @spec query(DBConnection.conn(), iodata, params, [query_option]) ::
+ @spec query(DBConnection.conn(), query, query_params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
@@ -89,15 +88,14 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
- @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
+ @spec query!(DBConnection.conn(), query, query_params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end
@doc false
- @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
+ @spec stream(DBConnection.t(), query, query_params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex
index 3742f47..e5ddef8 100644
--- a/lib/ch/connection.ex
+++ b/lib/ch/connection.ex
@@ -20,7 +20,6 @@ defmodule Ch.Connection do
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
conn =
conn
- |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
diff --git a/lib/ch/http.ex b/lib/ch/http.ex
deleted file mode 100644
index bc5fb4d..0000000
--- a/lib/ch/http.ex
+++ /dev/null
@@ -1,2 +0,0 @@
-defmodule Ch.HTTP do
-end
diff --git a/lib/ch/native.ex b/lib/ch/native.ex
deleted file mode 100644
index 96f7a2f..0000000
--- a/lib/ch/native.ex
+++ /dev/null
@@ -1,3 +0,0 @@
-defmodule Ch.Native do
- @moduledoc false
-end
diff --git a/lib/ch/ssl.ex b/lib/ch/ssl.ex
deleted file mode 100644
index bd24c14..0000000
--- a/lib/ch/ssl.ex
+++ /dev/null
@@ -1,3 +0,0 @@
-defmodule Ch.SSL do
- @moduledoc false
-end
diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex
index cd5fcd2..ae2782f 100644
--- a/lib/ch/stream.ex
+++ b/lib/ch/stream.ex
@@ -8,7 +8,7 @@ defmodule Ch.Stream do
conn: DBConnection.conn(),
ref: Mint.Types.request_ref() | nil,
query: Ch.Query.t(),
- params: term,
+ params: Ch.query_params(),
opts: [Ch.query_option()]
}
From 779e4e1301e612fe355c8881359c5f6f3098804b Mon Sep 17 00:00:00 2001
From: ruslandoga <67764432+ruslandoga@users.noreply.github.com>
Date: Fri, 18 Oct 2024 12:28:36 +0700
Subject: [PATCH 4/4] mv rows to query arg
---
README.md | 293 +++++-------------------------------------------------
1 file changed, 23 insertions(+), 270 deletions(-)
diff --git a/README.md b/README.md
index 0d6a654..114aee9 100644
--- a/README.md
+++ b/README.md
@@ -3,19 +3,10 @@
[![Documentation badge](https://img.shields.io/badge/Documentation-ff69b4)](https://hexdocs.pm/ch)
[![Hex.pm badge](https://img.shields.io/badge/Package%20on%20hex.pm-informational)](https://hex.pm/packages/ch)
-Minimal ClickHouse client for Elixir.
+Minimal HTTP ClickHouse client for Elixir.
Used in [Ecto ClickHouse adapter.](https://github.com/plausible/ecto_ch)
-### Key features
-
-- Native query parameters
-- Per query settings
-- Minimal API
-- HTTP or Native
-- [Multinode support](./guides/multinode.md)
-- [Compression](./guides/compression.md)
-
## Installation
```elixir
@@ -28,179 +19,30 @@ end
## Usage
-#### Start [DBConnection](https://github.com/elixir-ecto/db_connection) pool
-
-```elixir
-defaults = [
- scheme: "http",
- hostname: "localhost",
- port: 8123,
- database: "default",
- settings: [],
- pool_size: 1,
- timeout: :timer.seconds(15)
-]
-
-{:ok, pid} = Ch.start_link(defaults)
-```
-
-#### Select rows
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
- Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3")
-
-{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
- Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3})
-```
-
-Note on datetime encoding in query parameters:
-
-- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone
-- `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse
-- encoding non-UTC `%DateTime{}` requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database)
-
-#### Insert rows
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)")
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1})
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
-```
-
-#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary)
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null")
+Please see the tests for usage examples:
-rows = [
- [0, "a"],
- [1, "b"]
-]
+- [Select rows](./test/examples/select_rows_test.exs)
+- [Insert rows](./test/examples/insert_rows_test.exs)
+- [Insert RowBinary](./test/examples/insert_rowbinary_test.exs)
+- [Insert CSV](./test/examples/insert_csv_test.exs)
+- [Insert compressed RowBinary](./test/examples/insert_compressed_rowbinary_test.exs)
+- [Insert chunked RowBinary](./test/examples/insert_chunked_rowbinary_test.exs)
+- [Insert chunked and compressed RowBinary](./test/examples/insert_chunked_compressed_rowbinary_test.exs)
+- [Insert from a file](./test/examples/insert_from_file_test.exs)
+- [Custom settings](./test/custom_settings_test.exs)
+- [Custom headers](./test/custom_headers_test.exs)
-types = ["UInt64", "String"]
-# or
-types = [Ch.Types.u64(), Ch.Types.string()]
-# or
-types = [:u64, :string]
-
-rowbinary = Ch.RowBinary.encode_rows(rows, types)
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary])
-```
-
-Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something not quite unlike a type check.
-
-```elixir
-sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n"
-
-rows = [
- [0, "a"],
- [1, "b"]
-]
-
-types = ["UInt64", "String"]
-names = ["id", "text"]
-
-rowbinary_with_names_and_types = [
- Ch.RowBinary.encode_names_and_types(names, types),
- Ch.RowBinary.encode_rows(rows, types)
-]
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, [sql | rowbinary_with_names_and_types])
-```
-
-And you can use buffer helpers too. They are available for RowBinary, RowBinaryWithNamesAndTypes, and Native formats.
-
-```elixir
-buffer = Ch.RowBinary.new_buffer(_types = ["UInt64", "String"])
-buffer = Ch.RowBinary.push_buffer(buffer, [[0, "a"], [1, "b"]])
-rowbinary = Ch.RowBinary.buffer_to_iodata(buffer)
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, ["INSERT INTO ch_demo(id, text) FORMAT RowBinary\n" | rowbinary])
-```
-
-#### Insert rows in some other [format](https://clickhouse.com/docs/en/interfaces/formats)
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64, text String) ENGINE Null")
-
-csv =
- """
- 0,"a"\n
- 1,"b"\
- """
-
-%Ch.Result{num_rows: 2} =
- Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
-```
-
-#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
-
-DBConnection.run(pid, fn conn ->
- Stream.repeatedly(fn -> [:rand.uniform(100)] end)
- |> Stream.chunk_every(100_000)
- |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
- |> Stream.take(10)
- |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
- |> Stream.run()
-end)
-```
-
-This query makes a [`transfer-encoding: chunked`] HTTP request while unfolding the stream resulting in lower memory usage.
-
-#### Stream from a file
-
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
-
-DBConnection.run(pid, fn conn ->
- File.stream!("buffer.tmp", _bytes = 2048)
- |> Stream.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
- |> Stream.run()
-end)
-```
-
-#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)
-
-```elixir
-{:ok, pid} = Ch.start_link()
+## Caveats
-settings = [async_insert: 1]
+#### Timestamps in query parameters
-%Ch.Result{rows: [["async_insert", "Bool", "0"]]} =
- Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'")
+> [!WARNING]
+> - `%NaiveDateTime{}` is encoded as text, so ClickHouse interprets it in the column's or the server's timezone
+> - `%DateTime{time_zone: "Etc/UTC"}` is encoded as a Unix timestamp and is treated as a UTC timestamp by ClickHouse
+> - `%DateTime{time_zone: time_zone}` is shifted to `"Etc/UTC"` and then encoded as a Unix timestamp; this requires a [time zone database](https://hexdocs.pm/elixir/1.17.1/DateTime.html#module-time-zone-database) to be configured
-%Ch.Result{rows: [["async_insert", "Bool", "1"]]} =
- Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings)
-```
-## Caveats
+TODO: See test.
#### NULL in RowBinary
@@ -208,112 +50,23 @@ It's the same as in [`ch-go`](https://clickhouse.com/docs/en/integrations/go#nul
> At insert time, Nil can be passed for both the normal and Nullable version of a column. For the former, the default value for the type will be persisted, e.g., an empty string for string. For the nullable version, a NULL value will be stored in ClickHouse.
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, """
-CREATE TABLE ch_nulls (
- a UInt8 NULL,
- b UInt8 DEFAULT 10,
- c UInt8 NOT NULL
-) ENGINE Memory
-""")
-
-types = ["Nullable(UInt8)", "UInt8", "UInt8"]
-row = [nil, nil, nil]
-rowbinary = Ch.RowBinary.encode_row(row, types)
-
-%Ch.Result{num_rows: 1} =
- Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | rowbinary])
-
-%Ch.Result{rows: [[nil, _not_10 = 0, 0]]} =
- Ch.query!(pid, "SELECT * FROM ch_nulls")
-```
-
-Note that in this example `DEFAULT 10` is ignored and `0` (the default value for `UInt8`) is persisted instead.
-
-However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-functions/input) can be used as a workaround:
-
-```elixir
-sql = """
-INSERT INTO ch_nulls
- SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8')
- FORMAT RowBinary
-"""
-
-types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]
-rowbinary = Ch.RowBinary.encode_row(row, types)
-
-%Ch.Result{num_rows: 1} =
- Ch.query!(pid, [sql | rowbinary])
-
-%Ch.Result{rows: [_before = [0], _after = [10]]} =
- Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
-```
+TODO: See test.
#### UTF-8 in RowBinary
When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types/string) columns non UTF-8 characters are replaced with `�` (U+FFFD). This behaviour is similar to [`toValidUTF8`](https://clickhouse.com/docs/en/sql-reference/functions/string-functions#tovalidutf8) and [JSON format.](https://clickhouse.com/docs/en/interfaces/formats#json)
-```elixir
-{:ok, pid} = Ch.start_link()
-
-Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory")
-
-rowbinary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b")
-
-%Ch.Result{num_rows: 1} =
- Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | rowbinary])
-
-%Ch.Result{rows: [["a�b"]]} =
- Ch.query!(pid, "SELECT * FROM ch_utf8")
-
-%Ch.Result{rows: %{"data" => [["a�b"]]}} =
- pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1)
-```
+TODO: See test.
#### Timezones in RowBinary
Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database)
-```elixir
-Mix.install([:ch, :tz])
-
-:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase)
-
-{:ok, pid} = Ch.start_link()
-
-%Ch.Result{rows: [[~N[2023-04-25 17:45:09]]]} =
- Ch.query!(pid, "SELECT CAST(now() as DateTime)")
-
-%Ch.Result{rows: [[~U[2023-04-25 17:45:11Z]]]} =
- Ch.query!(pid, "SELECT CAST(now() as DateTime('UTC'))")
-
-%Ch.Result{rows: [[%DateTime{time_zone: "Asia/Taipei"} = taipei]]} =
- Ch.query!(pid, "SELECT CAST(now() as DateTime('Asia/Taipei'))")
-
-"2023-04-26 01:45:12+08:00 CST Asia/Taipei" = to_string(taipei)
-```
+TODO: See test.
Encoding non-UTC datetimes is possible but slow.
-```elixir
-Ch.query!(pid, "CREATE TABLE ch_datetimes(datetime DateTime) ENGINE Null")
-
-naive = NaiveDateTime.utc_now()
-utc = DateTime.utc_now()
-taipei = DateTime.shift_zone!(utc, "Asia/Taipei")
-
-rows = [
- [naive],
- [utc],
- [taipei]
-]
-
-types = ["DateTime"]
-
-Ch.RowBinary.encode_rows(rows, types)
-```
+TODO: See test.
## Benchmarks