diff --git a/.gitignore b/.gitignore index 9d5674a..c6c977b 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ reports STACpopulator.egg-info/ build *.pyc + +## Logs +*.jsonl +*.json \ No newline at end of file diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index 793fd03..05aeaa5 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -1,19 +1,11 @@ import logging import os -from typing import Any, Optional +from typing import Any, Optional, Union import requests from requests import Session -from colorlog import ColoredFormatter LOGGER = logging.getLogger(__name__) -LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" -formatter = ColoredFormatter(LOGFORMAT) -stream = logging.StreamHandler() -stream.setFormatter(formatter) -LOGGER.addHandler(stream) -LOGGER.setLevel(logging.INFO) -LOGGER.propagate = False def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool: @@ -109,6 +101,6 @@ def post_stac_item( r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) r.raise_for_status() else: - LOGGER.info(f"Item {item_id} already exists.") + LOGGER.warning(f"Item {item_id} already exists.") else: r.raise_for_status()
arguments to a parser to allow update of a request session definition used across a populator procedure. """ parser.add_argument( - "--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false", - help="Disable SSL verification (not recommended unless for development/test servers)." - ) - parser.add_argument( - "--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use." + "--no-verify", + "--no-ssl", + "--no-ssl-verify", + dest="verify", + action="store_false", + help="Disable SSL verification (not recommended unless for development/test servers).", ) + parser.add_argument("--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use.") parser.add_argument( - "--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False, - help="Authentication strategy to employ for the requests session." + "--auth-handler", + choices=["basic", "digest", "bearer", "proxy", "cookie"], + required=False, + help="Authentication strategy to employ for the requests session.", ) parser.add_argument( - "--auth-identity", required=False, - help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler." 
+ "--auth-identity", + required=False, + help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler.", ) @@ -93,19 +101,29 @@ def apply_request_options(session: Session, namespace: argparse.Namespace) -> No def make_main_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.") - parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}", - help="prints the version of the library and exits") + parser.add_argument( + "--version", + "-V", + action="version", + version=f"%(prog)s {__version__}", + help="prints the version of the library and exits", + ) commands = parser.add_subparsers(title="command", dest="command", description="STAC populator command to execute.") run_cmd_parser = make_run_command_parser(parser.prog) commands.add_parser( "run", - prog=f"{parser.prog} {run_cmd_parser.prog}", parents=[run_cmd_parser], - formatter_class=run_cmd_parser.formatter_class, usage=run_cmd_parser.usage, - add_help=False, help=run_cmd_parser.description, description=run_cmd_parser.description + prog=f"{parser.prog} {run_cmd_parser.prog}", + parents=[run_cmd_parser], + formatter_class=run_cmd_parser.formatter_class, + usage=run_cmd_parser.usage, + add_help=False, + help=run_cmd_parser.description, + description=run_cmd_parser.description, ) # add more commands as needed... 
+ parser.add_argument("--debug", action="store_true", help="Set logger level to debug") return parser @@ -142,9 +160,12 @@ def make_run_command_parser(parent) -> argparse.ArgumentParser: populator_prog = f"{parent} {parser.prog} {populator_name}" subparsers.add_parser( populator_name, - prog=populator_prog, parents=[populator_parser], formatter_class=populator_parser.formatter_class, + prog=populator_prog, + parents=[populator_parser], + formatter_class=populator_parser.formatter_class, add_help=False, # add help disabled otherwise conflicts with this main populator help - help=populator_parser.description, description=populator_parser.description, + help=populator_parser.description, + description=populator_parser.description, usage=populator_parser.usage, ) POPULATORS[populator_name] = { @@ -168,6 +189,12 @@ def main(*args: str) -> Optional[int]: result = None if populator_cmd == "run": populator_name = params.pop("populator") + + # Setup the application logger: + fname = f"{populator_name}_log_{datetime.utcnow().isoformat() + 'Z'}.jsonl" + log_level = logging.DEBUG if ns.debug else logging.INFO + setup_logging(fname, log_level) + if not populator_name: parser.print_help() return 0 diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index 2a3e3cc..110b269 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -1,21 +1,21 @@ import argparse import json +import logging import os from typing import Any, MutableMapping, NoReturn, Optional, Union -from requests.sessions import Session from pystac.extensions.datacube import DatacubeExtension +from requests.sessions import Session from STACpopulator.cli import add_request_options, apply_request_options -from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper +from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties from 
STACpopulator.extensions.datacube import DataCubeHelper -from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension -from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader +from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper +from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase -from STACpopulator.stac_utils import get_logger -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class CMIP6populator(STACpopulatorBase): @@ -29,6 +29,7 @@ def __init__( update: Optional[bool] = False, session: Optional[Session] = None, config_file: Optional[Union[os.PathLike[str], str]] = None, + log_debug: Optional[bool] = False, ) -> None: """Constructor @@ -37,14 +38,12 @@ def __init__( :param data_loader: loader to iterate over ingestion data. """ super().__init__( - stac_host, - data_loader, - update=update, - session=session, - config_file=config_file, + stac_host, data_loader, update=update, session=session, config_file=config_file, log_debug=log_debug ) - def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def create_stac_item( + self, item_name: str, item_data: MutableMapping[str, Any] + ) -> Union[None, MutableMapping[str, Any]]: """Creates the STAC item. :param item_name: name of the STAC item. 
Interpretation of name is left to the input loader implementation @@ -58,26 +57,23 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) try: cmip_helper = CMIP6Helper(item_data, self.item_geometry_model) item = cmip_helper.stac_item() - except Exception: - LOGGER.error("Failed to add CMIP6 extension to item %s", item_name) - raise + except Exception as e: + raise Exception("Failed to add CMIP6 extension") from e # Add datacube extension try: dc_helper = DataCubeHelper(item_data) dc_ext = DatacubeExtension.ext(item, add_if_missing=True) dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables) - except Exception: - LOGGER.error("Failed to add Datacube extension to item %s", item_name) - raise + except Exception as e: + raise Exception("Failed to add Datacube extension") from e try: thredds_helper = THREDDSHelper(item_data["access_urls"]) thredds_ext = THREDDSExtension.ext(item) thredds_ext.apply(thredds_helper.services, thredds_helper.links) - except Exception: - LOGGER.error("Failed to add THREDDS references to item %s", item_name) - raise + except Exception as e: + raise Exception("Failed to add THREDDS extension") from e # print(json.dumps(item.to_dict())) return json.loads(json.dumps(item.to_dict())) @@ -88,13 +84,19 @@ def make_parser() -> argparse.ArgumentParser: parser.add_argument("stac_host", type=str, help="STAC API address") parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.") parser.add_argument("--update", action="store_true", help="Update collection and its items") - parser.add_argument("--mode", choices=["full", "single"], default="full", - help="Operation mode, processing the full dataset or only the single reference.") parser.add_argument( - "--config", type=str, help=( + "--mode", + choices=["full", "single"], + default="full", + help="Operation mode, processing the full dataset or only the single reference.", + ) + parser.add_argument( + 
"--config", + type=str, + help=( "Override configuration file for the populator. " "By default, uses the adjacent configuration to the implementation class." - ) + ), ) add_request_options(parser) return parser @@ -111,7 +113,9 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: # To be implemented data_loader = ErrorLoader() - c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config) + c = CMIP6populator( + ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug + ) c.ingest() diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py index 5336d08..bae4c61 100644 --- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py +++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py @@ -1,4 +1,5 @@ import argparse +import logging import os.path from typing import Any, MutableMapping, NoReturn, Optional @@ -8,9 +9,8 @@ from STACpopulator.input import STACDirectoryLoader from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase -from STACpopulator.stac_utils import get_logger -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class DirectoryPopulator(STACpopulatorBase): diff --git a/STACpopulator/input.py b/STACpopulator/input.py index a9525e8..daccc33 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -8,20 +8,12 @@ import requests import siphon import xncml -from colorlog import ColoredFormatter from requests.sessions import Session from siphon.catalog import TDSCatalog, session_manager from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate LOGGER = logging.getLogger(__name__) -LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" -formatter = ColoredFormatter(LOGFORMAT) -stream = logging.StreamHandler() 
-stream.setFormatter(formatter) -LOGGER.addHandler(stream) -LOGGER.setLevel(logging.INFO) -LOGGER.propagate = False class GenericLoader(ABC): @@ -149,7 +141,9 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: if self.catalog_head.datasets.items(): for item_name, ds in self.catalog_head.datasets.items(): attrs = self.extract_metadata(ds) - yield item_name, ds.url_path, attrs + filename = ds.url_path[ds.url_path.rfind("/") :] + url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename + yield item_name, url, attrs for name, ref in self.catalog_head.catalog_refs.items(): self.catalog_head = ref.follow() diff --git a/STACpopulator/logging.py b/STACpopulator/logging.py new file mode 100644 index 0000000..0d7caad --- /dev/null +++ b/STACpopulator/logging.py @@ -0,0 +1,129 @@ +import datetime as dt +import json +import logging +import logging.config + +LOG_RECORD_BUILTIN_ATTRS = { + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + "taskName", +} + + +def setup_logging(logfname: str, log_level: int) -> None: + """Setup the logger for the app. + + :param logfname: name of the file to which to write log outputs + :type logfname: str + :param log_level: base logging level (e.g. 
"INFO") + :type log_level: str + """ + config = logconfig + config["handlers"]["file"]["filename"] = logfname + for handler in config["handlers"]: + config["handlers"][handler]["level"] = logging.getLevelName(log_level) + logging.config.dictConfig(config) + + +class JSONLogFormatter(logging.Formatter): + # From: https://github.com/mCodingLLC/VideosSampleCode/tree/master/videos/135_modern_logging + def __init__( + self, + *, + fmt_keys: dict[str, str] | None = None, + ): + super().__init__() + self.fmt_keys = fmt_keys if fmt_keys is not None else {} + + def format(self, record: logging.LogRecord) -> str: + message = self._prepare_log_dict(record) + return json.dumps(message, default=str) + + def _prepare_log_dict(self, record: logging.LogRecord): + always_fields = { + "message": record.getMessage(), + "timestamp": dt.datetime.fromtimestamp(record.created, tz=dt.timezone.utc).isoformat(), + } + if record.exc_info is not None: + always_fields["exc_info"] = self.formatException(record.exc_info) + + if record.stack_info is not None: + always_fields["stack_info"] = self.formatStack(record.stack_info) + + message = { + key: msg_val if (msg_val := always_fields.pop(val, None)) is not None else getattr(record, val) + for key, val in self.fmt_keys.items() + } + message.update(always_fields) + + for key, val in record.__dict__.items(): + if key not in LOG_RECORD_BUILTIN_ATTRS: + message[key] = val + + return message + + +class NonErrorFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord: + return record.levelno <= logging.INFO + + +logconfig = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "simple": { + "()": "colorlog.ColoredFormatter", + "format": " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s", + "datefmt": "%Y-%m-%dT%H:%M:%S%z", + }, + "json": { + "()": f"{JSONLogFormatter.__module__}.{JSONLogFormatter.__name__}", + "fmt_keys": { + "level": "levelname", + "message": 
"message", + "timestamp": "timestamp", + "logger": "name", + "module": "module", + "function": "funcName", + "line": "lineno", + "thread_name": "threadName", + }, + }, + }, + "handlers": { + "stderr": { + "class": "logging.StreamHandler", + "level": "INFO", + "formatter": "simple", + "stream": "ext://sys.stderr", + }, + "file": { + "class": "logging.FileHandler", + "level": "INFO", + "formatter": "json", + "filename": "__added_dynamically__", + }, + }, + "loggers": {"root": {"level": "DEBUG", "handlers": ["stderr", "file"]}}, +} diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index e3da33e..9e5fa53 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -1,5 +1,7 @@ import functools import inspect +import json +import logging import os from abc import ABC, abstractmethod from datetime import datetime @@ -15,9 +17,9 @@ ) from STACpopulator.input import GenericLoader from STACpopulator.models import AnyGeometry -from STACpopulator.stac_utils import get_logger, load_config, url_validate +from STACpopulator.stac_utils import load_config, url_validate -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class STACpopulatorBase(ABC): @@ -28,6 +30,7 @@ def __init__( update: Optional[bool] = False, session: Optional[Session] = None, config_file: Optional[Union[os.PathLike[str], str]] = "collection_config.yml", + log_debug: Optional[bool] = False, ) -> None: """Constructor @@ -144,21 +147,44 @@ def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: def ingest(self) -> None: counter = 0 + failures = 0 LOGGER.info("Data ingestion") for item_name, item_loc, item_data in self._ingest_pipeline: - LOGGER.info(f"New data item: {item_name}") - LOGGER.info(f"Data location: {item_loc}") - stac_item = self.create_stac_item(item_name, item_data) - if stac_item: - post_stac_item( - self.stac_host, - self.collection_id, - item_name, - stac_item, - update=self.update, - session=self._session, 
+ LOGGER.info(f"New data item: {item_name}", extra={"item_loc": item_loc}) + try: + stac_item = self.create_stac_item(item_name, item_data) + except Exception: + LOGGER.exception( + f"Failed to create STAC item for {item_name}", + extra={"item_loc": item_loc, "loader": type(self._ingest_pipeline)}, ) - counter += 1 - LOGGER.info(f"Processed {counter} data items") - else: - LOGGER.error("Failed to create STAC representation") + failures += 1 + stac_item = None + + if stac_item: + try: + post_stac_item( + self.stac_host, + self.collection_id, + item_name, + stac_item, + update=self.update, + session=self._session, + ) + except Exception: + # Something went wrong on the server side, most likely because the STAC item generated above has + # incorrect data. Writing the STAC item to file so that the issue could be diagnosed and fixed. + stac_output_fname = "error_STAC_rep_" + item_name.split(".")[0] + ".json" + json.dump(stac_item, open(stac_output_fname, "w"), indent=2) + LOGGER.exception( + f"Failed to post STAC item for {item_name}", + extra={ + "item_loc": item_loc, + "loader": type(self._ingest_pipeline), + "stac_output_fname": stac_output_fname, + }, + ) + failures += 1 + + counter += 1 + LOGGER.info(f"Processed {counter} data items. 
{failures} failures") diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py index 5c3aa5d..515507a 100644 --- a/STACpopulator/stac_utils.py +++ b/STACpopulator/stac_utils.py @@ -7,24 +7,8 @@ import numpy as np import pystac import yaml -from colorlog import ColoredFormatter - -def get_logger( - name: str, - log_fmt: str = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s", -) -> logging.Logger: - logger = logging.getLogger(name) - formatter = ColoredFormatter(log_fmt) - stream = logging.StreamHandler() - stream.setFormatter(formatter) - logger.addHandler(stream) - logger.setLevel(logging.INFO) - logger.propagate = False - return logger - - -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) def url_validate(target: str) -> bool: