diff --git a/src/utils/form.py b/src/utils/form.py index a48883c38..9907dafb1 100644 --- a/src/utils/form.py +++ b/src/utils/form.py @@ -16,7 +16,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # -import glob import os import re import urllib.parse @@ -24,7 +23,6 @@ import tempfile import shutil import ast import operator as op -from collections import OrderedDict from typing import Optional, Dict, List, Union, Any, Mapping, Callable from moulinette.interfaces.cli import colorize @@ -33,18 +31,13 @@ from moulinette.utils.log import getActionLogger from moulinette.utils.filesystem import ( read_file, write_to_file, - read_toml, - read_yaml, - write_to_yaml, - mkdir, ) from yunohost.utils.i18n import _value_for_locale from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.log import OperationLogger -logger = getActionLogger("yunohost.config") -CONFIG_PANEL_VERSION_SUPPORTED = 1.0 +logger = getActionLogger("yunohost.form") # Those js-like evaluate functions are used to eval safely visible attributes @@ -190,653 +183,6 @@ def evaluate_simple_js_expression(expr, context={}): return evaluate_simple_ast(node, context) -class ConfigPanel: - entity_type = "config" - save_path_tpl: Union[str, None] = None - config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml" - save_mode = "full" - - @classmethod - def list(cls): - """ - List available config panel - """ - try: - entities = [ - re.match( - "^" + cls.save_path_tpl.format(entity="(?p)") + "$", f - ).group("entity") - for f in glob.glob(cls.save_path_tpl.format(entity="*")) - if os.path.isfile(f) - ] - except FileNotFoundError: - entities = [] - return entities - - def __init__(self, entity, config_path=None, save_path=None, creation=False): - self.entity = entity - self.config_path = config_path - if not config_path: - self.config_path = self.config_path_tpl.format( - entity=entity, entity_type=self.entity_type - ) - self.save_path = save_path - if not save_path and self.save_path_tpl: - self.save_path = self.save_path_tpl.format(entity=entity) - self.config = {} - self.values = {} - self.new_values = {} - - if ( - self.save_path - and self.save_mode != "diff" - and not creation - and not os.path.exists(self.save_path) - ): - raise YunohostValidationError( - f"{self.entity_type}_unknown", **{self.entity_type: entity} - ) - if self.save_path and creation and os.path.exists(self.save_path): - raise YunohostValidationError( - f"{self.entity_type}_exists", **{self.entity_type: entity} - ) - - # Search for hooks in the config panel - self.hooks = { - func: getattr(self, func) - for func in dir(self) - if callable(getattr(self, func)) - and re.match("^(validate|post_ask)__", func) - } - - def get(self, key="", mode="classic"): - self.filter_key = key or "" - - # Read config panel toml - self._get_config_panel() - - if not self.config: - raise YunohostValidationError("config_no_panel") - - # Read or get values and hydrate the config - self._load_current_values() - self._hydrate() - - # In 'classic' mode, we display the current value if key refer to an option - if self.filter_key.count(".") == 2 and mode == "classic": - option = self.filter_key.split(".")[-1] - value = self.values.get(option, None) - - option_type = None - for _, _, option_ in self._iterate(): - if option_["id"] == option: - option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]] - break - - return option_type.normalize(value) if option_type else value - - # Format result in 'classic' or 'export' mode - logger.debug(f"Formating result in '{mode}' mode") - result = {} - for panel, section, option in self._iterate(): - if section["is_action_section"] and mode != "full": - continue - - key = f"{panel['id']}.{section['id']}.{option['id']}" - if mode == "export": - result[option["id"]] = option.get("current_value") - continue - - ask = None - if "ask" in option: - ask = _value_for_locale(option["ask"]) - elif "i18n" in self.config: - ask = m18n.n(self.config["i18n"] + "_" + option["id"]) - - if mode == "full": - option["ask"] = ask - question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")] - # FIXME : maybe other properties should be taken from the question, not just choices ?. - option["choices"] = question_class(option).choices - option["default"] = question_class(option).default - option["pattern"] = question_class(option).pattern - else: - result[key] = {"ask": ask} - if "current_value" in option: - question_class = ARGUMENTS_TYPE_PARSERS[ - option.get("type", "string") - ] - result[key]["value"] = question_class.humanize( - option["current_value"], option - ) - # FIXME: semantics, technically here this is not about a prompt... - if question_class.hide_user_input_in_prompt: - result[key][ - "value" - ] = "**************" # Prevent displaying password in `config get` - - if mode == "full": - return self.config - else: - return result - - def list_actions(self): - actions = {} - - # FIXME : meh, loading the entire config panel is again going to cause - # stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...) - self.filter_key = "" - self._get_config_panel() - for panel, section, option in self._iterate(): - if option["type"] == "button": - key = f"{panel['id']}.{section['id']}.{option['id']}" - actions[key] = _value_for_locale(option["ask"]) - - return actions - - def run_action(self, action=None, args=None, args_file=None, operation_logger=None): - # - # FIXME : this stuff looks a lot like set() ... - # - - self.filter_key = ".".join(action.split(".")[:2]) - action_id = action.split(".")[2] - - # Read config panel toml - self._get_config_panel() - - # FIXME: should also check that there's indeed a key called action - if not self.config: - raise YunohostValidationError(f"No action named {action}", raw_msg=True) - - # Import and parse pre-answered options - logger.debug("Import and parse pre-answered options") - self._parse_pre_answered(args, None, args_file) - - # Read or get values and hydrate the config - self._load_current_values() - self._hydrate() - Question.operation_logger = operation_logger - self._ask(action=action_id) - - # FIXME: here, we could want to check constrains on - # the action's visibility / requirements wrt to the answer to questions ... - - if operation_logger: - operation_logger.start() - - try: - self._run_action(action_id) - except YunohostError: - raise - # Script got manually interrupted ... - # N.B. : KeyboardInterrupt does not inherit from Exception - except (KeyboardInterrupt, EOFError): - error = m18n.n("operation_interrupted") - logger.error(m18n.n("config_action_failed", action=action, error=error)) - raise - # Something wrong happened in Yunohost's code (most probably hook_exec) - except Exception: - import traceback - - error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) - logger.error(m18n.n("config_action_failed", action=action, error=error)) - raise - finally: - # Delete files uploaded from API - # FIXME : this is currently done in the context of config panels, - # but could also happen in the context of app install ... (or anywhere else - # where we may parse args etc...) - FileQuestion.clean_upload_dirs() - - # FIXME: i18n - logger.success(f"Action {action_id} successful") - operation_logger.success() - - def set( - self, key=None, value=None, args=None, args_file=None, operation_logger=None - ): - self.filter_key = key or "" - - # Read config panel toml - self._get_config_panel() - - if not self.config: - raise YunohostValidationError("config_no_panel") - - if (args is not None or args_file is not None) and value is not None: - raise YunohostValidationError( - "You should either provide a value, or a serie of args/args_file, but not both at the same time", - raw_msg=True, - ) - - if self.filter_key.count(".") != 2 and value is not None: - raise YunohostValidationError("config_cant_set_value_on_section") - - # Import and parse pre-answered options - logger.debug("Import and parse pre-answered options") - self._parse_pre_answered(args, value, args_file) - - # Read or get values and hydrate the config - self._load_current_values() - self._hydrate() - Question.operation_logger = operation_logger - self._ask() - - if operation_logger: - operation_logger.start() - - try: - self._apply() - except YunohostError: - raise - # Script got manually interrupted ... - # N.B. : KeyboardInterrupt does not inherit from Exception - except (KeyboardInterrupt, EOFError): - error = m18n.n("operation_interrupted") - logger.error(m18n.n("config_apply_failed", error=error)) - raise - # Something wrong happened in Yunohost's code (most probably hook_exec) - except Exception: - import traceback - - error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) - logger.error(m18n.n("config_apply_failed", error=error)) - raise - finally: - # Delete files uploaded from API - # FIXME : this is currently done in the context of config panels, - # but could also happen in the context of app install ... (or anywhere else - # where we may parse args etc...) - FileQuestion.clean_upload_dirs() - - self._reload_services() - - logger.success("Config updated as expected") - operation_logger.success() - - def _get_toml(self): - return read_toml(self.config_path) - - def _get_config_panel(self): - # Split filter_key - filter_key = self.filter_key.split(".") if self.filter_key != "" else [] - if len(filter_key) > 3: - raise YunohostError( - f"The filter key {filter_key} has too many sub-levels, the max is 3.", - raw_msg=True, - ) - - if not os.path.exists(self.config_path): - logger.debug(f"Config panel {self.config_path} doesn't exists") - return None - - toml_config_panel = self._get_toml() - - # Check TOML config panel is in a supported version - if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED: - logger.error( - f"Config panels version {toml_config_panel['version']} are not supported" - ) - return None - - # Transform toml format into internal format - format_description = { - "root": { - "properties": ["version", "i18n"], - "defaults": {"version": 1.0}, - }, - "panels": { - "properties": ["name", "services", "actions", "help"], - "defaults": { - "services": [], - "actions": {"apply": {"en": "Apply"}}, - }, - }, - "sections": { - "properties": ["name", "services", "optional", "help", "visible"], - "defaults": { - "name": "", - "services": [], - "optional": True, - "is_action_section": False, - }, - }, - "options": { - "properties": [ - "ask", - "type", - "bind", - "help", - "example", - "default", - "style", - "icon", - "placeholder", - "visible", - "optional", - "choices", - "yes", - "no", - "pattern", - "limit", - "min", - "max", - "step", - "accept", - "redact", - "filter", - "readonly", - "enabled", - # "confirm", # TODO: to ask confirmation before running an action - ], - "defaults": {}, - }, - } - - def _build_internal_config_panel(raw_infos, level): - """Convert TOML in internal format ('full' mode used by webadmin) - Here are some properties of 1.0 config panel in toml: - - node properties and node children are mixed, - - text are in english only - - some properties have default values - This function detects all children nodes and put them in a list - """ - - defaults = format_description[level]["defaults"] - properties = format_description[level]["properties"] - - # Start building the ouput (merging the raw infos + defaults) - out = {key: raw_infos.get(key, value) for key, value in defaults.items()} - - # Now fill the sublevels (+ apply filter_key) - i = list(format_description).index(level) - sublevel = list(format_description)[i + 1] if level != "options" else None - search_key = filter_key[i] if len(filter_key) > i else False - - for key, value in raw_infos.items(): - # Key/value are a child node - if ( - isinstance(value, OrderedDict) - and key not in properties - and sublevel - ): - # We exclude all nodes not referenced by the filter_key - if search_key and key != search_key: - continue - subnode = _build_internal_config_panel(value, sublevel) - subnode["id"] = key - if level == "root": - subnode.setdefault("name", {"en": key.capitalize()}) - elif level == "sections": - subnode["name"] = key # legacy - subnode.setdefault("optional", raw_infos.get("optional", True)) - # If this section contains at least one button, it becomes an "action" section - if subnode.get("type") == "button": - out["is_action_section"] = True - out.setdefault(sublevel, []).append(subnode) - # Key/value are a property - else: - if key not in properties: - logger.warning(f"Unknown key '{key}' found in config panel") - # Todo search all i18n keys - out[key] = ( - value - if key not in ["ask", "help", "name"] or isinstance(value, dict) - else {"en": value} - ) - return out - - self.config = _build_internal_config_panel(toml_config_panel, "root") - - try: - self.config["panels"][0]["sections"][0]["options"][0] - except (KeyError, IndexError): - raise YunohostValidationError( - "config_unknown_filter_key", filter_key=self.filter_key - ) - - # List forbidden keywords from helpers and sections toml (to avoid conflict) - forbidden_keywords = [ - "old", - "app", - "changed", - "file_hash", - "binds", - "types", - "formats", - "getter", - "setter", - "short_setting", - "type", - "bind", - "nothing_changed", - "changes_validated", - "result", - "max_progression", - ] - forbidden_keywords += format_description["sections"] - forbidden_readonly_types = ["password", "app", "domain", "user", "file"] - - for _, _, option in self._iterate(): - if option["id"] in forbidden_keywords: - raise YunohostError("config_forbidden_keyword", keyword=option["id"]) - if ( - option.get("readonly", False) - and option.get("type", "string") in forbidden_readonly_types - ): - raise YunohostError( - "config_forbidden_readonly_type", - type=option["type"], - id=option["id"], - ) - - return self.config - - def _hydrate(self): - # Hydrating config panel with current value - for _, section, option in self._iterate(): - if option["id"] not in self.values: - allowed_empty_types = [ - "alert", - "display_text", - "markdown", - "file", - "button", - ] - - if section["is_action_section"] and option.get("default") is not None: - self.values[option["id"]] = option["default"] - elif ( - option["type"] in allowed_empty_types - or option.get("bind") == "null" - ): - continue - else: - raise YunohostError( - f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.", - raw_msg=True, - ) - value = self.values[option["name"]] - - # Allow to use value instead of current_value in app config script. - # e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'` - # For example hotspot used it... - # See https://github.com/YunoHost/yunohost/pull/1546 - if ( - isinstance(value, dict) - and "value" in value - and "current_value" not in value - ): - value["current_value"] = value["value"] - - # In general, the value is just a simple value. - # Sometimes it could be a dict used to overwrite the option itself - value = value if isinstance(value, dict) else {"current_value": value} - option.update(value) - - self.values[option["id"]] = value.get("current_value") - - return self.values - - def _ask(self, action=None): - logger.debug("Ask unanswered question and prevalidate data") - - if "i18n" in self.config: - for panel, section, option in self._iterate(): - if "ask" not in option: - option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"]) - # auto add i18n help text if present in locales - if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"): - option["help"] = m18n.n( - self.config["i18n"] + "_" + option["id"] + "_help" - ) - - def display_header(message): - """CLI panel/section header display""" - if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2: - Moulinette.display(colorize(message, "purple")) - - for panel, section, obj in self._iterate(["panel", "section"]): - if ( - section - and section.get("visible") - and not evaluate_simple_js_expression( - section["visible"], context=self.future_values - ) - ): - continue - - # Ugly hack to skip action section ... except when when explicitly running actions - if not action: - if section and section["is_action_section"]: - continue - - if panel == obj: - name = _value_for_locale(panel["name"]) - display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}") - else: - name = _value_for_locale(section["name"]) - if name: - display_header(f"\n# {name}") - elif section: - # filter action section options in case of multiple buttons - section["options"] = [ - option - for option in section["options"] - if option.get("type", "string") != "button" - or option["id"] == action - ] - - if panel == obj: - continue - - # Check and ask unanswered questions - prefilled_answers = self.args.copy() - prefilled_answers.update(self.new_values) - - questions = ask_questions_and_parse_answers( - {question["name"]: question for question in section["options"]}, - prefilled_answers=prefilled_answers, - current_values=self.values, - hooks=self.hooks, - ) - self.new_values.update( - { - question.name: question.value - for question in questions - if question.value is not None - } - ) - - def _get_default_values(self): - return { - option["id"]: option["default"] - for _, _, option in self._iterate() - if "default" in option - } - - @property - def future_values(self): - return {**self.values, **self.new_values} - - def __getattr__(self, name): - if "new_values" in self.__dict__ and name in self.new_values: - return self.new_values[name] - - if "values" in self.__dict__ and name in self.values: - return self.values[name] - - return self.__dict__[name] - - def _load_current_values(self): - """ - Retrieve entries in YAML file - And set default values if needed - """ - - # Inject defaults if needed (using the magic .update() ;)) - self.values = self._get_default_values() - - # Retrieve entries in the YAML - if os.path.exists(self.save_path) and os.path.isfile(self.save_path): - self.values.update(read_yaml(self.save_path) or {}) - - def _parse_pre_answered(self, args, value, args_file): - args = urllib.parse.parse_qs(args or "", keep_blank_values=True) - self.args = {key: ",".join(value_) for key, value_ in args.items()} - - if args_file: - # Import YAML / JSON file but keep --args values - self.args = {**read_yaml(args_file), **self.args} - - if value is not None: - self.args = {self.filter_key.split(".")[-1]: value} - - def _apply(self): - logger.info("Saving the new configuration...") - dir_path = os.path.dirname(os.path.realpath(self.save_path)) - if not os.path.exists(dir_path): - mkdir(dir_path, mode=0o700) - - values_to_save = self.future_values - if self.save_mode == "diff": - defaults = self._get_default_values() - values_to_save = { - k: v for k, v in values_to_save.items() if defaults.get(k) != v - } - - # Save the settings to the .yaml file - write_to_yaml(self.save_path, values_to_save) - - def _reload_services(self): - from yunohost.service import service_reload_or_restart - - services_to_reload = set() - for panel, section, obj in self._iterate(["panel", "section", "option"]): - services_to_reload |= set(obj.get("services", [])) - - services_to_reload = list(services_to_reload) - services_to_reload.sort(key="nginx".__eq__) - if services_to_reload: - logger.info("Reloading services...") - for service in services_to_reload: - if hasattr(self, "entity"): - service = service.replace("__APP__", self.entity) - service_reload_or_restart(service) - - def _iterate(self, trigger=["option"]): - for panel in self.config.get("panels", []): - if "panel" in trigger: - yield (panel, None, panel) - for section in panel.get("sections", []): - if "section" in trigger: - yield (panel, section, section) - if "option" in trigger: - for option in section.get("options", []): - yield (panel, section, option) - - class Question: hide_user_input_in_prompt = False pattern: Optional[Dict] = None