diff --git a/src/domain.py b/src/domain.py index 14b28940a..eab56761b 100644 --- a/src/domain.py +++ b/src/domain.py @@ -24,7 +24,7 @@ Manage domains """ import os -from typing import Dict, Any +from typing import List, Any from moulinette import m18n, Moulinette from moulinette.core import MoulinetteError @@ -47,7 +47,47 @@ logger = getActionLogger("yunohost.domain") DOMAIN_SETTINGS_DIR = "/etc/yunohost/domains" # Lazy dev caching to avoid re-query ldap every time we need the domain list -domain_list_cache: Dict[str, Any] = {} +domain_list_cache: List[str] = [] +main_domain_cache: str = None + + +def _get_maindomain(no_cache=False): + global main_domain_cache + if not main_domain_cache or no_cache: + with open("/etc/yunohost/current_host", "r") as f: + main_domain_cache = f.readline().rstrip() + + return main_domain_cache + + +def _get_domains(exclude_subdomains=False, no_cache=False): + global domain_list_cache + if not domain_list_cache or no_cache: + from yunohost.utils.ldap import _get_ldap_interface + + ldap = _get_ldap_interface() + result = [ + entry["virtualdomain"][0] + for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"]) + ] + + def cmp_domain(domain): + # Keep the main part of the domain and the extension together + # eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this'] + domain = domain.split(".") + domain[-1] = domain[-2] + domain.pop() + return list(reversed(domain)) + + domain_list_cache = sorted(result, key=cmp_domain) + + if exclude_subdomains: + return [ + domain + for domain in domain_list_cache + if not _get_parent_domain_of(domain, return_self=False) + ] + + return domain_list_cache def domain_list(exclude_subdomains=False): @@ -58,47 +98,11 @@ def domain_list(exclude_subdomains=False): exclude_subdomains -- Filter out domains that are subdomains of other declared domains """ - global domain_list_cache - if not exclude_subdomains and domain_list_cache: - return domain_list_cache - - from yunohost.utils.ldap import _get_ldap_interface - - ldap = _get_ldap_interface() - result = [ - entry["virtualdomain"][0] - for entry in ldap.search("ou=domains", "virtualdomain=*", ["virtualdomain"]) - ] - - result_list = [] - for domain in result: - if exclude_subdomains: - parent_domain = domain.split(".", 1)[1] - if parent_domain in result: - continue - - result_list.append(domain) - - def cmp_domain(domain): - # Keep the main part of the domain and the extension together - # eg: this.is.an.example.com -> ['example.com', 'an', 'is', 'this'] - domain = domain.split(".") - domain[-1] = domain[-2] + domain.pop() - domain = list(reversed(domain)) - return domain - - result_list = sorted(result_list, key=cmp_domain) - - # Don't cache answer if using exclude_subdomains - if exclude_subdomains: - return {"domains": result_list, "main": _get_maindomain()} - - domain_list_cache = {"domains": result_list, "main": _get_maindomain()} - return domain_list_cache + return {"domains": _get_domains(exclude_subdomains), "main": _get_maindomain()} def _assert_domain_exists(domain): - if domain not in domain_list()["domains"]: + if domain not in _get_domains(): raise YunohostValidationError("domain_unknown", domain=domain) @@ -107,26 +111,24 @@ def _list_subdomains_of(parent_domain): _assert_domain_exists(parent_domain) out = [] - for domain in domain_list()["domains"]: + for domain in _get_domains(): if domain.endswith(f".{parent_domain}"): out.append(domain) return out -def _get_parent_domain_of(domain): +def _get_parent_domain_of(domain, return_self=True): _assert_domain_exists(domain) - if "." not in domain: - return domain + domains = _get_domains() + while "." in domain: + domain = domain.split(".", 1)[1] + if domain in domains: + return domain - parent_domain = domain.split(".", 1)[-1] - if parent_domain not in domain_list()["domains"]: - return domain # Domain is its own parent - - else: - return _get_parent_domain_of(parent_domain) + return domain if return_self else None @is_unit_operation() @@ -198,7 +200,7 @@ def domain_add(operation_logger, domain, dyndns=False): raise YunohostError("domain_creation_failed", domain=domain, error=e) finally: global domain_list_cache - domain_list_cache = {} + domain_list_cache = [] # Don't regen these conf if we're still in postinstall if os.path.exists("/etc/yunohost/installed"): @@ -255,7 +257,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False): # Check domain is not the main domain if domain == _get_maindomain(): - other_domains = domain_list()["domains"] + other_domains = _get_domains() other_domains.remove(domain) if other_domains: @@ -316,7 +318,7 @@ def domain_remove(operation_logger, domain, remove_apps=False, force=False): raise YunohostError("domain_deletion_failed", domain=domain, error=e) finally: global domain_list_cache - domain_list_cache = {} + domain_list_cache = [] stuff_to_delete = [ f"/etc/yunohost/certs/{domain}", @@ -380,8 +382,8 @@ def domain_main_domain(operation_logger, new_main_domain=None): # Apply changes to ssl certs try: write_to_file("/etc/yunohost/current_host", new_main_domain) - global domain_list_cache - domain_list_cache = {} + global main_domain_cache + main_domain_cache = None _set_hostname(new_main_domain) except Exception as e: logger.warning(str(e), exc_info=1) @@ -409,12 +411,6 @@ def domain_url_available(domain, path): return len(_get_conflicting_apps(domain, path)) == 0 -def _get_maindomain(): - with open("/etc/yunohost/current_host", "r") as f: - maindomain = f.readline().rstrip() - return maindomain - - def domain_config_get(domain, key="", full=False, export=False): """ Display a domain configuration