Merge pull request #1338 from YunoHost/rework-prompt-again

Rework prompt() again
This commit is contained in:
Alexandre Aubin 2021-09-23 21:47:49 +02:00 committed by GitHub
commit 7484f138b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 835 additions and 629 deletions

View file

@ -13,10 +13,8 @@
"app_already_installed": "{app} is already installed",
"app_already_installed_cant_change_url": "This app is already installed. The URL cannot be changed just by this function. Check in `app changeurl` if it's available.",
"app_already_up_to_date": "{app} is already up-to-date",
"app_argument_choice_invalid": "Use one of these choices '{choices}' for the argument '{name}' instead of '{value}'",
"app_argument_choice_invalid": "Pick a valid value for argument '{name}': '{value}' is not among the available choices ({choices})",
"app_argument_invalid": "Pick a valid value for the argument '{name}': {error}",
"app_argument_password_help_keep": "Press Enter to keep the current value",
"app_argument_password_help_optional": "Type one space to empty the password",
"app_argument_password_no_default": "Error while parsing password argument '{name}': password argument can't have a default value for security reason",
"app_argument_required": "Argument '{name}' is required",
"app_change_url_identical_domains": "The old and new domain/url_path are identical ('{domain}{path}'), nothing to do.",
@ -366,7 +364,6 @@
"extracting": "Extracting...",
"field_invalid": "Invalid field '{}'",
"file_does_not_exist": "The file {path} does not exist.",
"file_extension_not_accepted": "Refusing file '{path}' because its extension is not among the accepted extensions: {accept}",
"firewall_reload_failed": "Could not reload the firewall",
"firewall_reloaded": "Firewall reloaded",
"firewall_rules_cmd_failed": "Some firewall rule commands have failed. More info in log.",
@ -541,6 +538,7 @@
"migrations_to_be_ran_manually": "Migration {id} has to be run manually. Please go to Tools → Migrations on the webadmin page, or run `yunohost tools migrations run`.",
"not_enough_disk_space": "Not enough free space on '{path}'",
"operation_interrupted": "The operation was manually interrupted?",
"other_available_options": "... and {n} other available options not shown",
"packages_upgrade_failed": "Could not upgrade all the packages",
"password_listed": "This password is among the most used passwords in the world. Please choose something more unique.",
"password_too_simple_1": "The password needs to be at least 8 characters long",

View file

@ -32,9 +32,9 @@ import time
import re
import subprocess
import glob
import urllib.parse
import tempfile
from collections import OrderedDict
from typing import List
from moulinette import Moulinette, m18n
from moulinette.core import MoulinetteError
@ -55,7 +55,10 @@ from moulinette.utils.filesystem import (
from yunohost.utils import packages
from yunohost.utils.config import (
ConfigPanel,
parse_args_in_yunohost_format,
ask_questions_and_parse_answers,
Question,
DomainQuestion,
PathQuestion
)
from yunohost.utils.i18n import _value_for_locale
from yunohost.utils.error import YunohostError, YunohostValidationError
@ -442,8 +445,11 @@ def app_change_url(operation_logger, app, domain, path):
old_path = app_setting(app, "path")
# Normalize path and domain format
old_domain, old_path = _normalize_domain_path(old_domain, old_path)
domain, path = _normalize_domain_path(domain, path)
domain = DomainQuestion.normalize(domain)
old_domain = DomainQuestion.normalize(old_domain)
path = PathQuestion.normalize(path)
old_path = PathQuestion.normalize(old_path)
if (domain, path) == (old_domain, old_path):
raise YunohostValidationError(
@ -453,16 +459,10 @@ def app_change_url(operation_logger, app, domain, path):
# Check the url is available
_assert_no_conflicting_apps(domain, path, ignore_app=app)
manifest = _get_manifest_of_app(os.path.join(APPS_SETTING_PATH, app))
# Retrieve arguments list for change_url script
# TODO: Allow to specify arguments
args_odict = _parse_args_from_manifest(manifest, "change_url")
tmp_workdir_for_app = _make_tmp_workdir_for_app(app=app)
# Prepare env. var. to pass to script
env_dict = _make_environment_for_app_script(app, args=args_odict)
env_dict = _make_environment_for_app_script(app)
env_dict["YNH_APP_OLD_DOMAIN"] = old_domain
env_dict["YNH_APP_OLD_PATH"] = old_path
env_dict["YNH_APP_NEW_DOMAIN"] = domain
@ -614,12 +614,8 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
app_setting_path = os.path.join(APPS_SETTING_PATH, app_instance_name)
# Retrieve arguments list for upgrade script
# TODO: Allow to specify arguments
args_odict = _parse_args_from_manifest(manifest, "upgrade")
# Prepare env. var. to pass to script
env_dict = _make_environment_for_app_script(app_instance_name, args=args_odict)
env_dict = _make_environment_for_app_script(app_instance_name)
env_dict["YNH_APP_UPGRADE_TYPE"] = upgrade_type
env_dict["YNH_APP_MANIFEST_VERSION"] = str(app_new_version)
env_dict["YNH_APP_CURRENT_VERSION"] = str(app_current_version)
@ -905,13 +901,13 @@ def app_install(
app_instance_name = app_id
# Retrieve arguments list for install script
args_dict = (
{} if not args else dict(urllib.parse.parse_qsl(args, keep_blank_values=True))
)
args_odict = _parse_args_from_manifest(manifest, "install", args=args_dict)
raw_questions = manifest.get("arguments", {}).get("install", {})
questions = ask_questions_and_parse_answers(raw_questions, prefilled_answers=args)
args = {question.name: question.value for question in questions if question.value is not None}
# Validate domain / path availability for webapps
_validate_and_normalize_webpath(args_odict, extracted_app_folder)
path_requirement = _guess_webapp_path_requirement(questions, extracted_app_folder)
_validate_webpath_requirement(questions, path_requirement)
# Attempt to patch legacy helpers ...
_patch_legacy_helpers(extracted_app_folder)
@ -976,13 +972,14 @@ def app_install(
)
# Prepare env. var. to pass to script
env_dict = _make_environment_for_app_script(app_instance_name, args=args_odict)
env_dict = _make_environment_for_app_script(app_instance_name, args=args)
env_dict["YNH_APP_BASEDIR"] = extracted_app_folder
env_dict_for_logging = env_dict.copy()
for arg_name, arg_value_and_type in args_odict.items():
if arg_value_and_type[1] == "password":
del env_dict_for_logging["YNH_APP_ARG_%s" % arg_name.upper()]
for question in questions:
# Or should it be more generally question.redact ?
if question.type == "password":
del env_dict_for_logging["YNH_APP_ARG_%s" % question.name.upper()]
operation_logger.extra.update({"env": env_dict_for_logging})
@ -1472,7 +1469,8 @@ def app_register_url(app, domain, path):
permission_sync_to_user,
)
domain, path = _normalize_domain_path(domain, path)
domain = DomainQuestion.normalize(domain)
path = PathQuestion.normalize(path)
# We cannot change the url of an app already installed simply by changing
# the settings...
@ -1642,15 +1640,14 @@ def app_action_run(operation_logger, app, action, args=None):
action_declaration = actions[action]
# Retrieve arguments list for install script
args_dict = (
dict(urllib.parse.parse_qsl(args, keep_blank_values=True)) if args else {}
)
args_odict = _parse_args_for_action(actions[action], args=args_dict)
raw_questions = actions[action].get("arguments", {})
questions = ask_questions_and_parse_answers(raw_questions, prefilled_answers=args)
args = {question.name: question.value for question in questions if question.value is not None}
tmp_workdir_for_app = _make_tmp_workdir_for_app(app=app)
env_dict = _make_environment_for_app_script(
app, args=args_odict, args_prefix="ACTION_"
app, args=args, args_prefix="ACTION_"
)
env_dict["YNH_ACTION"] = action
env_dict["YNH_APP_BASEDIR"] = tmp_workdir_for_app
@ -2380,79 +2377,20 @@ def _check_manifest_requirements(manifest, app_instance_name):
)
def _parse_args_from_manifest(manifest, action, args={}):
"""Parse arguments needed for an action from the manifest
Retrieve specified arguments for the action from the manifest, and parse
given args according to that. If some required arguments are not provided,
its values will be asked if interaction is possible.
Parsed arguments will be returned as an OrderedDict
Keyword arguments:
manifest -- The app manifest to use
action -- The action to retrieve arguments for
args -- A dictionnary of arguments to parse
"""
if action not in manifest["arguments"]:
logger.debug("no arguments found for '%s' in manifest", action)
return OrderedDict()
action_args = manifest["arguments"][action]
return parse_args_in_yunohost_format(args, action_args)
def _parse_args_for_action(action, args={}):
"""Parse arguments needed for an action from the actions list
Retrieve specified arguments for the action from the manifest, and parse
given args according to that. If some required arguments are not provided,
its values will be asked if interaction is possible.
Parsed arguments will be returned as an OrderedDict
Keyword arguments:
action -- The action
args -- A dictionnary of arguments to parse
"""
args_dict = OrderedDict()
if "arguments" not in action:
logger.debug("no arguments found for '%s' in manifest", action)
return args_dict
action_args = action["arguments"]
return parse_args_in_yunohost_format(args, action_args)
def _validate_and_normalize_webpath(args_dict, app_folder):
def _guess_webapp_path_requirement(questions: List[Question], app_folder: str) -> str:
# If there's only one "domain" and "path", validate that domain/path
# is an available url and normalize the path.
domain_args = [
(name, value[0]) for name, value in args_dict.items() if value[1] == "domain"
]
path_args = [
(name, value[0]) for name, value in args_dict.items() if value[1] == "path"
]
domain_questions = [question for question in questions if question.type == "domain"]
path_questions = [question for question in questions if question.type == "path"]
if len(domain_args) == 1 and len(path_args) == 1:
domain = domain_args[0][1]
path = path_args[0][1]
domain, path = _normalize_domain_path(domain, path)
# Check the url is available
_assert_no_conflicting_apps(domain, path)
# (We save this normalized path so that the install script have a
# standard path format to deal with no matter what the user inputted)
args_dict[path_args[0][0]] = (path, "path")
# This is likely to be a full-domain app...
elif len(domain_args) == 1 and len(path_args) == 0:
if len(domain_questions) == 0 and len(path_questions) == 0:
return ""
if len(domain_questions) == 1 and len(path_questions) == 1:
return "domain_and_path"
if len(domain_questions) == 1 and len(path_questions) == 0:
# This is likely to be a full-domain app...
# Confirm that this is a full-domain app This should cover most cases
# ... though anyway the proper solution is to implement some mechanism
@ -2462,36 +2400,33 @@ def _validate_and_normalize_webpath(args_dict, app_folder):
# Full-domain apps typically declare something like path_url="/" or path=/
# and use ynh_webpath_register or yunohost_app_checkurl inside the install script
install_script_content = open(
os.path.join(app_folder, "scripts/install")
).read()
install_script_content = read_file(os.path.join(app_folder, "scripts/install"))
if re.search(
r"\npath(_url)?=[\"']?/[\"']?\n", install_script_content
r"\npath(_url)?=[\"']?/[\"']?", install_script_content
) and re.search(
r"(ynh_webpath_register|yunohost app checkurl)", install_script_content
r"ynh_webpath_register", install_script_content
):
return "full_domain"
domain = domain_args[0][1]
_assert_no_conflicting_apps(domain, "/", full_domain=True)
return "?"
def _normalize_domain_path(domain, path):
def _validate_webpath_requirement(questions: List[Question], path_requirement: str) -> None:
# We want url to be of the format :
# some.domain.tld/foo
domain_questions = [question for question in questions if question.type == "domain"]
path_questions = [question for question in questions if question.type == "path"]
# Remove http/https prefix if it's there
if domain.startswith("https://"):
domain = domain[len("https://") :]
elif domain.startswith("http://"):
domain = domain[len("http://") :]
if path_requirement == "domain_and_path":
# Remove trailing slashes
domain = domain.rstrip("/").lower()
path = "/" + path.strip("/")
domain = domain_questions[0].value
path = path_questions[0].value
_assert_no_conflicting_apps(domain, path, full_domain=True)
return domain, path
elif path_requirement == "full_domain":
domain = domain_questions[0].value
_assert_no_conflicting_apps(domain, "/", full_domain=True)
def _get_conflicting_apps(domain, path, ignore_app=None):
@ -2506,7 +2441,8 @@ def _get_conflicting_apps(domain, path, ignore_app=None):
from yunohost.domain import _assert_domain_exists
domain, path = _normalize_domain_path(domain, path)
domain = DomainQuestion.normalize(domain)
path = PathQuestion.normalize(path)
# Abort if domain is unknown
_assert_domain_exists(domain)
@ -2569,10 +2505,8 @@ def _make_environment_for_app_script(app, args={}, args_prefix="APP_ARG_"):
"YNH_APP_MANIFEST_VERSION": manifest.get("version", "?"),
}
for arg_name, arg_value_and_type in args.items():
env_dict["YNH_%s%s" % (args_prefix, arg_name.upper())] = str(
arg_value_and_type[0]
)
for arg_name, arg_value in args.items():
env_dict["YNH_%s%s" % (args_prefix, arg_name.upper())] = str(arg_value)
return env_dict

View file

@ -2,9 +2,11 @@ import glob
import os
import shutil
import pytest
from mock import patch
from .conftest import get_test_apps_dir
from moulinette import Moulinette
from moulinette.utils.filesystem import read_file
from yunohost.domain import _get_maindomain
@ -146,7 +148,9 @@ def test_app_config_regular_setting(config_app):
assert app_config_get(config_app, "main.components.boolean") == "1"
assert app_setting(config_app, "boolean") == "1"
with pytest.raises(YunohostValidationError):
with pytest.raises(YunohostValidationError), \
patch.object(os, "isatty", return_value=False), \
patch.object(Moulinette, "prompt", return_value="pwet"):
app_config_set(config_app, "main.components.boolean", "pwet")

View file

@ -4,7 +4,7 @@ import os
from .conftest import get_test_apps_dir
from yunohost.utils.error import YunohostError
from yunohost.app import app_install, app_remove, _normalize_domain_path
from yunohost.app import app_install, app_remove
from yunohost.domain import _get_maindomain, domain_url_available
from yunohost.permission import _validate_and_sanitize_permission_url
@ -28,22 +28,6 @@ def teardown_function(function):
pass
def test_normalize_domain_path():
assert _normalize_domain_path("https://yolo.swag/", "macnuggets") == (
"yolo.swag",
"/macnuggets",
)
assert _normalize_domain_path("http://yolo.swag", "/macnuggets/") == (
"yolo.swag",
"/macnuggets",
)
assert _normalize_domain_path("yolo.swag/", "macnuggets/") == (
"yolo.swag",
"/macnuggets",
)
def test_urlavailable():
# Except the maindomain/macnuggets to be available

File diff suppressed because it is too large Load diff

View file

@ -420,7 +420,7 @@ def user_update(
# without a specified value, change_password will be set to the const 0.
# In this case we prompt for the new password.
if Moulinette.interface.type == "cli" and not change_password:
change_password = Moulinette.prompt(m18n.n("ask_password"), True, True)
change_password = Moulinette.prompt(m18n.n("ask_password"), is_password=True, confirm=True)
# Ensure sufficiently complex password
assert_password_is_strong_enough("user", change_password)

View file

@ -25,12 +25,13 @@ import urllib.parse
import tempfile
import shutil
from collections import OrderedDict
from typing import Optional, Dict, List
from typing import Optional, Dict, List, Union, Any, Mapping
from moulinette.interfaces.cli import colorize
from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
read_file,
write_to_file,
read_toml,
read_yaml,
@ -99,6 +100,9 @@ class ConfigPanel:
result[key]["value"] = question_class.humanize(
option["current_value"], option
)
# FIXME: semantics, technically here this is not about a prompt...
if question_class.hide_user_input_in_prompt:
result[key]["value"] = "**************" # Prevent displaying password in `config get`
if mode == "full":
return self.config
@ -164,6 +168,9 @@ class ConfigPanel:
raise
finally:
# Delete files uploaded from API
# FIXME : this is currently done in the context of config panels,
# but could also happen in the context of app install ... (or anywhere else
# where we may parse args etc...)
FileQuestion.clean_upload_dirs()
self._reload_services()
@ -198,20 +205,20 @@ class ConfigPanel:
# Transform toml format into internal format
format_description = {
"toml": {
"root": {
"properties": ["version", "i18n"],
"default": {"version": 1.0},
"defaults": {"version": 1.0},
},
"panels": {
"properties": ["name", "services", "actions", "help"],
"default": {
"defaults": {
"services": [],
"actions": {"apply": {"en": "Apply"}},
},
},
"sections": {
"properties": ["name", "services", "optional", "help", "visible"],
"default": {
"defaults": {
"name": "",
"services": [],
"optional": True,
@ -241,11 +248,11 @@ class ConfigPanel:
"accept",
"redact",
],
"default": {},
"defaults": {},
},
}
def convert(toml_node, node_type):
def _build_internal_config_panel(raw_infos, level):
"""Convert TOML in internal format ('full' mode used by webadmin)
Here are some properties of 1.0 config panel in toml:
- node properties and node children are mixed,
@ -253,48 +260,49 @@ class ConfigPanel:
- some properties have default values
This function detects all children nodes and put them in a list
"""
# Prefill the node default keys if needed
default = format_description[node_type]["default"]
node = {key: toml_node.get(key, value) for key, value in default.items()}
properties = format_description[node_type]["properties"]
defaults = format_description[level]["defaults"]
properties = format_description[level]["properties"]
# Define the filter_key part to use and the children type
i = list(format_description).index(node_type)
subnode_type = (
list(format_description)[i + 1] if node_type != "options" else None
# Start building the ouput (merging the raw infos + defaults)
out = {key: raw_infos.get(key, value) for key, value in defaults.items()}
# Now fill the sublevels (+ apply filter_key)
i = list(format_description).index(level)
sublevel = (
list(format_description)[i + 1] if level != "options" else None
)
search_key = filter_key[i] if len(filter_key) > i else False
for key, value in toml_node.items():
for key, value in raw_infos.items():
# Key/value are a child node
if (
isinstance(value, OrderedDict)
and key not in properties
and subnode_type
and sublevel
):
# We exclude all nodes not referenced by the filter_key
if search_key and key != search_key:
continue
subnode = convert(value, subnode_type)
subnode = _build_internal_config_panel(value, sublevel)
subnode["id"] = key
if node_type == "toml":
if level == "root":
subnode.setdefault("name", {"en": key.capitalize()})
elif node_type == "sections":
elif level == "sections":
subnode["name"] = key # legacy
subnode.setdefault("optional", toml_node.get("optional", True))
node.setdefault(subnode_type, []).append(subnode)
subnode.setdefault("optional", raw_infos.get("optional", True))
out.setdefault(sublevel, []).append(subnode)
# Key/value are a property
else:
if key not in properties:
logger.warning(f"Unknown key '{key}' found in config toml")
logger.warning(f"Unknown key '{key}' found in config panel")
# Todo search all i18n keys
node[key] = (
out[key] = (
value if key not in ["ask", "help", "name"] else {"en": value}
)
return node
return out
self.config = convert(toml_config_panel, "toml")
self.config = _build_internal_config_panel(toml_config_panel, "root")
try:
self.config["panels"][0]["sections"][0]["options"][0]
@ -376,14 +384,13 @@ class ConfigPanel:
display_header(f"\n# {name}")
# Check and ask unanswered questions
self.new_values.update(
parse_args_in_yunohost_format(self.args, section["options"])
)
self.new_values = {
key: value[0]
for key, value in self.new_values.items()
if not value[0] is None
}
questions = ask_questions_and_parse_answers(section["options"], self.args)
self.new_values.update({
question.name: question.value
for question in questions
if question.value is not None
})
self.errors = None
def _get_default_values(self):
@ -457,18 +464,20 @@ class Question(object):
hide_user_input_in_prompt = False
pattern: Optional[Dict] = None
def __init__(self, question, user_answers):
def __init__(self, question: Dict[str, Any]):
self.name = question["name"]
self.type = question.get("type", "string")
self.default = question.get("default", None)
self.current_value = question.get("current_value")
self.optional = question.get("optional", False)
self.choices = question.get("choices", [])
self.pattern = question.get("pattern", self.pattern)
self.ask = question.get("ask", {"en": self.name})
self.help = question.get("help")
self.value = user_answers.get(self.name)
self.redact = question.get("redact", False)
# .current_value is the currently stored value
self.current_value = question.get("current_value")
# .value is the "proposed" value which we got from the user
self.value = question.get("value")
# Empty value is parsed as empty string
if self.default == "":
@ -480,6 +489,8 @@ class Question(object):
@staticmethod
def normalize(value, option={}):
if isinstance(value, str):
value = value.strip()
return value
def _prompt(self, text):
@ -491,9 +502,11 @@ class Question(object):
self.value = Moulinette.prompt(
message=text,
is_password=self.hide_user_input_in_prompt,
confirm=False, # We doesn't want to confirm this kind of password like in webadmin
confirm=False,
prefill=prefill,
is_multiline=(self.type == "text"),
autocomplete=self.choices,
help=_value_for_locale(self.help)
)
def ask_if_needed(self):
@ -513,12 +526,9 @@ class Question(object):
):
self.value = class_default if self.default is None else self.default
# Normalization
# This is done to enforce a certain formating like for boolean
self.value = self.normalize(self.value, self)
# Prevalidation
try:
# Normalize and validate
self.value = self.normalize(self.value, self)
self._prevalidate()
except YunohostValidationError as e:
# If in interactive cli, re-ask the current question
@ -531,9 +541,10 @@ class Question(object):
raise
break
self.value = self._post_parse_value()
return (self.value, self.argument_type)
return self.value
def _prevalidate(self):
if self.value in [None, ""] and not self.optional:
@ -542,7 +553,12 @@ class Question(object):
# we have an answer, do some post checks
if self.value not in [None, ""]:
if self.choices and self.value not in self.choices:
self._raise_invalid_answer()
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(self.choices),
)
if self.pattern and not re.match(self.pattern["regexp"], str(self.value)):
raise YunohostValidationError(
self.pattern["error"],
@ -550,25 +566,25 @@ class Question(object):
value=self.value,
)
def _raise_invalid_answer(self):
raise YunohostValidationError(
"app_argument_choice_invalid",
name=self.name,
value=self.value,
choices=", ".join(self.choices),
)
def _format_text_for_user_input_in_cli(self):
def _format_text_for_user_input_in_cli(self, column=False):
text_for_user_input_in_cli = _value_for_locale(self.ask)
if self.choices:
text_for_user_input_in_cli += " [{0}]".format(" | ".join(self.choices))
if self.help or column:
text_for_user_input_in_cli += ":\033[m"
if self.help:
text_for_user_input_in_cli += "\n - "
text_for_user_input_in_cli += _value_for_locale(self.help)
# Prevent displaying a shitload of choices
# (e.g. 100+ available users when choosing an app admin...)
choices = list(self.choices.values()) if isinstance(self.choices, dict) else self.choices
choices_to_display = choices[:20]
remaining_choices = len(choices[20:])
if remaining_choices > 0:
choices_to_display += [m18n.n("other_available_options", n=remaining_choices)]
choices_to_display = " | ".join(choices_to_display)
text_for_user_input_in_cli += f" [{choices_to_display}]"
return text_for_user_input_in_cli
def _post_parse_value(self):
@ -659,6 +675,8 @@ class TagsQuestion(Question):
def normalize(value, option={}):
if isinstance(value, list):
return ",".join(value)
if isinstance(value, str):
value = value.strip()
return value
def _prevalidate(self):
@ -684,20 +702,14 @@ class PasswordQuestion(Question):
default_value = ""
forbidden_chars = "{}"
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
def __init__(self, question):
super().__init__(question)
self.redact = True
if self.default is not None:
raise YunohostValidationError(
"app_argument_password_no_default", name=self.name
)
@staticmethod
def humanize(value, option={}):
if value:
return "********" # Avoid to display the password on screen
return ""
def _prevalidate(self):
super()._prevalidate()
@ -712,34 +724,31 @@ class PasswordQuestion(Question):
assert_password_is_strong_enough("user", self.value)
def _format_text_for_user_input_in_cli(self):
need_column = self.current_value or self.optional
text_for_user_input_in_cli = super()._format_text_for_user_input_in_cli(
need_column
)
if self.current_value:
text_for_user_input_in_cli += "\n - " + m18n.n(
"app_argument_password_help_keep"
)
if self.optional:
text_for_user_input_in_cli += "\n - " + m18n.n(
"app_argument_password_help_optional"
)
return text_for_user_input_in_cli
def _prompt(self, text):
super()._prompt(text)
if self.current_value and self.value == "":
self.value = self.current_value
elif self.value == " ":
self.value = ""
class PathQuestion(Question):
argument_type = "path"
default_value = ""
@staticmethod
def normalize(value, option={}):
option = option.__dict__ if isinstance(option, Question) else option
if not value.strip():
if option.get("optional"):
return ""
# Hmpf here we could just have a "else" case
# but we also want PathQuestion.normalize("") to return "/"
# (i.e. if no option is provided, hence .get("optional") is None
elif option.get("optional") is False:
raise YunohostValidationError(
"app_argument_invalid",
name=option.get("name"),
error="Question is mandatory"
)
return "/" + value.strip().strip(" /")
class BooleanQuestion(Question):
argument_type = "boolean"
@ -750,50 +759,66 @@ class BooleanQuestion(Question):
@staticmethod
def humanize(value, option={}):
option = option.__dict__ if isinstance(option, Question) else option
yes = option.get("yes", 1)
no = option.get("no", 0)
value = str(value).lower()
if value == str(yes).lower():
return "yes"
if value == str(no).lower():
return "no"
if value in BooleanQuestion.yes_answers:
return "yes"
if value in BooleanQuestion.no_answers:
return "no"
if value in ["none", ""]:
value = BooleanQuestion.normalize(value, option)
if value == yes:
return "yes"
if value == no:
return "no"
if value is None:
return ""
raise YunohostValidationError(
"app_argument_choice_invalid",
name=option.get("name", ""),
name=option.get("name"),
value=value,
choices="yes, no, y, n, 1, 0",
choices="yes/no",
)
@staticmethod
def normalize(value, option={}):
yes = option.get("yes", 1)
no = option.get("no", 0)
if str(value).lower() in BooleanQuestion.yes_answers:
return yes
option = option.__dict__ if isinstance(option, Question) else option
if str(value).lower() in BooleanQuestion.no_answers:
return no
if isinstance(value, str):
value = value.strip()
if value in [None, ""]:
technical_yes = option.get("yes", 1)
technical_no = option.get("no", 0)
no_answers = BooleanQuestion.no_answers
yes_answers = BooleanQuestion.yes_answers
assert str(technical_yes).lower() not in no_answers, f"'yes' value can't be in {no_answers}"
assert str(technical_no).lower() not in yes_answers, f"'no' value can't be in {yes_answers}"
no_answers += [str(technical_no).lower()]
yes_answers += [str(technical_yes).lower()]
strvalue = str(value).lower()
if strvalue in yes_answers:
return technical_yes
if strvalue in no_answers:
return technical_no
if strvalue in ["none", ""]:
return None
raise YunohostValidationError(
"app_argument_choice_invalid",
name=option.get("name", ""),
value=value,
choices="yes, no, y, n, 1, 0",
name=option.get("name"),
value=strvalue,
choices="yes/no",
)
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
def __init__(self, question):
super().__init__(question)
self.yes = question.get("yes", 1)
self.no = question.get("no", 0)
if self.default is None:
@ -807,42 +832,44 @@ class BooleanQuestion(Question):
return text_for_user_input_in_cli
def get(self, key, default=None):
try:
return getattr(self, key)
except AttributeError:
return default
return getattr(self, key, default)
class DomainQuestion(Question):
argument_type = "domain"
def __init__(self, question, user_answers):
def __init__(self, question):
from yunohost.domain import domain_list, _get_maindomain
super().__init__(question, user_answers)
super().__init__(question)
if self.default is None:
self.default = _get_maindomain()
self.choices = domain_list()["domains"]
def _raise_invalid_answer(self):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("domain_name_unknown", domain=self.value),
)
@staticmethod
def normalize(value, option={}):
if value.startswith("https://"):
value = value[len("https://"):]
elif value.startswith("http://"):
value = value[len("http://"):]
# Remove trailing slashes
value = value.rstrip("/").lower()
return value
class UserQuestion(Question):
argument_type = "user"
def __init__(self, question, user_answers):
def __init__(self, question):
from yunohost.user import user_list, user_info
from yunohost.domain import _get_maindomain
super().__init__(question, user_answers)
self.choices = user_list()["users"]
super().__init__(question)
self.choices = list(user_list()["users"].keys())
if not self.choices:
raise YunohostValidationError(
@ -853,42 +880,42 @@ class UserQuestion(Question):
if self.default is None:
root_mail = "root@%s" % _get_maindomain()
for user in self.choices.keys():
for user in self.choices:
if root_mail in user_info(user).get("mail-aliases", []):
self.default = user
break
def _raise_invalid_answer(self):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("user_unknown", user=self.value),
)
class NumberQuestion(Question):
argument_type = "number"
default_value = None
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
def __init__(self, question):
super().__init__(question)
self.min = question.get("min", None)
self.max = question.get("max", None)
self.step = question.get("step", None)
@staticmethod
def normalize(value, option={}):
if isinstance(value, int):
return value
if isinstance(value, str):
value = value.strip()
if isinstance(value, str) and value.isdigit():
return int(value)
if value in [None, ""]:
return value
option = option.__dict__ if isinstance(option, Question) else option
raise YunohostValidationError(
"app_argument_invalid", name=option.name, error=m18n.n("invalid_number")
"app_argument_invalid",
name=option.get("name"),
error=m18n.n("invalid_number")
)
def _prevalidate(self):
@ -915,8 +942,8 @@ class DisplayTextQuestion(Question):
argument_type = "display_text"
readonly = True
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
def __init__(self, question):
super().__init__(question)
self.optional = True
self.style = question.get(
@ -946,90 +973,50 @@ class FileQuestion(Question):
@classmethod
def clean_upload_dirs(cls):
# Delete files uploaded from API
if Moulinette.interface.type == "api":
for upload_dir in cls.upload_dirs:
if os.path.exists(upload_dir):
shutil.rmtree(upload_dir)
for upload_dir in cls.upload_dirs:
if os.path.exists(upload_dir):
shutil.rmtree(upload_dir)
def __init__(self, question, user_answers):
super().__init__(question, user_answers)
if question.get("accept"):
self.accept = question.get("accept")
else:
self.accept = ""
if Moulinette.interface.type == "api":
if user_answers.get(f"{self.name}[name]"):
self.value = {
"content": self.value,
"filename": user_answers.get(f"{self.name}[name]", self.name),
}
def __init__(self, question):
super().__init__(question)
self.accept = question.get("accept", "")
def _prevalidate(self):
if self.value is None:
self.value = self.current_value
super()._prevalidate()
if (
isinstance(self.value, str)
and self.value
and not os.path.exists(self.value)
):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("file_does_not_exist", path=self.value),
)
if self.value in [None, ""] or not self.accept:
return
filename = self.value if isinstance(self.value, str) else self.value["filename"]
if "." not in filename or "." + filename.split(".")[
-1
] not in self.accept.replace(" ", "").split(","):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n(
"file_extension_not_accepted", file=filename, accept=self.accept
),
)
if Moulinette.interface.type != "api":
if not self.value or not os.path.exists(str(self.value)):
raise YunohostValidationError(
"app_argument_invalid",
name=self.name,
error=m18n.n("file_does_not_exist", path=str(self.value)),
)
def _post_parse_value(self):
from base64 import b64decode
# Upload files from API
# A file arg contains a string with "FILENAME:BASE64_CONTENT"
if not self.value:
return self.value
if Moulinette.interface.type == "api" and isinstance(self.value, dict):
upload_dir = tempfile.mkdtemp(prefix="ynh_filequestion_")
_, file_path = tempfile.mkstemp(dir=upload_dir)
upload_dir = tempfile.mkdtemp(prefix="tmp_configpanel_")
FileQuestion.upload_dirs += [upload_dir]
filename = self.value["filename"]
logger.debug(
f"Save uploaded file {self.value['filename']} from API into {upload_dir}"
)
FileQuestion.upload_dirs += [upload_dir]
# Filename is given by user of the API. For security reason, we have replaced
# os.path.join to avoid the user to be able to rewrite a file in filesystem
# i.e. os.path.join("/foo", "/etc/passwd") == "/etc/passwd"
file_path = os.path.normpath(upload_dir + "/" + filename)
if not file_path.startswith(upload_dir + "/"):
raise YunohostError(
f"Filename '{filename}' received from the API got a relative parent path, which is forbidden",
raw_msg=True,
)
i = 2
while os.path.exists(file_path):
file_path = os.path.normpath(upload_dir + "/" + filename + (".%d" % i))
i += 1
logger.debug(f"Saving file {self.name} for file question into {file_path}")
if Moulinette.interface.type != "api":
content = read_file(str(self.value), file_mode="rb")
content = self.value["content"]
if Moulinette.interface.type == "api":
content = b64decode(self.value)
write_to_file(file_path, b64decode(content), file_mode="wb")
write_to_file(file_path, content, file_mode="wb")
self.value = file_path
self.value = file_path
return self.value
@ -1057,25 +1044,37 @@ ARGUMENTS_TYPE_PARSERS = {
}
def parse_args_in_yunohost_format(user_answers, argument_questions):
def ask_questions_and_parse_answers(questions: Dict, prefilled_answers: Union[str, Mapping[str, Any]] = {}) -> List[Question]:
"""Parse arguments store in either manifest.json or actions.json or from a
config panel against the user answers when they are present.
Keyword arguments:
user_answers -- a dictionnary of arguments from the user (generally
empty in CLI, filed from the admin interface)
argument_questions -- the arguments description store in yunohost
questions -- the arguments description store in yunohost
format from actions.json/toml, manifest.json/toml
or config_panel.json/toml
prefilled_answers -- a url "query-string" such as "domain=yolo.test&path=/foobar&admin=sam"
or a dict such as {"domain": "yolo.test", "path": "/foobar", "admin": "sam"}
"""
parsed_answers_dict = OrderedDict()
for question in argument_questions:
if isinstance(prefilled_answers, str):
# FIXME FIXME : this is not uniform with config_set() which uses parse.qs (no l)
# parse_qsl parse single values
# whereas parse.qs return list of values (which is useful for tags, etc)
# For now, let's not migrate this piece of code to parse_qs
# Because Aleks believes some bits of the app CI rely on overriding values (e.g. foo=foo&...&foo=bar)
prefilled_answers = dict(urllib.parse.parse_qsl(prefilled_answers or "", keep_blank_values=True))
if not prefilled_answers:
prefilled_answers = {}
out = []
for question in questions:
question_class = ARGUMENTS_TYPE_PARSERS[question.get("type", "string")]
question = question_class(question, user_answers)
question["value"] = prefilled_answers.get(question["name"])
question = question_class(question)
answer = question.ask_if_needed()
if answer is not None:
parsed_answers_dict[question.name] = answer
question.ask_if_needed()
out.append(question)
return parsed_answers_dict
return out