Merge pull request #1520 from YunoHost/ci-format-dev

[CI] Format code with Black
This commit is contained in:
Alexandre Aubin 2022-10-25 01:07:48 +02:00 committed by GitHub
commit b27908a454
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 566 additions and 245 deletions

View file

@ -150,7 +150,17 @@ def find_expected_string_keys():
# Global settings
global_config = toml.load(open(ROOT + "share/config_global.toml"))
# Boring hard-coding because there's no simple other way idk
settings_without_help_key = ["smtp_relay_host", "smtp_relay_password", "smtp_relay_port", "smtp_relay_user", "ssh_port", "ssowat_panel_overlay_enabled", "root_password", "root_access_explain", "root_password_confirm"]
settings_without_help_key = [
"smtp_relay_host",
"smtp_relay_password",
"smtp_relay_port",
"smtp_relay_user",
"ssh_port",
"ssowat_panel_overlay_enabled",
"root_password",
"root_access_explain",
"root_password_confirm",
]
for panel in global_config.values():
if not isinstance(panel, dict):

View file

@ -182,7 +182,9 @@ def app_info(app, full=False, upgradable=False):
# Hydrate app notifications and doc
for pagename, content_per_lang in ret["manifest"]["doc"].items():
for lang, content in content_per_lang.items():
ret["manifest"]["doc"][pagename][lang] = _hydrate_app_template(content, settings)
ret["manifest"]["doc"][pagename][lang] = _hydrate_app_template(
content, settings
)
for step, notifications in ret["manifest"]["notifications"].items():
for name, content_per_lang in notifications.items():
for lang, content in content_per_lang.items():
@ -201,7 +203,9 @@ def app_info(app, full=False, upgradable=False):
ret["supports_backup_restore"] = os.path.exists(
os.path.join(setting_path, "scripts", "backup")
) and os.path.exists(os.path.join(setting_path, "scripts", "restore"))
ret["supports_multi_instance"] = local_manifest.get("integration", {}).get("multi_instance", False)
ret["supports_multi_instance"] = local_manifest.get("integration", {}).get(
"multi_instance", False
)
ret["supports_config_panel"] = os.path.exists(
os.path.join(setting_path, "config_panel.toml")
)
@ -429,7 +433,9 @@ def app_change_url(operation_logger, app, domain, path):
tmp_workdir_for_app = _make_tmp_workdir_for_app(app=app)
# Prepare env. var. to pass to script
env_dict = _make_environment_for_app_script(app, workdir=tmp_workdir_for_app, action="change_url")
env_dict = _make_environment_for_app_script(
app, workdir=tmp_workdir_for_app, action="change_url"
)
env_dict["YNH_APP_OLD_DOMAIN"] = old_domain
env_dict["YNH_APP_OLD_PATH"] = old_path
env_dict["YNH_APP_NEW_DOMAIN"] = domain
@ -489,7 +495,12 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
from yunohost.permission import permission_sync_to_user
from yunohost.regenconf import manually_modified_files
from yunohost.utils.legacy import _patch_legacy_php_versions, _patch_legacy_helpers
from yunohost.backup import backup_list, backup_create, backup_delete, backup_restore
from yunohost.backup import (
backup_list,
backup_create,
backup_delete,
backup_restore,
)
apps = app
# Check if disk space available
@ -581,7 +592,9 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
if manifest["packaging_format"] >= 2:
if no_safety_backup:
# FIXME: i18n
logger.warning("Skipping the creation of a backup prior to the upgrade.")
logger.warning(
"Skipping the creation of a backup prior to the upgrade."
)
else:
# FIXME: i18n
logger.info("Creating a safety backup prior to the upgrade")
@ -601,7 +614,10 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
backup_delete(other_safety_backup_name)
else:
# Is this needed ? Shouldn't backup_create report an expcetion if backup failed ?
raise YunohostError("Uhoh the safety backup failed ?! Aborting the upgrade process.", raw_msg=True)
raise YunohostError(
"Uhoh the safety backup failed ?! Aborting the upgrade process.",
raw_msg=True,
)
_assert_system_is_sane_for_app(manifest, "pre")
@ -633,8 +649,11 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
if manifest["packaging_format"] >= 2:
from yunohost.utils.resources import AppResourceManager
try:
AppResourceManager(app_instance_name, wanted=manifest, current=app_dict["manifest"]).apply(rollback_if_failure=True)
AppResourceManager(
app_instance_name, wanted=manifest, current=app_dict["manifest"]
).apply(rollback_if_failure=True)
except Exception:
# FIXME : improve error handling ....
raise
@ -657,13 +676,23 @@ def app_upgrade(app=[], url=None, file=None, force=False, no_safety_backup=False
finally:
# If upgrade failed, try to restore the safety backup
if upgrade_failed and manifest["packaging_format"] >= 2 and not no_safety_backup:
logger.warning("Upgrade failed ... attempting to restore the satefy backup (Yunohost first need to remove the app for this) ...")
if (
upgrade_failed
and manifest["packaging_format"] >= 2
and not no_safety_backup
):
logger.warning(
"Upgrade failed ... attempting to restore the satefy backup (Yunohost first need to remove the app for this) ..."
)
app_remove(app_instance_name)
backup_restore(name=safety_backup_name, apps=[app_instance_name], force=True)
backup_restore(
name=safety_backup_name, apps=[app_instance_name], force=True
)
if not _is_installed(app_instance_name):
logger.error("Uhoh ... Yunohost failed to restore the app to the way it was before the failed upgrade :|")
logger.error(
"Uhoh ... Yunohost failed to restore the app to the way it was before the failed upgrade :|"
)
# Whatever happened (install success or failure) we check if it broke the system
# and warn the user about it
@ -934,8 +963,11 @@ def app_install(
if packaging_format >= 2:
from yunohost.utils.resources import AppResourceManager
try:
AppResourceManager(app_instance_name, wanted=manifest, current={}).apply(rollback_if_failure=True)
AppResourceManager(app_instance_name, wanted=manifest, current={}).apply(
rollback_if_failure=True
)
except Exception:
# FIXME : improve error handling ....
raise
@ -1050,8 +1082,11 @@ def app_install(
if packaging_format >= 2:
from yunohost.utils.resources import AppResourceManager
try:
AppResourceManager(app_instance_name, wanted={}, current=manifest).apply(rollback_if_failure=False)
AppResourceManager(
app_instance_name, wanted={}, current=manifest
).apply(rollback_if_failure=False)
except Exception:
# FIXME : improve error handling ....
raise
@ -1151,7 +1186,9 @@ def app_remove(operation_logger, app, purge=False):
remove_script = f"{tmp_workdir_for_app}/scripts/remove"
env_dict = {}
env_dict = _make_environment_for_app_script(app, workdir=tmp_workdir_for_app, action="remove")
env_dict = _make_environment_for_app_script(
app, workdir=tmp_workdir_for_app, action="remove"
)
env_dict["YNH_APP_PURGE"] = str(1 if purge else 0)
operation_logger.extra.update({"env": env_dict})
@ -1175,7 +1212,10 @@ def app_remove(operation_logger, app, purge=False):
if packaging_format >= 2:
try:
from yunohost.utils.resources import AppResourceManager
AppResourceManager(app, wanted={}, current=manifest).apply(rollback_if_failure=False)
AppResourceManager(app, wanted={}, current=manifest).apply(
rollback_if_failure=False
)
except Exception:
# FIXME : improve error handling ....
raise
@ -1550,11 +1590,11 @@ def app_action_list(app):
@is_unit_operation()
def app_action_run(
operation_logger, app, action, args=None, args_file=None
):
def app_action_run(operation_logger, app, action, args=None, args_file=None):
return AppConfigPanel(app).run_action(action, args=args, args_file=args_file, operation_logger=operation_logger)
return AppConfigPanel(app).run_action(
action, args=args, args_file=args_file, operation_logger=operation_logger
)
def app_config_get(app, key="", full=False, export=False):
@ -1865,7 +1905,9 @@ def _get_manifest_of_app(path):
raw_msg=True,
)
manifest["packaging_format"] = float(str(manifest.get("packaging_format", "")).strip() or "0")
manifest["packaging_format"] = float(
str(manifest.get("packaging_format", "")).strip() or "0"
)
if manifest["packaging_format"] < 2:
manifest = _convert_v1_manifest_to_v2(manifest)
@ -1900,7 +1942,9 @@ def _parse_app_doc_and_notifications(path):
for step in ["pre_install", "post_install", "pre_upgrade", "post_upgrade"]:
notifications[step] = {}
for filepath in glob.glob(os.path.join(path, "doc", "notifications", f"{step}*.md")):
for filepath in glob.glob(
os.path.join(path, "doc", "notifications", f"{step}*.md")
):
m = re.match(step + "(_[a-z]{2,3})?.md", filepath.split("/")[-1])
if not m:
continue
@ -1910,8 +1954,12 @@ def _parse_app_doc_and_notifications(path):
notifications[step][pagename] = {}
notifications[step][pagename][lang] = read_file(filepath).strip()
for filepath in glob.glob(os.path.join(path, "doc", "notifications", f"{step}.d") + "/*.md"):
m = re.match(r"([A-Za-z0-9\.\~]*)(_[a-z]{2,3})?.md", filepath.split("/")[-1])
for filepath in glob.glob(
os.path.join(path, "doc", "notifications", f"{step}.d") + "/*.md"
):
m = re.match(
r"([A-Za-z0-9\.\~]*)(_[a-z]{2,3})?.md", filepath.split("/")[-1]
)
if not m:
continue
pagename, lang = m.groups()
@ -1925,7 +1973,7 @@ def _parse_app_doc_and_notifications(path):
def _hydrate_app_template(template, data):
stuff_to_replace = set(re.findall(r'__[A-Z0-9]+?[A-Z0-9_]*?[A-Z0-9]*?__', template))
stuff_to_replace = set(re.findall(r"__[A-Z0-9]+?[A-Z0-9_]*?[A-Z0-9]*?__", template))
for stuff in stuff_to_replace:
@ -1951,18 +1999,22 @@ def _convert_v1_manifest_to_v2(manifest):
manifest["upstream"]["website"] = manifest["url"]
manifest["integration"] = {
"yunohost": manifest.get("requirements", {}).get("yunohost", "").replace(">", "").replace("=", "").replace(" ", ""),
"yunohost": manifest.get("requirements", {})
.get("yunohost", "")
.replace(">", "")
.replace("=", "")
.replace(" ", ""),
"architectures": "all",
"multi_instance": manifest.get("multi_instance", False),
"ldap": "?",
"sso": "?",
"disk": "50M",
"ram": {"build": "50M", "runtime": "10M"}
"ram": {"build": "50M", "runtime": "10M"},
}
maintainers = manifest.get("maintainer", {})
if isinstance(maintainers, list):
maintainers = [m['name'] for m in maintainers]
maintainers = [m["name"] for m in maintainers]
else:
maintainers = [maintainers["name"]] if maintainers.get("name") else []
@ -1973,21 +2025,39 @@ def _convert_v1_manifest_to_v2(manifest):
manifest["install"] = {}
for question in install_questions:
name = question.pop("name")
if "ask" in question and name in ["domain", "path", "admin", "is_public", "password"]:
if "ask" in question and name in [
"domain",
"path",
"admin",
"is_public",
"password",
]:
question.pop("ask")
if question.get("example") and question.get("type") in ["domain", "path", "user", "boolean", "password"]:
if question.get("example") and question.get("type") in [
"domain",
"path",
"user",
"boolean",
"password",
]:
question.pop("example")
manifest["install"][name] = question
manifest["resources"] = {
"system_user": {},
"install_dir": {
"alias": "final_path"
}
}
manifest["resources"] = {"system_user": {}, "install_dir": {"alias": "final_path"}}
keys_to_keep = ["packaging_format", "id", "name", "description", "version", "maintainers", "upstream", "integration", "install", "resources"]
keys_to_keep = [
"packaging_format",
"id",
"name",
"description",
"version",
"maintainers",
"upstream",
"integration",
"install",
"resources",
]
keys_to_del = [key for key in manifest.keys() if key not in keys_to_keep]
for key in keys_to_del:
@ -2021,8 +2091,14 @@ def _set_default_ask_questions(questions, script_name="install"):
("password", "password"), # i18n: app_manifest_install_ask_password
("user", "admin"), # i18n: app_manifest_install_ask_admin
("boolean", "is_public"), # i18n: app_manifest_install_ask_is_public
("group", "init_main_permission"), # i18n: app_manifest_install_ask_init_main_permission
("group", "init_admin_permission"), # i18n: app_manifest_install_ask_init_admin_permission
(
"group",
"init_main_permission",
), # i18n: app_manifest_install_ask_init_main_permission
(
"group",
"init_admin_permission",
), # i18n: app_manifest_install_ask_init_admin_permission
]
for question_name, question in questions.items():
@ -2034,7 +2110,9 @@ def _set_default_ask_questions(questions, script_name="install"):
for question_with_default in questions_with_default
):
# The key is for example "app_manifest_install_ask_domain"
question["ask"] = m18n.n(f"app_manifest_{script_name}_ask_{question['name']}")
question["ask"] = m18n.n(
f"app_manifest_{script_name}_ask_{question['name']}"
)
# Also it in fact doesn't make sense for any of those questions to have an example value nor a default value...
if question.get("type") in ["domain", "user", "password"]:
@ -2286,10 +2364,14 @@ def _check_manifest_requirements(manifest: Dict, action: str):
# Yunohost version requirement
yunohost_requirement = version.parse(manifest["integration"]["yunohost"] or "4.3")
yunohost_installed_version = version.parse(get_ynh_package_version("yunohost")["version"])
yunohost_installed_version = version.parse(
get_ynh_package_version("yunohost")["version"]
)
if yunohost_requirement > yunohost_installed_version:
# FIXME : i18n
raise YunohostValidationError(f"This app requires Yunohost >= {yunohost_requirement} but current installed version is {yunohost_installed_version}")
raise YunohostValidationError(
f"This app requires Yunohost >= {yunohost_requirement} but current installed version is {yunohost_installed_version}"
)
# Architectures
arch_requirement = manifest["integration"]["architectures"]
@ -2297,7 +2379,9 @@ def _check_manifest_requirements(manifest: Dict, action: str):
arch = system_arch()
if arch not in arch_requirement:
# FIXME: i18n
raise YunohostValidationError(f"This app can only be installed on architectures {', '.join(arch_requirement)} but your server architecture is {arch}")
raise YunohostValidationError(
f"This app can only be installed on architectures {', '.join(arch_requirement)} but your server architecture is {arch}"
)
# Multi-instance
if action == "install" and manifest["integration"]["multi_instance"] == False:
@ -2310,10 +2394,13 @@ def _check_manifest_requirements(manifest: Dict, action: str):
if action == "install":
disk_requirement = manifest["integration"]["disk"]
if free_space_in_directory("/") <= human_to_binary(disk_requirement) \
or free_space_in_directory("/var") <= human_to_binary(disk_requirement):
if free_space_in_directory("/") <= human_to_binary(
disk_requirement
) or free_space_in_directory("/var") <= human_to_binary(disk_requirement):
# FIXME : i18m
raise YunohostValidationError(f"This app requires {disk_requirement} free space.")
raise YunohostValidationError(
f"This app requires {disk_requirement} free space."
)
# Ram for build
ram_build_requirement = manifest["integration"]["ram"]["build"]
@ -2327,7 +2414,9 @@ def _check_manifest_requirements(manifest: Dict, action: str):
if ram < human_to_binary(ram_build_requirement):
# FIXME : i18n
ram_human = binary_to_human(ram)
raise YunohostValidationError(f"This app requires {ram_build_requirement} RAM to install/upgrade but only {ram_human} is available right now.")
raise YunohostValidationError(
f"This app requires {ram_build_requirement} RAM to install/upgrade but only {ram_human} is available right now."
)
def _guess_webapp_path_requirement(app_folder: str) -> str:
@ -2339,10 +2428,14 @@ def _guess_webapp_path_requirement(app_folder: str) -> str:
raw_questions = manifest["install"]
domain_questions = [
question for question in raw_questions.values() if question.get("type") == "domain"
question
for question in raw_questions.values()
if question.get("type") == "domain"
]
path_questions = [
question for question in raw_questions.values() if question.get("type") == "path"
question
for question in raw_questions.values()
if question.get("type") == "path"
]
if len(domain_questions) == 0 and len(path_questions) == 0:
@ -2442,11 +2535,7 @@ def _assert_no_conflicting_apps(domain, path, ignore_app=None, full_domain=False
def _make_environment_for_app_script(
app,
args={},
args_prefix="APP_ARG_",
workdir=None,
action=None
app, args={}, args_prefix="APP_ARG_", workdir=None, action=None
):
app_setting_path = os.path.join(APPS_SETTING_PATH, app)
@ -2487,7 +2576,7 @@ def _make_environment_for_app_script(
# Special weird case for backward compatibility...
# 'path' was loaded into 'path_url' .....
if 'path' in env_dict:
if "path" in env_dict:
env_dict["path_url"] = env_dict["path"]
return env_dict
@ -2651,4 +2740,3 @@ def _assert_system_is_sane_for_app(manifest, when):
raise YunohostValidationError("dpkg_is_broken")
elif when == "post":
raise YunohostError("this_action_broke_dpkg")

View file

@ -36,6 +36,7 @@ LDAP_URI = "ldap://localhost:389"
ADMIN_GROUP = "cn=admins,ou=groups"
AUTH_DN = "uid={uid},ou=users,dc=yunohost,dc=org"
class Authenticator(BaseAuthenticator):
name = "ldap_admin"
@ -46,7 +47,11 @@ class Authenticator(BaseAuthenticator):
def _authenticate_credentials(self, credentials=None):
try:
admins = _get_ldap_interface().search(ADMIN_GROUP, attrs=["memberUid"])[0].get("memberUid", [])
admins = (
_get_ldap_interface()
.search(ADMIN_GROUP, attrs=["memberUid"])[0]
.get("memberUid", [])
)
except ldap.SERVER_DOWN:
# ldap is down, attempt to restart it before really failing
logger.warning(m18n.n("ldap_server_is_down_restart_it"))
@ -55,10 +60,15 @@ class Authenticator(BaseAuthenticator):
# Force-reset existing LDAP interface
from yunohost.utils import ldap as ldaputils
ldaputils._ldap_interface = None
try:
admins = _get_ldap_interface().search(ADMIN_GROUP, attrs=["memberUid"])[0].get("memberUid", [])
admins = (
_get_ldap_interface()
.search(ADMIN_GROUP, attrs=["memberUid"])[0]
.get("memberUid", [])
)
except ldap.SERVER_DOWN:
raise YunohostError("ldap_server_down")
@ -105,7 +115,10 @@ class Authenticator(BaseAuthenticator):
raise
else:
if who != dn:
raise YunohostError(f"Not logged with the appropriate identity ? Found {who}, expected {dn} !?", raw_msg=True)
raise YunohostError(
f"Not logged with the appropriate identity ? Found {who}, expected {dn} !?",
raw_msg=True,
)
finally:
# Free the connection, we don't really need it to keep it open as the point is only to check authentication...
if con:

View file

@ -33,7 +33,15 @@ from packaging import version
from moulinette import Moulinette, m18n
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import read_file, mkdir, write_to_yaml, read_yaml, rm, chown, chmod
from moulinette.utils.filesystem import (
read_file,
mkdir,
write_to_yaml,
read_yaml,
rm,
chown,
chmod,
)
from moulinette.utils.process import check_output
import yunohost.domain
@ -1509,8 +1517,11 @@ class RestoreManager:
manifest = _get_manifest_of_app(app_settings_in_archive)
if manifest["packaging_format"] >= 2:
from yunohost.utils.resources import AppResourceManager
try:
AppResourceManager(app_instance_name, wanted=manifest, current={}).apply(rollback_if_failure=True)
AppResourceManager(
app_instance_name, wanted=manifest, current={}
).apply(rollback_if_failure=True)
except Exception:
# FIXME : improve error handling ....
raise
@ -1838,7 +1849,10 @@ class BackupMethod:
# to mounting error
# Compute size to copy
size = sum(space_used_by_directory(path["source"], follow_symlinks=False) for path in paths_needed_to_be_copied)
size = sum(
space_used_by_directory(path["source"], follow_symlinks=False)
for path in paths_needed_to_be_copied
)
size /= 1024 * 1024 # Convert bytes to megabytes
# Ask confirmation for copying

View file

@ -94,12 +94,11 @@ def certificate_status(domains, full=False):
_check_domain_is_ready_for_ACME(domain)
status["ACME_eligible"] = True
except Exception as e:
if e.key == 'certmanager_domain_not_diagnosed_yet':
if e.key == "certmanager_domain_not_diagnosed_yet":
status["ACME_eligible"] = None # = unknown status
else:
status["ACME_eligible"] = False
del status["domain"]
certificates[domain] = status
@ -210,11 +209,7 @@ def _certificate_install_selfsigned(domain_list, force=False):
# Check new status indicate a recently created self-signed certificate
status = _get_status(domain)
if (
status
and status["CA_type"] == "selfsigned"
and status["validity"] > 3648
):
if status and status["CA_type"] == "selfsigned" and status["validity"] > 3648:
logger.success(
m18n.n("certmanager_cert_install_success_selfsigned", domain=domain)
)
@ -229,7 +224,7 @@ def _certificate_install_selfsigned(domain_list, force=False):
if failed_cert_install:
raise YunohostError(
"certmanager_cert_install_failed_selfsigned",
domains=",".join(failed_cert_install)
domains=",".join(failed_cert_install),
)
@ -300,8 +295,7 @@ def _certificate_install_letsencrypt(domains, force=False, no_checks=False):
if failed_cert_install:
raise YunohostError(
"certmanager_cert_install_failed",
domains=",".join(failed_cert_install)
"certmanager_cert_install_failed", domains=",".join(failed_cert_install)
)
@ -426,10 +420,10 @@ def certificate_renew(domains, force=False, no_checks=False, email=False):
if failed_cert_install:
raise YunohostError(
"certmanager_cert_renew_failed",
domains=",".join(failed_cert_install)
"certmanager_cert_renew_failed", domains=",".join(failed_cert_install)
)
#
# Back-end stuff #
#
@ -658,10 +652,14 @@ def _get_status(domain):
# FIXME: is the .ca.cnf one actually used anywhere ? x_x
conf = os.path.join(SSL_DIR, "openssl.ca.cnf")
if os.path.exists(conf):
self_signed_issuers.append(check_output(f"grep commonName_default {conf}").split()[-1])
self_signed_issuers.append(
check_output(f"grep commonName_default {conf}").split()[-1]
)
conf = os.path.join(SSL_DIR, "openssl.cnf")
if os.path.exists(conf):
self_signed_issuers.append(check_output(f"grep commonName_default {conf}").split()[-1])
self_signed_issuers.append(
check_output(f"grep commonName_default {conf}").split()[-1]
)
if cert_issuer in self_signed_issuers:
CA_type = "selfsigned"

View file

@ -506,7 +506,9 @@ def _get_registrar_config_section(domain):
from lexicon.providers.auto import _relevant_provider_for_domain
registrar_infos = {
"name": m18n.n('registrar_infos'), # This is meant to name the config panel section, for proper display in the webadmin
"name": m18n.n(
"registrar_infos"
), # This is meant to name the config panel section, for proper display in the webadmin
}
dns_zone = _get_dns_zone_for_domain(domain)

View file

@ -54,7 +54,10 @@ DOMAIN_CACHE_DURATION = 15
def _get_maindomain():
global main_domain_cache
global main_domain_cache_timestamp
if not main_domain_cache or abs(main_domain_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION:
if (
not main_domain_cache
or abs(main_domain_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION
):
with open("/etc/yunohost/current_host", "r") as f:
main_domain_cache = f.readline().rstrip()
main_domain_cache_timestamp = time.time()
@ -65,7 +68,10 @@ def _get_maindomain():
def _get_domains(exclude_subdomains=False):
global domain_list_cache
global domain_list_cache_timestamp
if not domain_list_cache or abs(domain_list_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION:
if (
not domain_list_cache
or abs(domain_list_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION
):
from yunohost.utils.ldap import _get_ldap_interface
ldap = _get_ldap_interface()
@ -86,9 +92,7 @@ def _get_domains(exclude_subdomains=False):
if exclude_subdomains:
return [
domain
for domain in domain_list_cache
if not _get_parent_domain_of(domain)
domain for domain in domain_list_cache if not _get_parent_domain_of(domain)
]
return domain_list_cache
@ -562,7 +566,10 @@ class DomainConfigPanel(ConfigPanel):
if not filter_key or filter_key[0] == "cert":
from yunohost.certificate import certificate_status
status = certificate_status([self.entity], full=True)["certificates"][self.entity]
status = certificate_status([self.entity], full=True)["certificates"][
self.entity
]
toml["cert"]["cert"]["cert_summary"]["style"] = status["style"]
@ -571,7 +578,9 @@ class DomainConfigPanel(ConfigPanel):
# i18n: domain_config_cert_summary_abouttoexpire
# i18n: domain_config_cert_summary_ok
# i18n: domain_config_cert_summary_letsencrypt
toml["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(f"domain_config_cert_summary_{status['summary']}")
toml["cert"]["cert"]["cert_summary"]["ask"] = m18n.n(
f"domain_config_cert_summary_{status['summary']}"
)
# Other specific strings used in config panels
# i18n: domain_config_cert_renew_help

View file

@ -29,7 +29,9 @@ class MyMigration(Migration):
raise YunohostError(f"Can't open setting file : {e}", raw_msg=True)
settings = {
translate_legacy_settings_to_configpanel_settings(k).split('.')[-1]: v["value"]
translate_legacy_settings_to_configpanel_settings(k).split(".")[-1]: v[
"value"
]
for k, v in old_settings.items()
}

View file

@ -31,7 +31,10 @@ class MyMigration(Migration):
all_users = user_list()["users"].keys()
new_admin_user = None
for user in all_users:
if any(alias.startswith("root@") for alias in user_info(user).get("mail-aliases", [])):
if any(
alias.startswith("root@")
for alias in user_info(user).get("mail-aliases", [])
):
new_admin_user = user
break
@ -39,7 +42,21 @@ class MyMigration(Migration):
if new_admin_user:
aliases = user_info(new_admin_user).get("mail-aliases", [])
old_admin_aliases_to_remove = [alias for alias in aliases if any(alias.startswith(a) for a in ["root@", "admin@", "admins@", "webmaster@", "postmaster@", "abuse@"])]
old_admin_aliases_to_remove = [
alias
for alias in aliases
if any(
alias.startswith(a)
for a in [
"root@",
"admin@",
"admins@",
"webmaster@",
"postmaster@",
"abuse@",
]
)
]
user_update(new_admin_user, remove_mailalias=old_admin_aliases_to_remove)
@ -63,7 +80,7 @@ class MyMigration(Migration):
"sudoCommand": ["ALL"],
"sudoUser": ["%admins"],
"sudoHost": ["ALL"],
}
},
)
ldap.add(
@ -73,7 +90,7 @@ class MyMigration(Migration):
"objectClass": ["top", "posixGroup", "groupOfNamesYnh", "mailGroup"],
"gidNumber": ["4001"],
"mail": ["root", "admin", "admins", "webmaster", "postmaster", "abuse"],
}
},
)
permission_sync_to_user()
@ -106,6 +123,5 @@ class MyMigration(Migration):
ldap.add("uid=admin,ou=users", attr_dict)
user_group_update(groupname="admins", add="admin", sync_perm=True)
def run_after_system_restore(self):
self.run()

View file

@ -147,6 +147,7 @@ class SettingsConfigPanel(ConfigPanel):
raise YunohostValidationError("password_confirmation_not_the_same")
from yunohost.tools import tools_rootpw
tools_rootpw(root_password, check_strength=True)
super()._apply()

View file

@ -5,7 +5,11 @@ from moulinette.utils.process import check_output
from yunohost.app import app_setting
from yunohost.domain import _get_maindomain
from yunohost.utils.resources import AppResource, AppResourceManager, AppResourceClassesByType
from yunohost.utils.resources import (
AppResource,
AppResourceManager,
AppResourceClassesByType,
)
from yunohost.permission import user_permission_list, permission_delete
dummyfile = "/tmp/dummyappresource-testapp"
@ -70,7 +74,9 @@ def test_provision_dummy():
wanted = {"resources": {"dummy": {}}}
assert not os.path.exists(dummyfile)
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=False)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=False
)
assert open(dummyfile).read().strip() == "foo"
@ -82,7 +88,9 @@ def test_deprovision_dummy():
open(dummyfile, "w").write("foo")
assert open(dummyfile).read().strip() == "foo"
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=False)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=False
)
assert not os.path.exists(dummyfile)
@ -92,7 +100,9 @@ def test_provision_dummy_nondefaultvalue():
wanted = {"resources": {"dummy": {"content": "bar"}}}
assert not os.path.exists(dummyfile)
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=False)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=False
)
assert open(dummyfile).read().strip() == "bar"
@ -104,7 +114,9 @@ def test_update_dummy():
open(dummyfile, "w").write("foo")
assert open(dummyfile).read().strip() == "foo"
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=False)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=False
)
assert open(dummyfile).read().strip() == "bar"
@ -117,7 +129,9 @@ def test_update_dummy_fail():
assert open(dummyfile).read().strip() == "foo"
with pytest.raises(Exception):
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=False)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=False
)
assert open(dummyfile).read().strip() == "forbiddenvalue"
@ -130,7 +144,9 @@ def test_update_dummy_failwithrollback():
assert open(dummyfile).read().strip() == "foo"
with pytest.raises(Exception):
AppResourceManager("testapp", current=current, wanted=wanted).apply(rollback_if_failure=True)
AppResourceManager("testapp", current=current, wanted=wanted).apply(
rollback_if_failure=True
)
assert open(dummyfile).read().strip() == "foo"
@ -296,7 +312,7 @@ def test_resource_apt():
"key": "https://dl.yarnpkg.com/debian/pubkey.gpg",
"packages": "yarn",
}
}
},
}
assert os.system("dpkg --list | grep -q 'ii *nyancat '") != 0
@ -310,7 +326,9 @@ def test_resource_apt():
assert os.system("dpkg --list | grep -q 'ii *nyancat '") == 0
assert os.system("dpkg --list | grep -q 'ii *sl '") == 0
assert os.system("dpkg --list | grep -q 'ii *yarn '") == 0
assert os.system("dpkg --list | grep -q 'ii *lolcat '") != 0 # Lolcat shouldnt be installed yet
assert (
os.system("dpkg --list | grep -q 'ii *lolcat '") != 0
) # Lolcat shouldnt be installed yet
assert os.system("dpkg --list | grep -q 'ii *testapp-ynh-deps '") == 0
conf["packages"] += ", lolcat"
@ -359,10 +377,7 @@ def test_resource_permissions():
assert res["testapp.main"]["url"] == "/"
assert "testapp.admin" not in res
conf["admin"] = {
"url": "/admin",
"allowed": ""
}
conf["admin"] = {"url": "/admin", "allowed": ""}
r(conf, "testapp", manager).provision_or_update()

View file

@ -167,7 +167,9 @@ def install_manifestv2_app(domain, path, public=True):
app_install(
os.path.join(get_test_apps_dir(), "manifestv2_app_ynh"),
args="domain={}&path={}&init_main_permission={}".format(domain, path, "visitors" if public else "all_users"),
args="domain={}&path={}&init_main_permission={}".format(
domain, path, "visitors" if public else "all_users"
),
force=True,
)
@ -220,7 +222,12 @@ def test_legacy_app_manifest_preinstall():
assert "integration" in m
assert "install" in m
assert m["doc"] == {}
assert m["notifications"] == {"pre_install": {}, "pre_upgrade": {}, "post_install": {}, "post_upgrade": {}}
assert m["notifications"] == {
"pre_install": {},
"pre_upgrade": {},
"post_install": {},
"post_upgrade": {},
}
def test_manifestv2_app_manifest_preinstall():
@ -231,11 +238,23 @@ def test_manifestv2_app_manifest_preinstall():
assert "install" in m
assert "description" in m
assert "doc" in m
assert "This is a dummy description of this app features" in m["doc"]["DESCRIPTION"]["en"]
assert "Ceci est une fausse description des fonctionalités de l'app" in m["doc"]["DESCRIPTION"]["fr"]
assert (
"This is a dummy description of this app features"
in m["doc"]["DESCRIPTION"]["en"]
)
assert (
"Ceci est une fausse description des fonctionalités de l'app"
in m["doc"]["DESCRIPTION"]["fr"]
)
assert "notifications" in m
assert "This is a dummy disclaimer to display prior to the install" in m["notifications"]["pre_install"]["main"]["en"]
assert "Ceci est un faux disclaimer à présenter avant l'installation" in m["notifications"]["pre_install"]["main"]["fr"]
assert (
"This is a dummy disclaimer to display prior to the install"
in m["notifications"]["pre_install"]["main"]["en"]
)
assert (
"Ceci est un faux disclaimer à présenter avant l'installation"
in m["notifications"]["pre_install"]["main"]["fr"]
)
def test_manifestv2_app_install_main_domain():
@ -269,11 +288,23 @@ def test_manifestv2_app_info_postinstall():
assert "description" in m
assert "doc" in m
assert "The app install dir is /var/www/manifestv2_app" in m["doc"]["ADMIN"]["en"]
assert "Le dossier d'install de l'app est /var/www/manifestv2_app" in m["doc"]["ADMIN"]["fr"]
assert (
"Le dossier d'install de l'app est /var/www/manifestv2_app"
in m["doc"]["ADMIN"]["fr"]
)
assert "notifications" in m
assert "The app install dir is /var/www/manifestv2_app" in m["notifications"]["post_install"]["main"]["en"]
assert "The app id is manifestv2_app" in m["notifications"]["post_install"]["main"]["en"]
assert f"The app url is {main_domain}/manifestv2" in m["notifications"]["post_install"]["main"]["en"]
assert (
"The app install dir is /var/www/manifestv2_app"
in m["notifications"]["post_install"]["main"]["en"]
)
assert (
"The app id is manifestv2_app"
in m["notifications"]["post_install"]["main"]["en"]
)
assert (
f"The app url is {main_domain}/manifestv2"
in m["notifications"]["post_install"]["main"]["en"]
)
def test_manifestv2_app_info_preupgrade(monkeypatch):
@ -281,6 +312,7 @@ def test_manifestv2_app_info_preupgrade(monkeypatch):
manifest = app_manifest(os.path.join(get_test_apps_dir(), "manifestv2_app_ynh"))
from yunohost.app_catalog import _load_apps_catalog as original_load_apps_catalog
def custom_load_apps_catalog(*args, **kwargs):
res = original_load_apps_catalog(*args, **kwargs)
@ -295,6 +327,7 @@ def test_manifestv2_app_info_preupgrade(monkeypatch):
res["apps"]["manifestv2_app"]["manifest"]["version"] = "99999~ynh1"
return res
monkeypatch.setattr("yunohost.app._load_apps_catalog", custom_load_apps_catalog)
main_domain = _get_maindomain()
@ -306,8 +339,11 @@ def test_manifestv2_app_info_preupgrade(monkeypatch):
# FIXME : as I write this test, I realize that this implies the catalog API
# does provide the notifications, which means the list builder script
# should parse the files in the original app repo, possibly with proper i18n etc
assert "This is a dummy disclaimer to display prior to any upgrade" \
assert (
"This is a dummy disclaimer to display prior to any upgrade"
in i["from_catalog"]["manifest"]["notifications"]["pre_upgrade"]["main"]["en"]
)
def test_app_from_catalog():
main_domain = _get_maindomain()

View file

@ -361,7 +361,9 @@ def test_backup_not_enough_free_space(monkeypatch, mocker):
def custom_free_space_in_directory(dirpath):
return 0
monkeypatch.setattr("yunohost.backup.space_used_by_directory", custom_space_used_by_directory)
monkeypatch.setattr(
"yunohost.backup.space_used_by_directory", custom_space_used_by_directory
)
monkeypatch.setattr(
"yunohost.backup.free_space_in_directory", custom_free_space_in_directory
)

View file

@ -78,7 +78,7 @@ def _permission_create_with_dummy_app(
"name": app,
"id": app,
"description": {"en": "Dummy app to test permissions"},
"arguments": {"install": []}
"arguments": {"install": []},
},
f,
)

View file

@ -12,7 +12,7 @@ from yunohost.settings import (
settings_set,
settings_reset,
settings_reset_all,
SETTINGS_PATH
SETTINGS_PATH,
)
EXAMPLE_SETTINGS = """
@ -38,12 +38,15 @@ EXAMPLE_SETTINGS = """
default = "a"
"""
def setup_function(function):
# Backup settings
if os.path.exists(SETTINGS_PATH):
os.system(f"mv {SETTINGS_PATH} {SETTINGS_PATH}.saved")
# Add example settings to config panel
os.system("cp /usr/share/yunohost/config_global.toml /usr/share/yunohost/config_global.toml.saved")
os.system(
"cp /usr/share/yunohost/config_global.toml /usr/share/yunohost/config_global.toml.saved"
)
with open("/usr/share/yunohost/config_global.toml", "a") as file:
file.write(EXAMPLE_SETTINGS)
@ -53,11 +56,14 @@ def teardown_function(function):
os.system(f"mv {SETTINGS_PATH}.saved {SETTINGS_PATH}")
elif os.path.exists(SETTINGS_PATH):
os.remove(SETTINGS_PATH)
os.system("mv /usr/share/yunohost/config_global.toml.saved /usr/share/yunohost/config_global.toml")
os.system(
"mv /usr/share/yunohost/config_global.toml.saved /usr/share/yunohost/config_global.toml"
)
old_translate = moulinette.core.Translator.translate
def _monkeypatch_translator(self, key, *args, **kwargs):
if key.startswith("global_settings_setting_"):
@ -65,6 +71,7 @@ def _monkeypatch_translator(self, key, *args, **kwargs):
return old_translate(self, key, *args, **kwargs)
moulinette.core.Translator.translate = _monkeypatch_translator
@ -205,7 +212,12 @@ def test_settings_list_modified():
def test_reset():
option = settings_get("example.example.number", full=True).get('panels')[0].get('sections')[0].get('options')[0]
option = (
settings_get("example.example.number", full=True)
.get("panels")[0]
.get("sections")[0]
.get("options")[0]
)
settings_set("example.example.number", 21)
assert settings_get("example.example.number") == 21
settings_reset("example.example.number")

View file

@ -255,6 +255,7 @@ def test_del_group_all_users(mocker):
with raiseYunohostError(mocker, "group_cannot_be_deleted"):
user_group_delete("all_users")
def test_del_group_that_does_not_exist(mocker):
with raiseYunohostError(mocker, "group_unknown"):
user_group_delete("doesnt_exist")

View file

@ -138,17 +138,25 @@ def user_create(
):
if firstname or lastname:
logger.warning("Options --firstname / --lastname of 'yunohost user create' are deprecated. We recommend using --fullname instead.")
logger.warning(
"Options --firstname / --lastname of 'yunohost user create' are deprecated. We recommend using --fullname instead."
)
if not fullname or not fullname.strip():
if not firstname.strip():
raise YunohostValidationError("You should specify the fullname of the user using option -F")
lastname = lastname or " " # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
raise YunohostValidationError(
"You should specify the fullname of the user using option -F"
)
lastname = (
lastname or " "
) # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
fullname = f"{firstname} {lastname}".strip()
else:
fullname = fullname.strip()
firstname = fullname.split()[0]
lastname = ' '.join(fullname.split()[1:]) or " " # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
lastname = (
" ".join(fullname.split()[1:]) or " "
) # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
from yunohost.domain import domain_list, _get_maindomain, _assert_domain_exists
from yunohost.hook import hook_callback
@ -358,12 +366,16 @@ def user_update(
):
if firstname or lastname:
logger.warning("Options --firstname / --lastname of 'yunohost user create' are deprecated. We recommend using --fullname instead.")
logger.warning(
"Options --firstname / --lastname of 'yunohost user create' are deprecated. We recommend using --fullname instead."
)
if fullname and fullname.strip():
fullname = fullname.strip()
firstname = fullname.split()[0]
lastname = ' '.join(fullname.split()[1:]) or " " # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
lastname = (
" ".join(fullname.split()[1:]) or " "
) # Stupid hack because LDAP requires the sn/lastname attr, but it accepts a single whitespace...
from yunohost.domain import domain_list, _get_maindomain
from yunohost.app import app_ssowatconf
@ -423,7 +435,9 @@ def user_update(
# Ensure compatibility and sufficiently complex password
assert_password_is_compatible(change_password)
is_admin = "cn=admins,ou=groups,dc=yunohost,dc=org" in user["memberOf"]
assert_password_is_strong_enough("admin" if is_admin else "user", change_password)
assert_password_is_strong_enough(
"admin" if is_admin else "user", change_password
)
new_attr_dict["userPassword"] = [_hash_user_password(change_password)]
env_dict["YNH_USER_PASSWORD"] = change_password
@ -1322,6 +1336,7 @@ def user_ssh_remove_key(username, key):
# End SSH subcategory
#
def _hash_user_password(password):
"""
This function computes and return a salted hash for the password in input.

View file

@ -328,9 +328,7 @@ class ConfigPanel:
return actions
def run_action(
self, action=None, args=None, args_file=None, operation_logger=None
):
def run_action(self, action=None, args=None, args_file=None, operation_logger=None):
#
# FIXME : this stuff looks a lot like set() ...
#
@ -610,25 +608,19 @@ class ConfigPanel:
"max_progression",
]
forbidden_keywords += format_description["sections"]
forbidden_readonly_types = [
"password",
"app",
"domain",
"user",
"file"
]
forbidden_readonly_types = ["password", "app", "domain", "user", "file"]
for _, _, option in self._iterate():
if option["id"] in forbidden_keywords:
raise YunohostError("config_forbidden_keyword", keyword=option["id"])
if (
option.get("readonly", False) and
option.get("type", "string") in forbidden_readonly_types
option.get("readonly", False)
and option.get("type", "string") in forbidden_readonly_types
):
raise YunohostError(
"config_forbidden_readonly_type",
type=option["type"],
id=option["id"]
id=option["id"],
)
return self.config
@ -638,7 +630,13 @@ class ConfigPanel:
for _, section, option in self._iterate():
if option["id"] not in self.values:
allowed_empty_types = ["alert", "display_text", "markdown", "file", "button"]
allowed_empty_types = [
"alert",
"display_text",
"markdown",
"file",
"button",
]
if section["is_action_section"] and option.get("default") is not None:
self.values[option["id"]] = option["default"]
@ -668,8 +666,10 @@ class ConfigPanel:
if "ask" not in option:
option["ask"] = m18n.n(self.config["i18n"] + "_" + option["id"])
# auto add i18n help text if present in locales
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + '_help'):
option["help"] = m18n.n(self.config["i18n"] + "_" + option["id"] + '_help')
if m18n.key_exists(self.config["i18n"] + "_" + option["id"] + "_help"):
option["help"] = m18n.n(
self.config["i18n"] + "_" + option["id"] + "_help"
)
def display_header(message):
"""CLI panel/section header display"""
@ -678,8 +678,12 @@ class ConfigPanel:
for panel, section, obj in self._iterate(["panel", "section"]):
if section and section.get("visible") and not evaluate_simple_js_expression(
if (
section
and section.get("visible")
and not evaluate_simple_js_expression(
section["visible"], context=self.new_values
)
):
continue
@ -698,8 +702,10 @@ class ConfigPanel:
elif section:
# filter action section options in case of multiple buttons
section["options"] = [
option for option in section["options"]
if option.get("type", "string") != "button" or option["id"] == action
option
for option in section["options"]
if option.get("type", "string") != "button"
or option["id"] == action
]
if panel == obj:
@ -956,7 +962,9 @@ class Question:
if self.readonly:
text_for_user_input_in_cli = colorize(text_for_user_input_in_cli, "purple")
if self.choices:
return text_for_user_input_in_cli + f" {self.choices[self.current_value]}"
return (
text_for_user_input_in_cli + f" {self.choices[self.current_value]}"
)
return text_for_user_input_in_cli + f" {self.humanize(self.current_value)}"
elif self.choices:
@ -1333,7 +1341,9 @@ class UserQuestion(Question):
class GroupQuestion(Question):
argument_type = "group"
def __init__(self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}):
def __init__(
self, question, context: Mapping[str, Any] = {}, hooks: Dict[str, Callable] = {}
):
from yunohost.user import user_group_list
@ -1569,21 +1579,20 @@ def ask_questions_and_parse_answers(
out = []
for name, raw_question in raw_questions.items():
raw_question['name'] = name
raw_question["name"] = name
question_class = ARGUMENTS_TYPE_PARSERS[raw_question.get("type", "string")]
raw_question["value"] = answers.get(name)
question = question_class(raw_question, context=context, hooks=hooks)
if question.type == "button":
if (
question.enabled is None # type: ignore
or evaluate_simple_js_expression(question.enabled, context=context) # type: ignore
):
if question.enabled is None or evaluate_simple_js_expression( # type: ignore
question.enabled, context=context
): # type: ignore
continue
else:
raise YunohostValidationError(
"config_action_disabled",
action=question.name,
help=_value_for_locale(question.help)
help=_value_for_locale(question.help),
)
new_values = question.ask_if_needed()

View file

@ -99,12 +99,14 @@ LEGACY_SETTINGS = {
"ssowat.panel_overlay.enabled": "misc.portal.ssowat_panel_overlay_enabled",
"security.webadmin.allowlist.enabled": "security.webadmin.webadmin_allowlist_enabled",
"security.webadmin.allowlist": "security.webadmin.webadmin_allowlist",
"security.experimental.enabled": "security.experimental.security_experimental_enabled"
"security.experimental.enabled": "security.experimental.security_experimental_enabled",
}
def translate_legacy_settings_to_configpanel_settings(settings):
return LEGACY_SETTINGS.get(settings, settings)
def legacy_permission_label(app, permission_type):
return LEGACY_PERMISSION_LABEL.get(
(app, permission_type), "Legacy %s urls" % permission_type

View file

@ -117,7 +117,9 @@ class AppResourceManager:
yield ("provision", name, None, wanted_resource)
else:
infos_ = self.current["resources"][name]
current_resource = AppResourceClassesByType[name](infos_, self.app, self)
current_resource = AppResourceClassesByType[name](
infos_, self.app, self
)
yield ("update", name, current_resource, wanted_resource)
@ -143,24 +145,32 @@ class AppResource:
def get_setting(self, key):
from yunohost.app import app_setting
return app_setting(self.app, key)
def set_setting(self, key, value):
from yunohost.app import app_setting
app_setting(self.app, key, value=value)
def delete_setting(self, key):
from yunohost.app import app_setting
app_setting(self.app, key, delete=True)
def _run_script(self, action, script, env={}, user="root"):
from yunohost.app import _make_tmp_workdir_for_app, _make_environment_for_app_script
from yunohost.app import (
_make_tmp_workdir_for_app,
_make_environment_for_app_script,
)
from yunohost.hook import hook_exec_with_script_debug_if_failure
tmpdir = _make_tmp_workdir_for_app(app=self.app)
env_ = _make_environment_for_app_script(self.app, workdir=tmpdir, action=f"{action}_{self.type}")
env_ = _make_environment_for_app_script(
self.app, workdir=tmpdir, action=f"{action}_{self.type}"
)
env_.update(env)
script_path = f"{tmpdir}/{action}_{self.type}"
@ -179,7 +189,9 @@ ynh_abort_if_errors
# FIXME ? : this is an ugly hack :(
operation_logger = OperationLogger._instances[-1]
else:
operation_logger = OperationLogger("resource_snippet", [("app", self.app)], env=env_)
operation_logger = OperationLogger(
"resource_snippet", [("app", self.app)], env=env_
)
operation_logger.start()
try:
@ -191,7 +203,7 @@ ynh_abort_if_errors
env=env_,
operation_logger=operation_logger,
error_message_if_script_failed="An error occured inside the script snippet",
error_message_if_failed=lambda e: f"{action} failed for {self.type} : {e}"
error_message_if_failed=lambda e: f"{action} failed for {self.type} : {e}",
)
finally:
if call_failed:
@ -253,8 +265,7 @@ class PermissionsResource(AppResource):
type = "permissions"
priority = 80
default_properties: Dict[str, Any] = {
}
default_properties: Dict[str, Any] = {}
default_perm_properties: Dict[str, Any] = {
"url": None,
@ -277,8 +288,13 @@ class PermissionsResource(AppResource):
if properties[perm]["show_tile"] is None:
properties[perm]["show_tile"] = bool(properties[perm]["url"])
if isinstance(properties["main"]["url"], str) and properties["main"]["url"] != "/":
raise YunohostError("URL for the 'main' permission should be '/' for webapps (or undefined/None for non-webapps). Note that / refers to the install url of the app")
if (
isinstance(properties["main"]["url"], str)
and properties["main"]["url"] != "/"
):
raise YunohostError(
"URL for the 'main' permission should be '/' for webapps (or undefined/None for non-webapps). Note that / refers to the install url of the app"
)
super().__init__({"permissions": properties}, *args, **kwargs)
@ -296,7 +312,9 @@ class PermissionsResource(AppResource):
# Delete legacy is_public setting if not already done
self.delete_setting("is_public")
existing_perms = user_permission_list(short=True, apps=[self.app])["permissions"]
existing_perms = user_permission_list(short=True, apps=[self.app])[
"permissions"
]
for perm in existing_perms:
if perm.split(".")[1] not in self.permissions.keys():
permission_delete(perm, force=True, sync_perm=False)
@ -306,7 +324,11 @@ class PermissionsResource(AppResource):
# Use the 'allowed' key from the manifest,
# or use the 'init_{perm}_permission' from the install questions
# which is temporarily saved as a setting as an ugly hack to pass the info to this piece of code...
init_allowed = infos["allowed"] or self.get_setting(f"init_{perm}_permission") or []
init_allowed = (
infos["allowed"]
or self.get_setting(f"init_{perm}_permission")
or []
)
permission_create(
f"{self.app}.{perm}",
allowed=init_allowed,
@ -323,7 +345,7 @@ class PermissionsResource(AppResource):
f"{self.app}.{perm}",
show_tile=infos["show_tile"],
protected=infos["protected"],
sync_perm=False
sync_perm=False,
)
else:
pass
@ -341,7 +363,9 @@ class PermissionsResource(AppResource):
permission_sync_to_user,
)
existing_perms = user_permission_list(short=True, apps=[self.app])["permissions"]
existing_perms = user_permission_list(short=True, apps=[self.app])[
"permissions"
]
for perm in existing_perms:
permission_delete(perm, force=True, sync_perm=False)
@ -380,10 +404,7 @@ class SystemuserAppResource(AppResource):
type = "system_user"
priority = 20
default_properties: Dict[str, Any] = {
"allow_ssh": False,
"allow_sftp": False
}
default_properties: Dict[str, Any] = {"allow_ssh": False, "allow_sftp": False}
# FIXME : wat do regarding ssl-cert, multimedia
# FIXME : wat do about home dir
@ -403,7 +424,9 @@ class SystemuserAppResource(AppResource):
assert ret == 0, f"useradd command failed with exit code {ret}"
if not check_output(f"getent passwd {self.app} &>/dev/null || true").strip():
raise YunohostError(f"Failed to create system user for {self.app}", raw_msg=True)
raise YunohostError(
f"Failed to create system user for {self.app}", raw_msg=True
)
groups = set(check_output(f"groups {self.app}").strip().split()[2:])
@ -491,7 +514,9 @@ class InstalldirAppResource(AppResource):
assert self.owner.strip()
assert self.group.strip()
current_install_dir = self.get_setting("install_dir") or self.get_setting("final_path")
current_install_dir = self.get_setting("install_dir") or self.get_setting(
"final_path"
)
# If during install, /var/www/$app already exists, assume that it's okay to remove and recreate it
# FIXME : is this the right thing to do ?
@ -504,15 +529,25 @@ class InstalldirAppResource(AppResource):
# Maybe a middle ground could be to compute the size, check that it's not too crazy (eg > 1G idk),
# and check for available space on the destination
if current_install_dir and os.path.isdir(current_install_dir):
logger.warning(f"Moving {current_install_dir} to {self.dir} ... (this may take a while)")
logger.warning(
f"Moving {current_install_dir} to {self.dir} ... (this may take a while)"
)
shutil.move(current_install_dir, self.dir)
else:
mkdir(self.dir)
owner, owner_perm = self.owner.split(":")
group, group_perm = self.group.split(":")
owner_perm_octal = (4 if "r" in owner_perm else 0) + (2 if "w" in owner_perm else 0) + (1 if "x" in owner_perm else 0)
group_perm_octal = (4 if "r" in group_perm else 0) + (2 if "w" in group_perm else 0) + (1 if "x" in group_perm else 0)
owner_perm_octal = (
(4 if "r" in owner_perm else 0)
+ (2 if "w" in owner_perm else 0)
+ (1 if "x" in owner_perm else 0)
)
group_perm_octal = (
(4 if "r" in group_perm else 0)
+ (2 if "w" in group_perm else 0)
+ (1 if "x" in group_perm else 0)
)
perm_octal = 0o100 * owner_perm_octal + 0o010 * group_perm_octal
@ -604,8 +639,16 @@ class DatadirAppResource(AppResource):
owner, owner_perm = self.owner.split(":")
group, group_perm = self.group.split(":")
owner_perm_octal = (4 if "r" in owner_perm else 0) + (2 if "w" in owner_perm else 0) + (1 if "x" in owner_perm else 0)
group_perm_octal = (4 if "r" in group_perm else 0) + (2 if "w" in group_perm else 0) + (1 if "x" in group_perm else 0)
owner_perm_octal = (
(4 if "r" in owner_perm else 0)
+ (2 if "w" in owner_perm else 0)
+ (1 if "x" in owner_perm else 0)
)
group_perm_octal = (
(4 if "r" in group_perm else 0)
+ (2 if "w" in group_perm else 0)
+ (1 if "x" in group_perm else 0)
)
perm_octal = 0o100 * owner_perm_octal + 0o010 * group_perm_octal
chmod(self.dir, perm_octal)
@ -661,10 +704,7 @@ class AptDependenciesAppResource(AppResource):
type = "apt"
priority = 50
default_properties: Dict[str, Any] = {
"packages": [],
"extras": {}
}
default_properties: Dict[str, Any] = {"packages": [], "extras": {}}
packages: List = []
extras: Dict[str, Dict[str, str]] = {}
@ -672,8 +712,12 @@ class AptDependenciesAppResource(AppResource):
def __init__(self, properties: Dict[str, Any], *args, **kwargs):
for key, values in properties.get("extras", {}).items():
if not all(isinstance(values.get(k), str) for k in ["repo", "key", "packages"]):
raise YunohostError("In apt resource in the manifest: 'extras' repo should have the keys 'repo', 'key' and 'packages' defined and be strings")
if not all(
isinstance(values.get(k), str) for k in ["repo", "key", "packages"]
):
raise YunohostError(
"In apt resource in the manifest: 'extras' repo should have the keys 'repo', 'key' and 'packages' defined and be strings"
)
super().__init__(properties, *args, **kwargs)
@ -681,10 +725,12 @@ class AptDependenciesAppResource(AppResource):
script = [f"ynh_install_app_dependencies {self.packages}"]
for repo, values in self.extras.items():
script += [f"ynh_install_extra_app_dependencies --repo='{values['repo']}' --key='{values['key']}' --package='{values['packages']}'"]
script += [
f"ynh_install_extra_app_dependencies --repo='{values['repo']}' --key='{values['key']}' --package='{values['packages']}'"
]
# FIXME : we're feeding the raw value of values['packages'] to the helper .. if we want to be consistent, may they should be comma-separated, though in the majority of cases, only a single package is installed from an extra repo..
self._run_script("provision_or_update", '\n'.join(script))
self._run_script("provision_or_update", "\n".join(script))
def deprovision(self, context: Dict = {}):
@ -734,8 +780,7 @@ class PortsResource(AppResource):
type = "ports"
priority = 70
default_properties: Dict[str, Any] = {
}
default_properties: Dict[str, Any] = {}
default_port_properties = {
"default": None,
@ -762,7 +807,10 @@ class PortsResource(AppResource):
def _port_is_used(self, port):
# FIXME : this could be less brutal than two os.system ...
cmd1 = "ss --numeric --listening --tcp --udp | awk '{print$5}' | grep --quiet --extended-regexp ':%s$'" % port
cmd1 = (
"ss --numeric --listening --tcp --udp | awk '{print$5}' | grep --quiet --extended-regexp ':%s$'"
% port
)
# This second command is mean to cover (most) case where an app is using a port yet ain't currently using it for some reason (typically service ain't up)
cmd2 = f"grep --quiet \"port: '{port}'\" /etc/yunohost/apps/*/settings.yml"
return os.system(cmd1) == 0 and os.system(cmd2) == 0
@ -840,8 +888,13 @@ class DatabaseAppResource(AppResource):
def __init__(self, properties: Dict[str, Any], *args, **kwargs):
if "type" not in properties or properties["type"] not in ["mysql", "postgresql"]:
raise YunohostError("Specifying the type of db ('mysql' or 'postgresql') is mandatory for db resources")
if "type" not in properties or properties["type"] not in [
"mysql",
"postgresql",
]:
raise YunohostError(
"Specifying the type of db ('mysql' or 'postgresql') is mandatory for db resources"
)
super().__init__(properties, *args, **kwargs)
@ -850,14 +903,19 @@ class DatabaseAppResource(AppResource):
if self.type == "mysql":
return os.system(f"mysqlshow '{db_name}' >/dev/null 2>/dev/null") == 0
elif self.type == "postgresql":
return os.system(f"sudo --login --user=postgres psql -c '' '{db_name}' >/dev/null 2>/dev/null") == 0
return (
os.system(
f"sudo --login --user=postgres psql -c '' '{db_name}' >/dev/null 2>/dev/null"
)
== 0
)
else:
return False
def provision_or_update(self, context: Dict = {}):
# This is equivalent to ynh_sanitize_dbid
db_name = self.app.replace('-', '_').replace('.', '_')
db_name = self.app.replace("-", "_").replace(".", "_")
db_user = db_name
self.set_setting("db_name", db_name)
self.set_setting("db_user", db_user)
@ -867,7 +925,9 @@ class DatabaseAppResource(AppResource):
db_pwd = self.get_setting("db_pwd")
else:
# Legacy setting migration
legacypasswordsetting = "psqlpwd" if self.type == "postgresql" else "mysqlpwd"
legacypasswordsetting = (
"psqlpwd" if self.type == "postgresql" else "mysqlpwd"
)
if self.get_setting(legacypasswordsetting):
db_pwd = self.get_setting(legacypasswordsetting)
self.delete_setting(legacypasswordsetting)
@ -875,25 +935,36 @@ class DatabaseAppResource(AppResource):
if not db_pwd:
from moulinette.utils.text import random_ascii
db_pwd = random_ascii(24)
self.set_setting("db_pwd", db_pwd)
if not self.db_exists(db_name):
if self.type == "mysql":
self._run_script("provision", f"ynh_mysql_create_db '{db_name}' '{db_user}' '{db_pwd}'")
self._run_script(
"provision",
f"ynh_mysql_create_db '{db_name}' '{db_user}' '{db_pwd}'",
)
elif self.type == "postgresql":
self._run_script("provision", f"ynh_psql_create_user '{db_user}' '{db_pwd}'; ynh_psql_create_db '{db_name}' '{db_user}'")
self._run_script(
"provision",
f"ynh_psql_create_user '{db_user}' '{db_pwd}'; ynh_psql_create_db '{db_name}' '{db_user}'",
)
def deprovision(self, context: Dict = {}):
db_name = self.app.replace('-', '_').replace('.', '_')
db_name = self.app.replace("-", "_").replace(".", "_")
db_user = db_name
if self.type == "mysql":
self._run_script("deprovision", f"ynh_mysql_remove_db '{db_name}' '{db_user}'")
self._run_script(
"deprovision", f"ynh_mysql_remove_db '{db_name}' '{db_user}'"
)
elif self.type == "postgresql":
self._run_script("deprovision", f"ynh_psql_remove_db '{db_name}' '{db_user}'")
self._run_script(
"deprovision", f"ynh_psql_remove_db '{db_name}' '{db_user}'"
)
self.delete_setting("db_name")
self.delete_setting("db_user")

View file

@ -55,7 +55,9 @@ def space_used_by_directory(dirpath, follow_symlinks=True):
return int(du_output.split()[0])
stat = os.statvfs(dirpath)
return stat.f_frsize * stat.f_blocks # FIXME : this doesnt do what the function name suggest this does ...
return (
stat.f_frsize * stat.f_blocks
) # FIXME : this doesnt do what the function name suggest this does ...
def human_to_binary(size: str) -> int:
@ -69,7 +71,9 @@ def human_to_binary(size: str) -> int:
size = size[:-1]
if suffix not in symbols:
raise YunohostError(f"Invalid size suffix '{suffix}', expected one of {symbols}")
raise YunohostError(
f"Invalid size suffix '{suffix}', expected one of {symbols}"
)
try:
size_ = float(size)
@ -97,6 +101,7 @@ def binary_to_human(n: int) -> str:
def ram_available():
import psutil
return (psutil.virtual_memory().available, psutil.swap_memory().free)