mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-05-06 05:12:54 +00:00
Remove PyOpenSSL backends (except for openssl_pkcs12) (#273)
* Remove Ubuntu 16.04 (Xenial Xerus) from CI. * Removing PyOpenSSL backend from everywhere but openssl_pkcs12. * Remove PyOpenSSL support from module_utils that's not needed for openssl_pkcs12. * Add changelog fragment.
This commit is contained in:
@@ -25,10 +25,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
|
||||
cryptography_parse_key_usage_params,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
|
||||
pyopenssl_normalize_name_attribute,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
|
||||
CertificateBackend,
|
||||
CertificateProvider,
|
||||
@@ -460,164 +456,6 @@ class AssertOnlyCertificateBackendCryptography(AssertOnlyCertificateBackend):
|
||||
return self.existing_certificate.not_valid_before, valid_in_date, self.existing_certificate.not_valid_after
|
||||
|
||||
|
||||
class AssertOnlyCertificateBackendPyOpenSSL(AssertOnlyCertificateBackend):
|
||||
"""validate the supplied certificate."""
|
||||
|
||||
def __init__(self, module):
|
||||
super(AssertOnlyCertificateBackendPyOpenSSL, self).__init__(module, 'pyopenssl')
|
||||
|
||||
# Ensure inputs are properly sanitized before comparison.
|
||||
for param in ['signature_algorithms', 'key_usage', 'extended_key_usage',
|
||||
'subject_alt_name', 'subject', 'issuer', 'not_before',
|
||||
'not_after', 'valid_at', 'invalid_at']:
|
||||
attr = getattr(self, param)
|
||||
if isinstance(attr, list) and attr:
|
||||
if isinstance(attr[0], str):
|
||||
setattr(self, param, [to_bytes(item) for item in attr])
|
||||
elif isinstance(attr[0], tuple):
|
||||
setattr(self, param, [(to_bytes(item[0]), to_bytes(item[1])) for item in attr])
|
||||
elif isinstance(attr, tuple):
|
||||
setattr(self, param, dict((to_bytes(k), to_bytes(v)) for (k, v) in attr.items()))
|
||||
elif isinstance(attr, dict):
|
||||
setattr(self, param, dict((to_bytes(k), to_bytes(v)) for (k, v) in attr.items()))
|
||||
elif isinstance(attr, str):
|
||||
setattr(self, param, to_bytes(attr))
|
||||
|
||||
def _validate_privatekey(self):
|
||||
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_2_METHOD)
|
||||
ctx.use_privatekey(self.privatekey)
|
||||
ctx.use_certificate(self.existing_certificate)
|
||||
try:
|
||||
ctx.check_privatekey()
|
||||
return True
|
||||
except OpenSSL.SSL.Error:
|
||||
return False
|
||||
|
||||
def _validate_csr_signature(self):
|
||||
try:
|
||||
self.csr.verify(self.existing_certificate.get_pubkey())
|
||||
except OpenSSL.crypto.Error:
|
||||
return False
|
||||
|
||||
def _validate_csr_subject(self):
|
||||
if self.csr.get_subject() != self.existing_certificate.get_subject():
|
||||
return False
|
||||
|
||||
def _validate_csr_extensions(self):
|
||||
csr_extensions = self.csr.get_extensions()
|
||||
cert_extension_count = self.existing_certificate.get_extension_count()
|
||||
if len(csr_extensions) != cert_extension_count:
|
||||
return False
|
||||
for extension_number in range(0, cert_extension_count):
|
||||
cert_extension = self.existing_certificate.get_extension(extension_number)
|
||||
csr_extension = filter(lambda extension: extension.get_short_name() == cert_extension.get_short_name(), csr_extensions)
|
||||
if cert_extension.get_data() != list(csr_extension)[0].get_data():
|
||||
return False
|
||||
return True
|
||||
|
||||
def _validate_signature_algorithms(self):
|
||||
if self.existing_certificate.get_signature_algorithm() not in self.signature_algorithms:
|
||||
return self.existing_certificate.get_signature_algorithm()
|
||||
|
||||
def _validate_subject(self):
|
||||
expected_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in self.subject]
|
||||
cert_subject = self.existing_certificate.get_subject().get_components()
|
||||
current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in cert_subject]
|
||||
if not compare_sets(expected_subject, current_subject, self.subject_strict):
|
||||
return expected_subject, current_subject
|
||||
|
||||
def _validate_issuer(self):
|
||||
expected_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in self.issuer]
|
||||
cert_issuer = self.existing_certificate.get_issuer().get_components()
|
||||
current_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in cert_issuer]
|
||||
if not compare_sets(expected_issuer, current_issuer, self.issuer_strict):
|
||||
return self.issuer, cert_issuer
|
||||
|
||||
def _validate_has_expired(self):
|
||||
# The following 3 lines are the same as the current PyOpenSSL code for cert.has_expired().
|
||||
# Older version of PyOpenSSL have a buggy implementation,
|
||||
# to avoid issues with those we added the code from a more recent release here.
|
||||
|
||||
time_string = to_native(self.existing_certificate.get_notAfter())
|
||||
not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")
|
||||
cert_expired = not_after < datetime.datetime.utcnow()
|
||||
return cert_expired
|
||||
|
||||
def _validate_version(self):
|
||||
# Version numbers in certs are off by one:
|
||||
# v1: 0, v2: 1, v3: 2 ...
|
||||
return self.existing_certificate.get_version() + 1
|
||||
|
||||
def _validate_key_usage(self):
|
||||
found = False
|
||||
for extension_idx in range(0, self.existing_certificate.get_extension_count()):
|
||||
extension = self.existing_certificate.get_extension(extension_idx)
|
||||
if extension.get_short_name() == b'keyUsage':
|
||||
found = True
|
||||
expected_extension = crypto.X509Extension(b"keyUsage", False, b', '.join(self.key_usage))
|
||||
key_usage = [usage.strip() for usage in to_text(expected_extension, errors='surrogate_or_strict').split(',')]
|
||||
current_ku = [usage.strip() for usage in to_text(extension, errors='surrogate_or_strict').split(',')]
|
||||
if not compare_sets(key_usage, current_ku, self.key_usage_strict):
|
||||
return self.key_usage, str(extension).split(', ')
|
||||
if not found:
|
||||
# This is only bad if the user specified a non-empty list
|
||||
if self.key_usage:
|
||||
return NO_EXTENSION
|
||||
|
||||
def _validate_extended_key_usage(self):
|
||||
found = False
|
||||
for extension_idx in range(0, self.existing_certificate.get_extension_count()):
|
||||
extension = self.existing_certificate.get_extension(extension_idx)
|
||||
if extension.get_short_name() == b'extendedKeyUsage':
|
||||
found = True
|
||||
extKeyUsage = [OpenSSL._util.lib.OBJ_txt2nid(keyUsage) for keyUsage in self.extended_key_usage]
|
||||
current_xku = [OpenSSL._util.lib.OBJ_txt2nid(usage.strip()) for usage in
|
||||
to_bytes(extension, errors='surrogate_or_strict').split(b',')]
|
||||
if not compare_sets(extKeyUsage, current_xku, self.extended_key_usage_strict):
|
||||
return self.extended_key_usage, str(extension).split(', ')
|
||||
if not found:
|
||||
# This is only bad if the user specified a non-empty list
|
||||
if self.extended_key_usage:
|
||||
return NO_EXTENSION
|
||||
|
||||
def _validate_subject_alt_name(self):
|
||||
found = False
|
||||
for extension_idx in range(0, self.existing_certificate.get_extension_count()):
|
||||
extension = self.existing_certificate.get_extension(extension_idx)
|
||||
if extension.get_short_name() == b'subjectAltName':
|
||||
found = True
|
||||
l_altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
|
||||
to_text(extension, errors='surrogate_or_strict').split(', ')]
|
||||
sans = [pyopenssl_normalize_name_attribute(to_text(san, errors='surrogate_or_strict')) for san in self.subject_alt_name]
|
||||
if not compare_sets(sans, l_altnames, self.subject_alt_name_strict):
|
||||
return self.subject_alt_name, l_altnames
|
||||
if not found:
|
||||
# This is only bad if the user specified a non-empty list
|
||||
if self.subject_alt_name:
|
||||
return NO_EXTENSION
|
||||
|
||||
def _validate_not_before(self):
|
||||
return self.existing_certificate.get_notBefore()
|
||||
|
||||
def _validate_not_after(self):
|
||||
return self.existing_certificate.get_notAfter()
|
||||
|
||||
def _validate_valid_at(self):
|
||||
rt = get_relative_time_option(self.valid_at, "valid_at", backend=self.backend)
|
||||
rt = to_bytes(rt, errors='surrogate_or_strict')
|
||||
return self.existing_certificate.get_notBefore(), rt, self.existing_certificate.get_notAfter()
|
||||
|
||||
def _validate_invalid_at(self):
|
||||
rt = get_relative_time_option(self.invalid_at, "invalid_at", backend=self.backend)
|
||||
rt = to_bytes(rt, errors='surrogate_or_strict')
|
||||
return self.existing_certificate.get_notBefore(), rt, self.existing_certificate.get_notAfter()
|
||||
|
||||
def _validate_valid_in(self):
|
||||
valid_in_asn1 = get_relative_time_option(self.valid_in, "valid_in", backend=self.backend)
|
||||
valid_in_date = to_bytes(valid_in_asn1, errors='surrogate_or_strict')
|
||||
return self.existing_certificate.get_notBefore(), valid_in_date, self.existing_certificate.get_notAfter()
|
||||
|
||||
|
||||
class AssertOnlyCertificateProvider(CertificateProvider):
|
||||
def validate_module_args(self, module):
|
||||
module.deprecate("The 'assertonly' provider is deprecated; please see the examples of "
|
||||
@@ -630,8 +468,6 @@ class AssertOnlyCertificateProvider(CertificateProvider):
|
||||
def create_backend(self, module, backend):
|
||||
if backend == 'cryptography':
|
||||
return AssertOnlyCertificateBackendCryptography(module)
|
||||
if backend == 'pyopenssl':
|
||||
return AssertOnlyCertificateBackendPyOpenSSL(module)
|
||||
|
||||
|
||||
def add_assertonly_provider_to_argument_spec(argument_spec):
|
||||
|
||||
Reference in New Issue
Block a user