Skip to content

Commit

Permalink
Change the API to new + query (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko authored Dec 3, 2024
1 parent d19c65b commit fda00b6
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 143 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ The struct implements the `Table.Reader` protocol and thus can be efficiently tr

```elixir
Mix.install([
{:req, "~> 0.3.5"},
{:req_athena, "~> 0.1.3"}
{:req, "~> 0.5.8"},
{:req_athena, "~> 0.3.0"}
])

opts = [
Expand All @@ -24,7 +24,7 @@ opts = [
output_location: "s3://my-bucket"
]

req = Req.new() |> ReqAthena.attach(opts)
req = ReqAthena.new(opts)

# Create table from Registry of Open Data on AWS
# See: https://registry.opendata.aws/osm/
Expand All @@ -48,7 +48,7 @@ STORED AS ORCFILE
LOCATION 's3://osm-pds/planet/';
"""

Req.post!(req, athena: query).body
ReqAthena.query!(req, query).body
# =>
# %{
# "Output" => "",
Expand All @@ -63,7 +63,7 @@ Req.post!(req, athena: query).body
# With plain string query
query = "SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation'"

Req.post!(req, athena: query, format: :json).body
ReqAthena.query!(req, query, [], format: :json).body
# =>
# [
# %{
Expand All @@ -90,7 +90,7 @@ Req.post!(req, athena: query, format: :json).body
# With parameterized query
query = "SELECT id, type FROM planet WHERE id = ? and type = ?"

Req.post!(req, athena: {query, [239_970_142, "node"]}).body
ReqAthena.query!(req, query, [239_970_142, "node"], format: :json).body
#=> [%{"id" => 239970142, "type" => "node"}]
```

Expand Down
116 changes: 71 additions & 45 deletions lib/req_athena.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule ReqAthena do
defguardp is_empty(value) when value in [nil, ""]

@doc """
Attaches to Req request.
Builds a new `%Req.Request{}` for Athena requests.
## Request Options
Expand All @@ -55,14 +55,14 @@ defmodule ReqAthena do
* `:format` - Optional. The output format. Can be one of:
* `:none` (default) - return decoded API response from Athena.
* `:csv` - return contents of the CSV file.
* `:json` - return contents of the JSON file.
Note: Req by default automatically decodes JSON response body ([`decode_body`](Req.Steps.decode_body1/) step)
and to prevent it from doing so, set `decode_body: false`.
* `:explorer` - return contents in parquet format, lazy loaded into Explorer data frame.
Expand All @@ -77,21 +77,36 @@ defmodule ReqAthena do
or a tuple with `{format, level}`, like: `{"ZSTD", 4}`. By default this is `nil`,
which means that for Parquet (the format that Explorer uses) this is going to be `"gzip"`.
* `:athena` - Required. The query to execute. It can be a plain SQL string or
a `{query, params}` tuple, where `query` can contain `?` placeholders and `params`
is a list of corresponding values.
There is a limitation of Athena that requires the `:output_location` to be present
for every query that outputs to a format other than "CSV". So we append "results"
to the `:output_location` to make the partition files be saved there.
There is a limitation of Athena that requires the `:output_location` to be present
for every query that outputs to a format other than "CSV". So we append "results"
to the `:output_location` to make the partition files be saved there.
Conditional fields must always be defined, and can be one of the fields or both.
"""
@spec new(keyword()) :: Req.Request.t()
def new(opts \\ []) do
attach(Req.new(), opts)
end

defp attach(%Request{} = request, opts) do
request
|> Request.prepend_request_steps(athena_run: &run/1)
|> Request.register_options(@allowed_options)
|> Request.merge_options(opts)
|> maybe_put_aws_credentials()
end

@doc """
Performs a query against the Athena API.
The SQL query can container `?` placeholders and `sql_query_params`
is a list of corresponding values.
If you want to set any of these options when attaching the plugin, pass them as the second argument.
This function accepts the same options as `new/1`.
## Examples
With plain query string:
With plain query:
iex> opts = [
...> access_key_id: System.fetch_env!("AWS_ACCESS_KEY_ID"),
Expand All @@ -100,9 +115,9 @@ defmodule ReqAthena do
...> database: "default",
...> output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION")
...> ]
iex> req = ReqAthena.new(opts)
iex> query = "SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation'"
iex> req = Req.new() |> ReqAthena.attach(opts)
iex> Req.post!(req, athena: query, format: :json).body
iex> ReqAthena.query!(req, query, [], format: :json).body
%{
"id" => 470454,
"members" => [
Expand Down Expand Up @@ -132,47 +147,56 @@ defmodule ReqAthena do
...> database: "default",
...> output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION")
...> ]
iex> req = ReqAthena.new(opts)
iex> query = "SELECT id, type FROM planet WHERE id = ? and type = ?"
iex> req = Req.new() |> ReqAthena.attach(opts)
iex> Req.post!(req, athena: {query, [239_970_142, "node"]}, format: :json).body
iex> ReqAthena.query!(req, query, [239_970_142, "node"], format: :json).body
[%{"id" => 239970142, "type" => "node"}]
"""
def attach(%Request{} = request, options \\ []) do
request
|> Request.prepend_request_steps(athena_run: &run/1)
|> Request.register_options(@allowed_options)
|> Request.merge_options(options)
|> maybe_put_aws_credentials()
@spec query(Req.Request.t(), binary(), list(), Keyword.t()) ::
{:ok, Req.Response.t()} | {:error, Exception.t()}
def query(req, sql_query, sql_query_params \\ [], opts \\ [])

def query(%Req.Request{} = req, sql_query, sql_query_params, opts)
when is_binary(sql_query) and is_list(sql_query_params) and is_list(opts) do
req
|> attach(opts)
|> put_request_body({sql_query, sql_query_params})
|> Req.post()
end

@doc """
Same as `query/4`, but raises in case of error.
"""
@spec query!(Req.Request.t(), binary(), list(), Keyword.t()) :: Req.Response.t()
def query!(req, sql_query, sql_query_params \\ [], opts \\ [])

def query!(%Req.Request{} = req, sql_query, sql_query_params, opts) do
case query(req, sql_query, sql_query_params, opts) do
{:ok, response} -> response
{:error, exception} -> raise exception
end
end

defp run(%Request{private: %{athena_action: _}} = request), do: request

defp run(request) do
if query = request.options[:athena] do
region = Request.fetch_option!(request, :region)
region = Request.fetch_option!(request, :region)

url = "https://athena.#{region}.amazonaws.com"
cache_query = Request.get_option(request, :cache_query, true)
url = "https://athena.#{region}.amazonaws.com"

%{request | url: URI.parse(url)}
|> put_request_body(query, cache_query)
|> sign_request("StartQueryExecution")
|> Request.append_response_steps(athena_result: &handle_athena_result/1)
else
request
end
%{request | url: URI.parse(url)}
|> sign_request("StartQueryExecution")
|> Request.append_response_steps(athena_result: &handle_athena_result/1)
end

defp put_request_body(request, query, cache_query) when is_binary(query) do
put_request_body(request, %ReqAthena.Query{query: query}, cache_query)
defp put_request_body(request, {query, []}) do
put_request_body(request, %ReqAthena.Query{query: query})
end

defp put_request_body(request, {query, []}, cache_query) do
put_request_body(request, %ReqAthena.Query{query: query}, cache_query)
end
defp put_request_body(request, {query, params}) do
cache_query = Request.get_option(request, :cache_query, true)

defp put_request_body(request, {query, params}, cache_query) do
hash =
if cache_query do
query |> :erlang.md5() |> Base.encode16()
Expand All @@ -182,10 +206,12 @@ defmodule ReqAthena do

query = %ReqAthena.Query{query: query, params: params, statement_name: "query_" <> hash}

put_request_body(request, query, cache_query)
put_request_body(request, query)
end

defp put_request_body(request, %ReqAthena.Query{} = query, cache_query) do
defp put_request_body(request, %ReqAthena.Query{} = query) do
cache_query = Request.get_option(request, :cache_query, true)

output_config =
case {request.options[:output_location], request.options[:workgroup]} do
{output, workgroup} when is_empty(output) and is_empty(workgroup) ->
Expand Down Expand Up @@ -386,7 +412,7 @@ defmodule ReqAthena do
|> Explorer.DataFrame.concat_rows()
end
else
defp build_lazy_frame(parquet_locations, aws_credentials) do
defp build_lazy_frame(_parquet_locations, _aws_credentials) do
raise ArgumentError,
"format: :explorer - you need to install Explorer as a dependency in order to use this format"
end
Expand Down Expand Up @@ -469,7 +495,7 @@ defmodule ReqAthena do
current_request_steps: Keyword.keys(request.request_steps)
}

Request.halt(request, Req.post!(request, athena: prepared_query))
Request.halt(request, Req.post!(put_request_body(request, prepared_query)))
end

# TODO: Add step `put_aws_sigv4` to Req
Expand Down
Loading

0 comments on commit fda00b6

Please sign in to comment.