Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error boundaries to prevent write stream errors in InstigationLogger from failing schedule/sensor ticks #26609

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import sys
import threading
import traceback
from contextlib import ExitStack
Expand All @@ -10,6 +11,7 @@
from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR
from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogManager
from dagster._core.utils import coerce_valid_log_level
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.log import create_console_logger


Expand Down Expand Up @@ -66,7 +68,12 @@ def emit(self, record: logging.LogRecord):
if exc_info:
record_dict["exc_info"] = "".join(traceback.format_exception(*exc_info))

self._write_stream.write(_seven.json.dumps(record_dict) + "\n")
try:
self._write_stream.write(_seven.json.dumps(record_dict) + "\n")
except Exception:
sys.stderr.write(
f"Exception writing to logger event stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
)


class InstigationLogger(logging.Logger):
Expand Down Expand Up @@ -107,18 +114,30 @@ def __enter__(self):
and self._instance
and isinstance(self._instance.compute_log_manager, ComputeLogManager)
):
write_stream = self._exit_stack.enter_context(
self._instance.compute_log_manager.open_log_stream(
self._log_key, ComputeIOType.STDERR
try:
write_stream = self._exit_stack.enter_context(
self._instance.compute_log_manager.open_log_stream(
self._log_key, ComputeIOType.STDERR
)
)
)
except Exception:
sys.stderr.write(
f"Exception initializing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
)
write_stream = None

if write_stream:
self._capture_handler = CapturedLogHandler(write_stream)
self.addHandler(self._capture_handler)
return self

def __exit__(self, _exception_type, _exception_value, _traceback):
    """Close the exit stack, reporting any failure on stderr instead of raising.

    The exit stack is closed exactly once, inside the error boundary, so a
    failure while closing the log write stream cannot propagate to the
    caller of the ``with`` block.

    Args:
        _exception_type: Type of the exception raised in the ``with`` body, if any (unused).
        _exception_value: The exception instance raised in the ``with`` body, if any (unused).
        _traceback: Traceback of that exception, if any (unused).

    Returns:
        None, so an exception raised inside the ``with`` body is not suppressed.
    """
    # The close must only happen inside the try: an unguarded close() ahead
    # of it would raise before the handler below ever got a chance to run.
    try:
        self._exit_stack.close()
    except Exception:
        sys.stderr.write(
            f"Exception closing logger write stream: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
        )

def _annotate_record(self, record: logging.LogRecord) -> logging.LogRecord:
if self._repository_name and self._instigator_name:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from contextlib import contextmanager
from unittest import mock

from dagster._core.definitions.instigation_logger import InstigationLogger
from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager
from dagster._core.test_utils import instance_for_test


def test_gets_correct_logger():
Expand All @@ -9,3 +14,63 @@ def test_gets_correct_logger():

instigation_logger = InstigationLogger(logger_name=custom_logger_name)
assert instigation_logger.name == custom_logger_name


class CrashyStartupComputeLogManager(NoOpComputeLogManager):
    """Test double whose ``open_log_stream`` fails as soon as it is entered."""

    @contextmanager
    def open_log_stream(self, log_key, io_type):
        """Raise on entry, before any stream is handed to the caller."""
        raise Exception("OOPS")
        # Never reached; the yield only exists so that this is a generator
        # function, which is what @contextmanager wraps.
        yield


class MockLogStreamComputeLogManager(NoOpComputeLogManager):
    """Test double that yields a mock stream and then fails when the context exits."""

    @contextmanager
    def open_log_stream(self, log_key, io_type):
        """Yield a ``MagicMock`` as the stream, then raise once the caller is done with it."""
        fake_stream = mock.MagicMock()
        yield fake_stream
        # Runs when the with-block finishes normally: simulate a failing close.
        raise Exception("OOPS ON EXIT")


def test_instigation_logger_start_failure(capsys):
    """A failure while opening the log stream is reported once on stderr and logging still works."""
    crashy_overrides = {
        "compute_logs": {
            "module": "dagster_tests.storage_tests.test_instigation_logger",
            "class": "CrashyStartupComputeLogManager",
        }
    }
    expected_message = "Exception initializing logger write stream: Exception: OOPS"

    with instance_for_test(overrides=crashy_overrides) as instance:
        with InstigationLogger(log_key="foo", instance=instance) as logger:
            # Entering the logger context has already attempted to open the stream.
            stderr_text = capsys.readouterr().err
            assert stderr_text.count(expected_message) == 1
            logger.info("I can log without failing")


def test_instigation_logger_log_failure(capsys):
    """Stream write and close failures are each reported once on stderr rather than raised."""
    mock_stream_overrides = {
        "compute_logs": {
            "module": "dagster_tests.storage_tests.test_instigation_logger",
            "class": "MockLogStreamComputeLogManager",
        }
    }
    write_failure = "Exception writing to logger event stream: Exception: OOPS"
    close_failure = "Exception closing logger write stream: Exception: OOPS ON EXIT"

    with instance_for_test(overrides=mock_stream_overrides) as instance:
        with InstigationLogger(log_key="foo", instance=instance) as logger:
            # Make every write to the captured stream blow up.
            stream = logger._capture_handler._write_stream  # type: ignore # noqa
            stream.write.side_effect = Exception("OOPS")

            logger.info("HELLO")
            assert capsys.readouterr().err.count(write_failure) == 1

        # Leaving the logger context closes the stream, which the mock
        # compute log manager turns into an exception on exit.
        assert capsys.readouterr().err.count(close_failure) == 1