# Copyright (c) 2016-2017, Yanis Guenane # Copyright (c) 2017, Markus Teufelberger # Copyright (c) 2020, Felix Fontein # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import abc import binascii import traceback from ansible.module_utils import six from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( cryptography_decode_name, cryptography_get_extensions_from_csr, cryptography_oid_to_name, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( get_publickey_info, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( load_certificate_request, ) from ansible_collections.community.crypto.plugins.module_utils.version import ( LooseVersion, ) MINIMAL_CRYPTOGRAPHY_VERSION = "1.3" CRYPTOGRAPHY_IMP_ERR = None try: import cryptography from cryptography import x509 from cryptography.hazmat.primitives import serialization CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() CRYPTOGRAPHY_FOUND = False else: CRYPTOGRAPHY_FOUND = True TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" @six.add_metaclass(abc.ABCMeta) class CSRInfoRetrieval: def __init__(self, module, backend, content, validate_signature): # content must be a bytes string self.module = module self.backend = backend self.content = content self.validate_signature = validate_signature @abc.abstractmethod def _get_subject_ordered(self): pass @abc.abstractmethod def _get_key_usage(self): pass @abc.abstractmethod def _get_extended_key_usage(self): pass @abc.abstractmethod def _get_basic_constraints(self): pass @abc.abstractmethod def _get_ocsp_must_staple(self): pass @abc.abstractmethod def _get_subject_alt_name(self): pass @abc.abstractmethod def _get_name_constraints(self): pass @abc.abstractmethod def _get_public_key_pem(self): pass @abc.abstractmethod def _get_public_key_object(self): pass @abc.abstractmethod def _get_subject_key_identifier(self): pass @abc.abstractmethod def _get_authority_key_identifier(self): pass @abc.abstractmethod def _get_all_extensions(self): pass @abc.abstractmethod def _is_signature_valid(self): pass def get_info(self, prefer_one_fingerprint=False): result = dict() self.csr = load_certificate_request( None, content=self.content, backend=self.backend ) subject = self._get_subject_ordered() result["subject"] = dict() for k, v in subject: result["subject"][k] = v result["subject_ordered"] = subject result["key_usage"], result["key_usage_critical"] = self._get_key_usage() result["extended_key_usage"], result["extended_key_usage_critical"] = ( self._get_extended_key_usage() ) result["basic_constraints"], result["basic_constraints_critical"] = ( self._get_basic_constraints() ) result["ocsp_must_staple"], result["ocsp_must_staple_critical"] = ( self._get_ocsp_must_staple() ) result["subject_alt_name"], result["subject_alt_name_critical"] = ( self._get_subject_alt_name() ) ( result["name_constraints_permitted"], result["name_constraints_excluded"], result["name_constraints_critical"], ) = self._get_name_constraints() result["public_key"] = to_native(self._get_public_key_pem()) public_key_info = get_publickey_info( self.module, self.backend, key=self._get_public_key_object(), prefer_one_fingerprint=prefer_one_fingerprint, ) result.update( { "public_key_type": public_key_info["type"], "public_key_data": public_key_info["public_data"], "public_key_fingerprints": public_key_info["fingerprints"], } ) ski = self._get_subject_key_identifier() if ski is not None: ski = to_native(binascii.hexlify(ski)) ski = ":".join([ski[i : i + 2] for i in range(0, len(ski), 2)]) result["subject_key_identifier"] = ski aki, aci, acsn = self._get_authority_key_identifier() if aki is not None: aki = to_native(binascii.hexlify(aki)) aki = ":".join([aki[i : i + 2] for i in range(0, len(aki), 2)]) result["authority_key_identifier"] = aki result["authority_cert_issuer"] = aci result["authority_cert_serial_number"] = acsn result["extensions_by_oid"] = self._get_all_extensions() result["signature_valid"] = self._is_signature_valid() if self.validate_signature and not result["signature_valid"]: self.module.fail_json(msg="CSR signature is invalid!", **result) return result class CSRInfoRetrievalCryptography(CSRInfoRetrieval): """Validate the supplied CSR, using the cryptography backend""" def __init__(self, module, content, validate_signature): super(CSRInfoRetrievalCryptography, self).__init__( module, "cryptography", content, validate_signature ) self.name_encoding = module.params.get("name_encoding", "ignore") def _get_subject_ordered(self): result = [] for attribute in self.csr.subject: result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) return result def _get_key_usage(self): try: current_key_ext = self.csr.extensions.get_extension_for_class(x509.KeyUsage) current_key_usage = current_key_ext.value key_usage = dict( digital_signature=current_key_usage.digital_signature, content_commitment=current_key_usage.content_commitment, key_encipherment=current_key_usage.key_encipherment, data_encipherment=current_key_usage.data_encipherment, key_agreement=current_key_usage.key_agreement, key_cert_sign=current_key_usage.key_cert_sign, crl_sign=current_key_usage.crl_sign, encipher_only=False, decipher_only=False, ) if key_usage["key_agreement"]: key_usage.update( dict( encipher_only=current_key_usage.encipher_only, decipher_only=current_key_usage.decipher_only, ) ) key_usage_names = dict( digital_signature="Digital Signature", content_commitment="Non Repudiation", key_encipherment="Key Encipherment", data_encipherment="Data Encipherment", key_agreement="Key Agreement", key_cert_sign="Certificate Sign", crl_sign="CRL Sign", encipher_only="Encipher Only", decipher_only="Decipher Only", ) return ( sorted( [ key_usage_names[name] for name, value in key_usage.items() if value ] ), current_key_ext.critical, ) except cryptography.x509.ExtensionNotFound: return None, False def _get_extended_key_usage(self): try: ext_keyusage_ext = self.csr.extensions.get_extension_for_class( x509.ExtendedKeyUsage ) return ( sorted( [cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value] ), ext_keyusage_ext.critical, ) except cryptography.x509.ExtensionNotFound: return None, False def _get_basic_constraints(self): try: ext_keyusage_ext = self.csr.extensions.get_extension_for_class( x509.BasicConstraints ) result = ["CA:{0}".format("TRUE" if ext_keyusage_ext.value.ca else "FALSE")] if ext_keyusage_ext.value.path_length is not None: result.append("pathlen:{0}".format(ext_keyusage_ext.value.path_length)) return sorted(result), ext_keyusage_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def _get_ocsp_must_staple(self): try: try: # This only works with cryptography >= 2.1 tlsfeature_ext = self.csr.extensions.get_extension_for_class( x509.TLSFeature ) value = ( cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value ) except AttributeError: # Fallback for cryptography < 2.1 oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") tlsfeature_ext = self.csr.extensions.get_extension_for_oid(oid) value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" return value, tlsfeature_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def _get_subject_alt_name(self): try: san_ext = self.csr.extensions.get_extension_for_class( x509.SubjectAlternativeName ) result = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value ] return result, san_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def _get_name_constraints(self): try: nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints) permitted = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.permitted_subtrees or [] ] excluded = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.excluded_subtrees or [] ] return permitted, excluded, nc_ext.critical except cryptography.x509.ExtensionNotFound: return None, None, False def _get_public_key_pem(self): return self.csr.public_key().public_bytes( serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo, ) def _get_public_key_object(self): return self.csr.public_key() def _get_subject_key_identifier(self): try: ext = self.csr.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) return ext.value.digest except cryptography.x509.ExtensionNotFound: return None def _get_authority_key_identifier(self): try: ext = self.csr.extensions.get_extension_for_class( x509.AuthorityKeyIdentifier ) issuer = None if ext.value.authority_cert_issuer is not None: issuer = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer ] return ( ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number, ) except cryptography.x509.ExtensionNotFound: return None, None, None def _get_all_extensions(self): return cryptography_get_extensions_from_csr(self.csr) def _is_signature_valid(self): return self.csr.is_signature_valid def get_csr_info( module, backend, content, validate_signature=True, prefer_one_fingerprint=False ): if backend == "cryptography": info = CSRInfoRetrievalCryptography( module, content, validate_signature=validate_signature ) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, backend, content, validate_signature=True): if backend == "auto": # Detection what is possible can_use_cryptography = ( CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) ) # Try cryptography if can_use_cryptography: backend = "cryptography" # Success? if backend == "auto": module.fail_json( msg=( "Cannot detect the required Python library " "cryptography (>= {0})" ).format(MINIMAL_CRYPTOGRAPHY_VERSION) ) if backend == "cryptography": if not CRYPTOGRAPHY_FOUND: module.fail_json( msg=missing_required_lib( "cryptography >= {0}".format(MINIMAL_CRYPTOGRAPHY_VERSION) ), exception=CRYPTOGRAPHY_IMP_ERR, ) return backend, CSRInfoRetrievalCryptography( module, content, validate_signature=validate_signature ) else: raise ValueError("Unsupported value for backend: {0}".format(backend))