POC for new log streaming API using a zero-mq broker

This commit is contained in:
Alexandre Aubin 2023-05-19 18:19:20 +02:00
parent 3b75485923
commit c3bf0a9739
2 changed files with 84 additions and 10 deletions

View file

@ -23,7 +23,7 @@ import sys
import moulinette
from moulinette import m18n
from moulinette.utils.log import configure_logging
from moulinette.utils.log import configure_logging, start_log_broker
from moulinette.interfaces.cli import colorize, get_locale
@ -111,6 +111,8 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun
if not os.path.isdir(logdir):
os.makedirs(logdir, 0o750)
base_handlers = ["file"] + (["cli"] if interface == "cli" else [])
logging_configuration = {
"version": 1,
"disable_existing_loggers": True,
@ -134,32 +136,29 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun
"class": "moulinette.interfaces.cli.TTYHandler",
"formatter": "tty-debug" if debug else "",
},
"api": {
"level": "DEBUG" if debug else "INFO",
"class": "moulinette.interfaces.api.APIQueueHandler",
},
"file": {
"class": "logging.FileHandler",
"formatter": "precise",
"filename": logfile,
"filters": ["action"],
},
},
"loggers": {
"yunohost": {
"level": "DEBUG",
"handlers": ["file", interface] if not quiet else ["file"],
"handlers": base_handlers if not quiet else ["file"],
"propagate": False,
},
"moulinette": {
"level": "DEBUG",
"handlers": ["file", interface] if not quiet else ["file"],
"handlers": base_handlers if not quiet else ["file"],
"propagate": False,
},
},
"root": {
"level": "DEBUG",
"handlers": ["file", interface] if debug else ["file"],
"handlers": base_handlers if debug else ["file"],
},
}
@ -180,4 +179,5 @@ def init_logging(interface="cli", debug=False, quiet=False, logdir="/var/log/yun
logging_configuration["loggers"]["moulinette"]["handlers"].append("cli")
logging_configuration["root"]["handlers"].append("cli")
start_log_broker()
configure_logging(logging_configuration)

View file

@ -16,23 +16,26 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import base64
import copy
import os
import re
import yaml
import glob
import psutil
import zmq
import json
from typing import List
from datetime import datetime, timedelta
from logging import FileHandler, getLogger, Formatter
from logging import FileHandler, getLogger, Formatter, Handler, INFO
from io import IOBase
from moulinette import m18n, Moulinette
from moulinette.core import MoulinetteError
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.system import get_ynh_package_version
from moulinette.utils.log import getActionLogger
from moulinette.utils.log import getActionLogger, LOG_BROKER_BACKEND_ENDPOINT
from moulinette.utils.filesystem import read_file, read_yaml
logger = getActionLogger("yunohost.log")
@ -420,6 +423,57 @@ def is_unit_operation(
return decorate
class APIHandler(Handler):
def __init__(self, operation_id):
super().__init__()
self.operation_id = operation_id
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.connect(LOG_BROKER_BACKEND_ENDPOINT)
# FIXME ? ... Boring hack because otherwise it seems we lose messages emitted while
# the socket ain't properly connected to the other side
import time
time.sleep(1)
def emit(self, record):
self._encode_and_pub({
"type": "msg",
"timestamp": record.created,
"level": record.levelname,
"msg": self.format(record),
})
def emit_operation_start(self, time):
self._encode_and_pub({
"type": "start",
"timestamp": time.timestamp(),
})
def emit_operation_end(self, time, success, errormsg):
self._encode_and_pub({
"type": "end",
"success": success,
"errormsg": errormsg,
"timestamp": time.timestamp(),
})
def _encode_and_pub(self, data):
data["operation_id"] = self.operation_id
payload = base64.b64encode(json.dumps(data).encode())
self.socket.send_multipart([b'', payload])
def close(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.socket.close()
self.context.term()
class RedactingFormatter(Formatter):
def __init__(self, format_string, data_to_redact):
super(RedactingFormatter, self).__init__(format_string)
@ -480,6 +534,7 @@ class OperationLogger:
self.started_at = None
self.ended_at = None
self.logger = None
self.api_handler = None
self._name = None
self.data_to_redact = []
self.parent = self.parent_logger()
@ -557,6 +612,8 @@ class OperationLogger:
self.started_at = datetime.utcnow()
self.flush()
self._register_log()
if self.api_handler:
self.api_handler.emit_operation_start(self.started_at)
@property
def md_path(self):
@ -586,10 +643,21 @@ class OperationLogger:
"%(asctime)s: %(levelname)s - %(message)s", self.data_to_redact
)
# Only do this one for the main parent operation
if not self.parent:
self.api_handler = APIHandler(self.name)
self.api_handler.level = INFO
self.api_handler.formatter = RedactingFormatter(
"%(message)s", self.data_to_redact
)
# Listen to the root logger
self.logger = getLogger("yunohost")
self.logger.addHandler(self.file_handler)
if not self.parent:
self.logger.addHandler(self.api_handler)
def flush(self):
"""
Write or rewrite the metadata file with all metadata known
@ -702,9 +770,15 @@ class OperationLogger:
self._error = error
self._success = error is None
if self.api_handler:
self.api_handler.emit_operation_end(self.ended_at, self._success, self._error)
if self.logger is not None:
self.logger.removeHandler(self.file_handler)
self.file_handler.close()
if self.api_handler:
self.logger.removeHandler(self.api_handler)
self.api_handler.close()
is_api = Moulinette.interface.type == "api"
desc = _get_description_from_name(self.name)