Merge pull request #1434 from YunoHost/enh-domains

Ability to "list" domain as a tree structure, + add a new domain_info endpoint
This commit is contained in:
Alexandre Aubin 2022-10-07 15:25:51 +02:00 committed by GitHub
commit ef742124ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 59 deletions

View file

@ -443,6 +443,19 @@ domain:
--exclude-subdomains:
help: Filter out domains that are obviously subdomains of other declared domains
action: store_true
--tree:
help: Display domains as a tree
action: store_true
### domain_info()
info:
action_help: Get domain aggredated data
api: GET /domains/<domain>
arguments:
domain:
help: Domain to check
extra:
pattern: *pattern_domain
### domain_add()
add:

View file

@ -5,12 +5,13 @@ i18n = "domain_config"
# Other things we may want to implement in the future:
#
# - maindomain handling
# - default app
# - autoredirect www in nginx conf
# - ?
#
[feature]
name = "Features"
[feature.app]
[feature.app.default_app]
type = "app"
@ -46,6 +47,7 @@ i18n = "domain_config"
default = 0
[dns]
name = "DNS"
[dns.registrar]
optional = true
@ -61,6 +63,7 @@ i18n = "domain_config"
[cert]
name = "Certificate"
[cert.status]
name = "Status"
@ -90,13 +93,13 @@ i18n = "domain_config"
[cert.cert.acme_eligible_explain]
type = "alert"
style = "warning"
visible = "acme_eligible == false"
visible = "acme_eligible == false || acme_elligible == null"
[cert.cert.cert_no_checks]
ask = "Ignore diagnosis checks"
type = "boolean"
default = false
visible = "acme_eligible == false"
visible = "acme_eligible == false || acme_elligible == null"
[cert.cert.cert_install]
type = "button"

View file

@ -99,8 +99,12 @@ def certificate_status(domains, full=False):
try:
_check_domain_is_ready_for_ACME(domain)
status["ACME_eligible"] = True
except Exception:
status["ACME_eligible"] = False
except Exception as e:
if e.key == 'certmanager_domain_not_diagnosed_yet':
status["ACME_eligible"] = None # = unknown status
else:
status["ACME_eligible"] = False
del status["domain"]
certificates[domain] = status
@ -794,7 +798,7 @@ def _check_domain_is_ready_for_ACME(domain):
or {}
)
parent_domain = _get_parent_domain_of(domain)
parent_domain = _get_parent_domain_of(domain, return_self=True)
dnsrecords = (
Diagnoser.get_cached_report(

View file

@ -24,7 +24,9 @@
Manage domains
"""
import os
from typing import Dict, Any
import time
from typing import List
from collections import OrderedDict
from moulinette import m18n, Moulinette
from moulinette.core import MoulinetteError
@ -47,58 +49,131 @@ logger = getActionLogger("yunohost.domain")
DOMAIN_SETTINGS_DIR = "/etc/yunohost/domains"
# Lazy dev caching to avoid re-query ldap every time we need the domain list
domain_list_cache: Dict[str, Any] = {}
# The cache automatically expire every 15 seconds, to prevent desync between
# yunohost CLI and API which run in different processes
domain_list_cache: List[str] = []
domain_list_cache_timestamp = 0
main_domain_cache: str = None
main_domain_cache_timestamp = 0
DOMAIN_CACHE_DURATION = 15
def domain_list(exclude_subdomains=False):
def _get_maindomain():
global main_domain_cache
global main_domain_cache_timestamp
if not main_domain_cache or abs(main_domain_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION:
with open("/etc/yunohost/current_host", "r") as f:
main_domain_cache = f.readline().rstrip()
main_domain_cache_timestamp = time.time()
return main_domain_cache
def _get_domains(exclude_subdomains=False):
global domain_list_cache
global domain_list_cache_timestamp
if not domain_list_cache or abs(domain_list_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION:
from yunohost.utils.ldap import _get_ldap_interface
ldap = _get_ldap_interface()
result = [
entry["virtualdomain"][0]
for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"])
]
def cmp_domain(domain):
# Keep the main part of the domain and the extension together
# eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this']
domain = domain.split(".")
domain[-1] = domain[-2] + domain.pop()
return list(reversed(domain))
domain_list_cache = sorted(result, key=cmp_domain)
domain_list_cache_timestamp = time.time()
if exclude_subdomains:
return [
domain
for domain in domain_list_cache
if not _get_parent_domain_of(domain, return_self=False)
]
return domain_list_cache
def domain_list(exclude_subdomains=False, tree=False):
"""
List domains
Keyword argument:
exclude_subdomains -- Filter out domains that are subdomains of other declared domains
tree -- Display domains as a hierarchy tree
"""
global domain_list_cache
if not exclude_subdomains and domain_list_cache:
return domain_list_cache
from yunohost.utils.ldap import _get_ldap_interface
domains = _get_domains(exclude_subdomains)
main = _get_maindomain()
ldap = _get_ldap_interface()
result = [
entry["virtualdomain"][0]
for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"])
]
if not tree:
return {"domains": domains, "main": main}
result_list = []
for domain in result:
if exclude_subdomains:
parent_domain = domain.split(".", 1)[1]
if parent_domain in result:
continue
if tree and exclude_subdomains:
return {
"domains": OrderedDict({domain: {} for domain in domains}),
"main": main,
}
result_list.append(domain)
def get_parent_dict(tree, child):
# If parent exists it should be the last added (see `_get_domains` ordering)
possible_parent = next(reversed(tree)) if tree else None
if possible_parent and child.endswith(f".{possible_parent}"):
return get_parent_dict(tree[possible_parent], child)
return tree
def cmp_domain(domain):
# Keep the main part of the domain and the extension together
# eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this']
domain = domain.split(".")
domain[-1] = domain[-2] + domain.pop()
domain = list(reversed(domain))
return domain
result = OrderedDict()
for domain in domains:
parent = get_parent_dict(result, domain)
parent[domain] = OrderedDict()
result_list = sorted(result_list, key=cmp_domain)
return {"domains": result, "main": main}
# Don't cache answer if using exclude_subdomains
if exclude_subdomains:
return {"domains": result_list, "main": _get_maindomain()}
domain_list_cache = {"domains": result_list, "main": _get_maindomain()}
return domain_list_cache
def domain_info(domain):
"""
Print aggregate data for a specific domain
Keyword argument:
domain -- Domain to be checked
"""
from yunohost.app import app_info
from yunohost.dns import _get_registar_settings
_assert_domain_exists(domain)
registrar, _ = _get_registar_settings(domain)
certificate = domain_cert_status([domain], full=True)["certificates"][domain]
apps = []
for app in _installed_apps():
settings = _get_app_settings(app)
if settings.get("domain") == domain:
apps.append(
{"name": app_info(app)["name"], "id": app, "path": settings["path"]}
)
return {
"certificate": certificate,
"registrar": registrar,
"apps": apps,
"main": _get_maindomain() == domain,
"topest_parent": _get_parent_domain_of(domain, return_self=True, topest=True),
# TODO : add parent / child domains ?
}
def _assert_domain_exists(domain):
if domain not in domain_list()["domains"]:
if domain not in _get_domains():
raise YunohostValidationError("domain_unknown", domain=domain)
@ -107,26 +182,26 @@ def _list_subdomains_of(parent_domain):
_assert_domain_exists(parent_domain)
out = []
for domain in domain_list()["domains"]:
for domain in _get_domains():
if domain.endswith(f".{parent_domain}"):
out.append(domain)
return out
def _get_parent_domain_of(domain):
def _get_parent_domain_of(domain, return_self=True, topest=False):
_assert_domain_exists(domain)
if "." not in domain:
return domain
domains = _get_domains(exclude_subdomains=topest)
parent_domain = domain.split(".", 1)[-1]
if parent_domain not in domain_list()["domains"]:
return domain # Domain is its own parent
domain_ = domain
while "." in domain_:
domain_ = domain_.split(".", 1)[1]
if domain_ in domains:
return domain_
else:
return _get_parent_domain_of(parent_domain)
return domain if return_self else None
@is_unit_operation()
@ -198,7 +273,7 @@ def domain_add(operation_logger, domain, dyndns=False):
raise YunohostError("domain_creation_failed", domain=domain, error=e)
finally:
global domain_list_cache
domain_list_cache = {}
domain_list_cache = []
# Don't regen these conf if we're still in postinstall
if os.path.exists("/etc/yunohost/installed"):
@ -255,7 +330,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False):
# Check domain is not the main domain
if domain == _get_maindomain():
other_domains = domain_list()["domains"]
other_domains = _get_domains()
other_domains.remove(domain)
if other_domains:
@ -316,7 +391,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False):
raise YunohostError("domain_deletion_failed", domain=domain, error=e)
finally:
global domain_list_cache
domain_list_cache = {}
domain_list_cache = []
stuff_to_delete = [
f"/etc/yunohost/certs/{domain}",
@ -380,8 +455,8 @@ def domain_main_domain(operation_logger, new_main_domain=None):
# Apply changes to ssl certs
try:
write_to_file("/etc/yunohost/current_host", new_main_domain)
global domain_list_cache
domain_list_cache = {}
global main_domain_cache
main_domain_cache = new_main_domain
_set_hostname(new_main_domain)
except Exception as e:
logger.warning(str(e), exc_info=1)
@ -409,12 +484,6 @@ def domain_url_available(domain, path):
return len(_get_conflicting_apps(domain, path)) == 0
def _get_maindomain():
with open("/etc/yunohost/current_host", "r") as f:
maindomain = f.readline().rstrip()
return maindomain
def domain_config_get(domain, key="", full=False, export=False):
"""
Display a domain configuration