diff --git a/data/helpers.d/utils b/data/helpers.d/utils index 1c4f73ddf..14e7ebe4a 100644 --- a/data/helpers.d/utils +++ b/data/helpers.d/utils @@ -551,22 +551,23 @@ ynh_set_var() { local crazy_value="$(grep -i -o -P '^[ \t]*\$?(\w*\[)?[ \t]*["'"']?${key}['"'"]?[ \t]*\]?[ \t]*[:=]>?[ \t]*\K.*(?=[ \t,\n;]*$)' ${file} | head -n1)" # local crazy_value="$(grep -i -o -P "^${var_part}\K.*(?=[ \t,\n;]*\$)" ${file} | head -n1)" local first_char="${crazy_value:0:1}" + delimiter=$'\001' if [[ "$first_char" == '"' ]] ; then # \ and sed is quite complex you need 2 \\ to get one in a sed # So we need \\\\ to go through 2 sed value="$(echo "$value" | sed 's/"/\\\\"/g')" - sed -ri 'sø^('"${var_part}"'")([^"]|\\")*("[ \t;,]*)$ø\1'"${value}"'\4øi' ${file} + sed -ri s$delimiter'^('"${var_part}"'")([^"]|\\")*("[ \t;,]*)$'$delimiter'\1'"${value}"'\4'$delimiter'i' ${file} elif [[ "$first_char" == "'" ]] ; then # \ and sed is quite complex you need 2 \\ to get one in a sed # However double quotes implies to double \\ to # So we need \\\\\\\\ to go through 2 sed and 1 double quotes str value="$(echo "$value" | sed "s/'/\\\\\\\\'/g")" - sed -ri "sø^(${var_part}')([^']|\\')*('"'[ \t,;]*)$ø\1'"${value}"'\4øi' ${file} + sed -ri "s$delimiter^(${var_part}')([^']|\\')*('"'[ \t,;]*)$'$delimiter'\1'"${value}"'\4'$delimiter'i' ${file} else if [[ "$value" == *"'"* ]] || [[ "$value" == *'"'* ]] ; then value='\"'"$(echo "$value" | sed 's/"/\\\\"/g')"'\"' fi - sed -ri "sø^(${var_part}).*"'$ø\1'"${value}"'øi' ${file} + sed -ri "s$delimiter^(${var_part}).*"'$'$delimiter'\1'"${value}"$delimiter'i' ${file} fi } diff --git a/src/yunohost/app.py b/src/yunohost/app.py index de6df6579..522f695e2 100644 --- a/src/yunohost/app.py +++ b/src/yunohost/app.py @@ -33,7 +33,6 @@ import re import subprocess import glob import urllib.parse -import base64 import tempfile from collections import OrderedDict @@ -54,8 +53,10 @@ from moulinette.utils.filesystem import ( mkdir, ) -from yunohost.service import service_status, _run_service_command, _get_services -from yunohost.utils import packages +from yunohost.service import service_status, _run_service_command +from yunohost.utils import packages, config +from yunohost.utils.config import ConfigPanel, parse_args_in_yunohost_format, YunoHostArgumentFormatParser +from yunohost.utils.i18n import _value_for_locale from yunohost.utils.error import YunohostError, YunohostValidationError from yunohost.utils.filesystem import free_space_in_directory from yunohost.log import is_unit_operation, OperationLogger @@ -70,7 +71,6 @@ APPS_CATALOG_CONF = "/etc/yunohost/apps_catalog.yml" APPS_CATALOG_API_VERSION = 2 APPS_CATALOG_DEFAULT_URL = "https://app.yunohost.org/default" -APPS_CONFIG_PANEL_VERSION_SUPPORTED = 1.0 re_app_instance_name = re.compile( r"^(?P[\w-]+?)(__(?P[1-9][0-9]*))?$" ) @@ -1756,51 +1756,12 @@ def app_action_run(operation_logger, app, action, args=None): return logger.success("Action successed!") -@is_unit_operation() -def app_config_get(operation_logger, app, key='', mode='classic'): +def app_config_get(app, key='', mode='classic'): """ Display an app configuration in classic, full or export mode """ - - # Check app is installed - _assert_is_installed(app) - - filter_key = key or '' - - # Read config panel toml - config_panel = _get_app_config_panel(app, filter_key=filter_key) - - if not config_panel: - raise YunohostError("app_config_no_panel") - - # Call config script in order to hydrate config panel with current values - values = _call_config_script(operation_logger, app, 'show', config_panel=config_panel) - - # Format result in full mode - if mode == 'full': - operation_logger.success() - return config_panel - - # In 'classic' mode, we display the current value if key refer to an option - if filter_key.count('.') == 2 and mode == 'classic': - option = filter_key.split('.')[-1] - operation_logger.success() - return values.get(option, None) - - # Format result in 'classic' or 'export' mode - logger.debug(f"Formating result in '{mode}' mode") - result = {} - for panel, section, option in _get_config_iterator(config_panel): - key = f"{panel['id']}.{section['id']}.{option['id']}" - if mode == 'export': - result[option['id']] = option.get('current_value') - else: - result[key] = { 'ask': _value_for_locale(option['ask']) } - if 'current_value' in option: - result[key]['value'] = option['current_value'] - - operation_logger.success() - return result + config = AppConfigPanel(app) + return config.get(key, mode) @is_unit_operation() @@ -1809,182 +1770,65 @@ def app_config_set(operation_logger, app, key=None, value=None, args=None, args_ Apply a new app configuration """ - # Check app is installed - _assert_is_installed(app) - - filter_key = key or '' - - # Read config panel toml - config_panel = _get_app_config_panel(app, filter_key=filter_key) - - if not config_panel: - raise YunohostError("app_config_no_panel") - - if (args is not None or args_file is not None) and value is not None: - raise YunohostError("app_config_args_value") - - if filter_key.count('.') != 2 and not value is None: - raise YunohostError("app_config_set_value_on_section") - - # Import and parse pre-answered options - logger.debug("Import and parse pre-answered options") - args = urllib.parse.parse_qs(args or '', keep_blank_values=True) - args = { key: ','.join(value_) for key, value_ in args.items() } - - if args_file: - # Import YAML / JSON file but keep --args values - args = { **read_yaml(args_file), **args } - - if value is not None: - args = {filter_key.split('.')[-1]: value} - - # Call config script in order to hydrate config panel with current values - _call_config_script(operation_logger, app, 'show', config_panel=config_panel) - - # Ask unanswered question and prevalidate - logger.debug("Ask unanswered question and prevalidate data") - def display_header(message): - """ CLI panel/section header display - """ - if Moulinette.interface.type == 'cli' and filter_key.count('.') < 2: - Moulinette.display(colorize(message, 'purple')) - - try: - env = {} - for panel, section, obj in _get_config_iterator(config_panel, - ['panel', 'section']): - if panel == obj: - name = _value_for_locale(panel['name']) - display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}") - continue - name = _value_for_locale(section['name']) - display_header(f"\n# {name}") - - # Check and ask unanswered questions - env.update(_parse_args_in_yunohost_format( - args, section['options'] - )) - - # Call config script in 'apply' mode - logger.info("Running config script...") - env = {key: str(value[0]) for key, value in env.items() if not value[0] is None} - - errors = _call_config_script(operation_logger, app, 'apply', env=env) - # Script got manually interrupted ... - # N.B. : KeyboardInterrupt does not inherit from Exception - except (KeyboardInterrupt, EOFError): - error = m18n.n("operation_interrupted") - logger.error(m18n.n("app_config_failed", app=app, error=error)) - failure_message_with_debug_instructions = operation_logger.error(error) - raise - # Something wrong happened in Yunohost's code (most probably hook_exec) - except Exception: - import traceback - - error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) - logger.error(m18n.n("app_config_failed", app=app, error=error)) - failure_message_with_debug_instructions = operation_logger.error(error) - raise - finally: - # Delete files uploaded from API - FileArgumentParser.clean_upload_dirs() - - if errors: - return { - "errors": errors, - } - - # Reload services - logger.info("Reloading services...") - services_to_reload = set() - for panel, section, obj in _get_config_iterator(config_panel, - ['panel', 'section', 'option']): - services_to_reload |= set(obj.get('services', [])) - - services_to_reload = list(services_to_reload) - services_to_reload.sort(key = 'nginx'.__eq__) - for service in services_to_reload: - service = service.replace('__APP__', app) - logger.debug(f"Reloading {service}") - if not _run_service_command('reload-or-restart', service): - services = _get_services() - test_conf = services[service].get('test_conf', 'true') - errors = check_output(f"{test_conf}; exit 0") if test_conf else '' - raise YunohostError( - "app_config_failed_service_reload", - service=service, errors=errors - ) - - logger.success("Config updated as expected") - return {} - - -def _get_config_iterator(config_panel, trigger=['option']): - for panel in config_panel.get("panels", []): - if 'panel' in trigger: - yield (panel, None, panel) - for section in panel.get("sections", []): - if 'section' in trigger: - yield (panel, section, section) - if 'option' in trigger: - for option in section.get("options", []): - yield (panel, section, option) - - -def _call_config_script(operation_logger, app, action, env={}, config_panel=None): - from yunohost.hook import hook_exec + config = AppConfigPanel(app) YunoHostArgumentFormatParser.operation_logger = operation_logger operation_logger.start() - # Add default config script if needed - config_script = os.path.join(APPS_SETTING_PATH, app, "scripts", "config") - if not os.path.exists(config_script): - logger.debug("Adding a default config script") - default_script = """#!/bin/bash + result = config.set(key, value, args, args_file) + if "errors" not in result: + operation_logger.success() + return result + +class AppConfigPanel(ConfigPanel): + def __init__(self, app): + + # Check app is installed + _assert_is_installed(app) + + self.app = app + config_path = os.path.join(APPS_SETTING_PATH, app, "config_panel.toml") + super().__init__(config_path=config_path) + + def _load_current_values(self): + self.values = self._call_config_script('show') + + def _apply(self): + self.errors = self._call_config_script('apply', self.new_values) + + def _call_config_script(self, action, env={}): + from yunohost.hook import hook_exec + + # Add default config script if needed + config_script = os.path.join(APPS_SETTING_PATH, self.app, "scripts", "config") + if not os.path.exists(config_script): + logger.debug("Adding a default config script") + default_script = """#!/bin/bash source /usr/share/yunohost/helpers ynh_abort_if_errors final_path=$(ynh_app_setting_get $app final_path) ynh_app_config_run $1 """ - write_to_file(config_script, default_script) + write_to_file(config_script, default_script) - # Call config script to extract current values - logger.debug(f"Calling '{action}' action from config script") - app_id, app_instance_nb = _parse_app_instance_name(app) - env.update({ - "app_id": app_id, - "app": app, - "app_instance_nb": str(app_instance_nb), - }) - - ret, parsed_values = hook_exec( - config_script, args=[action], env=env - ) - if ret != 0: - if action == 'show': - raise YunohostError("app_config_unable_to_read_values") - else: - raise YunohostError("app_config_unable_to_apply_values_correctly") - - return parsed_values - - if not config_panel: - return parsed_values - - # Hydrating config panel with current value - logger.debug("Hydrating config with current values") - for _, _, option in _get_config_iterator(config_panel): - if option['name'] not in parsed_values: - continue - value = parsed_values[option['name']] - # In general, the value is just a simple value. - # Sometimes it could be a dict used to overwrite the option itself - value = value if isinstance(value, dict) else {'current_value': value } - option.update(value) - - return parsed_values + # Call config script to extract current values + logger.debug(f"Calling '{action}' action from config script") + app_id, app_instance_nb = _parse_app_instance_name(self.app) + env.update({ + "app_id": app_id, + "app": self.app, + "app_instance_nb": str(app_instance_nb), + }) + ret, values = hook_exec( + config_script, args=[action], env=env + ) + if ret != 0: + if action == 'show': + raise YunohostError("app_config_unable_to_read_values") + else: + raise YunohostError("app_config_unable_to_apply_values_correctly") + return values def _get_all_installed_apps_id(): """ @@ -2087,163 +1931,6 @@ def _get_app_actions(app_id): return None -def _get_app_config_panel(app_id, filter_key=''): - "Get app config panel stored in json or in toml" - - # Split filter_key - filter_key = dict(enumerate(filter_key.split('.'))) - if len(filter_key) > 3: - raise YunohostError("app_config_too_much_sub_keys") - - # Open TOML - config_panel_toml_path = os.path.join( - APPS_SETTING_PATH, app_id, "config_panel.toml" - ) - - # sample data to get an idea of what is going on - # this toml extract: - # - # version = "0.1" - # name = "Unattended-upgrades configuration panel" - # - # [main] - # name = "Unattended-upgrades configuration" - # - # [main.unattended_configuration] - # name = "50unattended-upgrades configuration file" - # - # [main.unattended_configuration.upgrade_level] - # name = "Choose the sources of packages to automatically upgrade." - # default = "Security only" - # type = "text" - # help = "We can't use a choices field for now. In the meantime[...]" - # # choices = ["Security only", "Security and updates"] - - # [main.unattended_configuration.ynh_update] - # name = "Would you like to update YunoHost packages automatically ?" - # type = "bool" - # default = true - # - # will be parsed into this: - # - # OrderedDict([(u'version', u'0.1'), - # (u'name', u'Unattended-upgrades configuration panel'), - # (u'main', - # OrderedDict([(u'name', u'Unattended-upgrades configuration'), - # (u'unattended_configuration', - # OrderedDict([(u'name', - # u'50unattended-upgrades configuration file'), - # (u'upgrade_level', - # OrderedDict([(u'name', - # u'Choose the sources of packages to automatically upgrade.'), - # (u'default', - # u'Security only'), - # (u'type', u'text'), - # (u'help', - # u"We can't use a choices field for now. In the meantime please choose between one of this values:
Security only, Security and updates.")])), - # (u'ynh_update', - # OrderedDict([(u'name', - # u'Would you like to update YunoHost packages automatically ?'), - # (u'type', u'bool'), - # (u'default', True)])), - # - # and needs to be converted into this: - # - # {u'name': u'Unattended-upgrades configuration panel', - # u'panel': [{u'id': u'main', - # u'name': u'Unattended-upgrades configuration', - # u'sections': [{u'id': u'unattended_configuration', - # u'name': u'50unattended-upgrades configuration file', - # u'options': [{u'//': u'"choices" : ["Security only", "Security and updates"]', - # u'default': u'Security only', - # u'help': u"We can't use a choices field for now. In the meantime[...]", - # u'id': u'upgrade_level', - # u'name': u'Choose the sources of packages to automatically upgrade.', - # u'type': u'text'}, - # {u'default': True, - # u'id': u'ynh_update', - # u'name': u'Would you like to update YunoHost packages automatically ?', - # u'type': u'bool'}, - - if not os.path.exists(config_panel_toml_path): - return None - toml_config_panel = read_toml(config_panel_toml_path) - - # Check TOML config panel is in a supported version - if float(toml_config_panel["version"]) < APPS_CONFIG_PANEL_VERSION_SUPPORTED: - raise YunohostError( - "app_config_too_old_version", app=app_id, - version=toml_config_panel["version"] - ) - - # Transform toml format into internal format - defaults = { - 'toml': { - 'version': 1.0 - }, - 'panels': { - 'name': '', - 'services': [], - 'actions': {'apply': {'en': 'Apply'}} - }, # help - 'sections': { - 'name': '', - 'services': [], - 'optional': True - }, # visibleIf help - 'options': {} - # ask type source help helpLink example style icon placeholder visibleIf - # optional choices pattern limit min max step accept redact - } - - def convert(toml_node, node_type): - """Convert TOML in internal format ('full' mode used by webadmin) - - Here are some properties of 1.0 config panel in toml: - - node properties and node children are mixed, - - text are in english only - - some properties have default values - This function detects all children nodes and put them in a list - """ - # Prefill the node default keys if needed - default = defaults[node_type] - node = {key: toml_node.get(key, value) for key, value in default.items()} - - # Define the filter_key part to use and the children type - i = list(defaults).index(node_type) - search_key = filter_key.get(i) - subnode_type = list(defaults)[i+1] if node_type != 'options' else None - - for key, value in toml_node.items(): - # Key/value are a child node - if isinstance(value, OrderedDict) and key not in default and subnode_type: - # We exclude all nodes not referenced by the filter_key - if search_key and key != search_key: - continue - subnode = convert(value, subnode_type) - subnode['id'] = key - if node_type == 'sections': - subnode['name'] = key # legacy - subnode.setdefault('optional', toml_node.get('optional', True)) - node.setdefault(subnode_type, []).append(subnode) - # Key/value are a property - else: - # Todo search all i18n keys - node[key] = value if key not in ['ask', 'help', 'name'] else { 'en': value } - return node - - config_panel = convert(toml_config_panel, 'toml') - - try: - config_panel['panels'][0]['sections'][0]['options'][0] - except (KeyError, IndexError): - raise YunohostError( - "app_config_empty_or_bad_filter_key", app=app_id, filter_key=filter_key - ) - - return config_panel - - def _get_app_settings(app_id): """ Get settings of an installed app @@ -2695,30 +2382,6 @@ def _installed_apps(): return os.listdir(APPS_SETTING_PATH) -def _value_for_locale(values): - """ - Return proper value for current locale - - Keyword arguments: - values -- A dict of values associated to their locale - - Returns: - An utf-8 encoded string - - """ - if not isinstance(values, dict): - return values - - for lang in [m18n.locale, m18n.default_locale]: - try: - return values[lang] - except KeyError: - continue - - # Fallback to first value - return list(values.values())[0] - - def _check_manifest_requirements(manifest, app_instance_name): """Check if required packages are met from the manifest""" @@ -2765,7 +2428,7 @@ def _parse_args_from_manifest(manifest, action, args={}): return OrderedDict() action_args = manifest["arguments"][action] - return _parse_args_in_yunohost_format(args, action_args) + return parse_args_in_yunohost_format(args, action_args) def _parse_args_for_action(action, args={}): @@ -2789,507 +2452,11 @@ def _parse_args_for_action(action, args={}): action_args = action["arguments"] - return _parse_args_in_yunohost_format(args, action_args) + return parse_args_in_yunohost_format(args, action_args) -class Question: - "empty class to store questions information" -class YunoHostArgumentFormatParser(object): - hide_user_input_in_prompt = False - operation_logger = None - - def parse_question(self, question, user_answers): - parsed_question = Question() - - parsed_question.name = question["name"] - parsed_question.type = question.get("type", 'string') - parsed_question.default = question.get("default", None) - parsed_question.current_value = question.get("current_value") - parsed_question.optional = question.get("optional", False) - parsed_question.choices = question.get("choices", []) - parsed_question.pattern = question.get("pattern") - parsed_question.ask = question.get("ask", {'en': f"{parsed_question.name}"}) - parsed_question.help = question.get("help") - parsed_question.helpLink = question.get("helpLink") - parsed_question.value = user_answers.get(parsed_question.name) - parsed_question.redact = question.get('redact', False) - - # Empty value is parsed as empty string - if parsed_question.default == "": - parsed_question.default = None - - return parsed_question - - def parse(self, question, user_answers): - question = self.parse_question(question, user_answers) - - while True: - # Display question if no value filled or if it's a readonly message - if Moulinette.interface.type== 'cli': - text_for_user_input_in_cli = self._format_text_for_user_input_in_cli( - question - ) - if getattr(self, "readonly", False): - Moulinette.display(text_for_user_input_in_cli) - - elif question.value is None: - prefill = "" - if question.current_value is not None: - prefill = question.current_value - elif question.default is not None: - prefill = question.default - question.value = Moulinette.prompt( - message=text_for_user_input_in_cli, - is_password=self.hide_user_input_in_prompt, - confirm=self.hide_user_input_in_prompt, - prefill=prefill, - is_multiline=(question.type == "text") - ) - - - # Apply default value - if question.value in [None, ""] and question.default is not None: - question.value = ( - getattr(self, "default_value", None) - if question.default is None - else question.default - ) - - # Prevalidation - try: - self._prevalidate(question) - except YunohostValidationError as e: - if Moulinette.interface.type== 'api': - raise - Moulinette.display(str(e), 'error') - question.value = None - continue - break - # this is done to enforce a certain formating like for boolean - # by default it doesn't do anything - question.value = self._post_parse_value(question) - - return (question.value, self.argument_type) - - def _prevalidate(self, question): - if question.value in [None, ""] and not question.optional: - raise YunohostValidationError( - "app_argument_required", name=question.name - ) - - # we have an answer, do some post checks - if question.value is not None: - if question.choices and question.value not in question.choices: - self._raise_invalid_answer(question) - if question.pattern and not re.match(question.pattern['regexp'], str(question.value)): - raise YunohostValidationError( - question.pattern['error'], - name=question.name, - value=question.value, - ) - - def _raise_invalid_answer(self, question): - raise YunohostValidationError( - "app_argument_choice_invalid", - name=question.name, - value=question.value, - choices=", ".join(question.choices), - ) - - def _format_text_for_user_input_in_cli(self, question): - text_for_user_input_in_cli = _value_for_locale(question.ask) - - if question.choices: - text_for_user_input_in_cli += " [{0}]".format(" | ".join(question.choices)) - - if question.help or question.helpLink: - text_for_user_input_in_cli += ":\033[m" - if question.help: - text_for_user_input_in_cli += "\n - " - text_for_user_input_in_cli += _value_for_locale(question.help) - if question.helpLink: - if not isinstance(question.helpLink, dict): - question.helpLink = {'href': question.helpLink} - text_for_user_input_in_cli += f"\n - See {question.helpLink['href']}" - return text_for_user_input_in_cli - - def _post_parse_value(self, question): - if not question.redact: - return question.value - - # Tell the operation_logger to redact all password-type / secret args - # Also redact the % escaped version of the password that might appear in - # the 'args' section of metadata (relevant for password with non-alphanumeric char) - data_to_redact = [] - if question.value and isinstance(question.value, str): - data_to_redact.append(question.value) - if question.current_value and isinstance(question.current_value, str): - data_to_redact.append(question.current_value) - data_to_redact += [ - urllib.parse.quote(data) - for data in data_to_redact - if urllib.parse.quote(data) != data - ] - if self.operation_logger: - self.operation_logger.data_to_redact.extend(data_to_redact) - elif data_to_redact: - raise YunohostError("app_argument_cant_redact", arg=question.name) - - return question.value - - -class StringArgumentParser(YunoHostArgumentFormatParser): - argument_type = "string" - default_value = "" - -class TagsArgumentParser(YunoHostArgumentFormatParser): - argument_type = "tags" - - def _prevalidate(self, question): - values = question.value - for value in values.split(','): - question.value = value - super()._prevalidate(question) - question.value = values - - - -class PasswordArgumentParser(YunoHostArgumentFormatParser): - hide_user_input_in_prompt = True - argument_type = "password" - default_value = "" - forbidden_chars = "{}" - - def parse_question(self, question, user_answers): - question = super(PasswordArgumentParser, self).parse_question( - question, user_answers - ) - question.redact = True - if question.default is not None: - raise YunohostValidationError( - "app_argument_password_no_default", name=question.name - ) - - return question - - def _prevalidate(self, question): - super()._prevalidate(question) - - if question.value is not None: - if any(char in question.value for char in self.forbidden_chars): - raise YunohostValidationError( - "pattern_password_app", forbidden_chars=self.forbidden_chars - ) - - # If it's an optional argument the value should be empty or strong enough - from yunohost.utils.password import assert_password_is_strong_enough - - assert_password_is_strong_enough("user", question.value) - - -class PathArgumentParser(YunoHostArgumentFormatParser): - argument_type = "path" - default_value = "" - - -class BooleanArgumentParser(YunoHostArgumentFormatParser): - argument_type = "boolean" - default_value = False - - def parse_question(self, question, user_answers): - question = super().parse_question( - question, user_answers - ) - - if question.default is None: - question.default = False - - return question - - def _format_text_for_user_input_in_cli(self, question): - text_for_user_input_in_cli = _value_for_locale(question.ask) - - text_for_user_input_in_cli += " [yes | no]" - - if question.default is not None: - formatted_default = "yes" if question.default else "no" - text_for_user_input_in_cli += " (default: {0})".format(formatted_default) - - return text_for_user_input_in_cli - - def _post_parse_value(self, question): - if isinstance(question.value, bool): - return 1 if question.value else 0 - - if str(question.value).lower() in ["1", "yes", "y", "true"]: - return 1 - - if str(question.value).lower() in ["0", "no", "n", "false"]: - return 0 - - raise YunohostValidationError( - "app_argument_choice_invalid", - name=question.name, - value=question.value, - choices="yes, no, y, n, 1, 0", - ) - - -class DomainArgumentParser(YunoHostArgumentFormatParser): - argument_type = "domain" - - def parse_question(self, question, user_answers): - from yunohost.domain import domain_list, _get_maindomain - - question = super(DomainArgumentParser, self).parse_question( - question, user_answers - ) - - if question.default is None: - question.default = _get_maindomain() - - question.choices = domain_list()["domains"] - - return question - - def _raise_invalid_answer(self, question): - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("domain_unknown") - ) - - -class UserArgumentParser(YunoHostArgumentFormatParser): - argument_type = "user" - - def parse_question(self, question, user_answers): - from yunohost.user import user_list, user_info - from yunohost.domain import _get_maindomain - - question = super(UserArgumentParser, self).parse_question( - question, user_answers - ) - question.choices = user_list()["users"] - if question.default is None: - root_mail = "root@%s" % _get_maindomain() - for user in question.choices.keys(): - if root_mail in user_info(user).get("mail-aliases", []): - question.default = user - break - - return question - - def _raise_invalid_answer(self, question): - raise YunohostValidationError( - "app_argument_invalid", - field=question.name, - error=m18n.n("user_unknown", user=question.value), - ) - - -class NumberArgumentParser(YunoHostArgumentFormatParser): - argument_type = "number" - default_value = "" - - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) - question_parsed.min = question.get('min', None) - question_parsed.max = question.get('max', None) - if question_parsed.default is None: - question_parsed.default = 0 - - return question_parsed - - def _prevalidate(self, question): - super()._prevalidate(question) - if not isinstance(question.value, int) and not (isinstance(question.value, str) and question.value.isdigit()): - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") - ) - - if question.min is not None and int(question.value) < question.min: - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") - ) - - if question.max is not None and int(question.value) > question.max: - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") - ) - - def _post_parse_value(self, question): - if isinstance(question.value, int): - return super()._post_parse_value(question) - - if isinstance(question.value, str) and question.value.isdigit(): - return int(question.value) - - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") - ) - - -class DisplayTextArgumentParser(YunoHostArgumentFormatParser): - argument_type = "display_text" - readonly = True - - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) - - question_parsed.optional = True - question_parsed.style = question.get('style', 'info') - - return question_parsed - - def _format_text_for_user_input_in_cli(self, question): - text = question.ask['en'] - - if question.style in ['success', 'info', 'warning', 'danger']: - color = { - 'success': 'green', - 'info': 'cyan', - 'warning': 'yellow', - 'danger': 'red' - } - return colorize(m18n.g(question.style), color[question.style]) + f" {text}" - else: - return text - -class FileArgumentParser(YunoHostArgumentFormatParser): - argument_type = "file" - upload_dirs = [] - - @classmethod - def clean_upload_dirs(cls): - # Delete files uploaded from API - if Moulinette.interface.type== 'api': - for upload_dir in cls.upload_dirs: - if os.path.exists(upload_dir): - shutil.rmtree(upload_dir) - - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) - if question.get('accept'): - question_parsed.accept = question.get('accept').replace(' ', '').split(',') - else: - question_parsed.accept = [] - if Moulinette.interface.type== 'api': - if user_answers.get(f"{question_parsed.name}[name]"): - question_parsed.value = { - 'content': question_parsed.value, - 'filename': user_answers.get(f"{question_parsed.name}[name]", question_parsed.name), - } - # If path file are the same - if question_parsed.value and str(question_parsed.value) == question_parsed.current_value: - question_parsed.value = None - - return question_parsed - - def _prevalidate(self, question): - super()._prevalidate(question) - if isinstance(question.value, str) and question.value and not os.path.exists(question.value): - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number1") - ) - if question.value in [None, ''] or not question.accept: - return - - filename = question.value if isinstance(question.value, str) else question.value['filename'] - if '.' not in filename or '.' + filename.split('.')[-1] not in question.accept: - raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number2") - ) - - - def _post_parse_value(self, question): - from base64 import b64decode - # Upload files from API - # A file arg contains a string with "FILENAME:BASE64_CONTENT" - if not question.value: - return question.value - - if Moulinette.interface.type== 'api': - - upload_dir = tempfile.mkdtemp(prefix='tmp_configpanel_') - FileArgumentParser.upload_dirs += [upload_dir] - filename = question.value['filename'] - logger.debug(f"Save uploaded file {question.value['filename']} from API into {upload_dir}") - - # Filename is given by user of the API. For security reason, we have replaced - # os.path.join to avoid the user to be able to rewrite a file in filesystem - # i.e. os.path.join("/foo", "/etc/passwd") == "/etc/passwd" - file_path = os.path.normpath(upload_dir + "/" + filename) - if not file_path.startswith(upload_dir + "/"): - raise YunohostError("relative_parent_path_in_filename_forbidden") - i = 2 - while os.path.exists(file_path): - file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i)) - i += 1 - content = question.value['content'] - try: - with open(file_path, 'wb') as f: - f.write(b64decode(content)) - except IOError as e: - raise YunohostError("cannot_write_file", file=file_path, error=str(e)) - except Exception as e: - raise YunohostError("error_writing_file", file=file_path, error=str(e)) - question.value = file_path - return question.value - - -ARGUMENTS_TYPE_PARSERS = { - "string": StringArgumentParser, - "text": StringArgumentParser, - "select": StringArgumentParser, - "tags": TagsArgumentParser, - "email": StringArgumentParser, - "url": StringArgumentParser, - "date": StringArgumentParser, - "time": StringArgumentParser, - "color": StringArgumentParser, - "password": PasswordArgumentParser, - "path": PathArgumentParser, - "boolean": BooleanArgumentParser, - "domain": DomainArgumentParser, - "user": UserArgumentParser, - "number": NumberArgumentParser, - "range": NumberArgumentParser, - "display_text": DisplayTextArgumentParser, - "alert": DisplayTextArgumentParser, - "markdown": DisplayTextArgumentParser, - "file": FileArgumentParser, -} - - -def _parse_args_in_yunohost_format(user_answers, argument_questions): - """Parse arguments store in either manifest.json or actions.json or from a - config panel against the user answers when they are present. - - Keyword arguments: - user_answers -- a dictionnary of arguments from the user (generally - empty in CLI, filed from the admin interface) - argument_questions -- the arguments description store in yunohost - format from actions.json/toml, manifest.json/toml - or config_panel.json/toml - """ - parsed_answers_dict = OrderedDict() - - for question in argument_questions: - parser = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]() - - answer = parser.parse(question=question, user_answers=user_answers) - if answer is not None: - parsed_answers_dict[question["name"]] = answer - - return parsed_answers_dict - def _validate_and_normalize_webpath(args_dict, app_folder): diff --git a/src/yunohost/utils/config.py b/src/yunohost/utils/config.py new file mode 100644 index 000000000..34883dcf7 --- /dev/null +++ b/src/yunohost/utils/config.py @@ -0,0 +1,814 @@ +# -*- coding: utf-8 -*- + +""" License + + Copyright (C) 2018 YUNOHOST.ORG + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program; if not, see http://www.gnu.org/licenses + +""" + +import os +import re +import toml +import urllib.parse +import tempfile +from collections import OrderedDict + +from moulinette.interfaces.cli import colorize +from moulinette import Moulinette, m18n +from moulinette.utils.log import getActionLogger +from moulinette.utils.process import check_output +from moulinette.utils.filesystem import ( + read_toml, + read_yaml, + write_to_yaml, + mkdir, +) + +from yunohost.service import _get_services +from yunohost.service import _run_service_command, _get_services +from yunohost.utils.i18n import _value_for_locale +from yunohost.utils.error import YunohostError, YunohostValidationError + +logger = getActionLogger("yunohost.config") +CONFIG_PANEL_VERSION_SUPPORTED = 1.0 + +class ConfigPanel: + + def __init__(self, config_path, save_path=None): + self.config_path = config_path + self.save_path = save_path + self.config = {} + self.values = {} + self.new_values = {} + + def get(self, key='', mode='classic'): + self.filter_key = key or '' + + # Read config panel toml + self._get_config_panel() + + if not self.config: + raise YunohostError("config_no_panel") + + # Read or get values and hydrate the config + self._load_current_values() + self._hydrate() + + # Format result in full mode + if mode == 'full': + return self.config + + # In 'classic' mode, we display the current value if key refer to an option + if self.filter_key.count('.') == 2 and mode == 'classic': + option = self.filter_key.split('.')[-1] + return self.values.get(option, None) + + # Format result in 'classic' or 'export' mode + logger.debug(f"Formating result in '{mode}' mode") + result = {} + for panel, section, option in self._iterate(): + key = f"{panel['id']}.{section['id']}.{option['id']}" + if mode == 'export': + result[option['id']] = option.get('current_value') + else: + result[key] = { 'ask': _value_for_locale(option['ask']) } + if 'current_value' in option: + result[key]['value'] = option['current_value'] + + return result + + def set(self, key=None, value=None, args=None, args_file=None): + self.filter_key = key or '' + + # Read config panel toml + self._get_config_panel() + + if not self.config: + raise YunohostError("config_no_panel") + + if (args is not None or args_file is not None) and value is not None: + raise YunohostError("config_args_value") + + if self.filter_key.count('.') != 2 and not value is None: + raise YunohostError("config_set_value_on_section") + + # Import and parse pre-answered options + logger.debug("Import and parse pre-answered options") + args = urllib.parse.parse_qs(args or '', keep_blank_values=True) + self.args = { key: ','.join(value_) for key, value_ in args.items() } + + if args_file: + # Import YAML / JSON file but keep --args values + self.args = { **read_yaml(args_file), **self.args } + + if value is not None: + self.args = {self.filter_key.split('.')[-1]: value} + + # Read or get values and hydrate the config + self._load_current_values() + self._hydrate() + + try: + self._ask() + self._apply() + + # Script got manually interrupted ... + # N.B. : KeyboardInterrupt does not inherit from Exception + except (KeyboardInterrupt, EOFError): + error = m18n.n("operation_interrupted") + logger.error(m18n.n("config_failed", error=error)) + raise + # Something wrong happened in Yunohost's code (most probably hook_exec) + except Exception: + import traceback + + error = m18n.n("unexpected_error", error="\n" + traceback.format_exc()) + logger.error(m18n.n("config_failed", error=error)) + raise + finally: + # Delete files uploaded from API + FileArgumentParser.clean_upload_dirs() + + if self.errors: + return { + "errors": errors, + } + + self._reload_services() + + logger.success("Config updated as expected") + return {} + + def _get_toml(self): + return read_toml(self.config_path) + + + def _get_config_panel(self): + # Split filter_key + filter_key = dict(enumerate(self.filter_key.split('.'))) + if len(filter_key) > 3: + raise YunohostError("config_too_much_sub_keys") + + if not os.path.exists(self.config_path): + return None + toml_config_panel = self._get_toml() + + # Check TOML config panel is in a supported version + if float(toml_config_panel["version"]) < CONFIG_PANEL_VERSION_SUPPORTED: + raise YunohostError( + "config_too_old_version", version=toml_config_panel["version"] + ) + + # Transform toml format into internal format + defaults = { + 'toml': { + 'version': 1.0 + }, + 'panels': { + 'name': '', + 'services': [], + 'actions': {'apply': {'en': 'Apply'}} + }, # help + 'sections': { + 'name': '', + 'services': [], + 'optional': True + }, # visibleIf help + 'options': {} + # ask type source help helpLink example style icon placeholder visibleIf + # optional choices pattern limit min max step accept redact + } + + def convert(toml_node, node_type): + """Convert TOML in internal format ('full' mode used by webadmin) + Here are some properties of 1.0 config panel in toml: + - node properties and node children are mixed, + - text are in english only + - some properties have default values + This function detects all children nodes and put them in a list + """ + # Prefill the node default keys if needed + default = defaults[node_type] + node = {key: toml_node.get(key, value) for key, value in default.items()} + + # Define the filter_key part to use and the children type + i = list(defaults).index(node_type) + search_key = filter_key.get(i) + subnode_type = list(defaults)[i+1] if node_type != 'options' else None + + for key, value in toml_node.items(): + # Key/value are a child node + if isinstance(value, OrderedDict) and key not in default and subnode_type: + # We exclude all nodes not referenced by the filter_key + if search_key and key != search_key: + continue + subnode = convert(value, subnode_type) + subnode['id'] = key + if node_type == 'sections': + subnode['name'] = key # legacy + subnode.setdefault('optional', toml_node.get('optional', True)) + node.setdefault(subnode_type, []).append(subnode) + # Key/value are a property + else: + # Todo search all i18n keys + node[key] = value if key not in ['ask', 'help', 'name'] else { 'en': value } + return node + + self.config = convert(toml_config_panel, 'toml') + + try: + self.config['panels'][0]['sections'][0]['options'][0] + except (KeyError, IndexError): + raise YunohostError( + "config_empty_or_bad_filter_key", filter_key=self.filter_key + ) + + return self.config + + def _hydrate(self): + # Hydrating config panel with current value + logger.debug("Hydrating config with current values") + for _, _, option in self._iterate(): + if option['name'] not in self.values: + continue + value = self.values[option['name']] + # In general, the value is just a simple value. + # Sometimes it could be a dict used to overwrite the option itself + value = value if isinstance(value, dict) else {'current_value': value } + option.update(value) + + return self.values + + def _ask(self): + logger.debug("Ask unanswered question and prevalidate data") + def display_header(message): + """ CLI panel/section header display + """ + if Moulinette.interface.type == 'cli' and self.filter_key.count('.') < 2: + Moulinette.display(colorize(message, 'purple')) + for panel, section, obj in self._iterate(['panel', 'section']): + if panel == obj: + name = _value_for_locale(panel['name']) + display_header(f"\n{'='*40}\n>>>> {name}\n{'='*40}") + continue + name = _value_for_locale(section['name']) + display_header(f"\n# {name}") + + # Check and ask unanswered questions + self.new_values.update(parse_args_in_yunohost_format( + self.args, section['options'] + )) + self.new_values = {key: str(value[0]) for key, value in self.new_values.items() if not value[0] is None} + + def _apply(self): + logger.info("Running config script...") + dir_path = os.path.dirname(os.path.realpath(self.save_path)) + if not os.path.exists(dir_path): + mkdir(dir_path, mode=0o700) + # Save the settings to the .yaml file + write_to_yaml(self.save_path, self.new_values) + + + def _reload_services(self): + logger.info("Reloading services...") + services_to_reload = set() + for panel, section, obj in self._iterate(['panel', 'section', 'option']): + services_to_reload |= set(obj.get('services', [])) + + services_to_reload = list(services_to_reload) + services_to_reload.sort(key = 'nginx'.__eq__) + for service in services_to_reload: + if '__APP__': + service = service.replace('__APP__', self.app) + logger.debug(f"Reloading {service}") + if not _run_service_command('reload-or-restart', service): + services = _get_services() + test_conf = services[service].get('test_conf', 'true') + errors = check_output(f"{test_conf}; exit 0") if test_conf else '' + raise YunohostError( + "config_failed_service_reload", + service=service, errors=errors + ) + + def _iterate(self, trigger=['option']): + for panel in self.config.get("panels", []): + if 'panel' in trigger: + yield (panel, None, panel) + for section in panel.get("sections", []): + if 'section' in trigger: + yield (panel, section, section) + if 'option' in trigger: + for option in section.get("options", []): + yield (panel, section, option) + + +class Question: + "empty class to store questions information" + + +class YunoHostArgumentFormatParser(object): + hide_user_input_in_prompt = False + operation_logger = None + + def parse_question(self, question, user_answers): + parsed_question = Question() + + parsed_question.name = question["name"] + parsed_question.type = question.get("type", 'string') + parsed_question.default = question.get("default", None) + parsed_question.current_value = question.get("current_value") + parsed_question.optional = question.get("optional", False) + parsed_question.choices = question.get("choices", []) + parsed_question.pattern = question.get("pattern") + parsed_question.ask = question.get("ask", {'en': f"{parsed_question.name}"}) + parsed_question.help = question.get("help") + parsed_question.helpLink = question.get("helpLink") + parsed_question.value = user_answers.get(parsed_question.name) + parsed_question.redact = question.get('redact', False) + + # Empty value is parsed as empty string + if parsed_question.default == "": + parsed_question.default = None + + return parsed_question + + def parse(self, question, user_answers): + question = self.parse_question(question, user_answers) + + while True: + # Display question if no value filled or if it's a readonly message + if Moulinette.interface.type== 'cli': + text_for_user_input_in_cli = self._format_text_for_user_input_in_cli( + question + ) + if getattr(self, "readonly", False): + Moulinette.display(text_for_user_input_in_cli) + + elif question.value is None: + prefill = "" + if question.current_value is not None: + prefill = question.current_value + elif question.default is not None: + prefill = question.default + question.value = Moulinette.prompt( + message=text_for_user_input_in_cli, + is_password=self.hide_user_input_in_prompt, + confirm=self.hide_user_input_in_prompt, + prefill=prefill, + is_multiline=(question.type == "text") + ) + + + # Apply default value + if question.value in [None, ""] and question.default is not None: + question.value = ( + getattr(self, "default_value", None) + if question.default is None + else question.default + ) + + # Prevalidation + try: + self._prevalidate(question) + except YunohostValidationError as e: + if Moulinette.interface.type== 'api': + raise + Moulinette.display(str(e), 'error') + question.value = None + continue + break + # this is done to enforce a certain formating like for boolean + # by default it doesn't do anything + question.value = self._post_parse_value(question) + + return (question.value, self.argument_type) + + def _prevalidate(self, question): + if question.value in [None, ""] and not question.optional: + raise YunohostValidationError( + "app_argument_required", name=question.name + ) + + # we have an answer, do some post checks + if question.value is not None: + if question.choices and question.value not in question.choices: + self._raise_invalid_answer(question) + if question.pattern and not re.match(question.pattern['regexp'], str(question.value)): + raise YunohostValidationError( + question.pattern['error'], + name=question.name, + value=question.value, + ) + + def _raise_invalid_answer(self, question): + raise YunohostValidationError( + "app_argument_choice_invalid", + name=question.name, + value=question.value, + choices=", ".join(question.choices), + ) + + def _format_text_for_user_input_in_cli(self, question): + text_for_user_input_in_cli = _value_for_locale(question.ask) + + if question.choices: + text_for_user_input_in_cli += " [{0}]".format(" | ".join(question.choices)) + + if question.help or question.helpLink: + text_for_user_input_in_cli += ":\033[m" + if question.help: + text_for_user_input_in_cli += "\n - " + text_for_user_input_in_cli += _value_for_locale(question.help) + if question.helpLink: + if not isinstance(question.helpLink, dict): + question.helpLink = {'href': question.helpLink} + text_for_user_input_in_cli += f"\n - See {question.helpLink['href']}" + return text_for_user_input_in_cli + + def _post_parse_value(self, question): + if not question.redact: + return question.value + + # Tell the operation_logger to redact all password-type / secret args + # Also redact the % escaped version of the password that might appear in + # the 'args' section of metadata (relevant for password with non-alphanumeric char) + data_to_redact = [] + if question.value and isinstance(question.value, str): + data_to_redact.append(question.value) + if question.current_value and isinstance(question.current_value, str): + data_to_redact.append(question.current_value) + data_to_redact += [ + urllib.parse.quote(data) + for data in data_to_redact + if urllib.parse.quote(data) != data + ] + if self.operation_logger: + self.operation_logger.data_to_redact.extend(data_to_redact) + elif data_to_redact: + raise YunohostError("app_argument_cant_redact", arg=question.name) + + return question.value + + +class StringArgumentParser(YunoHostArgumentFormatParser): + argument_type = "string" + default_value = "" + +class TagsArgumentParser(YunoHostArgumentFormatParser): + argument_type = "tags" + + def _prevalidate(self, question): + values = question.value + for value in values.split(','): + question.value = value + super()._prevalidate(question) + question.value = values + + + +class PasswordArgumentParser(YunoHostArgumentFormatParser): + hide_user_input_in_prompt = True + argument_type = "password" + default_value = "" + forbidden_chars = "{}" + + def parse_question(self, question, user_answers): + question = super(PasswordArgumentParser, self).parse_question( + question, user_answers + ) + question.redact = True + if question.default is not None: + raise YunohostValidationError( + "app_argument_password_no_default", name=question.name + ) + + return question + + def _prevalidate(self, question): + super()._prevalidate(question) + + if question.value is not None: + if any(char in question.value for char in self.forbidden_chars): + raise YunohostValidationError( + "pattern_password_app", forbidden_chars=self.forbidden_chars + ) + + # If it's an optional argument the value should be empty or strong enough + from yunohost.utils.password import assert_password_is_strong_enough + + assert_password_is_strong_enough("user", question.value) + + +class PathArgumentParser(YunoHostArgumentFormatParser): + argument_type = "path" + default_value = "" + + +class BooleanArgumentParser(YunoHostArgumentFormatParser): + argument_type = "boolean" + default_value = False + + def parse_question(self, question, user_answers): + question = super().parse_question( + question, user_answers + ) + + if question.default is None: + question.default = False + + return question + + def _format_text_for_user_input_in_cli(self, question): + text_for_user_input_in_cli = _value_for_locale(question.ask) + + text_for_user_input_in_cli += " [yes | no]" + + if question.default is not None: + formatted_default = "yes" if question.default else "no" + text_for_user_input_in_cli += " (default: {0})".format(formatted_default) + + return text_for_user_input_in_cli + + def _post_parse_value(self, question): + if isinstance(question.value, bool): + return 1 if question.value else 0 + + if str(question.value).lower() in ["1", "yes", "y", "true"]: + return 1 + + if str(question.value).lower() in ["0", "no", "n", "false"]: + return 0 + + raise YunohostValidationError( + "app_argument_choice_invalid", + name=question.name, + value=question.value, + choices="yes, no, y, n, 1, 0", + ) + + +class DomainArgumentParser(YunoHostArgumentFormatParser): + argument_type = "domain" + + def parse_question(self, question, user_answers): + from yunohost.domain import domain_list, _get_maindomain + + question = super(DomainArgumentParser, self).parse_question( + question, user_answers + ) + + if question.default is None: + question.default = _get_maindomain() + + question.choices = domain_list()["domains"] + + return question + + def _raise_invalid_answer(self, question): + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("domain_unknown") + ) + + +class UserArgumentParser(YunoHostArgumentFormatParser): + argument_type = "user" + + def parse_question(self, question, user_answers): + from yunohost.user import user_list, user_info + from yunohost.domain import _get_maindomain + + question = super(UserArgumentParser, self).parse_question( + question, user_answers + ) + question.choices = user_list()["users"] + if question.default is None: + root_mail = "root@%s" % _get_maindomain() + for user in question.choices.keys(): + if root_mail in user_info(user).get("mail-aliases", []): + question.default = user + break + + return question + + def _raise_invalid_answer(self, question): + raise YunohostValidationError( + "app_argument_invalid", + field=question.name, + error=m18n.n("user_unknown", user=question.value), + ) + + +class NumberArgumentParser(YunoHostArgumentFormatParser): + argument_type = "number" + default_value = "" + + def parse_question(self, question, user_answers): + question_parsed = super().parse_question( + question, user_answers + ) + question_parsed.min = question.get('min', None) + question_parsed.max = question.get('max', None) + if question_parsed.default is None: + question_parsed.default = 0 + + return question_parsed + + def _prevalidate(self, question): + super()._prevalidate(question) + if not isinstance(question.value, int) and not (isinstance(question.value, str) and question.value.isdigit()): + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + ) + + if question.min is not None and int(question.value) < question.min: + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + ) + + if question.max is not None and int(question.value) > question.max: + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + ) + + def _post_parse_value(self, question): + if isinstance(question.value, int): + return super()._post_parse_value(question) + + if isinstance(question.value, str) and question.value.isdigit(): + return int(question.value) + + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + ) + + +class DisplayTextArgumentParser(YunoHostArgumentFormatParser): + argument_type = "display_text" + readonly = True + + def parse_question(self, question, user_answers): + question_parsed = super().parse_question( + question, user_answers + ) + + question_parsed.optional = True + question_parsed.style = question.get('style', 'info') + + return question_parsed + + def _format_text_for_user_input_in_cli(self, question): + text = question.ask['en'] + + if question.style in ['success', 'info', 'warning', 'danger']: + color = { + 'success': 'green', + 'info': 'cyan', + 'warning': 'yellow', + 'danger': 'red' + } + return colorize(m18n.g(question.style), color[question.style]) + f" {text}" + else: + return text + +class FileArgumentParser(YunoHostArgumentFormatParser): + argument_type = "file" + upload_dirs = [] + + @classmethod + def clean_upload_dirs(cls): + # Delete files uploaded from API + if Moulinette.interface.type== 'api': + for upload_dir in cls.upload_dirs: + if os.path.exists(upload_dir): + shutil.rmtree(upload_dir) + + def parse_question(self, question, user_answers): + question_parsed = super().parse_question( + question, user_answers + ) + if question.get('accept'): + question_parsed.accept = question.get('accept').replace(' ', '').split(',') + else: + question_parsed.accept = [] + if Moulinette.interface.type== 'api': + if user_answers.get(f"{question_parsed.name}[name]"): + question_parsed.value = { + 'content': question_parsed.value, + 'filename': user_answers.get(f"{question_parsed.name}[name]", question_parsed.name), + } + # If path file are the same + if question_parsed.value and str(question_parsed.value) == question_parsed.current_value: + question_parsed.value = None + + return question_parsed + + def _prevalidate(self, question): + super()._prevalidate(question) + if isinstance(question.value, str) and question.value and not os.path.exists(question.value): + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number1") + ) + if question.value in [None, ''] or not question.accept: + return + + filename = question.value if isinstance(question.value, str) else question.value['filename'] + if '.' not in filename or '.' + filename.split('.')[-1] not in question.accept: + raise YunohostValidationError( + "app_argument_invalid", field=question.name, error=m18n.n("invalid_number2") + ) + + + def _post_parse_value(self, question): + from base64 import b64decode + # Upload files from API + # A file arg contains a string with "FILENAME:BASE64_CONTENT" + if not question.value: + return question.value + + if Moulinette.interface.type== 'api': + + upload_dir = tempfile.mkdtemp(prefix='tmp_configpanel_') + FileArgumentParser.upload_dirs += [upload_dir] + filename = question.value['filename'] + logger.debug(f"Save uploaded file {question.value['filename']} from API into {upload_dir}") + + # Filename is given by user of the API. For security reason, we have replaced + # os.path.join to avoid the user to be able to rewrite a file in filesystem + # i.e. os.path.join("/foo", "/etc/passwd") == "/etc/passwd" + file_path = os.path.normpath(upload_dir + "/" + filename) + if not file_path.startswith(upload_dir + "/"): + raise YunohostError("relative_parent_path_in_filename_forbidden") + i = 2 + while os.path.exists(file_path): + file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i)) + i += 1 + content = question.value['content'] + try: + with open(file_path, 'wb') as f: + f.write(b64decode(content)) + except IOError as e: + raise YunohostError("cannot_write_file", file=file_path, error=str(e)) + except Exception as e: + raise YunohostError("error_writing_file", file=file_path, error=str(e)) + question.value = file_path + return question.value + + +ARGUMENTS_TYPE_PARSERS = { + "string": StringArgumentParser, + "text": StringArgumentParser, + "select": StringArgumentParser, + "tags": TagsArgumentParser, + "email": StringArgumentParser, + "url": StringArgumentParser, + "date": StringArgumentParser, + "time": StringArgumentParser, + "color": StringArgumentParser, + "password": PasswordArgumentParser, + "path": PathArgumentParser, + "boolean": BooleanArgumentParser, + "domain": DomainArgumentParser, + "user": UserArgumentParser, + "number": NumberArgumentParser, + "range": NumberArgumentParser, + "display_text": DisplayTextArgumentParser, + "alert": DisplayTextArgumentParser, + "markdown": DisplayTextArgumentParser, + "file": FileArgumentParser, +} + +def parse_args_in_yunohost_format(user_answers, argument_questions): + """Parse arguments store in either manifest.json or actions.json or from a + config panel against the user answers when they are present. + + Keyword arguments: + user_answers -- a dictionnary of arguments from the user (generally + empty in CLI, filed from the admin interface) + argument_questions -- the arguments description store in yunohost + format from actions.json/toml, manifest.json/toml + or config_panel.json/toml + """ + parsed_answers_dict = OrderedDict() + + for question in argument_questions: + parser = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]() + + answer = parser.parse(question=question, user_answers=user_answers) + if answer is not None: + parsed_answers_dict[question["name"]] = answer + + return parsed_answers_dict + diff --git a/src/yunohost/utils/i18n.py b/src/yunohost/utils/i18n.py new file mode 100644 index 000000000..89d1d0b34 --- /dev/null +++ b/src/yunohost/utils/i18n.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +""" License + + Copyright (C) 2018 YUNOHOST.ORG + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program; if not, see http://www.gnu.org/licenses + +""" +from moulinette import Moulinette, m18n + +def _value_for_locale(values): + """ + Return proper value for current locale + + Keyword arguments: + values -- A dict of values associated to their locale + + Returns: + An utf-8 encoded string + + """ + if not isinstance(values, dict): + return values + + for lang in [m18n.locale, m18n.default_locale]: + try: + return values[lang] + except KeyError: + continue + + # Fallback to first value + return list(values.values())[0] + +