# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import logging
import traceback
from io import StringIO

from psycopg2 import OperationalError

from odoo import _, api, models, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from odoo.addons.queue_job.controllers.main import PG_RETRY
from odoo.addons.queue_job.exception import (
    FailedJobError,
    NothingToDoJob,
    RetryableJobError,
)
from odoo.addons.queue_job.job import Job

_logger = logging.getLogger(__name__)

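# How it works: creating a job, or updating its ``state``/``eta``, registers
# one-off triggers on every ir.cron flagged as ``queue_job_runner`` (see
# ``_ensure_cron_trigger`` below). Each triggered cron runs ``_job_runner``,
# which acquires pending jobs one at a time and processes them until none
# are left.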

class QueueJob(models.Model):
    _inherit = "queue.job"

    @api.model
    def _acquire_one_job(self):
        """Acquire the next job to be run.

        :returns: queue.job record (locked for update)
        """
        # TODO: This method should respect channel priority and capacity,
        # rather than just fetching them by creation date.
        self.flush()
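        # A single pending job is selected and row-locked: ``FOR NO KEY UPDATE``
        # keeps the lock until the transaction ends, and ``SKIP LOCKED`` makes
        # concurrent runners skip rows already locked by another transaction
        # instead of blocking on them.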
        self.env.cr.execute(
            """
            SELECT id
            FROM queue_job
            WHERE state = 'pending'
            AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
            ORDER BY date_created DESC
            LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
            """
        )
        row = self.env.cr.fetchone()
        return self.browse(row and row[0])

    def _process(self, commit=False):
        """Process the job"""
        self.ensure_one()
        job = Job._load_from_db_record(self)
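        # ``Job`` is the queue_job wrapper object: it knows how to run the
        # enqueued method (``perform``) and how to write state, result and
        # retry information back to this record (``store``).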
        # Set it as started
        job.set_started()
        job.store()
        _logger.debug("%s started", job.uuid)
        # TODO: Commit the state change so that the state can be read from the UI
        # while the job is processing. However, doing this will release the
        # lock on the db, so we need to find another way.
        # if commit:
        #     self.flush()
        #     self.env.cr.commit()

        # Actual processing
        try:
            try:
                with self.env.cr.savepoint():
                    job.perform()
                job.set_done()
                job.store()
            except OperationalError as err:
                # Automatically retry the typical transaction serialization errors
                if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
                    raise
                message = tools.ustr(err.pgerror, errors="replace")
                job.postpone(result=message, seconds=PG_RETRY)
                job.set_pending(reset_retry=False)
                job.store()
                _logger.debug("%s OperationalError, postponed", job)

        except NothingToDoJob as err:
            if str(err):
                msg = str(err)
            else:
                msg = _("Job interrupted and set to Done: nothing to do.")
            job.set_done(msg)
            job.store()

        except RetryableJobError as err:
            # delay the job later, requeue
            job.postpone(result=str(err), seconds=5)
            job.set_pending(reset_retry=False)
            job.store()
            _logger.debug("%s postponed", job)

        except (FailedJobError, Exception):
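            # Any other exception: log the traceback and mark the job as
            # failed, storing the exception info on the record.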
            with StringIO() as buff:
                traceback.print_exc(file=buff)
                _logger.error(buff.getvalue())
                job.set_failed(exc_info=buff.getvalue())
                job.store()

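        # Commit after each job so its final state and result are persisted
        # before the next job is acquired (``_job_runner`` calls this method
        # with commit=True).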
        if commit:  # pragma: no cover
            self.env["base"].flush()
            self.env.cr.commit()  # pylint: disable=invalid-commit

    @api.model
    def _job_runner(self, commit=True):
        """Short-lived job runner, triggered by async crons"""
        job = self._acquire_one_job()
        while job:
            job._process(commit=commit)
            job = self._acquire_one_job()
            # TODO: If limit_time_real_cron is reached before all the jobs are done,
            # the worker will be killed abruptly.
            # Ideally, find a way to know if we're close to reaching this limit,
            # stop processing, and trigger a new execution to continue.
            #
            # if job and limit_time_real_cron_reached_or_about_to_reach:
            #     self._cron_trigger()
            #     break

    @api.model
    def _cron_trigger(self, at=None):
        """Trigger the cron job runners

        Odoo prevents a given ir.cron from running concurrently, so to support
        parallel execution we'd need (at least) as many ir.cron runner records
        as there are cron workers.

        All crons should be triggered at the same time.
        """
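        # ``ir.cron._trigger`` schedules a one-off execution of the cron,
        # either as soon as possible (at=None) or at the given datetime(s).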
        crons = self.env["ir.cron"].sudo().search([("queue_job_runner", "=", True)])
        for cron in crons:
            cron._trigger(at=at)

    def _ensure_cron_trigger(self):
        """Create cron triggers for these jobs"""
        records = self.filtered(lambda r: r.state == "pending")
        if not records:
            return
        # Trigger immediate runs
        immediate = any(not rec.eta for rec in records)
        if immediate:
            self._cron_trigger()
        # Trigger delayed eta runs
        delayed_etas = {rec.eta for rec in records if rec.eta}
        if delayed_etas:
            self._cron_trigger(at=list(delayed_etas))

    @api.model_create_multi
    def create(self, vals_list):
        # When jobs are created, also create the cron trigger
        records = super().create(vals_list)
        records._ensure_cron_trigger()
        return records

    def write(self, vals):
        # When a job state or eta changes, make sure a cron trigger is created
        res = super().write(vals)
        if "state" in vals or "eta" in vals:
            self._ensure_cron_trigger()
        return res
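
# Usage sketch (illustration only, not part of this module): with the standard
# queue_job API, enqueueing work creates a pending ``queue.job`` record, so the
# ``create()`` override above registers a cron trigger and the job is processed
# on the next tick of a ``queue_job_runner`` cron. The method name below is
# hypothetical:
#
#     self.env["res.partner"].with_delay().do_something()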