diff --git a/README.rst b/README.rst index a924b83a..49a34218 100644 --- a/README.rst +++ b/README.rst @@ -155,6 +155,18 @@ with the path to your queue class:: python manage.py rqworker high default low --queue-class 'path.to.CustomQueue' +Queues cleanup +--------------- +django_rq provides a management command that flushes the queue specified as argument:: + + python manage.py rqflush --queue high + +If you don't specify any queue it will flush the default + +You can suppress confirmation message if you use the option --noinput + + python manage.py rqflush --queue high --noinput + Support for RQ Scheduler ------------------------ diff --git a/django_rq/management/commands/rqclean.py b/django_rq/management/commands/rqclean.py new file mode 100644 index 00000000..ac6e0659 --- /dev/null +++ b/django_rq/management/commands/rqclean.py @@ -0,0 +1,25 @@ +from django.core.management.base import BaseCommand + +from django_rq import get_queue + + +class Command(BaseCommand): + """ + Removes all queue jobs + """ + help = __doc__ + + def add_arguments(self, parser): + parser.add_argument('--queue', '-q', dest='queue', default='default', + help='Specify the queue [default]') + + def handle(self, *args, **options): + """ + Queues the function given with the first argument with the + parameters given with the rest of the argument list. + """ + verbosity = int(options.get('verbosity', 1)) + queue = get_queue(options.get('queue')) + queue.empty() + if verbosity: + print('Queue "%s" cleaned' % queue.name) diff --git a/django_rq/management/commands/rqflush.py b/django_rq/management/commands/rqflush.py new file mode 100644 index 00000000..91296171 --- /dev/null +++ b/django_rq/management/commands/rqflush.py @@ -0,0 +1,42 @@ +from django.core.management.base import BaseCommand + +from django.utils.six.moves import input + +from django_rq import get_queue + + +class Command(BaseCommand): + """ + Flushes the queue specified as argument + """ + help = __doc__ + + def add_arguments(self, parser): + parser.add_argument( + '--noinput', '--no-input', action='store_false', + dest='interactive', default=True, + help='Tells Django to NOT prompt the user for input of any kind.', + ) + parser.add_argument('--queue', '-q', dest='queue', default='default', + help='Specify the queue [default]') + + def handle(self, *args, **options): + verbosity = int(options.get('verbosity', 1)) + interactive = options['interactive'] + queue = get_queue(options.get('queue')) + + if interactive: + confirm = input("""You have requested a flush the "%s" queue. +Are you sure you want to do this? + + Type 'yes' to continue, or 'no' to cancel: """ % queue.name) + else: + confirm = 'yes' + + if confirm == 'yes': + queue.empty() + if verbosity: + print('Queue "%s" flushed.' % queue.name) + else: + if verbosity: + print("Flush cancelled.") diff --git a/django_rq/management/commands/rqworker.py b/django_rq/management/commands/rqworker.py index f70a8336..ab2df02f 100644 --- a/django_rq/management/commands/rqworker.py +++ b/django_rq/management/commands/rqworker.py @@ -5,6 +5,7 @@ import sys from django.core.management.base import BaseCommand +from django.db import connections from django.utils.version import get_version from django_rq.queues import get_queues @@ -34,6 +35,11 @@ def import_attribute(name): return getattr(module, attribute) +def reset_db_connections(): + for c in connections.all(): + c.close() + + class Command(BaseCommand): """ Runs RQ workers on specified queues. Note that all queues passed into a @@ -84,6 +90,8 @@ def handle(self, *args, **options): # Call use_connection to push the redis connection into LocalStack # without this, jobs using RQ's get_current_job() will fail use_connection(w.connection) + # Close any opened DB connection before any fork + reset_db_connections() w.work(burst=options.get('burst', False)) except ConnectionError as e: print(e) diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 670876f2..5c044b98 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -1,6 +1,9 @@ +import sys + from django.contrib.auth.models import User from django.core.management import call_command from django.core.urlresolvers import reverse +from django.db import transaction from django.test import TestCase try: from unittest import skipIf @@ -11,6 +14,7 @@ from django.test import override_settings except ImportError: from django.test.utils import override_settings +from django.utils.six import StringIO from django.conf import settings from rq import get_current_job, Queue @@ -78,6 +82,16 @@ def get_queue_index(name='default'): return queue_index +def stub_stdin(testcase_inst, inputs): + stdin = sys.stdin + + def cleanup(): + sys.stdin = stdin + + testcase_inst.addCleanup(cleanup) + sys.stdin = StringIO(inputs) + + @override_settings(RQ={'AUTOCOMMIT': True}) class QueuesTest(TestCase): @@ -305,6 +319,32 @@ def test_default_timeout(self): queue = get_queue('test1') self.assertEqual(queue._default_timeout, 400) + def test_rqflush_default(self): + queue = get_queue() + queue.enqueue(divide, 42, 1) + call_command("rqflush", "--verbosity", "0", "--noinput") + self.assertFalse(queue.jobs) + + def test_rqflush_test3(self): + queue = get_queue("test3") + queue.enqueue(divide, 42, 1) + call_command("rqflush", "--queue", "test3", "--verbosity", "0", "--noinput") + self.assertFalse(queue.jobs) + + def test_rqflush_test3_interactive_yes(self): + queue = get_queue("test3") + queue.enqueue(divide, 42, 1) + stub_stdin(self, "yes") + call_command("rqflush", "--queue", "test3", "--verbosity", "0") + self.assertFalse(queue.jobs) + + def test_rqflush_test3_interactive_no(self): + queue = get_queue("test3") + queue.enqueue(divide, 42, 1) + stub_stdin(self, "no") + call_command("rqflush", "--queue", "test3", "--verbosity", "0") + self.assertTrue(queue.jobs) + @override_settings(RQ={'AUTOCOMMIT': True}) class DecoratorTest(TestCase): @@ -580,6 +620,73 @@ def test_error(self): self.assertEqual(queue.count, 0) +class ThreadQueueWithTransactionAtomicTest(TestCase): + + @override_settings(RQ={'AUTOCOMMIT': True}) + def test_enqueue_autocommit_on(self): + """ + Running ``enqueue`` when AUTOCOMMIT is on should + immediately persist job into Redis. + """ + queue = get_queue() + queue.empty() + with transaction.atomic(): + job = queue.enqueue(divide, 1, 1) + self.assertTrue(job.id in queue.job_ids) + job.delete() + + @override_settings(RQ={'AUTOCOMMIT': False}) + def test_enqueue_autocommit_off(self): + """ + Running ``enqueue`` when AUTOCOMMIT is off should + puts the job in the delayed queue but ... + """ + thread_queue.clear() + queue = get_queue() + queue.empty() + with transaction.atomic(): + queue.enqueue(divide, 1, 1) + + # the following call SHOULDN'T BE necessary + # it should be called by an on_commit hook + # https://docs.djangoproject.com/en/1.10/topics/db/transactions/#django.db.transaction.on_commit + thread_queue.commit() + + job = queue.dequeue() + self.assertTrue(job) + self.assertTrue(job.func.func_name, "divide") + job.delete() + self.assertFalse(queue.dequeue()) + + @override_settings(RQ={'AUTOCOMMIT': False}) + def test_enqueue_autocommit_offand_db_error(self): + """ + Running ``enqueue`` when AUTOCOMMIT is off should + puts the job in the delayed queue only if dba transaction succedes + """ + thread_queue.clear() + queue = get_queue() + queue.empty() + + try: + with transaction.atomic(): + queue.enqueue(divide, 1, 1) + # something went wrong on DB + assert False + except AssertionError: + # the following call SHOULDN'T BE necessary + # but if you don't make it, the final situation would be inconsistent: + # DB transaction has failed but job is enqueued + thread_queue.clear() + + # the following call SHOULDN'T BE necessary + thread_queue.commit() + + job = queue.dequeue() + self.assertFalse(job) + self.assertFalse(queue.dequeue()) + + class SchedulerTest(TestCase): @skipIf(RQ_SCHEDULER_INSTALLED is False, 'RQ Scheduler not installed') diff --git a/integration_test/README.md b/integration_test/README.md new file mode 100644 index 00000000..632f7710 --- /dev/null +++ b/integration_test/README.md @@ -0,0 +1,29 @@ +A sample project to test rqworker and site interraction + +## Prerequisites + +Install PostgreSQL + + sudo apt-get install postgresql + +Create user and database + + sudo -u postgres psql + # drop database djangorqdb; + # drop user djangorqusr; + # create user djangorqusr with createrole superuser password 'djangorqusr'; + # create database djangorqdb owner djangorqusr; + +Init database schema + + ./manage.py migrate + +Install required packages: + + pip install -r requirements.txt + +## Test + +To run tests: + + python _test.py \ No newline at end of file diff --git a/integration_test/_tests.py b/integration_test/_tests.py new file mode 100644 index 00000000..0f916f10 --- /dev/null +++ b/integration_test/_tests.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import absolute_import + +import logging +import os +import signal +import subprocess +import sys +import time +import unittest + +from django.conf import settings +import psycopg2 +import requests +from six.moves.urllib.parse import urlunsplit + +DJANGO_SETTINGS_MODULE = "integration_test.settings" +os.environ.setdefault("DJANGO_SETTINGS_MODULE", DJANGO_SETTINGS_MODULE) + +logger = logging.getLogger(__name__) +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + + +class Process(object): + @staticmethod + def _command(args): + return list(args) + + @classmethod + def run(cls, *args): + subprocess.check_call(cls._command(args)) + + def __init__(self, *args): + self.args = list(args) + + def start(self): + self.process = subprocess.Popen(self._command(self.args), preexec_fn=os.setsid) + logger.info("START PROCESS args:{} pid:{}".format(self.args, self.process.pid)) + time.sleep(1) + + def stop(self): + # to be sure we kill all the children: + os.killpg(self.process.pid, signal.SIGTERM) + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.stop() + + +class DjangoCommand(Process): + @staticmethod + def _command(args): + return ["./manage.py"] + list(args) + ["--settings", DJANGO_SETTINGS_MODULE] + + +def terminate_all_postgres_connections(profile="default"): + db_settings = settings.DATABASES[profile] + conn_params = { + 'database': 'template1', + 'user': db_settings["USER"], + 'password': db_settings["PASSWORD"], + 'host': db_settings["HOST"], + 'port': db_settings["PORT"], + } + with psycopg2.connect(**conn_params) as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT pg_terminate_backend(pg_stat_activity.pid) + FROM pg_stat_activity + WHERE pg_stat_activity.datname = %s + """, (db_settings["NAME"], )) + + +class IntegrationTest(unittest.TestCase): + ADDRPORT = "127.0.0.1:8000" + HOME_URL = urlunsplit(("http", ADDRPORT, "/", "", "")) + + def setUp(self): + DjangoCommand.run("flush", "--noinput") + # self.site = DjangoCommand("runserver", self.ADDRPORT) + self.site = Process( + "gunicorn", "-b", self.ADDRPORT, + "--timeout", "600", # usefull for worker debugging + "integration_test.wsgi:application") + self.site.start() + + def tearDown(self): + self.site.stop() + + def assertFailure(self): + r = requests.get(self.HOME_URL) + self.assertEqual(r.status_code, 500) + + def assertEntries(self, expected): + r = requests.get(self.HOME_URL) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, "Entries: {}".format(",".join(expected))) + + def enqueue(self, name): + r = requests.post(self.HOME_URL, {"name": name}) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, "Enqueued") + + def test_db_is_empty(self): + self.assertEntries([]) + + def test_burst(self): + self.enqueue("first") + DjangoCommand.run("rqworker", "--burst") + self.assertEntries(["first"]) + + def test_site_fails_and_the_reconnects(self): + self.enqueue("first") + DjangoCommand.run("rqworker", "--burst") + + terminate_all_postgres_connections() + + # the DB connection is gone, so the worker must first detect the problem: + self.assertFailure() + # now the gunicorn worker is ok again: + self.assertEntries(["first"]) + + def test_worker_lost_connection(self): + with DjangoCommand("rqworker") as worker: + self.enqueue("first") + time.sleep(2) # wait for the worker to do the job + self.assertEntries(["first"]) # job is done + + terminate_all_postgres_connections() + + self.enqueue("second") + time.sleep(2) # wait for the worker to do the job + + self.assertFailure() # let the gunicorn worker reconnect + self.assertEntries(["first", "second"]) # work is done + + +if __name__ == '__main__': + unittest.main() diff --git a/integration_test/integration_app/__init__.py b/integration_test/integration_app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/integration_test/integration_app/admin.py b/integration_test/integration_app/admin.py new file mode 100644 index 00000000..8c38f3f3 --- /dev/null +++ b/integration_test/integration_app/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/integration_test/integration_app/apps.py b/integration_test/integration_app/apps.py new file mode 100644 index 00000000..a7352cf1 --- /dev/null +++ b/integration_test/integration_app/apps.py @@ -0,0 +1,7 @@ +from __future__ import unicode_literals + +from django.apps import AppConfig + + +class IntegrationAppConfig(AppConfig): + name = 'integration_app' diff --git a/integration_test/integration_app/migrations/0001_initial.py b/integration_test/integration_app/migrations/0001_initial.py new file mode 100644 index 00000000..eddef2d3 --- /dev/null +++ b/integration_test/integration_app/migrations/0001_initial.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.5 on 2017-02-09 15:24 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='MyModel', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.TextField(unique=True)), + ], + ), + ] diff --git a/integration_test/integration_app/migrations/__init__.py b/integration_test/integration_app/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/integration_test/integration_app/models.py b/integration_test/integration_app/models.py new file mode 100644 index 00000000..c6b5869d --- /dev/null +++ b/integration_test/integration_app/models.py @@ -0,0 +1,15 @@ +from django.db import models + + +class MyModel(models.Model): + name = models.TextField(unique=True) + + +def add_mymodel(name): + m = MyModel(name=name) + m.save() + + +# causes a DB connection at import-time +# see TestIntegration.test_worker_lost_connection +list(MyModel.objects.all()) diff --git a/integration_test/integration_app/tests.py b/integration_test/integration_app/tests.py new file mode 100644 index 00000000..7ce503c2 --- /dev/null +++ b/integration_test/integration_app/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/integration_test/integration_app/views.py b/integration_test/integration_app/views.py new file mode 100644 index 00000000..300044df --- /dev/null +++ b/integration_test/integration_app/views.py @@ -0,0 +1,16 @@ +from django.http import HttpResponse +from django.views.decorators.csrf import csrf_exempt + +from .models import * + +import django_rq + + +@csrf_exempt +def home(request): + if request.method == 'POST': + django_rq.enqueue(add_mymodel, request.POST["name"]) + return HttpResponse("Enqueued") + names = [m.name for m in MyModel.objects.order_by("name")] + return HttpResponse("Entries: {}".format(",".join(names))) + diff --git a/integration_test/integration_test/__init__.py b/integration_test/integration_test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/integration_test/integration_test/settings.py b/integration_test/integration_test/settings.py new file mode 100644 index 00000000..4e243b83 --- /dev/null +++ b/integration_test/integration_test/settings.py @@ -0,0 +1,136 @@ +""" +Django settings for integration_test project. + +Generated by 'django-admin startproject' using Django 1.10.5. + +For more information on this file, see +https://docs.djangoproject.com/en/1.10/topics/settings/ + +For the full list of settings and their values, see +https://docs.djangoproject.com/en/1.10/ref/settings/ +""" + +import os + +# Build paths inside the project like this: os.path.join(BASE_DIR, ...) +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + +# Quick-start development settings - unsuitable for production +# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/ + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = '!s1kl4g@+13igo3-&47f4+5-zfj!3j&n*sw$32@m%d65*muwni' + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = True + +ALLOWED_HOSTS = [] + + +# Application definition + +INSTALLED_APPS = [ + 'django.contrib.admin', + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.messages', + 'django.contrib.staticfiles', + 'django_rq', + 'integration_app', +] + +MIDDLEWARE = [ + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', +] + +ROOT_URLCONF = 'integration_test.urls' + +TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.debug', + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, +] + +WSGI_APPLICATION = 'integration_test.wsgi.application' + + +# Database +# https://docs.djangoproject.com/en/1.10/ref/settings/#databases + +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.postgresql_psycopg2', + 'NAME': 'djangorqdb', + 'USER': 'djangorqusr', + 'PASSWORD': 'djangorqusr', + 'HOST': 'localhost', + 'PORT': '5432', + 'CONN_MAX_AGE': 10 * 60, + 'ATOMIC_REQUESTS': True, + }, +} + + +# Password validation +# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators + +AUTH_PASSWORD_VALIDATORS = [ + { + 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', + }, +] + + +# Internationalization +# https://docs.djangoproject.com/en/1.10/topics/i18n/ + +LANGUAGE_CODE = 'en-us' + +TIME_ZONE = 'UTC' + +USE_I18N = True + +USE_L10N = True + +USE_TZ = True + + +# Static files (CSS, JavaScript, Images) +# https://docs.djangoproject.com/en/1.10/howto/static-files/ + +STATIC_URL = '/static/' + +RQ_QUEUES = { + 'default': { + 'HOST': 'localhost', + 'PORT': 6379, + 'DB': 0, + } +} diff --git a/integration_test/integration_test/urls.py b/integration_test/integration_test/urls.py new file mode 100644 index 00000000..d22b442f --- /dev/null +++ b/integration_test/integration_test/urls.py @@ -0,0 +1,23 @@ +"""integration_test URL Configuration + +The `urlpatterns` list routes URLs to views. For more information please see: + https://docs.djangoproject.com/en/1.10/topics/http/urls/ +Examples: +Function views + 1. Add an import: from my_app import views + 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home') +Class-based views + 1. Add an import: from other_app.views import Home + 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home') +Including another URLconf + 1. Import the include() function: from django.conf.urls import url, include + 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) +""" +from django.conf.urls import url +from django.contrib import admin +from integration_app import views + +urlpatterns = [ + url(r'^$', views.home, name='home'), + url(r'^admin/', admin.site.urls), +] diff --git a/integration_test/integration_test/wsgi.py b/integration_test/integration_test/wsgi.py new file mode 100644 index 00000000..a90cef97 --- /dev/null +++ b/integration_test/integration_test/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for integration_test project. + +It exposes the WSGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "integration_test.settings") + +application = get_wsgi_application() diff --git a/integration_test/manage.py b/integration_test/manage.py new file mode 100755 index 00000000..d3f36496 --- /dev/null +++ b/integration_test/manage.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +import os +import sys + +if __name__ == "__main__": + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "integration_test.settings") + try: + from django.core.management import execute_from_command_line + except ImportError: + # The above import may fail for some other reason. Ensure that the + # issue is really that Django is missing to avoid masking other + # exceptions on Python 2. + try: + import django + except ImportError: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?" + ) + raise + execute_from_command_line(sys.argv) diff --git a/integration_test/requirements.txt b/integration_test/requirements.txt new file mode 100644 index 00000000..91243393 --- /dev/null +++ b/integration_test/requirements.txt @@ -0,0 +1,5 @@ +-e .. +Django==1.10.5 +gunicorn==19.6.0 +psycopg2==2.6.2 +requests==2.13.0