(╯°□°)╯︵ ┻━┻

This commit is contained in:
Alexandre Aubin 2017-11-04 01:36:38 +01:00
parent 37918d9816
commit 8151ab3caf
2 changed files with 46 additions and 12 deletions

View file

@ -60,7 +60,6 @@ def call_async_output(args, callback, **kwargs):
raise ValueError('%s argument not allowed, ' raise ValueError('%s argument not allowed, '
'it will be overridden.' % a) 'it will be overridden.' % a)
if "stdinfo" in kwargs and kwargs["stdinfo"] != None: if "stdinfo" in kwargs and kwargs["stdinfo"] != None:
assert len(callback) == 3 assert len(callback) == 3
stdinfo = kwargs.pop("stdinfo") stdinfo = kwargs.pop("stdinfo")
@ -91,10 +90,15 @@ def call_async_output(args, callback, **kwargs):
if separate_stderr: if separate_stderr:
stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1]) stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
if stdinfo: if stdinfo:
stdinfo_f = open(stdinfo, "r") # Open stdinfo for reading (in a nonblocking way, i.e. even
# if command does not write in the stdinfo pipe...)
stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK)
stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2]) stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2])
while not stdout_reader.eof() or not stderr_reader.eof() or not stdinfo_reader.eof(): while not stdout_reader.eof() or not stderr_reader.eof():
time.sleep(.1) time.sleep(.1)
# Remove the stdinfo pipe
os.remove(stdinfo)
os.rmdir(os.path.dirname(stdinfo))
stdinfo_reader.join() stdinfo_reader.join()
stdinfo_consum.join() stdinfo_consum.join()
@ -108,11 +112,6 @@ def call_async_output(args, callback, **kwargs):
stdout_reader.join() stdout_reader.join()
stdout_consum.join() stdout_consum.join()
if stdinfo:
stdinfo_f.close()
os.remove(stdinfo)
os.rmdir(os.path.dirname(stdinfo))
# on slow hardware, in very edgy situations it is possible that the process # on slow hardware, in very edgy situations it is possible that the process
# isn't finished just after having closed stdout and stderr, so we wait a # isn't finished just after having closed stdout and stderr, so we wait a
# bit to give hime the time to finish (while having a timeout) # bit to give hime the time to finish (while having a timeout)

View file

@ -1,3 +1,6 @@
import os
import time
from multiprocessing.process import Process from multiprocessing.process import Process
from multiprocessing.queues import SimpleQueue from multiprocessing.queues import SimpleQueue
@ -18,15 +21,44 @@ class AsynchronousFileReader(Process):
def __init__(self, fd, queue): def __init__(self, fd, queue):
assert hasattr(queue, 'put') assert hasattr(queue, 'put')
assert hasattr(queue, 'empty') assert hasattr(queue, 'empty')
assert callable(fd.readline) assert isinstance(fd, int) or callable(fd.readline)
Process.__init__(self) Process.__init__(self)
self._fd = fd self._fd = fd
self._queue = queue self._queue = queue
def run(self): def run(self):
"""The body of the tread: read lines and put them on the queue.""" """The body of the tread: read lines and put them on the queue."""
for line in iter(self._fd.readline, ''):
self._queue.put(line) # If self._fd is a file opened with open()...
# Typically that's for stdout/stderr pipes
# We can read the stuff easily with 'readline'
if not isinstance(self._fd, int):
for line in iter(self._fd.readline, ''):
self._queue.put(line)
# Else, it got opened with os.open() and we have to read it
# wit low level crap...
else:
data = ""
while True:
# If nobody's writing in there anymore, get out
if os.fstat(self._fd).st_nlink == 0:
print "Plop"
self._queue.put("(Info returning because no link)")
return
# Read (non-blockingly) a few bytes, append them to the buffer
data += os.read(self._fd, 50)
print data
# If we have data, extract a line (ending with \n) and feed
# it to the consumer
if data and '\n' in data:
lines = data.split('\n')
self._queue.put(lines[0])
data = '\n'.join(lines[1:])
else:
time.sleep(0.1)
def eof(self): def eof(self):
"""Check whether there is no more content to expect.""" """Check whether there is no more content to expect."""
@ -36,7 +68,10 @@ class AsynchronousFileReader(Process):
"""Close the file and join the thread.""" """Close the file and join the thread."""
if close: if close:
self._queue.put(StopIteration) self._queue.put(StopIteration)
self._fd.close() if isinstance(self._fd, int):
os.close(self._fd)
else:
self._fd.close()
Process.join(self, timeout) Process.join(self, timeout)