Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ settings = [async_insert: 1]
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
```

#### Multipart requests

SELECT queries will be automatically sent as multipart requests.
INSERT queries and streams are treated normally.

## Caveats

#### NULL in RowBinary
Expand Down
87 changes: 87 additions & 0 deletions lib/ch/encode/multipart.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Ch.Encode.Multipart do
@moduledoc false

alias Ch.Encode.Parameters

@doc """
Encodes a query statement and params into a multipart request.
"""
@spec encode(iodata, map, [Ch.query_option()]) ::
{list, Mint.Types.headers(), iodata}
def encode(statement, params, opts) do
types = Keyword.get(opts, :types)
settings = Keyword.get(opts, :settings, [])
default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
format = Keyword.get(opts, :format) || default_format

boundary = "ChFormBoundary" <> Base.url_encode64(:crypto.strong_rand_bytes(24))
content_type = "multipart/form-data; boundary=\"#{boundary}\""
enc_boundary = "--#{boundary}\r\n"

multipart =
params
|> multipart_params(enc_boundary)
|> add_multipart_part("query", statement, enc_boundary)
|> then(&[&1 | "--#{boundary}--\r\n"])

headers = [{"x-clickhouse-format", format}, {"content-type", content_type} | headers(opts)]

{settings, headers, multipart}
end

defp multipart_params(params, boundary) when is_map(params) do
multipart_named_params(Map.to_list(params), boundary, [])
end

defp multipart_params(params, boundary) when is_list(params) do
multipart_positional_params(params, 0, boundary, [])
end

defp multipart_named_params([{name, value} | params], boundary, acc) do
acc =
add_multipart_part(
acc,
"param_" <> URI.encode_www_form(name),
Parameters.encode(value),
boundary
)

multipart_named_params(params, boundary, acc)
end

defp multipart_named_params([], _boundary, acc), do: acc

defp multipart_positional_params([value | params], idx, boundary, acc) do
acc =
add_multipart_part(
acc,
"param_$" <> Integer.to_string(idx),
Parameters.encode(value),
boundary
)

multipart_positional_params(params, idx + 1, boundary, acc)
end

defp multipart_positional_params([], _idx, _boundary, acc), do: acc

@compile inline: [add_multipart_part: 4]
defp add_multipart_part(multipart, name, value, boundary) do
part = [
boundary,
"content-disposition: form-data; name=\"",
name,
"\"\r\n\r\n",
value,
"\r\n"
]

case multipart do
[] -> part
_ -> [multipart | part]
end
end

@spec headers(Keyword.t()) :: Mint.Types.headers()
defp headers(opts), do: Keyword.get(opts, :headers, [])
end
113 changes: 113 additions & 0 deletions lib/ch/encode/parameters.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule Ch.Encode.Parameters do
@moduledoc false

@doc """
Encodes a map/list of parameters into a list of clickhouse parameter tuples.

The format is `[{"param_<key>", "<value>"}, ...]`.
"""
@spec encode_many(map | [term]) :: [{String.t(), String.t()}]
def encode_many(params) when is_map(params) do
Enum.map(params, fn {k, v} -> {"param_#{k}", encode(v)} end)
end

def encode_many(params) when is_list(params) do
params
|> Enum.with_index()
|> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode(v)} end)
end

@doc """
Encodes a clickhouse parameter to a string.
"""
@spec encode(term) :: binary
def encode(n) when is_integer(n), do: Integer.to_string(n)
def encode(f) when is_float(f), do: Float.to_string(f)

# TODO possibly speed up
# For more info see
# https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters
# "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting
def encode(b) when is_binary(b) do
escape_param([{"\\", "\\\\"}, {"\t", "\\\t"}, {"\n", "\\\n"}], b)
end

def encode(b) when is_boolean(b), do: Atom.to_string(b)
def encode(nil), do: "\\N"
def encode(%Decimal{} = d), do: Decimal.to_string(d, :normal)
def encode(%Date{} = date), do: Date.to_iso8601(date)
def encode(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive)
def encode(%Time{} = time), do: Time.to_iso8601(time)

def encode(%DateTime{microsecond: microsecond} = dt) do
dt = DateTime.shift_zone!(dt, "Etc/UTC")

case microsecond do
{val, precision} when val > 0 and precision > 0 ->
size = round(:math.pow(10, precision))
unix = DateTime.to_unix(dt, size)
seconds = div(unix, size)
fractional = rem(unix, size)

IO.iodata_to_binary([
Integer.to_string(seconds),
?.,
String.pad_leading(Integer.to_string(fractional), precision, "0")
])

_ ->
dt |> DateTime.to_unix(:second) |> Integer.to_string()
end
end

def encode(tuple) when is_tuple(tuple) do
IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)])
end

def encode(a) when is_list(a) do
IO.iodata_to_binary([?[, encode_array_params(a), ?]])
end

def encode(m) when is_map(m) do
IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}])
end

defp encode_array_params([last]), do: encode_array_param(last)

defp encode_array_params([s | rest]) do
[encode_array_param(s), ?, | encode_array_params(rest)]
end

defp encode_array_params([] = empty), do: empty

defp encode_map_params([last]), do: encode_map_param(last)

defp encode_map_params([kv | rest]) do
[encode_map_param(kv), ?, | encode_map_params(rest)]
end

defp encode_map_params([] = empty), do: empty

defp encode_array_param(s) when is_binary(s) do
[?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?']
end

defp encode_array_param(nil), do: "null"

defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do
[?', encode(param), ?']
end

defp encode_array_param(v), do: encode(v)

defp encode_map_param({k, v}) do
[encode_array_param(k), ?:, encode_array_param(v)]
end

defp escape_param([{pattern, replacement} | escapes], param) do
param = String.replace(param, pattern, replacement)
escape_param(escapes, param)
end

defp escape_param([], param), do: param
end
108 changes: 3 additions & 105 deletions lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ end

defimpl DBConnection.Query, for: Ch.Query do
alias Ch.{Query, Result, RowBinary}
alias Ch.Encode.{Multipart, Parameters}

@spec parse(Query.t(), [Ch.query_option()]) :: Query.t()
def parse(query, _opts), do: query
Expand Down Expand Up @@ -124,15 +125,12 @@ defimpl DBConnection.Query, for: Ch.Query do
{_query_params = [], headers(opts), [statement, ?\n | data]}

true ->
{query_params(params), headers(opts), statement}
{Parameters.encode_many(params), headers(opts), statement}
end
end

def encode(%Query{statement: statement}, params, opts) do
types = Keyword.get(opts, :types)
default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
format = Keyword.get(opts, :format) || default_format
{query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement}
Multipart.encode(statement, params, opts)
end

defp format_row_binary?(statement) when is_binary(statement) do
Expand Down Expand Up @@ -208,106 +206,6 @@ defimpl DBConnection.Query, for: Ch.Query do
end
end

defp query_params(params) when is_map(params) do
Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end)
end

defp query_params(params) when is_list(params) do
params
|> Enum.with_index()
|> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end)
end

defp encode_param(n) when is_integer(n), do: Integer.to_string(n)
defp encode_param(f) when is_float(f), do: Float.to_string(f)

# TODO possibly speed up
# For more info see
# https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters
# "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting
defp encode_param(b) when is_binary(b) do
escape_param([{"\\", "\\\\"}, {"\t", "\\\t"}, {"\n", "\\\n"}], b)
end

defp encode_param(b) when is_boolean(b), do: Atom.to_string(b)
defp encode_param(nil), do: "\\N"
defp encode_param(%Decimal{} = d), do: Decimal.to_string(d, :normal)
defp encode_param(%Date{} = date), do: Date.to_iso8601(date)
defp encode_param(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive)
defp encode_param(%Time{} = time), do: Time.to_iso8601(time)

defp encode_param(%DateTime{microsecond: microsecond} = dt) do
dt = DateTime.shift_zone!(dt, "Etc/UTC")

case microsecond do
{val, precision} when val > 0 and precision > 0 ->
size = round(:math.pow(10, precision))
unix = DateTime.to_unix(dt, size)
seconds = div(unix, size)
fractional = rem(unix, size)

IO.iodata_to_binary([
Integer.to_string(seconds),
?.,
String.pad_leading(Integer.to_string(fractional), precision, "0")
])

_ ->
dt |> DateTime.to_unix(:second) |> Integer.to_string()
end
end

defp encode_param(tuple) when is_tuple(tuple) do
IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)])
end

defp encode_param(a) when is_list(a) do
IO.iodata_to_binary([?[, encode_array_params(a), ?]])
end

defp encode_param(m) when is_map(m) do
IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}])
end

defp encode_array_params([last]), do: encode_array_param(last)

defp encode_array_params([s | rest]) do
[encode_array_param(s), ?, | encode_array_params(rest)]
end

defp encode_array_params([] = empty), do: empty

defp encode_map_params([last]), do: encode_map_param(last)

defp encode_map_params([kv | rest]) do
[encode_map_param(kv), ?, | encode_map_params(rest)]
end

defp encode_map_params([] = empty), do: empty

defp encode_array_param(s) when is_binary(s) do
[?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?']
end

defp encode_array_param(nil), do: "null"

defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do
[?', encode_param(param), ?']
end

defp encode_array_param(v), do: encode_param(v)

defp encode_map_param({k, v}) do
[encode_array_param(k), ?:, encode_array_param(v)]
end

defp escape_param([{pattern, replacement} | escapes], param) do
param = String.replace(param, pattern, replacement)
escape_param(escapes, param)
end

defp escape_param([], param), do: param

@spec headers(Keyword.t()) :: Mint.Types.headers()
defp headers(opts), do: Keyword.get(opts, :headers, [])
end
Expand Down