config: rework config+settings getter methods

This commit is contained in:
axolotle 2023-04-18 01:26:41 +02:00
parent 564a66de2f
commit 02948ad49c
7 changed files with 166 additions and 276 deletions

View file

@ -26,7 +26,8 @@ import re
import subprocess
import tempfile
import copy
from typing import List, Tuple, Dict, Any, Iterator, Optional
from collections import OrderedDict
from typing import TYPE_CHECKING, List, Tuple, Dict, Any, Iterator, Optional
from packaging import version
from logging import getLogger
from pathlib import Path
@ -71,6 +72,9 @@ from yunohost.app_catalog import ( # noqa
APPS_CATALOG_LOGOS,
)
if TYPE_CHECKING:
from yunohost.utils.configpanel import ConfigPanelModel, RawSettings
logger = getLogger("yunohost.app")
APPS_SETTING_PATH = "/etc/yunohost/apps/"
@ -1802,8 +1806,8 @@ class AppConfigPanel(ConfigPanel):
env = {key: str(value) for key, value in self.new_values.items()}
self._call_config_script(action, env=env)
def _get_raw_settings(self):
self.values = self._call_config_script("show")
def _get_raw_settings(self, config: "ConfigPanelModel") -> "RawSettings":
return self._call_config_script("show")
def _apply(self):
env = {key: str(value) for key, value in self.new_values.items()}

View file

@ -528,7 +528,7 @@ def _get_registrar_config_section(domain):
parent_domain=parent_domain,
parent_domain_link=parent_domain_link,
),
"value": "parent_domain",
"default": "parent_domain",
}
)
return OrderedDict(registrar_infos)
@ -541,7 +541,7 @@ def _get_registrar_config_section(domain):
"type": "alert",
"style": "success",
"ask": m18n.n("domain_dns_registrar_yunohost"),
"value": "yunohost",
"default": "yunohost",
}
)
return OrderedDict(registrar_infos)
@ -551,7 +551,7 @@ def _get_registrar_config_section(domain):
"type": "alert",
"style": "info",
"ask": m18n.n("domain_dns_conf_special_use_tld"),
"value": None,
"default": None,
}
)
@ -563,7 +563,7 @@ def _get_registrar_config_section(domain):
"type": "alert",
"style": "warning",
"ask": m18n.n("domain_dns_registrar_not_supported"),
"value": None,
"default": None,
}
)
else:
@ -572,7 +572,7 @@ def _get_registrar_config_section(domain):
"type": "alert",
"style": "info",
"ask": m18n.n("domain_dns_registrar_supported", registrar=registrar),
"value": registrar,
"default": registrar,
}
)

View file

@ -19,7 +19,7 @@
import os
import time
from pathlib import Path
from typing import List, Optional
from typing import TYPE_CHECKING, List, Optional
from collections import OrderedDict
from logging import getLogger
@ -47,6 +47,9 @@ from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.dns import is_yunohost_dyndns_domain
from yunohost.log import is_unit_operation
if TYPE_CHECKING:
from yunohost.utils.configpanel import RawConfig
logger = getLogger("yunohost.domain")
DOMAIN_SETTINGS_DIR = "/etc/yunohost/domains"
@ -666,10 +669,14 @@ class DomainConfigPanel(ConfigPanel):
return result
def _get_raw_config(self):
toml = super()._get_raw_config()
def _get_raw_config(self) -> "RawConfig":
# TODO add mechanism to share some settings with other domains on the same zone
raw_config = super()._get_raw_config()
toml["feature"]["xmpp"]["xmpp"]["default"] = (
any_filter = all(self.filter_key)
panel_id, section_id, option_id = self.filter_key
raw_config["feature"]["xmpp"]["xmpp"]["default"] = (
1 if self.entity == _get_maindomain() else 0
)
@ -680,55 +687,43 @@ class DomainConfigPanel(ConfigPanel):
# Optimize wether or not to load the DNS section,
# e.g. we don't want to trigger the whole _get_registary_config_section
# when just getting the current value from the feature section
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
if not any_filter or panel_id == "dns":
from yunohost.dns import _get_registrar_config_section
toml["dns"]["registrar"] = _get_registrar_config_section(self.entity)
# FIXME: Ugly hack to save the registar id/value and reinject it in _get_raw_settings ...
self.registar_id = toml["dns"]["registrar"]["registrar"]["value"]
del toml["dns"]["registrar"]["registrar"]["value"]
raw_config["dns"]["registrar"] = _get_registrar_config_section(self.entity)
# Cert stuff
if not filter_key or filter_key[0] == "cert":
if not any_filter or panel_id == "cert":
from yunohost.certificate import certificate_status
status = certificate_status([self.entity], full=True)["certificates"][
self.entity
]
toml["cert"]["cert"]["cert_summary"]["style"] = status["style"]
raw_config["cert"]["cert"]["cert_summary"]["style"] = status["style"]
# i18n: domain_config_cert_summary_expired
# i18n: domain_config_cert_summary_selfsigned
# i18n: domain_config_cert_summary_abouttoexpire
# i18n: domain_config_cert_summary_ok
# i18n: domain_config_cert_summary_letsencrypt
toml["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(
raw_config["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(
f"domain_config_cert_summary_{status['summary']}"
)
# FIXME: Ugly hack to save the cert status and reinject it in _get_raw_settings ...
self.cert_status = status
for option_id, status_key in [
("cert_validity", "validity"),
("cert_issuer", "CA_type"),
("acme_eligible", "ACME_eligible"),
# FIXME not sure why "summary" was injected in settings values
# ("summary", "summary")
]:
raw_config["cert"]["cert"][option_id]["default"] = status[status_key]
return toml
# Other specific strings used in config panels
# i18n: domain_config_cert_renew_help
def _get_raw_settings(self):
# TODO add mechanism to share some settings with other domains on the same zone
super()._get_raw_settings()
# FIXME: Ugly hack to save the registar id/value and reinject it in _get_raw_settings ...
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
self.values["registrar"] = self.registar_id
# FIXME: Ugly hack to save the cert status and reinject it in _get_raw_settings ...
if not filter_key or filter_key[0] == "cert":
self.values["cert_validity"] = self.cert_status["validity"]
self.values["cert_issuer"] = self.cert_status["CA_type"]
self.values["acme_eligible"] = self.cert_status["ACME_eligible"]
self.values["summary"] = self.cert_status["summary"]
return raw_config
def _apply(self):
if (

View file

@ -19,6 +19,7 @@
import os
import subprocess
from logging import getLogger
from typing import TYPE_CHECKING
from moulinette import m18n
from yunohost.utils.error import YunohostError, YunohostValidationError
@ -29,6 +30,9 @@ from yunohost.firewall import firewall_reload
from yunohost.log import is_unit_operation
from yunohost.utils.legacy import translate_legacy_settings_to_configpanel_settings
if TYPE_CHECKING:
from yunohost.utils.configpanel import ConfigPanelModel, RawConfig, RawSettings
logger = getLogger("yunohost.settings")
SETTINGS_PATH = "/etc/yunohost/settings.yml"
@ -180,8 +184,8 @@ class SettingsConfigPanel(ConfigPanel):
logger.success(m18n.n("global_settings_reset_success"))
operation_logger.success()
def _get_raw_config(self):
toml = super()._get_raw_config()
def _get_raw_config(self) -> "RawConfig":
raw_config = super()._get_raw_config()
# Dynamic choice list for portal themes
THEMEDIR = "/usr/share/ssowat/portal/assets/themes/"
@ -189,28 +193,30 @@ class SettingsConfigPanel(ConfigPanel):
themes = [d for d in os.listdir(THEMEDIR) if os.path.isdir(THEMEDIR + d)]
except Exception:
themes = ["unsplash", "vapor", "light", "default", "clouds"]
toml["misc"]["portal"]["portal_theme"]["choices"] = themes
raw_config["misc"]["portal"]["portal_theme"]["choices"] = themes
return toml
return raw_config
def _get_raw_settings(self):
super()._get_raw_settings()
def _get_raw_settings(self, config: "ConfigPanelModel") -> "RawSettings":
raw_settings = super()._get_raw_settings(config)
# Specific logic for those settings who are "virtual" settings
# and only meant to have a custom setter mapped to tools_rootpw
self.values["root_password"] = ""
self.values["root_password_confirm"] = ""
raw_settings["root_password"] = ""
raw_settings["root_password_confirm"] = ""
# Specific logic for virtual setting "passwordless_sudo"
try:
from yunohost.utils.ldap import _get_ldap_interface
ldap = _get_ldap_interface()
self.values["passwordless_sudo"] = "!authenticate" in ldap.search(
raw_settings["passwordless_sudo"] = "!authenticate" in ldap.search(
"ou=sudo", "cn=admins", ["sudoOption"]
)[0].get("sudoOption", [])
except Exception:
self.values["passwordless_sudo"] = False
raw_settings["passwordless_sudo"] = False
return raw_settings
def _apply(self):
root_password = self.new_values.pop("root_password", None)

View file

@ -49,19 +49,19 @@ def test_registrar_list_integrity():
def test_magic_guess_registrar_weird_domain():
assert _get_registrar_config_section("yolo.tld")["registrar"]["value"] is None
assert _get_registrar_config_section("yolo.tld")["registrar"]["default"] is None
def test_magic_guess_registrar_ovh():
assert (
_get_registrar_config_section("yolo.yunohost.org")["registrar"]["value"]
_get_registrar_config_section("yolo.yunohost.org")["registrar"]["default"]
== "ovh"
)
def test_magic_guess_registrar_yunodyndns():
assert (
_get_registrar_config_section("yolo.nohost.me")["registrar"]["value"]
_get_registrar_config_section("yolo.nohost.me")["registrar"]["default"]
== "yunohost"
)

View file

@ -31,15 +31,16 @@ from moulinette.interfaces.cli import colorize
from moulinette.utils.filesystem import mkdir, read_toml, read_yaml, write_to_yaml
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.form import (
OPTIONS,
BaseInputOption,
BaseOption,
BaseReadonlyOption,
FileOption,
FormModel,
OptionsModel,
OptionType,
Translation,
ask_questions_and_parse_answers,
build_form,
evaluate_simple_js_expression,
)
from yunohost.utils.i18n import _value_for_locale
@ -93,7 +94,7 @@ class SectionModel(ContainerModel, OptionsModel):
visible: Union[bool, str] = True,
**kwargs,
) -> None:
options = self.options_dict_to_list(kwargs, defaults={"optional": True})
options = self.options_dict_to_list(kwargs, optional=True)
ContainerModel.__init__(
self,
@ -231,12 +232,33 @@ class ConfigPanelModel(BaseModel):
# │ ╰─╴╰─╯╵╰╯╵ ╶┴╴╰─╯ ╶┴╴╵╵╵╵ ╰─╴ │
# ╰───────────────────────────────────────────────────────╯
if TYPE_CHECKING:
FilterKey = Sequence[Union[str, None]]
RawConfig = OrderedDict[str, Any]
RawSettings = dict[str, Any]
def parse_filter_key(key: Union[str, None] = None) -> "FilterKey":
if key and key.count(".") > 2:
raise YunohostError(
f"The filter key {key} has too many sub-levels, the max is 3.",
raw_msg=True,
)
if not key:
return (None, None, None)
keys = key.split(".")
return tuple(keys[i] if len(keys) > i else None for i in range(3))
class ConfigPanel:
entity_type = "config"
save_path_tpl: Union[str, None] = None
config_path_tpl = "/usr/share/yunohost/config_{entity_type}.toml"
save_mode = "full"
filter_key: "FilterKey" = (None, None, None)
config: Union[ConfigPanelModel, None] = None
form: Union[FormModel, None] = None
@classmethod
def list(cls):
@ -265,9 +287,6 @@ class ConfigPanel:
self.save_path = save_path
if not save_path and self.save_path_tpl:
self.save_path = self.save_path_tpl.format(entity=entity)
self.config = {}
self.values = {}
self.new_values = {}
if (
self.save_path
@ -501,215 +520,103 @@ class ConfigPanel:
logger.success(f"Action {action_id} successful")
operation_logger.success()
def _get_raw_config(self):
def _get_raw_config(self) -> "RawConfig":
if not os.path.exists(self.config_path):
raise YunohostValidationError("config_no_panel")
return read_toml(self.config_path)
def _get_config_panel(self):
# Split filter_key
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if len(filter_key) > 3:
raise YunohostError(
f"The filter key {filter_key} has too many sub-levels, the max is 3.",
raw_msg=True,
def _get_raw_settings(self, config: ConfigPanelModel) -> "RawSettings":
if not self.save_path or not os.path.exists(self.save_path):
raise YunohostValidationError("config_no_settings")
return read_yaml(self.save_path)
def _get_partial_raw_config(self) -> "RawConfig":
def filter_keys(
data: "RawConfig",
key: str,
model: Union[Type[ConfigPanelModel], Type[PanelModel], Type[SectionModel]],
) -> "RawConfig":
# filter in keys defined in model, filter out panels/sections/options that aren't `key`
return OrderedDict(
{k: v for k, v in data.items() if k in model.__fields__ or k == key}
)
if not os.path.exists(self.config_path):
logger.debug(f"Config panel {self.config_path} doesn't exists")
return None
raw_config = self._get_raw_config()
toml_config_panel = self._get_raw_config()
panel_id, section_id, option_id = self.filter_key
if panel_id:
raw_config = filter_keys(raw_config, panel_id, ConfigPanelModel)
# Check TOML config panel is in a supported version
if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
logger.error(
f"Config panels version {toml_config_panel['version']} are not supported"
)
return None
if section_id:
raw_config[panel_id] = filter_keys(
raw_config[panel_id], section_id, PanelModel
)
# Transform toml format into internal format
format_description = {
"root": {
"properties": ["version", "i18n"],
"defaults": {"version": 1.0},
},
"panels": {
"properties": ["name", "services", "actions", "help"],
"defaults": {
"services": [],
"actions": {"apply": {"en": "Apply"}},
},
},
"sections": {
"properties": ["name", "services", "optional", "help", "visible"],
"defaults": {
"name": "",
"services": [],
"optional": True,
"is_action_section": False,
},
},
"options": {
"properties": [
"ask",
"type",
"bind",
"help",
"example",
"default",
"style",
"icon",
"placeholder",
"visible",
"optional",
"choices",
"yes",
"no",
"pattern",
"limit",
"min",
"max",
"step",
"accept",
"redact",
"filter",
"readonly",
"enabled",
"add_yunohost_portal_to_choices",
# "confirm", # TODO: to ask confirmation before running an action
],
"defaults": {},
},
}
def _build_internal_config_panel(raw_infos, level):
"""Convert TOML in internal format ('full' mode used by webadmin)
Here are some properties of 1.0 config panel in toml:
- node properties and node children are mixed,
- text are in english only
- some properties have default values
This function detects all children nodes and put them in a list
"""
defaults = format_description[level]["defaults"]
properties = format_description[level]["properties"]
# Start building the ouput (merging the raw infos + defaults)
out = {key: raw_infos.get(key, value) for key, value in defaults.items()}
# Now fill the sublevels (+ apply filter_key)
i = list(format_description).index(level)
sublevel = list(format_description)[i + 1] if level != "options" else None
search_key = filter_key[i] if len(filter_key) > i else False
for key, value in raw_infos.items():
# Key/value are a child node
if (
isinstance(value, OrderedDict)
and key not in properties
and sublevel
):
# We exclude all nodes not referenced by the filter_key
if search_key and key != search_key:
continue
subnode = _build_internal_config_panel(value, sublevel)
subnode["id"] = key
if level == "root":
subnode.setdefault("name", {"en": key.capitalize()})
elif level == "sections":
subnode["name"] = key # legacy
subnode.setdefault("optional", raw_infos.get("optional", True))
# If this section contains at least one button, it becomes an "action" section
if subnode.get("type") == OptionType.button:
out["is_action_section"] = True
out.setdefault(sublevel, []).append(subnode)
# Key/value are a property
else:
if key not in properties:
logger.warning(f"Unknown key '{key}' found in config panel")
# Todo search all i18n keys
out[key] = (
value
if key not in ["ask", "help", "name"] or isinstance(value, dict)
else {"en": value}
if option_id:
raw_config[panel_id][section_id] = filter_keys(
raw_config[panel_id][section_id], option_id, SectionModel
)
return out
self.config = _build_internal_config_panel(toml_config_panel, "root")
return raw_config
def _get_partial_raw_settings_and_mutate_config(
self, config: ConfigPanelModel
) -> tuple[ConfigPanelModel, "RawSettings"]:
raw_settings = self._get_raw_settings(config)
values = {}
for _, section, option in config.iter_children():
value = data = raw_settings.get(option.id, getattr(option, "default", None))
if isinstance(data, dict):
# Settings data if gathered from bash "ynh_app_config_show"
# may be a custom getter that returns a dict with `value` or `current_value`
# and other attributes meant to override those of the option.
if "value" in data:
value = data.pop("value")
# Allow to use value instead of current_value in app config script.
# e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
# For example hotspot used it...
# See https://github.com/YunoHost/yunohost/pull/1546
# FIXME do we still need the `current_value`?
if "current_value" in data:
value = data.pop("current_value")
# Mutate other possible option attributes
for k, v in data.items():
setattr(option, k, v)
if isinstance(option, BaseInputOption): # or option.bind == "null":
values[option.id] = value
return (config, values)
def _get_config_panel(
self, prevalidate: bool = False
) -> tuple[ConfigPanelModel, FormModel]:
raw_config = self._get_partial_raw_config()
config = ConfigPanelModel(**raw_config)
config, raw_settings = self._get_partial_raw_settings_and_mutate_config(config)
config.translate()
Settings = build_form(config.options)
settings = (
Settings(**raw_settings)
if prevalidate
else Settings.construct(**raw_settings)
)
try:
self.config["panels"][0]["sections"][0]["options"][0]
config.panels[0].sections[0].options[0]
except (KeyError, IndexError):
raise YunohostValidationError(
"config_unknown_filter_key", filter_key=self.filter_key
)
return self.config
def _get_default_values(self):
return {
option["id"]: option["default"]
for _, _, option in self._iterate()
if "default" in option
}
def _get_raw_settings(self):
"""
Retrieve entries in YAML file
And set default values if needed
"""
# Inject defaults if needed (using the magic .update() ;))
self.values = self._get_default_values()
# Retrieve entries in the YAML
if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
self.values.update(read_yaml(self.save_path) or {})
def _hydrate(self):
# Hydrating config panel with current value
for _, section, option in self._iterate():
if option["id"] not in self.values:
allowed_empty_types = {
OptionType.alert,
OptionType.display_text,
OptionType.markdown,
OptionType.file,
OptionType.button,
}
if section["is_action_section"] and option.get("default") is not None:
self.values[option["id"]] = option["default"]
elif (
option["type"] in allowed_empty_types
or option.get("bind") == "null"
):
continue
else:
raise YunohostError(
f"Config panel question '{option['id']}' should be initialized with a value during install or upgrade.",
raw_msg=True,
)
value = self.values[option["id"]]
# Allow to use value instead of current_value in app config script.
# e.g. apps may write `echo 'value: "foobar"'` in the config file (which is more intuitive that `echo 'current_value: "foobar"'`
# For example hotspot used it...
# See https://github.com/YunoHost/yunohost/pull/1546
if (
isinstance(value, dict)
and "value" in value
and "current_value" not in value
):
value["current_value"] = value["value"]
# In general, the value is just a simple value.
# Sometimes it could be a dict used to overwrite the option itself
value = value if isinstance(value, dict) else {"current_value": value}
option.update(value)
self.values[option["id"]] = value.get("current_value")
return self.values
return (config, settings)
def _ask(self, action=None):
logger.debug("Ask unanswered question and prevalidate data")
@ -781,19 +688,6 @@ class ConfigPanel:
}
)
@property
def future_values(self):
return {**self.values, **self.new_values}
def __getattr__(self, name):
if "new_values" in self.__dict__ and name in self.new_values:
return self.new_values[name]
if "values" in self.__dict__ and name in self.values:
return self.values[name]
return self.__dict__[name]
def _parse_pre_answered(self, args, value, args_file):
args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
self.args = {key: ",".join(value_) for key, value_ in args.items()}
@ -836,14 +730,3 @@ class ConfigPanel:
if hasattr(self, "entity"):
service = service.replace("__APP__", self.entity)
service_reload_or_restart(service)
def _iterate(self, trigger=["option"]):
for panel in self.config.get("panels", []):
if "panel" in trigger:
yield (panel, None, panel)
for section in panel.get("sections", []):
if "section" in trigger:
yield (panel, section, section)
if "option" in trigger:
for option in section.get("options", []):
yield (panel, section, option)

View file

@ -1286,12 +1286,14 @@ class OptionsModel(BaseModel):
options: list[Annotated[AnyOption, Field(discriminator="type")]]
@staticmethod
def options_dict_to_list(options: dict[str, Any], defaults: dict[str, Any] = {}):
def options_dict_to_list(options: dict[str, Any], optional: bool = False):
return [
option
| {
"id": id_,
"type": option.get("type", "string"),
# ConfigPanel options needs to be set as optional by default
"optional": option.get("optional", optional)
}
for id_, option in options.items()
]