Add OGC API item and map
Marco Mancini committed Jan 29, 2024
1 parent 3734918 commit 032b398
Showing 15 changed files with 630 additions and 79 deletions.
103 changes: 87 additions & 16 deletions api/app/endpoint_handlers/dataset.py
@@ -3,7 +3,9 @@
import pika
from typing import Optional

from dbmanager.dbmanager import DBManager
from fastapi.responses import FileResponse

from dbmanager.dbmanager import DBManager, RequestStatus
from geoquery.geoquery import GeoQuery
from geoquery.task import TaskList
from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB
@@ -18,12 +20,18 @@
from api_utils import make_bytes_readable_dict
from validation import assert_product_exists

from . import request

log = get_dds_logger(__name__)
data_store = Datastore()

MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"]

def _is_estimate_enabled(dataset_id, product_id):
    # Size estimation is not supported for some datasets (e.g. sentinel-2).
    if dataset_id in ("sentinel-2",):
        return False
    return True
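The toggle's effect, shown with the one dataset the commit singles out (the second dataset/product pair is purely illustrative):

assert _is_estimate_enabled("sentinel-2", "any-product") is False  # estimation skipped
assert _is_estimate_enabled("era5", "reanalysis") is True          # estimation enforced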


@log_execution_time(log)
def get_datasets(user_roles_names: list[str]) -> list[dict]:
@@ -213,7 +221,7 @@ def estimate(

@log_execution_time(log)
@assert_product_exists
def query(
def async_query(
    user_id: str,
    dataset_id: str,
    product_id: str,
Expand Down Expand Up @@ -250,21 +258,22 @@ def query(
"""
log.debug("geoquery: %s", query)
    estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
    allowed_size = data_store.product_metadata(dataset_id, product_id).get(
        "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
    )
    if estimated_size > allowed_size:
        raise exc.MaximumAllowedSizeExceededError(
            dataset_id=dataset_id,
            product_id=product_id,
            estimated_size_gb=estimated_size,
            allowed_size_gb=allowed_size,
        )
    if estimated_size == 0.0:
        raise exc.EmptyDatasetError(
            dataset_id=dataset_id, product_id=product_id
    if _is_estimate_enabled(dataset_id, product_id):
        estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
        allowed_size = data_store.product_metadata(dataset_id, product_id).get(
            "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
        )
        if estimated_size > allowed_size:
            raise exc.MaximumAllowedSizeExceededError(
                dataset_id=dataset_id,
                product_id=product_id,
                estimated_size_gb=estimated_size,
                allowed_size_gb=allowed_size,
            )
        if estimated_size == 0.0:
            raise exc.EmptyDatasetError(
                dataset_id=dataset_id, product_id=product_id
            )
    broker_conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=os.getenv("BROKER_SERVICE_HOST", "broker")
@@ -295,6 +304,68 @@ def query(
    broker_conn.close()
    return request_id
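For context, a minimal client-side sketch of the asynchronous flow this handler backs. The base URL, credentials, dataset/product names, and request body are illustrative assumptions, not part of this commit; the body mirrors the GeoQuery fields listed in the comments further down.

import requests  # third-party HTTP client, used here only for illustration

BASE = "http://localhost:8080"  # hypothetical deployment
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder credentials

# POST /datasets/{dataset_id}/{product_id}/execute returns immediately with
# the numeric ID of the queued request; the retrieval itself runs in a worker
# fed through the broker connection above.
resp = requests.post(
    f"{BASE}/datasets/era5/reanalysis/execute",
    json={"variable": ["2m_temperature"], "format": "netcdf"},
    headers=HEADERS,
)
request_id = resp.json()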

@log_execution_time(log)
@assert_product_exists
def sync_query(
    user_id: str,
    dataset_id: str,
    product_id: str,
    query: GeoQuery,
):
    """Realize the logic for the endpoints:

    `GET /datasets/{dataset_id}/{product_id}/map`
    `GET /datasets/{dataset_id}/{product_id}/items/{feature_id}`

    Submit the query, poll until the resulting request completes,
    and return the produced file.

    Parameters
    ----------
    user_id : str
        ID of the user executing the query
    dataset_id : str
        ID of the dataset
    product_id : str
        ID of the product
    query : GeoQuery
        Query to perform

    Returns
    -------
    response : FileResponse
        File containing the result of the query

    Raises
    -------
    MaximumAllowedSizeExceededError
        if the allowed size is below the estimated one
    EmptyDatasetError
        if estimated size is zero
    ProductRetrievingError
        if the request finishes in a failed state
    """
    import time

    request_id = async_query(user_id, dataset_id, product_id, query)
    status, _ = DBManager().get_request_status_and_reason(request_id)
    log.debug("sync query: status: %s", status)
    # Poll once per second until the request leaves a transient state.
    while status in (RequestStatus.RUNNING, RequestStatus.QUEUED,
                     RequestStatus.PENDING):
        time.sleep(1)
        status, _ = DBManager().get_request_status_and_reason(request_id)
        log.debug("sync query: status: %s", status)

    if status is RequestStatus.DONE:
        download_details = DBManager().get_download_details_for_request_id(
            request_id
        )
        return FileResponse(
            path=download_details.location_path,
            filename=download_details.location_path.split(os.sep)[-1],
        )
    raise exc.ProductRetrievingError(
        dataset_id=dataset_id,
        product_id=product_id,
        status=status.name)
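The polling loop above wakes every second and has no upper bound, so a stalled worker keeps the HTTP request open indefinitely. Below is a sketch of a bounded variant built on the same DBManager calls; the helper name and 60-second default are assumptions, not part of this commit.

import time

def wait_for_request(request_id: int, timeout_s: float = 60.0) -> RequestStatus:
    # Poll until the request leaves a transient state or the deadline passes.
    deadline = time.monotonic() + timeout_s
    status, _ = DBManager().get_request_status_and_reason(request_id)
    while status in (RequestStatus.RUNNING, RequestStatus.QUEUED,
                     RequestStatus.PENDING):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"request {request_id} still {status.name}")
        time.sleep(1)
        status, _ = DBManager().get_request_status_and_reason(request_id)
    return status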


@log_execution_time(log)
def run_workflow(
6 changes: 5 additions & 1 deletion api/app/endpoint_handlers/request.py
@@ -86,7 +86,11 @@ def get_request_resulting_size(request_id: int):
If the request was not found
"""
    if request := DBManager().get_request_details(request_id):
        size = request.download.size_bytes
        if not size:
            # Covers both a missing size and an explicit zero.
            raise exc.EmptyDatasetError(dataset_id=request.dataset,
                                        product_id=request.product)
        return size
    log.info(
        "request with id '%s' could not be found",
        request_id,
13 changes: 13 additions & 0 deletions api/app/exceptions.py
@@ -180,3 +180,16 @@ def __init__(self, dataset_id, product_id):
            product_id=product_id,
        )
        super().__init__(self.msg)


class ProductRetrievingError(BaseDDSException):
    """Retrieving of the product failed."""

    msg: str = "Retrieving product '{dataset_id}.{product_id}' failed with status {status}"

    def __init__(self, dataset_id, product_id, status):
        self.msg = self.msg.format(
            dataset_id=dataset_id,
            product_id=product_id,
            status=status,
        )
        super().__init__(self.msg)
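For illustration, the message this exception carries once formatted (argument values are made up):

err = ProductRetrievingError(
    dataset_id="era5", product_id="reanalysis", status="FAILED"
)
# err.msg == "Retrieving product 'era5.reanalysis' failed with status FAILED"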
116 changes: 114 additions & 2 deletions api/app/main.py
@@ -3,7 +3,9 @@
import os
from typing import Optional

from fastapi import FastAPI, HTTPException, Request, status
from datetime import datetime

from fastapi import FastAPI, HTTPException, Request, status, Query
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.authentication import requires
@@ -18,6 +20,7 @@

from geoquery.geoquery import GeoQuery
from geoquery.task import TaskList

from utils.api_logging import get_dds_logger
import exceptions as exc
@@ -32,6 +35,21 @@
from const import venv, tags
from auth import scopes

def map_to_geoquery(
    variables: list[str],
    format: str,
    bbox: str | None = None,  # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
    time: datetime | None = None,
    **format_kwargs,
) -> GeoQuery:
    """Translate OGC-style request parameters into a GeoQuery."""
    # Both bbox and time default to None, so guard before dereferencing.
    area = None
    if bbox is not None:
        west, south, east, north = (float(x) for x in bbox.split(','))
        area = {'west': west, 'south': south, 'east': east, 'north': north}
    time_ = None
    if time is not None:
        time_ = {'year': time.year, 'month': time.month,
                 'day': time.day, 'hour': time.hour}
    return GeoQuery(variable=variables, time=time_, area=area,
                    format_args=format_kwargs, format=format)
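A worked example of the translation; the layer name, bounding box, and timestamp are illustrative:

from datetime import datetime

q = map_to_geoquery(
    variables=["2m_temperature"],   # hypothetical layer name
    format="png",
    bbox="-10.5,35.0,5.5,44.0",     # west, south, east, north
    time=datetime(2024, 1, 29, 12),
    width=600, height=400,          # collected into format_args
)
# q.area        == {'west': -10.5, 'south': 35.0, 'east': 5.5, 'north': 44.0}
# q.time        == {'year': 2024, 'month': 1, 'day': 29, 'hour': 12}
# q.format_args == {'width': 600, 'height': 400}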

logger = get_dds_logger(__name__)

# ======== JSON encoders extension ========= #
@@ -155,6 +173,100 @@ async def get_product_details(
    except exc.BaseDDSException as err:
        raise err.wrap_around_http_exception() from err

@app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET])
@timer(
app.state.api_request_duration_seconds,
labels={"route": "GET /datasets/{dataset_id}/{product_id}"},
)
async def get_map(
    request: Request,
    dataset_id: str,
    product_id: str,
    # OGC WMS parameters
    width: int,
    height: int,
    layers: str | None = None,
    format: str | None = 'png',
    time: datetime | None = None,
    transparent: bool | None = True,
    bgcolor: str | None = 'FFFFFF',
    bbox: str | None = None,  # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
    crs: str | None = None,
    # OGC map parameters
    # subset: str | None = None,
    # subset_crs: str | None = Query(..., alias="subset-crs"),
    # bbox_crs: str | None = Query(..., alias="bbox-crs"),
):
    app.state.api_http_requests_total.inc(
        {"route": "GET /datasets/{dataset_id}/{product_id}/map"}
    )
    # Map the OGC WMS parameters onto the GeoQuery fields:
    # variable: Optional[Union[str, List[str]]]
    # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
    # area: Optional[Dict[str, float]]
    # location: Optional[Dict[str, Union[float, List[float]]]]
    # vertical: Optional[Union[float, List[float], Dict[str, float]]]
    # filters: Optional[Dict]
    # format: Optional[str]
    query = map_to_geoquery(variables=layers, bbox=bbox, time=time,
                            format="png", width=width, height=height,
                            transparent=transparent, bgcolor=bgcolor)
    try:
        return dataset_handler.sync_query(
            user_id=request.user.id,
            dataset_id=dataset_id,
            product_id=product_id,
            query=query,
        )
    except exc.BaseDDSException as err:
        raise err.wrap_around_http_exception() from err
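An illustrative request against this route, reusing BASE and HEADERS from the client sketch in endpoint_handlers/dataset.py above (dataset, product, and layer names are hypothetical):

resp = requests.get(
    f"{BASE}/datasets/era5/reanalysis/map",
    params={
        "layers": "2m_temperature",
        "width": 600, "height": 400,
        "time": "2024-01-29T12:00:00",
        "bbox": "-10.5,35.0,5.5,44.0",
    },
    headers=HEADERS,
)
# On success the body is the rendered PNG from sync_query's FileResponse.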

@app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET])
@timer(
app.state.api_request_duration_seconds,
labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"},
)
async def get_feature(
request: Request,
dataset_id: str,
product_id: str,
feature_id: str,
# OGC feature parameters
time: datetime | None = None,
bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
crs: str | None = None,
# OGC map parameters
# subset: str | None = None,
# subset_crs: str | None = Query(..., alias="subset-crs"),
# bbox_crs: str | None = Query(..., alias="bbox-crs"),
):

app.state.api_http_requests_total.inc(
{"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}
)
# query should be the OGC query
# feature OGC parameters to GeoQuery
# variable: Optional[Union[str, List[str]]]
# time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
# area: Optional[Dict[str, float]]
# location: Optional[Dict[str, Union[float, List[float]]]]
# vertical: Optional[Union[float, List[float], Dict[str, float]]]
# filters: Optional[Dict]
# format: Optional[str]

query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time,
format="geojson")
try:
return dataset_handler.sync_query(
user_id=request.user.id,
dataset_id=dataset_id,
product_id=product_id,
query=query
)
except exc.BaseDDSException as err:
raise err.wrap_around_http_exception() from err
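And the analogous request for a single feature, again with illustrative identifiers:

resp = requests.get(
    f"{BASE}/datasets/era5/reanalysis/items/2m_temperature",
    params={
        "time": "2024-01-29T12:00:00",
        "bbox": "-10.5,35.0,5.5,44.0",
    },
    headers=HEADERS,
)
# On success the body is the GeoJSON produced for the requested feature.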

@app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET])
@timer(
@@ -222,7 +334,7 @@ async def query(
{"route": "POST /datasets/{dataset_id}/{product_id}/execute"}
)
try:
return dataset_handler.query(
return dataset_handler.async_query(
user_id=request.user.id,
dataset_id=dataset_id,
product_id=product_id,