diff --git a/queue_job/README.rst b/queue_job/README.rst
index a22f8ad8bb..e7965306f0 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -132,18 +132,7 @@ Configuration
 .. [1] It works with the threaded Odoo server too, although this way
    of running Odoo is obviously not for production purposes.
 
-* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.
-
-  * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
-    Set it to 0 to disable this check.
-  * ``started_delta``: Spent time in minutes after which a started job is considered stuck.
-    This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
-    Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
-
-  .. code-block:: python
-
-    # `model` corresponds to 'queue.job' model
-    model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
+* Jobs that remain in ``enqueued`` or ``started`` state (for instance because their worker has been killed) will be automatically re-queued.
 
 Usage
 =====
@@ -595,22 +584,6 @@ Known issues / Roadmap
 * After creating a new database or installing ``queue_job`` on an existing
   database, Odoo must be restarted for the runner to detect it.
 
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-  update queue_job set state='pending' where state in ('started', 'enqueued')
-
 Changelog
 =========
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index 43850ac203..ce9b31bbb7 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -2,7 +2,7 @@
 
 {
     "name": "Job Queue",
-    "version": "15.0.2.3.12",
+    "version": "15.0.2.3.13",
     "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
     "website": "https://github.com/OCA/queue",
     "license": "LGPL-3",
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index dff6f4fe1c..8a50ae65be 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
         job.set_started()
         job.store()
         env.cr.commit()
+        job.lock()
+
         _logger.debug("%s started", job)
 
         job.perform()
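Note: the ordering in ``_try_perform_job`` above is what makes the new lock
meaningful. The ``started`` state is committed first, so it is visible to the
runner; the lock is then taken inside the job's own transaction, so it is held
exactly as long as the job runs and disappears with the session if the worker
dies. A minimal sketch of the pattern, reusing the names introduced by this
patch:

.. code-block:: python

    def _try_perform_job(env, job):
        job.set_started()   # also inserts the queue_job_lock row
        job.store()
        env.cr.commit()     # publish the state change before locking
        job.lock()          # SELECT ... FOR UPDATE in the job's transaction
        job.perform()       # a crash past this point releases the lock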
diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml
index 55bcb3f5fc..a2680cc475 100644
--- a/queue_job/data/queue_data.xml
+++ b/queue_job/data/queue_data.xml
@@ -1,17 +1,6 @@
 <?xml version="1.0" encoding="utf-8" ?>
 <odoo>
-    <record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
-        <field name="name">Jobs Garbage Collector</field>
-        <field name="interval_number">5</field>
-        <field name="interval_type">minutes</field>
-        <field name="numbercall">-1</field>
-        <field name="doall" eval="False" />
-        <field name="model_id" ref="model_queue_job" />
-        <field name="state">code</field>
-        <field name="code">model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)</field>
-    </record>
-
     <record id="mt_job_failed" model="mail.message.subtype">
         <field name="name">Job failed</field>
diff --git a/queue_job/job.py b/queue_job/job.py
index d38a899095..f7255d1cd4 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -238,6 +238,61 @@ def load_many(cls, env, job_uuids):
         recordset = cls.db_records_from_uuids(env, job_uuids)
         return {cls._load_from_db_record(record) for record in recordset}
 
+    def add_lock_record(self):
+        """Create a row in ``queue_job_lock``; it is locked while the job
+        is being performed.
+        """
+        self.env.cr.execute(
+            """
+            INSERT INTO
+                queue_job_lock (id, queue_job_id)
+            SELECT
+                id, id
+            FROM
+                queue_job
+            WHERE
+                uuid = %s
+            ON CONFLICT(id)
+            DO NOTHING;
+            """,
+            [self.uuid],
+        )
+
+    def lock(self):
+        """Lock the row of the job being performed.
+
+        If the row cannot be locked, the job was not started,
+        so a RetryableJobError is raised.
+        """
+        self.env.cr.execute(
+            """
+            SELECT
+                *
+            FROM
+                queue_job_lock
+            WHERE
+                queue_job_id in (
+                    SELECT
+                        id
+                    FROM
+                        queue_job
+                    WHERE
+                        uuid = %s
+                        AND state='started'
+                )
+            FOR UPDATE;
+            """,
+            [self.uuid],
+        )
+
+        # exactly one row should have been locked
+        if 1 != len(self.env.cr.fetchall()):
+            raise RetryableJobError(
+                f"Trying to lock job that wasn't started, uuid: {self.uuid}"
+            )
+
     @classmethod
     def _load_from_db_record(cls, job_db_record):
         stored = job_db_record
@@ -819,6 +874,7 @@ def set_started(self):
         self.state = STARTED
         self.date_started = datetime.now()
         self.worker_pid = os.getpid()
+        self.add_lock_record()
 
     def set_done(self, result=None):
         self.state = DONE
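Note: a standalone sketch (not part of the patch) of why ``FOR UPDATE`` plus
``SKIP LOCKED`` detects dead workers. The connection parameters and the job id
(42) are placeholders, and a ``queue_job_lock`` row for that id is assumed to
exist:

.. code-block:: python

    import psycopg2

    worker = psycopg2.connect(dbname="odoo")  # transaction performing the job
    runner = psycopg2.connect(dbname="odoo")  # the job runner's transaction

    with worker.cursor() as cr:
        # the worker holds this row lock for the lifetime of its transaction
        cr.execute(
            "SELECT 1 FROM queue_job_lock WHERE queue_job_id = %s FOR UPDATE",
            (42,),
        )

    with runner.cursor() as cr:
        # while the worker lives, SKIP LOCKED sees nothing: the job is alive
        cr.execute(
            "SELECT queue_job_id FROM queue_job_lock "
            "WHERE queue_job_id = %s FOR UPDATE SKIP LOCKED",
            (42,),
        )
        assert cr.fetchall() == []

        worker.close()  # a killed worker: its locks die with the session

        # the row is lockable again, so the runner knows the job is dead
        cr.execute(
            "SELECT queue_job_id FROM queue_job_lock "
            "WHERE queue_job_id = %s FOR UPDATE SKIP LOCKED",
            (42,),
        )
        assert len(cr.fetchall()) == 1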
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 25823a9973..beed291aff 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -114,22 +114,6 @@
 * After creating a new database or installing queue_job on an existing
   database, Odoo must be restarted for the runner to detect it.
 
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-  update queue_job set state='pending' where state in ('started', 'enqueued')
-
 .. rubric:: Footnotes
 
 .. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -207,28 +191,6 @@ def _connection_info_for(db_name):
 
 
 def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
-    # Method to set failed job (due to timeout, etc) as pending,
-    # to avoid keeping it as enqueued.
-    def set_job_pending():
-        connection_info = _connection_info_for(db_name)
-        conn = psycopg2.connect(**connection_info)
-        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-        with closing(conn.cursor()) as cr:
-            cr.execute(
-                "UPDATE queue_job SET state=%s, "
-                "date_enqueued=NULL, date_started=NULL "
-                "WHERE uuid=%s and state=%s "
-                "RETURNING uuid",
-                (PENDING, job_uuid, ENQUEUED),
-            )
-            if cr.fetchone():
-                _logger.warning(
-                    "state of job %s was reset from %s to %s",
-                    job_uuid,
-                    ENQUEUED,
-                    PENDING,
-                )
-
     # TODO: better way to HTTP GET asynchronously (grequest, ...)?
     # if this was python3 I would be doing this with
     # asyncio, aiohttp and aiopg
@@ -236,6 +198,7 @@ def urlopen():
         url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
             scheme, host, port, db_name, job_uuid
         )
+        # pylint: disable=except-pass
        try:
             auth = None
             if user:
@@ -249,10 +212,10 @@ def urlopen():
             # for codes between 500 and 600
             response.raise_for_status()
         except requests.Timeout:
-            set_job_pending()
+            # A timeout is normal behaviour; it shouldn't be logged as an exception
+            pass
         except Exception:
             _logger.exception("exception in GET %s", url)
-            set_job_pending()
 
     thread = threading.Thread(target=urlopen)
     thread.daemon = True
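Note: with the lock-based liveness check, a timed-out trigger request no
longer needs the old ``set_job_pending`` fallback: the HTTP GET only *starts*
the job, and a timeout says nothing about the job's fate. A minimal sketch of
the intended semantics (the URL is a placeholder):

.. code-block:: python

    import requests

    def trigger_job(url, auth=None):
        # fire-and-forget: the GET only kicks off the job on the Odoo side
        try:
            requests.get(url, timeout=1, auth=auth)
        except requests.Timeout:
            # the job may well still be running server-side; liveness is
            # now tracked through the queue_job_lock row, not this request
            pass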
@@ -333,6 +296,13 @@ def keep_alive(self):
         with closing(self.conn.cursor()) as cr:
             cr.execute(query)
 
+    def set_job_pending(self, uuid):
+        with closing(self.conn.cursor()) as cr:
+            cr.execute(
+                "UPDATE queue_job SET state=%s, date_enqueued=NULL WHERE uuid=%s",
+                (PENDING, uuid),
+            )
+
     def set_job_enqueued(self, uuid):
         with closing(self.conn.cursor()) as cr:
             cr.execute(
@@ -343,8 +313,104 @@ def set_job_enqueued(self, uuid):
                 (ENQUEUED, uuid),
             )
 
+    def _query_requeue_dead_jobs(self):
+        return """
+            UPDATE
+                queue_job
+            SET
+                state=(
+                    CASE
+                        WHEN
+                            max_retries IS NOT NULL AND
+                            retry IS NOT NULL AND
+                            retry>max_retries
+                        THEN 'failed'
+                        ELSE 'pending'
+                    END),
+                retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END),
+                exc_name=(
+                    CASE
+                        WHEN
+                            max_retries IS NOT NULL AND
+                            retry IS NOT NULL AND
+                            retry>max_retries
+                        THEN 'JobFoundDead'
+                        ELSE exc_name
+                    END),
+                exc_info=(
+                    CASE
+                        WHEN
+                            max_retries IS NOT NULL AND
+                            retry IS NOT NULL AND
+                            retry>max_retries
+                        THEN 'Job found dead after too many retries'
+                        ELSE exc_info
+                    END)
+            WHERE
+                id in (
+                    SELECT
+                        queue_job_id
+                    FROM
+                        queue_job_lock
+                    WHERE
+                        queue_job_id in (
+                            SELECT
+                                id
+                            FROM
+                                queue_job
+                            WHERE
+                                state IN ('enqueued','started')
+                                AND date_enqueued <
+                                    (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
+                        )
+                    FOR UPDATE SKIP LOCKED
+                )
+                OR
+                id in (
+                    SELECT
+                        id
+                    FROM
+                        queue_job
+                    WHERE
+                        state = 'started' AND NOT EXISTS (
+                            SELECT 1 FROM queue_job_lock
+                            WHERE queue_job_id = queue_job.id
+                        )
+                    FOR UPDATE SKIP LOCKED
+                )
+            RETURNING uuid
+        """
 
-class QueueJobRunner(object):
+    def requeue_dead_jobs(self):
+        """Requeue dead jobs: enqueued or started jobs whose lock row is not
+        locked are set back to pending.
+
+        A job's lock row is locked while the job is being executed; when the
+        worker is killed, the lock is released.
+
+        If the number of retries exceeds the number of max retries, the job
+        is set to 'failed' with the error 'JobFoundDead' instead.
+
+        A 10-second buffer on 'date_enqueued' ensures the job has been
+        enqueued for a while before it is checked; this prevents requeuing
+        jobs before they are actually started.
+
+        When Odoo shuts down normally, it waits for running jobs to finish.
+        However, when the Odoo server crashes or is otherwise force-stopped,
+        running jobs are interrupted while the runner has no chance to know
+        they have been aborted; this method recovers them.
+        """
+
+        with closing(self.conn.cursor()) as cr:
+            query = self._query_requeue_dead_jobs()
+
+            cr.execute(query)
+
+            for (uuid,) in cr.fetchall():
+                _logger.warning("Re-queued dead job with uuid: %s", uuid)
+
+
+class QueueJobRunner:
     def __init__(
         self,
         scheme="http",
@@ -421,9 +487,26 @@ def initialize_databases(self):
                 self.db_by_name[db_name] = db
                 with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
                     for job_data in cr:
+                        # If there are enqueued jobs, move them back to pending;
+                        # otherwise they stay enqueued and occupy channel slots.
+                        if job_data[6] == "enqueued":
+                            try:
+                                self.db_by_name[db_name].set_job_pending(job_data[1])
+                                job_data = (*job_data[:6], "pending")
+                            except Exception:
+                                _logger.warning(
+                                    "error setting job %s to pending",
+                                    job_data[1],
+                                    exc_info=True,
+                                )
                         self.channel_manager.notify(db_name, *job_data)
                 _logger.info("queue job runner ready for db %s", db_name)
 
+    def requeue_dead_jobs(self):
+        for db in self.db_by_name.values():
+            if db.has_queue_job:
+                db.requeue_dead_jobs()
+
     def run_jobs(self):
         now = _odoo_now()
         for job in self.channel_manager.get_jobs_to_run(now):
@@ -516,6 +599,7 @@ def run(self):
             _logger.info("database connections ready")
             # inner loop does the normal processing
             while not self._stop:
+                self.requeue_dead_jobs()
                 self.process_notifications()
                 self.run_jobs()
                 self.wait_notification()
diff --git a/queue_job/migrations/15.0.2.3.13/pre-migration.py b/queue_job/migrations/15.0.2.3.13/pre-migration.py
new file mode 100644
index 0000000000..8dbb6ff7f1
--- /dev/null
+++ b/queue_job/migrations/15.0.2.3.13/pre-migration.py
@@ -0,0 +1,22 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+
+def migrate(cr, version):
+    # Deactivate the garbage collector cron
+    cr.execute(
+        """
+        UPDATE
+            ir_cron
+        SET
+            active=False
+        WHERE id IN (
+            SELECT res_id
+            FROM
+                ir_model_data
+            WHERE
+                module='queue_job'
+                AND model='ir.cron'
+                AND name='ir_cron_queue_job_garbage_collector'
+        );
+        """
+    )
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 4744e7ab46..6265dfe9cb 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -3,3 +3,4 @@
 from . import queue_job
 from . import queue_job_channel
 from . import queue_job_function
+from . import queue_job_lock
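Note: to preview which jobs the query above would requeue, without modifying
anything, the same conditions can be run as a plain SELECT and rolled back.
A sketch under those assumptions (the connection is a placeholder; the
``FOR UPDATE SKIP LOCKED`` branch briefly locks the matched lock rows until
the rollback):

.. code-block:: python

    import psycopg2

    # same detection conditions as _query_requeue_dead_jobs, SELECT-only
    QUERY = """
        SELECT uuid, state, retry, max_retries
        FROM queue_job
        WHERE id IN (
                SELECT queue_job_id FROM queue_job_lock
                WHERE queue_job_id IN (
                    SELECT id FROM queue_job
                    WHERE state IN ('enqueued', 'started')
                      AND date_enqueued <
                          (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
                )
                FOR UPDATE SKIP LOCKED
            )
           OR id IN (
                SELECT id FROM queue_job
                WHERE state = 'started' AND NOT EXISTS (
                    SELECT 1 FROM queue_job_lock
                    WHERE queue_job_id = queue_job.id
                )
            )
    """

    conn = psycopg2.connect(dbname="odoo")  # placeholder connection
    with conn.cursor() as cr:
        cr.execute(QUERY)
        for uuid, state, retry, max_retries in cr.fetchall():
            print(uuid, state, retry, max_retries)
    conn.rollback()  # release the transient row locks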
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index c3a37bed34..5518b96579 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -6,7 +6,6 @@
 from datetime import datetime, timedelta
 
 from odoo import _, api, exceptions, fields, models
-from odoo.osv import expression
 from odoo.tools import config, html_escape
 
 from odoo.addons.base_sparse_field.models.fields import Serialized
@@ -417,58 +416,6 @@ def autovacuum(self):
                 break
         return True
 
-    def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
-        """Fix jobs that are in a bad states
-
-        :param in_queue_delta: lookup time in minutes for jobs
-            that are in enqueued state,
-            0 means that it is not checked
-
-        :param started_delta: lookup time in minutes for jobs
-            that are in started state,
-            0 means that it is not checked,
-            -1 will use `--limit-time-real` config value
-        """
-        if started_delta == -1:
-            started_delta = (config["limit_time_real"] // 60) + 1
-        return self._get_stuck_jobs_to_requeue(
-            enqueued_delta=enqueued_delta, started_delta=started_delta
-        ).requeue()
-
-    def _get_stuck_jobs_domain(self, queue_dl, started_dl):
-        domain = []
-        now = fields.datetime.now()
-        if queue_dl:
-            queue_dl = now - timedelta(minutes=queue_dl)
-            domain.append(
-                [
-                    "&",
-                    ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
-                    ("state", "=", "enqueued"),
-                ]
-            )
-        if started_dl:
-            started_dl = now - timedelta(minutes=started_dl)
-            domain.append(
-                [
-                    "&",
-                    ("date_started", "<=", fields.Datetime.to_string(started_dl)),
-                    ("state", "=", "started"),
-                ]
-            )
-        if not domain:
-            raise exceptions.ValidationError(
-                _("If both parameters are 0, ALL jobs will be requeued!")
-            )
-        return expression.OR(domain)
-
-    def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
-        job_model = self.env["queue.job"]
-        stuck_jobs = job_model.search(
-            self._get_stuck_jobs_domain(enqueued_delta, started_delta)
-        )
-        return stuck_jobs
-
     def related_action_open_record(self):
         """Open a form view with the record(s) of the job.
diff --git a/queue_job/models/queue_job_lock.py b/queue_job/models/queue_job_lock.py
new file mode 100644
index 0000000000..b01c7f3a91
--- /dev/null
+++ b/queue_job/models/queue_job_lock.py
@@ -0,0 +1,16 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+
+from odoo import fields, models
+
+
+class QueueJobLock(models.Model):
+    _name = "queue.job.lock"
+    _description = "Queue Job Lock"
+
+    queue_job_id = fields.Many2one(
+        comodel_name="queue.job",
+        required=True,
+        ondelete="cascade",
+        index=True,
+    )
diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst
index b9547b9465..fb5b6f39c4 100644
--- a/queue_job/readme/CONFIGURE.rst
+++ b/queue_job/readme/CONFIGURE.rst
@@ -42,15 +42,4 @@
 .. [1] It works with the threaded Odoo server too, although this way
    of running Odoo is obviously not for production purposes.
 
-* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.
-
-  * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
-    Set it to 0 to disable this check.
-  * ``started_delta``: Spent time in minutes after which a started job is considered stuck.
-    This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
-    Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
-
-  .. code-block:: python
-
-    # `model` corresponds to 'queue.job' model
-    model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
+* Jobs that remain in ``enqueued`` or ``started`` state (for instance because their worker has been killed) will be automatically re-queued.
diff --git a/queue_job/readme/ROADMAP.rst b/queue_job/readme/ROADMAP.rst
index 34cc20e6db..c7eb51212f 100644
--- a/queue_job/readme/ROADMAP.rst
+++ b/queue_job/readme/ROADMAP.rst
@@ -1,18 +1,2 @@
 * After creating a new database or installing ``queue_job`` on an existing
   database, Odoo must be restarted for the runner to detect it.
-
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-  update queue_job set state='pending' where state in ('started', 'enqueued')
diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv
index 634daf8ede..4def7dc38a 100644
--- a/queue_job/security/ir.model.access.csv
+++ b/queue_job/security/ir.model.access.csv
@@ -1,5 +1,6 @@
 id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
 access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1
+access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0
 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1
 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1
 access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index bcd8f0580b..1a17b55e06 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html

[index.html is generated from the readme fragments; its diff is omitted here
because the extracted text carried no recoverable HTML. The substantive
changes mirror the README: the *Jobs Garbage Collector* configuration bullet
is replaced by "Jobs that remain in enqueued or started state (for instance
because their worker has been killed) will be automatically re-queued.", and
the stale-jobs paragraph with its manual
"update queue_job set state='pending' ..." SQL snippet is dropped from
"Known issues / Roadmap". The remaining hunks only touch syntax-highlighting
markup inside code samples.]
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index e0ff9576a5..047942bde4 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -6,3 +6,4 @@
 from . import test_model_job_function
 from . import test_queue_job_protected_write
 from . import test_wizards
+from . import test_requeue_dead_job
diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py
new file mode 100644
index 0000000000..5d61043224
--- /dev/null
+++ b/queue_job/tests/test_requeue_dead_job.py
@@ -0,0 +1,168 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+from contextlib import closing
+from datetime import datetime, timedelta
+
+from odoo.tests.common import TransactionCase
+
+from odoo.addons.queue_job.job import Job
+from odoo.addons.queue_job.jobrunner.runner import Database
+
+
+class TestRequeueDeadJob(TransactionCase):
+    def create_dummy_job(self, uuid):
+        """Create a dummy job for the tests."""
+        return (
+            self.env["queue.job"]
+            .with_context(
+                _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
+            )
+            .create(
+                {
+                    "uuid": uuid,
+                    "user_id": self.env.user.id,
+                    "state": "pending",
+                    "model_name": "queue.job",
+                    "method_name": "write",
+                }
+            )
+        )
+
+    def get_locks(self, uuid, cr=None):
+        """Retrieve the lock rows that are not currently locked."""
+        if cr is None:
+            cr = self.env.cr
+
+        cr.execute(
+            """
+            SELECT
+                id
+            FROM
+                queue_job_lock
+            WHERE
+                id IN (
+                    SELECT
+                        id
+                    FROM
+                        queue_job
+                    WHERE
+                        uuid = %s
+                )
+            FOR UPDATE SKIP LOCKED
+            """,
+            [uuid],
+        )
+
+        return cr.fetchall()
+
+    def test_add_lock_record(self):
+        queue_job = self.create_dummy_job("test_add_lock")
+        job_obj = Job.load(self.env, queue_job.uuid)
+
+        job_obj.set_started()
+        self.assertEqual(job_obj.state, "started")
+
+        locks = self.get_locks(job_obj.uuid)
+
+        self.assertEqual(1, len(locks))
+
+    def test_lock(self):
+        queue_job = self.create_dummy_job("test_lock")
+        job_obj = Job.load(self.env, queue_job.uuid)
+
+        job_obj.set_started()
+        job_obj.store()
+
+        locks = self.get_locks(job_obj.uuid)
+
+        self.assertEqual(1, len(locks))
+
+        # commit to update queue_job records in DB
+        self.env.cr.commit()  # pylint: disable=E8102
+
+        job_obj.lock()
+
+        with closing(self.env.registry.cursor()) as new_cr:
+            locks = self.get_locks(job_obj.uuid, new_cr)
+
+        # the row should be locked, hence invisible to SKIP LOCKED
+        self.assertEqual(0, len(locks))
+
+        # clean up
+        queue_job.unlink()
+
+        self.env.cr.commit()  # pylint: disable=E8102
+
+        # because we committed the cursor, the savepoint of the test method is
+        # gone, and this would break TransactionCase cleanups
+        self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
+
+    def test_requeue_dead_jobs(self):
+        uuid = "test_requeue_dead_jobs"
+
+        queue_job = self.create_dummy_job(uuid)
+        job_obj = Job.load(self.env, queue_job.uuid)
+
+        job_obj.set_enqueued()
+        # simulate that enqueuing happened in the past
+        job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
+        job_obj.set_started()
+
+        job_obj.store()
+        self.env.cr.commit()  # pylint: disable=E8102
+
+        # requeue dead jobs using the current cursor
+        query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
+        self.env.cr.execute(query)
+
+        uuids_requeued = self.env.cr.fetchall()
+
+        self.assertEqual(len(uuids_requeued), 1)
+        self.assertEqual(uuids_requeued[0][0], uuid)
+
+        # clean up
+        queue_job.unlink()
+        self.env.cr.commit()  # pylint: disable=E8102
+
+        # because we committed the cursor, the savepoint of the test method is
+        # gone, and this would break TransactionCase cleanups
self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + + def test_requeue_dead_jobs_started_before_patch(self): + uuid = "test_requeue_dead_jobs_before_locking" + + queue_job = self.create_dummy_job(uuid) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + # simulate enqueuing was in the past + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.set_started() + # Delete the job lock to simulate job started before new implementation + self.env.cr.execute( + "DELETE FROM queue_job_lock WHERE queue_job_id=%s", (queue_job.id,) + ) + + job_obj.store() + self.env.cr.commit() # pylint: disable=E8102 + + # requeue dead jobs using current cursor + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + + self.assertEqual(len(uuids_requeued), 1) + self.assertEqual(uuids_requeued[0][0], uuid) + + # clean up + queue_job.unlink() + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)