[ref] Use a new asynchronous file reader helper

This commit is contained in:
Jérôme Lebleu 2015-11-14 21:59:38 +01:00
parent 4cf898d9c7
commit 2be9da941f

View file

@ -1,59 +1,45 @@
from threading import Thread
from Queue import Queue, Empty
import threading
import Queue
# Read from a stream ---------------------------------------------------
class NonBlockingStreamReader:
"""A non-blocking stream reader
class AsynchronousFileReader(threading.Thread):
"""
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
Open a separate thread which reads lines from the stream whenever data
becomes available and stores the data in a queue.
Based on: http://eyalarubas.com/python-subproc-nonblock.html
Keyword arguments:
- stream -- The stream to read from
Based on:
http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading
"""
def __init__(self, stream):
self._s = stream
self._q = Queue()
def __init__(self, fd, queue):
assert isinstance(queue, Queue.Queue)
assert callable(fd.readline)
threading.Thread.__init__(self)
self._fd = fd
self._queue = queue
def _populateQueue(stream, queue):
"""Collect lines from the stream and put them in the queue"""
while True:
line = stream.readline()
if line:
queue.put(line)
else:
break
def run(self):
"""The body of the tread: read lines and put them on the queue."""
for line in iter(self._fd.readline, ''):
self._queue.put(line)
self._t = Thread(target=_populateQueue, args=(self._s, self._q))
self._t.daemon = True
# Start collecting lines from the stream
self._t.start()
def eof(self):
"""Check whether there is no more content to expect."""
return not self.is_alive() and self._queue.empty()
def readline(self, block=False, timeout=None):
"""Read line from the stream
def join(self, timeout=None, close=True):
"""Close the file and join the thread."""
if close:
self._fd.close()
threading.Thread.join(self, timeout)
Attempt to pull from the queue the data and return it. If no data is
available or timeout has expired, it returns None.
Keyword arguments:
- block -- If True, block if necessary until data is available
- timeout -- The number of seconds to block
"""
try:
return self._q.get(block=timeout is not None,
timeout=timeout)
except Empty:
return None
def close(self):
"""Close the stream"""
try:
self._s.close()
except IOError:
pass
def start_async_file_reading(fd):
"""Helper which instantiate and run an AsynchronousFileReader."""
queue = Queue.Queue()
reader = AsynchronousFileReader(fd, queue)
reader.start()
return (reader, queue)