This commit is contained in:
Alexandre Aubin 2024-07-17 17:03:53 +00:00 committed by GitHub
commit 8f1c59c671
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 71 additions and 124 deletions

3
debian/control vendored
View file

@ -16,7 +16,8 @@ Depends: ${misc:Depends}, ${python3:Depends},
python3-psutil,
python3-tz,
python3-prompt-toolkit,
python3-pygments
python3-pygments,
python3-zeromq
Breaks: yunohost (<< 4.1)
Description: prototype interfaces with ease in Python
Quickly and easily prototype interfaces for your application.

View file

@ -65,47 +65,6 @@ def filter_csrf(callback):
return wrapper
class LogQueues(dict):
"""Map of session ids to queue."""
pass
class APIQueueHandler(logging.Handler):
"""
A handler class which store logging records into a queue, to be used
and retrieved from the API.
"""
def __init__(self):
logging.Handler.__init__(self)
self.queues = LogQueues()
# actionsmap is actually set during the interface's init ...
self.actionsmap = None
def emit(self, record):
# Prevent triggering this function while moulinette
# is being initialized with --debug
if not self.actionsmap or len(request.cookies) == 0:
return
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.queues[s_id]
except KeyError:
# Session is not initialized, abandon.
return
else:
# Put the message as a 2-tuple in the queue
queue.put_nowait((record.levelname.lower(), record.getMessage()))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
class _HTTPArgumentParser:
"""Argument parser for HTTP requests
@ -249,9 +208,8 @@ class _ActionsMapPlugin:
name = "actionsmap"
api = 2
def __init__(self, actionsmap, log_queues={}):
def __init__(self, actionsmap):
self.actionsmap = actionsmap
self.log_queues = log_queues
def setup(self, app):
"""Setup plugin on the application
@ -279,11 +237,10 @@ class _ActionsMapPlugin:
skip=["actionsmap"],
)
# Append messages route
app.route(
"/messages",
name="messages",
callback=self.messages,
"/sse",
name="sse",
callback=self.sse,
skip=["actionsmap"],
)
@ -403,46 +360,36 @@ class _ActionsMapPlugin:
authenticator.delete_session_cookie()
return m18n.g("logged_out")
def messages(self):
"""Listen to the messages WebSocket stream
Retrieve the WebSocket stream and send to it each messages displayed by
the display method. They are JSON encoded as a dict { style: message }.
"""
def sse(self):
import zmq.green as zmq
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie()["id"]
try:
queue = self.log_queues[s_id]
authenticator.get_session_cookie()
except KeyError:
# Create a new queue for the session
queue = Queue()
self.log_queues[s_id] = queue
raise HTTPResponse(m18n.g("not_logged_in"), 401)
wsock = request.environ.get("wsgi.websocket")
if not wsock:
raise HTTPResponse(m18n.g("websocket_request_expected"), 500)
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.subscribe('')
sub.connect(log.LOG_BROKER_FRONTEND_ENDPOINT)
while True:
item = queue.get()
try:
# Retrieve the message
style, message = item
except TypeError:
if item == StopIteration:
# Delete the current queue and break
del self.log_queues[s_id]
break
logger.exception("invalid item in the messages queue: %r", item)
else:
try:
# Send the message
wsock.send(json_encode({style: message}))
except WebSocketError:
break
sleep(0)
response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'
# Set client-side auto-reconnect timeout, ms.
yield 'retry: 100\n\n'
try:
while True:
if sub.poll(10, zmq.POLLIN):
_, msg = sub.recv_multipart()
yield 'data: ' + str(msg.decode()) + '\n\n'
finally:
sub.close()
ctx.term()
def process(self, _route, arguments={}):
"""Process the relevant action for the route
@ -480,37 +427,9 @@ class _ActionsMapPlugin:
rmtree(UPLOAD_DIR, True)
UPLOAD_DIR = None
# Close opened WebSocket by putting StopIteration in the queue
profile = request.params.get(
"profile", self.actionsmap.default_authentication
)
authenticator = self.actionsmap.get_authenticator(profile)
try:
s_id = authenticator.get_session_cookie()["id"]
queue = self.log_queues[s_id]
except MoulinetteAuthenticationError:
pass
except KeyError:
pass
else:
queue.put(StopIteration)
def display(self, message, style="info"):
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.log_queues[s_id]
except KeyError:
return
# Put the message as a 2-tuple in the queue
queue.put_nowait((style, message))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
pass
def prompt(self, *args, **kwargs):
raise NotImplementedError("Prompt is not implemented for this interface")
@ -707,9 +626,6 @@ class Interface:
Keyword arguments:
- routes -- A dict of additional routes to add in the form of
{(method, path): callback}
- log_queues -- A LogQueues object or None to retrieve it from
registered logging handlers
"""
type = "api"
@ -719,12 +635,6 @@ class Interface:
self.allowed_cors_origins = allowed_cors_origins
# Attempt to retrieve log queues from an APIQueueHandler
handler = log.getHandlersByClass(APIQueueHandler, limit=1)
if handler:
log_queues = handler.queues
handler.actionsmap = actionsmap
# TODO: Return OK to 'OPTIONS' xhr requests (l173)
app = Bottle(autojson=True)
@ -763,7 +673,7 @@ class Interface:
app.install(filter_csrf)
app.install(cors)
app.install(api18n)
actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues)
actionsmapplugin = _ActionsMapPlugin(actionsmap)
app.install(actionsmapplugin)
self.authenticate = actionsmapplugin.authenticate
@ -804,11 +714,10 @@ class Interface:
)
try:
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey; monkey.patch_all()
from bottle import GeventServer
server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler)
server.serve_forever()
GeventServer(host, port).run(self._app)
except IOError as e:
error_message = "unable to start the server instance on %s:%d: %s" % (
host,

View file

@ -46,7 +46,6 @@ DEFAULT_LOGGING = {
"loggers": {"moulinette": {"level": "DEBUG", "handlers": ["console"]}},
}
def configure_logging(logging_config=None):
"""Configure logging with default and optionally given configuration
@ -119,3 +118,41 @@ class MoulinetteLogger(Logger):
break
return rv
# Log broadcasting via the broker -----------------------------------------------
# FIXME : hard-coded value about yunohost ... and also we need to secure those file such that they are not public
LOG_BROKER_BACKEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_backend"
LOG_BROKER_FRONTEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_frontend"
if not os.path.isdir("/var/run/yunohost"):
os.mkdir("/var/run/yunohost")
os.chown("/var/run/yunohost", 0, 0)
os.chmod("/var/run/yunohost", 0o700)
def start_log_broker():
from multiprocessing import Process
def server():
import zmq
ctx = zmq.Context()
backend = ctx.socket(zmq.XSUB)
backend.bind(LOG_BROKER_BACKEND_ENDPOINT)
frontend = ctx.socket(zmq.XPUB)
frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT)
try:
zmq.proxy(frontend, backend)
except KeyboardInterrupt:
pass
frontend.close()
backend.close()
ctx.term()
p = Process(target=server)
p.start()