# Copyright (c) 2016-2017, Yanis Guenane # Copyright (c) 2017, Markus Teufelberger # Copyright (c) 2020, Felix Fontein # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations import abc import binascii from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_decode_name, cryptography_get_extensions_from_cert, cryptography_oid_to_name, get_not_valid_after, get_not_valid_before, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( get_publickey_info, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( get_fingerprint_of_bytes, load_certificate, ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.time import ( get_now_datetime, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION try: import cryptography from cryptography import x509 from cryptography.hazmat.primitives import serialization except ImportError: pass TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" class CertificateInfoRetrieval(metaclass=abc.ABCMeta): def __init__(self, module, content): # content must be a bytes string self.module = module self.content = content @abc.abstractmethod def _get_der_bytes(self): pass @abc.abstractmethod def _get_signature_algorithm(self): pass @abc.abstractmethod def _get_subject_ordered(self): pass @abc.abstractmethod def _get_issuer_ordered(self): pass @abc.abstractmethod def _get_version(self): pass @abc.abstractmethod def _get_key_usage(self): pass @abc.abstractmethod def _get_extended_key_usage(self): pass @abc.abstractmethod def _get_basic_constraints(self): pass @abc.abstractmethod def _get_ocsp_must_staple(self): pass @abc.abstractmethod def _get_subject_alt_name(self): pass @abc.abstractmethod def get_not_before(self): pass @abc.abstractmethod def get_not_after(self): pass @abc.abstractmethod def _get_public_key_pem(self): pass @abc.abstractmethod def _get_public_key_object(self): pass @abc.abstractmethod def _get_subject_key_identifier(self): pass @abc.abstractmethod def _get_authority_key_identifier(self): pass @abc.abstractmethod def _get_serial_number(self): pass @abc.abstractmethod def _get_all_extensions(self): pass @abc.abstractmethod def _get_ocsp_uri(self): pass @abc.abstractmethod def _get_issuer_uri(self): pass def get_info(self, prefer_one_fingerprint=False, der_support_enabled=False): result = dict() self.cert = load_certificate( None, content=self.content, der_support_enabled=der_support_enabled, ) result["signature_algorithm"] = self._get_signature_algorithm() subject = self._get_subject_ordered() issuer = self._get_issuer_ordered() result["subject"] = dict() for k, v in subject: result["subject"][k] = v result["subject_ordered"] = subject result["issuer"] = dict() for k, v in issuer: result["issuer"][k] = v result["issuer_ordered"] = issuer result["version"] = self._get_version() result["key_usage"], result["key_usage_critical"] = self._get_key_usage() result["extended_key_usage"], result["extended_key_usage_critical"] = ( self._get_extended_key_usage() ) result["basic_constraints"], result["basic_constraints_critical"] = ( self._get_basic_constraints() ) result["ocsp_must_staple"], result["ocsp_must_staple_critical"] = ( self._get_ocsp_must_staple() ) result["subject_alt_name"], result["subject_alt_name_critical"] = ( self._get_subject_alt_name() ) not_before = self.get_not_before() not_after = self.get_not_after() result["not_before"] = not_before.strftime(TIMESTAMP_FORMAT) result["not_after"] = not_after.strftime(TIMESTAMP_FORMAT) result["expired"] = not_after < get_now_datetime( with_timezone=CRYPTOGRAPHY_TIMEZONE ) result["public_key"] = to_native(self._get_public_key_pem()) public_key_info = get_publickey_info( self.module, key=self._get_public_key_object(), prefer_one_fingerprint=prefer_one_fingerprint, ) result.update( { "public_key_type": public_key_info["type"], "public_key_data": public_key_info["public_data"], "public_key_fingerprints": public_key_info["fingerprints"], } ) result["fingerprints"] = get_fingerprint_of_bytes( self._get_der_bytes(), prefer_one=prefer_one_fingerprint ) ski = self._get_subject_key_identifier() if ski is not None: ski = binascii.hexlify(ski).decode("ascii") ski = ":".join([ski[i : i + 2] for i in range(0, len(ski), 2)]) result["subject_key_identifier"] = ski aki, aci, acsn = self._get_authority_key_identifier() if aki is not None: aki = binascii.hexlify(aki).decode("ascii") aki = ":".join([aki[i : i + 2] for i in range(0, len(aki), 2)]) result["authority_key_identifier"] = aki result["authority_cert_issuer"] = aci result["authority_cert_serial_number"] = acsn result["serial_number"] = self._get_serial_number() result["extensions_by_oid"] = self._get_all_extensions() result["ocsp_uri"] = self._get_ocsp_uri() result["issuer_uri"] = self._get_issuer_uri() return result class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval): """Validate the supplied cert, using the cryptography backend""" def __init__(self, module, content): super(CertificateInfoRetrievalCryptography, self).__init__(module, content) self.name_encoding = module.params.get("name_encoding", "ignore") def _get_der_bytes(self): return self.cert.public_bytes(serialization.Encoding.DER) def _get_signature_algorithm(self): return cryptography_oid_to_name(self.cert.signature_algorithm_oid) def _get_subject_ordered(self): result = [] for attribute in self.cert.subject: result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) return result def _get_issuer_ordered(self): result = [] for attribute in self.cert.issuer: result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) return result def _get_version(self): if self.cert.version == x509.Version.v1: return 1 if self.cert.version == x509.Version.v3: return 3 return "unknown" def _get_key_usage(self): try: current_key_ext = self.cert.extensions.get_extension_for_class( x509.KeyUsage ) current_key_usage = current_key_ext.value key_usage = dict( digital_signature=current_key_usage.digital_signature, content_commitment=current_key_usage.content_commitment, key_encipherment=current_key_usage.key_encipherment, data_encipherment=current_key_usage.data_encipherment, key_agreement=current_key_usage.key_agreement, key_cert_sign=current_key_usage.key_cert_sign, crl_sign=current_key_usage.crl_sign, encipher_only=False, decipher_only=False, ) if key_usage["key_agreement"]: key_usage.update( dict( encipher_only=current_key_usage.encipher_only, decipher_only=current_key_usage.decipher_only, ) ) key_usage_names = dict( digital_signature="Digital Signature", content_commitment="Non Repudiation", key_encipherment="Key Encipherment", data_encipherment="Data Encipherment", key_agreement="Key Agreement", key_cert_sign="Certificate Sign", crl_sign="CRL Sign", encipher_only="Encipher Only", decipher_only="Decipher Only", ) return ( sorted( [ key_usage_names[name] for name, value in key_usage.items() if value ] ), current_key_ext.critical, ) except cryptography.x509.ExtensionNotFound: return None, False def _get_extended_key_usage(self): try: ext_keyusage_ext = self.cert.extensions.get_extension_for_class( x509.ExtendedKeyUsage ) return ( sorted( [cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value] ), ext_keyusage_ext.critical, ) except cryptography.x509.ExtensionNotFound: return None, False def _get_basic_constraints(self): try: ext_keyusage_ext = self.cert.extensions.get_extension_for_class( x509.BasicConstraints ) result = [] result.append(f"CA:{'TRUE' if ext_keyusage_ext.value.ca else 'FALSE'}") if ext_keyusage_ext.value.path_length is not None: result.append(f"pathlen:{ext_keyusage_ext.value.path_length}") return sorted(result), ext_keyusage_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def _get_ocsp_must_staple(self): try: tlsfeature_ext = self.cert.extensions.get_extension_for_class( x509.TLSFeature ) value = ( cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value ) return value, tlsfeature_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def _get_subject_alt_name(self): try: san_ext = self.cert.extensions.get_extension_for_class( x509.SubjectAlternativeName ) result = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value ] return result, san_ext.critical except cryptography.x509.ExtensionNotFound: return None, False def get_not_before(self): return get_not_valid_before(self.cert) def get_not_after(self): return get_not_valid_after(self.cert) def _get_public_key_pem(self): return self.cert.public_key().public_bytes( serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo, ) def _get_public_key_object(self): return self.cert.public_key() def _get_subject_key_identifier(self): try: ext = self.cert.extensions.get_extension_for_class( x509.SubjectKeyIdentifier ) return ext.value.digest except cryptography.x509.ExtensionNotFound: return None def _get_authority_key_identifier(self): try: ext = self.cert.extensions.get_extension_for_class( x509.AuthorityKeyIdentifier ) issuer = None if ext.value.authority_cert_issuer is not None: issuer = [ cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer ] return ( ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number, ) except cryptography.x509.ExtensionNotFound: return None, None, None def _get_serial_number(self): return self.cert.serial_number def _get_all_extensions(self): return cryptography_get_extensions_from_cert(self.cert) def _get_ocsp_uri(self): try: ext = self.cert.extensions.get_extension_for_class( x509.AuthorityInformationAccess ) for desc in ext.value: if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP: if isinstance(desc.access_location, x509.UniformResourceIdentifier): return desc.access_location.value except x509.ExtensionNotFound: pass return None def _get_issuer_uri(self): try: ext = self.cert.extensions.get_extension_for_class( x509.AuthorityInformationAccess ) for desc in ext.value: if ( desc.access_method == x509.oid.AuthorityInformationAccessOID.CA_ISSUERS ): if isinstance(desc.access_location, x509.UniformResourceIdentifier): return desc.access_location.value except x509.ExtensionNotFound: pass return None def get_certificate_info(module, content, prefer_one_fingerprint=False): info = CertificateInfoRetrievalCryptography(module, content) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, content): assert_required_cryptography_version( module, minimum_cryptography_version=MINIMAL_CRYPTOGRAPHY_VERSION ) return CertificateInfoRetrievalCryptography(module, content)