[enh] Enhance stream utils with callback and use it in call_async_output

This commit is contained in:
Jérôme Lebleu 2015-11-15 15:19:55 +01:00
parent 5c1a7093a5
commit 152f5a3919
2 changed files with 55 additions and 26 deletions

View file

@ -5,7 +5,7 @@ try:
except ImportError:
from shlex import quote # Python3 >= 3.3
from .stream import start_async_file_reading
from .stream import async_file_reading
# Prevent to import subprocess only for common classes
CalledProcessError = subprocess.CalledProcessError
@ -29,8 +29,10 @@ def call_async_output(args, callback, **kwargs):
"""Run command and provide its output asynchronously
Run command with arguments and wait for it to complete to return the
returncode attribute. The callback must take one byte string argument
and will be called each time the command produces some output.
returncode attribute. The `callback` can be a method or a 2-tuple of
methods - for stdout and stderr respectively - which must take one
byte string argument. It will be called each time the command produces
some output.
The stdout and stderr additional arguments for the Popen constructor
are not allowed as they are used internally.
@ -48,23 +50,37 @@ def call_async_output(args, callback, **kwargs):
if a in kwargs:
raise ValueError('%s argument not allowed, '
'it will be overridden.' % a)
if not callable(callback):
raise ValueError('callback argument must be callable')
# Validate callback argument
if isinstance(callback, tuple):
if len(callback) != 2:
raise ValueError('callback argument should be a 2-tuple')
kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
separate_stderr = True
elif callable(callback):
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
separate_stderr = False
callback = (callback,)
else:
raise ValueError('callback argument must be callable or a 2-tuple')
# Run the command
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, **kwargs)
p = subprocess.Popen(args, **kwargs)
# Wrap and get command output
reader, queue = start_async_file_reading(p.stdout)
while not reader.eof():
while not queue.empty():
line = queue.get()
try:
callback(line.rstrip())
except:
pass
reader.join()
# Wrap and get command outputs
stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0])
if separate_stderr:
stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
while not stdout_reader.eof() and not stderr_reader.eof():
time.sleep(.1)
stderr_reader.join()
stderr_consum.join()
else:
while not stdout_reader.eof():
time.sleep(.1)
stdout_reader.join()
stdout_consum.join()
return p.poll()

View file

@ -1,10 +1,10 @@
import threading
import Queue
from multiprocessing.process import Process
from multiprocessing.queues import SimpleQueue
# Read from a stream ---------------------------------------------------
class AsynchronousFileReader(threading.Thread):
class AsynchronousFileReader(Process):
"""
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
@ -15,9 +15,10 @@ class AsynchronousFileReader(threading.Thread):
"""
def __init__(self, fd, queue):
assert isinstance(queue, Queue.Queue)
assert hasattr(queue, 'put')
assert hasattr(queue, 'empty')
assert callable(fd.readline)
threading.Thread.__init__(self)
Process.__init__(self)
self._fd = fd
self._queue = queue
@ -33,13 +34,25 @@ class AsynchronousFileReader(threading.Thread):
def join(self, timeout=None, close=True):
"""Close the file and join the thread."""
if close:
self._queue.put(StopIteration)
self._fd.close()
threading.Thread.join(self, timeout)
Process.join(self, timeout)
def start_async_file_reading(fd):
def consume_queue(queue, callback):
"""Consume the queue and give content to the callback."""
while True:
line = queue.get()
if line:
if line == StopIteration:
break
callback(line)
def async_file_reading(fd, callback):
"""Helper which instantiate and run an AsynchronousFileReader."""
queue = Queue.Queue()
queue = SimpleQueue()
reader = AsynchronousFileReader(fd, queue)
reader.start()
return (reader, queue)
consummer = Process(target=consume_queue, args=(queue, callback))
consummer.start()
return (reader, consummer)