diff --git a/src/app.py b/src/app.py
index b37b680ec..1daa14d98 100644
--- a/src/app.py
+++ b/src/app.py
@@ -48,9 +48,8 @@ from moulinette.utils.filesystem import (
chmod,
)
-from yunohost.utils.config import (
- ConfigPanel,
- ask_questions_and_parse_answers,
+from yunohost.utils.configpanel import ConfigPanel, ask_questions_and_parse_answers
+from yunohost.utils.form import (
DomainQuestion,
PathQuestion,
hydrate_questions_with_choices,
diff --git a/src/domain.py b/src/domain.py
index 7839b988d..9f38d6765 100644
--- a/src/domain.py
+++ b/src/domain.py
@@ -33,7 +33,8 @@ from yunohost.app import (
_get_conflicting_apps,
)
from yunohost.regenconf import regen_conf, _force_clear_hashes, _process_regen_conf
-from yunohost.utils.config import ConfigPanel, Question
+from yunohost.utils.configpanel import ConfigPanel
+from yunohost.utils.form import Question
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.log import is_unit_operation
diff --git a/src/settings.py b/src/settings.py
index 4905049d6..5d52329b3 100644
--- a/src/settings.py
+++ b/src/settings.py
@@ -21,7 +21,8 @@ import subprocess
from moulinette import m18n
from yunohost.utils.error import YunohostError, YunohostValidationError
-from yunohost.utils.config import ConfigPanel, Question
+from yunohost.utils.configpanel import ConfigPanel
+from yunohost.utils.form import Question
from moulinette.utils.log import getActionLogger
from yunohost.regenconf import regen_conf
from yunohost.firewall import firewall_reload
diff --git a/src/tests/test_questions.py b/src/tests/test_questions.py
index 8ded2e137..506fde077 100644
--- a/src/tests/test_questions.py
+++ b/src/tests/test_questions.py
@@ -14,7 +14,7 @@ from _pytest.mark.structures import ParameterSet
from moulinette import Moulinette
from yunohost import app, domain, user
-from yunohost.utils.config import (
+from yunohost.utils.form import (
ARGUMENTS_TYPE_PARSERS,
ask_questions_and_parse_answers,
DisplayTextQuestion,
diff --git a/src/utils/configpanel.py b/src/utils/configpanel.py
new file mode 100644
index 000000000..e50d0a3ec
--- /dev/null
+++ b/src/utils/configpanel.py
@@ -0,0 +1,694 @@
+#
+# Copyright (c) 2023 YunoHost Contributors
+#
+# This file is part of YunoHost (see https://yunohost.org)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+import glob
+import os
+import re
+import urllib.parse
+from collections import OrderedDict
+from typing import Union
+
+from moulinette.interfaces.cli import colorize
+from moulinette import Moulinette, m18n
+from moulinette.utils.log import getActionLogger
+from moulinette.utils.filesystem import (
+ read_toml,
+ read_yaml,
+ write_to_yaml,
+ mkdir,
+)
+
+from yunohost.utils.i18n import _value_for_locale
+from yunohost.utils.error import YunohostError, YunohostValidationError
+from yunohost.utils.form import (
+ ARGUMENTS_TYPE_PARSERS,
+ FileQuestion,
+ Question,
+ ask_questions_and_parse_answers,
+ evaluate_simple_js_expression,
+)
+
+logger = getActionLogger("yunohost.configpanel")
+CONFIG_PANEL_VERSION_SUPPORTED = 1.0
+
+
+class ConfigPanel:
+ entity_type = "config"
+ save_path_tpl: Union[str, None] = None
+ config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml"
+ save_mode = "full"
+
+ @classmethod
+ def list(cls):
+ """
+ List available config panel
+ """
+ try:
+ entities = [
+ re.match(
+ "^" + cls.save_path_tpl.format(entity="(?p)") + "$", f
+ ).group("entity")
+ for f in glob.glob(cls.save_path_tpl.format(entity="*"))
+ if os.path.isfile(f)
+ ]
+ except FileNotFoundError:
+ entities = []
+ return entities
+
+ def __init__(self, entity, config_path=None, save_path=None, creation=False):
+ self.entity = entity
+ self.config_path = config_path
+ if not config_path:
+ self.config_path = self.config_path_tpl.format(
+ entity=entity, entity_type=self.entity_type
+ )
+ self.save_path = save_path
+ if not save_path and self.save_path_tpl:
+ self.save_path = self.save_path_tpl.format(entity=entity)
+ self.config = {}
+ self.values = {}
+ self.new_values = {}
+
+ if (
+ self.save_path
+ and self.save_mode != "diff"
+ and not creation
+ and not os.path.exists(self.save_path)
+ ):
+ raise YunohostValidationError(
+ f"{self.entity_type}_unknown", **{self.entity_type: entity}
+ )
+ if self.save_path and creation and os.path.exists(self.save_path):
+ raise YunohostValidationError(
+ f"{self.entity_type}_exists", **{self.entity_type: entity}
+ )
+
+ # Search for hooks in the config panel
+ self.hooks = {
+ func: getattr(self, func)
+ for func in dir(self)
+ if callable(getattr(self, func))
+ and re.match("^(validate|post_ask)__", func)
+ }
+
+ def get(self, key="", mode="classic"):
+ self.filter_key = key or ""
+
+ # Read config panel toml
+ self._get_config_panel()
+
+ if not self.config:
+ raise YunohostValidationError("config_no_panel")
+
+ # Read or get values and hydrate the config
+ self._load_current_values()
+ self._hydrate()
+
+ # In 'classic' mode, we display the current value if key refer to an option
+ if self.filter_key.count(".") == 2 and mode == "classic":
+ option = self.filter_key.split(".")[-1]
+ value = self.values.get(option, None)
+
+ option_type = None
+ for _, _, option_ in self._iterate():
+ if option_["id"] == option:
+ option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]]
+ break
+
+ return option_type.normalize(value) if option_type else value
+
+ # Format result in 'classic' or 'export' mode
+ logger.debug(f"Formating result in '{mode}' mode")
+ result = {}
+ for panel, section, option in self._iterate():
+ if section["is_action_section"] and mode != "full":
+ continue
+
+ key = f"{panel['id']}.{section['id']}.{option['id']}"
+ if mode == "export":
+ result[option["id"]] = option.get("current_value")
+ continue
+
+ ask = None
+ if "ask" in option:
+ ask = _value_for_locale(option["ask"])
+ elif "i18n" in self.config:
+ ask = m18n.n(self.config["i18n"] + "_" + option["id"])
+
+ if mode == "full":
+ option["ask"] = ask
+ question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
+ # FIXME : maybe other properties should be taken from the question, not just choices ?.
+ option["choices"] = question_class(option).choices
+ option["default"] = question_class(option).default
+ option["pattern"] = question_class(option).pattern
+ else:
+ result[key] = {"ask": ask}
+ if "current_value" in option:
+ question_class = ARGUMENTS_TYPE_PARSERS[
+ option.get("type", "string")
+ ]
+ result[key]["value"] = question_class.humanize(
+ option["current_value"], option
+ )
+ # FIXME: semantics, technically here this is not about a prompt...
+ if question_class.hide_user_input_in_prompt:
+ result[key][
+ "value"
+ ] = "**************" # Prevent displaying password in `config get`
+
+ if mode == "full":
+ return self.config
+ else:
+ return result
+
+ def list_actions(self):
+ actions = {}
+
+ # FIXME : meh, loading the entire config panel is again going to cause
+ # stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...)
+ self.filter_key = ""
+ self._get_config_panel()
+ for panel, section, option in self._iterate():
+ if option["type"] == "button":
+ key = f"{panel['id']}.{section['id']}.{option['id']}"
+ actions[key] = _value_for_locale(option["ask"])
+
+ return actions
+
+ def run_action(self, action=None, args=None, args_file=None, operation_logger=None):
+ #
+ # FIXME : this stuff looks a lot like set() ...
+ #
+
+ self.filter_key = ".".join(action.split(".")[:2])
+ action_id = action.split(".")[2]
+
+ # Read config panel toml
+ self._get_config_panel()
+
+ # FIXME: should also check that there's indeed a key called action
+ if not self.config:
+ raise YunohostValidationError(f"No action named {action}", raw_msg=True)
+
+ # Import and parse pre-answered options
+ logger.debug("Import and parse pre-answered options")
+ self._parse_pre_answered(args, None, args_file)
+
+ # Read or get values and hydrate the config
+ self._load_current_values()
+ self._hydrate()
+ Question.operation_logger = operation_logger
+ self._ask(action=action_id)
+
+ # FIXME: here, we could want to check constrains on
+ # the action's visibility / requirements wrt to the answer to questions ...
+
+ if operation_logger:
+ operation_logger.start()
+
+ try:
+ self._run_action(action_id)
+ except YunohostError:
+ raise
+ # Script got manually interrupted ...
+ # N.B. : KeyboardInterrupt does not inherit from Exception
+ except (KeyboardInterrupt, EOFError):
+ error = m18n.n("operation_interrupted")
+ logger.error(m18n.n("config_action_failed", action=action, error=error))
+ raise
+ # Something wrong happened in Yunohost's code (most probably hook_exec)
+ except Exception:
+ import traceback
+
+ error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
+ logger.error(m18n.n("config_action_failed", action=action, error=error))
+ raise
+ finally:
+ # Delete files uploaded from API
+ # FIXME : this is currently done in the context of config panels,
+ # but could also happen in the context of app install ... (or anywhere else
+ # where we may parse args etc...)
+ FileQuestion.clean_upload_dirs()
+
+ # FIXME: i18n
+ logger.success(f"Action {action_id} successful")
+ operation_logger.success()
+
+ def set(
+ self, key=None, value=None, args=None, args_file=None, operation_logger=None
+ ):
+ self.filter_key = key or ""
+
+ # Read config panel toml
+ self._get_config_panel()
+
+ if not self.config:
+ raise YunohostValidationError("config_no_panel")
+
+ if (args is not None or args_file is not None) and value is not None:
+ raise YunohostValidationError(
+ "You should either provide a value, or a serie of args/args_file, but not both at the same time",
+ raw_msg=True,
+ )
+
+ if self.filter_key.count(".") != 2 and value is not None:
+ raise YunohostValidationError("config_cant_set_value_on_section")
+
+ # Import and parse pre-answered options
+ logger.debug("Import and parse pre-answered options")
+ self._parse_pre_answered(args, value, args_file)
+
+ # Read or get values and hydrate the config
+ self._load_current_values()
+ self._hydrate()
+ Question.operation_logger = operation_logger
+ self._ask()
+
+ if operation_logger:
+ operation_logger.start()
+
+ try:
+ self._apply()
+ except YunohostError:
+ raise
+ # Script got manually interrupted ...
+ # N.B. : KeyboardInterrupt does not inherit from Exception
+ except (KeyboardInterrupt, EOFError):
+ error = m18n.n("operation_interrupted")
+ logger.error(m18n.n("config_apply_failed", error=error))
+ raise
+ # Something wrong happened in Yunohost's code (most probably hook_exec)
+ except Exception:
+ import traceback
+
+ error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
+ logger.error(m18n.n("config_apply_failed", error=error))
+ raise
+ finally:
+ # Delete files uploaded from API
+ # FIXME : this is currently done in the context of config panels,
+ # but could also happen in the context of app install ... (or anywhere else
+ # where we may parse args etc...)
+ FileQuestion.clean_upload_dirs()
+
+ self._reload_services()
+
+ logger.success("Config updated as expected")
+ operation_logger.success()
+
+ def _get_toml(self):
+ return read_toml(self.config_path)
+
+ def _get_config_panel(self):
+ # Split filter_key
+ filter_key = self.filter_key.split(".") if self.filter_key != "" else []
+ if len(filter_key) > 3:
+ raise YunohostError(
+ f"The filter key {filter_key} has too many sub-levels, the max is 3.",
+ raw_msg=True,
+ )
+
+ if not os.path.exists(self.config_path):
+ logger.debug(f"Config panel {self.config_path} doesn't exists")
+ return None
+
+ toml_config_panel = self._get_toml()
+
+ # Check TOML config panel is in a supported version
+ if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
+ logger.error(
+ f"Config panels version {toml_config_panel['version']} are not supported"
+ )
+ return None
+
+ # Transform toml format into internal format
+ format_description = {
+ "root": {
+ "properties": ["version", "i18n"],
+ "defaults": {"version": 1.0},
+ },
+ "panels": {
+ "properties": ["name", "services", "actions", "help"],
+ "defaults": {
+ "services": [],
+ "actions": {"apply": {"en": "Apply"}},
+ },
+ },
+ "sections": {
+ "properties": ["name", "services", "optional", "help", "visible"],
+ "defaults": {
+ "name": "",
+ "services": [],
+ "optional": True,
+ "is_action_section": False,
+ },
+ },
+ "options": {
+ "properties": [
+ "ask",
+ "type",
+ "bind",
+ "help",
+ "example",
+ "default",
+ "style",
+ "icon",
+ "placeholder",
+ "visible",
+ "optional",
+ "choices",
+ "yes",
+ "no",
+ "pattern",
+ "limit",
+ "min",
+ "max",
+ "step",
+ "accept",
+ "redact",
+ "filter",
+ "readonly",
+ "enabled",
+ # "confirm", # TODO: to ask confirmation before running an action
+ ],
+ "defaults": {},
+ },
+ }
+
+ def _build_internal_config_panel(raw_infos, level):
+ """Convert TOML in internal format ('full' mode used by webadmin)
+ Here are some properties of 1.0 config panel in toml:
+ - node properties and node children are mixed,
+ - text are in english only
+ - some properties have default values
+ This function detects all children nodes and put them in a list
+ """
+
+ defaults = format_description[level]["defaults"]
+ properties = format_description[level]["properties"]
+
+ # Start building the ouput (merging the raw infos + defaults)
+ out = {key: raw_infos.get(key, value) for key, value in defaults.items()}
+
+ # Now fill the sublevels (+ apply filter_key)
+ i = list(format_description).index(level)
+ sublevel = list(format_description)[i + 1] if level != "options" else None
+ search_key = filter_key[i] if len(filter_key) > i else False
+
+ for key, value in raw_infos.items():
+ # Key/value are a child node
+ if (
+ isinstance(value, OrderedDict)
+ and key not in properties
+ and sublevel
+ ):
+ # We exclude all nodes not referenced by the filter_key
+ if search_key and key != search_key:
+ continue
+ subnode = _build_internal_config_panel(value, sublevel)
+ subnode["id"] = key
+ if level == "root":
+ subnode.setdefault("name", {"en": key.capitalize()})
+ elif level == "sections":
+ subnode["name"] = key # legacy
+ subnode.setdefault("optional", raw_infos.get("optional", True))
+ # If this section contains at least one button, it becomes an "action" section
+ if subnode.get("type") == "button":
+ out["is_action_section"] = True
+ out.setdefault(sublevel, []).append(subnode)
+ # Key/value are a property
+ else:
+ if key not in properties:
+ logger.warning(f"Unknown key '{key}' found in config panel")
+ # Todo search all i18n keys
+ out[key] = (
+ value
+ if key not in ["ask", "help", "name"] or isinstance(value, dict)
+ else {"en": value}
+ )
+ return out
+
+ self.config = _build_internal_config_panel(toml_config_panel, "root")
+
+ try:
+ self.config["panels"][0]["sections"][0]["options"][0]
+ except (KeyError, IndexError):
+ raise YunohostValidationError(
+ "config_unknown_filter_key", filter_key=self.filter_key
+ )
+
+ # List forbidden keywords from helpers and sections toml (to avoid conflict)
+ forbidden_keywords = [
+ "old",
+ "app",
+ "changed",
+ "file_hash",
+ "binds",
+ "types",
+ "formats",
+ "getter",
+ "setter",
+ "short_setting",
+ "type",
+ "bind",
+ "nothing_changed",
+ "changes_validated",
+ "result",
+ "max_progression",
+ ]
+ forbidden_keywords += format_description["sections"]
+ forbidden_readonly_types = ["password", "app", "domain", "user", "file"]
+
+ for _, _, option in self._iterate():
+ if option["id"] in forbidden_keywords:
+ raise YunohostError("config_forbidden_keyword", keyword=option["id"])
+ if (
+ option.get("readonly", False)
+ and option.get("type", "string") in forbidden_readonly_types
+ ):
+ raise YunohostError(
+ "config_forbidden_readonly_type",
+ type=option["type"],
+ id=option["id"],
+ )
+
+ return self.config
+
+ def _hydrate(self):
+ # Hydrating config panel with current value
+ for _, section, option in self._iterate():
+ if option["id"] not in self.values:
+ allowed_empty_types = [
+ "alert",
+ "display_text",
+ "markdown",
+ "file",
+ "button",
+ ]
+
+ if section["is_action_section"] and option.get("default") is not None:
+ self.values[option["id"]] = option["default"]
+ elif (
+ option["type"] in allowed_empty_types
+ or option.get("bind") == "null"
+ ):
+ continue
+ else:
+ raise YunohostError(
+ f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.",
+ raw_msg=True,
+ )
+ value = self.values[option["name"]]
+
+ # Allow to use value instead of current_value in app config script.
+ # e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
+ # For example hotspot used it...
+ # See https://github.com/YunoHost/yunohost/pull/1546
+ if (
+ isinstance(value, dict)
+ and "value" in value
+ and "current_value" not in value
+ ):
+ value["current_value"] = value["value"]
+
+ # In general, the value is just a simple value.
+ # Sometimes it could be a dict used to overwrite the option itself
+ value = value if isinstance(value, dict) else {"current_value": value}
+ option.update(value)
+
+ self.values[option["id"]] = value.get("current_value")
+
+ return self.values
+
+ def _ask(self, action=None):
+ logger.debug("Ask unanswered question and prevalidate data")
+
+ if "i18n" in self.config:
+ for panel, section, option in self._iterate():
+ if "ask" not in option:
+ option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"])
+ # auto add i18n help text if present in locales
+ if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
+ option["help"] = m18n.n(
+ self.config["i18n"] + "_" + option["id"] + "_help"
+ )
+
+ def display_header(message):
+ """CLI panel/section header display"""
+ if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2:
+ Moulinette.display(colorize(message, "purple"))
+
+ for panel, section, obj in self._iterate(["panel", "section"]):
+ if (
+ section
+ and section.get("visible")
+ and not evaluate_simple_js_expression(
+ section["visible"], context=self.future_values
+ )
+ ):
+ continue
+
+ # Ugly hack to skip action section ... except when when explicitly running actions
+ if not action:
+ if section and section["is_action_section"]:
+ continue
+
+ if panel == obj:
+ name = _value_for_locale(panel["name"])
+ display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}")
+ else:
+ name = _value_for_locale(section["name"])
+ if name:
+ display_header(f"\n# {name}")
+ elif section:
+ # filter action section options in case of multiple buttons
+ section["options"] = [
+ option
+ for option in section["options"]
+ if option.get("type", "string") != "button"
+ or option["id"] == action
+ ]
+
+ if panel == obj:
+ continue
+
+ # Check and ask unanswered questions
+ prefilled_answers = self.args.copy()
+ prefilled_answers.update(self.new_values)
+
+ questions = ask_questions_and_parse_answers(
+ {question["name"]: question for question in section["options"]},
+ prefilled_answers=prefilled_answers,
+ current_values=self.values,
+ hooks=self.hooks,
+ )
+ self.new_values.update(
+ {
+ question.name: question.value
+ for question in questions
+ if question.value is not None
+ }
+ )
+
+ def _get_default_values(self):
+ return {
+ option["id"]: option["default"]
+ for _, _, option in self._iterate()
+ if "default" in option
+ }
+
+ @property
+ def future_values(self):
+ return {**self.values, **self.new_values}
+
+ def __getattr__(self, name):
+ if "new_values" in self.__dict__ and name in self.new_values:
+ return self.new_values[name]
+
+ if "values" in self.__dict__ and name in self.values:
+ return self.values[name]
+
+ return self.__dict__[name]
+
+ def _load_current_values(self):
+ """
+ Retrieve entries in YAML file
+ And set default values if needed
+ """
+
+ # Inject defaults if needed (using the magic .update() ;))
+ self.values = self._get_default_values()
+
+ # Retrieve entries in the YAML
+ if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
+ self.values.update(read_yaml(self.save_path) or {})
+
+ def _parse_pre_answered(self, args, value, args_file):
+ args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
+ self.args = {key: ",".join(value_) for key, value_ in args.items()}
+
+ if args_file:
+ # Import YAML / JSON file but keep --args values
+ self.args = {**read_yaml(args_file), **self.args}
+
+ if value is not None:
+ self.args = {self.filter_key.split(".")[-1]: value}
+
+ def _apply(self):
+ logger.info("Saving the new configuration...")
+ dir_path = os.path.dirname(os.path.realpath(self.save_path))
+ if not os.path.exists(dir_path):
+ mkdir(dir_path, mode=0o700)
+
+ values_to_save = self.future_values
+ if self.save_mode == "diff":
+ defaults = self._get_default_values()
+ values_to_save = {
+ k: v for k, v in values_to_save.items() if defaults.get(k) != v
+ }
+
+ # Save the settings to the .yaml file
+ write_to_yaml(self.save_path, values_to_save)
+
+ def _reload_services(self):
+ from yunohost.service import service_reload_or_restart
+
+ services_to_reload = set()
+ for panel, section, obj in self._iterate(["panel", "section", "option"]):
+ services_to_reload |= set(obj.get("services", []))
+
+ services_to_reload = list(services_to_reload)
+ services_to_reload.sort(key="nginx".__eq__)
+ if services_to_reload:
+ logger.info("Reloading services...")
+ for service in services_to_reload:
+ if hasattr(self, "entity"):
+ service = service.replace("__APP__", self.entity)
+ service_reload_or_restart(service)
+
+ def _iterate(self, trigger=["option"]):
+ for panel in self.config.get("panels", []):
+ if "panel" in trigger:
+ yield (panel, None, panel)
+ for section in panel.get("sections", []):
+ if "section" in trigger:
+ yield (panel, section, section)
+ if "option" in trigger:
+ for option in section.get("options", []):
+ yield (panel, section, option)
diff --git a/src/utils/config.py b/src/utils/form.py
similarity index 57%
rename from src/utils/config.py
rename to src/utils/form.py
index a48883c38..9907dafb1 100644
--- a/src/utils/config.py
+++ b/src/utils/form.py
@@ -16,7 +16,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
-import glob
import os
import re
import urllib.parse
@@ -24,7 +23,6 @@ import tempfile
import shutil
import ast
import operator as op
-from collections import OrderedDict
from typing import Optional, Dict, List, Union, Any, Mapping, Callable
from moulinette.interfaces.cli import colorize
@@ -33,18 +31,13 @@ from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
read_file,
write_to_file,
- read_toml,
- read_yaml,
- write_to_yaml,
- mkdir,
)
from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.log import OperationLogger
-logger = getActionLogger("yunohost.config")
-CONFIG_PANEL_VERSION_SUPPORTED = 1.0
+logger = getActionLogger("yunohost.form")
# Those js-like evaluate functions are used to eval safely visible attributes
@@ -190,653 +183,6 @@ def evaluate_simple_js_expression(expr, context={}):
return evaluate_simple_ast(node, context)
-class ConfigPanel:
- entity_type = "config"
- save_path_tpl: Union[str, None] = None
- config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml"
- save_mode = "full"
-
- @classmethod
- def list(cls):
- """
- List available config panel
- """
- try:
- entities = [
- re.match(
- "^" + cls.save_path_tpl.format(entity="(?p)") + "$", f
- ).group("entity")
- for f in glob.glob(cls.save_path_tpl.format(entity="*"))
- if os.path.isfile(f)
- ]
- except FileNotFoundError:
- entities = []
- return entities
-
- def __init__(self, entity, config_path=None, save_path=None, creation=False):
- self.entity = entity
- self.config_path = config_path
- if not config_path:
- self.config_path = self.config_path_tpl.format(
- entity=entity, entity_type=self.entity_type
- )
- self.save_path = save_path
- if not save_path and self.save_path_tpl:
- self.save_path = self.save_path_tpl.format(entity=entity)
- self.config = {}
- self.values = {}
- self.new_values = {}
-
- if (
- self.save_path
- and self.save_mode != "diff"
- and not creation
- and not os.path.exists(self.save_path)
- ):
- raise YunohostValidationError(
- f"{self.entity_type}_unknown", **{self.entity_type: entity}
- )
- if self.save_path and creation and os.path.exists(self.save_path):
- raise YunohostValidationError(
- f"{self.entity_type}_exists", **{self.entity_type: entity}
- )
-
- # Search for hooks in the config panel
- self.hooks = {
- func: getattr(self, func)
- for func in dir(self)
- if callable(getattr(self, func))
- and re.match("^(validate|post_ask)__", func)
- }
-
- def get(self, key="", mode="classic"):
- self.filter_key = key or ""
-
- # Read config panel toml
- self._get_config_panel()
-
- if not self.config:
- raise YunohostValidationError("config_no_panel")
-
- # Read or get values and hydrate the config
- self._load_current_values()
- self._hydrate()
-
- # In 'classic' mode, we display the current value if key refer to an option
- if self.filter_key.count(".") == 2 and mode == "classic":
- option = self.filter_key.split(".")[-1]
- value = self.values.get(option, None)
-
- option_type = None
- for _, _, option_ in self._iterate():
- if option_["id"] == option:
- option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]]
- break
-
- return option_type.normalize(value) if option_type else value
-
- # Format result in 'classic' or 'export' mode
- logger.debug(f"Formating result in '{mode}' mode")
- result = {}
- for panel, section, option in self._iterate():
- if section["is_action_section"] and mode != "full":
- continue
-
- key = f"{panel['id']}.{section['id']}.{option['id']}"
- if mode == "export":
- result[option["id"]] = option.get("current_value")
- continue
-
- ask = None
- if "ask" in option:
- ask = _value_for_locale(option["ask"])
- elif "i18n" in self.config:
- ask = m18n.n(self.config["i18n"] + "_" + option["id"])
-
- if mode == "full":
- option["ask"] = ask
- question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
- # FIXME : maybe other properties should be taken from the question, not just choices ?.
- option["choices"] = question_class(option).choices
- option["default"] = question_class(option).default
- option["pattern"] = question_class(option).pattern
- else:
- result[key] = {"ask": ask}
- if "current_value" in option:
- question_class = ARGUMENTS_TYPE_PARSERS[
- option.get("type", "string")
- ]
- result[key]["value"] = question_class.humanize(
- option["current_value"], option
- )
- # FIXME: semantics, technically here this is not about a prompt...
- if question_class.hide_user_input_in_prompt:
- result[key][
- "value"
- ] = "**************" # Prevent displaying password in `config get`
-
- if mode == "full":
- return self.config
- else:
- return result
-
- def list_actions(self):
- actions = {}
-
- # FIXME : meh, loading the entire config panel is again going to cause
- # stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...)
- self.filter_key = ""
- self._get_config_panel()
- for panel, section, option in self._iterate():
- if option["type"] == "button":
- key = f"{panel['id']}.{section['id']}.{option['id']}"
- actions[key] = _value_for_locale(option["ask"])
-
- return actions
-
- def run_action(self, action=None, args=None, args_file=None, operation_logger=None):
- #
- # FIXME : this stuff looks a lot like set() ...
- #
-
- self.filter_key = ".".join(action.split(".")[:2])
- action_id = action.split(".")[2]
-
- # Read config panel toml
- self._get_config_panel()
-
- # FIXME: should also check that there's indeed a key called action
- if not self.config:
- raise YunohostValidationError(f"No action named {action}", raw_msg=True)
-
- # Import and parse pre-answered options
- logger.debug("Import and parse pre-answered options")
- self._parse_pre_answered(args, None, args_file)
-
- # Read or get values and hydrate the config
- self._load_current_values()
- self._hydrate()
- Question.operation_logger = operation_logger
- self._ask(action=action_id)
-
- # FIXME: here, we could want to check constrains on
- # the action's visibility / requirements wrt to the answer to questions ...
-
- if operation_logger:
- operation_logger.start()
-
- try:
- self._run_action(action_id)
- except YunohostError:
- raise
- # Script got manually interrupted ...
- # N.B. : KeyboardInterrupt does not inherit from Exception
- except (KeyboardInterrupt, EOFError):
- error = m18n.n("operation_interrupted")
- logger.error(m18n.n("config_action_failed", action=action, error=error))
- raise
- # Something wrong happened in Yunohost's code (most probably hook_exec)
- except Exception:
- import traceback
-
- error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
- logger.error(m18n.n("config_action_failed", action=action, error=error))
- raise
- finally:
- # Delete files uploaded from API
- # FIXME : this is currently done in the context of config panels,
- # but could also happen in the context of app install ... (or anywhere else
- # where we may parse args etc...)
- FileQuestion.clean_upload_dirs()
-
- # FIXME: i18n
- logger.success(f"Action {action_id} successful")
- operation_logger.success()
-
- def set(
- self, key=None, value=None, args=None, args_file=None, operation_logger=None
- ):
- self.filter_key = key or ""
-
- # Read config panel toml
- self._get_config_panel()
-
- if not self.config:
- raise YunohostValidationError("config_no_panel")
-
- if (args is not None or args_file is not None) and value is not None:
- raise YunohostValidationError(
- "You should either provide a value, or a serie of args/args_file, but not both at the same time",
- raw_msg=True,
- )
-
- if self.filter_key.count(".") != 2 and value is not None:
- raise YunohostValidationError("config_cant_set_value_on_section")
-
- # Import and parse pre-answered options
- logger.debug("Import and parse pre-answered options")
- self._parse_pre_answered(args, value, args_file)
-
- # Read or get values and hydrate the config
- self._load_current_values()
- self._hydrate()
- Question.operation_logger = operation_logger
- self._ask()
-
- if operation_logger:
- operation_logger.start()
-
- try:
- self._apply()
- except YunohostError:
- raise
- # Script got manually interrupted ...
- # N.B. : KeyboardInterrupt does not inherit from Exception
- except (KeyboardInterrupt, EOFError):
- error = m18n.n("operation_interrupted")
- logger.error(m18n.n("config_apply_failed", error=error))
- raise
- # Something wrong happened in Yunohost's code (most probably hook_exec)
- except Exception:
- import traceback
-
- error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
- logger.error(m18n.n("config_apply_failed", error=error))
- raise
- finally:
- # Delete files uploaded from API
- # FIXME : this is currently done in the context of config panels,
- # but could also happen in the context of app install ... (or anywhere else
- # where we may parse args etc...)
- FileQuestion.clean_upload_dirs()
-
- self._reload_services()
-
- logger.success("Config updated as expected")
- operation_logger.success()
-
- def _get_toml(self):
- return read_toml(self.config_path)
-
- def _get_config_panel(self):
- # Split filter_key
- filter_key = self.filter_key.split(".") if self.filter_key != "" else []
- if len(filter_key) > 3:
- raise YunohostError(
- f"The filter key {filter_key} has too many sub-levels, the max is 3.",
- raw_msg=True,
- )
-
- if not os.path.exists(self.config_path):
- logger.debug(f"Config panel {self.config_path} doesn't exists")
- return None
-
- toml_config_panel = self._get_toml()
-
- # Check TOML config panel is in a supported version
- if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
- logger.error(
- f"Config panels version {toml_config_panel['version']} are not supported"
- )
- return None
-
- # Transform toml format into internal format
- format_description = {
- "root": {
- "properties": ["version", "i18n"],
- "defaults": {"version": 1.0},
- },
- "panels": {
- "properties": ["name", "services", "actions", "help"],
- "defaults": {
- "services": [],
- "actions": {"apply": {"en": "Apply"}},
- },
- },
- "sections": {
- "properties": ["name", "services", "optional", "help", "visible"],
- "defaults": {
- "name": "",
- "services": [],
- "optional": True,
- "is_action_section": False,
- },
- },
- "options": {
- "properties": [
- "ask",
- "type",
- "bind",
- "help",
- "example",
- "default",
- "style",
- "icon",
- "placeholder",
- "visible",
- "optional",
- "choices",
- "yes",
- "no",
- "pattern",
- "limit",
- "min",
- "max",
- "step",
- "accept",
- "redact",
- "filter",
- "readonly",
- "enabled",
- # "confirm", # TODO: to ask confirmation before running an action
- ],
- "defaults": {},
- },
- }
-
- def _build_internal_config_panel(raw_infos, level):
- """Convert TOML in internal format ('full' mode used by webadmin)
- Here are some properties of 1.0 config panel in toml:
- - node properties and node children are mixed,
- - text are in english only
- - some properties have default values
- This function detects all children nodes and put them in a list
- """
-
- defaults = format_description[level]["defaults"]
- properties = format_description[level]["properties"]
-
- # Start building the ouput (merging the raw infos + defaults)
- out = {key: raw_infos.get(key, value) for key, value in defaults.items()}
-
- # Now fill the sublevels (+ apply filter_key)
- i = list(format_description).index(level)
- sublevel = list(format_description)[i + 1] if level != "options" else None
- search_key = filter_key[i] if len(filter_key) > i else False
-
- for key, value in raw_infos.items():
- # Key/value are a child node
- if (
- isinstance(value, OrderedDict)
- and key not in properties
- and sublevel
- ):
- # We exclude all nodes not referenced by the filter_key
- if search_key and key != search_key:
- continue
- subnode = _build_internal_config_panel(value, sublevel)
- subnode["id"] = key
- if level == "root":
- subnode.setdefault("name", {"en": key.capitalize()})
- elif level == "sections":
- subnode["name"] = key # legacy
- subnode.setdefault("optional", raw_infos.get("optional", True))
- # If this section contains at least one button, it becomes an "action" section
- if subnode.get("type") == "button":
- out["is_action_section"] = True
- out.setdefault(sublevel, []).append(subnode)
- # Key/value are a property
- else:
- if key not in properties:
- logger.warning(f"Unknown key '{key}' found in config panel")
- # Todo search all i18n keys
- out[key] = (
- value
- if key not in ["ask", "help", "name"] or isinstance(value, dict)
- else {"en": value}
- )
- return out
-
- self.config = _build_internal_config_panel(toml_config_panel, "root")
-
- try:
- self.config["panels"][0]["sections"][0]["options"][0]
- except (KeyError, IndexError):
- raise YunohostValidationError(
- "config_unknown_filter_key", filter_key=self.filter_key
- )
-
- # List forbidden keywords from helpers and sections toml (to avoid conflict)
- forbidden_keywords = [
- "old",
- "app",
- "changed",
- "file_hash",
- "binds",
- "types",
- "formats",
- "getter",
- "setter",
- "short_setting",
- "type",
- "bind",
- "nothing_changed",
- "changes_validated",
- "result",
- "max_progression",
- ]
- forbidden_keywords += format_description["sections"]
- forbidden_readonly_types = ["password", "app", "domain", "user", "file"]
-
- for _, _, option in self._iterate():
- if option["id"] in forbidden_keywords:
- raise YunohostError("config_forbidden_keyword", keyword=option["id"])
- if (
- option.get("readonly", False)
- and option.get("type", "string") in forbidden_readonly_types
- ):
- raise YunohostError(
- "config_forbidden_readonly_type",
- type=option["type"],
- id=option["id"],
- )
-
- return self.config
-
- def _hydrate(self):
- # Hydrating config panel with current value
- for _, section, option in self._iterate():
- if option["id"] not in self.values:
- allowed_empty_types = [
- "alert",
- "display_text",
- "markdown",
- "file",
- "button",
- ]
-
- if section["is_action_section"] and option.get("default") is not None:
- self.values[option["id"]] = option["default"]
- elif (
- option["type"] in allowed_empty_types
- or option.get("bind") == "null"
- ):
- continue
- else:
- raise YunohostError(
- f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.",
- raw_msg=True,
- )
- value = self.values[option["name"]]
-
- # Allow to use value instead of current_value in app config script.
- # e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
- # For example hotspot used it...
- # See https://github.com/YunoHost/yunohost/pull/1546
- if (
- isinstance(value, dict)
- and "value" in value
- and "current_value" not in value
- ):
- value["current_value"] = value["value"]
-
- # In general, the value is just a simple value.
- # Sometimes it could be a dict used to overwrite the option itself
- value = value if isinstance(value, dict) else {"current_value": value}
- option.update(value)
-
- self.values[option["id"]] = value.get("current_value")
-
- return self.values
-
- def _ask(self, action=None):
- logger.debug("Ask unanswered question and prevalidate data")
-
- if "i18n" in self.config:
- for panel, section, option in self._iterate():
- if "ask" not in option:
- option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"])
- # auto add i18n help text if present in locales
- if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
- option["help"] = m18n.n(
- self.config["i18n"] + "_" + option["id"] + "_help"
- )
-
- def display_header(message):
- """CLI panel/section header display"""
- if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2:
- Moulinette.display(colorize(message, "purple"))
-
- for panel, section, obj in self._iterate(["panel", "section"]):
- if (
- section
- and section.get("visible")
- and not evaluate_simple_js_expression(
- section["visible"], context=self.future_values
- )
- ):
- continue
-
- # Ugly hack to skip action section ... except when when explicitly running actions
- if not action:
- if section and section["is_action_section"]:
- continue
-
- if panel == obj:
- name = _value_for_locale(panel["name"])
- display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}")
- else:
- name = _value_for_locale(section["name"])
- if name:
- display_header(f"\n# {name}")
- elif section:
- # filter action section options in case of multiple buttons
- section["options"] = [
- option
- for option in section["options"]
- if option.get("type", "string") != "button"
- or option["id"] == action
- ]
-
- if panel == obj:
- continue
-
- # Check and ask unanswered questions
- prefilled_answers = self.args.copy()
- prefilled_answers.update(self.new_values)
-
- questions = ask_questions_and_parse_answers(
- {question["name"]: question for question in section["options"]},
- prefilled_answers=prefilled_answers,
- current_values=self.values,
- hooks=self.hooks,
- )
- self.new_values.update(
- {
- question.name: question.value
- for question in questions
- if question.value is not None
- }
- )
-
- def _get_default_values(self):
- return {
- option["id"]: option["default"]
- for _, _, option in self._iterate()
- if "default" in option
- }
-
- @property
- def future_values(self):
- return {**self.values, **self.new_values}
-
- def __getattr__(self, name):
- if "new_values" in self.__dict__ and name in self.new_values:
- return self.new_values[name]
-
- if "values" in self.__dict__ and name in self.values:
- return self.values[name]
-
- return self.__dict__[name]
-
- def _load_current_values(self):
- """
- Retrieve entries in YAML file
- And set default values if needed
- """
-
- # Inject defaults if needed (using the magic .update() ;))
- self.values = self._get_default_values()
-
- # Retrieve entries in the YAML
- if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
- self.values.update(read_yaml(self.save_path) or {})
-
- def _parse_pre_answered(self, args, value, args_file):
- args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
- self.args = {key: ",".join(value_) for key, value_ in args.items()}
-
- if args_file:
- # Import YAML / JSON file but keep --args values
- self.args = {**read_yaml(args_file), **self.args}
-
- if value is not None:
- self.args = {self.filter_key.split(".")[-1]: value}
-
- def _apply(self):
- logger.info("Saving the new configuration...")
- dir_path = os.path.dirname(os.path.realpath(self.save_path))
- if not os.path.exists(dir_path):
- mkdir(dir_path, mode=0o700)
-
- values_to_save = self.future_values
- if self.save_mode == "diff":
- defaults = self._get_default_values()
- values_to_save = {
- k: v for k, v in values_to_save.items() if defaults.get(k) != v
- }
-
- # Save the settings to the .yaml file
- write_to_yaml(self.save_path, values_to_save)
-
- def _reload_services(self):
- from yunohost.service import service_reload_or_restart
-
- services_to_reload = set()
- for panel, section, obj in self._iterate(["panel", "section", "option"]):
- services_to_reload |= set(obj.get("services", []))
-
- services_to_reload = list(services_to_reload)
- services_to_reload.sort(key="nginx".__eq__)
- if services_to_reload:
- logger.info("Reloading services...")
- for service in services_to_reload:
- if hasattr(self, "entity"):
- service = service.replace("__APP__", self.entity)
- service_reload_or_restart(service)
-
- def _iterate(self, trigger=["option"]):
- for panel in self.config.get("panels", []):
- if "panel" in trigger:
- yield (panel, None, panel)
- for section in panel.get("sections", []):
- if "section" in trigger:
- yield (panel, section, section)
- if "option" in trigger:
- for option in section.get("options", []):
- yield (panel, section, option)
-
-
class Question:
hide_user_input_in_prompt = False
pattern: Optional[Dict] = None