diff --git a/moulinette/utils/stream.py b/moulinette/utils/stream.py index 652565ac..c69f8e02 100644 --- a/moulinette/utils/stream.py +++ b/moulinette/utils/stream.py @@ -1,8 +1,8 @@ import os import time -from multiprocessing.process import BaseProcess as Process -from multiprocessing.queues import SimpleQueue +from multiprocessing.context import Process +from multiprocessing import SimpleQueue # Read from a stream --------------------------------------------------- @@ -33,39 +33,13 @@ class AsynchronousFileReader(Process): # If self._fd is a file opened with open()... # Typically that's for stdout/stderr pipes - # We can read the stuff easily with 'readline' - if not isinstance(self._fd, int): - for line in iter(self._fd.readline, ""): - self._queue.put(line) - - # Else, it got opened with os.open() and we have to read it - # wit low level crap... - else: - data = "" - while True: - try: - # Try to read (non-blockingly) a few bytes, append them to - # the buffer - data += os.read(self._fd, 50) - except Exception as e: - print( - "from moulinette.utils.stream: could not read file descriptor : %s" - % str(e) - ) - continue - - # If nobody's writing in there anymore, get out - if not data and os.fstat(self._fd).st_nlink == 0: - return - - # If we have data, extract a line (ending with \n) and feed - # it to the consumer - if data and "\n" in data: - lines = data.split("\n") - self._queue.put(lines[0]) - data = "\n".join(lines[1:]) - else: - time.sleep(0.05) + # We can read the stuff easily by iterating it + # If self._fd has been opened with os.read, it will be an int + # let's wrap it in a TextIOWrapper to be able to do the same + if isinstance(self._fd, int): + self._fd = os.fdopen(self._fd) + for line in self._fd: + self._queue.put(line) def eof(self): """Check whether there is no more content to expect."""