diff --git a/src/yunohost/certificate.py b/src/yunohost/certificate.py index 89c00943c..c318b63c2 100644 --- a/src/yunohost/certificate.py +++ b/src/yunohost/certificate.py @@ -25,12 +25,12 @@ """ import os +import sys import errno import requests import shutil import pwd import grp -import json import smtplib from OpenSSL import crypto @@ -100,18 +100,23 @@ def certificate_status(auth, domainList, full=False): headers = ["Domain", "Certificate status", "Authority type", "Days remaining"] else: headers = ["Domain", "Certificate subject", "Certificate status", "Authority type", "Authority name", "Days remaining"] + lines = [] for domain in domainList: status = _get_status(domain) line = [] line.append(domain) - if (full): + + if full: line.append(status["subject"]) + line.append(_summary_code_to_string(status["summaryCode"])) line.append(status["CAtype"]) - if (full): + + if full: line.append(status["CAname"]) + line.append(status["validity"]) lines.append(line) @@ -142,20 +147,19 @@ def certificate_install_selfsigned(domainList, force=False): # Check we ain't trying to overwrite a good cert ! status = _get_status(domain) - if (status != {}) and (status["summaryCode"] > 0) and (not force): + + if status != {} and status["summaryCode"] > 0 and not force: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain)) - cert_folder_domain = cert_folder + "/" + domain + cert_folder_domain = os.path.join(cert_folder, domain) # Create cert folder if it does not exists yet - try: - os.listdir(cert_folder_domain) - except OSError: + if not os.path.exists(cert_folder_domain): os.makedirs(cert_folder_domain) # Get serial ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA' - with open('%s/serial' % ssl_dir, 'r') as f: + with open(os.path.join(ssl_dir, 'serial'), 'r') as f: serial = f.readline().rstrip() # FIXME : should refactor this to avoid so many os.system() calls... @@ -178,25 +182,22 @@ def certificate_install_selfsigned(domainList, force=False): raise MoulinetteError(errno.EIO, m18n.n('certmanager_domain_cert_gen_failed')) _set_permissions(cert_folder_domain, "root", "root", 0755) - _set_permissions(cert_folder_domain + "/key.pem", "root", "metronome", 0640) - _set_permissions(cert_folder_domain + "/crt.pem", "root", "metronome", 0640) - _set_permissions(cert_folder_domain + "/openssl.cnf", "root", "root", 0600) + _set_permissions(os.path.join(cert_folder_domain, "key.pem"), "root", "metronome", 0640) + _set_permissions(os.path.join(cert_folder_domain, "crt.pem"), "root", "metronome", 0640) + _set_permissions(os.path.join(cert_folder_domain, "openssl.cnf"), "root", "root", 0600) -# Install ACME / Let's Encrypt certificate - def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=False): if not os.path.exists(account_key_file): _generate_account_key() # If no domains given, consider all yunohost domains with self-signed # certificates - if (domainList == []): + if domainList == []: for domain in yunohost.domain.domain_list(auth)['domains']: - # Is it self-signed ? status = _get_status(domain) - if (status["CAtype"] != "Self-signed"): + if status["CAtype"] != "Self-signed": continue domainList.append(domain) @@ -210,7 +211,7 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal # Is it self-signed ? status = _get_status(domain) - if (not force) and (status["CAtype"] != "Self-signed"): + if not force and status["CAtype"] != "Self-signed": raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain)) # Actual install steps @@ -219,9 +220,9 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal logger.info("Now attempting install of certificate for domain " + domain + " !") try: - if not no_checks: _check_domain_is_correctly_configured(domain) + _backup_current_cert(domain) _configure_for_acme_challenge(auth, domain) _fetch_and_enable_new_certificate(domain) @@ -230,14 +231,10 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal logger.success(m18n.n("certmanager_cert_install_success", domain=domain)) except Exception as e: - - logger.error("Certificate installation for " + domain + " failed !") + logger.error("Certificate installation for %s failed !" % domain) logger.error(str(e)) -# Renew - - def certificate_renew(auth, domainList, force=False, no_checks=False, email=False): """ Renew Let's Encrypt certificate for given domains (all by default) @@ -257,7 +254,7 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals # Does it has a Let's Encrypt cert ? status = _get_status(domain) - if (status["CAtype"] != "Let's Encrypt"): + if status["CAtype"] != "Let's Encrypt": continue # Does it expires soon ? @@ -278,20 +275,18 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals status = _get_status(domain) # Does it expires soon ? - if not ((force) or (status["validity"] <= validity_limit)): + if not (force or status["validity"] <= validity_limit): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain)) # Does it has a Let's Encrypt cert ? - if (status["CAtype"] != "Let's Encrypt"): + if status["CAtype"] != "Let's Encrypt": raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain)) # Actual renew steps for domain in domainList: - logger.info("Now attempting renewing of certificate for domain " + domain + " !") try: - if not no_checks: _check_domain_is_correctly_configured(domain) _backup_current_cert(domain) @@ -300,11 +295,10 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals logger.success(m18n.n("certmanager_cert_renew_success", domain=domain)) except Exception as e: - logger.error("Certificate renewing for " + domain + " failed !") logger.error(str(e)) - if (email): + if email: logger.error("Sending email with details to root ...") _email_renewing_failed(domain, e) @@ -317,7 +311,6 @@ def _install_cron(): cron_job_file = "/etc/cron.weekly/certificateRenewer" with open(cron_job_file, "w") as f: - f.write("#!/bin/bash\n") f.write("yunohost domain cert-renew --email\n") @@ -325,9 +318,9 @@ def _install_cron(): def _email_renewing_failed(domain, e): - from_ = "certmanager@" + domain + " (Certificate Manager)" + from_ = "certmanager@%s (Certificate Manager)" % domain to_ = "root" - subject_ = "Certificate renewing attempt for " + domain + " failed!" + subject_ = "Certificate renewing attempt for %s failed!" % domain exceptionMessage = str(e) logs = _tail(50, "/var/log/yunohost/yunohost-cli.log") @@ -360,7 +353,7 @@ Subject: %s def _configure_for_acme_challenge(auth, domain): - nginx_conf_file = "/etc/nginx/conf.d/" + domain + ".d/000-acmechallenge.conf" + nginx_conf_file = "/etc/nginx/conf.d/%s.d/000-acmechallenge.conf" % domain nginx_configuration = ''' location '/.well-known/acme-challenge' @@ -375,6 +368,7 @@ location '/.well-known/acme-challenge' logger.info("Nginx configuration file for ACME challenge already exists for domain, skipping.") else: logger.info("Adding Nginx configuration file for Acme challenge for domain " + domain + ".") + with open(nginx_conf_file, "w") as f: f.write(nginx_configuration) @@ -390,17 +384,19 @@ def _fetch_and_enable_new_certificate(domain): # Make sure tmp folder exists logger.debug("Making sure tmp folders exists...") - if not (os.path.exists(webroot_folder)): + if not os.path.exists(webroot_folder): os.makedirs(webroot_folder) - if not (os.path.exists(tmp_folder)): + + if not os.path.exists(tmp_folder): os.makedirs(tmp_folder) + _set_permissions(webroot_folder, "root", "www-data", 0650) _set_permissions(tmp_folder, "root", "root", 0640) # Prepare certificate signing request logger.info("Prepare key and certificate signing request (CSR) for " + domain + "...") - domain_key_file = tmp_folder + "/" + domain + ".pem" + domain_key_file = "%s/%s.pem" % (tmp_folder, domain) _generate_key(domain_key_file) _set_permissions(domain_key_file, "root", "metronome", 0640) @@ -409,13 +405,14 @@ def _fetch_and_enable_new_certificate(domain): # Sign the certificate logger.info("Now using ACME Tiny to sign the certificate...") - domain_csr_file = tmp_folder + "/" + domain + ".csr" + domain_csr_file = "%s/%s.csr" % (tmp_folder, domain) signed_certificate = sign_certificate(account_key_file, domain_csr_file, webroot_folder, log=logger, CA=certification_authority) + intermediate_certificate = requests.get(intermediate_certificate_url).text # Now save the key and signed certificate @@ -423,42 +420,47 @@ def _fetch_and_enable_new_certificate(domain): # Create corresponding directory date_tag = datetime.now().strftime("%Y%m%d.%H%M%S") - new_cert_folder = cert_folder + "/" + domain + "." + date_tag + + new_cert_folder = "%s/%s.%s" % (cert_folder, domain, date_tag) os.makedirs(new_cert_folder) + _set_permissions(new_cert_folder, "root", "root", 0655) # Move the private key - shutil.move(domain_key_file, new_cert_folder + "/key.pem") + shutil.move(domain_key_file, os.path.join(new_cert_folder, "key.pem")) # Write the cert - domain_cert_file = new_cert_folder + "/crt.pem" + domain_cert_file = os.path.join(new_cert_folder, "crt.pem") + with open(domain_cert_file, "w") as f: f.write(signed_certificate) f.write(intermediate_certificate) + _set_permissions(domain_cert_file, "root", "metronome", 0640) logger.info("Enabling the new certificate...") # Replace (if necessary) the link or folder for live cert - live_link = cert_folder + "/" + domain + live_link = os.path.join(cert_folder, domain) if not os.path.islink(live_link): shutil.rmtree(live_link) # Well, yep, hopefully that's not too dangerous (directory should have been backuped before calling this command) + elif os.path.lexists(live_link): os.remove(live_link) os.symlink(new_cert_folder, live_link) # Check the status of the certificate is now good - statusSummaryCode = _get_status(domain)["summaryCode"] - if (statusSummaryCode < 20): + + if statusSummaryCode < 20: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain)) logger.info("Restarting services...") - for s in ["nginx", "postfix", "dovecot", "metronome"]: - _run_service_command("restart", s) + for service in ("nginx", "postfix", "dovecot", "metronome"): + _run_service_command("restart", service) def _prepare_certificate_signing_request(domain, key_file, output_folder): @@ -471,6 +473,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder): # Set the key with open(key_file, 'rt') as f: key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read()) + csr.set_pubkey(key) # Sign the request @@ -479,6 +482,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder): # Save the request in tmp folder csr_file = output_folder + domain + ".csr" logger.info("Saving to " + csr_file + " .") + with open(csr_file, "w") as f: f.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)) @@ -486,7 +490,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder): def _get_status(domain): cert_file = cert_folder + "/" + domain + "/crt.pem" - if (not os.path.isfile(cert_file)): + if not os.path.isfile(cert_file): return {} try: @@ -504,38 +508,47 @@ def _get_status(domain): CAtype = None if (certIssuer == _name_selfCA()): CAtype = "Self-signed" + elif (certIssuer.startswith("Let's Encrypt")): CAtype = "Let's Encrypt" + elif (certIssuer.startswith("Fake LE")): CAtype = "Fake Let's Encrypt" + else: CAtype = "Other / Unknown" # Unknown by default statusSummaryCode = 0 + # Critical if (daysRemaining <= 0): statusSummaryCode = -30 + # Warning, self-signed, browser will display a warning discouraging visitors to enter website elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt"): statusSummaryCode = -20 + # Attention, certificate will expire soon (should be renewed automatically if Let's Encrypt) elif (daysRemaining < validity_limit): statusSummaryCode = -10 + # CA not known, but still a valid certificate, so okay ! elif (CAtype == "Other / Unknown"): statusSummaryCode = 10 + # Let's Encrypt, great ! elif (CAtype == "Let's Encrypt"): statusSummaryCode = 20 - return {"domain": domain, - "subject": certSubject, - "CAname": certIssuer, - "CAtype": CAtype, - "validity": daysRemaining, - "summaryCode": statusSummaryCode - } + return { + "domain": domain, + "subject": certSubject, + "CAname": certIssuer, + "CAtype": CAtype, + "validity": daysRemaining, + "summaryCode": statusSummaryCode + } ############################################################################### # Misc small stuff ... # @@ -567,10 +580,10 @@ def _set_permissions(path, user, group, permissions): def _backup_current_cert(domain): logger.info("Backuping existing certificate for domain " + domain) - cert_folder_domain = cert_folder + "/" + domain + cert_folder_domain = os.path.join(cert_folder, domain) dateTag = datetime.now().strftime("%Y%m%d.%H%M%S") - backup_folder = cert_folder_domain + "-backup-" + dateTag + backup_folder = "%s-backup-%s" % (cert_folder_domain, dateTag) shutil.copytree(cert_folder_domain, backup_folder)