diff --git a/bench/query.exs b/bench/query.exs new file mode 100644 index 00000000..b05e999d --- /dev/null +++ b/bench/query.exs @@ -0,0 +1,66 @@ +Benchee.run( + %{ + "query string encode" => fn input -> + DBConnection.Query.Ch.Query.query_string_encode(input.query, input.params, input.opts) + end, + "multipart encode" => fn input -> + DBConnection.Query.Ch.Query.multipart_encode(input.query, input.params, input.opts) + end, + "custom multipart encode" => fn input -> + DBConnection.Query.Ch.Query.custom_multipart_encode(input.query, input.params, input.opts) + end + }, + inputs: %{ + "0 params" => %{ + query: %Ch.Query{ + statement: "select 1", + command: :select, + encode: true, + decode: true + }, + params: [], + opts: [] + }, + "1 named param" => %{ + query: %Ch.Query{ + statement: "select {a:UInt8}", + command: :select, + encode: true, + decode: true + }, + params: %{"a" => 1}, + opts: [] + }, + "10 named params" => %{ + query: %Ch.Query{ + statement: "select " <> Enum.map_join(1..10, ", ", &"{a#{&1}:UInt8}"), + command: :select, + encode: true, + decode: true + }, + params: Map.new(1..10, &{"a#{&1}", &1}), + opts: [] + }, + "10 positional params" => %{ + query: %Ch.Query{ + statement: "select " <> Enum.map_join(1..10, ", ", &"{$#{&1}:UInt8}"), + command: :select, + encode: true, + decode: true + }, + params: Enum.to_list(1..10), + opts: [] + }, + "100 positional params" => %{ + query: %Ch.Query{ + statement: "select " <> Enum.map_join(1..100, ", ", &"{$#{&1}:UInt8}"), + command: :select, + encode: true, + decode: true + }, + params: Enum.to_list(1..100), + opts: [] + } + } + # profile_after: true +) diff --git a/lib/ch/query.ex b/lib/ch/query.ex index ac19e811..f0fb340d 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -128,13 +128,52 @@ defimpl DBConnection.Query, for: Ch.Query do end end - def encode(%Query{statement: statement}, params, opts) do + def encode(%Query{} = q, params, opts) do + custom_multipart_encode(q, params, opts) + end + + def query_string_encode(%Query{statement: statement}, params, opts) do types = Keyword.get(opts, :types) default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" format = Keyword.get(opts, :format) || default_format {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement} end + def multipart_encode(%Query{statement: statement}, params, opts) do + types = Keyword.get(opts, :types) + default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" + format = Keyword.get(opts, :format) || default_format + + form = + query_params(params) + |> Enum.reduce(Multipart.new(), fn {k, v}, acc -> + Multipart.add_part(acc, Multipart.Part.text_field(v, k)) + end) + |> Multipart.add_part(Multipart.Part.text_field(IO.iodata_to_binary(statement), "query")) + + content_type = Multipart.content_type(form, "multipart/form-data") + + {_no_query_params = [], + [{"x-clickhouse-format", format}, {"content-type", content_type} | headers(opts)], + Multipart.body_binary(form)} + end + + def custom_multipart_encode(%Query{statement: statement}, params, opts) do + types = Keyword.get(opts, :types) + default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" + format = Keyword.get(opts, :format) || default_format + + boundary = "ChFormBoundary" <> Base.url_encode64(:crypto.strong_rand_bytes(24)) + content_type = "multipart/form-data; boundary=\"#{boundary}\"" + enc_boundary = "--#{boundary}\r\n" + multipart = multipart_params(params, enc_boundary) + multipart = add_multipart_part(multipart, "query", statement, enc_boundary) + multipart = [multipart | "--#{boundary}--\r\n"] + + {_no_query_params = [], + [{"x-clickhouse-format", format}, {"content-type", content_type} | headers(opts)], multipart} + end + defp format_row_binary?(statement) when is_binary(statement) do statement |> String.trim_trailing() |> String.ends_with?("RowBinary") end @@ -208,6 +247,59 @@ defimpl DBConnection.Query, for: Ch.Query do end end + defp multipart_params(params, boundary) when is_map(params) do + multipart_named_params(Map.to_list(params), boundary, []) + end + + defp multipart_params(params, boundary) when is_list(params) do + multipart_positional_params(params, 0, boundary, []) + end + + defp multipart_named_params([{name, value} | params], boundary, acc) do + acc = + add_multipart_part( + acc, + "param_" <> URI.encode_www_form(name), + encode_param(value), + boundary + ) + + multipart_named_params(params, boundary, acc) + end + + defp multipart_named_params([], _boundary, acc), do: acc + + defp multipart_positional_params([value | params], idx, boundary, acc) do + acc = + add_multipart_part( + acc, + "param_$" <> Integer.to_string(idx), + encode_param(value), + boundary + ) + + multipart_positional_params(params, idx + 1, boundary, acc) + end + + defp multipart_positional_params([], _idx, _boundary, acc), do: acc + + @compile inline: [add_multipart_part: 4] + defp add_multipart_part(multipart, name, value, boundary) do + part = [ + boundary, + "content-disposition: form-data; name=\"", + name, + "\"\r\n\r\n", + value, + "\r\n" + ] + + case multipart do + [] -> part + _ -> [multipart | part] + end + end + defp query_params(params) when is_map(params) do Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end) end diff --git a/mix.exs b/mix.exs index 48ff3013..f087d599 100644 --- a/mix.exs +++ b/mix.exs @@ -45,7 +45,8 @@ defmodule Ch.MixProject do {:benchee, "~> 1.0", only: [:bench]}, {:dialyxir, "~> 1.0", only: [:dev], runtime: false}, {:ex_doc, ">= 0.0.0", only: :docs}, - {:tz, "~> 0.28.1", only: [:test]} + {:tz, "~> 0.28.1", only: [:test]}, + {:multipart, "~> 0.4.0"} ] end diff --git a/mix.lock b/mix.lock index 0e57a37c..94448ac1 100644 --- a/mix.lock +++ b/mix.lock @@ -13,7 +13,9 @@ "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, + "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "multipart": {:hex, :multipart, "0.4.0", "634880a2148d4555d050963373d0e3bbb44a55b2badd87fa8623166172e9cda0", [:mix], [{:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm", "3c5604bc2fb17b3137e5d2abdf5dacc2647e60c5cc6634b102cf1aef75a06f0a"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},