Skip to content

Commit

Permalink
Fix stalling in Pipeline command
Browse files Browse the repository at this point in the history
When a command (other than the first) in a pipeline wrote more than
64 KiB to stderr and the output was consumed with the iter_lines
function, the whole pipeline stalled. Fixed by reading the output
of every command in the pipeline whose stdout or stderr was set to PIPE.
  • Loading branch information
astaric committed Jan 18, 2023
1 parent 2d82fad commit 0a691ba
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
2 changes: 0 additions & 2 deletions plumbum/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,6 @@ def popen(self, args=(), **kwargs):
dstproc = self.dstcmd.popen(**kwargs)
# allow p1 to receive a SIGPIPE if p2 exits
srcproc.stdout.close()
if srcproc.stderr is not None:
dstproc.stderr = srcproc.stderr
if srcproc.stdin and src_kwargs.get("stdin") != PIPE:
srcproc.stdin.close()
dstproc.srcproc = srcproc
Expand Down
41 changes: 34 additions & 7 deletions plumbum/commands/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,42 @@ def _check_process(proc, retcode, timeout, stdout, stderr):
return proc.returncode, stdout, stderr


def get_piped_streams(proc):
    """Return the open stdout/stderr streams of ``proc`` and of its pipeline predecessors.

    If ``proc`` was started from a Pipeline command, it is expected to have a
    ``srcproc`` attribute pointing to the previous process in the pipeline.
    That link is followed until it runs out, so the result covers every
    process in the pipeline, starting with ``proc`` itself and walking back
    towards the first command.

    Reading all of these streams matters: a process that writes to a pipe
    nobody reads from stalls once the pipe's buffer is full.

    Streams that are ``None`` or already closed (for example a stdout that was
    handed over to the stdin of the following command) are left out.

    Returns:
        A list of ``(stream_type, stream)`` tuples, where ``stream_type`` is
        ``0`` for a stdout stream and ``1`` for a stderr stream. For each
        process, stderr is listed before stdout.
    """
    streams = []

    def add_stream(type_, stream):
        # Skip streams that were never created or that have been closed.
        if stream is None or stream.closed:
            return
        streams.append((type_, stream))

    while proc:
        add_stream(1, proc.stderr)
        add_stream(0, proc.stdout)
        # Step back to the previous process in the pipeline, if there is one.
        proc = getattr(proc, "srcproc", None)

    return streams


def _iter_lines_posix(proc, decode, linesize, line_timeout=None):
from selectors import EVENT_READ, DefaultSelector

streams = get_piped_streams(proc)
# Python 3.4+ implementation
def selector():
sel = DefaultSelector()
sel.register(proc.stdout, EVENT_READ, 0)
sel.register(proc.stderr, EVENT_READ, 1)
for stream_type, stream in streams:
sel.register(stream, EVENT_READ, stream_type)
while True:
ready = sel.select(line_timeout)
if not ready and line_timeout:
Expand All @@ -41,10 +69,9 @@ def selector():
yield ret
if proc.poll() is not None:
break
for line in proc.stdout:
yield 0, decode(line)
for line in proc.stderr:
yield 1, decode(line)
for stream_type, stream in streams:
for line in stream:
yield stream_type, decode(line)


def _iter_lines_win32(proc, decode, linesize, line_timeout=None):
Expand Down
70 changes: 70 additions & 0 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import List, Tuple

import pytest

import plumbum
from plumbum.commands import BaseCommand


@pytest.mark.timeout(1)
def test_draining_stderr(generate_cmd, process_cmd):
    """A three-stage pipeline must not stall when every stage writes to stderr."""
    stdout, stderr = get_output_with_iter_lines(
        generate_cmd | process_cmd | process_cmd
    )
    # generate_cmd emits 5000 stderr lines and each process_cmd stage emits
    # one stderr line per input line: 3 * 5000 in total.
    assert len(stderr) == 15000
    assert len(stdout) == 5000


@pytest.mark.timeout(1)
def test_draining_stderr_with_stderr_redirect(tmp_path, generate_cmd, process_cmd):
    """The pipeline must not stall when the middle stage's stderr goes to a file."""
    stdout, stderr = get_output_with_iter_lines(
        generate_cmd | (process_cmd >= str(tmp_path / "output.txt")) | process_cmd
    )
    # The middle stage's stderr is redirected to output.txt, so only the
    # first and last stages contribute stderr lines: 2 * 5000.
    assert len(stderr) == 10000
    assert len(stdout) == 5000


@pytest.mark.timeout(1)
def test_draining_stderr_with_stdout_redirect(tmp_path, generate_cmd, process_cmd):
    """The pipeline must not stall when the pipeline's stdout goes to a file."""
    # ``|`` binds tighter than ``>``, so the redirect applies to the result
    # of the whole ``generate_cmd | process_cmd | process_cmd`` expression.
    stdout, stderr = get_output_with_iter_lines(
        generate_cmd | process_cmd | process_cmd > str(tmp_path / "output.txt")
    )
    # All three stages still write their stderr lines: 3 * 5000.
    assert len(stderr) == 15000
    # stdout was redirected to the file, so nothing is yielded for it.
    assert len(stdout) == 0


@pytest.fixture()
def generate_cmd(tmp_path):
    """Return a command running a script that prints 5000 lines to both stdout and stderr.

    The script is written to ``generate.py`` inside ``tmp_path``. For each
    ``i`` in ``range(5000)`` it prints ``generated <i>`` to stderr and ``<i>``
    to stdout.
    """
    generate = tmp_path / "generate.py"
    generate.write_text("""\
import sys
for i in range(5000):
    print("generated", i, file=sys.stderr)
    print(i)
""")
    return plumbum.local["python"][generate]


@pytest.fixture()
def process_cmd(tmp_path):
    """Return a command running a script that echoes stdin to stdout and logs to stderr.

    The script is written to ``process.py`` inside ``tmp_path``. For each line
    read from stdin it prints ``consumed <line>`` to stderr and the stripped
    line to stdout.
    """
    process = tmp_path / "process.py"
    process.write_text("""\
import sys
for line in sys.stdin:
    i = line.strip()
    print("consumed", i, file=sys.stderr)
    print(i)
""")
    return plumbum.local["python"][process]


def get_output_with_iter_lines(cmd: BaseCommand) -> Tuple[List[str], List[str]]:
    """Run ``cmd`` and collect its output through ``iter_lines``.

    Starts the command with ``popen()``, consumes ``proc.iter_lines`` until it
    is exhausted, then waits for the process.

    Returns:
        A ``(stdout, stderr)`` tuple of lists holding the non-``None`` lines
        yielded for each stream, in the order they were yielded.
    """
    stderr, stdout = [], []
    proc = cmd.popen()
    # NOTE(review): retcode=[0, None] presumably relaxes the exit-code check;
    # confirm against plumbum's iter_lines documentation.
    for stdout_line, stderr_line in proc.iter_lines(retcode=[0, None]):
        # Each yielded pair may carry None for a stream; only real lines are kept.
        if stderr_line is not None:
            stderr.append(stderr_line)
        if stdout_line is not None:
            stdout.append(stdout_line)
    proc.wait()
    return stdout, stderr

0 comments on commit 0a691ba

Please sign in to comment.