Merge pull request #12 from crim-ca/new_ingestion_workflow
New ingestion workflow
dchandan authored Aug 22, 2023
2 parents 7341bf7 + add5e79 commit 13717c1
Showing 17 changed files with 534 additions and 42 deletions.
File renamed without changes.
66 changes: 33 additions & 33 deletions collection_processor.py → .deprecated/collection_processor.py
@@ -12,25 +12,26 @@
__holder__ = "Computer Research Institute of Montreal (CRIM)"
__contact__ = "[email protected]"

-import requests
-import os
-import pystac
import datetime
import hashlib
-import yaml
+import os
import sys

+import pystac
+import requests
+import yaml


class bcolors:
-    HEADER = '\033[95m'
-    OKBLUE = '\033[94m'
-    OKCYAN = '\033[96m'
-    OKGREEN = '\033[92m'
-    WARNING = '\033[93m'
-    FAIL = '\033[91m'
-    ENDC = '\033[0m'
-    BOLD = '\033[1m'
-    UNDERLINE = '\033[4m'
+    HEADER = "\033[95m"
+    OKBLUE = "\033[94m"
+    OKCYAN = "\033[96m"
+    OKGREEN = "\033[92m"
+    WARNING = "\033[93m"
+    FAIL = "\033[91m"
+    ENDC = "\033[0m"
+    BOLD = "\033[1m"
+    UNDERLINE = "\033[4m"


class CollectionProcessor:
@@ -55,9 +56,7 @@ def __init__(self):
        self.process_collection(stac_host, col["name"], col["description"])

    def process_collection(self, stac_host, collection_name, collection_description):
-        collection_id = hashlib.md5(
-            collection_name.encode("utf-8")
-        ).hexdigest()
+        collection_id = hashlib.md5(collection_name.encode("utf-8")).hexdigest()
        stac_collection = self.get_stac_collection(stac_host, collection_id)

        if stac_collection:
@@ -119,23 +118,20 @@ def create_stac_collection(self, collection_id, collection_name, collection_desc
"""

sp_extent = pystac.SpatialExtent([[-140.99778, 41.6751050889, -52.6480987209, 83.23324]])
capture_date = datetime.datetime.strptime('2015-10-22', '%Y-%m-%d')
end_capture_date = datetime.datetime.strptime('2100-10-22', '%Y-%m-%d')
capture_date = datetime.datetime.strptime("2015-10-22", "%Y-%m-%d")
end_capture_date = datetime.datetime.strptime("2100-10-22", "%Y-%m-%d")
tmp_extent = pystac.TemporalExtent([(capture_date, end_capture_date)])
extent = pystac.Extent(sp_extent, tmp_extent)

-        collection = pystac.Collection(id=collection_id,
-                                       title=collection_name,
-                                       description=collection_description,
-                                       extent=extent,
-                                       keywords=[
-                                           "climate change",
-                                           "CMIP5",
-                                           "WCRP",
-                                           "CMIP"
-                                       ],
-                                       providers=None,
-                                       summaries=pystac.Summaries({"needs_summaries_update": ["true"]}))
+        collection = pystac.Collection(
+            id=collection_id,
+            title=collection_name,
+            description=collection_description,
+            extent=extent,
+            keywords=["climate change", "CMIP5", "WCRP", "CMIP"],
+            providers=None,
+            summaries=pystac.Summaries({"needs_summaries_update": ["true"]}),
+        )

        return collection.to_dict()

@@ -145,13 +141,17 @@ def post_collection(self, stac_host, json_data):
        Returns the collection id.
        """
-        collection_id = json_data['id']
+        collection_id = json_data["id"]
        r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False)

        if r.status_code == 200:
-            print(f"{bcolors.OKGREEN}[INFO] Pushed STAC collection [{collection_id}] to [{stac_host}] ({r.status_code}){bcolors.ENDC}")
+            print(
+                f"{bcolors.OKGREEN}[INFO] Pushed STAC collection [{collection_id}] to [{stac_host}] ({r.status_code}){bcolors.ENDC}"
+            )
        elif r.status_code == 409:
-            print(f"{bcolors.WARNING}[INFO] STAC collection [{collection_id}] already exists on [{stac_host}] ({r.status_code}), updating..{bcolors.ENDC}")
+            print(
+                f"{bcolors.WARNING}[INFO] STAC collection [{collection_id}] already exists on [{stac_host}] ({r.status_code}), updating..{bcolors.ENDC}"
+            )
            r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False)
            r.raise_for_status()
        else:
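Both this deprecated processor and the new `STACpopulatorBase` further down derive a collection's id deterministically from its name. A quick illustration of the scheme (the collection name here is just an example):

```
import hashlib

# The collection id is the MD5 hex digest of the collection name/title,
# so re-running the populator always targets the same collection.
collection_id = hashlib.md5("CMIP6".encode("utf-8")).hexdigest()
print(collection_id)  # a 32-character hex string, stable for a given name
```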
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
*.pyc
STACpopulator.egg-info/
.vscode/
18 changes: 14 additions & 4 deletions README.md
@@ -1,11 +1,21 @@
# STAC Catalog Populator

-Populate STAC catalog with sample collection items via [CEDA STAC Generator](https://github.com/cedadev/stac-generator), employed in sample
-[CMIP Dataset Ingestion Workflows](https://github.com/cedadev/stac-generator-example/tree/master/conf).
+This repository contains a framework [STACpopulator](STACpopulator) that can be used to implement concrete populators (see [implementations](implementations)) for populating the STAC catalog on a DACCS node.

-**Sample call via Docker image**
+## Framework
+
+The framework is centered around the Python abstract base class `STACpopulatorBase`, which implements all the logic for populating a STAC catalog. It declares an abstract method, `process_STAC_item`, that concrete implementations must define with all the logic for constructing the STAC representation of an item in the collection being processed.
+
+## Implementations
+
+Currently, one implementation of `STACpopulatorBase` is provided in [add_CMIP6.py](implementations/add_CMIP6.py).
+
+## Testing
+
+The provided `docker-compose` file can be used to launch a test STAC server. The `add_CMIP6.py` script can be run as:

```
-docker run -e STAC_HOST=https://stac-dev.crim.ca/stac/ -e STAC_ASSET_GENERATOR_TIMEOUT=300 stac-populator
+python implementations/add_CMIP6.py http://localhost:8880/stac/ https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/datasets/simulations/bias_adjusted/catalog.html implementations/CMIP6.yml
```
+
+Note: the script above currently uses a sample THREDDS catalog URL rather than one pointing at the global-scale CMIP6 data.
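The Framework section of the README describes the subclassing contract. As a minimal sketch (not part of this commit), a concrete populator could look like the following; the abstract-method names come from `STACpopulator/populator_base.py` further down, while the exact signatures and the item-handling logic are assumptions for illustration only:

```
from STACpopulator import STACpopulatorBase


class DummyPopulator(STACpopulatorBase):
    """Hypothetical populator; the item-handling logic is illustrative only."""

    def process_stac_item(self, item):
        # Map one collection item to its STAC representation. A real
        # implementation would translate dataset metadata to STAC fields.
        name, dataset = item  # THREDDSLoader yields (name, dataset) pairs
        return {"id": name, "properties": {}}

    def validate_stac_item_cv(self, item):
        # Check item fields against controlled vocabularies; a no-op here.
        return True
```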
1 change: 1 addition & 0 deletions STACpopulator/__init__.py
@@ -0,0 +1 @@
from .populator_base import STACpopulatorBase
86 changes: 86 additions & 0 deletions STACpopulator/input.py
@@ -0,0 +1,86 @@
import logging
from abc import ABC, abstractmethod
from typing import Optional

from colorlog import ColoredFormatter
from siphon.catalog import TDSCatalog

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


class GenericLoader(ABC):
    def __init__(self) -> None:
        pass

    @abstractmethod
    def __iter__(self):
        """
        A generator that returns an item from the input. The item could be anything
        depending on the specific concrete implementation of this abstract class.
        """
        pass

    @abstractmethod
    def reset(self):
        """Reset the internal state of the generator."""
        pass


class THREDDSLoader(GenericLoader):
    def __init__(self, thredds_catalog_url: str, depth: Optional[int] = None) -> None:
        """Constructor

        :param thredds_catalog_url: the URL to the THREDDS catalog to ingest
        :type thredds_catalog_url: str
        :param depth: Maximum recursive depth for the class's generator. Setting 0 will return only datasets within
            the top-level catalog. If None, depth is set to 1000, defaults to None
        :type depth: int, optional
        """
        super().__init__()
        self._depth = depth if depth is not None else 1000

        if thredds_catalog_url.endswith(".html"):
            thredds_catalog_url = thredds_catalog_url.replace(".html", ".xml")
            LOGGER.info("Converting catalog URL from html to xml")

        self.thredds_catalog_URL = thredds_catalog_url
        self.catalog = TDSCatalog(self.thredds_catalog_URL)
        self.catalog_head = self.catalog

    def reset(self):
        """Reset the generator."""
        self.catalog_head = self.catalog

    def __iter__(self):
        """Return a generator walking a THREDDS data catalog for datasets."""
        yield from self.catalog_head.datasets.items()

        if self._depth > 0:
            for name, ref in self.catalog_head.catalog_refs.items():
                self.catalog_head = ref.follow()
                self._depth -= 1
                yield from self


class RemoteTHREDDSLoader(THREDDSLoader):
    def __init__(self, thredds_catalog_url: str, depth: int | None = None) -> None:
        super().__init__(thredds_catalog_url, depth)
        # more stuff to follow based on needs of a concrete implementation


class GeoServerLoader(GenericLoader):
    def __init__(self) -> None:
        super().__init__()

    def __iter__(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError
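As a usage sketch (not shipped in this commit), the loader behaves as an iterable of `(name, dataset)` pairs; the catalog URL is the sample one from the README, and `url_path` is assumed to be available on siphon's dataset objects:

```
from STACpopulator.input import THREDDSLoader

# depth=0 restricts the walk to datasets in the top-level catalog.
loader = THREDDSLoader(
    "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/"
    "datasets/simulations/bias_adjusted/catalog.html",
    depth=0,
)
for name, dataset in loader:
    print(name, dataset.url_path)

loader.reset()  # iteration advances catalog_head, so rewind before reusing
```

Note that iteration also decrements `_depth`, which `reset()` does not restore, so a fresh instance is needed to repeat a depth-limited walk.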
61 changes: 61 additions & 0 deletions STACpopulator/metadata_parsers.py
@@ -0,0 +1,61 @@
import lxml.etree
import requests


def nc_attrs_from_ncml(url):
    """Extract attributes from NcML file.

    Parameters
    ----------
    url : str
        Link to NcML service of THREDDS server for a dataset.

    Returns
    -------
    dict
        Global attribute values keyed by facet names, with variable attributes in `__variable__` nested dict, and
        additional specialized attributes in `__group__` nested dict.
    """
    parser = lxml.etree.XMLParser(encoding="UTF-8")

    ns = {"ncml": "http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2"}

    # Parse XML content - UTF-8 encoded documents need to be read as bytes
    xml = requests.get(url).content
    doc = lxml.etree.fromstring(xml, parser=parser)
    nc = doc.xpath("/ncml:netcdf", namespaces=ns)[0]

    # Extract global attributes
    out = _attrib_to_dict(nc.xpath("ncml:attribute", namespaces=ns))

    # Extract group attributes
    gr = {}
    for group in nc.xpath("ncml:group", namespaces=ns):
        gr[group.attrib["name"]] = _attrib_to_dict(group.xpath("ncml:attribute", namespaces=ns))

    # Extract variable attributes
    va = {}
    for variable in nc.xpath("ncml:variable", namespaces=ns):
        if "_CoordinateAxisType" in variable.xpath("ncml:attribute/@name", namespaces=ns):
            continue
        va[variable.attrib["name"]] = _attrib_to_dict(variable.xpath("ncml:attribute", namespaces=ns))

    out["__group__"] = gr
    out["__variable__"] = va

    return out


def _attrib_to_dict(elems):
    """Convert element attributes to dictionary.

    Ignore attributes with names starting with _
    """
    hidden_prefix = "_"
    out = {}
    for e in elems:
        a = e.attrib
        if a["name"].startswith(hidden_prefix):
            continue
        out[a["name"]] = a["value"]
    return out
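A short sketch of the expected call pattern; the NcML URL below is hypothetical (THREDDS servers expose an NcML service per dataset), and the attribute names are placeholders:

```
from STACpopulator.metadata_parsers import nc_attrs_from_ncml

# Hypothetical endpoint; substitute a real THREDDS NcML service URL.
url = "https://example.org/thredds/ncml/datasets/example_dataset.nc"

attrs = nc_attrs_from_ncml(url)
print(attrs.get("institution"))      # global attributes are keyed by name
print(attrs["__group__"].keys())     # group name -> attribute dict
print(attrs["__variable__"].keys())  # variable name -> attribute dict
```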
104 changes: 104 additions & 0 deletions STACpopulator/populator_base.py
@@ -0,0 +1,104 @@
import hashlib
import logging
from abc import ABC, abstractmethod

import yaml
from colorlog import ColoredFormatter

from STACpopulator.input import GenericLoader
from STACpopulator.stac_utils import (
    create_stac_collection,
    post_collection,
    stac_collection_exists,
    stac_host_reachable,
    url_validate,
)

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


class STACpopulatorBase(ABC):
    def __init__(
        self,
        stac_host: str,
        data_loader: GenericLoader,
        collection_info_filename: str,
    ) -> None:
        """Constructor

        :param stac_host: URL to the STAC API
        :type stac_host: str
        :param data_loader: A concrete implementation of the GenericLoader abstract base class
        :type data_loader: GenericLoader
        :param collection_info_filename: YAML file containing the information about the collection to populate
        :type collection_info_filename: str
        :raises RuntimeError: Raised if one of the required definitions is not found in the collection info file
        """
        super().__init__()
        with open(collection_info_filename) as f:
            self._collection_info = yaml.load(f, yaml.Loader)

        req_definitions = ["title", "description", "keywords", "license"]
        for req in req_definitions:
            if req not in self._collection_info.keys():
                LOGGER.error(f"'{req}' is required in the configuration file")
                raise RuntimeError(f"'{req}' is required in the configuration file")

        self._ingest_pipeline = data_loader
        self._stac_host = self.validate_host(stac_host)

        self._collection_id = hashlib.md5(self.collection_name.encode("utf-8")).hexdigest()
        LOGGER.info("Initialization complete")
        LOGGER.info(f"Collection {self.collection_name} is assigned id {self._collection_id}")

    @property
    def collection_name(self) -> str:
        return self._collection_info["title"]

    @property
    def stac_host(self) -> str:
        return self._stac_host

    @property
    def collection_id(self) -> str:
        return self._collection_id

    def validate_host(self, stac_host: str) -> str:
        if not url_validate(stac_host):
            raise ValueError("stac_host URL is not appropriately formatted")
        if not stac_host_reachable(stac_host):
            raise ValueError("stac_host is not reachable")

        return stac_host

    def ingest(self) -> None:
        # First create collection if it doesn't exist
        if not stac_collection_exists(self.stac_host, self.collection_id):
            LOGGER.info(f"Creating collection '{self.collection_name}'")
            pystac_collection = create_stac_collection(self.collection_id, self._collection_info)
            post_collection(self.stac_host, pystac_collection)
            LOGGER.info("Collection successfully created")
        else:
            LOGGER.info(f"Collection '{self.collection_name}' already exists")
        # for item in self.crawler(self.catalog, **self._crawler_args):
        #     stac_item = self.process_STAC_item(item)
        #     self.post_item(stac_item)

    def post_item(self, data: dict[str, dict]) -> None:
        pass

    @abstractmethod
    def process_stac_item(self):  # noqa N802
        pass

    @abstractmethod
    def validate_stac_item_cv(self):  # noqa N802
        pass
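Putting the pieces together, a hedged end-to-end sketch: `CMIP6populator` is a stand-in for the concrete class in `implementations/add_CMIP6.py` (the actual class name is not shown in this diff), and the URLs and YAML path come from the README's testing example:

```
from STACpopulator.input import THREDDSLoader
from implementations.add_CMIP6 import CMIP6populator  # assumed class name

loader = THREDDSLoader(
    "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/"
    "datasets/simulations/bias_adjusted/catalog.html"
)
# The YAML file must define at least title, description, keywords and
# license, per the checks in STACpopulatorBase.__init__.
populator = CMIP6populator("http://localhost:8880/stac/", loader, "implementations/CMIP6.yml")
populator.ingest()  # creates the collection on the STAC host if it is missing
```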
