Rework / simplify the whole story of call_async_output, which was a complete mess and got broken by Python 3 for some reason. The new code should be much simpler, fix the synchronization between debug, info and warning messages, and probably be more performant overall.

Alexandre Aubin 2021-01-01 04:00:33 +01:00
parent d0b43f0e4a
commit 84f17f7830
2 changed files with 40 additions and 176 deletions
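At a glance, the rework replaces the reader/consumer machinery with one LogPipe per output channel. A minimal usage sketch of the reworked function, assuming moulinette is installed (the import path and the callback names are assumptions; the tuple-of-callbacks calling convention is taken from the diff below):

    from moulinette.utils.process import call_async_output  # path assumed

    # One callback per output channel, each called with one line at a time;
    # a 3-tuple would add a callback for the extra "stdinfo" channel.
    def on_stdout(line):
        print("out: %s" % line)

    def on_stderr(line):
        print("err: %s" % line)

    call_async_output(["ls", "/tmp"], (on_stdout, on_stderr))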

moulinette/utils/process.py

@@ -10,7 +10,7 @@ try:
 except ImportError:
     from shlex import quote  # Python3 >= 3.3
 
-from .stream import async_file_reading
+from .stream import LogPipe
 
 quote  # This line is here to avoid W0611 PEP8 error (see comments above)
@@ -59,71 +59,20 @@ def call_async_output(args, callback, **kwargs):
         if a in kwargs:
             raise ValueError("%s argument not allowed, " "it will be overridden." % a)
 
-    if "stdinfo" in kwargs and kwargs["stdinfo"] is not None:
-        assert len(callback) == 3
-        stdinfo = kwargs.pop("stdinfo")
-        os.mkfifo(stdinfo, 0o600)
-        # Open stdinfo for reading (in a nonblocking way, i.e. even
-        # if command does not write in the stdinfo pipe...)
-        stdinfo_f = os.open(stdinfo, os.O_RDONLY | os.O_NONBLOCK)
-    else:
-        if "stdinfo" in kwargs:
-            kwargs.pop("stdinfo")
-        stdinfo = None
-
-    # Validate callback argument
-    if isinstance(callback, tuple):
-        if len(callback) < 2:
-            raise ValueError("callback argument should be a 2-tuple")
-        kwargs["stdout"] = kwargs["stderr"] = subprocess.PIPE
-        separate_stderr = True
-    elif callable(callback):
-        kwargs["stdout"] = subprocess.PIPE
-        kwargs["stderr"] = subprocess.STDOUT
-        separate_stderr = False
-        callback = (callback,)
-    else:
-        raise ValueError("callback argument must be callable or a 2-tuple")
-
-    # Run the command
-    p = subprocess.Popen(args, **kwargs)
-
-    # Wrap and get command outputs
-    stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0])
-    if separate_stderr:
-        stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
-        if stdinfo:
-            stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2])
-        while not stdout_reader.eof() and not stderr_reader.eof():
-            while not stdout_consum.empty() or not stderr_consum.empty():
-                # alternate between the 2 consumers to avoid desynchronisation
-                # this way is not 100% perfect but should do it
-                stdout_consum.process_next_line()
-                stderr_consum.process_next_line()
-            if stdinfo:
-                stdinfo_consum.process_next_line()
-            time.sleep(0.1)
-        stderr_reader.join()
-        # clear the queues
-        stdout_consum.process_current_queue()
-        stderr_consum.process_current_queue()
-        if stdinfo:
-            stdinfo_consum.process_current_queue()
-    else:
-        while not stdout_reader.eof():
-            stdout_consum.process_current_queue()
-            time.sleep(0.1)
-    stdout_reader.join()
-    # clear the queue
-    stdout_consum.process_current_queue()
+    kwargs["stdout"] = LogPipe(callback[0])
+    kwargs["stderr"] = LogPipe(callback[1])
+    stdinfo = LogPipe(callback[2]) if len(callback) >= 3 else None
 
     if stdinfo:
-        # Remove the stdinfo pipe
-        os.remove(stdinfo)
-        os.rmdir(os.path.dirname(stdinfo))
-        stdinfo_reader.join()
-        stdinfo_consum.process_current_queue()
+        kwargs["pass_fds"] = [stdinfo.fdWrite]
+        if "env" not in kwargs:
+            kwargs["env"] = os.environ
+        kwargs["env"]['YNH_STDINFO'] = str(stdinfo.fdWrite)
+
+    with subprocess.Popen(args, **kwargs) as p:
+        kwargs["stdout"].close()
+        kwargs["stderr"].close()
+        if stdinfo:
+            stdinfo.close()
 
     # on slow hardware, in very edgy situations it is possible that the process
     # isn't finished just after having closed stdout and stderr, so we wait a
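The write end of the third LogPipe is inherited by the child through pass_fds and advertised in the YNH_STDINFO environment variable, so the command being run can push messages on a dedicated "info" channel. A hedged child-side sketch (the message text is invented for illustration; the fd lookup mirrors what the hunk above sets up):

    import os

    # Inside the child process: if the parent exposed a stdinfo pipe,
    # YNH_STDINFO holds its file descriptor; each line written to it
    # ends up in callback[2] on the parent side.
    stdinfo_fd = os.environ.get("YNH_STDINFO")
    if stdinfo_fd is not None:
        os.write(int(stdinfo_fd), b"Doing one step of the operation...\n")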

moulinette/utils/stream.py

@@ -1,123 +1,38 @@
 import os
-import time
-import sys
-
-if sys.version_info[0] == 3:
-    from multiprocessing.process import BaseProcess as Process
-    from multiprocessing import SimpleQueue
-else:
-    # python 2
-    from multiprocessing.process import Process
-    from multiprocessing.queues import SimpleQueue
-
-# Read from a stream ---------------------------------------------------
-
-
-class AsynchronousFileReader(Process):
-
-    """
-    Helper class to implement asynchronous reading of a file
-    in a separate thread. Pushes read lines on a queue to
-    be consumed in another thread.
-
-    Based on:
-    http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading
-    """
-
-    def __init__(self, fd, queue):
-        assert hasattr(queue, "put")
-        assert hasattr(queue, "empty")
-        assert isinstance(fd, int) or callable(fd.readline)
-        Process.__init__(self)
-        self._fd = fd
-        self._queue = queue
-
-    def run(self):
-        """The body of the tread: read lines and put them on the queue."""
-
-        # If self._fd is a file opened with open()...
-        # Typically that's for stdout/stderr pipes
-        # We can read the stuff easily with 'readline'
-        if not isinstance(self._fd, int):
-            for line in iter(self._fd.readline, ""):
-                self._queue.put(line)
-
-        # Else, it got opened with os.open() and we have to read it
-        # wit low level crap...
-        else:
-            data = ""
-            while True:
-                try:
-                    # Try to read (non-blockingly) a few bytes, append them to
-                    # the buffer
-                    data += os.read(self._fd, 50)
-                except Exception as e:
-                    print(
-                        "from moulinette.utils.stream: could not read file descriptor : %s"
-                        % str(e)
-                    )
-                    continue
-
-                # If nobody's writing in there anymore, get out
-                if not data and os.fstat(self._fd).st_nlink == 0:
-                    return
-
-                # If we have data, extract a line (ending with \n) and feed
-                # it to the consumer
-                if data and "\n" in data:
-                    lines = data.split("\n")
-                    self._queue.put(lines[0])
-                    data = "\n".join(lines[1:])
-                else:
-                    time.sleep(0.05)
-
-    def eof(self):
-        """Check whether there is no more content to expect."""
-        return not self.is_alive() and self._queue.empty()
-
-    def join(self, timeout=None, close=True):
-        """Close the file and join the thread."""
-        if close:
-            self._queue.put(StopIteration)
-            if isinstance(self._fd, int):
-                os.close(self._fd)
-            else:
-                self._fd.close()
-        Process.join(self, timeout)
-
-
-class Consummer(object):
-    def __init__(self, queue, callback):
-        self.queue = queue
-        self.callback = callback
-
-    def empty(self):
-        return self.queue.empty()
-
-    def process_next_line(self):
-        if not self.empty():
-            line = self.queue.get()
-            if line:
-                if line == StopIteration:
-                    return
-                self.callback(line)
-
-    def process_current_queue(self):
-        while not self.empty():
-            line = self.queue.get()
-            if line:
-                if line == StopIteration:
-                    break
-                self.callback(line)
-
-
-def async_file_reading(fd, callback):
-    """Helper which instantiate and run an AsynchronousFileReader."""
-    queue = SimpleQueue()
-    reader = AsynchronousFileReader(fd, queue)
-    reader.start()
-    consummer = Consummer(queue, callback)
-    return (reader, consummer)
+import threading
+
+
+# Adapted from https://codereview.stackexchange.com/a/17959
+class LogPipe(threading.Thread):
+    def __init__(self, log_callback):
+        """Setup the object with a logger and a loglevel
+        and start the thread
+        """
+        threading.Thread.__init__(self)
+        self.daemon = False
+        self.log_callback = log_callback
+
+        self.fdRead, self.fdWrite = os.pipe()
+        self.pipeReader = os.fdopen(self.fdRead)
+        self.start()
+
+    def fileno(self):
+        """Return the write file descriptor of the pipe
+        """
+        return self.fdWrite
+
+    def run(self):
+        """Run the thread, logging everything.
+        """
+        for line in iter(self.pipeReader.readline, ''):
+            self.log_callback(line.strip('\n'))
+
+        self.pipeReader.close()
+
+    def close(self):
+        """Close the write end of the pipe.
+        """
+        os.close(self.fdWrite)
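The reason a plain thread plus os.pipe() suffices: subprocess only needs a fileno() from whatever object is passed as stdout/stderr, so a LogPipe can be handed to Popen directly while its thread streams lines to the callback. A self-contained demo of this mechanism, with the LogPipe from the diff above copied in (condensed) so it runs standalone:

    import os
    import subprocess
    import threading

    class LogPipe(threading.Thread):
        def __init__(self, log_callback):
            threading.Thread.__init__(self)
            self.log_callback = log_callback
            self.fdRead, self.fdWrite = os.pipe()
            self.pipeReader = os.fdopen(self.fdRead)
            self.start()

        def fileno(self):
            # subprocess calls this to know which fd to give to the child
            return self.fdWrite

        def run(self):
            # readline() returns "" once every copy of the write end is closed
            for line in iter(self.pipeReader.readline, ""):
                self.log_callback(line.strip("\n"))
            self.pipeReader.close()

        def close(self):
            os.close(self.fdWrite)

    pipe = LogPipe(lambda line: print("got: %s" % line))
    subprocess.run(["echo", "hello"], stdout=pipe)
    pipe.close()   # close our copy of the write end so the thread sees EOF
    pipe.join()    # prints: got: hello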