Merge branches 'split-config-form' and 'split-config-cp' into split-config

This commit is contained in:
axolotle 2023-04-04 15:55:08 +02:00
commit f79cfcc067

687
src/utils/configpanel.py Normal file
View file

@ -0,0 +1,687 @@
#
# Copyright (c) 2023 YunoHost Contributors
#
# This file is part of YunoHost (see https://yunohost.org)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import glob
import os
import re
import urllib.parse
from collections import OrderedDict
from typing import Union
from moulinette.interfaces.cli import colorize
from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
read_toml,
read_yaml,
write_to_yaml,
mkdir,
)
from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
logger = getActionLogger("yunohost.configpanel")
CONFIG_PANEL_VERSION_SUPPORTED = 1.0
class ConfigPanel:
entity_type = "config"
save_path_tpl: Union[str, None] = None
config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml"
save_mode = "full"
@classmethod
def list(cls):
"""
List available config panel
"""
try:
entities = [
re.match(
"^" + cls.save_path_tpl.format(entity="(?p<entity>)") + "$", f
).group("entity")
for f in glob.glob(cls.save_path_tpl.format(entity="*"))
if os.path.isfile(f)
]
except FileNotFoundError:
entities = []
return entities
def __init__(self, entity, config_path=None, save_path=None, creation=False):
self.entity = entity
self.config_path = config_path
if not config_path:
self.config_path = self.config_path_tpl.format(
entity=entity, entity_type=self.entity_type
)
self.save_path = save_path
if not save_path and self.save_path_tpl:
self.save_path = self.save_path_tpl.format(entity=entity)
self.config = {}
self.values = {}
self.new_values = {}
if (
self.save_path
and self.save_mode != "diff"
and not creation
and not os.path.exists(self.save_path)
):
raise YunohostValidationError(
f"{self.entity_type}_unknown", **{self.entity_type: entity}
)
if self.save_path and creation and os.path.exists(self.save_path):
raise YunohostValidationError(
f"{self.entity_type}_exists", **{self.entity_type: entity}
)
# Search for hooks in the config panel
self.hooks = {
func: getattr(self, func)
for func in dir(self)
if callable(getattr(self, func))
and re.match("^(validate|post_ask)__", func)
}
def get(self, key="", mode="classic"):
self.filter_key = key or ""
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
# Read or get values and hydrate the config
self._load_current_values()
self._hydrate()
# In 'classic' mode, we display the current value if key refer to an option
if self.filter_key.count(".") == 2 and mode == "classic":
option = self.filter_key.split(".")[-1]
value = self.values.get(option, None)
option_type = None
for _, _, option_ in self._iterate():
if option_["id"] == option:
option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]]
break
return option_type.normalize(value) if option_type else value
# Format result in 'classic' or 'export' mode
logger.debug(f"Formating result in '{mode}' mode")
result = {}
for panel, section, option in self._iterate():
if section["is_action_section"] and mode != "full":
continue
key = f"{panel['id']}.{section['id']}.{option['id']}"
if mode == "export":
result[option["id"]] = option.get("current_value")
continue
ask = None
if "ask" in option:
ask = _value_for_locale(option["ask"])
elif "i18n" in self.config:
ask = m18n.n(self.config["i18n"] + "_" + option["id"])
if mode == "full":
option["ask"] = ask
question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
# FIXME : maybe other properties should be taken from the question, not just choices ?.
option["choices"] = question_class(option).choices
option["default"] = question_class(option).default
option["pattern"] = question_class(option).pattern
else:
result[key] = {"ask": ask}
if "current_value" in option:
question_class = ARGUMENTS_TYPE_PARSERS[
option.get("type", "string")
]
result[key]["value"] = question_class.humanize(
option["current_value"], option
)
# FIXME: semantics, technically here this is not about a prompt...
if question_class.hide_user_input_in_prompt:
result[key][
"value"
] = "**************" # Prevent displaying password in `config get`
if mode == "full":
return self.config
else:
return result
def list_actions(self):
actions = {}
# FIXME : meh, loading the entire config panel is again going to cause
# stupid issues for domain (e.g loading registrar stuff when willing to just list available actions ...)
self.filter_key = ""
self._get_config_panel()
for panel, section, option in self._iterate():
if option["type"] == "button":
key = f"{panel['id']}.{section['id']}.{option['id']}"
actions[key] = _value_for_locale(option["ask"])
return actions
def run_action(self, action=None, args=None, args_file=None, operation_logger=None):
#
# FIXME : this stuff looks a lot like set() ...
#
self.filter_key = ".".join(action.split(".")[:2])
action_id = action.split(".")[2]
# Read config panel toml
self._get_config_panel()
# FIXME: should also check that there's indeed a key called action
if not self.config:
raise YunohostValidationError(f"No action named {action}", raw_msg=True)
# Import and parse pre-answered options
logger.debug("Import and parse pre-answered options")
self._parse_pre_answered(args, None, args_file)
# Read or get values and hydrate the config
self._load_current_values()
self._hydrate()
Question.operation_logger = operation_logger
self._ask(action=action_id)
# FIXME: here, we could want to check constrains on
# the action's visibility / requirements wrt to the answer to questions ...
if operation_logger:
operation_logger.start()
try:
self._run_action(action_id)
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_action_failed", action=action, error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_action_failed", action=action, error=error))
raise
finally:
# Delete files uploaded from API
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileQuestion.clean_upload_dirs()
# FIXME: i18n
logger.success(f"Action {action_id} successful")
operation_logger.success()
def set(
self, key=None, value=None, args=None, args_file=None, operation_logger=None
):
self.filter_key = key or ""
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
if (args is not None or args_file is not None) and value is not None:
raise YunohostValidationError(
"You should either provide a value, or a serie of args/args_file, but not both at the same time",
raw_msg=True,
)
if self.filter_key.count(".") != 2 and value is not None:
raise YunohostValidationError("config_cant_set_value_on_section")
# Import and parse pre-answered options
logger.debug("Import and parse pre-answered options")
self._parse_pre_answered(args, value, args_file)
# Read or get values and hydrate the config
self._load_current_values()
self._hydrate()
Question.operation_logger = operation_logger
self._ask()
if operation_logger:
operation_logger.start()
try:
self._apply()
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_apply_failed", error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_apply_failed", error=error))
raise
finally:
# Delete files uploaded from API
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileQuestion.clean_upload_dirs()
self._reload_services()
logger.success("Config updated as expected")
operation_logger.success()
def _get_toml(self):
return read_toml(self.config_path)
def _get_config_panel(self):
# Split filter_key
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if len(filter_key) > 3:
raise YunohostError(
f"The filter key {filter_key} has too many sub-levels, the max is 3.",
raw_msg=True,
)
if not os.path.exists(self.config_path):
logger.debug(f"Config panel {self.config_path} doesn't exists")
return None
toml_config_panel = self._get_toml()
# Check TOML config panel is in a supported version
if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
logger.error(
f"Config panels version {toml_config_panel['version']} are not supported"
)
return None
# Transform toml format into internal format
format_description = {
"root": {
"properties": ["version", "i18n"],
"defaults": {"version": 1.0},
},
"panels": {
"properties": ["name", "services", "actions", "help"],
"defaults": {
"services": [],
"actions": {"apply": {"en": "Apply"}},
},
},
"sections": {
"properties": ["name", "services", "optional", "help", "visible"],
"defaults": {
"name": "",
"services": [],
"optional": True,
"is_action_section": False,
},
},
"options": {
"properties": [
"ask",
"type",
"bind",
"help",
"example",
"default",
"style",
"icon",
"placeholder",
"visible",
"optional",
"choices",
"yes",
"no",
"pattern",
"limit",
"min",
"max",
"step",
"accept",
"redact",
"filter",
"readonly",
"enabled",
# "confirm", # TODO: to ask confirmation before running an action
],
"defaults": {},
},
}
def _build_internal_config_panel(raw_infos, level):
"""Convert TOML in internal format ('full' mode used by webadmin)
Here are some properties of 1.0 config panel in toml:
- node properties and node children are mixed,
- text are in english only
- some properties have default values
This function detects all children nodes and put them in a list
"""
defaults = format_description[level]["defaults"]
properties = format_description[level]["properties"]
# Start building the ouput (merging the raw infos + defaults)
out = {key: raw_infos.get(key, value) for key, value in defaults.items()}
# Now fill the sublevels (+ apply filter_key)
i = list(format_description).index(level)
sublevel = list(format_description)[i + 1] if level != "options" else None
search_key = filter_key[i] if len(filter_key) > i else False
for key, value in raw_infos.items():
# Key/value are a child node
if (
isinstance(value, OrderedDict)
and key not in properties
and sublevel
):
# We exclude all nodes not referenced by the filter_key
if search_key and key != search_key:
continue
subnode = _build_internal_config_panel(value, sublevel)
subnode["id"] = key
if level == "root":
subnode.setdefault("name", {"en": key.capitalize()})
elif level == "sections":
subnode["name"] = key # legacy
subnode.setdefault("optional", raw_infos.get("optional", True))
# If this section contains at least one button, it becomes an "action" section
if subnode.get("type") == "button":
out["is_action_section"] = True
out.setdefault(sublevel, []).append(subnode)
# Key/value are a property
else:
if key not in properties:
logger.warning(f"Unknown key '{key}' found in config panel")
# Todo search all i18n keys
out[key] = (
value
if key not in ["ask", "help", "name"] or isinstance(value, dict)
else {"en": value}
)
return out
self.config = _build_internal_config_panel(toml_config_panel, "root")
try:
self.config["panels"][0]["sections"][0]["options"][0]
except (KeyError, IndexError):
raise YunohostValidationError(
"config_unknown_filter_key", filter_key=self.filter_key
)
# List forbidden keywords from helpers and sections toml (to avoid conflict)
forbidden_keywords = [
"old",
"app",
"changed",
"file_hash",
"binds",
"types",
"formats",
"getter",
"setter",
"short_setting",
"type",
"bind",
"nothing_changed",
"changes_validated",
"result",
"max_progression",
]
forbidden_keywords += format_description["sections"]
forbidden_readonly_types = ["password", "app", "domain", "user", "file"]
for _, _, option in self._iterate():
if option["id"] in forbidden_keywords:
raise YunohostError("config_forbidden_keyword", keyword=option["id"])
if (
option.get("readonly", False)
and option.get("type", "string") in forbidden_readonly_types
):
raise YunohostError(
"config_forbidden_readonly_type",
type=option["type"],
id=option["id"],
)
return self.config
def _hydrate(self):
# Hydrating config panel with current value
for _, section, option in self._iterate():
if option["id"] not in self.values:
allowed_empty_types = [
"alert",
"display_text",
"markdown",
"file",
"button",
]
if section["is_action_section"] and option.get("default") is not None:
self.values[option["id"]] = option["default"]
elif (
option["type"] in allowed_empty_types
or option.get("bind") == "null"
):
continue
else:
raise YunohostError(
f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.",
raw_msg=True,
)
value = self.values[option["name"]]
# Allow to use value instead of current_value in app config script.
# e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
# For example hotspot used it...
# See https://github.com/YunoHost/yunohost/pull/1546
if (
isinstance(value, dict)
and "value" in value
and "current_value" not in value
):
value["current_value"] = value["value"]
# In general, the value is just a simple value.
# Sometimes it could be a dict used to overwrite the option itself
value = value if isinstance(value, dict) else {"current_value": value}
option.update(value)
self.values[option["id"]] = value.get("current_value")
return self.values
def _ask(self, action=None):
logger.debug("Ask unanswered question and prevalidate data")
if "i18n" in self.config:
for panel, section, option in self._iterate():
if "ask" not in option:
option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"])
# auto add i18n help text if present in locales
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
def display_header(message):
"""CLI panel/section header display"""
if Moulinette.interface.type == "cli" and self.filter_key.count(".") < 2:
Moulinette.display(colorize(message, "purple"))
for panel, section, obj in self._iterate(["panel", "section"]):
if (
section
and section.get("visible")
and not evaluate_simple_js_expression(
section["visible"], context=self.future_values
)
):
continue
# Ugly hack to skip action section ... except when when explicitly running actions
if not action:
if section and section["is_action_section"]:
continue
if panel == obj:
name = _value_for_locale(panel["name"])
display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}")
else:
name = _value_for_locale(section["name"])
if name:
display_header(f"\n# {name}")
elif section:
# filter action section options in case of multiple buttons
section["options"] = [
option
for option in section["options"]
if option.get("type", "string") != "button"
or option["id"] == action
]
if panel == obj:
continue
# Check and ask unanswered questions
prefilled_answers = self.args.copy()
prefilled_answers.update(self.new_values)
questions = ask_questions_and_parse_answers(
{question["name"]: question for question in section["options"]},
prefilled_answers=prefilled_answers,
current_values=self.values,
hooks=self.hooks,
)
self.new_values.update(
{
question.name: question.value
for question in questions
if question.value is not None
}
)
def _get_default_values(self):
return {
option["id"]: option["default"]
for _, _, option in self._iterate()
if "default" in option
}
@property
def future_values(self):
return {**self.values, **self.new_values}
def __getattr__(self, name):
if "new_values" in self.__dict__ and name in self.new_values:
return self.new_values[name]
if "values" in self.__dict__ and name in self.values:
return self.values[name]
return self.__dict__[name]
def _load_current_values(self):
"""
Retrieve entries in YAML file
And set default values if needed
"""
# Inject defaults if needed (using the magic .update() ;))
self.values = self._get_default_values()
# Retrieve entries in the YAML
if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
self.values.update(read_yaml(self.save_path) or {})
def _parse_pre_answered(self, args, value, args_file):
args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
self.args = {key: ",".join(value_) for key, value_ in args.items()}
if args_file:
# Import YAML / JSON file but keep --args values
self.args = {**read_yaml(args_file), **self.args}
if value is not None:
self.args = {self.filter_key.split(".")[-1]: value}
def _apply(self):
logger.info("Saving the new configuration...")
dir_path = os.path.dirname(os.path.realpath(self.save_path))
if not os.path.exists(dir_path):
mkdir(dir_path, mode=0o700)
values_to_save = self.future_values
if self.save_mode == "diff":
defaults = self._get_default_values()
values_to_save = {
k: v for k, v in values_to_save.items() if defaults.get(k) != v
}
# Save the settings to the .yaml file
write_to_yaml(self.save_path, values_to_save)
def _reload_services(self):
from yunohost.service import service_reload_or_restart
services_to_reload = set()
for panel, section, obj in self._iterate(["panel", "section", "option"]):
services_to_reload |= set(obj.get("services", []))
services_to_reload = list(services_to_reload)
services_to_reload.sort(key="nginx".__eq__)
if services_to_reload:
logger.info("Reloading services...")
for service in services_to_reload:
if hasattr(self, "entity"):
service = service.replace("__APP__", self.entity)
service_reload_or_restart(service)
def _iterate(self, trigger=["option"]):
for panel in self.config.get("panels", []):
if "panel" in trigger:
yield (panel, None, panel)
for section in panel.get("sections", []):
if "section" in trigger:
yield (panel, section, section)
if "option" in trigger:
for option in section.get("options", []):
yield (panel, section, option)