diff --git a/moulinette/utils/process.py b/moulinette/utils/process.py index e48f27a9..332e7ebc 100644 --- a/moulinette/utils/process.py +++ b/moulinette/utils/process.py @@ -81,14 +81,23 @@ def call_async_output(args, callback, **kwargs): if separate_stderr: stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) while not stdout_reader.eof() and not stderr_reader.eof(): + while not stdout_consum.empty() or not stderr_consum.empty(): + # alternate between the 2 consumers to avoid desynchronisation + # this way is not 100% perfect but should do it + stdout_consum.process_next_line() + stderr_consum.process_next_line() time.sleep(.1) stderr_reader.join() - stderr_consum.join() + # clear the queues + stdout_consum.process_current_queue() + stderr_consum.process_current_queue() else: while not stdout_reader.eof(): + stdout_consum.process_current_queue() time.sleep(.1) stdout_reader.join() - stdout_consum.join() + # clear the queue + stdout_consum.process_current_queue() # on slow hardware, in very edgy situations it is possible that the process # isn't finished just after having closed stdout and stderr, so we wait a diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index 46b6ff88..e0350af5 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -40,14 +40,29 @@ class AsynchronousFileReader(Process): Process.join(self, timeout) -def consume_queue(queue, callback): - """Consume the queue and give content to the callback.""" - while True: - line = queue.get() - if line: - if line == StopIteration: - break - callback(line) +class Consummer(object): + def __init__(self, queue, callback): + self.queue = queue + self.callback = callback + + def empty(self): + return self.queue.empty() + + def process_next_line(self): + if not self.empty(): + line = self.queue.get() + if line: + if line == StopIteration: + return + self.callback(line) + + def process_current_queue(self): + while not self.empty(): + line = self.queue.get() + if line: + if line == StopIteration: + break + self.callback(line) def async_file_reading(fd, callback): @@ -55,6 +70,5 @@ def async_file_reading(fd, callback): queue = SimpleQueue() reader = AsynchronousFileReader(fd, queue) reader.start() - consummer = Process(target=consume_queue, args=(queue, callback)) - consummer.start() + consummer = Consummer(queue, callback) return (reader, consummer)