Skip to content

Commit

Permalink
Merge pull request #3 from nash-io/improve_circular_buffer
Browse files Browse the repository at this point in the history
Improve circular buffer
  • Loading branch information
adrienmo authored Dec 3, 2021
2 parents 3dfd372 + 4a7d9e5 commit e219b0f
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 33 deletions.
10 changes: 5 additions & 5 deletions lib/http_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ defmodule HttpClient do
def delete(url, headers, options), do: request(:delete, [url, headers, options])

defp request(method, [url | _] = args) do
:ok = RateLimiter.rate_limit(url)

fn -> apply(impl(), method, args) end
|> :timer.tc()
|> Instrumenter.instrument(url)
RateLimiter.rate_limit(url, fn ->
fn -> apply(impl(), method, args) end
|> :timer.tc()
|> Instrumenter.instrument(url)
end)
end

defp impl, do: Application.get_env(:http_client, :http_client_impl, HTTPoison)
Expand Down
14 changes: 14 additions & 0 deletions lib/http_client/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule HttpClient.Application do
@moduledoc """
Application module that start supervision tree.
It is only used to start an agent that will initialize an ETS table
"""
use Application
alias HttpClient.RateLimiter

@impl Application
def start(_type, _args) do
config = [{Agent, fn -> RateLimiter.init() end}]
Supervisor.start_link(config, strategy: :one_for_one, name: HttpClient.Supervisor)
end
end
159 changes: 140 additions & 19 deletions lib/http_client/rate_limiter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,52 @@ defmodule HttpClient.RateLimiter do
"google.com": [{:timer.seconds(1), 5}, {:timer.hours(24), 500_000}]
"""

@spec rate_limit(String.t()) :: :ok
import Ex2ms

@ets_table_name :http_client_rate_limits

@type record :: {term(), non_neg_integer(), non_neg_integer()}

@doc """
Function that needs to be called before using the rate limiter
It should be ran by a long running process, if the procees gets killed,
the ETS table is also killed
"""
@spec init() :: :ok
def init do
:ets.new(@ets_table_name, [
:named_table,
:ordered_set,
:public,
{:read_concurrency, true},
{:write_concurrency, true}
])

:ok
end

@spec rate_limit(String.t(), (() -> any())) :: any()
def rate_limit(url, function) do
{:ok, callback} = rate_limit(url)
result = function.()
callback.()
result
end

@spec rate_limit(String.t()) :: {:ok, (() -> :ok)}
def rate_limit(url) do
case retrieve_rate_limit_config(url) do
{:ok, {host, rate_limits}} ->
:ok = enforce_rate_limits(host, rate_limits)
{:ok, records} = enforce_rate_limits(host, rate_limits, [])
{:ok, fn -> update_records(records) end}

:error ->
:ok
{:ok, &nothing/0}
end
end

@spec enforce_rate_limits(atom(), list({non_neg_integer(), non_neg_integer()})) :: :ok
defp enforce_rate_limits(_host, []), do: :ok

defp enforce_rate_limits(host, [{scale, limit} | remaining_limits] = rate_limits) do
bucket = {host, scale}
{_, _, ms_to_next_bucket, _, _} = ExRated.inspect_bucket(bucket, scale, limit)

case ExRated.check_rate(bucket, scale, limit) do
{:ok, _} ->
enforce_rate_limits(host, remaining_limits)

_ ->
Process.sleep(ms_to_next_bucket)
enforce_rate_limits(host, rate_limits)
end
end
@spec nothing() :: :ok
def nothing, do: :ok

@spec retrieve_rate_limit_config(String.t()) ::
{:ok, {atom(), list({non_neg_integer(), non_neg_integer()})}} | :error
Expand All @@ -47,4 +66,106 @@ defmodule HttpClient.RateLimiter do
rescue
_ -> :error
end

@spec update_records(list(record)) :: :ok
defp update_records(records) do
:ok =
Enum.each(
records,
fn {host, scale, unique_id} ->
bucket = {host, scale}

case :ets.take(@ets_table_name, {bucket, unique_id}) do
[_] -> insert_unique(bucket)
_ -> :ok
end

nil
end
)
end

@spec enforce_rate_limits(atom(), list({non_neg_integer(), non_neg_integer()}), list(record())) ::
{:ok, list(record())}
defp enforce_rate_limits(_host, [], acc), do: {:ok, acc}

defp enforce_rate_limits(host, [{scale, limit} | remaining_limits] = rate_limits, acc) do
case check_rate(host, scale, limit) do
{:ok, {_, unique_id}} ->
enforce_rate_limits(host, remaining_limits, [{host, scale, unique_id} | acc])

{:error, wait_time} ->
Process.sleep(wait_time)
enforce_rate_limits(host, rate_limits, acc)
end
end

defp check_rate(id, scale, limit, first_pass \\ true) do
bucket = {id, scale}
# We use an optmistic logic here,
delete_count = if first_pass, do: 0, else: prune_table(id, scale)

case :ets.update_counter(
@ets_table_name,
{bucket, :counter},
[{2, -delete_count}, {2, 1, limit, limit}],
{{bucket, :counter}, 0}
) do
[^limit, ^limit] when first_pass ->
# We make a recursive call that will trigger the deletion of items if needed
check_rate(id, scale, limit, false)

[^limit, ^limit] when not first_pass ->
{result, _} =
:ets.select(
@ets_table_name,
fun do
{{^bucket, timestamp}, _} -> timestamp
end,
1
)

wait_time =
if Enum.count(result) == 1,
do: max(0, div(hd(result), 1_000) + scale - div(now(), 1_000)),
else: scale

{:error, wait_time}

[_, count] ->
{:ok, now} = insert_unique(bucket)

{:ok, {count, now}}
end
end

@spec insert_unique(any()) :: {:ok, non_neg_integer()}
defp insert_unique(bucket) do
now = now()

if :ets.insert_new(@ets_table_name, {{bucket, now}, 0}) do
{:ok, now}
else
insert_unique(bucket)
end
end

@spec prune_table(any(), non_neg_integer()) :: non_neg_integer()
defp prune_table(id, scale) do
now = now()
bucket = {id, scale}
outdated = now - scale * 1_000
range_bottom = {bucket, 0}
range_top = {bucket, outdated}

:ets.select_delete(
@ets_table_name,
fun do
{key, _} when key > ^range_bottom and key < ^range_top -> true
end
)
end

@spec now() :: non_neg_integer()
defp now, do: :erlang.system_time(:micro_seconds)
end
8 changes: 4 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ defmodule HttpClient.MixProject do
def project do
[
app: :http_client,
version: "0.2.3",
elixir: "~> 1.10",
version: "0.2.4",
elixir: "~> 1.12",
start_permanent: Mix.env() == :prod,
test_coverage: [tool: ExCoveralls],
deps: deps(),
Expand All @@ -21,14 +21,14 @@ defmodule HttpClient.MixProject do
end

def application do
[extra_applications: [:logger]]
[extra_applications: [:logger], mod: {HttpClient.Application, []}]
end

defp deps do
[
{:httpoison, "~> 1.5"},
{:telemetry, "~> 0.4.0"},
{:ex_rated, "~> 2.0"},
{:ex2ms, "~> 1.6.1"},
{:mox, "~> 0.5", only: :test},
{:excoveralls, "~> 0.12", only: [:test]},
{:ex_unit_sonarqube, "~> 0.1.2", only: [:dev, :test]},
Expand Down
10 changes: 5 additions & 5 deletions test/http_client/rate_limiter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ defmodule HttpClient.RateLimiterTest do
alias HttpClient.RateLimiter

test "rate_limit(url) | no config" do
:ok = RateLimiter.rate_limit("http://mydomain.com")
{:ok, _} = RateLimiter.rate_limit("http://mydomain.com")
end

test "rate_limit(url) | config available" do
Application.put_env(:http_client, :rate_limits, "mydomain2.com": [{:timer.seconds(2), 2}])

now_1 = now()
:ok = RateLimiter.rate_limit("http://mydomain2.com")
:ok = RateLimiter.rate_limit("http://mydomain2.com")
{:ok, _} = RateLimiter.rate_limit("http://mydomain2.com")
{:ok, _} = RateLimiter.rate_limit("http://mydomain2.com")
now_2 = now()
:ok = RateLimiter.rate_limit("http://mydomain2.com")
{:ok, _} = RateLimiter.rate_limit("http://mydomain2.com")
now_3 = now()

assert now_2 - now_1 < 500
assert div(now_3, 1_000) != div(now_2, 1_000)
end

defp now, do: DateTime.to_unix(DateTime.utc_now(), :millisecond)
defp now, do: :erlang.system_time(:micro_seconds)
end

0 comments on commit e219b0f

Please sign in to comment.