From ebd4535574aafc6257de46da0d75aff730df84d1 Mon Sep 17 00:00:00 2001 From: axolotle Date: Mon, 17 Apr 2023 15:06:53 +0200 Subject: [PATCH] form: update asking flow, separate form and options --- src/tests/test_questions.py | 32 ++-- src/utils/form.py | 297 +++++++++++++++++++++++++----------- 2 files changed, 225 insertions(+), 104 deletions(-) diff --git a/src/tests/test_questions.py b/src/tests/test_questions.py index 7737c4546..e2e2f66d1 100644 --- a/src/tests/test_questions.py +++ b/src/tests/test_questions.py @@ -11,11 +11,11 @@ from typing import Any, Literal, Sequence, TypedDict, Union from _pytest.mark.structures import ParameterSet - from moulinette import Moulinette from yunohost import app, domain, user from yunohost.utils.form import ( OPTIONS, + FORBIDDEN_PASSWORD_CHARS, ask_questions_and_parse_answers, BaseChoicesOption, BaseInputOption, @@ -378,8 +378,8 @@ def _fill_or_prompt_one_option(raw_option, intake): options = {id_: raw_option} answers = {id_: intake} if intake is not None else {} - option = ask_questions_and_parse_answers(options, answers)[0] - return (option, option.value if isinstance(option, BaseInputOption) else None) + options, form = ask_questions_and_parse_answers(options, answers) + return (options[0], form[id_] if isinstance(options[0], BaseInputOption) else None) def _test_value_is_expected_output(value, expected_output): @@ -551,7 +551,7 @@ class TestDisplayText(BaseTest): ask_questions_and_parse_answers({_id: raw_option}, answers) else: with patch.object(sys, "stdout", new_callable=StringIO) as stdout: - options = ask_questions_and_parse_answers( + options, form = ask_questions_and_parse_answers( {_id: raw_option}, answers ) assert stdout.getvalue() == f"{options[0].ask['en']}\n" @@ -604,7 +604,7 @@ class TestAlert(TestDisplayText): ) else: with patch.object(sys, "stdout", new_callable=StringIO) as stdout: - options = ask_questions_and_parse_answers( + options, form = ask_questions_and_parse_answers( {"display_text_id": raw_option}, answers ) ask = options[0].ask["en"] @@ -1912,9 +1912,7 @@ def test_options_query_string(): "&fake_id=fake_value" ) - def _assert_correct_values(options, raw_options): - form = {option.id: option.value for option in options} - + def _assert_correct_values(options, form, raw_options): for k, v in results.items(): if k == "file_id": assert os.path.exists(form["file_id"]) and os.path.isfile( @@ -1930,24 +1928,24 @@ def test_options_query_string(): with patch_interface("api"), patch_file_api(file_content1) as b64content: with patch_query_string(b64content.decode("utf-8")) as query_string: - options = ask_questions_and_parse_answers(raw_options, query_string) - _assert_correct_values(options, raw_options) + options, form = ask_questions_and_parse_answers(raw_options, query_string) + _assert_correct_values(options, form, raw_options) with patch_interface("cli"), patch_file_cli(file_content1) as filepath: with patch_query_string(filepath) as query_string: - options = ask_questions_and_parse_answers(raw_options, query_string) - _assert_correct_values(options, raw_options) + options, form = ask_questions_and_parse_answers(raw_options, query_string) + _assert_correct_values(options, form, raw_options) def test_question_string_default_type(): questions = {"some_string": {}} answers = {"some_string": "some_value"} - out = ask_questions_and_parse_answers(questions, answers)[0] - - assert out.id == "some_string" - assert out.type == "string" - assert out.value == "some_value" + options, form = ask_questions_and_parse_answers(questions, answers) + option = options[0] + assert option.id == "some_string" + assert option.type == "string" + assert form[option.id] == "some_value" def test_option_default_type_with_choices_is_select(): diff --git a/src/utils/form.py b/src/utils/form.py index 92ad65af3..bb18a5963 100644 --- a/src/utils/form.py +++ b/src/utils/form.py @@ -25,16 +25,29 @@ import shutil import tempfile import urllib.parse from enum import Enum -from typing import Any, Callable, Dict, List, Literal, Mapping, Union +from typing import ( + Annotated, + Any, + Callable, + List, + Literal, + Mapping, + Type, + Union, + cast, +) from pydantic import ( BaseModel, + Extra, + ValidationError, + create_model, root_validator, validator, ) from pydantic.color import Color +from pydantic.fields import Field from pydantic.networks import EmailStr, HttpUrl -from pydantic.types import FilePath from moulinette import Moulinette, m18n from moulinette.interfaces.cli import colorize @@ -385,15 +398,6 @@ class BaseInputOption(BaseOption): return None return value - # FIXME remove - def old__init__(self, question: Dict[str, Any]): - # .current_value is the currently stored value - self.current_value = question.get("current_value") - # .value is the "proposed" value which we got from the user - self.value = question.get("value") - # Use to return several values in case answer is in mutipart - self.values: Dict[str, Any] = {} - @staticmethod def humanize(value, option={}): return str(value) @@ -650,8 +654,6 @@ class DateOption(BaseInputOption): # } def _value_pre_validator(self): - from datetime import datetime - super()._value_pre_validator() if self.value not in [None, ""]: @@ -1064,6 +1066,120 @@ OPTIONS = { OptionType.group: GroupOption, } +AnyOption = Union[ + DisplayTextOption, + MarkdownOption, + AlertOption, + ButtonOption, + StringOption, + TextOption, + PasswordOption, + ColorOption, + NumberOption, + BooleanOption, + DateOption, + TimeOption, + EmailOption, + WebPathOption, + URLOption, + FileOption, + SelectOption, + TagsOption, + DomainOption, + AppOption, + UserOption, + GroupOption, +] + + +# ╭───────────────────────────────────────────────────────╮ +# │ ┌─╴╭─╮┌─╮╭╮╮ │ +# │ ├─╴│ │├┬╯│││ │ +# │ ╵ ╰─╯╵ ╰╵╵╵ │ +# ╰───────────────────────────────────────────────────────╯ + + +class OptionsModel(BaseModel): + # Pydantic will match option types to their models class based on the "type" attribute + options: list[Annotated[AnyOption, Field(discriminator="type")]] + + @staticmethod + def options_dict_to_list(options: dict[str, Any], defaults: dict[str, Any] = {}): + return [ + option + | { + "id": id_, + "type": option.get("type", "string"), + } + for id_, option in options.items() + ] + + def __init__(self, **kwargs) -> None: + super().__init__(options=self.options_dict_to_list(kwargs)) + + +class FormModel(BaseModel): + """ + Base form on which dynamic forms are built upon Options. + """ + + class Config: + validate_assignment = True + extra = Extra.ignore + + def __getitem__(self, name: str): + # FIXME + # if a FormModel's required field is not instancied with a value, it is + # not available as an attr and therefor triggers an `AttributeError` + # Also since `BaseReadonlyOption`s do not end up in form, + # `form[AlertOption.id]` would also triggers an error + # For convinience in those 2 cases, we return `None` + if not hasattr(self, name): + # Return None to trigger a validation error instead for required fields + return None + + return getattr(self, name) + + def __setitem__(self, name: str, value: Any): + setattr(self, name, value) + + def get(self, attr: str, default: Any = None) -> Any: + try: + return getattr(self, attr) + except AttributeError: + return default + + +def build_form(options: list[AnyOption], name: str = "DynamicForm") -> Type[FormModel]: + """ + Returns a dynamic pydantic model class that can be used as a form. + Parsing/validation occurs at instanciation and assignements. + To avoid validation at instanciation, use `my_form.construct(**values)` + """ + options_as_fields: Any = {} + validators: dict[str, Any] = {} + + for option in options: + if not isinstance(option, BaseInputOption): + continue # filter out non input options + + options_as_fields[option.id] = option._as_dynamic_model_field() + + for step in ("pre", "post"): + validators[f"{option.id}_{step}_validator"] = validator( + option.id, allow_reuse=True, pre=step == "pre" + )(getattr(option, f"_value_{step}_validator")) + + return cast( + Type[FormModel], + create_model( + name, + __base__=FormModel, + __validators__=validators, + **options_as_fields, + ), + ) + def hydrate_option_type(raw_option: dict[str, Any]) -> dict[str, Any]: type_ = raw_option.get( @@ -1092,20 +1208,16 @@ Hooks = dict[str, Callable[[BaseInputOption], Any]] def prompt_or_validate_form( - raw_options: dict[str, Any], + options: list[AnyOption], + form: FormModel, prefilled_answers: dict[str, Any] = {}, context: Context = {}, hooks: Hooks = {}, -) -> list[BaseOption]: - options = [] +) -> FormModel: answers = {**prefilled_answers} + values = {} - for id_, raw_option in raw_options.items(): - raw_option["id"] = id_ - raw_option["value"] = answers.get(id_) - raw_option = hydrate_option_type(raw_option) - option = OPTIONS[raw_option["type"]](raw_option) - + for option in options: interactive = Moulinette.interface.type == "cli" and os.isatty(1) if isinstance(option, ButtonOption): @@ -1118,89 +1230,88 @@ def prompt_or_validate_form( help=_value_for_locale(option.help), ) - # FIXME not sure why we do not append Buttons to returned options - options.append(option) - if not option.is_visible(context): if isinstance(option, BaseInputOption): # FIXME There could be several use case if the question is not displayed: # - we doesn't want to give a specific value # - we want to keep the previous value # - we want the default value - option.value = context[option.id] = None + context[option.id] = form[option.id] = None continue - message = option._get_prompt_message() - - if option.readonly: - if interactive: - Moulinette.display(message) + # if we try to get a `BaseReadonlyOption` value, which doesn't exists in the form, + # we get `None` + value = form[option.id] + if isinstance(option, BaseReadonlyOption) or option.readonly: if isinstance(option, BaseInputOption): - option.value = context[option.id] = option.current_value + # FIXME normalized needed, form[option.id] should already be normalized + # only update the context with the value + context[option.id] = form[option.id] + + # FIXME here we could error out + if option.id in prefilled_answers: + logger.warning( + f"'{option.id}' is readonly, value '{prefilled_answers[option.id]}' is then ignored." + ) + + if interactive: + Moulinette.display(option._get_prompt_message(value)) continue - if isinstance(option, BaseInputOption): - for i in range(5): - if interactive and option.value is None: - prefill = "" - choices = ( - option.choices if isinstance(option, BaseChoicesOption) else [] - ) + for i in range(5): + if option.id in prefilled_answers: + value = prefilled_answers[option.id] + elif interactive: + value = option.humanize(value, option) + choices = ( + option.choices if isinstance(option, BaseChoicesOption) else [] + ) + value = Moulinette.prompt( + message=option._get_prompt_message(value), + is_password=isinstance(option, PasswordOption), + confirm=False, + prefill=value, + is_multiline=isinstance(option, TextOption), + autocomplete=choices, + help=_value_for_locale(option.help), + ) - if option.current_value is not None: - prefill = option.humanize(option.current_value, option) - elif option.default is not None: - prefill = option.humanize(option.default, option) + # Apply default value if none + if value is None or value == "" and option.default is not None: + value = option.default - option.value = Moulinette.prompt( - message=message, - is_password=isinstance(option, PasswordOption), - confirm=False, - prefill=prefill, - is_multiline=(option.type == "text"), - autocomplete=choices, - help=_value_for_locale(option.help), - ) + try: + # Normalize and validate + values[option.id] = form[option.id] = option.normalize(value, option) + except (ValidationError, YunohostValidationError) as e: + # If in interactive cli, re-ask the current question + if i < 4 and interactive: + logger.error(str(e)) + value = None + continue - # Apply default value - class_default = getattr(option, "default_value", None) - if option.value in [None, ""] and ( - option.default is not None or class_default is not None - ): - option.value = ( - class_default if option.default is None else option.default - ) + if isinstance(e, ValidationError): + error = "\n".join([err["msg"] for err in e.errors()]) + raise YunohostValidationError(error, raw_msg=True) - try: - # Normalize and validate - option.value = option.normalize(option.value, option) - option._value_pre_validator() - except YunohostValidationError as e: - # If in interactive cli, re-ask the current question - if i < 4 and interactive: - logger.error(str(e)) - option.value = None - continue + # Otherwise raise the ValidationError + raise e - # Otherwise raise the ValidationError - raise + break - break + # Search for post actions in hooks + post_hook = f"post_ask__{option.id}" + if post_hook in hooks: + values.update(hooks[post_hook](option)) + # FIXME reapply new values to form to validate it - option.value = option.values[option.id] = option._value_post_validator() + answers.update(values) + context.update(values) - # Search for post actions in hooks - post_hook = f"post_ask__{option.id}" - if post_hook in hooks: - option.values.update(hooks[post_hook](option)) - - answers.update(option.values) - context.update(option.values) - - return options + return form def ask_questions_and_parse_answers( @@ -1208,7 +1319,7 @@ def ask_questions_and_parse_answers( prefilled_answers: Union[str, Mapping[str, Any]] = {}, current_values: Mapping[str, Any] = {}, hooks: Hooks = {}, -) -> list[BaseOption]: +) -> tuple[list[AnyOption], FormModel]: """Parse arguments store in either manifest.json or actions.json or from a config panel against the user answers when they are present. @@ -1236,9 +1347,21 @@ def ask_questions_and_parse_answers( context = {**current_values, **answers} - return prompt_or_validate_form( - raw_options, prefilled_answers=answers, context=context, hooks=hooks + # Validate/parse the options attributes + try: + model = OptionsModel(**raw_options) + except ValidationError as e: + error = "\n".join([err["msg"] for err in e.errors()]) + # FIXME use YunohostError instead since it is not really a user mistake? + raise YunohostValidationError(error, raw_msg=True) + + # Build the form from those questions and instantiate it without + # parsing/validation (construct) since it may contains required questions. + form = build_form(model.options).construct() + form = prompt_or_validate_form( + model.options, form, prefilled_answers=answers, context=context, hooks=hooks ) + return (model.options, form) def hydrate_questions_with_choices(raw_questions: List) -> List: @@ -1246,7 +1369,7 @@ def hydrate_questions_with_choices(raw_questions: List) -> List: for raw_question in raw_questions: raw_question = hydrate_option_type(raw_question) - question = OPTIONS[raw_question["type"]](raw_question) + question = OPTIONS[raw_question["type"]](**raw_question) if isinstance(question, BaseChoicesOption) and question.choices: raw_question["choices"] = question.choices raw_question["default"] = question.default