mirror of
https://github.com/YunoHost/moulinette.git
synced 2024-09-03 20:06:31 +02:00
pep8
This commit is contained in:
parent
1b20899f0e
commit
ddcc57eb9d
1 changed files with 89 additions and 55 deletions
|
@ -74,7 +74,7 @@ DNS_RESOLVERS = [
|
|||
"80.67.169.40", #
|
||||
"89.234.141.66", # ARN
|
||||
"141.255.128.100", # Aquilenet
|
||||
"141.255.128.101", #
|
||||
"141.255.128.101",
|
||||
"89.234.186.18", # Grifon
|
||||
"80.67.188.188" # LDN
|
||||
]
|
||||
|
@ -107,7 +107,8 @@ def certificate_status(auth, domain_list, full=False):
|
|||
for domain in domain_list:
|
||||
# Is it in Yunohost domain list ?
|
||||
if domain not in yunohost_domains_list:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
certificates = {}
|
||||
|
||||
|
@ -128,7 +129,6 @@ def certificate_status(auth, domain_list, full=False):
|
|||
|
||||
|
||||
def certificate_install(auth, domain_list, force=False, no_checks=False, self_signed=False, staging=False):
|
||||
|
||||
"""
|
||||
Install a Let's Encrypt certificate for given domains (all by default)
|
||||
|
||||
|
@ -145,11 +145,11 @@ def certificate_install(auth, domain_list, force=False, no_checks=False, self_si
|
|||
# not used anymore
|
||||
_check_old_letsencrypt_app()
|
||||
|
||||
|
||||
if self_signed:
|
||||
_certificate_install_selfsigned(domain_list, force)
|
||||
else:
|
||||
_certificate_install_letsencrypt(auth, domain_list, force, no_checks, staging)
|
||||
_certificate_install_letsencrypt(
|
||||
auth, domain_list, force, no_checks, staging)
|
||||
|
||||
|
||||
def _certificate_install_selfsigned(domain_list, force=False):
|
||||
|
@ -158,7 +158,8 @@ def _certificate_install_selfsigned(domain_list, force=False):
|
|||
|
||||
# Paths of files and folder we'll need
|
||||
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
|
||||
new_cert_folder = "%s/%s-history/%s-selfsigned" % (CERT_FOLDER, domain, date_tag)
|
||||
new_cert_folder = "%s/%s-history/%s-selfsigned" % (
|
||||
CERT_FOLDER, domain, date_tag)
|
||||
|
||||
original_ca_file = '/etc/ssl/certs/ca-yunohost_crt.pem'
|
||||
ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA'
|
||||
|
@ -176,7 +177,8 @@ def _certificate_install_selfsigned(domain_list, force=False):
|
|||
status = _get_status(domain)
|
||||
|
||||
if status["summary"]["code"] in ('good', 'great'):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_replace_valid_cert', domain=domain))
|
||||
|
||||
# Create output folder for new certificate stuff
|
||||
os.makedirs(new_cert_folder)
|
||||
|
@ -197,11 +199,13 @@ def _certificate_install_selfsigned(domain_list, force=False):
|
|||
% (conf_file, csr_file, crt_file))
|
||||
|
||||
for command in commands:
|
||||
p = subprocess.Popen(command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
p = subprocess.Popen(
|
||||
command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
out, _ = p.communicate()
|
||||
if p.returncode != 0:
|
||||
logger.warning(out)
|
||||
raise MoulinetteError(errno.EIO, m18n.n('domain_cert_gen_failed'))
|
||||
raise MoulinetteError(
|
||||
errno.EIO, m18n.n('domain_cert_gen_failed'))
|
||||
else:
|
||||
logger.info(out)
|
||||
|
||||
|
@ -229,10 +233,11 @@ def _certificate_install_selfsigned(domain_list, force=False):
|
|||
status = _get_status(domain)
|
||||
|
||||
if status and status["CA_type"]["code"] == "self-signed" and status["validity"] > 3648:
|
||||
logger.success(m18n.n("certmanager_cert_install_success_selfsigned", domain=domain))
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_install_success_selfsigned", domain=domain))
|
||||
else:
|
||||
logger.error("Installation of self-signed certificate installation for %s failed !", domain)
|
||||
|
||||
logger.error(
|
||||
"Installation of self-signed certificate installation for %s failed !", domain)
|
||||
|
||||
|
||||
def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=False, staging=False):
|
||||
|
@ -255,20 +260,24 @@ def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=F
|
|||
for domain in domain_list:
|
||||
yunohost_domains_list = yunohost.domain.domain_list(auth)['domains']
|
||||
if domain not in yunohost_domains_list:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
# Is it self-signed ?
|
||||
status = _get_status(domain)
|
||||
if not force and status["CA_type"]["code"] != "self-signed":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_cert_not_selfsigned', domain=domain))
|
||||
|
||||
if (staging):
|
||||
logger.warning("Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
logger.warning(
|
||||
"Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
|
||||
# Actual install steps
|
||||
for domain in domain_list:
|
||||
|
||||
logger.info("Now attempting install of certificate for domain %s!", domain)
|
||||
logger.info(
|
||||
"Now attempting install of certificate for domain %s!", domain)
|
||||
|
||||
try:
|
||||
if not no_checks:
|
||||
|
@ -278,7 +287,8 @@ def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=F
|
|||
_fetch_and_enable_new_certificate(domain, staging)
|
||||
_install_cron()
|
||||
|
||||
logger.success(m18n.n("certmanager_cert_install_success", domain=domain))
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_install_success", domain=domain))
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Certificate installation for %s failed !", domain)
|
||||
|
@ -325,31 +335,37 @@ def certificate_renew(auth, domain_list, force=False, no_checks=False, email=Fal
|
|||
|
||||
# Is it in Yunohost dmomain list ?
|
||||
if domain not in yunohost.domain.domain_list(auth)['domains']:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
status = _get_status(domain)
|
||||
|
||||
# Does it expire soon ?
|
||||
if not force or status["validity"] <= VALIDITY_LIMIT:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_renew_valid_cert', domain=domain))
|
||||
|
||||
# Does it have a Let's Encrypt cert ?
|
||||
if status["CA_type"]["code"] != "lets-encrypt":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_renew_nonLE_cert', domain=domain))
|
||||
|
||||
if (staging):
|
||||
logger.warning("Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
logger.warning(
|
||||
"Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
|
||||
# Actual renew steps
|
||||
for domain in domain_list:
|
||||
logger.info("Now attempting renewing of certificate for domain %s !", domain)
|
||||
logger.info(
|
||||
"Now attempting renewing of certificate for domain %s !", domain)
|
||||
|
||||
try:
|
||||
if not no_checks:
|
||||
_check_domain_is_ready_for_ACME(domain)
|
||||
_fetch_and_enable_new_certificate(domain, staging)
|
||||
|
||||
logger.success(m18n.n("certmanager_cert_renew_success", domain=domain))
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_renew_success", domain=domain))
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
@ -375,7 +391,8 @@ def _check_old_letsencrypt_app():
|
|||
if "letsencrypt" not in installedAppIds:
|
||||
return
|
||||
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_old_letsencrypt_app_detected'))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_old_letsencrypt_app_detected'))
|
||||
|
||||
|
||||
def _install_cron():
|
||||
|
@ -436,21 +453,25 @@ location '/.well-known/acme-challenge'
|
|||
}
|
||||
''' % WEBROOT_FOLDER
|
||||
|
||||
# Check there isn't a conflicting file for the acme-challenge well-known uri
|
||||
# Check there isn't a conflicting file for the acme-challenge well-known
|
||||
# uri
|
||||
for path in glob.glob('%s/*.conf' % nginx_conf_folder):
|
||||
if (path == nginx_conf_file):
|
||||
continue
|
||||
with open(path) as f:
|
||||
contents = f.read()
|
||||
if ('/.well-known/acme-challenge' in contents):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_conflicting_nginx_file', filepath=path))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_conflicting_nginx_file', filepath=path))
|
||||
|
||||
# Write the conf
|
||||
if os.path.exists(nginx_conf_file):
|
||||
logger.info("Nginx configuration file for ACME challenge already exists for domain, skipping.")
|
||||
logger.info(
|
||||
"Nginx configuration file for ACME challenge already exists for domain, skipping.")
|
||||
return
|
||||
|
||||
logger.info("Adding Nginx configuration file for Acme challenge for domain %s.", domain)
|
||||
logger.info(
|
||||
"Adding Nginx configuration file for Acme challenge for domain %s.", domain)
|
||||
|
||||
with open(nginx_conf_file, "w") as f:
|
||||
f.write(nginx_configuration)
|
||||
|
@ -477,7 +498,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False):
|
|||
_set_permissions(TMP_FOLDER, "root", "root", 0640)
|
||||
|
||||
# Prepare certificate signing request
|
||||
logger.info("Prepare key and certificate signing request (CSR) for %s...", domain)
|
||||
logger.info(
|
||||
"Prepare key and certificate signing request (CSR) for %s...", domain)
|
||||
|
||||
domain_key_file = "%s/%s.pem" % (TMP_FOLDER, domain)
|
||||
_generate_key(domain_key_file)
|
||||
|
@ -503,13 +525,16 @@ def _fetch_and_enable_new_certificate(domain, staging=False):
|
|||
CA=certification_authority)
|
||||
except ValueError as e:
|
||||
if ("urn:acme:error:rateLimited" in str(e)):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_hit_rate_limit', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_hit_rate_limit', domain=domain))
|
||||
else:
|
||||
logger.error(str(e))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cert_signing_failed'))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cert_signing_failed'))
|
||||
except Exception as e:
|
||||
logger.error(str(e))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cert_signing_failed'))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cert_signing_failed'))
|
||||
|
||||
intermediate_certificate = requests.get(INTERMEDIATE_CERTIFICATE_URL).text
|
||||
|
||||
|
@ -524,7 +549,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False):
|
|||
else:
|
||||
folder_flag = "letsencrypt"
|
||||
|
||||
new_cert_folder = "%s/%s-history/%s-%s" % (CERT_FOLDER, domain, date_tag, folder_flag)
|
||||
new_cert_folder = "%s/%s-history/%s-%s" % (
|
||||
CERT_FOLDER, domain, date_tag, folder_flag)
|
||||
os.makedirs(new_cert_folder)
|
||||
|
||||
_set_permissions(new_cert_folder, "root", "root", 0655)
|
||||
|
@ -550,7 +576,8 @@ def _fetch_and_enable_new_certificate(domain, staging=False):
|
|||
status_summary = _get_status(domain)["summary"]
|
||||
|
||||
if status_summary["code"] != "great":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_certificate_fetching_or_enabling_failed', domain=domain))
|
||||
|
||||
|
||||
def _prepare_certificate_signing_request(domain, key_file, output_folder):
|
||||
|
@ -582,14 +609,17 @@ def _get_status(domain):
|
|||
cert_file = os.path.join(CERT_FOLDER, domain, "crt.pem")
|
||||
|
||||
if not os.path.isfile(cert_file):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_no_cert_file', domain=domain, file=cert_file))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_no_cert_file', domain=domain, file=cert_file))
|
||||
|
||||
try:
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
|
||||
cert = crypto.load_certificate(
|
||||
crypto.FILETYPE_PEM, open(cert_file).read())
|
||||
except Exception as exception:
|
||||
import traceback
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cannot_read_cert', domain=domain, file=cert_file, reason=exception))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cannot_read_cert', domain=domain, file=cert_file, reason=exception))
|
||||
|
||||
cert_subject = cert.get_subject().CN
|
||||
cert_issuer = cert.get_issuer().CN
|
||||
|
@ -741,11 +771,13 @@ def _check_domain_is_ready_for_ACME(domain):
|
|||
|
||||
# Check if IP from DNS matches public IP
|
||||
if not _dns_ip_match_public_ip(public_ip, domain):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_dns_ip_differs_from_public_ip', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_dns_ip_differs_from_public_ip', domain=domain))
|
||||
|
||||
# Check if domain seems to be accessible through HTTP ?
|
||||
if not _domain_is_accessible_through_HTTP(public_ip, domain):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_http_not_working', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_http_not_working', domain=domain))
|
||||
|
||||
|
||||
def _dns_ip_match_public_ip(public_ip, domain):
|
||||
|
@ -754,7 +786,8 @@ def _dns_ip_match_public_ip(public_ip, domain):
|
|||
resolver.nameservers = DNS_RESOLVERS
|
||||
answers = resolver.query(domain, "A")
|
||||
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_error_no_A_record', domain=domain))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_error_no_A_record', domain=domain))
|
||||
|
||||
dns_ip = str(answers[0])
|
||||
|
||||
|
@ -771,7 +804,8 @@ def _domain_is_accessible_through_HTTP(ip, domain):
|
|||
|
||||
|
||||
def _name_self_CA():
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(SELF_CA_FILE).read())
|
||||
cert = crypto.load_certificate(
|
||||
crypto.FILETYPE_PEM, open(SELF_CA_FILE).read())
|
||||
return cert.get_subject().CN
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue