diff --git a/.gitignore b/.gitignore index 1856070..ffcbef0 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,5 @@ venv.bak/ _catalogs/ _old/ -.DS_Store \ No newline at end of file +.DS_Store +db_init \ No newline at end of file diff --git a/api/app/endpoint_handlers/dataset.py b/api/app/endpoint_handlers/dataset.py index a3f8ca5..109eb0c 100644 --- a/api/app/endpoint_handlers/dataset.py +++ b/api/app/endpoint_handlers/dataset.py @@ -3,7 +3,9 @@ import pika from typing import Optional -from dbmanager.dbmanager import DBManager +from fastapi.responses import FileResponse + +from dbmanager.dbmanager import DBManager, RequestStatus from geoquery.geoquery import GeoQuery from geoquery.task import TaskList from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB @@ -18,12 +20,18 @@ from api_utils import make_bytes_readable_dict from validation import assert_product_exists +from . import request log = get_dds_logger(__name__) data_store = Datastore() MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"] +def _is_estimate_enabled(dataset_id, product_id): + if dataset_id in ("sentinel-2",): + return False + return True + @log_execution_time(log) def get_datasets(user_roles_names: list[str]) -> list[dict]: @@ -213,7 +221,7 @@ def estimate( @log_execution_time(log) @assert_product_exists -def query( +def async_query( user_id: str, dataset_id: str, product_id: str, @@ -250,21 +258,22 @@ def query( """ log.debug("geoquery: %s", query) - estimated_size = estimate(dataset_id, product_id, query, "GB").get("value") - allowed_size = data_store.product_metadata(dataset_id, product_id).get( - "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB - ) - if estimated_size > allowed_size: - raise exc.MaximumAllowedSizeExceededError( - dataset_id=dataset_id, - product_id=product_id, - estimated_size_gb=estimated_size, - allowed_size_gb=allowed_size, - ) - if estimated_size == 0.0: - raise exc.EmptyDatasetError( - dataset_id=dataset_id, product_id=product_id + if _is_estimate_enabled(dataset_id, product_id): + estimated_size = estimate(dataset_id, product_id, query, "GB").get("value") + allowed_size = data_store.product_metadata(dataset_id, product_id).get( + "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB ) + if estimated_size > allowed_size: + raise exc.MaximumAllowedSizeExceededError( + dataset_id=dataset_id, + product_id=product_id, + estimated_size_gb=estimated_size, + allowed_size_gb=allowed_size, + ) + if estimated_size == 0.0: + raise exc.EmptyDatasetError( + dataset_id=dataset_id, product_id=product_id + ) broker_conn = pika.BlockingConnection( pika.ConnectionParameters( host=os.getenv("BROKER_SERVICE_HOST", "broker") @@ -295,6 +304,70 @@ def query( broker_conn.close() return request_id
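The `sync_query` handler added below reuses `async_query` and then polls the request status until it leaves the in-progress states. A minimal standalone sketch of that polling step, assuming the `DBManager` and `RequestStatus` interfaces shown elsewhere in this patch (the `timeout` argument and the `TimeoutError` are illustrative additions, not part of the patch):

```python
import time

from dbmanager.dbmanager import DBManager, RequestStatus

# Statuses that mean the executor has not finished the request yet.
_IN_PROGRESS = (RequestStatus.PENDING, RequestStatus.QUEUED, RequestStatus.RUNNING)


def wait_for_request(request_id: int, poll_interval: float = 1.0,
                     timeout: float | None = None) -> RequestStatus:
    """Block until the request reaches a terminal status and return that status."""
    started = time.monotonic()
    status, _ = DBManager().get_request_status_and_reason(request_id)
    while status in _IN_PROGRESS:
        if timeout is not None and time.monotonic() - started > timeout:
            raise TimeoutError(f"request {request_id} did not finish within {timeout} s")
        time.sleep(poll_interval)
        status, _ = DBManager().get_request_status_and_reason(request_id)
    return status
```

A fixed one-second sleep keeps the handler simple; replacing it with an increasing back-off would reduce database load for long-running map requests.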
+@log_execution_time(log) +@assert_product_exists +def sync_query( + user_id: str, + dataset_id: str, + product_id: str, + query: GeoQuery, +): + """Realize the logic for the endpoints: + + `GET /datasets/{dataset_id}/{product_id}/map` and `GET /datasets/{dataset_id}/{product_id}/items/{feature_id}` + + Query the data and return the result of the request. + + Parameters + ---------- + user_id : str + ID of the user executing the query + dataset_id : str + ID of the dataset + product_id : str + ID of the product + query : GeoQuery + Query to perform + + Returns + ------- + FileResponse + Response with the file containing the requested data + + Raises + ------- + MaximumAllowedSizeExceededError + if the allowed size is below the estimated one + EmptyDatasetError + if estimated size is zero + ProductRetrievingError + if the request finishes with a status other than DONE + + """ + + import time + request_id = async_query(user_id, dataset_id, product_id, query) + status, _ = DBManager().get_request_status_and_reason(request_id) + log.debug("sync query: status: %s", status) + while status in (RequestStatus.RUNNING, RequestStatus.QUEUED, + RequestStatus.PENDING): + time.sleep(1) + status, _ = DBManager().get_request_status_and_reason(request_id) + log.debug("sync query: status: %s", status) + + if status is RequestStatus.DONE: + download_details = DBManager().get_download_details_for_request_id( + request_id + ) + return FileResponse( + path=download_details.location_path, + filename=download_details.location_path.split(os.sep)[-1], + ) + raise exc.ProductRetrievingError( + dataset_id=dataset_id, + product_id=product_id, + status=status.name) + @log_execution_time(log) def run_workflow( diff --git a/api/app/endpoint_handlers/request.py b/api/app/endpoint_handlers/request.py index 320bceb..93a0636 100644 --- a/api/app/endpoint_handlers/request.py +++ b/api/app/endpoint_handlers/request.py @@ -86,7 +86,11 @@ def get_request_resulting_size(request_id: int): If the request was not found """ if request := DBManager().get_request_details(request_id): - return request.download.size_bytes + size = request.download.size_bytes + if not size: + raise exc.EmptyDatasetError(dataset_id=request.dataset, + product_id=request.product) + return size log.info( "request with id '%s' could not be found", request_id, diff --git a/api/app/exceptions.py b/api/app/exceptions.py index 01de71c..af4d072 100644 --- a/api/app/exceptions.py +++ b/api/app/exceptions.py @@ -180,3 +180,16 @@ def __init__(self, dataset_id, product_id): product_id=product_id, ) super().__init__(self.msg) + +class ProductRetrievingError(BaseDDSException): + """Retrieving of the product failed.""" + + msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with status {status}" + + def __init__(self, dataset_id, product_id, status): + self.msg = self.msg.format( + dataset_id=dataset_id, + product_id=product_id, + status=status + ) + super().__init__(self.msg) \ No newline at end of file diff --git a/api/app/main.py b/api/app/main.py index d0f5ad8..893a907 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,9 +1,13 @@ """Main module with geolake API endpoints defined""" __version__ = "0.1.0" import os -from typing import Optional +import re +from typing import Optional, Dict +from datetime import datetime -from fastapi import FastAPI, HTTPException, Request, status +from oai_dcat.metadata_provider import BASE_URL +from oai_dcat.oai_utils import serialize_and_concatenate_graphs, convert_to_dcat_ap_it +from fastapi import FastAPI, HTTPException, Request, status, Query, Response from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.authentication import requires @@ -16,8 +20,8 @@ ) from aioprometheus.asgi.starlette import metrics -from geoquery.geoquery import GeoQuery from geoquery.task import TaskList +from geoquery.geoquery import GeoQuery from utils.api_logging import get_dds_logger import
exceptions as exc @@ -31,6 +35,33 @@ from encoders import extend_json_encoders from const import venv, tags from auth import scopes +from oai_dcat import oai_server + +def map_to_geoquery( + variables: list[str], + format: str, + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + time: datetime | None = None, + filters: Optional[Dict] = None, + **format_kwargs +) -> GeoQuery: + + if bbox: + bbox_ = [float(x) for x in bbox.split(',')] + area = { 'west': bbox_[0], 'south': bbox_[1], 'east': bbox_[2], 'north': bbox_[3], } + else: + area = None + if time: + time_ = { 'year': time.year, 'month': time.month, 'day': time.day, 'hour': time.hour} + else: + time_ = None + if filters: + query = GeoQuery(variable=variables, time=time_, area=area, filters=filters, + format_args=format_kwargs, format=format) + else: + query = GeoQuery(variable=variables, time=time_, area=area, + format_args=format_kwargs, format=format) + return query logger = get_dds_logger(__name__) @@ -155,7 +186,251 @@ async def get_product_details( except exc.BaseDDSException as err: raise err.wrap_around_http_exception() from err +@app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /datasets/{dataset_id}/{product_id}"}, +) +async def get_map( + request: Request, + dataset_id: str, + product_id: str, +# OGC WMS parameters + width: int, + height: int, + dpi: int | None = 100, + layers: str | None = None, + format: str | None = 'png', + time: datetime | None = None, + transparent: bool | None = 'true', + bgcolor: str | None = 'FFFFFF', + cmap: str | None = 'RdBu_r', + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, + vmin: float | None = None, + vmax: float | None = None +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/map"} + ) + # query should be the OGC query + # map OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + + query = map_to_geoquery(variables=layers, bbox=bbox, time=time, + format="png", width=width, height=height, + transparent=transparent, bgcolor=bgcolor, + dpi=dpi, cmap=cmap, projection=crs, + vmin=vmin, vmax=vmax) + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + +@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/map", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /datasets/{dataset_id}/{product_id}"}, +) +async def get_map_with_filters( + request: Request, + dataset_id: str, + product_id: str, + filters: str, +# OGC WMS parameters + width: int, + height: int, + dpi: int | None = 100, + layers: str | None = None, + format: str | None = 'png', + time: datetime | None = None, + transparent: bool | None = 'true', + bgcolor: str | None = 'FFFFFF', + cmap: str | None = 'RdBu_r', 
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, + vmin: float | None = None, + vmax: float | None = None +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + filters_vals = filters.split("/") + + if dataset_id in ['rs-indices', 'pasture']: + filters_dict = {'pasture': filters_vals[0]} + + else: + try: + product_info = dataset_handler.get_product_details( + user_roles_names=request.auth.scopes, + dataset_id=dataset_id, + product_id=product_id, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + filters_keys = product_info['metadata']['filters'] + filters_dict = {} + for i in range(0, len(filters_vals)): + filters_dict[filters_keys[i]['name']] = filters_vals[i] + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/map"} + ) + # query should be the OGC query + # map OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + + query = map_to_geoquery(variables=layers, bbox=bbox, time=time, filters=filters_dict, + format="png", width=width, height=height, + transparent=transparent, bgcolor=bgcolor, + dpi=dpi, cmap=cmap, projection=crs, vmin=vmin, vmax=vmax) + + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err +@app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}, +) +async def get_feature( + request: Request, + dataset_id: str, + product_id: str, + feature_id: str, +# OGC feature parameters + time: datetime | None = None, + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"} + ) + # query should be the OGC query + # feature OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + + query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time, + format="geojson") + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + +@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/items/{feature_id}", tags=[tags.DATASET]) +@timer( + app.state.api_request_duration_seconds, + 
labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}, +) +async def get_feature_with_filters( + request: Request, + dataset_id: str, + product_id: str, + feature_id: str, + filters: str, +# OGC feature parameters + time: datetime | None = None, + bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat) + crs: str | None = None, +# OGC map parameters + # subset: str | None = None, + # subset_crs: str | None = Query(..., alias="subset-crs"), + # bbox_crs: str | None = Query(..., alias="bbox-crs"), +): + filters_vals = filters.split("/") + + if dataset_id in ['rs-indices', 'pasture']: + filters_dict = {'pasture': filters_vals[0]} + + else: + try: + product_info = dataset_handler.get_product_details( + user_roles_names=request.auth.scopes, + dataset_id=dataset_id, + product_id=product_id, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + filters_keys = product_info['metadata']['filters'] + filters_dict = {} + for i in range(0, len(filters_vals)): + filters_dict[filters_keys[i]['name']] = filters_vals[i] + + app.state.api_http_requests_total.inc( + {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"} + ) + # query should be the OGC query + # feature OGC parameters to GeoQuery + # variable: Optional[Union[str, List[str]]] + # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] + # area: Optional[Dict[str, float]] + # location: Optional[Dict[str, Union[float, List[float]]]] + # vertical: Optional[Union[float, List[float], Dict[str, float]]] + # filters: Optional[Dict] + # format: Optional[str] + + query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time, filters=filters_dict, + format="geojson") + try: + return dataset_handler.sync_query( + user_id=request.user.id, + dataset_id=dataset_id, + product_id=product_id, + query=query + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + @app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET]) @timer( app.state.api_request_duration_seconds, @@ -222,7 +497,7 @@ async def query( {"route": "POST /datasets/{dataset_id}/{product_id}/execute"} ) try: - return dataset_handler.query( + return dataset_handler.async_query( user_id=request.user.id, dataset_id=dataset_id, product_id=product_id, @@ -355,3 +630,53 @@ async def download_request_result( raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!" 
) from err + +# Define OAI-PMH endpoint route +@app.get("/oai/{dataset_id}") +@app.post("/oai/{dataset_id}") +async def oai(request: Request, dataset_id: str): + params = dict(request.query_params) + + # Add dataset_id to the parameters as "set", which is a parameter from the OAI-PMH protocol + params['set'] = dataset_id + # params['scopes'] = request.auth.scopes + + # Making sure it uses the dcat_ap metadata prefix + if 'metadataPrefix' not in params: + params['metadataPrefix'] = 'dcat_ap' + + # handleRequest points the request to the appropriate method in metadata_provider.py + response = oai_server.oai_server.handleRequest(params) + logger.debug(f"OAI-PMH Response: {response}") + # Replace the date inside <datestamp> with an empty string + response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response) + return Response(content=response, media_type="text/xml") + +# Define an endpoint for getting all the datasets +@app.get("/oai") +@app.post("/oai") +async def oai_all_datasets(request: Request): + params = dict(request.query_params) + + # Making sure it uses the dcat_ap metadata prefix + if 'metadataPrefix' not in params: + params['metadataPrefix'] = 'dcat_ap' + + # handleRequest points the request to the appropriate method in metadata_provider.py + response = oai_server.oai_server.handleRequest(params) + logger.debug(f"OAI-PMH Response: {response}") + # Replace the date inside <datestamp> with an empty string + response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response) + return Response(content=response, media_type="text/xml") + +# Endpoint for generating DCAT-AP IT catalog +@app.get("/dcatapit") +async def dcatapit(request: Request): + data = dataset_handler.get_datasets( + user_roles_names=request.auth.scopes + ) + catalog_graph, datasets_graph, distributions_graph, vcard_graph = convert_to_dcat_ap_it(data, BASE_URL) + # response = dcatapit_graph.serialize(format='pretty-xml') + response = serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph) + + return Response(content=response, media_type="application/rdf+xml") \ No newline at end of file diff --git a/api/app/oai_dcat/__init__.py b/api/app/oai_dcat/__init__.py new file mode 100644 index 0000000..3654022 --- /dev/null +++ b/api/app/oai_dcat/__init__.py @@ -0,0 +1,3 @@ +# from . import metadata_provider as metadata_provider +from . import oai_server as oai_server +# from . import oai_utils as oai_utils diff --git a/api/app/oai_dcat/metadata_provider.py b/api/app/oai_dcat/metadata_provider.py new file mode 100644 index 0000000..6f140ff --- /dev/null +++ b/api/app/oai_dcat/metadata_provider.py @@ -0,0 +1,101 @@ +from oaipmh.common import Identify, Metadata, Header +from datetime import datetime +from lxml import etree +from lxml.etree import Element +import json +from .oai_utils import convert_to_dcat_ap +import logging + +import exceptions as exc +from endpoint_handlers import dataset_handler + +BASE_URL = "https://sebastien-datalake.cmcc.it/api/v2/datasets" + +# Logging config +logging.basicConfig(level=logging.DEBUG) + +# Each method in this class is a verb from the OAI-PMH protocol.
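The `/oai` endpoints above answer standard OAI-PMH verbs, so they can be exercised with any OAI-PMH client. A hypothetical harvesting snippet (the host name is a placeholder, and `thi` is just one of the dataset identifiers referenced in this patch) that lists records for a single dataset and prints the embedded RDF/XML:

```python
import requests
from lxml import etree

# Placeholder base URL - substitute the real API host.
API = "https://example-geolake-host/api/v2"

resp = requests.get(
    f"{API}/oai/thi",
    params={"verb": "ListRecords", "metadataPrefix": "dcat_ap"},
    timeout=60,
)
resp.raise_for_status()

root = etree.fromstring(resp.content)
ns = {"oai": "http://www.openarchives.org/OAI/2.0/"}
for metadata in root.findall(".//oai:metadata", ns):
    # dcat_ap_writer in oai_server.py appends the RDF children under <metadata>.
    for child in metadata:
        print(etree.tostring(child, pretty_print=True).decode())
```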
Only listRecords is used by the data.europa harvester +class MyMetadataProvider: + # Method to list records, only method used by data.europa harvester + def listRecords(self, metadataPrefix='dcat_ap', from_=None, until=None, set=None): + logging.debug("Fetching data from API") + # Fetch data from the dataset endpoint + ''' + data = main.fetch_data( + f"{BASE_URL}{set}" + ) + ''' + if set: + dataset_url = f"{BASE_URL}/{set}" + + try: + data = dataset_handler.get_product_details( + user_roles_names=['public'], + dataset_id=set, + ) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + + else: + dataset_url = BASE_URL + data = dataset_handler.get_datasets(user_roles_names=['public']) + + logging.debug(f"Fetched data: {data}") + + # Convert to RDF graph with proper DCAT-AP fields (URL is being used to fill the accessURL field) + rdf_graph = convert_to_dcat_ap(data, f"{BASE_URL}{set}") + + # Serialize the RDF graph into a string, 'pretty-xml' format makes it more readable + rdf_string = rdf_graph.serialize(format='pretty-xml') + logging.debug(f"RDF string: {rdf_string}") + + # Create a header (mandatory for OAI-PMH) + header_element = Element("header") + header = Header(deleted=False, element=header_element, identifier="", datestamp=datetime.utcnow(), setspec=[]) + + # Create metadata element and fill it with the RDF/XML string + metadata_element = Element("metadata") + metadata = Metadata(element=metadata_element, map={"rdf": rdf_string}) + + + return [(header, metadata, [])], None + + # The remaining methods are only present because they are mandatory for the OAI-PMH protocol + + # Minimal implementation for identify + def identify(self): + return Identify( + repositoryName='Sebastien DataLake', # Name of the repository + baseURL='', # Base URL of the OAI-PMH endpoint + protocolVersion='2.0', # OAI-PMH protocol version + adminEmails=['sebastien_support@cmcc.it'], # List of admin email addresses + earliestDatestamp=datetime(2024, 1, 1), # Earliest datestamp for records + deletedRecord='no', # Policy on deleted records + granularity='YYYY-MM-DDThh:mm:ssZ', # Date granularity + compression=['identity'], # Supported compression methods + ) + + # Minimal implementation for getRecord + def getRecord(self, identifier, metadataPrefix='oai_dc'): + # Create a header + header = Header(identifier=identifier, datestamp=datetime.now(), setspec=[]) + + # Create metadata element (empty in this example) + metadata = Metadata(element=Element("record"), map={}) + + return header, metadata, [] + + # Minimal implementation for listIdentifiers + def listIdentifiers(self, metadataPrefix='oai_dc', from_=None, until=None, set_=None): + # Create a header + header = Header(identifier="id1", datestamp=datetime.now(), setspec=[]) + + return [header] + + # Minimal implementation for listMetadataFormats + def listMetadataFormats(self, identifier=None): + return [('oai_dc', 'http://www.openarchives.org/OAI/2.0/oai_dc.xsd', 'http://www.openarchives.org/OAI/2.0/oai_dc/')] + + # Minimal implementation for listSets + def listSets(self): + return [] diff --git a/api/app/oai_dcat/oai_server.py b/api/app/oai_dcat/oai_server.py new file mode 100644 index 0000000..c2ba898 --- /dev/null +++ b/api/app/oai_dcat/oai_server.py @@ -0,0 +1,34 @@ +from oaipmh.server import ServerBase, oai_dc_writer +from oaipmh.metadata import MetadataRegistry, oai_dc_reader +from . 
import metadata_provider +from lxml.etree import fromstring, tostring +import logging + +# Function to write metadata in dcat_ap format (RDF/XML), otherwise it would use the default format (oai_dc) +def dcat_ap_writer(metadata_element, metadata): + rdf_string = metadata["rdf"] + rdf_element = fromstring(bytes(rdf_string, encoding='utf-8')) + + for child in rdf_element: + metadata_element.append(child) + logging.debug(f"Metadata Element: {tostring(metadata_element, pretty_print=True).decode('utf-8')}") + + +# Create reader for dcat_ap metadata +def dcat_ap_reader(element): + rdf_string = tostring(element, encoding='unicode') + return {"rdf": rdf_string} + +# Server class, it defines writers and readers, as well as the metadata provider (metadata_provider.py) +class MyServer(ServerBase): + def __init__(self): + metadata_registry = MetadataRegistry() + metadata_registry.registerWriter("oai_dc", oai_dc_writer) + metadata_registry.registerReader("oai_dc", oai_dc_reader) + metadata_registry.registerWriter("dcat_ap", dcat_ap_writer) + metadata_registry.registerReader("dcat_ap", dcat_ap_reader) + server = metadata_provider.MyMetadataProvider() + super(MyServer, self).__init__(server, metadata_registry) + + +oai_server = MyServer() diff --git a/api/app/oai_dcat/oai_utils.py b/api/app/oai_dcat/oai_utils.py new file mode 100644 index 0000000..b45714f --- /dev/null +++ b/api/app/oai_dcat/oai_utils.py @@ -0,0 +1,499 @@ +import re +from rdflib import Graph, Literal, Namespace, RDF, URIRef, BNode +from rdflib.namespace import DCAT, DCTERMS, FOAF, RDF, XSD +import logging +from datetime import datetime + +# Dictionary with accrualPeriodicity values for somw known datasets +ACCRUAL_PERIODICITY = { + "blue-tongue": "AS_NEEDED", + "iot-animal": "HOURLY", + "pasture": "BIWEEKLY", + "pi": "DAILY", + "pi-long-term": "AS_NEEDED", + "thi": "DAILY", + "iot-environmental" : "IRREG" +} + +# Logging config +logging.basicConfig(level=logging.DEBUG) + +# Namespaces for DCAT-AP, to be binded to the RDF graph +DCAT = Namespace("http://www.w3.org/ns/dcat#") +DCT = Namespace("http://purl.org/dc/terms/") +FOAF = Namespace("http://xmlns.com/foaf/0.1/") +VCARD = Namespace("http://www.w3.org/2006/vcard/ns#") +EDP = Namespace("https://europeandataportal.eu/voc#") +SPDX = Namespace("http://spdx.org/rdf/terms#") +ADMS = Namespace("http://www.w3.org/ns/adms#") +DQV = Namespace("http://www.w3.org/ns/dqv#") +SKOS = Namespace("http://www.w3.org/2004/02/skos/core#") +SCHEMA = Namespace("http://schema.org/") +# Namespace for DCAT-AP IT +DCATAPIT = Namespace("http://dati.gov.it/onto/dcatapit#") + + +# Define classes for DCAT-AP entities (Dataset, Distribution and ContactPoint) + +class ContactPoint: + def __init__(self, name=None, email=None, webpage=None): + self.name = name + self.email = email + self.webpage = webpage + + +class Distribution: + def __init__(self, access_url=None, description=None, download_url=None, + media_type=None, format=None, rights=None, license=None, identifier=None): + self.access_url = access_url + self.description = description + self.download_url = download_url + self.media_type = media_type + self.format = format + self.rights = rights + self.license = license + self.identifier = identifier + + # Build the RDF graph for the distribution + def to_graph(self, g): + distribution = URIRef(self.uri) + g.add((distribution, RDF.type, DCAT.Distribution)) + if self.access_url: + g.add((distribution, DCAT.accessURL, URIRef(self.access_url))) + if self.description: + g.add((distribution, DCTERMS.description, 
Literal(self.description))) + if self.download_url: + g.add((distribution, DCAT.downloadURL, URIRef(self.download_url))) + if self.media_type: + g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type))) + if self.format: + g.add((distribution, DCTERMS.format, URIRef(self.format))) + if self.rights: + rights_bnode = BNode() + g.add((distribution, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights))) + if self.license: + license_bnode = BNode() + g.add((distribution, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(self.license))) + if self.identifier: + g.add((distribution, DCTERMS.identifier, Literal(self.identifier))) + return g + + +class DatasetDCAT: + def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None): + self.uri = uri + self.title = title + self.description = description + self.issued = issued + self.identifier = identifier + self.contact_point = contact_point + self.distributions = [] + + def add_distribution(self, distribution): + self.distributions.append(distribution) + + # Build the RDF graph for the dataset + def to_graph(self, g): + dataset = URIRef(self.uri) + g.add((dataset, RDF.type, DCAT.Dataset)) + logging.debug(f"Adding to graph {g.identifier}: {dataset} a type {DCAT.Dataset}") + + if self.title: + g.add((dataset, DCTERMS.title, Literal(self.title))) + if self.description: + g.add((dataset, DCTERMS.description, Literal(self.description))) + if self.issued: + g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF))) + if self.identifier: + g.add((dataset, DCTERMS.identifier, Literal(self.identifier))) + + if self.contact_point: + contact_bnode = BNode() + g.add((dataset, DCAT.contactPoint, contact_bnode)) + g.add((contact_bnode, RDF.type, VCARD.Kind)) + if self.contact_point.name: + g.add((contact_bnode, VCARD.fn, Literal(self.contact_point.name))) + if self.contact_point.email: + g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.contact_point.email}"))) + if self.contact_point.webpage: + g.add((contact_bnode, VCARD.hasURL, URIRef(self.contact_point.webpage))) + + for dist in self.distributions: + distribution_bnode = BNode() + g.add((dataset, DCAT.distribution, distribution_bnode)) + g.add((distribution_bnode, RDF.type, DCAT.Distribution)) + if dist.access_url: + g.add((distribution_bnode, DCAT.accessURL, URIRef(dist.access_url))) + if dist.description: + g.add((distribution_bnode, DCTERMS.description, Literal(dist.description))) + if dist.download_url: + g.add((distribution_bnode, DCAT.downloadURL, URIRef(dist.download_url))) + if dist.media_type: + g.add((distribution_bnode, DCTERMS.mediaType, URIRef(dist.media_type))) + if dist.format: + g.add((distribution_bnode, DCTERMS.format, URIRef(dist.format))) + if dist.rights: + rights_bnode = BNode() + g.add((distribution_bnode, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(dist.rights))) + if dist.license: + license_bnode = BNode() + g.add((distribution_bnode, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(dist.license))) + if dist.identifier: + g.add((distribution_bnode, DCTERMS.identifier, Literal(dist.identifier))) + + return g + + +# Define classes for DCAT-AP IT 
entities (Catalog, Dataset, Distribution, and ContactPoint) + +class ContactPointIT: + def __init__(self, name=None, email=None, webpage=None): + self.name = name + self.email = email + self.webpage = webpage + + def to_graph(self, g, parent): + contact_bnode = BNode() + g.add((parent, DCAT.contactPoint, contact_bnode)) + g.add((contact_bnode, RDF.type, VCARD.Kind)) + if self.name: + g.add((contact_bnode, VCARD.fn, Literal(self.name))) + if self.email: + g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.email}"))) + if self.webpage: + g.add((contact_bnode, VCARD.hasURL, URIRef(self.webpage))) + + +class DistributionIT: + def __init__(self, uri, access_url=None, description=None, download_url=None, + media_type=None, format=None, rights=None, license=None, identifier=None): + self.uri = uri + self.access_url = access_url + self.description = description + self.download_url = download_url + self.media_type = media_type + self.format = format + self.rights = rights + self.license = license + self.identifier = identifier + + def to_graph(self, g): + distribution = URIRef(self.uri) + g.add((distribution, RDF.type, DCATAPIT.Distribution)) + if self.access_url: + g.add((distribution, DCAT.accessURL, URIRef(self.access_url))) + if self.description: + g.add((distribution, DCTERMS.description, Literal(self.description))) + if self.download_url: + g.add((distribution, DCAT.downloadURL, URIRef(self.download_url))) + if self.media_type: + g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type))) + if self.format: + g.add((distribution, DCTERMS.format, URIRef(self.format))) + if self.rights: + rights_bnode = BNode() + g.add((distribution, DCTERMS.rights, rights_bnode)) + g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement)) + g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights))) + if self.license: + license_bnode = BNode() + g.add((distribution, DCTERMS.license, license_bnode)) + g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument)) + g.add((license_bnode, DCTERMS.license, URIRef(self.license))) + if self.identifier: + g.add((distribution, DCTERMS.identifier, Literal(self.identifier))) + + +class DatasetDCATAPIT: + def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None): + self.uri = uri + self.title = title + self.description = description + self.issued = issued + self.identifier = identifier + self.contact_point = contact_point + self.distributions = [] + + def add_distribution(self, distribution): + self.distributions.append(distribution) + + def to_graph(self, g): + dataset = URIRef(self.uri) + g.add((dataset, RDF.type, DCATAPIT.Dataset)) + if self.title: + g.add((dataset, DCTERMS.title, Literal(self.title))) + if self.description: + g.add((dataset, DCTERMS.description, Literal(self.description))) + if self.issued: + g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF))) + if self.identifier: + g.add((dataset, DCTERMS.identifier, Literal(self.identifier))) + + if self.contact_point: + self.contact_point.to_graph(g, dataset) + + for dist in self.distributions: + distribution_uri = URIRef(dist.uri) + g.add((dataset, DCAT.distribution, distribution_uri)) + + return g + + +class CatalogIT: + def __init__(self, uri, title, description, modified, publisher_name, publisher_identifier, publisher_homepage, + publisher_email, dataset_uris=None): + self.uri = uri + self.title = title + self.description = description + self.modified = modified + self.publisher_name = publisher_name + self.publisher_identifier 
= publisher_identifier + self.publisher_homepage = publisher_homepage + self.publisher_email = publisher_email + self.dataset_uris = dataset_uris if dataset_uris is not None else [] + + def add_dataset(self, dataset_uri): + self.dataset_uris.append(dataset_uri) + + def to_graph(self, g): + catalog = URIRef(self.uri) + g.add((catalog, RDF.type, DCATAPIT.Catalog)) + g.add((catalog, DCTERMS.title, Literal(self.title))) + g.add((catalog, DCTERMS.description, Literal(self.description))) + g.add((catalog, DCTERMS.modified, Literal(self.modified, datatype=DCTERMS.W3CDTF))) + + catalog_publisher_node = BNode() + g.add((catalog, DCTERMS.publisher, catalog_publisher_node)) + g.add((catalog_publisher_node, RDF.type, FOAF.Agent)) + g.add((catalog_publisher_node, RDF.type, DCATAPIT.Agent)) + g.add((catalog_publisher_node, FOAF.name, Literal(self.publisher_name))) + g.add((catalog_publisher_node, DCTERMS.identifier, Literal(self.publisher_identifier))) + g.add((catalog_publisher_node, FOAF.homepage, URIRef(self.publisher_homepage))) + g.add((catalog_publisher_node, FOAF.mbox, URIRef(f"mailto:{self.publisher_email}"))) + + for dataset_uri in self.dataset_uris: + g.add((catalog, DCAT.dataset, URIRef(dataset_uri))) + + return g + + # Function to convert to DCAT-AP IT format + + +def convert_to_dcat_ap_it(data, url): + # Create separate graphs + catalog_graph = Graph() + datasets_graph = Graph() + distributions_graph = Graph() + vcard_graph = Graph() + + # Bind namespaces to all graphs + for g in [catalog_graph, datasets_graph, distributions_graph, vcard_graph]: + g.bind("dcatapit", DCATAPIT) + g.bind("foaf", FOAF) + g.bind("dcat", DCAT) + g.bind("dct", DCT) + g.bind("vcard", VCARD) + g.bind("rdf", RDF) + + # Contact point URI + contact_point_uri = URIRef("https://www.cmcc.it") + + # Create catalog + catalog_uri = URIRef(url) + catalog_graph.add((catalog_uri, RDF.type, DCATAPIT.Catalog)) + catalog_graph.add((catalog_uri, RDF.type, DCAT.Catalog)) + catalog_graph.add((catalog_uri, DCTERMS.title, Literal("Sebastien Catalog"))) + catalog_graph.add((catalog_uri, DCTERMS.description, Literal("A catalog of Sebastien datasets"))) + catalog_graph.add((catalog_uri, FOAF.homepage, Literal(url))) + catalog_graph.add( + (catalog_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA"))) + catalog_graph.add((catalog_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime))) + + # Add publisher information + publisher = BNode() + catalog_graph.add((catalog_uri, DCTERMS.publisher, publisher)) + catalog_graph.add((publisher, RDF.type, FOAF.Agent)) + catalog_graph.add((publisher, RDF.type, DCATAPIT.Agent)) + catalog_graph.add((publisher, FOAF.name, Literal("CMCC Foundation"))) + catalog_graph.add((publisher, DCTERMS.identifier, Literal("XW88C90Q"))) + catalog_graph.add((publisher, FOAF.homepage, URIRef("https://www.cmcc.it"))) + catalog_graph.add((publisher, FOAF.mbox, URIRef("mailto:dds-support@cmcc.it"))) + + for i, dataset in enumerate(data, 1): + if "dataset" not in dataset: + dataset = {"dataset": dataset} + dataset_id = dataset.get("dataset", {}).get("metadata", {}).get("id") + dataset_uri = URIRef(f'{url}/{i}') + + # Add dataset reference to catalog + catalog_graph.add((catalog_uri, DCAT.dataset, dataset_uri)) + + # Create dataset + datasets_graph.add((dataset_uri, RDF.type, DCATAPIT.Dataset)) + datasets_graph.add((dataset_uri, RDF.type, DCAT.Dataset)) + datasets_graph.add( + (dataset_uri, DCTERMS.title, Literal(dataset.get("dataset", {}).get("metadata", 
{}).get("label")))) + datasets_graph.add((dataset_uri, DCTERMS.description, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + datasets_graph.add((dataset_uri, DCTERMS.issued, Literal( + datetime.strptime(str(dataset.get("dataset", {}).get("metadata", {}).get("publication_date")), '%Y-%m-%d'), + datatype=XSD.dateTime))) + datasets_graph.add((dataset_uri, DCTERMS.identifier, Literal(f"XW88C90Q:{dataset_id}"))) + datasets_graph.add( + (dataset_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA"))) + # Add dct:modified, dcat:theme, dct:rightsHolder and dct:accrualPeriodicity + datasets_graph.add((dataset_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime))) + datasets_graph.add( + (dataset_uri, DCAT.theme, URIRef("http://publications.europa.eu/resource/authority/data-theme/AGRI"))) + datasets_graph.add((dataset_uri, DCTERMS.accrualPeriodicity, URIRef( + f"http://publications.europa.eu/resource/authority/frequency/{ACCRUAL_PERIODICITY.get(dataset_id)}"))) + # Add publisher info on dataset + publisher_dataset = BNode() + datasets_graph.add((dataset_uri, DCTERMS.publisher, publisher_dataset)) + datasets_graph.add((publisher_dataset, RDF.type, FOAF.Agent)) + datasets_graph.add((publisher_dataset, RDF.type, DCATAPIT.Agent)) + datasets_graph.add((publisher_dataset, FOAF.name, Literal("CMCC Foundation"))) + datasets_graph.add((publisher_dataset, DCTERMS.identifier, Literal("XW88C90Q"))) + # Add rights holder BNode + rights_holder_uri = BNode() + datasets_graph.add((dataset_uri, DCTERMS.rightsHolder, rights_holder_uri)) + datasets_graph.add((rights_holder_uri, RDF.type, DCATAPIT.Agent)) + datasets_graph.add((rights_holder_uri, RDF.type, FOAF.Agent)) + datasets_graph.add((rights_holder_uri, DCTERMS.identifier, Literal("XW88C90Q"))) + datasets_graph.add((rights_holder_uri, FOAF.name, Literal("CMCC Foundation"))) + + # Add contact point + contact = dataset.get("dataset", {}).get("metadata", {}).get("contact") + datasets_graph.add((dataset_uri, DCAT.contactPoint, contact_point_uri)) + + # Create distribution + # products = dataset.get("dataset", {}).get("metadata", {}).get("products", {}).get("monthly", {}) + distribution_uri = URIRef(f'{url}/{dataset_id}') + datasets_graph.add((dataset_uri, DCAT.distribution, distribution_uri)) + distributions_graph.add((distribution_uri, RDF.type, DCAT.Distribution)) + distributions_graph.add((distribution_uri, DCAT.accessURL, distribution_uri)) + distributions_graph.add((distribution_uri, DCTERMS.title, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + distributions_graph.add((distribution_uri, DCTERMS.description, + Literal(dataset.get("dataset", {}).get("metadata", {}).get("description")))) + license_uri = URIRef("https://w3id.org/italia/controlled-vocabulary/licences/A21_CCBY40") + license_document = BNode() + distributions_graph.add((distribution_uri, DCTERMS.license, license_document)) + distributions_graph.add((license_document, RDF.type, DCATAPIT.LicenseDocument)) + distributions_graph.add( + (license_document, DCTERMS.type, URIRef("http://purl.org/adms/licencetype/Attribution"))) + distributions_graph.add( + (license_document, FOAF.name, Literal("Creative Commons Attribuzione 4.0 Internazionale (CC BY 4.0)"))) + distributions_graph.add((distribution_uri, DCTERMS.format, + URIRef("http://publications.europa.eu/resource/authority/file-type/JSON"))) + distributions_graph.add((distribution_uri, RDF.type, DCATAPIT.Distribution)) + + # Create 
vcard:Organization node + contact = dataset.get("dataset", {}).get("metadata", {}).get("contact") + vcard_graph.add((contact_point_uri, RDF.type, VCARD.Organization)) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://dati.gov.it/onto/dcatapit#Organization"))) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://xmlns.com/foaf/0.1/Organization"))) + vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://www.w3.org/2006/vcard/ns#Kind"))) + vcard_graph.add((contact_point_uri, VCARD.fn, Literal(contact.get("name")))) + vcard_graph.add((contact_point_uri, VCARD.hasEmail, URIRef(f"mailto:{contact.get('email')}"))) + vcard_graph.add((contact_point_uri, VCARD.hasURL, URIRef(contact.get("webpage")))) + + return catalog_graph, datasets_graph, distributions_graph, vcard_graph + + +def serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph): + # Serialize each graph to a string + catalog_str = catalog_graph.serialize(format='pretty-xml') + datasets_str = datasets_graph.serialize(format='pretty-xml') + distributions_str = distributions_graph.serialize(format='pretty-xml') + vcard_str = vcard_graph.serialize(format='pretty-xml') + + # Remove XML headers and opening tags from datasets and distributions and vcard strings + datasets_str = re.sub(r'<\?xml[^>]+\?>', '', datasets_str) + datasets_str = re.sub(r'<rdf:RDF[^>]*>', '', datasets_str, count=1).rsplit('</rdf:RDF>', 1)[0] + distributions_str = re.sub(r'<\?xml[^>]+\?>', '', distributions_str) + distributions_str = re.sub(r'<rdf:RDF[^>]*>', '', distributions_str, count=1).rsplit('</rdf:RDF>', 1)[0] + vcard_str = re.sub(r'<\?xml[^>]+\?>', '', vcard_str) + vcard_str = re.sub(r'<rdf:RDF[^>]*>', '', vcard_str, count=1).rsplit('</rdf:RDF>', 1)[0] + + # Concatenate the strings + final_str = catalog_str.rsplit('</rdf:RDF>', 1)[0] + datasets_str + distributions_str + vcard_str + '</rdf:RDF>' + + # Manually add the vcard namespace declaration + final_str = final_str.replace( ' list: @@ -419,7 +421,10 @@ def is_product_valid_for_role( def _process_query(kube, query: GeoQuery, compute: None | bool = False): if isinstance(kube, Dataset): Datastore._LOG.debug("filtering with: %s", query.filters) - kube = kube.filter(**query.filters) + try: + kube = kube.filter(**query.filters) + except ValueError as err: + Datastore._LOG.warning("could not filter by one of the keys: %s", err) Datastore._LOG.debug("resulting kube len: %s", len(kube)) if isinstance(kube, Delayed) and compute: kube = kube.compute() @@ -444,10 +449,10 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False): if query.vertical: Datastore._LOG.debug("subsetting by vertical...") if isinstance( - vertical := Datastore._maybe_convert_dict_slice_to_slice( - query.vertical - ), - slice, + vertical := Datastore._maybe_convert_dict_slice_to_slice( + query.vertical + ), + slice, ): method = None else: @@ -463,4 +468,4 @@ def _maybe_convert_dict_slice_to_slice(dict_vals): dict_vals.get("stop"), dict_vals.get("step"), ) - return dict_vals + return dict_vals \ No newline at end of file diff --git a/datastore/dbmanager/dbmanager.py b/datastore/dbmanager/dbmanager.py index b11c46c..d4ff293 100644 --- a/datastore/dbmanager/dbmanager.py +++ b/datastore/dbmanager/dbmanager.py @@ -43,6 +43,7 @@ class RequestStatus(Enum_): """Status of the Request""" PENDING = auto() + QUEUED = auto() RUNNING = auto() DONE = auto() FAILED = auto() @@ -85,7 +86,7 @@ class User(Base): String(255), nullable=False, unique=True, default=generate_key ) contact_name = Column(String(255)) - requests = relationship("Request") + requests =
relationship("Request", lazy="dynamic") roles = relationship("Role", secondary=association_table, lazy="selectin") @@ -96,7 +97,7 @@ class Worker(Base): host = Column(String(255)) dask_scheduler_port = Column(Integer) dask_dashboard_address = Column(String(10)) - created_on = Column(DateTime, nullable=False) + created_on = Column(DateTime, default=datetime.now) class Request(Base): @@ -112,8 +113,8 @@ class Request(Base): product = Column(String(255)) query = Column(JSON()) estimate_size_bytes = Column(Integer) - created_on = Column(DateTime, nullable=False) - last_update = Column(DateTime) + created_on = Column(DateTime, default=datetime.now) + last_update = Column(DateTime, default=datetime.now, onupdate=datetime.now) fail_reason = Column(String(1000)) download = relationship("Download", uselist=False, lazy="selectin") @@ -128,7 +129,7 @@ class Download(Base): storage_id = Column(Integer, ForeignKey("storages.storage_id")) location_path = Column(String(255)) size_bytes = Column(Integer) - created_on = Column(DateTime, nullable=False) + created_on = Column(DateTime, default=datetime.now) class Storage(Base): @@ -267,16 +268,18 @@ def create_request( def update_request( self, request_id: int, - worker_id: int, - status: RequestStatus, + worker_id: int | None = None, + status: RequestStatus | None = None, location_path: str = None, size_bytes: int = None, fail_reason: str = None, ) -> int: with self.__session_maker() as session: request = session.query(Request).get(request_id) - request.status = status - request.worker_id = worker_id + if status: + request.status = status + if worker_id: + request.worker_id = worker_id request.last_update = datetime.utcnow() request.fail_reason = fail_reason session.commit() @@ -305,7 +308,17 @@ def get_request_status_and_reason( def get_requests_for_user_id(self, user_id) -> list[Request]: with self.__session_maker() as session: - return session.query(User).get(user_id).requests + return session.query(User).get(user_id).requests.all() + + def get_requests_for_user_id_and_status( + self, user_id, status: RequestStatus | tuple[RequestStatus] + ) -> list[Request]: + if isinstance(status, RequestStatus): + status = (status,) + with self.__session_maker() as session: + return session.get(User, user_id).requests.filter( + Request.status.in_(status) + ) def get_download_details_for_request_id(self, request_id) -> Download: with self.__session_maker() as session: diff --git a/datastore/geoquery/geoquery.py b/datastore/geoquery/geoquery.py index 8446660..b145587 100644 --- a/datastore/geoquery/geoquery.py +++ b/datastore/geoquery/geoquery.py @@ -15,6 +15,7 @@ class GeoQuery(BaseModel, extra="allow"): vertical: Optional[Union[float, List[float], Dict[str, float]]] filters: Optional[Dict] format: Optional[str] + format_args: Optional[Dict] # TODO: Check if we are going to allow the vertical coordinates inside both # `area`/`location` nad `vertical` diff --git a/datastore/workflow/workflow.py b/datastore/workflow/workflow.py index e609a77..04720ee 100644 --- a/datastore/workflow/workflow.py +++ b/datastore/workflow/workflow.py @@ -108,9 +108,9 @@ def _subset(kube: DataCube | None = None) -> DataCube: return Datastore().query( dataset_id=dataset_id, product_id=product_id, - query=query - if isinstance(query, GeoQuery) - else GeoQuery(**query), + query=( + query if isinstance(query, GeoQuery) else GeoQuery(**query) + ), compute=False, ) @@ -153,18 +153,21 @@ def _average(kube: DataCube | None = None) -> DataCube: ) self._add_computational_node(task) return self - + def 
to_regular( self, id: Hashable, *, dependencies: list[Hashable] ) -> "Workflow": def _to_regular(kube: DataCube | None = None) -> DataCube: - assert kube is not None, "`kube` cannot be `None` for `to_regular``" + assert ( + kube is not None + ), "`kube` cannot be `None` for `to_regular``" return kube.to_regular() + task = _WorkflowTask( id=id, operator=_to_regular, dependencies=dependencies ) self._add_computational_node(task) - return self + return self def add_task( self, diff --git a/docker-compose.yaml b/docker-compose.yaml index 995b7ca..c510542 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,19 +1,18 @@ version: "3" services: api: - build: - context: ./ - dockerfile: ./api/Dockerfile + image: local/geolake-api:latest ports: - "8080:80" depends_on: + - datastore - broker - db links: - broker - db environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml + CATALOG_PATH: /catalog/catalog.yaml POSTGRES_DB: dds POSTGRES_USER: dds POSTGRES_PASSWORD: dds @@ -21,11 +20,9 @@ services: POSTGRES_PORT: 5432 volumes: - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"] + command: ["./../wait-for-it.sh", "broker:5672", "--", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"] executor: - build: - context: ./ - dockerfile: ./executor/Dockerfile + image: local/geolake-executor:latest depends_on: - broker - db @@ -37,14 +34,20 @@ services: environment: EXECUTOR_TYPES: query,info,estimate CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds + POSTGRES_DB: geolake + POSTGRES_USER: geolake + POSTGRES_PASSWORD: geolake POSTGRES_HOST: db POSTGRES_PORT: 5432 volumes: - downloads:/downloads:rw - command: ["./wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"] + command: ["./../wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"] + datastore: + image: local/geolake-datastore:latest + depends_on: + - drivers + drivers: + image: local/geolake-drivers:latest broker: image: rabbitmq:3 db: @@ -55,31 +58,12 @@ services: ports: - 5432:5432 environment: - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds + POSTGRES_DB: geolake + POSTGRES_USER: geolake + POSTGRES_PASSWORD: geolake POSTGRES_HOST: db POSTGRES_PORT: 5432 - web: - build: - context: ./ - dockerfile: ./web/Dockerfile - ports: - - "8080:80" - depends_on: - - db - links: - - db - environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 - volumes: - - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "web.main:app", "--host", "0.0.0.0", "--port", "80"] volumes: - downloads: \ No newline at end of file + downloads: + catalog: \ No newline at end of file diff --git a/drivers/Dockerfile b/drivers/Dockerfile index 978eaa1..57874a2 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,8 +1,9 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube #ARG TAG=v0.2.6b2 -ARG TAG=2024.05.03.10.36 +ARG TAG=latest FROM $REGISTRY/geokube:$TAG COPY dist/intake_geokube-0.1a0-py3-none-any.whl / + RUN pip install /intake_geokube-0.1a0-py3-none-any.whl -RUN rm /intake_geokube-0.1a0-py3-none-any.whl \ No newline at end of file +RUN rm /intake_geokube-0.1a0-py3-none-any.whl diff --git a/drivers/intake_geokube/__init__.py b/drivers/intake_geokube/__init__.py index dc60a1d..2c990df 100644 --- 
a/drivers/intake_geokube/__init__.py +++ b/drivers/intake_geokube/__init__.py @@ -3,3 +3,4 @@ # This avoids a circilar dependency pitfall by ensuring that the # driver-discovery code runs first, see: # https://intake.readthedocs.io/en/latest/making-plugins.html#entrypoints +from .geoquery import GeoQuery \ No newline at end of file diff --git a/drivers/intake_geokube/base.py b/drivers/intake_geokube/base.py index e3c689b..b30c4d8 100644 --- a/drivers/intake_geokube/base.py +++ b/drivers/intake_geokube/base.py @@ -1,8 +1,12 @@ # from . import __version__ +from dask.delayed import Delayed from intake.source.base import DataSource, Schema from geokube.core.datacube import DataCube from geokube.core.dataset import Dataset +from .geoquery import GeoQuery + + class GeokubeSource(DataSource): """Common behaviours for plugins in this repo""" @@ -10,6 +14,13 @@ class GeokubeSource(DataSource): version = "0.1a0" container = "geokube" partition_access = True + geoquery: GeoQuery | None + compute: bool + + def __init__(self, metadata, geoquery: GeoQuery = None, compute: bool = False): + super().__init__(metadata=metadata) + self.geoquery = geoquery + self.compute = compute def _get_schema(self): """Make schema object, which embeds goekube fields metadata""" @@ -56,9 +67,8 @@ def read(self): def read_chunked(self): """Return a lazy geokube object""" - self._load_metadata() - return self._kube - + return self.read() + def read_partition(self, i): """Fetch one chunk of data at tuple index i""" raise NotImplementedError @@ -76,3 +86,25 @@ def close(self): """Delete open file from memory""" self._kube = None self._schema = None + + def process_with_query(self): + self.read_chunked() + if not self.geoquery: + return self._kube.compute() if self.compute else self._kube + if isinstance(self._kube, Dataset): + self._kube = self._kube.filter(**self.geoquery.filters) + if isinstance(self._kube, Delayed) and self.compute: + self._kube = self._kube.compute() + if self.geoquery.variable: + self._kube = self._kube[self.geoquery.variable] + if self.geoquery.area: + self._kube = self._kube.geobbox(**self.geoquery.area) + if self.geoquery.location: + self._kube = self._kube.locations(**self.geoquery.location) + if self.geoquery.time: + self._kube = self._kube.sel(time=self.geoquery.time) + if self.geoquery.vertical: + method = None if isinstance(self.geoquery.vertical, slice) else "nearest" + self._kube = self._kube.sel(vertical=self.geoquery.vertical, method=method) + return self._kube.compute() if self.compute else self._kube + \ No newline at end of file diff --git a/drivers/intake_geokube/geoquery.py b/drivers/intake_geokube/geoquery.py new file mode 100644 index 0000000..544e654 --- /dev/null +++ b/drivers/intake_geokube/geoquery.py @@ -0,0 +1,111 @@ +import json +from typing import Optional, List, Dict, Union, Any, TypeVar + +from pydantic import BaseModel, root_validator, validator + +TGeoQuery = TypeVar("TGeoQuery") + +def _maybe_convert_dict_slice_to_slice(dict_vals): + if "start" in dict_vals or "stop" in dict_vals: + return slice( + dict_vals.get("start"), + dict_vals.get("stop"), + dict_vals.get("step"), + ) + return dict_vals + +class _GeoQueryJSONEncoder(json.JSONEncoder): + + def default(self, obj): + if isinstance(obj, slice): + return { + "start": obj.start, + "stop": obj.stop, + "step": obj.step + } + return json.JSONEncoder.default(self, obj) + + +class GeoQuery(BaseModel): + variable: Optional[Union[str, List[str]]] + # TODO: Check how `time` is to be represented + time: Optional[Union[Dict[str, str 
| None], Dict[str, List[str]]]] + area: Optional[Dict[str, float]] + location: Optional[Dict[str, Union[float, List[float]]]] + vertical: Optional[Union[float, List[float], Dict[str, float]]] + filters: Optional[Dict] + format: Optional[str] + format_args: Optional[Dict] + + class Config: + extra = "allow" + json_encoders = {slice: lambda s: { + "start": s.start, + "stop": s.stop, + "step": s.step + }} + + # TODO: Check if we are going to allow the vertical coordinates inside both + # `area`/`location` nad `vertical` + + @root_validator + def area_locations_mutually_exclusive_validator(cls, query): + if query["area"] is not None and query["location"] is not None: + raise KeyError( + "area and location couldn't be processed together, please use" + " one of them" + ) + return query + + @root_validator(pre=True) + def build_filters(cls, values: Dict[str, Any]) -> Dict[str, Any]: + if "filters" in values: + return values + filters = {k: _maybe_convert_dict_slice_to_slice(v) for k, v in values.items() if k not in cls.__fields__} + values = {k: v for k, v in values.items() if k in cls.__fields__} + values["filters"] = filters + return values + + @validator("time", always=True) + def match_time_dict(cls, value): + if isinstance(value, dict): + assert any([k in value for k in ("start", "stop", "year", "month", "day", "hour")]), "Missing dictionary key" + if "start" in value or "stop" in value: + return _maybe_convert_dict_slice_to_slice(value) + return value + + + @validator("vertical", always=True) + def match_vertical_dict(cls, value): + if isinstance(value, dict): + assert "start" in value, "Missing 'start' key" + assert "stop" in value, "Missing 'stop' key" + return _maybe_convert_dict_slice_to_slice(value) + return value + + def original_query_json(self): + """Return the JSON representation of the original query submitted + to the geokube-dds""" + res = super().dict() + res = dict(**res.pop("filters", {}), **res) + # NOTE: skip empty values to make query representation + # shorter and more elegant + res = dict(filter(lambda item: item[1] is not None, res.items())) + return json.dumps(res, cls=_GeoQueryJSONEncoder) + + @classmethod + def parse( + cls, load: TGeoQuery | dict | str | bytes | bytearray + ) -> TGeoQuery: + if isinstance(load, cls): + return load + if isinstance(load, (str, bytes, bytearray)): + load = json.loads(load) + if isinstance(load, dict): + load = GeoQuery(**load) + else: + raise TypeError( + f"type of the `load` argument ({type(load).__name__}) is not" + " supported!" 
diff --git a/drivers/intake_geokube/netcdf.py b/drivers/intake_geokube/netcdf.py
index 7247891..e8b0754 100644
--- a/drivers/intake_geokube/netcdf.py
+++ b/drivers/intake_geokube/netcdf.py
@@ -21,6 +21,7 @@ def __init__(
         metadata=None,
         mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
         load_files_on_persistance: Optional[bool] = True,
+        **kwargs
     ):
         self._kube = None
         self.path = path
@@ -34,7 +35,7 @@ def __init__(
         self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
         self.load_files_on_persistance = load_files_on_persistance
         # self.xarray_kwargs.update({'engine' : 'netcdf'})
-        super(NetCDFSource, self).__init__(metadata=metadata)
+        super(NetCDFSource, self).__init__(metadata=metadata, **kwargs)
 
     def _open_dataset(self):
         if self.pattern is None:
diff --git a/drivers/intake_geokube/sentinel.py b/drivers/intake_geokube/sentinel.py
new file mode 100644
index 0000000..4c6b612
--- /dev/null
+++ b/drivers/intake_geokube/sentinel.py
@@ -0,0 +1,205 @@
+"""Geokube driver for sentinel data."""
+
+from collections import defaultdict
+from multiprocessing.util import get_temp_dir
+import os
+import dask
+import zipfile
+import glob
+from functools import partial
+from typing import Generator, Iterable, Mapping, Optional, List
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+from pyproj import Transformer
+from pyproj.crs import CRS, GeographicCRS
+from intake.source.utils import reverse_format
+
+from geokube import open_datacube
+from geokube.core.dataset import Dataset
+
+from .base import GeokubeSource
+from .geoquery import GeoQuery
+
+SENSING_TIME_ATTR: str = "sensing_time"
+FILE: str = "files"
+DATACUBE: str = "datacube"
+
+
+def get_field_name_from_path(path: str):
+    res, file = path.split(os.sep)[-2:]
+    band = file.split("_")[-2]
+    return f"{res}_{band}"
+
+
+def preprocess_sentinel(dset: xr.Dataset, pattern: str, **kw) -> xr.Dataset:
+    crs = CRS.from_cf(dset["spatial_ref"].attrs)
+    transformer = Transformer.from_crs(
+        crs_from=crs, crs_to=GeographicCRS(), always_xy=True
+    )
+    x_vals, y_vals = dset["x"].to_numpy(), dset["y"].to_numpy()
+    lon_vals, lat_vals = transformer.transform(*np.meshgrid(x_vals, y_vals))
+    source_path = dset.encoding["source"]
+    sensing_time = os.path.splitext(source_path.split(os.sep)[-6])[0].split(
+        "_"
+    )[-1]
+    time = pd.to_datetime([sensing_time]).to_numpy()
+    dset = dset.assign_coords(
+        {
+            "time": time,
+            "latitude": (("x", "y"), lat_vals),
+            "longitude": (("x", "y"), lon_vals),
+        }
+    ).rename({"band_data": get_field_name_from_path(source_path)})
+    return dset
+
+
+def get_zip_files_from_path(path: str) -> Generator:
+    """Yield ZIP archives matching `path`, which may contain a glob pattern."""
+    assert path and isinstance(path, str), "`path` must be a string"
+    assert path.lower().endswith("zip"), "`path` must point to a ZIP archive"
+    if "*" in path:
+        yield from glob.iglob(path)
+        return
+    yield path
+
+
+def unzip_data(files: Iterable[str], target: str) -> List[str]:
+    """Unzip ZIP archives to the `target` directory."""
+    target_files = []
+    for file in files:
+        prod_id = os.path.splitext(os.path.basename(file))[0]
+        target_prod = os.path.join(target, prod_id)
+        os.makedirs(target_prod, exist_ok=True)
+        with zipfile.ZipFile(file) as archive:
+            archive.extractall(path=target_prod)
+        target_files.append(os.listdir(target_prod))
+    return target_files
+
+
+def _prepare_df_from_files(files: Iterable[str], pattern: str) -> pd.DataFrame:
+    data = []
+    for f in files:
+        attr = reverse_format(pattern, f)
+        attr[FILE] = f
+        data.append(attr)
+    return pd.DataFrame(data)
+
+
+class CMCCSentinelSource(GeokubeSource):
+    name = "cmcc_sentinel_geokube"
+    version = "0.0.1"
+
+    def __init__(
+        self,
+        path: str,
+        pattern: str = None,
+        zippath: str = None,
+        zippattern: str = None,
+        metadata=None,
+        xarray_kwargs: dict = None,
+        mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(metadata=metadata, **kwargs)
+        self._kube = None
+        self.path = path
+        self.pattern = pattern
+        self.zippath = zippath
+        self.zippattern = zippattern
+        self.mapping = mapping
+        self.metadata_caching = False
+        self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
+        self._unzip_dir = get_temp_dir()
+        self._zipdf = None
+        self._jp2df = None
+        assert (
+            SENSING_TIME_ATTR in self.pattern
+        ), f"{SENSING_TIME_ATTR} is missing in the pattern"
+        self.preprocess = partial(
+            preprocess_sentinel,
+            pattern=self.pattern,
+        )
+        if self.geoquery:
+            self.filters = self.geoquery.filters
+        else:
+            self.filters = {}
+
+    def __post_init__(self) -> None:
+        assert (
+            SENSING_TIME_ATTR in self.pattern
+        ), f"{SENSING_TIME_ATTR} is missing in the pattern"
+        self.preprocess = partial(
+            preprocess_sentinel,
+            pattern=self.pattern,
+        )
+
+    def _compute_res_df(self) -> None:
+        self._zipdf = self._get_files_attr()
+        self._maybe_select_by_zip_attrs()
+        _ = unzip_data(self._zipdf[FILE].values, target=self._unzip_dir)
+        self._create_jp2_df()
+        self._maybe_select_by_jp2_attrs()
+
+    def _get_files_attr(self) -> pd.DataFrame:
+        df = _prepare_df_from_files(
+            get_zip_files_from_path(self.path), self.pattern
+        )
+        assert (
+            SENSING_TIME_ATTR in df
+        ), f"{SENSING_TIME_ATTR} column is missing"
+        return df.set_index(SENSING_TIME_ATTR).sort_index()
+
+    def _maybe_select_by_zip_attrs(self) -> None:
+        filters_to_pop = []
+        for flt in self.filters:
+            if flt in self._zipdf.columns:
+                self._zipdf = self._zipdf.set_index(flt)
+            if flt == self._zipdf.index.name:
+                self._zipdf = self._zipdf.loc[self.filters[flt]]
+                filters_to_pop.append(flt)
+        for f in filters_to_pop:
+            self.filters.pop(f)
+        self._zipdf = self._zipdf.reset_index()
+
+    def _create_jp2_df(self) -> None:
+        self._jp2df = _prepare_df_from_files(
+            glob.iglob(os.path.join(self._unzip_dir, self.zippath)),
+            os.path.join(self._unzip_dir, self.zippattern),
+        )
+
+    def _maybe_select_by_jp2_attrs(self):
+        filters_to_pop = []
+        for key, value in self.filters.items():
+            if key not in self._jp2df:
+                continue
+            if isinstance(value, str):
+                self._jp2df = self._jp2df[self._jp2df[key] == value]
+            elif isinstance(value, Iterable):
+                self._jp2df = self._jp2df[self._jp2df[key].isin(value)]
+            else:
+                raise TypeError(f"type `{type(value)}` is not supported!")
+            filters_to_pop.append(key)
+        for f in filters_to_pop:
+            self.filters.pop(f)
+
+    def _open_dataset(self):
+        self._compute_res_df()
+        cubes = []
+        for i, row in self._jp2df.iterrows():
+            cubes.append(
+                dask.delayed(open_datacube)(
+                    path=row[FILE],
+                    id_pattern=None,
+                    mapping=self.mapping,
+                    metadata_caching=self.metadata_caching,
+                    **self.xarray_kwargs,
+                    preprocess=self.preprocess,
+                )
+            )
+        self._jp2df[DATACUBE] = cubes
+        self._kube = Dataset(self._jp2df.reset_index(drop=True))
+        self.geoquery.filters = self.filters
+        return self._kube
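A rough sketch of instantiating the new sentinel driver directly; every path and pattern below is an invented placeholder (the patch itself does not prescribe a particular archive layout), and `pattern` must contain `{sensing_time}`:

from intake_geokube.geoquery import GeoQuery
from intake_geokube.sentinel import CMCCSentinelSource

# Placeholder paths/patterns; `zippath`/`zippattern` describe the unpacked
# JP2 layout and are assumptions for this example only.
source = CMCCSentinelSource(
    path="/data/sentinel/*.zip",
    pattern="/data/sentinel/{product_id}_{sensing_time}.zip",
    zippath="*/GRANULE/*/IMG_DATA/*/*.jp2",
    zippattern="{product_id}/GRANULE/{granule}/IMG_DATA/{resolution}/{file}.jp2",
    geoquery=GeoQuery(variable="R10m_B02"),
)
dataset = source.read_chunked()  # a geokube Dataset of delayed datacubes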
diff --git a/drivers/intake_geokube/wrf.py b/drivers/intake_geokube/wrf.py
index 1968e40..196ff68 100644
--- a/drivers/intake_geokube/wrf.py
+++ b/drivers/intake_geokube/wrf.py
@@ -124,6 +124,7 @@ def __init__(
         load_files_on_persistance: Optional[bool] = True,
         variables_to_keep: Optional[Union[str, list[str]]] = None,
         variables_to_skip: Optional[Union[str, list[str]]] = None,
+        **kwargs
     ):
         self._kube = None
         self.path = path
@@ -142,7 +143,7 @@ def __init__(
             variables_to_skip=variables_to_skip,
         )
         # self.xarray_kwargs.update({'engine' : 'netcdf'})
-        super(CMCCWRFSource, self).__init__(metadata=metadata)
+        super(CMCCWRFSource, self).__init__(metadata=metadata, **kwargs)
 
     def _open_dataset(self):
         if self.pattern is None:
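The `**kwargs` forwarding added to the NetCDF and WRF drivers is what lets the new `geoquery`/`compute` keywords reach `GeokubeSource`. A minimal sketch of that plumbing, assuming `path` is the only required argument and that construction only wires attributes without opening files (the path is a placeholder):

from intake_geokube.geoquery import GeoQuery
from intake_geokube.wrf import CMCCWRFSource

# No file is read here; the keywords are simply passed through to GeokubeSource.
src = CMCCWRFSource(path="wrfout_d01.nc", geoquery=GeoQuery(variable="T2"), compute=False)
assert src.geoquery is not None and src.compute is False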
diff --git a/drivers/setup.py b/drivers/setup.py
index b3a3032..7723f8b 100644
--- a/drivers/setup.py
+++ b/drivers/setup.py
@@ -13,12 +13,12 @@
     long_description_content_type="text/markdown",
     url="https://github.com/geokube/intake-geokube",
     packages=setuptools.find_packages(),
-    install_requires=["intake", "pytest"],
+    install_requires=["intake", "pytest", "pydantic<2.0.0"],
     entry_points={
         "intake.drivers": [
             "geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
             "cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
-            "cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource",
+            "cmcc_sentinel_geokube = intake_geokube.sentinel:CMCCSentinelSource"
         ]
     },
     classifiers=[
diff --git a/executor/app/main.py b/executor/app/main.py
index 5daf7e6..6dc3566 100644
--- a/executor/app/main.py
+++ b/executor/app/main.py
@@ -110,6 +110,7 @@ def persist_datacube(
     kube._properties["history"] = get_history_message()
     if isinstance(message.content, GeoQuery):
         format = message.content.format
+        format_args = message.content.format_args
     else:
         format = "netcdf"
     match format:
@@ -119,6 +120,15 @@
         case "geojson":
             full_path = os.path.join(base_path, f"{path}.json")
             kube.to_geojson(full_path)
+        case "png":
+            full_path = os.path.join(base_path, f"{path}.png")
+            kube.to_image(full_path, **format_args)
+        case "jpeg":
+            full_path = os.path.join(base_path, f"{path}.jpg")
+            kube.to_image(full_path, **format_args)
+        case "csv":
+            full_path = os.path.join(base_path, f"{path}.csv")
+            kube.to_csv(full_path)
         case _:
             raise ValueError(f"format `{format}` is not supported")
     return full_path
@@ -132,7 +142,9 @@ def persist_dataset(
     def _get_attr_comb(dataframe_item, attrs):
         return "_".join([dataframe_item[attr_name] for attr_name in attrs])
 
-    def _persist_single_datacube(dataframe_item, base_path, format):
+    def _persist_single_datacube(dataframe_item, base_path, format, format_args=None):
+        if not format_args:
+            format_args = {}
         dcube = dataframe_item[dset.DATACUBE_COL]
         if isinstance(dcube, Delayed):
             dcube = dcube.compute()
@@ -169,14 +181,25 @@ def _persist_single_datacube(dataframe_item, base_path, format):
             case "geojson":
                 full_path = os.path.join(base_path, f"{path}.json")
                 dcube.to_geojson(full_path)
+            case "png":
+                full_path = os.path.join(base_path, f"{path}.png")
+                dcube.to_image(full_path, **format_args)
+            case "jpeg":
+                full_path = os.path.join(base_path, f"{path}.jpg")
+                dcube.to_image(full_path, **format_args)
+            case "csv":
+                full_path = os.path.join(base_path, f"{path}.csv")
+                dcube.to_csv(full_path)
         return full_path
 
     if isinstance(message.content, GeoQuery):
         format = message.content.format
+        format_args = message.content.format_args
     else:
         format = "netcdf"
+        format_args = None
     datacubes_paths = dset.data.apply(
-        _persist_single_datacube, base_path=base_path, format=format, axis=1
+        _persist_single_datacube, base_path=base_path, format=format, format_args=format_args, axis=1
     )
     paths = datacubes_paths[~datacubes_paths.isna()]
     if len(paths) == 0: