autodns: Various tweaks and refactorings to make test pass

This commit is contained in:
Alexandre Aubin 2021-09-12 21:32:43 +02:00
parent f276a031af
commit 4533b74d6c
13 changed files with 299 additions and 149 deletions

View file

@ -85,6 +85,25 @@ test-helpers:
changes:
- data/helpers.d/*
test-domains:
extends: .test-stage
script:
- cd src/yunohost
- python3 -m pytest tests/test_domains.py
only:
changes:
- src/yunohost/domain.py
test-dns:
extends: .test-stage
script:
- cd src/yunohost
- python3 -m pytest tests/test_dns.py
only:
changes:
- src/yunohost/dns.py
- src/yunohost/utils/dns.py
test-apps:
extends: .test-stage
script:

View file

@ -589,8 +589,16 @@ domain:
domain:
help: Domain name
key:
help: A question or form key
help: A specific panel, section or a question identifier
nargs: '?'
-f:
full: --full
help: Display all details (meant to be used by the API)
action: store_true
-e:
full: --export
help: Only export key/values, meant to be reimported using "config set --args-file"
action: store_true
### domain_config_set()
set:

View file

@ -65,7 +65,7 @@ do_pre_regen() {
export experimental="$(yunohost settings get 'security.experimental.enabled')"
ynh_render_template "security.conf.inc" "${nginx_conf_dir}/security.conf.inc"
cert_status=$(yunohost domain cert-status --json)
cert_status=$(yunohost domain cert status --json)
# add domain conf files
for domain in $YNH_DOMAINS; do

View file

@ -8,12 +8,11 @@ from publicsuffix import PublicSuffixList
from moulinette.utils.process import check_output
from yunohost.utils.dns import dig
from yunohost.utils.dns import dig, YNH_DYNDNS_DOMAINS
from yunohost.diagnosis import Diagnoser
from yunohost.domain import domain_list, _get_maindomain
from yunohost.dns import _build_dns_conf
YNH_DYNDNS_DOMAINS = ["nohost.me", "noho.st", "ynh.fr"]
SPECIAL_USE_TLDS = ["local", "localhost", "onion", "test"]

View file

@ -2,8 +2,10 @@ version = "1.0"
i18n = "domain_config"
[feature]
[feature.mail]
services = ['postfix', 'dovecot']
[feature.mail.mail_out]
type = "boolean"
default = 1
@ -25,17 +27,14 @@ i18n = "domain_config"
default = 0
[dns]
[dns.registrar]
optional = true
# This part is replace dynamically by DomainConfigPanel
[dns.registrar.unsupported]
ask = "DNS zone of this domain can't be auto-configured, you should do it manually."
type = "alert"
style = "info"
helpLink.href = "https://yunohost.org/dns_config"
helpLink.text = "How to configure manually my DNS zone"
# This part is automatically generated in DomainConfigPanel
[dns.advanced]
[dns.advanced.ttl]
type = "number"
min = 0

View file

@ -395,6 +395,7 @@
"iptables_unavailable": "You cannot play with iptables here. You are either in a container or your kernel does not support it",
"ldap_server_down": "Unable to reach LDAP server",
"ldap_server_is_down_restart_it": "The LDAP service is down, attempt to restart it...",
"ldap_attribute_already_exists": "LDAP attribute '{attribute}' already exists with value '{value}'",
"log_app_action_run": "Run action of the '{}' app",
"log_app_change_url": "Change the URL of the '{}' app",
"log_app_config_set": "Apply config to the '{}' app",
@ -409,6 +410,7 @@
"log_corrupted_md_file": "The YAML metadata file associated with logs is damaged: '{md_file}\nError: {error}'",
"log_does_exists": "There is no operation log with the name '{log}', use 'yunohost log list' to see all available operation logs",
"log_domain_add": "Add '{}' domain into system configuration",
"log_domain_config_set": "Update configuration for domain '{}'",
"log_domain_main_domain": "Make '{}' the main domain",
"log_domain_remove": "Remove '{}' domain from system configuration",
"log_dyndns_subscribe": "Subscribe to a YunoHost subdomain '{}'",

View file

@ -25,11 +25,15 @@
"""
import os
import re
import time
from collections import OrderedDict
from moulinette import m18n, Moulinette
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import read_file, write_to_file, read_toml
from yunohost.domain import domain_list, _get_domain_settings, _assert_domain_exists
from yunohost.domain import domain_list, _assert_domain_exists, domain_config_get
from yunohost.utils.dns import dig, YNH_DYNDNS_DOMAINS
from yunohost.utils.error import YunohostValidationError
from yunohost.utils.network import get_public_ip
from yunohost.log import is_unit_operation
@ -37,8 +41,10 @@ from yunohost.hook import hook_callback
logger = getActionLogger("yunohost.domain")
DOMAIN_REGISTRAR_LIST_PATH = "/usr/share/yunohost/other/registrar_list.toml"
def domain_dns_conf(domain):
def domain_dns_suggest(domain):
"""
Generate DNS configuration for a domain
@ -149,10 +155,10 @@ def _build_dns_conf(base_domain):
ipv6 = get_public_ip(6)
subdomains = _list_subdomains_of(base_domain)
domains_settings = {domain: _get_domain_settings(domain)
domains_settings = {domain: domain_config_get(domain)
for domain in [base_domain] + subdomains}
base_dns_zone = domains_settings[base_domain].get("dns_zone")
base_dns_zone = _get_dns_zone_for_domain(base_domain)
for domain, settings in domains_settings.items():
@ -384,6 +390,126 @@ def _get_DKIM(domain):
)
def _get_dns_zone_for_domain(domain):
"""
Get the DNS zone of a domain
Keyword arguments:
domain -- The domain name
"""
# First, check if domain is a nohost.me / noho.st / ynh.fr
# This is mainly meant to speed up things for "dyndns update"
# ... otherwise we end up constantly doing a bunch of dig requests
for ynh_dyndns_domain in YNH_DYNDNS_DOMAINS:
if domain.endswith('.' + ynh_dyndns_domain):
return ynh_dyndns_domain
# Check cache
cache_folder = "/var/cache/yunohost/dns_zones"
cache_file = f"{cache_folder}/{domain}"
cache_duration = 3600 # one hour
if (
os.path.exists(cache_file)
and abs(os.path.getctime(cache_file) - time.time()) < cache_duration
):
dns_zone = read_file(cache_file).strip()
if dns_zone:
return dns_zone
# Check cache for parent domain
# This is another strick to try to prevent this function from being
# a bottleneck on system with 1 main domain + 10ish subdomains
# when building the dns conf for the main domain (which will call domain_config_get, etc...)
parent_domain = domain.split(".", 1)[1]
if parent_domain in domain_list()["domains"]:
parent_cache_file = f"{cache_folder}/{parent_domain}"
if (
os.path.exists(parent_cache_file)
and abs(os.path.getctime(parent_cache_file) - time.time()) < cache_duration
):
dns_zone = read_file(parent_cache_file).strip()
if dns_zone:
return dns_zone
# For foo.bar.baz.gni we want to scan all the parent domains
# (including the domain itself)
# foo.bar.baz.gni
# bar.baz.gni
# baz.gni
# gni
# Until we find the first one that has a NS record
parent_list = [domain.split(".", i)[-1]
for i, _ in enumerate(domain.split("."))]
for parent in parent_list:
# Check if there's a NS record for that domain
answer = dig(parent, rdtype="NS", full_answers=True, resolvers="force_external")
if answer[0] == "ok":
os.system(f"mkdir -p {cache_folder}")
write_to_file(cache_file, parent)
return parent
logger.warning(f"Could not identify the dns_zone for domain {domain}, returning {parent_list[-1]}")
return parent_list[-1]
def _get_registrar_config_section(domain):
from lexicon.providers.auto import _relevant_provider_for_domain
registrar_infos = {}
dns_zone = _get_dns_zone_for_domain(domain)
# If parent domain exists in yunohost
parent_domain = domain.split(".", 1)[1]
if parent_domain in domain_list()["domains"]:
registrar_infos["explanation"] = OrderedDict({
"type": "alert",
"style": "info",
"ask": f"This domain is a subdomain of {parent_domain}. DNS registrar configuration should be managed in {parent_domain}'s configuration panel.", # FIXME: i18n
"value": None
})
return OrderedDict(registrar_infos)
# TODO big project, integrate yunohost's dynette as a registrar-like provider
# TODO big project, integrate other dyndns providers such as netlib.re, or cf the list of dyndns providers supported by cloudron...
if dns_zone in YNH_DYNDNS_DOMAINS:
registrar_infos["explanation"] = OrderedDict({
"type": "alert",
"style": "success",
"ask": "This domain is a nohost.me / nohost.st / ynh.fr and its DNS configuration is therefore automatically handled by Yunohost.", # FIXME: i18n
"value": "yunohost"
})
return OrderedDict(registrar_infos)
try:
registrar = _relevant_provider_for_domain(dns_zone)[0]
except ValueError:
registrar_infos["explanation"] = OrderedDict({
"type": "alert",
"style": "warning",
"ask": "YunoHost could not automatically detect the registrar handling this domain. You should manually configure your DNS records following the documentation at https://yunohost.org/dns.", # FIXME : i18n
"value": None
})
else:
registrar_infos["explanation"] = OrderedDict({
"type": "alert",
"style": "info",
"ask": f"YunoHost automatically detected that this domain is handled by the registrar **{registrar}**. If you want, YunoHost will automatically configure this DNS zone, if you provide it with the following informations. You can also manually configure your DNS records following the documentation as https://yunohost.org/dns.", # FIXME: i18n
"value": registrar
})
# TODO : add a help tip with the link to the registar's API doc (c.f. Lexicon's README)
registrar_list = read_toml(DOMAIN_REGISTRAR_LIST_PATH)
registrar_infos.update(registrar_list[registrar])
return OrderedDict(registrar_infos)
@is_unit_operation()
def domain_registrar_push(operation_logger, domain, dry_run=False):
"""
@ -395,8 +521,7 @@ def domain_registrar_push(operation_logger, domain, dry_run=False):
_assert_domain_exists(domain)
dns_zone = _get_domain_settings(domain)["dns_zone"]
registrar_settings = _get_registrar_settings(dns_zone)
registrar_settings = domain_config_get(domain, key='', full=True)
if not registrar_settings:
raise YunohostValidationError("registrar_is_not_set", domain=domain)

View file

@ -29,7 +29,7 @@ from moulinette import m18n, Moulinette
from moulinette.core import MoulinetteError
from moulinette.utils.log import getActionLogger
from moulinette.utils.filesystem import (
mkdir, write_to_file, read_yaml, write_to_yaml, read_toml
mkdir, write_to_file, read_yaml, write_to_yaml
)
from yunohost.app import (
@ -49,7 +49,6 @@ logger = getActionLogger("yunohost.domain")
DOMAIN_CONFIG_PATH = "/usr/share/yunohost/other/config_domain.toml"
DOMAIN_SETTINGS_DIR = "/etc/yunohost/domains"
DOMAIN_REGISTRAR_LIST_PATH = "/usr/share/yunohost/other/registrar_list.toml"
# Lazy dev caching to avoid re-query ldap every time we need the domain list
domain_list_cache = {}
@ -391,23 +390,25 @@ def _get_maindomain():
return maindomain
def _get_domain_settings(domain):
"""
Retrieve entries in /etc/yunohost/domains/[domain].yml
And set default values if needed
"""
config = DomainConfigPanel(domain)
return config.get(mode='export')
def domain_config_get(domain, key='', mode='classic'):
def domain_config_get(domain, key='', full=False, export=False):
"""
Display a domain configuration
"""
if full and export:
raise YunohostValidationError("You can't use --full and --export together.", raw_msg=True)
if full:
mode = "full"
elif export:
mode = "export"
else:
mode = "classic"
config = DomainConfigPanel(domain)
return config.get(key, mode)
@is_unit_operation()
def domain_config_set(operation_logger, domain, key=None, value=None, args=None, args_file=None):
"""
@ -415,31 +416,28 @@ def domain_config_set(operation_logger, domain, key=None, value=None, args=None,
"""
Question.operation_logger = operation_logger
config = DomainConfigPanel(domain)
return config.set(key, value, args, args_file)
return config.set(key, value, args, args_file, operation_logger=operation_logger)
class DomainConfigPanel(ConfigPanel):
def __init__(self, domain):
_assert_domain_exists(domain)
self.domain = domain
self.save_mode = "diff"
super().__init__(
config_path=DOMAIN_CONFIG_PATH,
save_path=f"{DOMAIN_SETTINGS_DIR}/{domain}.yml"
)
def _get_toml(self):
from lexicon.providers.auto import _relevant_provider_for_domain
from yunohost.utils.dns import get_dns_zone_from_domain
from yunohost.dns import _get_registrar_config_section
toml = super()._get_toml()
self.dns_zone = get_dns_zone_from_domain(self.domain)
try:
registrar = _relevant_provider_for_domain(self.dns_zone)[0]
except ValueError:
return toml
toml['feature']['xmpp']['xmpp']['default'] = 1 if self.domain == _get_maindomain() else 0
toml['dns']['registrar'] = _get_registrar_config_section(self.domain)
registrar_list = read_toml(DOMAIN_REGISTRAR_LIST_PATH)
toml['dns']['registrar'] = registrar_list[registrar]
return toml
def _load_current_values(self):
@ -480,8 +478,12 @@ def domain_cert_renew(
def domain_dns_conf(domain):
return domain_dns_suggest(domain)
def domain_dns_suggest(domain):
import yunohost.dns
return yunohost.dns.domain_dns_conf(domain)
return yunohost.dns.domain_dns_suggest(domain)
def domain_dns_push(domain, dry_run):

View file

@ -0,0 +1,66 @@
import pytest
import yaml
import os
from moulinette.utils.filesystem import read_toml
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.dns import (
DOMAIN_REGISTRAR_LIST_PATH,
_get_dns_zone_for_domain,
_get_registrar_config_section
)
def setup_function(function):
clean()
def teardown_function(function):
clean()
def clean():
pass
# DNS utils testing
def test_get_dns_zone_from_domain_existing():
assert _get_dns_zone_for_domain("yunohost.org") == "yunohost.org"
assert _get_dns_zone_for_domain("donate.yunohost.org") == "yunohost.org"
assert _get_dns_zone_for_domain("fr.wikipedia.org") == "wikipedia.org"
assert _get_dns_zone_for_domain("www.fr.wikipedia.org") == "wikipedia.org"
assert _get_dns_zone_for_domain("non-existing-domain.yunohost.org") == "yunohost.org"
assert _get_dns_zone_for_domain("yolo.nohost.me") == "nohost.me"
assert _get_dns_zone_for_domain("foo.yolo.nohost.me") == "nohost.me"
assert _get_dns_zone_for_domain("yolo.test") == "test"
assert _get_dns_zone_for_domain("foo.yolo.test") == "test"
# Domain registrar testing
def test_registrar_list_integrity():
assert read_toml(DOMAIN_REGISTRAR_LIST_PATH)
def test_magic_guess_registrar_weird_domain():
assert _get_registrar_config_section("yolo.test")["explanation"]["value"] is None
def test_magic_guess_registrar_ovh():
assert _get_registrar_config_section("yolo.yunohost.org")["explanation"]["value"] == "ovh"
def test_magic_guess_registrar_yunodyndns():
assert _get_registrar_config_section("yolo.nohost.me")["explanation"]["value"] == "yunohost"
#def domain_dns_suggest(domain):
# return yunohost.dns.domain_dns_conf(domain)
#
#
#def domain_dns_push(domain, dry_run):
# import yunohost.dns
# return yunohost.dns.domain_registrar_push(domain, dry_run)

View file

@ -1,24 +1,18 @@
import pytest
import yaml
import os
from moulinette.core import MoulinetteError
from yunohost.utils.error import YunohostError, YunohostValidationError
from yunohost.utils.dns import get_dns_zone_from_domain
from yunohost.utils.error import YunohostValidationError
from yunohost.domain import (
DOMAIN_SETTINGS_DIR,
REGISTRAR_LIST_PATH,
_get_maindomain,
domain_add,
domain_remove,
domain_list,
domain_main_domain,
domain_setting,
domain_dns_conf,
domain_registrar_set,
domain_registrar_catalog
domain_config_get,
domain_config_set,
)
TEST_DOMAINS = [
@ -27,6 +21,7 @@ TEST_DOMAINS = [
"other-example.com"
]
def setup_function(function):
# Save domain list in variable to avoid multiple calls to domain_list()
@ -51,7 +46,6 @@ def setup_function(function):
# Reset settings if any
os.system(f"rm -rf {DOMAIN_SETTINGS_DIR}/{domain}.yml")
# Create classical second domain of not exist
if TEST_DOMAINS[1] not in domains:
domain_add(TEST_DOMAINS[1])
@ -65,101 +59,62 @@ def teardown_function(function):
clean()
def clean():
pass
# Domains management testing
def test_domain_add():
assert TEST_DOMAINS[2] not in domain_list()["domains"]
domain_add(TEST_DOMAINS[2])
assert TEST_DOMAINS[2] in domain_list()["domains"]
def test_domain_add_existing_domain():
with pytest.raises(MoulinetteError) as e_info:
with pytest.raises(MoulinetteError):
assert TEST_DOMAINS[1] in domain_list()["domains"]
domain_add(TEST_DOMAINS[1])
def test_domain_remove():
assert TEST_DOMAINS[1] in domain_list()["domains"]
domain_remove(TEST_DOMAINS[1])
assert TEST_DOMAINS[1] not in domain_list()["domains"]
def test_main_domain():
current_main_domain = _get_maindomain()
assert domain_main_domain()["current_main_domain"] == current_main_domain
def test_main_domain_change_unknown():
with pytest.raises(YunohostValidationError) as e_info:
with pytest.raises(YunohostValidationError):
domain_main_domain(TEST_DOMAINS[2])
def test_change_main_domain():
assert _get_maindomain() != TEST_DOMAINS[1]
domain_main_domain(TEST_DOMAINS[1])
assert _get_maindomain() == TEST_DOMAINS[1]
# Domain settings testing
def test_domain_setting_get_default_xmpp_main_domain():
assert TEST_DOMAINS[0] in domain_list()["domains"]
assert domain_setting(TEST_DOMAINS[0], "xmpp") == True
def test_domain_config_get_default():
assert domain_config_get(TEST_DOMAINS[0], "feature.xmpp.xmpp") == 1
assert domain_config_get(TEST_DOMAINS[1], "feature.xmpp.xmpp") == 0
assert domain_config_get(TEST_DOMAINS[1], "dns.advanced.ttl") == 3600
def test_domain_setting_get_default_xmpp():
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
def test_domain_setting_get_default_ttl():
assert domain_setting(TEST_DOMAINS[1], "ttl") == 3600
def test_domain_config_set():
assert domain_config_get(TEST_DOMAINS[1], "feature.xmpp.xmpp") == 0
domain_config_set(TEST_DOMAINS[1], "feature.xmpp.xmpp", "yes")
assert domain_config_get(TEST_DOMAINS[1], "feature.xmpp.xmpp") == 1
def test_domain_setting_set_int():
domain_setting(TEST_DOMAINS[1], "ttl", "10")
assert domain_setting(TEST_DOMAINS[1], "ttl") == 10
domain_config_set(TEST_DOMAINS[1], "dns.advanced.ttl", 10)
assert domain_config_get(TEST_DOMAINS[1], "dns.advanced.ttl") == 10
def test_domain_setting_set_bool_true():
domain_setting(TEST_DOMAINS[1], "xmpp", "True")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
domain_setting(TEST_DOMAINS[1], "xmpp", "true")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
domain_setting(TEST_DOMAINS[1], "xmpp", "t")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
domain_setting(TEST_DOMAINS[1], "xmpp", "1")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
domain_setting(TEST_DOMAINS[1], "xmpp", "yes")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
domain_setting(TEST_DOMAINS[1], "xmpp", "y")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == True
def test_domain_setting_set_bool_false():
domain_setting(TEST_DOMAINS[1], "xmpp", "False")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
domain_setting(TEST_DOMAINS[1], "xmpp", "false")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
domain_setting(TEST_DOMAINS[1], "xmpp", "f")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
domain_setting(TEST_DOMAINS[1], "xmpp", "0")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
domain_setting(TEST_DOMAINS[1], "xmpp", "no")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
domain_setting(TEST_DOMAINS[1], "xmpp", "n")
assert domain_setting(TEST_DOMAINS[1], "xmpp") == False
def test_domain_settings_unknown():
with pytest.raises(YunohostValidationError) as e_info:
domain_setting(TEST_DOMAINS[2], "xmpp", "False")
# DNS utils testing
def test_get_dns_zone_from_domain_existing():
assert get_dns_zone_from_domain("donate.yunohost.org") == "yunohost.org"
def test_get_dns_zone_from_domain_not_existing():
assert get_dns_zone_from_domain("non-existing-domain.yunohost.org") == "yunohost.org"
# Domain registrar testing
def test_registrar_list_yaml_integrity():
yaml.load(open(REGISTRAR_LIST_PATH, 'r'))
def test_domain_registrar_catalog():
domain_registrar_catalog()
def test_domain_registrar_catalog_full():
domain_registrar_catalog(None, True)
def test_domain_registrar_catalog_registrar():
domain_registrar_catalog("ovh")
def test_domain_configs_unknown():
with pytest.raises(YunohostValidationError):
domain_config_get(TEST_DOMAINS[2], "feature.xmpp.xmpp.xmpp")

View file

@ -170,7 +170,9 @@ class ConfigPanel:
raise YunohostError(f"The filter key {filter_key} has too many sub-levels, the max is 3.", raw_msg=True)
if not os.path.exists(self.config_path):
logger.debug(f"Config panel {self.config_path} doesn't exists")
return None
toml_config_panel = self._get_toml()
# Check TOML config panel is in a supported version

View file

@ -21,6 +21,8 @@
import dns.resolver
from moulinette.utils.filesystem import read_file
YNH_DYNDNS_DOMAINS = ["nohost.me", "noho.st", "ynh.fr"]
# Lazy dev caching to avoid re-reading the file multiple time when calling
# dig() often during same yunohost operation
external_resolvers_ = []
@ -90,33 +92,3 @@ def dig(
return ("ok", answers)
def get_dns_zone_from_domain(domain):
# TODO Check if this function is YNH_DYNDNS_DOMAINS compatible
"""
Get the DNS zone of a domain
Keyword arguments:
domain -- The domain name
"""
# For foo.bar.baz.gni we want to scan all the parent domains
# (including the domain itself)
# foo.bar.baz.gni
# bar.baz.gni
# baz.gni
# gni
parent_list = [domain.split(".", i)[-1]
for i, _ in enumerate(domain.split("."))]
for parent in parent_list:
# Check if there's a NS record for that domain
answer = dig(parent, rdtype="NS", full_answers=True, resolvers="force_external")
if answer[0] == "ok":
# Domain is dns_zone
return parent
# FIXME: returning None will probably trigger bugs when this happens, code expects a domain string
return None

View file

@ -101,7 +101,8 @@ class LDAPInterface:
except ldap.SERVER_DOWN:
raise YunohostError(
"Service slapd is not running but is required to perform this action ... "
"You can try to investigate what's happening with 'systemctl status slapd'"
"You can try to investigate what's happening with 'systemctl status slapd'",
raw_msg=True
)
# Check that we are indeed logged in with the right identity
@ -289,7 +290,7 @@ class LDAPInterface:
attr_found[0],
attr_found[1],
)
raise MoulinetteError(
raise YunohostError(
"ldap_attribute_already_exists",
attribute=attr_found[0],
value=attr_found[1],