diff --git a/README.md b/README.md
index 5d44e0b..0bece3e 100644
--- a/README.md
+++ b/README.md
@@ -161,6 +161,11 @@ settings = [async_insert: 1]
 Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
 ```
 
+#### Multipart requests
+
+SELECT queries will be automatically sent as multipart requests.
+INSERT queries and streams are treated normally.
+
 ## Caveats
 
 #### NULL in RowBinary
diff --git a/lib/ch/encode/multipart.ex b/lib/ch/encode/multipart.ex
new file mode 100644
index 0000000..c058fde
--- /dev/null
+++ b/lib/ch/encode/multipart.ex
@@ -0,0 +1,98 @@
+defmodule Ch.Encode.Multipart do
+  @moduledoc false
+
+  alias Ch.Encode.Parameters
+
+  @doc """
+  Encodes a query statement and params into a multipart request.
+
+  `params` is either a map of named parameters or a list of positional ones.
+  Each parameter becomes a `param_*` form part and the statement is sent as
+  the final `query` part.
+
+  Returns `{settings, headers, body}`, where `settings` is taken from
+  `opts[:settings]`, `headers` carries the response format and the multipart
+  content type in front of `opts[:headers]`, and `body` is iodata.
+  """
+  @spec encode(iodata, map | [term], [Ch.query_option()]) ::
+          {list, Mint.Types.headers(), iodata}
+  def encode(statement, params, opts) do
+    types = Keyword.get(opts, :types)
+    settings = Keyword.get(opts, :settings, [])
+    default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
+    format = Keyword.get(opts, :format) || default_format
+
+    # a fresh random boundary is generated on every call
+    boundary = "ChFormBoundary" <> Base.url_encode64(:crypto.strong_rand_bytes(24))
+    content_type = "multipart/form-data; boundary=\"#{boundary}\""
+    enc_boundary = "--#{boundary}\r\n"
+
+    multipart =
+      params
+      |> multipart_params(enc_boundary)
+      |> add_multipart_part("query", statement, enc_boundary)
+      |> then(&[&1 | "--#{boundary}--\r\n"])
+
+    headers = [{"x-clickhouse-format", format}, {"content-type", content_type} | headers(opts)]
+
+    {settings, headers, multipart}
+  end
+
+  defp multipart_params(params, boundary) when is_map(params) do
+    multipart_named_params(Map.to_list(params), boundary, [])
+  end
+
+  defp multipart_params(params, boundary) when is_list(params) do
+    multipart_positional_params(params, 0, boundary, [])
+  end
+
+  defp multipart_named_params([{name, value} | params], boundary, acc) do
+    acc =
+      add_multipart_part(
+        acc,
+        # to_string/1 keeps atom keys working, like "param_#{k}" in Parameters.encode_many/1
+        "param_" <> URI.encode_www_form(to_string(name)),
+        Parameters.encode(value),
+        boundary
+      )
+
+    multipart_named_params(params, boundary, acc)
+  end
+
+  defp multipart_named_params([], _boundary, acc), do: acc
+
+  defp multipart_positional_params([value | params], idx, boundary, acc) do
+    acc =
+      add_multipart_part(
+        acc,
+        "param_$" <> Integer.to_string(idx),
+        Parameters.encode(value),
+        boundary
+      )
+
+    multipart_positional_params(params, idx + 1, boundary, acc)
+  end
+
+  defp multipart_positional_params([], _idx, _boundary, acc), do: acc
+
+  # appends a single form-data part (boundary line, headers, value) to the iodata built so far
+  @compile inline: [add_multipart_part: 4]
+  defp add_multipart_part(multipart, name, value, boundary) do
+    part = [
+      boundary,
+      "content-disposition: form-data; name=\"",
+      name,
+      "\"\r\n\r\n",
+      value,
+      "\r\n"
+    ]
+
+    case multipart do
+      [] -> part
+      _ -> [multipart | part]
+    end
+  end
+
+  @spec headers(Keyword.t()) :: Mint.Types.headers()
+  defp headers(opts), do: Keyword.get(opts, :headers, [])
+end
diff --git a/lib/ch/encode/parameters.ex b/lib/ch/encode/parameters.ex
new file mode 100644
index 0000000..0ac0828
--- /dev/null
+++ b/lib/ch/encode/parameters.ex
@@ -0,0 +1,117 @@
+defmodule Ch.Encode.Parameters do
+  @moduledoc false
+
+  @doc """
+  Encodes a map/list of parameters into a list of clickhouse parameter tuples.
+
+  The format is `[{"param_NAME", "ENCODED_VALUE"}, ...]`.
+  """
+  @spec encode_many(map | [term]) :: [{String.t(), String.t()}]
+  def encode_many(params) when is_map(params) do
+    Enum.map(params, fn {k, v} -> {"param_#{k}", encode(v)} end)
+  end
+
+  def encode_many(params) when is_list(params) do
+    params
+    |> Enum.with_index()
+    |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode(v)} end)
+  end
+
+  @doc """
+  Encodes a clickhouse parameter to a string.
+
+  Strings are escaped, a top-level `nil` becomes `\\N`, and tuples, lists and maps
+  are encoded recursively with string, date and naive datetime elements quoted.
+  """
+  @spec encode(term) :: binary
+  def encode(n) when is_integer(n), do: Integer.to_string(n)
+  def encode(f) when is_float(f), do: Float.to_string(f)
+
+  # TODO possibly speed up
+  # For more info see
+  # https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters
+  # "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting
+  def encode(b) when is_binary(b) do
+    escape_param([{"\\", "\\\\"}, {"\t", "\\\t"}, {"\n", "\\\n"}], b)
+  end
+
+  def encode(b) when is_boolean(b), do: Atom.to_string(b)
+  def encode(nil), do: "\\N"
+  def encode(%Decimal{} = d), do: Decimal.to_string(d, :normal)
+  def encode(%Date{} = date), do: Date.to_iso8601(date)
+  def encode(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive)
+  def encode(%Time{} = time), do: Time.to_iso8601(time)
+
+  def encode(%DateTime{microsecond: microsecond} = dt) do
+    # sent as a unix timestamp in UTC; the fractional part is kept only when non-zero
+    dt = DateTime.shift_zone!(dt, "Etc/UTC")
+
+    case microsecond do
+      {val, precision} when val > 0 and precision > 0 ->
+        size = round(:math.pow(10, precision))
+        unix = DateTime.to_unix(dt, size)
+        seconds = div(unix, size)
+        fractional = rem(unix, size)
+
+        IO.iodata_to_binary([
+          Integer.to_string(seconds),
+          ?.,
+          String.pad_leading(Integer.to_string(fractional), precision, "0")
+        ])
+
+      _ ->
+        dt |> DateTime.to_unix(:second) |> Integer.to_string()
+    end
+  end
+
+  def encode(tuple) when is_tuple(tuple) do
+    IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)])
+  end
+
+  def encode(a) when is_list(a) do
+    IO.iodata_to_binary([?[, encode_array_params(a), ?]])
+  end
+
+  def encode(m) when is_map(m) do
+    IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}])
+  end
+
+  defp encode_array_params([last]), do: encode_array_param(last)
+
+  defp encode_array_params([s | rest]) do
+    [encode_array_param(s), ?, | encode_array_params(rest)]
+  end
+
+  defp encode_array_params([] = empty), do: empty
+
+  defp encode_map_params([last]), do: encode_map_param(last)
+
+  defp encode_map_params([kv | rest]) do
+    [encode_map_param(kv), ?, | encode_map_params(rest)]
+  end
+
+  defp encode_map_params([] = empty), do: empty
+
+  defp encode_array_param(s) when is_binary(s) do
+    [?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?']
+  end
+
+  defp encode_array_param(nil), do: "null"
+
+  defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do
+    [?', encode(param), ?']
+  end
+
+  defp encode_array_param(v), do: encode(v)
+
+  defp encode_map_param({k, v}) do
+    [encode_array_param(k), ?:, encode_array_param(v)]
+  end
+
+  defp escape_param([{pattern, replacement} | escapes], param) do
+    param = String.replace(param, pattern, replacement)
+    escape_param(escapes, param)
+  end
+
+  defp escape_param([], param), do: param
+end
diff --git a/lib/ch/query.ex b/lib/ch/query.ex
index ac19e81..0e1cd78 100644
--- a/lib/ch/query.ex
+++ b/lib/ch/query.ex
@@ -73,6 +73,7 @@ end
 
 defimpl DBConnection.Query, for: Ch.Query do
   alias Ch.{Query, Result, RowBinary}
+  alias Ch.Encode.{Multipart, Parameters}
 
   @spec parse(Query.t(), [Ch.query_option()]) :: Query.t()
   def parse(query, _opts), do: query
@@ -124,15 +125,12 @@ defimpl DBConnection.Query, for: Ch.Query do
         {_query_params = [], headers(opts), [statement, ?\n | data]}
 
       true ->
-        {query_params(params), headers(opts), statement}
+        {Parameters.encode_many(params), headers(opts), statement}
     end
   end
 
   def encode(%Query{statement: statement}, params, opts) do
-    types = Keyword.get(opts, :types)
-    default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
-    format = Keyword.get(opts, :format) || default_format
-    {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement}
+    Multipart.encode(statement, params, opts)
   end
 
   defp format_row_binary?(statement) when is_binary(statement) do
@@ -208,106 +206,6 @@ defimpl DBConnection.Query, for: Ch.Query do
     end
   end
 
-  defp query_params(params) when is_map(params) do
-    Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end)
-  end
-
-  defp query_params(params) when is_list(params) do
-    params
-    |> Enum.with_index()
-    |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end)
-  end
-
-  defp encode_param(n) when is_integer(n), do: Integer.to_string(n)
-  defp encode_param(f) when is_float(f), do: Float.to_string(f)
-
-  # TODO possibly speed up
-  # For more info see
-  # https://clickhouse.com/docs/en/interfaces/http#tabs-in-url-parameters
-  # "escaped" format is the same as https://clickhouse.com/docs/en/interfaces/formats#tabseparated-data-formatting
-  defp encode_param(b) when is_binary(b) do
-    escape_param([{"\\", "\\\\"}, {"\t", "\\\t"}, {"\n", "\\\n"}], b)
-  end
-
-  defp encode_param(b) when is_boolean(b), do: Atom.to_string(b)
-  defp encode_param(nil), do: "\\N"
-  defp encode_param(%Decimal{} = d), do: Decimal.to_string(d, :normal)
-  defp encode_param(%Date{} = date), do: Date.to_iso8601(date)
-  defp encode_param(%NaiveDateTime{} = naive), do: NaiveDateTime.to_iso8601(naive)
-  defp encode_param(%Time{} = time), do: Time.to_iso8601(time)
-
-  defp encode_param(%DateTime{microsecond: microsecond} = dt) do
-    dt = DateTime.shift_zone!(dt, "Etc/UTC")
-
-    case microsecond do
-      {val, precision} when val > 0 and precision > 0 ->
-        size = round(:math.pow(10, precision))
-        unix = DateTime.to_unix(dt, size)
-        seconds = div(unix, size)
-        fractional = rem(unix, size)
-
-        IO.iodata_to_binary([
-          Integer.to_string(seconds),
-          ?.,
-          String.pad_leading(Integer.to_string(fractional), precision, "0")
-        ])
-
-      _ ->
-        dt |> DateTime.to_unix(:second) |> Integer.to_string()
-    end
-  end
-
-  defp encode_param(tuple) when is_tuple(tuple) do
-    IO.iodata_to_binary([?(, encode_array_params(Tuple.to_list(tuple)), ?)])
-  end
-
-  defp encode_param(a) when is_list(a) do
-    IO.iodata_to_binary([?[, encode_array_params(a), ?]])
-  end
-
-  defp encode_param(m) when is_map(m) do
-    IO.iodata_to_binary([?{, encode_map_params(Map.to_list(m)), ?}])
-  end
-
-  defp encode_array_params([last]), do: encode_array_param(last)
-
-  defp encode_array_params([s | rest]) do
-    [encode_array_param(s), ?, | encode_array_params(rest)]
-  end
-
-  defp encode_array_params([] = empty), do: empty
-
-  defp encode_map_params([last]), do: encode_map_param(last)
-
-  defp encode_map_params([kv | rest]) do
-    [encode_map_param(kv), ?, | encode_map_params(rest)]
-  end
-
-  defp encode_map_params([] = empty), do: empty
-
-  defp encode_array_param(s) when is_binary(s) do
-    [?', escape_param([{"'", "''"}, {"\\", "\\\\"}], s), ?']
-  end
-
-  defp encode_array_param(nil), do: "null"
-
-  defp encode_array_param(%s{} = param) when s in [Date, NaiveDateTime] do
-    [?', encode_param(param), ?']
-  end
-
-  defp encode_array_param(v), do: encode_param(v)
-
-  defp encode_map_param({k, v}) do
-    [encode_array_param(k), ?:, encode_array_param(v)]
-  end
-
-  defp escape_param([{pattern, replacement} | escapes], param) do
-    param = String.replace(param, pattern, replacement)
-    escape_param(escapes, param)
-  end
-
-  defp escape_param([], param), do: param
-
   @spec headers(Keyword.t()) :: Mint.Types.headers()
   defp headers(opts), do: Keyword.get(opts, :headers, [])
 end