diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index e7a5380..58f4d1e 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -3,6 +3,7 @@ from typing import Any, Optional import requests +from requests import Session from colorlog import ColoredFormatter LOGGER = logging.getLogger(__name__) @@ -15,27 +16,36 @@ LOGGER.propagate = False -def stac_host_reachable(url: str) -> bool: +def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool: try: - registry = requests.get(url) - registry.raise_for_status() - return True - except (requests.exceptions.RequestException, requests.exceptions.ConnectionError): - return False - - -def stac_collection_exists(stac_host: str, collection_id: str) -> bool: + session = session or requests + response = session.get(url, headers={"Accept": "application/json"}) + response.raise_for_status() + body = response.json() + if body["type"] == "Catalog" and "stac_version" in body: + return True + except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as exc: + LOGGER.error("Could not validate STAC host. Not reachable [%s] due to [%s]", url, exc, exc_info=exc) + return False + + +def stac_collection_exists(stac_host: str, collection_id: str, session: Optional[Session] = None) -> bool: """ Get a STAC collection Returns the collection JSON. """ - r = requests.get(os.path.join(stac_host, "collections", collection_id), verify=False) - + session = session or requests + r = session.get(os.path.join(stac_host, "collections", collection_id), verify=False) return r.status_code == 200 -def post_stac_collection(stac_host: str, json_data: dict[str, Any], update: Optional[bool] = True) -> None: +def post_stac_collection( + stac_host: str, + json_data: dict[str, Any], + update: Optional[bool] = True, + session: Optional[Session] = None, +) -> None: """Post/create a collection on the STAC host :param stac_host: address of the STAC host @@ -44,16 +54,18 @@ def post_stac_collection(stac_host: str, json_data: dict[str, Any], update: Opti :type json_data: dict[str, Any] :param update: if True, update the collection on the host server if it is already present, defaults to True :type update: Optional[bool], optional + :param session: Session with additional configuration to perform requests. """ + session = session or requests collection_id = json_data["id"] - r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False) + r = session.post(os.path.join(stac_host, "collections"), json=json_data) if r.status_code == 200: LOGGER.info(f"Collection {collection_id} successfully created") elif r.status_code == 409: if update: LOGGER.info(f"Collection {collection_id} already exists. Updating.") - r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False) + r = session.put(os.path.join(stac_host, "collections"), json=json_data) r.raise_for_status() else: LOGGER.info(f"Collection {collection_id} already exists.") @@ -67,6 +79,7 @@ def post_stac_item( item_name: str, json_data: dict[str, dict], update: Optional[bool] = True, + session: Optional[Session] = None, ) -> None: """Post a STAC item to the host server. @@ -80,17 +93,18 @@ def post_stac_item( :type json_data: dict[str, dict] :param update: if True, update the item on the host server if it is already present, defaults to True :type update: Optional[bool], optional + :param session: Session with additional configuration to perform requests. """ item_id = json_data["id"] - - r = requests.post(os.path.join(stac_host, f"collections/{collection_id}/items"), json=json_data) + session = session or requests + r = session.post(os.path.join(stac_host, f"collections/{collection_id}/items"), json=json_data) if r.status_code == 200: LOGGER.info(f"Item {item_name} successfully added") elif r.status_code == 409: if update: LOGGER.info(f"Item {item_id} already exists. Updating.") - r = requests.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) + r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) r.raise_for_status() else: LOGGER.info(f"Item {item_id} already exists.") diff --git a/STACpopulator/cli.py b/STACpopulator/cli.py index d209757..f91d97b 100644 --- a/STACpopulator/cli.py +++ b/STACpopulator/cli.py @@ -5,11 +5,85 @@ import sys from typing import Callable, Optional +import requests +from http import cookiejar +from requests.auth import AuthBase, HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth +from requests.sessions import Session + from STACpopulator import __version__ POPULATORS = {} +class HTTPBearerTokenAuth(AuthBase): + def __init__(self, token: str) -> None: + self._token = token + + def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest: + r.headers["Authorization"] = f"Bearer {self._token}" + return r + + +class HTTPCookieAuth(AuthBase): + """ + Employ a cookie-jar file for authorization. + + Useful command: + + .. code-block:: shell + + curl --cookie-jar /path/to/cookie-jar.txt [authorization-provider-arguments] + + """ + def __init__(self, cookie_jar: str) -> None: + self._cookie_jar = cookie_jar + + def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest: + r.prepare_cookies(cookiejar.FileCookieJar(self._cookie_jar)) + return r + + +def add_request_options(parser: argparse.ArgumentParser) -> None: + """ + Adds arguments to a parser to allow update of a request session definition used across a populator procedure. + """ + parser.add_argument( + "--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false", + help="Disable SSL verification (not recommended unless for development/test servers)." + ) + parser.add_argument( + "--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use." + ) + parser.add_argument( + "--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False, + help="Authentication strategy to employ for the requests session." + ) + parser.add_argument( + "--auth-identity", required=False, + help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler." + ) + + +def apply_request_options(session: Session, namespace: argparse.Namespace) -> None: + """ + Applies the relevant request session options from parsed input arguments. + """ + session.verify = namespace.verify + session.cert = namespace.cert + if namespace.auth_handler in ["basic", "digest", "proxy"]: + usr, pwd = namespace.auth_identity.split(":", 1) + if namespace.auth_handler == "basic": + session.auth = HTTPBasicAuth(usr, pwd) + elif namespace.auth_handler == "digest": + session.auth = HTTPDigestAuth(usr, pwd) + else: + session.auth = HTTPProxyAuth(usr, pwd) + elif namespace.auth_handler == "bearer": + session.auth = HTTPBearerTokenAuth(namespace.auth_identity) + elif namespace.auth_handler == "cookie": + session.auth = HTTPCookieAuth(namespace.auth_identity) + + def make_main_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.") parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}", diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index c36f5e2..6dd30d6 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -1,11 +1,11 @@ import argparse import json -import logging from datetime import datetime from typing import Any, List, Literal, MutableMapping, NoReturn, Optional import pydantic_core import pyessv +from requests.sessions import Session from pydantic import AnyHttpUrl, ConfigDict, Field, FieldValidationInfo, field_validator from pystac.extensions.datacube import DatacubeExtension @@ -98,7 +98,13 @@ class CMIP6populator(STACpopulatorBase): item_properties_model = CMIP6ItemProperties item_geometry_model = GeoJSONPolygon - def __init__(self, stac_host: str, data_loader: GenericLoader, update: Optional[bool] = False) -> None: + def __init__( + self, + stac_host: str, + data_loader: GenericLoader, + update: Optional[bool] = False, + session: Optional[Session] = None, + ) -> None: """Constructor :param stac_host: URL to the STAC API @@ -106,7 +112,7 @@ def __init__(self, stac_host: str, data_loader: GenericLoader, update: Optional[ :param thredds_catalog_url: the URL to the THREDDS catalog to ingest :type thredds_catalog_url: str """ - super().__init__(stac_host, data_loader, update) + super().__init__(stac_host, data_loader, update=update, session=session) @staticmethod def make_cmip6_item_id(attrs: MutableMapping[str, Any]) -> str: @@ -184,7 +190,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: def main(*args: str) -> Optional[int]: parser = make_parser() - ns = parser.parse_args(args) + ns = parser.parse_args(args or None) return runner(ns) diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py index b6a15dc..d2517d8 100644 --- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py +++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py @@ -1,6 +1,10 @@ import argparse +import os.path from typing import NoReturn, Optional, MutableMapping, Any +from requests.sessions import Session + +from STACpopulator.cli import add_request_options, apply_request_options from STACpopulator.input import STACDirectoryLoader from STACpopulator.models import GeoJSONPolygon, STACItemProperties from STACpopulator.populator_base import STACpopulatorBase @@ -16,15 +20,18 @@ def __init__( stac_host: str, loader: STACDirectoryLoader, update: bool, - collection: MutableMapping[str, Any], + collection: dict[str, Any], + session: Optional[Session] = None, ) -> None: - self._collection_info = collection - super().__init__(stac_host, loader, update) + self._collection = collection + super().__init__(stac_host, loader, update=update, session=session) - def load_config(self): - pass # ignore + def load_config(self) -> MutableMapping[str, Any]: + self._collection_info = self._collection + return self._collection_info def create_stac_collection(self) -> MutableMapping[str, Any]: + self.publish_stac_collection(self._collection_info) return self._collection_info def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]: @@ -40,21 +47,25 @@ def make_parser() -> argparse.ArgumentParser: "--prune", action="store_true", help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure." ) + add_request_options(parser) return parser def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: LOGGER.info(f"Arguments to call: {vars(ns)}") - for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): - loader = STACDirectoryLoader(collection_path, "item", False) - populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json) - populator.ingest() + with Session() as session: + apply_request_options(session, ns) + for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune): + collection_dir = os.path.dirname(collection_path) + loader = STACDirectoryLoader(collection_dir, "item", False) + populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session) + populator.ingest() def main(*args: str) -> Optional[int]: parser = make_parser() - ns = parser.parse_args(args) + ns = parser.parse_args(args or None) return runner(ns) diff --git a/STACpopulator/input.py b/STACpopulator/input.py index 59861ee..62ebcfe 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -147,7 +147,7 @@ class STACDirectoryLoader(GenericLoader): .. code-block:: python for collection_path, collection_json in STACDirectoryLoader(dir_path, mode="collection"): - for item_path, item_json in STACDirectoryLoader(collection_path, mode="item"): + for item_path, item_json in STACDirectoryLoader(os.path.dirname(collection_path), mode="item"): ... # do stuff For convenience, option ``prune`` can be used to stop crawling deeper once a STAC Collection is found. @@ -166,20 +166,18 @@ def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool = def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]: for root, dirs, files in self.iter: - if self.prune and self._collection_mode and self._collection_name in files: - del dirs[:] + # since there can ever be only one 'collection' file name in a same directory + # directly retrieve it instead of looping through all other files + if self._collection_mode and self._collection_name in files: + if self.prune: # stop recursive search if requested + del dirs[:] + col_path = os.path.join(root, self._collection_name) + yield col_path, self._load_json(col_path) for name in files: - if self._collection_mode and self._is_collection(name): - col_path = os.path.join(root, name) - yield col_path, self._load_json(col_path) - elif not self._collection_mode and self._is_item(name): + if not self._collection_mode and self._is_item(name): item_path = os.path.join(root, name) yield item_path, self._load_json(item_path) - def _is_collection(self, path: Union[os.PathLike[str], str]) -> bool: - name = os.path.split(path)[-1] - return name == self._collection_name - def _is_item(self, path: Union[os.PathLike[str], str]) -> bool: name = os.path.split(path)[-1] return name != self._collection_name and os.path.splitext(name)[-1] in [".json", ".geojson"] diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 55db015..a2f1cb3 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -6,6 +6,7 @@ import pystac from colorlog import ColoredFormatter +from requests.sessions import Session from STACpopulator.api_requests import ( post_stac_collection, @@ -31,6 +32,7 @@ def __init__( stac_host: str, data_loader: GenericLoader, update: Optional[bool] = False, + session: Optional[Session] = None, ) -> None: """Constructor @@ -43,15 +45,15 @@ def __init__( super().__init__() self._collection_info = None + self._session = session self.load_config() self._ingest_pipeline = data_loader self._stac_host = self.validate_host(stac_host) self.update = update - self._collection_id = self.collection_name LOGGER.info("Initialization complete") - LOGGER.info(f"Collection {self.collection_name} is assigned id {self._collection_id}") + LOGGER.info(f"Collection {self.collection_name} is assigned ID {self.collection_id}") self.create_stac_collection() def load_config(self): @@ -90,7 +92,7 @@ def create_stac_item(self, item_name: str, item_data: dict[str, Any]) -> dict[st def validate_host(self, stac_host: str) -> str: if not url_validate(stac_host): raise ValueError("stac_host URL is not appropriately formatted") - if not stac_host_reachable(stac_host): + if not stac_host_reachable(stac_host, session=self._session): raise RuntimeError("stac_host is not reachable") return stac_host @@ -126,7 +128,7 @@ def create_stac_collection(self) -> dict[str, Any]: return collection_data def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: - post_stac_collection(self.stac_host, collection_data, self.update) + post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) def ingest(self) -> None: LOGGER.info("Data ingestion") @@ -134,4 +136,11 @@ def ingest(self) -> None: LOGGER.info(f"Creating STAC representation for {item_name}") stac_item = self.create_stac_item(item_name, item_data) if stac_item != -1: - post_stac_item(self.stac_host, self.collection_id, item_name, stac_item, self.update) + post_stac_item( + self.stac_host, + self.collection_id, + item_name, + stac_item, + update=self.update, + session=self._session, + )