#
# Copyright (c) 2023 YunoHost Contributors
#
# This file is part of YunoHost (see https://yunohost.org)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import glob
import os
import re
import urllib.parse
from collections import OrderedDict
from typing import Union

from moulinette.interfaces.cli import colorize
from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
    read_toml,
    read_yaml,
    write_to_yaml,
    mkdir,
)

from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.form import (
    ARGUMENTS_TYPE_PARSERS,
    FileQuestion,
    Question,
    ask_questions_and_parse_answers,
    evaluate_simple_js_expression,
)

logger = getActionLogger("yunohost.configpanel")
CONFIG_PANEL_VERSION_SUPPORTED = 1.0


class ConfigPanel:
    """
    Config panel described by a TOML file, with values persisted in a YAML file.

    The TOML description (read from `config_path`) is converted into a nested
    structure of panels > sections > options stored in `self.config`.
    Current values are loaded into `self.values`, and answers collected
    during `set()` / `run_action()` are accumulated in `self.new_values`.

    Methods named `validate__*` or `post_ask__*` found on the instance are
    collected as hooks and passed to the question-asking machinery.
    """

    entity_type = "config"
    # Template of the YAML file holding the values ("{entity}" placeholder)
    save_path_tpl: Union[str, None] = None
    # Template of the TOML file describing the panel
    config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml"
    # "full" saves every value; "diff" saves only values differing from defaults
    save_mode = "full"

    @classmethod
    def list(cls):
        """
        List available config panel

        Entities are found by globbing `save_path_tpl` (with "*" in place of
        the entity) and extracting the entity name back from each file path.
        """
        try:
            entities = [
                # The named group captures what glob's "*" matched
                # (glob's "*" never spans a path separator).
                re.match(
                    "^" + cls.save_path_tpl.format(entity="(?P<entity>[^/]*)") + "$",
                    f,
                ).group("entity")
                for f in glob.glob(cls.save_path_tpl.format(entity="*"))
                if os.path.isfile(f)
            ]
        except FileNotFoundError:
            entities = []
        return entities

    def __init__(self, entity, config_path=None, save_path=None, creation=False):
        """
        Keyword arguments:
            entity -- identifier of the thing being configured
            config_path -- path of the TOML description (defaults to `config_path_tpl`)
            save_path -- path of the YAML values file (defaults to `save_path_tpl`, if any)
            creation -- True when the entity is being created

        Raises YunohostValidationError if the save file is missing (outside
        creation and "diff" save mode) or already exists (during creation).
        """
        self.entity = entity
        self.config_path = config_path
        if not config_path:
            self.config_path = self.config_path_tpl.format(
                entity=entity, entity_type=self.entity_type
            )
        self.save_path = save_path
        if not save_path and self.save_path_tpl:
            self.save_path = self.save_path_tpl.format(entity=entity)
        self.config = {}
        self.values = {}
        self.new_values = {}

        if (
            self.save_path
            and self.save_mode != "diff"
            and not creation
            and not os.path.exists(self.save_path)
        ):
            raise YunohostValidationError(
                f"{self.entity_type}_unknown", **{self.entity_type: entity}
            )
        if self.save_path and creation and os.path.exists(self.save_path):
            raise YunohostValidationError(
                f"{self.entity_type}_exists", **{self.entity_type: entity}
            )

        # Search for hooks in the config panel
        self.hooks = {
            func: getattr(self, func)
            for func in dir(self)
            if callable(getattr(self, func))
            and re.match("^(validate|post_ask)__", func)
        }

    def get(self, key="", mode="classic"):
        """
        Read the config panel, filtered by `key`.

        Keyword arguments:
            key -- dotted filter "panel.section.option" (any prefix, or empty)
            mode -- "classic", "export" or "full"

        Returns:
            - the (normalized) value of the option when `key` designates a
              single option and mode is "classic",
            - a dict {option_id: current_value} in "export" mode,
            - the whole hydrated `self.config` in "full" mode,
            - otherwise a dict {"panel.section.option": {"ask": ..., "value": ...}}.
        """
        self.filter_key = key or ""

        # Read config panel toml
        self._get_config_panel()

        if not self.config:
            raise YunohostValidationError("config_no_panel")

        # Read or get values and hydrate the config
        self._load_current_values()
        self._hydrate()

        # In 'classic' mode, we display the current value if key refer to an option
        if self.filter_key.count(".") == 2 and mode == "classic":
            option = self.filter_key.split(".")[-1]
            value = self.values.get(option, None)

            option_type = None
            for _, _, option_ in self._iterate():
                if option_["id"] == option:
                    option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]]
                    break

            return option_type.normalize(value) if option_type else value

        # Format result in 'classic' or 'export' mode
        logger.debug(f"Formating result in '{mode}' mode")
        result = {}
        for panel, section, option in self._iterate():
            # Action sections are only exposed in 'full' mode
            if section["is_action_section"] and mode != "full":
                continue

            key = f"{panel['id']}.{section['id']}.{option['id']}"
            if mode == "export":
                result[option["id"]] = option.get("current_value")
                continue

            ask = None
            if "ask" in option:
                ask = _value_for_locale(option["ask"])
            elif "i18n" in self.config:
                ask = m18n.n(self.config["i18n"] + "_" + option["id"])

            if mode == "full":
                option["ask"] = ask
                question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
                # FIXME : maybe other properties should be taken from the question, not just choices ?.
                option["choices"] = question_class(option).choices
                option["default"] = question_class(option).default
                option["pattern"] = question_class(option).pattern
            else:
                result[key] = {"ask": ask}
                if "current_value" in option:
                    question_class = ARGUMENTS_TYPE_PARSERS[
                        option.get("type", "string")
                    ]
                    result[key]["value"] = question_class.humanize(
                        option["current_value"], option
                    )
                    # FIXME: semantics, technically here this is not about a prompt...
                    if question_class.hide_user_input_in_prompt:
                        # Prevent displaying password in `config get`
                        result[key]["value"] = "**************"

        if mode == "full":
            return self.config
        else:
            return result

    def list_actions(self):
        """
        Return a dict {"panel.section.option": label} for every option of type "button".
        """
        actions = {}

        # FIXME : meh, loading the entire config panel is again going to cause
        # stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...)
        self.filter_key = ""
        self._get_config_panel()
        for panel, section, option in self._iterate():
            if option["type"] == "button":
                key = f"{panel['id']}.{section['id']}.{option['id']}"
                actions[key] = _value_for_locale(option["ask"])

        return actions

    def run_action(self, action=None, args=None, args_file=None, operation_logger=None):
        """
        Run the action designated by `action` ("panel.section.action_id").

        Keyword arguments:
            action -- dotted key of the button to run
            args -- pre-answered options, as a query string
            args_file -- path of a YAML/JSON file with pre-answered options
            operation_logger -- optional operation logger; started before the
                                action runs and marked successful afterwards

        Exceptions raised while running the action are logged and re-raised;
        uploaded files are cleaned up in every case.
        """
        #
        # FIXME : this stuff looks a lot like set() ...
        #

        self.filter_key = ".".join(action.split(".")[:2])
        action_id = action.split(".")[2]

        # Read config panel toml
        self._get_config_panel()

        # FIXME: should also check that there's indeed a key called action
        if not self.config:
            raise YunohostValidationError(f"No action named {action}", raw_msg=True)

        # Import and parse pre-answered options
        logger.debug("Import and parse pre-answered options")
        self._parse_pre_answered(args, None, args_file)

        # Read or get values and hydrate the config
        self._load_current_values()
        self._hydrate()
        Question.operation_logger = operation_logger
        self._ask(action=action_id)

        # FIXME: here, we could want to check constrains on
        # the action's visibility / requirements wrt to the answer to questions ...

        if operation_logger:
            operation_logger.start()

        try:
            # NOTE(review): `_run_action` is not defined in this class in the
            # code shown here; presumably provided by subclasses.
            self._run_action(action_id)
        except YunohostError:
            raise
        # Script got manually interrupted ...
        # N.B. : KeyboardInterrupt does not inherit from Exception
        except (KeyboardInterrupt, EOFError):
            error = m18n.n("operation_interrupted")
            logger.error(m18n.n("config_action_failed", action=action, error=error))
            raise
        # Something wrong happened in Yunohost's code (most probably hook_exec)
        except Exception:
            import traceback

            error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
            logger.error(m18n.n("config_action_failed", action=action, error=error))
            raise
        finally:
            # Delete files uploaded from API
            # FIXME : this is currently done in the context of config panels,
            # but could also happen in the context of app install ... (or anywhere else
            # where we may parse args etc...)
            FileQuestion.clean_upload_dirs()

        # FIXME: i18n
        logger.success(f"Action {action_id} successful")
        # operation_logger is optional (defaults to None), same guard as for start()
        if operation_logger:
            operation_logger.success()

    def set(
        self, key=None, value=None, args=None, args_file=None, operation_logger=None
    ):
        """
        Ask for / validate new values, save them and reload related services.

        Keyword arguments:
            key -- dotted filter "panel.section.option" (any prefix, or empty)
            value -- value for the single option designated by `key`
            args -- pre-answered options, as a query string
            args_file -- path of a YAML/JSON file with pre-answered options
            operation_logger -- optional operation logger; started before
                                applying and marked successful afterwards

        Raises YunohostValidationError if there is no panel, if both `value`
        and `args`/`args_file` are given, or if `value` is given while `key`
        does not designate a single option.
        """
        self.filter_key = key or ""

        # Read config panel toml
        self._get_config_panel()

        if not self.config:
            raise YunohostValidationError("config_no_panel")

        if (args is not None or args_file is not None) and value is not None:
            raise YunohostValidationError(
                "You should either provide a value, or a serie of args/args_file, but not both at the same time",
                raw_msg=True,
            )

        if self.filter_key.count(".") != 2 and value is not None:
            raise YunohostValidationError("config_cant_set_value_on_section")

        # Import and parse pre-answered options
        logger.debug("Import and parse pre-answered options")
        self._parse_pre_answered(args, value, args_file)

        # Read or get values and hydrate the config
        self._load_current_values()
        self._hydrate()
        Question.operation_logger = operation_logger
        self._ask()

        if operation_logger:
            operation_logger.start()

        try:
            self._apply()
        except YunohostError:
            raise
        # Script got manually interrupted ...
        # N.B. : KeyboardInterrupt does not inherit from Exception
        except (KeyboardInterrupt, EOFError):
            error = m18n.n("operation_interrupted")
            logger.error(m18n.n("config_apply_failed", error=error))
            raise
        # Something wrong happened in Yunohost's code (most probably hook_exec)
        except Exception:
            import traceback

            error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
            logger.error(m18n.n("config_apply_failed", error=error))
            raise
        finally:
            # Delete files uploaded from API
            # FIXME : this is currently done in the context of config panels,
            # but could also happen in the context of app install ... (or anywhere else
            # where we may parse args etc...)
            FileQuestion.clean_upload_dirs()

        self._reload_services()

        logger.success("Config updated as expected")
        # operation_logger is optional (defaults to None), same guard as for start()
        if operation_logger:
            operation_logger.success()

    def _get_toml(self):
        """Return the parsed content of the TOML file at `self.config_path`."""
        return read_toml(self.config_path)

    def _get_config_panel(self):
        """
        Load the TOML description into `self.config`, restricted to `self.filter_key`.

        Returns `self.config`, or None (leaving `self.config` untouched) when
        the TOML file does not exist or declares an unsupported version.

        Raises:
            YunohostError -- filter key with more than 3 levels, forbidden
                             option id, or forbidden readonly option type
            YunohostValidationError -- the filter key matches no option
        """
        # Split filter_key
        filter_key = self.filter_key.split(".") if self.filter_key != "" else []
        if len(filter_key) > 3:
            raise YunohostError(
                f"The filter key {filter_key} has too many sub-levels, the max is 3.",
                raw_msg=True,
            )

        if not os.path.exists(self.config_path):
            logger.debug(f"Config panel {self.config_path} doesn't exists")
            return None

        toml_config_panel = self._get_toml()

        # Check TOML config panel is in a supported version
        if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
            logger.error(
                f"Config panels version {toml_config_panel['version']} are not supported"
            )
            return None

        # Transform toml format into internal format
        # (the key order matters: each level's children belong to the next level)
        format_description = {
            "root": {
                "properties": ["version", "i18n"],
                "defaults": {"version": 1.0},
            },
            "panels": {
                "properties": ["name", "services", "actions", "help"],
                "defaults": {
                    "services": [],
                    "actions": {"apply": {"en": "Apply"}},
                },
            },
            "sections": {
                "properties": ["name", "services", "optional", "help", "visible"],
                "defaults": {
                    "name": "",
                    "services": [],
                    "optional": True,
                    "is_action_section": False,
                },
            },
            "options": {
                "properties": [
                    "ask",
                    "type",
                    "bind",
                    "help",
                    "example",
                    "default",
                    "style",
                    "icon",
                    "placeholder",
                    "visible",
                    "optional",
                    "choices",
                    "yes",
                    "no",
                    "pattern",
                    "limit",
                    "min",
                    "max",
                    "step",
                    "accept",
                    "redact",
                    "filter",
                    "readonly",
                    "enabled",
                    # "confirm", # TODO: to ask confirmation before running an action
                ],
                "defaults": {},
            },
        }

        def _build_internal_config_panel(raw_infos, level):
            """Convert TOML in internal format ('full' mode used by webadmin)
            Here are some properties of 1.0 config panel in toml:
            - node properties and node children are mixed,
            - text are in english only
            - some properties have default values
            This function detects all children nodes and put them in a list
            """

            defaults = format_description[level]["defaults"]
            properties = format_description[level]["properties"]

            # Start building the output (merging the raw infos + defaults)
            out = {key: raw_infos.get(key, value) for key, value in defaults.items()}

            # Now fill the sublevels (+ apply filter_key)
            i = list(format_description).index(level)
            sublevel = list(format_description)[i + 1] if level != "options" else None
            search_key = filter_key[i] if len(filter_key) > i else False

            for key, value in raw_infos.items():
                # Key/value are a child node
                if (
                    isinstance(value, OrderedDict)
                    and key not in properties
                    and sublevel
                ):
                    # We exclude all nodes not referenced by the filter_key
                    if search_key and key != search_key:
                        continue
                    subnode = _build_internal_config_panel(value, sublevel)
                    subnode["id"] = key
                    if level == "root":
                        subnode.setdefault("name", {"en": key.capitalize()})
                    elif level == "sections":
                        subnode["name"] = key  # legacy
                        subnode.setdefault("optional", raw_infos.get("optional", True))
                        # If this section contains at least one button, it becomes an "action" section
                        if subnode.get("type") == "button":
                            out["is_action_section"] = True
                    out.setdefault(sublevel, []).append(subnode)
                # Key/value are a property
                else:
                    if key not in properties:
                        logger.warning(f"Unknown key '{key}' found in config panel")
                    # Todo search all i18n keys
                    out[key] = (
                        value
                        if key not in ["ask", "help", "name"] or isinstance(value, dict)
                        else {"en": value}
                    )
            return out

        self.config = _build_internal_config_panel(toml_config_panel, "root")

        # After filtering, there must remain at least one option
        try:
            self.config["panels"][0]["sections"][0]["options"][0]
        except (KeyError, IndexError):
            raise YunohostValidationError(
                "config_unknown_filter_key", filter_key=self.filter_key
            )

        # List forbidden keywords from helpers and sections toml (to avoid conflict)
        forbidden_keywords = [
            "old",
            "app",
            "changed",
            "file_hash",
            "binds",
            "types",
            "formats",
            "getter",
            "setter",
            "short_setting",
            "type",
            "bind",
            "nothing_changed",
            "changes_validated",
            "result",
            "max_progression",
        ]
        # NOTE(review): extending a list with a dict adds the dict's *keys*
        # ("properties" and "defaults"), not the section property names.
        # Kept as-is; confirm whether ["sections"]["properties"] was intended.
        forbidden_keywords += format_description["sections"]
        forbidden_readonly_types = ["password", "app", "domain", "user", "file"]

        for _, _, option in self._iterate():
            if option["id"] in forbidden_keywords:
                raise YunohostError("config_forbidden_keyword", keyword=option["id"])
            if (
                option.get("readonly", False)
                and option.get("type", "string") in forbidden_readonly_types
            ):
                raise YunohostError(
                    "config_forbidden_readonly_type",
                    type=option["type"],
                    id=option["id"],
                )

        return self.config

    def _hydrate(self):
        """
        Inject the current values into the options of `self.config`.

        Each option gets a "current_value" key (or is updated with the whole
        dict when the stored value is itself a dict), and `self.values` is
        normalized to plain values. Returns `self.values`.

        Raises YunohostError if an option has no value and is not allowed to
        be empty.
        """
        # Option types that legitimately have no stored value
        allowed_empty_types = [
            "alert",
            "display_text",
            "markdown",
            "file",
            "button",
        ]

        # Hydrating config panel with current value
        for _, section, option in self._iterate():
            if option["id"] not in self.values:
                if section["is_action_section"] and option.get("default") is not None:
                    self.values[option["id"]] = option["default"]
                elif (
                    option["type"] in allowed_empty_types
                    or option.get("bind") == "null"
                ):
                    continue
                else:
                    raise YunohostError(
                        f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.",
                        raw_msg=True,
                    )
            value = self.values[option["name"]]

            # Allow to use value instead of current_value in app config script.
            # e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
            # For example hotspot used it...
            # See https://github.com/YunoHost/yunohost/pull/1546
            if (
                isinstance(value, dict)
                and "value" in value
                and "current_value" not in value
            ):
                value["current_value"] = value["value"]

            # In general, the value is just a simple value.
            # Sometimes it could be a dict used to overwrite the option itself
            value = value if isinstance(value, dict) else {"current_value": value}
            option.update(value)

            self.values[option["id"]] = value.get("current_value")

        return self.values

    def _ask(self, action=None):
        """
        Ask the unanswered questions, section by section, and collect the
        answers into `self.new_values`.

        Keyword arguments:
            action -- id of the action being run, if any. Without it, action
                      sections are skipped; with it, other buttons are
                      filtered out of the sections.
        """
        logger.debug("Ask unanswered question and prevalidate data")

        if "i18n" in self.config:
            for panel, section, option in self._iterate():
                if "ask" not in option:
                    option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"])
                # auto add i18n help text if present in locales
                if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
                    option["help"] = m18n.n(
                        self.config["i18n"] + "_" + option["id"] + "_help"
                    )

        def display_header(message):
            """CLI panel/section header display"""
            if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2:
                Moulinette.display(colorize(message, "purple"))

        for panel, section, obj in self._iterate(["panel", "section"]):
            # Skip sections whose "visible" expression evaluates to false
            if (
                section
                and section.get("visible")
                and not evaluate_simple_js_expression(
                    section["visible"], context=self.future_values
                )
            ):
                continue

            # Ugly hack to skip action section ... except when when explicitly running actions
            if not action:
                if section and section["is_action_section"]:
                    continue

                if panel == obj:
                    name = _value_for_locale(panel["name"])
                    display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}")
                else:
                    name = _value_for_locale(section["name"])
                    if name:
                        display_header(f"\n# {name}")
            elif section:
                # filter action section options in case of multiple buttons
                section["options"] = [
                    option
                    for option in section["options"]
                    if option.get("type", "string") != "button"
                    or option["id"] == action
                ]

            # Panels only get a header, questions live in sections
            if panel == obj:
                continue

            # Check and ask unanswered questions
            prefilled_answers = self.args.copy()
            prefilled_answers.update(self.new_values)

            questions = ask_questions_and_parse_answers(
                {question["name"]: question for question in section["options"]},
                prefilled_answers=prefilled_answers,
                current_values=self.values,
                hooks=self.hooks,
            )
            self.new_values.update(
                {
                    question.name: question.value
                    for question in questions
                    if question.value is not None
                }
            )

    def _get_default_values(self):
        """Return a dict {option_id: default} for every option declaring a "default"."""
        return {
            option["id"]: option["default"]
            for _, _, option in self._iterate()
            if "default" in option
        }

    @property
    def future_values(self):
        """Current values overridden by the newly collected ones."""
        return {**self.values, **self.new_values}

    def __getattr__(self, name):
        """
        Expose option values as attributes (new values take precedence).

        Only called when regular attribute lookup fails. Raises AttributeError
        for unknown names, as required for hasattr() / getattr(..., default)
        to work.
        """
        # Go through __dict__ to avoid infinite recursion when `values` /
        # `new_values` are not set yet
        if "new_values" in self.__dict__ and name in self.new_values:
            return self.new_values[name]

        if "values" in self.__dict__ and name in self.values:
            return self.values[name]

        try:
            return self.__dict__[name]
        except KeyError:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            ) from None

    def _load_current_values(self):
        """
        Retrieve entries in YAML file
        And set default values if needed
        """

        # Inject defaults if needed (using the magic .update() ;))
        self.values = self._get_default_values()

        # Retrieve entries in the YAML
        # (save_path may be None when no save path / template is configured)
        if self.save_path and os.path.isfile(self.save_path):
            self.values.update(read_yaml(self.save_path) or {})

    def _parse_pre_answered(self, args, value, args_file):
        """
        Build `self.args` (pre-answered options) from the given inputs.

        Keyword arguments:
            args -- query string; repeated keys are joined with ","
            value -- if not None, becomes the only answer, keyed by the last
                     component of `self.filter_key`
            args_file -- path of a YAML/JSON file; `args` take precedence over it
        """
        args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
        self.args = {key: ",".join(value_) for key, value_ in args.items()}

        if args_file:
            # Import YAML / JSON file but keep --args values
            self.args = {**(read_yaml(args_file) or {}), **self.args}

        if value is not None:
            self.args = {self.filter_key.split(".")[-1]: value}

    def _apply(self):
        """
        Write the future values to `self.save_path` (creating its directory if
        needed). In "diff" save mode, only values differing from the defaults
        are written.
        """
        logger.info("Saving the new configuration...")

        dir_path = os.path.dirname(os.path.realpath(self.save_path))
        if not os.path.exists(dir_path):
            mkdir(dir_path, mode=0o700)

        values_to_save = self.future_values
        if self.save_mode == "diff":
            defaults = self._get_default_values()
            values_to_save = {
                k: v for k, v in values_to_save.items() if defaults.get(k) != v
            }

        # Save the settings to the .yaml file
        write_to_yaml(self.save_path, values_to_save)

    def _reload_services(self):
        """
        Reload or restart every service listed in the "services" key of the
        panels, sections and options of the current config.
        """
        from yunohost.service import service_reload_or_restart

        services_to_reload = set()
        for panel, section, obj in self._iterate(["panel", "section", "option"]):
            services_to_reload |= set(obj.get("services", []))

        services_to_reload = list(services_to_reload)
        # False sorts before True: "nginx" ends up last
        services_to_reload.sort(key="nginx".__eq__)
        if services_to_reload:
            logger.info("Reloading services...")
        for service in services_to_reload:
            if hasattr(self, "entity"):
                service = service.replace("__APP__", self.entity)
            service_reload_or_restart(service)

    def _iterate(self, trigger=("option",)):
        """
        Walk `self.config` and yield (panel, section, obj) tuples.

        Keyword arguments:
            trigger -- collection of levels to yield, among "panel",
                       "section" and "option". For a panel, `section` is None
                       and `obj` is the panel; for a section, `obj` is the
                       section; for an option, `obj` is the option.
        """
        for panel in self.config.get("panels", []):
            if "panel" in trigger:
                yield (panel, None, panel)
            for section in panel.get("sections", []):
                if "section" in trigger:
                    yield (panel, section, section)
                if "option" in trigger:
                    for option in section.get("options", []):
                        yield (panel, section, option)