mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
form: rework pre + post options validators
This commit is contained in:
parent
3943774811
commit
ec5da99a79
1 changed files with 155 additions and 122 deletions
|
@ -60,7 +60,7 @@ from yunohost.utils.error import YunohostError, YunohostValidationError
|
|||
from yunohost.utils.i18n import _value_for_locale
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pydantic.fields import FieldInfo
|
||||
from pydantic.fields import ModelField, FieldInfo
|
||||
|
||||
logger = getLogger("yunohost.form")
|
||||
|
||||
|
@ -397,6 +397,7 @@ class BaseInputOption(BaseOption):
|
|||
optional: bool = False # FIXME keep required as default?
|
||||
default: Any = None
|
||||
_annotation = Any
|
||||
_none_as_empty_str: bool = True
|
||||
|
||||
@validator("default", pre=True)
|
||||
def check_empty_default(value: Any) -> Any:
|
||||
|
@ -405,7 +406,9 @@ class BaseInputOption(BaseOption):
|
|||
return value
|
||||
|
||||
@staticmethod
|
||||
def humanize(value, option={}):
|
||||
def humanize(value: Any, option={}) -> str:
|
||||
if value is None:
|
||||
return ""
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
|
@ -474,31 +477,30 @@ class BaseInputOption(BaseOption):
|
|||
|
||||
return message
|
||||
|
||||
def _value_pre_validator(self):
|
||||
if self.value in [None, ""] and not self.optional:
|
||||
raise YunohostValidationError("app_argument_required", name=self.id)
|
||||
@classmethod
|
||||
def _value_pre_validator(cls, value: Any, field: "ModelField") -> Any:
|
||||
if value == "":
|
||||
return None
|
||||
|
||||
# we have an answer, do some post checks
|
||||
if self.value not in [None, ""]:
|
||||
if self.pattern and not re.match(self.pattern["regexp"], str(self.value)):
|
||||
raise YunohostValidationError(
|
||||
self.pattern["error"],
|
||||
name=self.id,
|
||||
value=self.value,
|
||||
)
|
||||
return value
|
||||
|
||||
def _value_post_validator(self):
|
||||
if not self.redact:
|
||||
return self.value
|
||||
@classmethod
|
||||
def _value_post_validator(cls, value: Any, field: "ModelField") -> Any:
|
||||
extras = field.field_info.extra
|
||||
|
||||
if value is None and extras["none_as_empty_str"]:
|
||||
value = ""
|
||||
|
||||
if not extras.get("redact"):
|
||||
return value
|
||||
|
||||
# Tell the operation_logger to redact all password-type / secret args
|
||||
# Also redact the % escaped version of the password that might appear in
|
||||
# the 'args' section of metadata (relevant for password with non-alphanumeric char)
|
||||
data_to_redact = []
|
||||
if self.value and isinstance(self.value, str):
|
||||
data_to_redact.append(self.value)
|
||||
if self.current_value and isinstance(self.current_value, str):
|
||||
data_to_redact.append(self.current_value)
|
||||
if value and isinstance(value, str):
|
||||
data_to_redact.append(value)
|
||||
|
||||
data_to_redact += [
|
||||
urllib.parse.quote(data)
|
||||
for data in data_to_redact
|
||||
|
@ -508,7 +510,7 @@ class BaseInputOption(BaseOption):
|
|||
for operation_logger in OperationLogger._instances:
|
||||
operation_logger.data_to_redact.extend(data_to_redact)
|
||||
|
||||
return self.value
|
||||
return value
|
||||
|
||||
|
||||
# ─ STRINGS ───────────────────────────────────────────────
|
||||
|
@ -561,19 +563,25 @@ class PasswordOption(BaseInputOption):
|
|||
|
||||
return attrs
|
||||
|
||||
def _value_pre_validator(self):
|
||||
super()._value_pre_validator()
|
||||
@classmethod
|
||||
def _value_pre_validator(
|
||||
cls, value: Union[str, None], field: "ModelField"
|
||||
) -> Union[str, None]:
|
||||
value = super()._value_pre_validator(value, field)
|
||||
|
||||
if self.value not in [None, ""]:
|
||||
if any(char in self.value for char in self._forbidden_chars):
|
||||
if value is not None and value != "":
|
||||
forbidden_chars: str = field.field_info.extra["forbidden_chars"]
|
||||
if any(char in value for char in forbidden_chars):
|
||||
raise YunohostValidationError(
|
||||
"pattern_password_app", forbidden_chars=self._forbidden_chars
|
||||
"pattern_password_app", forbidden_chars=forbidden_chars
|
||||
)
|
||||
|
||||
# If it's an optional argument the value should be empty or strong enough
|
||||
from yunohost.utils.password import assert_password_is_strong_enough
|
||||
|
||||
assert_password_is_strong_enough("user", self.value)
|
||||
assert_password_is_strong_enough("user", value)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class ColorOption(BaseInputOption):
|
||||
|
@ -581,6 +589,29 @@ class ColorOption(BaseInputOption):
|
|||
default: Union[str, None]
|
||||
_annotation = Color
|
||||
|
||||
@staticmethod
|
||||
def humanize(value: Union[Color, str, None], option={}) -> str:
|
||||
if isinstance(value, Color):
|
||||
value.as_named(fallback=True)
|
||||
|
||||
return super(ColorOption, ColorOption).humanize(value, option)
|
||||
|
||||
@staticmethod
|
||||
def normalize(value: Union[Color, str, None], option={}) -> str:
|
||||
if isinstance(value, Color):
|
||||
return value.as_hex()
|
||||
|
||||
return super(ColorOption, ColorOption).normalize(value, option)
|
||||
|
||||
@classmethod
|
||||
def _value_post_validator(
|
||||
cls, value: Union[Color, None], field: "ModelField"
|
||||
) -> Union[str, None]:
|
||||
if isinstance(value, Color):
|
||||
return value.as_hex()
|
||||
|
||||
return super()._value_post_validator(value, field)
|
||||
|
||||
|
||||
# ─ NUMERIC ───────────────────────────────────────────────
|
||||
|
||||
|
@ -623,24 +654,16 @@ class NumberOption(BaseInputOption):
|
|||
|
||||
return attrs
|
||||
|
||||
def _value_pre_validator(self):
|
||||
super()._value_pre_validator()
|
||||
if self.value in [None, ""]:
|
||||
return
|
||||
@classmethod
|
||||
def _value_pre_validator(
|
||||
cls, value: Union[int, None], field: "ModelField"
|
||||
) -> Union[int, None]:
|
||||
value = super()._value_pre_validator(value, field)
|
||||
|
||||
if self.min is not None and int(self.value) < self.min:
|
||||
raise YunohostValidationError(
|
||||
"app_argument_invalid",
|
||||
name=self.id,
|
||||
error=m18n.n("invalid_number_min", min=self.min),
|
||||
)
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
if self.max is not None and int(self.value) > self.max:
|
||||
raise YunohostValidationError(
|
||||
"app_argument_invalid",
|
||||
name=self.id,
|
||||
error=m18n.n("invalid_number_max", max=self.max),
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
# ─ BOOLEAN ───────────────────────────────────────────────
|
||||
|
@ -654,6 +677,7 @@ class BooleanOption(BaseInputOption):
|
|||
_annotation = Union[bool, int, str]
|
||||
_yes_answers: set[str] = {"1", "yes", "y", "true", "t", "on"}
|
||||
_no_answers: set[str] = {"0", "no", "n", "false", "f", "off"}
|
||||
_none_as_empty_str = False
|
||||
|
||||
@staticmethod
|
||||
def humanize(value, option={}):
|
||||
|
@ -737,6 +761,15 @@ class BooleanOption(BaseInputOption):
|
|||
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def _value_post_validator(
|
||||
cls, value: Union[bool, None], field: "ModelField"
|
||||
) -> Any:
|
||||
if isinstance(value, bool):
|
||||
return field.field_info.extra["parse"][value]
|
||||
|
||||
return super()._value_post_validator(value, field)
|
||||
|
||||
|
||||
# ─ TIME ──────────────────────────────────────────────────
|
||||
|
||||
|
@ -746,14 +779,14 @@ class DateOption(BaseInputOption):
|
|||
default: Union[str, None]
|
||||
_annotation = datetime.date
|
||||
|
||||
def _value_pre_validator(self):
|
||||
super()._value_pre_validator()
|
||||
@classmethod
|
||||
def _value_post_validator(
|
||||
cls, value: Union[datetime.date, None], field: "ModelField"
|
||||
) -> Union[str, None]:
|
||||
if isinstance(value, datetime.date):
|
||||
return value.isoformat()
|
||||
|
||||
if self.value not in [None, ""]:
|
||||
try:
|
||||
datetime.strptime(self.value, "%Y-%m-%d")
|
||||
except ValueError:
|
||||
raise YunohostValidationError("config_validate_date")
|
||||
return super()._value_post_validator(value, field)
|
||||
|
||||
|
||||
class TimeOption(BaseInputOption):
|
||||
|
@ -761,6 +794,16 @@ class TimeOption(BaseInputOption):
|
|||
default: Union[str, int, None]
|
||||
_annotation = datetime.time
|
||||
|
||||
@classmethod
|
||||
def _value_post_validator(
|
||||
cls, value: Union[datetime.date, None], field: "ModelField"
|
||||
) -> Union[str, None]:
|
||||
if isinstance(value, datetime.time):
|
||||
# FIXME could use `value.isoformat()` to get `%H:%M:%S`
|
||||
return value.strftime("%H:%M")
|
||||
|
||||
return super()._value_post_validator(value, field)
|
||||
|
||||
|
||||
# ─ LOCATIONS ─────────────────────────────────────────────
|
||||
|
||||
|
@ -780,6 +823,9 @@ class WebPathOption(BaseInputOption):
|
|||
def normalize(value, option={}):
|
||||
option = option.dict() if isinstance(option, BaseOption) else option
|
||||
|
||||
if value is None:
|
||||
value = ""
|
||||
|
||||
if not isinstance(value, str):
|
||||
raise YunohostValidationError(
|
||||
"app_argument_invalid",
|
||||
|
@ -828,52 +874,40 @@ class FileOption(BaseInputOption):
|
|||
if os.path.exists(upload_dir):
|
||||
shutil.rmtree(upload_dir)
|
||||
|
||||
def _value_pre_validator(self):
|
||||
if self.value is None:
|
||||
self.value = self.current_value
|
||||
|
||||
super()._value_pre_validator()
|
||||
|
||||
# Validation should have already failed if required
|
||||
if self.value in [None, ""]:
|
||||
return self.value
|
||||
|
||||
if Moulinette.interface.type != "api":
|
||||
if not os.path.exists(str(self.value)) or not os.path.isfile(
|
||||
str(self.value)
|
||||
):
|
||||
raise YunohostValidationError(
|
||||
"app_argument_invalid",
|
||||
name=self.id,
|
||||
error=m18n.n("file_does_not_exist", path=str(self.value)),
|
||||
)
|
||||
|
||||
def _value_post_validator(self):
|
||||
@classmethod
|
||||
def _value_post_validator(cls, value: Any, field: "ModelField") -> Any:
|
||||
from base64 import b64decode
|
||||
|
||||
if not self.value:
|
||||
if not value:
|
||||
return ""
|
||||
|
||||
def is_file_path(s):
|
||||
return (
|
||||
isinstance(s, str)
|
||||
and s.startswith("/")
|
||||
and os.path.exists(s)
|
||||
and os.path.isfile(s)
|
||||
)
|
||||
|
||||
file_exists = is_file_path(value)
|
||||
if Moulinette.interface.type != "api" and not file_exists:
|
||||
# FIXME error
|
||||
raise YunohostValidationError("File doesn't exists", raw_msg=True)
|
||||
elif file_exists:
|
||||
content = read_file(str(value), file_mode="rb")
|
||||
else:
|
||||
content = b64decode(value)
|
||||
|
||||
upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_")
|
||||
_, file_path = tempfile.mkstemp(dir=upload_dir)
|
||||
|
||||
FileOption._upload_dirs.add(upload_dir)
|
||||
|
||||
logger.debug(f"Saving file {self.id} for file question into {file_path}")
|
||||
|
||||
def is_file_path(s):
|
||||
return isinstance(s, str) and s.startswith("/") and os.path.exists(s)
|
||||
|
||||
if Moulinette.interface.type != "api" or is_file_path(self.value):
|
||||
content = read_file(str(self.value), file_mode="rb")
|
||||
else:
|
||||
content = b64decode(self.value)
|
||||
logger.debug(f"Saving file {field.name} for file question into {file_path}")
|
||||
|
||||
write_to_file(file_path, content, file_mode="wb")
|
||||
|
||||
self.value = file_path
|
||||
|
||||
return self.value
|
||||
return file_path
|
||||
|
||||
|
||||
# ─ CHOICES ───────────────────────────────────────────────
|
||||
|
@ -895,6 +929,13 @@ class BaseChoicesOption(BaseInputOption):
|
|||
# FIXME probably forbid choices to be None?
|
||||
choices: Union[dict[str, Any], list[Any], None]
|
||||
|
||||
@validator("choices", pre=True)
|
||||
def parse_comalist_choices(value: Any) -> Union[dict[str, Any], list[Any], None]:
|
||||
if isinstance(value, str):
|
||||
values = [value.strip() for value in value.split(",")]
|
||||
return [value for value in values if value]
|
||||
return value
|
||||
|
||||
@property
|
||||
def _dynamic_annotation(self) -> Union[object, Type[str]]:
|
||||
if self.choices is not None:
|
||||
|
@ -937,19 +978,6 @@ class BaseChoicesOption(BaseInputOption):
|
|||
|
||||
return message
|
||||
|
||||
def _value_pre_validator(self):
|
||||
super()._value_pre_validator()
|
||||
|
||||
# we have an answer, do some post checks
|
||||
if self.value not in [None, ""]:
|
||||
if self.choices and self.value not in self.choices:
|
||||
raise YunohostValidationError(
|
||||
"app_argument_choice_invalid",
|
||||
name=self.id,
|
||||
value=self.value,
|
||||
choices=", ".join(str(choice) for choice in self.choices),
|
||||
)
|
||||
|
||||
|
||||
class SelectOption(BaseChoicesOption):
|
||||
type: Literal[OptionType.select] = OptionType.select
|
||||
|
@ -969,6 +997,8 @@ class TagsOption(BaseChoicesOption):
|
|||
def humanize(value, option={}):
|
||||
if isinstance(value, list):
|
||||
return ",".join(str(v) for v in value)
|
||||
if not value:
|
||||
return ""
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
|
@ -976,7 +1006,9 @@ class TagsOption(BaseChoicesOption):
|
|||
if isinstance(value, list):
|
||||
return ",".join(str(v) for v in value)
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
value = value.strip().strip(",")
|
||||
if value is None or value == "":
|
||||
return ""
|
||||
return value
|
||||
|
||||
@property
|
||||
|
@ -999,36 +1031,37 @@ class TagsOption(BaseChoicesOption):
|
|||
|
||||
return attrs
|
||||
|
||||
def _value_pre_validator(self):
|
||||
values = self.value
|
||||
if isinstance(values, str):
|
||||
values = values.split(",")
|
||||
elif values is None:
|
||||
values = []
|
||||
@classmethod
|
||||
def _value_pre_validator(
|
||||
cls, value: Union[list, str, None], field: "ModelField"
|
||||
) -> Union[str, None]:
|
||||
if value is None or value == "":
|
||||
return None
|
||||
|
||||
if not isinstance(values, list):
|
||||
if self.choices:
|
||||
raise YunohostValidationError(
|
||||
"app_argument_choice_invalid",
|
||||
name=self.id,
|
||||
value=self.value,
|
||||
choices=", ".join(str(choice) for choice in self.choices),
|
||||
)
|
||||
if not isinstance(value, (list, str, type(None))):
|
||||
raise YunohostValidationError(
|
||||
"app_argument_invalid",
|
||||
name=self.id,
|
||||
error=f"'{str(self.value)}' is not a list",
|
||||
name=field.name,
|
||||
error=f"'{str(value)}' is not a list",
|
||||
)
|
||||
|
||||
for value in values:
|
||||
self.value = value
|
||||
super()._value_pre_validator()
|
||||
self.value = values
|
||||
if isinstance(value, str):
|
||||
value = [v.strip() for v in value.split(",")]
|
||||
value = [v for v in value if v]
|
||||
|
||||
def _value_post_validator(self):
|
||||
if isinstance(self.value, list):
|
||||
self.value = ",".join(self.value)
|
||||
return super()._value_post_validator()
|
||||
if isinstance(value, list):
|
||||
choices = field.field_info.extra.get("choices")
|
||||
if choices:
|
||||
if not all(v in choices for v in value):
|
||||
raise YunohostValidationError(
|
||||
"app_argument_choice_invalid",
|
||||
name=field.name,
|
||||
value=value,
|
||||
choices=", ".join(str(choice) for choice in choices),
|
||||
)
|
||||
|
||||
return ",".join(str(v) for v in value)
|
||||
return value
|
||||
|
||||
|
||||
# ─ ENTITIES ──────────────────────────────────────────────
|
||||
|
|
Loading…
Add table
Reference in a new issue