Skip to content

Commit

Permalink
Let Vespa have its own worker and queue.
Browse files Browse the repository at this point in the history
Every search database behaves a bit differently. Vespa takes quite long to index documents (as those indexes are directly available). So we make sure here that each full-text search database can decide for itself whether it needs a background task for indexing or does the indexing directly in the request itself.
  • Loading branch information
medihack committed Mar 19, 2024
1 parent 5c23feb commit 4f91612
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 66 deletions.
4 changes: 4 additions & 0 deletions compose/docker-compose.base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ services:
<<: *default-app
hostname: worker_default.local

worker_vespa:
<<: *default-app
hostname: worker_vespa.local

worker_llm:
<<: *default-app
hostname: worker_llm.local
Expand Down
7 changes: 7 additions & 0 deletions compose/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ services:
profiles:
- full

worker_vespa:
<<: *default-app
command: |
./manage.py celery_worker -c 1 -Q vespa_queue --autoreload
profiles:
- full

worker_llm:
<<: *default-app
command: |
Expand Down
6 changes: 6 additions & 0 deletions compose/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ services:
deploy:
replicas: 1

worker_vespa:
<<: *default-app
command: ./manage.py celery_worker -c 1 -Q vespa_queue
deploy:
replicas: 1

worker_llm:
<<: *default-app
command: ./manage.py celery_worker -c 1 -Q llm_queue
Expand Down
54 changes: 27 additions & 27 deletions notebooks/radis_api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 64,
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -19,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 65,
"execution_count": 8,
"metadata": {},
"outputs": [
{
Expand All @@ -32,7 +32,7 @@
{
"data": {
"text/plain": [
"{'id': 109,\n",
"{'id': 1003,\n",
" 'metadata': {'study_instance_uid': '34343-34343-34343',\n",
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
Expand All @@ -49,8 +49,8 @@
" 'study_datetime': '2000-08-10T00:00:00+02:00',\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'body': 'This is the report',\n",
" 'created_at': '2024-03-16T17:51:07.608466+01:00',\n",
" 'updated_at': '2024-03-16T17:51:07.608474+01:00',\n",
" 'created_at': '2024-03-20T00:41:34.811162+01:00',\n",
" 'updated_at': '2024-03-20T00:41:34.811170+01:00',\n",
" 'groups': [2]}"
]
},
Expand Down Expand Up @@ -93,13 +93,13 @@
},
{
"cell_type": "code",
"execution_count": 66,
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 109,\n",
"{'id': 1003,\n",
" 'metadata': {'study_instance_uid': '34343-34343-34343',\n",
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
Expand All @@ -116,12 +116,12 @@
" 'study_datetime': '2000-08-10T00:00:00+02:00',\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'body': 'This is the report',\n",
" 'created_at': '2024-03-16T17:51:07.608466+01:00',\n",
" 'updated_at': '2024-03-16T17:51:07.608474+01:00',\n",
" 'created_at': '2024-03-20T00:41:34.811162+01:00',\n",
" 'updated_at': '2024-03-20T00:41:34.811170+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 66,
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -163,13 +163,13 @@
},
{
"cell_type": "code",
"execution_count": 67,
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 109,\n",
"{'id': 1003,\n",
" 'metadata': {'study_instance_uid': '34343-34343-34343',\n",
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
Expand All @@ -186,12 +186,12 @@
" 'study_datetime': '2000-08-10T00:00:00+02:00',\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'body': 'This is the report',\n",
" 'created_at': '2024-03-16T17:51:07.608466+01:00',\n",
" 'updated_at': '2024-03-16T17:51:07.608474+01:00',\n",
" 'created_at': '2024-03-20T00:41:34.811162+01:00',\n",
" 'updated_at': '2024-03-20T00:41:34.811170+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 67,
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -209,13 +209,13 @@
},
{
"cell_type": "code",
"execution_count": 68,
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 109,\n",
"{'id': 1003,\n",
" 'metadata': {'study_instance_uid': '34343-34343-34343',\n",
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
Expand All @@ -232,8 +232,8 @@
" 'study_datetime': '2000-08-10T00:00:00+02:00',\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'body': 'This is the report',\n",
" 'created_at': '2024-03-16T17:51:07.608466+01:00',\n",
" 'updated_at': '2024-03-16T17:51:07.608474+01:00',\n",
" 'created_at': '2024-03-20T00:41:34.811162+01:00',\n",
" 'updated_at': '2024-03-20T00:41:34.811170+01:00',\n",
" 'groups': [2],\n",
" 'documents': {'vespa': {'pathId': '/document/v1/report/report/docid/gepacs_3dfidii5858-6633i4-ii398841',\n",
" 'id': 'id:report:report::gepacs_3dfidii5858-6633i4-ii398841',\n",
Expand All @@ -252,7 +252,7 @@
" 'study_datetime': 965858400}}}}"
]
},
"execution_count": 68,
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
Expand All @@ -271,7 +271,7 @@
},
{
"cell_type": "code",
"execution_count": 69,
"execution_count": 12,
"metadata": {},
"outputs": [
{
Expand All @@ -294,13 +294,13 @@
},
{
"cell_type": "code",
"execution_count": 70,
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'id': 110,\n",
"{'id': 1004,\n",
" 'metadata': {'study_instance_uid': '34343-34343-34343',\n",
" 'accession_number': '345348389',\n",
" 'series_instance_uid': '34343-676556-3343',\n",
Expand All @@ -317,12 +317,12 @@
" 'study_datetime': '2000-08-10T00:00:00+02:00',\n",
" 'links': ['http://gepacs.com/34343-34343-34343'],\n",
" 'body': 'This is an upserted report',\n",
" 'created_at': '2024-03-16T17:51:10.458025+01:00',\n",
" 'updated_at': '2024-03-16T17:51:10.458038+01:00',\n",
" 'created_at': '2024-03-20T00:41:37.176470+01:00',\n",
" 'updated_at': '2024-03-20T00:41:37.176478+01:00',\n",
" 'groups': [2]}"
]
},
"execution_count": 70,
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
Expand Down Expand Up @@ -362,7 +362,7 @@
},
{
"cell_type": "code",
"execution_count": 71,
"execution_count": 14,
"metadata": {},
"outputs": [
{
Expand Down
6 changes: 3 additions & 3 deletions radis/core/management/commands/populate_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
from django.conf import settings
from django.contrib.auth.models import Group, Permission
from django.core.management.base import BaseCommand, CommandParser
from django.db import transaction
from faker import Faker

from radis.accounts.factories import AdminUserFactory, GroupFactory, UserFactory
from radis.accounts.models import User
from radis.reports.factories import ReportFactory
from radis.reports.models import Report
from radis.reports.site import reports_created_handlers
from radis.token_authentication.factories import TokenFactory
from radis.token_authentication.models import FRACTION_LENGTH
from radis.token_authentication.utils.crypto import hash_token
from radis.vespa.utils.document_utils import create_documents

USER_COUNT = 20
GROUP_COUNT = 3
Expand Down Expand Up @@ -47,8 +48,7 @@ def feed_reports(language: Literal["en", "de"]):
for report_body in report_bodies:
reports.append(create_report(report_body, language))

for handler in reports_created_handlers:
handler([report.id for report in reports])
transaction.on_commit(lambda: create_documents([report.id for report in reports]))


def create_admin() -> User:
Expand Down
25 changes: 19 additions & 6 deletions radis/reports/api/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
from rest_framework.response import Response
from rest_framework.serializers import BaseSerializer

from radis.reports.tasks import reports_created, reports_deleted, reports_updated

from ..models import Report
from ..site import document_fetchers
from ..site import (
document_fetchers,
reports_created_handlers,
reports_deleted_handlers,
reports_updated_handlers,
)
from .serializers import ReportSerializer


Expand Down Expand Up @@ -66,7 +69,11 @@ def perform_create(self, serializer: BaseSerializer) -> None:
if not isinstance(reports, list):
reports = [reports]

transaction.on_commit(lambda: reports_created.delay([report.id for report in reports]))
transaction.on_commit(
lambda: [
handler([report.id for report in reports]) for handler in reports_created_handlers
]
)

def update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
# DRF itself does not support upsert.
Expand Down Expand Up @@ -102,7 +109,11 @@ def perform_update(self, serializer: BaseSerializer) -> None:
if not isinstance(reports, list):
reports = [reports]

transaction.on_commit(lambda: reports_updated.delay([report.id for report in reports]))
transaction.on_commit(
lambda: [
handler([report.id for report in reports]) for handler in reports_updated_handlers
]
)

def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
# Disallow partial updates
Expand All @@ -111,4 +122,6 @@ def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Respons

def perform_destroy(self, instance: Report) -> None:
super().perform_destroy(instance)
transaction.on_commit(lambda: reports_deleted.delay([instance.document_id]))
transaction.on_commit(
lambda: [handler([instance.document_id]) for handler in reports_deleted_handlers]
)
21 changes: 0 additions & 21 deletions radis/reports/tasks.py

This file was deleted.

24 changes: 15 additions & 9 deletions radis/vespa/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@ def register_app():
)
from radis.search.site import SearchProvider, register_search_provider
from radis.vespa.providers import retrieve_bm25
from radis.vespa.tasks import process_deleted_reports, process_updated_reports
from radis.vespa.vespa_app import MAX_RETRIEVAL_HITS, MAX_SEARCH_HITS

from .providers import search_bm25, search_hybrid, search_semantic
from .utils.document_utils import (
create_documents,
delete_documents,
fetch_document,
update_documents,
)
from .tasks import process_created_reports
from .utils.document_utils import fetch_document

def handle_created_reports(report_ids: list[int]) -> None:
process_created_reports.delay(report_ids)

register_reports_created_handler(handle_created_reports)

def handle_updated_reports(report_ids: list[int]) -> None:
process_updated_reports.delay(report_ids)

register_reports_created_handler(lambda report_ids: create_documents(report_ids))
register_reports_updated_handler(handle_updated_reports)

register_reports_updated_handler(lambda report_ids: update_documents(report_ids))
def handle_deleted_reports(document_ids: list[str]) -> None:
process_deleted_reports.delay(document_ids)

register_reports_deleted_handler(lambda document_ids: delete_documents(document_ids))
register_reports_deleted_handler(handle_deleted_reports)

def fetch_vespa_document(report: Report) -> dict[str, Any]:
return fetch_document(report.document_id)
Expand Down
18 changes: 18 additions & 0 deletions radis/vespa/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from celery import shared_task

from radis.vespa.utils.document_utils import create_documents, delete_documents, update_documents


# Celery task routed to the dedicated "vespa_queue".
# Forwards the IDs of newly created reports to create_documents, so that the
# work happens in a queue worker when the task is dispatched with .delay().
# NOTE(review): presumably create_documents indexes the reports in Vespa —
# confirm in radis.vespa.utils.document_utils.
@shared_task(queue="vespa_queue")
def process_created_reports(report_ids: list[int]) -> None:
create_documents(report_ids)


# Celery task routed to the dedicated "vespa_queue".
# Forwards the IDs of updated reports to update_documents, so that the work
# happens in a queue worker when the task is dispatched with .delay().
# NOTE(review): presumably update_documents re-indexes the reports in Vespa —
# confirm in radis.vespa.utils.document_utils.
@shared_task(queue="vespa_queue")
def process_updated_reports(report_ids: list[int]) -> None:
update_documents(report_ids)


# Celery task routed to the dedicated "vespa_queue".
# Forwards document IDs (strings, not integer report IDs) to delete_documents,
# so that the work happens in a queue worker when dispatched with .delay().
# NOTE(review): presumably delete_documents removes the documents from Vespa —
# confirm in radis.vespa.utils.document_utils.
@shared_task(queue="vespa_queue")
def process_deleted_reports(document_ids: list[str]) -> None:
delete_documents(document_ids)

0 comments on commit 4f91612

Please sign in to comment.