Merge pull request #1649 from YunoHost/before-pydantic

ConfigPanel: Before pydantic (renames) 1/3
This commit is contained in:
Alexandre Aubin 2023-04-28 11:57:20 +02:00 committed by GitHub
commit 76bf9044c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 777 additions and 739 deletions

View file

@ -50,8 +50,8 @@ from moulinette.utils.filesystem import (
from yunohost.utils.configpanel import ConfigPanel, ask_questions_and_parse_answers
from yunohost.utils.form import (
DomainQuestion,
PathQuestion,
DomainOption,
WebPathOption,
hydrate_questions_with_choices,
)
from yunohost.utils.i18n import _value_for_locale
@ -430,10 +430,10 @@ def app_change_url(operation_logger, app, domain, path):
# Normalize path and domain format
domain = DomainQuestion.normalize(domain)
old_domain = DomainQuestion.normalize(old_domain)
path = PathQuestion.normalize(path)
old_path = PathQuestion.normalize(old_path)
domain = DomainOption.normalize(domain)
old_domain = DomainOption.normalize(old_domain)
path = WebPathOption.normalize(path)
old_path = WebPathOption.normalize(old_path)
if (domain, path) == (old_domain, old_path):
raise YunohostValidationError(
@ -1660,8 +1660,8 @@ def app_register_url(app, domain, path):
permission_sync_to_user,
)
domain = DomainQuestion.normalize(domain)
path = PathQuestion.normalize(path)
domain = DomainOption.normalize(domain)
path = WebPathOption.normalize(path)
# We cannot change the url of an app already installed simply by changing
# the settings...
@ -1878,13 +1878,13 @@ class AppConfigPanel(ConfigPanel):
save_path_tpl = os.path.join(APPS_SETTING_PATH, "{entity}/settings.yml")
config_path_tpl = os.path.join(APPS_SETTING_PATH, "{entity}/config_panel.toml")
def _load_current_values(self):
self.values = self._call_config_script("show")
def _run_action(self, action):
env = {key: str(value) for key, value in self.new_values.items()}
self._call_config_script(action, env=env)
def _get_raw_settings(self):
self.values = self._call_config_script("show")
def _apply(self):
env = {key: str(value) for key, value in self.new_values.items()}
return_content = self._call_config_script("apply", env=env)
@ -2853,8 +2853,8 @@ def _get_conflicting_apps(domain, path, ignore_app=None):
from yunohost.domain import _assert_domain_exists
domain = DomainQuestion.normalize(domain)
path = PathQuestion.normalize(path)
domain = DomainOption.normalize(domain)
path = WebPathOption.normalize(path)
# Abort if domain is unknown
_assert_domain_exists(domain)

View file

@ -34,7 +34,7 @@ from yunohost.app import (
)
from yunohost.regenconf import regen_conf, _force_clear_hashes, _process_regen_conf
from yunohost.utils.configpanel import ConfigPanel
from yunohost.utils.form import Question
from yunohost.utils.form import BaseOption
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.log import is_unit_operation
@ -528,7 +528,7 @@ def domain_config_set(
"""
Apply a new domain configuration
"""
Question.operation_logger = operation_logger
BaseOption.operation_logger = operation_logger
config = DomainConfigPanel(domain)
return config.set(key, value, args, args_file, operation_logger=operation_logger)
@ -538,6 +538,83 @@ class DomainConfigPanel(ConfigPanel):
save_path_tpl = f"{DOMAIN_SETTINGS_DIR}/{{entity}}.yml"
save_mode = "diff"
def get(self, key="", mode="classic"):
result = super().get(key=key, mode=mode)
if mode == "full":
for panel, section, option in self._iterate():
# This injects:
# i18n: domain_config_cert_renew_help
# i18n: domain_config_default_app_help
# i18n: domain_config_xmpp_help
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
return self.config
return result
def _get_raw_config(self):
toml = super()._get_raw_config()
toml["feature"]["xmpp"]["xmpp"]["default"] = (
1 if self.entity == _get_maindomain() else 0
)
# Optimize wether or not to load the DNS section,
# e.g. we don't want to trigger the whole _get_registary_config_section
# when just getting the current value from the feature section
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
from yunohost.dns import _get_registrar_config_section
toml["dns"]["registrar"] = _get_registrar_config_section(self.entity)
# FIXME: Ugly hack to save the registar id/value and reinject it in _get_raw_settings ...
self.registar_id = toml["dns"]["registrar"]["registrar"]["value"]
del toml["dns"]["registrar"]["registrar"]["value"]
# Cert stuff
if not filter_key or filter_key[0] == "cert":
from yunohost.certificate import certificate_status
status = certificate_status([self.entity], full=True)["certificates"][
self.entity
]
toml["cert"]["cert"]["cert_summary"]["style"] = status["style"]
# i18n: domain_config_cert_summary_expired
# i18n: domain_config_cert_summary_selfsigned
# i18n: domain_config_cert_summary_abouttoexpire
# i18n: domain_config_cert_summary_ok
# i18n: domain_config_cert_summary_letsencrypt
toml["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(
f"domain_config_cert_summary_{status['summary']}"
)
# FIXME: Ugly hack to save the cert status and reinject it in _get_raw_settings ...
self.cert_status = status
return toml
def _get_raw_settings(self):
# TODO add mechanism to share some settings with other domains on the same zone
super()._get_raw_settings()
# FIXME: Ugly hack to save the registar id/value and reinject it in _get_raw_settings ...
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
self.values["registrar"] = self.registar_id
# FIXME: Ugly hack to save the cert status and reinject it in _get_raw_settings ...
if not filter_key or filter_key[0] == "cert":
self.values["cert_validity"] = self.cert_status["validity"]
self.values["cert_issuer"] = self.cert_status["CA_type"]
self.values["acme_eligible"] = self.cert_status["ACME_eligible"]
self.values["summary"] = self.cert_status["summary"]
def _apply(self):
if (
"default_app" in self.future_values
@ -586,83 +663,6 @@ class DomainConfigPanel(ConfigPanel):
if stuff_to_regen_conf:
regen_conf(names=stuff_to_regen_conf)
def _get_toml(self):
toml = super()._get_toml()
toml["feature"]["xmpp"]["xmpp"]["default"] = (
1 if self.entity == _get_maindomain() else 0
)
# Optimize wether or not to load the DNS section,
# e.g. we don't want to trigger the whole _get_registary_config_section
# when just getting the current value from the feature section
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
from yunohost.dns import _get_registrar_config_section
toml["dns"]["registrar"] = _get_registrar_config_section(self.entity)
# FIXME: Ugly hack to save the registar id/value and reinject it in _load_current_values ...
self.registar_id = toml["dns"]["registrar"]["registrar"]["value"]
del toml["dns"]["registrar"]["registrar"]["value"]
# Cert stuff
if not filter_key or filter_key[0] == "cert":
from yunohost.certificate import certificate_status
status = certificate_status([self.entity], full=True)["certificates"][
self.entity
]
toml["cert"]["cert"]["cert_summary"]["style"] = status["style"]
# i18n: domain_config_cert_summary_expired
# i18n: domain_config_cert_summary_selfsigned
# i18n: domain_config_cert_summary_abouttoexpire
# i18n: domain_config_cert_summary_ok
# i18n: domain_config_cert_summary_letsencrypt
toml["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(
f"domain_config_cert_summary_{status['summary']}"
)
# FIXME: Ugly hack to save the cert status and reinject it in _load_current_values ...
self.cert_status = status
return toml
def get(self, key="", mode="classic"):
result = super().get(key=key, mode=mode)
if mode == "full":
for panel, section, option in self._iterate():
# This injects:
# i18n: domain_config_cert_renew_help
# i18n: domain_config_default_app_help
# i18n: domain_config_xmpp_help
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
return self.config
return result
def _load_current_values(self):
# TODO add mechanism to share some settings with other domains on the same zone
super()._load_current_values()
# FIXME: Ugly hack to save the registar id/value and reinject it in _load_current_values ...
filter_key = self.filter_key.split(".") if self.filter_key != "" else []
if not filter_key or filter_key[0] == "dns":
self.values["registrar"] = self.registar_id
# FIXME: Ugly hack to save the cert status and reinject it in _load_current_values ...
if not filter_key or filter_key[0] == "cert":
self.values["cert_validity"] = self.cert_status["validity"]
self.values["cert_issuer"] = self.cert_status["CA_type"]
self.values["acme_eligible"] = self.cert_status["ACME_eligible"]
self.values["summary"] = self.cert_status["summary"]
def domain_action_run(domain, action, args=None):
import urllib.parse

View file

@ -22,7 +22,7 @@ import subprocess
from moulinette import m18n
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.configpanel import ConfigPanel
from yunohost.utils.form import Question
from yunohost.utils.form import BaseOption
from moulinette.utils.log import getActionLogger
from yunohost.regenconf import regen_conf
from yunohost.firewall import firewall_reload
@ -82,7 +82,7 @@ def settings_set(operation_logger, key=None, value=None, args=None, args_file=No
value -- New value
"""
Question.operation_logger = operation_logger
BaseOption.operation_logger = operation_logger
settings = SettingsConfigPanel()
key = translate_legacy_settings_to_configpanel_settings(key)
return settings.set(key, value, args, args_file, operation_logger=operation_logger)
@ -125,6 +125,93 @@ class SettingsConfigPanel(ConfigPanel):
def __init__(self, config_path=None, save_path=None, creation=False):
super().__init__("settings")
def get(self, key="", mode="classic"):
result = super().get(key=key, mode=mode)
if mode == "full":
for panel, section, option in self._iterate():
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
return self.config
# Dirty hack to let settings_get() to work from a python script
if isinstance(result, str) and result in ["True", "False"]:
result = bool(result == "True")
return result
def reset(self, key="", operation_logger=None):
self.filter_key = key
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
# Replace all values with default values
self.values = self._get_default_values()
BaseOption.operation_logger = operation_logger
if operation_logger:
operation_logger.start()
try:
self._apply()
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_apply_failed", error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_apply_failed", error=error))
raise
logger.success(m18n.n("global_settings_reset_success"))
operation_logger.success()
def _get_raw_config(self):
toml = super()._get_raw_config()
# Dynamic choice list for portal themes
THEMEDIR = "/usr/share/ssowat/portal/assets/themes/"
try:
themes = [d for d in os.listdir(THEMEDIR) if os.path.isdir(THEMEDIR + d)]
except Exception:
themes = ["unsplash", "vapor", "light", "default", "clouds"]
toml["misc"]["portal"]["portal_theme"]["choices"] = themes
return toml
def _get_raw_settings(self):
super()._get_raw_settings()
# Specific logic for those settings who are "virtual" settings
# and only meant to have a custom setter mapped to tools_rootpw
self.values["root_password"] = ""
self.values["root_password_confirm"] = ""
# Specific logic for virtual setting "passwordless_sudo"
try:
from yunohost.utils.ldap import _get_ldap_interface
ldap = _get_ldap_interface()
self.values["passwordless_sudo"] = "!authenticate" in ldap.search(
"ou=sudo", "cn=admins", ["sudoOption"]
)[0].get("sudoOption", [])
except Exception:
self.values["passwordless_sudo"] = False
def _apply(self):
root_password = self.new_values.pop("root_password", None)
root_password_confirm = self.new_values.pop("root_password_confirm", None)
@ -170,93 +257,6 @@ class SettingsConfigPanel(ConfigPanel):
logger.error(f"Post-change hook for setting failed : {e}")
raise
def _get_toml(self):
toml = super()._get_toml()
# Dynamic choice list for portal themes
THEMEDIR = "/usr/share/ssowat/portal/assets/themes/"
try:
themes = [d for d in os.listdir(THEMEDIR) if os.path.isdir(THEMEDIR + d)]
except Exception:
themes = ["unsplash", "vapor", "light", "default", "clouds"]
toml["misc"]["portal"]["portal_theme"]["choices"] = themes
return toml
def _load_current_values(self):
super()._load_current_values()
# Specific logic for those settings who are "virtual" settings
# and only meant to have a custom setter mapped to tools_rootpw
self.values["root_password"] = ""
self.values["root_password_confirm"] = ""
# Specific logic for virtual setting "passwordless_sudo"
try:
from yunohost.utils.ldap import _get_ldap_interface
ldap = _get_ldap_interface()
self.values["passwordless_sudo"] = "!authenticate" in ldap.search(
"ou=sudo", "cn=admins", ["sudoOption"]
)[0].get("sudoOption", [])
except Exception:
self.values["passwordless_sudo"] = False
def get(self, key="", mode="classic"):
result = super().get(key=key, mode=mode)
if mode == "full":
for panel, section, option in self._iterate():
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
return self.config
# Dirty hack to let settings_get() to work from a python script
if isinstance(result, str) and result in ["True", "False"]:
result = bool(result == "True")
return result
def reset(self, key="", operation_logger=None):
self.filter_key = key
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
# Replace all values with default values
self.values = self._get_default_values()
Question.operation_logger = operation_logger
if operation_logger:
operation_logger.start()
try:
self._apply()
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_apply_failed", error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_apply_failed", error=error))
raise
logger.success(m18n.n("global_settings_reset_success"))
operation_logger.success()
# Meant to be a dict of setting_name -> function to call
post_change_hooks = {}

View file

@ -15,14 +15,14 @@ from _pytest.mark.structures import ParameterSet
from moulinette import Moulinette
from yunohost import app, domain, user
from yunohost.utils.form import (
ARGUMENTS_TYPE_PARSERS,
OPTIONS,
ask_questions_and_parse_answers,
DisplayTextQuestion,
PasswordQuestion,
DomainQuestion,
PathQuestion,
BooleanQuestion,
FileQuestion,
DisplayTextOption,
PasswordOption,
DomainOption,
WebPathOption,
BooleanOption,
FileOption,
evaluate_simple_js_expression,
)
from yunohost.utils.error import YunohostError, YunohostValidationError
@ -438,9 +438,9 @@ class BaseTest:
id_ = raw_option["id"]
option, value = _fill_or_prompt_one_option(raw_option, None)
is_special_readonly_option = isinstance(option, DisplayTextQuestion)
is_special_readonly_option = isinstance(option, DisplayTextOption)
assert isinstance(option, ARGUMENTS_TYPE_PARSERS[raw_option["type"]])
assert isinstance(option, OPTIONS[raw_option["type"]])
assert option.type == raw_option["type"]
assert option.name == id_
assert option.ask == {"en": id_}
@ -734,7 +734,7 @@ class TestPassword(BaseTest):
], reason="Should output exactly the same"),
("s3cr3t!!", "s3cr3t!!"),
("secret", FAIL),
*[("supersecret" + char, FAIL) for char in PasswordQuestion.forbidden_chars], # FIXME maybe add ` \n` to the list?
*[("supersecret" + char, FAIL) for char in PasswordOption.forbidden_chars], # FIXME maybe add ` \n` to the list?
# readonly
*xpass(scenarios=[
("s3cr3t!!", "s3cr3t!!", {"readonly": True}),
@ -1225,9 +1225,9 @@ class TestUrl(BaseTest):
@pytest.fixture
def file_clean():
FileQuestion.clean_upload_dirs()
FileOption.clean_upload_dirs()
yield
FileQuestion.clean_upload_dirs()
FileOption.clean_upload_dirs()
@contextmanager
@ -1263,7 +1263,7 @@ def _test_file_intake_may_fail(raw_option, intake, expected_output):
with open(value) as f:
assert f.read() == expected_output
FileQuestion.clean_upload_dirs()
FileOption.clean_upload_dirs()
assert not os.path.exists(value)
@ -2138,88 +2138,88 @@ def test_question_number_input_test_ask_with_example():
def test_normalize_boolean_nominal():
assert BooleanQuestion.normalize("yes") == 1
assert BooleanQuestion.normalize("Yes") == 1
assert BooleanQuestion.normalize(" yes ") == 1
assert BooleanQuestion.normalize("y") == 1
assert BooleanQuestion.normalize("true") == 1
assert BooleanQuestion.normalize("True") == 1
assert BooleanQuestion.normalize("on") == 1
assert BooleanQuestion.normalize("1") == 1
assert BooleanQuestion.normalize(1) == 1
assert BooleanOption.normalize("yes") == 1
assert BooleanOption.normalize("Yes") == 1
assert BooleanOption.normalize(" yes ") == 1
assert BooleanOption.normalize("y") == 1
assert BooleanOption.normalize("true") == 1
assert BooleanOption.normalize("True") == 1
assert BooleanOption.normalize("on") == 1
assert BooleanOption.normalize("1") == 1
assert BooleanOption.normalize(1) == 1
assert BooleanQuestion.normalize("no") == 0
assert BooleanQuestion.normalize("No") == 0
assert BooleanQuestion.normalize(" no ") == 0
assert BooleanQuestion.normalize("n") == 0
assert BooleanQuestion.normalize("false") == 0
assert BooleanQuestion.normalize("False") == 0
assert BooleanQuestion.normalize("off") == 0
assert BooleanQuestion.normalize("0") == 0
assert BooleanQuestion.normalize(0) == 0
assert BooleanOption.normalize("no") == 0
assert BooleanOption.normalize("No") == 0
assert BooleanOption.normalize(" no ") == 0
assert BooleanOption.normalize("n") == 0
assert BooleanOption.normalize("false") == 0
assert BooleanOption.normalize("False") == 0
assert BooleanOption.normalize("off") == 0
assert BooleanOption.normalize("0") == 0
assert BooleanOption.normalize(0) == 0
assert BooleanQuestion.normalize("") is None
assert BooleanQuestion.normalize(" ") is None
assert BooleanQuestion.normalize(" none ") is None
assert BooleanQuestion.normalize("None") is None
assert BooleanQuestion.normalize("noNe") is None
assert BooleanQuestion.normalize(None) is None
assert BooleanOption.normalize("") is None
assert BooleanOption.normalize(" ") is None
assert BooleanOption.normalize(" none ") is None
assert BooleanOption.normalize("None") is None
assert BooleanOption.normalize("noNe") is None
assert BooleanOption.normalize(None) is None
def test_normalize_boolean_humanize():
assert BooleanQuestion.humanize("yes") == "yes"
assert BooleanQuestion.humanize("true") == "yes"
assert BooleanQuestion.humanize("on") == "yes"
assert BooleanOption.humanize("yes") == "yes"
assert BooleanOption.humanize("true") == "yes"
assert BooleanOption.humanize("on") == "yes"
assert BooleanQuestion.humanize("no") == "no"
assert BooleanQuestion.humanize("false") == "no"
assert BooleanQuestion.humanize("off") == "no"
assert BooleanOption.humanize("no") == "no"
assert BooleanOption.humanize("false") == "no"
assert BooleanOption.humanize("off") == "no"
def test_normalize_boolean_invalid():
with pytest.raises(YunohostValidationError):
BooleanQuestion.normalize("yesno")
BooleanOption.normalize("yesno")
with pytest.raises(YunohostValidationError):
BooleanQuestion.normalize("foobar")
BooleanOption.normalize("foobar")
with pytest.raises(YunohostValidationError):
BooleanQuestion.normalize("enabled")
BooleanOption.normalize("enabled")
def test_normalize_boolean_special_yesno():
customyesno = {"yes": "enabled", "no": "disabled"}
assert BooleanQuestion.normalize("yes", customyesno) == "enabled"
assert BooleanQuestion.normalize("true", customyesno) == "enabled"
assert BooleanQuestion.normalize("enabled", customyesno) == "enabled"
assert BooleanQuestion.humanize("yes", customyesno) == "yes"
assert BooleanQuestion.humanize("true", customyesno) == "yes"
assert BooleanQuestion.humanize("enabled", customyesno) == "yes"
assert BooleanOption.normalize("yes", customyesno) == "enabled"
assert BooleanOption.normalize("true", customyesno) == "enabled"
assert BooleanOption.normalize("enabled", customyesno) == "enabled"
assert BooleanOption.humanize("yes", customyesno) == "yes"
assert BooleanOption.humanize("true", customyesno) == "yes"
assert BooleanOption.humanize("enabled", customyesno) == "yes"
assert BooleanQuestion.normalize("no", customyesno) == "disabled"
assert BooleanQuestion.normalize("false", customyesno) == "disabled"
assert BooleanQuestion.normalize("disabled", customyesno) == "disabled"
assert BooleanQuestion.humanize("no", customyesno) == "no"
assert BooleanQuestion.humanize("false", customyesno) == "no"
assert BooleanQuestion.humanize("disabled", customyesno) == "no"
assert BooleanOption.normalize("no", customyesno) == "disabled"
assert BooleanOption.normalize("false", customyesno) == "disabled"
assert BooleanOption.normalize("disabled", customyesno) == "disabled"
assert BooleanOption.humanize("no", customyesno) == "no"
assert BooleanOption.humanize("false", customyesno) == "no"
assert BooleanOption.humanize("disabled", customyesno) == "no"
def test_normalize_domain():
assert DomainQuestion.normalize("https://yolo.swag/") == "yolo.swag"
assert DomainQuestion.normalize("http://yolo.swag") == "yolo.swag"
assert DomainQuestion.normalize("yolo.swag/") == "yolo.swag"
assert DomainOption.normalize("https://yolo.swag/") == "yolo.swag"
assert DomainOption.normalize("http://yolo.swag") == "yolo.swag"
assert DomainOption.normalize("yolo.swag/") == "yolo.swag"
def test_normalize_path():
assert PathQuestion.normalize("") == "/"
assert PathQuestion.normalize("") == "/"
assert PathQuestion.normalize("macnuggets") == "/macnuggets"
assert PathQuestion.normalize("/macnuggets") == "/macnuggets"
assert PathQuestion.normalize(" /macnuggets ") == "/macnuggets"
assert PathQuestion.normalize("/macnuggets") == "/macnuggets"
assert PathQuestion.normalize("mac/nuggets") == "/mac/nuggets"
assert PathQuestion.normalize("/macnuggets/") == "/macnuggets"
assert PathQuestion.normalize("macnuggets/") == "/macnuggets"
assert PathQuestion.normalize("////macnuggets///") == "/macnuggets"
assert WebPathOption.normalize("") == "/"
assert WebPathOption.normalize("") == "/"
assert WebPathOption.normalize("macnuggets") == "/macnuggets"
assert WebPathOption.normalize("/macnuggets") == "/macnuggets"
assert WebPathOption.normalize(" /macnuggets ") == "/macnuggets"
assert WebPathOption.normalize("/macnuggets") == "/macnuggets"
assert WebPathOption.normalize("mac/nuggets") == "/mac/nuggets"
assert WebPathOption.normalize("/macnuggets/") == "/macnuggets"
assert WebPathOption.normalize("macnuggets/") == "/macnuggets"
assert WebPathOption.normalize("////macnuggets///") == "/macnuggets"
def test_simple_evaluate():

View file

@ -23,25 +23,19 @@ import urllib.parse
from collections import OrderedDict
from typing import Union
from moulinette.interfaces.cli import colorize
from moulinette import Moulinette, m18n
from moulinette.interfaces.cli import colorize
from moulinette.utils.filesystem import mkdir, read_toml, read_yaml, write_to_yaml
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
read_toml,
read_yaml,
write_to_yaml,
mkdir,
)
from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.form import (
ARGUMENTS_TYPE_PARSERS,
FileQuestion,
Question,
OPTIONS,
BaseOption,
FileOption,
ask_questions_and_parse_answers,
evaluate_simple_js_expression,
)
from yunohost.utils.i18n import _value_for_locale
logger = getActionLogger("yunohost.configpanel")
CONFIG_PANEL_VERSION_SUPPORTED = 1.0
@ -116,7 +110,7 @@ class ConfigPanel:
raise YunohostValidationError("config_no_panel")
# Read or get values and hydrate the config
self._load_current_values()
self._get_raw_settings()
self._hydrate()
# In 'classic' mode, we display the current value if key refer to an option
@ -127,7 +121,7 @@ class ConfigPanel:
option_type = None
for _, _, option_ in self._iterate():
if option_["id"] == option:
option_type = ARGUMENTS_TYPE_PARSERS[option_["type"]]
option_type = OPTIONS[option_["type"]]
break
return option_type.normalize(value) if option_type else value
@ -152,7 +146,7 @@ class ConfigPanel:
if mode == "full":
option["ask"] = ask
question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
question_class = OPTIONS[option.get("type", "string")]
# FIXME : maybe other properties should be taken from the question, not just choices ?.
option["choices"] = question_class(option).choices
option["default"] = question_class(option).default
@ -160,9 +154,7 @@ class ConfigPanel:
else:
result[key] = {"ask": ask}
if "current_value" in option:
question_class = ARGUMENTS_TYPE_PARSERS[
option.get("type", "string")
]
question_class = OPTIONS[option.get("type", "string")]
result[key]["value"] = question_class.humanize(
option["current_value"], option
)
@ -177,6 +169,68 @@ class ConfigPanel:
else:
return result
def set(
self, key=None, value=None, args=None, args_file=None, operation_logger=None
):
self.filter_key = key or ""
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
if (args is not None or args_file is not None) and value is not None:
raise YunohostValidationError(
"You should either provide a value, or a serie of args/args_file, but not both at the same time",
raw_msg=True,
)
if self.filter_key.count(".") != 2 and value is not None:
raise YunohostValidationError("config_cant_set_value_on_section")
# Import and parse pre-answered options
logger.debug("Import and parse pre-answered options")
self._parse_pre_answered(args, value, args_file)
# Read or get values and hydrate the config
self._get_raw_settings()
self._hydrate()
BaseOption.operation_logger = operation_logger
self._ask()
if operation_logger:
operation_logger.start()
try:
self._apply()
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_apply_failed", error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_apply_failed", error=error))
raise
finally:
# Delete files uploaded from API
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileOption.clean_upload_dirs()
self._reload_services()
logger.success("Config updated as expected")
operation_logger.success()
def list_actions(self):
actions = {}
@ -211,9 +265,9 @@ class ConfigPanel:
self._parse_pre_answered(args, None, args_file)
# Read or get values and hydrate the config
self._load_current_values()
self._get_raw_settings()
self._hydrate()
Question.operation_logger = operation_logger
BaseOption.operation_logger = operation_logger
self._ask(action=action_id)
# FIXME: here, we could want to check constrains on
@ -244,75 +298,13 @@ class ConfigPanel:
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileQuestion.clean_upload_dirs()
FileOption.clean_upload_dirs()
# FIXME: i18n
logger.success(f"Action {action_id} successful")
operation_logger.success()
def set(
self, key=None, value=None, args=None, args_file=None, operation_logger=None
):
self.filter_key = key or ""
# Read config panel toml
self._get_config_panel()
if not self.config:
raise YunohostValidationError("config_no_panel")
if (args is not None or args_file is not None) and value is not None:
raise YunohostValidationError(
"You should either provide a value, or a serie of args/args_file, but not both at the same time",
raw_msg=True,
)
if self.filter_key.count(".") != 2 and value is not None:
raise YunohostValidationError("config_cant_set_value_on_section")
# Import and parse pre-answered options
logger.debug("Import and parse pre-answered options")
self._parse_pre_answered(args, value, args_file)
# Read or get values and hydrate the config
self._load_current_values()
self._hydrate()
Question.operation_logger = operation_logger
self._ask()
if operation_logger:
operation_logger.start()
try:
self._apply()
except YunohostError:
raise
# Script got manually interrupted ...
# N.B. : KeyboardInterrupt does not inherit from Exception
except (KeyboardInterrupt, EOFError):
error = m18n.n("operation_interrupted")
logger.error(m18n.n("config_apply_failed", error=error))
raise
# Something wrong happened in Yunohost's code (most probably hook_exec)
except Exception:
import traceback
error = m18n.n("unexpected_error", error="\n" + traceback.format_exc())
logger.error(m18n.n("config_apply_failed", error=error))
raise
finally:
# Delete files uploaded from API
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileQuestion.clean_upload_dirs()
self._reload_services()
logger.success("Config updated as expected")
operation_logger.success()
def _get_toml(self):
def _get_raw_config(self):
return read_toml(self.config_path)
def _get_config_panel(self):
@ -328,7 +320,7 @@ class ConfigPanel:
logger.debug(f"Config panel {self.config_path} doesn't exists")
return None
toml_config_panel = self._get_toml()
toml_config_panel = self._get_raw_config()
# Check TOML config panel is in a supported version
if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED:
@ -490,6 +482,26 @@ class ConfigPanel:
return self.config
def _get_default_values(self):
return {
option["id"]: option["default"]
for _, _, option in self._iterate()
if "default" in option
}
def _get_raw_settings(self):
"""
Retrieve entries in YAML file
And set default values if needed
"""
# Inject defaults if needed (using the magic .update() ;))
self.values = self._get_default_values()
# Retrieve entries in the YAML
if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
self.values.update(read_yaml(self.save_path) or {})
def _hydrate(self):
# Hydrating config panel with current value
for _, section, option in self._iterate():
@ -606,13 +618,6 @@ class ConfigPanel:
}
)
def _get_default_values(self):
return {
option["id"]: option["default"]
for _, _, option in self._iterate()
if "default" in option
}
@property
def future_values(self):
return {**self.values, **self.new_values}
@ -626,19 +631,6 @@ class ConfigPanel:
return self.__dict__[name]
def _load_current_values(self):
"""
Retrieve entries in YAML file
And set default values if needed
"""
# Inject defaults if needed (using the magic .update() ;))
self.values = self._get_default_values()
# Retrieve entries in the YAML
if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
self.values.update(read_yaml(self.save_path) or {})
def _parse_pre_answered(self, args, value, args_file):
args = urllib.parse.parse_qs(args or "", keep_blank_values=True)
self.args = {key: ",".join(value_) for key, value_ in args.items()}

View file

@ -16,30 +16,33 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
import urllib.parse
import tempfile
import shutil
import ast
import operator as op
from typing import Optional, Dict, List, Union, Any, Mapping, Callable
import os
import re
import shutil
import tempfile
import urllib.parse
from typing import Any, Callable, Dict, List, Mapping, Optional, Union
from moulinette.interfaces.cli import colorize
from moulinette import Moulinette, m18n
from moulinette.interfaces.cli import colorize
from moulinette.utils.filesystem import read_file, write_to_file
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
read_file,
write_to_file,
)
from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.log import OperationLogger
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.i18n import _value_for_locale
logger = getActionLogger("yunohost.form")
# ╭───────────────────────────────────────────────────────╮
# │ ┌─╴╷ ╷╭─┐╷ │
# │ ├─╴│╭╯├─┤│ │
# │ ╰─╴╰╯ ╵ ╵╰─╴ │
# ╰───────────────────────────────────────────────────────╯
# Those js-like evaluate functions are used to eval safely visible attributes
# The goal is to evaluate in the same way than js simple-evaluate
# https://github.com/shepherdwind/simple-evaluate
@ -183,7 +186,14 @@ def evaluate_simple_js_expression(expr, context={}):
return evaluate_simple_ast(node, context)
class Question:
# ╭───────────────────────────────────────────────────────╮
# │ ╭─╮┌─╮╶┬╴╶┬╴╭─╮╭╮╷╭─╴ │
# │ │ │├─╯ │ │ │ ││││╰─╮ │
# │ ╰─╯╵ ╵ ╶┴╴╰─╯╵╰╯╶─╯ │
# ╰───────────────────────────────────────────────────────╯
class BaseOption:
hide_user_input_in_prompt = False
pattern: Optional[Dict] = None
@ -231,22 +241,6 @@ class Question:
value = value.strip()
return value
def _prompt(self, text):
prefill = ""
if self.current_value is not None:
prefill = self.humanize(self.current_value, self)
elif self.default is not None:
prefill = self.humanize(self.default, self)
self.value = Moulinette.prompt(
message=text,
is_password=self.hide_user_input_in_prompt,
confirm=False,
prefill=prefill,
is_multiline=(self.type == "text"),
autocomplete=self.choices or [],
help=_value_for_locale(self.help),
)
def ask_if_needed(self):
if self.visible and not evaluate_simple_js_expression(
self.visible, context=self.context
@ -279,7 +273,7 @@ class Question:
try:
# Normalize and validate
self.value = self.normalize(self.value, self)
self._prevalidate()
self._value_pre_validator()
except YunohostValidationError as e:
# If in interactive cli, re-ask the current question
if i < 4 and Moulinette.interface.type == "cli" and os.isatty(1):
@ -292,7 +286,7 @@ class Question:
break
self.value = self.values[self.name] = self._post_parse_value()
self.value = self.values[self.name] = self._value_post_validator()
# Search for post actions in hooks
post_hook = f"post_ask__{self.name}"
@ -301,25 +295,21 @@ class Question:
return self.values
def _prevalidate(self):
if self.value in [None, ""] and not self.optional:
raise YunohostValidationError("app_argument_required", name=self.name)
# we have an answer, do some post checks
if self.value not in [None, ""]:
if self.choices and self.value not in self.choices:
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(str(choice) for choice in self.choices),
)
if self.pattern and not re.match(self.pattern["regexp"], str(self.value)):
raise YunohostValidationError(
self.pattern["error"],
name=self.name,
value=self.value,
)
def _prompt(self, text):
prefill = ""
if self.current_value is not None:
prefill = self.humanize(self.current_value, self)
elif self.default is not None:
prefill = self.humanize(self.default, self)
self.value = Moulinette.prompt(
message=text,
is_password=self.hide_user_input_in_prompt,
confirm=False,
prefill=prefill,
is_multiline=(self.type == "text"),
autocomplete=self.choices or [],
help=_value_for_locale(self.help),
)
def _format_text_for_user_input_in_cli(self):
text_for_user_input_in_cli = _value_for_locale(self.ask)
@ -353,7 +343,27 @@ class Question:
return text_for_user_input_in_cli
def _post_parse_value(self):
def _value_pre_validator(self):
if self.value in [None, ""] and not self.optional:
raise YunohostValidationError("app_argument_required", name=self.name)
# we have an answer, do some post checks
if self.value not in [None, ""]:
if self.choices and self.value not in self.choices:
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(str(choice) for choice in self.choices),
)
if self.pattern and not re.match(self.pattern["regexp"], str(self.value)):
raise YunohostValidationError(
self.pattern["error"],
name=self.name,
value=self.value,
)
def _value_post_validator(self):
if not self.redact:
return self.value
@ -377,108 +387,66 @@ class Question:
return self.value
class StringQuestion(Question):
# ╭───────────────────────────────────────────────────────╮
# │ DISPLAY OPTIONS │
# ╰───────────────────────────────────────────────────────╯
class DisplayTextOption(BaseOption):
argument_type = "display_text"
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.optional = True
self.readonly = True
self.style = question.get(
"style", "info" if question["type"] == "alert" else ""
)
def _format_text_for_user_input_in_cli(self):
text = _value_for_locale(self.ask)
if self.style in ["success", "info", "warning", "danger"]:
color = {
"success": "green",
"info": "cyan",
"warning": "yellow",
"danger": "red",
}
prompt = m18n.g(self.style) if self.style != "danger" else m18n.n("danger")
return colorize(prompt, color[self.style]) + f" {text}"
else:
return text
class ButtonOption(BaseOption):
argument_type = "button"
enabled = None
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.enabled = question.get("enabled", None)
# ╭───────────────────────────────────────────────────────╮
# │ INPUT OPTIONS │
# ╰───────────────────────────────────────────────────────╯
# ─ STRINGS ───────────────────────────────────────────────
class StringOption(BaseOption):
argument_type = "string"
default_value = ""
class EmailQuestion(StringQuestion):
pattern = {
"regexp": r"^.+@.+",
"error": "config_validate_email", # i18n: config_validate_email
}
class URLQuestion(StringQuestion):
pattern = {
"regexp": r"^https?://.*$",
"error": "config_validate_url", # i18n: config_validate_url
}
class DateQuestion(StringQuestion):
pattern = {
"regexp": r"^\d{4}-\d\d-\d\d$",
"error": "config_validate_date", # i18n: config_validate_date
}
def _prevalidate(self):
from datetime import datetime
super()._prevalidate()
if self.value not in [None, ""]:
try:
datetime.strptime(self.value, "%Y-%m-%d")
except ValueError:
raise YunohostValidationError("config_validate_date")
class TimeQuestion(StringQuestion):
pattern = {
"regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$",
"error": "config_validate_time", # i18n: config_validate_time
}
class ColorQuestion(StringQuestion):
pattern = {
"regexp": r"^#[ABCDEFabcdef\d]{3,6}$",
"error": "config_validate_color", # i18n: config_validate_color
}
class TagsQuestion(Question):
argument_type = "tags"
default_value = ""
@staticmethod
def humanize(value, option={}):
if isinstance(value, list):
return ",".join(str(v) for v in value)
return value
@staticmethod
def normalize(value, option={}):
if isinstance(value, list):
return ",".join(str(v) for v in value)
if isinstance(value, str):
value = value.strip()
return value
def _prevalidate(self):
values = self.value
if isinstance(values, str):
values = values.split(",")
elif values is None:
values = []
if not isinstance(values, list):
if self.choices:
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(str(choice) for choice in self.choices),
)
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=f"'{str(self.value)}' is not a list",
)
for value in values:
self.value = value
super()._prevalidate()
self.value = values
def _post_parse_value(self):
if isinstance(self.value, list):
self.value = ",".join(self.value)
return super()._post_parse_value()
class PasswordQuestion(Question):
class PasswordOption(BaseOption):
hide_user_input_in_prompt = True
argument_type = "password"
default_value = ""
@ -494,8 +462,8 @@ class PasswordQuestion(Question):
"app_argument_password_no_default", name=self.name
)
def _prevalidate(self):
super()._prevalidate()
def _value_pre_validator(self):
super()._value_pre_validator()
if self.value not in [None, ""]:
if any(char in self.value for char in self.forbidden_chars):
@ -509,51 +477,95 @@ class PasswordQuestion(Question):
assert_password_is_strong_enough("user", self.value)
class PathQuestion(Question):
argument_type = "path"
default_value = ""
class ColorOption(StringOption):
pattern = {
"regexp": r"^#[ABCDEFabcdef\d]{3,6}$",
"error": "config_validate_color", # i18n: config_validate_color
}
# ─ NUMERIC ───────────────────────────────────────────────
class NumberOption(BaseOption):
argument_type = "number"
default_value = None
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.min = question.get("min", None)
self.max = question.get("max", None)
self.step = question.get("step", None)
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, Question) else option
if isinstance(value, int):
return value
if not isinstance(value, str):
if isinstance(value, str):
value = value.strip()
if isinstance(value, str) and value.isdigit():
return int(value)
if value in [None, ""]:
return None
option = option.__dict__ if isinstance(option, BaseOption) else option
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error=m18n.n("invalid_number"),
)
def _value_pre_validator(self):
super()._value_pre_validator()
if self.value in [None, ""]:
return
if self.min is not None and int(self.value) < self.min:
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error="Argument for path should be a string.",
name=self.name,
error=m18n.n("invalid_number_min", min=self.min),
)
if not value.strip():
if option.get("optional"):
return ""
# Hmpf here we could just have a "else" case
# but we also want PathQuestion.normalize("") to return "/"
# (i.e. if no option is provided, hence .get("optional") is None
elif option.get("optional") is False:
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error="Question is mandatory",
)
return "/" + value.strip().strip(" /")
if self.max is not None and int(self.value) > self.max:
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("invalid_number_max", max=self.max),
)
class BooleanQuestion(Question):
# ─ BOOLEAN ───────────────────────────────────────────────
class BooleanOption(BaseOption):
argument_type = "boolean"
default_value = 0
yes_answers = ["1", "yes", "y", "true", "t", "on"]
no_answers = ["0", "no", "n", "false", "f", "off"]
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.yes = question.get("yes", 1)
self.no = question.get("no", 0)
if self.default is None:
self.default = self.no
@staticmethod
def humanize(value, option={}):
option = option.__dict__ if isinstance(option, Question) else option
option = option.__dict__ if isinstance(option, BaseOption) else option
yes = option.get("yes", 1)
no = option.get("no", 0)
value = BooleanQuestion.normalize(value, option)
value = BooleanOption.normalize(value, option)
if value == yes:
return "yes"
@ -571,7 +583,7 @@ class BooleanQuestion(Question):
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, Question) else option
option = option.__dict__ if isinstance(option, BaseOption) else option
if isinstance(value, str):
value = value.strip()
@ -579,8 +591,8 @@ class BooleanQuestion(Question):
technical_yes = option.get("yes", 1)
technical_no = option.get("no", 0)
no_answers = BooleanQuestion.no_answers
yes_answers = BooleanQuestion.yes_answers
no_answers = BooleanOption.no_answers
yes_answers = BooleanOption.yes_answers
assert (
str(technical_yes).lower() not in no_answers
@ -609,14 +621,8 @@ class BooleanQuestion(Question):
choices="yes/no",
)
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.yes = question.get("yes", 1)
self.no = question.get("no", 0)
if self.default is None:
self.default = self.no
def get(self, key, default=None):
return getattr(self, key, default)
def _format_text_for_user_input_in_cli(self):
text_for_user_input_in_cli = super()._format_text_for_user_input_in_cli()
@ -626,11 +632,205 @@ class BooleanQuestion(Question):
return text_for_user_input_in_cli
def get(self, key, default=None):
return getattr(self, key, default)
# ─ TIME ──────────────────────────────────────────────────
class DomainQuestion(Question):
class DateOption(StringOption):
pattern = {
"regexp": r"^\d{4}-\d\d-\d\d$",
"error": "config_validate_date", # i18n: config_validate_date
}
def _value_pre_validator(self):
from datetime import datetime
super()._value_pre_validator()
if self.value not in [None, ""]:
try:
datetime.strptime(self.value, "%Y-%m-%d")
except ValueError:
raise YunohostValidationError("config_validate_date")
class TimeOption(StringOption):
pattern = {
"regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$",
"error": "config_validate_time", # i18n: config_validate_time
}
# ─ LOCATIONS ─────────────────────────────────────────────
class EmailOption(StringOption):
pattern = {
"regexp": r"^.+@.+",
"error": "config_validate_email", # i18n: config_validate_email
}
class WebPathOption(BaseOption):
argument_type = "path"
default_value = ""
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, BaseOption) else option
if not isinstance(value, str):
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error="Argument for path should be a string.",
)
if not value.strip():
if option.get("optional"):
return ""
# Hmpf here we could just have a "else" case
# but we also want WebPathOption.normalize("") to return "/"
# (i.e. if no option is provided, hence .get("optional") is None
elif option.get("optional") is False:
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error="Option is mandatory",
)
return "/" + value.strip().strip(" /")
class URLOption(StringOption):
pattern = {
"regexp": r"^https?://.*$",
"error": "config_validate_url", # i18n: config_validate_url
}
# ─ FILE ──────────────────────────────────────────────────
class FileOption(BaseOption):
argument_type = "file"
upload_dirs: List[str] = []
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.accept = question.get("accept", "")
@classmethod
def clean_upload_dirs(cls):
# Delete files uploaded from API
for upload_dir in cls.upload_dirs:
if os.path.exists(upload_dir):
shutil.rmtree(upload_dir)
def _value_pre_validator(self):
if self.value is None:
self.value = self.current_value
super()._value_pre_validator()
# Validation should have already failed if required
if self.value in [None, ""]:
return self.value
if Moulinette.interface.type != "api":
if not os.path.exists(str(self.value)) or not os.path.isfile(
str(self.value)
):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("file_does_not_exist", path=str(self.value)),
)
def _value_post_validator(self):
from base64 import b64decode
if not self.value:
return ""
upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_")
_, file_path = tempfile.mkstemp(dir=upload_dir)
FileOption.upload_dirs += [upload_dir]
logger.debug(f"Saving file {self.name} for file question into {file_path}")
def is_file_path(s):
return isinstance(s, str) and s.startswith("/") and os.path.exists(s)
if Moulinette.interface.type != "api" or is_file_path(self.value):
content = read_file(str(self.value), file_mode="rb")
else:
content = b64decode(self.value)
write_to_file(file_path, content, file_mode="wb")
self.value = file_path
return self.value
# ─ CHOICES ───────────────────────────────────────────────
class TagsOption(BaseOption):
argument_type = "tags"
default_value = ""
@staticmethod
def humanize(value, option={}):
if isinstance(value, list):
return ",".join(str(v) for v in value)
return value
@staticmethod
def normalize(value, option={}):
if isinstance(value, list):
return ",".join(str(v) for v in value)
if isinstance(value, str):
value = value.strip()
return value
def _value_pre_validator(self):
values = self.value
if isinstance(values, str):
values = values.split(",")
elif values is None:
values = []
if not isinstance(values, list):
if self.choices:
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(str(choice) for choice in self.choices),
)
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=f"'{str(self.value)}' is not a list",
)
for value in values:
self.value = value
super()._value_pre_validator()
self.value = values
def _value_post_validator(self):
if isinstance(self.value, list):
self.value = ",".join(self.value)
return super()._value_post_validator()
class DomainOption(BaseOption):
argument_type = "domain"
def __init__(
@ -661,7 +861,7 @@ class DomainQuestion(Question):
return value
class AppQuestion(Question):
class AppOption(BaseOption):
argument_type = "app"
def __init__(
@ -688,7 +888,7 @@ class AppQuestion(Question):
self.choices.update({app["id"]: _app_display(app) for app in apps})
class UserQuestion(Question):
class UserOption(BaseOption):
argument_type = "user"
def __init__(
@ -721,7 +921,7 @@ class UserQuestion(Question):
break
class GroupQuestion(Question):
class GroupOption(BaseOption):
argument_type = "group"
def __init__(
@ -747,198 +947,46 @@ class GroupQuestion(Question):
self.default = "all_users"
class NumberQuestion(Question):
argument_type = "number"
default_value = None
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.min = question.get("min", None)
self.max = question.get("max", None)
self.step = question.get("step", None)
@staticmethod
def normalize(value, option={}):
if isinstance(value, int):
return value
if isinstance(value, str):
value = value.strip()
if isinstance(value, str) and value.isdigit():
return int(value)
if value in [None, ""]:
return None
option = option.__dict__ if isinstance(option, Question) else option
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error=m18n.n("invalid_number"),
)
def _prevalidate(self):
super()._prevalidate()
if self.value in [None, ""]:
return
if self.min is not None and int(self.value) < self.min:
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("invalid_number_min", min=self.min),
)
if self.max is not None and int(self.value) > self.max:
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("invalid_number_max", max=self.max),
)
class DisplayTextQuestion(Question):
argument_type = "display_text"
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.optional = True
self.readonly = True
self.style = question.get(
"style", "info" if question["type"] == "alert" else ""
)
def _format_text_for_user_input_in_cli(self):
text = _value_for_locale(self.ask)
if self.style in ["success", "info", "warning", "danger"]:
color = {
"success": "green",
"info": "cyan",
"warning": "yellow",
"danger": "red",
}
prompt = m18n.g(self.style) if self.style != "danger" else m18n.n("danger")
return colorize(prompt, color[self.style]) + f" {text}"
else:
return text
class FileQuestion(Question):
argument_type = "file"
upload_dirs: List[str] = []
@classmethod
def clean_upload_dirs(cls):
# Delete files uploaded from API
for upload_dir in cls.upload_dirs:
if os.path.exists(upload_dir):
shutil.rmtree(upload_dir)
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.accept = question.get("accept", "")
def _prevalidate(self):
if self.value is None:
self.value = self.current_value
super()._prevalidate()
# Validation should have already failed if required
if self.value in [None, ""]:
return self.value
if Moulinette.interface.type != "api":
if not os.path.exists(str(self.value)) or not os.path.isfile(
str(self.value)
):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("file_does_not_exist", path=str(self.value)),
)
def _post_parse_value(self):
from base64 import b64decode
if not self.value:
return ""
upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_")
_, file_path = tempfile.mkstemp(dir=upload_dir)
FileQuestion.upload_dirs += [upload_dir]
logger.debug(f"Saving file {self.name} for file question into {file_path}")
def is_file_path(s):
return isinstance(s, str) and s.startswith("/") and os.path.exists(s)
if Moulinette.interface.type != "api" or is_file_path(self.value):
content = read_file(str(self.value), file_mode="rb")
else:
content = b64decode(self.value)
write_to_file(file_path, content, file_mode="wb")
self.value = file_path
return self.value
class ButtonQuestion(Question):
argument_type = "button"
enabled = None
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
super().__init__(question, context, hooks)
self.enabled = question.get("enabled", None)
ARGUMENTS_TYPE_PARSERS = {
"string": StringQuestion,
"text": StringQuestion,
"select": StringQuestion,
"tags": TagsQuestion,
"email": EmailQuestion,
"url": URLQuestion,
"date": DateQuestion,
"time": TimeQuestion,
"color": ColorQuestion,
"password": PasswordQuestion,
"path": PathQuestion,
"boolean": BooleanQuestion,
"domain": DomainQuestion,
"user": UserQuestion,
"group": GroupQuestion,
"number": NumberQuestion,
"range": NumberQuestion,
"display_text": DisplayTextQuestion,
"alert": DisplayTextQuestion,
"markdown": DisplayTextQuestion,
"file": FileQuestion,
"app": AppQuestion,
"button": ButtonQuestion,
OPTIONS = {
"display_text": DisplayTextOption,
"markdown": DisplayTextOption,
"alert": DisplayTextOption,
"button": ButtonOption,
"string": StringOption,
"text": StringOption,
"password": PasswordOption,
"color": ColorOption,
"number": NumberOption,
"range": NumberOption,
"boolean": BooleanOption,
"date": DateOption,
"time": TimeOption,
"email": EmailOption,
"path": WebPathOption,
"url": URLOption,
"file": FileOption,
"select": StringOption,
"tags": TagsOption,
"domain": DomainOption,
"app": AppOption,
"user": UserOption,
"group": GroupOption,
}
# ╭───────────────────────────────────────────────────────╮
# │ ╷ ╷╶┬╴╶┬╴╷ ╭─╴ │
# │ │ │ │ │ │ ╰─╮ │
# │ ╰─╯ ╵ ╶┴╴╰─╴╶─╯ │
# ╰───────────────────────────────────────────────────────╯
def ask_questions_and_parse_answers(
raw_questions: Dict,
prefilled_answers: Union[str, Mapping[str, Any]] = {},
current_values: Mapping[str, Any] = {},
hooks: Dict[str, Callable[[], None]] = {},
) -> List[Question]:
) -> List[BaseOption]:
"""Parse arguments store in either manifest.json or actions.json or from a
config panel against the user answers when they are present.
@ -969,7 +1017,7 @@ def ask_questions_and_parse_answers(
for name, raw_question in raw_questions.items():
raw_question["name"] = name
question_class = ARGUMENTS_TYPE_PARSERS[raw_question.get("type", "string")]
question_class = OPTIONS[raw_question.get("type", "string")]
raw_question["value"] = answers.get(name)
question = question_class(raw_question, context=context, hooks=hooks)
if question.type == "button":
@ -996,9 +1044,7 @@ def hydrate_questions_with_choices(raw_questions: List) -> List:
out = []
for raw_question in raw_questions:
question = ARGUMENTS_TYPE_PARSERS[raw_question.get("type", "string")](
raw_question
)
question = OPTIONS[raw_question.get("type", "string")](raw_question)
if question.choices:
raw_question["choices"] = question.choices
raw_question["default"] = question.default