diff --git a/.gitignore b/.gitignore index d6ad56b8..d6c33442 100644 --- a/.gitignore +++ b/.gitignore @@ -336,3 +336,4 @@ DQMIO/ DQMIO_samples/ usercert.pem userkey.pem +ML_models/ diff --git a/backend/Dockerfile b/backend/Dockerfile index ab040f52..b3c9bd2b 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -44,16 +44,17 @@ ENV USERNAME=app ENV HOME=/home/$USERNAME ENV APP_HOME=$HOME/backend -RUN mkdir -p $HOME -RUN mkdir $APP_HOME -RUN addgroup --system --gid $GID $USERNAME && adduser --system --ingroup $USERNAME --uid $UID $USERNAME +RUN mkdir -p $HOME \ + && mkdir $APP_HOME \ + && addgroup --system --gid $GID $USERNAME \ + && adduser --system --ingroup $USERNAME --uid $UID $USERNAME WORKDIR $APP_HOME COPY --from=builder /usr/src/build/wheels /wheels COPY --from=builder /usr/src/build/requirements.txt . -RUN pip3 install --upgrade pip -RUN pip3 install --no-cache /wheels/* +RUN pip3 install --upgrade pip \ + && pip3 install --no-cache /wheels/* COPY backend $APP_HOME @@ -61,9 +62,9 @@ RUN chown -R $USERNAME:$USERNAME $APP_HOME USER $USERNAME -RUN python -m compileall dials -RUN $APP_HOME/scripts/setup-django-production.sh +RUN python -m compileall dials \ + && $APP_HOME/scripts/setup-django-production.sh EXPOSE 8000 -CMD ${APP_HOME}/scripts/run-django-production.sh +CMD ["./scripts/run-django-production.sh"] diff --git a/backend/dials/settings.py b/backend/dials/settings.py index 599aadab..60e17605 100644 --- a/backend/dials/settings.py +++ b/backend/dials/settings.py @@ -62,6 +62,8 @@ "lumisection.apps.LumisectionConfig", "th1.apps.TH1Config", "th2.apps.TH2Config", + "ml_models_index.apps.MLModelsIndexConfig", + "ml_bad_lumisection.apps.MLBadLumisectionConfig", "cern_auth.apps.CERNAuthConfig", ] diff --git a/backend/dials/urls.py b/backend/dials/urls.py index b0f0aae8..fe9bf421 100644 --- a/backend/dials/urls.py +++ b/backend/dials/urls.py @@ -5,6 +5,8 @@ from django.views.generic import TemplateView from file_index.routers import router as 
file_index_router from lumisection.routers import router as lumisection_router +from ml_bad_lumisection.routers import router as ml_bad_lumisection_router +from ml_models_index.routers import router as ml_models_index_router from oms_proxy.routers import router as oms_proxy_router from rest_framework import routers from run.routers import router as run_router @@ -20,6 +22,8 @@ router.registry.extend(lumisection_router.registry) router.registry.extend(th1_router.registry) router.registry.extend(th2_router.registry) +router.registry.extend(ml_models_index_router.registry) +router.registry.extend(ml_bad_lumisection_router.registry) router.registry.extend(cern_auth_router.registry) router.registry.extend(oms_proxy_router.registry) diff --git a/backend/ml_bad_lumisection/__init__.py b/backend/ml_bad_lumisection/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/ml_bad_lumisection/apps.py b/backend/ml_bad_lumisection/apps.py new file mode 100644 index 00000000..acdbea37 --- /dev/null +++ b/backend/ml_bad_lumisection/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class MLBadLumisectionConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "ml_bad_lumisection" diff --git a/backend/ml_bad_lumisection/filters.py b/backend/ml_bad_lumisection/filters.py new file mode 100644 index 00000000..aee01180 --- /dev/null +++ b/backend/ml_bad_lumisection/filters.py @@ -0,0 +1,18 @@ +from typing import ClassVar + +from django_filters import rest_framework as filters +from utils import filters_mixins + +from .models import MLBadLumisection + + +class MLBadLumisectionFilter(filters_mixins.DatasetFilterMethods, filters_mixins.MEsMethods, filters.FilterSet): + class Meta: + model = MLBadLumisection + fields: ClassVar[dict[str, list[str]]] = { + "model_id": ["exact", "in"], + "dataset_id": ["exact"], + "me_id": ["exact"], + "run_number": ["exact", "in"], + "ls_number": ["exact"], + } diff --git 
a/backend/ml_bad_lumisection/models.py b/backend/ml_bad_lumisection/models.py new file mode 100644 index 00000000..5c0cd5d5 --- /dev/null +++ b/backend/ml_bad_lumisection/models.py @@ -0,0 +1,32 @@ +from typing import ClassVar + +from django.db import models + + +class MLBadLumisection(models.Model): + """ + - Django doesn't support composite primary key + - The unique constraint set in this class do not exist in the database, + it is used here to select the composite primary key in the viewset and as a documentation + """ + + model_id = models.BigIntegerField(primary_key=True) + dataset_id = models.BigIntegerField() + file_id = models.BigIntegerField() + run_number = models.IntegerField() + ls_number = models.IntegerField() + me_id = models.IntegerField() + mse = models.FloatField() + + class Meta: + managed = False + db_table = "fact_ml_bad_lumis" + constraints: ClassVar[list[models.Index]] = [ + models.UniqueConstraint( + name="fact_ml_bad_lumis_primary_key", + fields=["model_id", "dataset_id", "run_number", "ls_number", "me_id"], + ), + ] + + def __str__(self) -> str: + return f"MLBadLumisection <{self.me_id}@{self.ls_number}@{self.run_number}@{self.dataset_id}@{self.model_id}>" diff --git a/backend/ml_bad_lumisection/routers.py b/backend/ml_bad_lumisection/routers.py new file mode 100644 index 00000000..72287997 --- /dev/null +++ b/backend/ml_bad_lumisection/routers.py @@ -0,0 +1,7 @@ +from rest_framework import routers + +from .viewsets import MLBadLumisectionViewSet + + +router = routers.SimpleRouter() +router.register(r"ml-bad-lumisection", MLBadLumisectionViewSet, basename="ml-bad-lumisection") diff --git a/backend/ml_bad_lumisection/serializers.py b/backend/ml_bad_lumisection/serializers.py new file mode 100644 index 00000000..49387be1 --- /dev/null +++ b/backend/ml_bad_lumisection/serializers.py @@ -0,0 +1,9 @@ +from rest_framework import serializers + +from .models import MLBadLumisection + + +class 
MLBadLumisectionSerializer(serializers.ModelSerializer): + class Meta: + model = MLBadLumisection + fields = "__all__" diff --git a/backend/ml_bad_lumisection/viewsets.py b/backend/ml_bad_lumisection/viewsets.py new file mode 100644 index 00000000..785ba7de --- /dev/null +++ b/backend/ml_bad_lumisection/viewsets.py @@ -0,0 +1,168 @@ +import logging +from typing import ClassVar + +from django.conf import settings +from django.shortcuts import get_object_or_404 +from django.utils.decorators import method_decorator +from django.views.decorators.cache import cache_page +from django.views.decorators.vary import vary_on_headers +from django_filters.rest_framework import DjangoFilterBackend +from lumisection.models import Lumisection +from ml_models_index.models import MLModelsIndex +from rest_framework import mixins, viewsets +from rest_framework.authentication import BaseAuthentication +from rest_framework.decorators import action +from rest_framework.exceptions import ValidationError +from rest_framework.response import Response +from utils.common import list_to_range +from utils.db_router import GenericViewSetRouter +from utils.rest_framework_cern_sso.authentication import ( + CERNKeycloakClientSecretAuthentication, + CERNKeycloakConfidentialAuthentication, +) + +from .filters import MLBadLumisectionFilter +from .models import MLBadLumisection +from .serializers import MLBadLumisectionSerializer + + +logger = logging.getLogger(__name__) +composite_pks = next(filter(lambda x: "primary_key" in x.name, MLBadLumisection._meta.constraints), None) + + +@method_decorator(cache_page(settings.CACHE_TTL), name="list") +@method_decorator(cache_page(settings.CACHE_TTL), name="get_object") +@method_decorator(vary_on_headers(settings.WORKSPACE_HEADER), name="list") +@method_decorator(vary_on_headers(settings.WORKSPACE_HEADER), name="get_object") +class MLBadLumisectionViewSet(GenericViewSetRouter, mixins.ListModelMixin, viewsets.GenericViewSet): + queryset = 
MLBadLumisection.objects.all().order_by(*composite_pks.fields) + serializer_class = MLBadLumisectionSerializer + filterset_class = MLBadLumisectionFilter + filter_backends: ClassVar[list[DjangoFilterBackend]] = [DjangoFilterBackend] + authentication_classes: ClassVar[list[BaseAuthentication]] = [ + CERNKeycloakClientSecretAuthentication, + CERNKeycloakConfidentialAuthentication, + ] + + @action( + detail=False, + methods=["GET"], + url_path=r"(?P<model_id>\d+)/(?P<dataset_id>\d+)/(?P<run_number>\d+)/(?P<ls_number>\d+)/(?P<me_id>\d+)", + ) + def get_object(self, request, model_id=None, dataset_id=None, run_number=None, ls_number=None, me_id=None): + # Since the MLBadLumisection table in the database has a composite primary key + # that Django doesn't support, we are defining this method + # as a custom retrieve method to query this table by the composite primary key + try: + model_id = int(model_id) + dataset_id = int(dataset_id) + run_number = int(run_number) + ls_number = int(ls_number) + me_id = int(me_id) + except ValueError as err: + raise ValidationError( + "model_id, dataset_id, run_number, ls_number and me_id must be valid integers."
+ ) from err + + queryset = self.get_queryset() + queryset = get_object_or_404( + queryset, model_id=model_id, dataset_id=dataset_id, run_number=run_number, ls_number=ls_number, me_id=me_id + ) + serializer = self.serializer_class(queryset) + return Response(serializer.data) + + @action(detail=False, methods=["GET"], url_path=r"cert-json") + def generate_certificate_json(self, request): + try: + dataset_id = int(request.query_params.get("dataset_id")) + run_number = list(map(int, request.query_params.get("run_number__in").split(","))) + model_id = list(map(int, request.query_params.get("model_id__in").split(","))) + except ValueError as err: + raise ValidationError( + "dataset_id and run_number must be valid integers and model_ids a valid list of integers" + ) from err + + # Select user's workspace + workspace = self.get_workspace() + + # Fetch models' metadata in the given workspace + models = MLModelsIndex.objects.using(workspace).filter(model_id__in=model_id).all().values() + models = {qs.get("model_id"): qs for qs in models} + + # Fetch predictions for a given dataset, multiple runs from multiple models + queryset = self.get_queryset() + result = ( + queryset.filter(dataset_id=dataset_id, run_number__in=run_number, model_id__in=model_id) + .all() + .order_by("run_number", "ls_number") + .values() + ) + result = [qs for qs in result] + + # Format bad lumi certification json + response = {} + for run in run_number: + response[run] = {} + predictions_in_run = [res for res in result if res.get("run_number") == run] + unique_ls = [res.get("ls_number") for res in predictions_in_run] + for ls in unique_ls: + response[run][ls] = [] + predictions_in_ls = [res for res in predictions_in_run if res.get("ls_number") == ls] + for preds in predictions_in_ls: + mse = preds.get("mse") + model_id = preds.get("model_id") + me_id = preds.get("me_id") + filename = models[model_id].get("filename") + target_me = models[model_id].get("target_me") + response[run][ls].append( + 
{"model_id": model_id, "me_id": me_id, "filename": filename, "me": target_me, "mse": mse} + ) + + return Response(response) + + @action(detail=False, methods=["GET"], url_path=r"golden-json") + def generate_golden_json(self, request): + try: + dataset_id = int(request.query_params.get("dataset_id")) + run_number = list(map(int, request.query_params.get("run_number__in").split(","))) + model_id = list(map(int, request.query_params.get("model_id__in").split(","))) + except ValueError as err: + raise ValidationError( + "dataset_id and run_number must be valid integers and model_ids a valid list of integers" + ) from err + + # Select user's workspace + workspace = self.get_workspace() + + # Fetch predictions for a given dataset, multiple runs from multiple models + queryset = self.get_queryset() + result = ( + queryset.filter(dataset_id=dataset_id, run_number__in=run_number, model_id__in=model_id) + .all() + .order_by("run_number", "ls_number") + .values() + ) + result = [qs for qs in result] + + # Generate ML golden json + response = {} + for run in run_number: + queryset = self.get_queryset() + bad_lumis = ( + queryset.filter(dataset_id=dataset_id, run_number=run, model_id__in=model_id) + .all() + .order_by("ls_number") + .values_list("ls_number", flat=True) + .distinct() + ) + bad_lumis = [qs for qs in bad_lumis] + all_lumis = ( + Lumisection.objects.using(workspace) + .filter(dataset_id=dataset_id, run_number=run) + .all() + .values_list("ls_number", flat=True) + ) + good_lumis = [ls for ls in all_lumis if ls not in bad_lumis] + response[run] = list_to_range(good_lumis) + + return Response(response) diff --git a/backend/ml_models_index/__init__.py b/backend/ml_models_index/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/ml_models_index/apps.py b/backend/ml_models_index/apps.py new file mode 100644 index 00000000..601654af --- /dev/null +++ b/backend/ml_models_index/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class 
MLModelsIndexConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "ml_models_index" diff --git a/backend/ml_models_index/filters.py b/backend/ml_models_index/filters.py new file mode 100644 index 00000000..1a7866a6 --- /dev/null +++ b/backend/ml_models_index/filters.py @@ -0,0 +1,15 @@ +from typing import ClassVar + +from django_filters import rest_framework as filters + +from .models import MLModelsIndex + + +class MLModelsIndexFilter(filters.FilterSet): + class Meta: + model = MLModelsIndex + fields: ClassVar[dict[str, list[str]]] = { + "model_id": ["exact", "in"], + "target_me": ["exact", "regex"], + "active": ["exact"], + } diff --git a/backend/ml_models_index/models.py b/backend/ml_models_index/models.py new file mode 100644 index 00000000..ccf78522 --- /dev/null +++ b/backend/ml_models_index/models.py @@ -0,0 +1,20 @@ +from typing import ClassVar + +from django.db import models + + +class MLModelsIndex(models.Model): + model_id = models.IntegerField(primary_key=True) + filename = models.CharField(max_length=255) + target_me = models.CharField(max_length=255) + active = models.BooleanField() + + class Meta: + managed = False + db_table = "dim_ml_models_index" + indexes: ClassVar[list[models.Index]] = [ + models.Index(name="idx_active", fields=["active"]), + ] + + def __str__(self) -> str: + return f"Model <{self.model_id}>" diff --git a/backend/ml_models_index/routers.py b/backend/ml_models_index/routers.py new file mode 100644 index 00000000..791bc711 --- /dev/null +++ b/backend/ml_models_index/routers.py @@ -0,0 +1,7 @@ +from rest_framework import routers + +from .viewsets import MLModelsIndexViewSet + + +router = routers.SimpleRouter() +router.register(r"ml-models-index", MLModelsIndexViewSet, basename="ml-models-index") diff --git a/backend/ml_models_index/serializers.py b/backend/ml_models_index/serializers.py new file mode 100644 index 00000000..15797d04 --- /dev/null +++ b/backend/ml_models_index/serializers.py @@ -0,0 +1,9 
@@ +from rest_framework import serializers + +from .models import MLModelsIndex + + +class MLModelsIndexSerializer(serializers.ModelSerializer): + class Meta: + model = MLModelsIndex + fields = "__all__" diff --git a/backend/ml_models_index/viewsets.py b/backend/ml_models_index/viewsets.py new file mode 100644 index 00000000..72536563 --- /dev/null +++ b/backend/ml_models_index/viewsets.py @@ -0,0 +1,39 @@ +import logging +from typing import ClassVar + +from django.conf import settings +from django.utils.decorators import method_decorator +from django.views.decorators.cache import cache_page +from django.views.decorators.vary import vary_on_headers +from django_filters.rest_framework import DjangoFilterBackend +from rest_framework import mixins, viewsets +from rest_framework.authentication import BaseAuthentication +from utils.db_router import GenericViewSetRouter +from utils.rest_framework_cern_sso.authentication import ( + CERNKeycloakClientSecretAuthentication, + CERNKeycloakConfidentialAuthentication, +) + +from .filters import MLModelsIndexFilter +from .models import MLModelsIndex +from .serializers import MLModelsIndexSerializer + + +logger = logging.getLogger(__name__) + + +@method_decorator(cache_page(settings.CACHE_TTL), name="retrieve") +@method_decorator(cache_page(settings.CACHE_TTL), name="list") +@method_decorator(vary_on_headers(settings.WORKSPACE_HEADER), name="retrieve") +@method_decorator(vary_on_headers(settings.WORKSPACE_HEADER), name="list") +class MLModelsIndexViewSet( + GenericViewSetRouter, mixins.RetrieveModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet +): + queryset = MLModelsIndex.objects.all().order_by(MLModelsIndex._meta.pk.name) + serializer_class = MLModelsIndexSerializer + filterset_class = MLModelsIndexFilter + filter_backends: ClassVar[list[DjangoFilterBackend]] = [DjangoFilterBackend] + authentication_classes: ClassVar[list[BaseAuthentication]] = [ + CERNKeycloakClientSecretAuthentication, + 
CERNKeycloakConfidentialAuthentication, + ] diff --git a/backend/static/swagger.json b/backend/static/swagger.json index 3dd4f9ed..601d2717 100644 --- a/backend/static/swagger.json +++ b/backend/static/swagger.json @@ -1920,48 +1920,563 @@ ] } }, - "/api/v1/oms-proxy/": { + "/api/v1/oms-proxy/": { + "get": { + "operationId": "proxyOMS", + "description": "Proxy request to OMS API", + "parameters": [ + { + "name": "endpoint", + "required": true, + "in": "query", + "description": "OMS API endpoint such as (runs, lumisections, datasetrates and etc)", + "schema": { + "type": "string" + } + }, + { + "name": "filter", + "required": true, + "in": "query", + "description": "OMS API endpoint filters", + "schema": { + "type": "object", + "additionalProperties": true, + "example": { + "run_number": 382921, + "dataset_name": "ZeroBias" + } + }, + "style": "deepObject" + }, + { + "name": "page", + "required": false, + "in": "query", + "description": "OMS API endpoint page configuration", + "schema": { + "type": "object", + "additionalProperties": true, + "example": { + "limit": 10 + } + }, + "style": "deepObject" + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + } + } + } + }, + "description": "" + } + }, + "tags": [ + "OMSProxy" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + }, + "/api/v1/ml-models-index/": { + "get": { + "operationId": "listMLModelsIndex", + "description": "", + "parameters": [ + { + "name": "next_token", + "required": false, + "in": "query", + "description": "next_token", + "schema": { + "type": "string" + } + }, + { + "name": "model_id", + "required": false, + "in": "query", + "description": "model_id", + "schema": { + "type": "integer" + } + }, + { + "name": "model_id__in", + "required": false, + "in": "query", + "description": "model_id__in", + "schema": { + "type": "array", + "items": { + "type": "integer" + } + 
}, + "style": "form", + "explode": false + }, + { + "name": "filename", + "required": false, + "in": "query", + "description": "filename", + "schema": { + "type": "string" + } + }, + { + "name": "filename__regex", + "required": false, + "in": "query", + "description": "filename__regex", + "schema": { + "type": "string" + } + }, + { + "name": "target_me", + "required": false, + "in": "query", + "description": "target_me", + "schema": { + "type": "string" + } + }, + { + "name": "target_me__regex", + "required": false, + "in": "query", + "description": "target_me__regex", + "schema": { + "type": "string" + } + }, + { + "name": "active", + "required": false, + "in": "query", + "description": "active", + "schema": { + "type": "boolean" + } + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": "workspace", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "next": { + "type": "string", + "nullable": true, + "format": "uri", + "example": "http://api.example.org/accounts/?next_token=cD0zMz" + }, + "previous": { + "type": "string", + "nullable": true, + "format": "uri", + "example": "http://api.example.org/accounts/?next_token=cj0xJnA" + }, + "results": { + "type": "array", + "items": { + "$ref": "#/components/schemas/MLModelsIndex" + } + } + } + } + } + }, + "description": "" + } + }, + "tags": [ + "ML Models Index" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + }, + "/api/v1/ml-models-index/{model_id}/": { + "get": { + "operationId": "retrieveMLModelsIndex", + "description": "", + "parameters": [ + { + "name": "model_id", + "in": "path", + "required": true, + "description": "A unique value identifying this ml model in the index.", + "schema": { + "type": "string" + } + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": 
"workspace", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/MLModelsIndex" + } + } + }, + "description": "" + } + }, + "tags": [ + "ML Models Index" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + }, + "/api/v1/ml-bad-lumisection/": { + "get": { + "operationId": "listMLBadLumisection", + "description": "", + "parameters": [ + { + "name": "next_token", + "required": false, + "in": "query", + "description": "next_token", + "schema": { + "type": "string" + } + }, + { + "name": "model_id", + "required": false, + "in": "query", + "description": "model_id", + "schema": { + "type": "integer" + } + }, + { + "name": "model_id__in", + "required": false, + "in": "query", + "description": "model_id__in", + "schema": { + "type": "array", + "items": { + "type": "integer" + } + }, + "style": "form", + "explode": false + }, + { + "name": "dataset", + "required": false, + "in": "query", + "description": "dataset", + "schema": { + "type": "string" + } + }, + { + "name": "dataset__regex", + "required": false, + "in": "query", + "description": "dataset__regex", + "schema": { + "type": "string" + } + }, + { + "name": "me", + "required": false, + "in": "query", + "description": "me", + "schema": { + "type": "string" + } + }, + { + "name": "me__regex", + "required": false, + "in": "query", + "description": "me__regex", + "schema": { + "type": "string" + } + }, + { + "name": "run_number", + "required": false, + "in": "query", + "description": "run_number", + "schema": { + "type": "integer" + } + }, + { + "name": "run_number__in", + "required": false, + "in": "query", + "description": "run_number__in", + "schema": { + "type": "array", + "items": { + "type": "integer" + } + }, + "style": "form", + "explode": false + }, + { + "name": "ls_number", + "required": false, + "in": "query", + "description": "ls_number", + 
"schema": { + "type": "string" + } + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": "workspace", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "next": { + "type": "string", + "nullable": true, + "format": "uri", + "example": "http://api.example.org/accounts/?next_token=cD0zMz" + }, + "previous": { + "type": "string", + "nullable": true, + "format": "uri", + "example": "http://api.example.org/accounts/?next_token=cj0xJnA" + }, + "results": { + "type": "array", + "items": { + "$ref": "#/components/schemas/MLBadLumisection" + } + } + } + } + } + }, + "description": "" + } + }, + "tags": [ + "ML Bad Lumisection" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + }, + "/api/v1/ml-bad-lumisection/{model_id}/{dataset_id}/{run_number}/{ls_number}/{me_id}/": { "get": { - "operationId": "proxyOMS", - "description": "Proxy request to OMS API", + "operationId": "retrieveMLBadLumisection", + "description": "", "parameters": [ { - "name": "endpoint", + "name": "model_id", + "in": "path", "required": true, - "in": "query", - "description": "OMS API endpoint such as (runs, lumisections, datasetrates and etc)", + "description": "A unique value identifying this ml model.", + "schema": { + "type": "string" + } + }, + { + "name": "dataset_id", + "in": "path", + "required": true, + "description": "A unique value identifying the dataset.", + "schema": { + "type": "string" + } + }, + { + "name": "run_number", + "in": "path", + "required": true, + "description": "A unique value identifying the run.", + "schema": { + "type": "string" + } + }, + { + "name": "ls_number", + "in": "path", + "required": true, + "description": "A unique value identifying the lumisection.", "schema": { "type": "string" } }, { - "name": "filter", + "name": "me_id", + "in": "path", "required": true, 
+ "description": "A unique value identifying the monitoring element.", + "schema": { + "type": "string" + } + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": "workspace", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/MLBadLumisection" + } + } + }, + "description": "" + } + }, + "tags": [ + "ML Bad Lumisection" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + }, + "/api/v1/ml-bad-lumisection/cert-json": { + "get": { + "operationId": "certJsonMLBadLumisection", + "description": "", + "parameters": [ + { + "name": "model_id__in", + "required": false, "in": "query", - "description": "OMS API endpoint filters", + "description": "model_id__in", "schema": { - "type": "object", - "additionalProperties": true, - "example": { - "run_number": 382921, - "dataset_name": "ZeroBias" + "type": "array", + "items": { + "type": "integer" } }, - "style": "deepObject" + "style": "form", + "explode": false + }, + { + "name": "dataset_id", + "required": false, + "in": "query", + "description": "dataset_id", + "schema": { + "type": "integer" + } }, { - "name": "page", + "name": "run_number__in", "required": false, "in": "query", - "description": "OMS API endpoint page configuration", + "description": "run_number__in", "schema": { - "type": "object", - "additionalProperties": true, - "example": { - "limit": 10, + "type": "array", + "items": { + "type": "integer" } }, - "style": "deepObject" + "style": "form", + "explode": false + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": "workspace", + "schema": { + "type": "string" + } } ], "responses": { @@ -1969,9 +2484,7 @@ "content": { "application/json": { "schema": { - "type": "object", - "properties": { - } + "$ref": "#/components/schemas/MLBadLumisectionCertJson" } } }, @@ -1979,7 +2492,7 @@ } 
}, "tags": [ - "OMSProxy" + "ML Bad Lumisection" ], "security": [ { @@ -1992,6 +2505,84 @@ ] } }, + "/api/v1/ml-bad-lumisection/golden-json": { + "get": { + "operationId": "goldenJsonMLBadLumisection", + "description": "", + "parameters": [ + { + "name": "model_id__in", + "required": false, + "in": "query", + "description": "model_id__in", + "schema": { + "type": "array", + "items": { + "type": "integer" + } + }, + "style": "form", + "explode": false + }, + { + "name": "dataset_id", + "required": false, + "in": "query", + "description": "dataset_id", + "schema": { + "type": "integer" + } + }, + { + "name": "run_number__in", + "required": false, + "in": "query", + "description": "run_number__in", + "schema": { + "type": "array", + "items": { + "type": "integer" + } + }, + "style": "form", + "explode": false + }, + { + "name": "workspace", + "required": false, + "in": "header", + "description": "workspace", + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/MLBadLumisectionGoldenJson" + } + } + }, + "description": "" + } + }, + "tags": [ + "ML Bad Lumisection" + ], + "security": [ + { + "Client Secret Key": [] + }, + { + "Confidential JWT Token": [] + }, + {} + ] + } + } }, "components": { "schemas": { @@ -2475,6 +3066,149 @@ "entries", "data" ] + }, + "MLModelsIndex": { + "type": "object", + "properties": { + "model_id": { + "type": "integer", + "maximum": 9223372036854776000, + "minimum": -9223372036854776000, + "format": "int64" + }, + "filename": { + "type": "string", + "maxLength": 255 + }, + "target_me": { + "type": "string", + "maxLength": 255 + }, + "active": { + "type": "boolean" + } + }, + "required": [ + "model_id", + "filename", + "target_me", + "active" + ] + }, + "MLBadLumisection": { + "type": "object", + "properties": { + "model_id": { + "type": "integer", + "maximum": 9223372036854776000, + "minimum": -9223372036854776000, + "format": "int64" + 
}, + "dataset_id": { + "type": "integer", + "maximum": 9223372036854776000, + "minimum": -9223372036854776000, + "format": "int64" + }, + "file_id": { + "type": "integer", + "maximum": 9223372036854776000, + "minimum": -9223372036854776000, + "format": "int64" + }, + "run_number": { + "type": "integer", + "maximum": 2147483647, + "minimum": -2147483648 + }, + "ls_number": { + "type": "integer", + "maximum": 2147483647, + "minimum": -2147483648 + }, + "me_id": { + "type": "integer", + "maximum": 2147483647, + "minimum": -2147483648 + }, + "mse": { + "type": "number" + }, + }, + "required": [ + "model_id", + "dataset_id", + "file_id", + "run_number", + "ls_number", + "me_id", + "mse" + ] + }, + "MLBadLumisectionCertJson": { + "type": "object", + "properties": { + "run_number": { + "type": "object", + "properties": { + "ls_number": { + "type": "array", + "items": { + "type": "object", + "properties": { + "model_id": { + "type": "integer", + "maximum": 2147483647, + "minimum": -2147483648 + }, + "me_id": { + "type": "integer", + "maximum": 2147483647, + "minimum": -2147483648 + }, + "filename": { + "type": "string" + }, + "me": { + "type": "string" + } + } + } + } + } + } + }, + "required": [ + "run_number", + "ls_number", + "model_id", + "me_id", + "filename", + "me" + ] + }, + "MLBadLumisectionGoldenJson": { + "type": "object", + "properties": { + "run_number": { + "type": "array", + "items": { + "type": "array", + "items": { + "type": "integer" + }, + "example": [128, 145] + } + } + }, + "required": [ + "run_number", + "ls_number", + "model_id", + "me_id", + "filename", + "me" + ] } }, "securitySchemes": { diff --git a/backend/utils/common.py b/backend/utils/common.py new file mode 100644 index 00000000..48815ba7 --- /dev/null +++ b/backend/utils/common.py @@ -0,0 +1,7 @@ +import itertools + + +def list_to_range(i): + for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): + b = list(b) + yield b[0][1], b[-1][1] diff --git 
a/backend/utils/db_router.py b/backend/utils/db_router.py index 93d20e59..31b3e969 100644 --- a/backend/utils/db_router.py +++ b/backend/utils/db_router.py @@ -16,16 +16,17 @@ def get_queryset(self): queryset = super().get_queryset() order_by = queryset.query.order_by queryset = queryset.model.objects - workspace = self.request.headers.get(settings.WORKSPACE_HEADER.capitalize()) + workspace = self.get_workspace() + queryset = queryset.using(workspace) + return queryset.all().order_by(*order_by) + def get_workspace(self): + workspace = self.request.headers.get(settings.WORKSPACE_HEADER.capitalize()) if workspace: if workspace not in settings.WORKSPACES.keys(): raise NotFound(detail=f"Workspace '{workspace}' not found", code=404) - queryset = queryset.using(workspace) else: user_roles = self.request.user.cern_roles workspace = get_workspace_from_role(user_roles) workspace = workspace or settings.DEFAULT_WORKSPACE - queryset = queryset.using(workspace) - - return queryset.all().order_by(*order_by) + return workspace diff --git a/etl/alembic/versions/bb3f9a1b3fa2_add_ml_bad_lumis.py b/etl/alembic/versions/bb3f9a1b3fa2_add_ml_bad_lumis.py new file mode 100644 index 00000000..1d51396e --- /dev/null +++ b/etl/alembic/versions/bb3f9a1b3fa2_add_ml_bad_lumis.py @@ -0,0 +1,60 @@ +# noqa: INP001 + +"""add ml bad lumis + +Revision ID: bb3f9a1b3fa2 +Revises: 86e3beee4a68 +Create Date: 2024-03-26 16:09:50.366283 + +""" + +from collections.abc import Sequence + +from alembic import op + + +# revision identifiers, used by Alembic. 
+revision: str = "bb3f9a1b3fa2" +down_revision: str = "86e3beee4a68" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def fact_ml_bad_lumis() -> list: + # We don't need extra indexes + op.execute(""" + CREATE TABLE IF NOT EXISTS fact_ml_bad_lumis ( + model_id BIGINT, + dataset_id BIGINT, + file_id BIGINT, + run_number INT, + ls_number INT, + me_id INT, + mse DOUBLE PRECISION, + CONSTRAINT fact_ml_bad_lumis_pk PRIMARY KEY (model_id, dataset_id, run_number, ls_number, me_id) + ); + """) + op.execute("CREATE INDEX idx_mlbl_dataset_id_run_number ON fact_ml_bad_lumis (dataset_id, run_number);") + + +def dim_ml_models_index() -> list: + op.execute(""" + CREATE TABLE IF NOT EXISTS dim_ml_models_index ( + model_id SERIAL, + filename VARCHAR(255), + target_me VARCHAR(255), + active BOOLEAN, + CONSTRAINT dim_ml_models_index_pk PRIMARY KEY (model_id) + ); + """) + op.execute("CREATE INDEX idx_active ON dim_ml_models_index (active);") + + +def upgrade(engine_name: str) -> None: + dim_ml_models_index() + fact_ml_bad_lumis() + + +def downgrade(engine_name: str) -> None: + op.drop_table("dim_ml_models_index") + op.drop_table("fact_ml_bad_lumis") diff --git a/etl/cli.py b/etl/cli.py index a14da1e8..521be455 100755 --- a/etl/cli.py +++ b/etl/cli.py @@ -4,7 +4,7 @@ from python.config import common_indexer_queue, primary_datasets, priority_era, workspaces from python.env import conn_str -from python.models import FactFileIndex, FactTH1, FactTH2 +from python.models import DimMLModelsIndex, FactFileIndex, FactTH1, FactTH2 from python.models.file_index import StatusCollection from python.pipelines.dataset_indexer.tasks import dataset_indexer_pipeline_task from python.pipelines.file_downloader.tasks import file_downloader_pipeline_task @@ -154,6 +154,15 @@ def indexing_handler(args): ) +def add_ml_model_to_index_hanlder(args): + engine = get_engine(args.workspace) + Session = sessionmaker(bind=engine) # noqa: N806 + with Session() as 
session: + model = DimMLModelsIndex(filename=args.filename, target_me=args.target_me, thr=args.thr, active=args.active) + session.add(model) + session.commit() + + def main(): parser = argparse.ArgumentParser(description="DIALS etl command line interface") subparsers = parser.add_subparsers(dest="command", title="Commands") @@ -202,6 +211,19 @@ def main(): ) clean_table_parser.set_defaults(handler=clean_parsing_error_handler) + # Register ml model command + add_ml_model_parser = subparsers.add_parser("add-ml-model-to-index", help="Register ML molde into DB") + add_ml_model_parser.add_argument("-w", "--workspace", help="Workspace name.", required=True) + add_ml_model_parser.add_argument("-f", "--filename", help="Model binary filename", required=True) + add_ml_model_parser.add_argument( + "-m", "--target-me", help="Monitoring element predicted by the model", required=True + ) + add_ml_model_parser.add_argument( + "-t", "--thr", help="Model threshold for anomaly detection", required=True, type=float + ) + add_ml_model_parser.add_argument("-a", "--active", help="Is the model active?", required=True, type=bool) + add_ml_model_parser.set_defaults(handler=add_ml_model_to_index_hanlder) + args = parser.parse_args() if hasattr(args, "handler"): diff --git a/etl/fill_ml_index.py b/etl/fill_ml_index.py new file mode 100644 index 00000000..5b246fe1 --- /dev/null +++ b/etl/fill_ml_index.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python + +from python.env import conn_str +from python.models import DimMLModelsIndex +from sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine +from sqlalchemy.orm import sessionmaker + + +def get_engine(workspace: str) -> Engine: + return create_engine(f"{conn_str}/{workspace}") + + +def register_model(ws, model_metadata): + engine = get_engine(ws) + Session = sessionmaker(bind=engine) # noqa: N806 + with Session() as session: + model = DimMLModelsIndex( + filename=model_metadata["filename"], + target_me=model_metadata["target_me"], + 
active=model_metadata["active"], + ) + session.add(model) + session.commit() + + +if __name__ == "__main__": + models = [ + { + "filename": "model_CHFrac_highPt_Barrel_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_highPt_Barrel", + "active": False, + }, + { + "filename": "model_CHFrac_highPt_EndCap_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_highPt_EndCap", + "active": False, + }, + { + "filename": "model_CHFrac_lowPt_Barrel_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_lowPt_Barrel", + "active": False, + }, + { + "filename": "model_CHFrac_lowPt_EndCap_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_lowPt_EndCap", + "active": False, + }, + { + "filename": "model_CHFrac_mediumPt_Barrel_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_mediumPt_Barrel", + "active": False, + }, + { + "filename": "model_CHFrac_mediumPt_EndCap_checkpoint_20240517.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_mediumPt_EndCap", + "active": False, + }, + { + "filename": "model_MET_2_checkpoint_20240517.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/MET_2", + "active": False, + }, + { + "filename": "model_METPhi_checkpoint_20240517.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/METPhi", + "active": False, + }, + { + "filename": "model_METSig_checkpoint_20240517.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/METSig", + "active": False, + }, + { + "filename": "model_SumET_checkpoint_20240517.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/SumET", + "active": False, + }, + { + "filename": "model_CHFrac_highPt_Barrel_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_highPt_Barrel", + "active": True, + }, + { + "filename": "model_CHFrac_highPt_EndCap_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_highPt_EndCap", + "active": True, + }, + { 
+ "filename": "model_CHFrac_lowPt_Barrel_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_lowPt_Barrel", + "active": True, + }, + { + "filename": "model_CHFrac_lowPt_EndCap_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_lowPt_EndCap", + "active": True, + }, + { + "filename": "model_CHFrac_mediumPt_Barrel_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_mediumPt_Barrel", + "active": True, + }, + { + "filename": "model_CHFrac_mediumPt_EndCap_checkpoint_20240720.onnx", + "target_me": "JetMET/Jet/Cleanedak4PFJetsCHS/CHFrac_mediumPt_EndCap", + "active": True, + }, + { + "filename": "model_MET_2_checkpoint_20240720.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/MET_2", + "active": True, + }, + { + "filename": "model_METPhi_checkpoint_20240720.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/METPhi", + "active": True, + }, + { + "filename": "model_METSig_checkpoint_20240720.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/METSig", + "active": True, + }, + { + "filename": "model_SumET_checkpoint_20240720.onnx", + "target_me": "JetMET/MET/pfMETT1/Cleaned/SumET", + "active": True, + }, + ] + for model_metadata in models: + register_model("jetmet", model_metadata) diff --git a/etl/python/celery/celeryconfig.py b/etl/python/celery/celeryconfig.py index ff34c21e..8250d465 100644 --- a/etl/python/celery/celeryconfig.py +++ b/etl/python/celery/celeryconfig.py @@ -17,4 +17,5 @@ "python.pipelines.dataset_indexer.tasks", "python.pipelines.file_indexer.tasks", "python.pipelines.file_ingesting.tasks", + "python.pipelines.ml_inference.tasks", ) diff --git a/etl/python/env.py b/etl/python/env.py index 24e80b9c..d7f9bee6 100644 --- a/etl/python/env.py +++ b/etl/python/env.py @@ -4,6 +4,7 @@ app_env = config("ENV") eos_landing_zone = config("EOS_LANDING_ZONE") mounted_eos_path = config("MOUNTED_EOS_PATH", default=None) +model_registry_path = config("MODEL_REGISTRY_PATH") conn_str = 
config("DATABASE_URI") lxplus_user = config("KEYTAB_USER") lxplus_pwd = config("KEYTAB_PWD") diff --git a/etl/python/models/__init__.py b/etl/python/models/__init__.py index a94704df..31ba9a3f 100644 --- a/etl/python/models/__init__.py +++ b/etl/python/models/__init__.py @@ -1,13 +1,16 @@ from .dataset_index import FactDatasetIndex from .dim_mes import DimMonitoringElements +from .dim_ml_index import DimMLModelsIndex from .file_index import FactFileIndex from .lumisection import FactLumisection +from .ml_bad_lumis import FactMLBadLumis from .run import FactRun from .th1 import FactTH1 from .th2 import FactTH2 __all__ = [ + "DimMLModelsIndex", "DimMonitoringElements", "FactDatasetIndex", "FactFileIndex", @@ -15,4 +18,5 @@ "FactLumisection", "FactTH1", "FactTH2", + "FactMLBadLumis", ] diff --git a/etl/python/models/dim_ml_index.py b/etl/python/models/dim_ml_index.py new file mode 100644 index 00000000..84a64d54 --- /dev/null +++ b/etl/python/models/dim_ml_index.py @@ -0,0 +1,19 @@ +import sqlalchemy as sa +from sqlalchemy.orm import declarative_base + + +Base = declarative_base() + + +class DimMLModelsIndex(Base): + __tablename__ = "dim_ml_models_index" + + model_id = sa.Column("model_id", sa.BigInteger, autoincrement=True) + filename = sa.Column("filename", sa.String(255)) + target_me = sa.Column("target_me", sa.String(255)) + active = sa.Column("active", sa.Boolean) + + __table_args__ = ( + sa.PrimaryKeyConstraint("model_id"), + sa.Index("idx_active", "active"), + ) diff --git a/etl/python/models/ml_bad_lumis.py b/etl/python/models/ml_bad_lumis.py new file mode 100644 index 00000000..370b9d14 --- /dev/null +++ b/etl/python/models/ml_bad_lumis.py @@ -0,0 +1,22 @@ +import sqlalchemy as sa +from sqlalchemy.orm import declarative_base + + +Base = declarative_base() + + +class FactMLBadLumis(Base): + __tablename__ = "fact_ml_bad_lumis" + + model_id = sa.Column("model_id", sa.String(length=255)) + dataset_id = sa.Column("dataset_id", sa.BigInteger) + file_id = 
sa.Column("file_id", sa.BigInteger) + run_number = sa.Column("run_number", sa.Integer) + ls_number = sa.Column("ls_number", sa.Integer) + me_id = sa.Column("me_id", sa.Integer) + mse = sa.Column("mse", sa.Float) + + __table_args__ = ( + sa.PrimaryKeyConstraint("model_id", "dataset_id", "run_number", "ls_number", "me_id"), + sa.Index("idx_mlbl_dataset_id_run_number", "dataset_id", "run_number"), + ) diff --git a/etl/python/pipelines/file_ingesting/pipeline.py b/etl/python/pipelines/file_ingesting/pipeline.py index 745f18ac..b21b0b0b 100644 --- a/etl/python/pipelines/file_ingesting/pipeline.py +++ b/etl/python/pipelines/file_ingesting/pipeline.py @@ -4,13 +4,14 @@ from ...env import conn_str from ...models.file_index import StatusCollection +from ..ml_inference.pipeline import pipeline as ml_pipeline from ..utils import clean_file, error_handler from .exceptions import PipelineCopyError, PipelineRootfileError from .extract import extract from .post_load import post_load from .pre_extract import pre_extract from .transform_load import transform_load -from .utils import validate_root_file +from .utils import fetch_active_models, validate_root_file def pipeline(workspace_name: str, workspace_mes: str, file_id: int, dataset_id: int): @@ -47,4 +48,18 @@ def pipeline(workspace_name: str, workspace_mes: str, file_id: int, dataset_id: # If everything goes well, we can clean the file clean_file(fpath) + + # Run ML pipeline for each model if workspace has any models registered + active_models = fetch_active_models(engine) + for model in active_models: + ml_pipeline( + workspace_name=workspace_name, + model_id=model.model_id, + model_file=model.filename, + target_me=model.target_me, + dataset_id=dataset_id, + file_id=file_id, + ) + + # Finally finishes post_load(engine, file_id) diff --git a/etl/python/pipelines/file_ingesting/utils.py b/etl/python/pipelines/file_ingesting/utils.py index 31ea1502..000fcd71 100644 --- a/etl/python/pipelines/file_ingesting/utils.py +++ 
b/etl/python/pipelines/file_ingesting/utils.py @@ -1,4 +1,8 @@ import ROOT +from sqlalchemy.engine.base import Engine +from sqlalchemy.orm import sessionmaker + +from ...models import DimMLModelsIndex def validate_root_file(fpath: str) -> None: @@ -8,3 +12,9 @@ def validate_root_file(fpath: str) -> None: """ with ROOT.TFile(fpath) as root_file: root_file.GetUUID().AsString() + + +def fetch_active_models(engine: Engine) -> list[DimMLModelsIndex]: + Session = sessionmaker(bind=engine) # noqa: N806 + with Session() as session: + return session.query(DimMLModelsIndex).filter(DimMLModelsIndex.active).all() diff --git a/etl/python/pipelines/ml_inference/__init__.py b/etl/python/pipelines/ml_inference/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/etl/python/pipelines/ml_inference/extract.py b/etl/python/pipelines/ml_inference/extract.py new file mode 100644 index 00000000..bb752cf7 --- /dev/null +++ b/etl/python/pipelines/ml_inference/extract.py @@ -0,0 +1,27 @@ +from sqlalchemy.engine.base import Engine +from sqlalchemy.exc import NoResultFound +from sqlalchemy.orm import sessionmaker + +from ...models import DimMonitoringElements, FactTH1, FactTH2 + + +def extract_me(engine: Engine, me: str): + sess = sessionmaker(bind=engine) + with sess() as session: + query = session.query(DimMonitoringElements).filter(DimMonitoringElements.me == me) + try: + result = query.one() + except NoResultFound: + result = None + return result + + +def extract(engine: Engine, th_class: FactTH1 | FactTH2, dataset_id: int, file_id: int, me_id: int): + sess = sessionmaker(bind=engine) + with sess() as session: + query = session.query(th_class).filter( + th_class.dataset_id == dataset_id, + th_class.file_id == file_id, + th_class.me_id == me_id, + ) + return query.all() diff --git a/etl/python/pipelines/ml_inference/pipeline.py b/etl/python/pipelines/ml_inference/pipeline.py new file mode 100644 index 00000000..b253f395 --- /dev/null +++ 
b/etl/python/pipelines/ml_inference/pipeline.py @@ -0,0 +1,63 @@ +import pandas as pd +from sqlalchemy import create_engine + +from ...common.pgsql import copy_expert +from ...env import conn_str +from ...models import FactMLBadLumis, FactTH1, FactTH2 +from .extract import extract, extract_me +from .predict import predict +from .preprocess import preprocess + + +def pipeline( + workspace_name: str, + model_id: int, + model_file: str, + target_me: str, + dataset_id: int, + file_id: int, +): + engine = create_engine(f"{conn_str}/{workspace_name}") + + # Extract me_id and TH dimension if me exists in database + me = extract_me(engine, target_me) + if me is None: + return + + # Extract data + th_class = FactTH1 if me.dim == 1 else FactTH2 + hists = extract(engine, th_class, dataset_id, file_id, me.me_id) + if len(hists) == 0: + return + + # Preprocess data + lss_, input_data = preprocess(hists) + + # Predictions + preds = predict(workspace_name, model_file, input_data) + + # Select bad lumis + bad_lumis = [] + for idx, ls_number in enumerate(lss_.flatten()): + mse = preds[1][idx] + is_anomaly = bool(preds[2][idx]) + if is_anomaly: + bad_lumis.append( + { + "model_id": model_id, + "dataset_id": dataset_id, + "file_id": file_id, + "run_number": hists[idx].run_number, + "ls_number": ls_number, + "me_id": me.me_id, + "mse": mse, + } + ) + + if len(bad_lumis) == 0: + return + + # Dump bad lumis if there is any + bad_lumis = pd.DataFrame(bad_lumis) + bad_lumis.to_sql(name=FactMLBadLumis.__tablename__, con=engine, if_exists="append", index=False, method=copy_expert) + engine.dispose() diff --git a/etl/python/pipelines/ml_inference/predict.py b/etl/python/pipelines/ml_inference/predict.py new file mode 100644 index 00000000..cdb3c99c --- /dev/null +++ b/etl/python/pipelines/ml_inference/predict.py @@ -0,0 +1,15 @@ +import numpy as np +from onnxruntime import InferenceSession + +from ...env import model_registry_path + + +def predict(workspace_name: str, model_file: str,
input_data: np.array) -> list[dict]: + model_path = f"{model_registry_path}/{workspace_name}/{model_file}" + sess = InferenceSession(model_path) + + # Predict + input_name = sess.get_inputs()[0].name + result = sess.run(None, {input_name: input_data}) + + return result diff --git a/etl/python/pipelines/ml_inference/preprocess.py b/etl/python/pipelines/ml_inference/preprocess.py new file mode 100644 index 00000000..70a2664d --- /dev/null +++ b/etl/python/pipelines/ml_inference/preprocess.py @@ -0,0 +1,10 @@ +import numpy as np + + +def preprocess(data: list[dict]) -> tuple: + results_ = [{"ls_number": result.ls_number, "data": result.data} for result in data] + sorted_ = sorted(results_, key=lambda x: x["ls_number"]) + test_array = np.vstack([histogram["data"] for histogram in sorted_]) + test_array = test_array.astype(np.float32) + lss_ = np.vstack([histogram["ls_number"] for histogram in sorted_]) + return lss_, test_array diff --git a/etl/python/pipelines/ml_inference/tasks.py b/etl/python/pipelines/ml_inference/tasks.py new file mode 100644 index 00000000..73a26239 --- /dev/null +++ b/etl/python/pipelines/ml_inference/tasks.py @@ -0,0 +1,9 @@ +from ...celery import app +from .pipeline import pipeline + + +@app.task( + name="ml_inference_pipeline", +) +def ml_inference_pipeline_task(**kwargs): + pipeline(**kwargs) diff --git a/etl/run_ml_on_old_files.py b/etl/run_ml_on_old_files.py new file mode 100644 index 00000000..f0144946 --- /dev/null +++ b/etl/run_ml_on_old_files.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +import argparse + +from python.config import workspaces +from python.env import conn_str +from python.models import FactDatasetIndex, FactFileIndex, FactMLBadLumis +from python.models.file_index import StatusCollection +from python.pipelines.file_ingesting.utils import fetch_active_models +from python.pipelines.ml_inference.pipeline import pipeline as ml_pipeline +from python.pipelines.ml_inference.tasks import ml_inference_pipeline_task +from 
sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine +from sqlalchemy.orm import sessionmaker + + +def get_ws_bulk_queue_name(): + ws = next(filter(lambda x: x["name"] == args.workspace_name, workspaces), None) + return ws["bulk_ingesting_queue"] + + +def get_dataset(engine: Engine, dataset_name: str): + sess = sessionmaker(bind=engine) + with sess() as session: + return session.query(FactDatasetIndex).filter(FactDatasetIndex.dataset == dataset_name).one() + + +def get_finished_files(engine: Engine, dataset_id: int): + sess = sessionmaker(bind=engine) + with sess() as session: + query = session.query(FactFileIndex).filter( + FactFileIndex.dataset_id == dataset_id, FactFileIndex.status == StatusCollection.FINISHED + ) + return query.all() + + +def get_existing_preds(engine: Engine, models_ids: list[int], dataset_id: int, files_ids: list[int]): + sess = sessionmaker(bind=engine) + with sess() as session: + query = session.query(FactMLBadLumis).filter( + FactMLBadLumis.model_id.in_(models_ids), + FactMLBadLumis.dataset_id == dataset_id, + FactMLBadLumis.file_id.in_(files_ids), + ) + return [(res.model_id, res.dataset_id, res.file_id) for res in query.all()] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Simple script to trigger ML jobs in old files") + parser.add_argument("-w", "--workspace-name", type=str, required=True) + parser.add_argument("-d", "--dataset-name", type=str, required=True) + parser.add_argument("-n", "--no-queue", action="store_true") + args = parser.parse_args() + + queue = get_ws_bulk_queue_name() + engine = create_engine(f"{conn_str}/{args.workspace_name}") + dataset = get_dataset(engine, args.dataset_name) + files = get_finished_files(engine, dataset.dataset_id) + active_models = fetch_active_models(engine) + existing_preds = get_existing_preds( + engine, [model.model_id for model in active_models], dataset.dataset_id, [file.file_id for file in files] + ) + for file in files: + for model in 
active_models: + pred_tuple = (model.model_id, dataset.dataset_id, file.file_id) + if pred_tuple in existing_preds: + print("IGNORING", pred_tuple) + continue + kwargs = { + "workspace_name": args.workspace_name, + "model_id": model.model_id, + "model_file": model.filename, + "target_me": model.target_me, + "dataset_id": file.dataset_id, + "file_id": file.file_id, + } + if args.no_queue: + ml_pipeline(**kwargs) + else: + ml_inference_pipeline_task.apply_async(kwargs=kwargs, queue=queue) diff --git a/frontend/src/components/navbar.jsx b/frontend/src/components/navbar.jsx index 3be18359..e69a868e 100644 --- a/frontend/src/components/navbar.jsx +++ b/frontend/src/components/navbar.jsx @@ -69,6 +69,11 @@ const AppNavbar = ({ Lumisections + + + Predictions + +