[CI] Format code with Black

This commit is contained in:
yunohost-bot 2023-07-17 16:00:11 +00:00
parent 036119d9ba
commit 6e63b6fc53
8 changed files with 104 additions and 43 deletions

View file

@ -2360,9 +2360,7 @@ def _set_default_ask_questions(questions, script_name="install"):
for question_with_default in questions_with_default
):
# The key is for example "app_manifest_install_ask_domain"
question["ask"] = m18n.n(
f"app_manifest_{script_name}_ask_{question['id']}"
)
question["ask"] = m18n.n(f"app_manifest_{script_name}_ask_{question['id']}")
# Also it in fact doesn't make sense for any of those questions to have an example value nor a default value...
if question.get("type") in ["domain", "user", "password"]:
@ -3212,7 +3210,6 @@ def _ask_confirmation(
def regen_mail_app_user_config_for_dovecot_and_postfix(only=None):
dovecot = True if only in [None, "dovecot"] else False
postfix = True if only in [None, "postfix"] else False
@ -3221,7 +3218,6 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None):
postfix_map = []
dovecot_passwd = []
for app in _installed_apps():
settings = _get_app_settings(app)
if "domain" not in settings or "mail_pwd" not in settings:
@ -3229,7 +3225,9 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None):
if dovecot:
hashed_password = _hash_user_password(settings["mail_pwd"])
dovecot_passwd.append(f"{app}:{hashed_password}::::::allow_nets=127.0.0.1/24")
dovecot_passwd.append(
f"{app}:{hashed_password}::::::allow_nets=127.0.0.1/24"
)
if postfix:
mail_user = settings.get("mail_user", app)
mail_domain = settings.get("mail_domain", settings["domain"])
@ -3238,7 +3236,7 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None):
if dovecot:
app_senders_passwd = "/etc/dovecot/app-senders-passwd"
content = "# This file is regenerated automatically.\n# Please DO NOT edit manually ... changes will be overwritten!"
content += '\n' + '\n'.join(dovecot_passwd)
content += "\n" + "\n".join(dovecot_passwd)
write_to_file(app_senders_passwd, content)
chmod(app_senders_passwd, 0o440)
chown(app_senders_passwd, "root", "dovecot")
@ -3246,7 +3244,7 @@ def regen_mail_app_user_config_for_dovecot_and_postfix(only=None):
if postfix:
app_senders_map = "/etc/postfix/app_senders_login_maps"
content = "# This file is regenerated automatically.\n# Please DO NOT edit manually ... changes will be overwritten!"
content += '\n' + '\n'.join(postfix_map)
content += "\n" + "\n".join(postfix_map)
write_to_file(app_senders_map, content)
chmod(app_senders_map, 0o440)
chown(app_senders_map, "postfix", "root")

View file

@ -641,6 +641,7 @@ def domain_dns_push(operation_logger, domain, dry_run=False, force=False, purge=
# FIXME: in the future, properly unify this with yunohost dyndns update
if registrar == "yunohost":
from yunohost.dyndns import dyndns_update
dyndns_update(domain=domain, force=force)
return {}

View file

@ -212,7 +212,9 @@ def _get_parent_domain_of(domain, return_self=False, topest=False):
@is_unit_operation(exclude=["dyndns_recovery_password"])
def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_dyndns=False):
def domain_add(
operation_logger, domain, dyndns_recovery_password=None, ignore_dyndns=False
):
"""
Create a custom domain
@ -252,9 +254,14 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d
domain = domain.encode("idna").decode("utf-8")
# Detect if this is a DynDNS domain ( and not a subdomain of a DynDNS domain )
dyndns = not ignore_dyndns and is_yunohost_dyndns_domain(domain) and len(domain.split(".")) == 3
dyndns = (
not ignore_dyndns
and is_yunohost_dyndns_domain(domain)
and len(domain.split(".")) == 3
)
if dyndns:
from yunohost.dyndns import is_subscribing_allowed
# Do not allow to subscribe to multiple dyndns domains...
if not is_subscribing_allowed():
raise YunohostValidationError("domain_dyndns_already_subscribed")
@ -264,7 +271,9 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d
operation_logger.start()
if dyndns:
domain_dyndns_subscribe(domain=domain, recovery_password=dyndns_recovery_password)
domain_dyndns_subscribe(
domain=domain, recovery_password=dyndns_recovery_password
)
_certificate_install_selfsigned([domain], True)
@ -314,7 +323,14 @@ def domain_add(operation_logger, domain, dyndns_recovery_password=None, ignore_d
@is_unit_operation(exclude=["dyndns_recovery_password"])
def domain_remove(operation_logger, domain, remove_apps=False, force=False, dyndns_recovery_password=None, ignore_dyndns=False):
def domain_remove(
operation_logger,
domain,
remove_apps=False,
force=False,
dyndns_recovery_password=None,
ignore_dyndns=False,
):
"""
Delete domains
@ -394,7 +410,11 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False, dynd
)
# Detect if this is a DynDNS domain ( and not a subdomain of a DynDNS domain )
dyndns = not ignore_dyndns and is_yunohost_dyndns_domain(domain) and len(domain.split(".")) == 3
dyndns = (
not ignore_dyndns
and is_yunohost_dyndns_domain(domain)
and len(domain.split(".")) == 3
)
operation_logger.start()
@ -445,7 +465,9 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False, dynd
# If a password is provided, delete the DynDNS record
if dyndns:
# Actually unsubscribe
domain_dyndns_unsubscribe(domain=domain, recovery_password=dyndns_recovery_password)
domain_dyndns_unsubscribe(
domain=domain, recovery_password=dyndns_recovery_password
)
logger.success(m18n.n("domain_deleted"))

View file

@ -106,9 +106,7 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None):
if not recovery_password and Moulinette.interface.type == "cli":
logger.warning(m18n.n("ask_dyndns_recovery_password_explain"))
recovery_password = Moulinette.prompt(
m18n.n("ask_dyndns_recovery_password"),
is_password=True,
confirm=True
m18n.n("ask_dyndns_recovery_password"), is_password=True, confirm=True
)
if not recovery_password:
@ -116,6 +114,7 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None):
if recovery_password:
from yunohost.utils.password import assert_password_is_strong_enough
assert_password_is_strong_enough("admin", recovery_password)
operation_logger.data_to_redact.append(recovery_password)
@ -158,7 +157,9 @@ def dyndns_subscribe(operation_logger, domain=None, recovery_password=None):
b64encoded_key = base64.b64encode(secret.encode()).decode()
data = {"subdomain": domain}
if recovery_password:
data["recovery_password"] = hashlib.sha256((domain + ":" + recovery_password.strip()).encode('utf-8')).hexdigest()
data["recovery_password"] = hashlib.sha256(
(domain + ":" + recovery_password.strip()).encode("utf-8")
).hexdigest()
r = requests.post(
f"https://{DYNDNS_PROVIDER}/key/{b64encoded_key}?key_algo=hmac-sha512",
data=data,
@ -214,18 +215,23 @@ def dyndns_unsubscribe(operation_logger, domain, recovery_password=None):
# Otherwise, ask for the recovery password
else:
if Moulinette.interface.type == "cli" and not recovery_password:
logger.warning(m18n.n("ask_dyndns_recovery_password_explain_during_unsubscribe"))
logger.warning(
m18n.n("ask_dyndns_recovery_password_explain_during_unsubscribe")
)
recovery_password = Moulinette.prompt(
m18n.n("ask_dyndns_recovery_password"),
is_password=True
m18n.n("ask_dyndns_recovery_password"), is_password=True
)
if not recovery_password:
logger.error(f"Cannot unsubscribe the domain {domain}: no credential provided")
logger.error(
f"Cannot unsubscribe the domain {domain}: no credential provided"
)
return
secret = str(domain) + ":" + str(recovery_password).strip()
credential = {"recovery_password": hashlib.sha256(secret.encode('utf-8')).hexdigest()}
credential = {
"recovery_password": hashlib.sha256(secret.encode("utf-8")).hexdigest()
}
operation_logger.start()
@ -250,19 +256,22 @@ def dyndns_unsubscribe(operation_logger, domain, recovery_password=None):
elif r.status_code == 409:
raise YunohostError("dyndns_unsubscribe_already_unsubscribed")
else:
raise YunohostError("dyndns_unsubscribe_failed", error=f"The server returned code {r.status_code}")
raise YunohostError(
"dyndns_unsubscribe_failed",
error=f"The server returned code {r.status_code}",
)
logger.success(m18n.n("dyndns_unsubscribed"))
def dyndns_set_recovery_password(domain, recovery_password):
keys = glob.glob(f"/etc/yunohost/dyndns/K{domain}.+*.key")
if not keys:
raise YunohostValidationError("dyndns_key_not_found")
from yunohost.utils.password import assert_password_is_strong_enough
assert_password_is_strong_enough("admin", recovery_password)
secret = str(domain) + ":" + str(recovery_password).strip()
@ -277,7 +286,10 @@ def dyndns_set_recovery_password(domain, recovery_password):
try:
r = requests.put(
f"https://{DYNDNS_PROVIDER}/domains/{domain}/recovery_password",
data={"key": base64key, "recovery_password": hashlib.sha256(secret.encode('utf-8')).hexdigest()},
data={
"key": base64key,
"recovery_password": hashlib.sha256(secret.encode("utf-8")).hexdigest(),
},
timeout=30,
)
except Exception as e:
@ -292,7 +304,10 @@ def dyndns_set_recovery_password(domain, recovery_password):
elif r.status_code == 409:
raise YunohostError("dyndns_set_recovery_password_invalid_password")
else:
raise YunohostError("dyndns_set_recovery_password_failed", error=f"The server returned code {r.status_code}")
raise YunohostError(
"dyndns_set_recovery_password_failed",
error=f"The server returned code {r.status_code}",
)
def dyndns_list():
@ -303,7 +318,12 @@ def dyndns_list():
from yunohost.domain import domain_list
domains = domain_list(exclude_subdomains=True)["domains"]
dyndns_domains = [d for d in domains if is_yunohost_dyndns_domain(d) and glob.glob(f"/etc/yunohost/dyndns/K{d}.+*.key")]
dyndns_domains = [
d
for d in domains
if is_yunohost_dyndns_domain(d)
and glob.glob(f"/etc/yunohost/dyndns/K{d}.+*.key")
]
return {"domains": dyndns_domains}
@ -330,7 +350,6 @@ def dyndns_update(
# If domain is not given, update all DynDNS domains
if domain is None:
dyndns_domains = dyndns_list()["domains"]
if not dyndns_domains:

View file

@ -18,7 +18,11 @@ from yunohost.domain import (
)
TEST_DOMAINS = ["example.tld", "sub.example.tld", "other-example.com"]
TEST_DYNDNS_DOMAIN = "ci-test-" + "".join(chr(random.randint(ord("a"), ord("z"))) for x in range(12)) + random.choice([".noho.st", ".ynh.fr", ".nohost.me"])
TEST_DYNDNS_DOMAIN = (
"ci-test-"
+ "".join(chr(random.randint(ord("a"), ord("z"))) for x in range(12))
+ random.choice([".noho.st", ".ynh.fr", ".nohost.me"])
)
TEST_DYNDNS_PASSWORD = "astrongandcomplicatedpassphrasethatisverysecure"
@ -38,7 +42,9 @@ def setup_function(function):
# Clear other domains
for domain in domains:
if (domain not in TEST_DOMAINS or domain == TEST_DOMAINS[2]) and domain != TEST_DYNDNS_DOMAIN:
if (
domain not in TEST_DOMAINS or domain == TEST_DOMAINS[2]
) and domain != TEST_DYNDNS_DOMAIN:
# Clean domains not used for testing
domain_remove(domain)
elif domain in TEST_DOMAINS:
@ -70,7 +76,6 @@ def test_domain_add():
def test_domain_add_subscribe():
time.sleep(35) # Dynette blocks requests that happen too frequently
assert TEST_DYNDNS_DOMAIN not in domain_list()["domains"]
domain_add(TEST_DYNDNS_DOMAIN, dyndns_recovery_password=TEST_DYNDNS_PASSWORD)
@ -90,7 +95,6 @@ def test_domain_remove():
def test_domain_remove_unsubscribe():
time.sleep(35) # Dynette blocks requests that happen too frequently
assert TEST_DYNDNS_DOMAIN in domain_list()["domains"]
domain_remove(TEST_DYNDNS_DOMAIN, dyndns_recovery_password=TEST_DYNDNS_PASSWORD)

View file

@ -1968,7 +1968,7 @@ def test_option_default_type_with_choices_is_select():
"some_choices": {"choices": ["a", "b"]},
# LEGACY (`choices` in option `string` used to be valid)
# make sure this result as a `select` option
"some_legacy": {"type": "string", "choices": ["a", "b"]}
"some_legacy": {"type": "string", "choices": ["a", "b"]},
}
answers = {"some_choices": "a", "some_legacy": "a"}

View file

@ -213,7 +213,9 @@ def tools_postinstall(
# connectivity or something. Assume that this domain isn't manageable
# and inform the user that we could not contact the dyndns host server.
except Exception:
raise YunohostValidationError("dyndns_provider_unreachable", provider="dyndns.yunohost.org")
raise YunohostValidationError(
"dyndns_provider_unreachable", provider="dyndns.yunohost.org"
)
else:
if not available:
raise YunohostValidationError("dyndns_unavailable", domain=domain)
@ -228,7 +230,11 @@ def tools_postinstall(
logger.info(m18n.n("yunohost_installing"))
# New domain config
domain_add(domain, dyndns_recovery_password=dyndns_recovery_password, ignore_dyndns=ignore_dyndns)
domain_add(
domain,
dyndns_recovery_password=dyndns_recovery_password,
ignore_dyndns=ignore_dyndns,
)
domain_main_domain(domain)
# First user

View file

@ -716,7 +716,6 @@ class SystemuserAppResource(AppResource):
home: str = ""
def provision_or_update(self, context: Dict = {}):
from yunohost.app import regen_mail_app_user_config_for_dovecot_and_postfix
# FIXME : validate that no yunohost user exists with that name?
@ -773,13 +772,19 @@ class SystemuserAppResource(AppResource):
regen_mail_app_user_config_for_dovecot_and_postfix()
else:
self.delete_setting("mail_pwd")
if os.system(f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps") == 0 \
or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd") == 0:
if (
os.system(
f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps"
)
== 0
or os.system(
f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd"
)
== 0
):
regen_mail_app_user_config_for_dovecot_and_postfix()
def deprovision(self, context: Dict = {}):
from yunohost.app import regen_mail_app_user_config_for_dovecot_and_postfix
if os.system(f"getent passwd {self.app} >/dev/null 2>/dev/null") == 0:
@ -797,8 +802,14 @@ class SystemuserAppResource(AppResource):
)
self.delete_setting("mail_pwd")
if os.system(f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps") == 0 \
or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd") == 0:
if (
os.system(
f"grep --quiet ' {self.app}$' /etc/postfix/app_senders_login_maps"
)
== 0
or os.system(f"grep --quiet '^{self.app}:' /etc/dovecot/app-senders-passwd")
== 0
):
regen_mail_app_user_config_for_dovecot_and_postfix()
# FIXME : better logging and error handling, add stdout/stderr from the deluser/delgroup commands...