mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
[mod] run pyupgrade on source code
This commit is contained in:
parent
b35f27d580
commit
ad56275801
33 changed files with 126 additions and 128 deletions
|
@ -42,7 +42,7 @@ class BaseSystemDiagnoser(Diagnoser):
|
||||||
elif os.path.exists("/sys/devices/virtual/dmi/id/sys_vendor"):
|
elif os.path.exists("/sys/devices/virtual/dmi/id/sys_vendor"):
|
||||||
model = read_file("/sys/devices/virtual/dmi/id/sys_vendor").strip()
|
model = read_file("/sys/devices/virtual/dmi/id/sys_vendor").strip()
|
||||||
if os.path.exists("/sys/devices/virtual/dmi/id/product_name"):
|
if os.path.exists("/sys/devices/virtual/dmi/id/product_name"):
|
||||||
model = "%s %s" % (
|
model = "{} {}".format(
|
||||||
model,
|
model,
|
||||||
read_file("/sys/devices/virtual/dmi/id/product_name").strip(),
|
read_file("/sys/devices/virtual/dmi/id/product_name").strip(),
|
||||||
)
|
)
|
||||||
|
@ -116,7 +116,7 @@ class BaseSystemDiagnoser(Diagnoser):
|
||||||
bad_sury_packages = list(self.bad_sury_packages())
|
bad_sury_packages = list(self.bad_sury_packages())
|
||||||
if bad_sury_packages:
|
if bad_sury_packages:
|
||||||
cmd_to_fix = "apt install --allow-downgrades " + " ".join(
|
cmd_to_fix = "apt install --allow-downgrades " + " ".join(
|
||||||
["%s=%s" % (package, version) for package, version in bad_sury_packages]
|
["{}={}".format(package, version) for package, version in bad_sury_packages]
|
||||||
)
|
)
|
||||||
yield dict(
|
yield dict(
|
||||||
meta={"test": "packages_from_sury"},
|
meta={"test": "packages_from_sury"},
|
||||||
|
|
|
@ -167,7 +167,7 @@ class IPDiagnoser(Diagnoser):
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
resolvers != []
|
resolvers != []
|
||||||
), "Uhoh, need at least one IPv%s DNS resolver in %s ..." % (
|
), "Uhoh, need at least one IPv{} DNS resolver in {} ...".format(
|
||||||
protocol,
|
protocol,
|
||||||
resolver_file,
|
resolver_file,
|
||||||
)
|
)
|
||||||
|
@ -221,7 +221,7 @@ class IPDiagnoser(Diagnoser):
|
||||||
return download_text(url, timeout=30).strip()
|
return download_text(url, timeout=30).strip()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger_debug(
|
self.logger_debug(
|
||||||
"Could not get public IPv%s : %s" % (str(protocol), str(e))
|
"Could not get public IPv{} : {}".format(str(protocol), str(e))
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ class SystemResourcesDiagnoser(Diagnoser):
|
||||||
d for d in disk_partitions if d.mountpoint in ["/", "/var"]
|
d for d in disk_partitions if d.mountpoint in ["/", "/var"]
|
||||||
]
|
]
|
||||||
main_space = sum(
|
main_space = sum(
|
||||||
[psutil.disk_usage(d.mountpoint).total for d in main_disk_partitions]
|
psutil.disk_usage(d.mountpoint).total for d in main_disk_partitions
|
||||||
)
|
)
|
||||||
if main_space < 10 * GB:
|
if main_space < 10 * GB:
|
||||||
yield dict(
|
yield dict(
|
||||||
|
@ -156,7 +156,7 @@ class SystemResourcesDiagnoser(Diagnoser):
|
||||||
kills_count = self.recent_kills_by_oom_reaper()
|
kills_count = self.recent_kills_by_oom_reaper()
|
||||||
if kills_count:
|
if kills_count:
|
||||||
kills_summary = "\n".join(
|
kills_summary = "\n".join(
|
||||||
["%s (x%s)" % (proc, count) for proc, count in kills_count]
|
["{} (x{})".format(proc, count) for proc, count in kills_count]
|
||||||
)
|
)
|
||||||
|
|
||||||
yield dict(
|
yield dict(
|
||||||
|
@ -202,9 +202,9 @@ def human_size(bytes_):
|
||||||
# Adapted from https://stackoverflow.com/a/1094933
|
# Adapted from https://stackoverflow.com/a/1094933
|
||||||
for unit in ["", "ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
|
for unit in ["", "ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
|
||||||
if abs(bytes_) < 1024.0:
|
if abs(bytes_) < 1024.0:
|
||||||
return "%s %sB" % (round_(bytes_), unit)
|
return "{} {}B".format(round_(bytes_), unit)
|
||||||
bytes_ /= 1024.0
|
bytes_ /= 1024.0
|
||||||
return "%s %sB" % (round_(bytes_), "Yi")
|
return "{} {}B".format(round_(bytes_), "Yi")
|
||||||
|
|
||||||
|
|
||||||
def round_(n):
|
def round_(n):
|
||||||
|
|
|
@ -107,7 +107,7 @@ class Parser:
|
||||||
else:
|
else:
|
||||||
# We're getting out of a comment bloc, we should find
|
# We're getting out of a comment bloc, we should find
|
||||||
# the name of the function
|
# the name of the function
|
||||||
assert len(line.split()) >= 1, "Malformed line %s in %s" % (
|
assert len(line.split()) >= 1, "Malformed line {} in {}".format(
|
||||||
i,
|
i,
|
||||||
self.filename,
|
self.filename,
|
||||||
)
|
)
|
||||||
|
|
|
@ -122,7 +122,7 @@ def app_list(full=False, installed=False, filter=None):
|
||||||
try:
|
try:
|
||||||
app_info_dict = app_info(app_id, full=full)
|
app_info_dict = app_info(app_id, full=full)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Failed to read info for %s : %s" % (app_id, e))
|
logger.error("Failed to read info for {} : {}".format(app_id, e))
|
||||||
continue
|
continue
|
||||||
app_info_dict["id"] = app_id
|
app_info_dict["id"] = app_id
|
||||||
out.append(app_info_dict)
|
out.append(app_info_dict)
|
||||||
|
@ -1219,7 +1219,7 @@ def app_setting(app, key, value=None, delete=False):
|
||||||
)
|
)
|
||||||
|
|
||||||
permissions = user_permission_list(full=True, apps=[app])["permissions"]
|
permissions = user_permission_list(full=True, apps=[app])["permissions"]
|
||||||
permission_name = "%s.legacy_%s_uris" % (app, key.split("_")[0])
|
permission_name = "{}.legacy_{}_uris".format(app, key.split("_")[0])
|
||||||
permission = permissions.get(permission_name)
|
permission = permissions.get(permission_name)
|
||||||
|
|
||||||
# GET
|
# GET
|
||||||
|
@ -1562,7 +1562,7 @@ def app_action_run(operation_logger, app, action, args=None):
|
||||||
shutil.rmtree(tmp_workdir_for_app)
|
shutil.rmtree(tmp_workdir_for_app)
|
||||||
|
|
||||||
if retcode not in action_declaration.get("accepted_return_codes", [0]):
|
if retcode not in action_declaration.get("accepted_return_codes", [0]):
|
||||||
msg = "Error while executing action '%s' of app '%s': return code %s" % (
|
msg = "Error while executing action '{}' of app '{}': return code {}".format(
|
||||||
action,
|
action,
|
||||||
app,
|
app,
|
||||||
retcode,
|
retcode,
|
||||||
|
@ -1989,7 +1989,7 @@ def _set_default_ask_questions(arguments):
|
||||||
for question in questions_with_default
|
for question in questions_with_default
|
||||||
):
|
):
|
||||||
# The key is for example "app_manifest_install_ask_domain"
|
# The key is for example "app_manifest_install_ask_domain"
|
||||||
key = "app_manifest_%s_ask_%s" % (script_name, arg["name"])
|
key = "app_manifest_{}_ask_{}".format(script_name, arg["name"])
|
||||||
arg["ask"] = m18n.n(key)
|
arg["ask"] = m18n.n(key)
|
||||||
|
|
||||||
# Also it in fact doesn't make sense for any of those questions to have an example value nor a default value...
|
# Also it in fact doesn't make sense for any of those questions to have an example value nor a default value...
|
||||||
|
@ -2397,7 +2397,7 @@ def _make_environment_for_app_script(
|
||||||
env_dict["YNH_APP_BASEDIR"] = workdir
|
env_dict["YNH_APP_BASEDIR"] = workdir
|
||||||
|
|
||||||
for arg_name, arg_value in args.items():
|
for arg_name, arg_value in args.items():
|
||||||
env_dict["YNH_%s%s" % (args_prefix, arg_name.upper())] = str(arg_value)
|
env_dict["YNH_{}{}".format(args_prefix, arg_name.upper())] = str(arg_value)
|
||||||
|
|
||||||
return env_dict
|
return env_dict
|
||||||
|
|
||||||
|
|
|
@ -217,7 +217,7 @@ def _load_apps_catalog():
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise YunohostError(
|
raise YunohostError(
|
||||||
"Unable to read cache for apps_catalog %s : %s" % (cache_file, e),
|
"Unable to read cache for apps_catalog {} : {}".format(cache_file, e),
|
||||||
raw_msg=True,
|
raw_msg=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -2380,7 +2380,7 @@ def backup_list(with_info=False, human_readable=False):
|
||||||
# Get local archives sorted according to last modification time
|
# Get local archives sorted according to last modification time
|
||||||
# (we do a realpath() to resolve symlinks)
|
# (we do a realpath() to resolve symlinks)
|
||||||
archives = glob("%s/*.tar.gz" % ARCHIVES_PATH) + glob("%s/*.tar" % ARCHIVES_PATH)
|
archives = glob("%s/*.tar.gz" % ARCHIVES_PATH) + glob("%s/*.tar" % ARCHIVES_PATH)
|
||||||
archives = set([os.path.realpath(archive) for archive in archives])
|
archives = {os.path.realpath(archive) for archive in archives}
|
||||||
archives = sorted(archives, key=lambda x: os.path.getctime(x))
|
archives = sorted(archives, key=lambda x: os.path.getctime(x))
|
||||||
# Extract only filename without the extension
|
# Extract only filename without the extension
|
||||||
|
|
||||||
|
@ -2420,7 +2420,7 @@ def backup_download(name):
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
archive_file = "%s/%s.tar" % (ARCHIVES_PATH, name)
|
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name)
|
||||||
|
|
||||||
# Check file exist (even if it's a broken symlink)
|
# Check file exist (even if it's a broken symlink)
|
||||||
if not os.path.lexists(archive_file):
|
if not os.path.lexists(archive_file):
|
||||||
|
@ -2462,7 +2462,7 @@ def backup_info(name, with_details=False, human_readable=False):
|
||||||
elif name.endswith(".tar"):
|
elif name.endswith(".tar"):
|
||||||
name = name[: -len(".tar")]
|
name = name[: -len(".tar")]
|
||||||
|
|
||||||
archive_file = "%s/%s.tar" % (ARCHIVES_PATH, name)
|
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name)
|
||||||
|
|
||||||
# Check file exist (even if it's a broken symlink)
|
# Check file exist (even if it's a broken symlink)
|
||||||
if not os.path.lexists(archive_file):
|
if not os.path.lexists(archive_file):
|
||||||
|
@ -2480,7 +2480,7 @@ def backup_info(name, with_details=False, human_readable=False):
|
||||||
"backup_archive_broken_link", path=archive_file
|
"backup_archive_broken_link", path=archive_file
|
||||||
)
|
)
|
||||||
|
|
||||||
info_file = "%s/%s.info.json" % (ARCHIVES_PATH, name)
|
info_file = "{}/{}.info.json".format(ARCHIVES_PATH, name)
|
||||||
|
|
||||||
if not os.path.exists(info_file):
|
if not os.path.exists(info_file):
|
||||||
tar = tarfile.open(
|
tar = tarfile.open(
|
||||||
|
@ -2591,10 +2591,10 @@ def backup_delete(name):
|
||||||
|
|
||||||
hook_callback("pre_backup_delete", args=[name])
|
hook_callback("pre_backup_delete", args=[name])
|
||||||
|
|
||||||
archive_file = "%s/%s.tar" % (ARCHIVES_PATH, name)
|
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name)
|
||||||
if not os.path.exists(archive_file) and os.path.exists(archive_file + ".gz"):
|
if not os.path.exists(archive_file) and os.path.exists(archive_file + ".gz"):
|
||||||
archive_file += ".gz"
|
archive_file += ".gz"
|
||||||
info_file = "%s/%s.info.json" % (ARCHIVES_PATH, name)
|
info_file = "{}/{}.info.json".format(ARCHIVES_PATH, name)
|
||||||
|
|
||||||
files_to_delete = [archive_file, info_file]
|
files_to_delete = [archive_file, info_file]
|
||||||
|
|
||||||
|
@ -2693,5 +2693,5 @@ def binary_to_human(n, customary=False):
|
||||||
for s in reversed(symbols):
|
for s in reversed(symbols):
|
||||||
if n >= prefix[s]:
|
if n >= prefix[s]:
|
||||||
value = float(n) / prefix[s]
|
value = float(n) / prefix[s]
|
||||||
return "%.1f%s" % (value, s)
|
return "{:.1f}{}".format(value, s)
|
||||||
return "%s" % n
|
return "%s" % n
|
||||||
|
|
|
@ -143,7 +143,7 @@ def _certificate_install_selfsigned(domain_list, force=False):
|
||||||
|
|
||||||
# Paths of files and folder we'll need
|
# Paths of files and folder we'll need
|
||||||
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
||||||
new_cert_folder = "%s/%s-history/%s-selfsigned" % (
|
new_cert_folder = "{}/{}-history/{}-selfsigned".format(
|
||||||
CERT_FOLDER,
|
CERT_FOLDER,
|
||||||
domain,
|
domain,
|
||||||
date_tag,
|
date_tag,
|
||||||
|
@ -300,7 +300,7 @@ def _certificate_install_letsencrypt(
|
||||||
try:
|
try:
|
||||||
_fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks)
|
_fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
msg = "Certificate installation for %s failed !\nException: %s" % (
|
msg = "Certificate installation for {} failed !\nException: {}".format(
|
||||||
domain,
|
domain,
|
||||||
e,
|
e,
|
||||||
)
|
)
|
||||||
|
@ -457,20 +457,20 @@ def _email_renewing_failed(domain, exception_message, stack=""):
|
||||||
|
|
||||||
logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
|
logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
|
||||||
text = """
|
text = """
|
||||||
An attempt for renewing the certificate for domain %s failed with the following
|
An attempt for renewing the certificate for domain {} failed with the following
|
||||||
error :
|
error :
|
||||||
|
|
||||||
%s
|
{}
|
||||||
%s
|
{}
|
||||||
|
|
||||||
Here's the tail of /var/log/yunohost/yunohost-cli.log, which might help to
|
Here's the tail of /var/log/yunohost/yunohost-cli.log, which might help to
|
||||||
investigate :
|
investigate :
|
||||||
|
|
||||||
%s
|
{}
|
||||||
|
|
||||||
-- Certificate Manager
|
-- Certificate Manager
|
||||||
|
|
||||||
""" % (
|
""".format(
|
||||||
domain,
|
domain,
|
||||||
exception_message,
|
exception_message,
|
||||||
stack,
|
stack,
|
||||||
|
@ -478,12 +478,12 @@ investigate :
|
||||||
)
|
)
|
||||||
|
|
||||||
message = """\
|
message = """\
|
||||||
From: %s
|
From: {}
|
||||||
To: %s
|
To: {}
|
||||||
Subject: %s
|
Subject: {}
|
||||||
|
|
||||||
%s
|
{}
|
||||||
""" % (
|
""".format(
|
||||||
from_,
|
from_,
|
||||||
to_,
|
to_,
|
||||||
subject_,
|
subject_,
|
||||||
|
@ -532,7 +532,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
|
||||||
# Prepare certificate signing request
|
# Prepare certificate signing request
|
||||||
logger.debug("Prepare key and certificate signing request (CSR) for %s...", domain)
|
logger.debug("Prepare key and certificate signing request (CSR) for %s...", domain)
|
||||||
|
|
||||||
domain_key_file = "%s/%s.pem" % (TMP_FOLDER, domain)
|
domain_key_file = "{}/{}.pem".format(TMP_FOLDER, domain)
|
||||||
_generate_key(domain_key_file)
|
_generate_key(domain_key_file)
|
||||||
_set_permissions(domain_key_file, "root", "ssl-cert", 0o640)
|
_set_permissions(domain_key_file, "root", "ssl-cert", 0o640)
|
||||||
|
|
||||||
|
@ -541,7 +541,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
|
||||||
# Sign the certificate
|
# Sign the certificate
|
||||||
logger.debug("Now using ACME Tiny to sign the certificate...")
|
logger.debug("Now using ACME Tiny to sign the certificate...")
|
||||||
|
|
||||||
domain_csr_file = "%s/%s.csr" % (TMP_FOLDER, domain)
|
domain_csr_file = "{}/{}.csr".format(TMP_FOLDER, domain)
|
||||||
|
|
||||||
if staging:
|
if staging:
|
||||||
certification_authority = STAGING_CERTIFICATION_AUTHORITY
|
certification_authority = STAGING_CERTIFICATION_AUTHORITY
|
||||||
|
@ -580,7 +580,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
|
||||||
else:
|
else:
|
||||||
folder_flag = "letsencrypt"
|
folder_flag = "letsencrypt"
|
||||||
|
|
||||||
new_cert_folder = "%s/%s-history/%s-%s" % (
|
new_cert_folder = "{}/{}-history/{}-{}".format(
|
||||||
CERT_FOLDER,
|
CERT_FOLDER,
|
||||||
domain,
|
domain,
|
||||||
date_tag,
|
date_tag,
|
||||||
|
@ -642,7 +642,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder):
|
||||||
csr.add_extensions(
|
csr.add_extensions(
|
||||||
[
|
[
|
||||||
crypto.X509Extension(
|
crypto.X509Extension(
|
||||||
"subjectAltName".encode("utf8"),
|
b"subjectAltName",
|
||||||
False,
|
False,
|
||||||
("DNS:" + subdomain).encode("utf8"),
|
("DNS:" + subdomain).encode("utf8"),
|
||||||
)
|
)
|
||||||
|
@ -844,7 +844,7 @@ def _backup_current_cert(domain):
|
||||||
cert_folder_domain = os.path.join(CERT_FOLDER, domain)
|
cert_folder_domain = os.path.join(CERT_FOLDER, domain)
|
||||||
|
|
||||||
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
||||||
backup_folder = "%s-backups/%s" % (cert_folder_domain, date_tag)
|
backup_folder = "{}-backups/{}".format(cert_folder_domain, date_tag)
|
||||||
|
|
||||||
shutil.copytree(cert_folder_domain, backup_folder)
|
shutil.copytree(cert_folder_domain, backup_folder)
|
||||||
|
|
||||||
|
|
|
@ -269,14 +269,14 @@ class MyMigration(Migration):
|
||||||
% default_crt
|
% default_crt
|
||||||
)
|
)
|
||||||
|
|
||||||
os.system("mv %s %s.old" % (default_crt, default_crt))
|
os.system("mv {} {}.old".format(default_crt, default_crt))
|
||||||
os.system("mv %s %s.old" % (default_key, default_key))
|
os.system("mv {} {}.old".format(default_key, default_key))
|
||||||
ret = os.system("/usr/share/yunohost/hooks/conf_regen/02-ssl init")
|
ret = os.system("/usr/share/yunohost/hooks/conf_regen/02-ssl init")
|
||||||
|
|
||||||
if ret != 0 or not os.path.exists(default_crt):
|
if ret != 0 or not os.path.exists(default_crt):
|
||||||
logger.error("Upgrading the certificate failed ... reverting")
|
logger.error("Upgrading the certificate failed ... reverting")
|
||||||
os.system("mv %s.old %s" % (default_crt, default_crt))
|
os.system("mv {}.old {}".format(default_crt, default_crt))
|
||||||
os.system("mv %s.old %s" % (default_key, default_key))
|
os.system("mv {}.old {}".format(default_key, default_key))
|
||||||
|
|
||||||
signatures = {cert: check_output(cmd % cert) for cert in active_certs}
|
signatures = {cert: check_output(cmd % cert) for cert in active_certs}
|
||||||
|
|
||||||
|
|
|
@ -640,7 +640,7 @@ class Diagnoser:
|
||||||
elif ipversion == 6:
|
elif ipversion == 6:
|
||||||
socket.getaddrinfo = getaddrinfo_ipv6_only
|
socket.getaddrinfo = getaddrinfo_ipv6_only
|
||||||
|
|
||||||
url = "https://%s/%s" % (DIAGNOSIS_SERVER, uri)
|
url = "https://{}/{}".format(DIAGNOSIS_SERVER, uri)
|
||||||
try:
|
try:
|
||||||
r = requests.post(url, json=data, timeout=timeout)
|
r = requests.post(url, json=data, timeout=timeout)
|
||||||
finally:
|
finally:
|
||||||
|
@ -679,7 +679,7 @@ def _email_diagnosis_issues():
|
||||||
from yunohost.domain import _get_maindomain
|
from yunohost.domain import _get_maindomain
|
||||||
|
|
||||||
maindomain = _get_maindomain()
|
maindomain = _get_maindomain()
|
||||||
from_ = "diagnosis@%s (Automatic diagnosis on %s)" % (maindomain, maindomain)
|
from_ = "diagnosis@{} (Automatic diagnosis on {})".format(maindomain, maindomain)
|
||||||
to_ = "root"
|
to_ = "root"
|
||||||
subject_ = "Issues found by automatic diagnosis on %s" % maindomain
|
subject_ = "Issues found by automatic diagnosis on %s" % maindomain
|
||||||
|
|
||||||
|
@ -692,16 +692,16 @@ def _email_diagnosis_issues():
|
||||||
content = _dump_human_readable_reports(issues)
|
content = _dump_human_readable_reports(issues)
|
||||||
|
|
||||||
message = """\
|
message = """\
|
||||||
From: %s
|
From: {}
|
||||||
To: %s
|
To: {}
|
||||||
Subject: %s
|
Subject: {}
|
||||||
|
|
||||||
%s
|
{}
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
%s
|
{}
|
||||||
""" % (
|
""".format(
|
||||||
from_,
|
from_,
|
||||||
to_,
|
to_,
|
||||||
subject_,
|
subject_,
|
||||||
|
|
|
@ -762,7 +762,7 @@ def domain_dns_push(operation_logger, domain, dry_run=False, force=False, purge=
|
||||||
changes = {"delete": [], "update": [], "create": [], "unchanged": []}
|
changes = {"delete": [], "update": [], "create": [], "unchanged": []}
|
||||||
|
|
||||||
type_and_names = sorted(
|
type_and_names = sorted(
|
||||||
set([(r["type"], r["name"]) for r in current_records + wanted_records])
|
{(r["type"], r["name"]) for r in current_records + wanted_records}
|
||||||
)
|
)
|
||||||
comparison = {
|
comparison = {
|
||||||
type_and_name: {"current": [], "wanted": []} for type_and_name in type_and_names
|
type_and_name: {"current": [], "wanted": []} for type_and_name in type_and_names
|
||||||
|
|
|
@ -151,7 +151,7 @@ def dyndns_subscribe(operation_logger, domain=None, key=None):
|
||||||
try:
|
try:
|
||||||
error = json.loads(r.text)["error"]
|
error = json.loads(r.text)["error"]
|
||||||
except Exception:
|
except Exception:
|
||||||
error = 'Server error, code: %s. (Message: "%s")' % (r.status_code, r.text)
|
error = 'Server error, code: {}. (Message: "{}")'.format(r.status_code, r.text)
|
||||||
raise YunohostError("dyndns_registration_failed", error=error)
|
raise YunohostError("dyndns_registration_failed", error=error)
|
||||||
|
|
||||||
# Yunohost regen conf will add the dyndns cron job if a private key exists
|
# Yunohost regen conf will add the dyndns cron job if a private key exists
|
||||||
|
@ -196,7 +196,7 @@ def dyndns_update(
|
||||||
|
|
||||||
# If key is not given, pick the first file we find with the domain given
|
# If key is not given, pick the first file we find with the domain given
|
||||||
elif key is None:
|
elif key is None:
|
||||||
keys = glob.glob("/etc/yunohost/dyndns/K{0}.+*.private".format(domain))
|
keys = glob.glob("/etc/yunohost/dyndns/K{}.+*.private".format(domain))
|
||||||
|
|
||||||
if not keys:
|
if not keys:
|
||||||
raise YunohostValidationError("dyndns_key_not_found")
|
raise YunohostValidationError("dyndns_key_not_found")
|
||||||
|
@ -263,14 +263,14 @@ def dyndns_update(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
raise YunohostError(
|
raise YunohostError(
|
||||||
"Failed to resolve %s for %s" % (rdtype, domain), raw_msg=True
|
"Failed to resolve {} for {}".format(rdtype, domain), raw_msg=True
|
||||||
)
|
)
|
||||||
|
|
||||||
old_ipv4 = resolve_domain(domain, "A")
|
old_ipv4 = resolve_domain(domain, "A")
|
||||||
old_ipv6 = resolve_domain(domain, "AAAA")
|
old_ipv6 = resolve_domain(domain, "AAAA")
|
||||||
|
|
||||||
logger.debug("Old IPv4/v6 are (%s, %s)" % (old_ipv4, old_ipv6))
|
logger.debug("Old IPv4/v6 are ({}, {})".format(old_ipv4, old_ipv6))
|
||||||
logger.debug("Requested IPv4/v6 are (%s, %s)" % (ipv4, ipv6))
|
logger.debug("Requested IPv4/v6 are ({}, {})".format(ipv4, ipv6))
|
||||||
|
|
||||||
# no need to update
|
# no need to update
|
||||||
if (not force and not dry_run) and (old_ipv4 == ipv4 and old_ipv6 == ipv6):
|
if (not force and not dry_run) and (old_ipv4 == ipv4 and old_ipv6 == ipv6):
|
||||||
|
|
|
@ -156,7 +156,7 @@ def hook_list(action, list_by="name", show_info=False):
|
||||||
try:
|
try:
|
||||||
d[priority].add(name)
|
d[priority].add(name)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
d[priority] = set([name])
|
d[priority] = {name}
|
||||||
|
|
||||||
elif list_by == "name" or list_by == "folder":
|
elif list_by == "name" or list_by == "folder":
|
||||||
if show_info:
|
if show_info:
|
||||||
|
@ -197,7 +197,7 @@ def hook_list(action, list_by="name", show_info=False):
|
||||||
or (f.startswith("__") and f.endswith("__"))
|
or (f.startswith("__") and f.endswith("__"))
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
path = "%s%s/%s" % (folder, action, f)
|
path = "{}{}/{}".format(folder, action, f)
|
||||||
priority, name = _extract_filename_parts(f)
|
priority, name = _extract_filename_parts(f)
|
||||||
_append_hook(d, priority, name, path)
|
_append_hook(d, priority, name, path)
|
||||||
|
|
||||||
|
@ -407,7 +407,7 @@ def _hook_exec_bash(path, args, chdir, env, user, return_format, loggers):
|
||||||
if not chdir:
|
if not chdir:
|
||||||
# use the script directory as current one
|
# use the script directory as current one
|
||||||
chdir, cmd_script = os.path.split(path)
|
chdir, cmd_script = os.path.split(path)
|
||||||
cmd_script = "./{0}".format(cmd_script)
|
cmd_script = "./{}".format(cmd_script)
|
||||||
else:
|
else:
|
||||||
cmd_script = path
|
cmd_script = path
|
||||||
|
|
||||||
|
|
|
@ -544,7 +544,7 @@ class OperationLogger(object):
|
||||||
# We use proc.open_files() to list files opened / actively used by this proc
|
# We use proc.open_files() to list files opened / actively used by this proc
|
||||||
# We only keep files matching a recent yunohost operation log
|
# We only keep files matching a recent yunohost operation log
|
||||||
active_logs = sorted(
|
active_logs = sorted(
|
||||||
[f.path for f in proc.open_files() if f.path in recent_operation_logs],
|
(f.path for f in proc.open_files() if f.path in recent_operation_logs),
|
||||||
key=os.path.getctime,
|
key=os.path.getctime,
|
||||||
reverse=True,
|
reverse=True,
|
||||||
)
|
)
|
||||||
|
|
|
@ -139,7 +139,7 @@ def user_permission_list(
|
||||||
continue
|
continue
|
||||||
main_perm_label = permissions[main_perm_name]["label"]
|
main_perm_label = permissions[main_perm_name]["label"]
|
||||||
infos["sublabel"] = infos["label"]
|
infos["sublabel"] = infos["label"]
|
||||||
infos["label"] = "%s (%s)" % (main_perm_label, infos["label"])
|
infos["label"] = "{} ({})".format(main_perm_label, infos["label"])
|
||||||
|
|
||||||
if short:
|
if short:
|
||||||
permissions = list(permissions.keys())
|
permissions = list(permissions.keys())
|
||||||
|
@ -664,13 +664,11 @@ def permission_sync_to_user():
|
||||||
currently_allowed_users = set(permission_infos["corresponding_users"])
|
currently_allowed_users = set(permission_infos["corresponding_users"])
|
||||||
|
|
||||||
# These are the users that should be allowed because they are member of a group that is allowed for this permission ...
|
# These are the users that should be allowed because they are member of a group that is allowed for this permission ...
|
||||||
should_be_allowed_users = set(
|
should_be_allowed_users = {
|
||||||
[
|
|
||||||
user
|
user
|
||||||
for group in permission_infos["allowed"]
|
for group in permission_infos["allowed"]
|
||||||
for user in groups[group]["members"]
|
for user in groups[group]["members"]
|
||||||
]
|
}
|
||||||
)
|
|
||||||
|
|
||||||
# Note that a LDAP operation with the same value that is in LDAP crash SLAP.
|
# Note that a LDAP operation with the same value that is in LDAP crash SLAP.
|
||||||
# So we need to check before each ldap operation that we really change something in LDAP
|
# So we need to check before each ldap operation that we really change something in LDAP
|
||||||
|
|
|
@ -640,7 +640,7 @@ def _process_regen_conf(system_conf, new_conf=None, save=True):
|
||||||
if save:
|
if save:
|
||||||
backup_path = os.path.join(
|
backup_path = os.path.join(
|
||||||
BACKUP_CONF_DIR,
|
BACKUP_CONF_DIR,
|
||||||
"{0}-{1}".format(
|
"{}-{}".format(
|
||||||
system_conf.lstrip("/"), datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
system_conf.lstrip("/"), datetime.utcnow().strftime("%Y%m%d.%H%M%S")
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
|
@ -625,7 +625,7 @@ def _run_service_command(action, service):
|
||||||
% (action, ", ".join(possible_actions))
|
% (action, ", ".join(possible_actions))
|
||||||
)
|
)
|
||||||
|
|
||||||
cmd = "systemctl %s %s" % (action, service)
|
cmd = "systemctl {} {}".format(action, service)
|
||||||
|
|
||||||
need_lock = services[service].get("need_lock", False) and action in [
|
need_lock = services[service].get("need_lock", False) and action in [
|
||||||
"start",
|
"start",
|
||||||
|
@ -673,7 +673,7 @@ def _give_lock(action, service, p):
|
||||||
else:
|
else:
|
||||||
systemctl_PID_name = "ControlPID"
|
systemctl_PID_name = "ControlPID"
|
||||||
|
|
||||||
cmd_get_son_PID = "systemctl show %s -p %s" % (service, systemctl_PID_name)
|
cmd_get_son_PID = "systemctl show {} -p {}".format(service, systemctl_PID_name)
|
||||||
son_PID = 0
|
son_PID = 0
|
||||||
# As long as we did not found the PID and that the command is still running
|
# As long as we did not found the PID and that the command is still running
|
||||||
while son_PID == 0 and p.poll() is None:
|
while son_PID == 0 and p.poll() is None:
|
||||||
|
@ -687,7 +687,7 @@ def _give_lock(action, service, p):
|
||||||
if son_PID != 0:
|
if son_PID != 0:
|
||||||
# Append the PID to the lock file
|
# Append the PID to the lock file
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Giving a lock to PID %s for service %s !" % (str(son_PID), service)
|
"Giving a lock to PID {} for service {} !".format(str(son_PID), service)
|
||||||
)
|
)
|
||||||
append_to_file(MOULINETTE_LOCK, "\n%s" % str(son_PID))
|
append_to_file(MOULINETTE_LOCK, "\n%s" % str(son_PID))
|
||||||
|
|
||||||
|
@ -865,7 +865,7 @@ def _get_journalctl_logs(service, number="all"):
|
||||||
systemd_service = services.get(service, {}).get("actual_systemd_service", service)
|
systemd_service = services.get(service, {}).get("actual_systemd_service", service)
|
||||||
try:
|
try:
|
||||||
return check_output(
|
return check_output(
|
||||||
"journalctl --no-hostname --no-pager -u {0} -n{1}".format(
|
"journalctl --no-hostname --no-pager -u {} -n{}".format(
|
||||||
systemd_service, number
|
systemd_service, number
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -224,7 +224,7 @@ def settings_set(key, value):
|
||||||
try:
|
try:
|
||||||
trigger_post_change_hook(key, old_value, value)
|
trigger_post_change_hook(key, old_value, value)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Post-change hook for setting %s failed : %s" % (key, e))
|
logger.error("Post-change hook for setting {} failed : {}".format(key, e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -132,7 +132,7 @@ def test_apps_catalog_update_nominal(mocker):
|
||||||
catalog = app_catalog(with_categories=True)
|
catalog = app_catalog(with_categories=True)
|
||||||
|
|
||||||
assert "apps" in catalog
|
assert "apps" in catalog
|
||||||
assert set(catalog["apps"].keys()) == set(["foo", "bar"])
|
assert set(catalog["apps"].keys()) == {"foo", "bar"}
|
||||||
|
|
||||||
assert "categories" in catalog
|
assert "categories" in catalog
|
||||||
assert [c["id"] for c in catalog["categories"]] == ["yolo", "swag"]
|
assert [c["id"] for c in catalog["categories"]] == ["yolo", "swag"]
|
||||||
|
|
|
@ -70,7 +70,7 @@ def legacy_app(request):
|
||||||
|
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "legacy_app_ynh"),
|
os.path.join(get_test_apps_dir(), "legacy_app_ynh"),
|
||||||
args="domain=%s&path=%s&is_public=%s" % (main_domain, "/", 1),
|
args="domain={}&path={}&is_public={}".format(main_domain, "/", 1),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ def secondary_domain(request):
|
||||||
|
|
||||||
def app_expected_files(domain, app):
|
def app_expected_files(domain, app):
|
||||||
|
|
||||||
yield "/etc/nginx/conf.d/%s.d/%s.conf" % (domain, app)
|
yield "/etc/nginx/conf.d/{}.d/{}.conf".format(domain, app)
|
||||||
if app.startswith("legacy_app"):
|
if app.startswith("legacy_app"):
|
||||||
yield "/var/www/%s/index.html" % app
|
yield "/var/www/%s/index.html" % app
|
||||||
yield "/etc/yunohost/apps/%s/settings.yml" % app
|
yield "/etc/yunohost/apps/%s/settings.yml" % app
|
||||||
|
@ -152,7 +152,7 @@ def install_legacy_app(domain, path, public=True):
|
||||||
|
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "legacy_app_ynh"),
|
os.path.join(get_test_apps_dir(), "legacy_app_ynh"),
|
||||||
args="domain=%s&path=%s&is_public=%s" % (domain, path, 1 if public else 0),
|
args="domain={}&path={}&is_public={}".format(domain, path, 1 if public else 0),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -170,7 +170,7 @@ def install_break_yo_system(domain, breakwhat):
|
||||||
|
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "break_yo_system_ynh"),
|
os.path.join(get_test_apps_dir(), "break_yo_system_ynh"),
|
||||||
args="domain=%s&breakwhat=%s" % (domain, breakwhat),
|
args="domain={}&breakwhat={}".format(domain, breakwhat),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -338,7 +338,7 @@ def test_legacy_app_failed_remove(mocker, secondary_domain):
|
||||||
|
|
||||||
# The remove script runs with set -eu and attempt to remove this
|
# The remove script runs with set -eu and attempt to remove this
|
||||||
# file without -f, so will fail if it's not there ;)
|
# file without -f, so will fail if it's not there ;)
|
||||||
os.remove("/etc/nginx/conf.d/%s.d/%s.conf" % (secondary_domain, "legacy_app"))
|
os.remove("/etc/nginx/conf.d/{}.d/{}.conf".format(secondary_domain, "legacy_app"))
|
||||||
|
|
||||||
# TODO / FIXME : can't easily validate that 'app_not_properly_removed'
|
# TODO / FIXME : can't easily validate that 'app_not_properly_removed'
|
||||||
# is triggered for weird reasons ...
|
# is triggered for weird reasons ...
|
||||||
|
|
|
@ -99,7 +99,7 @@ def test_registerurl():
|
||||||
|
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
||||||
args="domain=%s&path=%s" % (maindomain, "/urlregisterapp"),
|
args="domain={}&path={}".format(maindomain, "/urlregisterapp"),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ def test_registerurl():
|
||||||
with pytest.raises(YunohostError):
|
with pytest.raises(YunohostError):
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
||||||
args="domain=%s&path=%s" % (maindomain, "/urlregisterapp"),
|
args="domain={}&path={}".format(maindomain, "/urlregisterapp"),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -119,7 +119,7 @@ def test_registerurl_baddomain():
|
||||||
with pytest.raises(YunohostError):
|
with pytest.raises(YunohostError):
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
||||||
args="domain=%s&path=%s" % ("yolo.swag", "/urlregisterapp"),
|
args="domain={}&path={}".format("yolo.swag", "/urlregisterapp"),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -234,7 +234,7 @@ def test_normalize_permission_path_with_unknown_domain():
|
||||||
def test_normalize_permission_path_conflicting_path():
|
def test_normalize_permission_path_conflicting_path():
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
os.path.join(get_test_apps_dir(), "register_url_app_ynh"),
|
||||||
args="domain=%s&path=%s" % (maindomain, "/url/registerapp"),
|
args="domain={}&path={}".format(maindomain, "/url/registerapp"),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ def app_is_installed(app):
|
||||||
|
|
||||||
# These are files we know should be installed by the app
|
# These are files we know should be installed by the app
|
||||||
app_files = []
|
app_files = []
|
||||||
app_files.append("/etc/nginx/conf.d/%s.d/%s.conf" % (maindomain, app))
|
app_files.append("/etc/nginx/conf.d/{}.d/{}.conf".format(maindomain, app))
|
||||||
app_files.append("/var/www/%s/index.html" % app)
|
app_files.append("/var/www/%s/index.html" % app)
|
||||||
app_files.append("/etc/importantfile")
|
app_files.append("/etc/importantfile")
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ def install_app(app, path, additionnal_args=""):
|
||||||
|
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), app),
|
os.path.join(get_test_apps_dir(), app),
|
||||||
args="domain=%s&path=%s%s" % (maindomain, path, additionnal_args),
|
args="domain={}&path={}{}".format(maindomain, path, additionnal_args),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ def teardown_function(function):
|
||||||
def install_changeurl_app(path):
|
def install_changeurl_app(path):
|
||||||
app_install(
|
app_install(
|
||||||
os.path.join(get_test_apps_dir(), "change_url_app_ynh"),
|
os.path.join(get_test_apps_dir(), "change_url_app_ynh"),
|
||||||
args="domain=%s&path=%s" % (maindomain, path),
|
args="domain={}&path={}".format(maindomain, path),
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -347,7 +347,7 @@ def check_permission_for_apps():
|
||||||
# {"bar", "foo"}
|
# {"bar", "foo"}
|
||||||
# and compare this to the list of installed apps ...
|
# and compare this to the list of installed apps ...
|
||||||
|
|
||||||
app_perms_prefix = set(p.split(".")[0] for p in app_perms)
|
app_perms_prefix = {p.split(".")[0] for p in app_perms}
|
||||||
|
|
||||||
assert set(_installed_apps()) == app_perms_prefix
|
assert set(_installed_apps()) == app_perms_prefix
|
||||||
|
|
||||||
|
@ -398,7 +398,7 @@ def test_permission_list():
|
||||||
assert res["wiki.main"]["allowed"] == ["all_users"]
|
assert res["wiki.main"]["allowed"] == ["all_users"]
|
||||||
assert res["blog.main"]["allowed"] == ["alice"]
|
assert res["blog.main"]["allowed"] == ["alice"]
|
||||||
assert res["blog.api"]["allowed"] == ["visitors"]
|
assert res["blog.api"]["allowed"] == ["visitors"]
|
||||||
assert set(res["wiki.main"]["corresponding_users"]) == set(["alice", "bob"])
|
assert set(res["wiki.main"]["corresponding_users"]) == {"alice", "bob"}
|
||||||
assert res["blog.main"]["corresponding_users"] == ["alice"]
|
assert res["blog.main"]["corresponding_users"] == ["alice"]
|
||||||
assert res["blog.api"]["corresponding_users"] == []
|
assert res["blog.api"]["corresponding_users"] == []
|
||||||
assert res["wiki.main"]["url"] == "/"
|
assert res["wiki.main"]["url"] == "/"
|
||||||
|
@ -442,7 +442,7 @@ def test_permission_create_main(mocker):
|
||||||
res = user_permission_list(full=True)["permissions"]
|
res = user_permission_list(full=True)["permissions"]
|
||||||
assert "site.main" in res
|
assert "site.main" in res
|
||||||
assert res["site.main"]["allowed"] == ["all_users"]
|
assert res["site.main"]["allowed"] == ["all_users"]
|
||||||
assert set(res["site.main"]["corresponding_users"]) == set(["alice", "bob"])
|
assert set(res["site.main"]["corresponding_users"]) == {"alice", "bob"}
|
||||||
assert res["site.main"]["protected"] is False
|
assert res["site.main"]["protected"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@ -630,8 +630,8 @@ def test_permission_add_group(mocker):
|
||||||
user_permission_update("wiki.main", add="alice")
|
user_permission_update("wiki.main", add="alice")
|
||||||
|
|
||||||
res = user_permission_list(full=True)["permissions"]
|
res = user_permission_list(full=True)["permissions"]
|
||||||
assert set(res["wiki.main"]["allowed"]) == set(["all_users", "alice"])
|
assert set(res["wiki.main"]["allowed"]) == {"all_users", "alice"}
|
||||||
assert set(res["wiki.main"]["corresponding_users"]) == set(["alice", "bob"])
|
assert set(res["wiki.main"]["corresponding_users"]) == {"alice", "bob"}
|
||||||
|
|
||||||
|
|
||||||
def test_permission_remove_group(mocker):
|
def test_permission_remove_group(mocker):
|
||||||
|
@ -680,7 +680,7 @@ def test_permission_reset(mocker):
|
||||||
|
|
||||||
res = user_permission_list(full=True)["permissions"]
|
res = user_permission_list(full=True)["permissions"]
|
||||||
assert res["blog.main"]["allowed"] == ["all_users"]
|
assert res["blog.main"]["allowed"] == ["all_users"]
|
||||||
assert set(res["blog.main"]["corresponding_users"]) == set(["alice", "bob"])
|
assert set(res["blog.main"]["corresponding_users"]) == {"alice", "bob"}
|
||||||
|
|
||||||
|
|
||||||
def test_permission_reset_idempotency():
|
def test_permission_reset_idempotency():
|
||||||
|
@ -690,7 +690,7 @@ def test_permission_reset_idempotency():
|
||||||
|
|
||||||
res = user_permission_list(full=True)["permissions"]
|
res = user_permission_list(full=True)["permissions"]
|
||||||
assert res["blog.main"]["allowed"] == ["all_users"]
|
assert res["blog.main"]["allowed"] == ["all_users"]
|
||||||
assert set(res["blog.main"]["corresponding_users"]) == set(["alice", "bob"])
|
assert set(res["blog.main"]["corresponding_users"]) == {"alice", "bob"}
|
||||||
|
|
||||||
|
|
||||||
def test_permission_change_label(mocker):
|
def test_permission_change_label(mocker):
|
||||||
|
@ -1013,9 +1013,9 @@ def test_permission_app_install():
|
||||||
assert res["permissions_app.dev"]["url"] == "/dev"
|
assert res["permissions_app.dev"]["url"] == "/dev"
|
||||||
|
|
||||||
assert res["permissions_app.main"]["allowed"] == ["all_users"]
|
assert res["permissions_app.main"]["allowed"] == ["all_users"]
|
||||||
assert set(res["permissions_app.main"]["corresponding_users"]) == set(
|
assert set(res["permissions_app.main"]["corresponding_users"]) == {
|
||||||
["alice", "bob"]
|
"alice", "bob"
|
||||||
)
|
}
|
||||||
|
|
||||||
assert res["permissions_app.admin"]["allowed"] == ["alice"]
|
assert res["permissions_app.admin"]["allowed"] == ["alice"]
|
||||||
assert res["permissions_app.admin"]["corresponding_users"] == ["alice"]
|
assert res["permissions_app.admin"]["corresponding_users"] == ["alice"]
|
||||||
|
|
|
@ -1977,7 +1977,7 @@ def test_question_file_from_api():
|
||||||
|
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
|
|
||||||
b64content = b64encode("helloworld".encode())
|
b64content = b64encode(b"helloworld")
|
||||||
questions = [
|
questions = [
|
||||||
{
|
{
|
||||||
"name": "some_file",
|
"name": "some_file",
|
||||||
|
|
|
@ -281,7 +281,7 @@ def test_update_group_add_user(mocker):
|
||||||
user_group_update("dev", add=["bob"])
|
user_group_update("dev", add=["bob"])
|
||||||
|
|
||||||
group_res = user_group_list()["groups"]
|
group_res = user_group_list()["groups"]
|
||||||
assert set(group_res["dev"]["members"]) == set(["alice", "bob"])
|
assert set(group_res["dev"]["members"]) == {"alice", "bob"}
|
||||||
|
|
||||||
|
|
||||||
def test_update_group_add_user_already_in(mocker):
|
def test_update_group_add_user_already_in(mocker):
|
||||||
|
|
|
@ -224,7 +224,7 @@ def tools_postinstall(
|
||||||
disk_partitions = sorted(psutil.disk_partitions(), key=lambda k: k.mountpoint)
|
disk_partitions = sorted(psutil.disk_partitions(), key=lambda k: k.mountpoint)
|
||||||
main_disk_partitions = [d for d in disk_partitions if d.mountpoint in ["/", "/var"]]
|
main_disk_partitions = [d for d in disk_partitions if d.mountpoint in ["/", "/var"]]
|
||||||
main_space = sum(
|
main_space = sum(
|
||||||
[psutil.disk_usage(d.mountpoint).total for d in main_disk_partitions]
|
psutil.disk_usage(d.mountpoint).total for d in main_disk_partitions
|
||||||
)
|
)
|
||||||
GB = 1024 ** 3
|
GB = 1024 ** 3
|
||||||
if not force_diskspace and main_space < 10 * GB:
|
if not force_diskspace and main_space < 10 * GB:
|
||||||
|
|
|
@ -97,7 +97,7 @@ def user_list(fields=None):
|
||||||
and values[0].strip() == "/bin/false",
|
and values[0].strip() == "/bin/false",
|
||||||
}
|
}
|
||||||
|
|
||||||
attrs = set(["uid"])
|
attrs = {"uid"}
|
||||||
users = {}
|
users = {}
|
||||||
|
|
||||||
if not fields:
|
if not fields:
|
||||||
|
@ -215,7 +215,7 @@ def user_create(
|
||||||
uid_guid_found = uid not in all_uid and uid not in all_gid
|
uid_guid_found = uid not in all_uid and uid not in all_gid
|
||||||
|
|
||||||
# Adapt values for LDAP
|
# Adapt values for LDAP
|
||||||
fullname = "%s %s" % (firstname, lastname)
|
fullname = "{} {}".format(firstname, lastname)
|
||||||
|
|
||||||
attr_dict = {
|
attr_dict = {
|
||||||
"objectClass": [
|
"objectClass": [
|
||||||
|
@ -333,8 +333,8 @@ def user_delete(operation_logger, username, purge=False, from_import=False):
|
||||||
subprocess.call(["nscd", "-i", "passwd"])
|
subprocess.call(["nscd", "-i", "passwd"])
|
||||||
|
|
||||||
if purge:
|
if purge:
|
||||||
subprocess.call(["rm", "-rf", "/home/{0}".format(username)])
|
subprocess.call(["rm", "-rf", "/home/{}".format(username)])
|
||||||
subprocess.call(["rm", "-rf", "/var/mail/{0}".format(username)])
|
subprocess.call(["rm", "-rf", "/var/mail/{}".format(username)])
|
||||||
|
|
||||||
hook_callback("post_user_delete", args=[username, purge])
|
hook_callback("post_user_delete", args=[username, purge])
|
||||||
|
|
||||||
|
@ -1334,9 +1334,9 @@ def user_ssh_remove_key(username, key):
|
||||||
def _convertSize(num, suffix=""):
|
def _convertSize(num, suffix=""):
|
||||||
for unit in ["K", "M", "G", "T", "P", "E", "Z"]:
|
for unit in ["K", "M", "G", "T", "P", "E", "Z"]:
|
||||||
if abs(num) < 1024.0:
|
if abs(num) < 1024.0:
|
||||||
return "%3.1f%s%s" % (num, unit, suffix)
|
return "{:3.1f}{}{}".format(num, unit, suffix)
|
||||||
num /= 1024.0
|
num /= 1024.0
|
||||||
return "%.1f%s%s" % (num, "Yi", suffix)
|
return "{:.1f}{}{}".format(num, "Yi", suffix)
|
||||||
|
|
||||||
|
|
||||||
def _hash_user_password(password):
|
def _hash_user_password(password):
|
||||||
|
|
|
@ -44,7 +44,7 @@ def get_public_ip(protocol=4):
|
||||||
):
|
):
|
||||||
ip = read_file(cache_file).strip()
|
ip = read_file(cache_file).strip()
|
||||||
ip = ip if ip else None # Empty file (empty string) means there's no IP
|
ip = ip if ip else None # Empty file (empty string) means there's no IP
|
||||||
logger.debug("Reusing IPv%s from cache: %s" % (protocol, ip))
|
logger.debug("Reusing IPv{} from cache: {}".format(protocol, ip))
|
||||||
else:
|
else:
|
||||||
ip = get_public_ip_from_remote_server(protocol)
|
ip = get_public_ip_from_remote_server(protocol)
|
||||||
logger.debug("IP fetched: %s" % ip)
|
logger.debug("IP fetched: %s" % ip)
|
||||||
|
@ -87,7 +87,7 @@ def get_public_ip_from_remote_server(protocol=4):
|
||||||
try:
|
try:
|
||||||
return download_text(url, timeout=30).strip()
|
return download_text(url, timeout=30).strip()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Could not get public IPv%s : %s" % (str(protocol), str(e)))
|
logger.debug("Could not get public IPv{} : {}".format(str(protocol), str(e)))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ def yunopaste(data):
|
||||||
raw_msg=True,
|
raw_msg=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
return "%s/raw/%s" % (paste_server, url)
|
return "{}/raw/{}".format(paste_server, url)
|
||||||
|
|
||||||
|
|
||||||
def anonymize(data):
|
def anonymize(data):
|
||||||
|
|
36
src/yunohost/vendor/acme_tiny/acme_tiny.py
vendored
36
src/yunohost/vendor/acme_tiny/acme_tiny.py
vendored
|
@ -38,7 +38,7 @@ def get_crt(
|
||||||
)
|
)
|
||||||
out, err = proc.communicate(cmd_input)
|
out, err = proc.communicate(cmd_input)
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
raise IOError("{0}\n{1}".format(err_msg, err))
|
raise IOError("{}\n{}".format(err_msg, err))
|
||||||
return out
|
return out
|
||||||
|
|
||||||
# helper function - make request and automatically parse json response
|
# helper function - make request and automatically parse json response
|
||||||
|
@ -74,7 +74,7 @@ def get_crt(
|
||||||
raise IndexError(resp_data) # allow 100 retrys for bad nonces
|
raise IndexError(resp_data) # allow 100 retrys for bad nonces
|
||||||
if code not in [200, 201, 204]:
|
if code not in [200, 201, 204]:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(
|
"{}:\nUrl: {}\nData: {}\nResponse Code: {}\nResponse: {}".format(
|
||||||
err_msg, url, data, code, resp_data
|
err_msg, url, data, code, resp_data
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -89,7 +89,7 @@ def get_crt(
|
||||||
{"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]}
|
{"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]}
|
||||||
)
|
)
|
||||||
protected64 = _b64(json.dumps(protected).encode("utf8"))
|
protected64 = _b64(json.dumps(protected).encode("utf8"))
|
||||||
protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8")
|
protected_input = "{}.{}".format(protected64, payload64).encode("utf8")
|
||||||
out = _cmd(
|
out = _cmd(
|
||||||
["openssl", "dgst", "-sha256", "-sign", account_key],
|
["openssl", "dgst", "-sha256", "-sign", account_key],
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
|
@ -125,8 +125,8 @@ def get_crt(
|
||||||
pub_hex, pub_exp = re.search(
|
pub_hex, pub_exp = re.search(
|
||||||
pub_pattern, out.decode("utf8"), re.MULTILINE | re.DOTALL
|
pub_pattern, out.decode("utf8"), re.MULTILINE | re.DOTALL
|
||||||
).groups()
|
).groups()
|
||||||
pub_exp = "{0:x}".format(int(pub_exp))
|
pub_exp = "{:x}".format(int(pub_exp))
|
||||||
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
|
pub_exp = "0{}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
|
||||||
alg = "RS256"
|
alg = "RS256"
|
||||||
jwk = {
|
jwk = {
|
||||||
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
|
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
|
||||||
|
@ -140,9 +140,9 @@ def get_crt(
|
||||||
log.info("Parsing CSR...")
|
log.info("Parsing CSR...")
|
||||||
out = _cmd(
|
out = _cmd(
|
||||||
["openssl", "req", "-in", csr, "-noout", "-text"],
|
["openssl", "req", "-in", csr, "-noout", "-text"],
|
||||||
err_msg="Error loading {0}".format(csr),
|
err_msg="Error loading {}".format(csr),
|
||||||
)
|
)
|
||||||
domains = set([])
|
domains = set()
|
||||||
common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode("utf8"))
|
common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode("utf8"))
|
||||||
if common_name is not None:
|
if common_name is not None:
|
||||||
domains.add(common_name.group(1))
|
domains.add(common_name.group(1))
|
||||||
|
@ -155,7 +155,7 @@ def get_crt(
|
||||||
for san in subject_alt_names.group(1).split(", "):
|
for san in subject_alt_names.group(1).split(", "):
|
||||||
if san.startswith("DNS:"):
|
if san.startswith("DNS:"):
|
||||||
domains.add(san[4:])
|
domains.add(san[4:])
|
||||||
log.info("Found domains: {0}".format(", ".join(domains)))
|
log.info("Found domains: {}".format(", ".join(domains)))
|
||||||
|
|
||||||
# get the ACME directory of urls
|
# get the ACME directory of urls
|
||||||
log.info("Getting directory...")
|
log.info("Getting directory...")
|
||||||
|
@ -178,7 +178,7 @@ def get_crt(
|
||||||
{"contact": contact},
|
{"contact": contact},
|
||||||
"Error updating contact details",
|
"Error updating contact details",
|
||||||
)
|
)
|
||||||
log.info("Updated contact details:\n{0}".format("\n".join(account["contact"])))
|
log.info("Updated contact details:\n{}".format("\n".join(account["contact"])))
|
||||||
|
|
||||||
# create a new order
|
# create a new order
|
||||||
log.info("Creating new order...")
|
log.info("Creating new order...")
|
||||||
|
@ -194,46 +194,46 @@ def get_crt(
|
||||||
auth_url, None, "Error getting challenges"
|
auth_url, None, "Error getting challenges"
|
||||||
)
|
)
|
||||||
domain = authorization["identifier"]["value"]
|
domain = authorization["identifier"]["value"]
|
||||||
log.info("Verifying {0}...".format(domain))
|
log.info("Verifying {}...".format(domain))
|
||||||
|
|
||||||
# find the http-01 challenge and write the challenge file
|
# find the http-01 challenge and write the challenge file
|
||||||
challenge = [c for c in authorization["challenges"] if c["type"] == "http-01"][
|
challenge = [c for c in authorization["challenges"] if c["type"] == "http-01"][
|
||||||
0
|
0
|
||||||
]
|
]
|
||||||
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"])
|
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"])
|
||||||
keyauthorization = "{0}.{1}".format(token, thumbprint)
|
keyauthorization = "{}.{}".format(token, thumbprint)
|
||||||
wellknown_path = os.path.join(acme_dir, token)
|
wellknown_path = os.path.join(acme_dir, token)
|
||||||
with open(wellknown_path, "w") as wellknown_file:
|
with open(wellknown_path, "w") as wellknown_file:
|
||||||
wellknown_file.write(keyauthorization)
|
wellknown_file.write(keyauthorization)
|
||||||
|
|
||||||
# check that the file is in place
|
# check that the file is in place
|
||||||
try:
|
try:
|
||||||
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(
|
wellknown_url = "http://{}/.well-known/acme-challenge/{}".format(
|
||||||
domain, token
|
domain, token
|
||||||
)
|
)
|
||||||
assert disable_check or _do_request(wellknown_url)[0] == keyauthorization
|
assert disable_check or _do_request(wellknown_url)[0] == keyauthorization
|
||||||
except (AssertionError, ValueError) as e:
|
except (AssertionError, ValueError) as e:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Wrote file to {0}, but couldn't download {1}: {2}".format(
|
"Wrote file to {}, but couldn't download {}: {}".format(
|
||||||
wellknown_path, wellknown_url, e
|
wellknown_path, wellknown_url, e
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# say the challenge is done
|
# say the challenge is done
|
||||||
_send_signed_request(
|
_send_signed_request(
|
||||||
challenge["url"], {}, "Error submitting challenges: {0}".format(domain)
|
challenge["url"], {}, "Error submitting challenges: {}".format(domain)
|
||||||
)
|
)
|
||||||
authorization = _poll_until_not(
|
authorization = _poll_until_not(
|
||||||
auth_url,
|
auth_url,
|
||||||
["pending"],
|
["pending"],
|
||||||
"Error checking challenge status for {0}".format(domain),
|
"Error checking challenge status for {}".format(domain),
|
||||||
)
|
)
|
||||||
if authorization["status"] != "valid":
|
if authorization["status"] != "valid":
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Challenge did not pass for {0}: {1}".format(domain, authorization)
|
"Challenge did not pass for {}: {}".format(domain, authorization)
|
||||||
)
|
)
|
||||||
os.remove(wellknown_path)
|
os.remove(wellknown_path)
|
||||||
log.info("{0} verified!".format(domain))
|
log.info("{} verified!".format(domain))
|
||||||
|
|
||||||
# finalize the order with the csr
|
# finalize the order with the csr
|
||||||
log.info("Signing certificate...")
|
log.info("Signing certificate...")
|
||||||
|
@ -251,7 +251,7 @@ def get_crt(
|
||||||
"Error checking order status",
|
"Error checking order status",
|
||||||
)
|
)
|
||||||
if order["status"] != "valid":
|
if order["status"] != "valid":
|
||||||
raise ValueError("Order failed: {0}".format(order))
|
raise ValueError("Order failed: {}".format(order))
|
||||||
|
|
||||||
# download the certificate
|
# download the certificate
|
||||||
certificate_pem, _, _ = _send_signed_request(
|
certificate_pem, _, _ = _send_signed_request(
|
||||||
|
|
|
@ -26,10 +26,10 @@ def find_inconsistencies(locale_file):
|
||||||
# Then we check that every "{stuff}" (for python's .format())
|
# Then we check that every "{stuff}" (for python's .format())
|
||||||
# should also be in the translated string, otherwise the .format
|
# should also be in the translated string, otherwise the .format
|
||||||
# will trigger an exception!
|
# will trigger an exception!
|
||||||
subkeys_in_ref = set(k[0] for k in re.findall(r"{(\w+)(:\w)?}", string))
|
subkeys_in_ref = {k[0] for k in re.findall(r"{(\w+)(:\w)?}", string)}
|
||||||
subkeys_in_this_locale = set(
|
subkeys_in_this_locale = {
|
||||||
k[0] for k in re.findall(r"{(\w+)(:\w)?}", this_locale[key])
|
k[0] for k in re.findall(r"{(\w+)(:\w)?}", this_locale[key])
|
||||||
)
|
}
|
||||||
|
|
||||||
if any(k not in subkeys_in_ref for k in subkeys_in_this_locale):
|
if any(k not in subkeys_in_ref for k in subkeys_in_this_locale):
|
||||||
yield """\n
|
yield """\n
|
||||||
|
|
Loading…
Add table
Reference in a new issue