diff --git a/src/yunohost/utils/config.py b/src/yunohost/utils/config.py index b8ba489e6..380da1027 100644 --- a/src/yunohost/utils/config.py +++ b/src/yunohost/utils/config.py @@ -90,7 +90,8 @@ class ConfigPanel: elif 'i18n' in self.config: result[key] = { 'ask': m18n.n(self.config['i18n'] + '_' + option['id']) } if 'current_value' in option: - result[key]['value'] = option['current_value'] + question_class = ARGUMENTS_TYPE_PARSERS[option.get("type", "string")] + result[key]['value'] = question_class.humanize(option['current_value'], option) return result @@ -144,7 +145,7 @@ class ConfigPanel: raise finally: # Delete files uploaded from API - FileArgumentParser.clean_upload_dirs() + FileQuestion.clean_upload_dirs() if self.errors: return { @@ -275,7 +276,8 @@ class ConfigPanel: self.new_values.update(parse_args_in_yunohost_format( self.args, section['options'] )) - self.new_values = {key: str(value[0]) for key, value in self.new_values.items() if not value[0] is None} + self.new_values = {key: value[0] for key, value in self.new_values.items() if not value[0] is None} + self.errors = None def _get_default_values(self): return { option['id']: option['default'] @@ -297,31 +299,31 @@ class ConfigPanel: self.values.update(on_disk_settings) def _apply(self): - logger.info("Running config script...") + logger.info("Saving the new configuration...") dir_path = os.path.dirname(os.path.realpath(self.save_path)) if not os.path.exists(dir_path): mkdir(dir_path, mode=0o700) + values_to_save = {**self.values, **self.new_values} if self.save_mode == 'diff': defaults = self._get_default_values() - values_to_save = {k: v for k, v in values.items() if defaults.get(k) != v} - else: - values_to_save = {**self.values, **self.new_values} + values_to_save = {k: v for k, v in values_to_save.items() if defaults.get(k) != v} # Save the settings to the .yaml file write_to_yaml(self.save_path, values_to_save) def _reload_services(self): - logger.info("Reloading services...") services_to_reload = set() for panel, section, obj in self._iterate(['panel', 'section', 'option']): services_to_reload |= set(obj.get('services', [])) services_to_reload = list(services_to_reload) services_to_reload.sort(key = 'nginx'.__eq__) + if services_to_reload: + logger.info("Reloading services...") for service in services_to_reload: - if '__APP__': + if '__APP__' in service: service = service.replace('__APP__', self.app) logger.debug(f"Reloading {service}") if not _run_service_command('reload-or-restart', service): @@ -345,141 +347,140 @@ class ConfigPanel: yield (panel, section, option) -class Question: - "empty class to store questions information" - - -class YunoHostArgumentFormatParser(object): +class Question(object): hide_user_input_in_prompt = False operation_logger = None - def parse_question(self, question, user_answers): - parsed_question = Question() - - parsed_question.name = question["name"] - parsed_question.type = question.get("type", 'string') - parsed_question.default = question.get("default", None) - parsed_question.current_value = question.get("current_value") - parsed_question.optional = question.get("optional", False) - parsed_question.choices = question.get("choices", []) - parsed_question.pattern = question.get("pattern") - parsed_question.ask = question.get("ask", {'en': f"{parsed_question.name}"}) - parsed_question.help = question.get("help") - parsed_question.helpLink = question.get("helpLink") - parsed_question.value = user_answers.get(parsed_question.name) - parsed_question.redact = question.get('redact', False) + def __init__(self, question, user_answers): + self.name = question["name"] + self.type = question.get("type", 'string') + self.default = question.get("default", None) + self.current_value = question.get("current_value") + self.optional = question.get("optional", False) + self.choices = question.get("choices", []) + self.pattern = question.get("pattern") + self.ask = question.get("ask", {'en': f"{self.name}"}) + self.help = question.get("help") + self.helpLink = question.get("helpLink") + self.value = user_answers.get(self.name) + self.redact = question.get('redact', False) # Empty value is parsed as empty string - if parsed_question.default == "": - parsed_question.default = None + if self.default == "": + self.default = None - return parsed_question + @staticmethod + def humanize(value, option={}): + return str(value) - def parse(self, question, user_answers): - question = self.parse_question(question, user_answers) + @staticmethod + def normalize(value, option={}): + return value + + def ask_if_needed(self): while True: # Display question if no value filled or if it's a readonly message if Moulinette.interface.type== 'cli': - text_for_user_input_in_cli = self._format_text_for_user_input_in_cli( - question - ) + text_for_user_input_in_cli = self._format_text_for_user_input_in_cli() if getattr(self, "readonly", False): Moulinette.display(text_for_user_input_in_cli) - elif question.value is None: + elif self.value is None: prefill = "" - if question.current_value is not None: - prefill = question.current_value - elif question.default is not None: - prefill = question.default - question.value = Moulinette.prompt( + if self.current_value is not None: + prefill = self.humanize(self.current_value, self) + elif self.default is not None: + prefill = self.default + self.value = Moulinette.prompt( message=text_for_user_input_in_cli, is_password=self.hide_user_input_in_prompt, confirm=self.hide_user_input_in_prompt, prefill=prefill, - is_multiline=(question.type == "text") + is_multiline=(self.type == "text") ) + # Normalization + # This is done to enforce a certain formating like for boolean + self.value = self.normalize(self.value, self) # Apply default value - if question.value in [None, ""] and question.default is not None: - question.value = ( + if self.value in [None, ""] and self.default is not None: + self.value = ( getattr(self, "default_value", None) - if question.default is None - else question.default + if self.default is None + else self.default ) # Prevalidation try: - self._prevalidate(question) + self._prevalidate() except YunohostValidationError as e: if Moulinette.interface.type== 'api': raise Moulinette.display(str(e), 'error') - question.value = None + self.value = None continue break - # this is done to enforce a certain formating like for boolean - # by default it doesn't do anything - question.value = self._post_parse_value(question) + self.value = self._post_parse_value() - return (question.value, self.argument_type) + return (self.value, self.argument_type) - def _prevalidate(self, question): - if question.value in [None, ""] and not question.optional: + + def _prevalidate(self): + if self.value in [None, ""] and not self.optional: raise YunohostValidationError( - "app_argument_required", name=question.name + "app_argument_required", name=self.name ) # we have an answer, do some post checks - if question.value is not None: - if question.choices and question.value not in question.choices: - self._raise_invalid_answer(question) - if question.pattern and not re.match(question.pattern['regexp'], str(question.value)): + if self.value is not None: + if self.choices and self.value not in self.choices: + self._raise_invalid_answer(self) + if self.pattern and not re.match(self.pattern['regexp'], str(self.value)): raise YunohostValidationError( - question.pattern['error'], - name=question.name, - value=question.value, + self.pattern['error'], + name=self.name, + value=self.value, ) - def _raise_invalid_answer(self, question): + def _raise_invalid_answer(self): raise YunohostValidationError( "app_argument_choice_invalid", - name=question.name, - value=question.value, - choices=", ".join(question.choices), + name=self.name, + value=self.value, + choices=", ".join(self.choices), ) - def _format_text_for_user_input_in_cli(self, question): - text_for_user_input_in_cli = _value_for_locale(question.ask) + def _format_text_for_user_input_in_cli(self): + text_for_user_input_in_cli = _value_for_locale(self.ask) - if question.choices: - text_for_user_input_in_cli += " [{0}]".format(" | ".join(question.choices)) + if self.choices: + text_for_user_input_in_cli += " [{0}]".format(" | ".join(self.choices)) - if question.help or question.helpLink: + if self.help or self.helpLink: text_for_user_input_in_cli += ":\033[m" - if question.help: + if self.help: text_for_user_input_in_cli += "\n - " - text_for_user_input_in_cli += _value_for_locale(question.help) - if question.helpLink: - if not isinstance(question.helpLink, dict): - question.helpLink = {'href': question.helpLink} - text_for_user_input_in_cli += f"\n - See {question.helpLink['href']}" + text_for_user_input_in_cli += _value_for_locale(self.help) + if self.helpLink: + if not isinstance(self.helpLink, dict): + self.helpLink = {'href': self.helpLink} + text_for_user_input_in_cli += f"\n - See {self.helpLink['href']}" return text_for_user_input_in_cli - def _post_parse_value(self, question): - if not question.redact: - return question.value + def _post_parse_value(self): + if not self.redact: + return self.value # Tell the operation_logger to redact all password-type / secret args # Also redact the % escaped version of the password that might appear in # the 'args' section of metadata (relevant for password with non-alphanumeric char) data_to_redact = [] - if question.value and isinstance(question.value, str): - data_to_redact.append(question.value) - if question.current_value and isinstance(question.current_value, str): - data_to_redact.append(question.current_value) + if self.value and isinstance(self.value, str): + data_to_redact.append(self.value) + if self.current_value and isinstance(self.current_value, str): + data_to_redact.append(self.current_value) data_to_redact += [ urllib.parse.quote(data) for data in data_to_redact @@ -488,50 +489,60 @@ class YunoHostArgumentFormatParser(object): if self.operation_logger: self.operation_logger.data_to_redact.extend(data_to_redact) elif data_to_redact: - raise YunohostError("app_argument_cant_redact", arg=question.name) + raise YunohostError("app_argument_cant_redact", arg=self.name) - return question.value + return self.value -class StringArgumentParser(YunoHostArgumentFormatParser): +class StringQuestion(Question): argument_type = "string" default_value = "" -class TagsArgumentParser(YunoHostArgumentFormatParser): +class TagsQuestion(Question): argument_type = "tags" - def _prevalidate(self, question): - values = question.value - for value in values.split(','): - question.value = value - super()._prevalidate(question) - question.value = values + @staticmethod + def humanize(value, option={}): + if isinstance(value, list): + return ','.join(value) + return value + + def _prevalidate(self): + values = self.value + if isinstance(values, str): + values = values.split(',') + for value in values: + self.value = value + super()._prevalidate() + self.value = values -class PasswordArgumentParser(YunoHostArgumentFormatParser): +class PasswordQuestion(Question): hide_user_input_in_prompt = True argument_type = "password" default_value = "" forbidden_chars = "{}" - def parse_question(self, question, user_answers): - question = super(PasswordArgumentParser, self).parse_question( - question, user_answers - ) - question.redact = True - if question.default is not None: + def __init__(self, question, user_answers): + super().__init__(question, user_answers) + self.redact = True + if self.default is not None: raise YunohostValidationError( - "app_argument_password_no_default", name=question.name + "app_argument_password_no_default", name=self.name ) - return question + @staticmethod + def humanize(value, option={}): + if value: + return '***' # Avoid to display the password on screen + return "" - def _prevalidate(self, question): - super()._prevalidate(question) + def _prevalidate(self): + super()._prevalidate() - if question.value is not None: - if any(char in question.value for char in self.forbidden_chars): + if self.value is not None: + if any(char in self.value for char in self.forbidden_chars): raise YunohostValidationError( "pattern_password_app", forbidden_chars=self.forbidden_chars ) @@ -539,181 +550,206 @@ class PasswordArgumentParser(YunoHostArgumentFormatParser): # If it's an optional argument the value should be empty or strong enough from yunohost.utils.password import assert_password_is_strong_enough - assert_password_is_strong_enough("user", question.value) + assert_password_is_strong_enough("user", self.value) -class PathArgumentParser(YunoHostArgumentFormatParser): +class PathQuestion(Question): argument_type = "path" default_value = "" -class BooleanArgumentParser(YunoHostArgumentFormatParser): +class BooleanQuestion(Question): argument_type = "boolean" default_value = False + yes_answers = ["1", "yes", "y", "true", "t", "on"] + no_answers = ["0", "no", "n", "false", "f", "off"] - def parse_question(self, question, user_answers): - question = super().parse_question( - question, user_answers + @staticmethod + def humanize(value, option={}): + yes = option.get('yes', 1) + no = option.get('no', 0) + value = str(value).lower() + if value == str(yes).lower(): + return 'yes' + if value == str(no).lower(): + return 'no' + if value in BooleanQuestion.yes_answers: + return 'yes' + if value in BooleanQuestion.no_answers: + return 'no' + + if value in ['none', ""]: + return '' + + raise YunohostValidationError( + "app_argument_choice_invalid", + name=self.name, + value=self.value, + choices="yes, no, y, n, 1, 0", ) - if question.default is None: - question.default = False + @staticmethod + def normalize(value, option={}): + yes = option.get('yes', 1) + no = option.get('no', 0) - return question + if str(value).lower() in BooleanQuestion.yes_answers: + return yes - def _format_text_for_user_input_in_cli(self, question): - text_for_user_input_in_cli = _value_for_locale(question.ask) + if str(value).lower() in BooleanQuestion.no_answers: + return no + + if value in [None, ""]: + return None + raise YunohostValidationError( + "app_argument_choice_invalid", + name=self.name, + value=self.value, + choices="yes, no, y, n, 1, 0", + ) + + def __init__(self, question, user_answers): + super().__init__(question, user_answers) + self.yes = question.get('yes', 1) + self.no = question.get('no', 0) + if self.default is None: + self.default = False + + + def _format_text_for_user_input_in_cli(self): + text_for_user_input_in_cli = _value_for_locale(self.ask) text_for_user_input_in_cli += " [yes | no]" - if question.default is not None: - formatted_default = "yes" if question.default else "no" + if self.default is not None: + formatted_default = self.humanize(self.default) text_for_user_input_in_cli += " (default: {0})".format(formatted_default) return text_for_user_input_in_cli - def _post_parse_value(self, question): - if isinstance(question.value, bool): - return 1 if question.value else 0 - - if str(question.value).lower() in ["1", "yes", "y", "true"]: - return 1 - - if str(question.value).lower() in ["0", "no", "n", "false"]: - return 0 - - raise YunohostValidationError( - "app_argument_choice_invalid", - name=question.name, - value=question.value, - choices="yes, no, y, n, 1, 0", - ) + def get(self, key, default=None): + try: + return getattr(self, key) + except AttributeError: + return default -class DomainArgumentParser(YunoHostArgumentFormatParser): + +class DomainQuestion(Question): argument_type = "domain" - def parse_question(self, question, user_answers): + def __init__(self, question, user_answers): from yunohost.domain import domain_list, _get_maindomain - question = super(DomainArgumentParser, self).parse_question( - question, user_answers - ) + super().__init__(question, user_answers) - if question.default is None: - question.default = _get_maindomain() + if self.default is None: + self.default = _get_maindomain() - question.choices = domain_list()["domains"] + self.choices = domain_list()["domains"] - return question - def _raise_invalid_answer(self, question): + def _raise_invalid_answer(self): raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("domain_unknown") + "app_argument_invalid", field=self.name, error=m18n.n("domain_unknown") ) -class UserArgumentParser(YunoHostArgumentFormatParser): +class UserQuestion(Question): argument_type = "user" - def parse_question(self, question, user_answers): + def __init__(self, question, user_answers): from yunohost.user import user_list, user_info from yunohost.domain import _get_maindomain - question = super(UserArgumentParser, self).parse_question( - question, user_answers - ) - question.choices = user_list()["users"] - if question.default is None: + super().__init__(question, user_answers) + self.choices = user_list()["users"] + if self.default is None: root_mail = "root@%s" % _get_maindomain() - for user in question.choices.keys(): + for user in self.choices.keys(): if root_mail in user_info(user).get("mail-aliases", []): - question.default = user + self.default = user break - return question - def _raise_invalid_answer(self, question): + def _raise_invalid_answer(self): raise YunohostValidationError( "app_argument_invalid", - field=question.name, - error=m18n.n("user_unknown", user=question.value), + field=self.name, + error=m18n.n("user_unknown", user=self.value), ) -class NumberArgumentParser(YunoHostArgumentFormatParser): +class NumberQuestion(Question): argument_type = "number" default_value = "" - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) - question_parsed.min = question.get('min', None) - question_parsed.max = question.get('max', None) - if question_parsed.default is None: - question_parsed.default = 0 + @staticmethod + def humanize(value, option={}): + return str(value) - return question_parsed + def __init__(self, question, user_answers): + super().__init__(question, user_answers) + self.min = question.get('min', None) + self.max = question.get('max', None) + self.step = question.get('step', None) - def _prevalidate(self, question): - super()._prevalidate(question) - if not isinstance(question.value, int) and not (isinstance(question.value, str) and question.value.isdigit()): + + def _prevalidate(self): + super()._prevalidate() + if not isinstance(self.value, int) and not (isinstance(self.value, str) and self.value.isdigit()): raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number") ) - if question.min is not None and int(question.value) < question.min: + if self.min is not None and int(self.value) < self.min: raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number") ) - if question.max is not None and int(question.value) > question.max: + if self.max is not None and int(self.value) > self.max: raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number") ) - def _post_parse_value(self, question): - if isinstance(question.value, int): - return super()._post_parse_value(question) + def _post_parse_value(self): + if isinstance(self.value, int): + return super()._post_parse_value() - if isinstance(question.value, str) and question.value.isdigit(): - return int(question.value) + if isinstance(self.value, str) and self.value.isdigit(): + return int(self.value) raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number") ) -class DisplayTextArgumentParser(YunoHostArgumentFormatParser): +class DisplayTextQuestion(Question): argument_type = "display_text" readonly = True - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) + def __init__(self, question, user_answers): + super().__init__(question, user_answers) - question_parsed.optional = True - question_parsed.style = question.get('style', 'info') + self.optional = True + self.style = question.get('style', 'info') - return question_parsed - def _format_text_for_user_input_in_cli(self, question): - text = question.ask['en'] + def _format_text_for_user_input_in_cli(self): + text = self.ask['en'] - if question.style in ['success', 'info', 'warning', 'danger']: + if self.style in ['success', 'info', 'warning', 'danger']: color = { 'success': 'green', 'info': 'cyan', 'warning': 'yellow', 'danger': 'red' } - return colorize(m18n.g(question.style), color[question.style]) + f" {text}" + return colorize(m18n.g(self.style), color[self.style]) + f" {text}" else: return text -class FileArgumentParser(YunoHostArgumentFormatParser): +class FileQuestion(Question): argument_type = "file" upload_dirs = [] @@ -725,55 +761,52 @@ class FileArgumentParser(YunoHostArgumentFormatParser): if os.path.exists(upload_dir): shutil.rmtree(upload_dir) - def parse_question(self, question, user_answers): - question_parsed = super().parse_question( - question, user_answers - ) - if question.get('accept'): - question_parsed.accept = question.get('accept').replace(' ', '').split(',') + def __init__(self, question, user_answers): + super().__init__(question, user_answers) + if self.get('accept'): + self.accept = question.get('accept').replace(' ', '').split(',') else: - question_parsed.accept = [] + self.accept = [] if Moulinette.interface.type== 'api': - if user_answers.get(f"{question_parsed.name}[name]"): - question_parsed.value = { - 'content': question_parsed.value, - 'filename': user_answers.get(f"{question_parsed.name}[name]", question_parsed.name), + if user_answers.get(f"{self.name}[name]"): + self.value = { + 'content': self.value, + 'filename': user_answers.get(f"{self.name}[name]", self.name), } # If path file are the same - if question_parsed.value and str(question_parsed.value) == question_parsed.current_value: - question_parsed.value = None + if self.value and str(self.value) == self.current_value: + self.value = None - return question_parsed - def _prevalidate(self, question): - super()._prevalidate(question) - if isinstance(question.value, str) and question.value and not os.path.exists(question.value): + def _prevalidate(self): + super()._prevalidate() + if isinstance(self.value, str) and self.value and not os.path.exists(self.value): raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number1") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number1") ) - if question.value in [None, ''] or not question.accept: + if self.value in [None, ''] or not self.accept: return - filename = question.value if isinstance(question.value, str) else question.value['filename'] - if '.' not in filename or '.' + filename.split('.')[-1] not in question.accept: + filename = self.value if isinstance(self.value, str) else self.value['filename'] + if '.' not in filename or '.' + filename.split('.')[-1] not in self.accept: raise YunohostValidationError( - "app_argument_invalid", field=question.name, error=m18n.n("invalid_number2") + "app_argument_invalid", field=self.name, error=m18n.n("invalid_number2") ) - def _post_parse_value(self, question): + def _post_parse_value(self): from base64 import b64decode # Upload files from API # A file arg contains a string with "FILENAME:BASE64_CONTENT" - if not question.value: - return question.value + if not self.value: + return self.value if Moulinette.interface.type== 'api': upload_dir = tempfile.mkdtemp(prefix='tmp_configpanel_') - FileArgumentParser.upload_dirs += [upload_dir] - filename = question.value['filename'] - logger.debug(f"Save uploaded file {question.value['filename']} from API into {upload_dir}") + FileQuestion.upload_dirs += [upload_dir] + filename = self.value['filename'] + logger.debug(f"Save uploaded file {self.value['filename']} from API into {upload_dir}") # Filename is given by user of the API. For security reason, we have replaced # os.path.join to avoid the user to be able to rewrite a file in filesystem @@ -785,7 +818,7 @@ class FileArgumentParser(YunoHostArgumentFormatParser): while os.path.exists(file_path): file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i)) i += 1 - content = question.value['content'] + content = self.value['content'] try: with open(file_path, 'wb') as f: f.write(b64decode(content)) @@ -793,31 +826,31 @@ class FileArgumentParser(YunoHostArgumentFormatParser): raise YunohostError("cannot_write_file", file=file_path, error=str(e)) except Exception as e: raise YunohostError("error_writing_file", file=file_path, error=str(e)) - question.value = file_path - return question.value + self.value = file_path + return self.value ARGUMENTS_TYPE_PARSERS = { - "string": StringArgumentParser, - "text": StringArgumentParser, - "select": StringArgumentParser, - "tags": TagsArgumentParser, - "email": StringArgumentParser, - "url": StringArgumentParser, - "date": StringArgumentParser, - "time": StringArgumentParser, - "color": StringArgumentParser, - "password": PasswordArgumentParser, - "path": PathArgumentParser, - "boolean": BooleanArgumentParser, - "domain": DomainArgumentParser, - "user": UserArgumentParser, - "number": NumberArgumentParser, - "range": NumberArgumentParser, - "display_text": DisplayTextArgumentParser, - "alert": DisplayTextArgumentParser, - "markdown": DisplayTextArgumentParser, - "file": FileArgumentParser, + "string": StringQuestion, + "text": StringQuestion, + "select": StringQuestion, + "tags": TagsQuestion, + "email": StringQuestion, + "url": StringQuestion, + "date": StringQuestion, + "time": StringQuestion, + "color": StringQuestion, + "password": PasswordQuestion, + "path": PathQuestion, + "boolean": BooleanQuestion, + "domain": DomainQuestion, + "user": UserQuestion, + "number": NumberQuestion, + "range": NumberQuestion, + "display_text": DisplayTextQuestion, + "alert": DisplayTextQuestion, + "markdown": DisplayTextQuestion, + "file": FileQuestion, } def parse_args_in_yunohost_format(user_answers, argument_questions): @@ -834,11 +867,12 @@ def parse_args_in_yunohost_format(user_answers, argument_questions): parsed_answers_dict = OrderedDict() for question in argument_questions: - parser = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]() + question_class = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")] + question = question_class(question, user_answers) - answer = parser.parse(question=question, user_answers=user_answers) + answer = question.ask_if_needed() if answer is not None: - parsed_answers_dict[question["name"]] = answer + parsed_answers_dict[question.name] = answer return parsed_answers_dict