[CHG] multiprocessing: The Jobs workers are affected by the database being closed when they run in the Cron Worker process.

Thus, when OpenERP runs in multiprocess mode, the connector does not start the job worker threads.
Instead, the new script ``openerp-connector-worker`` should be used. It spawns processes
which start the job worker threads themselves.

Example of usage:
$ PYTHONPATH=/path/to/server /path/to/connector/openerp-connector-worker \
  --config /path/to/configfile \
  --workers=2 --logfile=/path/to/logfile

This is not ideal, since we now have to ensure that both OpenERP AND the script are running.
However, it still works normally when OpenERP is not using multiprocessing, and this change
allows more control over the worker processes (to implement PostgreSQL's NOTIFY, for instance).
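
As an illustration of the kind of control this opens up (a minimal sketch only, not part of
this commit; the channel name, connection string and job-handling hook are assumptions), a
dedicated worker process could block on PostgreSQL's LISTEN/NOTIFY instead of polling:

    import select

    import psycopg2
    import psycopg2.extensions

    # Assumed connection string and channel name.
    conn = psycopg2.connect("dbname=openerp_prod")
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cr = conn.cursor()
    cr.execute("LISTEN connector_jobs;")

    while True:
        # Wake up when PostgreSQL signals a new job, or after 60 seconds.
        if select.select([conn], [], [], 60) == ([], [], []):
            continue
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            # Here the worker would assign and enqueue the job whose uuid
            # is carried in notify.payload.
            print(notify.payload)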

More details in the nested history of the revision.
guewen committed Sep 12, 2013
2 parents 0961df4 + ac316d0 commit 5966096
Showing 6 changed files with 236 additions and 62 deletions.
32 changes: 32 additions & 0 deletions connector/doc/guides/multiprocessing.rst
@@ -0,0 +1,32 @@
.. _multiprocessing:


######################################
Use the connector with multiprocessing
######################################

When OpenERP is launched with 1 process, the jobs workers run as
threads in the same process.

When OpenERP is launched with multiple processes using the option
``--workers``, the jobs workers are no longer started inside the OpenERP
processes; you have to launch them separately with the script
``openerp-connector-worker`` located in the connector module.

It takes the same arguments and configuration file as the OpenERP
server.

.. important:: The Python path must contain the path to the OpenERP
   server when ``openerp-connector-worker`` is launched.

Example::

    $ PYTHONPATH=/path/to/server connector/openerp-connector-worker --config /path/to/configfile \
      --workers=2 --logfile=/path/to/logfile

The 'Enqueue Jobs' scheduled action is not needed when multiprocessing
is used, because the connector workers enqueue the jobs themselves.

.. note:: The ``openerp-connector-worker`` should not be launched
   alongside OpenERP when the latter does not run in multiprocess
   mode, because the interprocess signaling would not be done.
1 change: 1 addition & 0 deletions connector/doc/index.rst
@@ -31,6 +31,7 @@ Developer's guide

guides/overview.rst
guides/bootstrap_connector.rst
guides/multiprocessing.rst

API Reference
=============
102 changes: 102 additions & 0 deletions connector/openerp-connector-worker
@@ -0,0 +1,102 @@
#!/usr/bin/env python
import sys
import logging
import os
import signal
import time
import threading
from contextlib import closing

import openerp
from openerp.cli import server as servercli
import openerp.service.workers as workers
from openerp.modules.registry import RegistryManager
from openerp.tools import config

_logger = logging.getLogger('openerp-connector-worker')


class Multicornnector(workers.Multicorn):

    def __init__(self, app):
        super(Multicornnector, self).__init__(app)
        self.address = ('0.0.0.0', 0)
        self.population = config['workers'] or 1
        self.workers_connector = {}

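    # Keep the configured number of connector worker processes alive.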
    def process_spawn(self):
        while len(self.workers_connector) < self.population:
            self.worker_spawn(WorkerConnector, self.workers_connector)

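    # Forget a worker process once it has exited and release its resources.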
    def worker_pop(self, pid):
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_connector.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return


class WorkerConnector(workers.Worker):
    """ Worker which assigns and enqueues the connector jobs of a database """

    def __init__(self, multi):
        super(WorkerConnector, self).__init__(multi)
        self.db_index = 0

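    # Cycle through the configured databases (round robin); on each database
    # where the connector module is installed, ask the queue.worker model to
    # assign and enqueue up to 50 pending jobs.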
    def process_work(self):
        if config['db_name']:
            db_names = config['db_name'].split(',')
        else:
            services = openerp.netsvc.ExportService._services
            if services.get('db'):
                db_names = services['db'].exp_list(True)
            else:
                db_names = []
        if len(db_names):
            self.db_index = (self.db_index + 1) % len(db_names)
            db_name = db_names[self.db_index]
            self.setproctitle(db_name)
            db = openerp.sql_db.db_connect(db_name)
            threading.current_thread().dbname = db_name
            with closing(db.cursor()) as cr:
                cr.execute("SELECT 1 FROM ir_module_module "
                           "WHERE name = %s "
                           "AND state = %s", ('connector', 'installed'))
                if cr.fetchone():
                    RegistryManager.check_registry_signaling(db_name)
                    registry = openerp.pooler.get_pool(db_name)
                    if registry:
                        queue_worker = registry['queue.worker']
                        queue_worker.assign_then_enqueue(cr,
                                                         openerp.SUPERUSER_ID,
                                                         max_jobs=50)
                    RegistryManager.signal_caches_change(db_name)
        else:
            self.db_index = 0

    def sleep(self):
        # Really sleep once all the databases have been processed.
        if self.db_index == 0:
            interval = 15 + self.pid % self.multi.population  # chorus effect
            time.sleep(interval)

    def start(self):
        workers.Worker.start(self)
        openerp.service.start_internal()


if __name__ == "__main__":
    args = sys.argv[1:]
    servercli.check_root_user()
    config.parse_config(args)

    servercli.check_postgres_user()
    openerp.netsvc.init_logger()
    servercli.report_configuration()

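    # Behave like an OpenERP multi-process worker: openerp.multi_process
    # enables the database-based registry/cache signaling used in
    # process_work() above; worker_connector is presumably a flag the
    # connector checks to know jobs run in this dedicated script (assumption).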
    openerp.multi_process = True
    openerp.worker_connector = True
    Multicornnector(openerp.service.wsgi_server.application).run()
56 changes: 28 additions & 28 deletions connector/queue/model.py
@@ -30,6 +30,7 @@
from .job import STATES, DONE, PENDING, OpenERPJobStorage
from .worker import WORKER_TIMEOUT
from ..session import ConnectorSession
from .worker import watcher

_logger = logging.getLogger(__name__)

@@ -180,7 +181,6 @@ class QueueWorker(orm.Model):
_rec_name = 'uuid'

worker_timeout = WORKER_TIMEOUT
_worker = None

_columns = {
'uuid': fields.char('UUID', readonly=True, select=True, required=True),
@@ -189,7 +189,7 @@
'date_alive': fields.datetime('Last Alive Check', readonly=True),
'job_ids': fields.one2many('queue.job', 'worker_id',
string='Jobs', readonly=True),
}
}

def _notify_alive(self, cr, uid, worker, context=None):
worker_ids = self.search(cr, uid,
@@ -204,7 +204,6 @@ def _notify_alive(self, cr, uid, worker, context=None):
'date_start': now_fmt,
'date_alive': now_fmt},
context=context)
self._worker = worker
else:
self.write(cr, uid, worker_ids,
{'date_alive': now_fmt}, context=context)
@@ -218,23 +217,16 @@ def _purge_dead_workers(self, cr, uid, context=None):
dead_workers = self.read(cr, uid, dead_ids, ['uuid'], context=context)
for worker in dead_workers:
_logger.debug('Worker %s is dead', worker['uuid'])
# exists in self._workers only for the same process and pool
if worker['uuid'] == self._worker:
_logger.error('Worker %s should be alive, '
'but appears to be dead.',
worker['uuid'])
self._worker = None
try:
self.unlink(cr, uid, dead_ids, context=context)
except Exception:
_logger.debug("Failed attempt to unlink a dead worker, likely due "
"to another transaction in progress. "
"Trace of the failed unlink "
"%s attempt: ", self._worker.uuid, exc_info=True)
"to another transaction in progress.")

def _worker_id(self, cr, uid, context=None):
assert self._worker
worker_ids = self.search(cr, uid, [('uuid', '=', self._worker.uuid)],
worker = watcher.worker_for_db(cr.dbname)
assert worker
worker_ids = self.search(cr, uid, [('uuid', '=', worker.uuid)],
context=context)
assert len(worker_ids) == 1, ("%s worker found in database instead "
"of 1" % len(worker_ids))
@@ -268,7 +260,8 @@ def assign_jobs(self, cr, uid, max_jobs=None, context=None):
:param max_jobs: maximal limit of jobs to assign on a worker
:type max_jobs: int
"""
if self._worker:
worker = watcher.worker_for_db(cr.dbname)
if worker:
self._assign_jobs(cr, uid, max_jobs=max_jobs, context=context)
else:
_logger.debug('No worker started for process %s', os.getpid())
@@ -278,7 +271,8 @@ def enqueue_jobs(self, cr, uid, context=None):
""" Enqueue all the jobs assigned to the worker of the current
process
"""
if self._worker:
worker = watcher.worker_for_db(cr.dbname)
if worker:
self._enqueue_jobs(cr, uid, context=context)
else:
_logger.debug('No worker started for process %s', os.getpid())
@@ -295,6 +289,7 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None):
# use a SAVEPOINT to be able to rollback this part of the
# transaction without failing the whole transaction if the LOCK
# cannot be acquired
worker = watcher.worker_for_db(cr.dbname)
cr.execute("SAVEPOINT queue_assign_jobs")
try:
cr.execute(sql, log_exceptions=False)
@@ -306,34 +301,39 @@ def _assign_jobs(self, cr, uid, max_jobs=None, context=None):
_logger.debug("Failed attempt to assign jobs, likely due to "
"another transaction in progress. "
"Trace of the failed assignment of jobs on worker "
"%s attempt: ", self._worker.uuid, exc_info=True)
"%s attempt: ", worker.uuid, exc_info=True)
return
job_rows = cr.fetchall()
if not job_rows:
_logger.debug('No job to assign to worker %s', self._worker.uuid)
_logger.debug('No job to assign to worker %s', worker.uuid)
return
job_ids = [id for id, in job_rows]

worker_id = self._worker_id(cr, uid, context=context)
_logger.debug('Assign %d jobs to worker %s', len(job_ids),
self._worker.uuid)
worker.uuid)
# ready to be enqueued in the worker
try:
self.pool.get('queue.job').write(cr, uid, job_ids,
{'state': 'pending',
'worker_id': worker_id},
context=context)
{'state': 'pending',
'worker_id': worker_id},
context=context)
except Exception:
pass # will be assigned to another worker

def _enqueue_jobs(self, cr, uid, context=None):
""" Called by an ir.cron, add to the queue all the jobs not
already queued"""
""" Add to the queue of the worker all the jobs not
yet queued but already assigned."""
job_obj = self.pool.get('queue.job')
db_worker_id = self._worker_id(cr, uid, context=context)
db_worker = self.browse(cr, uid, db_worker_id, context=context)
for job in db_worker.job_ids:
if job.state == 'pending':
self._worker.enqueue_job_uuid(job.uuid)
job_ids = job_obj.search(cr, uid,
[('worker_id', '=', db_worker_id),
('state', '=', 'pending')],
context=context)
worker = watcher.worker_for_db(cr.dbname)
jobs = job_obj.read(cr, uid, job_ids, ['uuid'], context=context)
for job in jobs:
worker.enqueue_job_uuid(job['uuid'])


class requeue_job(orm.TransientModel):
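
The diff of ``connector/queue/worker.py``, where the new ``watcher`` imported by
``model.py`` is defined, is not shown above. Judging from the calls to
``watcher.worker_for_db(cr.dbname)`` in the hunks above, it presumably keeps track of the
running job worker per database; a minimal sketch of such a registry, under that assumption
only (method names and locking are illustrative, not the actual connector code), could be:

    import threading


    class WorkerWatcher(object):
        """ Registry of the job worker running for each database. """

        def __init__(self):
            self._workers = {}  # db_name -> worker instance
            self._lock = threading.Lock()

        def worker_registered(self, worker, db_name):
            with self._lock:
                self._workers[db_name] = worker

        def worker_unregistered(self, db_name):
            with self._lock:
                self._workers.pop(db_name, None)

        def worker_for_db(self, db_name):
            # Return the worker for this database, or None if none is running.
            return self._workers.get(db_name)


    watcher = WorkerWatcher()  # module-level singleton imported by model.py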