Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions src/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
from subprocess_runner import SubprocessRunner
from venv_manager import VenvManager

MAX_MESSAGE_LENGTH = 3500
MAX_DETAIL_LENGTH = 50000


def truncate_message(message: str, max_length: int, suffix: str = "... [truncated]") -> str:
    """Truncate a message so the result never exceeds ``max_length`` characters.

    Args:
        message: Text to shorten.
        max_length: Maximum number of characters allowed in the result.
        suffix: Marker appended when the message had to be cut.

    Returns:
        ``message`` unchanged when it already fits. Otherwise the leading part
        of ``message`` followed by ``suffix``, ``max_length`` characters in
        total. When ``max_length`` is too small to hold ``suffix`` at all, the
        message is hard-cut to ``max_length`` characters (empty for a
        non-positive limit) and no suffix is added.
    """
    if len(message) <= max_length:
        return message

    keep = max_length - len(suffix)
    if keep < 0:
        # No room for the suffix. A negative slice bound would cut from the
        # END of the message and produce a result longer than max_length.
        return message[: max(max_length, 0)]
    return message[:keep] + suffix


class Component(ComponentBase):
"""
Expand Down Expand Up @@ -85,13 +95,16 @@ def execute_script_file(self, file_path: Path):
logging.info("Executing script:\n%s", self.script_excerpt(script))
args = ["uv", "run", str(file_path)]
SubprocessRunner.run(args, "Script executed successfully.", "Script execution failed.")
except UserException:
raise
except Exception as err:
_, _, tb = sys.exc_info()
stack_len = len(traceback.extract_tb(tb)[4:])
stack_trace_records = self._get_stack_trace_records(*sys.exc_info(), -stack_len, chain=True)
stack_cropped = "\n".join(stack_trace_records)

raise UserException(f"Script failed. {err}. Detail: {stack_cropped}") from err
error_msg = truncate_message(str(err), MAX_MESSAGE_LENGTH)
detail = truncate_message(stack_cropped, MAX_DETAIL_LENGTH)
raise UserException(f"Script failed. {error_msg}", detail) from err

@staticmethod
def _get_stack_trace_records(etype, value, tb, limit=None, chain=True):
Expand Down Expand Up @@ -158,10 +171,11 @@ def get_repository_files(self):
# this triggers the run method by default and is controlled by the configuration.action parameter
comp.execute_action()
except UserException as exc:
detail = ""
error_msg = truncate_message(str(exc.args[0]) if exc.args else str(exc), MAX_MESSAGE_LENGTH)
logging.error(error_msg)
if len(exc.args) > 1:
detail = exc.args[1]
logging.exception(exc, extra={"full_message": detail})
detail = truncate_message(str(exc.args[1]), MAX_DETAIL_LENGTH)
logging.error("Error details:", extra={"full_message": detail})
exit(1)
except Exception as exc:
logging.exception(exc)
Expand Down
73 changes: 63 additions & 10 deletions src/subprocess_runner.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,67 @@
import logging
import subprocess
import threading
import time
from collections import deque

from keboola.component.exceptions import UserException

# Default number of seconds between time-triggered flushes of a LogBuffer.
BUFFER_FLUSH_INTERVAL = 0.5
# Buffered size (characters, counting one separator per line) that forces a flush.
MAX_BUFFER_SIZE = 50000
MAX_STDERR_LINES = 1000


class LogBuffer:
    """Thread-safe buffer that batches lines into fewer ``logging.info`` records.

    Lines are accumulated and emitted as a single newline-joined record once
    the buffered size reaches ``MAX_BUFFER_SIZE`` or ``flush_interval`` seconds
    have elapsed since the previous flush. Both conditions are evaluated only
    when a line is added; call :meth:`flush` to emit whatever is left.
    """

    def __init__(self, prefix: str = "", flush_interval: float = BUFFER_FLUSH_INTERVAL):
        """Create an empty buffer.

        Args:
            prefix: When non-empty, each emitted record is formatted as
                ``"<prefix>:\\n<content>"``.
            flush_interval: Seconds after which the next added line triggers
                a flush.
        """
        # Lines waiting to be logged.
        self._buffer: list[str] = []
        # Running size of the pending lines (len(line) + 1 for each).
        self._buffer_size = 0
        self._lock = threading.Lock()
        self._prefix = prefix
        self._flush_interval = flush_interval
        # Monotonic clock: wall-clock adjustments (NTP, manual changes) must
        # not stall or force the time-based flush.
        self._last_flush = time.monotonic()

    def add_line(self, line: str) -> None:
        """Add a line to the buffer, flushing if a size or time limit is hit."""
        with self._lock:
            self._buffer.append(line)
            self._buffer_size += len(line) + 1
            if self._should_flush():
                self._flush_unlocked()

    def _should_flush(self) -> bool:
        """Return True when the size limit or the flush interval is reached."""
        if self._buffer_size >= MAX_BUFFER_SIZE:
            return True
        return time.monotonic() - self._last_flush >= self._flush_interval

    def _flush_unlocked(self) -> None:
        """Emit the buffered lines as one log record (caller must hold the lock)."""
        if not self._buffer:
            return
        content = "\n".join(self._buffer)
        if self._prefix:
            logging.info("%s:\n%s", self._prefix, content)
        else:
            logging.info(content)
        self._buffer = []
        self._buffer_size = 0
        self._last_flush = time.monotonic()

    def flush(self) -> None:
        """Flush any remaining content in the buffer."""
        with self._lock:
            self._flush_unlocked()


class SubprocessRunner:
@staticmethod
def run(
args: list[str],
ok_message: str = "Command finished sucessfully.",
ok_message: str = "Command finished successfully.",
err_message: str = "Command failed.",
):
logging.debug("Running command: %s", " ".join(args))
Expand All @@ -20,33 +72,34 @@ def run(
text=True,
)

stderr_output = []
stderr_output: deque[str] = deque(maxlen=MAX_STDERR_LINES)
stdout_buffer = LogBuffer()
stderr_buffer = LogBuffer(prefix="Command stderr")

def read_stderr():
if process.stderr:
for line in iter(process.stderr.readline, ""):
stderr_output.append(line.strip())
logging.info("Command stderr: %s", line.strip())
stripped = line.strip()
stderr_output.append(stripped)
stderr_buffer.add_line(stripped)
process.stderr.close()
stderr_buffer.flush()

# Start stderr reader thread
stderr_thread = threading.Thread(target=read_stderr)
stderr_thread.start()

# Read stdout in main thread
stdout_lines = []
if process.stdout:
for line in iter(process.stdout.readline, ""):
stdout_lines.append(line.strip())
logging.info(line.strip())
stdout_buffer.add_line(line.strip())
process.stdout.close()
stdout_buffer.flush()
stderr_thread.join()

process.wait()
stderr_str = "\n".join(stderr_output) if stderr_output else "Unknown error."
if process.returncode != 0:
raise UserException(f"{err_message} Log in event detail.", stderr_str)
elif stderr_str:
elif stderr_output:
logging.info("%s Full log in detail.", ok_message, extra={"full_message": stderr_str})
else:
logging.info(ok_message)