From c3bf0a97390bbae8779fe746844ba32eea80d893 Mon Sep 17 00:00:00 2001 From: Alexandre Aubin Date: Fri, 19 May 2023 18:19:20 +0200 Subject: [PATCH] POC for new log streaming API using a zero-mq broker --- src/__init__.py | 16 +++++----- src/log.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 10 deletions(-) diff --git a/src/__init__.py b/src/__init__.py index d13d61089..28b951801 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -23,7 +23,7 @@ import sys import moulinette from moulinette import m18n -from moulinette.utils.log import configure_logging +from moulinette.utils.log import configure_logging, start_log_broker from moulinette.interfaces.cli import colorize, get_locale @@ -111,6 +111,8 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun if not os.path.isdir(logdir): os.makedirs(logdir, 0o750) + base_handlers = ["file"] + (["cli"] if interface == "cli" else []) + logging_configuration = { "version": 1, "disable_existing_loggers": True, @@ -134,32 +136,29 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun "class": "moulinette.interfaces.cli.TTYHandler", "formatter": "tty-debug" if debug else "", }, - "api": { - "level": "DEBUG" if debug else "INFO", - "class": "moulinette.interfaces.api.APIQueueHandler", - }, "file": { "class": "logging.FileHandler", "formatter": "precise", "filename": logfile, "filters": ["action"], }, + }, "loggers": { "yunohost": { "level": "DEBUG", - "handlers": ["file", interface] if not quiet else ["file"], + "handlers": base_handlers if not quiet else ["file"], "propagate": False, }, "moulinette": { "level": "DEBUG", - "handlers": ["file", interface] if not quiet else ["file"], + "handlers": base_handlers if not quiet else ["file"], "propagate": False, }, }, "root": { "level": "DEBUG", - "handlers": ["file", interface] if debug else ["file"], + "handlers": base_handlers if debug else ["file"], }, } @@ -180,4 +179,5 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun logging_configuration["loggers"]["moulinette"]["handlers"].append("cli") logging_configuration["root"]["handlers"].append("cli") + start_log_broker() configure_logging(logging_configuration) diff --git a/src/log.py b/src/log.py index 5ab918e76..753d19839 100644 --- a/src/log.py +++ b/src/log.py @@ -16,23 +16,26 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # +import base64 import copy import os import re import yaml import glob import psutil +import zmq +import json from typing import List from datetime import datetime, timedelta -from logging import FileHandler, getLogger, Formatter +from logging import FileHandler, getLogger, Formatter, Handler, INFO from io import IOBase from moulinette import m18n, Moulinette from moulinette.core import MoulinetteError from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.system import get_ynh_package_version -from moulinette.utils.log import getActionLogger +from moulinette.utils.log import getActionLogger, LOG_BROKER_BACKEND_ENDPOINT from moulinette.utils.filesystem import read_file, read_yaml logger = getActionLogger("yunohost.log") @@ -420,6 +423,57 @@ def is_unit_operation( return decorate +class APIHandler(Handler): + + def __init__(self, operation_id): + super().__init__() + self.operation_id = operation_id + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(LOG_BROKER_BACKEND_ENDPOINT) + + # FIXME ? ... Boring hack because otherwise it seems we lose messages emitted while + # the socket ain't properly connected to the other side + import time + time.sleep(1) + + def emit(self, record): + self._encode_and_pub({ + "type": "msg", + "timestamp": record.created, + "level": record.levelname, + "msg": self.format(record), + }) + + def emit_operation_start(self, time): + + self._encode_and_pub({ + "type": "start", + "timestamp": time.timestamp(), + }) + + def emit_operation_end(self, time, success, errormsg): + + self._encode_and_pub({ + "type": "end", + "success": success, + "errormsg": errormsg, + "timestamp": time.timestamp(), + }) + + def _encode_and_pub(self, data): + + data["operation_id"] = self.operation_id + + payload = base64.b64encode(json.dumps(data).encode()) + self.socket.send_multipart([b'', payload]) + + def close(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.socket.close() + self.context.term() + + class RedactingFormatter(Formatter): def __init__(self, format_string, data_to_redact): super(RedactingFormatter, self).__init__(format_string) @@ -480,6 +534,7 @@ class OperationLogger: self.started_at = None self.ended_at = None self.logger = None + self.api_handler = None self._name = None self.data_to_redact = [] self.parent = self.parent_logger() @@ -557,6 +612,8 @@ class OperationLogger: self.started_at = datetime.utcnow() self.flush() self._register_log() + if self.api_handler: + self.api_handler.emit_operation_start(self.started_at) @property def md_path(self): @@ -586,10 +643,21 @@ class OperationLogger: "%(asctime)s: %(levelname)s - %(message)s", self.data_to_redact ) + # Only do this one for the main parent operation + if not self.parent: + self.api_handler = APIHandler(self.name) + self.api_handler.level = INFO + self.api_handler.formatter = RedactingFormatter( + "%(message)s", self.data_to_redact + ) + # Listen to the root logger self.logger = getLogger("yunohost") self.logger.addHandler(self.file_handler) + if not self.parent: + self.logger.addHandler(self.api_handler) + def flush(self): """ Write or rewrite the metadata file with all metadata known @@ -702,9 +770,15 @@ class OperationLogger: self._error = error self._success = error is None + if self.api_handler: + self.api_handler.emit_operation_end(self.ended_at, self._success, self._error) + if self.logger is not None: self.logger.removeHandler(self.file_handler) self.file_handler.close() + if self.api_handler: + self.logger.removeHandler(self.api_handler) + self.api_handler.close() is_api = Moulinette.interface.type == "api" desc = _get_description_from_name(self.name)