From 1588ba8684ba035ecb06215dc07d241888698089 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Mon, 22 Jan 2024 11:16:13 -0500 Subject: [PATCH 01/11] error handling save to log --- STACpopulator/api_requests.py | 16 ++++++---- .../implementations/CMIP6_UofT/add_CMIP6.py | 32 ++++++++++++------- STACpopulator/populator_base.py | 30 +++++++++++++---- 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index 793fd03..4d790c6 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -1,10 +1,10 @@ import logging import os -from typing import Any, Optional +from typing import Any, Optional, Union import requests -from requests import Session from colorlog import ColoredFormatter +from requests import Session LOGGER = logging.getLogger(__name__) LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" @@ -81,7 +81,7 @@ def post_stac_item( json_data: dict[str, dict], update: Optional[bool] = True, session: Optional[Session] = None, -) -> None: +) -> Union[None, str]: """Post a STAC item to the host server. :param stac_host: address of the STAC host @@ -107,8 +107,12 @@ def post_stac_item( if update: LOGGER.info(f"Item {item_id} already exists. Updating.") r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) - r.raise_for_status() + return f"Requests: {r.reason}" + # r.raise_for_status() else: - LOGGER.info(f"Item {item_id} already exists.") + LOGGER.warn(f"Item {item_id} already exists.") else: - r.raise_for_status() + return f"Requests: {r.reason}" + # r.raise_for_status() + + return None diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index 2a3e3cc..4832011 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -3,14 +3,14 @@ import os from typing import Any, MutableMapping, NoReturn, Optional, Union -from requests.sessions import Session from pystac.extensions.datacube import DatacubeExtension +from requests.sessions import Session from STACpopulator.cli import add_request_options, apply_request_options -from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper +from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties from STACpopulator.extensions.datacube import DataCubeHelper -from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension -from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader +from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper +from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase from STACpopulator.stac_utils import get_logger @@ -44,7 +44,9 @@ def __init__( config_file=config_file, ) - def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def create_stac_item( + self, item_name: str, item_data: MutableMapping[str, Any] + ) -> Union[(int, str), MutableMapping[str, Any]]: """Creates the STAC item. :param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation @@ -60,7 +62,7 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) item = cmip_helper.stac_item() except Exception: LOGGER.error("Failed to add CMIP6 extension to item %s", item_name) - raise + return (-1, "Failed to add CMIP6 extension") # Add datacube extension try: @@ -69,7 +71,7 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables) except Exception: LOGGER.error("Failed to add Datacube extension to item %s", item_name) - raise + return (-1, "Failed to add Datacube extension") try: thredds_helper = THREDDSHelper(item_data["access_urls"]) @@ -77,7 +79,7 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) thredds_ext.apply(thredds_helper.services, thredds_helper.links) except Exception: LOGGER.error("Failed to add THREDDS references to item %s", item_name) - raise + return (-1, "Failed to add THREDDS references") # print(json.dumps(item.to_dict())) return json.loads(json.dumps(item.to_dict())) @@ -88,13 +90,19 @@ def make_parser() -> argparse.ArgumentParser: parser.add_argument("stac_host", type=str, help="STAC API address") parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.") parser.add_argument("--update", action="store_true", help="Update collection and its items") - parser.add_argument("--mode", choices=["full", "single"], default="full", - help="Operation mode, processing the full dataset or only the single reference.") parser.add_argument( - "--config", type=str, help=( + "--mode", + choices=["full", "single"], + default="full", + help="Operation mode, processing the full dataset or only the single reference.", + ) + parser.add_argument( + "--config", + type=str, + help=( "Override configuration file for the populator. " "By default, uses the adjacent configuration to the implementation class." - ) + ), ) add_request_options(parser) return parser diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index e3da33e..9c3f8ce 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -3,7 +3,7 @@ import os from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, MutableMapping, Optional, Type, Union +from typing import Any, MutableMapping, Optional, TextIO, Type, Union import pystac from requests.sessions import Session @@ -142,15 +142,26 @@ def create_stac_collection(self) -> dict[str, Any]: def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) + def open_error_file(self) -> TextIO: + implementation_name = type(self).__name__ + fname = f"stac-populator_{implementation_name}_errors_{datetime.strftime(datetime.now(), '%Y%m%d-%H%M%S')}.txt" + return open(fname, "w", buffering=1) + def ingest(self) -> None: + error_file = self.open_error_file() counter = 0 + failures = 0 LOGGER.info("Data ingestion") for item_name, item_loc, item_data in self._ingest_pipeline: LOGGER.info(f"New data item: {item_name}") LOGGER.info(f"Data location: {item_loc}") stac_item = self.create_stac_item(item_name, item_data) - if stac_item: - post_stac_item( + if isinstance(stac_item, tuple): + LOGGER.error("Failed to create STAC representation") + error_file.write(f"{stac_item[1]}: {item_loc}\n") + failures += 1 + else: + errc = post_stac_item( self.stac_host, self.collection_id, item_name, @@ -158,7 +169,12 @@ def ingest(self) -> None: update=self.update, session=self._session, ) - counter += 1 - LOGGER.info(f"Processed {counter} data items") - else: - LOGGER.error("Failed to create STAC representation") + if errc: + LOGGER.error(errc) + error_file.write(f"{errc}: {item_loc}\n") + failures += 1 + + counter += 1 + LOGGER.info(f"Processed {counter} data items. {failures} failures") + + error_file.close() From 62381a3d567697031be082447626a5a7cfe8ffe2 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Fri, 16 Feb 2024 19:50:06 -0500 Subject: [PATCH 02/11] fixing type issue --- STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index 4832011..f506507 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -46,7 +46,7 @@ def __init__( def create_stac_item( self, item_name: str, item_data: MutableMapping[str, Any] - ) -> Union[(int, str), MutableMapping[str, Any]]: + ) -> Union[tuple[int, str], MutableMapping[str, Any]]: """Creates the STAC item. :param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation From 3e4ae78871c3216211dc132dd4522e4afaeb28b3 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Sat, 17 Feb 2024 11:09:50 -0500 Subject: [PATCH 03/11] updating app logging --- STACpopulator/api_requests.py | 8 -- .../implementations/CMIP6_UofT/add_CMIP6.py | 4 +- .../DirectoryLoader/crawl_directory.py | 4 +- STACpopulator/input.py | 8 -- STACpopulator/logging.py | 113 ++++++++++++++++++ STACpopulator/populator_base.py | 23 ++-- STACpopulator/stac_utils.py | 26 ++-- 7 files changed, 142 insertions(+), 44 deletions(-) create mode 100644 STACpopulator/logging.py diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index 4d790c6..c66f0e9 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -3,17 +3,9 @@ from typing import Any, Optional, Union import requests -from colorlog import ColoredFormatter from requests import Session LOGGER = logging.getLogger(__name__) -LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" -formatter = ColoredFormatter(LOGFORMAT) -stream = logging.StreamHandler() -stream.setFormatter(formatter) -LOGGER.addHandler(stream) -LOGGER.setLevel(logging.INFO) -LOGGER.propagate = False def stac_host_reachable(url: str, session: Optional[Session] = None) -> bool: diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index f506507..e7ec981 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -1,5 +1,6 @@ import argparse import json +import logging import os from typing import Any, MutableMapping, NoReturn, Optional, Union @@ -13,9 +14,8 @@ from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase -from STACpopulator.stac_utils import get_logger -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class CMIP6populator(STACpopulatorBase): diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py index 5336d08..bae4c61 100644 --- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py +++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py @@ -1,4 +1,5 @@ import argparse +import logging import os.path from typing import Any, MutableMapping, NoReturn, Optional @@ -8,9 +9,8 @@ from STACpopulator.input import STACDirectoryLoader from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase -from STACpopulator.stac_utils import get_logger -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class DirectoryPopulator(STACpopulatorBase): diff --git a/STACpopulator/input.py b/STACpopulator/input.py index a9525e8..5d4cab7 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -8,20 +8,12 @@ import requests import siphon import xncml -from colorlog import ColoredFormatter from requests.sessions import Session from siphon.catalog import TDSCatalog, session_manager from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate LOGGER = logging.getLogger(__name__) -LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s" -formatter = ColoredFormatter(LOGFORMAT) -stream = logging.StreamHandler() -stream.setFormatter(formatter) -LOGGER.addHandler(stream) -LOGGER.setLevel(logging.INFO) -LOGGER.propagate = False class GenericLoader(ABC): diff --git a/STACpopulator/logging.py b/STACpopulator/logging.py new file mode 100644 index 0000000..6dfad80 --- /dev/null +++ b/STACpopulator/logging.py @@ -0,0 +1,113 @@ +import datetime as dt +import json +import logging + +LOG_RECORD_BUILTIN_ATTRS = { + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "message", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + "taskName", +} + + +class JSONLogFormatter(logging.Formatter): + # From: https://github.com/mCodingLLC/VideosSampleCode/tree/master/videos/135_modern_logging + def __init__( + self, + *, + fmt_keys: dict[str, str] | None = None, + ): + super().__init__() + self.fmt_keys = fmt_keys if fmt_keys is not None else {} + + def format(self, record: logging.LogRecord) -> str: + message = self._prepare_log_dict(record) + return json.dumps(message, default=str) + + def _prepare_log_dict(self, record: logging.LogRecord): + always_fields = { + "message": record.getMessage(), + "timestamp": dt.datetime.fromtimestamp(record.created, tz=dt.timezone.utc).isoformat(), + } + if record.exc_info is not None: + always_fields["exc_info"] = self.formatException(record.exc_info) + + if record.stack_info is not None: + always_fields["stack_info"] = self.formatStack(record.stack_info) + + message = { + key: msg_val if (msg_val := always_fields.pop(val, None)) is not None else getattr(record, val) + for key, val in self.fmt_keys.items() + } + message.update(always_fields) + + for key, val in record.__dict__.items(): + if key not in LOG_RECORD_BUILTIN_ATTRS: + message[key] = val + + return message + + +class NonErrorFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord: + return record.levelno <= logging.INFO + + +logconfig = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "simple": { + "()": "colorlog.ColoredFormatter", + "format": " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s", + "datefmt": "%Y-%m-%dT%H:%M:%S%z", + }, + "json": { + "()": "STACpopulator.logging.JSONLogFormatter", + "fmt_keys": { + "level": "levelname", + "message": "message", + "timestamp": "timestamp", + "logger": "name", + "module": "module", + "function": "funcName", + "line": "lineno", + "thread_name": "threadName", + }, + }, + }, + "handlers": { + "stderr": { + "class": "logging.StreamHandler", + "level": "INFO", + "formatter": "simple", + "stream": "ext://sys.stderr", + }, + "file": { + "class": "logging.FileHandler", + "level": "INFO", + "formatter": "json", + "filename": "__added_dynamically__", + }, + }, + "loggers": {"root": {"level": "DEBUG", "handlers": ["stderr", "file"]}}, +} diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 9c3f8ce..0b42b79 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -1,9 +1,10 @@ import functools import inspect +import logging import os from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, MutableMapping, Optional, TextIO, Type, Union +from typing import Any, MutableMapping, Optional, Type, Union import pystac from requests.sessions import Session @@ -15,9 +16,9 @@ ) from STACpopulator.input import GenericLoader from STACpopulator.models import AnyGeometry -from STACpopulator.stac_utils import get_logger, load_config, url_validate +from STACpopulator.stac_utils import load_config, setup_logging, url_validate -LOGGER = get_logger(__name__) +LOGGER = logging.getLogger(__name__) class STACpopulatorBase(ABC): @@ -39,6 +40,7 @@ def __init__( """ super().__init__() + self.configure_app_logging() self._collection_config_path = config_file self._collection_info: MutableMapping[str, Any] = None self._session = session @@ -142,13 +144,14 @@ def create_stac_collection(self) -> dict[str, Any]: def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) - def open_error_file(self) -> TextIO: + def configure_app_logging(self) -> None: + """Configure the logger for the App.""" + # generating the log file name implementation_name = type(self).__name__ - fname = f"stac-populator_{implementation_name}_errors_{datetime.strftime(datetime.now(), '%Y%m%d-%H%M%S')}.txt" - return open(fname, "w", buffering=1) + fname = f"{implementation_name}_log_{datetime.strftime(datetime.now(), '%Y%m%d-%H%M%S')}.jsonl" + setup_logging(fname) def ingest(self) -> None: - error_file = self.open_error_file() counter = 0 failures = 0 LOGGER.info("Data ingestion") @@ -158,7 +161,7 @@ def ingest(self) -> None: stac_item = self.create_stac_item(item_name, item_data) if isinstance(stac_item, tuple): LOGGER.error("Failed to create STAC representation") - error_file.write(f"{stac_item[1]}: {item_loc}\n") + # error_file.write(f"{stac_item[1]}: {item_loc}\n") failures += 1 else: errc = post_stac_item( @@ -171,10 +174,10 @@ def ingest(self) -> None: ) if errc: LOGGER.error(errc) - error_file.write(f"{errc}: {item_loc}\n") + # error_file.write(f"{errc}: {item_loc}\n") failures += 1 counter += 1 LOGGER.info(f"Processed {counter} data items. {failures} failures") - error_file.close() + # error_file.close() diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py index 5c3aa5d..302421a 100644 --- a/STACpopulator/stac_utils.py +++ b/STACpopulator/stac_utils.py @@ -7,24 +7,22 @@ import numpy as np import pystac import yaml -from colorlog import ColoredFormatter +from STACpopulator.logging import logconfig -def get_logger( - name: str, - log_fmt: str = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s", -) -> logging.Logger: - logger = logging.getLogger(name) - formatter = ColoredFormatter(log_fmt) - stream = logging.StreamHandler() - stream.setFormatter(formatter) - logger.addHandler(stream) - logger.setLevel(logging.INFO) - logger.propagate = False - return logger +def setup_logging(logfname: str) -> None: + """Setup the logger for the app. -LOGGER = get_logger(__name__) + :param logfname: name of the file to which to write log outputs + :type logfname: str + """ + config = logconfig + config["handlers"]["file"]["filename"] = logfname + logging.config.dictConfig(config) + + +LOGGER = logging.getLogger(__name__) def url_validate(target: str) -> bool: From ffd82988cdddfadebd584a8fd1f8c2047dbf2bdd Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Sat, 17 Feb 2024 11:11:53 -0500 Subject: [PATCH 04/11] updating gitignore to ignore log files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 9d5674a..df45bde 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,6 @@ reports STACpopulator.egg-info/ build *.pyc + +## Logs +*.jsonl \ No newline at end of file From d52701d0634fc83b435576e017dce3f02dd9f0af Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Mon, 19 Feb 2024 22:24:22 -0500 Subject: [PATCH 05/11] improve exception handling in ingest loop --- .../implementations/CMIP6_UofT/add_CMIP6.py | 17 +++++++--------- STACpopulator/populator_base.py | 20 +++++++++---------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index e7ec981..be87dc1 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -46,7 +46,7 @@ def __init__( def create_stac_item( self, item_name: str, item_data: MutableMapping[str, Any] - ) -> Union[tuple[int, str], MutableMapping[str, Any]]: + ) -> Union[None, MutableMapping[str, Any]]: """Creates the STAC item. :param item_name: name of the STAC item. Interpretation of name is left to the input loader implementation @@ -60,26 +60,23 @@ def create_stac_item( try: cmip_helper = CMIP6Helper(item_data, self.item_geometry_model) item = cmip_helper.stac_item() - except Exception: - LOGGER.error("Failed to add CMIP6 extension to item %s", item_name) - return (-1, "Failed to add CMIP6 extension") + except Exception as e: + raise Exception("Failed to add CMIP6 extension") from e # Add datacube extension try: dc_helper = DataCubeHelper(item_data) dc_ext = DatacubeExtension.ext(item, add_if_missing=True) dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables) - except Exception: - LOGGER.error("Failed to add Datacube extension to item %s", item_name) - return (-1, "Failed to add Datacube extension") + except Exception as e: + raise Exception("Failed to add Datacube extension") from e try: thredds_helper = THREDDSHelper(item_data["access_urls"]) thredds_ext = THREDDSExtension.ext(item) thredds_ext.apply(thredds_helper.services, thredds_helper.links) - except Exception: - LOGGER.error("Failed to add THREDDS references to item %s", item_name) - return (-1, "Failed to add THREDDS references") + except Exception as e: + raise Exception("Failed to add THREDDS extension") from e # print(json.dumps(item.to_dict())) return json.loads(json.dumps(item.to_dict())) diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 0b42b79..2f3873d 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -156,14 +156,9 @@ def ingest(self) -> None: failures = 0 LOGGER.info("Data ingestion") for item_name, item_loc, item_data in self._ingest_pipeline: - LOGGER.info(f"New data item: {item_name}") - LOGGER.info(f"Data location: {item_loc}") - stac_item = self.create_stac_item(item_name, item_data) - if isinstance(stac_item, tuple): - LOGGER.error("Failed to create STAC representation") - # error_file.write(f"{stac_item[1]}: {item_loc}\n") - failures += 1 - else: + LOGGER.info(f"New data item: {item_name}", extra={"item_loc": item_loc}) + try: + stac_item = self.create_stac_item(item_name, item_data) errc = post_stac_item( self.stac_host, self.collection_id, @@ -174,10 +169,13 @@ def ingest(self) -> None: ) if errc: LOGGER.error(errc) - # error_file.write(f"{errc}: {item_loc}\n") failures += 1 + except Exception: + LOGGER.exception( + f"Failed to create STAC item for {item_name}", + extra={"item_loc": item_loc, "loader": type(self._ingest_pipeline)}, + ) + failures += 1 counter += 1 LOGGER.info(f"Processed {counter} data items. {failures} failures") - - # error_file.close() From 6719447b98af5c7e9caf528635aa597f3b57542c Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Mon, 19 Feb 2024 22:25:44 -0500 Subject: [PATCH 06/11] improvement to item location returned by THREDDSLoader --- STACpopulator/input.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/STACpopulator/input.py b/STACpopulator/input.py index 5d4cab7..daccc33 100644 --- a/STACpopulator/input.py +++ b/STACpopulator/input.py @@ -141,7 +141,9 @@ def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]: if self.catalog_head.datasets.items(): for item_name, ds in self.catalog_head.datasets.items(): attrs = self.extract_metadata(ds) - yield item_name, ds.url_path, attrs + filename = ds.url_path[ds.url_path.rfind("/") :] + url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename + yield item_name, url, attrs for name, ref in self.catalog_head.catalog_refs.items(): self.catalog_head = ref.follow() From d671dae19ef28aefde3cd3e41141e646002f3114 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Mon, 19 Feb 2024 22:27:14 -0500 Subject: [PATCH 07/11] add optional cli argument to configure logger's debug level --- .../implementations/CMIP6_UofT/add_CMIP6.py | 12 ++++++------ STACpopulator/populator_base.py | 8 +++++--- STACpopulator/stac_utils.py | 6 +++++- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index be87dc1..c29b6f7 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -29,6 +29,7 @@ def __init__( update: Optional[bool] = False, session: Optional[Session] = None, config_file: Optional[Union[os.PathLike[str], str]] = None, + log_debug: Optional[bool] = False, ) -> None: """Constructor @@ -37,11 +38,7 @@ def __init__( :param data_loader: loader to iterate over ingestion data. """ super().__init__( - stac_host, - data_loader, - update=update, - session=session, - config_file=config_file, + stac_host, data_loader, update=update, session=session, config_file=config_file, log_debug=log_debug ) def create_stac_item( @@ -101,6 +98,7 @@ def make_parser() -> argparse.ArgumentParser: "By default, uses the adjacent configuration to the implementation class." ), ) + parser.add_argument("--debug", action="store_true", help="Set logger level to debug") add_request_options(parser) return parser @@ -116,7 +114,9 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: # To be implemented data_loader = ErrorLoader() - c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config) + c = CMIP6populator( + ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug + ) c.ingest() diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 2f3873d..e9432b3 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -29,6 +29,7 @@ def __init__( update: Optional[bool] = False, session: Optional[Session] = None, config_file: Optional[Union[os.PathLike[str], str]] = "collection_config.yml", + log_debug: Optional[bool] = False, ) -> None: """Constructor @@ -40,7 +41,7 @@ def __init__( """ super().__init__() - self.configure_app_logging() + self.configure_app_logging(log_debug) self._collection_config_path = config_file self._collection_info: MutableMapping[str, Any] = None self._session = session @@ -144,12 +145,13 @@ def create_stac_collection(self) -> dict[str, Any]: def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) - def configure_app_logging(self) -> None: + def configure_app_logging(self, log_debug) -> None: """Configure the logger for the App.""" # generating the log file name implementation_name = type(self).__name__ fname = f"{implementation_name}_log_{datetime.strftime(datetime.now(), '%Y%m%d-%H%M%S')}.jsonl" - setup_logging(fname) + log_level = "DEBUG" if log_debug else "INFO" + setup_logging(fname, log_level) def ingest(self) -> None: counter = 0 diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py index 302421a..1d5f6cd 100644 --- a/STACpopulator/stac_utils.py +++ b/STACpopulator/stac_utils.py @@ -11,14 +11,18 @@ from STACpopulator.logging import logconfig -def setup_logging(logfname: str) -> None: +def setup_logging(logfname: str, log_level: str) -> None: """Setup the logger for the app. :param logfname: name of the file to which to write log outputs :type logfname: str + :param log_level: base logging level (e.g. "INFO") + :type log_level: str """ config = logconfig config["handlers"]["file"]["filename"] = logfname + for handler in config["handlers"]: + config["handlers"][handler]["level"] = log_level logging.config.dictConfig(config) From 1c8cff979c3320f69ec4c869bd29ce57e7cc958c Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Mon, 19 Feb 2024 22:30:39 -0500 Subject: [PATCH 08/11] moving setup_logging to logging module --- STACpopulator/logging.py | 15 +++++++++++++++ STACpopulator/populator_base.py | 3 ++- STACpopulator/stac_utils.py | 18 ------------------ 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/STACpopulator/logging.py b/STACpopulator/logging.py index 6dfad80..bb15e45 100644 --- a/STACpopulator/logging.py +++ b/STACpopulator/logging.py @@ -29,6 +29,21 @@ } +def setup_logging(logfname: str, log_level: str) -> None: + """Setup the logger for the app. + + :param logfname: name of the file to which to write log outputs + :type logfname: str + :param log_level: base logging level (e.g. "INFO") + :type log_level: str + """ + config = logconfig + config["handlers"]["file"]["filename"] = logfname + for handler in config["handlers"]: + config["handlers"][handler]["level"] = log_level + logging.config.dictConfig(config) + + class JSONLogFormatter(logging.Formatter): # From: https://github.com/mCodingLLC/VideosSampleCode/tree/master/videos/135_modern_logging def __init__( diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index e9432b3..23c4799 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -15,8 +15,9 @@ stac_host_reachable, ) from STACpopulator.input import GenericLoader +from STACpopulator.logging import setup_logging from STACpopulator.models import AnyGeometry -from STACpopulator.stac_utils import load_config, setup_logging, url_validate +from STACpopulator.stac_utils import load_config, url_validate LOGGER = logging.getLogger(__name__) diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py index 1d5f6cd..515507a 100644 --- a/STACpopulator/stac_utils.py +++ b/STACpopulator/stac_utils.py @@ -8,24 +8,6 @@ import pystac import yaml -from STACpopulator.logging import logconfig - - -def setup_logging(logfname: str, log_level: str) -> None: - """Setup the logger for the app. - - :param logfname: name of the file to which to write log outputs - :type logfname: str - :param log_level: base logging level (e.g. "INFO") - :type log_level: str - """ - config = logconfig - config["handlers"]["file"]["filename"] = logfname - for handler in config["handlers"]: - config["handlers"][handler]["level"] = log_level - logging.config.dictConfig(config) - - LOGGER = logging.getLogger(__name__) From 19941c0a3333ce13e23405d787e34804ba20e702 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Tue, 20 Feb 2024 11:59:11 -0500 Subject: [PATCH 09/11] fixing error handling while posting STAC items --- .gitignore | 3 ++- STACpopulator/api_requests.py | 12 +++++------ STACpopulator/populator_base.py | 38 +++++++++++++++++++++++---------- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index df45bde..c6c977b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ build *.pyc ## Logs -*.jsonl \ No newline at end of file +*.jsonl +*.json \ No newline at end of file diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index c66f0e9..8056cc2 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -73,7 +73,7 @@ def post_stac_item( json_data: dict[str, dict], update: Optional[bool] = True, session: Optional[Session] = None, -) -> Union[None, str]: +) -> None: """Post a STAC item to the host server. :param stac_host: address of the STAC host @@ -99,12 +99,10 @@ def post_stac_item( if update: LOGGER.info(f"Item {item_id} already exists. Updating.") r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) - return f"Requests: {r.reason}" - # r.raise_for_status() + # return f"Requests: {r.reason}" + r.raise_for_status() else: LOGGER.warn(f"Item {item_id} already exists.") else: - return f"Requests: {r.reason}" - # r.raise_for_status() - - return None + # return f"Requests: {r.reason}" + r.raise_for_status() diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 23c4799..0873ccb 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -1,5 +1,6 @@ import functools import inspect +import json import logging import os from abc import ABC, abstractmethod @@ -162,23 +163,38 @@ def ingest(self) -> None: LOGGER.info(f"New data item: {item_name}", extra={"item_loc": item_loc}) try: stac_item = self.create_stac_item(item_name, item_data) - errc = post_stac_item( - self.stac_host, - self.collection_id, - item_name, - stac_item, - update=self.update, - session=self._session, - ) - if errc: - LOGGER.error(errc) - failures += 1 except Exception: LOGGER.exception( f"Failed to create STAC item for {item_name}", extra={"item_loc": item_loc, "loader": type(self._ingest_pipeline)}, ) failures += 1 + stac_item = None + + if stac_item: + try: + post_stac_item( + self.stac_host, + self.collection_id, + item_name, + stac_item, + update=self.update, + session=self._session, + ) + except Exception: + # Something went wrong on the server side, most likely because the STAC item generated above has + # incorrect data. Writing the STAC item to file so that the issue could be diagnosed and fixed. + stac_output_fname = "error_STAC_rep_" + item_name.split(".")[0] + ".json" + json.dump(stac_item, open(stac_output_fname, "w"), indent=2) + LOGGER.exception( + f"Failed to post STAC item for {item_name}", + extra={ + "item_loc": item_loc, + "loader": type(self._ingest_pipeline), + "stac_output_fname": stac_output_fname, + }, + ) + failures += 1 counter += 1 LOGGER.info(f"Processed {counter} data items. {failures} failures") From 24eeb6b0b863dd5a19f1903527300903b129f9ca Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Tue, 20 Feb 2024 14:34:12 -0500 Subject: [PATCH 10/11] PR changes --- STACpopulator/api_requests.py | 2 -- STACpopulator/cli.py | 60 +++++++++++++++++++++++---------- STACpopulator/logging.py | 6 ++-- STACpopulator/populator_base.py | 10 ------ 4 files changed, 46 insertions(+), 32 deletions(-) diff --git a/STACpopulator/api_requests.py b/STACpopulator/api_requests.py index 8056cc2..05aeaa5 100644 --- a/STACpopulator/api_requests.py +++ b/STACpopulator/api_requests.py @@ -99,10 +99,8 @@ def post_stac_item( if update: LOGGER.info(f"Item {item_id} already exists. Updating.") r = session.put(os.path.join(stac_host, f"collections/{collection_id}/items/{item_id}"), json=json_data) - # return f"Requests: {r.reason}" r.raise_for_status() else: LOGGER.warn(f"Item {item_id} already exists.") else: - # return f"Requests: {r.reason}" r.raise_for_status() diff --git a/STACpopulator/cli.py b/STACpopulator/cli.py index 2a2dad2..9f3bd65 100644 --- a/STACpopulator/cli.py +++ b/STACpopulator/cli.py @@ -1,16 +1,19 @@ import argparse import glob import importlib +import logging import os import sys +from datetime import datetime +from http import cookiejar from typing import Callable, Optional import requests -from http import cookiejar from requests.auth import AuthBase, HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth from requests.sessions import Session from STACpopulator import __version__ +from STACpopulator.logging import setup_logging POPULATORS = {} @@ -54,19 +57,24 @@ def add_request_options(parser: argparse.ArgumentParser) -> None: Adds arguments to a parser to allow update of a request session definition used across a populator procedure. """ parser.add_argument( - "--no-verify", "--no-ssl", "--no-ssl-verify", dest="verify", action="store_false", - help="Disable SSL verification (not recommended unless for development/test servers)." - ) - parser.add_argument( - "--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use." + "--no-verify", + "--no-ssl", + "--no-ssl-verify", + dest="verify", + action="store_false", + help="Disable SSL verification (not recommended unless for development/test servers).", ) + parser.add_argument("--cert", type=argparse.FileType(), required=False, help="Path to a certificate file to use.") parser.add_argument( - "--auth-handler", choices=["basic", "digest", "bearer", "proxy", "cookie"], required=False, - help="Authentication strategy to employ for the requests session." + "--auth-handler", + choices=["basic", "digest", "bearer", "proxy", "cookie"], + required=False, + help="Authentication strategy to employ for the requests session.", ) parser.add_argument( - "--auth-identity", required=False, - help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler." + "--auth-identity", + required=False, + help="Bearer token, cookie-jar file or proxy/digest/basic username:password for selected authorization handler.", ) @@ -93,16 +101,25 @@ def apply_request_options(session: Session, namespace: argparse.Namespace) -> No def make_main_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="stac-populator", description="STACpopulator operations.") - parser.add_argument("--version", "-V", action="version", version=f"%(prog)s {__version__}", - help="prints the version of the library and exits") + parser.add_argument( + "--version", + "-V", + action="version", + version=f"%(prog)s {__version__}", + help="prints the version of the library and exits", + ) commands = parser.add_subparsers(title="command", dest="command", description="STAC populator command to execute.") run_cmd_parser = make_run_command_parser(parser.prog) commands.add_parser( "run", - prog=f"{parser.prog} {run_cmd_parser.prog}", parents=[run_cmd_parser], - formatter_class=run_cmd_parser.formatter_class, usage=run_cmd_parser.usage, - add_help=False, help=run_cmd_parser.description, description=run_cmd_parser.description + prog=f"{parser.prog} {run_cmd_parser.prog}", + parents=[run_cmd_parser], + formatter_class=run_cmd_parser.formatter_class, + usage=run_cmd_parser.usage, + add_help=False, + help=run_cmd_parser.description, + description=run_cmd_parser.description, ) # add more commands as needed... @@ -142,9 +159,12 @@ def make_run_command_parser(parent) -> argparse.ArgumentParser: populator_prog = f"{parent} {parser.prog} {populator_name}" subparsers.add_parser( populator_name, - prog=populator_prog, parents=[populator_parser], formatter_class=populator_parser.formatter_class, + prog=populator_prog, + parents=[populator_parser], + formatter_class=populator_parser.formatter_class, add_help=False, # add help disabled otherwise conflicts with this main populator help - help=populator_parser.description, description=populator_parser.description, + help=populator_parser.description, + description=populator_parser.description, usage=populator_parser.usage, ) POPULATORS[populator_name] = { @@ -168,6 +188,12 @@ def main(*args: str) -> Optional[int]: result = None if populator_cmd == "run": populator_name = params.pop("populator") + + # Setup the application logger: + fname = f"{populator_name}_log_{datetime.utcnow().isoformat() + 'Z'}.jsonl" + log_level = logging.DEBUG if ns.debug else logging.INFO + setup_logging(fname, log_level) + if not populator_name: parser.print_help() return 0 diff --git a/STACpopulator/logging.py b/STACpopulator/logging.py index bb15e45..0d7caad 100644 --- a/STACpopulator/logging.py +++ b/STACpopulator/logging.py @@ -29,7 +29,7 @@ } -def setup_logging(logfname: str, log_level: str) -> None: +def setup_logging(logfname: str, log_level: int) -> None: """Setup the logger for the app. :param logfname: name of the file to which to write log outputs @@ -40,7 +40,7 @@ def setup_logging(logfname: str, log_level: str) -> None: config = logconfig config["handlers"]["file"]["filename"] = logfname for handler in config["handlers"]: - config["handlers"][handler]["level"] = log_level + config["handlers"][handler]["level"] = logging.getLevelName(log_level) logging.config.dictConfig(config) @@ -97,7 +97,7 @@ def filter(self, record: logging.LogRecord) -> bool | logging.LogRecord: "datefmt": "%Y-%m-%dT%H:%M:%S%z", }, "json": { - "()": "STACpopulator.logging.JSONLogFormatter", + "()": f"{JSONLogFormatter.__module__}.{JSONLogFormatter.__name__}", "fmt_keys": { "level": "levelname", "message": "message", diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 0873ccb..9e5fa53 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -16,7 +16,6 @@ stac_host_reachable, ) from STACpopulator.input import GenericLoader -from STACpopulator.logging import setup_logging from STACpopulator.models import AnyGeometry from STACpopulator.stac_utils import load_config, url_validate @@ -43,7 +42,6 @@ def __init__( """ super().__init__() - self.configure_app_logging(log_debug) self._collection_config_path = config_file self._collection_info: MutableMapping[str, Any] = None self._session = session @@ -147,14 +145,6 @@ def create_stac_collection(self) -> dict[str, Any]: def publish_stac_collection(self, collection_data: dict[str, Any]) -> None: post_stac_collection(self.stac_host, collection_data, self.update, session=self._session) - def configure_app_logging(self, log_debug) -> None: - """Configure the logger for the App.""" - # generating the log file name - implementation_name = type(self).__name__ - fname = f"{implementation_name}_log_{datetime.strftime(datetime.now(), '%Y%m%d-%H%M%S')}.jsonl" - log_level = "DEBUG" if log_debug else "INFO" - setup_logging(fname, log_level) - def ingest(self) -> None: counter = 0 failures = 0 From 00cb1d2799b7a53cdf5bcc9c7e4a81847952ec13 Mon Sep 17 00:00:00 2001 From: Deepak Chandan Date: Wed, 21 Feb 2024 12:37:10 -0500 Subject: [PATCH 11/11] Change location of debug argparse argument --- STACpopulator/cli.py | 1 + STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/STACpopulator/cli.py b/STACpopulator/cli.py index 9f3bd65..7cde7b6 100644 --- a/STACpopulator/cli.py +++ b/STACpopulator/cli.py @@ -123,6 +123,7 @@ def make_main_parser() -> argparse.ArgumentParser: ) # add more commands as needed... + parser.add_argument("--debug", action="store_true", help="Set logger level to debug") return parser diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index c29b6f7..110b269 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -98,7 +98,6 @@ def make_parser() -> argparse.ArgumentParser: "By default, uses the adjacent configuration to the implementation class." ), ) - parser.add_argument("--debug", action="store_true", help="Set logger level to debug") add_request_options(parser) return parser