From 065903ff681e19a5f2fabb116223eb22a1fc94e0 Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Tue, 11 Jun 2019 18:08:12 +1200 Subject: [PATCH 1/7] Support suspend and resume per queue --- README.md | 20 ++++++++++++++-- lib/honeydew_ecto_notify_queue.ex | 23 +++++++++---------- .../jobs/job_config.ex | 2 +- ...180702161041_honeydew_ecto_notify_jobs.exs | 2 +- priv/templates/migration.exs.eex | 4 +++- ...dew_ecto_notify_queue_integration_test.exs | 2 +- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e4d99f0..2abf1c5 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,18 @@ You can generate a migration to set up the required db tables with $ mix honeydew_ecto_notify_queue.db.gen.migration ``` +You will need to populate the `job_configs` table with the 'suspended' value yourself, which you can do by editing the migration generated in the previous step. For example: + +``` +execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())" + +// or, if you have multiple queues and want to be able to suspend and resume them individially + +execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_first_queue_suspended', false, now(), now())" +execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_second_queue_suspended', false, now(), now())" +``` + + ### Starting the queue Note: You should read [how to install honeydew here first](https://github.com/koudelka/honeydew) @@ -109,7 +121,7 @@ def start(_type, _args) do children = [ # ... The rest of your app's supervision tree ] ++ background_job_processes - + Supervisor.start_link(children, opts) end ``` @@ -119,6 +131,10 @@ end ```bash $ MIX_ENV=test mix do ecto.create, ecto.migrate $ mix test + +or + +$ docker-compose up test ``` ## Custom job configuration persistence @@ -130,5 +146,5 @@ across instances. An example of this may be the disabling of automatic queuing of a job when an API is hit. -You can see an example of how to listen for configuration changes in +You can see an example of how to listen for configuration changes in `examples/configuration_listener.ex` diff --git a/lib/honeydew_ecto_notify_queue.ex b/lib/honeydew_ecto_notify_queue.ex index 3adf6e9..aec64e1 100644 --- a/lib/honeydew_ecto_notify_queue.ex +++ b/lib/honeydew_ecto_notify_queue.ex @@ -350,19 +350,18 @@ defmodule HoneydewEctoNotifyQueue do |> Honeydew.Job.from_record() end - defp refresh_config(state) do + @spec refresh_config(%QState{}) :: :ok | {:error, any()} + defp refresh_config(%QState{repo: repo, queue_name: queue_name} = state) do with {:ok, config} <- - HoneydewEctoNotifyQueue.Config.get_config(state.repo, JobConfig.suspended_key()) do - suspended = config.value == "true" - - case suspended do - true -> - debug_log("Synchronised queue status to suspended") - Honeydew.suspend(state.queue_name) - - false -> - debug_log("Synchronised queue status to resumed") - Honeydew.resume(state.queue_name) + HoneydewEctoNotifyQueue.Config.get_config(repo, JobConfig.suspended_key(queue_name)) do + suspended = String.to_existing_atom(config.value) + + if suspended do + debug_log("Synchronised queue status to suspended") + Honeydew.suspend(queue_name) + else + debug_log("Synchronised queue status to resumed") + Honeydew.resume(queue_name) end Map.put(state, :database_suspended, suspended) diff --git a/lib/honeydew_ecto_notify_queue/jobs/job_config.ex b/lib/honeydew_ecto_notify_queue/jobs/job_config.ex index a6e68e9..8448600 100644 --- a/lib/honeydew_ecto_notify_queue/jobs/job_config.ex +++ b/lib/honeydew_ecto_notify_queue/jobs/job_config.ex @@ -20,5 +20,5 @@ defmodule HoneydewEctoNotifyQueue.JobConfig do |> validate_required([:key, :value]) end - def suspended_key, do: "suspended" + def suspended_key(queue_name), do: "#{queue_name}_suspended" end diff --git a/priv/repo/migrations/20180702161041_honeydew_ecto_notify_jobs.exs b/priv/repo/migrations/20180702161041_honeydew_ecto_notify_jobs.exs index 6dcad01..5b4ac74 100644 --- a/priv/repo/migrations/20180702161041_honeydew_ecto_notify_jobs.exs +++ b/priv/repo/migrations/20180702161041_honeydew_ecto_notify_jobs.exs @@ -31,7 +31,7 @@ defmodule HoneydewEctoNotifyQueue.Repo.Migrations.CreateHoneydewEctoNotifyTables create unique_index(:job_configs, [:key], using: :btree) execute "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"" - execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())" + execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_queue_suspended', false, now(), now())" execute """ CREATE FUNCTION f_notify_config_change() diff --git a/priv/templates/migration.exs.eex b/priv/templates/migration.exs.eex index 7e270ce..c4cac65 100644 --- a/priv/templates/migration.exs.eex +++ b/priv/templates/migration.exs.eex @@ -31,7 +31,9 @@ defmodule <%= module_prefix %>.Repo.Migrations.CreateHoneydewEctoNotifyTables do create unique_index(:job_configs, [:key], using: :btree) execute "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"" - execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())" + + # Uncomment the following line or replace with your own custom suspended key + # execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())" execute """ CREATE FUNCTION f_notify_config_change() diff --git a/test/honeydew_ecto_notify_queue_integration_test.exs b/test/honeydew_ecto_notify_queue_integration_test.exs index 41c7470..07ea830 100644 --- a/test/honeydew_ecto_notify_queue_integration_test.exs +++ b/test/honeydew_ecto_notify_queue_integration_test.exs @@ -34,7 +34,7 @@ defmodule HoneydewEctoNotifyQueueIntegrationTest do :ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo) Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()}) - queue = :"#{:erlang.monotonic_time()}_#{:erlang.unique_integer()}" + queue = :my_queue spec = Honeydew.queue_spec(queue, From f1da79ccf83c4cc76a7c8ba5ffbbb87e99da6b46 Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Wed, 12 Jun 2019 10:01:26 +1200 Subject: [PATCH 2/7] Set DATABASE_URL in .travis.yml so tests on Travis pass --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 7f33a36..e75980e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,9 @@ otp_release: - 19.3 - 20.0 +env: + - DATABASE_URL=postgres://postgres@127.0.0.1:5432/test + services: - postgresql From bfb115b935eaa202838cfd454a7cd8a6fb8334fb Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Wed, 12 Jun 2019 10:43:19 +1200 Subject: [PATCH 3/7] Correct typo in README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2abf1c5..50ff7a8 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ You will need to populate the `job_configs` table with the 'suspended' value you ``` execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'suspended', false, now(), now())" -// or, if you have multiple queues and want to be able to suspend and resume them individially +// or, if you have multiple queues and want to be able to suspend and resume them individually execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_first_queue_suspended', false, now(), now())" execute "INSERT INTO job_configs VALUES (uuid_generate_v4(), 'my_second_queue_suspended', false, now(), now())" From b935e5c453604859499853606da4455ecf4f892b Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Wed, 12 Jun 2019 14:40:10 +1200 Subject: [PATCH 4/7] Tidy up elixir + otp versions in .travis.yml --- .travis.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index e75980e..7f635b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,10 @@ language: elixir elixir: - - 1.5.1 - - 1.6.1 - 1.7.1 - 1.8.1 otp_release: - - 19.3 - 20.0 + - 21.0 env: - DATABASE_URL=postgres://postgres@127.0.0.1:5432/test From 1bbe4496f52bbf8c3ea1e75f0cf655c85ceb47fe Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Fri, 14 Jun 2019 14:28:05 +1200 Subject: [PATCH 5/7] Add per_queue_suspension as a configuration option --- lib/honeydew_ecto_notify_queue.ex | 59 +++++++++++-------- .../jobs/job_config.ex | 4 +- ...dew_ecto_notify_queue_integration_test.exs | 25 ++++---- 3 files changed, 50 insertions(+), 38 deletions(-) diff --git a/lib/honeydew_ecto_notify_queue.ex b/lib/honeydew_ecto_notify_queue.ex index aec64e1..62fe499 100644 --- a/lib/honeydew_ecto_notify_queue.ex +++ b/lib/honeydew_ecto_notify_queue.ex @@ -29,36 +29,43 @@ defmodule HoneydewEctoNotifyQueue do :config_notification_ref, :jobs_notification_ref, :database_suspended, - :quiet_locking_errors + quiet_locking_errors: false, + per_queue_suspension: false ] end @impl true - def init( - queue_name, - [ - repo: repo, - max_job_time: max_job_time, - retry_seconds: retry_seconds, - notifier: notifier - ] = opts - ) do + @spec init(String.t(), list()) :: {:ok, %QState{}} + def init(queue_name, opts) when is_list(opts) do + allowed_opts_map = + opts + |> Enum.reduce(%{}, fn {key, value}, acc -> Map.put(acc, key, value) end) + |> Map.take([ + :repo, + :max_job_time, + :retry_seconds, + :notifier, + :quiet_locking_errors, + :per_queue_suspension + ]) + + init(queue_name, allowed_opts_map) + end + + @spec init(String.t(), map()) :: {:ok, %QState{}} + def init(queue_name, %{notifier: notifier} = opts) do {:ok, config_notification_ref} = start_config_notifier(notifier) {:ok, jobs_notification_ref} = start_jobs_notifier(notifier) - quiet_locking_errors = Keyword.get(opts, :quiet_locking_errors, true) - - state = %QState{ - repo: repo, - queue_name: queue_name, - max_job_time: max_job_time, - retry_seconds: retry_seconds, - config_notification_ref: config_notification_ref, - jobs_notification_ref: jobs_notification_ref, - quiet_locking_errors: quiet_locking_errors - } - - state = refresh_config(state) + state = + %QState{} + |> Map.merge(opts) + |> Map.merge(%{ + queue_name: queue_name, + config_notification_ref: config_notification_ref, + jobs_notification_ref: jobs_notification_ref + }) + |> refresh_config() {:ok, state} end @@ -351,9 +358,9 @@ defmodule HoneydewEctoNotifyQueue do end @spec refresh_config(%QState{}) :: :ok | {:error, any()} - defp refresh_config(%QState{repo: repo, queue_name: queue_name} = state) do - with {:ok, config} <- - HoneydewEctoNotifyQueue.Config.get_config(repo, JobConfig.suspended_key(queue_name)) do + defp refresh_config(%QState{repo: repo, queue_name: queue_name, per_queue_suspension: per_queue_suspension} = state) do + with suspended_key <- JobConfig.suspended_key(queue_name, per_queue_suspension), + {:ok, config} <- HoneydewEctoNotifyQueue.Config.get_config(repo, suspended_key) do suspended = String.to_existing_atom(config.value) if suspended do diff --git a/lib/honeydew_ecto_notify_queue/jobs/job_config.ex b/lib/honeydew_ecto_notify_queue/jobs/job_config.ex index 8448600..6198c21 100644 --- a/lib/honeydew_ecto_notify_queue/jobs/job_config.ex +++ b/lib/honeydew_ecto_notify_queue/jobs/job_config.ex @@ -20,5 +20,7 @@ defmodule HoneydewEctoNotifyQueue.JobConfig do |> validate_required([:key, :value]) end - def suspended_key(queue_name), do: "#{queue_name}_suspended" + @spec suspended_key(String.t(), boolean()) :: String.t() + def suspended_key(_, false), do: "suspended" + def suspended_key(queue_name, true), do: "#{queue_name}_suspended" end diff --git a/test/honeydew_ecto_notify_queue_integration_test.exs b/test/honeydew_ecto_notify_queue_integration_test.exs index 07ea830..4b94cf0 100644 --- a/test/honeydew_ecto_notify_queue_integration_test.exs +++ b/test/honeydew_ecto_notify_queue_integration_test.exs @@ -37,17 +37,20 @@ defmodule HoneydewEctoNotifyQueueIntegrationTest do queue = :my_queue spec = - Honeydew.queue_spec(queue, - queue: - {HoneydewEctoNotifyQueue, - [ - repo: HoneydewEctoNotifyQueue.Repo, - # seconds - max_job_time: 3_600, - # seconds, - retry_seconds: 15, - notifier: Notifier - ]}, + Honeydew.queue_spec( + queue, + queue: { + HoneydewEctoNotifyQueue, + [ + repo: HoneydewEctoNotifyQueue.Repo, + # seconds + max_job_time: 3_600, + # seconds, + retry_seconds: 15, + notifier: Notifier, + per_queue_suspension: true + ] + }, failure_mode: {Honeydew.FailureMode.Retry, times: 3} ) From 970d2dffbe9cc19faa6724e4539ad4cd7aff16df Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Mon, 17 Jun 2019 09:28:45 +1200 Subject: [PATCH 6/7] Check for required queue options when initialising queue --- lib/honeydew_ecto_notify_queue.ex | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/honeydew_ecto_notify_queue.ex b/lib/honeydew_ecto_notify_queue.ex index 62fe499..ad7d251 100644 --- a/lib/honeydew_ecto_notify_queue.ex +++ b/lib/honeydew_ecto_notify_queue.ex @@ -39,7 +39,7 @@ defmodule HoneydewEctoNotifyQueue do def init(queue_name, opts) when is_list(opts) do allowed_opts_map = opts - |> Enum.reduce(%{}, fn {key, value}, acc -> Map.put(acc, key, value) end) + |> Map.new() |> Map.take([ :repo, :max_job_time, @@ -49,11 +49,13 @@ defmodule HoneydewEctoNotifyQueue do :per_queue_suspension ]) - init(queue_name, allowed_opts_map) + %{repo: _, max_job_time: _, retry_seconds: _, notifier: _} = allowed_opts_map + + do_init(queue_name, allowed_opts_map) end - @spec init(String.t(), map()) :: {:ok, %QState{}} - def init(queue_name, %{notifier: notifier} = opts) do + @spec do_init(String.t(), map()) :: {:ok, %QState{}} + defp do_init(queue_name, %{notifier: notifier} = opts) do {:ok, config_notification_ref} = start_config_notifier(notifier) {:ok, jobs_notification_ref} = start_jobs_notifier(notifier) From 1672efa4c8009875f8a9e07bf9f999f920b6cc9c Mon Sep 17 00:00:00 2001 From: Fiona Sanggang Date: Mon, 17 Jun 2019 09:29:52 +1200 Subject: [PATCH 7/7] Default quiet_locking_errors back to true --- lib/honeydew_ecto_notify_queue.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/honeydew_ecto_notify_queue.ex b/lib/honeydew_ecto_notify_queue.ex index ad7d251..30e2250 100644 --- a/lib/honeydew_ecto_notify_queue.ex +++ b/lib/honeydew_ecto_notify_queue.ex @@ -29,7 +29,7 @@ defmodule HoneydewEctoNotifyQueue do :config_notification_ref, :jobs_notification_ref, :database_suspended, - quiet_locking_errors: false, + quiet_locking_errors: true, per_queue_suspension: false ] end @@ -39,7 +39,7 @@ defmodule HoneydewEctoNotifyQueue do def init(queue_name, opts) when is_list(opts) do allowed_opts_map = opts - |> Map.new() + |> Map.new() |> Map.take([ :repo, :max_job_time,