f-string all the things!

This commit is contained in:
Alexandre Aubin 2021-12-27 14:59:08 +01:00
parent 77058ab356
commit 7112deb167
15 changed files with 66 additions and 102 deletions

View file

@ -42,10 +42,8 @@ class BaseSystemDiagnoser(Diagnoser):
elif os.path.exists("/sys/devices/virtual/dmi/id/sys_vendor"): elif os.path.exists("/sys/devices/virtual/dmi/id/sys_vendor"):
model = read_file("/sys/devices/virtual/dmi/id/sys_vendor").strip() model = read_file("/sys/devices/virtual/dmi/id/sys_vendor").strip()
if os.path.exists("/sys/devices/virtual/dmi/id/product_name"): if os.path.exists("/sys/devices/virtual/dmi/id/product_name"):
model = "{} {}".format( product_name = read_file("/sys/devices/virtual/dmi/id/product_name").strip()
model, model = f"{model} {product_name}"
read_file("/sys/devices/virtual/dmi/id/product_name").strip(),
)
hardware["data"]["model"] = model hardware["data"]["model"] = model
hardware["details"] = ["diagnosis_basesystem_hardware_model"] hardware["details"] = ["diagnosis_basesystem_hardware_model"]
@ -116,7 +114,7 @@ class BaseSystemDiagnoser(Diagnoser):
bad_sury_packages = list(self.bad_sury_packages()) bad_sury_packages = list(self.bad_sury_packages())
if bad_sury_packages: if bad_sury_packages:
cmd_to_fix = "apt install --allow-downgrades " + " ".join( cmd_to_fix = "apt install --allow-downgrades " + " ".join(
["{}={}".format(package, version) for package, version in bad_sury_packages] [f"{package}={version}" for package, version in bad_sury_packages]
) )
yield dict( yield dict(
meta={"test": "packages_from_sury"}, meta={"test": "packages_from_sury"},

View file

@ -167,10 +167,7 @@ class IPDiagnoser(Diagnoser):
assert ( assert (
resolvers != [] resolvers != []
), "Uhoh, need at least one IPv{} DNS resolver in {} ...".format( ), f"Uhoh, need at least one IPv{protocol} DNS resolver in {resolver_file} ..."
protocol,
resolver_file,
)
# So let's try to ping the first 4~5 resolvers (shuffled) # So let's try to ping the first 4~5 resolvers (shuffled)
# If we succesfully ping any of them, we conclude that we are indeed connected # If we succesfully ping any of them, we conclude that we are indeed connected
@ -220,9 +217,9 @@ class IPDiagnoser(Diagnoser):
try: try:
return download_text(url, timeout=30).strip() return download_text(url, timeout=30).strip()
except Exception as e: except Exception as e:
self.logger_debug( protocol = str(protocol)
"Could not get public IPv{} : {}".format(str(protocol), str(e)) e = str(e)
) self.logger_debug(f"Could not get public IPv{protocol} : {e}")
return None return None

View file

@ -156,7 +156,7 @@ class SystemResourcesDiagnoser(Diagnoser):
kills_count = self.recent_kills_by_oom_reaper() kills_count = self.recent_kills_by_oom_reaper()
if kills_count: if kills_count:
kills_summary = "\n".join( kills_summary = "\n".join(
["{} (x{})".format(proc, count) for proc, count in kills_count] [f"{proc} (x{count})" for proc, count in kills_count]
) )
yield dict( yield dict(
@ -202,9 +202,11 @@ def human_size(bytes_):
# Adapted from https://stackoverflow.com/a/1094933 # Adapted from https://stackoverflow.com/a/1094933
for unit in ["", "ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: for unit in ["", "ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(bytes_) < 1024.0: if abs(bytes_) < 1024.0:
return "{} {}B".format(round_(bytes_), unit) bytes_ = round_(bytes_)
return f"{bytes_} {unit}B"
bytes_ /= 1024.0 bytes_ /= 1024.0
return "{} {}B".format(round_(bytes_), "Yi") bytes_ = round_(bytes_)
return f"{bytes_} YiB"
def round_(n): def round_(n):

View file

@ -122,7 +122,7 @@ def app_list(full=False, installed=False, filter=None):
try: try:
app_info_dict = app_info(app_id, full=full) app_info_dict = app_info(app_id, full=full)
except Exception as e: except Exception as e:
logger.error("Failed to read info for {} : {}".format(app_id, e)) logger.error(f"Failed to read info for {app_id} : {e}")
continue continue
app_info_dict["id"] = app_id app_info_dict["id"] = app_id
out.append(app_info_dict) out.append(app_info_dict)
@ -1219,7 +1219,8 @@ def app_setting(app, key, value=None, delete=False):
) )
permissions = user_permission_list(full=True, apps=[app])["permissions"] permissions = user_permission_list(full=True, apps=[app])["permissions"]
permission_name = "{}.legacy_{}_uris".format(app, key.split("_")[0]) key_ = key.split("_")[0]
permission_name = f"{app}.legacy_{key_}_uris"
permission = permissions.get(permission_name) permission = permissions.get(permission_name)
# GET # GET
@ -1562,11 +1563,7 @@ def app_action_run(operation_logger, app, action, args=None):
shutil.rmtree(tmp_workdir_for_app) shutil.rmtree(tmp_workdir_for_app)
if retcode not in action_declaration.get("accepted_return_codes", [0]): if retcode not in action_declaration.get("accepted_return_codes", [0]):
msg = "Error while executing action '{}' of app '{}': return code {}".format( msg = f"Error while executing action '{action}' of app '{app}': return code {retcode}"
action,
app,
retcode,
)
operation_logger.error(msg) operation_logger.error(msg)
raise YunohostError(msg, raw_msg=True) raise YunohostError(msg, raw_msg=True)
@ -1989,7 +1986,8 @@ def _set_default_ask_questions(arguments):
for question in questions_with_default for question in questions_with_default
): ):
# The key is for example "app_manifest_install_ask_domain" # The key is for example "app_manifest_install_ask_domain"
key = "app_manifest_{}_ask_{}".format(script_name, arg["name"]) arg_name = arg["name"]
key = f"app_manifest_{script_name}_ask_{arg_name}"
arg["ask"] = m18n.n(key) arg["ask"] = m18n.n(key)
# Also it in fact doesn't make sense for any of those questions to have an example value nor a default value... # Also it in fact doesn't make sense for any of those questions to have an example value nor a default value...
@ -2397,7 +2395,8 @@ def _make_environment_for_app_script(
env_dict["YNH_APP_BASEDIR"] = workdir env_dict["YNH_APP_BASEDIR"] = workdir
for arg_name, arg_value in args.items(): for arg_name, arg_value in args.items():
env_dict["YNH_{}{}".format(args_prefix, arg_name.upper())] = str(arg_value) arg_name_upper = arg_name.upper()
env_dict[f"YNH_{args_prefix}{arg_name_upper}"] = str(arg_value)
return env_dict return env_dict

View file

@ -217,7 +217,7 @@ def _load_apps_catalog():
) )
except Exception as e: except Exception as e:
raise YunohostError( raise YunohostError(
"Unable to read cache for apps_catalog {} : {}".format(cache_file, e), f"Unable to read cache for apps_catalog {cache_file} : {e}",
raw_msg=True, raw_msg=True,
) )

View file

@ -2420,7 +2420,7 @@ def backup_download(name):
) )
return return
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name) archive_file = f"{ARCHIVES_PATH}/{name}.tar"
# Check file exist (even if it's a broken symlink) # Check file exist (even if it's a broken symlink)
if not os.path.lexists(archive_file): if not os.path.lexists(archive_file):
@ -2462,7 +2462,7 @@ def backup_info(name, with_details=False, human_readable=False):
elif name.endswith(".tar"): elif name.endswith(".tar"):
name = name[: -len(".tar")] name = name[: -len(".tar")]
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name) archive_file = f"{ARCHIVES_PATH}/{name}.tar"
# Check file exist (even if it's a broken symlink) # Check file exist (even if it's a broken symlink)
if not os.path.lexists(archive_file): if not os.path.lexists(archive_file):
@ -2480,7 +2480,7 @@ def backup_info(name, with_details=False, human_readable=False):
"backup_archive_broken_link", path=archive_file "backup_archive_broken_link", path=archive_file
) )
info_file = "{}/{}.info.json".format(ARCHIVES_PATH, name) info_file = f"{ARCHIVES_PATH}/{name}.info.json"
if not os.path.exists(info_file): if not os.path.exists(info_file):
tar = tarfile.open( tar = tarfile.open(
@ -2591,10 +2591,10 @@ def backup_delete(name):
hook_callback("pre_backup_delete", args=[name]) hook_callback("pre_backup_delete", args=[name])
archive_file = "{}/{}.tar".format(ARCHIVES_PATH, name) archive_file = f"{ARCHIVES_PATH}/{name}.tar"
if not os.path.exists(archive_file) and os.path.exists(archive_file + ".gz"): if not os.path.exists(archive_file) and os.path.exists(archive_file + ".gz"):
archive_file += ".gz" archive_file += ".gz"
info_file = "{}/{}.info.json".format(ARCHIVES_PATH, name) info_file = f"{ARCHIVES_PATH}/{name}.info.json"
files_to_delete = [archive_file, info_file] files_to_delete = [archive_file, info_file]

View file

@ -143,11 +143,7 @@ def _certificate_install_selfsigned(domain_list, force=False):
# Paths of files and folder we'll need # Paths of files and folder we'll need
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S") date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
new_cert_folder = "{}/{}-history/{}-selfsigned".format( new_cert_folder = f"{CERT_FOLDER}/{domain}-history/{date_tag}-selfsigned"
CERT_FOLDER,
domain,
date_tag,
)
conf_template = os.path.join(SSL_DIR, "openssl.cnf") conf_template = os.path.join(SSL_DIR, "openssl.cnf")
@ -300,10 +296,7 @@ def _certificate_install_letsencrypt(
try: try:
_fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks) _fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks)
except Exception as e: except Exception as e:
msg = "Certificate installation for {} failed !\nException: {}".format( msg = f"Certificate installation for {domain} failed !\nException: {e}"
domain,
e,
)
logger.error(msg) logger.error(msg)
operation_logger.error(msg) operation_logger.error(msg)
if no_checks: if no_checks:
@ -456,39 +449,25 @@ def _email_renewing_failed(domain, exception_message, stack=""):
subject_ = "Certificate renewing attempt for %s failed!" % domain subject_ = "Certificate renewing attempt for %s failed!" % domain
logs = _tail(50, "/var/log/yunohost/yunohost-cli.log") logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
text = """ message = f"""\
An attempt for renewing the certificate for domain {} failed with the following From: {from_}
To: {to_}
Subject: {subject_}
An attempt for renewing the certificate for domain {domain} failed with the following
error : error :
{} {exception_message}
{} {stack}
Here's the tail of /var/log/yunohost/yunohost-cli.log, which might help to Here's the tail of /var/log/yunohost/yunohost-cli.log, which might help to
investigate : investigate :
{} {logs}
-- Certificate Manager -- Certificate Manager
"""
""".format(
domain,
exception_message,
stack,
logs,
)
message = """\
From: {}
To: {}
Subject: {}
{}
""".format(
from_,
to_,
subject_,
text,
)
import smtplib import smtplib
@ -532,7 +511,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
# Prepare certificate signing request # Prepare certificate signing request
logger.debug("Prepare key and certificate signing request (CSR) for %s...", domain) logger.debug("Prepare key and certificate signing request (CSR) for %s...", domain)
domain_key_file = "{}/{}.pem".format(TMP_FOLDER, domain) domain_key_file = f"{TMP_FOLDER}/{domain}.pem"
_generate_key(domain_key_file) _generate_key(domain_key_file)
_set_permissions(domain_key_file, "root", "ssl-cert", 0o640) _set_permissions(domain_key_file, "root", "ssl-cert", 0o640)
@ -541,7 +520,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
# Sign the certificate # Sign the certificate
logger.debug("Now using ACME Tiny to sign the certificate...") logger.debug("Now using ACME Tiny to sign the certificate...")
domain_csr_file = "{}/{}.csr".format(TMP_FOLDER, domain) domain_csr_file = f"{TMP_FOLDER}/{domain}.csr"
if staging: if staging:
certification_authority = STAGING_CERTIFICATION_AUTHORITY certification_authority = STAGING_CERTIFICATION_AUTHORITY
@ -580,12 +559,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
else: else:
folder_flag = "letsencrypt" folder_flag = "letsencrypt"
new_cert_folder = "{}/{}-history/{}-{}".format( new_cert_folder = f"{CERT_FOLDER}/{domain}-history/{date_tag}-{folder_flag}"
CERT_FOLDER,
domain,
date_tag,
folder_flag,
)
os.makedirs(new_cert_folder) os.makedirs(new_cert_folder)
@ -844,7 +818,7 @@ def _backup_current_cert(domain):
cert_folder_domain = os.path.join(CERT_FOLDER, domain) cert_folder_domain = os.path.join(CERT_FOLDER, domain)
date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S") date_tag = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
backup_folder = "{}-backups/{}".format(cert_folder_domain, date_tag) backup_folder = f"{cert_folder_domain}-backups/{date_tag}"
shutil.copytree(cert_folder_domain, backup_folder) shutil.copytree(cert_folder_domain, backup_folder)

View file

@ -151,7 +151,7 @@ def dyndns_subscribe(operation_logger, domain=None, key=None):
try: try:
error = json.loads(r.text)["error"] error = json.loads(r.text)["error"]
except Exception: except Exception:
error = 'Server error, code: {}. (Message: "{}")'.format(r.status_code, r.text) error = f'Server error, code: {r.status_code}. (Message: "{r.text}")'
raise YunohostError("dyndns_registration_failed", error=error) raise YunohostError("dyndns_registration_failed", error=error)
# Yunohost regen conf will add the dyndns cron job if a private key exists # Yunohost regen conf will add the dyndns cron job if a private key exists
@ -196,7 +196,7 @@ def dyndns_update(
# If key is not given, pick the first file we find with the domain given # If key is not given, pick the first file we find with the domain given
elif key is None: elif key is None:
keys = glob.glob("/etc/yunohost/dyndns/K{}.+*.private".format(domain)) keys = glob.glob(f"/etc/yunohost/dyndns/K{domain}.+*.private")
if not keys: if not keys:
raise YunohostValidationError("dyndns_key_not_found") raise YunohostValidationError("dyndns_key_not_found")
@ -263,14 +263,14 @@ def dyndns_update(
return None return None
raise YunohostError( raise YunohostError(
"Failed to resolve {} for {}".format(rdtype, domain), raw_msg=True f"Failed to resolve {rdtype} for {domain}", raw_msg=True
) )
old_ipv4 = resolve_domain(domain, "A") old_ipv4 = resolve_domain(domain, "A")
old_ipv6 = resolve_domain(domain, "AAAA") old_ipv6 = resolve_domain(domain, "AAAA")
logger.debug("Old IPv4/v6 are ({}, {})".format(old_ipv4, old_ipv6)) logger.debug(f"Old IPv4/v6 are ({old_ipv4}, {old_ipv6})")
logger.debug("Requested IPv4/v6 are ({}, {})".format(ipv4, ipv6)) logger.debug(f"Requested IPv4/v6 are ({ipv4}, {ipv6})")
# no need to update # no need to update
if (not force and not dry_run) and (old_ipv4 == ipv4 and old_ipv6 == ipv6): if (not force and not dry_run) and (old_ipv4 == ipv4 and old_ipv6 == ipv6):

View file

@ -197,7 +197,7 @@ def hook_list(action, list_by="name", show_info=False):
or (f.startswith("__") and f.endswith("__")) or (f.startswith("__") and f.endswith("__"))
): ):
continue continue
path = "{}{}/{}".format(folder, action, f) path = f"{folder}{action}/{f}"
priority, name = _extract_filename_parts(f) priority, name = _extract_filename_parts(f)
_append_hook(d, priority, name, path) _append_hook(d, priority, name, path)
@ -407,7 +407,7 @@ def _hook_exec_bash(path, args, chdir, env, user, return_format, loggers):
if not chdir: if not chdir:
# use the script directory as current one # use the script directory as current one
chdir, cmd_script = os.path.split(path) chdir, cmd_script = os.path.split(path)
cmd_script = "./{}".format(cmd_script) cmd_script = f"./{cmd_script}"
else: else:
cmd_script = path cmd_script = path

View file

@ -139,7 +139,8 @@ def user_permission_list(
continue continue
main_perm_label = permissions[main_perm_name]["label"] main_perm_label = permissions[main_perm_name]["label"]
infos["sublabel"] = infos["label"] infos["sublabel"] = infos["label"]
infos["label"] = "{} ({})".format(main_perm_label, infos["label"]) label_ = infos["label"]
infos["label"] = f"{main_perm_label} ({label_})"
if short: if short:
permissions = list(permissions.keys()) permissions = list(permissions.keys())

View file

@ -638,12 +638,9 @@ def _process_regen_conf(system_conf, new_conf=None, save=True):
""" """
if save: if save:
backup_path = os.path.join( system_conf_ = system_conf.lstrip("/")
BACKUP_CONF_DIR, now_ = datetime.utcnow().strftime("%Y%m%d.%H%M%S")
"{}-{}".format( backup_path = os.path.join(BACKUP_CONF_DIR, f"{system_conf_}-{now_}")
system_conf.lstrip("/"), datetime.utcnow().strftime("%Y%m%d.%H%M%S")
),
)
backup_dir = os.path.dirname(backup_path) backup_dir = os.path.dirname(backup_path)
if not os.path.isdir(backup_dir): if not os.path.isdir(backup_dir):

View file

@ -625,7 +625,7 @@ def _run_service_command(action, service):
% (action, ", ".join(possible_actions)) % (action, ", ".join(possible_actions))
) )
cmd = "systemctl {} {}".format(action, service) cmd = f"systemctl {action} {service}"
need_lock = services[service].get("need_lock", False) and action in [ need_lock = services[service].get("need_lock", False) and action in [
"start", "start",
@ -673,7 +673,7 @@ def _give_lock(action, service, p):
else: else:
systemctl_PID_name = "ControlPID" systemctl_PID_name = "ControlPID"
cmd_get_son_PID = "systemctl show {} -p {}".format(service, systemctl_PID_name) cmd_get_son_PID = f"systemctl show {service} -p {systemctl_PID_name}"
son_PID = 0 son_PID = 0
# As long as we did not found the PID and that the command is still running # As long as we did not found the PID and that the command is still running
while son_PID == 0 and p.poll() is None: while son_PID == 0 and p.poll() is None:
@ -686,9 +686,7 @@ def _give_lock(action, service, p):
# If we found a PID # If we found a PID
if son_PID != 0: if son_PID != 0:
# Append the PID to the lock file # Append the PID to the lock file
logger.debug( logger.debug(f"Giving a lock to PID {son_PID} for service {service} !")
"Giving a lock to PID {} for service {} !".format(str(son_PID), service)
)
append_to_file(MOULINETTE_LOCK, "\n%s" % str(son_PID)) append_to_file(MOULINETTE_LOCK, "\n%s" % str(son_PID))
return son_PID return son_PID
@ -865,9 +863,7 @@ def _get_journalctl_logs(service, number="all"):
systemd_service = services.get(service, {}).get("actual_systemd_service", service) systemd_service = services.get(service, {}).get("actual_systemd_service", service)
try: try:
return check_output( return check_output(
"journalctl --no-hostname --no-pager -u {} -n{}".format( f"journalctl --no-hostname --no-pager -u {systemd_service} -n{number}"
systemd_service, number
)
) )
except Exception: except Exception:
import traceback import traceback

View file

@ -224,7 +224,7 @@ def settings_set(key, value):
try: try:
trigger_post_change_hook(key, old_value, value) trigger_post_change_hook(key, old_value, value)
except Exception as e: except Exception as e:
logger.error("Post-change hook for setting {} failed : {}".format(key, e)) logger.error(f"Post-change hook for setting {key} failed : {e}")
raise raise

View file

@ -166,7 +166,7 @@ def user_create(
# On affiche les differents domaines possibles # On affiche les differents domaines possibles
Moulinette.display(m18n.n("domains_available")) Moulinette.display(m18n.n("domains_available"))
for domain in domain_list()["domains"]: for domain in domain_list()["domains"]:
Moulinette.display("- {}".format(domain)) Moulinette.display(f"- {domain}")
maindomain = _get_maindomain() maindomain = _get_maindomain()
domain = Moulinette.prompt( domain = Moulinette.prompt(
@ -215,7 +215,7 @@ def user_create(
uid_guid_found = uid not in all_uid and uid not in all_gid uid_guid_found = uid not in all_uid and uid not in all_gid
# Adapt values for LDAP # Adapt values for LDAP
fullname = "{} {}".format(firstname, lastname) fullname = f"{firstname} {lastname}"
attr_dict = { attr_dict = {
"objectClass": [ "objectClass": [
@ -333,8 +333,8 @@ def user_delete(operation_logger, username, purge=False, from_import=False):
subprocess.call(["nscd", "-i", "passwd"]) subprocess.call(["nscd", "-i", "passwd"])
if purge: if purge:
subprocess.call(["rm", "-rf", "/home/{}".format(username)]) subprocess.call(["rm", "-rf", f"/home/{username}"])
subprocess.call(["rm", "-rf", "/var/mail/{}".format(username)]) subprocess.call(["rm", "-rf", f"/var/mail/{username}"])
hook_callback("post_user_delete", args=[username, purge]) hook_callback("post_user_delete", args=[username, purge])

View file

@ -44,7 +44,7 @@ def get_public_ip(protocol=4):
): ):
ip = read_file(cache_file).strip() ip = read_file(cache_file).strip()
ip = ip if ip else None # Empty file (empty string) means there's no IP ip = ip if ip else None # Empty file (empty string) means there's no IP
logger.debug("Reusing IPv{} from cache: {}".format(protocol, ip)) logger.debug(f"Reusing IPv{protocol} from cache: {ip}")
else: else:
ip = get_public_ip_from_remote_server(protocol) ip = get_public_ip_from_remote_server(protocol)
logger.debug("IP fetched: %s" % ip) logger.debug("IP fetched: %s" % ip)
@ -87,7 +87,7 @@ def get_public_ip_from_remote_server(protocol=4):
try: try:
return download_text(url, timeout=30).strip() return download_text(url, timeout=30).strip()
except Exception as e: except Exception as e:
logger.debug("Could not get public IPv{} : {}".format(str(protocol), str(e))) logger.debug(f"Could not get public IPv{protocol} : {e}"))
return None return None