diff --git a/share/actionsmap.yml b/share/actionsmap.yml index 7417bb98d..6789f1dd6 100644 --- a/share/actionsmap.yml +++ b/share/actionsmap.yml @@ -443,6 +443,19 @@ domain: --exclude-subdomains: help: Filter out domains that are obviously subdomains of other declared domains action: store_true + --tree: + help: Display domains as a tree + action: store_true + + ### domain_info() + info: + action_help: Get domain aggredated data + api: GET /domains/ + arguments: + domain: + help: Domain to check + extra: + pattern: *pattern_domain ### domain_add() add: diff --git a/share/config_domain.toml b/share/config_domain.toml index 28c394cf1..0a3ba96cc 100644 --- a/share/config_domain.toml +++ b/share/config_domain.toml @@ -5,12 +5,13 @@ i18n = "domain_config" # Other things we may want to implement in the future: # # - maindomain handling -# - default app # - autoredirect www in nginx conf # - ? # [feature] +name = "Features" + [feature.app] [feature.app.default_app] type = "app" @@ -46,6 +47,7 @@ i18n = "domain_config" default = 0 [dns] +name = "DNS" [dns.registrar] optional = true @@ -61,6 +63,7 @@ i18n = "domain_config" [cert] +name = "Certificate" [cert.status] name = "Status" @@ -90,13 +93,13 @@ i18n = "domain_config" [cert.cert.acme_eligible_explain] type = "alert" style = "warning" - visible = "acme_eligible == false" + visible = "acme_eligible == false || acme_elligible == null" [cert.cert.cert_no_checks] ask = "Ignore diagnosis checks" type = "boolean" default = false - visible = "acme_eligible == false" + visible = "acme_eligible == false || acme_elligible == null" [cert.cert.cert_install] type = "button" diff --git a/src/certificate.py b/src/certificate.py index 076a12980..299095af0 100644 --- a/src/certificate.py +++ b/src/certificate.py @@ -99,8 +99,12 @@ def certificate_status(domains, full=False): try: _check_domain_is_ready_for_ACME(domain) status["ACME_eligible"] = True - except Exception: - status["ACME_eligible"] = False + except Exception as e: + if e.key == 'certmanager_domain_not_diagnosed_yet': + status["ACME_eligible"] = None # = unknown status + else: + status["ACME_eligible"] = False + del status["domain"] certificates[domain] = status @@ -794,7 +798,7 @@ def _check_domain_is_ready_for_ACME(domain): or {} ) - parent_domain = _get_parent_domain_of(domain) + parent_domain = _get_parent_domain_of(domain, return_self=True) dnsrecords = ( Diagnoser.get_cached_report( diff --git a/src/domain.py b/src/domain.py index 2e82ab199..97832a065 100644 --- a/src/domain.py +++ b/src/domain.py @@ -24,7 +24,9 @@ Manage domains """ import os -from typing import Dict, Any +import time +from typing import List +from collections import OrderedDict from moulinette import m18n, Moulinette from moulinette.core import MoulinetteError @@ -47,58 +49,131 @@ logger = getActionLogger("yunohost.domain") DOMAIN_SETTINGS_DIR = "/etc/yunohost/domains" # Lazy dev caching to avoid re-query ldap every time we need the domain list -domain_list_cache: Dict[str, Any] = {} +# The cache automatically expire every 15 seconds, to prevent desync between +# yunohost CLI and API which run in different processes +domain_list_cache: List[str] = [] +domain_list_cache_timestamp = 0 +main_domain_cache: str = None +main_domain_cache_timestamp = 0 +DOMAIN_CACHE_DURATION = 15 -def domain_list(exclude_subdomains=False): +def _get_maindomain(): + global main_domain_cache + global main_domain_cache_timestamp + if not main_domain_cache or abs(main_domain_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION: + with open("/etc/yunohost/current_host", "r") as f: + main_domain_cache = f.readline().rstrip() + main_domain_cache_timestamp = time.time() + + return main_domain_cache + + +def _get_domains(exclude_subdomains=False): + global domain_list_cache + global domain_list_cache_timestamp + if not domain_list_cache or abs(domain_list_cache_timestamp - time.time()) > DOMAIN_CACHE_DURATION: + from yunohost.utils.ldap import _get_ldap_interface + + ldap = _get_ldap_interface() + result = [ + entry["virtualdomain"][0] + for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"]) + ] + + def cmp_domain(domain): + # Keep the main part of the domain and the extension together + # eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this'] + domain = domain.split(".") + domain[-1] = domain[-2] + domain.pop() + return list(reversed(domain)) + + domain_list_cache = sorted(result, key=cmp_domain) + domain_list_cache_timestamp = time.time() + + if exclude_subdomains: + return [ + domain + for domain in domain_list_cache + if not _get_parent_domain_of(domain, return_self=False) + ] + + return domain_list_cache + + +def domain_list(exclude_subdomains=False, tree=False): """ List domains Keyword argument: exclude_subdomains -- Filter out domains that are subdomains of other declared domains + tree -- Display domains as a hierarchy tree """ - global domain_list_cache - if not exclude_subdomains and domain_list_cache: - return domain_list_cache - from yunohost.utils.ldap import _get_ldap_interface + domains = _get_domains(exclude_subdomains) + main = _get_maindomain() - ldap = _get_ldap_interface() - result = [ - entry["virtualdomain"][0] - for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"]) - ] + if not tree: + return {"domains": domains, "main": main} - result_list = [] - for domain in result: - if exclude_subdomains: - parent_domain = domain.split(".", 1)[1] - if parent_domain in result: - continue + if tree and exclude_subdomains: + return { + "domains": OrderedDict({domain: {} for domain in domains}), + "main": main, + } - result_list.append(domain) + def get_parent_dict(tree, child): + # If parent exists it should be the last added (see `_get_domains` ordering) + possible_parent = next(reversed(tree)) if tree else None + if possible_parent and child.endswith(f".{possible_parent}"): + return get_parent_dict(tree[possible_parent], child) + return tree - def cmp_domain(domain): - # Keep the main part of the domain and the extension together - # eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this'] - domain = domain.split(".") - domain[-1] = domain[-2] + domain.pop() - domain = list(reversed(domain)) - return domain + result = OrderedDict() + for domain in domains: + parent = get_parent_dict(result, domain) + parent[domain] = OrderedDict() - result_list = sorted(result_list, key=cmp_domain) + return {"domains": result, "main": main} - # Don't cache answer if using exclude_subdomains - if exclude_subdomains: - return {"domains": result_list, "main": _get_maindomain()} - domain_list_cache = {"domains": result_list, "main": _get_maindomain()} - return domain_list_cache +def domain_info(domain): + """ + Print aggregate data for a specific domain + + Keyword argument: + domain -- Domain to be checked + """ + + from yunohost.app import app_info + from yunohost.dns import _get_registar_settings + + _assert_domain_exists(domain) + + registrar, _ = _get_registar_settings(domain) + certificate = domain_cert_status([domain], full=True)["certificates"][domain] + + apps = [] + for app in _installed_apps(): + settings = _get_app_settings(app) + if settings.get("domain") == domain: + apps.append( + {"name": app_info(app)["name"], "id": app, "path": settings["path"]} + ) + + return { + "certificate": certificate, + "registrar": registrar, + "apps": apps, + "main": _get_maindomain() == domain, + "topest_parent": _get_parent_domain_of(domain, return_self=True, topest=True), + # TODO : add parent / child domains ? + } def _assert_domain_exists(domain): - if domain not in domain_list()["domains"]: + if domain not in _get_domains(): raise YunohostValidationError("domain_unknown", domain=domain) @@ -107,26 +182,26 @@ def _list_subdomains_of(parent_domain): _assert_domain_exists(parent_domain) out = [] - for domain in domain_list()["domains"]: + for domain in _get_domains(): if domain.endswith(f".{parent_domain}"): out.append(domain) return out -def _get_parent_domain_of(domain): +def _get_parent_domain_of(domain, return_self=True, topest=False): _assert_domain_exists(domain) - if "." not in domain: - return domain + domains = _get_domains(exclude_subdomains=topest) - parent_domain = domain.split(".", 1)[-1] - if parent_domain not in domain_list()["domains"]: - return domain # Domain is its own parent + domain_ = domain + while "." in domain_: + domain_ = domain_.split(".", 1)[1] + if domain_ in domains: + return domain_ - else: - return _get_parent_domain_of(parent_domain) + return domain if return_self else None @is_unit_operation() @@ -198,7 +273,7 @@ def domain_add(operation_logger, domain, dyndns=False): raise YunohostError("domain_creation_failed", domain=domain, error=e) finally: global domain_list_cache - domain_list_cache = {} + domain_list_cache = [] # Don't regen these conf if we're still in postinstall if os.path.exists("/etc/yunohost/installed"): @@ -255,7 +330,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False): # Check domain is not the main domain if domain == _get_maindomain(): - other_domains = domain_list()["domains"] + other_domains = _get_domains() other_domains.remove(domain) if other_domains: @@ -316,7 +391,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False): raise YunohostError("domain_deletion_failed", domain=domain, error=e) finally: global domain_list_cache - domain_list_cache = {} + domain_list_cache = [] stuff_to_delete = [ f"/etc/yunohost/certs/{domain}", @@ -380,8 +455,8 @@ def domain_main_domain(operation_logger, new_main_domain=None): # Apply changes to ssl certs try: write_to_file("/etc/yunohost/current_host", new_main_domain) - global domain_list_cache - domain_list_cache = {} + global main_domain_cache + main_domain_cache = new_main_domain _set_hostname(new_main_domain) except Exception as e: logger.warning(str(e), exc_info=1) @@ -409,12 +484,6 @@ def domain_url_available(domain, path): return len(_get_conflicting_apps(domain, path)) == 0 -def _get_maindomain(): - with open("/etc/yunohost/current_host", "r") as f: - maindomain = f.readline().rstrip() - return maindomain - - def domain_config_get(domain, key="", full=False, export=False): """ Display a domain configuration