From 4f31cf00442a7de49726f7f0c4b1bceea7edb762 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sun, 29 Oct 2017 23:12:03 +0100 Subject: [PATCH 1/5] [enh] Handle a particular 'stdinfo' output through named pipe --- moulinette/utils/process.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index e48f27a9..74c56c65 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -1,6 +1,7 @@ import errno import time import subprocess +import os from moulinette.core import MoulinetteError @@ -59,9 +60,18 @@ def call_async_output(args, callback, **kwargs): raise ValueError('%s argument not allowed, ' 'it will be overridden.' % a) + + if "stdinfo" in kwargs and kwargs["stdinfo"] != None: + assert len(callback) == 3 + stdinfo = kwargs.pop("stdinfo") + os.mkfifo(stdinfo, 0600) + else: + kwargs.pop("stdinfo") + stdinfo = None + # Validate callback argument if isinstance(callback, tuple): - if len(callback) != 2: + if len(callback) < 2: raise ValueError('callback argument should be a 2-tuple') kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE separate_stderr = True @@ -80,6 +90,14 @@ def call_async_output(args, callback, **kwargs): stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0]) if separate_stderr: stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) + if stdinfo: + stdinfo_f = open(stdinfo, "r") + stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) + while not stdinfo_reader.eof() and not stdinfo_reader.eof(): + time.sleep(.1) + stdinfo_reader.join() + stdinfo_consum.join() + while not stdout_reader.eof() and not stderr_reader.eof(): time.sleep(.1) stderr_reader.join() @@ -90,6 +108,11 @@ def call_async_output(args, callback, **kwargs): stdout_reader.join() stdout_consum.join() + if stdinfo: + stdinfo_f.close() + os.remove(stdinfo) + os.rmdir(os.path.dirname(stdinfo)) + # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a # bit to give hime the time to finish (while having a timeout) From 013e1155f958ac7268a83274a323c7dab93ad6b3 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sun, 29 Oct 2017 23:12:34 +0100 Subject: [PATCH 2/5] [fix] Fix display of 'INFO' messages in --verbose --- locales/en.json | 1 + moulinette/interfaces/cli.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/locales/en.json b/locales/en.json index 6eda60ca..a7156acd 100644 --- a/locales/en.json +++ b/locales/en.json @@ -35,6 +35,7 @@ "unknown_group": "Unknown '{group}' group", "unknown_user": "Unknown '{user}' user", "values_mismatch": "Values don't match", + "info": "Info:", "warning": "Warning:", "websocket_request_expected": "Expected a WebSocket request", "cannot_open_file": "Could not open file {file:s} (reason: {error:s})", diff --git a/moulinette/interfaces/cli.py b/moulinette/interfaces/cli.py index 8ed96ced..00737b5e 100644 --- a/moulinette/interfaces/cli.py +++ b/moulinette/interfaces/cli.py @@ -182,7 +182,7 @@ class TTYHandler(logging.StreamHandler): if self.level <= log.DEBUG: # add level name before message level = '%s ' % record.levelname - elif record.levelname in ['SUCCESS', 'WARNING', 'ERROR']: + elif record.levelname in ['SUCCESS', 'WARNING', 'ERROR', 'INFO']: # add translated level name before message level = '%s ' % m18n.g(record.levelname.lower()) color = self.LEVELS_COLOR.get(record.levelno, 'white') From 37918d9816a4b143dc456f9256bc404f3bc8cc8d Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sun, 29 Oct 2017 23:58:54 +0100 Subject: [PATCH 3/5] [fix] Go to sleep Alex, you drunk --- moulinette/utils/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index 74c56c65..dda91843 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -93,12 +93,12 @@ def call_async_output(args, callback, **kwargs): if stdinfo: stdinfo_f = open(stdinfo, "r") stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) - while not stdinfo_reader.eof() and not stdinfo_reader.eof(): + while not stdout_reader.eof() or not stderr_reader.eof() or not stdinfo_reader.eof(): time.sleep(.1) stdinfo_reader.join() stdinfo_consum.join() - while not stdout_reader.eof() and not stderr_reader.eof(): + while not stdout_reader.eof() or not stderr_reader.eof(): time.sleep(.1) stderr_reader.join() stderr_consum.join() From 8151ab3cafc95d846bbae1d82d2074ee0af1228d Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sat, 4 Nov 2017 01:36:38 +0100 Subject: [PATCH 4/5] =?UTF-8?q?(=E2=95=AF=C2=B0=E2=96=A1=C2=B0=EF=BC=89?= =?UTF-8?q?=E2=95=AF=EF=B8=B5=20=E2=94=BB=E2=94=81=E2=94=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- moulinette/utils/process.py | 15 ++++++------- moulinette/utils/stream.py | 43 +++++++++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index dda91843..2151763f 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -60,7 +60,6 @@ def call_async_output(args, callback, **kwargs): raise ValueError('%s argument not allowed, ' 'it will be overridden.' % a) - if "stdinfo" in kwargs and kwargs["stdinfo"] != None: assert len(callback) == 3 stdinfo = kwargs.pop("stdinfo") @@ -91,10 +90,15 @@ def call_async_output(args, callback, **kwargs): if separate_stderr: stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) if stdinfo: - stdinfo_f = open(stdinfo, "r") + # Open stdinfo for reading (in a nonblocking way, i.e. even + # if command does not write in the stdinfo pipe...) + stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK) stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) - while not stdout_reader.eof() or not stderr_reader.eof() or not stdinfo_reader.eof(): + while not stdout_reader.eof() or not stderr_reader.eof(): time.sleep(.1) + # Remove the stdinfo pipe + os.remove(stdinfo) + os.rmdir(os.path.dirname(stdinfo)) stdinfo_reader.join() stdinfo_consum.join() @@ -108,11 +112,6 @@ def call_async_output(args, callback, **kwargs): stdout_reader.join() stdout_consum.join() - if stdinfo: - stdinfo_f.close() - os.remove(stdinfo) - os.rmdir(os.path.dirname(stdinfo)) - # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a # bit to give hime the time to finish (while having a timeout) diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index 46b6ff88..32dbaa41 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -1,3 +1,6 @@ +import os +import time + from multiprocessing.process import Process from multiprocessing.queues import SimpleQueue @@ -18,15 +21,44 @@ class AsynchronousFileReader(Process): def __init__(self, fd, queue): assert hasattr(queue, 'put') assert hasattr(queue, 'empty') - assert callable(fd.readline) + assert isinstance(fd, int) or callable(fd.readline) Process.__init__(self) self._fd = fd self._queue = queue def run(self): """The body of the tread: read lines and put them on the queue.""" - for line in iter(self._fd.readline, ''): - self._queue.put(line) + + # If self._fd is a file opened with open()... + # Typically that's for stdout/stderr pipes + # We can read the stuff easily with 'readline' + if not isinstance(self._fd, int): + for line in iter(self._fd.readline, ''): + self._queue.put(line) + + # Else, it got opened with os.open() and we have to read it + # wit low level crap... + else: + data = "" + while True: + # If nobody's writing in there anymore, get out + if os.fstat(self._fd).st_nlink == 0: + print "Plop" + self._queue.put("(Info returning because no link)") + return + + # Read (non-blockingly) a few bytes, append them to the buffer + data += os.read(self._fd, 50) + print data + + # If we have data, extract a line (ending with \n) and feed + # it to the consumer + if data and '\n' in data: + lines = data.split('\n') + self._queue.put(lines[0]) + data = '\n'.join(lines[1:]) + else: + time.sleep(0.1) def eof(self): """Check whether there is no more content to expect.""" @@ -36,7 +68,10 @@ class AsynchronousFileReader(Process): """Close the file and join the thread.""" if close: self._queue.put(StopIteration) - self._fd.close() + if isinstance(self._fd, int): + os.close(self._fd) + else: + self._fd.close() Process.join(self, timeout) From e3571eb93a577a0e1371dc5e678262c2d078007d Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sat, 4 Nov 2017 19:43:38 +0100 Subject: [PATCH 5/5] Cleaning / looks like it's working :/ --- moulinette/utils/process.py | 20 ++++++++++---------- moulinette/utils/stream.py | 16 +++++++--------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index 2151763f..6e10d4dd 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -64,6 +64,9 @@ def call_async_output(args, callback, **kwargs): assert len(callback) == 3 stdinfo = kwargs.pop("stdinfo") os.mkfifo(stdinfo, 0600) + # Open stdinfo for reading (in a nonblocking way, i.e. even + # if command does not write in the stdinfo pipe...) + stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK) else: kwargs.pop("stdinfo") stdinfo = None @@ -90,17 +93,7 @@ def call_async_output(args, callback, **kwargs): if separate_stderr: stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) if stdinfo: - # Open stdinfo for reading (in a nonblocking way, i.e. even - # if command does not write in the stdinfo pipe...) - stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK) stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) - while not stdout_reader.eof() or not stderr_reader.eof(): - time.sleep(.1) - # Remove the stdinfo pipe - os.remove(stdinfo) - os.rmdir(os.path.dirname(stdinfo)) - stdinfo_reader.join() - stdinfo_consum.join() while not stdout_reader.eof() or not stderr_reader.eof(): time.sleep(.1) @@ -112,6 +105,13 @@ def call_async_output(args, callback, **kwargs): stdout_reader.join() stdout_consum.join() + if stdinfo: + # Remove the stdinfo pipe + os.remove(stdinfo) + os.rmdir(os.path.dirname(stdinfo)) + stdinfo_reader.join() + stdinfo_consum.join() + # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a # bit to give hime the time to finish (while having a timeout) diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index 32dbaa41..7b16897e 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -41,15 +41,13 @@ class AsynchronousFileReader(Process): else: data = "" while True: - # If nobody's writing in there anymore, get out - if os.fstat(self._fd).st_nlink == 0: - print "Plop" - self._queue.put("(Info returning because no link)") - return - - # Read (non-blockingly) a few bytes, append them to the buffer + # Try to read (non-blockingly) a few bytes, append them to + # the buffer data += os.read(self._fd, 50) - print data + + # If nobody's writing in there anymore, get out + if not data and os.fstat(self._fd).st_nlink == 0: + return # If we have data, extract a line (ending with \n) and feed # it to the consumer @@ -58,7 +56,7 @@ class AsynchronousFileReader(Process): self._queue.put(lines[0]) data = '\n'.join(lines[1:]) else: - time.sleep(0.1) + time.sleep(0.05) def eof(self): """Check whether there is no more content to expect."""