form: use pydantic BaseModel in Options and add some validators

This commit is contained in:
axolotle 2023-04-13 20:11:03 +02:00
parent 5bd8680847
commit f5c56db10e

View file

@ -17,6 +17,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import ast
import datetime
import operator as op
import os
import re
@ -24,9 +25,18 @@ import shutil
import tempfile
import urllib.parse
from enum import Enum
from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Union
from typing import Any, Callable, Dict, List, Literal, Mapping, Union
from logging import getLogger
from pydantic import (
BaseModel,
root_validator,
validator,
)
from pydantic.color import Color
from pydantic.networks import EmailStr, HttpUrl
from pydantic.types import FilePath
from moulinette import Moulinette, m18n
from moulinette.interfaces.cli import colorize
from moulinette.utils.filesystem import read_file, write_to_file
@ -36,7 +46,6 @@ from yunohost.utils.i18n import _value_for_locale
logger = getLogger("yunohost.form")
Context = dict[str, Any]
# ╭───────────────────────────────────────────────────────╮
# │ ┌─╴╷ ╷╭─┐╷ │
@ -240,27 +249,58 @@ FORBIDDEN_READONLY_TYPES = {
}
class BaseOption:
def __init__(
self,
question: Dict[str, Any],
):
self.id = question["id"]
self.type = question.get("type", OptionType.string)
self.visible = question.get("visible", True)
Context = dict[str, Any]
Translation = Union[dict[str, str], str]
JSExpression = str
Values = dict[str, Any]
self.readonly = question.get("readonly", False)
if self.readonly and self.type in FORBIDDEN_READONLY_TYPES:
# FIXME i18n
raise YunohostError(
"config_forbidden_readonly_type",
type=self.type,
id=self.id,
class Pattern(BaseModel):
regexp: str
error: Translation = "error_pattern" # FIXME add generic i18n key
class BaseOption(BaseModel):
type: OptionType
id: str
ask: Union[Translation, None]
readonly: bool = False
visible: Union[JSExpression, bool] = True
bind: Union[str, None] = None
class Config:
arbitrary_types_allowed = True
use_enum_values = True
validate_assignment = True
@staticmethod
def schema_extra(schema: dict[str, Any], model: Type["BaseOption"]) -> None:
# FIXME Do proper doctstring for Options
del schema["description"]
schema["additionalProperties"] = False
@validator("ask", always=True)
def parse_or_set_default_ask(
cls, value: Union[Translation, None], values: Values
) -> Translation:
if value is None:
return {"en": values["id"]}
if isinstance(value, str):
return {"en": value}
return value
@validator("readonly", pre=True)
def can_be_readonly(cls, value: bool, values: Values) -> bool:
forbidden_types = ("password", "app", "domain", "user", "file")
if value is True and values["type"] in forbidden_types:
raise ValueError(
m18n.n(
"config_forbidden_readonly_type",
type=values["type"],
id=values["id"],
)
)
self.ask = question.get("ask", self.id)
if not isinstance(self.ask, dict):
self.ask = {"en": self.ask}
return value
def is_visible(self, context: Context) -> bool:
if isinstance(self.visible, bool):
@ -268,7 +308,7 @@ class BaseOption:
return evaluate_simple_js_expression(self.visible, context=context)
def _get_prompt_message(self) -> str:
def _get_prompt_message(self, value: None) -> str:
return _value_for_locale(self.ask)
@ -278,9 +318,7 @@ class BaseOption:
class BaseReadonlyOption(BaseOption):
def __init__(self, question):
super().__init__(question)
self.readonly = True
readonly: Literal[True] = True
class DisplayTextOption(BaseReadonlyOption):
@ -291,38 +329,35 @@ class MarkdownOption(BaseReadonlyOption):
type: Literal[OptionType.markdown] = OptionType.markdown
class State(str, Enum):
success = "success"
info = "info"
warning = "warning"
danger = "danger"
class AlertOption(BaseReadonlyOption):
type: Literal[OptionType.alert] = OptionType.alert
style: State = State.info
icon: Union[str, None] = None
def __init__(self, question):
super().__init__(question)
self.style = question.get("style", "info")
def _get_prompt_message(self) -> str:
text = _value_for_locale(self.ask)
if self.style in ["success", "info", "warning", "danger"]:
color = {
"success": "green",
"info": "cyan",
"warning": "yellow",
"danger": "red",
}
prompt = m18n.g(self.style) if self.style != "danger" else m18n.n("danger")
return colorize(prompt, color[self.style]) + f" {text}"
else:
return text
def _get_prompt_message(self, value: None) -> str:
colors = {
State.success: "green",
State.info: "cyan",
State.warning: "yellow",
State.danger: "red",
}
message = m18n.g(self.style) if self.style != State.danger else m18n.n("danger")
return f"{colorize(message, colors[self.style])} {_value_for_locale(self.ask)}"
class ButtonOption(BaseReadonlyOption):
type: Literal[OptionType.button] = OptionType.button
enabled = True
def __init__(self, question):
super().__init__(question)
self.help = question.get("help")
self.style = question.get("style", "success")
self.enabled = question.get("enabled", True)
help: Union[Translation, None] = None
style: State = State.success
icon: Union[str, None] = None
enabled: Union[JSExpression, bool] = True
def is_enabled(self, context: Context) -> bool:
if isinstance(self.enabled, bool):
@ -337,16 +372,21 @@ class ButtonOption(BaseReadonlyOption):
class BaseInputOption(BaseOption):
hide_user_input_in_prompt = False
pattern: Optional[Dict] = None
help: Union[Translation, None] = None
example: Union[str, None] = None
placeholder: Union[str, None] = None
redact: bool = False
optional: bool = False # FIXME keep required as default?
default: Any = None
def __init__(self, question: Dict[str, Any]):
super().__init__(question)
self.default = question.get("default", None)
self.optional = question.get("optional", False)
self.pattern = question.get("pattern", self.pattern)
self.help = question.get("help")
self.redact = question.get("redact", False)
@validator("default", pre=True)
def check_empty_default(value: Any) -> Any:
if value == "":
return None
return value
# FIXME remove
def old__init__(self, question: Dict[str, Any]):
# .current_value is the currently stored value
self.current_value = question.get("current_value")
# .value is the "proposed" value which we got from the user
@ -354,10 +394,6 @@ class BaseInputOption(BaseOption):
# Use to return several values in case answer is in mutipart
self.values: Dict[str, Any] = {}
# Empty value is parsed as empty string
if self.default == "":
self.default = None
@staticmethod
def humanize(value, option={}):
return str(value)
@ -368,12 +404,12 @@ class BaseInputOption(BaseOption):
value = value.strip()
return value
def _get_prompt_message(self) -> str:
message = super()._get_prompt_message()
def _get_prompt_message(self, value: Any) -> str:
message = super()._get_prompt_message(value)
if self.readonly:
message = colorize(message, "purple")
return f"{message} {self.humanize(self.current_value)}"
return f"{message} {self.humanize(value, self)}"
return message
@ -418,7 +454,8 @@ class BaseInputOption(BaseOption):
class BaseStringOption(BaseInputOption):
default_value = ""
default: Union[str, None]
pattern: Union[Pattern, None] = None
class StringOption(BaseStringOption):
@ -429,27 +466,23 @@ class TextOption(BaseStringOption):
type: Literal[OptionType.text] = OptionType.text
FORBIDDEN_PASSWORD_CHARS = r"{}"
class PasswordOption(BaseInputOption):
type: Literal[OptionType.password] = OptionType.password
hide_user_input_in_prompt = True
default_value = ""
forbidden_chars = "{}"
def __init__(self, question):
super().__init__(question)
self.redact = True
if self.default is not None:
raise YunohostValidationError(
"app_argument_password_no_default", name=self.id
)
example: Literal[None] = None
default: Literal[None] = None
redact: Literal[True] = True
_forbidden_chars: str = FORBIDDEN_PASSWORD_CHARS
def _value_pre_validator(self):
super()._value_pre_validator()
if self.value not in [None, ""]:
if any(char in self.value for char in self.forbidden_chars):
if any(char in self.value for char in self._forbidden_chars):
raise YunohostValidationError(
"pattern_password_app", forbidden_chars=self.forbidden_chars
"pattern_password_app", forbidden_chars=self._forbidden_chars
)
# If it's an optional argument the value should be empty or strong enough
@ -458,26 +491,25 @@ class PasswordOption(BaseInputOption):
assert_password_is_strong_enough("user", self.value)
class ColorOption(BaseStringOption):
class ColorOption(BaseInputOption):
type: Literal[OptionType.color] = OptionType.color
pattern = {
"regexp": r"^#[ABCDEFabcdef\d]{3,6}$",
"error": "config_validate_color", # i18n: config_validate_color
}
default: Union[str, None]
# pattern = {
# "regexp": r"^#[ABCDEFabcdef\d]{3,6}$",
# "error": "config_validate_color", # i18n: config_validate_color
# }
# ─ NUMERIC ───────────────────────────────────────────────
class NumberOption(BaseInputOption):
# `number` and `range` are exactly the same, but `range` does render as a slider in web-admin
type: Literal[OptionType.number, OptionType.range] = OptionType.number
default_value = None
def __init__(self, question):
super().__init__(question)
self.min = question.get("min", None)
self.max = question.get("max", None)
self.step = question.get("step", None)
default: Union[int, None]
min: Union[int, None] = None
max: Union[int, None] = None
step: Union[int, None] = None
@staticmethod
def normalize(value, option={}):
@ -493,7 +525,7 @@ class NumberOption(BaseInputOption):
if value in [None, ""]:
return None
option = option.__dict__ if isinstance(option, BaseOption) else option
option = option.dict() if isinstance(option, BaseOption) else option
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("id"),
@ -525,20 +557,15 @@ class NumberOption(BaseInputOption):
class BooleanOption(BaseInputOption):
type: Literal[OptionType.boolean] = OptionType.boolean
default_value = 0
yes_answers = ["1", "yes", "y", "true", "t", "on"]
no_answers = ["0", "no", "n", "false", "f", "off"]
def __init__(self, question):
super().__init__(question)
self.yes = question.get("yes", 1)
self.no = question.get("no", 0)
if self.default is None:
self.default = self.no
yes: Any = 1
no: Any = 0
default: Union[bool, int, str, None] = 0
_yes_answers: set[str] = {"1", "yes", "y", "true", "t", "on"}
_no_answers: set[str] = {"0", "no", "n", "false", "f", "off"}
@staticmethod
def humanize(value, option={}):
option = option.__dict__ if isinstance(option, BaseOption) else option
option = option.dict() if isinstance(option, BaseOption) else option
yes = option.get("yes", 1)
no = option.get("no", 0)
@ -561,7 +588,7 @@ class BooleanOption(BaseInputOption):
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, BaseOption) else option
option = option.dict() if isinstance(option, BaseOption) else option
if isinstance(value, str):
value = value.strip()
@ -569,8 +596,8 @@ class BooleanOption(BaseInputOption):
technical_yes = option.get("yes", 1)
technical_no = option.get("no", 0)
no_answers = BooleanOption.no_answers
yes_answers = BooleanOption.yes_answers
no_answers = BooleanOption._no_answers
yes_answers = BooleanOption._yes_answers
assert (
str(technical_yes).lower() not in no_answers
@ -579,8 +606,8 @@ class BooleanOption(BaseInputOption):
str(technical_no).lower() not in yes_answers
), f"'no' value can't be in {yes_answers}"
no_answers += [str(technical_no).lower()]
yes_answers += [str(technical_yes).lower()]
no_answers.add(str(technical_no).lower())
yes_answers.add(str(technical_yes).lower())
strvalue = str(value).lower()
@ -602,8 +629,8 @@ class BooleanOption(BaseInputOption):
def get(self, key, default=None):
return getattr(self, key, default)
def _get_prompt_message(self):
message = super()._get_prompt_message()
def _get_prompt_message(self, value: Union[bool, None]) -> str:
message = super()._get_prompt_message(value)
if not self.readonly:
message += " [yes | no]"
@ -614,12 +641,13 @@ class BooleanOption(BaseInputOption):
# ─ TIME ──────────────────────────────────────────────────
class DateOption(BaseStringOption):
class DateOption(BaseInputOption):
type: Literal[OptionType.date] = OptionType.date
pattern = {
"regexp": r"^\d{4}-\d\d-\d\d$",
"error": "config_validate_date", # i18n: config_validate_date
}
default: Union[str, None]
# pattern = {
# "regexp": r"^\d{4}-\d\d-\d\d$",
# "error": "config_validate_date", # i18n: config_validate_date
# }
def _value_pre_validator(self):
from datetime import datetime
@ -633,32 +661,34 @@ class DateOption(BaseStringOption):
raise YunohostValidationError("config_validate_date")
class TimeOption(BaseStringOption):
class TimeOption(BaseInputOption):
type: Literal[OptionType.time] = OptionType.time
pattern = {
"regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$",
"error": "config_validate_time", # i18n: config_validate_time
}
default: Union[str, int, None]
# pattern = {
# "regexp": r"^(?:\d|[01]\d|2[0-3]):[0-5]\d$",
# "error": "config_validate_time", # i18n: config_validate_time
# }
# ─ LOCATIONS ─────────────────────────────────────────────
class EmailOption(BaseStringOption):
class EmailOption(BaseInputOption):
type: Literal[OptionType.email] = OptionType.email
pattern = {
"regexp": r"^.+@.+",
"error": "config_validate_email", # i18n: config_validate_email
}
default: Union[EmailStr, None]
# pattern = {
# "regexp": r"^.+@.+",
# "error": "config_validate_email", # i18n: config_validate_email
# }
class WebPathOption(BaseInputOption):
type: Literal[OptionType.path] = OptionType.path
default_value = ""
default: Union[str, None]
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, BaseOption) else option
option = option.dict() if isinstance(option, BaseOption) else option
if not isinstance(value, str):
raise YunohostValidationError(
@ -685,10 +715,11 @@ class WebPathOption(BaseInputOption):
class URLOption(BaseStringOption):
type: Literal[OptionType.url] = OptionType.url
pattern = {
"regexp": r"^https?://.*$",
"error": "config_validate_url", # i18n: config_validate_url
}
default: Union[str, None]
# pattern = {
# "regexp": r"^https?://.*$",
# "error": "config_validate_url", # i18n: config_validate_url
# }
# ─ FILE ──────────────────────────────────────────────────
@ -696,16 +727,16 @@ class URLOption(BaseStringOption):
class FileOption(BaseInputOption):
type: Literal[OptionType.file] = OptionType.file
upload_dirs: List[str] = []
def __init__(self, question):
super().__init__(question)
self.accept = question.get("accept", "")
# `FilePath` for CLI (path must exists and must be a file)
# `bytes` for API (a base64 encoded file actually)
accept: Union[str, None] = "" # currently only used by the web-admin
default: Union[str, None]
_upload_dirs: set[str] = set()
@classmethod
def clean_upload_dirs(cls):
# Delete files uploaded from API
for upload_dir in cls.upload_dirs:
for upload_dir in cls._upload_dirs:
if os.path.exists(upload_dir):
shutil.rmtree(upload_dir)
@ -738,7 +769,7 @@ class FileOption(BaseInputOption):
upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_")
_, file_path = tempfile.mkstemp(dir=upload_dir)
FileOption.upload_dirs += [upload_dir]
FileOption._upload_dirs.add(upload_dir)
logger.debug(f"Saving file {self.id} for file question into {file_path}")
@ -760,26 +791,30 @@ class FileOption(BaseInputOption):
# ─ CHOICES ───────────────────────────────────────────────
class BaseChoicesOption(BaseInputOption):
def __init__(
self,
question: Dict[str, Any],
):
super().__init__(question)
# Don't restrict choices if there's none specified
self.choices = question.get("choices", None)
ChoosableOptions = Literal[
OptionType.string,
OptionType.color,
OptionType.number,
OptionType.date,
OptionType.time,
OptionType.email,
OptionType.path,
OptionType.url,
]
def _get_prompt_message(self) -> str:
message = super()._get_prompt_message()
class BaseChoicesOption(BaseInputOption):
# FIXME probably forbid choices to be None?
choices: Union[dict[str, Any], list[Any], None]
def _get_prompt_message(self, value: Any) -> str:
message = super()._get_prompt_message(value)
if self.readonly:
message = message
choice = self.current_value
if isinstance(self.choices, dict) and value is not None:
value = self.choices[value]
if isinstance(self.choices, dict) and choice is not None:
choice = self.choices[choice]
return f"{colorize(message, 'purple')} {choice}"
return f"{colorize(message, 'purple')} {value}"
if self.choices:
# Prevent displaying a shitload of choices
@ -789,17 +824,15 @@ class BaseChoicesOption(BaseInputOption):
if isinstance(self.choices, dict)
else self.choices
)
choices_to_display = choices[:20]
splitted_choices = choices[:20]
remaining_choices = len(choices[20:])
if remaining_choices > 0:
choices_to_display += [
splitted_choices += [
m18n.n("other_available_options", n=remaining_choices)
]
choices_to_display = " | ".join(
str(choice) for choice in choices_to_display
)
choices_to_display = " | ".join(str(choice) for choice in splitted_choices)
return f"{message} [{choices_to_display}]"
@ -821,12 +854,15 @@ class BaseChoicesOption(BaseInputOption):
class SelectOption(BaseChoicesOption):
type: Literal[OptionType.select] = OptionType.select
default_value = ""
choices: Union[dict[str, Any], list[Any]]
default: Union[str, None]
class TagsOption(BaseChoicesOption):
type: Literal[OptionType.tags] = OptionType.tags
default_value = ""
choices: Union[list[str], None] = None
pattern: Union[Pattern, None] = None
default: Union[str, list[str], None]
@staticmethod
def humanize(value, option={}):
@ -879,20 +915,24 @@ class TagsOption(BaseChoicesOption):
class DomainOption(BaseChoicesOption):
type: Literal[OptionType.domain] = OptionType.domain
choices: Union[dict[str, str], None]
def __init__(self, question):
from yunohost.domain import domain_list, _get_maindomain
@root_validator()
def inject_domains_choices_and_default(cls, values: Values) -> Values:
# TODO remove calls to resources in validators (pydantic V2 should adress this)
from yunohost.domain import domain_list
super().__init__(question)
if self.default is None:
self.default = _get_maindomain()
self.choices = {
domain: domain + "" if domain == self.default else domain
for domain in domain_list()["domains"]
data = domain_list()
values["choices"] = {
domain: domain + "" if domain == data["main"] else domain
for domain in data["domains"]
}
if values["default"] is None:
values["default"] = data["main"]
return values
@staticmethod
def normalize(value, option={}):
if value.startswith("https://"):
@ -908,87 +948,99 @@ class DomainOption(BaseChoicesOption):
class AppOption(BaseChoicesOption):
type: Literal[OptionType.app] = OptionType.app
choices: Union[dict[str, str], None]
add_yunohost_portal_to_choices: bool = False
filter: Union[str, None] = None
def __init__(self, question):
@root_validator()
def inject_apps_choices(cls, values: Values) -> Values:
from yunohost.app import app_list
super().__init__(question)
self.filter = question.get("filter", None)
self.add_yunohost_portal_to_choices = question.get("add_yunohost_portal_to_choices", False)
apps = app_list(full=True)["apps"]
if self.filter:
if values.get("filter", None):
apps = [
app
for app in apps
if evaluate_simple_js_expression(self.filter, context=app)
if evaluate_simple_js_expression(values["filter"], context=app)
]
values["choices"] = {"_none": "---"}
def _app_display(app):
domain_path_or_id = f" ({app.get('domain_path', app['id'])})"
return app["label"] + domain_path_or_id
if values.get("add_yunohost_portal_to_choices", False):
values["choices"]["_yunohost_portal_with_public_apps"] = "YunoHost's portal with public apps"
self.choices = {"_none": "---"}
if self.add_yunohost_portal_to_choices:
# FIXME: i18n
self.choices["_yunohost_portal_with_public_apps"] = "YunoHost's portal with public apps"
self.choices.update({app["id"]: _app_display(app) for app in apps})
values["choices"].update(
{
app["id"]: f"{app['label']} ({app.get('domain_path', app['id'])})"
for app in apps
}
)
return values
class UserOption(BaseChoicesOption):
type: Literal[OptionType.user] = OptionType.user
choices: Union[dict[str, str], None]
def __init__(self, question):
from yunohost.user import user_list, user_info
@root_validator()
def inject_users_choices_and_default(cls, values: dict[str, Any]) -> dict[str, Any]:
from yunohost.domain import _get_maindomain
from yunohost.user import user_info, user_list
super().__init__(question)
self.choices = {
values["choices"] = {
username: f"{infos['fullname']} ({infos['mail']})"
for username, infos in user_list()["users"].items()
}
if not self.choices:
# FIXME keep this to test if any user, do not raise error if no admin?
if not values["choices"]:
raise YunohostValidationError(
"app_argument_invalid",
name=self.id,
name=values["id"],
error="You should create a YunoHost user first.",
)
if self.default is None:
if values["default"] is None:
# FIXME: this code is obsolete with the new admins group
# Should be replaced by something like "any first user we find in the admin group"
root_mail = "root@%s" % _get_maindomain()
for user in self.choices.keys():
for user in values["choices"].keys():
if root_mail in user_info(user).get("mail-aliases", []):
self.default = user
values["default"] = user
break
return values
class GroupOption(BaseChoicesOption):
type: Literal[OptionType.group] = OptionType.group
choices: Union[dict[str, str], None]
def __init__(self, question):
@root_validator()
def inject_groups_choices_and_default(cls, values: Values) -> Values:
from yunohost.user import user_group_list
super().__init__(question)
groups = user_group_list(short=True, include_primary_groups=False)["groups"]
self.choices = list(
user_group_list(short=True, include_primary_groups=False)["groups"]
)
def _human_readable_group(g):
def _human_readable_group(groupname):
# i18n: visitors
# i18n: all_users
# i18n: admins
return m18n.n(g) if g in ["visitors", "all_users", "admins"] else g
return (
m18n.n(groupname)
if groupname in ["visitors", "all_users", "admins"]
else groupname
)
self.choices = {g: _human_readable_group(g) for g in self.choices}
values["choices"] = {
groupname: _human_readable_group(groupname) for groupname in groups
}
if self.default is None:
self.default = "all_users"
if values["default"] is None:
values["default"] = "all_users"
return values
OPTIONS = {
@ -997,7 +1049,7 @@ OPTIONS = {
OptionType.alert: AlertOption,
OptionType.button: ButtonOption,
OptionType.string: StringOption,
OptionType.text: StringOption,
OptionType.text: TextOption,
OptionType.password: PasswordOption,
OptionType.color: ColorOption,
OptionType.number: NumberOption,