Feature #2 (#4)

Open · wants to merge 3 commits into base: release_0.2a0

16 changes: 15 additions & 1 deletion .github/workflows/build-production.yml
@@ -63,6 +63,7 @@ jobs:
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-api:${{ env.RELEASE_TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-api:latest
@@ -74,6 +75,19 @@ jobs:
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-executor:${{ env.RELEASE_TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest
- name: Build and push broker component
uses: docker/build-push-action@v4
with:
context: ./broker
file: ./broker/Dockerfile
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-broker:${{ env.RELEASE_TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-broker:latest
16 changes: 15 additions & 1 deletion .github/workflows/build-staging.yml
@@ -61,6 +61,7 @@ jobs:
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-api:${{ env.TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-api:latest
@@ -72,6 +73,19 @@ jobs:
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-executor:${{ env.TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-executor:latest
- name: Build and push broker component
uses: docker/build-push-action@v4
with:
context: ./broker
file: ./broker/Dockerfile
push: true
build-args: |
REGISTRY=${{ vars.DOCKER_REGISTRY }}
GEODDS_UTILS_PAT=${{ secrets.GEODDS_UTILS_PAT }}
tags: |
${{ vars.DOCKER_REGISTRY }}/geolake-broker:${{ env.TAG }}
${{ vars.DOCKER_REGISTRY }}/geolake-broker:latest
2 changes: 2 additions & 0 deletions api/Dockerfile
@@ -1,9 +1,11 @@
ARG REGISTRY=rg.nl-ams.scw.cloud/geodds-production
ARG TAG=latest
ARG GEODDS_UTILS_PAT
FROM $REGISTRY/geolake-datastore:$TAG
WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
RUN pip install git+https://opengeokube:${GEODDS_UTILS_PAT}@github.com/opengeokube/geodds-utils.git
COPY app /app
EXPOSE 80
CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"]
1 change: 1 addition & 0 deletions api/app/auth/backend.py
@@ -1,4 +1,5 @@
"""The module contains authentication backend"""

from uuid import UUID

from starlette.authentication import (
1 change: 1 addition & 0 deletions api/app/auth/manager.py
@@ -1,4 +1,5 @@
"""Module with access/authentication functions"""

from typing import Optional

from utils.api_logging import get_dds_logger
1 change: 1 addition & 0 deletions api/app/auth/models.py
@@ -1,4 +1,5 @@
"""The module contains models related to the authentication and authorization"""

from starlette.authentication import SimpleUser


1 change: 1 addition & 0 deletions api/app/callbacks/on_startup.py
@@ -1,4 +1,5 @@
"""Module with functions call during API server startup"""

from utils.api_logging import get_dds_logger

from datastore.datastore import Datastore
1 change: 1 addition & 0 deletions api/app/decorators_factory.py
@@ -1,4 +1,5 @@
"""Modules with utils for creating decorators"""

from inspect import Signature


98 changes: 42 additions & 56 deletions api/app/endpoint_handlers/dataset.py
@@ -1,13 +1,15 @@
"""Modules realizing logic for dataset-related endpoints"""
import os
import pika

import json
from typing import Optional

from fastapi.responses import FileResponse

from geodds_utils.units import make_bytes_readable_dict
from geodds_utils.workflow import log_execution_time

from dbmanager.dbmanager import DBManager, RequestStatus
from intake_geokube.queries.geoquery import GeoQuery
from intake_geokube.queries.geoquery import GeoQuery, Workflow
from intake_geokube.queries.workflow import Workflow
from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB
from datastore import exception as datastore_exception
@@ -26,7 +28,24 @@
log = get_dds_logger(__name__)
data_store = Datastore()

MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"]
PENDING_QUEUE: str = "pending"


def convert_to_workflow(
dataset_id: str, product_id: str, geoquery: GeoQuery
) -> Workflow:
raw_task = {
"id": "geoquery",
"op": "subset",
"use": [],
"args": {
"dataset_id": dataset_id,
"product_id": product_id,
"query": geoquery.dict(),
},
}
return TaskList.parse([raw_task])


def _is_etimate_enabled(dataset_id, product_id):
if dataset_id in ("sentinel-2",):
@@ -260,7 +279,9 @@ def async_query(
"""
log.debug("geoquery: %s", query)
if _is_etimate_enabled(dataset_id, product_id):
estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
estimated_size = estimate(dataset_id, product_id, query, "GB").get(
"value"
)
allowed_size = data_store.product_metadata(dataset_id, product_id).get(
"maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
)
@@ -275,36 +296,17 @@
raise exc.EmptyDatasetError(
dataset_id=dataset_id, product_id=product_id
)
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.getenv("BROKER_SERVICE_HOST", "broker")
)
)
broker_channel = broker_conn.channel()

request_id = DBManager().create_request(
user_id=user_id,
dataset=dataset_id,
product=product_id,
query=json.dumps(query.model_dump_original()),
)

# TODO: find a separator; for the moment use "\"
message = MESSAGE_SEPARATOR.join(
[str(request_id), "query", dataset_id, product_id, query.json()]
)

broker_channel.basic_publish(
exchange="",
routing_key="query_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
query=convert_to_workflow(dataset_id, product_id, query).json(),
status=RequestStatus.PENDING,
)
broker_conn.close()
return request_id


@log_execution_time(log)
@assert_product_exists
def sync_query(
@@ -343,29 +345,32 @@ def sync_query(
if estimated size is zero

"""

import time

request_id = async_query(user_id, dataset_id, product_id, query)
status, _ = DBManager().get_request_status_and_reason(request_id)
log.debug("sync query: status: %s", status)
while status in (RequestStatus.RUNNING, RequestStatus.QUEUED,
RequestStatus.PENDING):
while status in (
RequestStatus.RUNNING,
RequestStatus.QUEUED,
RequestStatus.PENDING,
):
time.sleep(1)
status, _ = DBManager().get_request_status_and_reason(request_id)
log.debug("sync query: status: %s", status)

if status is RequestStatus.DONE:
download_details = DBManager().get_download_details_for_request_id(
request_id
request_id
)
return FileResponse(
path=download_details.location_path,
filename=download_details.location_path.split(os.sep)[-1],
)
raise exc.ProductRetrievingError(
dataset_id=dataset_id,
product_id=product_id,
status=status.name)
dataset_id=dataset_id, product_id=product_id, status=status.name
)


@log_execution_time(log)
@@ -400,31 +405,12 @@ def run_workflow(

"""
log.debug("geoquery: %s", workflow)
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.getenv("BROKER_SERVICE_HOST", "broker")
)
)
broker_channel = broker_conn.channel()

request_id = DBManager().create_request(
user_id=user_id,
dataset=workflow.dataset_id,
product=workflow.product_id,
query=workflow.json(),
status=RequestStatus.PENDING,
)

# TODO: find a separator; for the moment use "\"
message = MESSAGE_SEPARATOR.join(
[str(request_id), "workflow", workflow.json()]
)

broker_channel.basic_publish(
exchange="",
routing_key="query_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
)
broker_conn.close()
return request_id
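
Note on the async_query and run_workflow hunks above: the PR deletes the direct pika/RabbitMQ publish (connection setup, basic_publish, and the MESSAGE_SEPARATOR framing) and instead persists each request in PENDING status, wrapping a plain GeoQuery into a single-task workflow via convert_to_workflow. Below is a minimal, dependency-free sketch of the payload that ends up in the request's query column; the dataset, product, and query values are hypothetical, and the real code serializes through the Workflow/TaskList classes rather than json.dumps.

import json

def workflow_payload(dataset_id: str, product_id: str, query: dict) -> str:
    # Mirrors the raw_task literal built by convert_to_workflow in the diff.
    raw_task = {
        "id": "geoquery",
        "op": "subset",
        "use": [],
        "args": {
            "dataset_id": dataset_id,
            "product_id": product_id,
            "query": query,
        },
    }
    # The PR parses this one-element list into a workflow object; plain JSON
    # is used here only to keep the sketch self-contained.
    return json.dumps([raw_task])

# Hypothetical values, for illustration only:
print(workflow_payload("era5", "reanalysis", {"variable": ["2t"]}))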
10 changes: 4 additions & 6 deletions api/app/endpoint_handlers/file.py
@@ -1,11 +1,12 @@
"""Module with functions to handle file related endpoints"""

import os

from fastapi.responses import FileResponse
from dbmanager.dbmanager import DBManager, RequestStatus
from geodds_utils.workflow import log_execution_time

from utils.api_logging import get_dds_logger
from utils.metrics import log_execution_time
import exceptions as exc

log = get_dds_logger(__name__)
@@ -41,11 +42,8 @@ def download_request_result(request_id: int):
"preparing downloads for request id: %s",
request_id,
)
(
request_status,
_,
) = DBManager().get_request_status_and_reason(request_id=request_id)
if request_status is not RequestStatus.DONE:
request = DBManager().get_request(request_id=request_id)
if request.status is not RequestStatus.DONE:
log.debug(
"request with id: '%s' does not exist or it is not finished yet!",
request_id,
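
Note: this hunk replaces the tuple-returning DBManager.get_request_status_and_reason with a single get_request accessor whose result exposes .status directly (and, per the request.py changes below, .fail_reason, with an empty result for unknown ids). A dependency-free sketch of that access pattern, using stand-in Request and RequestStatus types in place of the real dbmanager ones:

from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional

class RequestStatus(Enum):  # stand-in for dbmanager's RequestStatus
    PENDING = auto()
    RUNNING = auto()
    DONE = auto()
    FAILED = auto()

@dataclass
class Request:  # stand-in for the record returned by get_request
    status: RequestStatus
    fail_reason: Optional[str] = None

def can_download(request: Optional[Request]) -> bool:
    # Mirrors the guard in download_request_result: only a request that
    # exists and is finished may be downloaded.
    return request is not None and request.status is RequestStatus.DONE

assert can_download(Request(RequestStatus.DONE))
assert not can_download(Request(RequestStatus.FAILED, "disk full"))
assert not can_download(None)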
29 changes: 14 additions & 15 deletions api/app/endpoint_handlers/request.py
@@ -1,8 +1,10 @@
"""Modules with functions realizing logic for requests-related endpoints"""

from dbmanager.dbmanager import DBManager

from geodds_utils.workflow import log_execution_time
from utils.api_logging import get_dds_logger
from utils.metrics import log_execution_time

import exceptions as exc

log = get_dds_logger(__name__)
@@ -26,7 +28,7 @@ def get_requests(user_id: str):
requests : list
List of all requests done by the user
"""
return DBManager().get_requests_for_user_id(user_id=user_id)
return DBManager().get_request(user_id=user_id)


@log_execution_time(log)
@@ -51,15 +53,14 @@ def get_request_status(user_id: str, request_id: int):
Tuple of status and fail reason.
"""
# NOTE: maybe verification should be added if user checks only him\her requests
try:
status, reason = DBManager().get_request_status_and_reason(request_id)
except IndexError as err:
request = DBManager().get_request(request_id=request_id)
if not request:
log.error(
"request with id: '%s' was not found!",
request_id,
)
raise exc.RequestNotFound(request_id=request_id) from err
return {"status": status.name, "fail_reason": reason}
raise exc.RequestNotFound(request_id=request_id)
return {"status": request.status.name, "fail_reason": request.fail_reason}


@log_execution_time(log)
@@ -88,8 +89,9 @@ def get_request_resulting_size(request_id: int):
if request := DBManager().get_request_details(request_id):
size = request.download.size_bytes
if not size or size == 0:
raise exc.EmptyDatasetError(dataset_id=request.dataset,
product_id=request.product)
raise exc.EmptyDatasetError(
dataset_id=request.dataset, product_id=request.product
)
return size
log.info(
"request with id '%s' could not be found",
@@ -128,17 +130,14 @@ def get_request_uri(request_id: int):
)
raise exc.RequestNotFound(request_id=request_id) from err
if download_details is None:
(
request_status,
_,
) = DBManager().get_request_status_and_reason(request_id)
request = DBManager().get_request(request_id=request_id)
log.info(
"download URI not found for request id: '%s'."
" Request status is '%s'",
request_id,
request_status,
request.status,
)
raise exc.RequestStatusNotDone(
request_id=request_id, request_status=request_status
request_id=request_id, request_status=request.status
)
return download_details.download_uri
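
Note: request.py now routes every lookup through DBManager().get_request, called with user_id to list a user's requests and with request_id for a single record that may be empty. The accessor's real signature is not part of this diff; the sketch below is a hypothetical illustration of such a keyword-dispatched lookup:

from typing import Any, Optional

def get_request(
    requests: list[dict[str, Any]],
    request_id: Optional[int] = None,
    user_id: Optional[str] = None,
):
    if request_id is not None:
        # Single-record lookup; None signals "not found", matching the
        # `if not request:` guard in get_request_status above.
        return next((r for r in requests if r["id"] == request_id), None)
    if user_id is not None:
        # Per-user listing, as used by get_requests.
        return [r for r in requests if r["user_id"] == user_id]
    raise ValueError("pass either request_id or user_id")

db = [{"id": 1, "user_id": "u1"}, {"id": 2, "user_id": "u2"}]
assert get_request(db, request_id=2) == {"id": 2, "user_id": "u2"}
assert get_request(db, user_id="u1") == [{"id": 1, "user_id": "u1"}]
assert get_request(db, request_id=99) is None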