diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index f945cd54..31f99585 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -10,7 +10,7 @@ try: except ImportError: from shlex import quote # Python3 >= 3.3 -from .stream import async_file_reading +from .stream import LogPipe quote # This line is here to avoid W0611 PEP8 error (see comments above) @@ -59,71 +59,20 @@ def call_async_output(args, callback, **kwargs): if a in kwargs: raise ValueError("%s argument not allowed, " "it will be overridden." % a) - if "stdinfo" in kwargs and kwargs["stdinfo"] is not None: - assert len(callback) == 3 - stdinfo = kwargs.pop("stdinfo") - os.mkfifo(stdinfo, 0o600) - # Open stdinfo for reading (in a nonblocking way, i.e. even - # if command does not write in the stdinfo pipe...) - stdinfo_f = os.open(stdinfo, os.O_RDONLY | os.O_NONBLOCK) - else: - if "stdinfo" in kwargs: - kwargs.pop("stdinfo") - stdinfo = None - - # Validate callback argument - if isinstance(callback, tuple): - if len(callback) < 2: - raise ValueError("callback argument should be a 2-tuple") - kwargs["stdout"] = kwargs["stderr"] = subprocess.PIPE - separate_stderr = True - elif callable(callback): - kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.STDOUT - separate_stderr = False - callback = (callback,) - else: - raise ValueError("callback argument must be callable or a 2-tuple") - - # Run the command - p = subprocess.Popen(args, **kwargs) - - # Wrap and get command outputs - stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0]) - if separate_stderr: - stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) - if stdinfo: - stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) - - while not stdout_reader.eof() and not stderr_reader.eof(): - while not stdout_consum.empty() or not stderr_consum.empty(): - # alternate between the 2 consumers to avoid desynchronisation - # this way is not 100% perfect but should do it - stdout_consum.process_next_line() - stderr_consum.process_next_line() - if stdinfo: - stdinfo_consum.process_next_line() - time.sleep(0.1) - stderr_reader.join() - # clear the queues - stdout_consum.process_current_queue() - stderr_consum.process_current_queue() - if stdinfo: - stdinfo_consum.process_current_queue() - else: - while not stdout_reader.eof(): - stdout_consum.process_current_queue() - time.sleep(0.1) - stdout_reader.join() - # clear the queue - stdout_consum.process_current_queue() - + kwargs["stdout"] = LogPipe(callback[0]) + kwargs["stderr"] = LogPipe(callback[1]) + stdinfo = LogPipe(callback[2]) if len(callback) >= 3 else None if stdinfo: - # Remove the stdinfo pipe - os.remove(stdinfo) - os.rmdir(os.path.dirname(stdinfo)) - stdinfo_reader.join() - stdinfo_consum.process_current_queue() + kwargs["pass_fds"] = [stdinfo.fdWrite] + if "env" not in kwargs: + kwargs["env"] = os.environ + kwargs["env"]['YNH_STDINFO'] = str(stdinfo.fdWrite) + + with subprocess.Popen(args, **kwargs) as p: + kwargs["stdout"].close() + kwargs["stderr"].close() + if stdinfo: + stdinfo.close() # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index d453e66d..af9a866a 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -1,123 +1,38 @@ import os -import time +import threading -import sys -if sys.version_info[0] == 3: - from multiprocessing.process import BaseProcess as Process - from multiprocessing import SimpleQueue -else: - # python 2 - from multiprocessing.process import Process - from multiprocessing.queues import SimpleQueue +# Adapted from https://codereview.stackexchange.com/a/17959 +class LogPipe(threading.Thread): -# Read from a stream --------------------------------------------------- + def __init__(self, log_callback): + """Setup the object with a logger and a loglevel + and start the thread + """ + threading.Thread.__init__(self) + self.daemon = False + self.log_callback = log_callback + self.fdRead, self.fdWrite = os.pipe() + self.pipeReader = os.fdopen(self.fdRead) -class AsynchronousFileReader(Process): + self.start() - """ - Helper class to implement asynchronous reading of a file - in a separate thread. Pushes read lines on a queue to - be consumed in another thread. - - Based on: - http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading - - """ - - def __init__(self, fd, queue): - assert hasattr(queue, "put") - assert hasattr(queue, "empty") - assert isinstance(fd, int) or callable(fd.readline) - Process.__init__(self) - self._fd = fd - self._queue = queue + def fileno(self): + """Return the write file descriptor of the pipe + """ + return self.fdWrite def run(self): - """The body of the tread: read lines and put them on the queue.""" + """Run the thread, logging everything. + """ + for line in iter(self.pipeReader.readline, ''): + self.log_callback(line.strip('\n')) - # If self._fd is a file opened with open()... - # Typically that's for stdout/stderr pipes - # We can read the stuff easily with 'readline' - if not isinstance(self._fd, int): - for line in iter(self._fd.readline, ""): - self._queue.put(line) + self.pipeReader.close() - # Else, it got opened with os.open() and we have to read it - # wit low level crap... - else: - data = "" - while True: - try: - # Try to read (non-blockingly) a few bytes, append them to - # the buffer - data += os.read(self._fd, 50) - except Exception as e: - print( - "from moulinette.utils.stream: could not read file descriptor : %s" - % str(e) - ) - continue - - # If nobody's writing in there anymore, get out - if not data and os.fstat(self._fd).st_nlink == 0: - return - - # If we have data, extract a line (ending with \n) and feed - # it to the consumer - if data and "\n" in data: - lines = data.split("\n") - self._queue.put(lines[0]) - data = "\n".join(lines[1:]) - else: - time.sleep(0.05) - - def eof(self): - """Check whether there is no more content to expect.""" - return not self.is_alive() and self._queue.empty() - - def join(self, timeout=None, close=True): - """Close the file and join the thread.""" - if close: - self._queue.put(StopIteration) - if isinstance(self._fd, int): - os.close(self._fd) - else: - self._fd.close() - Process.join(self, timeout) - - -class Consummer(object): - def __init__(self, queue, callback): - self.queue = queue - self.callback = callback - - def empty(self): - return self.queue.empty() - - def process_next_line(self): - if not self.empty(): - line = self.queue.get() - if line: - if line == StopIteration: - return - self.callback(line) - - def process_current_queue(self): - while not self.empty(): - line = self.queue.get() - if line: - if line == StopIteration: - break - self.callback(line) - - -def async_file_reading(fd, callback): - """Helper which instantiate and run an AsynchronousFileReader.""" - queue = SimpleQueue() - reader = AsynchronousFileReader(fd, queue) - reader.start() - consummer = Consummer(queue, callback) - return (reader, consummer) + def close(self): + """Close the write end of the pipe. + """ + os.close(self.fdWrite)