Skip to content

Commit

Permalink
Switch dagster dev IPC mechanism to pipe from signal
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 13, 2025
1 parent e673957 commit 60d2a1a
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 26 deletions.
12 changes: 6 additions & 6 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ parameters:
- name: py3_env_suffixes
type: object
default:
- api_tests
# - api_tests
- cli_tests
- general_tests
- launcher_tests
- daemon_tests
- daemon_sensor_tests
- scheduler_tests
# - general_tests
# - launcher_tests
# - daemon_tests
# - daemon_sensor_tests
# - scheduler_tests
jobs:
- job: "dagster"
pool:
Expand Down
28 changes: 23 additions & 5 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from dagster._core.telemetry_upload import uploading_logging_thread
from dagster._core.workspace.context import IWorkspaceProcessContext, WorkspaceProcessContext
from dagster._serdes import deserialize_value
from dagster._serdes.ipc import interrupt_on_ipc_shutdown_message
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, find_free_port, is_port_in_use
from dagster._utils.interrupts import setup_interrupt_handlers
from dagster._utils.log import configure_loggers

from dagster_webserver.app import create_app_from_workspace_process_context
Expand Down Expand Up @@ -179,6 +181,13 @@ def create_dagster_webserver_cli():
default=2000,
show_default=True,
)
@click.option(
"--shutdown-pipe",
type=click.INT,
required=False,
hidden=True,
help="Internal use only. Pass a readable pipe file descriptior to the webserver process that will be monitored for a shutdown signal.",
)
@click.version_option(version=__version__, prog_name="dagster-webserver")
@workspace_options
def dagster_webserver(
Expand All @@ -195,6 +204,7 @@ def dagster_webserver(
code_server_log_level: str,
instance_ref: Optional[str],
live_data_poll_rate: int,
shutdown_pipe: Optional[int],
**other_opts: object,
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
Expand All @@ -212,11 +222,19 @@ def dagster_webserver(
" `dagster-webserver` instead."
)

with get_possibly_temporary_instance_for_cli(
cli_command="dagster-webserver",
instance_ref=deserialize_value(instance_ref, InstanceRef) if instance_ref else None,
logger=logger,
) as instance:
# Essential on windows-- will set up windows interrupt signals to raise KeyboardInterrupt
setup_interrupt_handlers()

with contextlib.ExitStack() as stack:
if shutdown_pipe:
stack.enter_context(interrupt_on_ipc_shutdown_message(shutdown_pipe))
instance = stack.enter_context(
get_possibly_temporary_instance_for_cli(
cli_command="dagster-webserver",
instance_ref=deserialize_value(instance_ref, InstanceRef) if instance_ref else None,
logger=logger,
)
)
# Allow the instance components to change behavior in the context of a long running server process
instance.optimize_for_webserver(db_statement_timeout, db_pool_recycle)

Expand Down
44 changes: 36 additions & 8 deletions python_modules/dagster/dagster/_cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import tempfile
import time
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Optional

Expand All @@ -19,7 +19,12 @@
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._grpc.server import GrpcServerCommand
from dagster._serdes import serialize_value
from dagster._serdes.ipc import interrupt_ipc_subprocess, open_ipc_subprocess
from dagster._serdes.ipc import (
get_ipc_shutdown_pipe,
interrupt_on_ipc_shutdown_message,
open_ipc_subprocess,
send_ipc_shutdown_message,
)
from dagster._utils.interrupts import setup_interrupt_handlers
from dagster._utils.log import configure_loggers

Expand Down Expand Up @@ -87,6 +92,13 @@
is_flag=True,
default=False,
)
@click.option(
"--shutdown-pipe",
type=click.INT,
required=False,
hidden=True,
help="Internal use only. Pass a readable pipe file descriptor to the dev process that will be monitored for a shutdown signal.",
)
@workspace_options
@deprecated(
breaking_version="2.0", subject="--dagit-port and --dagit-host args", emit_runtime_warning=False
Expand All @@ -99,6 +111,7 @@ def dev_command(
host: Optional[str],
live_data_poll_rate: Optional[str],
use_legacy_code_server_behavior: bool,
shutdown_pipe: Optional[int],
**other_opts: object,
) -> None:
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
Expand Down Expand Up @@ -133,10 +146,17 @@ def dev_command(
" unless it is placed in the same folder as DAGSTER_HOME."
)

# Essential on windows-- will set up windows signals to raise KeyboardInterrupt
# Essential on windows-- will set up windows interrupt signals to raise KeyboardInterrupt
setup_interrupt_handlers()

with get_possibly_temporary_instance_for_cli("dagster dev", logger=logger) as instance:
with ExitStack() as stack:
if shutdown_pipe:
stack.enter_context(interrupt_on_ipc_shutdown_message(shutdown_pipe))
instance = stack.enter_context(
get_possibly_temporary_instance_for_cli("dagster dev", logger=logger)
)
# stack.enter_context(raise_interrupts_as(KeyboardInterrupt))

with _optionally_create_temp_workspace(
use_legacy_code_server_behavior=use_legacy_code_server_behavior,
workspace_opts=workspace_opts,
Expand All @@ -152,15 +172,20 @@ def dev_command(
*workspace_args,
]

webserver_read_fd, webserver_write_fd = get_ipc_shutdown_pipe()
webserver_process = open_ipc_subprocess(
[sys.executable, "-m", "dagster_webserver"]
+ (["--port", port] if port else [])
+ (["--host", host] if host else [])
+ (["--dagster-log-level", log_level])
+ (["--log-format", log_format])
+ (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else [])
+ args
+ ["--shutdown-pipe", str(webserver_read_fd)]
+ args,
pass_fds=[webserver_read_fd],
)

daemon_read_fd, daemon_write_fd = get_ipc_shutdown_pipe()
daemon_process = open_ipc_subprocess(
[
sys.executable,
Expand All @@ -171,8 +196,11 @@ def dev_command(
log_level,
"--log-format",
log_format,
"--shutdown-pipe",
str(daemon_read_fd),
]
+ args
+ args,
pass_fds=[daemon_read_fd],
)
try:
while True:
Expand All @@ -196,8 +224,8 @@ def dev_command(
logger.exception("An unexpected exception has occurred")
finally:
logger.info("Shutting down Dagster services...")
interrupt_ipc_subprocess(daemon_process)
interrupt_ipc_subprocess(webserver_process)
send_ipc_shutdown_message(webserver_write_fd)
send_ipc_shutdown_message(daemon_write_fd)

try:
webserver_process.wait(timeout=_SUBPROCESS_WAIT_TIMEOUT)
Expand Down
20 changes: 18 additions & 2 deletions python_modules/dagster/dagster/_daemon/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
from contextlib import ExitStack
from typing import Optional

import click
Expand All @@ -19,7 +20,8 @@
)
from dagster._daemon.daemon import get_telemetry_daemon_session_id
from dagster._serdes import deserialize_value
from dagster._utils.interrupts import capture_interrupts
from dagster._serdes.ipc import interrupt_on_ipc_shutdown_message
from dagster._utils.interrupts import capture_interrupts, setup_interrupt_handlers


def _get_heartbeat_tolerance():
Expand Down Expand Up @@ -62,19 +64,33 @@ def _get_heartbeat_tolerance():
required=False,
hidden=True,
)
@click.option(
"--shutdown-pipe",
type=click.INT,
required=False,
hidden=True,
help="Internal use only. Pass a readable pipe file descriptor to the daemon process that will be monitored for a shutdown signal.",
)
@workspace_options
def run_command(
code_server_log_level: str,
log_level: str,
log_format: str,
instance_ref: Optional[str],
shutdown_pipe: Optional[int],
**other_opts: object,
) -> None:
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

# Essential on windows-- will set up windows interrupt signals to raise KeyboardInterrupt
setup_interrupt_handlers()

try:
with capture_interrupts():
with ExitStack() as stack:
if shutdown_pipe:
stack.enter_context(interrupt_on_ipc_shutdown_message(shutdown_pipe))
stack.enter_context(capture_interrupts())
with get_instance_for_cli(
instance_ref=deserialize_value(instance_ref, InstanceRef) if instance_ref else None
) as instance:
Expand Down
93 changes: 92 additions & 1 deletion python_modules/dagster/dagster/_serdes/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import signal
import subprocess
import sys
import threading
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from io import TextIOWrapper
from subprocess import Popen
from time import sleep
from typing import Any, NamedTuple, Optional
from typing import Any, Callable, NamedTuple, Optional

import dagster._check as check
from dagster._core.errors import DagsterError
Expand Down Expand Up @@ -196,6 +197,13 @@ def open_ipc_subprocess(parts: Sequence[str], **kwargs: Any) -> "Popen[Any]":
if sys.platform == "win32":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP

# pass_fds is not supported on Windows. Instead we set close_fds to False, which will allow
# any inheritable file descriptors marked as inheritable to be inherited by the child
# process.
if kwargs.get("pass_fds"):
del kwargs["pass_fds"]
kwargs["close_fds"] = False

return subprocess.Popen(
parts,
creationflags=creationflags,
Expand Down Expand Up @@ -227,3 +235,86 @@ def interrupt_ipc_subprocess_pid(pid: int) -> None:
os.kill(pid, signal.CTRL_BREAK_EVENT)
else:
os.kill(pid, signal.SIGINT)


# ########################
# ##### SHUTDOWN PIPE
# ########################

_PIPE_SHUTDOWN_INDICATOR = "SHUTDOWN"


def get_ipc_shutdown_pipe() -> tuple[int, int]:
r_fd, w_fd = os.pipe()

# On windows, convert fd to a Windows handle so it can be reliably passed across processes.
if sys.platform == "win32":
import msvcrt

# Convert fd to a Windows handle so it can be reliably passed across processes
os.set_inheritable(r_fd, True)
r_fd = msvcrt.get_osfhandle(r_fd)
return r_fd, w_fd


@contextmanager
def monitor_ipc_shutdown_pipe(pipe_fd: int, handler: Callable[[], None]) -> Iterator[None]:
"""Monitor the passed in pipe file descriptor for the shutdown indicator message.
When received, trigger the handler.
Args:
pipe_fd: The file descriptor of the pipe to monitor. Must be readable.
If on windows, this is assumed to be a Windows handle rather than a regular file
descriptor.
handler: The handler to call when the shutdown indicator is received.
"""
# On windows, we expect to receive a raw Windows handle rather than a regular file descriptor.
# Convert to a file descriptor before reading.
if sys.platform == "win32":
import msvcrt

pipe_fd = msvcrt.open_osfhandle(pipe_fd, os.O_RDONLY)

break_event = threading.Event()

def _watch_pipe_for_shutdown():
with open(pipe_fd) as pipe:
while not break_event.is_set():
line = pipe.readline()
if not line: # EOF or pipe closed
break_event.set()
elif _PIPE_SHUTDOWN_INDICATOR in line.strip():
break_event.set()
handler()

# Start a background thread that watches the pipe
monitor_thread = threading.Thread(target=_watch_pipe_for_shutdown, daemon=True)
monitor_thread.start()

try:
yield
finally:
# Signal the thread to exit and wait for it to stop
break_event.set()
monitor_thread.join()


@contextmanager
def interrupt_on_ipc_shutdown_message(pipe_fd: int) -> Iterator[None]:
"""Monitor the passed in pipe file descriptor for the shutdown indicator message. Interrupt the
current process when the message is received.
Args:
pipe_fd: The file descriptor of the pipe to monitor. Must be readable.
If on windows, this is assumed to be raw Windows handle rather than a regular file
descriptor.
"""
with monitor_ipc_shutdown_pipe(
pipe_fd, handler=lambda: interrupt_ipc_subprocess_pid(os.getpid())
):
yield


def send_ipc_shutdown_message(w_fd: int) -> None:
os.write(w_fd, f"{_PIPE_SHUTDOWN_INDICATOR}\n".encode())
os.close(w_fd)
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
from dagster._grpc.client import DagsterGrpcClient
from dagster._grpc.server import wait_for_grpc_server
from dagster._serdes.ipc import (
interrupt_ipc_subprocess,
get_ipc_shutdown_pipe,
interrupt_then_kill_ipc_subprocess,
open_ipc_subprocess,
send_ipc_shutdown_message,
)
from dagster._utils import find_free_port, pushd
from dagster_graphql import DagsterGraphQLClient
Expand Down Expand Up @@ -213,17 +214,19 @@ def test_dagster_dev_command_legacy_code_server_behavior():
def _launch_dev_command(
options: list[str], capture_output: bool = False
) -> Iterator[subprocess.Popen]:
read_fd, write_fd = get_ipc_shutdown_pipe()
proc = open_ipc_subprocess(
["dagster", "dev", *options],
["dagster", "dev", *options, "--shutdown-pipe", str(read_fd)],
stdout=subprocess.PIPE if capture_output else None,
stderr=subprocess.PIPE if capture_output else None,
cwd=os.getcwd(),
pass_fds=[read_fd],
)
try:
yield proc
finally:
child_processes = _get_child_processes(proc.pid)
interrupt_ipc_subprocess(proc)
send_ipc_shutdown_message(write_fd)
proc.wait(timeout=10)
# The `dagster dev` command exits before the gRPC servers it spins up have shutdown. Wait
# for the child processes to exit here to make sure we don't leave any hanging processes.
Expand Down
Loading

0 comments on commit 60d2a1a

Please sign in to comment.