From 6e63b6fc53e6e95fff1c936898fca436d62ef105 Mon Sep 17 00:00:00 2001 From: yunohost-bot Date: Mon, 17 Jul 2023 16:00:11 +0000 Subject: [PATCH] [CI] Format code with Black --- src/app.py | 14 +++++------ src/dns.py | 1 + src/domain.py | 34 ++++++++++++++++++++----- src/dyndns.py | 49 +++++++++++++++++++++++++------------ src/tests/test_domains.py | 12 ++++++--- src/tests/test_questions.py | 2 +- src/tools.py | 10 ++++++-- src/utils/resources.py | 25 +++++++++++++------ 8 files changed, 104 insertions(+), 43 deletions(-) diff --git a/src/app.py b/src/app.py index ab27f8818..6f18d341f 100644 --- a/src/app.py +++ b/src/app.py @@ -2360,9 +2360,7 @@ def _set_default_ask_questions(questions, script_name="install"): for question_with_default in questions_with_default ): # The key is for example "app_manifest_install_ask_domain" - question["ask"] = m18n.n( - f"app_manifest_{script_name}_ask_{question['id']}" - ) + question["ask"] = m18n.n(f"app_manifest_{script_name}_ask_{question['id']}") # Also it in fact doesn't make sense for any of those questions to have an example value nor a default value... if question.get("type") in ["domain", "user", "password"]: @@ -3212,7 +3210,6 @@ def _ask_confirmation( def regen_mail_app_user_config_for_dovecot_and_postfix(only=None): - dovecot = True if only in [None, "dovecot"] else False postfix = True if only in [None, "postfix"] else False @@ -3221,7 +3218,6 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None): postfix_map = [] dovecot_passwd = [] for app in _installed_apps(): - settings = _get_app_settings(app) if "domain" not in settings or "mail_pwd" not in settings: @@ -3229,7 +3225,9 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None): if dovecot: hashed_password = _hash_user_password(settings["mail_pwd"]) - dovecot_passwd.append(f"{app}:{hashed_password}::::::allow_nets=127.0.0.1/24") + dovecot_passwd.append( + f"{app}:{hashed_password}::::::allow_nets=127.0.0.1/24" + ) if postfix: mail_user = settings.get("mail_user", app) mail_domain = settings.get("mail_domain", settings["domain"]) @@ -3238,7 +3236,7 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None): if dovecot: app_senders_passwd = "/etc/dovecot/app-senders-passwd" content = "# This file is regenerated automatically.\n# Please DO NOT edit manually ... changes will be overwritten!" - content += '\n' + '\n'.join(dovecot_passwd) + content += "\n" + "\n".join(dovecot_passwd) write_to_file(app_senders_passwd, content) chmod(app_senders_passwd, 0o440) chown(app_senders_passwd, "root", "dovecot") @@ -3246,7 +3244,7 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None): if postfix: app_senders_map = "/etc/postfix/app_senders_login_maps" content = "# This file is regenerated automatically.\n# Please DO NOT edit manually ... changes will be overwritten!" - content += '\n' + '\n'.join(postfix_map) + content += "\n" + "\n".join(postfix_map) write_to_file(app_senders_map, content) chmod(app_senders_map, 0o440) chown(app_senders_map, "postfix", "root") diff --git a/src/dns.py b/src/dns.py index d514d1b17..e25d0f3ec 100644 --- a/src/dns.py +++ b/src/dns.py @@ -641,6 +641,7 @@ def domain_dns_push(operation_logger, domain, dry_run=False, force=False, purge= # FIXME: in the future, properly unify this with yunohost dyndns update if registrar == "yunohost": from yunohost.dyndns import dyndns_update + dyndns_update(domain=domain, force=force) return {} diff --git a/src/domain.py b/src/domain.py index 3c648ad4f..8fc9799cd 100644 --- a/src/domain.py +++ b/src/domain.py @@ -212,7 +212,9 @@ def _get_parent_domain_of(domain, return_self=False, topest=False): @is_unit_operation(exclude=["dyndns_recovery_password"]) -def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_dyndns=False): +def domain_add( + operation_logger, domain, dyndns_recovery_password=None, ignore_dyndns=False +): """ Create a custom domain @@ -252,9 +254,14 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d domain = domain.encode("idna").decode("utf-8") # Detect if this is a DynDNS domain ( and not a subdomain of a DynDNS domain ) - dyndns = not ignore_dyndns and is_yunohost_dyndns_domain(domain) and len(domain.split(".")) == 3 + dyndns = ( + not ignore_dyndns + and is_yunohost_dyndns_domain(domain) + and len(domain.split(".")) == 3 + ) if dyndns: from yunohost.dyndns import is_subscribing_allowed + # Do not allow to subscribe to multiple dyndns domains... if not is_subscribing_allowed(): raise YunohostValidationError("domain_dyndns_already_subscribed") @@ -264,7 +271,9 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d operation_logger.start() if dyndns: - domain_dyndns_subscribe(domain=domain, recovery_password=dyndns_recovery_password) + domain_dyndns_subscribe( + domain=domain, recovery_password=dyndns_recovery_password + ) _certificate_install_selfsigned([domain], True) @@ -314,7 +323,14 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d @is_unit_operation(exclude=["dyndns_recovery_password"]) -def domain_remove(operation_logger, domain, remove_apps=False, force=False, dyndns_recovery_password=None, ignore_dyndns=False): +def domain_remove( + operation_logger, + domain, + remove_apps=False, + force=False, + dyndns_recovery_password=None, + ignore_dyndns=False, +): """ Delete domains @@ -394,7 +410,11 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False, dynd ) # Detect if this is a DynDNS domain ( and not a subdomain of a DynDNS domain ) - dyndns = not ignore_dyndns and is_yunohost_dyndns_domain(domain) and len(domain.split(".")) == 3 + dyndns = ( + not ignore_dyndns + and is_yunohost_dyndns_domain(domain) + and len(domain.split(".")) == 3 + ) operation_logger.start() @@ -445,7 +465,9 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False, dynd # If a password is provided, delete the DynDNS record if dyndns: # Actually unsubscribe - domain_dyndns_unsubscribe(domain=domain, recovery_password=dyndns_recovery_password) + domain_dyndns_unsubscribe( + domain=domain, recovery_password=dyndns_recovery_password + ) logger.success(m18n.n("domain_deleted")) diff --git a/src/dyndns.py b/src/dyndns.py index c3fa80d3a..a3afd655f 100644 --- a/src/dyndns.py +++ b/src/dyndns.py @@ -106,9 +106,7 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None): if not recovery_password and Moulinette.interface.type == "cli": logger.warning(m18n.n("ask_dyndns_recovery_password_explain")) recovery_password = Moulinette.prompt( - m18n.n("ask_dyndns_recovery_password"), - is_password=True, - confirm=True + m18n.n("ask_dyndns_recovery_password"), is_password=True, confirm=True ) if not recovery_password: @@ -116,6 +114,7 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None): if recovery_password: from yunohost.utils.password import assert_password_is_strong_enough + assert_password_is_strong_enough("admin", recovery_password) operation_logger.data_to_redact.append(recovery_password) @@ -158,7 +157,9 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None): b64encoded_key = base64.b64encode(secret.encode()).decode() data = {"subdomain": domain} if recovery_password: - data["recovery_password"] = hashlib.sha256((domain + ":" + recovery_password.strip()).encode('utf-8')).hexdigest() + data["recovery_password"] = hashlib.sha256( + (domain + ":" + recovery_password.strip()).encode("utf-8") + ).hexdigest() r = requests.post( f"https://{DYNDNS_PROVIDER}/key/{b64encoded_key}?key_algo=hmac-sha512", data=data, @@ -214,18 +215,23 @@ def dyndns_unsubscribe(operation_logger, domain, recovery_password=None): # Otherwise, ask for the recovery password else: if Moulinette.interface.type == "cli" and not recovery_password: - logger.warning(m18n.n("ask_dyndns_recovery_password_explain_during_unsubscribe")) + logger.warning( + m18n.n("ask_dyndns_recovery_password_explain_during_unsubscribe") + ) recovery_password = Moulinette.prompt( - m18n.n("ask_dyndns_recovery_password"), - is_password=True + m18n.n("ask_dyndns_recovery_password"), is_password=True ) if not recovery_password: - logger.error(f"Cannot unsubscribe the domain {domain}: no credential provided") + logger.error( + f"Cannot unsubscribe the domain {domain}: no credential provided" + ) return secret = str(domain) + ":" + str(recovery_password).strip() - credential = {"recovery_password": hashlib.sha256(secret.encode('utf-8')).hexdigest()} + credential = { + "recovery_password": hashlib.sha256(secret.encode("utf-8")).hexdigest() + } operation_logger.start() @@ -250,19 +256,22 @@ def dyndns_unsubscribe(operation_logger, domain, recovery_password=None): elif r.status_code == 409: raise YunohostError("dyndns_unsubscribe_already_unsubscribed") else: - raise YunohostError("dyndns_unsubscribe_failed", error=f"The server returned code {r.status_code}") + raise YunohostError( + "dyndns_unsubscribe_failed", + error=f"The server returned code {r.status_code}", + ) logger.success(m18n.n("dyndns_unsubscribed")) def dyndns_set_recovery_password(domain, recovery_password): - keys = glob.glob(f"/etc/yunohost/dyndns/K{domain}.+*.key") if not keys: raise YunohostValidationError("dyndns_key_not_found") from yunohost.utils.password import assert_password_is_strong_enough + assert_password_is_strong_enough("admin", recovery_password) secret = str(domain) + ":" + str(recovery_password).strip() @@ -277,7 +286,10 @@ def dyndns_set_recovery_password(domain, recovery_password): try: r = requests.put( f"https://{DYNDNS_PROVIDER}/domains/{domain}/recovery_password", - data={"key": base64key, "recovery_password": hashlib.sha256(secret.encode('utf-8')).hexdigest()}, + data={ + "key": base64key, + "recovery_password": hashlib.sha256(secret.encode("utf-8")).hexdigest(), + }, timeout=30, ) except Exception as e: @@ -292,7 +304,10 @@ def dyndns_set_recovery_password(domain, recovery_password): elif r.status_code == 409: raise YunohostError("dyndns_set_recovery_password_invalid_password") else: - raise YunohostError("dyndns_set_recovery_password_failed", error=f"The server returned code {r.status_code}") + raise YunohostError( + "dyndns_set_recovery_password_failed", + error=f"The server returned code {r.status_code}", + ) def dyndns_list(): @@ -303,7 +318,12 @@ def dyndns_list(): from yunohost.domain import domain_list domains = domain_list(exclude_subdomains=True)["domains"] - dyndns_domains = [d for d in domains if is_yunohost_dyndns_domain(d) and glob.glob(f"/etc/yunohost/dyndns/K{d}.+*.key")] + dyndns_domains = [ + d + for d in domains + if is_yunohost_dyndns_domain(d) + and glob.glob(f"/etc/yunohost/dyndns/K{d}.+*.key") + ] return {"domains": dyndns_domains} @@ -330,7 +350,6 @@ def dyndns_update( # If domain is not given, update all DynDNS domains if domain is None: - dyndns_domains = dyndns_list()["domains"] if not dyndns_domains: diff --git a/src/tests/test_domains.py b/src/tests/test_domains.py index c5c1ab7ae..1bbbb7890 100644 --- a/src/tests/test_domains.py +++ b/src/tests/test_domains.py @@ -18,7 +18,11 @@ from yunohost.domain import ( ) TEST_DOMAINS = ["example.tld", "sub.example.tld", "other-example.com"] -TEST_DYNDNS_DOMAIN = "ci-test-" + "".join(chr(random.randint(ord("a"), ord("z"))) for x in range(12)) + random.choice([".noho.st", ".ynh.fr", ".nohost.me"]) +TEST_DYNDNS_DOMAIN = ( + "ci-test-" + + "".join(chr(random.randint(ord("a"), ord("z"))) for x in range(12)) + + random.choice([".noho.st", ".ynh.fr", ".nohost.me"]) +) TEST_DYNDNS_PASSWORD = "astrongandcomplicatedpassphrasethatisverysecure" @@ -38,7 +42,9 @@ def setup_function(function): # Clear other domains for domain in domains: - if (domain not in TEST_DOMAINS or domain == TEST_DOMAINS[2]) and domain != TEST_DYNDNS_DOMAIN: + if ( + domain not in TEST_DOMAINS or domain == TEST_DOMAINS[2] + ) and domain != TEST_DYNDNS_DOMAIN: # Clean domains not used for testing domain_remove(domain) elif domain in TEST_DOMAINS: @@ -70,7 +76,6 @@ def test_domain_add(): def test_domain_add_subscribe(): - time.sleep(35) # Dynette blocks requests that happen too frequently assert TEST_DYNDNS_DOMAIN not in domain_list()["domains"] domain_add(TEST_DYNDNS_DOMAIN, dyndns_recovery_password=TEST_DYNDNS_PASSWORD) @@ -90,7 +95,6 @@ def test_domain_remove(): def test_domain_remove_unsubscribe(): - time.sleep(35) # Dynette blocks requests that happen too frequently assert TEST_DYNDNS_DOMAIN in domain_list()["domains"] domain_remove(TEST_DYNDNS_DOMAIN, dyndns_recovery_password=TEST_DYNDNS_PASSWORD) diff --git a/src/tests/test_questions.py b/src/tests/test_questions.py index a9b61aad9..a695e834d 100644 --- a/src/tests/test_questions.py +++ b/src/tests/test_questions.py @@ -1968,7 +1968,7 @@ def test_option_default_type_with_choices_is_select(): "some_choices": {"choices": ["a", "b"]}, # LEGACY (`choices` in option `string` used to be valid) # make sure this result as a `select` option - "some_legacy": {"type": "string", "choices": ["a", "b"]} + "some_legacy": {"type": "string", "choices": ["a", "b"]}, } answers = {"some_choices": "a", "some_legacy": "a"} diff --git a/src/tools.py b/src/tools.py index 7704a99b5..cd48f00ee 100644 --- a/src/tools.py +++ b/src/tools.py @@ -213,7 +213,9 @@ def tools_postinstall( # connectivity or something. Assume that this domain isn't manageable # and inform the user that we could not contact the dyndns host server. except Exception: - raise YunohostValidationError("dyndns_provider_unreachable", provider="dyndns.yunohost.org") + raise YunohostValidationError( + "dyndns_provider_unreachable", provider="dyndns.yunohost.org" + ) else: if not available: raise YunohostValidationError("dyndns_unavailable", domain=domain) @@ -228,7 +230,11 @@ def tools_postinstall( logger.info(m18n.n("yunohost_installing")) # New domain config - domain_add(domain, dyndns_recovery_password=dyndns_recovery_password, ignore_dyndns=ignore_dyndns) + domain_add( + domain, + dyndns_recovery_password=dyndns_recovery_password, + ignore_dyndns=ignore_dyndns, + ) domain_main_domain(domain) # First user diff --git a/src/utils/resources.py b/src/utils/resources.py index be7f9fba5..60a5f44f6 100644 --- a/src/utils/resources.py +++ b/src/utils/resources.py @@ -716,7 +716,6 @@ class SystemuserAppResource(AppResource): home: str = "" def provision_or_update(self, context: Dict = {}): - from yunohost.app import regen_mail_app_user_config_for_dovecot_and_postfix # FIXME : validate that no yunohost user exists with that name? @@ -773,13 +772,19 @@ class SystemuserAppResource(AppResource): regen_mail_app_user_config_for_dovecot_and_postfix() else: self.delete_setting("mail_pwd") - if os.system(f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps") == 0 \ - or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd") == 0: + if ( + os.system( + f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps" + ) + == 0 + or os.system( + f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd" + ) + == 0 + ): regen_mail_app_user_config_for_dovecot_and_postfix() - def deprovision(self, context: Dict = {}): - from yunohost.app import regen_mail_app_user_config_for_dovecot_and_postfix if os.system(f"getent passwd {self.app} >/dev/null 2>/dev/null") == 0: @@ -797,8 +802,14 @@ class SystemuserAppResource(AppResource): ) self.delete_setting("mail_pwd") - if os.system(f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps") == 0 \ - or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd") == 0: + if ( + os.system( + f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps" + ) + == 0 + or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd") + == 0 + ): regen_mail_app_user_config_for_dovecot_and_postfix() # FIXME : better logging and error handling, add stdout/stderr from the deluser/delgroup commands...