Merge branch 'enh-config-panel-file' of github.com:YunoHost/yunohost into enh-config-panel-file

This commit is contained in:
Alexandre Aubin 2021-09-04 20:27:36 +02:00
commit b06570fb88
2 changed files with 323 additions and 265 deletions

View file

@ -56,7 +56,7 @@ from yunohost.utils import packages
from yunohost.utils.config import ( from yunohost.utils.config import (
ConfigPanel, ConfigPanel,
parse_args_in_yunohost_format, parse_args_in_yunohost_format,
YunoHostArgumentFormatParser, Question,
) )
from yunohost.utils.i18n import _value_for_locale from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.error import YunohostError, YunohostValidationError
@ -1769,7 +1769,7 @@ def app_config_set(
config_ = AppConfigPanel(app) config_ = AppConfigPanel(app)
YunoHostArgumentFormatParser.operation_logger = operation_logger Question.operation_logger = operation_logger
operation_logger.start() operation_logger.start()
result = config_.set(key, value, args, args_file) result = config_.set(key, value, args, args_file)
@ -1792,7 +1792,8 @@ class AppConfigPanel(ConfigPanel):
self.values = self._call_config_script("show") self.values = self._call_config_script("show")
def _apply(self): def _apply(self):
self.errors = self._call_config_script("apply", self.new_values) env = {key: str(value) for key, value in self.new_values.items()}
self.errors = self._call_config_script("apply", env=env)
def _call_config_script(self, action, env={}): def _call_config_script(self, action, env={}):
from yunohost.hook import hook_exec from yunohost.hook import hook_exec

View file

@ -80,12 +80,16 @@ class ConfigPanel:
result = {} result = {}
for panel, section, option in self._iterate(): for panel, section, option in self._iterate():
key = f"{panel['id']}.{section['id']}.{option['id']}" key = f"{panel['id']}.{section['id']}.{option['id']}"
if mode == "export": if mode == 'export':
result[option["id"]] = option.get("current_value") result[option['id']] = option.get('current_value')
else: else:
result[key] = {"ask": _value_for_locale(option["ask"])} if 'ask' in option:
if "current_value" in option: result[key] = {'ask': _value_for_locale(option['ask'])}
result[key]["value"] = option["current_value"] elif 'i18n' in self.config:
result[key] = {'ask': m18n.n(self.config['i18n'] + '_' + option['id'])}
if 'current_value' in option:
question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")]
result[key]['value'] = question_class.humanize(option['current_value'], option)
return result return result
@ -139,7 +143,7 @@ class ConfigPanel:
raise raise
finally: finally:
# Delete files uploaded from API # Delete files uploaded from API
FileArgumentParser.clean_upload_dirs() FileQuestion.clean_upload_dirs()
if self.errors: if self.errors:
return { return {
@ -283,16 +287,42 @@ class ConfigPanel:
parse_args_in_yunohost_format(self.args, section["options"]) parse_args_in_yunohost_format(self.args, section["options"])
) )
self.new_values = { self.new_values = {
key: str(value[0]) key: value[0]
for key, value in self.new_values.items() for key, value in self.new_values.items()
if not value[0] is None if not value[0] is None
} }
self.errors = None
def _get_default_values(self):
return { option['id']: option['default']
for _, _, option in self._iterate() if 'default' in option }
def _load_current_values(self):
"""
Retrieve entries in YAML file
And set default values if needed
"""
# Retrieve entries in the YAML
on_disk_settings = {}
if os.path.exists(self.save_path) and os.path.isfile(self.save_path):
on_disk_settings = read_yaml(self.save_path) or {}
# Inject defaults if needed (using the magic .update() ;))
self.values = self._get_default_values()
self.values.update(on_disk_settings)
def _apply(self): def _apply(self):
logger.info("Running config script...") logger.info("Saving the new configuration...")
dir_path = os.path.dirname(os.path.realpath(self.save_path)) dir_path = os.path.dirname(os.path.realpath(self.save_path))
if not os.path.exists(dir_path): if not os.path.exists(dir_path):
mkdir(dir_path, mode=0o700) mkdir(dir_path, mode=0o700)
values_to_save = {**self.values, **self.new_values}
if self.save_mode == 'diff':
defaults = self._get_default_values()
values_to_save = {k: v for k, v in values_to_save.items() if defaults.get(k) != v}
# Save the settings to the .yaml file # Save the settings to the .yaml file
write_to_yaml(self.save_path, self.new_values) write_to_yaml(self.save_path, self.new_values)
@ -300,13 +330,14 @@ class ConfigPanel:
from yunohost.service import service_reload_or_restart from yunohost.service import service_reload_or_restart
logger.info("Reloading services...")
services_to_reload = set() services_to_reload = set()
for panel, section, obj in self._iterate(["panel", "section", "option"]): for panel, section, obj in self._iterate(["panel", "section", "option"]):
services_to_reload |= set(obj.get("services", [])) services_to_reload |= set(obj.get("services", []))
services_to_reload = list(services_to_reload) services_to_reload = list(services_to_reload)
services_to_reload.sort(key="nginx".__eq__) services_to_reload.sort(key="nginx".__eq__)
if services_to_reload:
logger.info("Reloading services...")
for service in services_to_reload: for service in services_to_reload:
service = service.replace("__APP__", self.app) service = service.replace("__APP__", self.app)
service_reload_or_restart(service) service_reload_or_restart(service)
@ -323,140 +354,138 @@ class ConfigPanel:
yield (panel, section, option) yield (panel, section, option)
class Question: class Question(object):
"empty class to store questions information"
class YunoHostArgumentFormatParser(object):
hide_user_input_in_prompt = False hide_user_input_in_prompt = False
operation_logger = None operation_logger = None
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
parsed_question = Question() self.name = question["name"]
self.type = question.get("type", 'string')
parsed_question.name = question["name"] self.default = question.get("default", None)
parsed_question.type = question.get("type", "string") self.current_value = question.get("current_value")
parsed_question.default = question.get("default", None) self.optional = question.get("optional", False)
parsed_question.current_value = question.get("current_value") self.choices = question.get("choices", [])
parsed_question.optional = question.get("optional", False) self.pattern = question.get("pattern")
parsed_question.choices = question.get("choices", []) self.ask = question.get("ask", {'en': self.name})
parsed_question.pattern = question.get("pattern") self.help = question.get("help")
parsed_question.ask = question.get("ask", {"en": f"{parsed_question.name}"}) self.helpLink = question.get("helpLink")
parsed_question.help = question.get("help") self.value = user_answers.get(self.name)
parsed_question.helpLink = question.get("helpLink") self.redact = question.get('redact', False)
parsed_question.value = user_answers.get(parsed_question.name)
parsed_question.redact = question.get("redact", False)
# Empty value is parsed as empty string # Empty value is parsed as empty string
if parsed_question.default == "": if self.default == "":
parsed_question.default = None self.default = None
return parsed_question @staticmethod
def humanize(value, option={}):
return str(value)
def parse(self, question, user_answers): @staticmethod
question = self.parse_question(question, user_answers) def normalize(value, option={}):
return value
def ask_if_needed(self):
while True: while True:
# Display question if no value filled or if it's a readonly message # Display question if no value filled or if it's a readonly message
if Moulinette.interface.type == "cli": if Moulinette.interface.type== 'cli':
text_for_user_input_in_cli = self._format_text_for_user_input_in_cli( text_for_user_input_in_cli = self._format_text_for_user_input_in_cli()
question
)
if getattr(self, "readonly", False): if getattr(self, "readonly", False):
Moulinette.display(text_for_user_input_in_cli) Moulinette.display(text_for_user_input_in_cli)
elif question.value is None: elif self.value is None:
prefill = "" prefill = ""
if question.current_value is not None: if self.current_value is not None:
prefill = question.current_value prefill = self.humanize(self.current_value, self)
elif question.default is not None: elif self.default is not None:
prefill = question.default prefill = self.default
question.value = Moulinette.prompt( self.value = Moulinette.prompt(
message=text_for_user_input_in_cli, message=text_for_user_input_in_cli,
is_password=self.hide_user_input_in_prompt, is_password=self.hide_user_input_in_prompt,
confirm=self.hide_user_input_in_prompt, confirm=self.hide_user_input_in_prompt,
prefill=prefill, prefill=prefill,
is_multiline=(question.type == "text"), is_multiline=(self.type == "text"),
) )
# Normalization
# This is done to enforce a certain formating like for boolean
self.value = self.normalize(self.value, self)
# Apply default value # Apply default value
if question.value in [None, ""] and question.default is not None: if self.value in [None, ""] and self.default is not None:
question.value = ( self.value = (
getattr(self, "default_value", None) getattr(self, "default_value", None)
if question.default is None if self.default is None
else question.default else self.default
) )
# Prevalidation # Prevalidation
try: try:
self._prevalidate(question) self._prevalidate()
except YunohostValidationError as e: except YunohostValidationError as e:
if Moulinette.interface.type == "api": if Moulinette.interface.type == "api":
raise raise
Moulinette.display(str(e), "error") Moulinette.display(str(e), "error")
question.value = None self.value = None
continue continue
break break
# this is done to enforce a certain formating like for boolean self.value = self._post_parse_value()
# by default it doesn't do anything
question.value = self._post_parse_value(question)
return (question.value, self.argument_type) return (self.value, self.argument_type)
def _prevalidate(self, question):
if question.value in [None, ""] and not question.optional: def _prevalidate(self):
raise YunohostValidationError("app_argument_required", name=question.name) if self.value in [None, ""] and not self.optional:
raise YunohostValidationError("app_argument_required", name=self.name)
# we have an answer, do some post checks # we have an answer, do some post checks
if question.value is not None: if self.value is not None:
if question.choices and question.value not in question.choices: if self.choices and self.value not in self.choices:
self._raise_invalid_answer(question) self._raise_invalid_answer()
if question.pattern and not re.match( if self.pattern and not re.match(self.pattern['regexp'], str(self.value)):
question.pattern["regexp"], str(question.value)
):
raise YunohostValidationError( raise YunohostValidationError(
question.pattern["error"], self.pattern['error'],
name=question.name, name=self.name,
value=question.value, value=self.value,
) )
def _raise_invalid_answer(self, question): def _raise_invalid_answer(self):
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_choice_invalid", "app_argument_choice_invalid",
name=question.name, name=self.name,
value=question.value, value=self.value,
choices=", ".join(question.choices), choices=", ".join(self.choices),
) )
def _format_text_for_user_input_in_cli(self, question): def _format_text_for_user_input_in_cli(self):
text_for_user_input_in_cli = _value_for_locale(question.ask) text_for_user_input_in_cli = _value_for_locale(self.ask)
if question.choices: if self.choices:
text_for_user_input_in_cli += " [{0}]".format(" | ".join(question.choices)) text_for_user_input_in_cli += " [{0}]".format(" | ".join(self.choices))
if question.help or question.helpLink: if self.help or self.helpLink:
text_for_user_input_in_cli += ":\033[m" text_for_user_input_in_cli += ":\033[m"
if question.help: if self.help:
text_for_user_input_in_cli += "\n - " text_for_user_input_in_cli += "\n - "
text_for_user_input_in_cli += _value_for_locale(question.help) text_for_user_input_in_cli += _value_for_locale(self.help)
if question.helpLink: if self.helpLink:
if not isinstance(question.helpLink, dict): if not isinstance(self.helpLink, dict):
question.helpLink = {"href": question.helpLink} self.helpLink = {"href": self.helpLink}
text_for_user_input_in_cli += f"\n - See {question.helpLink['href']}" text_for_user_input_in_cli += f"\n - See {self.helpLink['href']}"
return text_for_user_input_in_cli return text_for_user_input_in_cli
def _post_parse_value(self, question): def _post_parse_value(self):
if not question.redact: if not self.redact:
return question.value return self.value
# Tell the operation_logger to redact all password-type / secret args # Tell the operation_logger to redact all password-type / secret args
# Also redact the % escaped version of the password that might appear in # Also redact the % escaped version of the password that might appear in
# the 'args' section of metadata (relevant for password with non-alphanumeric char) # the 'args' section of metadata (relevant for password with non-alphanumeric char)
data_to_redact = [] data_to_redact = []
if question.value and isinstance(question.value, str): if self.value and isinstance(self.value, str):
data_to_redact.append(question.value) data_to_redact.append(self.value)
if question.current_value and isinstance(question.current_value, str): if self.current_value and isinstance(self.current_value, str):
data_to_redact.append(question.current_value) data_to_redact.append(self.current_value)
data_to_redact += [ data_to_redact += [
urllib.parse.quote(data) urllib.parse.quote(data)
for data in data_to_redact for data in data_to_redact
@ -467,48 +496,58 @@ class YunoHostArgumentFormatParser(object):
elif data_to_redact: elif data_to_redact:
raise YunohostError(f"Can't redact {question.name} because no operation logger available in the context", raw_msg=True) raise YunohostError(f"Can't redact {question.name} because no operation logger available in the context", raw_msg=True)
return question.value return self.value
class StringArgumentParser(YunoHostArgumentFormatParser): class StringQuestion(Question):
argument_type = "string" argument_type = "string"
default_value = "" default_value = ""
class TagsArgumentParser(YunoHostArgumentFormatParser): class TagsQuestion(Question):
argument_type = "tags" argument_type = "tags"
def _prevalidate(self, question): @staticmethod
values = question.value def humanize(value, option={}):
for value in values.split(","): if isinstance(value, list):
question.value = value return ','.join(value)
super()._prevalidate(question) return value
question.value = values
def _prevalidate(self):
values = self.value
if isinstance(values, str):
values = values.split(",")
for value in values:
self.value = value
super()._prevalidate()
self.value = values
class PasswordArgumentParser(YunoHostArgumentFormatParser): class PasswordQuestion(Question):
hide_user_input_in_prompt = True hide_user_input_in_prompt = True
argument_type = "password" argument_type = "password"
default_value = "" default_value = ""
forbidden_chars = "{}" forbidden_chars = "{}"
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
question = super(PasswordArgumentParser, self).parse_question( super().__init__(question, user_answers)
question, user_answers self.redact = True
) if self.default is not None:
question.redact = True
if question.default is not None:
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_password_no_default", name=question.name "app_argument_password_no_default", name=self.name
) )
return question @staticmethod
def humanize(value, option={}):
if value:
return '***' # Avoid to display the password on screen
return ""
def _prevalidate(self, question): def _prevalidate(self):
super()._prevalidate(question) super()._prevalidate()
if question.value is not None: if self.value is not None:
if any(char in question.value for char in self.forbidden_chars): if any(char in self.value for char in self.forbidden_chars):
raise YunohostValidationError( raise YunohostValidationError(
"pattern_password_app", forbidden_chars=self.forbidden_chars "pattern_password_app", forbidden_chars=self.forbidden_chars
) )
@ -516,184 +555,214 @@ class PasswordArgumentParser(YunoHostArgumentFormatParser):
# If it's an optional argument the value should be empty or strong enough # If it's an optional argument the value should be empty or strong enough
from yunohost.utils.password import assert_password_is_strong_enough from yunohost.utils.password import assert_password_is_strong_enough
assert_password_is_strong_enough("user", question.value) assert_password_is_strong_enough("user", self.value)
class PathArgumentParser(YunoHostArgumentFormatParser): class PathQuestion(Question):
argument_type = "path" argument_type = "path"
default_value = "" default_value = ""
class BooleanArgumentParser(YunoHostArgumentFormatParser): class BooleanQuestion(Question):
argument_type = "boolean" argument_type = "boolean"
default_value = False default_value = False
yes_answers = ["1", "yes", "y", "true", "t", "on"]
no_answers = ["0", "no", "n", "false", "f", "off"]
def parse_question(self, question, user_answers): @staticmethod
question = super().parse_question(question, user_answers) def humanize(value, option={}):
yes = option.get('yes', 1)
no = option.get('no', 0)
value = str(value).lower()
if value == str(yes).lower():
return 'yes'
if value == str(no).lower():
return 'no'
if value in BooleanQuestion.yes_answers:
return 'yes'
if value in BooleanQuestion.no_answers:
return 'no'
if question.default is None: if value in ['none', ""]:
question.default = False return ''
return question raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices="yes, no, y, n, 1, 0",
)
def _format_text_for_user_input_in_cli(self, question): @staticmethod
text_for_user_input_in_cli = _value_for_locale(question.ask) def normalize(value, option={}):
yes = option.get('yes', 1)
no = option.get('no', 0)
if str(value).lower() in BooleanQuestion.yes_answers:
return yes
if str(value).lower() in BooleanQuestion.no_answers:
return no
if value in [None, ""]:
return None
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices="yes, no, y, n, 1, 0",
)
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
self.yes = question.get('yes', 1)
self.no = question.get('no', 0)
if self.default is None:
self.default = False
def _format_text_for_user_input_in_cli(self):
text_for_user_input_in_cli = _value_for_locale(self.ask)
text_for_user_input_in_cli += " [yes | no]" text_for_user_input_in_cli += " [yes | no]"
if question.default is not None: if self.default is not None:
formatted_default = "yes" if question.default else "no" formatted_default = self.humanize(self.default)
text_for_user_input_in_cli += " (default: {0})".format(formatted_default) text_for_user_input_in_cli += " (default: {0})".format(formatted_default)
return text_for_user_input_in_cli return text_for_user_input_in_cli
def _post_parse_value(self, question): def get(self, key, default=None):
if isinstance(question.value, bool): try:
return 1 if question.value else 0 return getattr(self, key)
except AttributeError:
if str(question.value).lower() in ["1", "yes", "y", "true"]: return default
return 1
if str(question.value).lower() in ["0", "no", "n", "false"]:
return 0
raise YunohostValidationError(
"app_argument_choice_invalid",
name=question.name,
value=question.value,
choices="yes, no, y, n, 1, 0",
)
class DomainArgumentParser(YunoHostArgumentFormatParser): class DomainQuestion(Question):
argument_type = "domain" argument_type = "domain"
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
from yunohost.domain import domain_list, _get_maindomain from yunohost.domain import domain_list, _get_maindomain
question = super(DomainArgumentParser, self).parse_question( super().__init__(question, user_answers)
question, user_answers
)
if question.default is None: if self.default is None:
question.default = _get_maindomain() self.default = _get_maindomain()
question.choices = domain_list()["domains"] self.choices = domain_list()["domains"]
return question
def _raise_invalid_answer(self, question): def _raise_invalid_answer(self):
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", field=question.name, error=m18n.n("domain_unknown") "app_argument_invalid", field=self.name, error=m18n.n("domain_unknown")
) )
class UserArgumentParser(YunoHostArgumentFormatParser): class UserQuestion(Question):
argument_type = "user" argument_type = "user"
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
from yunohost.user import user_list, user_info from yunohost.user import user_list, user_info
from yunohost.domain import _get_maindomain from yunohost.domain import _get_maindomain
question = super(UserArgumentParser, self).parse_question( super().__init__(question, user_answers)
question, user_answers self.choices = user_list()["users"]
) if self.default is None:
question.choices = user_list()["users"]
if question.default is None:
root_mail = "root@%s" % _get_maindomain() root_mail = "root@%s" % _get_maindomain()
for user in question.choices.keys(): for user in self.choices.keys():
if root_mail in user_info(user).get("mail-aliases", []): if root_mail in user_info(user).get("mail-aliases", []):
question.default = user self.default = user
break break
return question
def _raise_invalid_answer(self, question): def _raise_invalid_answer(self):
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("user_unknown", user=question.value), error=m18n.n("user_unknown", user=self.value),
) )
class NumberArgumentParser(YunoHostArgumentFormatParser): class NumberQuestion(Question):
argument_type = "number" argument_type = "number"
default_value = "" default_value = ""
def parse_question(self, question, user_answers): @staticmethod
question_parsed = super().parse_question(question, user_answers) def humanize(value, option={}):
question_parsed.min = question.get("min", None) return str(value)
question_parsed.max = question.get("max", None)
if question_parsed.default is None:
question_parsed.default = 0
return question_parsed def __init__(self, question, user_answers):
super().__init__(question, user_answers)
self.min = question.get("min", None)
self.max = question.get("max", None)
self.step = question.get("step", None)
def _prevalidate(self, question):
super()._prevalidate(question) def _prevalidate(self):
if not isinstance(question.value, int) and not ( super()._prevalidate()
isinstance(question.value, str) and question.value.isdigit() if not isinstance(self.value, int) and not (
isinstance(self.value, str) and self.value.isdigit()
): ):
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("invalid_number"), error=m18n.n("invalid_number"),
) )
if question.min is not None and int(question.value) < question.min: if self.min is not None and int(self.value) < self.min:
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("invalid_number"), error=m18n.n("invalid_number"),
) )
if question.max is not None and int(question.value) > question.max: if self.max is not None and int(self.value) > self.max:
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("invalid_number"), error=m18n.n("invalid_number"),
) )
def _post_parse_value(self, question): def _post_parse_value(self):
if isinstance(question.value, int): if isinstance(self.value, int):
return super()._post_parse_value(question) return super()._post_parse_value()
if isinstance(question.value, str) and question.value.isdigit(): if isinstance(self.value, str) and self.value.isdigit():
return int(question.value) return int(self.value)
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", field=question.name, error=m18n.n("invalid_number") "app_argument_invalid", field=self.name, error=m18n.n("invalid_number")
) )
class DisplayTextArgumentParser(YunoHostArgumentFormatParser): class DisplayTextQuestion(Question):
argument_type = "display_text" argument_type = "display_text"
readonly = True readonly = True
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
question_parsed = super().parse_question(question, user_answers) super().__init__(question, user_answers)
question_parsed.optional = True self.optional = True
question_parsed.style = question.get("style", "info") self.style = question.get("style", "info")
return question_parsed
def _format_text_for_user_input_in_cli(self, question): def _format_text_for_user_input_in_cli(self):
text = question.ask["en"] text = self.ask["en"]
if question.style in ["success", "info", "warning", "danger"]: if self.style in ["success", "info", "warning", "danger"]:
color = { color = {
"success": "green", "success": "green",
"info": "cyan", "info": "cyan",
"warning": "yellow", "warning": "yellow",
"danger": "red", "danger": "red",
} }
return colorize(m18n.g(question.style), color[question.style]) + f" {text}" return colorize(m18n.g(self.style), color[self.style]) + f" {text}"
else: else:
return text return text
class FileArgumentParser(YunoHostArgumentFormatParser): class FileQuestion(Question):
argument_type = "file" argument_type = "file"
upload_dirs = [] upload_dirs = []
@ -705,71 +774,58 @@ class FileArgumentParser(YunoHostArgumentFormatParser):
if os.path.exists(upload_dir): if os.path.exists(upload_dir):
shutil.rmtree(upload_dir) shutil.rmtree(upload_dir)
def parse_question(self, question, user_answers): def __init__(self, question, user_answers):
question_parsed = super().parse_question(question, user_answers) super().__init__(question, user_answers)
if question.get("accept"): if question.get("accept"):
question_parsed.accept = question.get("accept").replace(" ", "").split(",") self.accept = question.get("accept").replace(" ", "").split(",")
else: else:
question_parsed.accept = [] self.accept = []
if Moulinette.interface.type == "api": if Moulinette.interface.type== "api":
if user_answers.get(f"{question_parsed.name}[name]"): if user_answers.get(f"{self.name}[name]"):
question_parsed.value = { self.value = {
"content": question_parsed.value, "content": self.value,
"filename": user_answers.get( "filename": user_answers.get(f"{self.name}[name]", self.name),
f"{question_parsed.name}[name]", question_parsed.name
),
} }
# If path file are the same # If path file are the same
if ( if self.value and str(self.value) == self.current_value:
question_parsed.value self.value = None
and str(question_parsed.value) == question_parsed.current_value
):
question_parsed.value = None
return question_parsed
def _prevalidate(self, question): def _prevalidate(self):
super()._prevalidate(question) super()._prevalidate()
if ( if isinstance(self.value, str) and self.value and not os.path.exists(self.value):
isinstance(question.value, str)
and question.value
and not os.path.exists(question.value)
):
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("file_does_not_exists"), error=m18n.n("file_does_not_exists"),
) )
if question.value in [None, ""] or not question.accept: if self.value in [None, ""] or not self.accept:
return return
filename = ( filename = self.value if isinstance(self.value, str) else self.value["filename"]
question.value if "." not in filename or "." + filename.split(".")[-1] not in self.accept:
if isinstance(question.value, str)
else question.value["filename"]
)
if "." not in filename or "." + filename.split(".")[-1] not in question.accept:
raise YunohostValidationError( raise YunohostValidationError(
"app_argument_invalid", "app_argument_invalid",
field=question.name, field=self.name,
error=m18n.n("file_extension_not_accepted"), error=m18n.n("file_extension_not_accepted"),
) )
def _post_parse_value(self, question):
def _post_parse_value(self):
from base64 import b64decode from base64 import b64decode
# Upload files from API # Upload files from API
# A file arg contains a string with "FILENAME:BASE64_CONTENT" # A file arg contains a string with "FILENAME:BASE64_CONTENT"
if not question.value: if not self.value:
return question.value return self.value
if Moulinette.interface.type == "api": if Moulinette.interface.type == "api":
upload_dir = tempfile.mkdtemp(prefix="tmp_configpanel_") upload_dir = tempfile.mkdtemp(prefix="tmp_configpanel_")
FileArgumentParser.upload_dirs += [upload_dir] FileQuestion.upload_dirs += [upload_dir]
filename = question.value["filename"] filename = self.value["filename"]
logger.debug( logger.debug(
f"Save uploaded file {question.value['filename']} from API into {upload_dir}" f"Save uploaded file {self.value['filename']} from API into {upload_dir}"
) )
# Filename is given by user of the API. For security reason, we have replaced # Filename is given by user of the API. For security reason, we have replaced
@ -782,35 +838,35 @@ class FileArgumentParser(YunoHostArgumentFormatParser):
while os.path.exists(file_path): while os.path.exists(file_path):
file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i)) file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i))
i += 1 i += 1
content = question.value["content"] content = self.value["content"]
write_to_file(file_path, b64decode(content), file_mode="wb") write_to_file(file_path, b64decode(content), file_mode="wb")
question.value = file_path self.value = file_path
return question.value return self.value
ARGUMENTS_TYPE_PARSERS = { ARGUMENTS_TYPE_PARSERS = {
"string": StringArgumentParser, "string": StringQuestion,
"text": StringArgumentParser, "text": StringQuestion,
"select": StringArgumentParser, "select": StringQuestion,
"tags": TagsArgumentParser, "tags": TagsQuestion,
"email": StringArgumentParser, "email": StringQuestion,
"url": StringArgumentParser, "url": StringQuestion,
"date": StringArgumentParser, "date": StringQuestion,
"time": StringArgumentParser, "time": StringQuestion,
"color": StringArgumentParser, "color": StringQuestion,
"password": PasswordArgumentParser, "password": PasswordQuestion,
"path": PathArgumentParser, "path": PathQuestion,
"boolean": BooleanArgumentParser, "boolean": BooleanQuestion,
"domain": DomainArgumentParser, "domain": DomainQuestion,
"user": UserArgumentParser, "user": UserQuestion,
"number": NumberArgumentParser, "number": NumberQuestion,
"range": NumberArgumentParser, "range": NumberQuestion,
"display_text": DisplayTextArgumentParser, "display_text": DisplayTextQuestion,
"alert": DisplayTextArgumentParser, "alert": DisplayTextQuestion,
"markdown": DisplayTextArgumentParser, "markdown": DisplayTextQuestion,
"file": FileArgumentParser, "file": FileQuestion,
} }
@ -828,10 +884,11 @@ def parse_args_in_yunohost_format(user_answers, argument_questions):
parsed_answers_dict = OrderedDict() parsed_answers_dict = OrderedDict()
for question in argument_questions: for question in argument_questions:
parser = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]() question_class = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]
question = question_class(question, user_answers)
answer = parser.parse(question=question, user_answers=user_answers) answer = question.ask_if_needed()
if answer is not None: if answer is not None:
parsed_answers_dict[question["name"]] = answer parsed_answers_dict[question.name] = answer
return parsed_answers_dict return parsed_answers_dict