mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
[mod] autopep8
This commit is contained in:
parent
d47f5919d6
commit
bec8f63479
1 changed files with 210 additions and 205 deletions
|
@ -38,11 +38,13 @@ from datetime import datetime
|
|||
from tabulate import tabulate
|
||||
from acme_tiny import get_crt as sign_certificate
|
||||
|
||||
import yunohost.domain
|
||||
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.log import getActionLogger
|
||||
import yunohost.domain
|
||||
from yunohost.service import _run_service_command
|
||||
|
||||
from yunohost.app import app_ssowatconf
|
||||
from yunohost.service import _run_service_command
|
||||
|
||||
|
||||
logger = getActionLogger('yunohost.certmanager')
|
||||
|
@ -73,6 +75,7 @@ intermediate_certificate_url = "https://letsencrypt.org/certs/lets-encrypt-x3-cr
|
|||
|
||||
# Status
|
||||
|
||||
|
||||
def certificate_status(auth, domainList, full=False):
|
||||
"""
|
||||
Print the status of certificate for given domains (all by default)
|
||||
|
@ -83,7 +86,8 @@ def certificate_status(auth, domainList, full = False):
|
|||
"""
|
||||
|
||||
# If no domains given, consider all yunohost domains
|
||||
if (domainList == []) : domainList = yunohost.domain.domain_list(auth)['domains']
|
||||
if (domainList == []):
|
||||
domainList = yunohost.domain.domain_list(auth)['domains']
|
||||
# Else, validate that yunohost knows the domains given
|
||||
else:
|
||||
for domain in domainList:
|
||||
|
@ -102,10 +106,12 @@ def certificate_status(auth, domainList, full = False):
|
|||
|
||||
line = []
|
||||
line.append(domain)
|
||||
if (full) : line.append(status["subject"])
|
||||
if (full):
|
||||
line.append(status["subject"])
|
||||
line.append(_summary_code_to_string(status["summaryCode"]))
|
||||
line.append(status["CAtype"])
|
||||
if (full) : line.append(status["CAname"])
|
||||
if (full):
|
||||
line.append(status["CAname"])
|
||||
line.append(status["validity"])
|
||||
lines.append(line)
|
||||
|
||||
|
@ -132,7 +138,6 @@ def certificate_install(auth, domainList, force=False, no_checks=False, self_sig
|
|||
# Install self-signed
|
||||
|
||||
def certificate_install_selfsigned(domainList, force=False):
|
||||
|
||||
for domain in domainList:
|
||||
|
||||
# Check we ain't trying to overwrite a good cert !
|
||||
|
@ -140,7 +145,6 @@ def certificate_install_selfsigned(domainList, force=False) :
|
|||
if (status != {}) and (status["summaryCode"] > 0) and (not force):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain))
|
||||
|
||||
|
||||
cert_folder_domain = cert_folder + "/" + domain
|
||||
|
||||
# Create cert folder if it does not exists yet
|
||||
|
@ -173,18 +177,15 @@ def certificate_install_selfsigned(domainList, force=False) :
|
|||
if os.system(command) != 0:
|
||||
raise MoulinetteError(errno.EIO, m18n.n('certmanager_domain_cert_gen_failed'))
|
||||
|
||||
_set_permissions(cert_folder_domain, "root", "root", 0755);
|
||||
_set_permissions(cert_folder_domain+"/key.pem", "root", "metronome", 0640);
|
||||
_set_permissions(cert_folder_domain+"/crt.pem", "root", "metronome", 0640);
|
||||
_set_permissions(cert_folder_domain+"/openssl.cnf", "root", "root", 0600);
|
||||
|
||||
_set_permissions(cert_folder_domain, "root", "root", 0755)
|
||||
_set_permissions(cert_folder_domain + "/key.pem", "root", "metronome", 0640)
|
||||
_set_permissions(cert_folder_domain + "/crt.pem", "root", "metronome", 0640)
|
||||
_set_permissions(cert_folder_domain + "/openssl.cnf", "root", "root", 0600)
|
||||
|
||||
|
||||
# Install ACME / Let's Encrypt certificate
|
||||
|
||||
def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=False):
|
||||
|
||||
|
||||
if not os.path.exists(account_key_file):
|
||||
_generate_account_key()
|
||||
|
||||
|
@ -195,7 +196,8 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
|
|||
|
||||
# Is it self-signed ?
|
||||
status = _get_status(domain)
|
||||
if (status["CAtype"] != "Self-signed") : continue
|
||||
if (status["CAtype"] != "Self-signed"):
|
||||
continue
|
||||
|
||||
domainList.append(domain)
|
||||
|
||||
|
@ -211,7 +213,6 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
|
|||
if (not force) and (status["CAtype"] != "Self-signed"):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain))
|
||||
|
||||
|
||||
# Actual install steps
|
||||
for domain in domainList:
|
||||
|
||||
|
@ -219,7 +220,8 @@ def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=Fal
|
|||
|
||||
try:
|
||||
|
||||
if not no_checks : _check_domain_is_correctly_configured(domain)
|
||||
if not no_checks:
|
||||
_check_domain_is_correctly_configured(domain)
|
||||
_backup_current_cert(domain)
|
||||
_configure_for_acme_challenge(auth, domain)
|
||||
_fetch_and_enable_new_certificate(domain)
|
||||
|
@ -255,7 +257,8 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
|
|||
|
||||
# Does it has a Let's Encrypt cert ?
|
||||
status = _get_status(domain)
|
||||
if (status["CAtype"] != "Let's Encrypt") : continue
|
||||
if (status["CAtype"] != "Let's Encrypt"):
|
||||
continue
|
||||
|
||||
# Does it expires soon ?
|
||||
if (force) or (status["validity"] <= validity_limit):
|
||||
|
@ -278,12 +281,10 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
|
|||
if not ((force) or (status["validity"] <= validity_limit)):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain))
|
||||
|
||||
|
||||
# Does it has a Let's Encrypt cert ?
|
||||
if (status["CAtype"] != "Let's Encrypt"):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain))
|
||||
|
||||
|
||||
# Actual renew steps
|
||||
for domain in domainList:
|
||||
|
||||
|
@ -291,7 +292,8 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
|
|||
|
||||
try:
|
||||
|
||||
if not no_checks : _check_domain_is_correctly_configured(domain)
|
||||
if not no_checks:
|
||||
_check_domain_is_correctly_configured(domain)
|
||||
_backup_current_cert(domain)
|
||||
_fetch_and_enable_new_certificate(domain)
|
||||
|
||||
|
@ -307,15 +309,11 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals
|
|||
_email_renewing_failed(domain, e)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Back-end stuff #
|
||||
###############################################################################
|
||||
|
||||
def _install_cron():
|
||||
|
||||
cron_job_file = "/etc/cron.weekly/certificateRenewer"
|
||||
|
||||
with open(cron_job_file, "w") as f:
|
||||
|
@ -323,10 +321,10 @@ def _install_cron() :
|
|||
f.write("#!/bin/bash\n")
|
||||
f.write("yunohost domain cert-renew --email\n")
|
||||
|
||||
_set_permissions(cron_job_file, "root", "root", 0755);
|
||||
_set_permissions(cron_job_file, "root", "root", 0755)
|
||||
|
||||
|
||||
def _email_renewing_failed(domain, e):
|
||||
|
||||
from_ = "certmanager@" + domain + " (Certificate Manager)"
|
||||
to_ = "root"
|
||||
subject_ = "Certificate renewing attempt for " + domain + " failed!"
|
||||
|
@ -361,9 +359,7 @@ Subject: %s
|
|||
smtp.quit()
|
||||
|
||||
|
||||
|
||||
def _configure_for_acme_challenge(auth, domain):
|
||||
|
||||
nginx_conf_file = "/etc/nginx/conf.d/" + domain + ".d/000-acmechallenge.conf"
|
||||
|
||||
nginx_configuration = '''
|
||||
|
@ -389,22 +385,24 @@ location '/.well-known/acme-challenge'
|
|||
|
||||
app_ssowatconf(auth)
|
||||
|
||||
def _fetch_and_enable_new_certificate(domain) :
|
||||
|
||||
def _fetch_and_enable_new_certificate(domain):
|
||||
# Make sure tmp folder exists
|
||||
logger.debug("Making sure tmp folders exists...")
|
||||
|
||||
if not (os.path.exists(webroot_folder)) : os.makedirs(webroot_folder)
|
||||
if not (os.path.exists(tmp_folder)) : os.makedirs(tmp_folder)
|
||||
_set_permissions(webroot_folder, "root", "www-data", 0650);
|
||||
_set_permissions(tmp_folder, "root", "root", 0640);
|
||||
if not (os.path.exists(webroot_folder)):
|
||||
os.makedirs(webroot_folder)
|
||||
if not (os.path.exists(tmp_folder)):
|
||||
os.makedirs(tmp_folder)
|
||||
_set_permissions(webroot_folder, "root", "www-data", 0650)
|
||||
_set_permissions(tmp_folder, "root", "root", 0640)
|
||||
|
||||
# Prepare certificate signing request
|
||||
logger.info("Prepare key and certificate signing request (CSR) for " + domain + "...")
|
||||
|
||||
domain_key_file = tmp_folder + "/" + domain + ".pem"
|
||||
_generate_key(domain_key_file)
|
||||
_set_permissions(domain_key_file, "root", "metronome", 0640);
|
||||
_set_permissions(domain_key_file, "root", "metronome", 0640)
|
||||
|
||||
_prepare_certificate_signing_request(domain, domain_key_file, tmp_folder)
|
||||
|
||||
|
@ -427,7 +425,7 @@ def _fetch_and_enable_new_certificate(domain) :
|
|||
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
|
||||
new_cert_folder = cert_folder + "/" + domain + "." + date_tag
|
||||
os.makedirs(new_cert_folder)
|
||||
_set_permissions(new_cert_folder, "root", "root", 0655);
|
||||
_set_permissions(new_cert_folder, "root", "root", 0655)
|
||||
|
||||
# Move the private key
|
||||
shutil.move(domain_key_file, new_cert_folder + "/key.pem")
|
||||
|
@ -437,9 +435,7 @@ def _fetch_and_enable_new_certificate(domain) :
|
|||
with open(domain_cert_file, "w") as f:
|
||||
f.write(signed_certificate)
|
||||
f.write(intermediate_certificate)
|
||||
_set_permissions(domain_cert_file, "root", "metronome", 0640);
|
||||
|
||||
|
||||
_set_permissions(domain_cert_file, "root", "metronome", 0640)
|
||||
|
||||
logger.info("Enabling the new certificate...")
|
||||
|
||||
|
@ -459,7 +455,6 @@ def _fetch_and_enable_new_certificate(domain) :
|
|||
if (statusSummaryCode < 20):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain))
|
||||
|
||||
|
||||
logger.info("Restarting services...")
|
||||
|
||||
for s in ["nginx", "postfix", "dovecot", "metronome"]:
|
||||
|
@ -467,7 +462,6 @@ def _fetch_and_enable_new_certificate(domain) :
|
|||
|
||||
|
||||
def _prepare_certificate_signing_request(domain, key_file, output_folder):
|
||||
|
||||
# Init a request
|
||||
csr = crypto.X509Req()
|
||||
|
||||
|
@ -490,10 +484,10 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder) :
|
|||
|
||||
|
||||
def _get_status(domain):
|
||||
|
||||
cert_file = cert_folder + "/" + domain + "/crt.pem"
|
||||
|
||||
if (not os.path.isfile(cert_file)) : return {}
|
||||
if (not os.path.isfile(cert_file)):
|
||||
return {}
|
||||
|
||||
try:
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
|
||||
|
@ -518,15 +512,20 @@ def _get_status(domain) :
|
|||
# Unknown by default
|
||||
statusSummaryCode = 0
|
||||
# Critical
|
||||
if (daysRemaining <= 0) : statusSummaryCode = -30
|
||||
if (daysRemaining <= 0):
|
||||
statusSummaryCode = -30
|
||||
# Warning, self-signed, browser will display a warning discouraging visitors to enter website
|
||||
elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt") : statusSummaryCode = -20
|
||||
elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt"):
|
||||
statusSummaryCode = -20
|
||||
# Attention, certificate will expire soon (should be renewed automatically if Let's Encrypt)
|
||||
elif (daysRemaining < validity_limit) : statusSummaryCode = -10
|
||||
elif (daysRemaining < validity_limit):
|
||||
statusSummaryCode = -10
|
||||
# CA not known, but still a valid certificate, so okay !
|
||||
elif (CAtype == "Other / Unknown") : statusSummaryCode = 10
|
||||
elif (CAtype == "Other / Unknown"):
|
||||
statusSummaryCode = 10
|
||||
# Let's Encrypt, great !
|
||||
elif (CAtype == "Let's Encrypt") : statusSummaryCode = 20
|
||||
elif (CAtype == "Let's Encrypt"):
|
||||
statusSummaryCode = 20
|
||||
|
||||
return {"domain": domain,
|
||||
"subject": certSubject,
|
||||
|
@ -540,30 +539,30 @@ def _get_status(domain) :
|
|||
# Misc small stuff ... #
|
||||
###############################################################################
|
||||
|
||||
def _generate_account_key() :
|
||||
|
||||
def _generate_account_key():
|
||||
logger.info("Generating account key ...")
|
||||
_generate_key(account_key_file)
|
||||
_set_permissions(account_key_file, "root", "root", 0400)
|
||||
|
||||
def _generate_key(destinationPath) :
|
||||
|
||||
def _generate_key(destinationPath):
|
||||
k = crypto.PKey()
|
||||
k.generate_key(crypto.TYPE_RSA, key_size)
|
||||
|
||||
with open(destinationPath, "w") as f:
|
||||
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
|
||||
|
||||
def _set_permissions(path, user, group, permissions) :
|
||||
|
||||
def _set_permissions(path, user, group, permissions):
|
||||
uid = pwd.getpwnam(user).pw_uid
|
||||
gid = grp.getgrnam(group).gr_gid
|
||||
|
||||
os.chown(path, uid, gid)
|
||||
os.chmod(path, permissions)
|
||||
|
||||
def _backup_current_cert(domain):
|
||||
|
||||
def _backup_current_cert(domain):
|
||||
logger.info("Backuping existing certificate for domain " + domain)
|
||||
|
||||
cert_folder_domain = cert_folder + "/" + domain
|
||||
|
@ -575,7 +574,6 @@ def _backup_current_cert(domain):
|
|||
|
||||
|
||||
def _check_domain_is_correctly_configured(domain):
|
||||
|
||||
public_ip = yunohost.domain.get_public_ip()
|
||||
|
||||
# Check if IP from DNS matches public IP
|
||||
|
@ -586,8 +584,8 @@ def _check_domain_is_correctly_configured(domain) :
|
|||
if not _domain_is_accessible_through_HTTP(public_ip, domain):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_http_not_working', domain=domain))
|
||||
|
||||
def _dns_ip_match_public_ip(public_ip, domain) :
|
||||
|
||||
def _dns_ip_match_public_ip(public_ip, domain):
|
||||
try:
|
||||
r = requests.get("http://dns-api.org/A/" + domain)
|
||||
except:
|
||||
|
@ -606,8 +604,8 @@ def _dns_ip_match_public_ip(public_ip, domain) :
|
|||
else:
|
||||
return True
|
||||
|
||||
def _domain_is_accessible_through_HTTP(ip, domain) :
|
||||
|
||||
def _domain_is_accessible_through_HTTP(ip, domain):
|
||||
try:
|
||||
requests.head("http://" + ip, headers={"Host": domain})
|
||||
except Exception:
|
||||
|
@ -615,19 +613,25 @@ def _domain_is_accessible_through_HTTP(ip, domain) :
|
|||
|
||||
return True
|
||||
|
||||
def _summary_code_to_string(code) :
|
||||
|
||||
if (code <= -30) : return "CRITICAL"
|
||||
elif (code <= -20) : return "WARNING"
|
||||
elif (code <= -10) : return "Attention"
|
||||
elif (code <= 0) : return "Unknown?"
|
||||
elif (code <= 10) : return "Good"
|
||||
elif (code <= 20) : return "Great!"
|
||||
def _summary_code_to_string(code):
|
||||
if (code <= -30):
|
||||
return "CRITICAL"
|
||||
elif (code <= -20):
|
||||
return "WARNING"
|
||||
elif (code <= -10):
|
||||
return "Attention"
|
||||
elif (code <= 0):
|
||||
return "Unknown?"
|
||||
elif (code <= 10):
|
||||
return "Good"
|
||||
elif (code <= 20):
|
||||
return "Great!"
|
||||
|
||||
return "Unknown?"
|
||||
|
||||
def _name_selfCA() :
|
||||
|
||||
def _name_selfCA():
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(selfCA_file).read())
|
||||
return cert.get_subject().CN
|
||||
|
||||
|
@ -635,6 +639,7 @@ def _name_selfCA() :
|
|||
def _tail(n, filePath):
|
||||
stdin, stdout = os.popen2("tail -n " + str(n) + " " + filePath)
|
||||
stdin.close()
|
||||
lines = stdout.readlines(); stdout.close()
|
||||
lines = stdout.readlines()
|
||||
stdout.close()
|
||||
lines = "".join(lines)
|
||||
return lines
|
||||
|
|
Loading…
Add table
Reference in a new issue