From f5c56db10ea6e27fae15c959b4ee9c09c16fef6f Mon Sep 17 00:00:00 2001 From: axolotle Date: Thu, 13 Apr 2023 20:11:03 +0200 Subject: [PATCH] form: use pydantic BaseModel in Options and add some validators --- src/utils/form.py | 470 +++++++++++++++++++++++++--------------------- 1 file changed, 261 insertions(+), 209 deletions(-) diff --git a/src/utils/form.py b/src/utils/form.py index f201f507b..8b47be430 100644 --- a/src/utils/form.py +++ b/src/utils/form.py @@ -17,6 +17,7 @@ # along with this program. If not, see . # import ast +import datetime import operator as op import os import re @@ -24,9 +25,18 @@ import shutil import tempfile import urllib.parse from enum import Enum -from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Union +from typing import Any, Callable, Dict, List, Literal, Mapping, Union from logging import getLogger +from pydantic import ( + BaseModel, + root_validator, + validator, +) +from pydantic.color import Color +from pydantic.networks import EmailStr, HttpUrl +from pydantic.types import FilePath + from moulinette import Moulinette, m18n from moulinette.interfaces.cli import colorize from moulinette.utils.filesystem import read_file, write_to_file @@ -36,7 +46,6 @@ from yunohost.utils.i18n import _value_for_locale logger = getLogger("yunohost.form") -Context = dict[str, Any] # ╭───────────────────────────────────────────────────────╮ # │ ┌─╴╷ ╷╭─┐╷ │ @@ -240,27 +249,58 @@ FORBIDDEN_READONLY_TYPES = { } -class BaseOption: - def __init__( - self, - question: Dict[str, Any], - ): - self.id = question["id"] - self.type = question.get("type", OptionType.string) - self.visible = question.get("visible", True) +Context = dict[str, Any] +Translation = Union[dict[str, str], str] +JSExpression = str +Values = dict[str, Any] - self.readonly = question.get("readonly", False) - if self.readonly and self.type in FORBIDDEN_READONLY_TYPES: - # FIXME i18n - raise YunohostError( - "config_forbidden_readonly_type", - type=self.type, - id=self.id, + +class Pattern(BaseModel): + regexp: str + error: Translation = "error_pattern" # FIXME add generic i18n key + + +class BaseOption(BaseModel): + type: OptionType + id: str + ask: Union[Translation, None] + readonly: bool = False + visible: Union[JSExpression, bool] = True + bind: Union[str, None] = None + + class Config: + arbitrary_types_allowed = True + use_enum_values = True + validate_assignment = True + + @staticmethod + def schema_extra(schema: dict[str, Any], model: Type["BaseOption"]) -> None: + # FIXME Do proper doctstring for Options + del schema["description"] + schema["additionalProperties"] = False + + @validator("ask", always=True) + def parse_or_set_default_ask( + cls, value: Union[Translation, None], values: Values + ) -> Translation: + if value is None: + return {"en": values["id"]} + if isinstance(value, str): + return {"en": value} + return value + + @validator("readonly", pre=True) + def can_be_readonly(cls, value: bool, values: Values) -> bool: + forbidden_types = ("password", "app", "domain", "user", "file") + if value is True and values["type"] in forbidden_types: + raise ValueError( + m18n.n( + "config_forbidden_readonly_type", + type=values["type"], + id=values["id"], + ) ) - - self.ask = question.get("ask", self.id) - if not isinstance(self.ask, dict): - self.ask = {"en": self.ask} + return value def is_visible(self, context: Context) -> bool: if isinstance(self.visible, bool): @@ -268,7 +308,7 @@ class BaseOption: return evaluate_simple_js_expression(self.visible, context=context) - def _get_prompt_message(self) -> str: + def _get_prompt_message(self, value: None) -> str: return _value_for_locale(self.ask) @@ -278,9 +318,7 @@ class BaseOption: class BaseReadonlyOption(BaseOption): - def __init__(self, question): - super().__init__(question) - self.readonly = True + readonly: Literal[True] = True class DisplayTextOption(BaseReadonlyOption): @@ -291,38 +329,35 @@ class MarkdownOption(BaseReadonlyOption): type: Literal[OptionType.markdown] = OptionType.markdown +class State(str, Enum): + success = "success" + info = "info" + warning = "warning" + danger = "danger" + + class AlertOption(BaseReadonlyOption): type: Literal[OptionType.alert] = OptionType.alert + style: State = State.info + icon: Union[str, None] = None - def __init__(self, question): - super().__init__(question) - self.style = question.get("style", "info") - - def _get_prompt_message(self) -> str: - text = _value_for_locale(self.ask) - - if self.style in ["success", "info", "warning", "danger"]: - color = { - "success": "green", - "info": "cyan", - "warning": "yellow", - "danger": "red", - } - prompt = m18n.g(self.style) if self.style != "danger" else m18n.n("danger") - return colorize(prompt, color[self.style]) + f" {text}" - else: - return text + def _get_prompt_message(self, value: None) -> str: + colors = { + State.success: "green", + State.info: "cyan", + State.warning: "yellow", + State.danger: "red", + } + message = m18n.g(self.style) if self.style != State.danger else m18n.n("danger") + return f"{colorize(message, colors[self.style])} {_value_for_locale(self.ask)}" class ButtonOption(BaseReadonlyOption): type: Literal[OptionType.button] = OptionType.button - enabled = True - - def __init__(self, question): - super().__init__(question) - self.help = question.get("help") - self.style = question.get("style", "success") - self.enabled = question.get("enabled", True) + help: Union[Translation, None] = None + style: State = State.success + icon: Union[str, None] = None + enabled: Union[JSExpression, bool] = True def is_enabled(self, context: Context) -> bool: if isinstance(self.enabled, bool): @@ -337,16 +372,21 @@ class ButtonOption(BaseReadonlyOption): class BaseInputOption(BaseOption): - hide_user_input_in_prompt = False - pattern: Optional[Dict] = None + help: Union[Translation, None] = None + example: Union[str, None] = None + placeholder: Union[str, None] = None + redact: bool = False + optional: bool = False # FIXME keep required as default? + default: Any = None - def __init__(self, question: Dict[str, Any]): - super().__init__(question) - self.default = question.get("default", None) - self.optional = question.get("optional", False) - self.pattern = question.get("pattern", self.pattern) - self.help = question.get("help") - self.redact = question.get("redact", False) + @validator("default", pre=True) + def check_empty_default(value: Any) -> Any: + if value == "": + return None + return value + + # FIXME remove + def old__init__(self, question: Dict[str, Any]): # .current_value is the currently stored value self.current_value = question.get("current_value") # .value is the "proposed" value which we got from the user @@ -354,10 +394,6 @@ class BaseInputOption(BaseOption): # Use to return several values in case answer is in mutipart self.values: Dict[str, Any] = {} - # Empty value is parsed as empty string - if self.default == "": - self.default = None - @staticmethod def humanize(value, option={}): return str(value) @@ -368,12 +404,12 @@ class BaseInputOption(BaseOption): value = value.strip() return value - def _get_prompt_message(self) -> str: - message = super()._get_prompt_message() + def _get_prompt_message(self, value: Any) -> str: + message = super()._get_prompt_message(value) if self.readonly: message = colorize(message, "purple") - return f"{message} {self.humanize(self.current_value)}" + return f"{message} {self.humanize(value, self)}" return message @@ -418,7 +454,8 @@ class BaseInputOption(BaseOption): class BaseStringOption(BaseInputOption): - default_value = "" + default: Union[str, None] + pattern: Union[Pattern, None] = None class StringOption(BaseStringOption): @@ -429,27 +466,23 @@ class TextOption(BaseStringOption): type: Literal[OptionType.text] = OptionType.text +FORBIDDEN_PASSWORD_CHARS = r"{}" + + class PasswordOption(BaseInputOption): type: Literal[OptionType.password] = OptionType.password - hide_user_input_in_prompt = True - default_value = "" - forbidden_chars = "{}" - - def __init__(self, question): - super().__init__(question) - self.redact = True - if self.default is not None: - raise YunohostValidationError( - "app_argument_password_no_default", name=self.id - ) + example: Literal[None] = None + default: Literal[None] = None + redact: Literal[True] = True + _forbidden_chars: str = FORBIDDEN_PASSWORD_CHARS def _value_pre_validator(self): super()._value_pre_validator() if self.value not in [None, ""]: - if any(char in self.value for char in self.forbidden_chars): + if any(char in self.value for char in self._forbidden_chars): raise YunohostValidationError( - "pattern_password_app", forbidden_chars=self.forbidden_chars + "pattern_password_app", forbidden_chars=self._forbidden_chars ) # If it's an optional argument the value should be empty or strong enough @@ -458,26 +491,25 @@ class PasswordOption(BaseInputOption): assert_password_is_strong_enough("user", self.value) -class ColorOption(BaseStringOption): +class ColorOption(BaseInputOption): type: Literal[OptionType.color] = OptionType.color - pattern = { - "regexp": r"^#[ABCDEFabcdef\d]{3,6}$", - "error": "config_validate_color", # i18n: config_validate_color - } + default: Union[str, None] + # pattern = { + # "regexp": r"^#[ABCDEFabcdef\d]{3,6}$", + # "error": "config_validate_color", # i18n: config_validate_color + # } # ─ NUMERIC ─────────────────────────────────────────────── class NumberOption(BaseInputOption): + # `number` and `range` are exactly the same, but `range` does render as a slider in web-admin type: Literal[OptionType.number, OptionType.range] = OptionType.number - default_value = None - - def __init__(self, question): - super().__init__(question) - self.min = question.get("min", None) - self.max = question.get("max", None) - self.step = question.get("step", None) + default: Union[int, None] + min: Union[int, None] = None + max: Union[int, None] = None + step: Union[int, None] = None @staticmethod def normalize(value, option={}): @@ -493,7 +525,7 @@ class NumberOption(BaseInputOption): if value in [None, ""]: return None - option = option.__dict__ if isinstance(option, BaseOption) else option + option = option.dict() if isinstance(option, BaseOption) else option raise YunohostValidationError( "app_argument_invalid", name=option.get("id"), @@ -525,20 +557,15 @@ class NumberOption(BaseInputOption): class BooleanOption(BaseInputOption): type: Literal[OptionType.boolean] = OptionType.boolean - default_value = 0 - yes_answers = ["1", "yes", "y", "true", "t", "on"] - no_answers = ["0", "no", "n", "false", "f", "off"] - - def __init__(self, question): - super().__init__(question) - self.yes = question.get("yes", 1) - self.no = question.get("no", 0) - if self.default is None: - self.default = self.no + yes: Any = 1 + no: Any = 0 + default: Union[bool, int, str, None] = 0 + _yes_answers: set[str] = {"1", "yes", "y", "true", "t", "on"} + _no_answers: set[str] = {"0", "no", "n", "false", "f", "off"} @staticmethod def humanize(value, option={}): - option = option.__dict__ if isinstance(option, BaseOption) else option + option = option.dict() if isinstance(option, BaseOption) else option yes = option.get("yes", 1) no = option.get("no", 0) @@ -561,7 +588,7 @@ class BooleanOption(BaseInputOption): @staticmethod def normalize(value, option={}): - option = option.__dict__ if isinstance(option, BaseOption) else option + option = option.dict() if isinstance(option, BaseOption) else option if isinstance(value, str): value = value.strip() @@ -569,8 +596,8 @@ class BooleanOption(BaseInputOption): technical_yes = option.get("yes", 1) technical_no = option.get("no", 0) - no_answers = BooleanOption.no_answers - yes_answers = BooleanOption.yes_answers + no_answers = BooleanOption._no_answers + yes_answers = BooleanOption._yes_answers assert ( str(technical_yes).lower() not in no_answers @@ -579,8 +606,8 @@ class BooleanOption(BaseInputOption): str(technical_no).lower() not in yes_answers ), f"'no' value can't be in {yes_answers}" - no_answers += [str(technical_no).lower()] - yes_answers += [str(technical_yes).lower()] + no_answers.add(str(technical_no).lower()) + yes_answers.add(str(technical_yes).lower()) strvalue = str(value).lower() @@ -602,8 +629,8 @@ class BooleanOption(BaseInputOption): def get(self, key, default=None): return getattr(self, key, default) - def _get_prompt_message(self): - message = super()._get_prompt_message() + def _get_prompt_message(self, value: Union[bool, None]) -> str: + message = super()._get_prompt_message(value) if not self.readonly: message += " [yes | no]" @@ -614,12 +641,13 @@ class BooleanOption(BaseInputOption): # ─ TIME ────────────────────────────────────────────────── -class DateOption(BaseStringOption): +class DateOption(BaseInputOption): type: Literal[OptionType.date] = OptionType.date - pattern = { - "regexp": r"^\d{4}-\d\d-\d\d$", - "error": "config_validate_date", # i18n: config_validate_date - } + default: Union[str, None] + # pattern = { + # "regexp": r"^\d{4}-\d\d-\d\d$", + # "error": "config_validate_date", # i18n: config_validate_date + # } def _value_pre_validator(self): from datetime import datetime @@ -633,32 +661,34 @@ class DateOption(BaseStringOption): raise YunohostValidationError("config_validate_date") -class TimeOption(BaseStringOption): +class TimeOption(BaseInputOption): type: Literal[OptionType.time] = OptionType.time - pattern = { - "regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$", - "error": "config_validate_time", # i18n: config_validate_time - } + default: Union[str, int, None] + # pattern = { + # "regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$", + # "error": "config_validate_time", # i18n: config_validate_time + # } # ─ LOCATIONS ───────────────────────────────────────────── -class EmailOption(BaseStringOption): +class EmailOption(BaseInputOption): type: Literal[OptionType.email] = OptionType.email - pattern = { - "regexp": r"^.+@.+", - "error": "config_validate_email", # i18n: config_validate_email - } + default: Union[EmailStr, None] + # pattern = { + # "regexp": r"^.+@.+", + # "error": "config_validate_email", # i18n: config_validate_email + # } class WebPathOption(BaseInputOption): type: Literal[OptionType.path] = OptionType.path - default_value = "" + default: Union[str, None] @staticmethod def normalize(value, option={}): - option = option.__dict__ if isinstance(option, BaseOption) else option + option = option.dict() if isinstance(option, BaseOption) else option if not isinstance(value, str): raise YunohostValidationError( @@ -685,10 +715,11 @@ class WebPathOption(BaseInputOption): class URLOption(BaseStringOption): type: Literal[OptionType.url] = OptionType.url - pattern = { - "regexp": r"^https?://.*$", - "error": "config_validate_url", # i18n: config_validate_url - } + default: Union[str, None] + # pattern = { + # "regexp": r"^https?://.*$", + # "error": "config_validate_url", # i18n: config_validate_url + # } # ─ FILE ────────────────────────────────────────────────── @@ -696,16 +727,16 @@ class URLOption(BaseStringOption): class FileOption(BaseInputOption): type: Literal[OptionType.file] = OptionType.file - upload_dirs: List[str] = [] - - def __init__(self, question): - super().__init__(question) - self.accept = question.get("accept", "") + # `FilePath` for CLI (path must exists and must be a file) + # `bytes` for API (a base64 encoded file actually) + accept: Union[str, None] = "" # currently only used by the web-admin + default: Union[str, None] + _upload_dirs: set[str] = set() @classmethod def clean_upload_dirs(cls): # Delete files uploaded from API - for upload_dir in cls.upload_dirs: + for upload_dir in cls._upload_dirs: if os.path.exists(upload_dir): shutil.rmtree(upload_dir) @@ -738,7 +769,7 @@ class FileOption(BaseInputOption): upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_") _, file_path = tempfile.mkstemp(dir=upload_dir) - FileOption.upload_dirs += [upload_dir] + FileOption._upload_dirs.add(upload_dir) logger.debug(f"Saving file {self.id} for file question into {file_path}") @@ -760,26 +791,30 @@ class FileOption(BaseInputOption): # ─ CHOICES ─────────────────────────────────────────────── -class BaseChoicesOption(BaseInputOption): - def __init__( - self, - question: Dict[str, Any], - ): - super().__init__(question) - # Don't restrict choices if there's none specified - self.choices = question.get("choices", None) +ChoosableOptions = Literal[ + OptionType.string, + OptionType.color, + OptionType.number, + OptionType.date, + OptionType.time, + OptionType.email, + OptionType.path, + OptionType.url, +] - def _get_prompt_message(self) -> str: - message = super()._get_prompt_message() + +class BaseChoicesOption(BaseInputOption): + # FIXME probably forbid choices to be None? + choices: Union[dict[str, Any], list[Any], None] + + def _get_prompt_message(self, value: Any) -> str: + message = super()._get_prompt_message(value) if self.readonly: - message = message - choice = self.current_value + if isinstance(self.choices, dict) and value is not None: + value = self.choices[value] - if isinstance(self.choices, dict) and choice is not None: - choice = self.choices[choice] - - return f"{colorize(message, 'purple')} {choice}" + return f"{colorize(message, 'purple')} {value}" if self.choices: # Prevent displaying a shitload of choices @@ -789,17 +824,15 @@ class BaseChoicesOption(BaseInputOption): if isinstance(self.choices, dict) else self.choices ) - choices_to_display = choices[:20] + splitted_choices = choices[:20] remaining_choices = len(choices[20:]) if remaining_choices > 0: - choices_to_display += [ + splitted_choices += [ m18n.n("other_available_options", n=remaining_choices) ] - choices_to_display = " | ".join( - str(choice) for choice in choices_to_display - ) + choices_to_display = " | ".join(str(choice) for choice in splitted_choices) return f"{message} [{choices_to_display}]" @@ -821,12 +854,15 @@ class BaseChoicesOption(BaseInputOption): class SelectOption(BaseChoicesOption): type: Literal[OptionType.select] = OptionType.select - default_value = "" + choices: Union[dict[str, Any], list[Any]] + default: Union[str, None] class TagsOption(BaseChoicesOption): type: Literal[OptionType.tags] = OptionType.tags - default_value = "" + choices: Union[list[str], None] = None + pattern: Union[Pattern, None] = None + default: Union[str, list[str], None] @staticmethod def humanize(value, option={}): @@ -879,20 +915,24 @@ class TagsOption(BaseChoicesOption): class DomainOption(BaseChoicesOption): type: Literal[OptionType.domain] = OptionType.domain + choices: Union[dict[str, str], None] - def __init__(self, question): - from yunohost.domain import domain_list, _get_maindomain + @root_validator() + def inject_domains_choices_and_default(cls, values: Values) -> Values: + # TODO remove calls to resources in validators (pydantic V2 should adress this) + from yunohost.domain import domain_list - super().__init__(question) - - if self.default is None: - self.default = _get_maindomain() - - self.choices = { - domain: domain + " ★" if domain == self.default else domain - for domain in domain_list()["domains"] + data = domain_list() + values["choices"] = { + domain: domain + " ★" if domain == data["main"] else domain + for domain in data["domains"] } + if values["default"] is None: + values["default"] = data["main"] + + return values + @staticmethod def normalize(value, option={}): if value.startswith("https://"): @@ -908,87 +948,99 @@ class DomainOption(BaseChoicesOption): class AppOption(BaseChoicesOption): type: Literal[OptionType.app] = OptionType.app + choices: Union[dict[str, str], None] + add_yunohost_portal_to_choices: bool = False + filter: Union[str, None] = None - def __init__(self, question): + @root_validator() + def inject_apps_choices(cls, values: Values) -> Values: from yunohost.app import app_list - super().__init__(question) - self.filter = question.get("filter", None) - self.add_yunohost_portal_to_choices = question.get("add_yunohost_portal_to_choices", False) - apps = app_list(full=True)["apps"] - if self.filter: + if values.get("filter", None): apps = [ app for app in apps - if evaluate_simple_js_expression(self.filter, context=app) + if evaluate_simple_js_expression(values["filter"], context=app) ] + values["choices"] = {"_none": "---"} - def _app_display(app): - domain_path_or_id = f" ({app.get('domain_path', app['id'])})" - return app["label"] + domain_path_or_id + if values.get("add_yunohost_portal_to_choices", False): + values["choices"]["_yunohost_portal_with_public_apps"] = "YunoHost's portal with public apps" - self.choices = {"_none": "---"} - if self.add_yunohost_portal_to_choices: - # FIXME: i18n - self.choices["_yunohost_portal_with_public_apps"] = "YunoHost's portal with public apps" - self.choices.update({app["id"]: _app_display(app) for app in apps}) + values["choices"].update( + { + app["id"]: f"{app['label']} ({app.get('domain_path', app['id'])})" + for app in apps + } + ) + + return values class UserOption(BaseChoicesOption): type: Literal[OptionType.user] = OptionType.user + choices: Union[dict[str, str], None] - def __init__(self, question): - from yunohost.user import user_list, user_info + @root_validator() + def inject_users_choices_and_default(cls, values: dict[str, Any]) -> dict[str, Any]: from yunohost.domain import _get_maindomain + from yunohost.user import user_info, user_list - super().__init__(question) - - self.choices = { + values["choices"] = { username: f"{infos['fullname']} ({infos['mail']})" for username, infos in user_list()["users"].items() } - if not self.choices: + # FIXME keep this to test if any user, do not raise error if no admin? + if not values["choices"]: raise YunohostValidationError( "app_argument_invalid", - name=self.id, + name=values["id"], error="You should create a YunoHost user first.", ) - if self.default is None: + if values["default"] is None: # FIXME: this code is obsolete with the new admins group # Should be replaced by something like "any first user we find in the admin group" root_mail = "root@%s" % _get_maindomain() - for user in self.choices.keys(): + for user in values["choices"].keys(): if root_mail in user_info(user).get("mail-aliases", []): - self.default = user + values["default"] = user break + return values + class GroupOption(BaseChoicesOption): type: Literal[OptionType.group] = OptionType.group + choices: Union[dict[str, str], None] - def __init__(self, question): + @root_validator() + def inject_groups_choices_and_default(cls, values: Values) -> Values: from yunohost.user import user_group_list - super().__init__(question) + groups = user_group_list(short=True, include_primary_groups=False)["groups"] - self.choices = list( - user_group_list(short=True, include_primary_groups=False)["groups"] - ) - - def _human_readable_group(g): + def _human_readable_group(groupname): # i18n: visitors # i18n: all_users # i18n: admins - return m18n.n(g) if g in ["visitors", "all_users", "admins"] else g + return ( + m18n.n(groupname) + if groupname in ["visitors", "all_users", "admins"] + else groupname + ) - self.choices = {g: _human_readable_group(g) for g in self.choices} + values["choices"] = { + groupname: _human_readable_group(groupname) for groupname in groups + } - if self.default is None: - self.default = "all_users" + if values["default"] is None: + values["default"] = "all_users" + + return values OPTIONS = { @@ -997,7 +1049,7 @@ OPTIONS = { OptionType.alert: AlertOption, OptionType.button: ButtonOption, OptionType.string: StringOption, - OptionType.text: StringOption, + OptionType.text: TextOption, OptionType.password: PasswordOption, OptionType.color: ColorOption, OptionType.number: NumberOption,