diff --git a/changelogs/fragments/203-x509_crl_info.yml b/changelogs/fragments/203-x509_crl_info.yml new file mode 100644 index 00000000..341663ad --- /dev/null +++ b/changelogs/fragments/203-x509_crl_info.yml @@ -0,0 +1,2 @@ +minor_changes: +- "x509_crl_info - refactor module to allow code re-use for diff mode (https://github.com/ansible-collections/community.crypto/pull/203)." diff --git a/plugins/module_utils/crypto/module_backends/crl_info.py b/plugins/module_utils/crypto/module_backends/crl_info.py new file mode 100644 index 00000000..41ce9302 --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/crl_info.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import traceback + +from distutils.version import LooseVersion + +from ansible.module_utils.basic import missing_required_lib + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + cryptography_oid_to_name, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import ( + TIMESTAMP_FORMAT, + cryptography_decode_revoked_certificate, + cryptography_dump_revoked, + cryptography_get_signature_algorithm_oid_from_crl, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( + identify_pem_format, +) + +# crypto_utils + +MINIMAL_CRYPTOGRAPHY_VERSION = '1.2' + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography import x509 + from cryptography.hazmat.backends import default_backend + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + + +class CRLInfoRetrieval(object): + def __init__(self, module, content): + # content must be a bytes string + self.module = module + self.content = content + + def get_info(self): + self.crl_pem = identify_pem_format(self.content) + try: + if self.crl_pem: + self.crl = x509.load_pem_x509_crl(self.content, default_backend()) + else: + self.crl = x509.load_der_x509_crl(self.content, default_backend()) + except ValueError as e: + self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e)) + + result = { + 'changed': False, + 'format': 'pem' if self.crl_pem else 'der', + 'last_update': None, + 'next_update': None, + 'digest': None, + 'issuer_ordered': None, + 'issuer': None, + 'revoked_certificates': [], + } + + result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT) + result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT) + result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl)) + issuer = [] + for attribute in self.crl.issuer: + issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value]) + result['issuer_ordered'] = issuer + result['issuer'] = {} + for k, v in issuer: + result['issuer'][k] = v + result['revoked_certificates'] = [] + for cert in self.crl: + entry = cryptography_decode_revoked_certificate(cert) + result['revoked_certificates'].append(cryptography_dump_revoked(entry)) + + return result + + +def get_crl_info(module, content): + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + + info = CRLInfoRetrieval(module, content) + return info.get_info() diff --git a/plugins/modules/x509_crl_info.py b/plugins/modules/x509_crl_info.py index 94ae95b1..ee66c43f 100644 --- a/plugins/modules/x509_crl_info.py +++ b/plugins/modules/x509_crl_info.py @@ -134,129 +134,22 @@ revoked_certificates: import base64 -import traceback +import binascii -from distutils.version import LooseVersion - -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils._text import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - OpenSSLObject, -) - -from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( - cryptography_oid_to_name, -) - -from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import ( - TIMESTAMP_FORMAT, - cryptography_decode_revoked_certificate, - cryptography_dump_revoked, - cryptography_get_signature_algorithm_oid_from_crl, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( identify_pem_format, ) -# crypto_utils - -MINIMAL_CRYPTOGRAPHY_VERSION = '1.2' - -CRYPTOGRAPHY_IMP_ERR = None -try: - import cryptography - from cryptography import x509 - from cryptography.hazmat.backends import default_backend - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - - -class CRLError(OpenSSLObjectError): - pass - - -class CRLInfo(OpenSSLObject): - """The main module implementation.""" - - def __init__(self, module): - super(CRLInfo, self).__init__( - module.params['path'] or '', - 'present', - False, - module.check_mode - ) - - self.content = module.params['content'] - - self.module = module - - self.crl = None - if self.content is None: - try: - with open(self.path, 'rb') as f: - data = f.read() - except Exception as e: - self.module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e)) - else: - data = self.content.encode('utf-8') - if not identify_pem_format(data): - data = base64.b64decode(self.content) - - self.crl_pem = identify_pem_format(data) - try: - if self.crl_pem: - self.crl = x509.load_pem_x509_crl(data, default_backend()) - else: - self.crl = x509.load_der_x509_crl(data, default_backend()) - except Exception as e: - self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e)) - - def get_info(self): - result = { - 'changed': False, - 'format': 'pem' if self.crl_pem else 'der', - 'last_update': None, - 'next_update': None, - 'digest': None, - 'issuer_ordered': None, - 'issuer': None, - 'revoked_certificates': [], - } - - result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT) - result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT) - result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl)) - issuer = [] - for attribute in self.crl.issuer: - issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value]) - result['issuer_ordered'] = issuer - result['issuer'] = {} - for k, v in issuer: - result['issuer'][k] = v - result['revoked_certificates'] = [] - for cert in self.crl: - entry = cryptography_decode_revoked_certificate(cert) - result['revoked_certificates'].append(cryptography_dump_revoked(entry)) - - return result - - def generate(self): - # Empty method because OpenSSLObject wants this - pass - - def dump(self): - # Empty method because OpenSSLObject wants this - pass +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.crl_info import ( + get_crl_info, +) def main(): @@ -274,13 +167,22 @@ def main(): supports_check_mode=True, ) - if not CRYPTOGRAPHY_FOUND: - module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), - exception=CRYPTOGRAPHY_IMP_ERR) + if module.params['content'] is None: + try: + with open(module.params['path'], 'rb') as f: + data = f.read() + except (IOError, OSError) as e: + module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e)) + else: + data = module.params['content'].encode('utf-8') + if not identify_pem_format(data): + try: + data = base64.b64decode(module.params['content']) + except (binascii.Error, TypeError) as e: + module.fail_json(msg='Error while Base64 decoding content: {0}'.format(e)) try: - crl = CRLInfo(module) - result = crl.get_info() + result = get_crl_info(module, data) module.exit_json(**result) except OpenSSLObjectError as e: module.fail_json(msg=to_native(e)) diff --git a/tests/integration/targets/x509_crl/meta/main.yml b/tests/integration/targets/x509_crl/meta/main.yml index 800aff64..50a66bb6 100644 --- a/tests/integration/targets/x509_crl/meta/main.yml +++ b/tests/integration/targets/x509_crl/meta/main.yml @@ -1,2 +1,3 @@ dependencies: - setup_openssl + - setup_pyopenssl # the x509_crl* modules don't need this, but the other modules using during the tests do in some situations