Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous migration executor #553

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tenant_schemas/management/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def add_arguments(self, parser):
'migration. Use the name "zero" to unapply all migrations.'))
parser.add_argument("-s", "--schema", dest="schema_name")
parser.add_argument('--executor', action='store', dest='executor', default=None,
help='Executor for running migrations [standard (default)|parallel]')
help='Executor for running migrations [standard (default)|parallel|async]')

def handle(self, *args, **options):
self.sync_tenant = options.get('tenant')
Expand Down
1 change: 1 addition & 0 deletions tenant_schemas/migration_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from tenant_schemas.migration_executors.base import MigrationExecutor
from tenant_schemas.migration_executors.parallel import ParallelExecutor
from tenant_schemas.migration_executors.standard import StandardExecutor
from tenant_schemas.migration_executors.async import CeleryExecutor


def get_executor(codename=None):
Expand Down
25 changes: 25 additions & 0 deletions tenant_schemas/migration_executors/async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from django.conf import settings
from celery import group

from tenant_schemas.migration_executors.base import MigrationExecutor


class CeleryExecutor(MigrationExecutor):
codename = 'async'

def run_migrations(self, tenants):
is_interactive = getattr(settings, 'TENANT_INTERACTIVE_MIGRATION', False)
self.options['interactive'] = is_interactive
super(CeleryExecutor, self).run_migrations(tenants)

def run_tenant_migrations(self, tenants):
from tenant_schemas.tasks import run_schema_migration
tenant_migrations = group(
run_schema_migration.s(self.args, self.options, schema_name)
for schema_name in tenants
)
result = tenant_migrations.apply_async()
unsuccessfully_migrated_schemas = filter(lambda schema_name: schema_name is not None, result.get())
self.logger.info('Completed migrations for private tenants: {} correct, {} incorrect ({})'.format(
len(tenants) - len(unsuccessfully_migrated_schemas), len(unsuccessfully_migrated_schemas),
", ".join(str(x) for x in unsuccessfully_migrated_schemas)))
111 changes: 69 additions & 42 deletions tenant_schemas/migration_executors/base.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,91 @@
import logging
import os
import sys

from django.conf import settings
from django.core.management.commands.migrate import Command as MigrateCommand
from django.db import transaction

from tenant_schemas.utils import get_public_schema_name


def run_migrations(args, options, executor_codename, schema_name, allow_atomic=True):
from django.core.management import color
from django.core.management.base import OutputWrapper
from django.db import connection

style = color.color_style()

def style_func(msg):
return '[%s:%s] %s' % (
style.NOTICE(executor_codename),
style.NOTICE(schema_name),
msg
)

stdout = OutputWrapper(sys.stdout)
stdout.style_func = style_func
stderr = OutputWrapper(sys.stderr)
stderr.style_func = style_func
if int(options.get('verbosity', 1)) >= 1:
stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name))

connection.set_schema(schema_name)
MigrateCommand(stdout=stdout, stderr=stderr).execute(*args, **options)

try:
transaction.commit()
connection.close()
connection.connection = None
except transaction.TransactionManagementError:
if not allow_atomic:
raise

# We are in atomic transaction, don't close connections
pass

connection.set_schema_to_public()


class MigrationExecutor(object):
codename = None
LOGGER_NAME = 'migration'

def __init__(self, args, options):
self.args = args
self.options = options
self.logger = self.get_or_create_logger()

def run_migrations(self, tenants):
public_schema_name = get_public_schema_name()

if public_schema_name in tenants:
run_migrations(self.args, self.options, self.codename, public_schema_name)
self.logger.info("Started migration for public tenant")
self.run_migration(public_schema_name)
tenants.pop(tenants.index(public_schema_name))
if tenants:
self.logger.info("Started migrations for {} private tenants".format(len(tenants)))
self.run_tenant_migrations(tenants)

def run_migration(self, schema_name, allow_atomic=True):
from django.core.management import color
from django.core.management.base import OutputWrapper
from django.db import connection

style = color.color_style()
executor_codename = self.codename

def style_func(msg):
return '[%s:%s] %s' % (
style.NOTICE(executor_codename),
style.NOTICE(schema_name),
msg
)

stdout = OutputWrapper(sys.stdout)
stdout.style_func = style_func
stderr = OutputWrapper(sys.stderr)
stderr.style_func = style_func
if int(self.options.get('verbosity', 1)) >= 1:
stdout.write(style.NOTICE("=== Running migrate for schema %s" % schema_name))

connection.set_schema(schema_name)

try:
MigrateCommand(stdout=stdout, stderr=stderr).execute(*self.args, **self.options)
except Exception as e:
self.logger.error('Migration fails for tenant {}. Error: {}'.format(schema_name, str(e)))
raise

try:
transaction.commit()
connection.close()
connection.connection = None
except transaction.TransactionManagementError:
if not allow_atomic:
raise

self.run_tenant_migrations(tenants)
# We are in atomic transaction, don't close connections
pass

connection.set_schema_to_public()

def run_tenant_migrations(self, tenant):
raise NotImplementedError

@classmethod
def get_or_create_logger(cls):
"""
Return logger for migration executor.
Configure logger handlers if they are not already configured
"""
logger = logging.getLogger(cls.LOGGER_NAME)
if len(logger.handlers) == 0:
logger_path = getattr(settings, 'TENANT_MIGRATION_LOGGER_PATH', '')
hdlr = logging.FileHandler(os.path.join(logger_path, '{}.log'.format(cls.LOGGER_NAME)))
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)

return logger
15 changes: 12 additions & 3 deletions tenant_schemas/migration_executors/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@

from django.conf import settings

from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
from tenant_schemas.migration_executors.base import MigrationExecutor


def _run_migration_alias(args, options, tenants, allow_atomic=True):
"""
Alias for run_migration method that allows the method to be called in a
multiprocessing pool
"""
parallel_executor = ParallelExecutor(args, options)
parallel_executor.run_migration(tenants, allow_atomic=allow_atomic)
return


class ParallelExecutor(MigrationExecutor):
Expand All @@ -20,10 +30,9 @@ def run_tenant_migrations(self, tenants):
connection.connection = None

run_migrations_p = functools.partial(
run_migrations,
_run_migration_alias,
self.args,
self.options,
self.codename,
allow_atomic=False
)
p = multiprocessing.Pool(processes=processes)
Expand Down
4 changes: 2 additions & 2 deletions tenant_schemas/migration_executors/standard.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from tenant_schemas.migration_executors.base import MigrationExecutor, run_migrations
from tenant_schemas.migration_executors.base import MigrationExecutor


class StandardExecutor(MigrationExecutor):
codename = 'standard'

def run_tenant_migrations(self, tenants):
for schema_name in tenants:
run_migrations(self.args, self.options, self.codename, schema_name)
self.run_migration(schema_name)
11 changes: 11 additions & 0 deletions tenant_schemas/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from celery import shared_task
from tenant_schemas.migration_executors.base import MigrationExecutor


@shared_task(ignore_result=False)
def run_schema_migration(args, options, schema_name):
migration_executor = MigrationExecutor(args, options)
try:
migration_executor.run_migration(schema_name)
except Exception:
return schema_name
10 changes: 6 additions & 4 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[tox]
envlist = py{27,35}-dj{18,110,111}-{standard,parallel}
envlist = py{27,35}-dj{18,110,111}-{standard,parallel,async}

[travis:env]
DJANGO =
1.8: dj18-{standard,parallel}
1.10: dj110-{standard,parallel}
1.11: dj111-{standard,parallel}
1.8: dj18-{standard,parallel,async}
1.10: dj110-{standard,parallel,async}
1.11: dj111-{standard,parallel,async}

[testenv]
usedevelop = True

deps =
celery
coverage
mock
tblib
Expand All @@ -25,6 +26,7 @@ passenv = PG_NAME PG_USER PG_PASSWORD PG_HOST PG_PORT
setenv =
standard: MIGRATION_EXECUTOR=standard
parallel: MIGRATION_EXECUTOR=parallel
async: MIGRATION_EXECUTOR=async

commands =
coverage run manage.py test --noinput {posargs:tenant_schemas}
Expand Down