diff --git a/locales/en.json b/locales/en.json index 1d95b6f1..e46a8751 100644 --- a/locales/en.json +++ b/locales/en.json @@ -36,6 +36,7 @@ "unknown_group": "Unknown '{group}' group", "unknown_user": "Unknown '{user}' user", "values_mismatch": "Values don't match", + "info": "Info:", "warning": "Warning:", "websocket_request_expected": "Expected a WebSocket request", "cannot_open_file": "Could not open file {file:s} (reason: {error:s})", diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index 332e7ebc..299b96d5 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -1,6 +1,7 @@ import errno import time import subprocess +import os from moulinette.core import MoulinetteError @@ -59,9 +60,20 @@ def call_async_output(args, callback, **kwargs): raise ValueError('%s argument not allowed, ' 'it will be overridden.' % a) + if "stdinfo" in kwargs and kwargs["stdinfo"] != None: + assert len(callback) == 3 + stdinfo = kwargs.pop("stdinfo") + os.mkfifo(stdinfo, 0600) + # Open stdinfo for reading (in a nonblocking way, i.e. even + # if command does not write in the stdinfo pipe...) + stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK) + else: + kwargs.pop("stdinfo") + stdinfo = None + # Validate callback argument if isinstance(callback, tuple): - if len(callback) != 2: + if len(callback) < 2: raise ValueError('callback argument should be a 2-tuple') kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE separate_stderr = True @@ -80,17 +92,22 @@ def call_async_output(args, callback, **kwargs): stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0]) if separate_stderr: stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) + if stdinfo: + stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) + while not stdout_reader.eof() and not stderr_reader.eof(): while not stdout_consum.empty() or not stderr_consum.empty(): # alternate between the 2 consumers to avoid desynchronisation # this way is not 100% perfect but should do it stdout_consum.process_next_line() stderr_consum.process_next_line() + stdinfo_consum.process_next_line() time.sleep(.1) stderr_reader.join() # clear the queues stdout_consum.process_current_queue() stderr_consum.process_current_queue() + stdinfo_consum.process_current_queue() else: while not stdout_reader.eof(): stdout_consum.process_current_queue() @@ -99,6 +116,13 @@ def call_async_output(args, callback, **kwargs): # clear the queue stdout_consum.process_current_queue() + if stdinfo: + # Remove the stdinfo pipe + os.remove(stdinfo) + os.rmdir(os.path.dirname(stdinfo)) + stdinfo_reader.join() + stdinfo_consum.process_current_queue() + # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a # bit to give hime the time to finish (while having a timeout) diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index e0350af5..b11f7490 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -1,3 +1,6 @@ +import os +import time + from multiprocessing.process import Process from multiprocessing.queues import SimpleQueue @@ -18,15 +21,42 @@ class AsynchronousFileReader(Process): def __init__(self, fd, queue): assert hasattr(queue, 'put') assert hasattr(queue, 'empty') - assert callable(fd.readline) + assert isinstance(fd, int) or callable(fd.readline) Process.__init__(self) self._fd = fd self._queue = queue def run(self): """The body of the tread: read lines and put them on the queue.""" - for line in iter(self._fd.readline, ''): - self._queue.put(line) + + # If self._fd is a file opened with open()... + # Typically that's for stdout/stderr pipes + # We can read the stuff easily with 'readline' + if not isinstance(self._fd, int): + for line in iter(self._fd.readline, ''): + self._queue.put(line) + + # Else, it got opened with os.open() and we have to read it + # wit low level crap... + else: + data = "" + while True: + # Try to read (non-blockingly) a few bytes, append them to + # the buffer + data += os.read(self._fd, 50) + + # If nobody's writing in there anymore, get out + if not data and os.fstat(self._fd).st_nlink == 0: + return + + # If we have data, extract a line (ending with \n) and feed + # it to the consumer + if data and '\n' in data: + lines = data.split('\n') + self._queue.put(lines[0]) + data = '\n'.join(lines[1:]) + else: + time.sleep(0.05) def eof(self): """Check whether there is no more content to expect.""" @@ -36,7 +66,10 @@ class AsynchronousFileReader(Process): """Close the file and join the thread.""" if close: self._queue.put(StopIteration) - self._fd.close() + if isinstance(self._fd, int): + os.close(self._fd) + else: + self._fd.close() Process.join(self, timeout)