diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index 2765abbd..f66aa7ed 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -5,7 +5,7 @@ try: except ImportError: from shlex import quote # Python3 >= 3.3 -from .stream import start_async_file_reading +from .stream import async_file_reading # Prevent to import subprocess only for common classes CalledProcessError = subprocess.CalledProcessError @@ -29,8 +29,10 @@ def call_async_output(args, callback, **kwargs): """Run command and provide its output asynchronously Run command with arguments and wait for it to complete to return the - returncode attribute. The callback must take one byte string argument - and will be called each time the command produces some output. + returncode attribute. The `callback` can be a method or a 2-tuple of + methods - for stdout and stderr respectively - which must take one + byte string argument. It will be called each time the command produces + some output. The stdout and stderr additional arguments for the Popen constructor are not allowed as they are used internally. @@ -48,23 +50,37 @@ def call_async_output(args, callback, **kwargs): if a in kwargs: raise ValueError('%s argument not allowed, ' 'it will be overridden.' % a) - if not callable(callback): - raise ValueError('callback argument must be callable') + + # Validate callback argument + if isinstance(callback, tuple): + if len(callback) != 2: + raise ValueError('callback argument should be a 2-tuple') + kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE + separate_stderr = True + elif callable(callback): + kwargs['stdout'] = subprocess.PIPE + kwargs['stderr'] = subprocess.STDOUT + separate_stderr = False + callback = (callback,) + else: + raise ValueError('callback argument must be callable or a 2-tuple') # Run the command - p = subprocess.Popen(args, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, **kwargs) + p = subprocess.Popen(args, **kwargs) - # Wrap and get command output - reader, queue = start_async_file_reading(p.stdout) - while not reader.eof(): - while not queue.empty(): - line = queue.get() - try: - callback(line.rstrip()) - except: - pass - reader.join() + # Wrap and get command outputs + stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0]) + if separate_stderr: + stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) + while not stdout_reader.eof() and not stderr_reader.eof(): + time.sleep(.1) + stderr_reader.join() + stderr_consum.join() + else: + while not stdout_reader.eof(): + time.sleep(.1) + stdout_reader.join() + stdout_consum.join() return p.poll() diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index 0e696f35..8986067a 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -1,10 +1,10 @@ -import threading -import Queue +from multiprocessing.process import Process +from multiprocessing.queues import SimpleQueue # Read from a stream --------------------------------------------------- -class AsynchronousFileReader(threading.Thread): +class AsynchronousFileReader(Process): """ Helper class to implement asynchronous reading of a file in a separate thread. Pushes read lines on a queue to @@ -15,9 +15,10 @@ class AsynchronousFileReader(threading.Thread): """ def __init__(self, fd, queue): - assert isinstance(queue, Queue.Queue) + assert hasattr(queue, 'put') + assert hasattr(queue, 'empty') assert callable(fd.readline) - threading.Thread.__init__(self) + Process.__init__(self) self._fd = fd self._queue = queue @@ -33,13 +34,25 @@ class AsynchronousFileReader(threading.Thread): def join(self, timeout=None, close=True): """Close the file and join the thread.""" if close: + self._queue.put(StopIteration) self._fd.close() - threading.Thread.join(self, timeout) + Process.join(self, timeout) -def start_async_file_reading(fd): +def consume_queue(queue, callback): + """Consume the queue and give content to the callback.""" + while True: + line = queue.get() + if line: + if line == StopIteration: + break + callback(line) + +def async_file_reading(fd, callback): """Helper which instantiate and run an AsynchronousFileReader.""" - queue = Queue.Queue() + queue = SimpleQueue() reader = AsynchronousFileReader(fd, queue) reader.start() - return (reader, queue) + consummer = Process(target=consume_queue, args=(queue, callback)) + consummer.start() + return (reader, consummer)