[mod] more pythonic code

This commit is contained in:
Laurent Peuch 2016-10-30 04:56:06 +01:00
parent 917c230735
commit 29f5f2d753

View file

@ -25,12 +25,12 @@
"""
import os
import sys
import errno
import requests
import shutil
import pwd
import grp
import json
import smtplib
from OpenSSL import crypto
@ -100,18 +100,23 @@ def certificate_status(auth, domainList, full=False):
headers = ["Domain", "Certificate status", "Authority type", "Days remaining"]
else:
headers = ["Domain", "Certificate subject", "Certificate status", "Authority type", "Authority name", "Days remaining"]
lines = []
for domain in domainList:
status = _get_status(domain)
line = []
line.append(domain)
if (full):
if full:
line.append(status["subject"])
line.append(_summary_code_to_string(status["summaryCode"]))
line.append(status["CAtype"])
if (full):
if full:
line.append(status["CAname"])
line.append(status["validity"])
lines.append(line)
@ -142,20 +147,19 @@ def certificate_install_selfsigned(domainList, force=False):
# Check we ain't trying to overwrite a good cert !
status = _get_status(domain)
if (status != {}) and (status["summaryCode"] > 0) and (not force):
if status != {} and status["summaryCode"] > 0 and not force:
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain))
cert_folder_domain = cert_folder + "/" + domain
cert_folder_domain = os.path.join(cert_folder, domain)
# Create cert folder if it does not exists yet
try:
os.listdir(cert_folder_domain)
except OSError:
if not os.path.exists(cert_folder_domain):
os.makedirs(cert_folder_domain)
# Get serial
ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA'
with open('%s/serial' % ssl_dir, 'r') as f:
with open(os.path.join(ssl_dir, 'serial'), 'r') as f:
serial = f.readline().rstrip()
# FIXME : should refactor this to avoid so many os.system() calls...
@ -178,25 +182,22 @@ def certificate_install_selfsigned(domainList, force=False):
raise MoulinetteError(errno.EIO, m18n.n('certmanager_domain_cert_gen_failed'))
_set_permissions(cert_folder_domain, "root", "root", 0755)
_set_permissions(cert_folder_domain + "/key.pem", "root", "metronome", 0640)
_set_permissions(cert_folder_domain + "/crt.pem", "root", "metronome", 0640)
_set_permissions(cert_folder_domain + "/openssl.cnf", "root", "root", 0600)
_set_permissions(os.path.join(cert_folder_domain, "key.pem"), "root", "metronome", 0640)
_set_permissions(os.path.join(cert_folder_domain, "crt.pem"), "root", "metronome", 0640)
_set_permissions(os.path.join(cert_folder_domain, "openssl.cnf"), "root", "root", 0600)
# Install ACME / Let's Encrypt certificate
def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=False):
if not os.path.exists(account_key_file):
_generate_account_key()
# If no domains given, consider all yunohost domains with self-signed
# certificates
if (domainList == []):
if domainList == []:
for domain in yunohost.domain.domain_list(auth)['domains']:
# Is it self-signed ?
status = _get_status(domain)
if (status["CAtype"] != "Self-signed"):
if status["CAtype"] != "Self-signed":
continue
domainList.append(domain)
@ -210,7 +211,7 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
# Is it self-signed ?
status = _get_status(domain)
if (not force) and (status["CAtype"] != "Self-signed"):
if not force and status["CAtype"] != "Self-signed":
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain))
# Actual install steps
@ -219,9 +220,9 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
logger.info("Now attempting install of certificate for domain " + domain + " !")
try:
if not no_checks:
_check_domain_is_correctly_configured(domain)
_backup_current_cert(domain)
_configure_for_acme_challenge(auth, domain)
_fetch_and_enable_new_certificate(domain)
@ -230,14 +231,10 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
logger.success(m18n.n("certmanager_cert_install_success", domain=domain))
except Exception as e:
logger.error("Certificate installation for " + domain + " failed !")
logger.error("Certificate installation for %s failed !" % domain)
logger.error(str(e))
# Renew
def certificate_renew(auth, domainList, force=False, no_checks=False, email=False):
"""
Renew Let's Encrypt certificate for given domains (all by default)
@ -257,7 +254,7 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
# Does it has a Let's Encrypt cert ?
status = _get_status(domain)
if (status["CAtype"] != "Let's Encrypt"):
if status["CAtype"] != "Let's Encrypt":
continue
# Does it expires soon ?
@ -278,20 +275,18 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
status = _get_status(domain)
# Does it expires soon ?
if not ((force) or (status["validity"] <= validity_limit)):
if not (force or status["validity"] <= validity_limit):
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain))
# Does it has a Let's Encrypt cert ?
if (status["CAtype"] != "Let's Encrypt"):
if status["CAtype"] != "Let's Encrypt":
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain))
# Actual renew steps
for domain in domainList:
logger.info("Now attempting renewing of certificate for domain " + domain + " !")
try:
if not no_checks:
_check_domain_is_correctly_configured(domain)
_backup_current_cert(domain)
@ -300,11 +295,10 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
logger.success(m18n.n("certmanager_cert_renew_success", domain=domain))
except Exception as e:
logger.error("Certificate renewing for " + domain + " failed !")
logger.error(str(e))
if (email):
if email:
logger.error("Sending email with details to root ...")
_email_renewing_failed(domain, e)
@ -317,7 +311,6 @@ def _install_cron():
cron_job_file = "/etc/cron.weekly/certificateRenewer"
with open(cron_job_file, "w") as f:
f.write("#!/bin/bash\n")
f.write("yunohost domain cert-renew --email\n")
@ -325,9 +318,9 @@ def _install_cron():
def _email_renewing_failed(domain, e):
from_ = "certmanager@" + domain + " (Certificate Manager)"
from_ = "certmanager@%s (Certificate Manager)" % domain
to_ = "root"
subject_ = "Certificate renewing attempt for " + domain + " failed!"
subject_ = "Certificate renewing attempt for %s failed!" % domain
exceptionMessage = str(e)
logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
@ -360,7 +353,7 @@ Subject: %s
def _configure_for_acme_challenge(auth, domain):
nginx_conf_file = "/etc/nginx/conf.d/" + domain + ".d/000-acmechallenge.conf"
nginx_conf_file = "/etc/nginx/conf.d/%s.d/000-acmechallenge.conf" % domain
nginx_configuration = '''
location '/.well-known/acme-challenge'
@ -375,6 +368,7 @@ location '/.well-known/acme-challenge'
logger.info("Nginx configuration file for ACME challenge already exists for domain, skipping.")
else:
logger.info("Adding Nginx configuration file for Acme challenge for domain " + domain + ".")
with open(nginx_conf_file, "w") as f:
f.write(nginx_configuration)
@ -390,17 +384,19 @@ def _fetch_and_enable_new_certificate(domain):
# Make sure tmp folder exists
logger.debug("Making sure tmp folders exists...")
if not (os.path.exists(webroot_folder)):
if not os.path.exists(webroot_folder):
os.makedirs(webroot_folder)
if not (os.path.exists(tmp_folder)):
if not os.path.exists(tmp_folder):
os.makedirs(tmp_folder)
_set_permissions(webroot_folder, "root", "www-data", 0650)
_set_permissions(tmp_folder, "root", "root", 0640)
# Prepare certificate signing request
logger.info("Prepare key and certificate signing request (CSR) for " + domain + "...")
domain_key_file = tmp_folder + "/" + domain + ".pem"
domain_key_file = "%s/%s.pem" % (tmp_folder, domain)
_generate_key(domain_key_file)
_set_permissions(domain_key_file, "root", "metronome", 0640)
@ -409,13 +405,14 @@ def _fetch_and_enable_new_certificate(domain):
# Sign the certificate
logger.info("Now using ACME Tiny to sign the certificate...")
domain_csr_file = tmp_folder + "/" + domain + ".csr"
domain_csr_file = "%s/%s.csr" % (tmp_folder, domain)
signed_certificate = sign_certificate(account_key_file,
domain_csr_file,
webroot_folder,
log=logger,
CA=certification_authority)
intermediate_certificate = requests.get(intermediate_certificate_url).text
# Now save the key and signed certificate
@ -423,42 +420,47 @@ def _fetch_and_enable_new_certificate(domain):
# Create corresponding directory
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
new_cert_folder = cert_folder + "/" + domain + "." + date_tag
new_cert_folder = "%s/%s.%s" % (cert_folder, domain, date_tag)
os.makedirs(new_cert_folder)
_set_permissions(new_cert_folder, "root", "root", 0655)
# Move the private key
shutil.move(domain_key_file, new_cert_folder + "/key.pem")
shutil.move(domain_key_file, os.path.join(new_cert_folder, "key.pem"))
# Write the cert
domain_cert_file = new_cert_folder + "/crt.pem"
domain_cert_file = os.path.join(new_cert_folder, "crt.pem")
with open(domain_cert_file, "w") as f:
f.write(signed_certificate)
f.write(intermediate_certificate)
_set_permissions(domain_cert_file, "root", "metronome", 0640)
logger.info("Enabling the new certificate...")
# Replace (if necessary) the link or folder for live cert
live_link = cert_folder + "/" + domain
live_link = os.path.join(cert_folder, domain)
if not os.path.islink(live_link):
shutil.rmtree(live_link) # Well, yep, hopefully that's not too dangerous (directory should have been backuped before calling this command)
elif os.path.lexists(live_link):
os.remove(live_link)
os.symlink(new_cert_folder, live_link)
# Check the status of the certificate is now good
statusSummaryCode = _get_status(domain)["summaryCode"]
if (statusSummaryCode < 20):
if statusSummaryCode < 20:
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain))
logger.info("Restarting services...")
for s in ["nginx", "postfix", "dovecot", "metronome"]:
_run_service_command("restart", s)
for service in ("nginx", "postfix", "dovecot", "metronome"):
_run_service_command("restart", service)
def _prepare_certificate_signing_request(domain, key_file, output_folder):
@ -471,6 +473,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder):
# Set the key
with open(key_file, 'rt') as f:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
csr.set_pubkey(key)
# Sign the request
@ -479,6 +482,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder):
# Save the request in tmp folder
csr_file = output_folder + domain + ".csr"
logger.info("Saving to " + csr_file + " .")
with open(csr_file, "w") as f:
f.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr))
@ -486,7 +490,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder):
def _get_status(domain):
cert_file = cert_folder + "/" + domain + "/crt.pem"
if (not os.path.isfile(cert_file)):
if not os.path.isfile(cert_file):
return {}
try:
@ -504,38 +508,47 @@ def _get_status(domain):
CAtype = None
if (certIssuer == _name_selfCA()):
CAtype = "Self-signed"
elif (certIssuer.startswith("Let's Encrypt")):
CAtype = "Let's Encrypt"
elif (certIssuer.startswith("Fake LE")):
CAtype = "Fake Let's Encrypt"
else:
CAtype = "Other / Unknown"
# Unknown by default
statusSummaryCode = 0
# Critical
if (daysRemaining <= 0):
statusSummaryCode = -30
# Warning, self-signed, browser will display a warning discouraging visitors to enter website
elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt"):
statusSummaryCode = -20
# Attention, certificate will expire soon (should be renewed automatically if Let's Encrypt)
elif (daysRemaining < validity_limit):
statusSummaryCode = -10
# CA not known, but still a valid certificate, so okay !
elif (CAtype == "Other / Unknown"):
statusSummaryCode = 10
# Let's Encrypt, great !
elif (CAtype == "Let's Encrypt"):
statusSummaryCode = 20
return {"domain": domain,
"subject": certSubject,
"CAname": certIssuer,
"CAtype": CAtype,
"validity": daysRemaining,
"summaryCode": statusSummaryCode
}
return {
"domain": domain,
"subject": certSubject,
"CAname": certIssuer,
"CAtype": CAtype,
"validity": daysRemaining,
"summaryCode": statusSummaryCode
}
###############################################################################
# Misc small stuff ... #
@ -567,10 +580,10 @@ def _set_permissions(path, user, group, permissions):
def _backup_current_cert(domain):
logger.info("Backuping existing certificate for domain " + domain)
cert_folder_domain = cert_folder + "/" + domain
cert_folder_domain = os.path.join(cert_folder, domain)
dateTag = datetime.now().strftime("%Y%m%d.%H%M%S")
backup_folder = cert_folder_domain + "-backup-" + dateTag
backup_folder = "%s-backup-%s" % (cert_folder_domain, dateTag)
shutil.copytree(cert_folder_domain, backup_folder)