form: update asking flow, separate form and options

This commit is contained in:
axolotle 2023-04-17 15:06:53 +02:00
parent f5c56db10e
commit 89ae5e654d
2 changed files with 225 additions and 104 deletions

View file

@ -11,11 +11,11 @@ from typing import Any, Literal, Sequence, TypedDict, Union
from _pytest.mark.structures import ParameterSet from _pytest.mark.structures import ParameterSet
from moulinette import Moulinette from moulinette import Moulinette
from yunohost import app, domain, user from yunohost import app, domain, user
from yunohost.utils.form import ( from yunohost.utils.form import (
OPTIONS, OPTIONS,
FORBIDDEN_PASSWORD_CHARS,
ask_questions_and_parse_answers, ask_questions_and_parse_answers,
BaseChoicesOption, BaseChoicesOption,
BaseInputOption, BaseInputOption,
@ -378,8 +378,8 @@ def _fill_or_prompt_one_option(raw_option, intake):
options = {id_: raw_option} options = {id_: raw_option}
answers = {id_: intake} if intake is not None else {} answers = {id_: intake} if intake is not None else {}
option = ask_questions_and_parse_answers(options, answers)[0] options, form = ask_questions_and_parse_answers(options, answers)
return (option, option.value if isinstance(option, BaseInputOption) else None) return (options[0], form[id_] if isinstance(options[0], BaseInputOption) else None)
def _test_value_is_expected_output(value, expected_output): def _test_value_is_expected_output(value, expected_output):
@ -551,7 +551,7 @@ class TestDisplayText(BaseTest):
ask_questions_and_parse_answers({_id: raw_option}, answers) ask_questions_and_parse_answers({_id: raw_option}, answers)
else: else:
with patch.object(sys, "stdout", new_callable=StringIO) as stdout: with patch.object(sys, "stdout", new_callable=StringIO) as stdout:
options = ask_questions_and_parse_answers( options, form = ask_questions_and_parse_answers(
{_id: raw_option}, answers {_id: raw_option}, answers
) )
assert stdout.getvalue() == f"{options[0].ask['en']}\n" assert stdout.getvalue() == f"{options[0].ask['en']}\n"
@ -604,7 +604,7 @@ class TestAlert(TestDisplayText):
) )
else: else:
with patch.object(sys, "stdout", new_callable=StringIO) as stdout: with patch.object(sys, "stdout", new_callable=StringIO) as stdout:
options = ask_questions_and_parse_answers( options, form = ask_questions_and_parse_answers(
{"display_text_id": raw_option}, answers {"display_text_id": raw_option}, answers
) )
ask = options[0].ask["en"] ask = options[0].ask["en"]
@ -1925,9 +1925,7 @@ def test_options_query_string():
"&fake_id=fake_value" "&fake_id=fake_value"
) )
def _assert_correct_values(options, raw_options): def _assert_correct_values(options, form, raw_options):
form = {option.id: option.value for option in options}
for k, v in results.items(): for k, v in results.items():
if k == "file_id": if k == "file_id":
assert os.path.exists(form["file_id"]) and os.path.isfile( assert os.path.exists(form["file_id"]) and os.path.isfile(
@ -1943,24 +1941,24 @@ def test_options_query_string():
with patch_interface("api"), patch_file_api(file_content1) as b64content: with patch_interface("api"), patch_file_api(file_content1) as b64content:
with patch_query_string(b64content.decode("utf-8")) as query_string: with patch_query_string(b64content.decode("utf-8")) as query_string:
options = ask_questions_and_parse_answers(raw_options, query_string) options, form = ask_questions_and_parse_answers(raw_options, query_string)
_assert_correct_values(options, raw_options) _assert_correct_values(options, form, raw_options)
with patch_interface("cli"), patch_file_cli(file_content1) as filepath: with patch_interface("cli"), patch_file_cli(file_content1) as filepath:
with patch_query_string(filepath) as query_string: with patch_query_string(filepath) as query_string:
options = ask_questions_and_parse_answers(raw_options, query_string) options, form = ask_questions_and_parse_answers(raw_options, query_string)
_assert_correct_values(options, raw_options) _assert_correct_values(options, form, raw_options)
def test_question_string_default_type(): def test_question_string_default_type():
questions = {"some_string": {}} questions = {"some_string": {}}
answers = {"some_string": "some_value"} answers = {"some_string": "some_value"}
out = ask_questions_and_parse_answers(questions, answers)[0] options, form = ask_questions_and_parse_answers(questions, answers)
option = options[0]
assert out.id == "some_string" assert option.id == "some_string"
assert out.type == "string" assert option.type == "string"
assert out.value == "some_value" assert form[option.id] == "some_value"
def test_option_default_type_with_choices_is_select(): def test_option_default_type_with_choices_is_select():

View file

@ -25,17 +25,30 @@ import shutil
import tempfile import tempfile
import urllib.parse import urllib.parse
from enum import Enum from enum import Enum
from typing import Any, Callable, Dict, List, Literal, Mapping, Union
from logging import getLogger from logging import getLogger
from typing import (
Annotated,
Any,
Callable,
List,
Literal,
Mapping,
Type,
Union,
cast,
)
from pydantic import ( from pydantic import (
BaseModel, BaseModel,
Extra,
ValidationError,
create_model,
root_validator, root_validator,
validator, validator,
) )
from pydantic.color import Color from pydantic.color import Color
from pydantic.fields import Field
from pydantic.networks import EmailStr, HttpUrl from pydantic.networks import EmailStr, HttpUrl
from pydantic.types import FilePath
from moulinette import Moulinette, m18n from moulinette import Moulinette, m18n
from moulinette.interfaces.cli import colorize from moulinette.interfaces.cli import colorize
@ -385,15 +398,6 @@ class BaseInputOption(BaseOption):
return None return None
return value return value
# FIXME remove
def old__init__(self, question: Dict[str, Any]):
# .current_value is the currently stored value
self.current_value = question.get("current_value")
# .value is the "proposed" value which we got from the user
self.value = question.get("value")
# Use to return several values in case answer is in mutipart
self.values: Dict[str, Any] = {}
@staticmethod @staticmethod
def humanize(value, option={}): def humanize(value, option={}):
return str(value) return str(value)
@ -650,8 +654,6 @@ class DateOption(BaseInputOption):
# } # }
def _value_pre_validator(self): def _value_pre_validator(self):
from datetime import datetime
super()._value_pre_validator() super()._value_pre_validator()
if self.value not in [None, ""]: if self.value not in [None, ""]:
@ -1069,6 +1071,120 @@ OPTIONS = {
OptionType.group: GroupOption, OptionType.group: GroupOption,
} }
AnyOption = Union[
DisplayTextOption,
MarkdownOption,
AlertOption,
ButtonOption,
StringOption,
TextOption,
PasswordOption,
ColorOption,
NumberOption,
BooleanOption,
DateOption,
TimeOption,
EmailOption,
WebPathOption,
URLOption,
FileOption,
SelectOption,
TagsOption,
DomainOption,
AppOption,
UserOption,
GroupOption,
]
# ╭───────────────────────────────────────────────────────╮
# │ ┌─╴╭─╮┌─╮╭╮╮ │
# │ ├─╴│ │├┬╯│││ │
# │ ╵ ╰─╯╵ ╰╵╵╵ │
# ╰───────────────────────────────────────────────────────╯
class OptionsModel(BaseModel):
# Pydantic will match option types to their models class based on the "type" attribute
options: list[Annotated[AnyOption, Field(discriminator="type")]]
@staticmethod
def options_dict_to_list(options: dict[str, Any], defaults: dict[str, Any] = {}):
return [
option
| {
"id": id_,
"type": option.get("type", "string"),
}
for id_, option in options.items()
]
def __init__(self, **kwargs) -> None:
super().__init__(options=self.options_dict_to_list(kwargs))
class FormModel(BaseModel):
"""
Base form on which dynamic forms are built upon Options.
"""
class Config:
validate_assignment = True
extra = Extra.ignore
def __getitem__(self, name: str):
# FIXME
# if a FormModel's required field is not instancied with a value, it is
# not available as an attr and therefor triggers an `AttributeError`
# Also since `BaseReadonlyOption`s do not end up in form,
# `form[AlertOption.id]` would also triggers an error
# For convinience in those 2 cases, we return `None`
if not hasattr(self, name):
# Return None to trigger a validation error instead for required fields
return None
return getattr(self, name)
def __setitem__(self, name: str, value: Any):
setattr(self, name, value)
def get(self, attr: str, default: Any = None) -> Any:
try:
return getattr(self, attr)
except AttributeError:
return default
def build_form(options: list[AnyOption], name: str = "DynamicForm") -> Type[FormModel]:
"""
Returns a dynamic pydantic model class that can be used as a form.
Parsing/validation occurs at instanciation and assignements.
To avoid validation at instanciation, use `my_form.construct(**values)`
"""
options_as_fields: Any = {}
validators: dict[str, Any] = {}
for option in options:
if not isinstance(option, BaseInputOption):
continue # filter out non input options
options_as_fields[option.id] = option._as_dynamic_model_field()
for step in ("pre", "post"):
validators[f"{option.id}_{step}_validator"] = validator(
option.id, allow_reuse=True, pre=step == "pre"
)(getattr(option, f"_value_{step}_validator"))
return cast(
Type[FormModel],
create_model(
name,
__base__=FormModel,
__validators__=validators,
**options_as_fields,
),
)
def hydrate_option_type(raw_option: dict[str, Any]) -> dict[str, Any]: def hydrate_option_type(raw_option: dict[str, Any]) -> dict[str, Any]:
type_ = raw_option.get( type_ = raw_option.get(
@ -1097,20 +1213,16 @@ Hooks = dict[str, Callable[[BaseInputOption], Any]]
def prompt_or_validate_form( def prompt_or_validate_form(
raw_options: dict[str, Any], options: list[AnyOption],
form: FormModel,
prefilled_answers: dict[str, Any] = {}, prefilled_answers: dict[str, Any] = {},
context: Context = {}, context: Context = {},
hooks: Hooks = {}, hooks: Hooks = {},
) -> list[BaseOption]: ) -> FormModel:
options = []
answers = {**prefilled_answers} answers = {**prefilled_answers}
values = {}
for id_, raw_option in raw_options.items(): for option in options:
raw_option["id"] = id_
raw_option["value"] = answers.get(id_)
raw_option = hydrate_option_type(raw_option)
option = OPTIONS[raw_option["type"]](raw_option)
interactive = Moulinette.interface.type == "cli" and os.isatty(1) interactive = Moulinette.interface.type == "cli" and os.isatty(1)
if isinstance(option, ButtonOption): if isinstance(option, ButtonOption):
@ -1123,89 +1235,88 @@ def prompt_or_validate_form(
help=_value_for_locale(option.help), help=_value_for_locale(option.help),
) )
# FIXME not sure why we do not append Buttons to returned options
options.append(option)
if not option.is_visible(context): if not option.is_visible(context):
if isinstance(option, BaseInputOption): if isinstance(option, BaseInputOption):
# FIXME There could be several use case if the question is not displayed: # FIXME There could be several use case if the question is not displayed:
# - we doesn't want to give a specific value # - we doesn't want to give a specific value
# - we want to keep the previous value # - we want to keep the previous value
# - we want the default value # - we want the default value
option.value = context[option.id] = None context[option.id] = form[option.id] = None
continue continue
message = option._get_prompt_message() # if we try to get a `BaseReadonlyOption` value, which doesn't exists in the form,
# we get `None`
value = form[option.id]
if isinstance(option, BaseReadonlyOption) or option.readonly:
if isinstance(option, BaseInputOption):
# FIXME normalized needed, form[option.id] should already be normalized
# only update the context with the value
context[option.id] = form[option.id]
# FIXME here we could error out
if option.id in prefilled_answers:
logger.warning(
f"'{option.id}' is readonly, value '{prefilled_answers[option.id]}' is then ignored."
)
if option.readonly:
if interactive: if interactive:
Moulinette.display(message) Moulinette.display(option._get_prompt_message(value))
if isinstance(option, BaseInputOption):
option.value = context[option.id] = option.current_value
continue continue
if isinstance(option, BaseInputOption):
for i in range(5): for i in range(5):
if interactive and option.value is None: if option.id in prefilled_answers:
prefill = "" value = prefilled_answers[option.id]
elif interactive:
value = option.humanize(value, option)
choices = ( choices = (
option.choices if isinstance(option, BaseChoicesOption) else [] option.choices if isinstance(option, BaseChoicesOption) else []
) )
value = Moulinette.prompt(
if option.current_value is not None: message=option._get_prompt_message(value),
prefill = option.humanize(option.current_value, option)
elif option.default is not None:
prefill = option.humanize(option.default, option)
option.value = Moulinette.prompt(
message=message,
is_password=isinstance(option, PasswordOption), is_password=isinstance(option, PasswordOption),
confirm=False, confirm=False,
prefill=prefill, prefill=value,
is_multiline=(option.type == "text"), is_multiline=isinstance(option, TextOption),
autocomplete=choices, autocomplete=choices,
help=_value_for_locale(option.help), help=_value_for_locale(option.help),
) )
# Apply default value # Apply default value if none
class_default = getattr(option, "default_value", None) if value is None or value == "" and option.default is not None:
if option.value in [None, ""] and ( value = option.default
option.default is not None or class_default is not None
):
option.value = (
class_default if option.default is None else option.default
)
try: try:
# Normalize and validate # Normalize and validate
option.value = option.normalize(option.value, option) values[option.id] = form[option.id] = option.normalize(value, option)
option._value_pre_validator() except (ValidationError, YunohostValidationError) as e:
except YunohostValidationError as e:
# If in interactive cli, re-ask the current question # If in interactive cli, re-ask the current question
if i < 4 and interactive: if i < 4 and interactive:
logger.error(str(e)) logger.error(str(e))
option.value = None value = None
continue continue
if isinstance(e, ValidationError):
error = "\n".join([err["msg"] for err in e.errors()])
raise YunohostValidationError(error, raw_msg=True)
# Otherwise raise the ValidationError # Otherwise raise the ValidationError
raise raise e
break break
option.value = option.values[option.id] = option._value_post_validator()
# Search for post actions in hooks # Search for post actions in hooks
post_hook = f"post_ask__{option.id}" post_hook = f"post_ask__{option.id}"
if post_hook in hooks: if post_hook in hooks:
option.values.update(hooks[post_hook](option)) values.update(hooks[post_hook](option))
# FIXME reapply new values to form to validate it
answers.update(option.values) answers.update(values)
context.update(option.values) context.update(values)
return options return form
def ask_questions_and_parse_answers( def ask_questions_and_parse_answers(
@ -1213,7 +1324,7 @@ def ask_questions_and_parse_answers(
prefilled_answers: Union[str, Mapping[str, Any]] = {}, prefilled_answers: Union[str, Mapping[str, Any]] = {},
current_values: Mapping[str, Any] = {}, current_values: Mapping[str, Any] = {},
hooks: Hooks = {}, hooks: Hooks = {},
) -> list[BaseOption]: ) -> tuple[list[AnyOption], FormModel]:
"""Parse arguments store in either manifest.json or actions.json or from a """Parse arguments store in either manifest.json or actions.json or from a
config panel against the user answers when they are present. config panel against the user answers when they are present.
@ -1241,9 +1352,21 @@ def ask_questions_and_parse_answers(
context = {**current_values, **answers} context = {**current_values, **answers}
return prompt_or_validate_form( # Validate/parse the options attributes
raw_options, prefilled_answers=answers, context=context, hooks=hooks try:
model = OptionsModel(**raw_options)
except ValidationError as e:
error = "\n".join([err["msg"] for err in e.errors()])
# FIXME use YunohostError instead since it is not really a user mistake?
raise YunohostValidationError(error, raw_msg=True)
# Build the form from those questions and instantiate it without
# parsing/validation (construct) since it may contains required questions.
form = build_form(model.options).construct()
form = prompt_or_validate_form(
model.options, form, prefilled_answers=answers, context=context, hooks=hooks
) )
return (model.options, form)
def hydrate_questions_with_choices(raw_questions: List) -> List: def hydrate_questions_with_choices(raw_questions: List) -> List:
@ -1251,7 +1374,7 @@ def hydrate_questions_with_choices(raw_questions: List) -> List:
for raw_question in raw_questions: for raw_question in raw_questions:
raw_question = hydrate_option_type(raw_question) raw_question = hydrate_option_type(raw_question)
question = OPTIONS[raw_question["type"]](raw_question) question = OPTIONS[raw_question["type"]](**raw_question)
if isinstance(question, BaseChoicesOption) and question.choices: if isinstance(question, BaseChoicesOption) and question.choices:
raw_question["choices"] = question.choices raw_question["choices"] = question.choices
raw_question["default"] = question.default raw_question["default"] = question.default