Skip to content

Commit

Permalink
Merge pull request #3 from fsanggang/feature/per_queue_suspend_and_re…
Browse files Browse the repository at this point in the history
…sume

Support suspend and resume per queue
  • Loading branch information
aspett authored Jun 17, 2019
2 parents fc58be7 + 1672efa commit f613489
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 56 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
language: elixir
elixir:
- 1.5.1
- 1.6.1
- 1.7.1
- 1.8.1
otp_release:
- 19.3
- 20.0
- 21.0

env:
- DATABASE_URL=postgres://[email protected]:5432/test

services:
- postgresql
Expand Down
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ You can generate a migration to set up the required db tables with
$ mix honeydew_ecto_notify_queue.db.gen.migration
```

You will need to populate the `job_configs` table with the 'suspended' value yourself, which you can do by editing the migration generated in the previous step. For example:

```
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())"
// or, if you have multiple queues and want to be able to suspend and resume them individually
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_first_queue_suspended', false, now(), now())"
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_second_queue_suspended', false, now(), now())"
```


### Starting the queue

Note: You should read [how to install honeydew here first](https://github.com/koudelka/honeydew)
Expand Down Expand Up @@ -109,7 +121,7 @@ def start(_type, _args) do
children = [
# ... The rest of your app's supervision tree
] ++ background_job_processes

Supervisor.start_link(children, opts)
end
```
Expand All @@ -119,6 +131,10 @@ end
```bash
$ MIX_ENV=test mix do ecto.create, ecto.migrate
$ mix test

or

$ docker-compose up test
```

## Custom job configuration persistence
Expand All @@ -130,5 +146,5 @@ across instances.

An example of this may be the disabling of automatic queuing of a job when an API is hit.

You can see an example of how to listen for configuration changes in
You can see an example of how to listen for configuration changes in
`examples/configuration_listener.ex`
80 changes: 44 additions & 36 deletions lib/honeydew_ecto_notify_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,45 @@ defmodule HoneydewEctoNotifyQueue do
:config_notification_ref,
:jobs_notification_ref,
:database_suspended,
:quiet_locking_errors
quiet_locking_errors: true,
per_queue_suspension: false
]
end

@impl true
def init(
queue_name,
[
repo: repo,
max_job_time: max_job_time,
retry_seconds: retry_seconds,
notifier: notifier
] = opts
) do
@spec init(String.t(), list()) :: {:ok, %QState{}}
def init(queue_name, opts) when is_list(opts) do
allowed_opts_map =
opts
|> Map.new()
|> Map.take([
:repo,
:max_job_time,
:retry_seconds,
:notifier,
:quiet_locking_errors,
:per_queue_suspension
])

%{repo: _, max_job_time: _, retry_seconds: _, notifier: _} = allowed_opts_map

do_init(queue_name, allowed_opts_map)
end

@spec do_init(String.t(), map()) :: {:ok, %QState{}}
defp do_init(queue_name, %{notifier: notifier} = opts) do
{:ok, config_notification_ref} = start_config_notifier(notifier)
{:ok, jobs_notification_ref} = start_jobs_notifier(notifier)

quiet_locking_errors = Keyword.get(opts, :quiet_locking_errors, true)

state = %QState{
repo: repo,
queue_name: queue_name,
max_job_time: max_job_time,
retry_seconds: retry_seconds,
config_notification_ref: config_notification_ref,
jobs_notification_ref: jobs_notification_ref,
quiet_locking_errors: quiet_locking_errors
}

state = refresh_config(state)
state =
%QState{}
|> Map.merge(opts)
|> Map.merge(%{
queue_name: queue_name,
config_notification_ref: config_notification_ref,
jobs_notification_ref: jobs_notification_ref
})
|> refresh_config()

{:ok, state}
end
Expand Down Expand Up @@ -350,19 +359,18 @@ defmodule HoneydewEctoNotifyQueue do
|> Honeydew.Job.from_record()
end

defp refresh_config(state) do
with {:ok, config} <-
HoneydewEctoNotifyQueue.Config.get_config(state.repo, JobConfig.suspended_key()) do
suspended = config.value == "true"

case suspended do
true ->
debug_log("Synchronised queue status to suspended")
Honeydew.suspend(state.queue_name)

false ->
debug_log("Synchronised queue status to resumed")
Honeydew.resume(state.queue_name)
@spec refresh_config(%QState{}) :: :ok | {:error, any()}
defp refresh_config(%QState{repo: repo, queue_name: queue_name, per_queue_suspension: per_queue_suspension} = state) do
with suspended_key <- JobConfig.suspended_key(queue_name, per_queue_suspension),
{:ok, config} <- HoneydewEctoNotifyQueue.Config.get_config(repo, suspended_key) do
suspended = String.to_existing_atom(config.value)

if suspended do
debug_log("Synchronised queue status to suspended")
Honeydew.suspend(queue_name)
else
debug_log("Synchronised queue status to resumed")
Honeydew.resume(queue_name)
end

Map.put(state, :database_suspended, suspended)
Expand Down
4 changes: 3 additions & 1 deletion lib/honeydew_ecto_notify_queue/jobs/job_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ defmodule HoneydewEctoNotifyQueue.JobConfig do
|> validate_required([:key, :value])
end

def suspended_key, do: "suspended"
@spec suspended_key(String.t(), boolean()) :: String.t()
def suspended_key(_, false), do: "suspended"
def suspended_key(queue_name, true), do: "#{queue_name}_suspended"
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule HoneydewEctoNotifyQueue.Repo.Migrations.CreateHoneydewEctoNotifyTables
create unique_index(:job_configs, [:key], using: :btree)

execute "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\""
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())"
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_queue_suspended', false, now(), now())"

execute """
CREATE FUNCTION f_notify_config_change()
Expand Down
4 changes: 3 additions & 1 deletion priv/templates/migration.exs.eex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ defmodule <%= module_prefix %>.Repo.Migrations.CreateHoneydewEctoNotifyTables do
create unique_index(:job_configs, [:key], using: :btree)

execute "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\""
execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())"

# Uncomment the following line or replace with your own custom suspended key
# execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())"

execute """
CREATE FUNCTION f_notify_config_change()
Expand Down
27 changes: 15 additions & 12 deletions test/honeydew_ecto_notify_queue_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,23 @@ defmodule HoneydewEctoNotifyQueueIntegrationTest do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})

queue = :"#{:erlang.monotonic_time()}_#{:erlang.unique_integer()}"
queue = :my_queue

spec =
Honeydew.queue_spec(queue,
queue:
{HoneydewEctoNotifyQueue,
[
repo: HoneydewEctoNotifyQueue.Repo,
# seconds
max_job_time: 3_600,
# seconds,
retry_seconds: 15,
notifier: Notifier
]},
Honeydew.queue_spec(
queue,
queue: {
HoneydewEctoNotifyQueue,
[
repo: HoneydewEctoNotifyQueue.Repo,
# seconds
max_job_time: 3_600,
# seconds,
retry_seconds: 15,
notifier: Notifier,
per_queue_suspension: true
]
},
failure_mode: {Honeydew.FailureMode.Retry, times: 3}
)

Expand Down

0 comments on commit f613489

Please sign in to comment.