def truncate_message(message: str, max_length: int, suffix: str = "... [truncated]") -> str:
    """Return *message* limited to at most *max_length* characters.

    A message that already fits is returned unchanged. Otherwise the message
    is cut and *suffix* is appended so that the result is exactly
    *max_length* characters long.

    When *max_length* is smaller than ``len(suffix)`` there is no room for the
    marker, so the message is hard-cut to *max_length* characters without it.
    A negative *max_length* is treated as zero.

    Args:
        message: Text to shorten.
        max_length: Maximum number of characters in the result.
        suffix: Marker appended when the message had to be cut.

    Returns:
        The original message, or a shortened version no longer than
        ``max(max_length, 0)`` characters.
    """
    if len(message) <= max_length:
        return message

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice end would drop characters from the *end* of the
        # message and yield a result longer than max_length; hard-cut instead.
        return message[: max(max_length, 0)]
    return message[:keep] + suffix
BUFFER_FLUSH_INTERVAL = 0.5
MAX_BUFFER_SIZE = 50000
MAX_STDERR_LINES = 1000


class LogBuffer:
    """Thread-safe buffer that batches lines into fewer ``logging.info`` calls.

    Lines are collected until either the accumulated size reaches *max_size*
    or at least *flush_interval* seconds have passed since the previous flush.
    Both conditions are evaluated only when a line is added, so callers must
    call :meth:`flush` once they are done to emit whatever is still pending.

    Args:
        prefix: Optional heading; when non-empty, each emitted batch is
            logged as ``"<prefix>:\\n<lines>"``.
        flush_interval: Minimum number of seconds between time-triggered
            flushes.
        max_size: Accumulated size (each line counts as ``len(line) + 1``)
            at which the buffer is flushed regardless of elapsed time.
    """

    def __init__(
        self,
        prefix: str = "",
        flush_interval: float = BUFFER_FLUSH_INTERVAL,
        max_size: int = MAX_BUFFER_SIZE,
    ):
        self._buffer: list[str] = []
        self._buffer_size = 0
        self._lock = threading.Lock()
        self._prefix = prefix
        self._flush_interval = flush_interval
        self._max_size = max_size
        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments (NTP corrections, manual changes).
        self._last_flush = time.monotonic()

    def add_line(self, line: str) -> None:
        """Append *line* to the buffer and flush if a threshold is reached."""
        with self._lock:
            self._buffer.append(line)
            # +1 accounts for the newline that joins the lines on flush.
            self._buffer_size += len(line) + 1
            if self._should_flush():
                self._flush_unlocked()

    def _should_flush(self) -> bool:
        """Return True when the size or the time threshold has been reached."""
        if self._buffer_size >= self._max_size:
            return True
        return time.monotonic() - self._last_flush >= self._flush_interval

    def _flush_unlocked(self) -> None:
        """Emit the buffered lines as one log record (caller must hold the lock)."""
        if not self._buffer:
            return
        content = "\n".join(self._buffer)
        if self._prefix:
            logging.info("%s:\n%s", self._prefix, content)
        else:
            logging.info(content)
        self._buffer = []
        self._buffer_size = 0
        self._last_flush = time.monotonic()

    def flush(self) -> None:
        """Emit any content still held in the buffer."""
        with self._lock:
            self._flush_unlocked()