codequality: fstring all the things! (well, not all but a lot :P)

Alexandre Aubin 2022-01-11 21:37:11 +01:00
parent 3fe44ee73b
commit b1fe61ed68
16 changed files with 127 additions and 178 deletions
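The whole diff below applies one pattern: percent-formatting (and a few str.format() calls) rewritten as f-strings. As a minimal sketch of the before/after (the app name here is just an illustrative value, not taken from the codebase):

app = "some_app"

# Before: %-formatting keeps the value away from its placeholder
msg_old = "Failed to change '%s' url." % app

# After: the f-string interpolates the variable right where it is rendered
msg_new = f"Failed to change '{app}' url."

assert msg_old == msg_new

Both versions build exactly the same string; only readability changes.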

View file

@@ -170,7 +170,7 @@ def app_info(app, full=False):
         ret["label"] = permissions.get(app + ".main", {}).get("label")
         if not ret["label"]:
-            logger.warning("Failed to get label for app %s ?" % app)
+            logger.warning(f"Failed to get label for app {app} ?")
     return ret
@@ -285,8 +285,7 @@ def app_map(app=None, raw=False, user=None):
         if user:
             if not app_id + ".main" in permissions:
                 logger.warning(
-                    "Uhoh, no main permission was found for app %s ... sounds like an app was only partially removed due to another bug :/"
-                    % app_id
+                    f"Uhoh, no main permission was found for app {app_id} ... sounds like an app was only partially removed due to another bug :/"
                 )
                 continue
             main_perm = permissions[app_id + ".main"]
@@ -406,7 +405,7 @@ def app_change_url(operation_logger, app, domain, path):
     # Execute App change_url script
     ret = hook_exec(change_url_script, env=env_dict)[0]
     if ret != 0:
-        msg = "Failed to change '%s' url." % app
+        msg = f"Failed to change '{app}' url."
         logger.error(msg)
         operation_logger.error(msg)
@@ -845,7 +844,7 @@ def app_install(
     for question in questions:
         # Or should it be more generally question.redact ?
         if question.type == "password":
-            del env_dict_for_logging["YNH_APP_ARG_%s" % question.name.upper()]
+            del env_dict_for_logging[f"YNH_APP_ARG_{question.name.upper()}"]
     operation_logger.extra.update({"env": env_dict_for_logging})
@@ -892,8 +891,7 @@ def app_install(
         # This option is meant for packagers to debug their apps more easily
         if no_remove_on_failure:
             raise YunohostError(
-                "The installation of %s failed, but was not cleaned up as requested by --no-remove-on-failure."
-                % app_id,
+                f"The installation of {app_id} failed, but was not cleaned up as requested by --no-remove-on-failure.",
                 raw_msg=True,
             )
         else:
@@ -1427,9 +1425,9 @@ def app_action_run(operation_logger, app, action, args=None):
     actions = {x["id"]: x for x in actions}
     if action not in actions:
+        available_actions = ", ".join(actions.keys()),
         raise YunohostValidationError(
-            "action '%s' not available for app '%s', available actions are: %s"
-            % (action, app, ", ".join(actions.keys())),
+            f"action '{action}' not available for app '{app}', available actions are: {available_actions}",
            raw_msg=True,
        )
@@ -1852,8 +1850,7 @@ def _get_manifest_of_app(path):
         manifest = read_json(os.path.join(path, "manifest.json"))
     else:
         raise YunohostError(
-            "There doesn't seem to be any manifest file in %s ... It looks like an app was not correctly installed/removed."
-            % path,
+            f"There doesn't seem to be any manifest file in {path} ... It looks like an app was not correctly installed/removed.",
             raw_msg=True,
         )
@@ -2093,7 +2090,7 @@ def _extract_app_from_gitrepo(
             cmd = f"git ls-remote --exit-code {url} {branch} | awk '{{print $1}}'"
             manifest["remote"]["revision"] = check_output(cmd)
         except Exception as e:
-            logger.warning("cannot get last commit hash because: %s ", e)
+            logger.warning(f"cannot get last commit hash because: {e}")
     else:
         manifest["remote"]["revision"] = revision
         manifest["lastUpdate"] = app_info.get("lastUpdate")
@@ -2279,14 +2276,7 @@ def _assert_no_conflicting_apps(domain, path, ignore_app=None, full_domain=False
     if conflicts:
         apps = []
         for path, app_id, app_label in conflicts:
-            apps.append(
-                " * {domain:s}{path:s}{app_label:s} ({app_id:s})".format(
-                    domain=domain,
-                    path=path,
-                    app_id=app_id,
-                    app_label=app_label,
-                )
-            )
+            apps.append(f" * {domain}{path}{app_label} ({app_id})")
         if full_domain:
             raise YunohostValidationError("app_full_domain_unavailable", domain=domain)
@@ -2415,7 +2405,7 @@ def is_true(arg):
     elif isinstance(arg, str):
         return arg.lower() in ["yes", "true", "on"]
     else:
-        logger.debug("arg should be a boolean or a string, got %r", arg)
+        logger.debug(f"arg should be a boolean or a string, got {arg}")
         return True if arg else False
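A small caveat visible in the is_true() hunk just above: the old call used the %r placeholder, which formats repr(arg), while the new f-string uses {arg}, i.e. str(arg). The repr-preserving equivalent would be {arg!r}; a quick illustration (not from the codebase):

arg = "yes"
print("got %r" % arg)   # got 'yes'
print(f"got {arg!r}")   # got 'yes'  (repr kept)
print(f"got {arg}")     # got yes    (what the new code does)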

View file

@@ -103,9 +103,7 @@ def _initialize_apps_catalog_system():
         )
         write_to_yaml(APPS_CATALOG_CONF, default_apps_catalog_list)
     except Exception as e:
-        raise YunohostError(
-            "Could not initialize the apps catalog system... : %s" % str(e)
-        )
+        raise YunohostError(f"Could not initialize the apps catalog system... : {e}", raw_msg=True)
     logger.success(m18n.n("apps_catalog_init_success"))
@@ -121,14 +119,12 @@ def _read_apps_catalog_list():
         # by returning [] if list_ is None
         return list_ if list_ else []
     except Exception as e:
-        raise YunohostError("Could not read the apps_catalog list ... : %s" % str(e))
+        raise YunohostError(f"Could not read the apps_catalog list ... : {e}", raw_msg=True)
 def _actual_apps_catalog_api_url(base_url):
-    return "{base_url}/v{version}/apps.json".format(
-        base_url=base_url, version=APPS_CATALOG_API_VERSION
-    )
+    return f"{base_url}/v{APPS_CATALOG_API_VERSION}/apps.json"
 def _update_apps_catalog():
@@ -172,16 +168,11 @@ def _update_apps_catalog():
     apps_catalog_content["from_api_version"] = APPS_CATALOG_API_VERSION
     # Save the apps_catalog data in the cache
-    cache_file = "{cache_folder}/{list}.json".format(
-        cache_folder=APPS_CATALOG_CACHE, list=apps_catalog_id
-    )
+    cache_file = f"{APPS_CATALOG_CACHE}/{apps_catalog_id}.json"
     try:
         write_to_json(cache_file, apps_catalog_content)
     except Exception as e:
-        raise YunohostError(
-            "Unable to write cache data for %s apps_catalog : %s"
-            % (apps_catalog_id, str(e))
-        )
+        raise YunohostError(f"Unable to write cache data for {apps_catalog_id} apps_catalog : {e}", raw_msg=True)
     logger.success(m18n.n("apps_catalog_update_success"))
@@ -197,9 +188,7 @@ def _load_apps_catalog():
     for apps_catalog_id in [L["id"] for L in _read_apps_catalog_list()]:
         # Let's load the json from cache for this catalog
-        cache_file = "{cache_folder}/{list}.json".format(
-            cache_folder=APPS_CATALOG_CACHE, list=apps_catalog_id
-        )
+        cache_file = f"{APPS_CATALOG_CACHE}/{apps_catalog_id}.json"
         try:
             apps_catalog_content = (
@@ -230,10 +219,8 @@ def _load_apps_catalog():
             # (N.B. : there's a small edge case where multiple apps catalog could be listing the same apps ...
             # in which case we keep only the first one found)
             if app in merged_catalog["apps"]:
-                logger.warning(
-                    "Duplicate app %s found between apps catalog %s and %s"
-                    % (app, apps_catalog_id, merged_catalog["apps"][app]["repository"])
-                )
+                other_catalog = merged_catalog["apps"][app]["repository"]
+                logger.warning(f"Duplicate app {app} found between apps catalog {apps_catalog_id} and {other_catalog}")
                 continue
             info["repository"] = apps_catalog_id

View file

@@ -72,7 +72,7 @@ from yunohost.utils.filesystem import free_space_in_directory
 from yunohost.settings import settings_get
 BACKUP_PATH = "/home/yunohost.backup"
-ARCHIVES_PATH = "%s/archives" % BACKUP_PATH
+ARCHIVES_PATH = f"{BACKUP_PATH}/archives"
 APP_MARGIN_SPACE_SIZE = 100  # In MB
 CONF_MARGIN_SPACE_SIZE = 10  # IN MB
 POSTINSTALL_ESTIMATE_SPACE_SIZE = 5  # In MB
@@ -402,7 +402,7 @@ class BackupManager:
         # backup and restore scripts
         for app in target_list:
-            app_script_folder = "/etc/yunohost/apps/%s/scripts" % app
+            app_script_folder = f"/etc/yunohost/apps/{app}/scripts"
             backup_script_path = os.path.join(app_script_folder, "backup")
             restore_script_path = os.path.join(app_script_folder, "restore")
@@ -555,7 +555,7 @@ class BackupManager:
         self._compute_backup_size()
         # Create backup info file
-        with open("%s/info.json" % self.work_dir, "w") as f:
+        with open(f"{self.work_dir}/info.json", "w") as f:
             f.write(json.dumps(self.info))
     def _get_env_var(self, app=None):
@@ -732,7 +732,7 @@ class BackupManager:
             logger.debug(m18n.n("backup_permission", app=app))
             permissions = user_permission_list(full=True, apps=[app])["permissions"]
             this_app_permissions = {name: infos for name, infos in permissions.items()}
-            write_to_yaml("%s/permissions.yml" % settings_dir, this_app_permissions)
+            write_to_yaml(f"{settings_dir}/permissions.yml", this_app_permissions)
         except Exception as e:
             logger.debug(e)
@@ -921,7 +921,7 @@ class RestoreManager:
         if not os.path.isfile("/etc/yunohost/installed"):
             # Retrieve the domain from the backup
             try:
-                with open("%s/conf/ynh/current_host" % self.work_dir, "r") as f:
+                with open(f"{self.work_dir}/conf/ynh/current_host", "r") as f:
                     domain = f.readline().rstrip()
             except IOError:
                 logger.debug(
@@ -1004,7 +1004,7 @@ class RestoreManager:
                 continue
             hook_paths = self.info["system"][system_part]["paths"]
-            hook_paths = ["hooks/restore/%s" % os.path.basename(p) for p in hook_paths]
+            hook_paths = [f"hooks/restore/{os.path.basename(p)}" for p in hook_paths]
             # Otherwise, add it from the archive to the system
             # FIXME: Refactor hook_add and use it instead
@@ -1071,7 +1071,7 @@ class RestoreManager:
             ret = subprocess.call(["umount", self.work_dir])
             if ret == 0:
                 subprocess.call(["rmdir", self.work_dir])
-                logger.debug("Unmount dir: {}".format(self.work_dir))
+                logger.debug(f"Unmount dir: {self.work_dir}")
             else:
                 raise YunohostError("restore_removing_tmp_dir_failed")
         elif os.path.isdir(self.work_dir):
@@ -1080,7 +1080,7 @@ class RestoreManager:
             )
             ret = subprocess.call(["rm", "-Rf", self.work_dir])
             if ret == 0:
-                logger.debug("Delete dir: {}".format(self.work_dir))
+                logger.debug(f"Delete dir: {self.work_dir}")
             else:
                 raise YunohostError("restore_removing_tmp_dir_failed")
@@ -1182,7 +1182,7 @@ class RestoreManager:
             self._restore_apps()
         except Exception as e:
             raise YunohostError(
-                "The following critical error happened during restoration: %s" % e
+                f"The following critical error happened during restoration: {e}"
             )
         finally:
             self.clean()
@@ -1429,20 +1429,19 @@ class RestoreManager:
         restore_script = os.path.join(tmp_workdir_for_app, "restore")
         # Restore permissions
-        if not os.path.isfile("%s/permissions.yml" % app_settings_new_path):
+        if not os.path.isfile(f"{app_settings_new_path}/permissions.yml"):
             raise YunohostError(
                 "Didnt find a permssions.yml for the app !?", raw_msg=True
             )
-        permissions = read_yaml("%s/permissions.yml" % app_settings_new_path)
+        permissions = read_yaml(f"{app_settings_new_path}/permissions.yml")
         existing_groups = user_group_list()["groups"]
         for permission_name, permission_infos in permissions.items():
             if "allowed" not in permission_infos:
                 logger.warning(
-                    "'allowed' key corresponding to allowed groups for permission %s not found when restoring app %s … You might have to reconfigure permissions yourself."
-                    % (permission_name, app_instance_name)
+                    f"'allowed' key corresponding to allowed groups for permission {permission_name} not found when restoring app {app_instance_name} … You might have to reconfigure permissions yourself."
                 )
                 should_be_allowed = ["all_users"]
             else:
@@ -1467,7 +1466,7 @@ class RestoreManager:
         permission_sync_to_user()
-        os.remove("%s/permissions.yml" % app_settings_new_path)
+        os.remove(f"{app_settings_new_path}/permissions.yml")
         _tools_migrations_run_before_app_restore(
             backup_version=self.info["from_yunohost_version"],
@@ -1816,8 +1815,7 @@ class BackupMethod:
                 # where everything is mapped to /dev/mapper/some-stuff
                 # yet there are different devices behind it or idk ...
                 logger.warning(
-                    "Could not link %s to %s (%s) ... falling back to regular copy."
-                    % (src, dest, str(e))
+                    f"Could not link {src} to {dest} ({e}) ... falling back to regular copy."
                 )
             else:
                 # Success, go to next file to organize
@@ -2383,7 +2381,7 @@ def backup_list(with_info=False, human_readable=False):
     """
     # Get local archives sorted according to last modification time
     # (we do a realpath() to resolve symlinks)
-    archives = glob("%s/*.tar.gz" % ARCHIVES_PATH) + glob("%s/*.tar" % ARCHIVES_PATH)
+    archives = glob(f"{ARCHIVES_PATH}/*.tar.gz") + glob(f"{ARCHIVES_PATH}/*.tar")
     archives = {os.path.realpath(archive) for archive in archives}
     archives = sorted(archives, key=lambda x: os.path.getctime(x))
     # Extract only filename without the extension
@@ -2405,10 +2403,9 @@ def backup_list(with_info=False, human_readable=False):
             logger.warning(str(e))
         except Exception:
             import traceback
+            trace_ = "\n" + traceback.format_exc()
             logger.warning(
-                "Could not check infos for archive %s: %s"
-                % (archive, "\n" + traceback.format_exc())
+                f"Could not check infos for archive {archive}: {trace_}"
             )
     archives = d
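The backup_list() hunk above uses the same trick of hoisting a value into a short local: the traceback is captured into trace_ first so the f-string itself stays on one line. Roughly, with a dummy error and archive name:

import traceback

try:
    raise RuntimeError("boom")
except Exception:
    trace_ = "\n" + traceback.format_exc()
    print(f"Could not check infos for archive some_archive: {trace_}")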

View file

@@ -228,10 +228,7 @@ def _certificate_install_selfsigned(domain_list, force=False):
             )
             operation_logger.success()
         else:
-            msg = (
-                "Installation of self-signed certificate installation for %s failed !"
-                % (domain)
-            )
+            msg = f"Installation of self-signed certificate installation for {domain} failed !"
             logger.error(msg)
             operation_logger.error(msg)
@@ -299,8 +296,7 @@ def _certificate_install_letsencrypt(
             operation_logger.error(msg)
             if no_checks:
                 logger.error(
-                    "Please consider checking the 'DNS records' (basic) and 'Web' categories of the diagnosis to check for possible issues that may prevent installing a Let's Encrypt certificate on domain %s."
-                    % domain
+                    f"Please consider checking the 'DNS records' (basic) and 'Web' categories of the diagnosis to check for possible issues that may prevent installing a Let's Encrypt certificate on domain {domain}."
                 )
         else:
             logger.success(m18n.n("certmanager_cert_install_success", domain=domain))
@@ -417,11 +413,10 @@ def certificate_renew(
             stack = StringIO()
             traceback.print_exc(file=stack)
-            msg = "Certificate renewing for %s failed!" % (domain)
+            msg = f"Certificate renewing for {domain} failed!"
             if no_checks:
                 msg += (
-                    "\nPlease consider checking the 'DNS records' (basic) and 'Web' categories of the diagnosis to check for possible issues that may prevent installing a Let's Encrypt certificate on domain %s."
-                    % domain
+                    f"\nPlease consider checking the 'DNS records' (basic) and 'Web' categories of the diagnosis to check for possible issues that may prevent installing a Let's Encrypt certificate on domain {domain}."
                 )
             logger.error(msg)
             operation_logger.error(msg)
@@ -442,9 +437,9 @@ def certificate_renew(
 def _email_renewing_failed(domain, exception_message, stack=""):
-    from_ = "certmanager@%s (Certificate Manager)" % domain
+    from_ = f"certmanager@{domain} (Certificate Manager)"
     to_ = "root"
-    subject_ = "Certificate renewing attempt for %s failed!" % domain
+    subject_ = f"Certificate renewing attempt for {domain} failed!"
     logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
     message = f"""\
@@ -476,7 +471,7 @@ investigate :
 def _check_acme_challenge_configuration(domain):
-    domain_conf = "/etc/nginx/conf.d/%s.conf" % domain
+    domain_conf = f"/etc/nginx/conf.d/{domain}.conf"
     return "include /etc/nginx/conf.d/acme-challenge.conf.inc" in read_file(domain_conf)

View file

@@ -188,7 +188,7 @@ def diagnosis_run(
     # Call the hook ...
     diagnosed_categories = []
     for category in categories:
-        logger.debug("Running diagnosis for %s ..." % category)
+        logger.debug(f"Running diagnosis for {category} ...")
         diagnoser = _load_diagnoser(category)
@@ -282,7 +282,7 @@ def _diagnosis_ignore(add_filter=None, remove_filter=None, list=False):
         )
         category = filter_[0]
         if category not in all_categories_names:
-            raise YunohostValidationError("%s is not a diagnosis category" % category)
+            raise YunohostValidationError(f"{category} is not a diagnosis category")
         if any("=" not in criteria for criteria in filter_[1:]):
             raise YunohostValidationError(
                 "Criterias should be of the form key=value (e.g. domain=yolo.test)"
@@ -423,7 +423,7 @@ class Diagnoser:
             not force
             and self.cached_time_ago() < self.cache_duration
         ):
-            logger.debug("Cache still valid : %s" % self.cache_file)
+            logger.debug(f"Cache still valid : {self.cache_file}")
            logger.info(
                m18n.n("diagnosis_cache_still_valid", category=self.description)
            )
@@ -457,7 +457,7 @@ class Diagnoser:
         new_report = {"id": self.id_, "cached_for": self.cache_duration, "items": items}
-        logger.debug("Updating cache %s" % self.cache_file)
+        logger.debug(f"Updating cache {self.cache_file}")
         self.write_cache(new_report)
         Diagnoser.i18n(new_report)
         add_ignore_flag_to_issues(new_report)
@@ -530,7 +530,7 @@ class Diagnoser:
     @staticmethod
     def cache_file(id_):
-        return os.path.join(DIAGNOSIS_CACHE, "%s.json" % id_)
+        return os.path.join(DIAGNOSIS_CACHE, f"{id_}.json")
     @staticmethod
     def get_cached_report(id_, item=None, warn_if_no_cache=True):
@@ -633,7 +633,7 @@ class Diagnoser:
         elif ipversion == 6:
             socket.getaddrinfo = getaddrinfo_ipv6_only
-        url = "https://{}/{}".format(DIAGNOSIS_SERVER, uri)
+        url = f"https://{DIAGNOSIS_SERVER}/{uri}"
         try:
             r = requests.post(url, json=data, timeout=timeout)
         finally:
@@ -641,18 +641,16 @@ class Diagnoser:
         if r.status_code not in [200, 400]:
             raise Exception(
-                "The remote diagnosis server failed miserably while trying to diagnose your server. This is most likely an error on Yunohost's infrastructure and not on your side. Please contact the YunoHost team an provide them with the following information.<br>URL: <code>%s</code><br>Status code: <code>%s</code>"
-                % (url, r.status_code)
+                f"The remote diagnosis server failed miserably while trying to diagnose your server. This is most likely an error on Yunohost's infrastructure and not on your side. Please contact the YunoHost team an provide them with the following information.<br>URL: <code>{url}</code><br>Status code: <code>{r.status_code}</code>"
             )
         if r.status_code == 400:
-            raise Exception("Diagnosis request was refused: %s" % r.content)
+            raise Exception(f"Diagnosis request was refused: {r.content}")
         try:
             r = r.json()
         except Exception as e:
             raise Exception(
-                "Failed to parse json from diagnosis server response.\nError: %s\nOriginal content: %s"
-                % (e, r.content)
+                f"Failed to parse json from diagnosis server response.\nError: {e}\nOriginal content: {r.content}"
             )
         return r
@@ -681,7 +679,7 @@ def _load_diagnoser(diagnoser_name):
         # this is python builtin method to import a module using a name, we
         # use that to import the migration as a python object so we'll be
         # able to run it in the next loop
-        module = import_module("yunohost.diagnosers.{}".format(module_id))
+        module = import_module(f"yunohost.diagnosers.{module_id}")
         return module.MyDiagnoser()
     except Exception as e:
         import traceback
@@ -695,9 +693,9 @@ def _email_diagnosis_issues():
     from yunohost.domain import _get_maindomain
     maindomain = _get_maindomain()
-    from_ = "diagnosis@{} (Automatic diagnosis on {})".format(maindomain, maindomain)
+    from_ = f"diagnosis@{maindomain} (Automatic diagnosis on {maindomain})"
     to_ = "root"
-    subject_ = "Issues found by automatic diagnosis on %s" % maindomain
+    subject_ = f"Issues found by automatic diagnosis on {maindomain}"
     disclaimer = "The automatic diagnosis on your YunoHost server identified some issues on your server. You will find a description of the issues below. You can manage those issues in the 'Diagnosis' section in your webadmin."
@@ -707,23 +705,17 @@ def _email_diagnosis_issues():
     content = _dump_human_readable_reports(issues)
-    message = """\
-From: {}
-To: {}
-Subject: {}
-{}
+    message = f"""\
+From: {from_}
+To: {to_}
+Subject: {subject_}
+{disclaimer}
 ---
-{}
-""".format(
-        from_,
-        to_,
-        subject_,
-        disclaimer,
-        content,
-    )
+{content}
+"""
     import smtplib
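The email hunk above also shows that f-strings work for triple-quoted, multi-line templates: the positional {} placeholders plus the trailing .format(...) call become named interpolations directly in the body. A reduced sketch with dummy values:

from_ = "diagnosis@example.org (Automatic diagnosis on example.org)"
subject_ = "Issues found by automatic diagnosis on example.org"

message = f"""\
From: {from_}
To: root
Subject: {subject_}
"""
print(message)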

View file

@@ -338,7 +338,7 @@ def _build_dns_conf(base_domain, include_empty_AAAA_if_no_ipv6=False):
 def _get_DKIM(domain):
-    DKIM_file = "/etc/dkim/{domain}.mail.txt".format(domain=domain)
+    DKIM_file = f"/etc/dkim/{domain}.mail.txt"
     if not os.path.isfile(DKIM_file):
         return (None, None)

View file

@@ -196,7 +196,7 @@ def domain_add(operation_logger, domain, dyndns=False):
     }
     try:
-        ldap.add("virtualdomain=%s,ou=domains" % domain, attr_dict)
+        ldap.add(f"virtualdomain={domain},ou=domains", attr_dict)
     except Exception as e:
         raise YunohostError("domain_creation_failed", domain=domain, error=e)
     finally:
@@ -215,7 +215,7 @@ def domain_add(operation_logger, domain, dyndns=False):
     # This is a pretty ad hoc solution and only applied to nginx
     # because it's one of the major service, but in the long term we
     # should identify the root of this bug...
-    _force_clear_hashes(["/etc/nginx/conf.d/%s.conf" % domain])
+    _force_clear_hashes([f"/etc/nginx/conf.d/{domain}.conf"])
     regen_conf(
         names=["nginx", "metronome", "dnsmasq", "postfix", "rspamd", "mdns"]
     )
@@ -282,8 +282,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False):
             apps_on_that_domain.append(
                 (
                     app,
-                    ' - %s "%s" on https://%s%s'
-                    % (app, label, domain, settings["path"])
+                    f" - {app} \"{label}\" on https://{domain}{settings['path']}"
                     if "path" in settings
                     else app,
                 )
@@ -342,14 +341,14 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False):
     # This is a pretty ad hoc solution and only applied to nginx
     # because it's one of the major service, but in the long term we
     # should identify the root of this bug...
-    _force_clear_hashes(["/etc/nginx/conf.d/%s.conf" % domain])
+    _force_clear_hashes([f"/etc/nginx/conf.d/{domain}.conf"])
     # And in addition we even force-delete the file Otherwise, if the file was
     # manually modified, it may not get removed by the regenconf which leads to
     # catastrophic consequences of nginx breaking because it can't load the
     # cert file which disappeared etc..
-    if os.path.exists("/etc/nginx/conf.d/%s.conf" % domain):
+    if os.path.exists(f"/etc/nginx/conf.d/{domain}.conf"):
         _process_regen_conf(
-            "/etc/nginx/conf.d/%s.conf" % domain, new_conf=None, save=True
+            f"/etc/nginx/conf.d/{domain}.conf", new_conf=None, save=True
         )
     regen_conf(names=["nginx", "metronome", "dnsmasq", "postfix", "rspamd", "mdns"])
@@ -388,7 +387,7 @@ def domain_main_domain(operation_logger, new_main_domain=None):
         domain_list_cache = {}
         _set_hostname(new_main_domain)
     except Exception as e:
-        logger.warning("%s" % e, exc_info=1)
+        logger.warning(str(e), exc_info=1)
         raise YunohostError("main_domain_change_failed")
     # Generate SSOwat configuration file

View file

@@ -95,7 +95,7 @@ def hook_info(action, name):
     priorities = set()
     # Search in custom folder first
-    for h in iglob("{:s}{:s}/*-{:s}".format(CUSTOM_HOOK_FOLDER, action, name)):
+    for h in iglob(f"{CUSTOM_HOOK_FOLDER}{action}/*-{name}"):
         priority, _ = _extract_filename_parts(os.path.basename(h))
         priorities.add(priority)
         hooks.append(
@@ -105,7 +105,7 @@ def hook_info(action, name):
             }
         )
     # Append non-overwritten system hooks
-    for h in iglob("{:s}{:s}/*-{:s}".format(HOOK_FOLDER, action, name)):
+    for h in iglob(f"{HOOK_FOLDER}{action}/*-{name}"):
         priority, _ = _extract_filename_parts(os.path.basename(h))
         if priority not in priorities:
             hooks.append(
@@ -431,8 +431,7 @@ def _hook_exec_bash(path, args, chdir, env, user, return_format, loggers):
     # use xtrace on fd 7 which is redirected to stdout
     env["BASH_XTRACEFD"] = "7"
-    cmd = '/bin/bash -x "{script}" {args} 7>&1'
-    command.append(cmd.format(script=cmd_script, args=cmd_args))
+    command.append(f'/bin/bash -x "{cmd_script}" {cmd_args} 7>&1')
     logger.debug("Executing command '%s'" % command)

View file

@@ -133,8 +133,7 @@ def user_permission_list(
             main_perm_name = name.split(".")[0] + ".main"
             if main_perm_name not in permissions:
                 logger.debug(
-                    "Uhoh, unknown permission %s ? (Maybe we're in the process or deleting the perm for this app...)"
-                    % main_perm_name
+                    f"Uhoh, unknown permission {main_perm_name} ? (Maybe we're in the process or deleting the perm for this app...)"
                 )
                 continue
             main_perm_label = permissions[main_perm_name]["label"]
@@ -452,7 +451,7 @@ def permission_create(
     operation_logger.start()
     try:
-        ldap.add("cn=%s,ou=permission" % permission, attr_dict)
+        ldap.add(f"cn={permission},ou=permission", attr_dict)
     except Exception as e:
         raise YunohostError(
             "permission_creation_failed", permission=permission, error=e
@@ -585,7 +584,7 @@ def permission_url(
     try:
         ldap.update(
-            "cn=%s,ou=permission" % permission,
+            f"cn={permission},ou=permission",
             {
                 "URL": [url] if url is not None else [],
                 "additionalUrls": new_additional_urls,
@@ -633,7 +632,7 @@ def permission_delete(operation_logger, permission, force=False, sync_perm=True)
     operation_logger.start()
     try:
-        ldap.remove("cn=%s,ou=permission" % permission)
+        ldap.remove(f"cn={permission},ou=permission")
     except Exception as e:
         raise YunohostError(
             "permission_deletion_failed", permission=permission, error=e
@@ -679,7 +678,7 @@ def permission_sync_to_user():
         new_inherited_perms = {
             "inheritPermission": [
-                "uid=%s,ou=users,dc=yunohost,dc=org" % u
+                f"uid={u},ou=users,dc=yunohost,dc=org"
                 for u in should_be_allowed_users
             ],
             "memberUid": should_be_allowed_users,
@@ -687,7 +686,7 @@ def permission_sync_to_user():
         # Commit the change with the new inherited stuff
         try:
-            ldap.update("cn=%s,ou=permission" % permission_name, new_inherited_perms)
+            ldap.update(f"cn={permission_name},ou=permission", new_inherited_perms)
         except Exception as e:
             raise YunohostError(
                 "permission_update_failed", permission=permission_name, error=e
@@ -765,7 +764,7 @@ def _update_ldap_group_permission(
         update["showTile"] = [str(show_tile).upper()]
     try:
-        ldap.update("cn=%s,ou=permission" % permission, update)
+        ldap.update(f"cn={permission},ou=permission", update)
     except Exception as e:
         raise YunohostError("permission_update_failed", permission=permission, error=e)

View file

@@ -449,7 +449,7 @@ def _save_regenconf_infos(infos):
             yaml.safe_dump(infos, f, default_flow_style=False)
     except Exception as e:
         logger.warning(
-            "Error while saving regenconf infos, exception: %s", e, exc_info=1
+            f"Error while saving regenconf infos, exception: {e}", exc_info=1
         )
         raise
@@ -506,7 +506,7 @@ def _calculate_hash(path):
     except IOError as e:
         logger.warning(
-            "Error while calculating file '%s' hash: %s", path, e, exc_info=1
+            f"Error while calculating file '{path}' hash: {e}", exc_info=1
         )
         return None
@@ -559,11 +559,11 @@ def _get_conf_hashes(category):
     categories = _get_regenconf_infos()
     if category not in categories:
-        logger.debug("category %s is not in categories.yml yet.", category)
+        logger.debug(f"category {category} is not in categories.yml yet.")
         return {}
     elif categories[category] is None or "conffiles" not in categories[category]:
-        logger.debug("No configuration files for category %s.", category)
+        logger.debug(f"No configuration files for category {category}.")
         return {}
     else:
@@ -572,7 +572,7 @@ def _get_conf_hashes(category):
 def _update_conf_hashes(category, hashes):
     """Update the registered conf hashes for a category"""
-    logger.debug("updating conf hashes for '%s' with: %s", category, hashes)
+    logger.debug(f"updating conf hashes for '{category}' with: {hashes}")
     categories = _get_regenconf_infos()
     category_conf = categories.get(category, {})
@@ -603,8 +603,7 @@ def _force_clear_hashes(paths):
         for category in categories.keys():
             if path in categories[category]["conffiles"]:
                 logger.debug(
-                    "force-clearing old conf hash for %s in category %s"
-                    % (path, category)
+                    f"force-clearing old conf hash for {path} in category {category}"
                 )
                 del categories[category]["conffiles"][path]
@@ -647,9 +646,7 @@ def _process_regen_conf(system_conf, new_conf=None, save=True):
             logger.debug(m18n.n("regenconf_file_updated", conf=system_conf))
         except Exception as e:
             logger.warning(
-                "Exception while trying to regenerate conf '%s': %s",
-                system_conf,
-                e,
+                f"Exception while trying to regenerate conf '{system_conf}': {e}",
                 exc_info=1,
             )
             if not new_conf and os.path.exists(system_conf):
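One behavioural nuance in the logging calls above: the old form passed the values as logging arguments (logger.debug("... %s", category)), which defers formatting until the record is actually emitted, whereas an f-string is built eagerly before the call. For these debug/warning messages the cost is negligible, but the difference is real. Side by side (illustrative):

import logging

logger = logging.getLogger(__name__)
category = "nginx"

# Lazy: only formatted if the DEBUG level is enabled for this logger
logger.debug("category %s is not in categories.yml yet.", category)

# Eager: the f-string is evaluated before logger.debug() runs
logger.debug(f"category {category} is not in categories.yml yet.")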

View file

@@ -407,8 +407,7 @@ def _get_and_format_service_status(service, infos):
     if raw_status is None:
         logger.error(
-            "Failed to get status information via dbus for service %s, systemctl didn't recognize this service ('NoSuchUnit')."
-            % systemd_service
+            f"Failed to get status information via dbus for service {systemd_service}, systemctl didn't recognize this service ('NoSuchUnit')."
         )
         return {
             "status": "unknown",
@@ -424,7 +423,7 @@ def _get_and_format_service_status(service, infos):
     # If no description was there, try to get it from the .json locales
     if not description:
-        translation_key = "service_description_%s" % service
+        translation_key = f"service_description_{service}"
         if m18n.key_exists(translation_key):
             description = m18n.n(translation_key)
         else:
@@ -445,7 +444,7 @@ def _get_and_format_service_status(service, infos):
             "enabled" if glob("/etc/rc[S5].d/S??" + service) else "disabled"
         )
     elif os.path.exists(
-        "/etc/systemd/system/multi-user.target.wants/%s.service" % service
+        f"/etc/systemd/system/multi-user.target.wants/{service}.service"
     ):
         output["start_on_boot"] = "enabled"
@@ -585,8 +584,7 @@ def _run_service_command(action, service):
     ]
     if action not in possible_actions:
         raise ValueError(
-            "Unknown action '%s', available actions are: %s"
-            % (action, ", ".join(possible_actions))
+            f"Unknown action '{action}', available actions are: {', '.join(possible_actions)}"
         )
     cmd = f"systemctl {action} {service}"
@@ -604,7 +602,7 @@ def _run_service_command(action, service):
     try:
         # Launch the command
-        logger.debug("Running '%s'" % cmd)
+        logger.debug(f"Running '{cmd}'")
         p = subprocess.Popen(cmd.split(), stderr=subprocess.STDOUT)
         # If this command needs a lock (because the service uses yunohost
         # commands inside), find the PID and add a lock for it
@@ -651,7 +649,7 @@ def _give_lock(action, service, p):
     if son_PID != 0:
         # Append the PID to the lock file
         logger.debug(f"Giving a lock to PID {son_PID} for service {service} !")
-        append_to_file(MOULINETTE_LOCK, "\n%s" % str(son_PID))
+        append_to_file(MOULINETTE_LOCK, f"\n{son_PID}")
     return son_PID
@@ -815,7 +813,7 @@ def _find_previous_log_file(file):
     i = int(i[0]) + 1 if len(i) > 0 else 1
     previous_file = file if i == 1 else splitext[0]
-    previous_file = previous_file + ".%d" % (i)
+    previous_file = previous_file + f".{i}"
     if os.path.exists(previous_file):
         return previous_file
@@ -835,8 +833,7 @@ def _get_journalctl_logs(service, number="all"):
         )
     except Exception:
         import traceback
+        trace_ = traceback.format_exc()
         return (
-            "error while get services logs from journalctl:\n%s"
-            % traceback.format_exc()
+            f"error while get services logs from journalctl:\n{trace_}"
        )

View file

@@ -285,7 +285,7 @@ def settings_reset_all():
 def _get_setting_description(key):
-    return m18n.n("global_settings_setting_%s" % key.replace(".", "_"))
+    return m18n.n(f"global_settings_setting_{key}".replace(".", "_"))
 def _get_settings():
@@ -315,7 +315,7 @@ def _get_settings():
         try:
             unknown_settings = json.load(open(unknown_settings_path, "r"))
         except Exception as e:
-            logger.warning("Error while loading unknown settings %s" % e)
+            logger.warning(f"Error while loading unknown settings {e}")
     try:
         with open(SETTINGS_PATH) as settings_fd:
@@ -342,7 +342,7 @@ def _get_settings():
             _save_settings(settings)
         except Exception as e:
             logger.warning(
-                "Failed to save unknown settings (because %s), aborting." % e
+                f"Failed to save unknown settings (because {e}), aborting."
             )
     return settings
@@ -374,11 +374,10 @@ post_change_hooks = {}
 def post_change_hook(setting_name):
     def decorator(func):
         assert setting_name in DEFAULTS.keys(), (
-            "The setting %s does not exists" % setting_name
+            f"The setting {setting_name} does not exists"
         )
         assert setting_name not in post_change_hooks, (
-            "You can only register one post change hook per setting (in particular for %s)"
-            % setting_name
+            f"You can only register one post change hook per setting (in particular for {setting_name})"
         )
         post_change_hooks[setting_name] = func
         return func
@@ -388,7 +387,7 @@ def post_change_hook(setting_name):
 def trigger_post_change_hook(setting_name, old_value, new_value):
     if setting_name not in post_change_hooks:
-        logger.debug("Nothing to do after changing setting %s" % setting_name)
+        logger.debug(f"Nothing to do after changing setting {setting_name}")
         return
     f = post_change_hooks[setting_name]

View file

@@ -99,7 +99,7 @@ def user_ssh_remove_key(username, key):
     if not os.path.exists(authorized_keys_file):
         raise YunohostValidationError(
-            "this key doesn't exists ({} dosesn't exists)".format(authorized_keys_file),
+            f"this key doesn't exists ({authorized_keys_file} dosesn't exists)",
             raw_msg=True,
         )
@@ -107,7 +107,7 @@ def user_ssh_remove_key(username, key):
     if key not in authorized_keys_content:
         raise YunohostValidationError(
-            "Key '{}' is not present in authorized_keys".format(key), raw_msg=True
+            f"Key '{key}' is not present in authorized_keys", raw_msg=True
         )
     # don't delete the previous comment because we can't verify if it's legit

View file

@@ -99,7 +99,7 @@ def tools_adminpw(new_password, check_strength=True):
             {"userPassword": [new_hash]},
         )
     except Exception as e:
-        logger.error("unable to change admin password : %s" % e)
+        logger.error(f"unable to change admin password : {e}")
         raise YunohostError("admin_password_change_failed")
     else:
         # Write as root password
@@ -146,7 +146,7 @@ def _set_hostname(hostname, pretty_hostname=None):
     """
     if not pretty_hostname:
-        pretty_hostname = "(YunoHost/%s)" % hostname
+        pretty_hostname = f"(YunoHost/{hostname})"
     # First clear nsswitch cache for hosts to make sure hostname is resolved...
     subprocess.call(["nscd", "-i", "hosts"])
@@ -332,7 +332,7 @@ def tools_update(target=None):
     if target not in ["system", "apps", "all"]:
         raise YunohostError(
-            "Unknown target %s, should be 'system', 'apps' or 'all'" % target,
+            f"Unknown target {target}, should be 'system', 'apps' or 'all'",
             raw_msg=True,
         )
@@ -479,7 +479,7 @@ def tools_upgrade(
         try:
             app_upgrade(app=upgradable_apps)
         except Exception as e:
-            logger.warning("unable to upgrade apps: %s" % str(e))
+            logger.warning(f"unable to upgrade apps: {e}")
             logger.error(m18n.n("app_upgrade_some_app_failed"))
             return
@@ -885,7 +885,7 @@ def _get_migration_by_name(migration_name):
     try:
         from . import migrations
     except ImportError:
-        raise AssertionError("Unable to find migration with name %s" % migration_name)
+        raise AssertionError(f"Unable to find migration with name {migration_name}")
     migrations_path = migrations.__path__[0]
     migrations_found = [
@@ -895,7 +895,7 @@ def _get_migration_by_name(migration_name):
     ]
     assert len(migrations_found) == 1, (
-        "Unable to find migration with name %s" % migration_name
+        f"Unable to find migration with name {migration_name}"
     )
     return _load_migration(migrations_found[0])
@@ -1019,7 +1019,7 @@ class Migration:
     @property
     def description(self):
-        return m18n.n("migration_description_%s" % self.id)
+        return m18n.n(f"migration_description_{self.id}")
     def ldap_migration(self, run):
         def func(self):

View file

@@ -163,7 +163,7 @@ def user_create(
         maindomain = _get_maindomain()
         domain = Moulinette.prompt(
-            m18n.n("ask_user_domain") + " (default: %s)" % maindomain
+            m18n.n("ask_user_domain") + f" (default: {maindomain})"
         )
         if not domain:
             domain = maindomain
@@ -237,7 +237,7 @@ def user_create(
         attr_dict["mail"] = [attr_dict["mail"]] + aliases
     try:
-        ldap.add("uid=%s,ou=users" % username, attr_dict)
+        ldap.add(f"uid={username},ou=users", attr_dict)
     except Exception as e:
         raise YunohostError("user_creation_failed", user=username, error=e)
@@ -255,10 +255,10 @@ def user_create(
     try:
         subprocess.check_call(
-            ["setfacl", "-m", "g:all_users:---", "/home/%s" % username]
+            ["setfacl", "-m", "g:all_users:---", f"/home/{username}"]
         )
     except subprocess.CalledProcessError:
-        logger.warning("Failed to protect /home/%s" % username, exc_info=1)
+        logger.warning(f"Failed to protect /home/{username}", exc_info=1)
     # Create group for user and add to group 'all_users'
     user_group_create(groupname=username, gid=uid, primary_group=True, sync_perm=False)
@@ -318,7 +318,7 @@ def user_delete(operation_logger, username, purge=False, from_import=False):
     ldap = _get_ldap_interface()
     try:
-        ldap.remove("uid=%s,ou=users" % username)
+        ldap.remove(f"uid={username},ou=users")
     except Exception as e:
         raise YunohostError("user_deletion_failed", user=username, error=e)
@@ -506,7 +506,7 @@ def user_update(
     operation_logger.start()
     try:
-        ldap.update("uid=%s,ou=users" % username, new_attr_dict)
+        ldap.update(f"uid={username},ou=users", new_attr_dict)
     except Exception as e:
         raise YunohostError("user_update_failed", user=username, error=e)
@@ -577,11 +577,11 @@ def user_info(username):
             logger.warning(m18n.n("mailbox_disabled", user=username))
         else:
             try:
-                cmd = "doveadm -f flow quota get -u %s" % user["uid"][0]
-                cmd_result = check_output(cmd)
+                uid_ = user["uid"][0]
+                cmd_result = check_output(f"doveadm -f flow quota get -u {uid_}")
             except Exception as e:
                 cmd_result = ""
-                logger.warning("Failed to fetch quota info ... : %s " % str(e))
+                logger.warning(f"Failed to fetch quota info ... : {e}")
             # Exemple of return value for cmd:
             # """Quota name=User quota Type=STORAGE Value=0 Limit=- %=0
@@ -707,8 +707,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
         unknown_groups = [g for g in user["groups"] if g not in existing_groups]
         if unknown_groups:
             format_errors.append(
-                f"username '{user['username']}': unknown groups %s"
-                % ", ".join(unknown_groups)
+                f"username '{user['username']}': unknown groups {', '.join(unknown_groups)}"
             )
         # Validate that domains exist
@@ -729,8 +728,7 @@ def user_import(operation_logger, csvfile, update=False, delete=False):
         if unknown_domains:
             format_errors.append(
-                f"username '{user['username']}': unknown domains %s"
-                % ", ".join(unknown_domains)
+                f"username '{user['username']}': unknown domains {', '.join(unknown_domains)}"
            )
     if format_errors:
@@ -1002,7 +1000,7 @@ def user_group_create(
                 m18n.n("group_already_exist_on_system_but_removing_it", group=groupname)
             )
             subprocess.check_call(
-                "sed --in-place '/^%s:/d' /etc/group" % groupname, shell=True
+                f"sed --in-place '/^{groupname}:/d' /etc/group", shell=True
             )
         else:
             raise YunohostValidationError(
@@ -1032,7 +1030,7 @@ def user_group_create(
     operation_logger.start()
     try:
-        ldap.add("cn=%s,ou=groups" % groupname, attr_dict)
+        ldap.add(f"cn={groupname},ou=groups", attr_dict)
     except Exception as e:
         raise YunohostError("group_creation_failed", group=groupname, error=e)
@@ -1075,7 +1073,7 @@ def user_group_delete(operation_logger, groupname, force=False, sync_perm=True):
     operation_logger.start()
     ldap = _get_ldap_interface()
     try:
-        ldap.remove("cn=%s,ou=groups" % groupname)
+        ldap.remove(f"cn={groupname},ou=groups")
     except Exception as e:
         raise YunohostError("group_deletion_failed", group=groupname, error=e)
@@ -1171,7 +1169,7 @@ def user_group_update(
     ldap = _get_ldap_interface()
     try:
         ldap.update(
-            "cn=%s,ou=groups" % groupname,
+            f"cn={groupname},ou=groups",
             {"member": set(new_group_dns), "memberUid": set(new_group)},
         )
     except Exception as e:

View file

@@ -117,7 +117,7 @@ def _patch_legacy_php_versions(app_folder):
         c = (
             "sed -i "
             + "".join(
-                "-e 's@{pattern}@{replace}@g' ".format(pattern=p, replace=r)
+                f"-e 's@{p}@{r}@g' "
                 for p, r in LEGACY_PHP_VERSION_REPLACEMENTS
             )
             + "%s" % filename
+ "%s" % filename + "%s" % filename