diff --git a/.gitignore b/.gitignore
index 1856070..ffcbef0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -112,4 +112,5 @@ venv.bak/
_catalogs/
_old/
-.DS_Store
\ No newline at end of file
+.DS_Store
+db_init
\ No newline at end of file
diff --git a/api/app/endpoint_handlers/dataset.py b/api/app/endpoint_handlers/dataset.py
index a3f8ca5..109eb0c 100644
--- a/api/app/endpoint_handlers/dataset.py
+++ b/api/app/endpoint_handlers/dataset.py
@@ -3,7 +3,9 @@
import pika
from typing import Optional
-from dbmanager.dbmanager import DBManager
+from fastapi.responses import FileResponse
+
+from dbmanager.dbmanager import DBManager, RequestStatus
from geoquery.geoquery import GeoQuery
from geoquery.task import TaskList
from datastore.datastore import Datastore, DEFAULT_MAX_REQUEST_SIZE_GB
@@ -18,12 +20,18 @@
from api_utils import make_bytes_readable_dict
from validation import assert_product_exists
+from . import request
log = get_dds_logger(__name__)
data_store = Datastore()
MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"]
+def _is_estimate_enabled(dataset_id, product_id):
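+    """Return False for datasets (currently only sentinel-2) for which the size-estimation guard is skipped."""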
+ if dataset_id in ("sentinel-2",):
+ return False
+ return True
+
@log_execution_time(log)
def get_datasets(user_roles_names: list[str]) -> list[dict]:
@@ -213,7 +221,7 @@ def estimate(
@log_execution_time(log)
@assert_product_exists
-def query(
+def async_query(
user_id: str,
dataset_id: str,
product_id: str,
@@ -250,21 +258,22 @@ def query(
"""
log.debug("geoquery: %s", query)
- estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
- allowed_size = data_store.product_metadata(dataset_id, product_id).get(
- "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
- )
- if estimated_size > allowed_size:
- raise exc.MaximumAllowedSizeExceededError(
- dataset_id=dataset_id,
- product_id=product_id,
- estimated_size_gb=estimated_size,
- allowed_size_gb=allowed_size,
- )
- if estimated_size == 0.0:
- raise exc.EmptyDatasetError(
- dataset_id=dataset_id, product_id=product_id
+    if _is_estimate_enabled(dataset_id, product_id):
+ estimated_size = estimate(dataset_id, product_id, query, "GB").get("value")
+ allowed_size = data_store.product_metadata(dataset_id, product_id).get(
+ "maximum_query_size_gb", DEFAULT_MAX_REQUEST_SIZE_GB
)
+ if estimated_size > allowed_size:
+ raise exc.MaximumAllowedSizeExceededError(
+ dataset_id=dataset_id,
+ product_id=product_id,
+ estimated_size_gb=estimated_size,
+ allowed_size_gb=allowed_size,
+ )
+ if estimated_size == 0.0:
+ raise exc.EmptyDatasetError(
+ dataset_id=dataset_id, product_id=product_id
+ )
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.getenv("BROKER_SERVICE_HOST", "broker")
@@ -295,6 +304,68 @@ def query(
broker_conn.close()
return request_id
+@log_execution_time(log)
+@assert_product_exists
+def sync_query(
+ user_id: str,
+ dataset_id: str,
+ product_id: str,
+ query: GeoQuery,
+):
+ """Realize the logic for the endpoint:
+
+ `POST /datasets/{dataset_id}/{product_id}/execute`
+
+ Query the data and return the result of the request.
+
+ Parameters
+ ----------
+ user_id : str
+ ID of the user executing the query
+ dataset_id : str
+ ID of the dataset
+ product_id : str
+ ID of the product
+ query : GeoQuery
+ Query to perform
+
+    Returns
+    -------
+    response : FileResponse
+        File containing the result of the query
+
+    Raises
+    -------
+    MaximumAllowedSizeExceededError
+        if the allowed size is below the estimated one
+    EmptyDatasetError
+        if estimated size is zero
+    ProductRetrievingError
+        if the request finished with a status other than DONE
+
+ """
+
+ import time
+ request_id = async_query(user_id, dataset_id, product_id, query)
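+    # Poll the request status once per second until it leaves the PENDING/QUEUED/RUNNING states.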
+ status, _ = DBManager().get_request_status_and_reason(request_id)
+ log.debug("sync query: status: %s", status)
+ while status in (RequestStatus.RUNNING, RequestStatus.QUEUED,
+ RequestStatus.PENDING):
+ time.sleep(1)
+ status, _ = DBManager().get_request_status_and_reason(request_id)
+ log.debug("sync query: status: %s", status)
+
+ if status is RequestStatus.DONE:
+ download_details = DBManager().get_download_details_for_request_id(
+ request_id
+ )
+ return FileResponse(
+ path=download_details.location_path,
+ filename=download_details.location_path.split(os.sep)[-1],
+ )
+ raise exc.ProductRetrievingError(
+ dataset_id=dataset_id,
+ product_id=product_id,
+ status=status.name)
+
@log_execution_time(log)
def run_workflow(
diff --git a/api/app/endpoint_handlers/request.py b/api/app/endpoint_handlers/request.py
index 320bceb..93a0636 100644
--- a/api/app/endpoint_handlers/request.py
+++ b/api/app/endpoint_handlers/request.py
@@ -86,7 +86,11 @@ def get_request_resulting_size(request_id: int):
If the request was not found
"""
if request := DBManager().get_request_details(request_id):
- return request.download.size_bytes
+ size = request.download.size_bytes
+        if not size:
+ raise exc.EmptyDatasetError(dataset_id=request.dataset,
+ product_id=request.product)
+ return size
log.info(
"request with id '%s' could not be found",
request_id,
diff --git a/api/app/exceptions.py b/api/app/exceptions.py
index 01de71c..af4d072 100644
--- a/api/app/exceptions.py
+++ b/api/app/exceptions.py
@@ -180,3 +180,16 @@ def __init__(self, dataset_id, product_id):
product_id=product_id,
)
super().__init__(self.msg)
+
+class ProductRetrievingError(BaseDDSException):
+ """Retrieving of the product failed."""
+
+ msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with the status {status}"
+
+ def __init__(self, dataset_id, product_id, status):
+ self.msg = self.msg.format(
+ dataset_id=dataset_id,
+ product_id=product_id,
+ status=status
+ )
+ super().__init__(self.msg)
\ No newline at end of file
diff --git a/api/app/main.py b/api/app/main.py
index d0f5ad8..893a907 100644
--- a/api/app/main.py
+++ b/api/app/main.py
@@ -1,9 +1,13 @@
"""Main module with geolake API endpoints defined"""
__version__ = "0.1.0"
import os
-from typing import Optional
+import re
+from typing import Optional, Dict
+from datetime import datetime
-from fastapi import FastAPI, HTTPException, Request, status
+from oai_dcat.metadata_provider import BASE_URL
+from oai_dcat.oai_utils import serialize_and_concatenate_graphs, convert_to_dcat_ap_it
+from fastapi import FastAPI, HTTPException, Request, status, Query, Response
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.authentication import requires
@@ -16,8 +20,8 @@
)
from aioprometheus.asgi.starlette import metrics
-from geoquery.geoquery import GeoQuery
from geoquery.task import TaskList
+from geoquery.geoquery import GeoQuery
from utils.api_logging import get_dds_logger
import exceptions as exc
@@ -31,6 +35,33 @@
from encoders import extend_json_encoders
from const import venv, tags
from auth import scopes
+from oai_dcat import oai_server
+
+def map_to_geoquery(
+ variables: list[str],
+ format: str,
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
+ time: datetime | None = None,
+ filters: Optional[Dict] = None,
+ **format_kwargs
+) -> GeoQuery:
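+    """Translate OGC-style request parameters (variables, bbox, time, optional filters and format kwargs) into a GeoQuery."""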
+
+ if bbox:
+ bbox_ = [float(x) for x in bbox.split(',')]
+ area = { 'west': bbox_[0], 'south': bbox_[1], 'east': bbox_[2], 'north': bbox_[3], }
+ else:
+ area = None
+ if time:
+ time_ = { 'year': time.year, 'month': time.month, 'day': time.day, 'hour': time.hour}
+ else:
+ time_ = None
+ if filters:
+ query = GeoQuery(variable=variables, time=time_, area=area, filters=filters,
+ format_args=format_kwargs, format=format)
+ else:
+ query = GeoQuery(variable=variables, time=time_, area=area,
+ format_args=format_kwargs, format=format)
+ return query
logger = get_dds_logger(__name__)
@@ -155,7 +186,251 @@ async def get_product_details(
except exc.BaseDDSException as err:
raise err.wrap_around_http_exception() from err
+@app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET])
+@timer(
+ app.state.api_request_duration_seconds,
+ labels={"route": "GET /datasets/{dataset_id}/{product_id}"},
+)
+async def get_map(
+ request: Request,
+ dataset_id: str,
+ product_id: str,
+# OGC WMS parameters
+ width: int,
+ height: int,
+ dpi: int | None = 100,
+ layers: str | None = None,
+ format: str | None = 'png',
+ time: datetime | None = None,
+    transparent: bool | None = True,
+ bgcolor: str | None = 'FFFFFF',
+ cmap: str | None = 'RdBu_r',
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
+ crs: str | None = None,
+ vmin: float | None = None,
+ vmax: float | None = None
+# OGC map parameters
+ # subset: str | None = None,
+ # subset_crs: str | None = Query(..., alias="subset-crs"),
+ # bbox_crs: str | None = Query(..., alias="bbox-crs"),
+):
+
+ app.state.api_http_requests_total.inc(
+ {"route": "GET /datasets/{dataset_id}/{product_id}/map"}
+ )
+ # query should be the OGC query
+ # map OGC parameters to GeoQuery
+ # variable: Optional[Union[str, List[str]]]
+ # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
+ # area: Optional[Dict[str, float]]
+ # location: Optional[Dict[str, Union[float, List[float]]]]
+ # vertical: Optional[Union[float, List[float], Dict[str, float]]]
+ # filters: Optional[Dict]
+ # format: Optional[str]
+
+ query = map_to_geoquery(variables=layers, bbox=bbox, time=time,
+ format="png", width=width, height=height,
+ transparent=transparent, bgcolor=bgcolor,
+ dpi=dpi, cmap=cmap, projection=crs,
+ vmin=vmin, vmax=vmax)
+ try:
+ return dataset_handler.sync_query(
+ user_id=request.user.id,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ query=query
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/map", tags=[tags.DATASET])
+@timer(
+ app.state.api_request_duration_seconds,
+ labels={"route": "GET /datasets/{dataset_id}/{product_id}"},
+)
+async def get_map_with_filters(
+ request: Request,
+ dataset_id: str,
+ product_id: str,
+ filters: str,
+# OGC WMS parameters
+ width: int,
+ height: int,
+ dpi: int | None = 100,
+ layers: str | None = None,
+ format: str | None = 'png',
+ time: datetime | None = None,
+    transparent: bool | None = True,
+ bgcolor: str | None = 'FFFFFF',
+ cmap: str | None = 'RdBu_r',
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
+ crs: str | None = None,
+ vmin: float | None = None,
+ vmax: float | None = None
+# OGC map parameters
+ # subset: str | None = None,
+ # subset_crs: str | None = Query(..., alias="subset-crs"),
+ # bbox_crs: str | None = Query(..., alias="bbox-crs"),
+):
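+    # Map positional path segments onto the product's declared filters; 'rs-indices' and 'pasture' datasets use a fixed 'pasture' key.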
+ filters_vals = filters.split("/")
+
+ if dataset_id in ['rs-indices', 'pasture']:
+ filters_dict = {'pasture': filters_vals[0]}
+
+ else:
+ try:
+ product_info = dataset_handler.get_product_details(
+ user_roles_names=request.auth.scopes,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+ filters_keys = product_info['metadata']['filters']
+ filters_dict = {}
+        for i, val in enumerate(filters_vals):
+            filters_dict[filters_keys[i]['name']] = val
+
+ app.state.api_http_requests_total.inc(
+ {"route": "GET /datasets/{dataset_id}/{product_id}/map"}
+ )
+ # query should be the OGC query
+ # map OGC parameters to GeoQuery
+ # variable: Optional[Union[str, List[str]]]
+ # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
+ # area: Optional[Dict[str, float]]
+ # location: Optional[Dict[str, Union[float, List[float]]]]
+ # vertical: Optional[Union[float, List[float], Dict[str, float]]]
+ # filters: Optional[Dict]
+ # format: Optional[str]
+
+ query = map_to_geoquery(variables=layers, bbox=bbox, time=time, filters=filters_dict,
+ format="png", width=width, height=height,
+ transparent=transparent, bgcolor=bgcolor,
+ dpi=dpi, cmap=cmap, projection=crs, vmin=vmin, vmax=vmax)
+
+ try:
+ return dataset_handler.sync_query(
+ user_id=request.user.id,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ query=query
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+@app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET])
+@timer(
+ app.state.api_request_duration_seconds,
+ labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"},
+)
+async def get_feature(
+ request: Request,
+ dataset_id: str,
+ product_id: str,
+ feature_id: str,
+# OGC feature parameters
+ time: datetime | None = None,
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
+ crs: str | None = None,
+# OGC map parameters
+ # subset: str | None = None,
+ # subset_crs: str | None = Query(..., alias="subset-crs"),
+ # bbox_crs: str | None = Query(..., alias="bbox-crs"),
+):
+
+ app.state.api_http_requests_total.inc(
+ {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}
+ )
+ # query should be the OGC query
+ # feature OGC parameters to GeoQuery
+ # variable: Optional[Union[str, List[str]]]
+ # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
+ # area: Optional[Dict[str, float]]
+ # location: Optional[Dict[str, Union[float, List[float]]]]
+ # vertical: Optional[Union[float, List[float], Dict[str, float]]]
+ # filters: Optional[Dict]
+ # format: Optional[str]
+
+ query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time,
+ format="geojson")
+ try:
+ return dataset_handler.sync_query(
+ user_id=request.user.id,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ query=query
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+@app.get("/datasets/{dataset_id}/{product_id}/{filters:path}/items/{feature_id}", tags=[tags.DATASET])
+@timer(
+ app.state.api_request_duration_seconds,
+ labels={"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"},
+)
+async def get_feature_with_filters(
+ request: Request,
+ dataset_id: str,
+ product_id: str,
+ feature_id: str,
+ filters: str,
+# OGC feature parameters
+ time: datetime | None = None,
+ bbox: str | None = None, # minx, miny, maxx, maxy (minlon, minlat, maxlon, maxlat)
+ crs: str | None = None,
+# OGC map parameters
+ # subset: str | None = None,
+ # subset_crs: str | None = Query(..., alias="subset-crs"),
+ # bbox_crs: str | None = Query(..., alias="bbox-crs"),
+):
+ filters_vals = filters.split("/")
+
+ if dataset_id in ['rs-indices', 'pasture']:
+ filters_dict = {'pasture': filters_vals[0]}
+
+ else:
+ try:
+ product_info = dataset_handler.get_product_details(
+ user_roles_names=request.auth.scopes,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+ filters_keys = product_info['metadata']['filters']
+ filters_dict = {}
+        for i, val in enumerate(filters_vals):
+            filters_dict[filters_keys[i]['name']] = val
+
+ app.state.api_http_requests_total.inc(
+ {"route": "GET /datasets/{dataset_id}/{product_id}/items/{feature_id}"}
+ )
+ # query should be the OGC query
+ # feature OGC parameters to GeoQuery
+ # variable: Optional[Union[str, List[str]]]
+ # time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
+ # area: Optional[Dict[str, float]]
+ # location: Optional[Dict[str, Union[float, List[float]]]]
+ # vertical: Optional[Union[float, List[float], Dict[str, float]]]
+ # filters: Optional[Dict]
+ # format: Optional[str]
+
+ query = map_to_geoquery(variables=[feature_id], bbox=bbox, time=time, filters=filters_dict,
+ format="geojson")
+ try:
+ return dataset_handler.sync_query(
+ user_id=request.user.id,
+ dataset_id=dataset_id,
+ product_id=product_id,
+ query=query
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+
@app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET])
@timer(
app.state.api_request_duration_seconds,
@@ -222,7 +497,7 @@ async def query(
{"route": "POST /datasets/{dataset_id}/{product_id}/execute"}
)
try:
- return dataset_handler.query(
+ return dataset_handler.async_query(
user_id=request.user.id,
dataset_id=dataset_id,
product_id=product_id,
@@ -355,3 +630,53 @@ async def download_request_result(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!"
) from err
+
+# Define OAI-PMH endpoint route
+@app.get("/oai/{dataset_id}")
+@app.post("/oai/{dataset_id}")
+async def oai(request: Request, dataset_id: str):
+ params = dict(request.query_params)
+
+    # Add dataset_id to the parameters as "set", which is a parameter from the OAI-PMH protocol
+ params['set'] = dataset_id
+ # params['scopes'] = request.auth.scopes
+
+ # Making sure it uses the dcat_ap metadata prefix
+ if 'metadataPrefix' not in params:
+ params['metadataPrefix'] = 'dcat_ap'
+
+ # handleRequest points the request to the appropriate method in metadata_provider.py
+ response = oai_server.oai_server.handleRequest(params)
+ logger.debug(f"OAI-PMH Response: {response}")
+ # Replace date in datestamp by empty string
+    response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response)
+ return Response(content=response, media_type="text/xml")
+
+# Define an endpoint for getting all the datasets
+@app.get("/oai")
+@app.post("/oai")
+async def oai_all_datasets(request: Request):
+ params = dict(request.query_params)
+
+ # Making sure it uses the dcat_ap metadata prefix
+ if 'metadataPrefix' not in params:
+ params['metadataPrefix'] = 'dcat_ap'
+
+ # handleRequest points the request to the appropriate method in metadata_provider.py
+ response = oai_server.oai_server.handleRequest(params)
+ logger.debug(f"OAI-PMH Response: {response}")
+ # Replace date in datestamp by empty string
+    response = re.sub(b'<datestamp>.*</datestamp>', b'<datestamp></datestamp>', response)
+ return Response(content=response, media_type="text/xml")
+
+# Endpoint for generating DCAT-AP IT catalog
+@app.get("/dcatapit")
+async def dcatapit(request: Request):
+ data = dataset_handler.get_datasets(
+ user_roles_names=request.auth.scopes
+ )
+ catalog_graph, datasets_graph, distributions_graph, vcard_graph = convert_to_dcat_ap_it(data, BASE_URL)
+ # response = dcatapit_graph.serialize(format='pretty-xml')
+ response = serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph)
+
+ return Response(content=response, media_type="application/rdf+xml")
\ No newline at end of file
diff --git a/api/app/oai_dcat/__init__.py b/api/app/oai_dcat/__init__.py
new file mode 100644
index 0000000..3654022
--- /dev/null
+++ b/api/app/oai_dcat/__init__.py
@@ -0,0 +1,3 @@
+# from . import metadata_provider as metadata_provider
+from . import oai_server as oai_server
+# from . import oai_utils as oai_utils
diff --git a/api/app/oai_dcat/metadata_provider.py b/api/app/oai_dcat/metadata_provider.py
new file mode 100644
index 0000000..6f140ff
--- /dev/null
+++ b/api/app/oai_dcat/metadata_provider.py
@@ -0,0 +1,101 @@
+from oaipmh.common import Identify, Metadata, Header
+from datetime import datetime
+from lxml import etree
+from lxml.etree import Element
+import json
+from .oai_utils import convert_to_dcat_ap
+import logging
+
+import exceptions as exc
+from endpoint_handlers import dataset_handler
+
+BASE_URL = "https://sebastien-datalake.cmcc.it/api/v2/datasets"
+
+# Logging config
+logging.basicConfig(level=logging.DEBUG)
+
+# Each method in this class is a verb from the OAI-PMH protocol. Only listRecords is used by the data.europa harvester
+class MyMetadataProvider:
+ # Method to list records, only method used by data.europa harvester
+ def listRecords(self, metadataPrefix='dcat_ap', from_=None, until=None, set=None):
+ logging.debug("Fetching data from API")
+ # Fetch data from the dataset endpoint
+ '''
+ data = main.fetch_data(
+ f"{BASE_URL}{set}"
+ )
+ '''
+ if set:
+ dataset_url = f"{BASE_URL}/{set}"
+
+ try:
+ data = dataset_handler.get_product_details(
+ user_roles_names=['public'],
+ dataset_id=set,
+ )
+ except exc.BaseDDSException as err:
+ raise err.wrap_around_http_exception() from err
+
+ else:
+ dataset_url = BASE_URL
+ data = dataset_handler.get_datasets(user_roles_names=['public'])
+
+ logging.debug(f"Fetched data: {data}")
+
+ # Convert to RDF graph with proper DCAT-AP fields (URL is being used to fill the accessURL field)
+    rdf_graph = convert_to_dcat_ap(data, dataset_url)
+
+ # Serialize the RDF graph into a string, 'pretty-xml' format makes it more readable
+ rdf_string = rdf_graph.serialize(format='pretty-xml')
+ logging.debug(f"RDF string: {rdf_string}")
+
+ # Create a header (mandatory for OAI-PMH)
+ header_element = Element("header")
+ header = Header(deleted=False, element=header_element, identifier="", datestamp=datetime.utcnow(), setspec=[])
+
+ # Create metadata element and fill it with the RDF/XML string
+ metadata_element = Element("metadata")
+ metadata = Metadata(element=metadata_element, map={"rdf": rdf_string})
+
+
+ return [(header, metadata, [])], None
+
+ # The remaining methods are only present because they are mandatory for the OAI-PMH protocol
+
+ # Minimal implementation for identify
+ def identify(self):
+ return Identify(
+ repositoryName='Sebastien DataLake', # Name of the repository
+ baseURL='', # Base URL of the OAI-PMH endpoint
+ protocolVersion='2.0', # OAI-PMH protocol version
+ adminEmails=['sebastien_support@cmcc.it'], # List of admin email addresses
+ earliestDatestamp=datetime(2024, 1, 1), # Earliest datestamp for records
+ deletedRecord='no', # Policy on deleted records
+ granularity='YYYY-MM-DDThh:mm:ssZ', # Date granularity
+ compression=['identity'], # Supported compression methods
+ )
+
+ # Minimal implementation for getRecord
+ def getRecord(self, identifier, metadataPrefix='oai_dc'):
+ # Create a header
+ header = Header(identifier=identifier, datestamp=datetime.now(), setspec=[])
+
+ # Create metadata element (empty in this example)
+ metadata = Metadata(element=Element("record"), map={})
+
+ return header, metadata, []
+
+ # Minimal implementation for listIdentifiers
+ def listIdentifiers(self, metadataPrefix='oai_dc', from_=None, until=None, set_=None):
+ # Create a header
+ header = Header(identifier="id1", datestamp=datetime.now(), setspec=[])
+
+ return [header]
+
+ # Minimal implementation for listMetadataFormats
+ def listMetadataFormats(self, identifier=None):
+ return [('oai_dc', 'http://www.openarchives.org/OAI/2.0/oai_dc.xsd', 'http://www.openarchives.org/OAI/2.0/oai_dc/')]
+
+ # Minimal implementation for listSets
+ def listSets(self):
+ return []
diff --git a/api/app/oai_dcat/oai_server.py b/api/app/oai_dcat/oai_server.py
new file mode 100644
index 0000000..c2ba898
--- /dev/null
+++ b/api/app/oai_dcat/oai_server.py
@@ -0,0 +1,34 @@
+from oaipmh.server import ServerBase, oai_dc_writer
+from oaipmh.metadata import MetadataRegistry, oai_dc_reader
+from . import metadata_provider
+from lxml.etree import fromstring, tostring
+import logging
+
+# Function to write metadata in dcat_ap format (RDF/XML), otherwise it would use the default format (oai_dc)
+def dcat_ap_writer(metadata_element, metadata):
+ rdf_string = metadata["rdf"]
+ rdf_element = fromstring(bytes(rdf_string, encoding='utf-8'))
+
+ for child in rdf_element:
+ metadata_element.append(child)
+ logging.debug(f"Metadata Element: {tostring(metadata_element, pretty_print=True).decode('utf-8')}")
+
+
+# Create reader for dcat_ap metadata
+def dcat_ap_reader(element):
+ rdf_string = tostring(element, encoding='unicode')
+ return {"rdf": rdf_string}
+
+# Server class, it defines writers and readers, as well as the metadata provider (metadata_provider.py)
+class MyServer(ServerBase):
+ def __init__(self):
+ metadata_registry = MetadataRegistry()
+ metadata_registry.registerWriter("oai_dc", oai_dc_writer)
+ metadata_registry.registerReader("oai_dc", oai_dc_reader)
+ metadata_registry.registerWriter("dcat_ap", dcat_ap_writer)
+ metadata_registry.registerReader("dcat_ap", dcat_ap_reader)
+ server = metadata_provider.MyMetadataProvider()
+ super(MyServer, self).__init__(server, metadata_registry)
+
+
+oai_server = MyServer()
diff --git a/api/app/oai_dcat/oai_utils.py b/api/app/oai_dcat/oai_utils.py
new file mode 100644
index 0000000..b45714f
--- /dev/null
+++ b/api/app/oai_dcat/oai_utils.py
@@ -0,0 +1,499 @@
+import re
+from rdflib import Graph, Literal, Namespace, RDF, URIRef, BNode
+from rdflib.namespace import DCAT, DCTERMS, FOAF, RDF, XSD
+import logging
+from datetime import datetime
+
+# Dictionary with accrualPeriodicity values for some known datasets
+ACCRUAL_PERIODICITY = {
+ "blue-tongue": "AS_NEEDED",
+ "iot-animal": "HOURLY",
+ "pasture": "BIWEEKLY",
+ "pi": "DAILY",
+ "pi-long-term": "AS_NEEDED",
+ "thi": "DAILY",
+ "iot-environmental" : "IRREG"
+}
+
+# Logging config
+logging.basicConfig(level=logging.DEBUG)
+
+# Namespaces for DCAT-AP, to be bound to the RDF graph
+DCAT = Namespace("http://www.w3.org/ns/dcat#")
+DCT = Namespace("http://purl.org/dc/terms/")
+FOAF = Namespace("http://xmlns.com/foaf/0.1/")
+VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
+EDP = Namespace("https://europeandataportal.eu/voc#")
+SPDX = Namespace("http://spdx.org/rdf/terms#")
+ADMS = Namespace("http://www.w3.org/ns/adms#")
+DQV = Namespace("http://www.w3.org/ns/dqv#")
+SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")
+SCHEMA = Namespace("http://schema.org/")
+# Namespace for DCAT-AP IT
+DCATAPIT = Namespace("http://dati.gov.it/onto/dcatapit#")
+
+
+# Define classes for DCAT-AP entities (Dataset, Distribution and ContactPoint)
+
+class ContactPoint:
+ def __init__(self, name=None, email=None, webpage=None):
+ self.name = name
+ self.email = email
+ self.webpage = webpage
+
+
+class Distribution:
+ def __init__(self, access_url=None, description=None, download_url=None,
+ media_type=None, format=None, rights=None, license=None, identifier=None):
+ self.access_url = access_url
+ self.description = description
+ self.download_url = download_url
+ self.media_type = media_type
+ self.format = format
+ self.rights = rights
+ self.license = license
+ self.identifier = identifier
+
+ # Build the RDF graph for the distribution
+ def to_graph(self, g):
+ distribution = URIRef(self.uri)
+ g.add((distribution, RDF.type, DCAT.Distribution))
+ if self.access_url:
+ g.add((distribution, DCAT.accessURL, URIRef(self.access_url)))
+ if self.description:
+ g.add((distribution, DCTERMS.description, Literal(self.description)))
+ if self.download_url:
+ g.add((distribution, DCAT.downloadURL, URIRef(self.download_url)))
+ if self.media_type:
+ g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type)))
+ if self.format:
+ g.add((distribution, DCTERMS.format, URIRef(self.format)))
+ if self.rights:
+ rights_bnode = BNode()
+ g.add((distribution, DCTERMS.rights, rights_bnode))
+ g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement))
+ g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights)))
+ if self.license:
+ license_bnode = BNode()
+ g.add((distribution, DCTERMS.license, license_bnode))
+ g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument))
+ g.add((license_bnode, DCTERMS.license, URIRef(self.license)))
+ if self.identifier:
+ g.add((distribution, DCTERMS.identifier, Literal(self.identifier)))
+ return g
+
+
+class DatasetDCAT:
+ def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None):
+ self.uri = uri
+ self.title = title
+ self.description = description
+ self.issued = issued
+ self.identifier = identifier
+ self.contact_point = contact_point
+ self.distributions = []
+
+ def add_distribution(self, distribution):
+ self.distributions.append(distribution)
+
+ # Build the RDF graph for the dataset
+ def to_graph(self, g):
+ dataset = URIRef(self.uri)
+ g.add((dataset, RDF.type, DCAT.Dataset))
+ logging.debug(f"Adding to graph {g.identifier}: {dataset} a type {DCAT.Dataset}")
+
+ if self.title:
+ g.add((dataset, DCTERMS.title, Literal(self.title)))
+ if self.description:
+ g.add((dataset, DCTERMS.description, Literal(self.description)))
+ if self.issued:
+ g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF)))
+ if self.identifier:
+ g.add((dataset, DCTERMS.identifier, Literal(self.identifier)))
+
+ if self.contact_point:
+ contact_bnode = BNode()
+ g.add((dataset, DCAT.contactPoint, contact_bnode))
+ g.add((contact_bnode, RDF.type, VCARD.Kind))
+ if self.contact_point.name:
+ g.add((contact_bnode, VCARD.fn, Literal(self.contact_point.name)))
+ if self.contact_point.email:
+ g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.contact_point.email}")))
+ if self.contact_point.webpage:
+ g.add((contact_bnode, VCARD.hasURL, URIRef(self.contact_point.webpage)))
+
+ for dist in self.distributions:
+ distribution_bnode = BNode()
+ g.add((dataset, DCAT.distribution, distribution_bnode))
+ g.add((distribution_bnode, RDF.type, DCAT.Distribution))
+ if dist.access_url:
+ g.add((distribution_bnode, DCAT.accessURL, URIRef(dist.access_url)))
+ if dist.description:
+ g.add((distribution_bnode, DCTERMS.description, Literal(dist.description)))
+ if dist.download_url:
+ g.add((distribution_bnode, DCAT.downloadURL, URIRef(dist.download_url)))
+ if dist.media_type:
+ g.add((distribution_bnode, DCTERMS.mediaType, URIRef(dist.media_type)))
+ if dist.format:
+ g.add((distribution_bnode, DCTERMS.format, URIRef(dist.format)))
+ if dist.rights:
+ rights_bnode = BNode()
+ g.add((distribution_bnode, DCTERMS.rights, rights_bnode))
+ g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement))
+ g.add((rights_bnode, DCTERMS.rights, URIRef(dist.rights)))
+ if dist.license:
+ license_bnode = BNode()
+ g.add((distribution_bnode, DCTERMS.license, license_bnode))
+ g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument))
+ g.add((license_bnode, DCTERMS.license, URIRef(dist.license)))
+ if dist.identifier:
+ g.add((distribution_bnode, DCTERMS.identifier, Literal(dist.identifier)))
+
+ return g
+
+
+# Define classes for DCAT-AP IT entities (Catalog, Dataset, Distribution, and ContactPoint)
+
+class ContactPointIT:
+ def __init__(self, name=None, email=None, webpage=None):
+ self.name = name
+ self.email = email
+ self.webpage = webpage
+
+ def to_graph(self, g, parent):
+ contact_bnode = BNode()
+ g.add((parent, DCAT.contactPoint, contact_bnode))
+ g.add((contact_bnode, RDF.type, VCARD.Kind))
+ if self.name:
+ g.add((contact_bnode, VCARD.fn, Literal(self.name)))
+ if self.email:
+ g.add((contact_bnode, VCARD.hasEmail, URIRef(f"mailto:{self.email}")))
+ if self.webpage:
+ g.add((contact_bnode, VCARD.hasURL, URIRef(self.webpage)))
+
+
+class DistributionIT:
+ def __init__(self, uri, access_url=None, description=None, download_url=None,
+ media_type=None, format=None, rights=None, license=None, identifier=None):
+ self.uri = uri
+ self.access_url = access_url
+ self.description = description
+ self.download_url = download_url
+ self.media_type = media_type
+ self.format = format
+ self.rights = rights
+ self.license = license
+ self.identifier = identifier
+
+ def to_graph(self, g):
+ distribution = URIRef(self.uri)
+ g.add((distribution, RDF.type, DCATAPIT.Distribution))
+ if self.access_url:
+ g.add((distribution, DCAT.accessURL, URIRef(self.access_url)))
+ if self.description:
+ g.add((distribution, DCTERMS.description, Literal(self.description)))
+ if self.download_url:
+ g.add((distribution, DCAT.downloadURL, URIRef(self.download_url)))
+ if self.media_type:
+ g.add((distribution, DCTERMS.mediaType, URIRef(self.media_type)))
+ if self.format:
+ g.add((distribution, DCTERMS.format, URIRef(self.format)))
+ if self.rights:
+ rights_bnode = BNode()
+ g.add((distribution, DCTERMS.rights, rights_bnode))
+ g.add((rights_bnode, RDF.type, DCTERMS.RightsStatement))
+ g.add((rights_bnode, DCTERMS.rights, URIRef(self.rights)))
+ if self.license:
+ license_bnode = BNode()
+ g.add((distribution, DCTERMS.license, license_bnode))
+ g.add((license_bnode, RDF.type, DCTERMS.LicenseDocument))
+ g.add((license_bnode, DCTERMS.license, URIRef(self.license)))
+ if self.identifier:
+ g.add((distribution, DCTERMS.identifier, Literal(self.identifier)))
+
+
+class DatasetDCATAPIT:
+ def __init__(self, uri, title=None, description=None, issued=None, identifier=None, contact_point=None):
+ self.uri = uri
+ self.title = title
+ self.description = description
+ self.issued = issued
+ self.identifier = identifier
+ self.contact_point = contact_point
+ self.distributions = []
+
+ def add_distribution(self, distribution):
+ self.distributions.append(distribution)
+
+ def to_graph(self, g):
+ dataset = URIRef(self.uri)
+ g.add((dataset, RDF.type, DCATAPIT.Dataset))
+ if self.title:
+ g.add((dataset, DCTERMS.title, Literal(self.title)))
+ if self.description:
+ g.add((dataset, DCTERMS.description, Literal(self.description)))
+ if self.issued:
+ g.add((dataset, DCTERMS.issued, Literal(self.issued, datatype=DCTERMS.W3CDTF)))
+ if self.identifier:
+ g.add((dataset, DCTERMS.identifier, Literal(self.identifier)))
+
+ if self.contact_point:
+ self.contact_point.to_graph(g, dataset)
+
+ for dist in self.distributions:
+ distribution_uri = URIRef(dist.uri)
+ g.add((dataset, DCAT.distribution, distribution_uri))
+
+ return g
+
+
+class CatalogIT:
+ def __init__(self, uri, title, description, modified, publisher_name, publisher_identifier, publisher_homepage,
+ publisher_email, dataset_uris=None):
+ self.uri = uri
+ self.title = title
+ self.description = description
+ self.modified = modified
+ self.publisher_name = publisher_name
+ self.publisher_identifier = publisher_identifier
+ self.publisher_homepage = publisher_homepage
+ self.publisher_email = publisher_email
+ self.dataset_uris = dataset_uris if dataset_uris is not None else []
+
+ def add_dataset(self, dataset_uri):
+ self.dataset_uris.append(dataset_uri)
+
+ def to_graph(self, g):
+ catalog = URIRef(self.uri)
+ g.add((catalog, RDF.type, DCATAPIT.Catalog))
+ g.add((catalog, DCTERMS.title, Literal(self.title)))
+ g.add((catalog, DCTERMS.description, Literal(self.description)))
+ g.add((catalog, DCTERMS.modified, Literal(self.modified, datatype=DCTERMS.W3CDTF)))
+
+ catalog_publisher_node = BNode()
+ g.add((catalog, DCTERMS.publisher, catalog_publisher_node))
+ g.add((catalog_publisher_node, RDF.type, FOAF.Agent))
+ g.add((catalog_publisher_node, RDF.type, DCATAPIT.Agent))
+ g.add((catalog_publisher_node, FOAF.name, Literal(self.publisher_name)))
+ g.add((catalog_publisher_node, DCTERMS.identifier, Literal(self.publisher_identifier)))
+ g.add((catalog_publisher_node, FOAF.homepage, URIRef(self.publisher_homepage)))
+ g.add((catalog_publisher_node, FOAF.mbox, URIRef(f"mailto:{self.publisher_email}")))
+
+ for dataset_uri in self.dataset_uris:
+ g.add((catalog, DCAT.dataset, URIRef(dataset_uri)))
+
+ return g
+
+# Function to convert to DCAT-AP IT format
+
+
+def convert_to_dcat_ap_it(data, url):
+ # Create separate graphs
+ catalog_graph = Graph()
+ datasets_graph = Graph()
+ distributions_graph = Graph()
+ vcard_graph = Graph()
+
+ # Bind namespaces to all graphs
+ for g in [catalog_graph, datasets_graph, distributions_graph, vcard_graph]:
+ g.bind("dcatapit", DCATAPIT)
+ g.bind("foaf", FOAF)
+ g.bind("dcat", DCAT)
+ g.bind("dct", DCT)
+ g.bind("vcard", VCARD)
+ g.bind("rdf", RDF)
+
+ # Contact point URI
+ contact_point_uri = URIRef("https://www.cmcc.it")
+
+ # Create catalog
+ catalog_uri = URIRef(url)
+ catalog_graph.add((catalog_uri, RDF.type, DCATAPIT.Catalog))
+ catalog_graph.add((catalog_uri, RDF.type, DCAT.Catalog))
+ catalog_graph.add((catalog_uri, DCTERMS.title, Literal("Sebastien Catalog")))
+ catalog_graph.add((catalog_uri, DCTERMS.description, Literal("A catalog of Sebastien datasets")))
+ catalog_graph.add((catalog_uri, FOAF.homepage, Literal(url)))
+ catalog_graph.add(
+ (catalog_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA")))
+ catalog_graph.add((catalog_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime)))
+
+ # Add publisher information
+ publisher = BNode()
+ catalog_graph.add((catalog_uri, DCTERMS.publisher, publisher))
+ catalog_graph.add((publisher, RDF.type, FOAF.Agent))
+ catalog_graph.add((publisher, RDF.type, DCATAPIT.Agent))
+ catalog_graph.add((publisher, FOAF.name, Literal("CMCC Foundation")))
+ catalog_graph.add((publisher, DCTERMS.identifier, Literal("XW88C90Q")))
+ catalog_graph.add((publisher, FOAF.homepage, URIRef("https://www.cmcc.it")))
+ catalog_graph.add((publisher, FOAF.mbox, URIRef("mailto:dds-support@cmcc.it")))
+
+ for i, dataset in enumerate(data, 1):
+ if "dataset" not in dataset:
+ dataset = {"dataset": dataset}
+ dataset_id = dataset.get("dataset", {}).get("metadata", {}).get("id")
+ dataset_uri = URIRef(f'{url}/{i}')
+
+ # Add dataset reference to catalog
+ catalog_graph.add((catalog_uri, DCAT.dataset, dataset_uri))
+
+ # Create dataset
+ datasets_graph.add((dataset_uri, RDF.type, DCATAPIT.Dataset))
+ datasets_graph.add((dataset_uri, RDF.type, DCAT.Dataset))
+ datasets_graph.add(
+ (dataset_uri, DCTERMS.title, Literal(dataset.get("dataset", {}).get("metadata", {}).get("label"))))
+ datasets_graph.add((dataset_uri, DCTERMS.description,
+ Literal(dataset.get("dataset", {}).get("metadata", {}).get("description"))))
+ datasets_graph.add((dataset_uri, DCTERMS.issued, Literal(
+ datetime.strptime(str(dataset.get("dataset", {}).get("metadata", {}).get("publication_date")), '%Y-%m-%d'),
+ datatype=XSD.dateTime)))
+ datasets_graph.add((dataset_uri, DCTERMS.identifier, Literal(f"XW88C90Q:{dataset_id}")))
+ datasets_graph.add(
+ (dataset_uri, DCTERMS.language, Literal("http://publications.europa.eu/resource/authority/language/ITA")))
+ # Add dct:modified, dcat:theme, dct:rightsHolder and dct:accrualPeriodicity
+ datasets_graph.add((dataset_uri, DCTERMS.modified, Literal(datetime.now(), datatype=XSD.dateTime)))
+ datasets_graph.add(
+ (dataset_uri, DCAT.theme, URIRef("http://publications.europa.eu/resource/authority/data-theme/AGRI")))
+ datasets_graph.add((dataset_uri, DCTERMS.accrualPeriodicity, URIRef(
+ f"http://publications.europa.eu/resource/authority/frequency/{ACCRUAL_PERIODICITY.get(dataset_id)}")))
+ # Add publisher info on dataset
+ publisher_dataset = BNode()
+ datasets_graph.add((dataset_uri, DCTERMS.publisher, publisher_dataset))
+ datasets_graph.add((publisher_dataset, RDF.type, FOAF.Agent))
+ datasets_graph.add((publisher_dataset, RDF.type, DCATAPIT.Agent))
+ datasets_graph.add((publisher_dataset, FOAF.name, Literal("CMCC Foundation")))
+ datasets_graph.add((publisher_dataset, DCTERMS.identifier, Literal("XW88C90Q")))
+ # Add rights holder BNode
+ rights_holder_uri = BNode()
+ datasets_graph.add((dataset_uri, DCTERMS.rightsHolder, rights_holder_uri))
+ datasets_graph.add((rights_holder_uri, RDF.type, DCATAPIT.Agent))
+ datasets_graph.add((rights_holder_uri, RDF.type, FOAF.Agent))
+ datasets_graph.add((rights_holder_uri, DCTERMS.identifier, Literal("XW88C90Q")))
+ datasets_graph.add((rights_holder_uri, FOAF.name, Literal("CMCC Foundation")))
+
+ # Add contact point
+ contact = dataset.get("dataset", {}).get("metadata", {}).get("contact")
+ datasets_graph.add((dataset_uri, DCAT.contactPoint, contact_point_uri))
+
+ # Create distribution
+ # products = dataset.get("dataset", {}).get("metadata", {}).get("products", {}).get("monthly", {})
+ distribution_uri = URIRef(f'{url}/{dataset_id}')
+ datasets_graph.add((dataset_uri, DCAT.distribution, distribution_uri))
+ distributions_graph.add((distribution_uri, RDF.type, DCAT.Distribution))
+ distributions_graph.add((distribution_uri, DCAT.accessURL, distribution_uri))
+ distributions_graph.add((distribution_uri, DCTERMS.title,
+ Literal(dataset.get("dataset", {}).get("metadata", {}).get("description"))))
+ distributions_graph.add((distribution_uri, DCTERMS.description,
+ Literal(dataset.get("dataset", {}).get("metadata", {}).get("description"))))
+ license_uri = URIRef("https://w3id.org/italia/controlled-vocabulary/licences/A21_CCBY40")
+ license_document = BNode()
+ distributions_graph.add((distribution_uri, DCTERMS.license, license_document))
+ distributions_graph.add((license_document, RDF.type, DCATAPIT.LicenseDocument))
+ distributions_graph.add(
+ (license_document, DCTERMS.type, URIRef("http://purl.org/adms/licencetype/Attribution")))
+ distributions_graph.add(
+ (license_document, FOAF.name, Literal("Creative Commons Attribuzione 4.0 Internazionale (CC BY 4.0)")))
+ distributions_graph.add((distribution_uri, DCTERMS.format,
+ URIRef("http://publications.europa.eu/resource/authority/file-type/JSON")))
+ distributions_graph.add((distribution_uri, RDF.type, DCATAPIT.Distribution))
+
+ # Create vcard:Organization node
+ contact = dataset.get("dataset", {}).get("metadata", {}).get("contact")
+ vcard_graph.add((contact_point_uri, RDF.type, VCARD.Organization))
+ vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://dati.gov.it/onto/dcatapit#Organization")))
+ vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://xmlns.com/foaf/0.1/Organization")))
+ vcard_graph.add((contact_point_uri, RDF.type, URIRef("http://www.w3.org/2006/vcard/ns#Kind")))
+ vcard_graph.add((contact_point_uri, VCARD.fn, Literal(contact.get("name"))))
+ vcard_graph.add((contact_point_uri, VCARD.hasEmail, URIRef(f"mailto:{contact.get('email')}")))
+ vcard_graph.add((contact_point_uri, VCARD.hasURL, URIRef(contact.get("webpage"))))
+
+ return catalog_graph, datasets_graph, distributions_graph, vcard_graph
+
+
+def serialize_and_concatenate_graphs(catalog_graph, datasets_graph, distributions_graph, vcard_graph):
+ # Serialize each graph to a string
+ catalog_str = catalog_graph.serialize(format='pretty-xml')
+ datasets_str = datasets_graph.serialize(format='pretty-xml')
+ distributions_str = distributions_graph.serialize(format='pretty-xml')
+ vcard_str = vcard_graph.serialize(format='pretty-xml')
+
+ # Remove XML headers and opening tags from datasets and distributions and vcard strings
+ datasets_str = re.sub(r'<\?xml[^>]+\?>', '', datasets_str)
+    datasets_str = re.sub(r'<rdf:RDF[^>]*>', '', datasets_str, count=1).rsplit('</rdf:RDF>', 1)[0]
+ distributions_str = re.sub(r'<\?xml[^>]+\?>', '', distributions_str)
+    distributions_str = re.sub(r'<rdf:RDF[^>]*>', '', distributions_str, count=1).rsplit('</rdf:RDF>', 1)[0]
+ vcard_str = re.sub(r'<\?xml[^>]+\?>', '', vcard_str)
+    vcard_str = re.sub(r'<rdf:RDF[^>]*>', '', vcard_str, count=1).rsplit('</rdf:RDF>', 1)[0]
+
+ # Concatenate the strings
+    final_str = catalog_str.rsplit('</rdf:RDF>', 1)[0] + datasets_str + distributions_str + vcard_str + '</rdf:RDF>'
+
+ # Manually add the vcard namespace declaration
+    final_str = final_str.replace(
+        '<rdf:RDF',
+        '<rdf:RDF xmlns:vcard="http://www.w3.org/2006/vcard/ns#"',
+        1,
+    )
+
+    return final_str
@@ -419,7 +421,10 @@ def is_product_valid_for_role(
def _process_query(kube, query: GeoQuery, compute: None | bool = False):
if isinstance(kube, Dataset):
Datastore._LOG.debug("filtering with: %s", query.filters)
- kube = kube.filter(**query.filters)
+ try:
+ kube = kube.filter(**query.filters)
+ except ValueError as err:
+            Datastore._LOG.warning("could not filter by one of the keys: %s", err)
Datastore._LOG.debug("resulting kube len: %s", len(kube))
if isinstance(kube, Delayed) and compute:
kube = kube.compute()
@@ -444,10 +449,10 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False):
if query.vertical:
Datastore._LOG.debug("subsetting by vertical...")
if isinstance(
- vertical := Datastore._maybe_convert_dict_slice_to_slice(
- query.vertical
- ),
- slice,
+ vertical := Datastore._maybe_convert_dict_slice_to_slice(
+ query.vertical
+ ),
+ slice,
):
method = None
else:
@@ -463,4 +468,4 @@ def _maybe_convert_dict_slice_to_slice(dict_vals):
dict_vals.get("stop"),
dict_vals.get("step"),
)
- return dict_vals
+ return dict_vals
\ No newline at end of file
diff --git a/datastore/dbmanager/dbmanager.py b/datastore/dbmanager/dbmanager.py
index b11c46c..d4ff293 100644
--- a/datastore/dbmanager/dbmanager.py
+++ b/datastore/dbmanager/dbmanager.py
@@ -43,6 +43,7 @@ class RequestStatus(Enum_):
"""Status of the Request"""
PENDING = auto()
+ QUEUED = auto()
RUNNING = auto()
DONE = auto()
FAILED = auto()
@@ -85,7 +86,7 @@ class User(Base):
String(255), nullable=False, unique=True, default=generate_key
)
contact_name = Column(String(255))
- requests = relationship("Request")
+ requests = relationship("Request", lazy="dynamic")
roles = relationship("Role", secondary=association_table, lazy="selectin")
@@ -96,7 +97,7 @@ class Worker(Base):
host = Column(String(255))
dask_scheduler_port = Column(Integer)
dask_dashboard_address = Column(String(10))
- created_on = Column(DateTime, nullable=False)
+ created_on = Column(DateTime, default=datetime.now)
class Request(Base):
@@ -112,8 +113,8 @@ class Request(Base):
product = Column(String(255))
query = Column(JSON())
estimate_size_bytes = Column(Integer)
- created_on = Column(DateTime, nullable=False)
- last_update = Column(DateTime)
+ created_on = Column(DateTime, default=datetime.now)
+ last_update = Column(DateTime, default=datetime.now, onupdate=datetime.now)
fail_reason = Column(String(1000))
download = relationship("Download", uselist=False, lazy="selectin")
@@ -128,7 +129,7 @@ class Download(Base):
storage_id = Column(Integer, ForeignKey("storages.storage_id"))
location_path = Column(String(255))
size_bytes = Column(Integer)
- created_on = Column(DateTime, nullable=False)
+ created_on = Column(DateTime, default=datetime.now)
class Storage(Base):
@@ -267,16 +268,18 @@ def create_request(
def update_request(
self,
request_id: int,
- worker_id: int,
- status: RequestStatus,
+ worker_id: int | None = None,
+ status: RequestStatus | None = None,
location_path: str = None,
size_bytes: int = None,
fail_reason: str = None,
) -> int:
with self.__session_maker() as session:
request = session.query(Request).get(request_id)
- request.status = status
- request.worker_id = worker_id
+ if status:
+ request.status = status
+ if worker_id:
+ request.worker_id = worker_id
request.last_update = datetime.utcnow()
request.fail_reason = fail_reason
session.commit()
@@ -305,7 +308,17 @@ def get_request_status_and_reason(
def get_requests_for_user_id(self, user_id) -> list[Request]:
with self.__session_maker() as session:
- return session.query(User).get(user_id).requests
+ return session.query(User).get(user_id).requests.all()
+
+ def get_requests_for_user_id_and_status(
+ self, user_id, status: RequestStatus | tuple[RequestStatus]
+ ) -> list[Request]:
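+        """Return the requests of the given user having one of the given statuses."""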
+ if isinstance(status, RequestStatus):
+ status = (status,)
+ with self.__session_maker() as session:
+ return session.get(User, user_id).requests.filter(
+ Request.status.in_(status)
+ )
def get_download_details_for_request_id(self, request_id) -> Download:
with self.__session_maker() as session:
diff --git a/datastore/geoquery/geoquery.py b/datastore/geoquery/geoquery.py
index 8446660..b145587 100644
--- a/datastore/geoquery/geoquery.py
+++ b/datastore/geoquery/geoquery.py
@@ -15,6 +15,7 @@ class GeoQuery(BaseModel, extra="allow"):
vertical: Optional[Union[float, List[float], Dict[str, float]]]
filters: Optional[Dict]
format: Optional[str]
+ format_args: Optional[Dict]
# TODO: Check if we are going to allow the vertical coordinates inside both
# `area`/`location` nad `vertical`
diff --git a/datastore/workflow/workflow.py b/datastore/workflow/workflow.py
index e609a77..04720ee 100644
--- a/datastore/workflow/workflow.py
+++ b/datastore/workflow/workflow.py
@@ -108,9 +108,9 @@ def _subset(kube: DataCube | None = None) -> DataCube:
return Datastore().query(
dataset_id=dataset_id,
product_id=product_id,
- query=query
- if isinstance(query, GeoQuery)
- else GeoQuery(**query),
+ query=(
+ query if isinstance(query, GeoQuery) else GeoQuery(**query)
+ ),
compute=False,
)
@@ -153,18 +153,21 @@ def _average(kube: DataCube | None = None) -> DataCube:
)
self._add_computational_node(task)
return self
-
+
def to_regular(
self, id: Hashable, *, dependencies: list[Hashable]
) -> "Workflow":
def _to_regular(kube: DataCube | None = None) -> DataCube:
- assert kube is not None, "`kube` cannot be `None` for `to_regular``"
+ assert (
+ kube is not None
+ ), "`kube` cannot be `None` for `to_regular``"
return kube.to_regular()
+
task = _WorkflowTask(
id=id, operator=_to_regular, dependencies=dependencies
)
self._add_computational_node(task)
- return self
+ return self
def add_task(
self,
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 995b7ca..c510542 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,19 +1,18 @@
version: "3"
services:
api:
- build:
- context: ./
- dockerfile: ./api/Dockerfile
+ image: local/geolake-api:latest
ports:
- "8080:80"
depends_on:
+ - datastore
- broker
- db
links:
- broker
- db
environment:
- CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
+ CATALOG_PATH: /catalog/catalog.yaml
POSTGRES_DB: dds
POSTGRES_USER: dds
POSTGRES_PASSWORD: dds
@@ -21,11 +20,9 @@ services:
POSTGRES_PORT: 5432
volumes:
- downloads:/downloads:ro
- command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
+ command: ["./../wait-for-it.sh", "broker:5672", "--", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
executor:
- build:
- context: ./
- dockerfile: ./executor/Dockerfile
+ image: local/geolake-executor:latest
depends_on:
- broker
- db
@@ -37,14 +34,20 @@ services:
environment:
EXECUTOR_TYPES: query,info,estimate
CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
- POSTGRES_DB: dds
- POSTGRES_USER: dds
- POSTGRES_PASSWORD: dds
+ POSTGRES_DB: geolake
+ POSTGRES_USER: geolake
+ POSTGRES_PASSWORD: geolake
POSTGRES_HOST: db
POSTGRES_PORT: 5432
volumes:
- downloads:/downloads:rw
- command: ["./wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"]
+ command: ["./../wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"]
+ datastore:
+ image: local/geolake-datastore:latest
+ depends_on:
+ - drivers
+ drivers:
+ image: local/geolake-drivers:latest
broker:
image: rabbitmq:3
db:
@@ -55,31 +58,12 @@ services:
ports:
- 5432:5432
environment:
- POSTGRES_DB: dds
- POSTGRES_USER: dds
- POSTGRES_PASSWORD: dds
+ POSTGRES_DB: geolake
+ POSTGRES_USER: geolake
+ POSTGRES_PASSWORD: geolake
POSTGRES_HOST: db
POSTGRES_PORT: 5432
- web:
- build:
- context: ./
- dockerfile: ./web/Dockerfile
- ports:
- - "8080:80"
- depends_on:
- - db
- links:
- - db
- environment:
- CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
- POSTGRES_DB: dds
- POSTGRES_USER: dds
- POSTGRES_PASSWORD: dds
- POSTGRES_HOST: db
- POSTGRES_PORT: 5432
- volumes:
- - downloads:/downloads:ro
- command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "web.main:app", "--host", "0.0.0.0", "--port", "80"]
volumes:
- downloads:
\ No newline at end of file
+ downloads:
+ catalog:
\ No newline at end of file
diff --git a/drivers/Dockerfile b/drivers/Dockerfile
index 978eaa1..57874a2 100644
--- a/drivers/Dockerfile
+++ b/drivers/Dockerfile
@@ -1,8 +1,9 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geokube
#ARG TAG=v0.2.6b2
-ARG TAG=2024.05.03.10.36
+ARG TAG=latest
FROM $REGISTRY/geokube:$TAG
COPY dist/intake_geokube-0.1a0-py3-none-any.whl /
+
RUN pip install /intake_geokube-0.1a0-py3-none-any.whl
-RUN rm /intake_geokube-0.1a0-py3-none-any.whl
\ No newline at end of file
+RUN rm /intake_geokube-0.1a0-py3-none-any.whl
diff --git a/drivers/intake_geokube/__init__.py b/drivers/intake_geokube/__init__.py
index dc60a1d..2c990df 100644
--- a/drivers/intake_geokube/__init__.py
+++ b/drivers/intake_geokube/__init__.py
@@ -3,3 +3,4 @@
# This avoids a circilar dependency pitfall by ensuring that the
# driver-discovery code runs first, see:
# https://intake.readthedocs.io/en/latest/making-plugins.html#entrypoints
+from .geoquery import GeoQuery
\ No newline at end of file
diff --git a/drivers/intake_geokube/base.py b/drivers/intake_geokube/base.py
index e3c689b..b30c4d8 100644
--- a/drivers/intake_geokube/base.py
+++ b/drivers/intake_geokube/base.py
@@ -1,8 +1,12 @@
# from . import __version__
+from dask.delayed import Delayed
from intake.source.base import DataSource, Schema
from geokube.core.datacube import DataCube
from geokube.core.dataset import Dataset
+from .geoquery import GeoQuery
+
+
class GeokubeSource(DataSource):
"""Common behaviours for plugins in this repo"""
@@ -10,6 +14,13 @@ class GeokubeSource(DataSource):
version = "0.1a0"
container = "geokube"
partition_access = True
+ geoquery: GeoQuery | None
+ compute: bool
+
+ def __init__(self, metadata, geoquery: GeoQuery = None, compute: bool = False):
+ super().__init__(metadata=metadata)
+ self.geoquery = geoquery
+ self.compute = compute
def _get_schema(self):
"""Make schema object, which embeds goekube fields metadata"""
@@ -56,9 +67,8 @@ def read(self):
def read_chunked(self):
"""Return a lazy geokube object"""
- self._load_metadata()
- return self._kube
-
+ return self.read()
+
def read_partition(self, i):
"""Fetch one chunk of data at tuple index i"""
raise NotImplementedError
@@ -76,3 +86,25 @@ def close(self):
"""Delete open file from memory"""
self._kube = None
self._schema = None
+
+ def process_with_query(self):
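+        """Read the source lazily and apply the attached GeoQuery (filters, variable, area, location, time, vertical)."""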
+ self.read_chunked()
+ if not self.geoquery:
+ return self._kube.compute() if self.compute else self._kube
+ if isinstance(self._kube, Dataset):
+ self._kube = self._kube.filter(**self.geoquery.filters)
+ if isinstance(self._kube, Delayed) and self.compute:
+ self._kube = self._kube.compute()
+ if self.geoquery.variable:
+ self._kube = self._kube[self.geoquery.variable]
+ if self.geoquery.area:
+ self._kube = self._kube.geobbox(**self.geoquery.area)
+ if self.geoquery.location:
+ self._kube = self._kube.locations(**self.geoquery.location)
+ if self.geoquery.time:
+ self._kube = self._kube.sel(time=self.geoquery.time)
+ if self.geoquery.vertical:
+ method = None if isinstance(self.geoquery.vertical, slice) else "nearest"
+ self._kube = self._kube.sel(vertical=self.geoquery.vertical, method=method)
+ return self._kube.compute() if self.compute else self._kube
+
\ No newline at end of file
diff --git a/drivers/intake_geokube/geoquery.py b/drivers/intake_geokube/geoquery.py
new file mode 100644
index 0000000..544e654
--- /dev/null
+++ b/drivers/intake_geokube/geoquery.py
@@ -0,0 +1,111 @@
+import json
+from typing import Optional, List, Dict, Union, Any, TypeVar
+
+from pydantic import BaseModel, root_validator, validator
+
+TGeoQuery = TypeVar("TGeoQuery")
+
+def _maybe_convert_dict_slice_to_slice(dict_vals):
+ if "start" in dict_vals or "stop" in dict_vals:
+ return slice(
+ dict_vals.get("start"),
+ dict_vals.get("stop"),
+ dict_vals.get("step"),
+ )
+ return dict_vals
+
+class _GeoQueryJSONEncoder(json.JSONEncoder):
+
+ def default(self, obj):
+ if isinstance(obj, slice):
+ return {
+ "start": obj.start,
+ "stop": obj.stop,
+ "step": obj.step
+ }
+ return json.JSONEncoder.default(self, obj)
+
+
+class GeoQuery(BaseModel):
+ variable: Optional[Union[str, List[str]]]
+ # TODO: Check how `time` is to be represented
+ time: Optional[Union[Dict[str, str | None], Dict[str, List[str]]]]
+ area: Optional[Dict[str, float]]
+ location: Optional[Dict[str, Union[float, List[float]]]]
+ vertical: Optional[Union[float, List[float], Dict[str, float]]]
+ filters: Optional[Dict]
+ format: Optional[str]
+ format_args: Optional[Dict]
+
+ class Config:
+ extra = "allow"
+ json_encoders = {slice: lambda s: {
+ "start": s.start,
+ "stop": s.stop,
+ "step": s.step
+ }}
+
+ # TODO: Check if we are going to allow the vertical coordinates inside both
+    # `area`/`location` and `vertical`
+
+ @root_validator
+ def area_locations_mutually_exclusive_validator(cls, query):
+ if query["area"] is not None and query["location"] is not None:
+ raise KeyError(
+ "area and location couldn't be processed together, please use"
+ " one of them"
+ )
+ return query
+
+ @root_validator(pre=True)
+ def build_filters(cls, values: Dict[str, Any]) -> Dict[str, Any]:
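+        # Keys that are not declared model fields are collected into the `filters` dict.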
+ if "filters" in values:
+ return values
+ filters = {k: _maybe_convert_dict_slice_to_slice(v) for k, v in values.items() if k not in cls.__fields__}
+ values = {k: v for k, v in values.items() if k in cls.__fields__}
+ values["filters"] = filters
+ return values
+
+ @validator("time", always=True)
+ def match_time_dict(cls, value):
+ if isinstance(value, dict):
+ assert any([k in value for k in ("start", "stop", "year", "month", "day", "hour")]), "Missing dictionary key"
+ if "start" in value or "stop" in value:
+ return _maybe_convert_dict_slice_to_slice(value)
+ return value
+
+
+ @validator("vertical", always=True)
+ def match_vertical_dict(cls, value):
+ if isinstance(value, dict):
+ assert "start" in value, "Missing 'start' key"
+ assert "stop" in value, "Missing 'stop' key"
+ return _maybe_convert_dict_slice_to_slice(value)
+ return value
+
+ def original_query_json(self):
+ """Return the JSON representation of the original query submitted
+ to the geokube-dds"""
+ res = super().dict()
+ res = dict(**res.pop("filters", {}), **res)
+ # NOTE: skip empty values to make query representation
+ # shorter and more elegant
+ res = dict(filter(lambda item: item[1] is not None, res.items()))
+ return json.dumps(res, cls=_GeoQueryJSONEncoder)
+
+ @classmethod
+ def parse(
+ cls, load: TGeoQuery | dict | str | bytes | bytearray
+ ) -> TGeoQuery:
+ if isinstance(load, cls):
+ return load
+ if isinstance(load, (str, bytes, bytearray)):
+ load = json.loads(load)
+ if isinstance(load, dict):
+ load = GeoQuery(**load)
+ else:
+ raise TypeError(
+ f"type of the `load` argument ({type(load).__name__}) is not"
+ " supported!"
+ )
+ return load
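
A short usage sketch for the new GeoQuery model (values made up): unknown keys are collected into `filters` by the `build_filters` pre-validator, `{"start": ..., "stop": ...}` dictionaries are turned into `slice` objects, and `original_query_json()` flattens the filters back in and re-encodes slices through `_GeoQueryJSONEncoder`:

    from intake_geokube.geoquery import GeoQuery

    payload = {
        "variable": ["B04", "B08"],
        "time": {"start": "2023-01-01", "stop": "2023-01-31"},
        "format": "netcdf",
        "resolution": "R10m",                  # unknown key -> ends up in `filters`
        "pavement": {"start": 1, "stop": 5},   # unknown key with start/stop -> slice
    }

    query = GeoQuery.parse(payload)            # also accepts a JSON str/bytes
    assert isinstance(query.time, slice)
    assert query.filters["resolution"] == "R10m"
    assert query.filters["pavement"] == slice(1, 5)

    print(query.original_query_json())         # slices re-encoded as start/stop/step dicts
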
diff --git a/drivers/intake_geokube/netcdf.py b/drivers/intake_geokube/netcdf.py
index 7247891..e8b0754 100644
--- a/drivers/intake_geokube/netcdf.py
+++ b/drivers/intake_geokube/netcdf.py
@@ -21,6 +21,7 @@ def __init__(
metadata=None,
mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
load_files_on_persistance: Optional[bool] = True,
+ **kwargs
):
self._kube = None
self.path = path
@@ -34,7 +35,7 @@ def __init__(
self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
self.load_files_on_persistance = load_files_on_persistance
# self.xarray_kwargs.update({'engine' : 'netcdf'})
- super(NetCDFSource, self).__init__(metadata=metadata)
+ super(NetCDFSource, self).__init__(metadata=metadata, **kwargs)
def _open_dataset(self):
if self.pattern is None:
diff --git a/drivers/intake_geokube/sentinel.py b/drivers/intake_geokube/sentinel.py
new file mode 100644
index 0000000..4c6b612
--- /dev/null
+++ b/drivers/intake_geokube/sentinel.py
@@ -0,0 +1,205 @@
+"""Geokube driver for sentinel data."""
+
+from collections import defaultdict
+from multiprocessing.util import get_temp_dir
+import os
+import dask
+import zipfile
+import glob
+from functools import partial
+from typing import Generator, Iterable, Mapping, Optional, List
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+from pyproj import Transformer
+from pyproj.crs import CRS, GeographicCRS
+from intake.source.utils import reverse_format
+
+from geokube import open_datacube
+from geokube.core.dataset import Dataset
+
+from .base import GeokubeSource
+from .geoquery import GeoQuery
+
+SENSING_TIME_ATTR: str = "sensing_time"
+FILE: str = "files"
+DATACUBE: str = "datacube"
+
+
+def get_field_name_from_path(path: str):
+ res, file = path.split(os.sep)[-2:]
+ band = file.split("_")[-2]
+ return f"{res}_{band}"
+
+
+def preprocess_sentinel(dset: xr.Dataset, pattern: str, **kw) -> xr.Dataset:
+ crs = CRS.from_cf(dset["spatial_ref"].attrs)
+ transformer = Transformer.from_crs(
+ crs_from=crs, crs_to=GeographicCRS(), always_xy=True
+ )
+ x_vals, y_vals = dset["x"].to_numpy(), dset["y"].to_numpy()
+ lon_vals, lat_vals = transformer.transform(*np.meshgrid(x_vals, y_vals))
+ source_path = dset.encoding["source"]
+ sensing_time = os.path.splitext(source_path.split(os.sep)[-6])[0].split(
+ "_"
+ )[-1]
+ time = pd.to_datetime([sensing_time]).to_numpy()
+ dset = dset.assign_coords(
+ {
+ "time": time,
+ "latitude": (("x", "y"), lat_vals),
+ "longitude": (("x", "y"), lon_vals),
+ }
+ ).rename({"band_data": get_field_name_from_path(source_path)})
+ return dset
+
+
+def get_zip_files_from_path(path: str) -> Generator:
+ assert path and isinstance(path, str), "`path` must be a string"
+ assert path.lower().endswith("zip"), "`path` must point to a ZIP archive"
+ if "*" in path:
+ yield from glob.iglob(path)
+ return
+ yield path
+
+
+def unzip_data(files: Iterable[str], target: str) -> List[str]:
+    """Unzip ZIP archives into per-product subdirectories of `target`."""
+ target_files = []
+ for file in files:
+ prod_id = os.path.splitext(os.path.basename(file))[0]
+ target_prod = os.path.join(target, prod_id)
+ os.makedirs(target_prod, exist_ok=True)
+ with zipfile.ZipFile(file) as archive:
+ archive.extractall(path=target_prod)
+        target_files.extend(
+            os.path.join(target_prod, name) for name in os.listdir(target_prod)
+        )
+ return target_files
+
+
+def _prepare_df_from_files(files: Iterable[str], pattern: str) -> pd.DataFrame:
+ data = []
+ for f in files:
+ attr = reverse_format(pattern, f)
+ attr[FILE] = f
+ data.append(attr)
+ return pd.DataFrame(data)
+
+
+class CMCCSentinelSource(GeokubeSource):
+ name = "cmcc_sentinel_geokube"
+ version = "0.0.1"
+
+ def __init__(
+ self,
+ path: str,
+ pattern: str = None,
+ zippath: str = None,
+ zippattern: str = None,
+ metadata=None,
+ xarray_kwargs: dict = None,
+ mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(metadata=metadata, **kwargs)
+ self._kube = None
+ self.path = path
+ self.pattern = pattern
+ self.zippath = zippath
+ self.zippattern = zippattern
+ self.mapping = mapping
+ self.metadata_caching = False
+ self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
+ self._unzip_dir = get_temp_dir()
+ self._zipdf = None
+ self._jp2df = None
+ assert (
+ SENSING_TIME_ATTR in self.pattern
+ ), f"{SENSING_TIME_ATTR} is missing in the pattern"
+ self.preprocess = partial(
+ preprocess_sentinel,
+ pattern=self.pattern,
+ )
+ if self.geoquery:
+ self.filters = self.geoquery.filters
+ else:
+ self.filters = {}
+
+ def __post_init__(self) -> None:
+ assert (
+ SENSING_TIME_ATTR in self.pattern
+ ), f"{SENSING_TIME_ATTR} is missing in the pattern"
+ self.preprocess = partial(
+ preprocess_sentinel,
+ pattern=self.pattern,
+ )
+
+ def _compute_res_df(self) -> List[str]:
+ self._zipdf = self._get_files_attr()
+ self._maybe_select_by_zip_attrs()
+ _ = unzip_data(self._zipdf[FILE].values, target=self._unzip_dir)
+ self._create_jp2_df()
+ self._maybe_select_by_jp2_attrs()
+
+ def _get_files_attr(self) -> pd.DataFrame:
+ df = _prepare_df_from_files(
+ get_zip_files_from_path(self.path), self.pattern
+ )
+ assert (
+ SENSING_TIME_ATTR in df
+ ), f"{SENSING_TIME_ATTR} column is missing"
+ return df.set_index(SENSING_TIME_ATTR).sort_index()
+
+    def _maybe_select_by_zip_attrs(self) -> None:
+ filters_to_pop = []
+ for flt in self.filters:
+ if flt in self._zipdf.columns:
+ self._zipdf = self._zipdf.set_index(flt)
+ if flt == self._zipdf.index.name:
+ self._zipdf = self._zipdf.loc[self.filters[flt]]
+ filters_to_pop.append(flt)
+ for f in filters_to_pop:
+ self.filters.pop(f)
+ self._zipdf = self._zipdf.reset_index()
+
+
+ def _create_jp2_df(self) -> None:
+ self._jp2df = _prepare_df_from_files(
+ glob.iglob(os.path.join(self._unzip_dir, self.zippath)),
+ os.path.join(self._unzip_dir, self.zippattern),
+ )
+
+ def _maybe_select_by_jp2_attrs(self):
+ filters_to_pop = []
+ for key, value in self.filters.items():
+ if key not in self._jp2df:
+ continue
+ if isinstance(value, str):
+ self._jp2df = self._jp2df[self._jp2df[key] == value]
+ elif isinstance(value, Iterable):
+ self._jp2df = self._jp2df[self._jp2df[key].isin(value)]
+ else:
+ raise TypeError(f"type `{type(value)}` is not supported!")
+ filters_to_pop.append(key)
+ for f in filters_to_pop:
+ self.filters.pop(f)
+
+ def _open_dataset(self):
+ self._compute_res_df()
+ cubes = []
+ for i, row in self._jp2df.iterrows():
+ cubes.append(
+ dask.delayed(open_datacube)(
+ path=row[FILE],
+ id_pattern=None,
+ mapping=self.mapping,
+ metadata_caching=self.metadata_caching,
+ **self.xarray_kwargs,
+ preprocess=self.preprocess,
+ )
+ )
+ self._jp2df[DATACUBE] = cubes
+ self._kube = Dataset(self._jp2df.reset_index(drop=True))
+        # Push back any filters that were not consumed while selecting files.
+        if self.geoquery:
+            self.geoquery.filters = self.filters
+ return self._kube
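
The helpers in this driver assume the usual Sentinel-2 SAFE layout: `get_field_name_from_path` builds the field name from the resolution directory and the band token of the JP2 file name, and `preprocess_sentinel` reads the sensing time from the product directory name six path levels above the file. A quick check with a hypothetical (illustrative) path:

    import os

    from intake_geokube.sentinel import get_field_name_from_path

    # Hypothetical JP2 path inside an unzipped .SAFE product.
    path = os.path.join(
        "S2B_MSIL2A_20230216T100029_N0509_R122_T33TTG_20230216T135751.SAFE",
        "GRANULE", "L2A_T33TTG_A031123_20230216T100026", "IMG_DATA",
        "R10m", "T33TTG_20230216T100029_B04_10m.jp2",
    )

    print(get_field_name_from_path(path))  # -> "R10m_B04"
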
diff --git a/drivers/intake_geokube/wrf.py b/drivers/intake_geokube/wrf.py
index 1968e40..196ff68 100644
--- a/drivers/intake_geokube/wrf.py
+++ b/drivers/intake_geokube/wrf.py
@@ -124,6 +124,7 @@ def __init__(
load_files_on_persistance: Optional[bool] = True,
variables_to_keep: Optional[Union[str, list[str]]] = None,
variables_to_skip: Optional[Union[str, list[str]]] = None,
+ **kwargs
):
self._kube = None
self.path = path
@@ -142,7 +143,7 @@ def __init__(
variables_to_skip=variables_to_skip,
)
# self.xarray_kwargs.update({'engine' : 'netcdf'})
- super(CMCCWRFSource, self).__init__(metadata=metadata)
+ super(CMCCWRFSource, self).__init__(metadata=metadata, **kwargs)
def _open_dataset(self):
if self.pattern is None:
diff --git a/drivers/setup.py b/drivers/setup.py
index b3a3032..7723f8b 100644
--- a/drivers/setup.py
+++ b/drivers/setup.py
@@ -13,12 +13,12 @@
long_description_content_type="text/markdown",
url="https://github.com/geokube/intake-geokube",
packages=setuptools.find_packages(),
- install_requires=["intake", "pytest"],
+ install_requires=["intake", "pytest", "pydantic<2.0.0"],
entry_points={
"intake.drivers": [
"geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
"cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
- "cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource",
+ "cmcc_sentinel_geokube = intake_geokube.sentinel:CMCCSentinelSource"
]
},
classifiers=[
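
With the `intake.drivers` entry point above, the new source becomes discoverable once the drivers package is reinstalled. A hedged sanity check (constructor arguments, including the GeoQuery injected by the DDS catalog, are omitted here):

    import intake

    # Entry points are auto-discovered by intake; the name below matches the
    # `cmcc_sentinel_geokube` entry registered in setup.py.
    assert "cmcc_sentinel_geokube" in intake.registry
    CMCCSentinelSource = intake.registry["cmcc_sentinel_geokube"]
    print(CMCCSentinelSource.version)  # -> "0.0.1"
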
diff --git a/executor/app/main.py b/executor/app/main.py
index 5daf7e6..6dc3566 100644
--- a/executor/app/main.py
+++ b/executor/app/main.py
@@ -110,6 +110,7 @@ def persist_datacube(
kube._properties["history"] = get_history_message()
if isinstance(message.content, GeoQuery):
format = message.content.format
+        format_args = message.content.format_args or {}
else:
format = "netcdf"
match format:
@@ -119,6 +120,15 @@ def persist_datacube(
case "geojson":
full_path = os.path.join(base_path, f"{path}.json")
kube.to_geojson(full_path)
+ case "png":
+ full_path = os.path.join(base_path, f"{path}.png")
+ kube.to_image(full_path, **format_args)
+ case "jpeg":
+ full_path = os.path.join(base_path, f"{path}.jpg")
+ kube.to_image(full_path, **format_args)
+ case "csv":
+ full_path = os.path.join(base_path, f"{path}.csv")
+ kube.to_csv(full_path)
case _:
raise ValueError(f"format `{format}` is not supported")
return full_path
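
These new branches make the output format selectable from the query itself: `format` picks the writer and `format_args` is forwarded verbatim to geokube's `to_image`, so whatever keyword arguments that method accepts can be supplied by the client. A sketch of such a request payload (the `format_args` keys are illustrative, not a documented geokube API):

    # Example query body requesting a PNG rendering; `format_args` is passed
    # through as `kube.to_image(full_path, **format_args)`.
    payload = {
        "variable": "tas",
        "time": {"start": "2023-01-01", "stop": "2023-01-02"},
        "format": "png",
        "format_args": {"width": 1024, "height": 768},
    }
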
@@ -132,7 +142,9 @@ def persist_dataset(
def _get_attr_comb(dataframe_item, attrs):
return "_".join([dataframe_item[attr_name] for attr_name in attrs])
- def _persist_single_datacube(dataframe_item, base_path, format):
+ def _persist_single_datacube(dataframe_item, base_path, format, format_args=None):
+ if not format_args:
+ format_args = {}
dcube = dataframe_item[dset.DATACUBE_COL]
if isinstance(dcube, Delayed):
dcube = dcube.compute()
@@ -169,14 +181,24 @@ def _persist_single_datacube(dataframe_item, base_path, format):
case "geojson":
full_path = os.path.join(base_path, f"{path}.json")
dcube.to_geojson(full_path)
+ case "png":
+ full_path = os.path.join(base_path, f"{path}.png")
+ dcube.to_image(full_path, **format_args)
+ case "jpeg":
+ full_path = os.path.join(base_path, f"{path}.jpg")
+ dcube.to_image(full_path, **format_args)
+ case "csv":
+ full_path = os.path.join(base_path, f"{path}.csv")
+ dcube.to_csv(full_path)
return full_path
if isinstance(message.content, GeoQuery):
format = message.content.format
+        format_args = message.content.format_args
     else:
         format = "netcdf"
+        format_args = None
datacubes_paths = dset.data.apply(
- _persist_single_datacube, base_path=base_path, format=format, axis=1
+ _persist_single_datacube, base_path=base_path, format=format, format_args=format_args, axis=1
)
paths = datacubes_paths[~datacubes_paths.isna()]
if len(paths) == 0: