-
-
Notifications
You must be signed in to change notification settings - Fork 528
[15.0] [IMP] Automatically requeue dead jobs #796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
feketemihai
wants to merge
6
commits into
OCA:15.0
Choose a base branch
from
NextERP-Romania:15.0-remove_dead_jobs
base: 15.0
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
177f398
[IMP] queue_job: tweak comment and warning message
sbidoul 303d347
[IMP] queue_job: use queue_job_lock model
sbidoul 8319329
[IMP] queue_job: remove cron garbage collector and automatically requ…
AnizR e673218
Update code to allow jobs requeing.
feketemihai 82fff58
Update code to allow jobs requeing.
feketemihai bb5c144
Add requeue automatically of previous enqueued jobs
feketemihai File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,22 +114,6 @@ | |
| * After creating a new database or installing queue_job on an | ||
| existing database, Odoo must be restarted for the runner to detect it. | ||
|
|
||
| * When Odoo shuts down normally, it waits for running jobs to finish. | ||
| However, when the Odoo server crashes or is otherwise force-stopped, | ||
| running jobs are interrupted while the runner has no chance to know | ||
| they have been aborted. In such situations, jobs may remain in | ||
| ``started`` or ``enqueued`` state after the Odoo server is halted. | ||
| Since the runner has no way to know if they are actually running or | ||
| not, and does not know for sure if it is safe to restart the jobs, | ||
| it does not attempt to restart them automatically. Such stale jobs | ||
| therefore fill the running queue and prevent other jobs to start. | ||
| You must therefore requeue them manually, either from the Jobs view, | ||
| or by running the following SQL statement *before starting Odoo*: | ||
|
|
||
| .. code-block:: sql | ||
|
|
||
| update queue_job set state='pending' where state in ('started', 'enqueued') | ||
|
|
||
| .. rubric:: Footnotes | ||
|
|
||
| .. [1] From a security standpoint, it is safe to have an anonymous HTTP | ||
|
|
@@ -207,35 +191,14 @@ def _connection_info_for(db_name): | |
|
|
||
|
|
||
| def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): | ||
| # Method to set failed job (due to timeout, etc) as pending, | ||
| # to avoid keeping it as enqueued. | ||
| def set_job_pending(): | ||
| connection_info = _connection_info_for(db_name) | ||
| conn = psycopg2.connect(**connection_info) | ||
| conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) | ||
| with closing(conn.cursor()) as cr: | ||
| cr.execute( | ||
| "UPDATE queue_job SET state=%s, " | ||
| "date_enqueued=NULL, date_started=NULL " | ||
| "WHERE uuid=%s and state=%s " | ||
| "RETURNING uuid", | ||
| (PENDING, job_uuid, ENQUEUED), | ||
| ) | ||
| if cr.fetchone(): | ||
| _logger.warning( | ||
| "state of job %s was reset from %s to %s", | ||
| job_uuid, | ||
| ENQUEUED, | ||
| PENDING, | ||
| ) | ||
|
|
||
| # TODO: better way to HTTP GET asynchronously (grequest, ...)? | ||
| # if this was python3 I would be doing this with | ||
| # asyncio, aiohttp and aiopg | ||
| def urlopen(): | ||
| url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format( | ||
| scheme, host, port, db_name, job_uuid | ||
| ) | ||
| # pylint: disable=except-pass | ||
| try: | ||
| auth = None | ||
| if user: | ||
|
|
@@ -249,10 +212,10 @@ def urlopen(): | |
| # for codes between 500 and 600 | ||
| response.raise_for_status() | ||
| except requests.Timeout: | ||
| set_job_pending() | ||
| # A timeout is a normal behaviour, it shouldn't be logged as an exception | ||
| pass | ||
| except Exception: | ||
| _logger.exception("exception in GET %s", url) | ||
| set_job_pending() | ||
|
|
||
| thread = threading.Thread(target=urlopen) | ||
| thread.daemon = True | ||
|
|
@@ -333,6 +296,13 @@ def keep_alive(self): | |
| with closing(self.conn.cursor()) as cr: | ||
| cr.execute(query) | ||
|
|
||
| def set_job_pending(self, uuid): | ||
| with closing(self.conn.cursor()) as cr: | ||
| cr.execute( | ||
| "UPDATE queue_job SET state=%s, " "date_enqueued=NULL " "WHERE uuid=%s", | ||
| (PENDING, uuid), | ||
| ) | ||
|
|
||
| def set_job_enqueued(self, uuid): | ||
| with closing(self.conn.cursor()) as cr: | ||
| cr.execute( | ||
|
|
@@ -343,8 +313,104 @@ def set_job_enqueued(self, uuid): | |
| (ENQUEUED, uuid), | ||
| ) | ||
|
|
||
| def _query_requeue_dead_jobs(self): | ||
| return """ | ||
| UPDATE | ||
| queue_job | ||
| SET | ||
| state=( | ||
| CASE | ||
| WHEN | ||
| max_retries IS NOT NULL AND | ||
| retry IS NOT NULL AND | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would cherry pick: #816 that fixes the case of having max_retries = 0 |
||
| retry>max_retries | ||
| THEN 'failed' | ||
| ELSE 'pending' | ||
| END), | ||
| retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END), | ||
| exc_name=( | ||
| CASE | ||
| WHEN | ||
| max_retries IS NOT NULL AND | ||
| retry IS NOT NULL AND | ||
| retry>max_retries | ||
| THEN 'JobFoundDead' | ||
| ELSE exc_name | ||
| END), | ||
| exc_info=( | ||
| CASE | ||
| WHEN | ||
| max_retries IS NOT NULL AND | ||
| retry IS NOT NULL AND | ||
| retry>max_retries | ||
| THEN 'Job found dead after too many retries' | ||
| ELSE exc_info | ||
| END) | ||
| WHERE | ||
| id in ( | ||
| SELECT | ||
| queue_job_id | ||
| FROM | ||
| queue_job_lock | ||
| WHERE | ||
| queue_job_id in ( | ||
| SELECT | ||
| id | ||
| FROM | ||
| queue_job | ||
| WHERE | ||
| state IN ('enqueued','started') | ||
| AND date_enqueued < | ||
| (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') | ||
| ) | ||
| FOR UPDATE SKIP LOCKED | ||
| ) | ||
| OR | ||
| id in ( | ||
| SELECT | ||
| id | ||
| FROM | ||
| queue_job | ||
| WHERE | ||
| state = 'started' AND NOT EXISTS ( | ||
| SELECT 1 FROM queue_job_lock | ||
| WHERE queue_job_id = queue_job.id | ||
| ) | ||
| FOR UPDATE SKIP LOCKED | ||
| ) | ||
| RETURNING uuid | ||
| """ | ||
|
|
||
| class QueueJobRunner(object): | ||
| def requeue_dead_jobs(self): | ||
| """ | ||
| Set started and enqueued jobs but not locked to pending | ||
|
|
||
| A job is locked when it's being executed | ||
| When a job is killed, it releases the lock | ||
|
|
||
| If the number of retries exceeds the number of max retries, | ||
| the job is set as 'failed' with the error 'JobFoundDead'. | ||
|
|
||
| Adding a buffer on 'date_enqueued' to check | ||
| that it has been enqueued for more than 10sec. | ||
| This prevents from requeuing jobs before they are actually started. | ||
|
|
||
| When Odoo shuts down normally, it waits for running jobs to finish. | ||
| However, when the Odoo server crashes or is otherwise force-stopped, | ||
| running jobs are interrupted while the runner has no chance to know | ||
| they have been aborted. | ||
| """ | ||
|
|
||
| with closing(self.conn.cursor()) as cr: | ||
| query = self._query_requeue_dead_jobs() | ||
|
|
||
| cr.execute(query) | ||
|
|
||
| for (uuid,) in cr.fetchall(): | ||
| _logger.warning("Re-queued dead job with uuid: %s", uuid) | ||
|
|
||
|
|
||
| class QueueJobRunner: | ||
| def __init__( | ||
| self, | ||
| scheme="http", | ||
|
|
@@ -421,9 +487,26 @@ def initialize_databases(self): | |
| self.db_by_name[db_name] = db | ||
| with db.select_jobs("state in %s", (NOT_DONE,)) as cr: | ||
| for job_data in cr: | ||
| # In case we have enqueued jobs we move them to pending, | ||
| # otherwise they remain enqueued and occupy channels slots. | ||
| if job_data[6] == "enqueued": | ||
| try: | ||
| self.db_by_name[db_name].set_job_pending(job_data[1]) | ||
| job_data = (*job_data[:6], "pending") | ||
| except Exception: | ||
| _logger.warning( | ||
| "error setting job %s to pending", | ||
| job_data[1], | ||
| exc_info=True, | ||
| ) | ||
|
Comment on lines
+490
to
+501
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain why do you do this? |
||
| self.channel_manager.notify(db_name, *job_data) | ||
| _logger.info("queue job runner ready for db %s", db_name) | ||
|
|
||
| def requeue_dead_jobs(self): | ||
| for db in self.db_by_name.values(): | ||
| if db.has_queue_job: | ||
| db.requeue_dead_jobs() | ||
|
|
||
| def run_jobs(self): | ||
| now = _odoo_now() | ||
| for job in self.channel_manager.get_jobs_to_run(now): | ||
|
|
@@ -516,6 +599,7 @@ def run(self): | |
| _logger.info("database connections ready") | ||
| # inner loop does the normal processing | ||
| while not self._stop: | ||
| self.requeue_dead_jobs() | ||
| self.process_notifications() | ||
| self.run_jobs() | ||
| self.wait_notification() | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) | ||
|
|
||
|
|
||
| def migrate(cr, version): | ||
| # Deactivate cron garbage collector | ||
| cr.execute( | ||
| """ | ||
| UPDATE | ||
| ir_cron | ||
| SET | ||
| active=False | ||
| WHERE id IN ( | ||
| SELECT res_id | ||
| FROM | ||
| ir_model_data | ||
| WHERE | ||
| module='queue_job' | ||
| AND model='ir.cron' | ||
| AND name='ir_cron_queue_job_garbage_collector' | ||
| ); | ||
| """ | ||
| ) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that I can be removed