mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
Merge pull request #180 from alexAubin/certmanager
[enh] Certificate management integration (e.g. Let's Encrypt ...)
This commit is contained in:
commit
34ca628624
8 changed files with 1128 additions and 37 deletions
|
@ -305,8 +305,71 @@ domain:
|
|||
- !!str ^[0-9]+$
|
||||
- "pattern_positive_number"
|
||||
|
||||
### certificate_status()
|
||||
cert-status:
|
||||
action_help: List status of current certificates (all by default).
|
||||
api: GET /domains/cert-status/<domain_list>
|
||||
configuration:
|
||||
authenticate: all
|
||||
authenticator: ldap-anonymous
|
||||
arguments:
|
||||
domain_list:
|
||||
help: Domains to check
|
||||
nargs: "*"
|
||||
--full:
|
||||
help: Show more details
|
||||
action: store_true
|
||||
|
||||
### domain_info()
|
||||
### certificate_install()
|
||||
cert-install:
|
||||
action_help: Install Let's Encrypt certificates for given domains (all by default).
|
||||
api: POST /domains/cert-install/<domain_list>
|
||||
configuration:
|
||||
authenticate: all
|
||||
authenticator: ldap-anonymous
|
||||
arguments:
|
||||
domain_list:
|
||||
help: Domains for which to install the certificates
|
||||
nargs: "*"
|
||||
--force:
|
||||
help: Install even if current certificate is not self-signed
|
||||
action: store_true
|
||||
--no-checks:
|
||||
help: Does not perform any check that your domain seems correctly configured (DNS, reachability) before attempting to install. (Not recommended)
|
||||
action: store_true
|
||||
--self-signed:
|
||||
help: Install self-signed certificate instead of Let's Encrypt
|
||||
action: store_true
|
||||
--staging:
|
||||
help: Use the fake/staging Let's Encrypt certification authority. The new certificate won't actually be enabled - it is only intended to test the main steps of the procedure.
|
||||
action: store_true
|
||||
|
||||
### certificate_renew()
|
||||
cert-renew:
|
||||
action_help: Renew the Let's Encrypt certificates for given domains (all by default).
|
||||
api: POST /domains/cert-renew/<domain_list>
|
||||
configuration:
|
||||
authenticate: all
|
||||
authenticator: ldap-anonymous
|
||||
arguments:
|
||||
domain_list:
|
||||
help: Domains for which to renew the certificates
|
||||
nargs: "*"
|
||||
--force:
|
||||
help: Ignore the validity threshold (30 days)
|
||||
action: store_true
|
||||
--email:
|
||||
help: Send an email to root with logs if some renewing fails
|
||||
action: store_true
|
||||
--no-checks:
|
||||
help: Does not perform any check that your domain seems correctly configured (DNS, reachability) before attempting to renew. (Not recommended)
|
||||
action: store_true
|
||||
--staging:
|
||||
help: Use the fake/staging Let's Encrypt certification authority. The new certificate won't actually be enabled - it is only intended to test the main steps of the procedure.
|
||||
action: store_true
|
||||
|
||||
|
||||
### domain_info()
|
||||
# info:
|
||||
# action_help: Get domain informations
|
||||
# api: GET /domains/<domain>
|
||||
|
|
|
@ -71,7 +71,6 @@
|
|||
"diagnostic_monitor_system_error": "Can't monitor system: {error}",
|
||||
"diagnostic_no_apps": "No installed application",
|
||||
"dnsmasq_isnt_installed": "dnsmasq does not seem to be installed, please run 'apt-get remove bind9 && apt-get install dnsmasq'",
|
||||
"domain_cert_gen_failed": "Unable to generate certificate",
|
||||
"domain_created": "The domain has been created",
|
||||
"domain_creation_failed": "Unable to create domain",
|
||||
"domain_deleted": "The domain has been deleted",
|
||||
|
@ -237,5 +236,24 @@
|
|||
"yunohost_ca_creation_failed": "Unable to create certificate authority",
|
||||
"yunohost_configured": "YunoHost has been configured",
|
||||
"yunohost_installing": "Installing YunoHost...",
|
||||
"yunohost_not_installed": "YunoHost is not or not correctly installed. Please execute 'yunohost tools postinstall'."
|
||||
"yunohost_not_installed": "YunoHost is not or not correctly installed. Please execute 'yunohost tools postinstall'.",
|
||||
"domain_cert_gen_failed": "Unable to generate certificate",
|
||||
"certmanager_attempt_to_replace_valid_cert" : "You are attempting to overwrite a good and valid certificate for domain {domain:s} ! (Use --force to bypass)",
|
||||
"certmanager_domain_unknown": "Unknown domain {domain:s}",
|
||||
"certmanager_domain_cert_not_selfsigned" : "The certificate for domain {domain:s} is not self-signed. Are you sure you want to replace it ? (Use --force)",
|
||||
"certmanager_certificate_fetching_or_enabling_failed": "Sounds like enabling the new certificate for {domain:s} failed somehow...",
|
||||
"certmanager_attempt_to_renew_nonLE_cert" : "The certificate for domain {domain:s} is not issued by Let's Encrypt. Cannot renew it automatically !",
|
||||
"certmanager_attempt_to_renew_valid_cert" : "The certificate for domain {domain:s} is not about to expire ! Use --force to bypass",
|
||||
"certmanager_domain_http_not_working": "It seems that the domain {domain:s} cannot be accessed through HTTP. Please check your DNS and nginx configuration is okay.",
|
||||
"certmanager_error_no_A_record" : "No DNS 'A' record found for {domain:s}. You need to make your domain name point to your machine to be able to install a Let's Encrypt certificate ! (If you know what you are doing, use --no-checks to disable those checks.)",
|
||||
"certmanager_domain_dns_ip_differs_from_public_ip" : "The DNS 'A' record for domain {domain:s} is different from this server IP. If you recently modified your A record, please wait for it to propagate (some DNS propagation checkers are available online). (If you know what you are doing, use --no-checks to disable those checks.)",
|
||||
"certmanager_cannot_read_cert": "Something wrong happened when trying to open current certificate for domain {domain:s} (file : {file:s}), reason: {reason:s}",
|
||||
"certmanager_cert_install_success_selfsigned" : "Successfully installed a self-signed certificate for domain {domain:s} !",
|
||||
"certmanager_cert_install_success" : "Successfully installed Let's Encrypt certificate for domain {domain:s} !",
|
||||
"certmanager_cert_renew_success" : "Successfully renewed Let's Encrypt certificate for domain {domain:s} !",
|
||||
"certmanager_old_letsencrypt_app_detected" : "\nYunohost detected that the 'letsencrypt' app is installed, which conflits with the new built-in certificate management features in Yunohost. If you wish to use the new built-in features, please run the following commands to migrate your installation :\n\n yunohost app remove letsencrypt\n yunohost domain cert-install\n\nN.B. : this will attempt to re-install certificates for all domains with a Let's Encrypt certificate or self-signed certificate.",
|
||||
"certmanager_hit_rate_limit" :"Too many certificates already issued for exact set of domains {domain:s} recently. Please try again later. See https://letsencrypt.org/docs/rate-limits/ for more details.",
|
||||
"certmanager_cert_signing_failed" : "Signing the new certificate failed.",
|
||||
"certmanager_no_cert_file" : "Unable to read certificate file for domain {domain:s} (file : {file:s})",
|
||||
"certmanager_conflicting_nginx_file": "Unable to prepare domain for ACME challenge : the nginx configuration file {filepath:s} is conflicting and should be removed first."
|
||||
}
|
||||
|
|
|
@ -1027,6 +1027,9 @@ def app_ssowatconf(auth):
|
|||
for domain in domains:
|
||||
skipped_urls.extend([domain + '/yunohost/admin', domain + '/yunohost/api'])
|
||||
|
||||
# Authorize ACME challenge url
|
||||
skipped_regex.append("^[^/]*/%.well%-known/acme%-challenge/.*$")
|
||||
|
||||
conf_dict = {
|
||||
'portal_domain': main_domain,
|
||||
'portal_path': '/yunohost/sso/',
|
||||
|
|
826
src/yunohost/certificate.py
Normal file
826
src/yunohost/certificate.py
Normal file
|
@ -0,0 +1,826 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" License
|
||||
|
||||
Copyright (C) 2016 YUNOHOST.ORG
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published
|
||||
by the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program; if not, see http://www.gnu.org/licenses
|
||||
|
||||
yunohost_certificate.py
|
||||
|
||||
Manage certificates, in particular Let's encrypt
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import errno
|
||||
import shutil
|
||||
import pwd
|
||||
import grp
|
||||
import smtplib
|
||||
import requests
|
||||
import subprocess
|
||||
import dns.resolver
|
||||
import glob
|
||||
|
||||
from OpenSSL import crypto
|
||||
from datetime import datetime
|
||||
from yunohost.vendor.acme_tiny.acme_tiny import get_crt as sign_certificate
|
||||
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.log import getActionLogger
|
||||
|
||||
import yunohost.domain
|
||||
|
||||
from yunohost.app import app_ssowatconf
|
||||
from yunohost.service import _run_service_command
|
||||
|
||||
|
||||
logger = getActionLogger('yunohost.certmanager')
|
||||
|
||||
CERT_FOLDER = "/etc/yunohost/certs/"
|
||||
TMP_FOLDER = "/tmp/acme-challenge-private/"
|
||||
WEBROOT_FOLDER = "/tmp/acme-challenge-public/"
|
||||
|
||||
SELF_CA_FILE = "/etc/ssl/certs/ca-yunohost_crt.pem"
|
||||
ACCOUNT_KEY_FILE = "/etc/yunohost/letsencrypt_account.pem"
|
||||
|
||||
KEY_SIZE = 3072
|
||||
|
||||
VALIDITY_LIMIT = 15 # days
|
||||
|
||||
# For tests
|
||||
STAGING_CERTIFICATION_AUTHORITY = "https://acme-staging.api.letsencrypt.org"
|
||||
# For prod
|
||||
PRODUCTION_CERTIFICATION_AUTHORITY = "https://acme-v01.api.letsencrypt.org"
|
||||
|
||||
INTERMEDIATE_CERTIFICATE_URL = "https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem"
|
||||
|
||||
DNS_RESOLVERS = [
|
||||
# FFDN DNS resolvers
|
||||
# See https://www.ffdn.org/wiki/doku.php?id=formations:dns
|
||||
"80.67.169.12", # FDN
|
||||
"80.67.169.40", #
|
||||
"89.234.141.66", # ARN
|
||||
"141.255.128.100", # Aquilenet
|
||||
"141.255.128.101",
|
||||
"89.234.186.18", # Grifon
|
||||
"80.67.188.188" # LDN
|
||||
]
|
||||
|
||||
###############################################################################
|
||||
# Front-end stuff #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def certificate_status(auth, domain_list, full=False):
|
||||
"""
|
||||
Print the status of certificate for given domains (all by default)
|
||||
|
||||
Keyword argument:
|
||||
domain_list -- Domains to be checked
|
||||
full -- Display more info about the certificates
|
||||
"""
|
||||
|
||||
# Check if old letsencrypt_ynh is installed
|
||||
# TODO / FIXME - Remove this in the future once the letsencrypt app is
|
||||
# not used anymore
|
||||
_check_old_letsencrypt_app()
|
||||
|
||||
# If no domains given, consider all yunohost domains
|
||||
if domain_list == []:
|
||||
domain_list = yunohost.domain.domain_list(auth)['domains']
|
||||
# Else, validate that yunohost knows the domains given
|
||||
else:
|
||||
yunohost_domains_list = yunohost.domain.domain_list(auth)['domains']
|
||||
for domain in domain_list:
|
||||
# Is it in Yunohost domain list?
|
||||
if domain not in yunohost_domains_list:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
certificates = {}
|
||||
|
||||
for domain in domain_list:
|
||||
status = _get_status(domain)
|
||||
|
||||
if not full:
|
||||
del status["subject"]
|
||||
del status["CA_name"]
|
||||
del status["ACME_eligible"]
|
||||
status["CA_type"] = status["CA_type"]["verbose"]
|
||||
status["summary"] = status["summary"]["verbose"]
|
||||
|
||||
del status["domain"]
|
||||
certificates[domain] = status
|
||||
|
||||
return {"certificates": certificates}
|
||||
|
||||
|
||||
def certificate_install(auth, domain_list, force=False, no_checks=False, self_signed=False, staging=False):
|
||||
"""
|
||||
Install a Let's Encrypt certificate for given domains (all by default)
|
||||
|
||||
Keyword argument:
|
||||
domain_list -- Domains on which to install certificates
|
||||
force -- Install even if current certificate is not self-signed
|
||||
no-check -- Disable some checks about the reachability of web server
|
||||
before attempting the install
|
||||
self-signed -- Instal self-signed certificates instead of Let's Encrypt
|
||||
"""
|
||||
|
||||
# Check if old letsencrypt_ynh is installed
|
||||
# TODO / FIXME - Remove this in the future once the letsencrypt app is
|
||||
# not used anymore
|
||||
_check_old_letsencrypt_app()
|
||||
|
||||
if self_signed:
|
||||
_certificate_install_selfsigned(domain_list, force)
|
||||
else:
|
||||
_certificate_install_letsencrypt(
|
||||
auth, domain_list, force, no_checks, staging)
|
||||
|
||||
|
||||
def _certificate_install_selfsigned(domain_list, force=False):
|
||||
|
||||
for domain in domain_list:
|
||||
|
||||
# Paths of files and folder we'll need
|
||||
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
|
||||
new_cert_folder = "%s/%s-history/%s-selfsigned" % (
|
||||
CERT_FOLDER, domain, date_tag)
|
||||
|
||||
original_ca_file = '/etc/ssl/certs/ca-yunohost_crt.pem'
|
||||
ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA'
|
||||
conf_template = os.path.join(ssl_dir, "openssl.cnf")
|
||||
|
||||
csr_file = os.path.join(ssl_dir, "certs", "yunohost_csr.pem")
|
||||
conf_file = os.path.join(new_cert_folder, "openssl.cnf")
|
||||
key_file = os.path.join(new_cert_folder, "key.pem")
|
||||
crt_file = os.path.join(new_cert_folder, "crt.pem")
|
||||
ca_file = os.path.join(new_cert_folder, "ca.pem")
|
||||
|
||||
# Check we ain't trying to overwrite a good cert !
|
||||
current_cert_file = os.path.join(CERT_FOLDER, domain, "crt.pem")
|
||||
if not force and os.path.isfile(current_cert_file):
|
||||
status = _get_status(domain)
|
||||
|
||||
if status["summary"]["code"] in ('good', 'great'):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_replace_valid_cert', domain=domain))
|
||||
|
||||
# Create output folder for new certificate stuff
|
||||
os.makedirs(new_cert_folder)
|
||||
|
||||
# Create our conf file, based on template, replacing the occurences of
|
||||
# "yunohost.org" with the given domain
|
||||
with open(conf_file, "w") as f, open(conf_template, "r") as template:
|
||||
for line in template:
|
||||
f.write(line.replace("yunohost.org", domain))
|
||||
|
||||
# Use OpenSSL command line to create a certificate signing request,
|
||||
# and self-sign the cert
|
||||
commands = [
|
||||
"openssl req -new -config %s -days 3650 -out %s -keyout %s -nodes -batch"
|
||||
% (conf_file, csr_file, key_file),
|
||||
"openssl ca -config %s -days 3650 -in %s -out %s -batch"
|
||||
% (conf_file, csr_file, crt_file),
|
||||
]
|
||||
|
||||
for command in commands:
|
||||
p = subprocess.Popen(
|
||||
command.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
out, _ = p.communicate()
|
||||
|
||||
if p.returncode != 0:
|
||||
logger.warning(out)
|
||||
raise MoulinetteError(
|
||||
errno.EIO, m18n.n('domain_cert_gen_failed'))
|
||||
else:
|
||||
logger.info(out)
|
||||
|
||||
# Link the CA cert (not sure it's actually needed in practice though,
|
||||
# since we append it at the end of crt.pem. For instance for Let's
|
||||
# Encrypt certs, we only need the crt.pem and key.pem)
|
||||
os.symlink(original_ca_file, ca_file)
|
||||
|
||||
# Append ca.pem at the end of crt.pem
|
||||
with open(ca_file, "r") as ca_pem, open(crt_file, "a") as crt_pem:
|
||||
crt_pem.write("\n")
|
||||
crt_pem.write(ca_pem.read())
|
||||
|
||||
# Set appropriate permissions
|
||||
_set_permissions(new_cert_folder, "root", "root", 0755)
|
||||
_set_permissions(key_file, "root", "metronome", 0640)
|
||||
_set_permissions(crt_file, "root", "metronome", 0640)
|
||||
_set_permissions(conf_file, "root", "root", 0600)
|
||||
|
||||
# Actually enable the certificate we created
|
||||
_enable_certificate(domain, new_cert_folder)
|
||||
|
||||
# Check new status indicate a recently created self-signed certificate
|
||||
status = _get_status(domain)
|
||||
|
||||
if status and status["CA_type"]["code"] == "self-signed" and status["validity"] > 3648:
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_install_success_selfsigned", domain=domain))
|
||||
else:
|
||||
logger.error(
|
||||
"Installation of self-signed certificate installation for %s failed !", domain)
|
||||
|
||||
|
||||
def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=False, staging=False):
|
||||
if not os.path.exists(ACCOUNT_KEY_FILE):
|
||||
_generate_account_key()
|
||||
|
||||
# If no domains given, consider all yunohost domains with self-signed
|
||||
# certificates
|
||||
if domain_list == []:
|
||||
for domain in yunohost.domain.domain_list(auth)['domains']:
|
||||
|
||||
status = _get_status(domain)
|
||||
if status["CA_type"]["code"] != "self-signed":
|
||||
continue
|
||||
|
||||
domain_list.append(domain)
|
||||
|
||||
# Else, validate that yunohost knows the domains given
|
||||
else:
|
||||
for domain in domain_list:
|
||||
yunohost_domains_list = yunohost.domain.domain_list(auth)['domains']
|
||||
if domain not in yunohost_domains_list:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
# Is it self-signed?
|
||||
status = _get_status(domain)
|
||||
if not force and status["CA_type"]["code"] != "self-signed":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_cert_not_selfsigned', domain=domain))
|
||||
|
||||
if staging:
|
||||
logger.warning(
|
||||
"Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
|
||||
# Actual install steps
|
||||
for domain in domain_list:
|
||||
|
||||
logger.info(
|
||||
"Now attempting install of certificate for domain %s!", domain)
|
||||
|
||||
try:
|
||||
if not no_checks:
|
||||
_check_domain_is_ready_for_ACME(domain)
|
||||
|
||||
_configure_for_acme_challenge(auth, domain)
|
||||
_fetch_and_enable_new_certificate(domain, staging)
|
||||
_install_cron()
|
||||
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_install_success", domain=domain))
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Certificate installation for %s failed !\nException: %s", domain, e)
|
||||
|
||||
|
||||
def certificate_renew(auth, domain_list, force=False, no_checks=False, email=False, staging=False):
|
||||
"""
|
||||
Renew Let's Encrypt certificate for given domains (all by default)
|
||||
|
||||
Keyword argument:
|
||||
domain_list -- Domains for which to renew the certificates
|
||||
force -- Ignore the validity threshold (15 days)
|
||||
no-check -- Disable some checks about the reachability of web server
|
||||
before attempting the renewing
|
||||
email -- Emails root if some renewing failed
|
||||
"""
|
||||
|
||||
# Check if old letsencrypt_ynh is installed
|
||||
# TODO / FIXME - Remove this in the future once the letsencrypt app is
|
||||
# not used anymore
|
||||
_check_old_letsencrypt_app()
|
||||
|
||||
# If no domains given, consider all yunohost domains with Let's Encrypt
|
||||
# certificates
|
||||
if domain_list == []:
|
||||
for domain in yunohost.domain.domain_list(auth)['domains']:
|
||||
|
||||
# Does it have a Let's Encrypt cert?
|
||||
status = _get_status(domain)
|
||||
if status["CA_type"]["code"] != "lets-encrypt":
|
||||
continue
|
||||
|
||||
# Does it expire soon?
|
||||
if force or status["validity"] <= VALIDITY_LIMIT:
|
||||
domain_list.append(domain)
|
||||
|
||||
if len(domain_list) == 0:
|
||||
logger.info("No certificate needs to be renewed.")
|
||||
|
||||
# Else, validate the domain list given
|
||||
else:
|
||||
for domain in domain_list:
|
||||
|
||||
# Is it in Yunohost dmomain list?
|
||||
if domain not in yunohost.domain.domain_list(auth)['domains']:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_unknown', domain=domain))
|
||||
|
||||
status = _get_status(domain)
|
||||
|
||||
# Does it expire soon?
|
||||
if not force or status["validity"] <= VALIDITY_LIMIT:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_renew_valid_cert', domain=domain))
|
||||
|
||||
# Does it have a Let's Encrypt cert?
|
||||
if status["CA_type"]["code"] != "lets-encrypt":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_attempt_to_renew_nonLE_cert', domain=domain))
|
||||
|
||||
if staging:
|
||||
logger.warning(
|
||||
"Please note that you used the --staging option, and that no new certificate will actually be enabled !")
|
||||
|
||||
# Actual renew steps
|
||||
for domain in domain_list:
|
||||
logger.info(
|
||||
"Now attempting renewing of certificate for domain %s !", domain)
|
||||
|
||||
try:
|
||||
if not no_checks:
|
||||
_check_domain_is_ready_for_ACME(domain)
|
||||
_fetch_and_enable_new_certificate(domain, staging)
|
||||
|
||||
logger.success(
|
||||
m18n.n("certmanager_cert_renew_success", domain=domain))
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
from StringIO import StringIO
|
||||
stack = StringIO()
|
||||
traceback.print_exc(file=stack)
|
||||
logger.error("Certificate renewing for %s failed !", domain)
|
||||
logger.error(stack.getvalue())
|
||||
logger.error(str(e))
|
||||
|
||||
if email:
|
||||
logger.error("Sending email with details to root ...")
|
||||
_email_renewing_failed(domain, e, stack.getvalue())
|
||||
|
||||
|
||||
###############################################################################
|
||||
# Back-end stuff #
|
||||
###############################################################################
|
||||
|
||||
def _check_old_letsencrypt_app():
|
||||
installedAppIds = [app["id"] for app in yunohost.app.app_list(installed=True)["apps"]]
|
||||
|
||||
if "letsencrypt" not in installedAppIds:
|
||||
return
|
||||
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_old_letsencrypt_app_detected'))
|
||||
|
||||
|
||||
def _install_cron():
|
||||
cron_job_file = "/etc/cron.daily/yunohost-certificate-renew"
|
||||
|
||||
with open(cron_job_file, "w") as f:
|
||||
f.write("#!/bin/bash\n")
|
||||
f.write("yunohost domain cert-renew --email\n")
|
||||
|
||||
_set_permissions(cron_job_file, "root", "root", 0755)
|
||||
|
||||
|
||||
def _email_renewing_failed(domain, exception_message, stack):
|
||||
from_ = "certmanager@%s (Certificate Manager)" % domain
|
||||
to_ = "root"
|
||||
subject_ = "Certificate renewing attempt for %s failed!" % domain
|
||||
|
||||
logs = _tail(50, "/var/log/yunohost/yunohost-cli.log")
|
||||
text = """
|
||||
An attempt for renewing the certificate for domain %s failed with the following
|
||||
error :
|
||||
|
||||
%s
|
||||
%s
|
||||
|
||||
Here's the tail of /var/log/yunohost/yunohost-cli.log, which might help to
|
||||
investigate :
|
||||
|
||||
%s
|
||||
|
||||
-- Certificate Manager
|
||||
|
||||
""" % (domain, exception_message, stack, logs)
|
||||
|
||||
message = """
|
||||
From: %s
|
||||
To: %s
|
||||
Subject: %s
|
||||
|
||||
%s
|
||||
""" % (from_, to_, subject_, text)
|
||||
|
||||
smtp = smtplib.SMTP("localhost")
|
||||
smtp.sendmail(from_, [to_], message)
|
||||
smtp.quit()
|
||||
|
||||
|
||||
def _configure_for_acme_challenge(auth, domain):
|
||||
|
||||
nginx_conf_folder = "/etc/nginx/conf.d/%s.d" % domain
|
||||
nginx_conf_file = "%s/000-acmechallenge.conf" % nginx_conf_folder
|
||||
|
||||
nginx_configuration = '''
|
||||
location '/.well-known/acme-challenge'
|
||||
{
|
||||
default_type "text/plain";
|
||||
alias %s;
|
||||
}
|
||||
''' % WEBROOT_FOLDER
|
||||
|
||||
# Check there isn't a conflicting file for the acme-challenge well-known
|
||||
# uri
|
||||
for path in glob.glob('%s/*.conf' % nginx_conf_folder):
|
||||
|
||||
if path == nginx_conf_file:
|
||||
continue
|
||||
|
||||
with open(path) as f:
|
||||
contents = f.read()
|
||||
|
||||
if '/.well-known/acme-challenge' in contents:
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_conflicting_nginx_file', filepath=path))
|
||||
|
||||
# Write the conf
|
||||
if os.path.exists(nginx_conf_file):
|
||||
logger.info(
|
||||
"Nginx configuration file for ACME challenge already exists for domain, skipping.")
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Adding Nginx configuration file for Acme challenge for domain %s.", domain)
|
||||
|
||||
with open(nginx_conf_file, "w") as f:
|
||||
f.write(nginx_configuration)
|
||||
|
||||
# Assume nginx conf is okay, and reload it
|
||||
# (FIXME : maybe add a check that it is, using nginx -t, haven't found
|
||||
# any clean function already implemented in yunohost to do this though)
|
||||
_run_service_command("reload", "nginx")
|
||||
|
||||
app_ssowatconf(auth)
|
||||
|
||||
|
||||
def _fetch_and_enable_new_certificate(domain, staging=False):
|
||||
# Make sure tmp folder exists
|
||||
logger.debug("Making sure tmp folders exists...")
|
||||
|
||||
if not os.path.exists(WEBROOT_FOLDER):
|
||||
os.makedirs(WEBROOT_FOLDER)
|
||||
|
||||
if not os.path.exists(TMP_FOLDER):
|
||||
os.makedirs(TMP_FOLDER)
|
||||
|
||||
_set_permissions(WEBROOT_FOLDER, "root", "www-data", 0650)
|
||||
_set_permissions(TMP_FOLDER, "root", "root", 0640)
|
||||
|
||||
# Prepare certificate signing request
|
||||
logger.info(
|
||||
"Prepare key and certificate signing request (CSR) for %s...", domain)
|
||||
|
||||
domain_key_file = "%s/%s.pem" % (TMP_FOLDER, domain)
|
||||
_generate_key(domain_key_file)
|
||||
_set_permissions(domain_key_file, "root", "metronome", 0640)
|
||||
|
||||
_prepare_certificate_signing_request(domain, domain_key_file, TMP_FOLDER)
|
||||
|
||||
# Sign the certificate
|
||||
logger.info("Now using ACME Tiny to sign the certificate...")
|
||||
|
||||
domain_csr_file = "%s/%s.csr" % (TMP_FOLDER, domain)
|
||||
|
||||
if staging:
|
||||
certification_authority = STAGING_CERTIFICATION_AUTHORITY
|
||||
else:
|
||||
certification_authority = PRODUCTION_CERTIFICATION_AUTHORITY
|
||||
|
||||
try:
|
||||
signed_certificate = sign_certificate(ACCOUNT_KEY_FILE,
|
||||
domain_csr_file,
|
||||
WEBROOT_FOLDER,
|
||||
log=logger,
|
||||
CA=certification_authority)
|
||||
except ValueError as e:
|
||||
if "urn:acme:error:rateLimited" in str(e):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_hit_rate_limit', domain=domain))
|
||||
else:
|
||||
logger.error(str(e))
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cert_signing_failed'))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(str(e))
|
||||
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cert_signing_failed'))
|
||||
|
||||
intermediate_certificate = requests.get(INTERMEDIATE_CERTIFICATE_URL).text
|
||||
|
||||
# Now save the key and signed certificate
|
||||
logger.info("Saving the key and signed certificate...")
|
||||
|
||||
# Create corresponding directory
|
||||
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
|
||||
|
||||
if staging:
|
||||
folder_flag = "staging"
|
||||
else:
|
||||
folder_flag = "letsencrypt"
|
||||
|
||||
new_cert_folder = "%s/%s-history/%s-%s" % (
|
||||
CERT_FOLDER, domain, date_tag, folder_flag)
|
||||
|
||||
os.makedirs(new_cert_folder)
|
||||
|
||||
_set_permissions(new_cert_folder, "root", "root", 0655)
|
||||
|
||||
# Move the private key
|
||||
shutil.move(domain_key_file, os.path.join(new_cert_folder, "key.pem"))
|
||||
|
||||
# Write the cert
|
||||
domain_cert_file = os.path.join(new_cert_folder, "crt.pem")
|
||||
|
||||
with open(domain_cert_file, "w") as f:
|
||||
f.write(signed_certificate)
|
||||
f.write(intermediate_certificate)
|
||||
|
||||
_set_permissions(domain_cert_file, "root", "metronome", 0640)
|
||||
|
||||
if staging:
|
||||
return
|
||||
|
||||
_enable_certificate(domain, new_cert_folder)
|
||||
|
||||
# Check the status of the certificate is now good
|
||||
status_summary = _get_status(domain)["summary"]
|
||||
|
||||
if status_summary["code"] != "great":
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_certificate_fetching_or_enabling_failed', domain=domain))
|
||||
|
||||
|
||||
def _prepare_certificate_signing_request(domain, key_file, output_folder):
|
||||
# Init a request
|
||||
csr = crypto.X509Req()
|
||||
|
||||
# Set the domain
|
||||
csr.get_subject().CN = domain
|
||||
|
||||
# Set the key
|
||||
with open(key_file, 'rt') as f:
|
||||
key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
|
||||
|
||||
csr.set_pubkey(key)
|
||||
|
||||
# Sign the request
|
||||
csr.sign(key, "sha256")
|
||||
|
||||
# Save the request in tmp folder
|
||||
csr_file = output_folder + domain + ".csr"
|
||||
logger.info("Saving to %s.", csr_file)
|
||||
|
||||
with open(csr_file, "w") as f:
|
||||
f.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr))
|
||||
|
||||
|
||||
def _get_status(domain):
|
||||
|
||||
cert_file = os.path.join(CERT_FOLDER, domain, "crt.pem")
|
||||
|
||||
if not os.path.isfile(cert_file):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_no_cert_file', domain=domain, file=cert_file))
|
||||
|
||||
try:
|
||||
cert = crypto.load_certificate(
|
||||
crypto.FILETYPE_PEM, open(cert_file).read())
|
||||
except Exception as exception:
|
||||
import traceback
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_cannot_read_cert', domain=domain, file=cert_file, reason=exception))
|
||||
|
||||
cert_subject = cert.get_subject().CN
|
||||
cert_issuer = cert.get_issuer().CN
|
||||
valid_up_to = datetime.strptime(cert.get_notAfter(), "%Y%m%d%H%M%SZ")
|
||||
days_remaining = (valid_up_to - datetime.now()).days
|
||||
|
||||
if cert_issuer == _name_self_CA():
|
||||
CA_type = {
|
||||
"code": "self-signed",
|
||||
"verbose": "Self-signed",
|
||||
}
|
||||
|
||||
elif cert_issuer.startswith("Let's Encrypt"):
|
||||
CA_type = {
|
||||
"code": "lets-encrypt",
|
||||
"verbose": "Let's Encrypt",
|
||||
}
|
||||
|
||||
elif cert_issuer.startswith("Fake LE"):
|
||||
CA_type = {
|
||||
"code": "fake-lets-encrypt",
|
||||
"verbose": "Fake Let's Encrypt",
|
||||
}
|
||||
|
||||
else:
|
||||
CA_type = {
|
||||
"code": "other-unknown",
|
||||
"verbose": "Other / Unknown",
|
||||
}
|
||||
|
||||
if days_remaining <= 0:
|
||||
status_summary = {
|
||||
"code": "critical",
|
||||
"verbose": "CRITICAL",
|
||||
}
|
||||
|
||||
elif CA_type["code"] in ("self-signed", "fake-lets-encrypt"):
|
||||
status_summary = {
|
||||
"code": "warning",
|
||||
"verbose": "WARNING",
|
||||
}
|
||||
|
||||
elif days_remaining < VALIDITY_LIMIT:
|
||||
status_summary = {
|
||||
"code": "attention",
|
||||
"verbose": "About to expire",
|
||||
}
|
||||
|
||||
elif CA_type["code"] == "other-unknown":
|
||||
status_summary = {
|
||||
"code": "good",
|
||||
"verbose": "Good",
|
||||
}
|
||||
|
||||
elif CA_type["code"] == "lets-encrypt":
|
||||
status_summary = {
|
||||
"code": "great",
|
||||
"verbose": "Great!",
|
||||
}
|
||||
|
||||
else:
|
||||
status_summary = {
|
||||
"code": "unknown",
|
||||
"verbose": "Unknown?",
|
||||
}
|
||||
|
||||
try:
|
||||
_check_domain_is_ready_for_ACME(domain)
|
||||
ACME_eligible = True
|
||||
except:
|
||||
ACME_eligible = False
|
||||
|
||||
return {
|
||||
"domain": domain,
|
||||
"subject": cert_subject,
|
||||
"CA_name": cert_issuer,
|
||||
"CA_type": CA_type,
|
||||
"validity": days_remaining,
|
||||
"summary": status_summary,
|
||||
"ACME_eligible": ACME_eligible
|
||||
}
|
||||
|
||||
###############################################################################
|
||||
# Misc small stuff ... #
|
||||
###############################################################################
|
||||
|
||||
|
||||
def _generate_account_key():
|
||||
logger.info("Generating account key ...")
|
||||
_generate_key(ACCOUNT_KEY_FILE)
|
||||
_set_permissions(ACCOUNT_KEY_FILE, "root", "root", 0400)
|
||||
|
||||
|
||||
def _generate_key(destination_path):
|
||||
k = crypto.PKey()
|
||||
k.generate_key(crypto.TYPE_RSA, KEY_SIZE)
|
||||
|
||||
with open(destination_path, "w") as f:
|
||||
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
|
||||
|
||||
|
||||
def _set_permissions(path, user, group, permissions):
|
||||
uid = pwd.getpwnam(user).pw_uid
|
||||
gid = grp.getgrnam(group).gr_gid
|
||||
|
||||
os.chown(path, uid, gid)
|
||||
os.chmod(path, permissions)
|
||||
|
||||
|
||||
def _enable_certificate(domain, new_cert_folder):
|
||||
logger.info("Enabling the certificate for domain %s ...", domain)
|
||||
|
||||
live_link = os.path.join(CERT_FOLDER, domain)
|
||||
|
||||
# If a live link (or folder) already exists
|
||||
if os.path.exists(live_link):
|
||||
# If it's not a link ... expect if to be a folder
|
||||
if not os.path.islink(live_link):
|
||||
# Backup it and remove it
|
||||
_backup_current_cert(domain)
|
||||
shutil.rmtree(live_link)
|
||||
# Else if it's a link, simply delete it
|
||||
elif os.path.lexists(live_link):
|
||||
os.remove(live_link)
|
||||
|
||||
os.symlink(new_cert_folder, live_link)
|
||||
|
||||
logger.info("Restarting services...")
|
||||
|
||||
for service in ("postfix", "dovecot", "metronome"):
|
||||
_run_service_command("restart", service)
|
||||
|
||||
_run_service_command("reload", "nginx")
|
||||
|
||||
|
||||
def _backup_current_cert(domain):
|
||||
logger.info("Backuping existing certificate for domain %s", domain)
|
||||
|
||||
cert_folder_domain = os.path.join(CERT_FOLDER, domain)
|
||||
|
||||
date_tag = datetime.now().strftime("%Y%m%d.%H%M%S")
|
||||
backup_folder = "%s-backups/%s" % (cert_folder_domain, date_tag)
|
||||
|
||||
shutil.copytree(cert_folder_domain, backup_folder)
|
||||
|
||||
|
||||
def _check_domain_is_ready_for_ACME(domain):
|
||||
public_ip = yunohost.domain.get_public_ip()
|
||||
|
||||
# Check if IP from DNS matches public IP
|
||||
if not _dns_ip_match_public_ip(public_ip, domain):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_dns_ip_differs_from_public_ip', domain=domain))
|
||||
|
||||
# Check if domain seems to be accessible through HTTP?
|
||||
if not _domain_is_accessible_through_HTTP(public_ip, domain):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_domain_http_not_working', domain=domain))
|
||||
|
||||
|
||||
def _dns_ip_match_public_ip(public_ip, domain):
|
||||
try:
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = DNS_RESOLVERS
|
||||
answers = resolver.query(domain, "A")
|
||||
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
|
||||
raise MoulinetteError(errno.EINVAL, m18n.n(
|
||||
'certmanager_error_no_A_record', domain=domain))
|
||||
|
||||
dns_ip = str(answers[0])
|
||||
|
||||
return dns_ip == public_ip
|
||||
|
||||
|
||||
def _domain_is_accessible_through_HTTP(ip, domain):
|
||||
try:
|
||||
requests.head("http://" + ip, headers={"Host": domain})
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _name_self_CA():
|
||||
cert = crypto.load_certificate(
|
||||
crypto.FILETYPE_PEM, open(SELF_CA_FILE).read())
|
||||
return cert.get_subject().CN
|
||||
|
||||
|
||||
def _tail(n, file_path):
|
||||
stdin, stdout = os.popen2("tail -n %s '%s'" % (n, file_path))
|
||||
|
||||
stdin.close()
|
||||
|
||||
lines = stdout.readlines()
|
||||
stdout.close()
|
||||
|
||||
return "".join(lines)
|
|
@ -24,19 +24,20 @@
|
|||
Manage domains
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import datetime
|
||||
import re
|
||||
import shutil
|
||||
import json
|
||||
import yaml
|
||||
import errno
|
||||
import requests
|
||||
|
||||
from urllib import urlopen
|
||||
|
||||
from moulinette.core import MoulinetteError
|
||||
from moulinette.utils.log import getActionLogger
|
||||
|
||||
import yunohost.certificate
|
||||
|
||||
from yunohost.service import service_regen_conf
|
||||
|
||||
logger = getActionLogger('yunohost.domain')
|
||||
|
@ -113,40 +114,10 @@ def domain_add(auth, domain, dyndns=False):
|
|||
m18n.n('domain_dyndns_root_unknown'))
|
||||
|
||||
try:
|
||||
# Commands
|
||||
ssl_dir = '/usr/share/yunohost/yunohost-config/ssl/yunoCA'
|
||||
ssl_domain_path = '/etc/yunohost/certs/%s' % domain
|
||||
with open('%s/serial' % ssl_dir, 'r') as f:
|
||||
serial = f.readline().rstrip()
|
||||
try: os.listdir(ssl_domain_path)
|
||||
except OSError: os.makedirs(ssl_domain_path)
|
||||
|
||||
command_list = [
|
||||
'cp %s/openssl.cnf %s' % (ssl_dir, ssl_domain_path),
|
||||
'sed -i "s/yunohost.org/%s/g" %s/openssl.cnf' % (domain, ssl_domain_path),
|
||||
'openssl req -new -config %s/openssl.cnf -days 3650 -out %s/certs/yunohost_csr.pem -keyout %s/certs/yunohost_key.pem -nodes -batch'
|
||||
% (ssl_domain_path, ssl_dir, ssl_dir),
|
||||
'openssl ca -config %s/openssl.cnf -days 3650 -in %s/certs/yunohost_csr.pem -out %s/certs/yunohost_crt.pem -batch'
|
||||
% (ssl_domain_path, ssl_dir, ssl_dir),
|
||||
'ln -s /etc/ssl/certs/ca-yunohost_crt.pem %s/ca.pem' % ssl_domain_path,
|
||||
'cp %s/certs/yunohost_key.pem %s/key.pem' % (ssl_dir, ssl_domain_path),
|
||||
'cp %s/newcerts/%s.pem %s/crt.pem' % (ssl_dir, serial, ssl_domain_path),
|
||||
'chmod 755 %s' % ssl_domain_path,
|
||||
'chmod 640 %s/key.pem' % ssl_domain_path,
|
||||
'chmod 640 %s/crt.pem' % ssl_domain_path,
|
||||
'chmod 600 %s/openssl.cnf' % ssl_domain_path,
|
||||
'chown root:metronome %s/key.pem' % ssl_domain_path,
|
||||
'chown root:metronome %s/crt.pem' % ssl_domain_path,
|
||||
'cat %s/ca.pem >> %s/crt.pem' % (ssl_domain_path, ssl_domain_path)
|
||||
]
|
||||
|
||||
for command in command_list:
|
||||
if os.system(command) != 0:
|
||||
raise MoulinetteError(errno.EIO,
|
||||
m18n.n('domain_cert_gen_failed'))
|
||||
yunohost.certificate._certificate_install_selfsigned([domain], False)
|
||||
|
||||
try:
|
||||
auth.validate_uniqueness({ 'virtualdomain': domain })
|
||||
auth.validate_uniqueness({'virtualdomain': domain})
|
||||
except MoulinetteError:
|
||||
raise MoulinetteError(errno.EEXIST, m18n.n('domain_exists'))
|
||||
|
||||
|
@ -286,6 +257,18 @@ def domain_dns_conf(domain, ttl=None):
|
|||
return result
|
||||
|
||||
|
||||
def domain_cert_status(auth, domain_list, full=False):
|
||||
return yunohost.certificate.certificate_status(auth, domain_list, full)
|
||||
|
||||
|
||||
def domain_cert_install(auth, domain_list, force=False, no_checks=False, self_signed=False, staging=False):
|
||||
return yunohost.certificate.certificate_install(auth, domain_list, force, no_checks, self_signed, staging)
|
||||
|
||||
|
||||
def domain_cert_renew(auth, domain_list, force=False, no_checks=False, email=False, staging=False):
|
||||
return yunohost.certificate.certificate_renew(auth, domain_list, force, no_checks, email, staging)
|
||||
|
||||
|
||||
def get_public_ip(protocol=4):
|
||||
"""Retrieve the public IP address from ip.yunohost.org"""
|
||||
if protocol == 4:
|
||||
|
|
0
src/yunohost/vendor/__init__.py
vendored
Normal file
0
src/yunohost/vendor/__init__.py
vendored
Normal file
0
src/yunohost/vendor/acme_tiny/__init__.py
vendored
Normal file
0
src/yunohost/vendor/acme_tiny/__init__.py
vendored
Normal file
198
src/yunohost/vendor/acme_tiny/acme_tiny.py
vendored
Normal file
198
src/yunohost/vendor/acme_tiny/acme_tiny.py
vendored
Normal file
|
@ -0,0 +1,198 @@
|
|||
#!/usr/bin/env python
|
||||
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
|
||||
try:
|
||||
from urllib.request import urlopen # Python 3
|
||||
except ImportError:
|
||||
from urllib2 import urlopen # Python 2
|
||||
|
||||
#DEFAULT_CA = "https://acme-staging.api.letsencrypt.org"
|
||||
DEFAULT_CA = "https://acme-v01.api.letsencrypt.org"
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
LOGGER.addHandler(logging.StreamHandler())
|
||||
LOGGER.setLevel(logging.INFO)
|
||||
|
||||
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
|
||||
# helper function base64 encode for jose spec
|
||||
def _b64(b):
|
||||
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
|
||||
|
||||
# parse account key to get public key
|
||||
log.info("Parsing account key...")
|
||||
proc = subprocess.Popen(["openssl", "rsa", "-in", account_key, "-noout", "-text"],
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
out, err = proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise IOError("OpenSSL Error: {0}".format(err))
|
||||
pub_hex, pub_exp = re.search(
|
||||
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
|
||||
out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
|
||||
pub_exp = "{0:x}".format(int(pub_exp))
|
||||
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
|
||||
header = {
|
||||
"alg": "RS256",
|
||||
"jwk": {
|
||||
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
|
||||
"kty": "RSA",
|
||||
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
|
||||
},
|
||||
}
|
||||
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
|
||||
thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())
|
||||
|
||||
# helper function make signed requests
|
||||
def _send_signed_request(url, payload):
|
||||
payload64 = _b64(json.dumps(payload).encode('utf8'))
|
||||
protected = copy.deepcopy(header)
|
||||
protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
|
||||
protected64 = _b64(json.dumps(protected).encode('utf8'))
|
||||
proc = subprocess.Popen(["openssl", "dgst", "-sha256", "-sign", account_key],
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
|
||||
if proc.returncode != 0:
|
||||
raise IOError("OpenSSL Error: {0}".format(err))
|
||||
data = json.dumps({
|
||||
"header": header, "protected": protected64,
|
||||
"payload": payload64, "signature": _b64(out),
|
||||
})
|
||||
try:
|
||||
resp = urlopen(url, data.encode('utf8'))
|
||||
return resp.getcode(), resp.read()
|
||||
except IOError as e:
|
||||
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
|
||||
|
||||
# find domains
|
||||
log.info("Parsing CSR...")
|
||||
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-noout", "-text"],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
out, err = proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise IOError("Error loading {0}: {1}".format(csr, err))
|
||||
domains = set([])
|
||||
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
|
||||
if common_name is not None:
|
||||
domains.add(common_name.group(1))
|
||||
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
|
||||
if subject_alt_names is not None:
|
||||
for san in subject_alt_names.group(1).split(", "):
|
||||
if san.startswith("DNS:"):
|
||||
domains.add(san[4:])
|
||||
|
||||
# get the certificate domains and expiration
|
||||
log.info("Registering account...")
|
||||
code, result = _send_signed_request(CA + "/acme/new-reg", {
|
||||
"resource": "new-reg",
|
||||
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf",
|
||||
})
|
||||
if code == 201:
|
||||
log.info("Registered!")
|
||||
elif code == 409:
|
||||
log.info("Already registered!")
|
||||
else:
|
||||
raise ValueError("Error registering: {0} {1}".format(code, result))
|
||||
|
||||
# verify each domain
|
||||
for domain in domains:
|
||||
log.info("Verifying {0}...".format(domain))
|
||||
|
||||
# get new challenge
|
||||
code, result = _send_signed_request(CA + "/acme/new-authz", {
|
||||
"resource": "new-authz",
|
||||
"identifier": {"type": "dns", "value": domain},
|
||||
})
|
||||
if code != 201:
|
||||
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
|
||||
|
||||
# make the challenge file
|
||||
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0]
|
||||
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
|
||||
keyauthorization = "{0}.{1}".format(token, thumbprint)
|
||||
wellknown_path = os.path.join(acme_dir, token)
|
||||
with open(wellknown_path, "w") as wellknown_file:
|
||||
wellknown_file.write(keyauthorization)
|
||||
|
||||
# check that the file is in place
|
||||
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
|
||||
try:
|
||||
resp = urlopen(wellknown_url)
|
||||
resp_data = resp.read().decode('utf8').strip()
|
||||
assert resp_data == keyauthorization
|
||||
except (IOError, AssertionError):
|
||||
os.remove(wellknown_path)
|
||||
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
|
||||
wellknown_path, wellknown_url))
|
||||
|
||||
# notify challenge are met
|
||||
code, result = _send_signed_request(challenge['uri'], {
|
||||
"resource": "challenge",
|
||||
"keyAuthorization": keyauthorization,
|
||||
})
|
||||
if code != 202:
|
||||
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
|
||||
|
||||
# wait for challenge to be verified
|
||||
while True:
|
||||
try:
|
||||
resp = urlopen(challenge['uri'])
|
||||
challenge_status = json.loads(resp.read().decode('utf8'))
|
||||
except IOError as e:
|
||||
raise ValueError("Error checking challenge: {0} {1}".format(
|
||||
e.code, json.loads(e.read().decode('utf8'))))
|
||||
if challenge_status['status'] == "pending":
|
||||
time.sleep(2)
|
||||
elif challenge_status['status'] == "valid":
|
||||
log.info("{0} verified!".format(domain))
|
||||
os.remove(wellknown_path)
|
||||
break
|
||||
else:
|
||||
raise ValueError("{0} challenge did not pass: {1}".format(
|
||||
domain, challenge_status))
|
||||
|
||||
# get the new certificate
|
||||
log.info("Signing certificate...")
|
||||
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
csr_der, err = proc.communicate()
|
||||
code, result = _send_signed_request(CA + "/acme/new-cert", {
|
||||
"resource": "new-cert",
|
||||
"csr": _b64(csr_der),
|
||||
})
|
||||
if code != 201:
|
||||
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
|
||||
|
||||
# return signed certificate!
|
||||
log.info("Certificate signed!")
|
||||
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
|
||||
"\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
|
||||
|
||||
def main(argv):
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description=textwrap.dedent("""\
|
||||
This script automates the process of getting a signed TLS certificate from
|
||||
Let's Encrypt using the ACME protocol. It will need to be run on your server
|
||||
and have access to your private account key, so PLEASE READ THROUGH IT! It's
|
||||
only ~200 lines, so it won't take long.
|
||||
|
||||
===Example Usage===
|
||||
python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed.crt
|
||||
===================
|
||||
|
||||
===Example Crontab Renewal (once per month)===
|
||||
0 0 1 * * python /path/to/acme_tiny.py --account-key /path/to/account.key --csr /path/to/domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > /path/to/signed.crt 2>> /var/log/acme_tiny.log
|
||||
==============================================
|
||||
""")
|
||||
)
|
||||
parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key")
|
||||
parser.add_argument("--csr", required=True, help="path to your certificate signing request")
|
||||
parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory")
|
||||
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
|
||||
parser.add_argument("--ca", default=DEFAULT_CA, help="certificate authority, default is Let's Encrypt")
|
||||
|
||||
args = parser.parse_args(argv)
|
||||
LOGGER.setLevel(args.quiet or LOGGER.level)
|
||||
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca)
|
||||
sys.stdout.write(signed_crt)
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main(sys.argv[1:])
|
Loading…
Add table
Reference in a new issue