diff --git a/Dockerfile b/.deprecated/Dockerfile similarity index 100% rename from Dockerfile rename to .deprecated/Dockerfile diff --git a/collection_processor.py b/.deprecated/collection_processor.py similarity index 72% rename from collection_processor.py rename to .deprecated/collection_processor.py index 5042351..e676219 100644 --- a/collection_processor.py +++ b/.deprecated/collection_processor.py @@ -12,25 +12,26 @@ __holder__ = "Computer Research Institute of Montreal (CRIM)" __contact__ = "mathieu.provencher@crim.ca" -import requests -import os -import pystac import datetime import hashlib -import yaml +import os import sys +import pystac +import requests +import yaml + class bcolors: - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" class CollectionProcessor: @@ -55,9 +56,7 @@ def __init__(self): self.process_collection(stac_host, col["name"], col["description"]) def process_collection(self, stac_host, collection_name, collection_description): - collection_id = hashlib.md5( - collection_name.encode("utf-8") - ).hexdigest() + collection_id = hashlib.md5(collection_name.encode("utf-8")).hexdigest() stac_collection = self.get_stac_collection(stac_host, collection_id) if stac_collection: @@ -119,23 +118,20 @@ def create_stac_collection(self, collection_id, collection_name, collection_desc """ sp_extent = pystac.SpatialExtent([[-140.99778, 41.6751050889, -52.6480987209, 83.23324]]) - capture_date = datetime.datetime.strptime('2015-10-22', '%Y-%m-%d') - end_capture_date = datetime.datetime.strptime('2100-10-22', '%Y-%m-%d') + capture_date = datetime.datetime.strptime("2015-10-22", "%Y-%m-%d") + end_capture_date = 
datetime.datetime.strptime("2100-10-22", "%Y-%m-%d") tmp_extent = pystac.TemporalExtent([(capture_date, end_capture_date)]) extent = pystac.Extent(sp_extent, tmp_extent) - collection = pystac.Collection(id=collection_id, - title=collection_name, - description=collection_description, - extent=extent, - keywords=[ - "climate change", - "CMIP5", - "WCRP", - "CMIP" - ], - providers=None, - summaries=pystac.Summaries({"needs_summaries_update": ["true"]})) + collection = pystac.Collection( + id=collection_id, + title=collection_name, + description=collection_description, + extent=extent, + keywords=["climate change", "CMIP5", "WCRP", "CMIP"], + providers=None, + summaries=pystac.Summaries({"needs_summaries_update": ["true"]}), + ) return collection.to_dict() @@ -145,13 +141,17 @@ def post_collection(self, stac_host, json_data): Returns the collection id. """ - collection_id = json_data['id'] + collection_id = json_data["id"] r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False) if r.status_code == 200: - print(f"{bcolors.OKGREEN}[INFO] Pushed STAC collection [{collection_id}] to [{stac_host}] ({r.status_code}){bcolors.ENDC}") + print( + f"{bcolors.OKGREEN}[INFO] Pushed STAC collection [{collection_id}] to [{stac_host}] ({r.status_code}){bcolors.ENDC}" + ) elif r.status_code == 409: - print(f"{bcolors.WARNING}[INFO] STAC collection [{collection_id}] already exists on [{stac_host}] ({r.status_code}), updating..{bcolors.ENDC}") + print( + f"{bcolors.WARNING}[INFO] STAC collection [{collection_id}] already exists on [{stac_host}] ({r.status_code}), updating..{bcolors.ENDC}" + ) r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False) r.raise_for_status() else: diff --git a/collections.yaml b/.deprecated/collections.yaml similarity index 100% rename from collections.yaml rename to .deprecated/collections.yaml diff --git a/populate.sh b/.deprecated/populate.sh similarity index 100% rename from populate.sh rename 
to .deprecated/populate.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..80f0926 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc +STACpopulator.egg-info/ +.vscode/ diff --git a/README.md b/README.md index 50bef0b..bcac544 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,21 @@ # STAC Catalog Populator -Populate STAC catalog with sample collection items via [CEDA STAC Generator](https://github.com/cedadev/stac-generator), employed in sample -[CMIP Dataset Ingestion Workflows](https://github.com/cedadev/stac-generator-example/tree/master/conf). +This repository contains a framework [STACpopulator](STACpopulator) that can be used to implement concrete populators (see [implementations](implementations)) for populating the STAC catalog on a DACCS node. -**Sample call via Docker image** +## Framework + +The framework is centered around a Python Abstract Base Class: `STACpopulatorBase` that implements all the logic for populating a STAC catalog. This class implements an abstract method called `process_stac_item` that should be defined in implementations of the class and contain all the logic for constructing the STAC representation for an item in the collection that is to be processed. + +## Implementations + +Currently, one implementation of `STACpopulatorBase` is provided in [add_CMIP6.py](implementations/add_CMIP6.py). + +## Testing + +The provided `docker-compose` file can be used to launch a test STAC server. The `add_CMIP6.py` script can be run as: ``` -docker run -e STAC_HOST=https://stac-dev.crim.ca/stac/ -e STAC_ASSET_GENERATOR_TIMEOUT=300 stac-populator +python implementations/add_CMIP6.py http://localhost:8880/stac/ https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/datasets/simulations/bias_adjusted/catalog.html implementations/CMIP6.yml ``` +Note: in the script above, I am currently using a sample THREDDS catalog URL and not one relevant to the global scale CMIP6 data.
\ No newline at end of file diff --git a/STACpopulator/__init__.py b/STACpopulator/__init__.py new file mode 100644 index 0000000..f217a91 --- /dev/null +++ b/STACpopulator/__init__.py @@ -0,0 +1 @@ +from .populator_base import STACpopulatorBase diff --git a/STACpopulator/input.py b/STACpopulator/input.py new file mode 100644 index 0000000..f59328f --- /dev/null +++ b/STACpopulator/input.py @@ -0,0 +1,86 @@ +import logging +from abc import ABC, abstractmethod +from typing import Optional + +from colorlog import ColoredFormatter +from siphon.catalog import TDSCatalog + +LOGGER = logging.getLogger(__name__) +LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" +formatter = ColoredFormatter(LOGFORMAT) +stream = logging.StreamHandler() +stream.setFormatter(formatter) +LOGGER.addHandler(stream) +LOGGER.setLevel(logging.INFO) +LOGGER.propagate = False + + +class GenericLoader(ABC): + def __init__(self) -> None: + pass + + @abstractmethod + def __iter__(self): + """ + A generator that returns an item from the input. The item could be anything + depending on the specific concrete implementation of this abstract class. + """ + pass + + @abstractmethod + def reset(self): + """Reset the internal state of the generator.""" + pass + + +class THREDDSLoader(GenericLoader): + def __init__(self, thredds_catalog_url: str, depth: Optional[int] = None) -> None: + """Constructor + + :param thredds_catalog_url: the URL to the THREDDS catalog to ingest + :type thredds_catalog_url: str + :param depth: Maximum recursive depth for the class's generator. Setting 0 will return only datasets within the + top-level catalog. 
If None, depth is set to 1000, defaults to None + :type depth: int, optional + """ + super().__init__() + self._depth = depth if depth is not None else 1000 + + if thredds_catalog_url.endswith(".html"): + thredds_catalog_url = thredds_catalog_url.replace(".html", ".xml") + LOGGER.info("Converting catalog URL from html to xml") + + self.thredds_catalog_URL = thredds_catalog_url + self.catalog = TDSCatalog(self.thredds_catalog_URL) + self.catalog_head = self.catalog + + def reset(self): + """Reset the generator.""" + self.catalog_head = self.catalog + + def __iter__(self): + """Return a generator walking a THREDDS data catalog for datasets.""" + yield from self.catalog_head.datasets.items() + + if self._depth > 0: + for name, ref in self.catalog_head.catalog_refs.items(): + self.catalog_head = ref.follow() + self._depth -= 1 + yield from self + + +class RemoteTHREDDSLoader(THREDDSLoader): + def __init__(self, thredds_catalog_url: str, depth: int | None = None) -> None: + super().__init__(thredds_catalog_url, depth) + # more stuff to follow based on needs of a concrete implementation + + +class GeoServerLoader(GenericLoader): + def __init__(self) -> None: + super().__init__() + + def __iter__(self): + raise NotImplementedError + + def reset(self): + raise NotImplementedError diff --git a/STACpopulator/metadata_parsers.py b/STACpopulator/metadata_parsers.py new file mode 100644 index 0000000..84636f8 --- /dev/null +++ b/STACpopulator/metadata_parsers.py @@ -0,0 +1,61 @@ +import lxml.etree +import requests + + +def nc_attrs_from_ncml(url): + """Extract attributes from NcML file. + + Parameters + ---------- + url : str + Link to NcML service of THREDDS server for a dataset. + + Returns + ------- + dict + Global attribute values keyed by facet names, with variable attributes in `__variable__` nested dict, and + additional specialized attributes in `__group__` nested dict. 
+ """ + parser = lxml.etree.XMLParser(encoding="UTF-8") + + ns = {"ncml": "http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2"} + + # Parse XML content - UTF-8 encoded documents need to be read as bytes + xml = requests.get(url).content + doc = lxml.etree.fromstring(xml, parser=parser) + nc = doc.xpath("/ncml:netcdf", namespaces=ns)[0] + + # Extract global attributes + out = _attrib_to_dict(nc.xpath("ncml:attribute", namespaces=ns)) + + # Extract group attributes + gr = {} + for group in nc.xpath("ncml:group", namespaces=ns): + gr[group.attrib["name"]] = _attrib_to_dict(group.xpath("ncml:attribute", namespaces=ns)) + + # Extract variable attributes + va = {} + for variable in nc.xpath("ncml:variable", namespaces=ns): + if "_CoordinateAxisType" in variable.xpath("ncml:attribute/@name", namespaces=ns): + continue + va[variable.attrib["name"]] = _attrib_to_dict(variable.xpath("ncml:attribute", namespaces=ns)) + + out["__group__"] = gr + out["__variable__"] = va + + return out + + +def _attrib_to_dict(elems): + """Convert element attributes to dictionary. 
+ + Ignore attributes with names starting with _ + """ + hidden_prefix = "_" + out = {} + for e in elems: + a = e.attrib + if a["name"].startswith(hidden_prefix): + continue + out[a["name"]] = a["value"] + return out diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py new file mode 100644 index 0000000..8c6465c --- /dev/null +++ b/STACpopulator/populator_base.py @@ -0,0 +1,104 @@ +import hashlib +import logging +from abc import ABC, abstractmethod + +import yaml +from colorlog import ColoredFormatter + +from STACpopulator.input import GenericLoader +from STACpopulator.stac_utils import ( + create_stac_collection, + post_collection, + stac_collection_exists, + stac_host_reachable, + url_validate, +) + +LOGGER = logging.getLogger(__name__) +LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" +formatter = ColoredFormatter(LOGFORMAT) +stream = logging.StreamHandler() +stream.setFormatter(formatter) +LOGGER.addHandler(stream) +LOGGER.setLevel(logging.INFO) +LOGGER.propagate = False + + +class STACpopulatorBase(ABC): + def __init__( + self, + stac_host: str, + data_loader: GenericLoader, + collection_info_filename: str, + ) -> None: + """Constructor + + :param stac_host: URL to the STAC API + :type stac_host: str + :param data_loader: A concrete implementation of the GenericLoader abstract base class + :type data_loader: GenericLoader + :param collection_info_filename: Yaml file containing the information about the collection to populate + :type collection_info_filename: str + :raises RuntimeError: Raised if one of the required definitions is not found in the collection info filename + """ + + super().__init__() + with open(collection_info_filename) as f: + self._collection_info = yaml.load(f, yaml.Loader) + + req_definitions = ["title", "description", "keywords", "license"] + for req in req_definitions: + if req not in self._collection_info.keys(): + LOGGER.error(f"'{req}' is required in the 
configuration file") + raise RuntimeError(f"'{req}' is required in the configuration file") + + self._ingest_pipeline = data_loader + self._stac_host = self.validate_host(stac_host) + + self._collection_id = hashlib.md5(self.collection_name.encode("utf-8")).hexdigest() + LOGGER.info("Initialization complete") + LOGGER.info(f"Collection {self.collection_name} is assigned id {self._collection_id}") + + @property + def collection_name(self) -> str: + return self._collection_info["title"] + + @property + def stac_host(self) -> str: + return self._stac_host + + @property + def collection_id(self) -> str: + return self._collection_id + + def validate_host(self, stac_host: str) -> str: + if not url_validate(stac_host): + raise ValueError("stac_host URL is not appropriately formatted") + if not stac_host_reachable(stac_host): + raise ValueError("stac_host is not reachable") + + return stac_host + + def ingest(self) -> None: + # First create collection if it doesn't exist + if not stac_collection_exists(self.stac_host, self.collection_id): + LOGGER.info(f"Creating collection '{self.collection_name}'") + pystac_collection = create_stac_collection(self.collection_id, self._collection_info) + post_collection(self.stac_host, pystac_collection) + LOGGER.info("Collection successfully created") + else: + LOGGER.info(f"Collection '{self.collection_name}' already exists") + # for item in self.crawler(self.catalog, **self._crawler_args): + # stac_item = self.process_STAC_item(item) + # self.post_item(stac_item) + + def post_item(self, data: dict[str, dict]) -> None: + pass + + @abstractmethod + def process_stac_item(self): # noqa N802 + pass + + @abstractmethod + def validate_stac_item_cv(self): # noqa N802 + pass diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py new file mode 100644 index 0000000..743f53a --- /dev/null +++ b/STACpopulator/stac_utils.py @@ -0,0 +1,99 @@ +import os +import re +from datetime import datetime +from typing import Any + +import pystac 
+import requests + + +def url_validate(target: str) -> bool: + """Validate whether a supplied URL is reliably written. + + Parameters + ---------- + target : str + + References + ---------- + https://stackoverflow.com/a/7160778/7322852 + """ + url_regex = re.compile( + r"^(?:http|ftp)s?://" # http:// or https:// + # domain... + r"(?:(?:[A-Z\d](?:[A-Z\d-]{0,61}[A-Z\d])?\.)+(?:[A-Z]{2,6}\.?|[A-Z\d-]{2,}\.?)|" + r"localhost|" # localhost... + r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # ...or ip + r"(?::\d+)?" # optional port + r"(?:/?|[/?]\S+)$", + re.IGNORECASE, + ) + return True if re.match(url_regex, target) else False + + +def stac_host_reachable(url: str) -> bool: + try: + registry = requests.get(url) + registry.raise_for_status() + return True + except (requests.exceptions.RequestException, requests.exceptions.ConnectionError): + return False + + +def stac_collection_exists(stac_host: str, collection_id: str) -> bool: + """ + Get a STAC collection + + Returns the collection JSON. + """ + r = requests.get(os.path.join(stac_host, "collections", collection_id), verify=False) + + return r.status_code == 200 + + +def create_stac_collection(collection_id: str, collection_info: dict[str, Any]) -> dict[str, Any]: + """ + Create a basic STAC collection. + + Returns the collection. + """ + + sp_extent = pystac.SpatialExtent([collection_info.pop("spatialextent")]) + tmp = collection_info.pop("temporalextent") + tmp_extent = pystac.TemporalExtent( + [ + [ + datetime.strptime(tmp[0], "%Y-%m-%d") if tmp[0] is not None else None, + datetime.strptime(tmp[1], "%Y-%m-%d") if tmp[1] is not None else None, + ] + ] + ) + collection_info["extent"] = pystac.Extent(sp_extent, tmp_extent) + collection_info["summaries"] = pystac.Summaries({"needs_summaries_update": ["true"]}) + + collection = pystac.Collection(id=collection_id, **collection_info) + + return collection.to_dict() + + +def post_collection(stac_host: str, json_data: dict[str, Any]) -> None: + """ + Post a STAC collection. 
+ + Raises an HTTPError if the request ultimately fails. + """ + collection_id = json_data["id"] + r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False) + + if r.status_code == 200: + print( + f"[INFO] Pushed STAC collection [{collection_id}] to [{stac_host}] ({r.status_code})" + ) + elif r.status_code == 409: + print( + f"[INFO] STAC collection [{collection_id}] already exists on [{stac_host}] ({r.status_code}), updating.." + ) + r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False) + r.raise_for_status() + else: + r.raise_for_status() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..23ffae5 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,46 @@ +x-logging: &default-logging + driver: "json-file" + options: + max-size: "50m" + max-file: "10" + +services: + stac: + container_name: stac-populator-test + image: ghcr.io/crim-ca/stac-app:main + depends_on: + - stac-db + ports: + - "8880:8000" + environment: + - POSTGRES_USER=dchandan + - POSTGRES_PASS=password + - POSTGRES_DBNAME=postgis + - POSTGRES_HOST_READER=stac-db + - POSTGRES_HOST_WRITER=stac-db + - POSTGRES_PORT=5432 + - ROUTER_PREFIX=/stac + logging: *default-logging + restart: always + + stac-db: + container_name: stac-populator-test-db + image: ghcr.io/stac-utils/pgstac:v0.6.10 + environment: + - POSTGRES_USER=testuser + - POSTGRES_PASSWORD=password + - POSTGRES_DB=postgis + - PGUSER=dchandan + - PGPASSWORD=password + - PGHOST=localhost + - PGDATABASE=postgis + volumes: + - stac-db:/var/lib/postgresql/data + healthcheck: + test: [ "CMD-SHELL", "pg_isready" ] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + stac-db: diff --git a/implementations/CMIP6-UofT/CMIP6.yml b/implementations/CMIP6-UofT/CMIP6.yml new file mode 100644 index 0000000..a57875b --- /dev/null +++ b/implementations/CMIP6-UofT/CMIP6.yml @@ -0,0 +1,6 @@ +title: CMIP6 +description: Coupled Model
Intercomparison Project phase 6 +keywords: ['CMIP', 'CMIP6', 'WCRP', 'Climate Change'] +license: "CC-BY-4.0" +spatialextent: [-180, -90, 180, 90] +temporalextent: ['1850-01-01', null] \ No newline at end of file diff --git a/implementations/CMIP6-UofT/add_CMIP6.py b/implementations/CMIP6-UofT/add_CMIP6.py new file mode 100644 index 0000000..349540d --- /dev/null +++ b/implementations/CMIP6-UofT/add_CMIP6.py @@ -0,0 +1,60 @@ +import argparse +import logging + +from colorlog import ColoredFormatter + +from STACpopulator import STACpopulatorBase +from STACpopulator.input import THREDDSLoader + +# from STACpopulator.metadata_parsers import nc_attrs_from_ncml + +LOGGER = logging.getLogger(__name__) +LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" +formatter = ColoredFormatter(LOGFORMAT) +stream = logging.StreamHandler() +stream.setFormatter(formatter) +LOGGER.addHandler(stream) +LOGGER.setLevel(logging.INFO) +LOGGER.propagate = False + + +class CMIP6populator(STACpopulatorBase): + def __init__( + self, + stac_host: str, + thredds_catalog_url: str, + config_filename: str, + ) -> None: + """Constructor + + :param stac_host: URL to the STAC API + :type stac_host: str + :param thredds_catalog_url: the URL to the THREDDS catalog to ingest + :type thredds_catalog_url: str + :param config_filename: Yaml file containing the information about the collection to populate + :type config_filename: str + """ + data_loader = THREDDSLoader(thredds_catalog_url) + for item in data_loader: + print(item) + super().__init__(stac_host, data_loader, config_filename) + + def process_stac_item(self): # noqa N802 + # TODO: next step is to implement this + print("here") + + def validate_stac_item_cv(self): + # TODO: next step is to implement this + pass + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(prog="CMIP6 STAC populator") + parser.add_argument("stac_host", type=str, help="STAC API address") + 
parser.add_argument("thredds_catalog_URL", type=str, help="URL to the CMIP6 THREDDS catalog") + parser.add_argument("config_file", type=str, help="Name of the configuration file") + + args = parser.parse_args() + LOGGER.info(f"Arguments to call: {args}") + c = CMIP6populator(args.stac_host, args.thredds_catalog_URL, args.config_file) + c.ingest() diff --git a/implementations/NEX-GDDP-UofT/add_NEX-GDDP.py b/implementations/NEX-GDDP-UofT/add_NEX-GDDP.py new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1c94eaf --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "STACpopulator" +version = "0.0.1" +requires-python = ">=3.10" +dependencies = [ + "colorlog", + "pyyaml", + "requests", + "siphon", + "pystac", + "lxml" +] + +[tool.setuptools] +py-modules = ["STACpopulator"] + + diff --git a/requirements.txt b/requirements.txt index 10a7a4a..dc03813 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,6 @@ +colorlog requests pystac pyyaml -git+https://github.com/crim-ca/stac-generator#egg=stac-generator siphon -shapely -boto3 -fsspec -xarray +lxml