ACME modules refactor (#187)

* Move acme.py to acme/__init__.py to prepare splitup.

* Began moving generic code out.

* Creating backends.

* Update unit tests.

* Move remaining new code out.

* Use new interface.

* Rewrite module init code.

* Add changelog.

* Add BackendException for crypto backend errors.

* Improve / uniformize ACME error reporting.

* Create ACMELegacyAccount for backwards compatibility.

* Split up ACMEAccount into ACMEClient and ACMEAccount.

* Move get_keyauthorization into module_utils.acme.challenges.

* Improve error handling.

* Move challenge and authorization handling code into module_utils.

* Add split_identifier helper.

* Move order code into module_utils.

* Move ACME v2 certificate handling code to module_utils.

* Fix/move ACME v1 certificate retrieval to module_utils as well.

* Refactor alternate chain handling code by splitting it up into simpler functions.

* Make chain matcher creation part of backend.
This commit is contained in:
Felix Fontein
2021-03-21 09:40:25 +01:00
committed by GitHub
parent 8de9376a10
commit 5d32937321
30 changed files with 3029 additions and 1977 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import binascii
import copy
import datetime
import hashlib
import json
import locale
import os
import re
import shutil
import sys
import tempfile
import traceback
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.six.moves.urllib.parse import unquote
from ansible.module_utils._text import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
get_default_argspec,
ACMEDirectory,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
CryptographyBackend,
CRYPTOGRAPHY_VERSION,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
OpenSSLCLIBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme._compatibility import (
handle_standard_module_arguments,
set_crypto_backend,
HAS_CURRENT_CRYPTOGRAPHY,
)
from ansible_collections.community.crypto.plugins.module_utils.acme._compatibility import ACMELegacyAccount as ACMEAccount
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
from ansible_collections.community.crypto.plugins.module_utils.acme.io import (
read_file,
write_file,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64,
pem_to_der,
process_links,
)
def openssl_get_csr_identifiers(openssl_binary, module, csr_filename, csr_content=None):
module.deprecate(
'Please adjust your custom module/plugin to the ACME module_utils refactor '
'(https://github.com/ansible-collections/community.crypto/pull/184). The '
'compatibility layer will be removed in community.crypto 2.0.0, thus breaking '
'your code', version='2.0.0', collection_name='community.crypto')
return OpenSSLCLIBackend(module, openssl_binary=openssl_binary).get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content)
def cryptography_get_csr_identifiers(module, csr_filename, csr_content=None):
module.deprecate(
'Please adjust your custom module/plugin to the ACME module_utils refactor '
'(https://github.com/ansible-collections/community.crypto/pull/184). The '
'compatibility layer will be removed in community.crypto 2.0.0, thus breaking '
'your code', version='2.0.0', collection_name='community.crypto')
return CryptographyBackend(module).get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content)
def cryptography_get_cert_days(module, cert_file, now=None):
module.deprecate(
'Please adjust your custom module/plugin to the ACME module_utils refactor '
'(https://github.com/ansible-collections/community.crypto/pull/184). The '
'compatibility layer will be removed in community.crypto 2.0.0, thus breaking '
'your code', version='2.0.0', collection_name='community.crypto')
return CryptographyBackend(module).get_cert_days(cert_filename=cert_file, now=now)

View File

@@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import locale
from ansible.module_utils.basic import missing_required_lib
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import HAS_CURRENT_CRYPTOGRAPHY as _ORIGINAL_HAS_CURRENT_CRYPTOGRAPHY
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
CryptographyBackend,
CRYPTOGRAPHY_VERSION,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
OpenSSLCLIBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
ACMEClient,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.account import (
ACMEAccount,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.challenges import (
create_key_authorization,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
KeyParsingError,
)
HAS_CURRENT_CRYPTOGRAPHY = _ORIGINAL_HAS_CURRENT_CRYPTOGRAPHY
def set_crypto_backend(module):
'''
Sets which crypto backend to use (default: auto detection).
Does not care whether a new enough cryptoraphy is available or not. Must
be called before any real stuff is done which might evaluate
``HAS_CURRENT_CRYPTOGRAPHY``.
'''
global HAS_CURRENT_CRYPTOGRAPHY
module.deprecate(
'Please adjust your custom module/plugin to the ACME module_utils refactor '
'(https://github.com/ansible-collections/community.crypto/pull/184). The '
'compatibility layer will be removed in community.crypto 2.0.0, thus breaking '
'your code', version='2.0.0', collection_name='community.crypto')
# Choose backend
backend = module.params['select_crypto_backend']
if backend == 'auto':
pass
elif backend == 'openssl':
HAS_CURRENT_CRYPTOGRAPHY = False
elif backend == 'cryptography':
if not _ORIGINAL_HAS_CURRENT_CRYPTOGRAPHY:
module.fail_json(msg=missing_required_lib('cryptography'))
HAS_CURRENT_CRYPTOGRAPHY = True
else:
module.fail_json(msg='Unknown crypto backend "{0}"!'.format(backend))
# Inform about choices
if HAS_CURRENT_CRYPTOGRAPHY:
module.debug('Using cryptography backend (library version {0})'.format(CRYPTOGRAPHY_VERSION))
return 'cryptography'
else:
module.debug('Using OpenSSL binary backend')
return 'openssl'
def handle_standard_module_arguments(module, needs_acme_v2=False):
'''
Do standard module setup, argument handling and warning emitting.
'''
backend = set_crypto_backend(module)
if not module.params['validate_certs']:
module.warn(
'Disabling certificate validation for communications with ACME endpoint. '
'This should only be done for testing against a local ACME server for '
'development purposes, but *never* for production purposes.'
)
if module.params['acme_version'] is None:
module.params['acme_version'] = 1
module.deprecate("The option 'acme_version' will be required from community.crypto 2.0.0 on",
version='2.0.0', collection_name='community.crypto')
if module.params['acme_directory'] is None:
module.params['acme_directory'] = 'https://acme-staging.api.letsencrypt.org/directory'
module.deprecate("The option 'acme_directory' will be required from community.crypto 2.0.0 on",
version='2.0.0', collection_name='community.crypto')
if needs_acme_v2 and module.params['acme_version'] < 2:
module.fail_json(msg='The {0} module requires the ACME v2 protocol!'.format(module._name))
# AnsibleModule() changes the locale, so change it back to C because we rely on time.strptime() when parsing certificate dates.
module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C')
locale.setlocale(locale.LC_ALL, 'C')
return backend
def get_compatibility_backend(module):
if HAS_CURRENT_CRYPTOGRAPHY:
return CryptographyBackend(module)
else:
return OpenSSLCLIBackend(module)
class ACMELegacyAccount(object):
'''
ACME account object. Handles the authorized communication with the
ACME server. Provides access to account bound information like
the currently active authorizations and valid certificates
'''
def __init__(self, module):
module.deprecate(
'Please adjust your custom module/plugin to the ACME module_utils refactor '
'(https://github.com/ansible-collections/community.crypto/pull/184). The '
'compatibility layer will be removed in community.crypto 2.0.0, thus breaking '
'your code', version='2.0.0', collection_name='community.crypto')
backend = get_compatibility_backend(module)
self.client = ACMEClient(module, backend)
self.account = ACMEAccount(self.client)
self.key = self.client.account_key_file
self.key_content = self.client.account_key_content
self.uri = self.client.account_uri
self.key_data = self.client.account_key_data
self.jwk = self.client.account_jwk
self.jws_header = self.client.account_jws_header
self.directory = self.client.directory
def get_keyauthorization(self, token):
'''
Returns the key authorization for the given token
https://tools.ietf.org/html/rfc8555#section-8.1
'''
return create_key_authorization(self.client, token)
def parse_key(self, key_file=None, key_content=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair
(error, key_data).
'''
try:
return None, self.client.parse_key(key_file=key_file, key_content=key_content)
except KeyParsingError as e:
return e.msg, None
def sign_request(self, protected, payload, key_data, encode_payload=True):
return self.client.sign_request(protected, payload, key_data, encode_payload=encode_payload)
def send_signed_request(self, url, payload, key_data=None, jws_header=None, parse_json_result=True, encode_payload=True):
'''
Sends a JWS signed HTTP POST request to the ACME server and returns
the response as dictionary
https://tools.ietf.org/html/rfc8555#section-6.2
If payload is None, a POST-as-GET is performed.
(https://tools.ietf.org/html/rfc8555#section-6.3)
'''
return self.client.send_signed_request(
url,
payload,
key_data=key_data,
jws_header=jws_header,
parse_json_result=parse_json_result,
encode_payload=encode_payload,
fail_on_error=False,
)
def get_request(self, uri, parse_json_result=True, headers=None, get_only=False, fail_on_error=True):
'''
Perform a GET-like request. Will try POST-as-GET for ACMEv2, with fallback
to GET if server replies with a status code of 405.
'''
return self.client.get_request(
uri,
parse_json_result=parse_json_result,
headers=headers,
get_only=get_only,
fail_on_error=fail_on_error,
)
def set_account_uri(self, uri):
'''
Set account URI. For ACME v2, it needs to be used to sending signed
requests.
'''
self.client.set_account_uri(uri)
self.uri = self.client.account_uri
def get_account_data(self):
'''
Retrieve account information. Can only be called when the account
URI is already known (such as after calling setup_account).
Return None if the account was deactivated, or a dict otherwise.
'''
return self.account.get_account_data()
def setup_account(self, contact=None, agreement=None, terms_agreed=False,
allow_creation=True, remove_account_uri_if_not_exists=False,
external_account_binding=None):
'''
Detect or create an account on the ACME server. For ACME v1,
as the only way (without knowing an account URI) to test if an
account exists is to try and create one with the provided account
key, this method will always result in an account being present
(except on error situations). For ACME v2, a new account will
only be created if ``allow_creation`` is set to True.
For ACME v2, ``check_mode`` is fully respected. For ACME v1, the
account might be created if it does not yet exist.
Return a pair ``(created, account_data)``. Here, ``created`` will
be ``True`` in case the account was created or would be created
(check mode). ``account_data`` will be the current account data,
or ``None`` if the account does not exist.
The account URI will be stored in ``self.uri``; if it is ``None``,
the account does not exist.
If specified, ``external_account_binding`` should be a dictionary
with keys ``kid``, ``alg`` and ``key``
(https://tools.ietf.org/html/rfc8555#section-7.3.4).
https://tools.ietf.org/html/rfc8555#section-7.3
'''
result = self.account.setup_account(
contact=contact,
agreement=agreement,
terms_agreed=terms_agreed,
allow_creation=allow_creation,
remove_account_uri_if_not_exists=remove_account_uri_if_not_exists,
external_account_binding=external_account_binding,
)
self.uri = self.client.account_uri
return result
def update_account(self, account_data, contact=None):
'''
Update an account on the ACME server. Check mode is fully respected.
The current account data must be provided as ``account_data``.
Return a pair ``(updated, account_data)``, where ``updated`` is
``True`` in case something changed (contact info updated) or
would be changed (check mode), and ``account_data`` the updated
account data.
https://tools.ietf.org/html/rfc8555#section-7.3.2
'''
return self.account.update_account(account_data, contact=contact)

View File

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ACMEProtocolException,
ModuleFailException,
)
class ACMEAccount(object):
'''
ACME account object. Allows to create new accounts, check for existence of accounts,
retrieve account data.
'''
def __init__(self, client):
# Set to true to enable logging of all signed requests
self._debug = False
self.client = client
def _new_reg(self, contact=None, agreement=None, terms_agreed=False, allow_creation=True,
external_account_binding=None):
'''
Registers a new ACME account. Returns a pair ``(created, data)``.
Here, ``created`` is ``True`` if the account was created and
``False`` if it already existed (e.g. it was not newly created),
or does not exist. In case the account was created or exists,
``data`` contains the account data; otherwise, it is ``None``.
If specified, ``external_account_binding`` should be a dictionary
with keys ``kid``, ``alg`` and ``key``
(https://tools.ietf.org/html/rfc8555#section-7.3.4).
https://tools.ietf.org/html/rfc8555#section-7.3
'''
contact = contact or []
if self.client.version == 1:
new_reg = {
'resource': 'new-reg',
'contact': contact
}
if agreement:
new_reg['agreement'] = agreement
else:
new_reg['agreement'] = self.client.directory['meta']['terms-of-service']
if external_account_binding is not None:
raise ModuleFailException('External account binding is not supported for ACME v1')
url = self.client.directory['new-reg']
else:
if (external_account_binding is not None or self.client.directory['meta'].get('externalAccountRequired')) and allow_creation:
# Some ACME servers such as ZeroSSL do not like it when you try to register an existing account
# and provide external_account_binding credentials. Thus we first send a request with allow_creation=False
# to see whether the account already exists.
# Note that we pass contact here: ZeroSSL does not accept regisration calls without contacts, even
# if onlyReturnExisting is set to true.
created, data = self._new_reg(contact=contact, allow_creation=False)
if data:
# An account already exists! Return data
return created, data
# An account does not yet exist. Try to create one next.
new_reg = {
'contact': contact
}
if not allow_creation:
# https://tools.ietf.org/html/rfc8555#section-7.3.1
new_reg['onlyReturnExisting'] = True
if terms_agreed:
new_reg['termsOfServiceAgreed'] = True
url = self.client.directory['newAccount']
if external_account_binding is not None:
new_reg['externalAccountBinding'] = self.client.sign_request(
{
'alg': external_account_binding['alg'],
'kid': external_account_binding['kid'],
'url': url,
},
self.client.account_jwk,
self.client.backend.create_mac_key(external_account_binding['alg'], external_account_binding['key'])
)
elif self.client.directory['meta'].get('externalAccountRequired') and allow_creation:
raise ModuleFailException(
'To create an account, an external account binding must be specified. '
'Use the acme_account module with the external_account_binding option.'
)
result, info = self.client.send_signed_request(url, new_reg, fail_on_error=False)
if info['status'] in ([200, 201] if self.client.version == 1 else [201]):
# Account did not exist
if 'location' in info:
self.client.set_account_uri(info['location'])
return True, result
elif info['status'] == (409 if self.client.version == 1 else 200):
# Account did exist
if result.get('status') == 'deactivated':
# A bug in Pebble (https://github.com/letsencrypt/pebble/issues/179) and
# Boulder (https://github.com/letsencrypt/boulder/issues/3971): this should
# not return a valid account object according to
# https://tools.ietf.org/html/rfc8555#section-7.3.6:
# "Once an account is deactivated, the server MUST NOT accept further
# requests authorized by that account's key."
if not allow_creation:
return False, None
else:
raise ModuleFailException("Account is deactivated")
if 'location' in info:
self.client.set_account_uri(info['location'])
return False, result
elif info['status'] == 400 and result['type'] == 'urn:ietf:params:acme:error:accountDoesNotExist' and not allow_creation:
# Account does not exist (and we didn't try to create it)
return False, None
elif info['status'] == 403 and result['type'] == 'urn:ietf:params:acme:error:unauthorized' and 'deactivated' in (result.get('detail') or ''):
# Account has been deactivated; currently works for Pebble; hasn't been
# implemented for Boulder (https://github.com/letsencrypt/boulder/issues/3971),
# might need adjustment in error detection.
if not allow_creation:
return False, None
else:
raise ModuleFailException("Account is deactivated")
else:
raise ACMEProtocolException(
self.client.module, msg='Registering ACME account failed', info=info, content_json=result)
def get_account_data(self):
'''
Retrieve account information. Can only be called when the account
URI is already known (such as after calling setup_account).
Return None if the account was deactivated, or a dict otherwise.
'''
if self.client.account_uri is None:
raise ModuleFailException("Account URI unknown")
if self.client.version == 1:
data = {}
data['resource'] = 'reg'
result, info = self.client.send_signed_request(self.client.account_uri, data, fail_on_error=False)
else:
# try POST-as-GET first (draft-15 or newer)
data = None
result, info = self.client.send_signed_request(self.client.account_uri, data, fail_on_error=False)
# check whether that failed with a malformed request error
if info['status'] >= 400 and result.get('type') == 'urn:ietf:params:acme:error:malformed':
# retry as a regular POST (with no changed data) for pre-draft-15 ACME servers
data = {}
result, info = self.client.send_signed_request(self.client.account_uri, data, fail_on_error=False)
if info['status'] in (400, 403) and result.get('type') == 'urn:ietf:params:acme:error:unauthorized':
# Returned when account is deactivated
return None
if info['status'] in (400, 404) and result.get('type') == 'urn:ietf:params:acme:error:accountDoesNotExist':
# Returned when account does not exist
return None
if info['status'] < 200 or info['status'] >= 300:
raise ACMEProtocolException(
self.client.module, msg='Error retrieving account data', info=info, content_json=result)
return result
def setup_account(self, contact=None, agreement=None, terms_agreed=False,
allow_creation=True, remove_account_uri_if_not_exists=False,
external_account_binding=None):
'''
Detect or create an account on the ACME server. For ACME v1,
as the only way (without knowing an account URI) to test if an
account exists is to try and create one with the provided account
key, this method will always result in an account being present
(except on error situations). For ACME v2, a new account will
only be created if ``allow_creation`` is set to True.
For ACME v2, ``check_mode`` is fully respected. For ACME v1, the
account might be created if it does not yet exist.
Return a pair ``(created, account_data)``. Here, ``created`` will
be ``True`` in case the account was created or would be created
(check mode). ``account_data`` will be the current account data,
or ``None`` if the account does not exist.
The account URI will be stored in ``client.account_uri``; if it is ``None``,
the account does not exist.
If specified, ``external_account_binding`` should be a dictionary
with keys ``kid``, ``alg`` and ``key``
(https://tools.ietf.org/html/rfc8555#section-7.3.4).
https://tools.ietf.org/html/rfc8555#section-7.3
'''
if self.client.account_uri is not None:
created = False
# Verify that the account key belongs to the URI.
# (If update_contact is True, this will be done below.)
account_data = self.get_account_data()
if account_data is None:
if remove_account_uri_if_not_exists and not allow_creation:
self.client.account_uri = None
else:
raise ModuleFailException("Account is deactivated or does not exist!")
else:
created, account_data = self._new_reg(
contact,
agreement=agreement,
terms_agreed=terms_agreed,
allow_creation=allow_creation and not self.client.module.check_mode,
external_account_binding=external_account_binding,
)
if self.client.module.check_mode and self.client.account_uri is None and allow_creation:
created = True
account_data = {
'contact': contact or []
}
return created, account_data
def update_account(self, account_data, contact=None):
'''
Update an account on the ACME server. Check mode is fully respected.
The current account data must be provided as ``account_data``.
Return a pair ``(updated, account_data)``, where ``updated`` is
``True`` in case something changed (contact info updated) or
would be changed (check mode), and ``account_data`` the updated
account data.
https://tools.ietf.org/html/rfc8555#section-7.3.2
'''
# Create request
update_request = {}
if contact is not None and account_data.get('contact', []) != contact:
update_request['contact'] = list(contact)
# No change?
if not update_request:
return False, dict(account_data)
# Apply change
if self.client.module.check_mode:
account_data = dict(account_data)
account_data.update(update_request)
else:
if self.client.version == 1:
update_request['resource'] = 'reg'
account_data, dummy = self.client.send_signed_request(self.client.account_uri, update_request)
return True, account_data

View File

@@ -0,0 +1,366 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
import datetime
import json
import locale
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.urls import fetch_url
from ansible.module_utils._text import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
OpenSSLCLIBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
CryptographyBackend,
CRYPTOGRAPHY_VERSION,
HAS_CURRENT_CRYPTOGRAPHY,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ACMEProtocolException,
NetworkException,
ModuleFailException,
KeyParsingError,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64,
)
def _assert_fetch_url_success(module, response, info, allow_redirect=False, allow_client_error=True, allow_server_error=True):
if info['status'] < 0:
raise NetworkException(msg="Failure downloading %s, %s" % (info['url'], info['msg']))
if (300 <= info['status'] < 400 and not allow_redirect) or \
(400 <= info['status'] < 500 and not allow_client_error) or \
(info['status'] >= 500 and not allow_server_error):
raise ACMEProtocolException(module, info=info, response=response)
def _is_failed(info, expected_status_codes=None):
if info['status'] < 200 or info['status'] >= 400:
return True
if expected_status_codes is not None and info['status'] not in expected_status_codes:
return True
return False
class ACMEDirectory(object):
'''
The ACME server directory. Gives access to the available resources,
and allows to obtain a Replay-Nonce. The acme_directory URL
needs to support unauthenticated GET requests; ACME endpoints
requiring authentication are not supported.
https://tools.ietf.org/html/rfc8555#section-7.1.1
'''
def __init__(self, module, account):
self.module = module
self.directory_root = module.params['acme_directory']
self.version = module.params['acme_version']
self.directory, dummy = account.get_request(self.directory_root, get_only=True)
# Check whether self.version matches what we expect
if self.version == 1:
for key in ('new-reg', 'new-authz', 'new-cert'):
if key not in self.directory:
raise ModuleFailException("ACME directory does not seem to follow protocol ACME v1")
if self.version == 2:
for key in ('newNonce', 'newAccount', 'newOrder'):
if key not in self.directory:
raise ModuleFailException("ACME directory does not seem to follow protocol ACME v2")
def __getitem__(self, key):
return self.directory[key]
def get_nonce(self, resource=None):
url = self.directory_root if self.version == 1 else self.directory['newNonce']
if resource is not None:
url = resource
dummy, info = fetch_url(self.module, url, method='HEAD')
if info['status'] not in (200, 204):
raise NetworkException("Failed to get replay-nonce, got status {0}".format(info['status']))
return info['replay-nonce']
class ACMEClient(object):
'''
ACME client object. Handles the authorized communication with the
ACME server.
'''
def __init__(self, module, backend):
# Set to true to enable logging of all signed requests
self._debug = False
self.module = module
self.backend = backend
self.version = module.params['acme_version']
# account_key path and content are mutually exclusive
self.account_key_file = module.params['account_key_src']
self.account_key_content = module.params['account_key_content']
# Grab account URI from module parameters.
# Make sure empty string is treated as None.
self.account_uri = module.params.get('account_uri') or None
self.account_key_data = None
self.account_jwk = None
self.account_jws_header = None
if self.account_key_file is not None or self.account_key_content is not None:
try:
self.account_key_data = self.parse_key(key_file=self.account_key_file, key_content=self.account_key_content)
except KeyParsingError as e:
raise ModuleFailException("Error while parsing account key: {msg}".format(msg=e.msg))
self.account_jwk = self.account_key_data['jwk']
self.account_jws_header = {
"alg": self.account_key_data['alg'],
"jwk": self.account_jwk,
}
if self.account_uri:
# Make sure self.account_jws_header is updated
self.set_account_uri(self.account_uri)
self.directory = ACMEDirectory(module, self)
def set_account_uri(self, uri):
'''
Set account URI. For ACME v2, it needs to be used to sending signed
requests.
'''
self.account_uri = uri
if self.version != 1:
self.account_jws_header.pop('jwk')
self.account_jws_header['kid'] = self.account_uri
def parse_key(self, key_file=None, key_content=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
In case of an error, raises KeyParsingError.
'''
if key_file is None and key_content is None:
raise AssertionError('One of key_file and key_content must be specified!')
error, key_data = self.backend.parse_key(key_file, key_content)
if error:
raise KeyParsingError(error)
return key_data
def sign_request(self, protected, payload, key_data, encode_payload=True):
'''
Signs an ACME request.
'''
try:
if payload is None:
# POST-as-GET
payload64 = ''
else:
# POST
if encode_payload:
payload = self.module.jsonify(payload).encode('utf8')
payload64 = nopad_b64(to_bytes(payload))
protected64 = nopad_b64(self.module.jsonify(protected).encode('utf8'))
except Exception as e:
raise ModuleFailException("Failed to encode payload / headers as JSON: {0}".format(e))
return self.backend.sign(payload64, protected64, key_data)
def _log(self, msg, data=None):
'''
Write arguments to acme.log when logging is enabled.
'''
if self._debug:
with open('acme.log', 'ab') as f:
f.write('[{0}] {1}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%s'), msg).encode('utf-8'))
if data is not None:
f.write('{0}\n\n'.format(json.dumps(data, indent=2, sort_keys=True)).encode('utf-8'))
def send_signed_request(self, url, payload, key_data=None, jws_header=None, parse_json_result=True,
encode_payload=True, fail_on_error=True, error_msg=None, expected_status_codes=None):
'''
Sends a JWS signed HTTP POST request to the ACME server and returns
the response as dictionary (if parse_json_result is True) or in raw form
(if parse_json_result is False).
https://tools.ietf.org/html/rfc8555#section-6.2
If payload is None, a POST-as-GET is performed.
(https://tools.ietf.org/html/rfc8555#section-6.3)
'''
key_data = key_data or self.account_key_data
jws_header = jws_header or self.account_jws_header
failed_tries = 0
while True:
protected = copy.deepcopy(jws_header)
protected["nonce"] = self.directory.get_nonce()
if self.version != 1:
protected["url"] = url
self._log('URL', url)
self._log('protected', protected)
self._log('payload', payload)
data = self.sign_request(protected, payload, key_data, encode_payload=encode_payload)
if self.version == 1:
data["header"] = jws_header.copy()
for k, v in protected.items():
dummy = data["header"].pop(k, None)
self._log('signed request', data)
data = self.module.jsonify(data)
headers = {
'Content-Type': 'application/jose+json',
}
resp, info = fetch_url(self.module, url, data=data, headers=headers, method='POST')
_assert_fetch_url_success(self.module, resp, info)
result = {}
try:
content = resp.read()
except AttributeError:
content = info.pop('body', None)
if content or not parse_json_result:
if (parse_json_result and info['content-type'].startswith('application/json')) or 400 <= info['status'] < 600:
try:
decoded_result = self.module.from_json(content.decode('utf8'))
self._log('parsed result', decoded_result)
# In case of badNonce error, try again (up to 5 times)
# (https://tools.ietf.org/html/rfc8555#section-6.7)
if all((
400 <= info['status'] < 600,
decoded_result.get('type') == 'urn:ietf:params:acme:error:badNonce',
failed_tries <= 5,
)):
failed_tries += 1
continue
if parse_json_result:
result = decoded_result
else:
result = content
except ValueError:
raise NetworkException("Failed to parse the ACME response: {0} {1}".format(url, content))
else:
result = content
if fail_on_error and _is_failed(info, expected_status_codes=expected_status_codes):
raise ACMEProtocolException(
self.module, msg=error_msg, info=info, content=content, content_json=result if parse_json_result else None)
return result, info
def get_request(self, uri, parse_json_result=True, headers=None, get_only=False,
fail_on_error=True, error_msg=None, expected_status_codes=None):
'''
Perform a GET-like request. Will try POST-as-GET for ACMEv2, with fallback
to GET if server replies with a status code of 405.
'''
if not get_only and self.version != 1:
# Try POST-as-GET
content, info = self.send_signed_request(uri, None, parse_json_result=False, fail_on_error=False)
if info['status'] == 405:
# Instead, do unauthenticated GET
get_only = True
else:
# Do unauthenticated GET
get_only = True
if get_only:
# Perform unauthenticated GET
resp, info = fetch_url(self.module, uri, method='GET', headers=headers)
_assert_fetch_url_success(self.module, resp, info)
try:
content = resp.read()
except AttributeError:
content = info.pop('body', None)
# Process result
if parse_json_result:
result = {}
if content:
if info['content-type'].startswith('application/json'):
try:
result = self.module.from_json(content.decode('utf8'))
except ValueError:
raise NetworkException("Failed to parse the ACME response: {0} {1}".format(uri, content))
else:
result = content
else:
result = content
if fail_on_error and _is_failed(info, expected_status_codes=expected_status_codes):
raise ACMEProtocolException(
self.module, msg=error_msg, info=info, content=content, content_json=result if parse_json_result else None)
return result, info
def get_default_argspec():
'''
Provides default argument spec for the options documented in the acme doc fragment.
'''
return dict(
account_key_src=dict(type='path', aliases=['account_key']),
account_key_content=dict(type='str', no_log=True),
account_uri=dict(type='str'),
acme_directory=dict(type='str'),
acme_version=dict(type='int', choices=[1, 2]),
validate_certs=dict(type='bool', default=True),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
)
def create_backend(module, needs_acme_v2):
backend = module.params['select_crypto_backend']
# Backend autodetect
if backend == 'auto':
backend = 'cryptography' if HAS_CURRENT_CRYPTOGRAPHY else 'openssl'
# Create backend object
if backend == 'cryptography':
if not HAS_CURRENT_CRYPTOGRAPHY:
module.fail_json(msg=missing_required_lib('cryptography'))
module.debug('Using cryptography backend (library version {0})'.format(CRYPTOGRAPHY_VERSION))
module_backend = CryptographyBackend(module)
elif backend == 'openssl':
module.debug('Using OpenSSL binary backend')
module_backend = OpenSSLCLIBackend(module)
else:
module.fail_json(msg='Unknown crypto backend "{0}"!'.format(backend))
# Check common module parameters
if not module.params['validate_certs']:
module.warn(
'Disabling certificate validation for communications with ACME endpoint. '
'This should only be done for testing against a local ACME server for '
'development purposes, but *never* for production purposes.'
)
if module.params['acme_version'] is None:
module.params['acme_version'] = 1
module.deprecate("The option 'acme_version' will be required from community.crypto 2.0.0 on",
version='2.0.0', collection_name='community.crypto')
if module.params['acme_directory'] is None:
module.params['acme_directory'] = 'https://acme-staging.api.letsencrypt.org/directory'
module.deprecate("The option 'acme_directory' will be required from community.crypto 2.0.0 on",
version='2.0.0', collection_name='community.crypto')
if needs_acme_v2 and module.params['acme_version'] < 2:
module.fail_json(msg='The {0} module requires the ACME v2 protocol!'.format(module._name))
# AnsibleModule() changes the locale, so change it back to C because we rely
# on datetime.datetime.strptime() when parsing certificate dates.
locale.setlocale(locale.LC_ALL, 'C')
return module_backend

View File

@@ -0,0 +1,369 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import binascii
import datetime
import os
import sys
from ansible.module_utils._text import to_bytes, to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CryptoBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
ChainMatcher,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException
from ansible_collections.community.crypto.plugins.module_utils.acme.io import read_file
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_name_to_oid,
)
try:
import cryptography
import cryptography.hazmat.backends
import cryptography.hazmat.primitives.hashes
import cryptography.hazmat.primitives.hmac
import cryptography.hazmat.primitives.asymmetric.ec
import cryptography.hazmat.primitives.asymmetric.padding
import cryptography.hazmat.primitives.asymmetric.rsa
import cryptography.hazmat.primitives.asymmetric.utils
import cryptography.hazmat.primitives.serialization
import cryptography.x509
import cryptography.x509.oid
from distutils.version import LooseVersion
CRYPTOGRAPHY_VERSION = cryptography.__version__
HAS_CURRENT_CRYPTOGRAPHY = (LooseVersion(CRYPTOGRAPHY_VERSION) >= LooseVersion('1.5'))
if HAS_CURRENT_CRYPTOGRAPHY:
_cryptography_backend = cryptography.hazmat.backends.default_backend()
except Exception as dummy:
HAS_CURRENT_CRYPTOGRAPHY = False
CRYPTOGRAPHY_VERSION = None
if sys.version_info[0] >= 3:
# Python 3 (and newer)
def _count_bytes(n):
return (n.bit_length() + 7) // 8 if n > 0 else 0
def _convert_int_to_bytes(count, no):
return no.to_bytes(count, byteorder='big')
def _pad_hex(n, digits):
res = hex(n)[2:]
if len(res) < digits:
res = '0' * (digits - len(res)) + res
return res
else:
# Python 2
def _count_bytes(n):
if n <= 0:
return 0
h = '%x' % n
return (len(h) + 1) // 2
def _convert_int_to_bytes(count, n):
h = '%x' % n
if len(h) > 2 * count:
raise Exception('Number {1} needs more than {0} bytes!'.format(count, n))
return ('0' * (2 * count - len(h)) + h).decode('hex')
def _pad_hex(n, digits):
h = '%x' % n
if len(h) < digits:
h = '0' * (digits - len(h)) + h
return h
class CryptographyChainMatcher(ChainMatcher):
@staticmethod
def _parse_key_identifier(key_identifier, name, criterium_idx, module):
if key_identifier:
try:
return binascii.unhexlify(key_identifier.replace(':', ''))
except Exception:
if criterium_idx is None:
module.warn('Criterium has invalid {0} value. Ignoring criterium.'.format(name))
else:
module.warn('Criterium {0} in select_chain has invalid {1} value. '
'Ignoring criterium.'.format(criterium_idx, name))
return None
def __init__(self, criterium, module):
self.criterium = criterium
self.test_certificates = criterium.test_certificates
self.subject = []
self.issuer = []
if criterium.subject:
self.subject = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject)
]
if criterium.issuer:
self.issuer = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer)
]
self.subject_key_identifier = CryptographyChainMatcher._parse_key_identifier(
criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, module)
self.authority_key_identifier = CryptographyChainMatcher._parse_key_identifier(
criterium.authority_key_identifier, 'authority_key_identifier', criterium.index, module)
def _match_subject(self, x509_subject, match_subject):
for oid, value in match_subject:
found = False
for attribute in x509_subject:
if attribute.oid == oid and value == to_native(attribute.value):
found = True
break
if not found:
return False
return True
def match(self, certificate):
'''
Check whether an alternate chain matches the specified criterium.
'''
chain = certificate.chain
if self.test_certificates == 'last':
chain = chain[-1:]
elif self.test_certificates == 'first':
chain = chain[:1]
for cert in chain:
try:
x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography.hazmat.backends.default_backend())
matches = True
if not self._match_subject(x509.subject, self.subject):
matches = False
if not self._match_subject(x509.issuer, self.issuer):
matches = False
if self.subject_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier)
if self.subject_key_identifier != ext.value.digest:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if self.authority_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier)
if self.authority_key_identifier != ext.value.key_identifier:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if matches:
return True
except Exception as e:
self.module.warn('Error while loading certificate {0}: {1}'.format(cert, e))
return False
class CryptographyBackend(CryptoBackend):
def __init__(self, module):
super(CryptographyBackend, self).__init__(module)
def parse_key(self, key_file=None, key_content=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair
(error, key_data).
'''
# If key_content isn't given, read key_file
if key_content is None:
key_content = read_file(key_file)
else:
key_content = to_bytes(key_content)
# Parse key
try:
key = cryptography.hazmat.primitives.serialization.load_pem_private_key(key_content, password=None, backend=_cryptography_backend)
except Exception as e:
return 'error while loading key: {0}'.format(e), None
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
pk = key.public_key().public_numbers()
return None, {
'key_obj': key,
'type': 'rsa',
'alg': 'RS256',
'jwk': {
"kty": "RSA",
"e": nopad_b64(_convert_int_to_bytes(_count_bytes(pk.e), pk.e)),
"n": nopad_b64(_convert_int_to_bytes(_count_bytes(pk.n), pk.n)),
},
'hash': 'sha256',
}
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
pk = key.public_key().public_numbers()
if pk.curve.name == 'secp256r1':
bits = 256
alg = 'ES256'
hashalg = 'sha256'
point_size = 32
curve = 'P-256'
elif pk.curve.name == 'secp384r1':
bits = 384
alg = 'ES384'
hashalg = 'sha384'
point_size = 48
curve = 'P-384'
elif pk.curve.name == 'secp521r1':
# Not yet supported on Let's Encrypt side, see
# https://github.com/letsencrypt/boulder/issues/2217
bits = 521
alg = 'ES512'
hashalg = 'sha512'
point_size = 66
curve = 'P-521'
else:
return 'unknown elliptic curve: {0}'.format(pk.curve.name), {}
num_bytes = (bits + 7) // 8
return None, {
'key_obj': key,
'type': 'ec',
'alg': alg,
'jwk': {
"kty": "EC",
"crv": curve,
"x": nopad_b64(_convert_int_to_bytes(num_bytes, pk.x)),
"y": nopad_b64(_convert_int_to_bytes(num_bytes, pk.y)),
},
'hash': hashalg,
'point_size': point_size,
}
else:
return 'unknown key type "{0}"'.format(type(key)), {}
def sign(self, payload64, protected64, key_data):
sign_payload = "{0}.{1}".format(protected64, payload64).encode('utf8')
if 'mac_obj' in key_data:
mac = key_data['mac_obj']()
mac.update(sign_payload)
signature = mac.finalize()
elif isinstance(key_data['key_obj'], cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
padding = cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15()
hashalg = cryptography.hazmat.primitives.hashes.SHA256
signature = key_data['key_obj'].sign(sign_payload, padding, hashalg())
elif isinstance(key_data['key_obj'], cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
if key_data['hash'] == 'sha256':
hashalg = cryptography.hazmat.primitives.hashes.SHA256
elif key_data['hash'] == 'sha384':
hashalg = cryptography.hazmat.primitives.hashes.SHA384
elif key_data['hash'] == 'sha512':
hashalg = cryptography.hazmat.primitives.hashes.SHA512
ecdsa = cryptography.hazmat.primitives.asymmetric.ec.ECDSA(hashalg())
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(key_data['key_obj'].sign(sign_payload, ecdsa))
rr = _pad_hex(r, 2 * key_data['point_size'])
ss = _pad_hex(s, 2 * key_data['point_size'])
signature = binascii.unhexlify(rr) + binascii.unhexlify(ss)
return {
"protected": protected64,
"payload": payload64,
"signature": nopad_b64(signature),
}
def create_mac_key(self, alg, key):
'''Create a MAC key.'''
if alg == 'HS256':
hashalg = cryptography.hazmat.primitives.hashes.SHA256
hashbytes = 32
elif alg == 'HS384':
hashalg = cryptography.hazmat.primitives.hashes.SHA384
hashbytes = 48
elif alg == 'HS512':
hashalg = cryptography.hazmat.primitives.hashes.SHA512
hashbytes = 64
else:
raise BackendException('Unsupported MAC key algorithm for cryptography backend: {0}'.format(alg))
key_bytes = base64.urlsafe_b64decode(key)
if len(key_bytes) < hashbytes:
raise BackendException(
'{0} key must be at least {1} bytes long (after Base64 decoding)'.format(alg, hashbytes))
return {
'mac_obj': lambda: cryptography.hazmat.primitives.hmac.HMAC(
key_bytes,
hashalg(),
_cryptography_backend),
'type': 'hmac',
'alg': alg,
'jwk': {
'kty': 'oct',
'k': key,
},
}
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
identifiers = set([])
if csr_content is None:
csr_content = read_file(csr_filename)
else:
csr_content = to_bytes(csr_content)
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)
for sub in csr.subject:
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
identifiers.add(('dns', sub.value))
for extension in csr.extensions:
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
for name in extension.value:
if isinstance(name, cryptography.x509.DNSName):
identifiers.add(('dns', name.value))
elif isinstance(name, cryptography.x509.IPAddress):
identifiers.add(('ip', name.value.compressed))
else:
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
return identifiers
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Return the days the certificate in cert_filename remains valid and -1
if the file was not found. If cert_filename contains more than one
certificate, only the first one will be considered.
If now is not specified, datetime.datetime.now() is used.
'''
if cert_filename is not None:
cert_content = None
if os.path.exists(cert_filename):
cert_content = read_file(cert_filename)
else:
cert_content = to_bytes(cert_content)
if cert_content is None:
return -1
try:
cert = cryptography.x509.load_pem_x509_certificate(cert_content, _cryptography_backend)
except Exception as e:
if cert_filename is None:
raise BackendException('Cannot parse certificate: {0}'.format(e))
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
if now is None:
now = datetime.datetime.now()
return (cert.not_valid_after - now).days
def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
return CryptographyChainMatcher(criterium, self.module)

View File

@@ -0,0 +1,295 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import binascii
import datetime
import os
import re
import tempfile
import traceback
from ansible.module_utils._text import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CryptoBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress
_OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C')
class OpenSSLCLIBackend(CryptoBackend):
def __init__(self, module, openssl_binary=None):
super(OpenSSLCLIBackend, self).__init__(module)
if openssl_binary is None:
openssl_binary = module.get_bin_path('openssl', True)
self.openssl_binary = openssl_binary
def parse_key(self, key_file=None, key_content=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair
(error, key_data).
'''
# If key_file isn't given, but key_content, write that to a temporary file
if key_file is None:
fd, tmpsrc = tempfile.mkstemp()
self.module.add_cleanup_file(tmpsrc) # Ansible will delete the file on exit
f = os.fdopen(fd, 'wb')
try:
f.write(key_content.encode('utf-8'))
key_file = tmpsrc
except Exception as err:
try:
f.close()
except Exception as dummy:
pass
raise BackendException("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc())
f.close()
# Parse key
account_key_type = None
with open(key_file, "rt") as f:
for line in f:
m = re.match(r"^\s*-{5,}BEGIN\s+(EC|RSA)\s+PRIVATE\s+KEY-{5,}\s*$", line)
if m is not None:
account_key_type = m.group(1).lower()
break
if account_key_type is None:
# This happens for example if openssl_privatekey created this key
# (as opposed to the OpenSSL binary). For now, we assume this is
# an RSA key.
# FIXME: add some kind of auto-detection
account_key_type = "rsa"
if account_key_type not in ("rsa", "ec"):
return 'unknown key type "%s"' % account_key_type, {}
openssl_keydump_cmd = [self.openssl_binary, account_key_type, "-in", key_file, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_keydump_cmd, check_rc=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
if account_key_type == 'rsa':
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
if len(pub_exp) % 2:
pub_exp = "0{0}".format(pub_exp)
return None, {
'key_file': key_file,
'type': 'rsa',
'alg': 'RS256',
'jwk': {
"kty": "RSA",
"e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"n": nopad_b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
'hash': 'sha256',
}
elif account_key_type == 'ec':
pub_data = re.search(
r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if pub_data is None:
return 'cannot parse elliptic curve key', {}
pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8"))
asn1_oid_curve = pub_data.group(2).lower()
nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None
if asn1_oid_curve == 'prime256v1' or nist_curve == 'p-256':
bits = 256
alg = 'ES256'
hashalg = 'sha256'
point_size = 32
curve = 'P-256'
elif asn1_oid_curve == 'secp384r1' or nist_curve == 'p-384':
bits = 384
alg = 'ES384'
hashalg = 'sha384'
point_size = 48
curve = 'P-384'
elif asn1_oid_curve == 'secp521r1' or nist_curve == 'p-521':
# Not yet supported on Let's Encrypt side, see
# https://github.com/letsencrypt/boulder/issues/2217
bits = 521
alg = 'ES512'
hashalg = 'sha512'
point_size = 66
curve = 'P-521'
else:
return 'unknown elliptic curve: %s / %s' % (asn1_oid_curve, nist_curve), {}
num_bytes = (bits + 7) // 8
if len(pub_hex) != 2 * num_bytes:
return 'bad elliptic curve point (%s / %s)' % (asn1_oid_curve, nist_curve), {}
return None, {
'key_file': key_file,
'type': 'ec',
'alg': alg,
'jwk': {
"kty": "EC",
"crv": curve,
"x": nopad_b64(pub_hex[:num_bytes]),
"y": nopad_b64(pub_hex[num_bytes:]),
},
'hash': hashalg,
'point_size': point_size,
}
def sign(self, payload64, protected64, key_data):
sign_payload = "{0}.{1}".format(protected64, payload64).encode('utf8')
if key_data['type'] == 'hmac':
hex_key = to_native(binascii.hexlify(base64.urlsafe_b64decode(key_data['jwk']['k'])))
cmd_postfix = ["-mac", "hmac", "-macopt", "hexkey:{0}".format(hex_key), "-binary"]
else:
cmd_postfix = ["-sign", key_data['key_file']]
openssl_sign_cmd = [self.openssl_binary, "dgst", "-{0}".format(key_data['hash'])] + cmd_postfix
dummy, out, dummy = self.module.run_command(
openssl_sign_cmd, data=sign_payload, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
if key_data['type'] == 'ec':
dummy, der_out, dummy = self.module.run_command(
[self.openssl_binary, "asn1parse", "-inform", "DER"],
data=out, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
expected_len = 2 * key_data['point_size']
sig = re.findall(
r"prim:\s+INTEGER\s+:([0-9A-F]{1,%s})\n" % expected_len,
to_text(der_out, errors='surrogate_or_strict'))
if len(sig) != 2:
raise BackendException(
"failed to generate Elliptic Curve signature; cannot parse DER output: {0}".format(
to_text(der_out, errors='surrogate_or_strict')))
sig[0] = (expected_len - len(sig[0])) * '0' + sig[0]
sig[1] = (expected_len - len(sig[1])) * '0' + sig[1]
out = binascii.unhexlify(sig[0]) + binascii.unhexlify(sig[1])
return {
"protected": protected64,
"payload": payload64,
"signature": nopad_b64(to_bytes(out)),
}
def create_mac_key(self, alg, key):
'''Create a MAC key.'''
if alg == 'HS256':
hashalg = 'sha256'
hashbytes = 32
elif alg == 'HS384':
hashalg = 'sha384'
hashbytes = 48
elif alg == 'HS512':
hashalg = 'sha512'
hashbytes = 64
else:
raise BackendException('Unsupported MAC key algorithm for OpenSSL backend: {0}'.format(alg))
key_bytes = base64.urlsafe_b64decode(key)
if len(key_bytes) < hashbytes:
raise BackendException(
'{0} key must be at least {1} bytes long (after Base64 decoding)'.format(alg, hashbytes))
return {
'type': 'hmac',
'alg': alg,
'jwk': {
'kty': 'oct',
'k': key,
},
'hash': hashalg,
}
@staticmethod
def _normalize_ip(ip):
try:
return to_native(compat_ipaddress.ip_address(to_text(ip)).compressed)
except ValueError:
# We don't want to error out on something IPAddress() can't parse
return ip
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
filename = csr_filename
data = None
if csr_content is not None:
filename = '-'
data = csr_content.encode('utf-8')
openssl_csr_cmd = [self.openssl_binary, "req", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
identifiers = set([])
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
if common_name is not None:
identifiers.add(('dns', common_name.group(1)))
subject_alt_names = re.search(
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.lower().startswith("dns:"):
identifiers.add(('dns', san[4:]))
elif san.lower().startswith("ip:"):
identifiers.add(('ip', self._normalize_ip(san[3:])))
elif san.lower().startswith("ip address:"):
identifiers.add(('ip', self._normalize_ip(san[11:])))
else:
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
return identifiers
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Return the days the certificate in cert_filename remains valid and -1
if the file was not found. If cert_filename contains more than one
certificate, only the first one will be considered.
If now is not specified, datetime.datetime.now() is used.
'''
filename = cert_filename
data = None
if cert_content is not None:
filename = '-'
data = cert_content.encode('utf-8')
cert_filename_suffix = ''
elif cert_filename is not None:
if not os.path.exists(cert_filename):
return -1
cert_filename_suffix = ' in {0}'.format(cert_filename)
else:
return -1
openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
try:
not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", to_text(out, errors='surrogate_or_strict')).group(1)
not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z')
except AttributeError:
raise BackendException("No 'Not after' date found{0}".format(cert_filename_suffix))
except ValueError:
raise BackendException("Failed to parse 'Not after' date{0}".format(cert_filename_suffix))
if now is None:
now = datetime.datetime.now()
return (not_after - now).days
def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.')

View File

@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import abc
from ansible.module_utils import six
@six.add_metaclass(abc.ABCMeta)
class CryptoBackend(object):
def __init__(self, module):
self.module = module
@abc.abstractmethod
def parse_key(self, key_file=None, key_content=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair
(error, key_data).
'''
@abc.abstractmethod
def sign(self, payload64, protected64, key_data):
pass
@abc.abstractmethod
def create_mac_key(self, alg, key):
'''Create a MAC key.'''
@abc.abstractmethod
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
@abc.abstractmethod
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Return the days the certificate in cert_filename remains valid and -1
if the file was not found. If cert_filename contains more than one
certificate, only the first one will be considered.
If now is not specified, datetime.datetime.now() is used.
'''
@abc.abstractmethod
def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''

View File

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import abc
from ansible.module_utils import six
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ModuleFailException,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
der_to_pem,
nopad_b64,
process_links,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
split_pem_list,
)
class CertificateChain(object):
'''
Download and parse the certificate chain.
https://tools.ietf.org/html/rfc8555#section-7.4.2
'''
def __init__(self, url):
self.url = url
self.cert = None
self.chain = []
self.alternates = []
@classmethod
def download(cls, client, url):
content, info = client.get_request(url, parse_json_result=False, headers={'Accept': 'application/pem-certificate-chain'})
if not content or not info['content-type'].startswith('application/pem-certificate-chain'):
raise ModuleFailException(
"Cannot download certificate chain from {0}, as content type is not application/pem-certificate-chain: {1} (headers: {2})".format(
url, content, info))
result = cls(url)
# Parse data
certs = split_pem_list(content.decode('utf-8'), keep_inbetween=True)
if certs:
result.cert = certs[0]
result.chain = certs[1:]
process_links(info, lambda link, relation: result._process_links(client, link, relation))
if result.cert is None:
raise ModuleFailException("Failed to parse certificate chain download from {0}: {1} (headers: {2})".format(url, content, info))
return result
def _process_links(self, client, link, relation):
if relation == 'up':
# Process link-up headers if there was no chain in reply
if not self.chain:
chain_result, chain_info = client.get_request(link, parse_json_result=False)
if chain_info['status'] in [200, 201]:
self.chain.append(der_to_pem(chain_result))
elif relation == 'alternate':
self.alternates.append(link)
def to_json(self):
cert = self.cert.encode('utf8')
chain = ('\n'.join(self.chain)).encode('utf8')
return {
'cert': cert,
'chain': chain,
'full_chain': cert + chain,
}
class Criterium(object):
def __init__(self, criterium, index=None):
self.index = index
self.test_certificates = criterium['test_certificates']
self.subject = criterium['subject']
self.issuer = criterium['issuer']
self.subject_key_identifier = criterium['subject_key_identifier']
self.authority_key_identifier = criterium['authority_key_identifier']
@six.add_metaclass(abc.ABCMeta)
class ChainMatcher(object):
@abc.abstractmethod
def match(self, certificate):
'''
Check whether a certificate chain (CertificateChain instance) matches.
'''
def retrieve_acme_v1_certificate(client, csr_der):
'''
Create a new certificate based on the CSR (ACME v1 protocol).
Return the certificate object as dict
https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-6.5
'''
new_cert = {
"resource": "new-cert",
"csr": nopad_b64(csr_der),
}
result, info = client.send_signed_request(
client.directory['new-cert'], new_cert, error_msg='Failed to receive certificate', expected_status_codes=[200, 201])
cert = CertificateChain(info['location'])
cert.cert = der_to_pem(result)
def f(link, relation):
if relation == 'up':
chain_result, chain_info = client.get_request(link, parse_json_result=False)
if chain_info['status'] in [200, 201]:
del cert.chain[:]
cert.chain.append(der_to_pem(chain_result))
process_links(info, f)
return cert

View File

@@ -0,0 +1,296 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import hashlib
import json
import re
import time
from ansible.module_utils._text import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
format_error_problem,
ACMEProtocolException,
ModuleFailException,
)
def create_key_authorization(client, token):
'''
Returns the key authorization for the given token
https://tools.ietf.org/html/rfc8555#section-8.1
'''
accountkey_json = json.dumps(client.account_jwk, sort_keys=True, separators=(',', ':'))
thumbprint = nopad_b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())
return "{0}.{1}".format(token, thumbprint)
def combine_identifier(identifier_type, identifier):
return '{type}:{identifier}'.format(type=identifier_type, identifier=identifier)
def split_identifier(identifier):
parts = identifier.split(':', 1)
if len(parts) != 2:
raise ModuleFailException(
'Identifier "{identifier}" is not of the form <type>:<identifier>'.format(identifier=identifier))
return parts
class Challenge(object):
def __init__(self, data, url):
self.data = data
self.type = data['type']
self.url = url
self.status = data['status']
self.token = data.get('token')
@classmethod
def from_json(cls, client, data, url=None):
return cls(data, url or (data['uri'] if client.version == 1 else data['url']))
def call_validate(self, client):
challenge_response = {}
if client.version == 1:
token = re.sub(r"[^A-Za-z0-9_\-]", "_", self.token)
key_authorization = create_key_authorization(client, token)
challenge_response['resource'] = 'challenge'
challenge_response['keyAuthorization'] = key_authorization
challenge_response['type'] = self.type
client.send_signed_request(
self.url,
challenge_response,
error_msg='Failed to validate challenge',
expected_status_codes=[200, 202],
)
def to_json(self):
return self.data.copy()
def get_validation_data(self, client, identifier_type, identifier):
token = re.sub(r"[^A-Za-z0-9_\-]", "_", self.token)
key_authorization = create_key_authorization(client, token)
if self.type == 'http-01':
# https://tools.ietf.org/html/rfc8555#section-8.3
return {
'resource': '.well-known/acme-challenge/{token}'.format(token=token),
'resource_value': key_authorization,
}
if self.type == 'dns-01':
if identifier_type != 'dns':
return None
# https://tools.ietf.org/html/rfc8555#section-8.4
resource = '_acme-challenge'
value = nopad_b64(hashlib.sha256(to_bytes(key_authorization)).digest())
record = (resource + identifier[1:]) if identifier.startswith('*.') else '{0}.{1}'.format(resource, identifier)
return {
'resource': resource,
'resource_value': value,
'record': record,
}
if self.type == 'tls-alpn-01':
# https://www.rfc-editor.org/rfc/rfc8737.html#section-3
if identifier_type == 'ip':
# IPv4/IPv6 address: use reverse mapping (RFC1034, RFC3596)
resource = compat_ipaddress.ip_address(identifier).reverse_pointer
if not resource.endswith('.'):
resource += '.'
else:
resource = identifier
value = base64.b64encode(hashlib.sha256(to_bytes(key_authorization)).digest())
return {
'resource': resource,
'resource_original': combine_identifier(identifier_type, identifier),
'resource_value': value,
}
# Unknown challenge type: ignore
return None
class Authorization(object):
def _setup(self, client, data):
data['uri'] = self.url
self.data = data
self.challenges = [Challenge.from_json(client, challenge) for challenge in data['challenges']]
if client.version == 1 and 'status' not in data:
# https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-6.1.2
# "status (required, string): ...
# If this field is missing, then the default value is "pending"."
self.status = 'pending'
else:
self.status = data['status']
self.identifier = data['identifier']['value']
self.identifier_type = data['identifier']['type']
if data.get('wildcard', False):
self.identifier = '*.{0}'.format(self.identifier)
def __init__(self, url):
self.url = url
self.data = None
self.challenges = []
self.status = None
self.identifier_type = None
self.identifier = None
@classmethod
def from_json(cls, client, data, url):
result = cls(url)
result._setup(client, data)
return result
@classmethod
def from_url(cls, client, url):
result = cls(url)
result.refresh(client)
return result
@classmethod
def create(cls, client, identifier_type, identifier):
'''
Create a new authorization for the given identifier.
Return the authorization object of the new authorization
https://tools.ietf.org/html/draft-ietf-acme-acme-02#section-6.4
'''
new_authz = {
"identifier": {
"type": identifier_type,
"value": identifier,
},
}
if client.version == 1:
url = client.directory['new-authz']
new_authz["resource"] = "new-authz"
else:
if 'newAuthz' not in client.directory.directory:
raise ACMEProtocolException('ACME endpoint does not support pre-authorization')
url = client.directory['newAuthz']
result, info = client.send_signed_request(
url, new_authz, error_msg='Failed to request challenges', expected_status_codes=[200, 201])
return cls.from_json(client, result, info['location'])
@property
def combined_identifier(self):
return combine_identifier(self.identifier_type, self.identifier)
def to_json(self):
return self.data.copy()
def refresh(self, client):
result, dummy = client.get_request(self.url)
changed = self.data != result
self._setup(client, result)
return changed
def get_challenge_data(self, client):
'''
Returns a dict with the data for all proposed (and supported) challenges
of the given authorization.
'''
data = {}
for challenge in self.challenges:
validation_data = challenge.get_validation_data(client, self.identifier_type, self.identifier)
if validation_data is not None:
data[challenge.type] = validation_data
return data
def raise_error(self, error_msg):
'''
Aborts with a specific error for a challenge.
'''
error_details = []
# multiple challenges could have failed at this point, gather error
# details for all of them before failing
for challenge in self.challenges:
if challenge.status == 'invalid':
msg = 'Challenge {type}'.format(type=challenge.type)
if 'error' in challenge.data:
msg = '{msg}: {problem}'.format(
msg=msg,
problem=format_error_problem(challenge.data['error'], subproblem_prefix='{0}.'.format(type)),
)
error_details.append(msg)
raise ACMEProtocolException(
'Failed to validate challenge for {identifier}: {error}. {details}'.format(
identifier=self.combined_identifier,
error=error_msg,
details='; '.join(error_details),
),
identifier=self.combined_identifier,
authorization=self.data,
)
def find_challenge(self, challenge_type):
for challenge in self.challenges:
if challenge_type == challenge.type:
return challenge
return None
def wait_for_validation(self, client, callenge_type):
while True:
self.refresh(client)
if self.status in ['valid', 'invalid', 'revoked']:
break
time.sleep(2)
if self.status == 'invalid':
self.raise_error('Status is "invalid"')
return self.status == 'valid'
def call_validate(self, client, challenge_type, wait=True):
'''
Validate the authorization provided in the auth dict. Returns True
when the validation was successful and False when it was not.
'''
challenge = self.find_challenge(challenge_type)
if challenge is None:
raise ModuleFailException('Found no challenge of type "{challenge}" for identifier {identifier}!'.format(
challenge=challenge_type,
identifier=self.combined_identifier,
))
challenge.call_validate(client)
if not wait:
return self.status == 'valid'
return self.wait_for_validation(client, challenge_type)
def deactivate(self, client):
'''
Deactivates this authorization.
https://community.letsencrypt.org/t/authorization-deactivation/19860/2
https://tools.ietf.org/html/rfc8555#section-7.5.2
'''
if self.status != 'valid':
return
authz_deactivate = {
'status': 'deactivated'
}
if client.version == 1:
authz_deactivate['resource'] = 'authz'
result, info = client.send_signed_request(self.url, authz_deactivate, fail_on_error=False)
if 200 <= info['status'] < 300 and result.get('status') == 'deactivated':
self.status = 'deactivated'
return True
return False

View File

@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
def format_error_problem(problem, subproblem_prefix=''):
if 'title' in problem:
msg = 'Error "{title}" ({type})'.format(
type=problem['type'],
title=problem['title'],
)
else:
msg = 'Error {type}'.format(type=problem['type'])
if 'detail' in problem:
msg += ': "{detail}"'.format(detail=problem['detail'])
subproblems = problem.get('subproblems')
if subproblems is not None:
msg = '{msg} Subproblems:'.format(msg=msg)
for index, problem in enumerate(subproblems):
index_str = '{prefix}{index}'.format(prefix=subproblem_prefix, index=index)
msg = '{msg}\n({index}) {problem}.'.format(
msg=msg,
index=index_str,
problem=format_error_problem(problem, subproblem_prefix='{0}.'.format(index_str)),
)
return msg
class ModuleFailException(Exception):
'''
If raised, module.fail_json() will be called with the given parameters after cleanup.
'''
def __init__(self, msg, **args):
super(ModuleFailException, self).__init__(self, msg)
self.msg = msg
self.module_fail_args = args
def do_fail(self, module, **arguments):
module.fail_json(msg=self.msg, other=self.module_fail_args, **arguments)
class ACMEProtocolException(ModuleFailException):
def __init__(self, module, msg=None, info=None, response=None, content=None, content_json=None):
# Try to get hold of content, if response is given and content is not provided
if content is None and content_json is None and response is not None:
try:
content = response.read()
except AttributeError:
content = info.pop('body', None)
# Try to get hold of JSON decoded content, when content is given and JSON not provided
if content_json is None and content is not None:
try:
content_json = module.from_json(content.decode('utf8'))
except Exception:
pass
extras = dict()
url = info['url'] if info else None
code = info['status'] if info else None
extras['http_url'] = url
extras['http_status'] = code
if msg is None:
msg = 'ACME request failed'
add_msg = ''
if code >= 400 and content_json is not None and 'type' in content_json:
if 'status' in content_json and content_json['status'] != code:
code = 'status {problem_code} (HTTP status: {http_code})'.format(http_code=code, problem_code=content_json['status'])
else:
code = 'status {problem_code}'.format(problem_code=code)
add_msg = ' {problem}.'.format(problem=format_error_problem(content_json))
subproblems = content_json.pop('subproblems', None)
extras['problem'] = content_json
extras['subproblems'] = subproblems or []
if subproblems is not None:
add_msg = '{add_msg} Subproblems:'.format(add_msg=add_msg)
for index, problem in enumerate(subproblems):
add_msg = '{add_msg}\n({index}) {problem}.'.format(
add_msg=add_msg,
index=index,
problem=format_error_problem(problem, subproblem_prefix='{0}.'.format(index)),
)
else:
code = 'HTTP status {code}'.format(code=code)
if content_json is not None:
add_msg = ' The JSON error result: {content}'.format(content=content_json)
elif content is not None:
add_msg = ' The raw error result: {content}'.format(content=content.decode('utf-8'))
super(ACMEProtocolException, self).__init__(
'{msg} for {url} with {code}.{add_msg}'.format(msg=msg, url=url, code=code, add_msg=add_msg),
**extras
)
self.problem = {}
self.subproblems = []
for k, v in extras.items():
setattr(self, k, v)
class BackendException(ModuleFailException):
pass
class NetworkException(ModuleFailException):
pass
class KeyParsingError(ModuleFailException):
pass

View File

@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2013, Romeo Theriault <romeot () hawaii.edu>
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
import shutil
import tempfile
import traceback
from ansible.module_utils._text import to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
def read_file(fn, mode='b'):
try:
with open(fn, 'r' + mode) as f:
return f.read()
except Exception as e:
raise ModuleFailException('Error while reading file "{0}": {1}'.format(fn, e))
# This function was adapted from an earlier version of https://github.com/ansible/ansible/blob/devel/lib/ansible/modules/uri.py
def write_file(module, dest, content):
'''
Write content to destination file dest, only if the content
has changed.
'''
changed = False
# create a tempfile
fd, tmpsrc = tempfile.mkstemp(text=False)
f = os.fdopen(fd, 'wb')
try:
f.write(content)
except Exception as err:
try:
f.close()
except Exception as dummy:
pass
os.remove(tmpsrc)
raise ModuleFailException("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc())
f.close()
checksum_src = None
checksum_dest = None
# raise an error if there is no tmpsrc file
if not os.path.exists(tmpsrc):
try:
os.remove(tmpsrc)
except Exception as dummy:
pass
raise ModuleFailException("Source %s does not exist" % (tmpsrc))
if not os.access(tmpsrc, os.R_OK):
os.remove(tmpsrc)
raise ModuleFailException("Source %s not readable" % (tmpsrc))
checksum_src = module.sha1(tmpsrc)
# check if there is no dest file
if os.path.exists(dest):
# raise an error if copy has no permission on dest
if not os.access(dest, os.W_OK):
os.remove(tmpsrc)
raise ModuleFailException("Destination %s not writable" % (dest))
if not os.access(dest, os.R_OK):
os.remove(tmpsrc)
raise ModuleFailException("Destination %s not readable" % (dest))
checksum_dest = module.sha1(dest)
else:
dirname = os.path.dirname(dest) or '.'
if not os.access(dirname, os.W_OK):
os.remove(tmpsrc)
raise ModuleFailException("Destination dir %s not writable" % (dirname))
if checksum_src != checksum_dest:
try:
shutil.copyfile(tmpsrc, dest)
changed = True
except Exception as err:
os.remove(tmpsrc)
raise ModuleFailException("failed to copy %s to %s: %s" % (tmpsrc, dest, to_native(err)), exception=traceback.format_exc())
os.remove(tmpsrc)
return changed

View File

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import time
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ACMEProtocolException,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.challenges import (
Authorization,
)
class Order(object):
def _setup(self, client, data):
self.data = data
self.status = data['status']
self.identifiers = []
for identifier in data['identifiers']:
self.identifiers.append((identifier['type'], identifier['value']))
self.finalize_uri = data.get('finalize')
self.certificate_uri = data.get('certificate')
self.authorization_uris = data['authorizations']
self.authorizations = {}
def __init__(self, url):
self.url = url
self.data = None
self.status = None
self.identifiers = []
self.finalize_uri = None
self.certificate_uri = None
self.authorization_uris = []
self.authorizations = {}
@classmethod
def from_json(cls, client, data, url):
result = cls(url)
result._setup(client, data)
return result
@classmethod
def from_url(cls, client, url):
result = cls(url)
result.refresh(client)
return result
@classmethod
def create(cls, client, identifiers):
'''
Start a new certificate order (ACME v2 protocol).
https://tools.ietf.org/html/rfc8555#section-7.4
'''
acme_identifiers = []
for identifier_type, identifier in identifiers:
acme_identifiers.append({
'type': identifier_type,
'value': identifier,
})
new_order = {
"identifiers": acme_identifiers
}
result, info = client.send_signed_request(
client.directory['newOrder'], new_order, error_msg='Failed to start new order', expected_status_codes=[201])
return cls.from_json(client, result, info['location'])
def refresh(self, client):
result, dummy = client.get_request(self.url)
changed = self.data != result
self._setup(client, result)
return changed
def load_authorizations(self, client):
for auth_uri in self.authorization_uris:
authz = Authorization.from_url(client, auth_uri)
self.authorizations[authz.combined_identifier] = authz
def wait_for_finalization(self, client):
while True:
self.refresh(client)
if self.status in ['valid', 'invalid', 'pending', 'ready']:
break
time.sleep(2)
if self.status != 'valid':
raise ACMEProtocolException(
'Failed to wait for order to complete; got status "{status}"'.format(status=self.status), content_json=self.data)
def finalize(self, client, csr_der, wait=True):
'''
Create a new certificate based on the csr.
Return the certificate object as dict
https://tools.ietf.org/html/rfc8555#section-7.4
'''
new_cert = {
"csr": nopad_b64(csr_der),
}
result, info = client.send_signed_request(
self.finalize_uri, new_cert, error_msg='Failed to finalizing order', expected_status_codes=[200])
# It is not clear from the RFC whether the finalize call returns the order object or not.
# Instead of using the result, we call self.refresh(client) below.
if wait:
self.wait_for_finalization(client)
else:
self.refresh(client)
if self.status not in ['procesing', 'valid', 'invalid']:
raise ACMEProtocolException(
'Failed to finalize order; got status "{status}"'.format(
status=self.status), info=info, content_json=result)

View File

@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# Copyright: (c) 2021 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import re
import textwrap
import traceback
from ansible.module_utils._text import to_native
from ansible.module_utils.six.moves.urllib.parse import unquote
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
def nopad_b64(data):
return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "")
def der_to_pem(der_cert):
'''
Convert the DER format certificate in der_cert to a PEM format certificate and return it.
'''
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(der_cert).decode('utf8'), 64)))
def pem_to_der(pem_filename=None, pem_content=None):
'''
Load PEM file, or use PEM file's content, and convert to DER.
If PEM contains multiple entities, the first entity will be used.
'''
certificate_lines = []
if pem_content is not None:
lines = pem_content.splitlines()
elif pem_filename is not None:
try:
with open(pem_filename, "rt") as f:
lines = list(f)
except Exception as err:
raise ModuleFailException("cannot load PEM file {0}: {1}".format(pem_filename, to_native(err)), exception=traceback.format_exc())
else:
raise ModuleFailException('One of pem_filename and pem_content must be provided')
header_line_count = 0
for line in lines:
if line.startswith('-----'):
header_line_count += 1
if header_line_count == 2:
# If certificate file contains other certs appended
# (like intermediate certificates), ignore these.
break
continue
certificate_lines.append(line.strip())
return base64.b64decode(''.join(certificate_lines))
def process_links(info, callback):
'''
Process link header, calls callback for every link header with the URL and relation as options.
'''
if 'link' in info:
link = info['link']
for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link):
callback(unquote(url), relation)