diff --git a/rich/live.py b/rich/live.py index 8738cf09f..94f4ca30c 100644 --- a/rich/live.py +++ b/rich/live.py @@ -1,7 +1,32 @@ import sys +import os +import threading +from contextlib import contextmanager +from copy import copy +from datetime import datetime +from itertools import islice from threading import Event, RLock, Thread +from time import monotonic from types import TracebackType -from typing import IO, Any, Callable, List, Optional, TextIO, Type, cast +from typing import ( + IO, + TYPE_CHECKING, + Any, + Callable, + Dict, + Generic, + Iterable, + List, + Optional, + TextIO, + Tuple, + Type, + TypeVar, + Union, + cast, +) +import signal +import atexit from . import get_console from .console import Console, ConsoleRenderable, RenderableType, RenderHook @@ -37,14 +62,18 @@ class Live(JupyterMixin, RenderHook): Args: renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing. - console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout. + console (Console, optional): Optional Console instance. Default will derive from + get_console(). screen (bool, optional): Enable alternate screen mode. Defaults to False. - auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True - refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4. + auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. + Defaults to True. + refresh_per_second (float, optional): Number of times per second to refresh the live display. + Defaults to 4. transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False. redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True. - vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis". + vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the + console. Defaults to "ellipsis". get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None. """ @@ -63,30 +92,41 @@ def __init__( get_renderable: Optional[Callable[[], RenderableType]] = None, ) -> None: assert refresh_per_second > 0, "refresh_per_second must be > 0" + self.console = console or get_console() self._renderable = renderable - self.console = console if console is not None else get_console() self._screen = screen - self._alt_screen = False - + self.auto_refresh = auto_refresh + self.refresh_per_second = refresh_per_second + self.transient = True if screen else transient self._redirect_stdout = redirect_stdout self._redirect_stderr = redirect_stderr + self.vertical_overflow = vertical_overflow + self._get_renderable = get_renderable + self._alt_screen: bool = False self._restore_stdout: Optional[IO[str]] = None self._restore_stderr: Optional[IO[str]] = None - self._lock = RLock() - self.ipy_widget: Optional[Any] = None - self.auto_refresh = auto_refresh - self._started: bool = False - self.transient = True if screen else transient - self._refresh_thread: Optional[_RefreshThread] = None - self.refresh_per_second = refresh_per_second - - self.vertical_overflow = vertical_overflow - self._get_renderable = get_renderable + self._started: bool = False + self.ipy_widget: Any = None self._live_render = LiveRender( self.get_renderable(), vertical_overflow=vertical_overflow ) + self._original_sigint_handler = None + self._exit_handler_added = False + + def _handle_sigint(self, sig, frame): + """Handle SIGINT (Ctrl+C) to ensure cursor is shown.""" + # Restore cursor + if self.console.is_terminal: + self.console.show_cursor(True) + # Re-raise KeyboardInterrupt to allow program to exit + raise KeyboardInterrupt() + + def _ensure_cursor_visible_at_exit(self): + """Ensure cursor is visible when program exits.""" + if self.console.is_terminal: + self.console.show_cursor(True) @property def is_started(self) -> bool: @@ -110,6 +150,13 @@ def start(self, refresh: bool = False) -> None: with self._lock: if self._started: return + # Set up signal handler for Ctrl+C (only on non-Windows platforms) + if not self._exit_handler_added: + atexit.register(self._ensure_cursor_visible_at_exit) + self._exit_handler_added = True + # Only set up SIGINT handler on platforms that support it (not Windows) + if os.name != "nt" and hasattr(signal, "SIGINT"): + self._original_sigint_handler = signal.signal(signal.SIGINT, self._handle_sigint) self.console.set_live(self) self._started = True if self._screen: @@ -144,23 +191,33 @@ def stop(self) -> None: self._refresh_thread = None # allow it to fully render on the last even if overflow self.vertical_overflow = "visible" - with self.console: - try: - if not self._alt_screen and not self.console.is_jupyter: - self.refresh() - finally: - self._disable_redirect_io() - self.console.pop_render_hook() - if not self._alt_screen and self.console.is_terminal: - self.console.line() + try: + with self.console: + try: + if not self._alt_screen and not self.console.is_jupyter: + self.refresh() + finally: + self._disable_redirect_io() + self.console.pop_render_hook() + if not self._alt_screen and self.console.is_terminal: + self.console.line() + self.console.show_cursor(True) + if self._alt_screen: + self.console.set_alt_screen(False) + + if self.transient and not self._alt_screen: + self.console.control(self._live_render.restore_cursor()) + if self.ipy_widget is not None and self.transient: + self.ipy_widget.close() # pragma: no cover + except: + # Ensure cursor is shown even if something goes wrong during cleanup + if self.console.is_terminal: self.console.show_cursor(True) - if self._alt_screen: - self.console.set_alt_screen(False) - - if self.transient and not self._alt_screen: - self.console.control(self._live_render.restore_cursor()) - if self.ipy_widget is not None and self.transient: - self.ipy_widget.close() # pragma: no cover + raise + # Restore original signal handler (only on non-Windows platforms) + if os.name != "nt" and hasattr(signal, "SIGINT") and self._original_sigint_handler is not None: + signal.signal(signal.SIGINT, self._original_sigint_handler) + self._original_sigint_handler = None def __enter__(self) -> "Live": self.start(refresh=self._renderable is not None) diff --git a/rich/progress.py b/rich/progress.py index 1e92eb6b1..504c9ff1a 100644 --- a/rich/progress.py +++ b/rich/progress.py @@ -33,6 +33,9 @@ TypeVar, Union, ) +import signal +import atexit +import os if sys.version_info >= (3, 8): from typing import Literal @@ -391,7 +394,7 @@ def open( finished_style: StyleType = "bar.finished", pulse_style: StyleType = "bar.pulse", disable: bool = False, -) -> ContextManager[TextIO]: +) -> ContextManager[BinaryIO]: pass @@ -1111,6 +1114,21 @@ def __init__( self.get_time = get_time or self.console.get_time self.print = self.console.print self.log = self.console.log + self._original_sigint_handler = None + self._exit_handler_added = False + + def _handle_sigint(self, sig, frame): + """Handle SIGINT (Ctrl+C) to ensure cursor is shown.""" + # Restore cursor + if self.console.is_terminal: + self.console.show_cursor(True) + # Re-raise KeyboardInterrupt to allow program to exit + raise KeyboardInterrupt() + + def _ensure_cursor_visible_at_exit(self): + """Ensure cursor is visible when program exits.""" + if self.console.is_terminal: + self.console.show_cursor(True) @classmethod def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: @@ -1170,13 +1188,30 @@ def finished(self) -> bool: def start(self) -> None: """Start the progress display.""" if not self.disable: + # Set up signal handler for Ctrl+C (only on non-Windows platforms) + if not self._exit_handler_added: + atexit.register(self._ensure_cursor_visible_at_exit) + self._exit_handler_added = True + # Only set up SIGINT handler on platforms that support it (not Windows) + if os.name != "nt" and hasattr(signal, "SIGINT"): + self._original_sigint_handler = signal.signal(signal.SIGINT, self._handle_sigint) self.live.start(refresh=True) def stop(self) -> None: """Stop the progress display.""" - self.live.stop() - if not self.console.is_interactive and not self.console.is_jupyter: - self.console.print() + try: + self.live.stop() + if not self.console.is_interactive and not self.console.is_jupyter: + self.console.print() + # Restore original signal handler (only on non-Windows platforms) + if os.name != "nt" and hasattr(signal, "SIGINT") and self._original_sigint_handler is not None: + signal.signal(signal.SIGINT, self._original_sigint_handler) + self._original_sigint_handler = None + except: + # Ensure cursor is visible even if an exception occurs during stop + if self.console.is_terminal: + self.console.show_cursor(True) + raise def __enter__(self) -> Self: self.start()