diff --git a/README.md b/README.md index a0ed146..9223514 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ The struct implements the `Table.Reader` protocol and thus can be efficiently tr ```elixir Mix.install([ - {:req, "~> 0.3.5"}, - {:req_athena, "~> 0.1.3"} + {:req, "~> 0.5.8"}, + {:req_athena, "~> 0.3.0"} ]) opts = [ @@ -24,7 +24,7 @@ opts = [ output_location: "s3://my-bucket" ] -req = Req.new() |> ReqAthena.attach(opts) +req = ReqAthena.new(opts) # Create table from Registry of Open Data on AWS # See: https://registry.opendata.aws/osm/ @@ -48,7 +48,7 @@ STORED AS ORCFILE LOCATION 's3://osm-pds/planet/'; """ -Req.post!(req, athena: query).body +ReqAthena.query!(req, query).body # => # %{ # "Output" => "", @@ -63,7 +63,7 @@ Req.post!(req, athena: query).body # With plain string query query = "SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation'" -Req.post!(req, athena: query, format: :json).body +ReqAthena.query!(req, query, [], format: :json).body # => # [ # %{ @@ -90,7 +90,7 @@ Req.post!(req, athena: query, format: :json).body # With parameterized query query = "SELECT id, type FROM planet WHERE id = ? and type = ?" -Req.post!(req, athena: {query, [239_970_142, "node"]}).body +ReqAthena.query!(req, query, [239_970_142, "node"], format: :json).body #=> [%{"id" => 239970142, "type" => "node"}] ``` diff --git a/lib/req_athena.ex b/lib/req_athena.ex index 065db04..088c9df 100644 --- a/lib/req_athena.ex +++ b/lib/req_athena.ex @@ -31,7 +31,7 @@ defmodule ReqAthena do defguardp is_empty(value) when value in [nil, ""] @doc """ - Attaches to Req request. + Builds a new `%Req.Request{}` for Athena requests. ## Request Options @@ -55,14 +55,14 @@ defmodule ReqAthena do * `:format` - Optional. The output format. Can be one of: * `:none` (default) - return decoded API response from Athena. - + * `:csv` - return contents of the CSV file. - + * `:json` - return contents of the JSON file. - + Note: Req by default automatically decodes JSON response body ([`decode_body`](Req.Steps.decode_body1/) step) and to prevent it from doing so, set `decode_body: false`. - + * `:explorer` - return contents in parquet format, lazy loaded into Explorer data frame. @@ -77,21 +77,36 @@ defmodule ReqAthena do or a tuple with `{format, level}`, like: `{"ZSTD", 4}`. By default this is `nil`, which means that for Parquet (the format that Explorer uses) this is going to be `"gzip"`. - * `:athena` - Required. The query to execute. It can be a plain SQL string or - a `{query, params}` tuple, where `query` can contain `?` placeholders and `params` - is a list of corresponding values. - - There is a limitation of Athena that requires the `:output_location` to be present - for every query that outputs to a format other than "CSV". So we append "results" - to the `:output_location` to make the partition files be saved there. + There is a limitation of Athena that requires the `:output_location` to be present + for every query that outputs to a format other than "CSV". So we append "results" + to the `:output_location` to make the partition files be saved there. Conditional fields must always be defined, and can be one of the fields or both. + """ + @spec new(keyword()) :: Req.Request.t() + def new(opts \\ []) do + attach(Req.new(), opts) + end + + defp attach(%Request{} = request, opts) do + request + |> Request.prepend_request_steps(athena_run: &run/1) + |> Request.register_options(@allowed_options) + |> Request.merge_options(opts) + |> maybe_put_aws_credentials() + end + + @doc """ + Performs a query against the Athena API. + + The SQL query can container `?` placeholders and `sql_query_params` + is a list of corresponding values. - If you want to set any of these options when attaching the plugin, pass them as the second argument. + This function accepts the same options as `new/1`. ## Examples - With plain query string: + With plain query: iex> opts = [ ...> access_key_id: System.fetch_env!("AWS_ACCESS_KEY_ID"), @@ -100,9 +115,9 @@ defmodule ReqAthena do ...> database: "default", ...> output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ...> ] + iex> req = ReqAthena.new(opts) iex> query = "SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation'" - iex> req = Req.new() |> ReqAthena.attach(opts) - iex> Req.post!(req, athena: query, format: :json).body + iex> ReqAthena.query!(req, query, [], format: :json).body %{ "id" => 470454, "members" => [ @@ -132,47 +147,56 @@ defmodule ReqAthena do ...> database: "default", ...> output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ...> ] + iex> req = ReqAthena.new(opts) iex> query = "SELECT id, type FROM planet WHERE id = ? and type = ?" - iex> req = Req.new() |> ReqAthena.attach(opts) - iex> Req.post!(req, athena: {query, [239_970_142, "node"]}, format: :json).body + iex> ReqAthena.query!(req, query, [239_970_142, "node"], format: :json).body [%{"id" => 239970142, "type" => "node"}] """ - def attach(%Request{} = request, options \\ []) do - request - |> Request.prepend_request_steps(athena_run: &run/1) - |> Request.register_options(@allowed_options) - |> Request.merge_options(options) - |> maybe_put_aws_credentials() + @spec query(Req.Request.t(), binary(), list(), Keyword.t()) :: + {:ok, Req.Response.t()} | {:error, Exception.t()} + def query(req, sql_query, sql_query_params \\ [], opts \\ []) + + def query(%Req.Request{} = req, sql_query, sql_query_params, opts) + when is_binary(sql_query) and is_list(sql_query_params) and is_list(opts) do + req + |> attach(opts) + |> put_request_body({sql_query, sql_query_params}) + |> Req.post() + end + + @doc """ + Same as `query/4`, but raises in case of error. + """ + @spec query!(Req.Request.t(), binary(), list(), Keyword.t()) :: Req.Response.t() + def query!(req, sql_query, sql_query_params \\ [], opts \\ []) + + def query!(%Req.Request{} = req, sql_query, sql_query_params, opts) do + case query(req, sql_query, sql_query_params, opts) do + {:ok, response} -> response + {:error, exception} -> raise exception + end end defp run(%Request{private: %{athena_action: _}} = request), do: request defp run(request) do - if query = request.options[:athena] do - region = Request.fetch_option!(request, :region) + region = Request.fetch_option!(request, :region) - url = "https://athena.#{region}.amazonaws.com" - cache_query = Request.get_option(request, :cache_query, true) + url = "https://athena.#{region}.amazonaws.com" - %{request | url: URI.parse(url)} - |> put_request_body(query, cache_query) - |> sign_request("StartQueryExecution") - |> Request.append_response_steps(athena_result: &handle_athena_result/1) - else - request - end + %{request | url: URI.parse(url)} + |> sign_request("StartQueryExecution") + |> Request.append_response_steps(athena_result: &handle_athena_result/1) end - defp put_request_body(request, query, cache_query) when is_binary(query) do - put_request_body(request, %ReqAthena.Query{query: query}, cache_query) + defp put_request_body(request, {query, []}) do + put_request_body(request, %ReqAthena.Query{query: query}) end - defp put_request_body(request, {query, []}, cache_query) do - put_request_body(request, %ReqAthena.Query{query: query}, cache_query) - end + defp put_request_body(request, {query, params}) do + cache_query = Request.get_option(request, :cache_query, true) - defp put_request_body(request, {query, params}, cache_query) do hash = if cache_query do query |> :erlang.md5() |> Base.encode16() @@ -182,10 +206,12 @@ defmodule ReqAthena do query = %ReqAthena.Query{query: query, params: params, statement_name: "query_" <> hash} - put_request_body(request, query, cache_query) + put_request_body(request, query) end - defp put_request_body(request, %ReqAthena.Query{} = query, cache_query) do + defp put_request_body(request, %ReqAthena.Query{} = query) do + cache_query = Request.get_option(request, :cache_query, true) + output_config = case {request.options[:output_location], request.options[:workgroup]} do {output, workgroup} when is_empty(output) and is_empty(workgroup) -> @@ -386,7 +412,7 @@ defmodule ReqAthena do |> Explorer.DataFrame.concat_rows() end else - defp build_lazy_frame(parquet_locations, aws_credentials) do + defp build_lazy_frame(_parquet_locations, _aws_credentials) do raise ArgumentError, "format: :explorer - you need to install Explorer as a dependency in order to use this format" end @@ -469,7 +495,7 @@ defmodule ReqAthena do current_request_steps: Keyword.keys(request.request_steps) } - Request.halt(request, Req.post!(request, athena: prepared_query)) + Request.halt(request, Req.post!(put_request_body(request, prepared_query))) end # TODO: Add step `put_aws_sigv4` to Req diff --git a/test/integration_test.exs b/test/integration_test.exs index 1c4e2ca..c196b4d 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -35,17 +35,18 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 @@ -266,22 +267,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation' """, + [], format: :explorer ) @@ -345,22 +348,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation' """, + [], format: :explorer, decode_body: false ) @@ -385,22 +390,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation' """, + [], format: :csv ) @@ -425,22 +432,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation' """, + [], format: :csv, decode_body: false ) @@ -463,22 +472,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE (id in (470454, 470455)) and type = 'relation' """, + [], format: :json ) @@ -539,22 +550,24 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - athena: """ + ReqAthena.query!( + req, + """ SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation' """, + [], format: :json, decode_body: false ) @@ -579,19 +592,20 @@ defmodule IntegrationTest do # create table req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) - response = Req.post!(req, athena: @create_table) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 # query single row from planet table assert query_response = - Req.post!(req, - format: :json, - athena: - {"SELECT id, type FROM planet WHERE id = ? and type = ?", [239_970_142, "node"]} + ReqAthena.query!( + req, + "SELECT id, type FROM planet WHERE id = ? and type = ?", + [239_970_142, "node"], + format: :json ) assert query_response.status == 200 @@ -608,8 +622,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new() |> ReqAthena.attach(opts) - response = Req.post!(req, athena: {"SELECT ? + 10", ["foo"]}) + req = ReqAthena.new(opts) + response = ReqAthena.query!(req, "SELECT ? + 10", ["foo"]) assert response.status == 200 assert response.body["QueryExecution"]["Status"]["State"] == "FAILED" @@ -619,11 +633,13 @@ defmodule IntegrationTest do assert_raise RuntimeError, "failed query with error: line 1:8: Column 'foo' cannot be resolved", - fn -> Req.post!(req, http_errors: :raise, athena: "SELECT foo") end + fn -> ReqAthena.query!(req, "SELECT foo", [], http_errors: :raise) end assert_raise RuntimeError, "failed query with error: line 1:11: '+' cannot be applied to varchar(3), integer", - fn -> Req.post!(req, http_errors: :raise, athena: {"SELECT ? + 10", ["foo"]}) end + fn -> + ReqAthena.query!(req, "SELECT ? + 10", ["foo"], http_errors: :raise) + end end test "creates table inside AWS Athena's database with session credentials" do @@ -636,8 +652,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 end @@ -652,8 +668,8 @@ defmodule IntegrationTest do # workgroup: "primary" # ] - # req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - # response = Req.post!(req, athena: @create_table) + # req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + # response = ReqAthena.query!(req, @create_table) # assert response.status == 200 # end @@ -668,8 +684,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert %{} = response.body assert response.status == 200 @@ -685,8 +701,8 @@ defmodule IntegrationTest do ] req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) query = """ SELECT id, type, tags, members, timestamp, visible @@ -695,12 +711,12 @@ defmodule IntegrationTest do and type = 'relation'\ """ - assert query_response = Req.post!(req, athena: query) + assert query_response = ReqAthena.query!(req, query) assert query_response.status == 200 result = query_response.body - assert response = Req.post!(req, athena: query) + assert response = ReqAthena.query!(req, query) assert response.status == 200 assert result == response.body @@ -717,8 +733,8 @@ defmodule IntegrationTest do ] req = - Req.new(http_errors: :raise) - |> ReqAthena.attach(opts) + ReqAthena.new(opts) + |> Req.merge(http_errors: :raise) query = """ SELECT id, type, tags, members, timestamp, visible @@ -727,12 +743,12 @@ defmodule IntegrationTest do and type = 'relation'\ """ - assert query_response = Req.post!(req, athena: query) + assert query_response = ReqAthena.query!(req, query) assert query_response.status == 200 result = query_response.body - assert response = Req.post!(req, athena: query) + assert response = ReqAthena.query!(req, query) assert response.status == 200 assert result == response.body @@ -771,8 +787,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 end @@ -792,8 +808,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 end @@ -813,8 +829,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 end @@ -830,8 +846,8 @@ defmodule IntegrationTest do output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] - req = Req.new(http_errors: :raise) |> ReqAthena.attach(opts) - response = Req.post!(req, athena: @create_table) + req = ReqAthena.new(opts) |> Req.merge(http_errors: :raise) + response = ReqAthena.query!(req, @create_table) assert response.status == 200 end diff --git a/test/req_athena_test.exs b/test/req_athena_test.exs index c8a2396..4677db0 100644 --- a/test/req_athena_test.exs +++ b/test/req_athena_test.exs @@ -29,10 +29,10 @@ defmodule ReqAthenaTest do } assert response = - Req.new(adapter: fake_athena(request_validations)) + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(request_validations)) |> Req.Request.put_header("x-auth", "my awesome auth header") - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris") + |> ReqAthena.query!("select * from iris") assert response.status == 200 @@ -135,12 +135,13 @@ defmodule ReqAthenaTest do } assert response = - Req.new( + ReqAthena.new(opts) + |> Req.merge( adapter: fake_athena(request_validations), - headers: [x_auth: "my awesome auth header"] + headers: [x_auth: "my awesome auth header"], + decode_body: false ) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris", decode_body: false) + |> ReqAthena.query!("select * from iris") assert response.status == 200 @@ -281,9 +282,9 @@ defmodule ReqAthenaTest do ] assert response = - Req.new(adapter: fake_athena(validations, results)) - |> ReqAthena.attach(opts) - |> Req.post!(athena: {"select * from iris where id = ?", [1]}) + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(validations, results)) + |> ReqAthena.query!("select * from iris where id = ?", [1]) assert response.status == 200 @@ -318,9 +319,9 @@ defmodule ReqAthenaTest do ] response = - Req.new(adapter: fake_athena(validations)) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris") + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(validations)) + |> ReqAthena.query!("select * from iris") assert response.status == 200 assert is_map(response.body) @@ -389,9 +390,9 @@ defmodule ReqAthenaTest do ] assert response = - Req.new(adapter: fake_athena(validations, results)) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris") + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(validations, results)) + |> ReqAthena.query!("select * from iris") assert response.status == 200 assert %{"ResultSet" => _} = response.body @@ -430,7 +431,8 @@ defmodule ReqAthenaTest do me = self() assert response = - Req.new(adapter: fake_athena(request_validations)) + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(request_validations)) |> Req.Request.put_header("x-auth", "my awesome auth header") |> Req.Request.put_private(:athena_dataframe_builder, fn output_location, credentials, @@ -446,8 +448,7 @@ defmodule ReqAthenaTest do Explorer.DataFrame.new(id: [1, 2], name: ["Ale", "Wojtek"]) end) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris", format: :explorer) + |> ReqAthena.query!("select * from iris", [], format: :explorer) assert response.status == 200 @@ -477,7 +478,8 @@ defmodule ReqAthenaTest do } assert response = - Req.new(adapter: fake_athena(request_validations)) + ReqAthena.new(opts) + |> Req.merge(adapter: fake_athena(request_validations)) |> Req.Request.put_header("x-auth", "my awesome auth header") |> Req.Request.put_private(:athena_dataframe_builder, fn output_location, credentials, @@ -491,8 +493,7 @@ defmodule ReqAthenaTest do ["s3://foo/results/first"] end) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris", format: :explorer, decode_body: false) + |> ReqAthena.query!("select * from iris", [], format: :explorer, decode_body: false) assert response.status == 200 @@ -509,11 +510,11 @@ defmodule ReqAthenaTest do database: "my_awesome_database" ] - req = Req.new(adapter: fake_athena()) |> ReqAthena.attach(opts) + req = ReqAthena.new(opts) |> Req.merge(adapter: fake_athena()) assert_raise ArgumentError, "options must have :workgroup, :output_location or both defined", - fn -> Req.post!(req, athena: "select * from iris") end + fn -> ReqAthena.query!(req, "select * from iris") end end defp fake_athena, do: fake_athena(%{})