5 changes: 3 additions & 2 deletions awx/main/apps.py
@@ -84,15 +84,15 @@ def ready(self):
        from django.conf import settings
        from awx.main.utils.db import get_pg_notify_params
        from awx.main.dispatch import get_task_queuename
        from awx.main.dispatch.config import get_max_workers
        from awx.main.dispatch.pool import get_auto_max_workers

        dispatcher_setup(
            {
                "version": 2,
                "service": {
                    "pool_kwargs": {
                        "min_workers": settings.JOB_EVENT_WORKERS,
                        "max_workers": get_max_workers(),
                        "max_workers": get_auto_max_workers(),
                    },
                    "main_kwargs": {"node_id": settings.CLUSTER_HOST_ID},
                    "process_manager_cls": "ForkServerManager",
@@ -116,6 +116,7 @@ def ready(self):
                    "default_control_broker": "socket",
                    "default_broker": "pg_notify",
                },
                "worker": {"worker_cls": "awx.main.dispatch.worker.dispatcherd.AWXTaskWorker"},
            }
        )

14 changes: 14 additions & 0 deletions awx/main/dispatch/worker/dispatcherd.py
@@ -0,0 +1,14 @@
from dispatcherd.worker.task import TaskWorker

from django.db import connection


class AWXTaskWorker(TaskWorker):

    def on_start(self) -> None:
        """Get the worker connected so that the first task it gets will be worked quickly"""
        connection.ensure_connection()

    def pre_task(self, message) -> None:
        """This should remedy bad connections that cannot fix themselves"""
        connection.close_if_unusable_or_obsolete()
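
For context, a minimal sketch (not part of the diff) of the Django connection calls these two hooks rely on, assuming dispatcherd invokes on_start() once per worker process and pre_task() before each task it hands to that worker:

from django.db import connection

# on_start(): open the connection up front so the first task does not pay the
# connection cost inside its own runtime.
connection.ensure_connection()

# pre_task(): discard a connection Django considers unusable or past CONN_MAX_AGE;
# the next query issued by the task will reconnect transparently.
connection.close_if_unusable_or_obsolete()

with connection.cursor() as cursor:
    cursor.execute("SELECT 1;")  # cursor() re-establishes the connection if it was dropped
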
9 changes: 9 additions & 0 deletions awx/main/tests/data/projects/debug/sleep.yml
@@ -0,0 +1,9 @@
---
- hosts: all
  gather_facts: false
  connection: local
  vars:
    sleep_interval: 5
  tasks:
    - name: sleep for a specified interval
      command: sleep '{{ sleep_interval }}'
44 changes: 42 additions & 2 deletions awx/main/tests/data/sleep_task.py
@@ -1,17 +1,57 @@
import time
import logging

from dispatcherd.publish import task

from django.db import connection

from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task
from awx.main.dispatch.publish import task as old_task

from ansible_base.lib.utils.db import advisory_lock


logger = logging.getLogger(__name__)


@task(queue=get_task_queuename)
@old_task(queue=get_task_queuename)
def sleep_task(seconds=10, log=False):
    if log:
        logger.info('starting sleep_task')
    time.sleep(seconds)
    if log:
        logger.info('finished sleep_task')


@task()
def sleep_break_connection(seconds=0.2):
    """
    Interact with the database in an intentionally breaking way.
    After this finishes, queries made by this connection are expected to error
    with "the connection is closed".
    This is obviously a problem for any task that comes afterwards,
    so this is used to break things so that the fixes can be demonstrated.
    """
    with connection.cursor() as cursor:
        cursor.execute(f"SET idle_session_timeout = '{seconds / 2}s';")

    logger.info(f'sleeping for {seconds}s > {seconds / 2}s session timeout')
    time.sleep(seconds)

    for i in range(1, 3):
        logger.info(f'\nRunning query number {i}')
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1;")
            logger.info(' query worked, not expected')
        except Exception as exc:
            logger.info(f' query errored as expected\ntype: {type(exc)}\nstr: {str(exc)}')

    logger.info(f'Connection present: {bool(connection.connection)}, reports closed: {getattr(connection.connection, "closed", "not_found")}')


@task()
def advisory_lock_exception():
    time.sleep(0.2)  # so it can fill up all the workers... hacky for now
    with advisory_lock('advisory_lock_exception', lock_session_timeout_milliseconds=20):
        raise RuntimeError('this is an intentional error')
8 changes: 5 additions & 3 deletions awx/main/tests/live/tests/conftest.py
@@ -129,7 +129,7 @@ def fn():

@pytest.fixture
def run_job_from_playbook(default_org, demo_inv, post, admin):
    def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None):
    def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, wait=True):
        project_name = f'{test_name} project'
        jt_name = f'{test_name} JT: {playbook}'

@@ -180,7 +180,9 @@ def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None):
        job = jt.create_unified_job()
        job.signal_start()

        wait_for_job(job)
        assert job.status == 'successful'
        if wait:
            wait_for_job(job)
            assert job.status == 'successful'
        return {'job': job}

    return _rf
74 changes: 74 additions & 0 deletions awx/main/tests/live/tests/dispatcherd/test_connection_recovery.py
@@ -0,0 +1,74 @@
import time

from dispatcherd.config import settings
from dispatcherd.factories import get_control_from_settings
from dispatcherd.utils import serialize_task

from awx.main.models import JobTemplate

from awx.main.tests.data.sleep_task import sleep_break_connection, advisory_lock_exception
from awx.main.tests.live.tests.conftest import wait_for_job


def poll_for_task_finish(task_name):
    running_tasks = [1]
    start = time.monotonic()
    ctl = get_control_from_settings()
    while running_tasks:
        responses = ctl.control_with_reply('running')
        assert len(responses) == 1
        response = responses[0]
        response.pop('node_id')
        running_tasks = [task_data for task_data in response.values() if task_data['task'] == task_name]
        if time.monotonic() - start > 5.0:
            assert False, f'Never finished working through tasks: {running_tasks}'


def check_jobs_work():
    jt = JobTemplate.objects.get(name='Demo Job Template')
    job = jt.create_unified_job()
    job.signal_start()
    wait_for_job(job)


def test_advisory_lock_error_clears():
    """Run a task that raises an exception while holding advisory_lock

    This is regression testing for a bug in its exception handling,
    expected to be fixed by
    https://github.com/ansible/django-ansible-base/pull/713

    This is an "easier" test case than the next,
    because it passes just by fixing the DAB case,
    and passing this does not generally guarantee that
    workers will not be left with a connection in a bad state.
    """
    min_workers = settings.service['pool_kwargs']['min_workers']

    for i in range(min_workers):
        advisory_lock_exception.delay()

    task_name = serialize_task(advisory_lock_exception)
    poll_for_task_finish(task_name)

    # Jobs should still work even after the breaking task has run
    check_jobs_work()


def test_can_recover_connection():
    """Run a task that intentionally times out the worker connection

    If no connection fixing is implemented outside of that task's scope,
    then subsequent tasks will all error, so this checks that jobs still run
    after the sleep_break_connection task.
    """
    min_workers = settings.service['pool_kwargs']['min_workers']

    for i in range(min_workers):
        sleep_break_connection.delay()

    task_name = serialize_task(sleep_break_connection)
    poll_for_task_finish(task_name)

    # Jobs should still work even after the breaking task has run
    check_jobs_work()
40 changes: 40 additions & 0 deletions awx/main/tests/live/tests/test_job_cancel.py
@@ -0,0 +1,40 @@
import time

from awx.api.versioning import reverse
from awx.main.models import Job

from awx.main.tests.live.tests.conftest import wait_for_events


def test_cancel_and_delete_job(live_tmp_folder, run_job_from_playbook, post, delete, admin):
    res = run_job_from_playbook('test_cancel_and_delete_job', 'sleep.yml', scm_url=f'file://{live_tmp_folder}/debug', wait=False)
    job = res['job']
    assert job.status == 'pending'

    # Wait for the first event so that we can be sure the job is in progress
    start = time.time()
    timeout = 3.0
    while not job.job_events.exists():
        time.sleep(0.2)
        if time.time() - start > timeout:
            assert False, f'Did not receive first event for job_id={job.id} in {timeout} seconds'

    # Now cancel the job
    url = reverse("api:job_cancel", kwargs={'pk': job.pk})
    post(url, user=admin, expect=202)

    # Job status should flip to 'canceled' within the timeout
    start = time.time()
    timeout = 5.0
    job.refresh_from_db()
    while job.status != 'canceled':
        time.sleep(0.05)
        job.refresh_from_db(fields=['status'])
        if time.time() - start > timeout:
            assert False, f'job_id={job.id} still status={job.status} after {timeout} seconds'

    wait_for_events(job)
    url = reverse("api:job_detail", kwargs={'pk': job.pk})
    delete(url, user=admin, expect=204)

    assert not Job.objects.filter(id=job.id).exists()
8 changes: 7 additions & 1 deletion awx/main/utils/db.py
@@ -21,7 +21,13 @@ def set_connection_name(function):


def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
    "Compatibility with dispatcher connection factory, just returns the Django connection"
    """Compatibility with dispatcherd connection factory, just returns the Django connection

    dispatcherd passes config info as kwargs, but in this case we just want to ignore them,
    because the point of this is to not reconnect, but to rely on existing Django connection management.
    """
    if connection.connection is None:
        connection.ensure_connection()
    return connection.connection


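
A rough usage sketch (not part of the diff; the kwargs shown are arbitrary placeholders): whatever connection parameters dispatcherd passes are simply ignored, and the object handed back is Django's own underlying psycopg connection, so the worker never opens a second connection.

from django.db import connection

from awx.main.utils.db import psycopg_connection_from_django

# Any kwargs are accepted and ignored; the existing Django-managed connection is
# returned, being opened first if it does not exist yet.
pg_conn = psycopg_connection_from_django(conninfo="ignored", autocommit=True)
assert pg_conn is connection.connection
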
2 changes: 1 addition & 1 deletion requirements/requirements_git.txt
@@ -3,4 +3,4 @@ git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
awx-plugins-core @ git+https://github.com/ansible/awx-plugins.git@devel#egg=awx-plugins-core[credentials-github-app]
django-ansible-base @ git+https://github.com/ansible/django-ansible-base@devel#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags]
awx_plugins.interfaces @ git+https://github.com/ansible/awx_plugins.interfaces.git
dispatcherd @ git+https://github.com/ansible/dispatcher.git
dispatcherd @ git+https://github.com/ansible/dispatcherd.git