Files
community.crypto/plugins/module_utils/crypto/module_backends/certificate_info.py
Felix Fontein 641e63b08c Replace % and str.format() with f-strings (#875)
* Replace % and str.format() with f-strings.

* Apply suggestions from review.
2025-05-01 11:50:10 +02:00

489 lines
16 KiB
Python

# Copyright (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
# Copyright (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import abc
import binascii
import traceback
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
cryptography_get_extensions_from_cert,
cryptography_oid_to_name,
cryptography_serial_number_of_cert,
get_not_valid_after,
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import (
get_publickey_info,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_fingerprint_of_bytes,
load_certificate,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.version import (
LooseVersion,
)
MINIMAL_CRYPTOGRAPHY_VERSION = "1.6"
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.primitives import serialization
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
@six.add_metaclass(abc.ABCMeta)
class CertificateInfoRetrieval:
def __init__(self, module, backend, content):
# content must be a bytes string
self.module = module
self.backend = backend
self.content = content
@abc.abstractmethod
def _get_der_bytes(self):
pass
@abc.abstractmethod
def _get_signature_algorithm(self):
pass
@abc.abstractmethod
def _get_subject_ordered(self):
pass
@abc.abstractmethod
def _get_issuer_ordered(self):
pass
@abc.abstractmethod
def _get_version(self):
pass
@abc.abstractmethod
def _get_key_usage(self):
pass
@abc.abstractmethod
def _get_extended_key_usage(self):
pass
@abc.abstractmethod
def _get_basic_constraints(self):
pass
@abc.abstractmethod
def _get_ocsp_must_staple(self):
pass
@abc.abstractmethod
def _get_subject_alt_name(self):
pass
@abc.abstractmethod
def get_not_before(self):
pass
@abc.abstractmethod
def get_not_after(self):
pass
@abc.abstractmethod
def _get_public_key_pem(self):
pass
@abc.abstractmethod
def _get_public_key_object(self):
pass
@abc.abstractmethod
def _get_subject_key_identifier(self):
pass
@abc.abstractmethod
def _get_authority_key_identifier(self):
pass
@abc.abstractmethod
def _get_serial_number(self):
pass
@abc.abstractmethod
def _get_all_extensions(self):
pass
@abc.abstractmethod
def _get_ocsp_uri(self):
pass
@abc.abstractmethod
def _get_issuer_uri(self):
pass
def get_info(self, prefer_one_fingerprint=False, der_support_enabled=False):
result = dict()
self.cert = load_certificate(
None,
content=self.content,
backend=self.backend,
der_support_enabled=der_support_enabled,
)
result["signature_algorithm"] = self._get_signature_algorithm()
subject = self._get_subject_ordered()
issuer = self._get_issuer_ordered()
result["subject"] = dict()
for k, v in subject:
result["subject"][k] = v
result["subject_ordered"] = subject
result["issuer"] = dict()
for k, v in issuer:
result["issuer"][k] = v
result["issuer_ordered"] = issuer
result["version"] = self._get_version()
result["key_usage"], result["key_usage_critical"] = self._get_key_usage()
result["extended_key_usage"], result["extended_key_usage_critical"] = (
self._get_extended_key_usage()
)
result["basic_constraints"], result["basic_constraints_critical"] = (
self._get_basic_constraints()
)
result["ocsp_must_staple"], result["ocsp_must_staple_critical"] = (
self._get_ocsp_must_staple()
)
result["subject_alt_name"], result["subject_alt_name_critical"] = (
self._get_subject_alt_name()
)
not_before = self.get_not_before()
not_after = self.get_not_after()
result["not_before"] = not_before.strftime(TIMESTAMP_FORMAT)
result["not_after"] = not_after.strftime(TIMESTAMP_FORMAT)
result["expired"] = not_after < get_now_datetime(
with_timezone=CRYPTOGRAPHY_TIMEZONE
)
result["public_key"] = to_native(self._get_public_key_pem())
public_key_info = get_publickey_info(
self.module,
self.backend,
key=self._get_public_key_object(),
prefer_one_fingerprint=prefer_one_fingerprint,
)
result.update(
{
"public_key_type": public_key_info["type"],
"public_key_data": public_key_info["public_data"],
"public_key_fingerprints": public_key_info["fingerprints"],
}
)
result["fingerprints"] = get_fingerprint_of_bytes(
self._get_der_bytes(), prefer_one=prefer_one_fingerprint
)
ski = self._get_subject_key_identifier()
if ski is not None:
ski = to_native(binascii.hexlify(ski))
ski = ":".join([ski[i : i + 2] for i in range(0, len(ski), 2)])
result["subject_key_identifier"] = ski
aki, aci, acsn = self._get_authority_key_identifier()
if aki is not None:
aki = to_native(binascii.hexlify(aki))
aki = ":".join([aki[i : i + 2] for i in range(0, len(aki), 2)])
result["authority_key_identifier"] = aki
result["authority_cert_issuer"] = aci
result["authority_cert_serial_number"] = acsn
result["serial_number"] = self._get_serial_number()
result["extensions_by_oid"] = self._get_all_extensions()
result["ocsp_uri"] = self._get_ocsp_uri()
result["issuer_uri"] = self._get_issuer_uri()
return result
class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
"""Validate the supplied cert, using the cryptography backend"""
def __init__(self, module, content):
super(CertificateInfoRetrievalCryptography, self).__init__(
module, "cryptography", content
)
self.name_encoding = module.params.get("name_encoding", "ignore")
def _get_der_bytes(self):
return self.cert.public_bytes(serialization.Encoding.DER)
def _get_signature_algorithm(self):
return cryptography_oid_to_name(self.cert.signature_algorithm_oid)
def _get_subject_ordered(self):
result = []
for attribute in self.cert.subject:
result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
return result
def _get_issuer_ordered(self):
result = []
for attribute in self.cert.issuer:
result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
return result
def _get_version(self):
if self.cert.version == x509.Version.v1:
return 1
if self.cert.version == x509.Version.v3:
return 3
return "unknown"
def _get_key_usage(self):
try:
current_key_ext = self.cert.extensions.get_extension_for_class(
x509.KeyUsage
)
current_key_usage = current_key_ext.value
key_usage = dict(
digital_signature=current_key_usage.digital_signature,
content_commitment=current_key_usage.content_commitment,
key_encipherment=current_key_usage.key_encipherment,
data_encipherment=current_key_usage.data_encipherment,
key_agreement=current_key_usage.key_agreement,
key_cert_sign=current_key_usage.key_cert_sign,
crl_sign=current_key_usage.crl_sign,
encipher_only=False,
decipher_only=False,
)
if key_usage["key_agreement"]:
key_usage.update(
dict(
encipher_only=current_key_usage.encipher_only,
decipher_only=current_key_usage.decipher_only,
)
)
key_usage_names = dict(
digital_signature="Digital Signature",
content_commitment="Non Repudiation",
key_encipherment="Key Encipherment",
data_encipherment="Data Encipherment",
key_agreement="Key Agreement",
key_cert_sign="Certificate Sign",
crl_sign="CRL Sign",
encipher_only="Encipher Only",
decipher_only="Decipher Only",
)
return (
sorted(
[
key_usage_names[name]
for name, value in key_usage.items()
if value
]
),
current_key_ext.critical,
)
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_extended_key_usage(self):
try:
ext_keyusage_ext = self.cert.extensions.get_extension_for_class(
x509.ExtendedKeyUsage
)
return (
sorted(
[cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value]
),
ext_keyusage_ext.critical,
)
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_basic_constraints(self):
try:
ext_keyusage_ext = self.cert.extensions.get_extension_for_class(
x509.BasicConstraints
)
result = []
result.append(f"CA:{'TRUE' if ext_keyusage_ext.value.ca else 'FALSE'}")
if ext_keyusage_ext.value.path_length is not None:
result.append(f"pathlen:{ext_keyusage_ext.value.path_length}")
return sorted(result), ext_keyusage_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_ocsp_must_staple(self):
try:
try:
# This only works with cryptography >= 2.1
tlsfeature_ext = self.cert.extensions.get_extension_for_class(
x509.TLSFeature
)
value = (
cryptography.x509.TLSFeatureType.status_request
in tlsfeature_ext.value
)
except AttributeError:
# Fallback for cryptography < 2.1
oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid)
value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
return value, tlsfeature_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def _get_subject_alt_name(self):
try:
san_ext = self.cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
)
result = [
cryptography_decode_name(san, idn_rewrite=self.name_encoding)
for san in san_ext.value
]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
def get_not_before(self):
return get_not_valid_before(self.cert)
def get_not_after(self):
return get_not_valid_after(self.cert)
def _get_public_key_pem(self):
return self.cert.public_key().public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo,
)
def _get_public_key_object(self):
return self.cert.public_key()
def _get_subject_key_identifier(self):
try:
ext = self.cert.extensions.get_extension_for_class(
x509.SubjectKeyIdentifier
)
return ext.value.digest
except cryptography.x509.ExtensionNotFound:
return None
def _get_authority_key_identifier(self):
try:
ext = self.cert.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier
)
issuer = None
if ext.value.authority_cert_issuer is not None:
issuer = [
cryptography_decode_name(san, idn_rewrite=self.name_encoding)
for san in ext.value.authority_cert_issuer
]
return (
ext.value.key_identifier,
issuer,
ext.value.authority_cert_serial_number,
)
except cryptography.x509.ExtensionNotFound:
return None, None, None
def _get_serial_number(self):
return cryptography_serial_number_of_cert(self.cert)
def _get_all_extensions(self):
return cryptography_get_extensions_from_cert(self.cert)
def _get_ocsp_uri(self):
try:
ext = self.cert.extensions.get_extension_for_class(
x509.AuthorityInformationAccess
)
for desc in ext.value:
if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP:
if isinstance(desc.access_location, x509.UniformResourceIdentifier):
return desc.access_location.value
except x509.ExtensionNotFound:
pass
return None
def _get_issuer_uri(self):
try:
ext = self.cert.extensions.get_extension_for_class(
x509.AuthorityInformationAccess
)
for desc in ext.value:
if (
desc.access_method
== x509.oid.AuthorityInformationAccessOID.CA_ISSUERS
):
if isinstance(desc.access_location, x509.UniformResourceIdentifier):
return desc.access_location.value
except x509.ExtensionNotFound:
pass
return None
def get_certificate_info(module, backend, content, prefer_one_fingerprint=False):
if backend == "cryptography":
info = CertificateInfoRetrievalCryptography(module, content)
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
def select_backend(module, backend, content):
if backend == "auto":
# Detection what is possible
can_use_cryptography = (
CRYPTOGRAPHY_FOUND
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
)
# Try cryptography
if can_use_cryptography:
backend = "cryptography"
# Success?
if backend == "auto":
module.fail_json(
msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
)
if backend == "cryptography":
if not CRYPTOGRAPHY_FOUND:
module.fail_json(
msg=missing_required_lib(
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
),
exception=CRYPTOGRAPHY_IMP_ERR,
)
return backend, CertificateInfoRetrievalCryptography(module, content)
else:
raise ValueError(f"Unsupported value for backend: {backend}")