diff --git a/src/yunohost/certificate.py b/src/yunohost/certificate.py index 81f2f0e2..b9fcb9b3 100644 --- a/src/yunohost/certificate.py +++ b/src/yunohost/certificate.py @@ -73,8 +73,8 @@ DNS_RESOLVERS = [ "80.67.169.12", # FDN "80.67.169.40", # "89.234.141.66", # ARN - "141.255.128.100", # Aquilenet - "141.255.128.101", # + "141.255.128.100", # Aquilenet + "141.255.128.101", "89.234.186.18", # Grifon "80.67.188.188" # LDN ] @@ -107,7 +107,8 @@ def certificate_status(auth, domain_list, full=False): for domain in domain_list: # Is it in Yunohost domain list ? if domain not in yunohost_domains_list: - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_unknown', domain=domain)) certificates = {} @@ -124,11 +125,10 @@ def certificate_status(auth, domain_list, full=False): del status["domain"] certificates[domain] = status - return {"certificates" : certificates} + return {"certificates": certificates} def certificate_install(auth, domain_list, force=False, no_checks=False, self_signed=False, staging=False): - """ Install a Let's Encrypt certificate for given domains (all by default) @@ -145,11 +145,11 @@ def certificate_install(auth, domain_list, force=False, no_checks=False, self_si # not used anymore _check_old_letsencrypt_app() - if self_signed: _certificate_install_selfsigned(domain_list, force) else: - _certificate_install_letsencrypt(auth, domain_list, force, no_checks, staging) + _certificate_install_letsencrypt( + auth, domain_list, force, no_checks, staging) def _certificate_install_selfsigned(domain_list, force=False): @@ -158,7 +158,8 @@ def _certificate_install_selfsigned(domain_list, force=False): # Paths of files and folder we'll need date_tag = datetime.now().strftime("%Y%m%d.%H%M%S") - new_cert_folder = "%s/%s-history/%s-selfsigned" % (CERT_FOLDER, domain, date_tag) + new_cert_folder = "%s/%s-history/%s-selfsigned" % ( + CERT_FOLDER, domain, date_tag) original_ca_file = '/etc/ssl/certs/ca-yunohost_crt.pem' ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA' @@ -175,17 +176,18 @@ def _certificate_install_selfsigned(domain_list, force=False): if (not force) and (os.path.isfile(current_cert_file)): status = _get_status(domain) - if status["summary"]["code"] in ('good', 'great') : - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain)) + if status["summary"]["code"] in ('good', 'great'): + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_attempt_to_replace_valid_cert', domain=domain)) # Create output folder for new certificate stuff os.makedirs(new_cert_folder) # Create our conf file, based on template, replacing the occurences of # "yunohost.org" with the given domain - with open(conf_file, "w") as f : - with open(conf_template, "r") as template : - for line in template : + with open(conf_file, "w") as f: + with open(conf_template, "r") as template: + for line in template: f.write(line.replace("yunohost.org", domain)) # Use OpenSSL command line to create a certificate signing request, @@ -196,13 +198,15 @@ def _certificate_install_selfsigned(domain_list, force=False): commands.append("openssl ca -config %s -days 3650 -in %s -out %s -batch" % (conf_file, csr_file, crt_file)) - for command in commands : - p = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for command in commands: + p = subprocess.Popen( + command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out, _ = p.communicate() if p.returncode != 0: logger.warning(out) - raise MoulinetteError(errno.EIO, m18n.n('domain_cert_gen_failed')) - else : + raise MoulinetteError( + errno.EIO, m18n.n('domain_cert_gen_failed')) + else: logger.info(out) # Link the CA cert (not sure it's actually needed in practice though, @@ -229,10 +233,11 @@ def _certificate_install_selfsigned(domain_list, force=False): status = _get_status(domain) if status and status["CA_type"]["code"] == "self-signed" and status["validity"] > 3648: - logger.success(m18n.n("certmanager_cert_install_success_selfsigned", domain=domain)) - else : - logger.error("Installation of self-signed certificate installation for %s failed !", domain) - + logger.success( + m18n.n("certmanager_cert_install_success_selfsigned", domain=domain)) + else: + logger.error( + "Installation of self-signed certificate installation for %s failed !", domain) def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=False, staging=False): @@ -255,20 +260,24 @@ def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=F for domain in domain_list: yunohost_domains_list = yunohost.domain.domain_list(auth)['domains'] if domain not in yunohost_domains_list: - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_unknown', domain=domain)) # Is it self-signed ? status = _get_status(domain) if not force and status["CA_type"]["code"] != "self-signed": - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_cert_not_selfsigned', domain=domain)) if (staging): - logger.warning("Please note that you used the --staging option, and that no new certificate will actually be enabled !") + logger.warning( + "Please note that you used the --staging option, and that no new certificate will actually be enabled !") # Actual install steps for domain in domain_list: - logger.info("Now attempting install of certificate for domain %s!", domain) + logger.info( + "Now attempting install of certificate for domain %s!", domain) try: if not no_checks: @@ -278,7 +287,8 @@ def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=F _fetch_and_enable_new_certificate(domain, staging) _install_cron() - logger.success(m18n.n("certmanager_cert_install_success", domain=domain)) + logger.success( + m18n.n("certmanager_cert_install_success", domain=domain)) except Exception as e: logger.error("Certificate installation for %s failed !", domain) @@ -325,31 +335,37 @@ def certificate_renew(auth, domain_list, force=False, no_checks=False, email=Fal # Is it in Yunohost dmomain list ? if domain not in yunohost.domain.domain_list(auth)['domains']: - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_unknown', domain=domain)) status = _get_status(domain) # Does it expire soon ? if not force or status["validity"] <= VALIDITY_LIMIT: - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_attempt_to_renew_valid_cert', domain=domain)) # Does it have a Let's Encrypt cert ? if status["CA_type"]["code"] != "lets-encrypt": - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_attempt_to_renew_nonLE_cert', domain=domain)) if (staging): - logger.warning("Please note that you used the --staging option, and that no new certificate will actually be enabled !") + logger.warning( + "Please note that you used the --staging option, and that no new certificate will actually be enabled !") # Actual renew steps for domain in domain_list: - logger.info("Now attempting renewing of certificate for domain %s !", domain) + logger.info( + "Now attempting renewing of certificate for domain %s !", domain) try: if not no_checks: _check_domain_is_ready_for_ACME(domain) _fetch_and_enable_new_certificate(domain, staging) - logger.success(m18n.n("certmanager_cert_renew_success", domain=domain)) + logger.success( + m18n.n("certmanager_cert_renew_success", domain=domain)) except Exception as e: import traceback @@ -375,7 +391,8 @@ def _check_old_letsencrypt_app(): if "letsencrypt" not in installedAppIds: return - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_old_letsencrypt_app_detected')) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_old_letsencrypt_app_detected')) def _install_cron(): @@ -436,21 +453,25 @@ location '/.well-known/acme-challenge' } ''' % WEBROOT_FOLDER - # Check there isn't a conflicting file for the acme-challenge well-known uri + # Check there isn't a conflicting file for the acme-challenge well-known + # uri for path in glob.glob('%s/*.conf' % nginx_conf_folder): - if (path == nginx_conf_file) : + if (path == nginx_conf_file): continue with open(path) as f: contents = f.read() - if ('/.well-known/acme-challenge' in contents) : - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_conflicting_nginx_file', filepath=path)) + if ('/.well-known/acme-challenge' in contents): + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_conflicting_nginx_file', filepath=path)) # Write the conf if os.path.exists(nginx_conf_file): - logger.info("Nginx configuration file for ACME challenge already exists for domain, skipping.") + logger.info( + "Nginx configuration file for ACME challenge already exists for domain, skipping.") return - logger.info("Adding Nginx configuration file for Acme challenge for domain %s.", domain) + logger.info( + "Adding Nginx configuration file for Acme challenge for domain %s.", domain) with open(nginx_conf_file, "w") as f: f.write(nginx_configuration) @@ -477,7 +498,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False): _set_permissions(TMP_FOLDER, "root", "root", 0640) # Prepare certificate signing request - logger.info("Prepare key and certificate signing request (CSR) for %s...", domain) + logger.info( + "Prepare key and certificate signing request (CSR) for %s...", domain) domain_key_file = "%s/%s.pem" % (TMP_FOLDER, domain) _generate_key(domain_key_file) @@ -503,13 +525,16 @@ def _fetch_and_enable_new_certificate(domain, staging=False): CA=certification_authority) except ValueError as e: if ("urn:acme:error:rateLimited" in str(e)): - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_hit_rate_limit', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_hit_rate_limit', domain=domain)) else: logger.error(str(e)) - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cert_signing_failed')) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_cert_signing_failed')) except Exception as e: logger.error(str(e)) - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cert_signing_failed')) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_cert_signing_failed')) intermediate_certificate = requests.get(INTERMEDIATE_CERTIFICATE_URL).text @@ -524,7 +549,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False): else: folder_flag = "letsencrypt" - new_cert_folder = "%s/%s-history/%s-%s" % (CERT_FOLDER, domain, date_tag, folder_flag) + new_cert_folder = "%s/%s-history/%s-%s" % ( + CERT_FOLDER, domain, date_tag, folder_flag) os.makedirs(new_cert_folder) _set_permissions(new_cert_folder, "root", "root", 0655) @@ -550,7 +576,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False): status_summary = _get_status(domain)["summary"] if status_summary["code"] != "great": - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_certificate_fetching_or_enabling_failed', domain=domain)) def _prepare_certificate_signing_request(domain, key_file, output_folder): @@ -582,14 +609,17 @@ def _get_status(domain): cert_file = os.path.join(CERT_FOLDER, domain, "crt.pem") if not os.path.isfile(cert_file): - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_no_cert_file', domain=domain, file=cert_file)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_no_cert_file', domain=domain, file=cert_file)) try: - cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read()) + cert = crypto.load_certificate( + crypto.FILETYPE_PEM, open(cert_file).read()) except Exception as exception: import traceback traceback.print_exc(file=sys.stdout) - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cannot_read_cert', domain=domain, file=cert_file, reason=exception)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_cannot_read_cert', domain=domain, file=cert_file, reason=exception)) cert_subject = cert.get_subject().CN cert_issuer = cert.get_issuer().CN @@ -626,7 +656,7 @@ def _get_status(domain): "verbose": "CRITICAL", } - elif CA_type["code"] in ("self-signed","fake-lets-encrypt"): + elif CA_type["code"] in ("self-signed", "fake-lets-encrypt"): status_summary = { "code": "warning", "verbose": "WARNING", @@ -699,13 +729,13 @@ def _set_permissions(path, user, group, permissions): os.chmod(path, permissions) -def _enable_certificate(domain, new_cert_folder) : +def _enable_certificate(domain, new_cert_folder): logger.info("Enabling the certificate for domain %s ...", domain) live_link = os.path.join(CERT_FOLDER, domain) # If a live link (or folder) already exists - if os.path.exists(live_link) : + if os.path.exists(live_link): # If it's not a link ... expect if to be a folder if not os.path.islink(live_link): # Backup it and remove it @@ -741,11 +771,13 @@ def _check_domain_is_ready_for_ACME(domain): # Check if IP from DNS matches public IP if not _dns_ip_match_public_ip(public_ip, domain): - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_dns_ip_differs_from_public_ip', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_dns_ip_differs_from_public_ip', domain=domain)) # Check if domain seems to be accessible through HTTP ? if not _domain_is_accessible_through_HTTP(public_ip, domain): - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_http_not_working', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_domain_http_not_working', domain=domain)) def _dns_ip_match_public_ip(public_ip, domain): @@ -754,7 +786,8 @@ def _dns_ip_match_public_ip(public_ip, domain): resolver.nameservers = DNS_RESOLVERS answers = resolver.query(domain, "A") except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_error_no_A_record', domain=domain)) + raise MoulinetteError(errno.EINVAL, m18n.n( + 'certmanager_error_no_A_record', domain=domain)) dns_ip = str(answers[0]) @@ -771,7 +804,8 @@ def _domain_is_accessible_through_HTTP(ip, domain): def _name_self_CA(): - cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(SELF_CA_FILE).read()) + cert = crypto.load_certificate( + crypto.FILETYPE_PEM, open(SELF_CA_FILE).read()) return cert.get_subject().CN