add requests session capabilities, CLI args parsing for session auth + working STAC Collection/Items dir iter loading
fmigneault committed Nov 14, 2023
1 parent fa78f85 commit 1305b48
Showing 6 changed files with 159 additions and 47 deletions.
48 changes: 31 additions & 17 deletions STACpopulator/api_requests.py
@@ -3,6 +3,7 @@
from typing import Any, Optional

import requests
from requests import Session
from colorlog import ColoredFormatter

LOGGER = logging.getLogger(__name__)
@@ -15,27 +16,36 @@
LOGGER.propagate = False


def stac_host_reachable(url: str) -> bool:
def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool:
try:
registry = requests.get(url)
registry.raise_for_status()
return True
except (requests.exceptions.RequestException, requests.exceptions.ConnectionError):
return False


def stac_collection_exists(stac_host: str, collection_id: str) -> bool:
session = session or requests
response = session.get(url, headers={"Accept": "application/json"})
response.raise_for_status()
body = response.json()
if body["type"] == "Catalog" and "stac_version" in body:
return True
except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as exc:
LOGGER.error("Could not validate STAC host. Not reachable [%s] due to [%s]", url, exc, exc_info=exc)
return False


def stac_collection_exists(stac_host: str, collection_id: str, session: Optional[Session] = None) -> bool:
"""
Get a STAC collection
Returns the collection JSON.
"""
r = requests.get(os.path.join(stac_host, "collections", collection_id), verify=False)

session = session or requests
r = session.get(os.path.join(stac_host, "collections", collection_id), verify=False)
return r.status_code == 200


def post_stac_collection(stac_host: str, json_data: dict[str, Any], update: Optional[bool] = True) -> None:
def post_stac_collection(
stac_host: str,
json_data: dict[str, Any],
update: Optional[bool] = True,
session: Optional[Session] = None,
) -> None:
"""Post/create a collection on the STAC host
:param stac_host: address of the STAC host
@@ -44,16 +54,18 @@ def post_stac_collection(stac_host: str, json_data: dict[str, Any], update: Opti
:type json_data: dict[str, Any]
:param update: if True, update the collection on the host server if it is already present, defaults to True
:type update: Optional[bool], optional
:param session: Session with additional configuration to perform requests.
"""
session = session or requests
collection_id = json_data["id"]
r = requests.post(os.path.join(stac_host, "collections"), json=json_data, verify=False)
r = session.post(os.path.join(stac_host, "collections"), json=json_data)

if r.status_code == 200:
LOGGER.info(f"Collection {collection_id} successfully created")
elif r.status_code == 409:
if update:
LOGGER.info(f"Collection {collection_id} already exists. Updating.")
r = requests.put(os.path.join(stac_host, "collections"), json=json_data, verify=False)
r = session.put(os.path.join(stac_host, "collections"), json=json_data)
r.raise_for_status()
else:
LOGGER.info(f"Collection {collection_id} already exists.")
@@ -67,6 +79,7 @@ def post_stac_item(
item_name: str,
json_data: dict[str, dict],
update: Optional[bool] = True,
session: Optional[Session] = None,
) -> None:
"""Post a STAC item to the host server.
@@ -80,17 +93,18 @@ def post_stac_item(
:type json_data: dict[str, dict]
:param update: if True, update the item on the host server if it is already present, defaults to True
:type update: Optional[bool], optional
:param session: Session with additional configuration to perform requests.
"""
item_id = json_data["id"]

r = requests.post(os.path.join(stac_host, f"collections/{collection_id}/items"), json=json_data)
session = session or requests
r = session.post(os.path.join(stac_host, f"collections/{collection_id}/items"), json=json_data)

if r.status_code == 200:
LOGGER.info(f"Item {item_name} successfully added")
elif r.status_code == 409:
if update:
LOGGER.info(f"Item {item_id} already exists. Updating.")
r = requests.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data)
r.raise_for_status()
else:
LOGGER.info(f"Item {item_id} already exists.")
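A minimal usage sketch for the updated helpers, with a placeholder STAC host URL and collection payload; the new session argument lets one configured requests.Session (SSL verification, authentication, cookies) be reused across every call:

from requests.sessions import Session
from STACpopulator.api_requests import post_stac_collection, stac_host_reachable

stac_host = "https://stac.example.com/api"  # placeholder host
collection = {"id": "demo", "type": "Collection", "stac_version": "1.0.0"}  # placeholder payload

with Session() as session:
    session.verify = False  # e.g. a development server with a self-signed certificate
    if stac_host_reachable(stac_host, session=session):
        post_stac_collection(stac_host, collection, update=True, session=session)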
74 changes: 74 additions & 0 deletions STACpopulator/cli.py
@@ -5,11 +5,85 @@
import sys
from typing import Callable, Optional

import requests
from http import cookiejar
from requests.auth import AuthBase, HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth
from requests.sessions import Session

from STACpopulator import __version__

POPULATORS = {}


class HTTPBearerTokenAuth(AuthBase):
def __init__(self, token: str) -> None:
self._token = token

def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
r.headers["Authorization"] = f"Bearer {self._token}"
return r


class HTTPCookieAuth(AuthBase):
"""
Employ a cookie-jar file for authorization.
Useful command:
.. code-block:: shell
curl --cookie-jar /path/to/cookie-jar.txt [authorization-provider-arguments]
"""
def __init__(self, cookie_jar: str) -> None:
self._cookie_jar = cookie_jar

def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
r.prepare_cookies(cookiejar.FileCookieJar(self._cookie_jar))
return r


def add_request_options(parser: argparse.ArgumentParser) -> None:
"""
Adds arguments to a parser to allow update of a request session definition used across a populator procedure.
"""
parser.add_argument(
"--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false",
help="Disable SSL verification (not recommended unless for development/test servers)."
)
parser.add_argument(
"--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use."
)
parser.add_argument(
"--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False,
help="Authentication strategy to employ for the requests session."
)
parser.add_argument(
"--auth-identity", required=False,
help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler."
)


def apply_request_options(session: Session, namespace: argparse.Namespace) -> None:
"""
Applies the relevant request session options from parsed input arguments.
"""
session.verify = namespace.verify
session.cert = namespace.cert
if namespace.auth_handler in ["basic", "digest", "proxy"]:
usr, pwd = namespace.auth_identity.split(":", 1)
if namespace.auth_handler == "basic":
session.auth = HTTPBasicAuth(usr, pwd)
elif namespace.auth_handler == "digest":
session.auth = HTTPDigestAuth(usr, pwd)
else:
session.auth = HTTPProxyAuth(usr, pwd)
elif namespace.auth_handler == "bearer":
session.auth = HTTPBearerTokenAuth(namespace.auth_identity)
elif namespace.auth_handler == "cookie":
session.auth = HTTPCookieAuth(namespace.auth_identity)


def make_main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.")
parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}",
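A short sketch of how these helpers fit together, mirroring what the DirectoryLoader runner further down does: add_request_options extends a populator's argument parser, and apply_request_options transfers the parsed values onto a requests.Session (the credentials are placeholders):

import argparse
from requests.sessions import Session
from STACpopulator.cli import add_request_options, apply_request_options

parser = argparse.ArgumentParser()
add_request_options(parser)
# equivalent to invoking a populator with:
#   --no-verify --auth-handler basic --auth-identity "alice:s3cret"
ns = parser.parse_args(["--no-verify", "--auth-handler", "basic", "--auth-identity", "alice:s3cret"])

session = Session()
apply_request_options(session, ns)
# session.verify is now False and session.auth is HTTPBasicAuth("alice", "s3cret"),
# so every request issued through this session carries the matching Authorization header.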
14 changes: 10 additions & 4 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
@@ -1,11 +1,11 @@
import argparse
import json
import logging
from datetime import datetime
from typing import Any, List, Literal, MutableMapping, NoReturn, Optional

import pydantic_core
import pyessv
from requests.sessions import Session
from pydantic import AnyHttpUrl, ConfigDict, Field, FieldValidationInfo, field_validator
from pystac.extensions.datacube import DatacubeExtension

@@ -98,15 +98,21 @@ class CMIP6populator(STACpopulatorBase):
item_properties_model = CMIP6ItemProperties
item_geometry_model = GeoJSONPolygon

def __init__(self, stac_host: str, data_loader: GenericLoader, update: Optional[bool] = False) -> None:
def __init__(
self,
stac_host: str,
data_loader: GenericLoader,
update: Optional[bool] = False,
session: Optional[Session] = None,
) -> None:
"""Constructor
:param stac_host: URL to the STAC API
:type stac_host: str
:param thredds_catalog_url: the URL to the THREDDS catalog to ingest
:type thredds_catalog_url: str
"""
super().__init__(stac_host, data_loader, update)
super().__init__(stac_host, data_loader, update=update, session=session)

@staticmethod
def make_cmip6_item_id(attrs: MutableMapping[str, Any]) -> str:
@@ -184,7 +190,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:

def main(*args: str) -> Optional[int]:
parser = make_parser()
ns = parser.parse_args(args)
ns = parser.parse_args(args or None)
return runner(ns)


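One detail behind the parse_args(args or None) change: an empty args tuple is falsy, so calling main() without explicit arguments now falls back to sys.argv[1:] (argparse's behaviour when given None) rather than parsing an empty argument list. A small sketch with a hypothetical required argument:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("stac_host")        # hypothetical required positional argument

args = ()                               # what main() receives when called with no explicit arguments
# parser.parse_args(args)               # an empty sequence means "no arguments were given" and would fail here
ns = parser.parse_args(args or None)    # None makes argparse read sys.argv[1:] instead
print(ns.stac_host)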
31 changes: 21 additions & 10 deletions STACpopulator/implementations/DirectoryLoader/crawl_directory.py
@@ -1,6 +1,10 @@
import argparse
import os.path
from typing import NoReturn, Optional, MutableMapping, Any

from requests.sessions import Session

from STACpopulator.cli import add_request_options, apply_request_options
from STACpopulator.input import STACDirectoryLoader
from STACpopulator.models import GeoJSONPolygon, STACItemProperties
from STACpopulator.populator_base import STACpopulatorBase
@@ -16,15 +20,18 @@ def __init__(
stac_host: str,
loader: STACDirectoryLoader,
update: bool,
collection: MutableMapping[str, Any],
collection: dict[str, Any],
session: Optional[Session] = None,
) -> None:
self._collection_info = collection
super().__init__(stac_host, loader, update)
self._collection = collection
super().__init__(stac_host, loader, update=update, session=session)

def load_config(self):
pass # ignore
def load_config(self) -> MutableMapping[str, Any]:
self._collection_info = self._collection
return self._collection_info

def create_stac_collection(self) -> MutableMapping[str, Any]:
self.publish_stac_collection(self._collection_info)
return self._collection_info

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
@@ -40,21 +47,25 @@ def make_parser() -> argparse.ArgumentParser:
"--prune", action="store_true",
help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure."
)
add_request_options(parser)
return parser


def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:
LOGGER.info(f"Arguments to call: {vars(ns)}")

for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
loader = STACDirectoryLoader(collection_path, "item", False)
populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json)
populator.ingest()
with Session() as session:
apply_request_options(session, ns)
for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
collection_dir = os.path.dirname(collection_path)
loader = STACDirectoryLoader(collection_dir, "item", False)
populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session)
populator.ingest()


def main(*args: str) -> Optional[int]:
parser = make_parser()
ns = parser.parse_args(args)
ns = parser.parse_args(args or None)
return runner(ns)


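The added os.path.dirname call matters because the collection-mode loader yields the path of the collection file itself, while the nested item-mode loader walks a directory; a small sketch with a hypothetical path:

import os.path

from STACpopulator.input import STACDirectoryLoader

collection_path = "/data/stac/some-collection/collection.json"  # hypothetical path yielded in "collection" mode
collection_dir = os.path.dirname(collection_path)               # "/data/stac/some-collection"
loader = STACDirectoryLoader(collection_dir, "item", False)     # item mode expects the directory, not the file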
20 changes: 9 additions & 11 deletions STACpopulator/input.py
@@ -147,7 +147,7 @@ class STACDirectoryLoader(GenericLoader):
.. code-block:: python
for collection_path, collection_json in STACDirectoryLoader(dir_path, mode="collection"):
for item_path, item_json in STACDirectoryLoader(collection_path, mode="item"):
for item_path, item_json in STACDirectoryLoader(os.path.dirname(collection_path), mode="item"):
... # do stuff
For convenience, option ``prune`` can be used to stop crawling deeper once a STAC Collection is found.
@@ -166,20 +166,18 @@ def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool =

def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
for root, dirs, files in self.iter:
if self.prune and self._collection_mode and self._collection_name in files:
del dirs[:]
# since there can ever be only one 'collection' file name in a same directory
# directly retrieve it instead of looping through all other files
if self._collection_mode and self._collection_name in files:
if self.prune: # stop recursive search if requested
del dirs[:]
col_path = os.path.join(root, self._collection_name)
yield col_path, self._load_json(col_path)
for name in files:
if self._collection_mode and self._is_collection(name):
col_path = os.path.join(root, name)
yield col_path, self._load_json(col_path)
elif not self._collection_mode and self._is_item(name):
if not self._collection_mode and self._is_item(name):
item_path = os.path.join(root, name)
yield item_path, self._load_json(item_path)

def _is_collection(self, path: Union[os.PathLike[str], str]) -> bool:
name = os.path.split(path)[-1]
return name == self._collection_name

def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
name = os.path.split(path)[-1]
return name != self._collection_name and os.path.splitext(name)[-1] in [".json", ".geojson"]