diff --git a/src/utils/configpanel.py b/src/utils/configpanel.py new file mode 100644 index 000000000..1f1351bcb --- /dev/null +++ b/src/utils/configpanel.py @@ -0,0 +1,687 @@ +# +# Copyright (c) 2023 YunoHost Contributors +# +# This file is part of YunoHost (see https://yunohost.org) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +import glob +import os +import re +import urllib.parse +from collections import OrderedDict +from typing import Union + +from moulinette.interfaces.cli import colorize +from moulinette import Moulinette, m18n +from moulinette.utils.log import getActionLogger +from moulinette.utils.filesystem import ( + read_toml, + read_yaml, + write_to_yaml, + mkdir, +) + +from yunohost.utils.i18n import _value_for_locale +from yunohost.utils.error import YunohostError, YunohostValidationError + +logger = getActionLogger("yunohost.configpanel") +CONFIG_PANEL_VERSION_SUPPORTED = 1.0 + + +class ConfigPanel: + entity_type = "config" + save_path_tpl: Union[str, None] = None + config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml" + save_mode = "full" + + @classmethod + def list(cls): + """ + List available config panel + """ + try: + entities = [ + re.match( + "^" + cls.save_path_tpl.format(entity="(?p)") + "$", f + ).group("entity") + for f in glob.glob(cls.save_path_tpl.format(entity="*")) + if os.path.isfile(f) + ] + except FileNotFoundError: + entities = [] + return entities + + def __init__(self, entity, config_path=None, save_path=None, creation=False): + self.entity = entity + self.config_path = config_path + if not config_path: + self.config_path = self.config_path_tpl.format( + entity=entity, entity_type=self.entity_type + ) + self.save_path = save_path + if not save_path and self.save_path_tpl: + self.save_path = self.save_path_tpl.format(entity=entity) + self.config = {} + self.values = {} + self.new_values = {} + + if ( + self.save_path + and self.save_mode != "diff" + and not creation + and not os.path.exists(self.save_path) + ): + raise YunohostValidationError( + f"{self.entity_type}_unknown", **{self.entity_type: entity} + ) + if self.save_path and creation and os.path.exists(self.save_path): + raise YunohostValidationError( + f"{self.entity_type}_exists", **{self.entity_type: entity} + ) + + # Search for hooks in the config panel + self.hooks = { + func: getattr(self, func) + for func in dir(self) + if callable(getattr(self, func)) + and re.match("^(validate|post_ask)__", func) + } + + def get(self, key="", mode="classic"): + self.filter_key = key or "" + + # Read config panel toml + self._get_config_panel() + + if not self.config: + raise YunohostValidationError("config_no_panel") + + # Read or get values and hydrate the config + self._load_current_values() + self._hydrate() + + # In 'classic' mode, we display the current value if key refer to an option + if self.filter_key.count(".") == 2 and mode == "classic": + option = self.filter_key.split(".")[-1] + value = self.values.get(option, None) + + option_type = None + for _, _, option_ in self._iterate(): + if option_["id"] == option: + option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]] + break + + return option_type.normalize(value) if option_type else value + + # Format result in 'classic' or 'export' mode + logger.debug(f"Formating result in '{mode}' mode") + result = {} + for panel, section, option in self._iterate(): + if section["is_action_section"] and mode != "full": + continue + + key = f"{panel['id']}.{section['id']}.{option['id']}" + if mode == "export": + result[option["id"]] = option.get("current_value") + continue + + ask = None + if "ask" in option: + ask = _value_for_locale(option["ask"]) + elif "i18n" in self.config: + ask = m18n.n(self.config["i18n"] + "_" + option["id"]) + + if mode == "full": + option["ask"] = ask + question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")] + # FIXME : maybe other properties should be taken from the question, not just choices ?. + option["choices"] = question_class(option).choices + option["default"] = question_class(option).default + option["pattern"] = question_class(option).pattern + else: + result[key] = {"ask": ask} + if "current_value" in option: + question_class = ARGUMENTS_TYPE_PARSERS[ + option.get("type", "string") + ] + result[key]["value"] = question_class.humanize( + option["current_value"], option + ) + # FIXME: semantics, technically here this is not about a prompt... + if question_class.hide_user_input_in_prompt: + result[key][ + "value" + ] = "**************" # Prevent displaying password in `config get` + + if mode == "full": + return self.config + else: + return result + + def list_actions(self): + actions = {} + + # FIXME : meh, loading the entire config panel is again going to cause + # stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...) + self.filter_key = "" + self._get_config_panel() + for panel, section, option in self._iterate(): + if option["type"] == "button": + key = f"{panel['id']}.{section['id']}.{option['id']}" + actions[key] = _value_for_locale(option["ask"]) + + return actions + + def run_action(self, action=None, args=None, args_file=None, operation_logger=None): + # + # FIXME : this stuff looks a lot like set() ... + # + + self.filter_key = ".".join(action.split(".")[:2]) + action_id = action.split(".")[2] + + # Read config panel toml + self._get_config_panel() + + # FIXME: should also check that there's indeed a key called action + if not self.config: + raise YunohostValidationError(f"No action named {action}", raw_msg=True) + + # Import and parse pre-answered options + logger.debug("Import and parse pre-answered options") + self._parse_pre_answered(args, None, args_file) + + # Read or get values and hydrate the config + self._load_current_values() + self._hydrate() + Question.operation_logger = operation_logger + self._ask(action=action_id) + + # FIXME: here, we could want to check constrains on + # the action's visibility / requirements wrt to the answer to questions ... + + if operation_logger: + operation_logger.start() + + try: + self._run_action(action_id) + except YunohostError: + raise + # Script got manually interrupted ... + # N.B. : KeyboardInterrupt does not inherit from Exception + except (KeyboardInterrupt, EOFError): + error = m18n.n("operation_interrupted") + logger.error(m18n.n("config_action_failed", action=action, error=error)) + raise + # Something wrong happened in Yunohost's code (most probably hook_exec) + except Exception: + import traceback + + error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) + logger.error(m18n.n("config_action_failed", action=action, error=error)) + raise + finally: + # Delete files uploaded from API + # FIXME : this is currently done in the context of config panels, + # but could also happen in the context of app install ... (or anywhere else + # where we may parse args etc...) + FileQuestion.clean_upload_dirs() + + # FIXME: i18n + logger.success(f"Action {action_id} successful") + operation_logger.success() + + def set( + self, key=None, value=None, args=None, args_file=None, operation_logger=None + ): + self.filter_key = key or "" + + # Read config panel toml + self._get_config_panel() + + if not self.config: + raise YunohostValidationError("config_no_panel") + + if (args is not None or args_file is not None) and value is not None: + raise YunohostValidationError( + "You should either provide a value, or a serie of args/args_file, but not both at the same time", + raw_msg=True, + ) + + if self.filter_key.count(".") != 2 and value is not None: + raise YunohostValidationError("config_cant_set_value_on_section") + + # Import and parse pre-answered options + logger.debug("Import and parse pre-answered options") + self._parse_pre_answered(args, value, args_file) + + # Read or get values and hydrate the config + self._load_current_values() + self._hydrate() + Question.operation_logger = operation_logger + self._ask() + + if operation_logger: + operation_logger.start() + + try: + self._apply() + except YunohostError: + raise + # Script got manually interrupted ... + # N.B. : KeyboardInterrupt does not inherit from Exception + except (KeyboardInterrupt, EOFError): + error = m18n.n("operation_interrupted") + logger.error(m18n.n("config_apply_failed", error=error)) + raise + # Something wrong happened in Yunohost's code (most probably hook_exec) + except Exception: + import traceback + + error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) + logger.error(m18n.n("config_apply_failed", error=error)) + raise + finally: + # Delete files uploaded from API + # FIXME : this is currently done in the context of config panels, + # but could also happen in the context of app install ... (or anywhere else + # where we may parse args etc...) + FileQuestion.clean_upload_dirs() + + self._reload_services() + + logger.success("Config updated as expected") + operation_logger.success() + + def _get_toml(self): + return read_toml(self.config_path) + + def _get_config_panel(self): + # Split filter_key + filter_key = self.filter_key.split(".") if self.filter_key != "" else [] + if len(filter_key) > 3: + raise YunohostError( + f"The filter key {filter_key} has too many sub-levels, the max is 3.", + raw_msg=True, + ) + + if not os.path.exists(self.config_path): + logger.debug(f"Config panel {self.config_path} doesn't exists") + return None + + toml_config_panel = self._get_toml() + + # Check TOML config panel is in a supported version + if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED: + logger.error( + f"Config panels version {toml_config_panel['version']} are not supported" + ) + return None + + # Transform toml format into internal format + format_description = { + "root": { + "properties": ["version", "i18n"], + "defaults": {"version": 1.0}, + }, + "panels": { + "properties": ["name", "services", "actions", "help"], + "defaults": { + "services": [], + "actions": {"apply": {"en": "Apply"}}, + }, + }, + "sections": { + "properties": ["name", "services", "optional", "help", "visible"], + "defaults": { + "name": "", + "services": [], + "optional": True, + "is_action_section": False, + }, + }, + "options": { + "properties": [ + "ask", + "type", + "bind", + "help", + "example", + "default", + "style", + "icon", + "placeholder", + "visible", + "optional", + "choices", + "yes", + "no", + "pattern", + "limit", + "min", + "max", + "step", + "accept", + "redact", + "filter", + "readonly", + "enabled", + # "confirm", # TODO: to ask confirmation before running an action + ], + "defaults": {}, + }, + } + + def _build_internal_config_panel(raw_infos, level): + """Convert TOML in internal format ('full' mode used by webadmin) + Here are some properties of 1.0 config panel in toml: + - node properties and node children are mixed, + - text are in english only + - some properties have default values + This function detects all children nodes and put them in a list + """ + + defaults = format_description[level]["defaults"] + properties = format_description[level]["properties"] + + # Start building the ouput (merging the raw infos + defaults) + out = {key: raw_infos.get(key, value) for key, value in defaults.items()} + + # Now fill the sublevels (+ apply filter_key) + i = list(format_description).index(level) + sublevel = list(format_description)[i + 1] if level != "options" else None + search_key = filter_key[i] if len(filter_key) > i else False + + for key, value in raw_infos.items(): + # Key/value are a child node + if ( + isinstance(value, OrderedDict) + and key not in properties + and sublevel + ): + # We exclude all nodes not referenced by the filter_key + if search_key and key != search_key: + continue + subnode = _build_internal_config_panel(value, sublevel) + subnode["id"] = key + if level == "root": + subnode.setdefault("name", {"en": key.capitalize()}) + elif level == "sections": + subnode["name"] = key # legacy + subnode.setdefault("optional", raw_infos.get("optional", True)) + # If this section contains at least one button, it becomes an "action" section + if subnode.get("type") == "button": + out["is_action_section"] = True + out.setdefault(sublevel, []).append(subnode) + # Key/value are a property + else: + if key not in properties: + logger.warning(f"Unknown key '{key}' found in config panel") + # Todo search all i18n keys + out[key] = ( + value + if key not in ["ask", "help", "name"] or isinstance(value, dict) + else {"en": value} + ) + return out + + self.config = _build_internal_config_panel(toml_config_panel, "root") + + try: + self.config["panels"][0]["sections"][0]["options"][0] + except (KeyError, IndexError): + raise YunohostValidationError( + "config_unknown_filter_key", filter_key=self.filter_key + ) + + # List forbidden keywords from helpers and sections toml (to avoid conflict) + forbidden_keywords = [ + "old", + "app", + "changed", + "file_hash", + "binds", + "types", + "formats", + "getter", + "setter", + "short_setting", + "type", + "bind", + "nothing_changed", + "changes_validated", + "result", + "max_progression", + ] + forbidden_keywords += format_description["sections"] + forbidden_readonly_types = ["password", "app", "domain", "user", "file"] + + for _, _, option in self._iterate(): + if option["id"] in forbidden_keywords: + raise YunohostError("config_forbidden_keyword", keyword=option["id"]) + if ( + option.get("readonly", False) + and option.get("type", "string") in forbidden_readonly_types + ): + raise YunohostError( + "config_forbidden_readonly_type", + type=option["type"], + id=option["id"], + ) + + return self.config + + def _hydrate(self): + # Hydrating config panel with current value + for _, section, option in self._iterate(): + if option["id"] not in self.values: + allowed_empty_types = [ + "alert", + "display_text", + "markdown", + "file", + "button", + ] + + if section["is_action_section"] and option.get("default") is not None: + self.values[option["id"]] = option["default"] + elif ( + option["type"] in allowed_empty_types + or option.get("bind") == "null" + ): + continue + else: + raise YunohostError( + f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.", + raw_msg=True, + ) + value = self.values[option["name"]] + + # Allow to use value instead of current_value in app config script. + # e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'` + # For example hotspot used it... + # See https://github.com/YunoHost/yunohost/pull/1546 + if ( + isinstance(value, dict) + and "value" in value + and "current_value" not in value + ): + value["current_value"] = value["value"] + + # In general, the value is just a simple value. + # Sometimes it could be a dict used to overwrite the option itself + value = value if isinstance(value, dict) else {"current_value": value} + option.update(value) + + self.values[option["id"]] = value.get("current_value") + + return self.values + + def _ask(self, action=None): + logger.debug("Ask unanswered question and prevalidate data") + + if "i18n" in self.config: + for panel, section, option in self._iterate(): + if "ask" not in option: + option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"]) + # auto add i18n help text if present in locales + if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"): + option["help"] = m18n.n( + self.config["i18n"] + "_" + option["id"] + "_help" + ) + + def display_header(message): + """CLI panel/section header display""" + if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2: + Moulinette.display(colorize(message, "purple")) + + for panel, section, obj in self._iterate(["panel", "section"]): + if ( + section + and section.get("visible") + and not evaluate_simple_js_expression( + section["visible"], context=self.future_values + ) + ): + continue + + # Ugly hack to skip action section ... except when when explicitly running actions + if not action: + if section and section["is_action_section"]: + continue + + if panel == obj: + name = _value_for_locale(panel["name"]) + display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}") + else: + name = _value_for_locale(section["name"]) + if name: + display_header(f"\n# {name}") + elif section: + # filter action section options in case of multiple buttons + section["options"] = [ + option + for option in section["options"] + if option.get("type", "string") != "button" + or option["id"] == action + ] + + if panel == obj: + continue + + # Check and ask unanswered questions + prefilled_answers = self.args.copy() + prefilled_answers.update(self.new_values) + + questions = ask_questions_and_parse_answers( + {question["name"]: question for question in section["options"]}, + prefilled_answers=prefilled_answers, + current_values=self.values, + hooks=self.hooks, + ) + self.new_values.update( + { + question.name: question.value + for question in questions + if question.value is not None + } + ) + + def _get_default_values(self): + return { + option["id"]: option["default"] + for _, _, option in self._iterate() + if "default" in option + } + + @property + def future_values(self): + return {**self.values, **self.new_values} + + def __getattr__(self, name): + if "new_values" in self.__dict__ and name in self.new_values: + return self.new_values[name] + + if "values" in self.__dict__ and name in self.values: + return self.values[name] + + return self.__dict__[name] + + def _load_current_values(self): + """ + Retrieve entries in YAML file + And set default values if needed + """ + + # Inject defaults if needed (using the magic .update() ;)) + self.values = self._get_default_values() + + # Retrieve entries in the YAML + if os.path.exists(self.save_path) and os.path.isfile(self.save_path): + self.values.update(read_yaml(self.save_path) or {}) + + def _parse_pre_answered(self, args, value, args_file): + args = urllib.parse.parse_qs(args or "", keep_blank_values=True) + self.args = {key: ",".join(value_) for key, value_ in args.items()} + + if args_file: + # Import YAML / JSON file but keep --args values + self.args = {**read_yaml(args_file), **self.args} + + if value is not None: + self.args = {self.filter_key.split(".")[-1]: value} + + def _apply(self): + logger.info("Saving the new configuration...") + dir_path = os.path.dirname(os.path.realpath(self.save_path)) + if not os.path.exists(dir_path): + mkdir(dir_path, mode=0o700) + + values_to_save = self.future_values + if self.save_mode == "diff": + defaults = self._get_default_values() + values_to_save = { + k: v for k, v in values_to_save.items() if defaults.get(k) != v + } + + # Save the settings to the .yaml file + write_to_yaml(self.save_path, values_to_save) + + def _reload_services(self): + from yunohost.service import service_reload_or_restart + + services_to_reload = set() + for panel, section, obj in self._iterate(["panel", "section", "option"]): + services_to_reload |= set(obj.get("services", [])) + + services_to_reload = list(services_to_reload) + services_to_reload.sort(key="nginx".__eq__) + if services_to_reload: + logger.info("Reloading services...") + for service in services_to_reload: + if hasattr(self, "entity"): + service = service.replace("__APP__", self.entity) + service_reload_or_restart(service) + + def _iterate(self, trigger=["option"]): + for panel in self.config.get("panels", []): + if "panel" in trigger: + yield (panel, None, panel) + for section in panel.get("sections", []): + if "section" in trigger: + yield (panel, section, section) + if "option" in trigger: + for option in section.get("options", []): + yield (panel, section, option)