POC for new log streaming API using a zero-mq broker

This commit is contained in:
Alexandre Aubin 2023-05-19 18:18:21 +02:00
parent e8f10ce54e
commit 2ac17c2f46
3 changed files with 64 additions and 127 deletions

3
debian/control vendored
View file

@ -16,7 +16,8 @@ Depends: ${misc:Depends}, ${python3:Depends},
python3-psutil,
python3-tz,
python3-prompt-toolkit,
python3-pygments
python3-pygments,
python3-zeromq
Breaks: yunohost (<< 4.1)
Description: prototype interfaces with ease in Python
Quickly and easily prototype interfaces for your application.

View file

@ -65,49 +65,6 @@ def filter_csrf(callback):
return wrapper
class LogQueues(dict):
"""Map of session ids to queue."""
pass
class APIQueueHandler(logging.Handler):
"""
A handler class which store logging records into a queue, to be used
and retrieved from the API.
"""
def __init__(self):
logging.Handler.__init__(self)
self.queues = LogQueues()
# actionsmap is actually set during the interface's init ...
self.actionsmap = None
def emit(self, record):
# Prevent triggering this function while moulinette
# is being initialized with --debug
if not self.actionsmap or len(request.cookies) == 0:
return
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.queues[s_id]
except KeyError:
# Session is not initialized, abandon.
return
else:
# Put the message as a 2-tuple in the queue
queue.put_nowait((record.levelname.lower(), record.getMessage()))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
class _HTTPArgumentParser:
"""Argument parser for HTTP requests
@ -252,9 +209,8 @@ class _ActionsMapPlugin:
name = "actionsmap"
api = 2
def __init__(self, actionsmap, log_queues={}):
def __init__(self, actionsmap):
self.actionsmap = actionsmap
self.log_queues = log_queues
def setup(self, app):
"""Setup plugin on the application
@ -282,11 +238,10 @@ class _ActionsMapPlugin:
skip=["actionsmap"],
)
# Append messages route
app.route(
"/messages",
name="messages",
callback=self.messages,
"/sse",
name="sse",
callback=self.sse,
skip=["actionsmap"],
)
@ -395,46 +350,30 @@ class _ActionsMapPlugin:
authenticator.delete_session_cookie()
return m18n.g("logged_out")
def messages(self):
"""Listen to the messages WebSocket stream
def sse(self):
import time
import zmq
Retrieve the WebSocket stream and send to it each messages displayed by
the display method. They are JSON encoded as a dict { style: message }.
"""
# FIXME : check auth...
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.subscribe('')
sub.connect(log.LOG_BROKER_FRONTEND_ENDPOINT)
s_id = authenticator.get_session_cookie()["id"]
try:
queue = self.log_queues[s_id]
except KeyError:
# Create a new queue for the session
queue = Queue()
self.log_queues[s_id] = queue
response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'
wsock = request.environ.get("wsgi.websocket")
if not wsock:
raise HTTPResponse(m18n.g("websocket_request_expected"), 500)
# Set client-side auto-reconnect timeout, ms.
yield 'retry: 100\n\n'
while True:
item = queue.get()
try:
# Retrieve the message
style, message = item
except TypeError:
if item == StopIteration:
# Delete the current queue and break
del self.log_queues[s_id]
break
logger.exception("invalid item in the messages queue: %r", item)
else:
try:
# Send the message
wsock.send(json_encode({style: message}))
except WebSocketError:
break
sleep(0)
if sub.poll(10, zmq.POLLIN):
_, msg = sub.recv_multipart()
yield 'data: ' + str(msg.decode()) + '\n\n'
except KeyboardInterrupt:
break
def process(self, _route, arguments={}):
"""Process the relevant action for the route
@ -471,37 +410,9 @@ class _ActionsMapPlugin:
rmtree(UPLOAD_DIR, True)
UPLOAD_DIR = None
# Close opened WebSocket by putting StopIteration in the queue
profile = request.params.get(
"profile", self.actionsmap.default_authentication
)
authenticator = self.actionsmap.get_authenticator(profile)
try:
s_id = authenticator.get_session_cookie()["id"]
queue = self.log_queues[s_id]
except MoulinetteAuthenticationError:
pass
except KeyError:
pass
else:
queue.put(StopIteration)
def display(self, message, style="info"):
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.log_queues[s_id]
except KeyError:
return
# Put the message as a 2-tuple in the queue
queue.put_nowait((style, message))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
pass
def prompt(self, *args, **kwargs):
raise NotImplementedError("Prompt is not implemented for this interface")
@ -700,9 +611,6 @@ class Interface:
Keyword arguments:
- routes -- A dict of additional routes to add in the form of
{(method, path): callback}
- log_queues -- A LogQueues object or None to retrieve it from
registered logging handlers
"""
type = "api"
@ -710,12 +618,6 @@ class Interface:
def __init__(self, routes={}, actionsmap=None):
actionsmap = ActionsMap(actionsmap, ActionsMapParser())
# Attempt to retrieve log queues from an APIQueueHandler
handler = log.getHandlersByClass(APIQueueHandler, limit=1)
if handler:
log_queues = handler.queues
handler.actionsmap = actionsmap
# TODO: Return OK to 'OPTIONS' xhr requests (l173)
app = Bottle(autojson=True)
@ -743,7 +645,7 @@ class Interface:
app.install(filter_csrf)
app.install(apiheader)
app.install(api18n)
actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues)
actionsmapplugin = _ActionsMapPlugin(actionsmap)
app.install(actionsmapplugin)
self.authenticate = actionsmapplugin.authenticate
@ -778,11 +680,10 @@ class Interface:
)
try:
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey; monkey.patch_all()
from bottle import GeventServer
server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler)
server.serve_forever()
GeventServer(host, port).run(self._app)
except IOError as e:
error_message = "unable to start the server instance on %s:%d: %s" % (
host,

View file

@ -47,7 +47,6 @@ DEFAULT_LOGGING = {
"loggers": {"moulinette": {"level": "DEBUG", "handlers": ["console"]}},
}
def configure_logging(logging_config=None):
"""Configure logging with default and optionally given configuration
@ -196,3 +195,39 @@ class ActionFilter:
return False
record.__dict__[self.message_key] = msg
return True
# Log broadcasting via the broker -----------------------------------------------
# FIXME : hard-coded value about yunohost ... and also we need to secure those file such that they are not public
LOG_BROKER_BACKEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_backend"
LOG_BROKER_FRONTEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_frontend"
if not os.path.isdir("/var/run/yunohost"):
os.mkdir("/var/run/yunohost")
os.chown("/var/run/yunohost", 0, 0)
os.chmod("/var/run/yunohost", 0o700)
def start_log_broker():
from multiprocessing import Process
def server():
import zmq
ctx = zmq.Context()
backend = ctx.socket(zmq.XSUB)
backend.bind(LOG_BROKER_BACKEND_ENDPOINT)
frontend = ctx.socket(zmq.XPUB)
frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT)
zmq.proxy(frontend, backend)
# Example says "we never get here"?
frontend.close()
backend.close()
ctx.term()
p = Process(target=server)
p.start()