Merge pull request #1773 from YunoHost/ci-format-debian/11.2.10

[CI] Format code with Black
This commit is contained in:
Alexandre Aubin 2024-02-09 21:16:18 +01:00 committed by GitHub
commit f130f4fc56
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 132 additions and 87 deletions

View file

@ -8,6 +8,7 @@ adds `--help` at the end if one presses [tab] again.
author: Christophe Vuillot
"""
import os
import yaml

View file

@ -990,9 +990,9 @@ def app_manifest(app, with_screenshot=False):
if entry.is_file() and ext in ("png", "jpg", "jpeg", "webp", "gif"):
with open(entry.path, "rb") as img_file:
data = base64.b64encode(img_file.read()).decode("utf-8")
manifest[
"screenshot"
] = f"data:image/{ext};charset=utf-8;base64,{data}"
manifest["screenshot"] = (
f"data:image/{ext};charset=utf-8;base64,{data}"
)
break
shutil.rmtree(extracted_app_folder)
@ -1093,7 +1093,9 @@ def app_install(
app_id = manifest["id"]
if app_id in user_list()["users"].keys():
raise YunohostValidationError(f"There is already a YunoHost user called {app_id} ...", raw_msg=True)
raise YunohostValidationError(
f"There is already a YunoHost user called {app_id} ...", raw_msg=True
)
# Check requirements
for name, passed, values, err in _check_manifest_requirements(
@ -1639,9 +1641,11 @@ def app_setting(app, key, value=None, delete=False):
permission_create(
permission=permission_name,
# FIXME find a way to limit to only the user allowed to the main permission
allowed=["all_users"]
if key.startswith("protected_")
else ["all_users", "visitors"],
allowed=(
["all_users"]
if key.startswith("protected_")
else ["all_users", "visitors"]
),
url=None,
additional_urls=urls,
auth_header=not key.startswith("skipped_"),

View file

@ -88,7 +88,6 @@ logger = getActionLogger("yunohost.backup")
class BackupRestoreTargetsManager:
"""
BackupRestoreTargetsManager manage the targets
in BackupManager and RestoreManager
@ -211,7 +210,6 @@ class BackupRestoreTargetsManager:
class BackupManager:
"""
This class collect files to backup in a list and apply one or several
backup method on it.
@ -825,7 +823,6 @@ class BackupManager:
class RestoreManager:
"""
RestoreManager allow to restore a past backup archive
@ -1328,9 +1325,11 @@ class RestoreManager:
url=permission_infos["url"],
additional_urls=permission_infos["additional_urls"],
auth_header=permission_infos["auth_header"],
label=permission_infos["label"]
if perm_name == "main"
else permission_infos["sublabel"],
label=(
permission_infos["label"]
if perm_name == "main"
else permission_infos["sublabel"]
),
show_tile=permission_infos["show_tile"],
protected=permission_infos["protected"],
sync_perm=False,
@ -1468,9 +1467,11 @@ class RestoreManager:
url=permission_infos.get("url"),
additional_urls=permission_infos.get("additional_urls"),
auth_header=permission_infos.get("auth_header"),
label=permission_infos.get("label")
if perm_name == "main"
else permission_infos.get("sublabel"),
label=(
permission_infos.get("label")
if perm_name == "main"
else permission_infos.get("sublabel")
),
show_tile=permission_infos.get("show_tile", True),
protected=permission_infos.get("protected", False),
sync_perm=False,
@ -1570,7 +1571,6 @@ class RestoreManager:
# Backup methods #
#
class BackupMethod:
"""
BackupMethod is an abstract class that represents a way to backup and
restore a list of files.
@ -1861,7 +1861,6 @@ class BackupMethod:
class CopyBackupMethod(BackupMethod):
"""
This class just do an uncompress copy of each file in a location, and
could be the inverse for restoring
@ -2093,7 +2092,6 @@ class TarBackupMethod(BackupMethod):
class CustomBackupMethod(BackupMethod):
"""
This class use a bash script/hook "backup_method" to do the
backup/restore operations. A user can add his own hook inside

View file

@ -579,11 +579,15 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder):
sanlist = []
# Handle the boring case where the domain is not the root of the dns zone etc...
from yunohost.dns import _get_relative_name_for_dns_zone, _get_dns_zone_for_domain
from yunohost.dns import (
_get_relative_name_for_dns_zone,
_get_dns_zone_for_domain,
)
base_dns_zone = _get_dns_zone_for_domain(domain)
basename = _get_relative_name_for_dns_zone(domain, base_dns_zone)
suffix = f".{basename}" if basename != "@" else ""
for sub in ("xmpp-upload", "muc"):
subdomain = sub + "." + domain
if xmpp_records.get("CNAME:" + sub + suffix) == "OK":

View file

@ -118,9 +118,11 @@ class MyDiagnoser(Diagnoser):
"repo": ynh_packages["yunohost"]["repo"],
},
status="INFO" if consistent_versions else "ERROR",
summary="diagnosis_basesystem_ynh_main_version"
if consistent_versions
else "diagnosis_basesystem_ynh_inconsistent_versions",
summary=(
"diagnosis_basesystem_ynh_main_version"
if consistent_versions
else "diagnosis_basesystem_ynh_inconsistent_versions"
),
details=ynh_version_details,
)

View file

@ -73,9 +73,11 @@ class MyDiagnoser(Diagnoser):
yield dict(
meta={"test": "dnsresolv"},
status="ERROR",
summary="diagnosis_ip_broken_dnsresolution"
if good_resolvconf
else "diagnosis_ip_broken_resolvconf",
summary=(
"diagnosis_ip_broken_dnsresolution"
if good_resolvconf
else "diagnosis_ip_broken_resolvconf"
),
)
return
# Otherwise, if the resolv conf is bad but we were able to resolve domain name,
@ -123,11 +125,9 @@ class MyDiagnoser(Diagnoser):
yield dict(
meta={"test": "ipv4"},
data={"global": ipv4, "local": get_local_ip("ipv4")},
status="SUCCESS"
if ipv4
else "ERROR"
if is_ipvx_important(4)
else "WARNING",
status=(
"SUCCESS" if ipv4 else "ERROR" if is_ipvx_important(4) else "WARNING"
),
summary="diagnosis_ip_connected_ipv4" if ipv4 else "diagnosis_ip_no_ipv4",
details=["diagnosis_ip_global", "diagnosis_ip_local"] if ipv4 else None,
)
@ -135,19 +135,27 @@ class MyDiagnoser(Diagnoser):
yield dict(
meta={"test": "ipv6"},
data={"global": ipv6, "local": get_local_ip("ipv6")},
status="SUCCESS"
if ipv6
else "ERROR"
if settings_get("misc.network.dns_exposure") == "ipv6"
else "WARNING",
status=(
"SUCCESS"
if ipv6
else (
"ERROR"
if settings_get("misc.network.dns_exposure") == "ipv6"
else "WARNING"
)
),
summary="diagnosis_ip_connected_ipv6" if ipv6 else "diagnosis_ip_no_ipv6",
details=["diagnosis_ip_global", "diagnosis_ip_local"]
if ipv6
else [
"diagnosis_ip_no_ipv6_tip_important"
if is_ipvx_important(6)
else "diagnosis_ip_no_ipv6_tip"
],
details=(
["diagnosis_ip_global", "diagnosis_ip_local"]
if ipv6
else [
(
"diagnosis_ip_no_ipv6_tip_important"
if is_ipvx_important(6)
else "diagnosis_ip_no_ipv6_tip"
)
]
),
)
# TODO / FIXME : add some attempt to detect ISP (using whois ?) ?

View file

@ -291,9 +291,9 @@ class MyDiagnoser(Diagnoser):
yield dict(
meta=meta,
data={},
status=alert_type.upper()
if alert_type != "not_found"
else "WARNING",
status=(
alert_type.upper() if alert_type != "not_found" else "WARNING"
),
summary="diagnosis_domain_expiration_" + alert_type,
details=details[alert_type],
)

View file

@ -390,9 +390,11 @@ def domain_remove(
apps_on_that_domain.append(
(
app,
f" - {app} \"{label}\" on https://{domain}{settings['path']}"
if "path" in settings
else app,
(
f" - {app} \"{label}\" on https://{domain}{settings['path']}"
if "path" in settings
else app
),
)
)

View file

@ -387,9 +387,11 @@ def hook_exec(
# Define output loggers and call command
loggers = (
lambda l: logger.debug(l.rstrip() + "\r"),
lambda l: logger.warning(l.rstrip())
if is_relevant_warning(l.rstrip())
else logger.debug(l.rstrip()),
lambda l: (
logger.warning(l.rstrip())
if is_relevant_warning(l.rstrip())
else logger.debug(l.rstrip())
),
lambda l: logger.info(l.rstrip()),
)

View file

@ -460,7 +460,6 @@ class RedactingFormatter(Formatter):
class OperationLogger:
"""
Instances of this class represents unit operation done on the ynh instance.

View file

@ -535,12 +535,16 @@ class MyMigration(Migration):
return "Reading database ..." not in line.rstrip()
callbacks = (
lambda l: logger.info("+ " + l.rstrip() + "\r")
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip() + "\r"),
lambda l: logger.warning(l.rstrip())
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip()),
lambda l: (
logger.info("+ " + l.rstrip() + "\r")
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip() + "\r")
),
lambda l: (
logger.warning(l.rstrip())
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip())
),
)
cmd = (

View file

@ -347,7 +347,7 @@ def test_resource_permissions():
conf = {
"main": {
"url": "/",
"allowed": "visitors"
"allowed": "visitors",
# TODO: test protected?
},
}

View file

@ -217,9 +217,11 @@ def generate_test_name(intake, output, raw_option, data):
"=".join(
[
key,
str(raw_option[key])
if not isinstance(raw_option[key], str)
else f"'{raw_option[key]}'",
(
str(raw_option[key])
if not isinstance(raw_option[key], str)
else f"'{raw_option[key]}'"
),
]
)
for key in raw_option.keys()
@ -256,9 +258,11 @@ def pytest_generate_tests(metafunc):
[metafunc.cls.raw_option], metafunc.cls.scenarios
)
ids += [
generate_test_name(*args.values)
if isinstance(args, ParameterSet)
else generate_test_name(*args)
(
generate_test_name(*args.values)
if isinstance(args, ParameterSet)
else generate_test_name(*args)
)
for args in argvalues
]
elif params[1] == "expected_normalized":

View file

@ -351,9 +351,11 @@ def tools_update(target=None):
# stdout goes to debug
lambda l: logger.debug(l.rstrip()),
# stderr goes to warning except for the boring apt messages
lambda l: logger.warning(l.rstrip())
if is_legit_warning(l)
else logger.debug(l.rstrip()),
lambda l: (
logger.warning(l.rstrip())
if is_legit_warning(l)
else logger.debug(l.rstrip())
),
)
logger.info(m18n.n("updating_apt_cache"))
@ -490,12 +492,16 @@ def tools_upgrade(operation_logger, target=None):
logger.debug("Running apt command :\n{}".format(dist_upgrade))
callbacks = (
lambda l: logger.info("+ " + l.rstrip() + "\r")
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip() + "\r"),
lambda l: logger.warning(l.rstrip())
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip()),
lambda l: (
logger.info("+ " + l.rstrip() + "\r")
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip() + "\r")
),
lambda l: (
logger.warning(l.rstrip())
if _apt_log_line_is_relevant(l)
else logger.debug(l.rstrip())
),
)
returncode = call_async_output(dist_upgrade, callbacks, shell=True)
@ -623,6 +629,7 @@ def tools_shell(command=None):
shell = code.InteractiveConsole(vars)
shell.interact()
def tools_basic_space_cleanup():
"""
Basic space cleanup.
@ -634,7 +641,11 @@ def tools_basic_space_cleanup():
"""
subprocess.run("apt autoremove && apt autoclean", shell=True)
subprocess.run("journalctl --vacuum-size=50M", shell=True)
subprocess.run("rm /var/log/*.gz && rm /var/log/*/*.gz && rm /var/log/*.? && rm /var/log/*/*.?", shell=True)
subprocess.run(
"rm /var/log/*.gz && rm /var/log/*/*.gz && rm /var/log/*.? && rm /var/log/*/*.?",
shell=True,
)
# ############################################ #
# #
@ -970,9 +981,9 @@ class Migration:
# Those are to be implemented by daughter classes
mode = "auto"
dependencies: List[
str
] = [] # List of migration ids required before running this migration
dependencies: List[str] = (
[]
) # List of migration ids required before running this migration
@property
def disclaimer(self):

View file

@ -1223,14 +1223,20 @@ class PortsResource(AppResource):
def _port_is_used(self, port):
# FIXME : this could be less brutal than two os.system...
used_by_process = os.system(
"ss --numeric --listening --tcp --udp | awk '{print$5}' | grep --quiet --extended-regexp ':%s$'"
% port
) == 0
used_by_process = (
os.system(
"ss --numeric --listening --tcp --udp | awk '{print$5}' | grep --quiet --extended-regexp ':%s$'"
% port
)
== 0
)
# This second command is mean to cover (most) case where an app is using a port yet ain't currently using it for some reason (typically service ain't up)
used_by_app = os.system(
f"grep --quiet --extended-regexp \"port: '?{port}'?\" /etc/yunohost/apps/*/settings.yml"
) == 0
used_by_app = (
os.system(
f"grep --quiet --extended-regexp \"port: '?{port}'?\" /etc/yunohost/apps/*/settings.yml"
)
== 0
)
used_by_self_provisioning = port in self.ports_used_by_self
return used_by_process or used_by_app or used_by_self_provisioning