From 2eaa878bece3f13e75b14afa1f372c36fd483bf0 Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Wed, 18 Dec 2024 15:32:10 -0600 Subject: [PATCH] Add error boundaries around setting up and tearing down the write stream in InstigationLogger This prevents issues with the compute log manager from failing a schedule or sensor tick in which the logging call is happening. --- .../_core/definitions/instigation_logger.py | 31 +++++++-- .../storage_tests/test_instigation_logger.py | 65 +++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py index e54cfe59db44e..996e4de9b8d85 100644 --- a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py +++ b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py @@ -1,5 +1,6 @@ import json import logging +import sys import threading import traceback from contextlib import ExitStack @@ -10,6 +11,7 @@ from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogManager from dagster._core.utils import coerce_valid_log_level +from dagster._utils.error import serializable_error_info_from_exc_info from dagster._utils.log import create_console_logger @@ -66,7 +68,12 @@ def emit(self, record: logging.LogRecord): if exc_info: record_dict["exc_info"] = "".join(traceback.format_exception(*exc_info)) - self._write_stream.write(_seven.json.dumps(record_dict) + "\n") + try: + self._write_stream.write(_seven.json.dumps(record_dict) + "\n") + except Exception: + sys.stderr.write( + f"Exception writing to logger event stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n" + ) class InstigationLogger(logging.Logger): @@ -107,18 +114,30 @@ def __enter__(self): and self._instance and isinstance(self._instance.compute_log_manager, ComputeLogManager) ): - write_stream = self._exit_stack.enter_context( - self._instance.compute_log_manager.open_log_stream( - self._log_key, ComputeIOType.STDERR + try: + write_stream = self._exit_stack.enter_context( + self._instance.compute_log_manager.open_log_stream( + self._log_key, ComputeIOType.STDERR + ) ) - ) + except Exception: + sys.stderr.write( + f"Exception initializing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n" + ) + write_stream = None + if write_stream: self._capture_handler = CapturedLogHandler(write_stream) self.addHandler(self._capture_handler) return self def __exit__(self, _exception_type, _exception_value, _traceback): - self._exit_stack.close() + try: + self._exit_stack.close() + except Exception: + sys.stderr.write( + f"Exception closing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n" + ) def _annotate_record(self, record: logging.LogRecord) -> logging.LogRecord: if self._repository_name and self._instigator_name: diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_instigation_logger.py b/python_modules/dagster/dagster_tests/storage_tests/test_instigation_logger.py index 926f2eedece42..a21d1b8e60beb 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_instigation_logger.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_instigation_logger.py @@ -1,4 +1,9 @@ +from contextlib import contextmanager +from unittest import mock + from dagster._core.definitions.instigation_logger import InstigationLogger +from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager +from dagster._core.test_utils import instance_for_test def test_gets_correct_logger(): @@ -9,3 +14,63 @@ def test_gets_correct_logger(): instigation_logger = InstigationLogger(logger_name=custom_logger_name) assert instigation_logger.name == custom_logger_name + + +class CrashyStartupComputeLogManager(NoOpComputeLogManager): + @contextmanager + def open_log_stream(self, log_key, io_type): + raise Exception("OOPS") + yield None + + +class MockLogStreamComputeLogManager(NoOpComputeLogManager): + @contextmanager + def open_log_stream(self, log_key, io_type): + yield mock.MagicMock() + raise Exception("OOPS ON EXIT") + + +def test_instigation_logger_start_failure(capsys): + with instance_for_test( + overrides={ + "compute_logs": { + "module": "dagster_tests.storage_tests.test_instigation_logger", + "class": "CrashyStartupComputeLogManager", + } + } + ) as instance: + with InstigationLogger(log_key="foo", instance=instance) as logger: + captured = capsys.readouterr() + assert ( + captured.err.count("Exception initializing logger write stream: Exception: OOPS") + == 1 + ) + logger.info("I can log without failing") + + +def test_instigation_logger_log_failure(capsys): + with instance_for_test( + overrides={ + "compute_logs": { + "module": "dagster_tests.storage_tests.test_instigation_logger", + "class": "MockLogStreamComputeLogManager", + } + } + ) as instance: + with InstigationLogger(log_key="foo", instance=instance) as logger: + mock_write_stream = logger._capture_handler._write_stream # type: ignore # noqa + mock_write_stream.write.side_effect = Exception("OOPS") + + logger.info("HELLO") + captured = capsys.readouterr() + + assert ( + captured.err.count("Exception writing to logger event stream: Exception: OOPS") == 1 + ) + + captured = capsys.readouterr() + + assert ( + captured.err.count("Exception closing logger write stream: Exception: OOPS ON EXIT") + == 1 + )