diff --git a/tenant_schemas/management/commands/__init__.py b/tenant_schemas/management/commands/__init__.py index c3c28edd..c8059437 100644 --- a/tenant_schemas/management/commands/__init__.py +++ b/tenant_schemas/management/commands/__init__.py @@ -150,7 +150,7 @@ def add_arguments(self, parser): 'migration. Use the name "zero" to unapply all migrations.')) parser.add_argument("-s", "--schema", dest="schema_name") parser.add_argument('--executor', action='store', dest='executor', default=None, - help='Executor for running migrations [standard (default)|parallel]') + help='Executor for running migrations [standard (default)|parallel|async]') def handle(self, *args, **options): self.sync_tenant = options.get('tenant') diff --git a/tenant_schemas/migration_executors/__init__.py b/tenant_schemas/migration_executors/__init__.py index b83db186..524a55bc 100644 --- a/tenant_schemas/migration_executors/__init__.py +++ b/tenant_schemas/migration_executors/__init__.py @@ -3,6 +3,7 @@ from tenant_schemas.migration_executors.base import MigrationExecutor from tenant_schemas.migration_executors.parallel import ParallelExecutor from tenant_schemas.migration_executors.standard import StandardExecutor +from tenant_schemas.migration_executors.async import CeleryExecutor def get_executor(codename=None): diff --git a/tenant_schemas/migration_executors/async.py b/tenant_schemas/migration_executors/async.py new file mode 100644 index 00000000..0575fabf --- /dev/null +++ b/tenant_schemas/migration_executors/async.py @@ -0,0 +1,25 @@ +from django.conf import settings +from celery import group + +from tenant_schemas.migration_executors.base import MigrationExecutor + + +class CeleryExecutor(MigrationExecutor): + codename = 'async' + + def run_migrations(self, tenants): + is_interactive = getattr(settings, 'TENANT_INTERACTIVE_MIGRATION', False) + self.options['interactive'] = is_interactive + super(CeleryExecutor, self).run_migrations(tenants) + + def run_tenant_migrations(self, tenants): + from tenant_schemas.tasks import run_schema_migration + tenant_migrations = group( + run_schema_migration.s(self.args, self.options, schema_name) + for schema_name in tenants + ) + result = tenant_migrations.apply_async() + unsuccessfully_migrated_schemas = filter(lambda schema_name: schema_name is not None, result.get()) + self.logger.info('Completed migrations for private tenants: {} correct, {} incorrect ({})'.format( + len(tenants) - len(unsuccessfully_migrated_schemas), len(unsuccessfully_migrated_schemas), + ", ".join(str(x) for x in unsuccessfully_migrated_schemas))) diff --git a/tenant_schemas/migration_executors/base.py b/tenant_schemas/migration_executors/base.py index 5ce6c599..05fd0aa3 100644 --- a/tenant_schemas/migration_executors/base.py +++ b/tenant_schemas/migration_executors/base.py @@ -1,64 +1,91 @@ +import logging +import os import sys +from django.conf import settings from django.core.management.commands.migrate import Command as MigrateCommand from django.db import transaction - from tenant_schemas.utils import get_public_schema_name -def run_migrations(args, options, executor_codename, schema_name, allow_atomic=True): - from django.core.management import color - from django.core.management.base import OutputWrapper - from django.db import connection - - style = color.color_style() - - def style_func(msg): - return '[%s:%s] %s' % ( - style.NOTICE(executor_codename), - style.NOTICE(schema_name), - msg - ) - - stdout = OutputWrapper(sys.stdout) - stdout.style_func = style_func - stderr = OutputWrapper(sys.stderr) - stderr.style_func = style_func - if int(options.get('verbosity', 1)) >= 1: - stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name)) - - connection.set_schema(schema_name) - MigrateCommand(stdout=stdout, stderr=stderr).execute(*args, **options) - - try: - transaction.commit() - connection.close() - connection.connection = None - except transaction.TransactionManagementError: - if not allow_atomic: - raise - - # We are in atomic transaction, don't close connections - pass - - connection.set_schema_to_public() - - class MigrationExecutor(object): codename = None + LOGGER_NAME = 'migration' def __init__(self, args, options): self.args = args self.options = options + self.logger = self.get_or_create_logger() def run_migrations(self, tenants): public_schema_name = get_public_schema_name() - if public_schema_name in tenants: - run_migrations(self.args, self.options, self.codename, public_schema_name) + self.logger.info("Started migration for public tenant") + self.run_migration(public_schema_name) tenants.pop(tenants.index(public_schema_name)) + if tenants: + self.logger.info("Started migrations for {} private tenants".format(len(tenants))) + self.run_tenant_migrations(tenants) + + def run_migration(self, schema_name, allow_atomic=True): + from django.core.management import color + from django.core.management.base import OutputWrapper + from django.db import connection + + style = color.color_style() + executor_codename = self.codename + + def style_func(msg): + return '[%s:%s] %s' % ( + style.NOTICE(executor_codename), + style.NOTICE(schema_name), + msg + ) + + stdout = OutputWrapper(sys.stdout) + stdout.style_func = style_func + stderr = OutputWrapper(sys.stderr) + stderr.style_func = style_func + if int(self.options.get('verbosity', 1)) >= 1: + stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name)) + + connection.set_schema(schema_name) + + try: + MigrateCommand(stdout=stdout, stderr=stderr).execute(*self.args, **self.options) + except Exception as e: + self.logger.error('Migration fails for tenant {}. Error: {}'.format(schema_name, str(e))) + raise + + try: + transaction.commit() + connection.close() + connection.connection = None + except transaction.TransactionManagementError: + if not allow_atomic: + raise - self.run_tenant_migrations(tenants) + # We are in atomic transaction, don't close connections + pass + + connection.set_schema_to_public() def run_tenant_migrations(self, tenant): raise NotImplementedError + + @classmethod + def get_or_create_logger(cls): + """ + Return logger for migration executor. + Configure logger handlers if they are not already configured + """ + logger = logging.getLogger(cls.LOGGER_NAME) + if len(logger.handlers) == 0: + logger_path = getattr(settings, 'TENANT_MIGRATION_LOGGER_PATH', '') + hdlr = logging.FileHandler(os.path.join(logger_path, '{}.log'.format(cls.LOGGER_NAME))) + formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s') + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + logger.setLevel(logging.INFO) + + return logger diff --git a/tenant_schemas/migration_executors/parallel.py b/tenant_schemas/migration_executors/parallel.py index 9228378f..73a6de63 100644 --- a/tenant_schemas/migration_executors/parallel.py +++ b/tenant_schemas/migration_executors/parallel.py @@ -3,7 +3,17 @@ from django.conf import settings -from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations +from tenant_schemas.migration_executors.base import MigrationExecutor + + +def _run_migration_alias(args, options, tenants, allow_atomic=True): + """ + Alias for run_migration method that allows the method to be called in a + multiprocessing pool + """ + parallel_executor = ParallelExecutor(args, options) + parallel_executor.run_migration(tenants, allow_atomic=allow_atomic) + return class ParallelExecutor(MigrationExecutor): @@ -20,10 +30,9 @@ def run_tenant_migrations(self, tenants): connection.connection = None run_migrations_p = functools.partial( - run_migrations, + _run_migration_alias, self.args, self.options, - self.codename, allow_atomic=False ) p = multiprocessing.Pool(processes=processes) diff --git a/tenant_schemas/migration_executors/standard.py b/tenant_schemas/migration_executors/standard.py index be5eb2d6..aaed7767 100644 --- a/tenant_schemas/migration_executors/standard.py +++ b/tenant_schemas/migration_executors/standard.py @@ -1,4 +1,4 @@ -from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations +from tenant_schemas.migration_executors.base import MigrationExecutor class StandardExecutor(MigrationExecutor): @@ -6,4 +6,4 @@ class StandardExecutor(MigrationExecutor): def run_tenant_migrations(self, tenants): for schema_name in tenants: - run_migrations(self.args, self.options, self.codename, schema_name) + self.run_migration(schema_name) diff --git a/tenant_schemas/tasks.py b/tenant_schemas/tasks.py new file mode 100644 index 00000000..7d83d620 --- /dev/null +++ b/tenant_schemas/tasks.py @@ -0,0 +1,11 @@ +from celery import shared_task +from tenant_schemas.migration_executors.base import MigrationExecutor + + +@shared_task(ignore_result=False) +def run_schema_migration(args, options, schema_name): + migration_executor = MigrationExecutor(args, options) + try: + migration_executor.run_migration(schema_name) + except Exception: + return schema_name diff --git a/tox.ini b/tox.ini index 29ee2643..03834908 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,17 @@ [tox] -envlist = py{27,35}-dj{18,110,111}-{standard,parallel} +envlist = py{27,35}-dj{18,110,111}-{standard,parallel,async} [travis:env] DJANGO = - 1.8: dj18-{standard,parallel} - 1.10: dj110-{standard,parallel} - 1.11: dj111-{standard,parallel} + 1.8: dj18-{standard,parallel,async} + 1.10: dj110-{standard,parallel,async} + 1.11: dj111-{standard,parallel,async} [testenv] usedevelop = True deps = + celery coverage mock tblib @@ -25,6 +26,7 @@ passenv = PG_NAME PG_USER PG_PASSWORD PG_HOST PG_PORT setenv = standard: MIGRATION_EXECUTOR=standard parallel: MIGRATION_EXECUTOR=parallel + async: MIGRATION_EXECUTOR=async commands = coverage run manage.py test --noinput {posargs:tenant_schemas}