mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
Merge pull request #155 from YunoHost/stdinfo
[enh] Optionnal stdinfo communication channel when running commands
This commit is contained in:
commit
f462ed83c6
3 changed files with 63 additions and 5 deletions
|
@ -36,6 +36,7 @@
|
||||||
"unknown_group": "Unknown '{group}' group",
|
"unknown_group": "Unknown '{group}' group",
|
||||||
"unknown_user": "Unknown '{user}' user",
|
"unknown_user": "Unknown '{user}' user",
|
||||||
"values_mismatch": "Values don't match",
|
"values_mismatch": "Values don't match",
|
||||||
|
"info": "Info:",
|
||||||
"warning": "Warning:",
|
"warning": "Warning:",
|
||||||
"websocket_request_expected": "Expected a WebSocket request",
|
"websocket_request_expected": "Expected a WebSocket request",
|
||||||
"cannot_open_file": "Could not open file {file:s} (reason: {error:s})",
|
"cannot_open_file": "Could not open file {file:s} (reason: {error:s})",
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import errno
|
import errno
|
||||||
import time
|
import time
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import os
|
||||||
|
|
||||||
from moulinette.core import MoulinetteError
|
from moulinette.core import MoulinetteError
|
||||||
|
|
||||||
|
@ -59,9 +60,20 @@ def call_async_output(args, callback, **kwargs):
|
||||||
raise ValueError('%s argument not allowed, '
|
raise ValueError('%s argument not allowed, '
|
||||||
'it will be overridden.' % a)
|
'it will be overridden.' % a)
|
||||||
|
|
||||||
|
if "stdinfo" in kwargs and kwargs["stdinfo"] != None:
|
||||||
|
assert len(callback) == 3
|
||||||
|
stdinfo = kwargs.pop("stdinfo")
|
||||||
|
os.mkfifo(stdinfo, 0600)
|
||||||
|
# Open stdinfo for reading (in a nonblocking way, i.e. even
|
||||||
|
# if command does not write in the stdinfo pipe...)
|
||||||
|
stdinfo_f = os.open(stdinfo, os.O_RDONLY|os.O_NONBLOCK)
|
||||||
|
else:
|
||||||
|
kwargs.pop("stdinfo")
|
||||||
|
stdinfo = None
|
||||||
|
|
||||||
# Validate callback argument
|
# Validate callback argument
|
||||||
if isinstance(callback, tuple):
|
if isinstance(callback, tuple):
|
||||||
if len(callback) != 2:
|
if len(callback) < 2:
|
||||||
raise ValueError('callback argument should be a 2-tuple')
|
raise ValueError('callback argument should be a 2-tuple')
|
||||||
kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
|
kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
|
||||||
separate_stderr = True
|
separate_stderr = True
|
||||||
|
@ -80,17 +92,22 @@ def call_async_output(args, callback, **kwargs):
|
||||||
stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0])
|
stdout_reader, stdout_consum = async_file_reading(p.stdout, callback[0])
|
||||||
if separate_stderr:
|
if separate_stderr:
|
||||||
stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
|
stderr_reader, stderr_consum = async_file_reading(p.stderr, callback[1])
|
||||||
|
if stdinfo:
|
||||||
|
stdinfo_reader, stdinfo_consum = async_file_reading(stdinfo_f, callback[2])
|
||||||
|
|
||||||
while not stdout_reader.eof() and not stderr_reader.eof():
|
while not stdout_reader.eof() and not stderr_reader.eof():
|
||||||
while not stdout_consum.empty() or not stderr_consum.empty():
|
while not stdout_consum.empty() or not stderr_consum.empty():
|
||||||
# alternate between the 2 consumers to avoid desynchronisation
|
# alternate between the 2 consumers to avoid desynchronisation
|
||||||
# this way is not 100% perfect but should do it
|
# this way is not 100% perfect but should do it
|
||||||
stdout_consum.process_next_line()
|
stdout_consum.process_next_line()
|
||||||
stderr_consum.process_next_line()
|
stderr_consum.process_next_line()
|
||||||
|
stdinfo_consum.process_next_line()
|
||||||
time.sleep(.1)
|
time.sleep(.1)
|
||||||
stderr_reader.join()
|
stderr_reader.join()
|
||||||
# clear the queues
|
# clear the queues
|
||||||
stdout_consum.process_current_queue()
|
stdout_consum.process_current_queue()
|
||||||
stderr_consum.process_current_queue()
|
stderr_consum.process_current_queue()
|
||||||
|
stdinfo_consum.process_current_queue()
|
||||||
else:
|
else:
|
||||||
while not stdout_reader.eof():
|
while not stdout_reader.eof():
|
||||||
stdout_consum.process_current_queue()
|
stdout_consum.process_current_queue()
|
||||||
|
@ -99,6 +116,13 @@ def call_async_output(args, callback, **kwargs):
|
||||||
# clear the queue
|
# clear the queue
|
||||||
stdout_consum.process_current_queue()
|
stdout_consum.process_current_queue()
|
||||||
|
|
||||||
|
if stdinfo:
|
||||||
|
# Remove the stdinfo pipe
|
||||||
|
os.remove(stdinfo)
|
||||||
|
os.rmdir(os.path.dirname(stdinfo))
|
||||||
|
stdinfo_reader.join()
|
||||||
|
stdinfo_consum.process_current_queue()
|
||||||
|
|
||||||
# on slow hardware, in very edgy situations it is possible that the process
|
# on slow hardware, in very edgy situations it is possible that the process
|
||||||
# isn't finished just after having closed stdout and stderr, so we wait a
|
# isn't finished just after having closed stdout and stderr, so we wait a
|
||||||
# bit to give hime the time to finish (while having a timeout)
|
# bit to give hime the time to finish (while having a timeout)
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
from multiprocessing.process import Process
|
from multiprocessing.process import Process
|
||||||
from multiprocessing.queues import SimpleQueue
|
from multiprocessing.queues import SimpleQueue
|
||||||
|
|
||||||
|
@ -18,15 +21,42 @@ class AsynchronousFileReader(Process):
|
||||||
def __init__(self, fd, queue):
|
def __init__(self, fd, queue):
|
||||||
assert hasattr(queue, 'put')
|
assert hasattr(queue, 'put')
|
||||||
assert hasattr(queue, 'empty')
|
assert hasattr(queue, 'empty')
|
||||||
assert callable(fd.readline)
|
assert isinstance(fd, int) or callable(fd.readline)
|
||||||
Process.__init__(self)
|
Process.__init__(self)
|
||||||
self._fd = fd
|
self._fd = fd
|
||||||
self._queue = queue
|
self._queue = queue
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""The body of the tread: read lines and put them on the queue."""
|
"""The body of the tread: read lines and put them on the queue."""
|
||||||
for line in iter(self._fd.readline, ''):
|
|
||||||
self._queue.put(line)
|
# If self._fd is a file opened with open()...
|
||||||
|
# Typically that's for stdout/stderr pipes
|
||||||
|
# We can read the stuff easily with 'readline'
|
||||||
|
if not isinstance(self._fd, int):
|
||||||
|
for line in iter(self._fd.readline, ''):
|
||||||
|
self._queue.put(line)
|
||||||
|
|
||||||
|
# Else, it got opened with os.open() and we have to read it
|
||||||
|
# wit low level crap...
|
||||||
|
else:
|
||||||
|
data = ""
|
||||||
|
while True:
|
||||||
|
# Try to read (non-blockingly) a few bytes, append them to
|
||||||
|
# the buffer
|
||||||
|
data += os.read(self._fd, 50)
|
||||||
|
|
||||||
|
# If nobody's writing in there anymore, get out
|
||||||
|
if not data and os.fstat(self._fd).st_nlink == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# If we have data, extract a line (ending with \n) and feed
|
||||||
|
# it to the consumer
|
||||||
|
if data and '\n' in data:
|
||||||
|
lines = data.split('\n')
|
||||||
|
self._queue.put(lines[0])
|
||||||
|
data = '\n'.join(lines[1:])
|
||||||
|
else:
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
def eof(self):
|
def eof(self):
|
||||||
"""Check whether there is no more content to expect."""
|
"""Check whether there is no more content to expect."""
|
||||||
|
@ -36,7 +66,10 @@ class AsynchronousFileReader(Process):
|
||||||
"""Close the file and join the thread."""
|
"""Close the file and join the thread."""
|
||||||
if close:
|
if close:
|
||||||
self._queue.put(StopIteration)
|
self._queue.put(StopIteration)
|
||||||
self._fd.close()
|
if isinstance(self._fd, int):
|
||||||
|
os.close(self._fd)
|
||||||
|
else:
|
||||||
|
self._fd.close()
|
||||||
Process.join(self, timeout)
|
Process.join(self, timeout)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue