Merge pull request #166 from YunoHost/async_callback_in_main_process

[mod] execute async consumers in the main process
This commit is contained in:
Bram 2018-07-09 19:59:14 +02:00 committed by GitHub
commit 9bf2642539
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 12 deletions

View file

@ -81,14 +81,23 @@ def call_async_output(args, callback, **kwargs):
if separate_stderr: if separate_stderr:
stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
while not stdout_reader.eof() and not stderr_reader.eof(): while not stdout_reader.eof() and not stderr_reader.eof():
while not stdout_consum.empty() or not stderr_consum.empty():
# alternate between the 2 consumers to avoid desynchronisation
# this way is not 100% perfect but should do it
stdout_consum.process_next_line()
stderr_consum.process_next_line()
time.sleep(.1) time.sleep(.1)
stderr_reader.join() stderr_reader.join()
stderr_consum.join() # clear the queues
stdout_consum.process_current_queue()
stderr_consum.process_current_queue()
else: else:
while not stdout_reader.eof(): while not stdout_reader.eof():
stdout_consum.process_current_queue()
time.sleep(.1) time.sleep(.1)
stdout_reader.join() stdout_reader.join()
stdout_consum.join() # clear the queue
stdout_consum.process_current_queue()
# on slow hardware, in very edgy situations it is possible that the process # on slow hardware, in very edgy situations it is possible that the process
# isn't finished just after having closed stdout and stderr, so we wait a # isn't finished just after having closed stdout and stderr, so we wait a

View file

@ -40,14 +40,29 @@ class AsynchronousFileReader(Process):
Process.join(self, timeout) Process.join(self, timeout)
def consume_queue(queue, callback): class Consummer(object):
"""Consume the queue and give content to the callback.""" def __init__(self, queue, callback):
while True: self.queue = queue
line = queue.get() self.callback = callback
def empty(self):
return self.queue.empty()
def process_next_line(self):
if not self.empty():
line = self.queue.get()
if line:
if line == StopIteration:
return
self.callback(line)
def process_current_queue(self):
while not self.empty():
line = self.queue.get()
if line: if line:
if line == StopIteration: if line == StopIteration:
break break
callback(line) self.callback(line)
def async_file_reading(fd, callback): def async_file_reading(fd, callback):
@ -55,6 +70,5 @@ def async_file_reading(fd, callback):
queue = SimpleQueue() queue = SimpleQueue()
reader = AsynchronousFileReader(fd, queue) reader = AsynchronousFileReader(fd, queue)
reader.start() reader.start()
consummer = Process(target=consume_queue, args=(queue, callback)) consummer = Consummer(queue, callback)
consummer.start()
return (reader, consummer) return (reader, consummer)