diff --git a/plumbum/commands/processes.py b/plumbum/commands/processes.py index c5ab2573c..98d861726 100644 --- a/plumbum/commands/processes.py +++ b/plumbum/commands/processes.py @@ -1,7 +1,6 @@ import time import atexit import heapq -from subprocess import Popen from threading import Thread from plumbum.lib import IS_WIN32, six @@ -24,7 +23,7 @@ def _check_process(proc, retcode, timeout, stdout, stderr): return proc.returncode, stdout, stderr -def _iter_lines(proc, decode, linesize, line_timeout=None): +def _iter_lines_posix(proc, decode, linesize, line_timeout=None): try: from selectors import DefaultSelector, EVENT_READ except ImportError: @@ -62,6 +61,62 @@ def selector(): yield 1, decode(line) +def _iter_lines_win32(proc, decode, linesize, line_timeout=None): + + class Piper(Thread): + + def __init__(self, fd, pipe): + super().__init__(name="PlumbumPiper%sThread" % fd) + self.pipe = pipe + self.fd = fd + self.empty = False + self.daemon = True + super().start() + + def read_from_pipe(self): + return self.pipe.readline(linesize) + + def run(self): + for line in iter(self.read_from_pipe, b''): + queue.put((self.fd, decode(line))) + # self.pipe.close() + + if line_timeout is None: + line_timeout = float("inf") + queue = Queue() + pipers = [Piper(0, proc.stdout), Piper(1, proc.stderr)] + last_line_ts = time.time() + empty = True + while True: + try: + yield queue.get_nowait() + last_line_ts = time.time() + empty = False + except QueueEmpty: + empty = True + if time.time() - last_line_ts > line_timeout: + raise ProcessLineTimedOut("popen line timeout expired", getattr(proc, "argv", None), getattr(proc, "machine", None)) + if proc.poll() is not None: + break + if empty: + time.sleep(0.1) + + for piper in pipers: + piper.join() + + while True: + try: + yield queue.get_nowait() + except QueueEmpty: + break + + +if IS_WIN32: + _iter_lines = _iter_lines_win32 +else: + _iter_lines = _iter_lines_posix + + #=================================================================================================== # Exceptions #===================================================================================================