Skip to content

Commit

Permalink
Merge pull request #16 from crim-ca/arch-changes
Browse files Browse the repository at this point in the history
Further revision to the new ingestion workflow
  • Loading branch information
dchandan authored Nov 8, 2023
2 parents 13717c1 + 26fe4ad commit 9cd2ced
Show file tree
Hide file tree
Showing 22 changed files with 1,284 additions and 234 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
*.pyc
STACpopulator.egg-info/
.vscode/
.venv/
jupyter/
.idea
.vscode
20 changes: 20 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
IMP_DIR = STACpopulator/implementations
STAC_HOST = http://localhost:8880/stac

testcmip6:
python $(IMP_DIR)/CMIP6_UofT/add_CMIP6.py $(STAC_HOST) https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html

delcmip6:
curl --location --request DELETE '$(STAC_HOST)/collections/CMIP6_UofT'
@echo ""

starthost:
docker compose up

stophost:
docker compose down

del_docker_volume: stophost
docker volume rm stac-populator_stac-db

resethost: del_docker_volume starthost
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ Currently, one implementation of `STACpopulatorBase` is provided in [add_CMIP6.p
The provided `docker-compose` file can be used to launch a test STAC server. The `add_CMIP6.py` script can be run as:

```
python implementations/add_CMIP6.py http://localhost:8880/stac/ https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/datasets/simulations/bias_adjusted/catalog.html implementations/CMIP6.yml
python implementations/CMIP6-UofT/add_CMIP6.py http://localhost:8880/stac/ https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html implementations/CMIP6-UofT/CMIP6.yml
```
Note: in the script above, I am currently using a sample THREDDS catalog URL and not one relevant to the global scale CMIP6 data.
Note: in the script above, I am currently using a sample THREDDS catalog URL and not one relevant to the global scale CMIP6 data.
94 changes: 94 additions & 0 deletions STACpopulator/api_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging
import os
from typing import Any, Optional

import requests
from colorlog import ColoredFormatter

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False


def stac_host_reachable(url: str) -> bool:
try:
registry = requests.get(url)
registry.raise_for_status()
return True
except (requests.exceptions.RequestException, requests.exceptions.ConnectionError):
return False


def stac_collection_exists(stac_host: str, collection_id: str) -> bool:
"""
Get a STAC collection
Returns the collection JSON.
"""
r = requests.get(os.path.join(stac_host, "collections", collection_id), verify=False)

return r.status_code == 200


def post_stac_collection(stac_host: str, json_data: dict[str, Any], update: Optional[bool] = True) -> None:
"""Post/create a collection on the STAC host
:param stac_host: address of the STAC host
:type stac_host: str
:param json_data: JSON representation of the STAC collection
:type json_data: dict[str, Any]
:param update: if True, update the collection on the host server if it is already present, defaults to True
:type update: Optional[bool], optional
"""
collection_id = json_data["id"]
r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False)

if r.status_code == 200:
LOGGER.info(f"Collection {collection_id} successfully created")
elif r.status_code == 409:
if update:
LOGGER.info(f"Collection {collection_id} already exists. Updating.")
r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False)
r.raise_for_status()
else:
LOGGER.info(f"Collection {collection_id} already exists.")
else:
r.raise_for_status()


def post_stac_item(
stac_host: str, collection_id: str, item_name: str, json_data: dict[str, dict], update: Optional[bool] = True
) -> None:
"""Post a STAC item to the host server.
:param stac_host: address of the STAC host
:type stac_host: str
:param collection_id: ID of the collection to which to post this item
:type collection_id: str
:param item_name: name of the STAC item
:type item_name: str
:param json_data: JSON representation of the STAC item
:type json_data: dict[str, dict]
:param update: if True, update the item on the host server if it is already present, defaults to True
:type update: Optional[bool], optional
"""
item_id = json_data["id"]

r = requests.post(os.path.join(stac_host, f"collections/{collection_id}/items"), json=json_data)

if r.status_code == 200:
LOGGER.info(f"Item {item_name} successfully added")
elif r.status_code == 409:
if update:
LOGGER.info(f"Item {item_id} already exists. Updating.")
r = requests.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r.raise_for_status()
else:
LOGGER.info(f"Item {item_id} already exists.")
else:
r.raise_for_status()
191 changes: 191 additions & 0 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import argparse
import json
import logging
from datetime import datetime
from typing import Any, List, Literal, MutableMapping, Optional

import pydantic_core
import pyessv
from colorlog import ColoredFormatter
from pydantic import AnyHttpUrl, ConfigDict, Field, FieldValidationInfo, field_validator
from pystac.extensions.datacube import DatacubeExtension

from STACpopulator import STACpopulatorBase
from STACpopulator.implementations.CMIP6_UofT.extensions import DataCubeHelper
from STACpopulator.input import GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon, STACItemProperties
from STACpopulator.stac_utils import STAC_item_from_metadata, collection2literal

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
formatter = ColoredFormatter(LOGFORMAT)
stream = logging.StreamHandler()
stream.setFormatter(formatter)
LOGGER.addHandler(stream)
LOGGER.setLevel(logging.INFO)
LOGGER.propagate = False

# CMIP6 controlled vocabulary (CV)
CV = pyessv.WCRP.CMIP6

# Enum classes built from the pyessv' CV
ActivityID = collection2literal(CV.activity_id)
ExperimentID = collection2literal(CV.experiment_id)
Frequency = collection2literal(CV.frequency)
GridLabel = collection2literal(CV.grid_label)
InstitutionID = collection2literal(CV.institution_id)
NominalResolution = collection2literal(CV.nominal_resolution)
Realm = collection2literal(CV.realm)
SourceID = collection2literal(CV.source_id)
SourceType = collection2literal(CV.source_type)
SubExperimentID = collection2literal(CV.sub_experiment_id)
TableID = collection2literal(CV.table_id)


def add_cmip6_prefix(name: str) -> str:
return "cmip6:" + name if "datetime" not in name else name


class CMIP6ItemProperties(STACItemProperties, validate_assignment=True):
"""Data model for CMIP6 Controlled Vocabulary."""

Conventions: str
activity_id: ActivityID
creation_date: datetime
data_specs_version: str
experiment: str
experiment_id: ExperimentID
frequency: Frequency
further_info_url: AnyHttpUrl
grid_label: GridLabel
institution: str
institution_id: InstitutionID
nominal_resolution: NominalResolution
realm: List[Realm]
source: str
source_id: SourceID
source_type: List[SourceType]
sub_experiment: str | Literal["none"]
sub_experiment_id: SubExperimentID | Literal["none"]
table_id: TableID
variable_id: str
variant_label: str
initialization_index: int
physics_index: int
realization_index: int
forcing_index: int
tracking_id: str = ""
version: str = Field("")
product: str
license: str
grid: str
mip_era: str

model_config = ConfigDict(alias_generator=add_cmip6_prefix, populate_by_name=True)

@field_validator("initialization_index", "physics_index", "realization_index", "forcing_index", mode="before")
@classmethod
def only_item(cls, v: list[int], info: FieldValidationInfo):
"""Pick single item from list."""
assert len(v) == 1, f"{info.field_name} must have one item only."
return v[0]

@field_validator("realm", "source_type", mode="before")
@classmethod
def split(cls, v: str, info: FieldValidationInfo):
"""Split string into list."""
return v.split(" ")

@field_validator("version")
@classmethod
def validate_version(cls, v: str, info: FieldValidationInfo):
assert v[0] == "v", "Version string should begin with a lower case 'v'"
assert v[1:].isdigit(), "All characters in version string, except first, should be digits"
return v


class CMIP6populator(STACpopulatorBase):
item_properties_model = CMIP6ItemProperties
item_geometry_model = GeoJSONPolygon

def __init__(self, stac_host: str, data_loader: GenericLoader, update: Optional[bool] = False) -> None:
"""Constructor
:param stac_host: URL to the STAC API
:type stac_host: str
:param thredds_catalog_url: the URL to the THREDDS catalog to ingest
:type thredds_catalog_url: str
"""
super().__init__(stac_host, data_loader, update)

@staticmethod
def make_cmip6_item_id(attrs: MutableMapping[str, Any]) -> str:
"""Return a unique ID for CMIP6 data item."""
keys = [
"activity_id",
"institution_id",
"source_id",
"experiment_id",
"variant_label",
"table_id",
"variable_id",
"grid_label",
]
name = "_".join(attrs[k] for k in keys)
return name

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Creates the STAC item.
:param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation
:type item_name: str
:param item_data: dictionary like representation of all information on the item
:type item_data: MutableMapping[str, Any]
:return: _description_
:rtype: MutableMapping[str, Any]
"""
iid = self.make_cmip6_item_id(item_data["attributes"])

try:
item = STAC_item_from_metadata(iid, item_data, self.item_properties_model, self.item_geometry_model)
except pydantic_core._pydantic_core.ValidationError:
print(f"ERROR: ValidationError for {iid}")
return -1

# Add the CMIP6 STAC extension
item.stac_extensions.append(
"https://raw.githubusercontent.com/TomAugspurger/cmip6/main/json-schema/schema.json"
)

# Add datacube extension
try:
dchelper = DataCubeHelper(item_data)
dc_ext = DatacubeExtension.ext(item, add_if_missing=True)
dc_ext.apply(dimensions=dchelper.dimensions, variables=dchelper.variables)
except:
LOGGER.warning(f"Failed to add Datacube extension to item {item_name}")

# print(json.dumps(item.to_dict()))
return json.loads(json.dumps(item.to_dict()))


if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="CMIP6 STAC populator")
parser.add_argument("stac_host", type=str, help="STAC API address")
parser.add_argument("thredds_catalog_URL", type=str, help="URL to the CMIP6 THREDDS catalog")
parser.add_argument("--update", action="store_true", help="Update collection and its items")

args = parser.parse_args()

LOGGER.info(f"Arguments to call: {args}")

mode = "full"

if mode == "full":
data_loader = THREDDSLoader(args.thredds_catalog_URL)
else:
# To be implemented
data_loader = ErrorLoader(args.error_file)

c = CMIP6populator(args.stac_host, data_loader, args.update)
c.ingest()
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
title: CMIP6
id: CMIP6_UofT
description: Coupled Model Intercomparison Project phase 6
keywords: ['CMIP', 'CMIP6', 'WCRP', 'Climate Change']
license: "CC-BY-4.0"
Expand Down
Loading

0 comments on commit 9cd2ced

Please sign in to comment.