From 2ac17c2f469b1f43f6661a58a30f7acf5448a04a Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 19 May 2023 18:18:21 +0200 Subject: [PATCH 1/4] POC for new log streaming API using a zero-mq broker --- debian/control | 3 +- moulinette/interfaces/api.py | 151 ++++++----------------------------- moulinette/utils/log.py | 37 ++++++++- 3 files changed, 64 insertions(+), 127 deletions(-) diff --git a/debian/control b/debian/control index a6d14b00..f49a6870 100644 --- a/debian/control +++ b/debian/control @@ -16,7 +16,8 @@ Depends: ${misc:Depends}, ${python3:Depends}, python3-psutil, python3-tz, python3-prompt-toolkit, - python3-pygments + python3-pygments, + python3-zeromq Breaks: yunohost (<< 4.1) Description: prototype interfaces with ease in Python Quickly and easily prototype interfaces for your application. diff --git a/moulinette/interfaces/api.py b/moulinette/interfaces/api.py index 14ba0b54..92488623 100644 --- a/moulinette/interfaces/api.py +++ b/moulinette/interfaces/api.py @@ -65,49 +65,6 @@ def filter_csrf(callback): return wrapper -class LogQueues(dict): - - """Map of session ids to queue.""" - - pass - - -class APIQueueHandler(logging.Handler): - - """ - A handler class which store logging records into a queue, to be used - and retrieved from the API. - """ - - def __init__(self): - logging.Handler.__init__(self) - self.queues = LogQueues() - # actionsmap is actually set during the interface's init ... - self.actionsmap = None - - def emit(self, record): - # Prevent triggering this function while moulinette - # is being initialized with --debug - if not self.actionsmap or len(request.cookies) == 0: - return - - profile = request.params.get("profile", self.actionsmap.default_authentication) - authenticator = self.actionsmap.get_authenticator(profile) - - s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"] - try: - queue = self.queues[s_id] - except KeyError: - # Session is not initialized, abandon. - return - else: - # Put the message as a 2-tuple in the queue - queue.put_nowait((record.levelname.lower(), record.getMessage())) - # Put the current greenlet to sleep for 0 second in order to - # populate the new message in the queue - sleep(0) - - class _HTTPArgumentParser: """Argument parser for HTTP requests @@ -252,9 +209,8 @@ class _ActionsMapPlugin: name = "actionsmap" api = 2 - def __init__(self, actionsmap, log_queues={}): + def __init__(self, actionsmap): self.actionsmap = actionsmap - self.log_queues = log_queues def setup(self, app): """Setup plugin on the application @@ -282,11 +238,10 @@ class _ActionsMapPlugin: skip=["actionsmap"], ) - # Append messages route app.route( - "/messages", - name="messages", - callback=self.messages, + "/sse", + name="sse", + callback=self.sse, skip=["actionsmap"], ) @@ -395,46 +350,30 @@ class _ActionsMapPlugin: authenticator.delete_session_cookie() return m18n.g("logged_out") - def messages(self): - """Listen to the messages WebSocket stream + def sse(self): + import time + import zmq - Retrieve the WebSocket stream and send to it each messages displayed by - the display method. They are JSON encoded as a dict { style: message }. - """ + # FIXME : check auth... - profile = request.params.get("profile", self.actionsmap.default_authentication) - authenticator = self.actionsmap.get_authenticator(profile) + ctx = zmq.Context() + sub = ctx.socket(zmq.SUB) + sub.subscribe('') + sub.connect(log.LOG_BROKER_FRONTEND_ENDPOINT) - s_id = authenticator.get_session_cookie()["id"] - try: - queue = self.log_queues[s_id] - except KeyError: - # Create a new queue for the session - queue = Queue() - self.log_queues[s_id] = queue + response.content_type = 'text/event-stream' + response.cache_control = 'no-cache' - wsock = request.environ.get("wsgi.websocket") - if not wsock: - raise HTTPResponse(m18n.g("websocket_request_expected"), 500) + # Set client-side auto-reconnect timeout, ms. + yield 'retry: 100\n\n' while True: - item = queue.get() try: - # Retrieve the message - style, message = item - except TypeError: - if item == StopIteration: - # Delete the current queue and break - del self.log_queues[s_id] - break - logger.exception("invalid item in the messages queue: %r", item) - else: - try: - # Send the message - wsock.send(json_encode({style: message})) - except WebSocketError: - break - sleep(0) + if sub.poll(10, zmq.POLLIN): + _, msg = sub.recv_multipart() + yield 'data: ' + str(msg.decode()) + '\n\n' + except KeyboardInterrupt: + break def process(self, _route, arguments={}): """Process the relevant action for the route @@ -471,37 +410,9 @@ class _ActionsMapPlugin: rmtree(UPLOAD_DIR, True) UPLOAD_DIR = None - # Close opened WebSocket by putting StopIteration in the queue - profile = request.params.get( - "profile", self.actionsmap.default_authentication - ) - authenticator = self.actionsmap.get_authenticator(profile) - try: - s_id = authenticator.get_session_cookie()["id"] - queue = self.log_queues[s_id] - except MoulinetteAuthenticationError: - pass - except KeyError: - pass - else: - queue.put(StopIteration) def display(self, message, style="info"): - profile = request.params.get("profile", self.actionsmap.default_authentication) - authenticator = self.actionsmap.get_authenticator(profile) - s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"] - - try: - queue = self.log_queues[s_id] - except KeyError: - return - - # Put the message as a 2-tuple in the queue - queue.put_nowait((style, message)) - - # Put the current greenlet to sleep for 0 second in order to - # populate the new message in the queue - sleep(0) + pass def prompt(self, *args, **kwargs): raise NotImplementedError("Prompt is not implemented for this interface") @@ -700,9 +611,6 @@ class Interface: Keyword arguments: - routes -- A dict of additional routes to add in the form of {(method, path): callback} - - log_queues -- A LogQueues object or None to retrieve it from - registered logging handlers - """ type = "api" @@ -710,12 +618,6 @@ class Interface: def __init__(self, routes={}, actionsmap=None): actionsmap = ActionsMap(actionsmap, ActionsMapParser()) - # Attempt to retrieve log queues from an APIQueueHandler - handler = log.getHandlersByClass(APIQueueHandler, limit=1) - if handler: - log_queues = handler.queues - handler.actionsmap = actionsmap - # TODO: Return OK to 'OPTIONS' xhr requests (l173) app = Bottle(autojson=True) @@ -743,7 +645,7 @@ class Interface: app.install(filter_csrf) app.install(apiheader) app.install(api18n) - actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues) + actionsmapplugin = _ActionsMapPlugin(actionsmap) app.install(actionsmapplugin) self.authenticate = actionsmapplugin.authenticate @@ -778,11 +680,10 @@ class Interface: ) try: - from gevent.pywsgi import WSGIServer - from geventwebsocket.handler import WebSocketHandler + from gevent import monkey; monkey.patch_all() + from bottle import GeventServer - server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler) - server.serve_forever() + GeventServer(host, port).run(self._app) except IOError as e: error_message = "unable to start the server instance on %s:%d: %s" % ( host, diff --git a/moulinette/utils/log.py b/moulinette/utils/log.py index 02f8a30a..07ee8bf0 100644 --- a/moulinette/utils/log.py +++ b/moulinette/utils/log.py @@ -47,7 +47,6 @@ DEFAULT_LOGGING = { "loggers": {"moulinette": {"level": "DEBUG", "handlers": ["console"]}}, } - def configure_logging(logging_config=None): """Configure logging with default and optionally given configuration @@ -196,3 +195,39 @@ class ActionFilter: return False record.__dict__[self.message_key] = msg return True + + +# Log broadcasting via the broker ----------------------------------------------- + + +# FIXME : hard-coded value about yunohost ... and also we need to secure those file such that they are not public +LOG_BROKER_BACKEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_backend" +LOG_BROKER_FRONTEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_frontend" + +if not os.path.isdir("/var/run/yunohost"): + os.mkdir("/var/run/yunohost") +os.chown("/var/run/yunohost", 0, 0) +os.chmod("/var/run/yunohost", 0o700) + +def start_log_broker(): + + from multiprocessing import Process + + def server(): + import zmq + + ctx = zmq.Context() + backend = ctx.socket(zmq.XSUB) + backend.bind(LOG_BROKER_BACKEND_ENDPOINT) + frontend = ctx.socket(zmq.XPUB) + frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT) + + zmq.proxy(frontend, backend) + + # Example says "we never get here"? + frontend.close() + backend.close() + ctx.term() + + p = Process(target=server) + p.start() From ad345ee1bf397e61a542fb0022e2f1c395a7bc2b Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 19 May 2023 20:12:43 +0200 Subject: [PATCH 2/4] logbroker: properly catch KeyboardInterrupt to avoid a stracktrace when Ctrl+C --- moulinette/utils/log.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/moulinette/utils/log.py b/moulinette/utils/log.py index 07ee8bf0..2ac3d1d0 100644 --- a/moulinette/utils/log.py +++ b/moulinette/utils/log.py @@ -222,9 +222,11 @@ def start_log_broker(): frontend = ctx.socket(zmq.XPUB) frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT) - zmq.proxy(frontend, backend) + try: + zmq.proxy(frontend, backend) + except KeyboardInterrupt: + pass - # Example says "we never get here"? frontend.close() backend.close() ctx.term() From af2316233ad66517dc7b3bb710d5077f9347ede7 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 19 May 2023 20:13:18 +0200 Subject: [PATCH 3/4] sse: gotta use a special version of zeromq (zmq.green) when in gevent context --- moulinette/interfaces/api.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/moulinette/interfaces/api.py b/moulinette/interfaces/api.py index 92488623..06f146dc 100644 --- a/moulinette/interfaces/api.py +++ b/moulinette/interfaces/api.py @@ -352,7 +352,7 @@ class _ActionsMapPlugin: def sse(self): import time - import zmq + import zmq.green as zmq # FIXME : check auth... @@ -367,13 +367,14 @@ class _ActionsMapPlugin: # Set client-side auto-reconnect timeout, ms. yield 'retry: 100\n\n' - while True: - try: + try: + while True: if sub.poll(10, zmq.POLLIN): _, msg = sub.recv_multipart() yield 'data: ' + str(msg.decode()) + '\n\n' - except KeyboardInterrupt: - break + finally: + sub.close() + ctx.term() def process(self, _route, arguments={}): """Process the relevant action for the route From 47a60069800a457264888a51041679922c98fdcd Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Sat, 20 May 2023 02:02:26 +0200 Subject: [PATCH 4/4] sse: add auth check --- moulinette/interfaces/api.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/moulinette/interfaces/api.py b/moulinette/interfaces/api.py index 06f146dc..ed71f6cd 100644 --- a/moulinette/interfaces/api.py +++ b/moulinette/interfaces/api.py @@ -351,10 +351,15 @@ class _ActionsMapPlugin: return m18n.g("logged_out") def sse(self): - import time import zmq.green as zmq - # FIXME : check auth... + profile = request.params.get("profile", self.actionsmap.default_authentication) + authenticator = self.actionsmap.get_authenticator(profile) + + try: + authenticator.get_session_cookie() + except KeyError: + raise HTTPResponse(m18n.g("not_logged_in"), 401) ctx = zmq.Context() sub = ctx.socket(zmq.SUB)