Merge pull request #78 from openradx/procrastinate
Use Procrastinate instead of Celery tasks and DICOM worker
medihack committed Jul 14, 2024
2 parents eec3e2c + d53fbb4 commit 23985f1
Showing 56 changed files with 1,462 additions and 2,265 deletions.
20 changes: 9 additions & 11 deletions Dockerfile
@@ -14,7 +14,7 @@ ENV PYTHONUNBUFFERED=1 \
# poetry
# https://python-poetry.org/docs/#installing-with-the-official-installer
# https://python-poetry.org/docs/configuration/#using-environment-variables
-POETRY_VERSION=1.8.2 \
+POETRY_VERSION=1.8.3 \
# make poetry install to this location
POETRY_HOME="/opt/poetry" \
# make poetry create the virtual environment in the project's root
@@ -73,11 +73,10 @@ RUN poetry install
# Install requirements for end-to-end testing
RUN playwright install --with-deps chromium

-# Required folders for ADIT
-RUN mkdir -p /var/www/adit/logs \
-    /var/www/adit/static \
-    /var/www/adit/ssl \
-    /var/www/adit/celery
+# Required folders for web service
+RUN mkdir -p /var/www/web/logs \
+    /var/www/web/static \
+    /var/www/web/ssl

# will become mountpoint of our code
WORKDIR /app
@@ -88,10 +87,9 @@ FROM python-base as production
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
COPY . /app/

-# Required folders for ADIT
-RUN mkdir -p /var/www/adit/logs \
-    /var/www/adit/static \
-    /var/www/adit/ssl \
-    /var/www/adit/celery
+# Required folders for web service
+RUN mkdir -p /var/www/web/logs \
+    /var/www/web/static \
+    /var/www/web/ssl

WORKDIR /app
26 changes: 3 additions & 23 deletions KNOWLEDGE.md
@@ -19,19 +19,6 @@
- The SECRET_KEY should not start with a dollar sign (\$), django-environ has problems with it (see Proxy value in the documentation)
- Multi table inheritance extensions: <https://github.com/django-polymorphic/django-polymorphic> and <https://github.com/jazzband/django-model-utils>

-## Celery Manage Python API
-
-- python manage.py shell_plus
-  from adit.celery import app
-  i = app.control.inspect()
-  i.scheduled()
-  app.AsyncResult("task_id").state
-
-## Celery
-
-<https://medium.com/better-programming/python-celery-best-practices-ae182730bb81>
-<https://stackoverflow.com/a/47980598/166229>

## DICOM

- All available DICOM tags: <https://dicom.nema.org/medical/dicom/current/output/chtml/part06/chapter_6.html>
@@ -109,8 +96,8 @@ and <https://forums.docker.com/t/docker-swarm-mode-not-picking-up-proxy-configur
- docker build . --target development -t adit_dev # Build an image from our Dockerfile
- docker run -v C:\Users\kaisc\Projects\adit:/src -it adit_dev /bin/bash # Run the built container with src folder mounted from host
- docker ps -a --filter volume=vol_name # Find container that mounts volume
-- docker run -v=adit_web_data:/var/www/adit -it busybox /bin/sh # Start interactive shell with named volume mounted
-- docker run --rm -i -v=adit_web_data:/foo busybox find /foo # List files in named volume
+- docker run -v=adit_dev_web_data:/var/www/web -it busybox /bin/sh # Start interactive shell with named volume mounted
+- docker run --rm -i -v=adit_dev_web_data:/foo busybox find /foo # List files in named volume
- docker volume ls -f "name=adit_dev-\*" # Show all volumes that begin with "adit_dev-"
- docker volume rm $(docker volume ls -f "name=foobar-\*" -q) # Delete all volumes that begin with "foobar-", caution: this deletes the \

@@ -132,16 +119,9 @@ and <https://forums.docker.com/t/docker-swarm-mode-not-picking-up-proxy-configur
- docker stack services foobar
- docker stack rm foobar

-### Celery commands
-
-- celery -A adit purge -Q default_queue,dicom_task_queue
-- celery -A adit inspect active_queues
-- celery -A adit inspect scheduled
-- celery -A adit inspect stats
-
## Deployment for production

-- Copy cert.pem and key.pem from N:\Dokumente\Projekte\ADIT_Server\ssl_certificate to /var/www/adit/ssl/
+- Copy cert.pem and key.pem from N:\Dokumente\Projekte\ADIT_Server\ssl_certificate to /var/www/web/ssl/
- Restart adit_prod_web container
- Add the DICOM servers and folders

29 changes: 2 additions & 27 deletions TODO.md
@@ -2,6 +2,7 @@

## Top

+- Delete VSCode stuff inside containers (I think it's only when using the container itself as a devcontainer)
- When populate_data then also reset_orthancs
- Study Date is changed when using Selective Transfer
- Fix that ADIT DICOMweb supports STOW of multiple image files at once
@@ -64,7 +65,6 @@
- We can also make the network attachable to do the "exec" stuff in tasks.py using one-off containers using "run"
- Redirect after restart/retry/delete job
- Option in batch query to query whole study or explicit series
-- Allow to terminate a specific Celery task with revoke(celery_task_id, terminate=True)
- Make whole receiver crash if one asyncio task crashes
- Auto refresh job pages on success or failure
- Query with StudyDateStart, StudyDateEnd, StudyDate
@@ -74,18 +74,9 @@
- Fix the ineffective stuff in transfer_utils, see TODO there
- Write test_parsers.py
- DICOM data that does not need to be modified can be directly transferred between the source and destination server (C-MOVE). The only exception is when source and destination server are the same, then the data will still be downloaded and uploaded again. This may be helpful when the PACS server treats the data somehow differently when sent by ADIT.
-- Check if we still need Abortable Celery Tasks (and just use Task)
-  - Currently we don't use this functionality to abort running tasks, but we could
-  - <https://docs.celeryq.dev/en/stable/reference/celery.contrib.abortable.html>
-  - <https://docs.celeryq.dev/en/latest/faq.html#how-do-i-get-the-result-of-a-task-if-i-have-the-id-that-points-there>
-- Use Django ORM as Celery result backend (currently we use Redis for that)
-  - <https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend>

## Fix

-- Fix Celery logging (task ids are not appended to logging messages even as we use get_task_logger)
-- Look into how the setup is in <https://youtube.com/playlist?list=PLOLrQ9Pn6caz-6WpcBYxV84g9gwptoN20&si=jUU6wttECucsbGFv>
-  - and its code <https://github.com/veryacademy?q=Django&type=all&language=&sort=>
- Shorter timeout for offline studies
- Tests: test_query_utils, test serializers, test all views (as integration tests using real Orthanc), improve tests of transferutil, BatchFileSizeError
- c-get download timeout
@@ -95,8 +86,6 @@

## Maybe

-- Replace Redis distributed locks with <https://django-pglock.readthedocs.io/en/1.5.1/>
-- Replace Celery with <https://procrastinate.readthedocs.io/en/stable/howto/django.html>
- Do some prechecks before trying the task (is source and destination online?)
- Generate exposed IDs for URLs by hashing the primary (number) keys:
- <https://sqids.org/python>
@@ -119,15 +108,14 @@
  - Currently we don't allow this, but this can happen when a patient has multiple PatientIDs in the same PACS (e.g. has external images)
- exclude test folders from autoreload in ServerCommand (maybe a custom filter is needed)
- Switch from Daphne to Uvicorn (maybe it has faster restart times during development)
-- Switch from Celery to Huey
- Upgrade postgres server to v15, but we have to migrate the data then as the database files are incompatible with a newer version
  - <https://hollo.me/devops/upgrade-postgresql-database-with-docker.html>
  - <https://thomasbandt.com/postgres-docker-major-version-upgrade>
  - <https://betterprogramming.pub/how-to-upgrade-your-postgresql-version-using-docker-d1e81dbbbdf9>
  - look into <https://github.com/tianon/docker-postgres-upgrade>
- Allow to search multiple source servers with one query (maybe only in dicom explorer)
- Bring everything behind Nginx as reverse proxy
-  - Orthanc and Flower should then be directly behind Nginx (without Django-revproxy)
+  - Orthanc could then be directly behind Nginx (without Django-revproxy)
- Use authentication module of nginx
- <http://nginx.org/en/docs/http/ngx_http_auth_request_module.html>
- <https://stackoverflow.com/a/70961666/166229>
@@ -143,7 +131,6 @@
  - Unfortunately, I could not get it to work with Django autoreload itself, but we can use something similar by using watchman directly and integrating it in ServerCommand
- BatchQuery with custom DICOM keywords
- Watchdog server
-- pull celery_task stuff out of transfer_utils
- Allow providing a regex of StudyDescription in batch file
- move date parsing part in parsers.py and consumers.py to date_util.py
- <https://stackoverflow.com/questions/14259852/how-to-identify-image-receive-in-c-store-as-result-of-a-c-move-query>
@@ -157,18 +144,6 @@
  - Can be worked around by wrapping another zip file in an encrypted zip file <https://unix.stackexchange.com/a/290088/469228>
- Rewrite dicom_connector to use asyncio (wrap all pynetdicom calls in asyncio.to_thread)
  - I don't think that this will gain any performance improvements, so maybe not worth it
-- Look out for a django-revproxy fix (see <https://github.com/jazzband/django-revproxy/issues/144>)
-  - Flower is running behind a Django internal reverse proxy (django-revproxy) so that only admins can access it
-  - Unfortunately the last released django-revproxy is broken with the latest Django
-  - So we use the master branch here directly from GitHub (see pyproject.toml)
-  - Alternatively we could use <https://github.com/thomasw/djproxy>
-- Rethink task queues and rescheduling
-  - Currently we use Celery to schedule tasks in the future using Celery's ETA feature, but this is not recommended for tasks in the distant future (see <https://docs.celeryq.dev/en/stable/userguide/calling.html#eta-and-countdown>)
-  - An option would be to introduce a "rescheduled" property in the task model and reschedule tasks by checking periodically using Celery Beat PeriodicTasks (maybe every hour or so) or using "one_off" PeriodicTasks.
-  - But then we can't use Celery Canvas anymore, as tasks in a worker finish with such a rescheduling outside of the Celery queue system. We then have to check at the end of each task if the job is finished or erroneous (by checking all the other sibling tasks). This should be done with a distributed lock (e.g. using <https://sher-lock.readthedocs.io/en/latest/>) so that if we have multiple workers there are no race conditions.
-  - Maybe it isn't even a big problem, as at a hospital site we never accumulate so many Celery tasks on a worker and ETA is totally fine (just keep in mind that it could become a problem).
-  - Make sure, if using PeriodicTask, that those are also cancelled when the job is cancelled.
-  - Another solution would be to use Temporal.io as soon as they implement task priorities <https://github.com/temporalio/temporal/issues/1507>
- Evaluate other task runners
- <https://www.pyinvoke.org/> # used currently
- <https://github.com/taskipy/taskipy>
7 changes: 0 additions & 7 deletions adit/__init__.py
@@ -1,7 +0,0 @@
-from __future__ import absolute_import, unicode_literals
-
-# This will make sure the app is always imported when
-# Django starts so that shared_task will use this app.
-from .celery import app as celery_app
-
-__all__ = ("celery_app",)
5 changes: 3 additions & 2 deletions adit/batch_query/apps.py
@@ -1,6 +1,8 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate

+from adit.core.utils.model_utils import get_model_label
+
SECTION_NAME = "Batch Query"


@@ -29,8 +31,7 @@ def register_app():
            )
        )

-        model_label = f"{BatchQueryTask._meta.app_label}.{BatchQueryTask._meta.model_name}"
-        register_dicom_processor(model_label, BatchQueryTaskProcessor)
+        register_dicom_processor(get_model_label(BatchQueryTask), BatchQueryTaskProcessor)

def collect_job_stats():
counts: dict[BatchQueryJob.Status, int] = {}
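The hunk above swaps an inline f-string for a shared `get_model_label` helper. Its implementation is not part of this diff, but judging from the line it replaces, it presumably boils down to the following sketch (the stand-in `FakeModel`/`_Meta` classes are for illustration only and are not part of the project):

```python
class _Meta:
    # Stand-in for Django's Options object, for illustration only.
    app_label = "batch_query"
    model_name = "batchquerytask"


class FakeModel:
    _meta = _Meta()


def get_model_label(model) -> str:
    # Presumed implementation, equivalent to the inline
    # f"{BatchQueryTask._meta.app_label}.{BatchQueryTask._meta.model_name}"
    # expression it replaced.
    return f"{model._meta.app_label}.{model._meta.model_name}"


print(get_model_label(FakeModel))  # batch_query.batchquerytask
```

Centralizing the label construction keeps the `register_dicom_processor` calls in the per-app `apps.py` files consistent.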
47 changes: 47 additions & 0 deletions adit/batch_query/migrations/0032_switch_to_procrastinate.py
@@ -0,0 +1,47 @@
# Generated by Django 5.0.6 on 2024-07-07 19:47

import django.db.models.deletion
from django.db import migrations, models
from django.db.migrations.state import StateApps

from adit_radis_shared.common.utils.migration_utils import procrastinate_on_delete_sql


def increase_attempt(apps: StateApps, schema_editor):
    BatchQueryTask = apps.get_model("batch_query.BatchQueryTask")
    BatchQueryTask.objects.update(attempts=models.F("attempts") + 1)


def decrease_attempt(apps: StateApps, schema_editor):
    BatchQueryTask = apps.get_model("batch_query.BatchQueryTask")
    BatchQueryTask.objects.update(attempts=models.F("attempts") - 1)


class Migration(migrations.Migration):

    dependencies = [
        ("batch_query", "0031_remove_batchquerysettings_slot_begin_time_and_more"),
        ("procrastinate", "0028_add_cancel_states"),
    ]

    operations = [
        migrations.RenameField(
            model_name="batchquerytask",
            old_name="retries",
            new_name="attempts",
        ),
        migrations.AddField(
            model_name="batchquerytask",
            name="queued_job",
            field=models.OneToOneField(
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="+",
                to="procrastinate.procrastinatejob",
            ),
        ),
        migrations.RunSQL(
            sql=procrastinate_on_delete_sql("batch_query", "batchquerytask"),
            reverse_sql=procrastinate_on_delete_sql("batch_query", "batchquerytask", reverse=True),
        ),
        migrations.RunPython(increase_attempt, decrease_attempt),
    ]
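The `RunPython` step exists because the two counters have different semantics: a Celery-style `retries` field counts only re-executions, while a Procrastinate-style `attempts` field counts every execution including the first. That reading is an assumption inferred from the `F("attempts") + 1` update above, not stated in the diff. A minimal sketch of the conversion:

```python
def retries_to_attempts(retries: int) -> int:
    # A task that was never retried (retries == 0) has still run once,
    # so it counts as one attempt.
    return retries + 1


def attempts_to_retries(attempts: int) -> int:
    # Reverse conversion, mirroring decrease_attempt in the migration.
    return attempts - 1


print(retries_to_attempts(0))  # 1
```

The pair of functions mirrors the forward/reverse callables passed to `migrations.RunPython`, which keeps the migration reversible.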
8 changes: 6 additions & 2 deletions adit/batch_query/tests/integration/test_batch_query.py
@@ -16,7 +16,7 @@ def test_urgent_batch_query_with_dimse_server(
    page: Page,
    poll: Callable[[Locator], Locator],
    dimse_orthancs,
-    dicom_worker,
+    run_worker,
    live_server,
    create_and_login_user,
    batch_query_group,
@@ -44,6 +44,8 @@ def test_urgent_batch_query_with_dimse_server(
    page.get_by_label("Batch file*", exact=True).set_input_files(files=[batch_file])
    page.locator('input:has-text("Create job")').click()

+    run_worker()
+
    # Assert
    expect(poll(page.locator('dl:has-text("Success")'))).to_be_visible()

@@ -54,7 +56,7 @@ def test_urgent_batch_query_with_dicomweb_server(
    page: Page,
    poll: Callable[[Locator], Locator],
    dicomweb_orthancs,
-    dicom_worker,
+    run_worker,
    live_server,
    create_and_login_user,
    batch_query_group,
@@ -82,5 +84,7 @@ def test_urgent_batch_query_with_dicomweb_server(
    page.get_by_label("Batch file*", exact=True).set_input_files(files=[batch_file])
    page.locator('input:has-text("Create job")').click()

+    run_worker()
+
    # Assert
    expect(poll(page.locator('dl:has-text("Success")'))).to_be_visible()
54 changes: 45 additions & 9 deletions adit/batch_query/tests/test_migrations.py
@@ -5,8 +5,8 @@


@pytest.mark.django_db
-def test_0016_convert_json_to_text(migrator: Migrator):
-    old_state = migrator.apply_initial_migration(
+def test_0016_convert_json_to_text(migrator_ext: Migrator):
+    old_state = migrator_ext.apply_initial_migration(
        ("batch_query", "0015_alter_batchqueryresult_modalities_and_more")
    )

@@ -49,7 +49,7 @@ def test_0016_convert_json_to_text(migrator: Migrator):
        study_uid="1.2.3",
    )

-    new_state = migrator.apply_tested_migration(("batch_query", "0016_convert_json_to_text"))
+    new_state = migrator_ext.apply_tested_migration(("batch_query", "0016_convert_json_to_text"))
    BatchQueryTask = new_state.apps.get_model("batch_query", "BatchQueryTask")
    BatchQueryResult = new_state.apps.get_model("batch_query", "BatchQueryResult")

@@ -64,8 +64,8 @@


@pytest.mark.django_db
-def test_0021_set_source_in_tasks(migrator: Migrator):
-    old_state = migrator.apply_initial_migration(("batch_query", "0020_batchquerytask_source"))
+def test_0021_set_source_in_tasks(migrator_ext: Migrator):
+    old_state = migrator_ext.apply_initial_migration(("batch_query", "0020_batchquerytask_source"))

    User = old_state.apps.get_model("accounts", "User")
    DicomServer = old_state.apps.get_model("core", "DicomServer")
@@ -90,7 +90,7 @@ def test_0021_set_source_in_tasks(migrator: Migrator):
        task_id="123",
    )

-    new_state = migrator.apply_tested_migration(("batch_query", "0021_set_source_in_tasks"))
+    new_state = migrator_ext.apply_tested_migration(("batch_query", "0021_set_source_in_tasks"))
    BatchQueryTask = new_state.apps.get_model("batch_query", "BatchQueryTask")

    task = BatchQueryTask.objects.get(id=task.id)
@@ -99,8 +99,8 @@


@pytest.mark.django_db
-def test_0026_move_text_to_array_field(migrator: Migrator):
-    old_state = migrator.apply_initial_migration(
+def test_0026_move_text_to_array_field(migrator_ext: Migrator):
+    old_state = migrator_ext.apply_initial_migration(
        ("batch_query", "0025_batchqueryresult_modalities_new_and_more")
    )

@@ -146,7 +146,7 @@ def test_0026_move_text_to_array_field(migrator: Migrator):
        study_uid="1.2.3",
    )

-    new_state = migrator.apply_tested_migration(("batch_query", "0026_text_to_array_field"))
+    new_state = migrator_ext.apply_tested_migration(("batch_query", "0026_text_to_array_field"))
    BatchQueryTask = new_state.apps.get_model("batch_query", "BatchQueryTask")
    BatchQueryResult = new_state.apps.get_model("batch_query", "BatchQueryResult")

@@ -158,3 +158,39 @@
        assert query.series_numbers_new == ["4", "5", "6"]

    assert result.modalities_new == ["CT", "MR"]
+
+
+@pytest.mark.django_db
+def test_0032_switch_to_procrastinate(migrator_ext: Migrator):
+    old_state = migrator_ext.apply_initial_migration(
+        ("batch_query", "0031_remove_batchquerysettings_slot_begin_time_and_more")
+    )
+
+    User = old_state.apps.get_model("accounts", "User")
+    DicomServer = old_state.apps.get_model("core", "DicomServer")
+    BatchQueryJob = old_state.apps.get_model("batch_query", "BatchQueryJob")
+    BatchQueryTask = old_state.apps.get_model("batch_query", "BatchQueryTask")
+
+    server = DicomServer.objects.create(
+        ae_title="server1",
+        name="server1",
+        host="server1",
+        port=11112,
+    )
+    user = User.objects.create(
+        username="user",
+    )
+    job = BatchQueryJob.objects.create(
+        owner_id=user.id,
+    )
+    task = BatchQueryTask.objects.create(
+        job_id=job.id,
+        source_id=server.id,
+        lines=[],
+    )
+
+    new_state = migrator_ext.apply_tested_migration(("batch_query", "0032_switch_to_procrastinate"))
+
+    BatchQueryTask = new_state.apps.get_model("batch_query", "BatchQueryTask")
+
+    assert task.retries + 1 == BatchQueryTask.objects.get(id=task.id).attempts