[mod] propagate the no_checks logic to acme-tiny code

This commit is contained in:
Laurent Peuch 2018-07-22 11:24:32 +02:00
parent 3facf89c7e
commit f528893b4d
2 changed files with 16 additions and 14 deletions

View file

@ -289,7 +289,7 @@ def _certificate_install_letsencrypt(auth, domain_list, force=False, no_checks=F
_check_domain_is_ready_for_ACME(domain) _check_domain_is_ready_for_ACME(domain)
_configure_for_acme_challenge(auth, domain) _configure_for_acme_challenge(auth, domain)
_fetch_and_enable_new_certificate(domain, staging) _fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks)
_install_cron() _install_cron()
logger.success( logger.success(
@ -383,7 +383,7 @@ def certificate_renew(auth, domain_list, force=False, no_checks=False, email=Fal
if not no_checks: if not no_checks:
_check_domain_is_ready_for_ACME(domain) _check_domain_is_ready_for_ACME(domain)
_fetch_and_enable_new_certificate(domain, staging) _fetch_and_enable_new_certificate(domain, staging, no_checks=no_checks)
logger.success( logger.success(
m18n.n("certmanager_cert_renew_success", domain=domain)) m18n.n("certmanager_cert_renew_success", domain=domain))
@ -521,7 +521,7 @@ def _check_acme_challenge_configuration(domain):
return True return True
def _fetch_and_enable_new_certificate(domain, staging=False): def _fetch_and_enable_new_certificate(domain, staging=False, no_checks=False):
# Make sure tmp folder exists # Make sure tmp folder exists
logger.debug("Making sure tmp folders exists...") logger.debug("Making sure tmp folders exists...")
@ -562,6 +562,7 @@ def _fetch_and_enable_new_certificate(domain, staging=False):
domain_csr_file, domain_csr_file,
WEBROOT_FOLDER, WEBROOT_FOLDER,
log=logger, log=logger,
no_checks=no_checks,
CA=certification_authority) CA=certification_authority)
except ValueError as e: except ValueError as e:
if "urn:acme:error:rateLimited" in str(e): if "urn:acme:error:rateLimited" in str(e):

View file

@ -12,7 +12,7 @@ LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler()) LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO) LOGGER.setLevel(logging.INFO)
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA): def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, no_checks=False):
# helper function base64 encode for jose spec # helper function base64 encode for jose spec
def _b64(b): def _b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
@ -111,16 +111,17 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
with open(wellknown_path, "w") as wellknown_file: with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization) wellknown_file.write(keyauthorization)
# check that the file is in place if not no_checks: # sometime the local g
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) # check that the file is in place
try: wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
resp = urlopen(wellknown_url) try:
resp_data = resp.read().decode('utf8').strip() resp = urlopen(wellknown_url)
assert resp_data == keyauthorization resp_data = resp.read().decode('utf8').strip()
except (IOError, AssertionError): assert resp_data == keyauthorization
os.remove(wellknown_path) except (IOError, AssertionError):
raise ValueError("Wrote file to {0}, but couldn't download {1}".format( os.remove(wellknown_path)
wellknown_path, wellknown_url)) raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))
# notify challenge are met # notify challenge are met
code, result = _send_signed_request(challenge['uri'], { code, result = _send_signed_request(challenge['uri'], {