From bec8f6347959cb5ab38cb24842703e5858bef8f4 Mon Sep 17 00:00:00 2001 From: Laurent Peuch Date: Sun, 30 Oct 2016 04:24:54 +0100 Subject: [PATCH] [mod] autopep8 --- src/yunohost/certificate.py | 415 ++++++++++++++++++------------------ 1 file changed, 210 insertions(+), 205 deletions(-) diff --git a/src/yunohost/certificate.py b/src/yunohost/certificate.py index 69a78964e..e11ea9e3b 100644 --- a/src/yunohost/certificate.py +++ b/src/yunohost/certificate.py @@ -33,32 +33,34 @@ import grp import json import smtplib -from OpenSSL import crypto -from datetime import datetime -from tabulate import tabulate -from acme_tiny import get_crt as sign_certificate +from OpenSSL import crypto +from datetime import datetime +from tabulate import tabulate +from acme_tiny import get_crt as sign_certificate -from moulinette.core import MoulinetteError -from moulinette.utils.log import getActionLogger import yunohost.domain -from yunohost.service import _run_service_command -from yunohost.app import app_ssowatconf + +from moulinette.core import MoulinetteError +from moulinette.utils.log import getActionLogger + +from yunohost.app import app_ssowatconf +from yunohost.service import _run_service_command logger = getActionLogger('yunohost.certmanager') # Misc stuff we need -cert_folder = "/etc/yunohost/certs/" -tmp_folder = "/tmp/acme-challenge-private/" -webroot_folder = "/tmp/acme-challenge-public/" +cert_folder = "/etc/yunohost/certs/" +tmp_folder = "/tmp/acme-challenge-private/" +webroot_folder = "/tmp/acme-challenge-public/" -selfCA_file = "/etc/ssl/certs/ca-yunohost_crt.pem" +selfCA_file = "/etc/ssl/certs/ca-yunohost_crt.pem" account_key_file = "/etc/yunohost/letsencrypt_account.pem" -key_size = 2048 +key_size = 2048 -validity_limit = 15 # days +validity_limit = 15 # days # For tests #certification_authority = "https://acme-staging.api.letsencrypt.org" @@ -73,7 +75,8 @@ intermediate_certificate_url = "https://letsencrypt.org/certs/lets-encrypt-x3-cr # Status -def certificate_status(auth, domainList, full = False): + +def certificate_status(auth, domainList, full=False): """ Print the status of certificate for given domains (all by default) @@ -83,36 +86,39 @@ def certificate_status(auth, domainList, full = False): """ # If no domains given, consider all yunohost domains - if (domainList == []) : domainList = yunohost.domain.domain_list(auth)['domains'] + if (domainList == []): + domainList = yunohost.domain.domain_list(auth)['domains'] # Else, validate that yunohost knows the domains given - else : - for domain in domainList : + else: + for domain in domainList: # Is it in Yunohost dmomain list ? if domain not in yunohost.domain.domain_list(auth)['domains']: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) # Get status for each domain, and prepare display using tabulate - if not full : - headers = [ "Domain", "Certificate status", "Authority type", "Days remaining"] - else : - headers = [ "Domain", "Certificate subject", "Certificate status", "Authority type", "Authority name", "Days remaining"] + if not full: + headers = ["Domain", "Certificate status", "Authority type", "Days remaining"] + else: + headers = ["Domain", "Certificate subject", "Certificate status", "Authority type", "Authority name", "Days remaining"] lines = [] - for domain in domainList : + for domain in domainList: status = _get_status(domain) line = [] line.append(domain) - if (full) : line.append(status["subject"]) + if (full): + line.append(status["subject"]) line.append(_summary_code_to_string(status["summaryCode"])) line.append(status["CAtype"]) - if (full) : line.append(status["CAname"]) + if (full): + line.append(status["CAname"]) line.append(status["validity"]) lines.append(line) print(tabulate(lines, headers=headers, tablefmt="simple", stralign="center")) -def certificate_install(auth, domainList, force=False, no_checks=False, self_signed=False) : +def certificate_install(auth, domainList, force=False, no_checks=False, self_signed=False): """ Install a Let's Encrypt certificate for given domains (all by default) @@ -122,31 +128,29 @@ def certificate_install(auth, domainList, force=False, no_checks=False, self_sig no-check -- Disable some checks about the reachability of web server before attempting the install self-signed -- Instal self-signed certificates instead of Let's Encrypt - """ - if (self_signed) : + """ + if (self_signed): certificate_install_selfsigned(domainList, force) - else : + else: certificate_install_letsencrypt(auth, domainList, force, no_checks) -# Install self-signed +# Install self-signed -def certificate_install_selfsigned(domainList, force=False) : - - for domain in domainList : +def certificate_install_selfsigned(domainList, force=False): + for domain in domainList: # Check we ain't trying to overwrite a good cert ! status = _get_status(domain) - if (status != {}) and (status["summaryCode"] > 0) and (not force) : + if (status != {}) and (status["summaryCode"] > 0) and (not force): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_replace_valid_cert', domain=domain)) - - cert_folder_domain = cert_folder+"/"+domain + cert_folder_domain = cert_folder + "/" + domain # Create cert folder if it does not exists yet - try: + try: os.listdir(cert_folder_domain) - except OSError: + except OSError: os.makedirs(cert_folder_domain) # Get serial @@ -157,79 +161,77 @@ def certificate_install_selfsigned(domainList, force=False) : # FIXME : should refactor this to avoid so many os.system() calls... # We should be able to do all this using OpenSSL.crypto and os/shutil command_list = [ - 'cp %s/openssl.cnf %s' % (ssl_dir, cert_folder_domain), - 'sed -i "s/yunohost.org/%s/g" %s/openssl.cnf' % (domain, cert_folder_domain), + 'cp %s/openssl.cnf %s' % (ssl_dir, cert_folder_domain), + 'sed -i "s/yunohost.org/%s/g" %s/openssl.cnf' % (domain, cert_folder_domain), 'openssl req -new -config %s/openssl.cnf -days 3650 -out %s/certs/yunohost_csr.pem -keyout %s/certs/yunohost_key.pem -nodes -batch' % (cert_folder_domain, ssl_dir, ssl_dir), 'openssl ca -config %s/openssl.cnf -days 3650 -in %s/certs/yunohost_csr.pem -out %s/certs/yunohost_crt.pem -batch' % (cert_folder_domain, ssl_dir, ssl_dir), 'ln -s /etc/ssl/certs/ca-yunohost_crt.pem %s/ca.pem' % cert_folder_domain, - 'cp %s/certs/yunohost_key.pem %s/key.pem' % (ssl_dir, cert_folder_domain), - 'cp %s/newcerts/%s.pem %s/crt.pem' % (ssl_dir, serial, cert_folder_domain), - 'cat %s/ca.pem >> %s/crt.pem' % (cert_folder_domain, cert_folder_domain) + 'cp %s/certs/yunohost_key.pem %s/key.pem' % (ssl_dir, cert_folder_domain), + 'cp %s/newcerts/%s.pem %s/crt.pem' % (ssl_dir, serial, cert_folder_domain), + 'cat %s/ca.pem >> %s/crt.pem' % (cert_folder_domain, cert_folder_domain) ] for command in command_list: if os.system(command) != 0: raise MoulinetteError(errno.EIO, m18n.n('certmanager_domain_cert_gen_failed')) - _set_permissions(cert_folder_domain, "root", "root", 0755); - _set_permissions(cert_folder_domain+"/key.pem", "root", "metronome", 0640); - _set_permissions(cert_folder_domain+"/crt.pem", "root", "metronome", 0640); - _set_permissions(cert_folder_domain+"/openssl.cnf", "root", "root", 0600); - + _set_permissions(cert_folder_domain, "root", "root", 0755) + _set_permissions(cert_folder_domain + "/key.pem", "root", "metronome", 0640) + _set_permissions(cert_folder_domain + "/crt.pem", "root", "metronome", 0640) + _set_permissions(cert_folder_domain + "/openssl.cnf", "root", "root", 0600) # Install ACME / Let's Encrypt certificate def certificate_install_letsencrypt(auth, domainList, force=False, no_checks=False): - - - if not os.path.exists(account_key_file) : + if not os.path.exists(account_key_file): _generate_account_key() # If no domains given, consider all yunohost domains with self-signed # certificates - if (domainList == []) : - for domain in yunohost.domain.domain_list(auth)['domains'] : + if (domainList == []): + for domain in yunohost.domain.domain_list(auth)['domains']: # Is it self-signed ? status = _get_status(domain) - if (status["CAtype"] != "Self-signed") : continue + if (status["CAtype"] != "Self-signed"): + continue domainList.append(domain) # Else, validate that yunohost knows the domains given - else : - for domain in domainList : + else: + for domain in domainList: # Is it in Yunohost dmomain list ? if domain not in yunohost.domain.domain_list(auth)['domains']: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) # Is it self-signed ? status = _get_status(domain) - if (not force) and (status["CAtype"] != "Self-signed") : + if (not force) and (status["CAtype"] != "Self-signed"): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_cert_not_selfsigned', domain=domain)) - # Actual install steps - for domain in domainList : - - logger.info("Now attempting install of certificate for domain "+domain+" !") + for domain in domainList: - try : + logger.info("Now attempting install of certificate for domain " + domain + " !") - if not no_checks : _check_domain_is_correctly_configured(domain) + try: + + if not no_checks: + _check_domain_is_correctly_configured(domain) _backup_current_cert(domain) _configure_for_acme_challenge(auth, domain) _fetch_and_enable_new_certificate(domain) _install_cron() - + logger.success(m18n.n("certmanager_cert_install_success", domain=domain)) - - except Exception as e : - - logger.error("Certificate installation for "+domain+" failed !") + + except Exception as e: + + logger.error("Certificate installation for " + domain + " failed !") logger.error(str(e)) @@ -250,86 +252,82 @@ def certificate_renew(auth, domainList, force=False, no_checks=False, email=Fals # If no domains given, consider all yunohost domains with Let's Encrypt # certificates - if (domainList == []) : - for domain in yunohost.domain.domain_list(auth)['domains'] : + if (domainList == []): + for domain in yunohost.domain.domain_list(auth)['domains']: # Does it has a Let's Encrypt cert ? status = _get_status(domain) - if (status["CAtype"] != "Let's Encrypt") : continue + if (status["CAtype"] != "Let's Encrypt"): + continue # Does it expires soon ? - if (force) or (status["validity"] <= validity_limit) : + if (force) or (status["validity"] <= validity_limit): domainList.append(domain) - if (len(domainList) == 0) : + if (len(domainList) == 0): logger.info("No certificate needs to be renewed.") # Else, validate the domain list given - else : - for domain in domainList : + else: + for domain in domainList: # Is it in Yunohost dmomain list ? if domain not in yunohost.domain.domain_list(auth)['domains']: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_unknown', domain=domain)) - + status = _get_status(domain) # Does it expires soon ? - if not ((force) or (status["validity"] <= validity_limit)) : + if not ((force) or (status["validity"] <= validity_limit)): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_valid_cert', domain=domain)) - # Does it has a Let's Encrypt cert ? - if (status["CAtype"] != "Let's Encrypt") : + if (status["CAtype"] != "Let's Encrypt"): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_attempt_to_renew_nonLE_cert', domain=domain)) - # Actual renew steps - for domain in domainList : - - logger.info("Now attempting renewing of certificate for domain "+domain+" !") + for domain in domainList: - try : + logger.info("Now attempting renewing of certificate for domain " + domain + " !") - if not no_checks : _check_domain_is_correctly_configured(domain) + try: + + if not no_checks: + _check_domain_is_correctly_configured(domain) _backup_current_cert(domain) _fetch_and_enable_new_certificate(domain) logger.success(m18n.n("certmanager_cert_renew_success", domain=domain)) - - except Exception as e : - - logger.error("Certificate renewing for "+domain+" failed !") + + except Exception as e: + + logger.error("Certificate renewing for " + domain + " failed !") logger.error(str(e)) - if (email) : + if (email): logger.error("Sending email with details to root ...") _email_renewing_failed(domain, e) - - - ############################################################################### # Back-end stuff # ############################################################################### -def _install_cron() : - +def _install_cron(): cron_job_file = "/etc/cron.weekly/certificateRenewer" - with open(cron_job_file, "w") as f : + with open(cron_job_file, "w") as f: f.write("#!/bin/bash\n") f.write("yunohost domain cert-renew --email\n") - _set_permissions(cron_job_file, "root", "root", 0755); + _set_permissions(cron_job_file, "root", "root", 0755) -def _email_renewing_failed(domain, e) : - from_ = "certmanager@"+domain+" (Certificate Manager)" - to_ = "root" - subject_ = "Certificate renewing attempt for "+domain+" failed!" +def _email_renewing_failed(domain, e): + from_ = "certmanager@" + domain + " (Certificate Manager)" + to_ = "root" + subject_ = "Certificate renewing attempt for " + domain + " failed!" exceptionMessage = str(e) logs = _tail(50, "/var/log/yunohost/yunohost-cli.log") @@ -357,29 +355,27 @@ Subject: %s """ % (from_, to_, subject_, text) smtp = smtplib.SMTP("localhost") - smtp.sendmail(from_, [ to_ ], message) + smtp.sendmail(from_, [to_], message) smtp.quit() - -def _configure_for_acme_challenge(auth, domain) : - - nginx_conf_file = "/etc/nginx/conf.d/"+domain+".d/000-acmechallenge.conf" +def _configure_for_acme_challenge(auth, domain): + nginx_conf_file = "/etc/nginx/conf.d/" + domain + ".d/000-acmechallenge.conf" nginx_configuration = ''' location '/.well-known/acme-challenge' { default_type "text/plain"; - alias '''+webroot_folder+'''; + alias ''' + webroot_folder + '''; } ''' # Write the conf - if os.path.exists(nginx_conf_file) : + if os.path.exists(nginx_conf_file): logger.info("Nginx configuration file for ACME challenge already exists for domain, skipping.") - else : + else: logger.info("Adding Nginx configuration file for Acme challenge for domain " + domain + ".") - with open(nginx_conf_file, "w") as f : + with open(nginx_conf_file, "w") as f: f.write(nginx_configuration) # Assume nginx conf is okay, and reload it @@ -389,29 +385,31 @@ location '/.well-known/acme-challenge' app_ssowatconf(auth) -def _fetch_and_enable_new_certificate(domain) : +def _fetch_and_enable_new_certificate(domain): # Make sure tmp folder exists logger.debug("Making sure tmp folders exists...") - if not (os.path.exists(webroot_folder)) : os.makedirs(webroot_folder) - if not (os.path.exists(tmp_folder)) : os.makedirs(tmp_folder) - _set_permissions(webroot_folder, "root", "www-data", 0650); - _set_permissions(tmp_folder, "root", "root", 0640); + if not (os.path.exists(webroot_folder)): + os.makedirs(webroot_folder) + if not (os.path.exists(tmp_folder)): + os.makedirs(tmp_folder) + _set_permissions(webroot_folder, "root", "www-data", 0650) + _set_permissions(tmp_folder, "root", "root", 0640) # Prepare certificate signing request - logger.info("Prepare key and certificate signing request (CSR) for "+domain+"...") + logger.info("Prepare key and certificate signing request (CSR) for " + domain + "...") - domain_key_file = tmp_folder+"/"+domain+".pem" + domain_key_file = tmp_folder + "/" + domain + ".pem" _generate_key(domain_key_file) - _set_permissions(domain_key_file, "root", "metronome", 0640); + _set_permissions(domain_key_file, "root", "metronome", 0640) _prepare_certificate_signing_request(domain, domain_key_file, tmp_folder) # Sign the certificate logger.info("Now using ACME Tiny to sign the certificate...") - domain_csr_file = tmp_folder+"/"+domain+".csr" + domain_csr_file = tmp_folder + "/" + domain + ".csr" signed_certificate = sign_certificate(account_key_file, domain_csr_file, @@ -425,30 +423,28 @@ def _fetch_and_enable_new_certificate(domain) : # Create corresponding directory date_tag = datetime.now().strftime("%Y%m%d.%H%M%S") - new_cert_folder = cert_folder+"/" + domain + "." + date_tag + new_cert_folder = cert_folder + "/" + domain + "." + date_tag os.makedirs(new_cert_folder) - _set_permissions(new_cert_folder, "root", "root", 0655); + _set_permissions(new_cert_folder, "root", "root", 0655) # Move the private key - shutil.move(domain_key_file, new_cert_folder+"/key.pem") + shutil.move(domain_key_file, new_cert_folder + "/key.pem") # Write the cert - domain_cert_file = new_cert_folder+"/crt.pem" - with open(domain_cert_file, "w") as f : + domain_cert_file = new_cert_folder + "/crt.pem" + with open(domain_cert_file, "w") as f: f.write(signed_certificate) f.write(intermediate_certificate) - _set_permissions(domain_cert_file, "root", "metronome", 0640); - - + _set_permissions(domain_cert_file, "root", "metronome", 0640) logger.info("Enabling the new certificate...") # Replace (if necessary) the link or folder for live cert - live_link = cert_folder+"/"+domain + live_link = cert_folder + "/" + domain - if not os.path.islink(live_link) : - shutil.rmtree(live_link) # Well, yep, hopefully that's not too dangerous (directory should have been backuped before calling this command) - elif os.path.lexists(live_link) : + if not os.path.islink(live_link): + shutil.rmtree(live_link) # Well, yep, hopefully that's not too dangerous (directory should have been backuped before calling this command) + elif os.path.lexists(live_link): os.remove(live_link) os.symlink(new_cert_folder, live_link) @@ -456,18 +452,16 @@ def _fetch_and_enable_new_certificate(domain) : # Check the status of the certificate is now good statusSummaryCode = _get_status(domain)["summaryCode"] - if (statusSummaryCode < 20) : + if (statusSummaryCode < 20): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_certificate_fetching_or_enabling_failed', domain=domain)) - logger.info("Restarting services...") - for s in [ "nginx", "postfix", "dovecot", "metronome" ] : + for s in ["nginx", "postfix", "dovecot", "metronome"]: _run_service_command("restart", s) -def _prepare_certificate_signing_request(domain, key_file, output_folder) : - +def _prepare_certificate_signing_request(domain, key_file, output_folder): # Init a request csr = crypto.X509Req() @@ -475,7 +469,7 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder) : csr.get_subject().CN = domain # Set the key - with open(key_file, 'rt') as f : + with open(key_file, 'rt') as f: key = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read()) csr.set_pubkey(key) @@ -483,158 +477,169 @@ def _prepare_certificate_signing_request(domain, key_file, output_folder) : csr.sign(key, "sha256") # Save the request in tmp folder - csr_file = output_folder+domain+".csr" - logger.info("Saving to "+csr_file+" .") - with open(csr_file, "w") as f : + csr_file = output_folder + domain + ".csr" + logger.info("Saving to " + csr_file + " .") + with open(csr_file, "w") as f: f.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)) -def _get_status(domain) : +def _get_status(domain): + cert_file = cert_folder + "/" + domain + "/crt.pem" - cert_file = cert_folder+"/"+domain+"/crt.pem" + if (not os.path.isfile(cert_file)): + return {} - if (not os.path.isfile(cert_file)) : return {} - - try : + try: cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read()) - except : - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cannot_read_cert', domain=domain, file=cert_file)) + except: + raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_cannot_read_cert', domain=domain, file=cert_file)) certSubject = cert.get_subject().CN certIssuer = cert.get_issuer().CN - validUpTo = datetime.strptime(cert.get_notAfter(),"%Y%m%d%H%M%SZ") + validUpTo = datetime.strptime(cert.get_notAfter(), "%Y%m%d%H%M%SZ") daysRemaining = (validUpTo - datetime.now()).days CAtype = None - if (certIssuer == _name_selfCA()) : + if (certIssuer == _name_selfCA()): CAtype = "Self-signed" - elif (certIssuer.startswith("Let's Encrypt")) : + elif (certIssuer.startswith("Let's Encrypt")): CAtype = "Let's Encrypt" - elif (certIssuer.startswith("Fake LE")) : + elif (certIssuer.startswith("Fake LE")): CAtype = "Fake Let's Encrypt" - else : + else: CAtype = "Other / Unknown" # Unknown by default statusSummaryCode = 0 # Critical - if (daysRemaining <= 0) : statusSummaryCode = -30 + if (daysRemaining <= 0): + statusSummaryCode = -30 # Warning, self-signed, browser will display a warning discouraging visitors to enter website - elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt") : statusSummaryCode = -20 + elif (CAtype == "Self-signed") or (CAtype == "Fake Let's Encrypt"): + statusSummaryCode = -20 # Attention, certificate will expire soon (should be renewed automatically if Let's Encrypt) - elif (daysRemaining < validity_limit) : statusSummaryCode = -10 + elif (daysRemaining < validity_limit): + statusSummaryCode = -10 # CA not known, but still a valid certificate, so okay ! - elif (CAtype == "Other / Unknown") : statusSummaryCode = 10 + elif (CAtype == "Other / Unknown"): + statusSummaryCode = 10 # Let's Encrypt, great ! - elif (CAtype == "Let's Encrypt") : statusSummaryCode = 20 + elif (CAtype == "Let's Encrypt"): + statusSummaryCode = 20 - return { "domain" : domain, - "subject" : certSubject, - "CAname" : certIssuer, - "CAtype" : CAtype, - "validity" : daysRemaining, - "summaryCode" : statusSummaryCode - } + return {"domain": domain, + "subject": certSubject, + "CAname": certIssuer, + "CAtype": CAtype, + "validity": daysRemaining, + "summaryCode": statusSummaryCode + } ############################################################################### # Misc small stuff ... # ############################################################################### -def _generate_account_key() : +def _generate_account_key(): logger.info("Generating account key ...") _generate_key(account_key_file) _set_permissions(account_key_file, "root", "root", 0400) -def _generate_key(destinationPath) : +def _generate_key(destinationPath): k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, key_size) - with open(destinationPath, "w") as f : + with open(destinationPath, "w") as f: f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) -def _set_permissions(path, user, group, permissions) : +def _set_permissions(path, user, group, permissions): uid = pwd.getpwnam(user).pw_uid gid = grp.getgrnam(group).gr_gid os.chown(path, uid, gid) os.chmod(path, permissions) + def _backup_current_cert(domain): + logger.info("Backuping existing certificate for domain " + domain) - logger.info("Backuping existing certificate for domain "+domain) - - cert_folder_domain = cert_folder+"/"+domain + cert_folder_domain = cert_folder + "/" + domain dateTag = datetime.now().strftime("%Y%m%d.%H%M%S") - backup_folder = cert_folder_domain+"-backup-"+dateTag + backup_folder = cert_folder_domain + "-backup-" + dateTag shutil.copytree(cert_folder_domain, backup_folder) -def _check_domain_is_correctly_configured(domain) : - +def _check_domain_is_correctly_configured(domain): public_ip = yunohost.domain.get_public_ip() # Check if IP from DNS matches public IP - if not _dns_ip_match_public_ip(public_ip, domain) : + if not _dns_ip_match_public_ip(public_ip, domain): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_dns_ip_differs_from_public_ip', domain=domain)) # Check if domain seems to be accessible through HTTP ? - if not _domain_is_accessible_through_HTTP(public_ip, domain) : - raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_http_not_working', domain=domain)) - -def _dns_ip_match_public_ip(public_ip, domain) : + if not _domain_is_accessible_through_HTTP(public_ip, domain): + raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_domain_http_not_working', domain=domain)) - try : - r = requests.get("http://dns-api.org/A/"+domain) - except : + +def _dns_ip_match_public_ip(public_ip, domain): + try: + r = requests.get("http://dns-api.org/A/" + domain) + except: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_error_contacting_dns_api', api="dns-api.org")) - - if (r.text == "[{\"error\":\"NXDOMAIN\"}]") : + + if (r.text == "[{\"error\":\"NXDOMAIN\"}]"): raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_no_A_dns_record', domain=domain)) - try : + try: dns_ip = json.loads(r.text)[0]["value"] - except : + except: raise MoulinetteError(errno.EINVAL, m18n.n('certmanager_error_parsing_dns', domain=domain, value=r.text)) - if (dns_ip != public_ip) : + if (dns_ip != public_ip): return False - else : + else: return True -def _domain_is_accessible_through_HTTP(ip, domain) : - try : - requests.head("http://"+ip, headers = { "Host" : domain }) - except Exception : +def _domain_is_accessible_through_HTTP(ip, domain): + try: + requests.head("http://" + ip, headers={"Host": domain}) + except Exception: return False return True -def _summary_code_to_string(code) : - if (code <= -30) : return "CRITICAL" - elif (code <= -20) : return "WARNING" - elif (code <= -10) : return "Attention" - elif (code <= 0) : return "Unknown?" - elif (code <= 10) : return "Good" - elif (code <= 20) : return "Great!" +def _summary_code_to_string(code): + if (code <= -30): + return "CRITICAL" + elif (code <= -20): + return "WARNING" + elif (code <= -10): + return "Attention" + elif (code <= 0): + return "Unknown?" + elif (code <= 10): + return "Good" + elif (code <= 20): + return "Great!" return "Unknown?" -def _name_selfCA() : +def _name_selfCA(): cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(selfCA_file).read()) return cert.get_subject().CN def _tail(n, filePath): - stdin,stdout = os.popen2("tail -n "+str(n)+" "+filePath) - stdin.close() - lines = stdout.readlines(); stdout.close() - lines = "".join(lines) - return lines + stdin, stdout = os.popen2("tail -n " + str(n) + " " + filePath) + stdin.close() + lines = stdout.readlines() + stdout.close() + lines = "".join(lines) + return lines