From 645b7bf9ed4d2c754292480166b8f8e4ea58d794 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Sat, 3 May 2025 10:46:53 +0200 Subject: [PATCH] Get rid of backend parameter whenever possible (#883) * Get rid of backend parameter whenever possible. * Always auto-detect if backend choices are 'cryptography' and 'auto', resp. always check cryptography version. * Improve error message. * Update documentation. --- changelogs/fragments/883-backend.yml | 5 + plugins/action/openssl_privatekey_pipe.py | 5 +- plugins/doc_fragments/module_certificate.py | 3 + plugins/doc_fragments/module_csr.py | 3 + plugins/doc_fragments/module_privatekey.py | 3 + plugins/filter/openssl_csr_info.py | 4 +- plugins/filter/openssl_privatekey_info.py | 1 - plugins/filter/openssl_publickey_info.py | 2 +- plugins/filter/x509_certificate_info.py | 2 +- plugins/module_utils/acme/backends.py | 2 +- .../crypto/module_backends/certificate.py | 183 +++++++----------- .../module_backends/certificate_acme.py | 8 +- .../module_backends/certificate_entrust.py | 42 ++-- .../module_backends/certificate_info.py | 60 +----- .../module_backends/certificate_ownca.py | 27 +-- .../module_backends/certificate_selfsigned.py | 11 +- .../crypto/module_backends/crl_info.py | 23 +-- .../crypto/module_backends/csr.py | 74 ++----- .../crypto/module_backends/csr_info.py | 68 ++----- .../crypto/module_backends/privatekey.py | 58 +----- .../module_backends/privatekey_convert.py | 26 +-- .../crypto/module_backends/privatekey_info.py | 76 ++------ .../crypto/module_backends/publickey_info.py | 62 +----- plugins/module_utils/crypto/support.py | 95 ++++----- plugins/module_utils/cryptography_dep.py | 47 +++++ .../openssh/backends/keypair_backend.py | 17 +- plugins/module_utils/openssh/cryptography.py | 9 +- plugins/module_utils/time.py | 57 +++--- plugins/modules/acme_challenge_cert_helper.py | 31 +-- plugins/modules/certificate_complete_chain.py | 24 +-- plugins/modules/ecs_certificate.py | 28 +-- plugins/modules/get_certificate.py | 130 +++++-------- plugins/modules/openssh_keypair.py | 2 +- plugins/modules/openssl_csr.py | 3 +- plugins/modules/openssl_csr_info.py | 7 +- plugins/modules/openssl_csr_pipe.py | 3 +- plugins/modules/openssl_dhparam.py | 14 +- plugins/modules/openssl_pkcs12.py | 75 ++----- plugins/modules/openssl_privatekey.py | 5 +- plugins/modules/openssl_privatekey_info.py | 6 +- plugins/modules/openssl_publickey.py | 114 ++++------- plugins/modules/openssl_publickey_info.py | 7 +- plugins/modules/openssl_signature.py | 49 ++--- plugins/modules/openssl_signature_info.py | 49 ++--- plugins/modules/x509_certificate.py | 3 +- plugins/modules/x509_certificate_convert.py | 18 +- plugins/modules/x509_certificate_info.py | 7 +- plugins/modules/x509_certificate_pipe.py | 3 +- plugins/modules/x509_crl.py | 27 +-- tests/unit/plugins/module_utils/test_time.py | 17 +- 50 files changed, 502 insertions(+), 1093 deletions(-) create mode 100644 changelogs/fragments/883-backend.yml diff --git a/changelogs/fragments/883-backend.yml b/changelogs/fragments/883-backend.yml new file mode 100644 index 00000000..3e65b06e --- /dev/null +++ b/changelogs/fragments/883-backend.yml @@ -0,0 +1,5 @@ +minor_changes: + - "Remove ``backend`` parameter from internal code whenever possible (https://github.com/ansible-collections/community.crypto/pull/883)." +breaking_changes: + - "Ignore value of ``select_crypto_backend`` for all modules except acme_* and ..., and always assume the value ``auto``. + This ensures that the ``cryptography`` version is always checked (https://github.com/ansible-collections/community.crypto/pull/883)." diff --git a/plugins/action/openssl_privatekey_pipe.py b/plugins/action/openssl_privatekey_pipe.py index 047ded38..1d84d50e 100644 --- a/plugins/action/openssl_privatekey_pipe.py +++ b/plugins/action/openssl_privatekey_pipe.py @@ -79,10 +79,7 @@ class ActionModule(ActionModuleBase): @staticmethod def run_module(module): - backend, module_backend = select_backend( - module=module, - backend=module.params["select_crypto_backend"], - ) + module_backend = select_backend(module=module) try: private_key = PrivateKeyModule(module, module_backend) diff --git a/plugins/doc_fragments/module_certificate.py b/plugins/doc_fragments/module_certificate.py index 21ec1f5f..1a4d769e 100644 --- a/plugins/doc_fragments/module_certificate.py +++ b/plugins/doc_fragments/module_certificate.py @@ -71,6 +71,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] diff --git a/plugins/doc_fragments/module_csr.py b/plugins/doc_fragments/module_csr.py index ba3763fa..dbd0c9a2 100644 --- a/plugins/doc_fragments/module_csr.py +++ b/plugins/doc_fragments/module_csr.py @@ -229,6 +229,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] diff --git a/plugins/doc_fragments/module_privatekey.py b/plugins/doc_fragments/module_privatekey.py index 65030271..caf8d802 100644 --- a/plugins/doc_fragments/module_privatekey.py +++ b/plugins/doc_fragments/module_privatekey.py @@ -78,6 +78,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] diff --git a/plugins/filter/openssl_csr_info.py b/plugins/filter/openssl_csr_info.py index 1bec6944..d0b4b21d 100644 --- a/plugins/filter/openssl_csr_info.py +++ b/plugins/filter/openssl_csr_info.py @@ -305,9 +305,7 @@ def openssl_csr_info_filter(data, name_encoding="ignore"): module = FilterModuleMock({"name_encoding": name_encoding}) try: - return get_csr_info( - module, "cryptography", content=to_bytes(data), validate_signature=True - ) + return get_csr_info(module, content=to_bytes(data), validate_signature=True) except OpenSSLObjectError as exc: raise AnsibleFilterError(str(exc)) diff --git a/plugins/filter/openssl_privatekey_info.py b/plugins/filter/openssl_privatekey_info.py index 5322cf1d..da6d8ce1 100644 --- a/plugins/filter/openssl_privatekey_info.py +++ b/plugins/filter/openssl_privatekey_info.py @@ -181,7 +181,6 @@ def openssl_privatekey_info_filter( try: result = get_privatekey_info( module, - "cryptography", content=to_bytes(data), passphrase=passphrase, return_private_key_data=return_private_key_data, diff --git a/plugins/filter/openssl_publickey_info.py b/plugins/filter/openssl_publickey_info.py index 88d26edf..037ec7b3 100644 --- a/plugins/filter/openssl_publickey_info.py +++ b/plugins/filter/openssl_publickey_info.py @@ -146,7 +146,7 @@ def openssl_publickey_info_filter(data): module = FilterModuleMock({}) try: - return get_publickey_info(module, "cryptography", content=to_bytes(data)) + return get_publickey_info(module, content=to_bytes(data)) except PublicKeyParseError as exc: raise AnsibleFilterError(exc.error_message) except OpenSSLObjectError as exc: diff --git a/plugins/filter/x509_certificate_info.py b/plugins/filter/x509_certificate_info.py index 2c188d63..82a3757d 100644 --- a/plugins/filter/x509_certificate_info.py +++ b/plugins/filter/x509_certificate_info.py @@ -339,7 +339,7 @@ def x509_certificate_info_filter(data, name_encoding="ignore"): module = FilterModuleMock({"name_encoding": name_encoding}) try: - return get_certificate_info(module, "cryptography", content=to_bytes(data)) + return get_certificate_info(module, content=to_bytes(data)) except OpenSSLObjectError as exc: raise AnsibleFilterError(str(exc)) diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 20a4bf75..9b3061f4 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -102,7 +102,7 @@ class CryptoBackend: def parse_module_parameter(self, value, name): try: return get_relative_time_option( - value, name, backend="cryptography", with_timezone=self._with_timezone + value, name, with_timezone=self._with_timezone ) except OpenSSLObjectError as exc: raise BackendException(str(exc)) diff --git a/plugins/module_utils/crypto/module_backends/certificate.py b/plugins/module_utils/crypto/module_backends/certificate.py index 0304de50..3950b394 100644 --- a/plugins/module_utils/crypto/module_backends/certificate.py +++ b/plugins/module_utils/crypto/module_backends/certificate.py @@ -6,10 +6,8 @@ from __future__ import annotations import abc -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible_collections.community.crypto.plugins.module_utils.argspec import ( ArgumentSpec, ) @@ -32,26 +30,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None -CRYPTOGRAPHY_VERSION = None try: import cryptography from cryptography import x509 - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class CertificateError(OpenSSLObjectError): @@ -60,9 +49,8 @@ class CertificateError(OpenSSLObjectError): @six.add_metaclass(abc.ABCMeta) class CertificateBackend: - def __init__(self, module, backend): + def __init__(self, module): self.module = module - self.backend = backend self.force = module.params["force"] self.ignore_timestamps = module.params["ignore_timestamps"] @@ -98,7 +86,7 @@ class CertificateBackend: return dict() try: result = get_certificate_info( - self.module, self.backend, data, prefer_one_fingerprint=True + self.module, data, prefer_one_fingerprint=True ) result["can_parse_certificate"] = True return result @@ -137,7 +125,6 @@ class CertificateBackend: path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) except OpenSSLBadPassphraseError as exc: raise CertificateError(exc) @@ -151,7 +138,6 @@ class CertificateBackend: self.csr = load_certificate_request( path=self.csr_path, content=self.csr_content, - backend=self.backend, ) def _ensure_existing_certificate_loaded(self): @@ -163,75 +149,72 @@ class CertificateBackend: self.existing_certificate = load_certificate( path=None, content=self.existing_certificate_bytes, - backend=self.backend, ) def _check_privatekey(self): """Check whether provided parameters match, assuming self.existing_certificate and self.privatekey have been populated.""" - if self.backend == "cryptography": - return cryptography_compare_public_keys( - self.existing_certificate.public_key(), self.privatekey.public_key() - ) + return cryptography_compare_public_keys( + self.existing_certificate.public_key(), self.privatekey.public_key() + ) def _check_csr(self): """Check whether provided parameters match, assuming self.existing_certificate and self.csr have been populated.""" - if self.backend == "cryptography": - # Verify that CSR is signed by certificate's private key - if not self.csr.is_signature_valid: - return False - if not cryptography_compare_public_keys( - self.csr.public_key(), self.existing_certificate.public_key() - ): - return False - # Check subject - if ( - self.check_csr_subject - and self.csr.subject != self.existing_certificate.subject - ): - return False - # Check extensions - if not self.check_csr_extensions: - return True - cert_exts = list(self.existing_certificate.extensions) - csr_exts = list(self.csr.extensions) - if self.create_subject_key_identifier != "never_create": - # Filter out SubjectKeyIdentifier extension before comparison - cert_exts = list( - filter( - lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), - cert_exts, - ) - ) - csr_exts = list( - filter( - lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), - csr_exts, - ) - ) - if self.create_authority_key_identifier: - # Filter out AuthorityKeyIdentifier extension before comparison - cert_exts = list( - filter( - lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), - cert_exts, - ) - ) - csr_exts = list( - filter( - lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), - csr_exts, - ) - ) - if len(cert_exts) != len(csr_exts): - return False - for cert_ext in cert_exts: - try: - csr_ext = self.csr.extensions.get_extension_for_oid(cert_ext.oid) - if cert_ext != csr_ext: - return False - except cryptography.x509.ExtensionNotFound: - return False + # Verify that CSR is signed by certificate's private key + if not self.csr.is_signature_valid: + return False + if not cryptography_compare_public_keys( + self.csr.public_key(), self.existing_certificate.public_key() + ): + return False + # Check subject + if ( + self.check_csr_subject + and self.csr.subject != self.existing_certificate.subject + ): + return False + # Check extensions + if not self.check_csr_extensions: return True + cert_exts = list(self.existing_certificate.extensions) + csr_exts = list(self.csr.extensions) + if self.create_subject_key_identifier != "never_create": + # Filter out SubjectKeyIdentifier extension before comparison + cert_exts = list( + filter( + lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), + cert_exts, + ) + ) + csr_exts = list( + filter( + lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier), + csr_exts, + ) + ) + if self.create_authority_key_identifier: + # Filter out AuthorityKeyIdentifier extension before comparison + cert_exts = list( + filter( + lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), + cert_exts, + ) + ) + csr_exts = list( + filter( + lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier), + csr_exts, + ) + ) + if len(cert_exts) != len(csr_exts): + return False + for cert_ext in cert_exts: + try: + csr_ext = self.csr.extensions.get_extension_for_oid(cert_ext.oid) + if cert_ext != csr_ext: + return False + except cryptography.x509.ExtensionNotFound: + return False + return True def _check_subject_key_identifier(self): """Check whether Subject Key Identifier matches, assuming self.existing_certificate has been populated.""" @@ -336,53 +319,29 @@ class CertificateProvider: """Whether the provider needs to create a version 2 certificate.""" @abc.abstractmethod - def create_backend(self, module, backend): + def create_backend(self, module): """Create an implementation for a backend. Return value must be instance of CertificateBackend. """ -def select_backend(module, backend, provider): +def select_backend(module, provider): """ :type module: AnsibleModule - :type backend: str :type provider: CertificateProvider """ provider.validate_module_args(module) - backend = module.params["select_crypto_backend"] - if backend == "auto": - # Detect what backend we can use - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + + if provider.needs_version_two_certs(module): + # TODO: remove + module.fail_json( + msg="The cryptography backend does not support v2 certificates" ) - # If cryptography is available we'll use it - if can_use_cryptography: - backend = "cryptography" - - # Fail if no backend has been found - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - if provider.needs_version_two_certs(module): - module.fail_json( - msg="The cryptography backend does not support v2 certificates" - ) - - return provider.create_backend(module, backend) + return provider.create_backend(module) def get_certificate_argument_spec(): diff --git a/plugins/module_utils/crypto/module_backends/certificate_acme.py b/plugins/module_utils/crypto/module_backends/certificate_acme.py index 0608ad1e..3ba150f4 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_acme.py +++ b/plugins/module_utils/crypto/module_backends/certificate_acme.py @@ -18,8 +18,8 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac class AcmeCertificateBackend(CertificateBackend): - def __init__(self, module, backend): - super(AcmeCertificateBackend, self).__init__(module, backend) + def __init__(self, module): + super(AcmeCertificateBackend, self).__init__(module) self.accountkey_path = module.params["acme_accountkey_path"] self.challenge_path = module.params["acme_challenge_path"] self.use_chain = module.params["acme_chain"] @@ -105,8 +105,8 @@ class AcmeCertificateProvider(CertificateProvider): def needs_version_two_certs(self, module): return False - def create_backend(self, module, backend): - return AcmeCertificateBackend(module, backend) + def create_backend(self, module): + return AcmeCertificateBackend(module) def add_acme_provider_to_argument_spec(argument_spec): diff --git a/plugins/module_utils/crypto/module_backends/certificate_entrust.py b/plugins/module_utils/crypto/module_backends/certificate_entrust.py index 0fc0fc0b..22779797 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_entrust.py +++ b/plugins/module_utils/crypto/module_backends/certificate_entrust.py @@ -39,13 +39,12 @@ except ImportError: class EntrustCertificateBackend(CertificateBackend): - def __init__(self, module, backend): - super(EntrustCertificateBackend, self).__init__(module, backend) + def __init__(self, module): + super(EntrustCertificateBackend, self).__init__(module) self.trackingId = None self.notAfter = get_relative_time_option( module.params["entrust_not_after"], "entrust_not_after", - backend=self.backend, with_timezone=CRYPTOGRAPHY_TIMEZONE, ) @@ -64,19 +63,18 @@ class EntrustCertificateBackend(CertificateBackend): # We want to always force behavior of trying to use the organization provided in the CSR. # To that end we need to parse out the organization from the CSR. self.csr_org = None - if self.backend == "cryptography": - csr_subject_orgs = self.csr.subject.get_attributes_for_oid( - NameOID.ORGANIZATION_NAME - ) - if len(csr_subject_orgs) == 1: - self.csr_org = csr_subject_orgs[0].value - elif len(csr_subject_orgs) > 1: - self.module.fail_json( - msg=( - "Entrust provider does not currently support multiple validated organizations. Multiple organizations found in " - f"Subject DN: '{self.csr.subject}'. " - ) + csr_subject_orgs = self.csr.subject.get_attributes_for_oid( + NameOID.ORGANIZATION_NAME + ) + if len(csr_subject_orgs) == 1: + self.csr_org = csr_subject_orgs[0].value + elif len(csr_subject_orgs) > 1: + self.module.fail_json( + msg=( + "Entrust provider does not currently support multiple validated organizations. Multiple organizations found in " + f"Subject DN: '{self.csr.subject}'. " ) + ) # If no organization in the CSR, explicitly tell ECS that it should be blank in issued cert, not defaulted to # organization tied to the account. if self.csr_org is None: @@ -136,7 +134,8 @@ class EntrustCertificateBackend(CertificateBackend): self.cert_bytes = to_bytes(result.get("endEntityCert")) self.cert = load_certificate( - path=None, content=self.cert_bytes, backend=self.backend + path=None, + content=self.cert_bytes, ) def get_certificate_data(self): @@ -175,11 +174,8 @@ class EntrustCertificateBackend(CertificateBackend): except Exception: return if self.existing_certificate: - serial_number = None - expiry = None - if self.backend == "cryptography": - serial_number = f"{self.existing_certificate.serial_number:X}" - expiry = get_not_valid_after(self.existing_certificate) + serial_number = f"{self.existing_certificate.serial_number:X}" + expiry = get_not_valid_after(self.existing_certificate) # get some information about the expiry of this certificate expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z") @@ -213,8 +209,8 @@ class EntrustCertificateProvider(CertificateProvider): def needs_version_two_certs(self, module): return False - def create_backend(self, module, backend): - return EntrustCertificateBackend(module, backend) + def create_backend(self, module): + return EntrustCertificateBackend(module) def add_entrust_provider_to_argument_spec(argument_spec): diff --git a/plugins/module_utils/crypto/module_backends/certificate_info.py b/plugins/module_utils/crypto/module_backends/certificate_info.py index 2734e97d..823b71a8 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_info.py +++ b/plugins/module_utils/crypto/module_backends/certificate_info.py @@ -8,10 +8,8 @@ from __future__ import annotations import abc import binascii -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, @@ -30,29 +28,21 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.time import ( get_now_datetime, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography from cryptography import x509 from cryptography.hazmat.primitives import serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" @@ -60,10 +50,9 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" @six.add_metaclass(abc.ABCMeta) class CertificateInfoRetrieval: - def __init__(self, module, backend, content): + def __init__(self, module, content): # content must be a bytes string self.module = module - self.backend = backend self.content = content @abc.abstractmethod @@ -151,7 +140,6 @@ class CertificateInfoRetrieval: self.cert = load_certificate( None, content=self.content, - backend=self.backend, der_support_enabled=der_support_enabled, ) @@ -193,7 +181,6 @@ class CertificateInfoRetrieval: public_key_info = get_publickey_info( self.module, - self.backend, key=self._get_public_key_object(), prefer_one_fingerprint=prefer_one_fingerprint, ) @@ -235,9 +222,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval): """Validate the supplied cert, using the cryptography backend""" def __init__(self, module, content): - super(CertificateInfoRetrievalCryptography, self).__init__( - module, "cryptography", content - ) + super(CertificateInfoRetrievalCryptography, self).__init__(module, content) self.name_encoding = module.params.get("name_encoding", "ignore") def _get_der_bytes(self): @@ -445,38 +430,11 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval): return None -def get_certificate_info(module, backend, content, prefer_one_fingerprint=False): - if backend == "cryptography": - info = CertificateInfoRetrievalCryptography(module, content) +def get_certificate_info(module, content, prefer_one_fingerprint=False): + info = CertificateInfoRetrievalCryptography(module, content) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) -def select_backend(module, backend, content): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, CertificateInfoRetrievalCryptography(module, content) - else: - raise ValueError(f"Unsupported value for backend: {backend}") +def select_backend(module, content): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return CertificateInfoRetrievalCryptography(module, content) diff --git a/plugins/module_utils/crypto/module_backends/certificate_ownca.py b/plugins/module_utils/crypto/module_backends/certificate_ownca.py index d289ff0d..b72fed70 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_ownca.py +++ b/plugins/module_utils/crypto/module_backends/certificate_ownca.py @@ -22,7 +22,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp set_not_valid_before, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import ( - CRYPTOGRAPHY_VERSION, CertificateBackend, CertificateError, CertificateProvider, @@ -35,9 +34,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im from ansible_collections.community.crypto.plugins.module_utils.time import ( get_relative_time_option, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) try: @@ -50,9 +46,7 @@ except ImportError: class OwnCACertificateBackendCryptography(CertificateBackend): def __init__(self, module): - super(OwnCACertificateBackendCryptography, self).__init__( - module, "cryptography" - ) + super(OwnCACertificateBackendCryptography, self).__init__(module) self.create_subject_key_identifier = module.params[ "ownca_create_subject_key_identifier" @@ -63,13 +57,11 @@ class OwnCACertificateBackendCryptography(CertificateBackend): self.notBefore = get_relative_time_option( module.params["ownca_not_before"], "ownca_not_before", - backend=self.backend, with_timezone=CRYPTOGRAPHY_TIMEZONE, ) self.notAfter = get_relative_time_option( module.params["ownca_not_after"], "ownca_not_after", - backend=self.backend, with_timezone=CRYPTOGRAPHY_TIMEZONE, ) self.digest = select_message_digest(module.params["ownca_digest"]) @@ -106,14 +98,14 @@ class OwnCACertificateBackendCryptography(CertificateBackend): self._ensure_csr_loaded() self.ca_cert = load_certificate( - path=self.ca_cert_path, content=self.ca_cert_content, backend=self.backend + path=self.ca_cert_path, + content=self.ca_cert_content, ) try: self.ca_private_key = load_privatekey( path=self.ca_privatekey_path, content=self.ca_privatekey_content, passphrase=self.ca_privatekey_passphrase, - backend=self.backend, ) except OpenSSLBadPassphraseError as exc: module.fail_json(msg=str(exc)) @@ -170,10 +162,6 @@ class OwnCACertificateBackendCryptography(CertificateBackend): x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( ext.value ) - if CRYPTOGRAPHY_VERSION >= LooseVersion("2.7") - else x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - ext - ) ), critical=False, ) @@ -224,10 +212,6 @@ class OwnCACertificateBackendCryptography(CertificateBackend): x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( ext.value ) - if CRYPTOGRAPHY_VERSION >= LooseVersion("2.7") - else x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - ext - ) ) except cryptography.x509.ExtensionNotFound: expected_ext = x509.AuthorityKeyIdentifier.from_issuer_public_key( @@ -310,9 +294,8 @@ class OwnCACertificateProvider(CertificateProvider): def needs_version_two_certs(self, module): return module.params["ownca_version"] == 2 - def create_backend(self, module, backend): - if backend == "cryptography": - return OwnCACertificateBackendCryptography(module) + def create_backend(self, module): + return OwnCACertificateBackendCryptography(module) def add_ownca_provider_to_argument_spec(argument_spec): diff --git a/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py b/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py index 95d7fc70..7714b58f 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py +++ b/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py @@ -40,9 +40,7 @@ except ImportError: class SelfSignedCertificateBackendCryptography(CertificateBackend): def __init__(self, module): - super(SelfSignedCertificateBackendCryptography, self).__init__( - module, "cryptography" - ) + super(SelfSignedCertificateBackendCryptography, self).__init__(module) self.create_subject_key_identifier = module.params[ "selfsigned_create_subject_key_identifier" @@ -50,13 +48,11 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend): self.notBefore = get_relative_time_option( module.params["selfsigned_not_before"], "selfsigned_not_before", - backend=self.backend, with_timezone=CRYPTOGRAPHY_TIMEZONE, ) self.notAfter = get_relative_time_option( module.params["selfsigned_not_after"], "selfsigned_not_after", - backend=self.backend, with_timezone=CRYPTOGRAPHY_TIMEZONE, ) self.digest = select_message_digest(module.params["selfsigned_digest"]) @@ -206,9 +202,8 @@ class SelfSignedCertificateProvider(CertificateProvider): def needs_version_two_certs(self, module): return module.params["selfsigned_version"] == 2 - def create_backend(self, module, backend): - if backend == "cryptography": - return SelfSignedCertificateBackendCryptography(module) + def create_backend(self, module): + return SelfSignedCertificateBackendCryptography(module) def add_selfsigned_provider_to_argument_spec(argument_spec): diff --git a/plugins/module_utils/crypto/module_backends/crl_info.py b/plugins/module_utils/crypto/module_backends/crl_info.py index 44b353ab..4fb3f546 100644 --- a/plugins/module_utils/crypto/module_backends/crl_info.py +++ b/plugins/module_utils/crypto/module_backends/crl_info.py @@ -4,9 +4,6 @@ from __future__ import annotations -import traceback - -from ansible.module_utils.basic import missing_required_lib from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import ( TIMESTAMP_FORMAT, cryptography_decode_revoked_certificate, @@ -21,9 +18,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) @@ -31,17 +26,10 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: - import cryptography from cryptography import x509 - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class CRLInfoRetrieval: @@ -96,12 +84,7 @@ class CRLInfoRetrieval: def get_crl_info(module, content, list_revoked_certificates=True): - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) info = CRLInfoRetrieval( module, content, list_revoked_certificates=list_revoked_certificates ) diff --git a/plugins/module_utils/crypto/module_backends/csr.py b/plugins/module_utils/crypto/module_backends/csr.py index 99e835f4..d855971a 100644 --- a/plugins/module_utils/crypto/module_backends/csr.py +++ b/plugins/module_utils/crypto/module_backends/csr.py @@ -7,10 +7,8 @@ from __future__ import annotations import abc import binascii -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_text from ansible_collections.community.crypto.plugins.module_utils.argspec import ( ArgumentSpec, @@ -42,15 +40,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions @@ -59,17 +54,8 @@ try: import cryptography.hazmat.primitives.serialization import cryptography.x509 import cryptography.x509.oid - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier( - "1.3.6.1.5.5.7.1.24" - ) - CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05" + pass class CertificateSigningRequestError(OpenSSLObjectError): @@ -85,9 +71,8 @@ class CertificateSigningRequestError(OpenSSLObjectError): @six.add_metaclass(abc.ABCMeta) class CertificateSigningRequestBackend: - def __init__(self, module, backend): + def __init__(self, module): self.module = module - self.backend = backend self.digest = module.params["digest"] self.privatekey_path = module.params["privatekey_path"] self.privatekey_content = module.params["privatekey_content"] @@ -202,7 +187,6 @@ class CertificateSigningRequestBackend: try: result = get_csr_info( self.module, - self.backend, data, validate_signature=False, prefer_one_fingerprint=True, @@ -240,7 +224,6 @@ class CertificateSigningRequestBackend: path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) except OpenSSLBadPassphraseError as exc: raise CertificateSigningRequestError(exc) @@ -256,7 +239,8 @@ class CertificateSigningRequestBackend: return True try: self.existing_csr = load_certificate_request( - None, content=self.existing_csr_bytes, backend=self.backend + None, + content=self.existing_csr_bytes, ) except Exception: return True @@ -340,9 +324,7 @@ def parse_crl_distribution_points(module, crl_distribution_points): # Implementation with using cryptography class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBackend): def __init__(self, module): - super(CertificateSigningRequestCryptographyBackend, self).__init__( - module, "cryptography" - ) + super(CertificateSigningRequestCryptographyBackend, self).__init__(module) if self.version != 1: module.warn( "The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)" @@ -613,20 +595,16 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack def _check_ocspMustStaple(extensions): tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature) - has_tlsfeature = True if self.ocspMustStaple: if ( not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical ): return False - if has_tlsfeature: - return ( - cryptography.x509.TLSFeatureType.status_request - in tlsfeature_ext.value - ) - else: - return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE + return ( + cryptography.x509.TLSFeatureType.status_request + in tlsfeature_ext.value + ) else: return tlsfeature_ext is None @@ -756,35 +734,9 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack ) -def select_backend(module, backend): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, CertificateSigningRequestCryptographyBackend(module) - else: - raise Exception(f"Unsupported value for backend: {backend}") +def select_backend(module): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return CertificateSigningRequestCryptographyBackend(module) def get_csr_argument_spec(): diff --git a/plugins/module_utils/crypto/module_backends/csr_info.py b/plugins/module_utils/crypto/module_backends/csr_info.py index 8242b01e..f1047cfc 100644 --- a/plugins/module_utils/crypto/module_backends/csr_info.py +++ b/plugins/module_utils/crypto/module_backends/csr_info.py @@ -8,10 +8,8 @@ from __future__ import annotations import abc import binascii -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( cryptography_decode_name, @@ -26,26 +24,18 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography from cryptography import x509 from cryptography.hazmat.primitives import serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" @@ -53,10 +43,9 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" @six.add_metaclass(abc.ABCMeta) class CSRInfoRetrieval: - def __init__(self, module, backend, content, validate_signature): + def __init__(self, module, content, validate_signature): # content must be a bytes string self.module = module - self.backend = backend self.content = content self.validate_signature = validate_signature @@ -115,7 +104,8 @@ class CSRInfoRetrieval: def get_info(self, prefer_one_fingerprint=False): result = dict() self.csr = load_certificate_request( - None, content=self.content, backend=self.backend + None, + content=self.content, ) subject = self._get_subject_ordered() @@ -146,7 +136,6 @@ class CSRInfoRetrieval: public_key_info = get_publickey_info( self.module, - self.backend, key=self._get_public_key_object(), prefer_one_fingerprint=prefer_one_fingerprint, ) @@ -185,7 +174,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval): def __init__(self, module, content, validate_signature): super(CSRInfoRetrievalCryptography, self).__init__( - module, "cryptography", content, validate_signature + module, content, validate_signature ) self.name_encoding = module.params.get("name_encoding", "ignore") @@ -352,43 +341,16 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval): def get_csr_info( - module, backend, content, validate_signature=True, prefer_one_fingerprint=False + module, content, validate_signature=True, prefer_one_fingerprint=False ): - if backend == "cryptography": - info = CSRInfoRetrievalCryptography( - module, content, validate_signature=validate_signature - ) + info = CSRInfoRetrievalCryptography( + module, content, validate_signature=validate_signature + ) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) -def select_backend(module, backend, content, validate_signature=True): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, CSRInfoRetrievalCryptography( - module, content, validate_signature=validate_signature - ) - else: - raise ValueError(f"Unsupported value for backend: {backend}") +def select_backend(module, content, validate_signature=True): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return CSRInfoRetrievalCryptography( + module, content, validate_signature=validate_signature + ) diff --git a/plugins/module_utils/crypto/module_backends/privatekey.py b/plugins/module_utils/crypto/module_backends/privatekey.py index 07316119..6c548bc5 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey.py +++ b/plugins/module_utils/crypto/module_backends/privatekey.py @@ -10,7 +10,6 @@ import base64 import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.argspec import ( ArgumentSpec, @@ -31,15 +30,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions @@ -53,13 +49,8 @@ try: import cryptography.hazmat.primitives.asymmetric.x448 import cryptography.hazmat.primitives.asymmetric.x25519 import cryptography.hazmat.primitives.serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class PrivateKeyError(OpenSSLObjectError): @@ -75,7 +66,7 @@ class PrivateKeyError(OpenSSLObjectError): @six.add_metaclass(abc.ABCMeta) class PrivateKeyBackend: - def __init__(self, module, backend): + def __init__(self, module): self.module = module self.type = module.params["type"] self.size = module.params["size"] @@ -85,7 +76,6 @@ class PrivateKeyBackend: self.format = module.params["format"] self.format_mismatch = module.params.get("format_mismatch", "regenerate") self.regenerate = module.params.get("regenerate", "full_idempotence") - self.backend = backend self.private_key = None @@ -103,7 +93,6 @@ class PrivateKeyBackend: result.update( get_privatekey_info( self.module, - self.backend, data, passphrase=self.passphrase, return_private_key_data=False, @@ -219,16 +208,14 @@ class PrivateKeyBackend: def _get_fingerprint(self): if self.private_key: - return get_fingerprint_of_privatekey(self.private_key, backend=self.backend) + return get_fingerprint_of_privatekey(self.private_key) try: self._ensure_existing_private_key_loaded() except Exception: # Ignore errors pass if self.existing_private_key: - return get_fingerprint_of_privatekey( - self.existing_private_key, backend=self.backend - ) + return get_fingerprint_of_privatekey(self.existing_private_key) def dump(self, include_key): """Serialize the object into a dictionary.""" @@ -297,9 +284,7 @@ class PrivateKeyCryptographyBackend(PrivateKeyBackend): } def __init__(self, module): - super(PrivateKeyCryptographyBackend, self).__init__( - module=module, backend="cryptography" - ) + super(PrivateKeyCryptographyBackend, self).__init__(module=module) self.curves = dict() self._add_curve("secp224r1", "SECP224R1") @@ -558,34 +543,9 @@ class PrivateKeyCryptographyBackend(PrivateKeyBackend): return False -def select_backend(module, backend): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Decision - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, PrivateKeyCryptographyBackend(module) - else: - raise Exception(f"Unsupported value for backend: {backend}") +def select_backend(module): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return PrivateKeyCryptographyBackend(module) def get_privatekey_argument_spec(): diff --git a/plugins/module_utils/crypto/module_backends/privatekey_convert.py b/plugins/module_utils/crypto/module_backends/privatekey_convert.py index 78e1760b..079bff31 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey_convert.py +++ b/plugins/module_utils/crypto/module_backends/privatekey_convert.py @@ -8,7 +8,6 @@ import abc import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.argspec import ( ArgumentSpec, @@ -24,16 +23,13 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import load_file -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions @@ -47,13 +43,8 @@ try: import cryptography.hazmat.primitives.asymmetric.x448 import cryptography.hazmat.primitives.asymmetric.x25519 import cryptography.hazmat.primitives.serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class PrivateKeyError(OpenSSLObjectError): @@ -69,14 +60,13 @@ class PrivateKeyError(OpenSSLObjectError): @six.add_metaclass(abc.ABCMeta) class PrivateKeyConvertBackend: - def __init__(self, module, backend): + def __init__(self, module): self.module = module self.src_path = module.params["src_path"] self.src_content = module.params["src_content"] self.src_passphrase = module.params["src_passphrase"] self.format = module.params["format"] self.dest_passphrase = module.params["dest_passphrase"] - self.backend = backend self.src_private_key = None if self.src_path is not None: @@ -135,9 +125,7 @@ class PrivateKeyConvertBackend: # Implementation with using cryptography class PrivateKeyConvertCryptographyBackend(PrivateKeyConvertBackend): def __init__(self, module): - super(PrivateKeyConvertCryptographyBackend, self).__init__( - module=module, backend="cryptography" - ) + super(PrivateKeyConvertCryptographyBackend, self).__init__(module=module) def get_private_key_data(self): """Return bytes for self.src_private_key in output format""" @@ -262,11 +250,7 @@ class PrivateKeyConvertCryptographyBackend(PrivateKeyConvertBackend): def select_backend(module): - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) return PrivateKeyConvertCryptographyBackend(module) diff --git a/plugins/module_utils/crypto/module_backends/privatekey_info.py b/plugins/module_utils/crypto/module_backends/privatekey_info.py index 9a0399cd..61ec9043 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey_info.py +++ b/plugins/module_utils/crypto/module_backends/privatekey_info.py @@ -7,10 +7,8 @@ from __future__ import annotations import abc -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_bytes, to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, @@ -28,25 +26,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography from cryptography.hazmat.primitives import serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass SIGNATURE_TEST_DATA = b"1234" @@ -187,7 +177,6 @@ class PrivateKeyInfoRetrieval: def __init__( self, module, - backend, content, passphrase=None, return_private_key_data=False, @@ -195,7 +184,6 @@ class PrivateKeyInfoRetrieval: ): # content must be a bytes string self.module = module - self.backend = backend self.content = content self.passphrase = passphrase self.return_private_key_data = return_private_key_data @@ -228,7 +216,6 @@ class PrivateKeyInfoRetrieval: if self.passphrase is not None else self.passphrase ), - backend=self.backend, ) result["can_parse_key"] = True except OpenSSLObjectError as exc: @@ -269,7 +256,7 @@ class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval): def __init__(self, module, content, **kwargs): super(PrivateKeyInfoRetrievalCryptography, self).__init__( - module, "cryptography", content, **kwargs + module, content, **kwargs ) def _get_public_key(self, binary): @@ -291,61 +278,32 @@ class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval): def get_privatekey_info( module, - backend, content, passphrase=None, return_private_key_data=False, prefer_one_fingerprint=False, ): - if backend == "cryptography": - info = PrivateKeyInfoRetrievalCryptography( - module, - content, - passphrase=passphrase, - return_private_key_data=return_private_key_data, - ) + info = PrivateKeyInfoRetrievalCryptography( + module, + content, + passphrase=passphrase, + return_private_key_data=return_private_key_data, + ) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend( module, - backend, content, passphrase=None, return_private_key_data=False, check_consistency=False, ): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, PrivateKeyInfoRetrievalCryptography( - module, - content, - passphrase=passphrase, - return_private_key_data=return_private_key_data, - check_consistency=check_consistency, - ) - else: - raise ValueError(f"Unsupported value for backend: {backend}") + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return PrivateKeyInfoRetrievalCryptography( + module, + content, + passphrase=passphrase, + return_private_key_data=return_private_key_data, + check_consistency=check_consistency, + ) diff --git a/plugins/module_utils/crypto/module_backends/publickey_info.py b/plugins/module_utils/crypto/module_backends/publickey_info.py index 886ef8b2..438dc184 100644 --- a/plugins/module_utils/crypto/module_backends/publickey_info.py +++ b/plugins/module_utils/crypto/module_backends/publickey_info.py @@ -5,10 +5,8 @@ from __future__ import annotations import abc -import traceback from ansible.module_utils import six -from ansible.module_utils.basic import missing_required_lib from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -18,15 +16,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.hazmat.primitives.asymmetric.ed448 @@ -34,13 +29,8 @@ try: import cryptography.hazmat.primitives.asymmetric.x448 import cryptography.hazmat.primitives.asymmetric.x25519 from cryptography.hazmat.primitives import serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass def _get_cryptography_public_key_info(key): @@ -97,10 +87,9 @@ class PublicKeyParseError(OpenSSLObjectError): @six.add_metaclass(abc.ABCMeta) class PublicKeyInfoRetrieval: - def __init__(self, module, backend, content=None, key=None): + def __init__(self, module, content=None, key=None): # content must be a bytes string self.module = module - self.backend = backend self.content = content self.key = key @@ -116,7 +105,7 @@ class PublicKeyInfoRetrieval: result = dict() if self.key is None: try: - self.key = load_publickey(content=self.content, backend=self.backend) + self.key = load_publickey(content=self.content) except OpenSSLObjectError as e: raise PublicKeyParseError(str(e), {}) @@ -138,7 +127,7 @@ class PublicKeyInfoRetrievalCryptography(PublicKeyInfoRetrieval): def __init__(self, module, content=None, key=None): super(PublicKeyInfoRetrievalCryptography, self).__init__( - module, "cryptography", content=content, key=key + module, content=content, key=key ) def _get_public_key(self, binary): @@ -151,42 +140,11 @@ class PublicKeyInfoRetrievalCryptography(PublicKeyInfoRetrieval): return _get_cryptography_public_key_info(self.key) -def get_publickey_info( - module, backend, content=None, key=None, prefer_one_fingerprint=False -): - if backend == "cryptography": - info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key) +def get_publickey_info(module, content=None, key=None, prefer_one_fingerprint=False): + info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key) return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) -def select_backend(module, backend, content=None, key=None): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, PublicKeyInfoRetrievalCryptography( - module, content=content, key=key - ) - else: - raise ValueError(f"Unsupported value for backend: {backend}") +def select_backend(module, content=None, key=None): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return PublicKeyInfoRetrievalCryptography(module, content=content, key=key) diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 738c967a..e26287c1 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -98,20 +98,17 @@ def get_fingerprint_of_bytes(source, prefer_one=False): return fingerprint -def get_fingerprint_of_privatekey(privatekey, backend="cryptography", prefer_one=False): +def get_fingerprint_of_privatekey(privatekey, prefer_one=False): """Generate the fingerprint of the public key.""" - if backend == "cryptography": - publickey = privatekey.public_key().public_bytes( - serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo - ) + publickey = privatekey.public_key().public_bytes( + serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo + ) return get_fingerprint_of_bytes(publickey, prefer_one=prefer_one) -def get_fingerprint( - path, passphrase=None, content=None, backend="cryptography", prefer_one=False -): +def get_fingerprint(path, passphrase=None, content=None, prefer_one=False): """Generate the fingerprint of the public key.""" privatekey = load_privatekey( @@ -119,16 +116,16 @@ def get_fingerprint( passphrase=passphrase, content=content, check_passphrase=False, - backend=backend, ) - return get_fingerprint_of_privatekey( - privatekey, backend=backend, prefer_one=prefer_one - ) + return get_fingerprint_of_privatekey(privatekey, prefer_one=prefer_one) def load_privatekey( - path, passphrase=None, check_passphrase=True, content=None, backend="cryptography" + path, + passphrase=None, + check_passphrase=True, + content=None, ): """Load the specified OpenSSL private key. @@ -145,23 +142,20 @@ def load_privatekey( except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) - if backend == "cryptography": - try: - result = load_pem_private_key( - priv_key_detail, - None if passphrase is None else to_bytes(passphrase), - ) - except TypeError: - raise OpenSSLBadPassphraseError( - "Wrong or empty passphrase provided for private key" - ) - except ValueError: - raise OpenSSLBadPassphraseError("Wrong passphrase provided for private key") - - return result + try: + return load_pem_private_key( + priv_key_detail, + None if passphrase is None else to_bytes(passphrase), + ) + except TypeError: + raise OpenSSLBadPassphraseError( + "Wrong or empty passphrase provided for private key" + ) + except ValueError: + raise OpenSSLBadPassphraseError("Wrong passphrase provided for private key") -def load_publickey(path=None, content=None, backend=None): +def load_publickey(path=None, content=None): if content is None: if path is None: raise OpenSSLObjectError("Must provide either path or content") @@ -171,16 +165,13 @@ def load_publickey(path=None, content=None, backend=None): except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) - if backend == "cryptography": - try: - return serialization.load_pem_public_key(content) - except Exception as e: - raise OpenSSLObjectError(f"Error while deserializing key: {e}") + try: + return serialization.load_pem_public_key(content) + except Exception as e: + raise OpenSSLObjectError(f"Error while deserializing key: {e}") -def load_certificate( - path, content=None, backend="cryptography", der_support_enabled=False -): +def load_certificate(path, content=None, der_support_enabled=False): """Load the specified certificate.""" try: @@ -191,20 +182,19 @@ def load_certificate( cert_content = content except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) - if backend == "cryptography": - if der_support_enabled is False or identify_pem_format(cert_content): - try: - return x509.load_pem_x509_certificate(cert_content) - except ValueError as exc: - raise OpenSSLObjectError(exc) - elif der_support_enabled: - try: - return x509.load_der_x509_certificate(cert_content) - except ValueError as exc: - raise OpenSSLObjectError(f"Cannot parse DER certificate: {exc}") + if der_support_enabled is False or identify_pem_format(cert_content): + try: + return x509.load_pem_x509_certificate(cert_content) + except ValueError as exc: + raise OpenSSLObjectError(exc) + elif der_support_enabled: + try: + return x509.load_der_x509_certificate(cert_content) + except ValueError as exc: + raise OpenSSLObjectError(f"Cannot parse DER certificate: {exc}") -def load_certificate_request(path, content=None, backend="cryptography"): +def load_certificate_request(path, content=None): """Load the specified certificate signing request.""" try: if content is None: @@ -214,11 +204,10 @@ def load_certificate_request(path, content=None, backend="cryptography"): csr_content = content except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) - if backend == "cryptography": - try: - return x509.load_pem_x509_csr(csr_content) - except ValueError as exc: - raise OpenSSLObjectError(exc) + try: + return x509.load_pem_x509_csr(csr_content) + except ValueError as exc: + raise OpenSSLObjectError(exc) def parse_name_field(input_dict, name_field_name=None): diff --git a/plugins/module_utils/cryptography_dep.py b/plugins/module_utils/cryptography_dep.py index 6f6bf582..a5177f17 100644 --- a/plugins/module_utils/cryptography_dep.py +++ b/plugins/module_utils/cryptography_dep.py @@ -10,6 +10,53 @@ Must be kept in sync with plugins/doc_fragments/cryptography_dep.py. from __future__ import annotations +import traceback + +from ansible.module_utils.basic import missing_required_lib +from ansible_collections.community.crypto.plugins.module_utils.version import ( + LooseVersion, +) + + +_CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography import x509 # noqa: F401, pylint: disable=unused-import + + _CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) + _CRYPTOGRAPHY_FILE = cryptography.__file__ +except ImportError: + _CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + _CRYPTOGRAPHY_FOUND = False + _CRYPTOGRAPHY_FILE = None +else: + _CRYPTOGRAPHY_FOUND = True + # Corresponds to the community.crypto.cryptography_dep.minimum doc fragment COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION = "3.3" + + +def assert_required_cryptography_version( + module, + *, + minimum_cryptography_version: str = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, +) -> None: + if not _CRYPTOGRAPHY_FOUND: + module.fail_json( + msg=missing_required_lib(f"cryptography >= {minimum_cryptography_version}"), + exception=_CRYPTOGRAPHY_IMP_ERR, + ) + if _CRYPTOGRAPHY_VERSION < LooseVersion(minimum_cryptography_version): + module.fail_json( + msg=( + f"Cannot detect the required Python library cryptography (>= {minimum_cryptography_version})." + f" Only found a too old version ({_CRYPTOGRAPHY_VERSION}) at {_CRYPTOGRAPHY_FILE}." + ), + ) + + +__all__ = ( + "COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION", + "assert_required_cryptography_version", +) diff --git a/plugins/module_utils/openssh/backends/keypair_backend.py b/plugins/module_utils/openssh/backends/keypair_backend.py index 1aa41dd2..faf40d74 100644 --- a/plugins/module_utils/openssh/backends/keypair_backend.py +++ b/plugins/module_utils/openssh/backends/keypair_backend.py @@ -22,6 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.backends. parse_private_key_format, ) from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import ( + CRYPTOGRAPHY_VERSION, HAS_OPENSSH_SUPPORT, InvalidCommentError, InvalidPassphraseError, @@ -346,8 +347,7 @@ class KeypairBackendOpensshBin(KeypairBackend): if self.module.params["private_key_format"] != "auto": self.module.fail_json( - msg="'auto' is the only valid option for " - + "'private_key_format' when 'backend' is not 'cryptography'" + msg="'auto' is the only valid option for 'private_key_format' when 'backend' is not 'cryptography'" ) self.ssh_keygen = KeygenCommand(self.module) @@ -531,7 +531,9 @@ class KeypairBackendCryptography(KeypairBackend): def select_backend(module, backend): - can_use_cryptography = HAS_OPENSSH_SUPPORT + can_use_cryptography = HAS_OPENSSH_SUPPORT and LooseVersion( + CRYPTOGRAPHY_VERSION + ) >= LooseVersion(COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION) can_use_opensshbin = bool(module.get_bin_path("ssh-keygen")) if backend == "auto": @@ -550,14 +552,13 @@ def select_backend(module, backend): if backend == "opensshbin": if not can_use_opensshbin: module.fail_json(msg="Cannot find the OpenSSH binary in the PATH") - return backend, KeypairBackendOpensshBin(module) - elif backend == "cryptography": + return KeypairBackendOpensshBin(module) + if backend == "cryptography": if not can_use_cryptography: module.fail_json( msg=missing_required_lib( f"cryptography >= {COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION}" ) ) - return backend, KeypairBackendCryptography(module) - else: - raise ValueError(f"Unsupported value for backend: {backend}") + return KeypairBackendCryptography(module) + raise ValueError(f"Unsupported value for backend: {backend}") diff --git a/plugins/module_utils/openssh/cryptography.py b/plugins/module_utils/openssh/cryptography.py index 53dc6064..24653d68 100644 --- a/plugins/module_utils/openssh/cryptography.py +++ b/plugins/module_utils/openssh/cryptography.py @@ -13,7 +13,6 @@ from socket import gethostname try: from cryptography import __version__ as CRYPTOGRAPHY_VERSION from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm - from cryptography.hazmat.backends.openssl import backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import dsa, ec, padding, rsa from cryptography.hazmat.primitives.asymmetric.ed25519 import ( @@ -149,19 +148,16 @@ class AsymmetricKeypair: # if improper padding is used during signing public_exponent=65537, key_size=size, - backend=backend, ) elif keytype == "dsa": privatekey = dsa.generate_private_key( key_size=size, - backend=backend, ) elif keytype == "ed25519": privatekey = Ed25519PrivateKey.generate() elif keytype == "ecdsa": privatekey = ec.generate_private_key( _ALGORITHM_PARAMETERS["ecdsa"]["curves"][size], - backend=backend, ) publickey = privatekey.public_key() @@ -574,7 +570,6 @@ def load_privatekey(path, passphrase, key_format): privatekey = privatekey_loader( data=content, password=passphrase, - backend=backend, ) except ValueError as e: @@ -584,7 +579,6 @@ def load_privatekey(path, passphrase, key_format): privatekey = privatekey_loaders["PEM"]( data=content, password=passphrase, - backend=backend, ) except ValueError as e: raise InvalidPrivateKeyFileError(e) @@ -625,7 +619,6 @@ def load_publickey(path, key_format): publickey = publickey_loader( data=content, - backend=backend, ) except ValueError as e: raise InvalidPublicKeyFileError(e) @@ -692,7 +685,7 @@ def extract_comment(path): def calculate_fingerprint(openssh_publickey): - digest = hashes.Hash(hashes.SHA256(), backend=backend) + digest = hashes.Hash(hashes.SHA256()) decoded_pubkey = b64decode(openssh_publickey.split(b" ")[1]) digest.update(decoded_pubkey) diff --git a/plugins/module_utils/time.py b/plugins/module_utils/time.py index 1d36295d..0ccf859d 100644 --- a/plugins/module_utils/time.py +++ b/plugins/module_utils/time.py @@ -115,14 +115,12 @@ def convert_relative_to_datetime(relative_time_string, with_timezone=False, now= return now - offset -def get_relative_time_option( - input_string, input_name, backend="cryptography", with_timezone=False, now=None -): +def get_relative_time_option(input_string, input_name, with_timezone=False, now=None): """ Return an absolute timespec if a relative timespec or an ASN1 formatted string is provided. - The return value will be a datetime object for the cryptography backend. + The return value will be a datetime object. """ result = to_native(input_string) if result is None: @@ -131,34 +129,31 @@ def get_relative_time_option( ) # Relative time if result.startswith("+") or result.startswith("-"): - result_datetime = convert_relative_to_datetime( + return convert_relative_to_datetime( result, with_timezone=with_timezone, now=now ) - if backend == "cryptography": - return result_datetime # Absolute time - if backend == "cryptography": - for date_fmt, length in [ - ( - "%Y%m%d%H%M%SZ", - 15, - ), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4) - ("%Y%m%d%H%MZ", 13), - ( - "%Y%m%d%H%M%S%z", - 14 + 5, - ), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...) - ("%Y%m%d%H%M%z", 12 + 5), - ]: - if len(result) != length: - continue - try: - res = datetime.datetime.strptime(result, date_fmt) - except ValueError: - pass - else: - return add_or_remove_timezone(res, with_timezone=with_timezone) + for date_fmt, length in [ + ( + "%Y%m%d%H%M%SZ", + 15, + ), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4) + ("%Y%m%d%H%MZ", 13), + ( + "%Y%m%d%H%M%S%z", + 14 + 5, + ), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...) + ("%Y%m%d%H%M%z", 12 + 5), + ]: + if len(result) != length: + continue + try: + res = datetime.datetime.strptime(result, date_fmt) + except ValueError: + pass + else: + return add_or_remove_timezone(res, with_timezone=with_timezone) - raise OpenSSLObjectError( - f'The time spec "{input_string}" for {input_name} is invalid' - ) + raise OpenSSLObjectError( + f'The time spec "{input_string}" for {input_name} is invalid' + ) diff --git a/plugins/modules/acme_challenge_cert_helper.py b/plugins/modules/acme_challenge_cert_helper.py index 482f1c39..6b66ce35 100644 --- a/plugins/modules/acme_challenge_cert_helper.py +++ b/plugins/modules/acme_challenge_cert_helper.py @@ -149,9 +149,8 @@ regular_certificate: import base64 import datetime import ipaddress -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( ModuleFailException, @@ -164,16 +163,13 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.time import ( get_now_datetime, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.hazmat.backends @@ -185,13 +181,8 @@ try: import cryptography.hazmat.primitives.serialization import cryptography.x509 import cryptography.x509.oid - - HAS_CRYPTOGRAPHY = LooseVersion(cryptography.__version__) >= LooseVersion( - COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION - ) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - HAS_CRYPTOGRAPHY = False + pass # Convert byte string to ASN1 encoded octet string @@ -215,20 +206,8 @@ def main(): required_one_of=(["private_key_src", "private_key_content"],), mutually_exclusive=(["private_key_src", "private_key_content"],), ) - if not HAS_CRYPTOGRAPHY: - # Some callbacks die when exception is provided with value None - if CRYPTOGRAPHY_IMP_ERR: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION}" - ) - ) + + assert_required_cryptography_version(COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION) try: # Get parameters diff --git a/plugins/modules/certificate_complete_chain.py b/plugins/modules/certificate_complete_chain.py index 159a9b75..33e1e458 100644 --- a/plugins/modules/certificate_complete_chain.py +++ b/plugins/modules/certificate_complete_chain.py @@ -121,26 +121,21 @@ complete_chain: """ import os -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( split_pem_list, ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, -) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, + assert_required_cryptography_version, ) -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions - import cryptography.hazmat.backends import cryptography.hazmat.primitives.asymmetric.ec import cryptography.hazmat.primitives.asymmetric.padding import cryptography.hazmat.primitives.asymmetric.rsa @@ -149,13 +144,8 @@ try: import cryptography.hazmat.primitives.serialization import cryptography.x509 import cryptography.x509.oid - - HAS_CRYPTOGRAPHY = LooseVersion(cryptography.__version__) >= LooseVersion( - COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION - ) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - HAS_CRYPTOGRAPHY = False + pass class Certificate: @@ -333,13 +323,7 @@ def main(): supports_check_mode=True, ) - if not HAS_CRYPTOGRAPHY: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION) # Load chain chain = parse_PEM_list(module, module.params["input_chain"], source="input chain") diff --git a/plugins/modules/ecs_certificate.py b/plugins/modules/ecs_certificate.py index f3a8b9ae..9e2bb52e 100644 --- a/plugins/modules/ecs_certificate.py +++ b/plugins/modules/ecs_certificate.py @@ -550,15 +550,15 @@ import datetime import os import re import time -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( load_certificate, ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.ecs.api import ( ECSClient, @@ -567,22 +567,8 @@ from ansible_collections.community.crypto.plugins.module_utils.ecs.api import ( ecs_client_argument_spec, ) from ansible_collections.community.crypto.plugins.module_utils.io import write_file -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) -CRYPTOGRAPHY_IMP_ERR = None -try: - import cryptography - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION @@ -652,7 +638,7 @@ class EcsCertificate: self.ecs_client = None if self.path and os.path.exists(self.path): try: - self.cert = load_certificate(self.path, backend="cryptography") + self.cert = load_certificate(self.path) except Exception: self.cert = None # Instantiate the ECS client and then try a no-op connection to verify credentials are valid @@ -1008,13 +994,7 @@ def main(): supports_check_mode=True, ) - if not CRYPTOGRAPHY_FOUND or CRYPTOGRAPHY_VERSION < LooseVersion( - MINIMAL_CRYPTOGRAPHY_VERSION - ): - module.fail_json( - msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) # If validate_only is used, pointing to an existing tracking_id is an invalid operation if module.params["tracking_id"]: diff --git a/plugins/modules/get_certificate.py b/plugins/modules/get_certificate.py index 5010e673..bfe66e04 100644 --- a/plugins/modules/get_certificate.py +++ b/plugins/modules/get_certificate.py @@ -76,6 +76,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -265,7 +268,6 @@ import atexit import base64 import ssl import sys -import traceback from os.path import isfile from socket import create_connection, setdefaulttimeout, socket from ssl import ( @@ -275,7 +277,7 @@ from ssl import ( create_default_context, ) -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes, to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, @@ -286,29 +288,21 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.time import ( get_now_datetime, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions import cryptography.x509 - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass def send_starttls_packet(sock, server_type): @@ -367,32 +361,7 @@ def main(): f"The Python version used to run the get_certificate module is {sys.version}" ) - backend = module.params.get("select_crypto_backend") - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Try cryptography - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) result = dict( changed=False, @@ -529,56 +498,55 @@ def main(): result["cert"] = cert - if backend == "cryptography": - x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert)) - result["subject"] = {} - for attribute in x509.subject: - result["subject"][cryptography_oid_to_name(attribute.oid, short=True)] = ( - attribute.value - ) - - result["expired"] = get_not_valid_after(x509) < get_now_datetime( - with_timezone=CRYPTOGRAPHY_TIMEZONE + x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert)) + result["subject"] = {} + for attribute in x509.subject: + result["subject"][cryptography_oid_to_name(attribute.oid, short=True)] = ( + attribute.value ) - result["extensions"] = [] - for dotted_number, entry in cryptography_get_extensions_from_cert(x509).items(): - oid = cryptography.x509.oid.ObjectIdentifier(dotted_number) - ext = { - "critical": entry["critical"], - "asn1_data": entry["value"], - "name": cryptography_oid_to_name(oid, short=True), - } - if not asn1_base64: - ext["asn1_data"] = base64.b64decode(ext["asn1_data"]) - result["extensions"].append(ext) + result["expired"] = get_not_valid_after(x509) < get_now_datetime( + with_timezone=CRYPTOGRAPHY_TIMEZONE + ) - result["issuer"] = {} - for attribute in x509.issuer: - result["issuer"][cryptography_oid_to_name(attribute.oid, short=True)] = ( - attribute.value - ) + result["extensions"] = [] + for dotted_number, entry in cryptography_get_extensions_from_cert(x509).items(): + oid = cryptography.x509.oid.ObjectIdentifier(dotted_number) + ext = { + "critical": entry["critical"], + "asn1_data": entry["value"], + "name": cryptography_oid_to_name(oid, short=True), + } + if not asn1_base64: + ext["asn1_data"] = base64.b64decode(ext["asn1_data"]) + result["extensions"].append(ext) - result["not_after"] = get_not_valid_after(x509).strftime("%Y%m%d%H%M%SZ") - result["not_before"] = get_not_valid_before(x509).strftime("%Y%m%d%H%M%SZ") - - result["serial_number"] = x509.serial_number - result["signature_algorithm"] = cryptography_oid_to_name( - x509.signature_algorithm_oid + result["issuer"] = {} + for attribute in x509.issuer: + result["issuer"][cryptography_oid_to_name(attribute.oid, short=True)] = ( + attribute.value ) - # We need the -1 offset to get the same values as pyOpenSSL - if x509.version == cryptography.x509.Version.v1: - result["version"] = 1 - 1 - elif x509.version == cryptography.x509.Version.v3: - result["version"] = 3 - 1 - else: - result["version"] = "unknown" + result["not_after"] = get_not_valid_after(x509).strftime("%Y%m%d%H%M%SZ") + result["not_before"] = get_not_valid_before(x509).strftime("%Y%m%d%H%M%SZ") - if verified_chain is not None: - result["verified_chain"] = verified_chain - if unverified_chain is not None: - result["unverified_chain"] = unverified_chain + result["serial_number"] = x509.serial_number + result["signature_algorithm"] = cryptography_oid_to_name( + x509.signature_algorithm_oid + ) + + # We need the -1 offset to get the same values as pyOpenSSL + if x509.version == cryptography.x509.Version.v1: + result["version"] = 1 - 1 + elif x509.version == cryptography.x509.Version.v3: + result["version"] = 3 - 1 + else: + result["version"] = "unknown" + + if verified_chain is not None: + result["verified_chain"] = verified_chain + if unverified_chain is not None: + result["unverified_chain"] = unverified_chain module.exit_json(**result) diff --git a/plugins/modules/openssh_keypair.py b/plugins/modules/openssh_keypair.py index b2181e29..28e30fdc 100644 --- a/plugins/modules/openssh_keypair.py +++ b/plugins/modules/openssh_keypair.py @@ -246,7 +246,7 @@ def main(): add_file_common_args=True, ) - keypair = select_backend(module, module.params["backend"])[1] + keypair = select_backend(module, module.params["backend"]) keypair.execute() diff --git a/plugins/modules/openssl_csr.py b/plugins/modules/openssl_csr.py index c1cc3e76..b40583ab 100644 --- a/plugins/modules/openssl_csr.py +++ b/plugins/modules/openssl_csr.py @@ -340,8 +340,7 @@ def main(): ) try: - backend = module.params["select_crypto_backend"] - backend, module_backend = select_backend(module, backend) + module_backend = select_backend(module) csr = CertificateSigningRequestModule(module, module_backend) if module.params["state"] == "present": diff --git a/plugins/modules/openssl_csr_info.py b/plugins/modules/openssl_csr_info.py index 2f0cfda0..52e33c61 100644 --- a/plugins/modules/openssl_csr_info.py +++ b/plugins/modules/openssl_csr_info.py @@ -40,6 +40,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -341,9 +344,7 @@ def main(): except (IOError, OSError) as e: module.fail_json(msg=f"Error while reading CSR file from disk: {e}") - backend, module_backend = select_backend( - module, module.params["select_crypto_backend"], data, validate_signature=True - ) + module_backend = select_backend(module, data, validate_signature=True) try: result = module_backend.get_info() diff --git a/plugins/modules/openssl_csr_pipe.py b/plugins/modules/openssl_csr_pipe.py index 4208499c..f3afc05d 100644 --- a/plugins/modules/openssl_csr_pipe.py +++ b/plugins/modules/openssl_csr_pipe.py @@ -174,8 +174,7 @@ def main(): ) try: - backend = module.params["select_crypto_backend"] - backend, module_backend = select_backend(module, backend) + module_backend = select_backend(module) csr = CertificateSigningRequestModule(module, module_backend) csr.generate(module) diff --git a/plugins/modules/openssl_dhparam.py b/plugins/modules/openssl_dhparam.py index 1c8eff3b..28ceabb6 100644 --- a/plugins/modules/openssl_dhparam.py +++ b/plugins/modules/openssl_dhparam.py @@ -132,15 +132,15 @@ import abc import os import re import tempfile -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.math import ( count_bits, ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, @@ -153,7 +153,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.exceptions @@ -163,7 +162,6 @@ try: CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() CRYPTOGRAPHY_FOUND = False else: CRYPTOGRAPHY_FOUND = True @@ -413,13 +411,7 @@ def main(): if backend == "openssl": dhparam = DHParameterOpenSSL(module) elif backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) dhparam = DHParameterCryptography(module) else: raise AssertionError("Internal error: unknown backend") diff --git a/plugins/modules/openssl_pkcs12.py b/plugins/modules/openssl_pkcs12.py index d3bb2d23..93a8f35b 100644 --- a/plugins/modules/openssl_pkcs12.py +++ b/plugins/modules/openssl_pkcs12.py @@ -166,6 +166,9 @@ options: - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. - The value V(pyopenssl) has been removed for community.crypto 3.0.0. + - Note that with community.crypto 3.0.0, all remaining values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -278,7 +281,7 @@ import os import stat import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes, to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLBadPassphraseError, @@ -297,32 +300,23 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, write_file, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: - import cryptography from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization.pkcs12 import ( serialize_key_and_certificates, ) - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass CRYPTOGRAPHY_COMPATIBILITY2022_ERR = None try: @@ -340,14 +334,14 @@ else: CRYPTOGRAPHY_HAS_COMPATIBILITY2022 = True -def load_certificate_set(filename, backend): +def load_certificate_set(filename): """ Load list of concatenated PEM files, and return a list of parsed certificates. """ with open(filename, "rb") as f: data = f.read().decode("utf-8") return [ - load_certificate(None, content=cert.encode("utf-8"), backend=backend) + load_certificate(None, content=cert.encode("utf-8")) for cert in split_pem_list(data) ] @@ -357,14 +351,13 @@ class PkcsError(OpenSSLObjectError): class Pkcs(OpenSSLObject): - def __init__(self, module, backend, iter_size_default=2048): + def __init__(self, module, iter_size_default=2048): super(Pkcs, self).__init__( module.params["path"], module.params["state"], module.params["force"], module.check_mode, ) - self.backend = backend self.action = module.params["action"] self.other_certificates = module.params["other_certificates"] self.other_certificates_parse_all = module.params[ @@ -416,11 +409,11 @@ class Pkcs(OpenSSLObject): self.other_certificates = [] for other_cert_bundle in filenames: self.other_certificates.extend( - load_certificate_set(other_cert_bundle, self.backend) + load_certificate_set(other_cert_bundle) ) else: self.other_certificates = [ - load_certificate(other_cert, backend=self.backend) + load_certificate(other_cert) for other_cert in self.other_certificates ] elif self.other_certificates_content: @@ -432,9 +425,7 @@ class Pkcs(OpenSSLObject): ) ) self.other_certificates = [ - load_certificate( - None, content=to_bytes(other_cert), backend=self.backend - ) + load_certificate(None, content=to_bytes(other_cert)) for other_cert in certs ] @@ -475,7 +466,6 @@ class Pkcs(OpenSSLObject): None, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) except OpenSSLObjectError: return False @@ -606,9 +596,7 @@ class Pkcs(OpenSSLObject): class PkcsCryptography(Pkcs): def __init__(self, module): - super(PkcsCryptography, self).__init__( - module, "cryptography", iter_size_default=50000 - ) + super(PkcsCryptography, self).__init__(module, iter_size_default=50000) if ( self.encryption_level == "compatibility2022" and not CRYPTOGRAPHY_HAS_COMPATIBILITY2022 @@ -628,16 +616,13 @@ class PkcsCryptography(Pkcs): None, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) except OpenSSLBadPassphraseError as exc: raise PkcsError(exc) cert = None if self.certificate_content: - cert = load_certificate( - None, content=self.certificate_content, backend=self.backend - ) + cert = load_certificate(None, content=self.certificate_content) friendly_name = ( to_bytes(self.friendly_name) if self.friendly_name is not None else None @@ -726,33 +711,9 @@ class PkcsCryptography(Pkcs): return pkcs12[3] -def select_backend(module, backend): - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - return backend, PkcsCryptography(module) - else: - raise ValueError(f"Unsupported value for backend: {backend}") +def select_backend(module): + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) + return PkcsCryptography(module) def main(): @@ -804,7 +765,7 @@ def main(): supports_check_mode=True, ) - backend, pkcs12 = select_backend(module, module.params["select_crypto_backend"]) + pkcs12 = select_backend(module) base_dir = os.path.dirname(module.params["path"]) or "." if not os.path.isdir(base_dir): diff --git a/plugins/modules/openssl_privatekey.py b/plugins/modules/openssl_privatekey.py index eb8064c3..73d87f69 100644 --- a/plugins/modules/openssl_privatekey.py +++ b/plugins/modules/openssl_privatekey.py @@ -270,10 +270,7 @@ def main(): msg=f"The directory {base_dir} does not exist or the file is not a directory", ) - backend, module_backend = select_backend( - module=module, - backend=module.params["select_crypto_backend"], - ) + module_backend = select_backend(module=module) try: private_key = PrivateKeyModule(module, module_backend) diff --git a/plugins/modules/openssl_privatekey_info.py b/plugins/modules/openssl_privatekey_info.py index 9bb152df..ec75177a 100644 --- a/plugins/modules/openssl_privatekey_info.py +++ b/plugins/modules/openssl_privatekey_info.py @@ -64,6 +64,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -245,9 +248,8 @@ def main(): result["can_load_key"] = True - backend, module_backend = select_backend( + module_backend = select_backend( module, - module.params["select_crypto_backend"], data, passphrase=module.params["passphrase"], return_private_key_data=module.params["return_private_key_data"], diff --git a/plugins/modules/openssl_publickey.py b/plugins/modules/openssl_publickey.py index 511bb85d..62d22c5f 100644 --- a/plugins/modules/openssl_publickey.py +++ b/plugins/modules/openssl_publickey.py @@ -84,6 +84,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -183,9 +186,8 @@ publickey: """ import os -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLBadPassphraseError, OpenSSLObjectError, @@ -201,29 +203,20 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, write_file, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: - import cryptography from cryptography.hazmat.primitives import serialization as crypto_serialization - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class PublicKeyError(OpenSSLObjectError): @@ -232,7 +225,7 @@ class PublicKeyError(OpenSSLObjectError): class PublicKey(OpenSSLObject): - def __init__(self, module, backend): + def __init__(self, module): super(PublicKey, self).__init__( module.params["path"], module.params["state"], @@ -250,7 +243,6 @@ class PublicKey(OpenSSLObject): self.publickey_bytes = None self.return_content = module.params["return_content"] self.fingerprint = {} - self.backend = backend self.backup = module.params["backup"] self.backup_file = None @@ -265,7 +257,7 @@ class PublicKey(OpenSSLObject): try: result.update( get_publickey_info( - self.module, self.backend, content=data, prefer_one_fingerprint=True + self.module, content=data, prefer_one_fingerprint=True ) ) result["can_parse_key"] = True @@ -280,19 +272,17 @@ class PublicKey(OpenSSLObject): path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) - if self.backend == "cryptography": - if self.format == "OpenSSH": - return self.privatekey.public_key().public_bytes( - crypto_serialization.Encoding.OpenSSH, - crypto_serialization.PublicFormat.OpenSSH, - ) - else: - return self.privatekey.public_key().public_bytes( - crypto_serialization.Encoding.PEM, - crypto_serialization.PublicFormat.SubjectPublicKeyInfo, - ) + if self.format == "OpenSSH": + return self.privatekey.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH, + ) + else: + return self.privatekey.public_key().public_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) def generate(self, module): """Generate the public key.""" @@ -323,7 +313,6 @@ class PublicKey(OpenSSLObject): path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) file_args = module.load_file_common_arguments(module.params) if module.check_file_absent_if_check_mode(file_args["path"]): @@ -348,24 +337,23 @@ class PublicKey(OpenSSLObject): self.diff_before = self.diff_after = self._get_info(publickey_content) if self.return_content: self.publickey_bytes = publickey_content - if self.backend == "cryptography": - if self.format == "OpenSSH": - # Read and dump public key. Makes sure that the comment is stripped off. - current_publickey = crypto_serialization.load_ssh_public_key( - publickey_content - ) - publickey_content = current_publickey.public_bytes( - crypto_serialization.Encoding.OpenSSH, - crypto_serialization.PublicFormat.OpenSSH, - ) - else: - current_publickey = crypto_serialization.load_pem_public_key( - publickey_content - ) - publickey_content = current_publickey.public_bytes( - crypto_serialization.Encoding.PEM, - crypto_serialization.PublicFormat.SubjectPublicKeyInfo, - ) + if self.format == "OpenSSH": + # Read and dump public key. Makes sure that the comment is stripped off. + current_publickey = crypto_serialization.load_ssh_public_key( + publickey_content + ) + publickey_content = current_publickey.public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH, + ) + else: + current_publickey = crypto_serialization.load_pem_public_key( + publickey_content + ) + publickey_content = current_publickey.public_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) except Exception: return False @@ -440,35 +428,7 @@ def main(): mutually_exclusive=(["privatekey_path", "privatekey_content"],), ) - backend = module.params["select_crypto_backend"] - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) - - # Decision - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})", - ) - - if module.params["format"] == "OpenSSH" and backend != "cryptography": - module.fail_json(msg="Format OpenSSH requires the cryptography backend.") - - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) base_dir = os.path.dirname(module.params["path"]) or "." if not os.path.isdir(base_dir): @@ -478,7 +438,7 @@ def main(): ) try: - public_key = PublicKey(module, backend) + public_key = PublicKey(module) if public_key.state == "present": if module.check_mode: diff --git a/plugins/modules/openssl_publickey_info.py b/plugins/modules/openssl_publickey_info.py index 19fcf186..9642e0e8 100644 --- a/plugins/modules/openssl_publickey_info.py +++ b/plugins/modules/openssl_publickey_info.py @@ -36,6 +36,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -191,9 +194,7 @@ def main(): msg=f"Error while reading public key file from disk: {e}", **result ) - backend, module_backend = select_backend( - module, module.params["select_crypto_backend"], data - ) + module_backend = select_backend(module, data) try: result.update(module_backend.get_info()) diff --git a/plugins/modules/openssl_signature.py b/plugins/modules/openssl_signature.py index 3b503691..9c141727 100644 --- a/plugins/modules/openssl_signature.py +++ b/plugins/modules/openssl_signature.py @@ -58,6 +58,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -96,10 +99,10 @@ signature: import base64 import os -import traceback from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.version import ( LooseVersion, @@ -108,7 +111,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.hazmat.primitives.asymmetric.padding @@ -116,12 +118,9 @@ try: CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + CRYPTOGRAPHY_VERSION = LooseVersion("0.0") -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -133,7 +132,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im class SignatureBase(OpenSSLObject): - def __init__(self, module, backend): + def __init__(self, module): super(SignatureBase, self).__init__( path=module.params["path"], state="present", @@ -141,8 +140,6 @@ class SignatureBase(OpenSSLObject): check_mode=module.check_mode, ) - self.backend = backend - self.privatekey_path = module.params["privatekey_path"] self.privatekey_content = module.params["privatekey_content"] if self.privatekey_content is not None: @@ -161,8 +158,8 @@ class SignatureBase(OpenSSLObject): # Implementation with using cryptography class SignatureCryptography(SignatureBase): - def __init__(self, module, backend): - super(SignatureCryptography, self).__init__(module, backend) + def __init__(self, module): + super(SignatureCryptography, self).__init__(module) def run(self): _padding = cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15() @@ -178,7 +175,6 @@ class SignatureCryptography(SignatureBase): path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend=self.backend, ) signature = None @@ -249,33 +245,10 @@ def main(): msg=f"The file {module.params['path']} does not exist", ) - backend = module.params["select_crypto_backend"] - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) - # Decision - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})", - ) try: - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - _sign = SignatureCryptography(module, backend) + _sign = SignatureCryptography(module) result = _sign.run() diff --git a/plugins/modules/openssl_signature_info.py b/plugins/modules/openssl_signature_info.py index 5eb0b64d..7de9c7b6 100644 --- a/plugins/modules/openssl_signature_info.py +++ b/plugins/modules/openssl_signature_info.py @@ -47,6 +47,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -85,10 +88,10 @@ valid: import base64 import os -import traceback from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.version import ( LooseVersion, @@ -97,7 +100,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography import cryptography.hazmat.primitives.asymmetric.padding @@ -105,12 +107,9 @@ try: CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + CRYPTOGRAPHY_VERSION = LooseVersion("0.0") -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -122,7 +121,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im class SignatureInfoBase(OpenSSLObject): - def __init__(self, module, backend): + def __init__(self, module): super(SignatureInfoBase, self).__init__( path=module.params["path"], state="present", @@ -130,8 +129,6 @@ class SignatureInfoBase(OpenSSLObject): check_mode=module.check_mode, ) - self.backend = backend - self.signature = module.params["signature"] self.certificate_path = module.params["certificate_path"] self.certificate_content = module.params["certificate_content"] @@ -150,8 +147,8 @@ class SignatureInfoBase(OpenSSLObject): # Implementation with using cryptography class SignatureInfoCryptography(SignatureInfoBase): - def __init__(self, module, backend): - super(SignatureInfoCryptography, self).__init__(module, backend) + def __init__(self, module): + super(SignatureInfoCryptography, self).__init__(module) def run(self): _padding = cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15() @@ -167,7 +164,6 @@ class SignatureInfoCryptography(SignatureInfoBase): certificate = load_certificate( path=self.certificate_path, content=self.certificate_content, - backend=self.backend, ) public_key = certificate.public_key() verified = False @@ -254,33 +250,10 @@ def main(): msg=f"The file {module.params['path']} does not exist", ) - backend = module.params["select_crypto_backend"] - if backend == "auto": - # Detection what is possible - can_use_cryptography = ( - CRYPTOGRAPHY_FOUND - and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) - # Decision - if can_use_cryptography: - backend = "cryptography" - - # Success? - if backend == "auto": - module.fail_json( - msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})" - ) try: - if backend == "cryptography": - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) - _sign = SignatureInfoCryptography(module, backend) + _sign = SignatureInfoCryptography(module) result = _sign.run() diff --git a/plugins/modules/x509_certificate.py b/plugins/modules/x509_certificate.py index 9297d76f..7c228c09 100644 --- a/plugins/modules/x509_certificate.py +++ b/plugins/modules/x509_certificate.py @@ -396,8 +396,7 @@ def main(): "selfsigned": SelfSignedCertificateProvider, } - backend = module.params["select_crypto_backend"] - module_backend = select_backend(module, backend, provider_map[provider]()) + module_backend = select_backend(module, provider_map[provider]()) certificate = GenericCertificate(module, module_backend) certificate.generate(module) diff --git a/plugins/modules/x509_certificate_convert.py b/plugins/modules/x509_certificate_convert.py index c2a707bd..5d7ab46a 100644 --- a/plugins/modules/x509_certificate_convert.py +++ b/plugins/modules/x509_certificate_convert.py @@ -106,9 +106,8 @@ backup_file: import base64 import os -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, @@ -126,6 +125,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, @@ -135,15 +135,11 @@ from ansible_collections.community.crypto.plugins.module_utils.io import ( MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: import cryptography # noqa: F401, pylint: disable=unused-import from cryptography.x509 import load_der_x509_certificate except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass def parse_certificate(input, strict=False): @@ -226,13 +222,7 @@ class X509CertificateConvertModule(OpenSSLObject): pass def verify_cert_parsable(self, module): - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib( - f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}" - ), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) try: load_der_x509_certificate(self.input) except Exception as exc: diff --git a/plugins/modules/x509_certificate_info.py b/plugins/modules/x509_certificate_info.py index 957c56f8..6b417e8c 100644 --- a/plugins/modules/x509_certificate_info.py +++ b/plugins/modules/x509_certificate_info.py @@ -57,6 +57,9 @@ options: - Determines which crypto backend to use. - The default choice is V(auto), which tries to use C(cryptography) if available. - If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Note that with community.crypto 3.0.0, all values behave the same. + This option will be deprecated in a later version. + We recommend to not set it explicitly. type: str default: auto choices: [auto, cryptography] @@ -430,9 +433,7 @@ def main(): except (IOError, OSError) as e: module.fail_json(msg=f"Error while reading certificate file from disk: {e}") - backend, module_backend = select_backend( - module, module.params["select_crypto_backend"], data - ) + module_backend = select_backend(module, data) valid_at = module.params["valid_at"] if valid_at: diff --git a/plugins/modules/x509_certificate_pipe.py b/plugins/modules/x509_certificate_pipe.py index fc35c8ac..07bad567 100644 --- a/plugins/modules/x509_certificate_pipe.py +++ b/plugins/modules/x509_certificate_pipe.py @@ -188,8 +188,7 @@ def main(): "selfsigned": SelfSignedCertificateProvider, } - backend = module.params["select_crypto_backend"] - module_backend = select_backend(module, backend, provider_map[provider]()) + module_backend = select_backend(module, provider_map[provider]()) certificate = GenericCertificate(module, module_backend) certificate.generate(module) result = certificate.dump() diff --git a/plugins/modules/x509_crl.py b/plugins/modules/x509_crl.py index 426d9ba1..1138f24f 100644 --- a/plugins/modules/x509_crl.py +++ b/plugins/modules/x509_crl.py @@ -425,9 +425,8 @@ crl: import base64 import os -import traceback -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_text from ansible.module_utils.common.validation import check_type_int, check_type_str from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( @@ -471,6 +470,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im ) from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import ( COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION, + assert_required_cryptography_version, ) from ansible_collections.community.crypto.plugins.module_utils.io import write_file from ansible_collections.community.crypto.plugins.module_utils.serial import ( @@ -479,16 +479,11 @@ from ansible_collections.community.crypto.plugins.module_utils.serial import ( from ansible_collections.community.crypto.plugins.module_utils.time import ( get_relative_time_option, ) -from ansible_collections.community.crypto.plugins.module_utils.version import ( - LooseVersion, -) MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION -CRYPTOGRAPHY_IMP_ERR = None try: - import cryptography from cryptography import x509 from cryptography.hazmat.primitives.serialization import Encoding from cryptography.x509 import ( @@ -497,13 +492,8 @@ try: NameAttribute, RevokedCertificateBuilder, ) - - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True + pass class CRLError(OpenSSLObjectError): @@ -582,9 +572,7 @@ class CRL(OpenSSLObject): try: if rc["content"] is not None: rc["content"] = rc["content"].encode("utf-8") - cert = load_certificate( - rc["path"], content=rc["content"], backend="cryptography" - ) + cert = load_certificate(rc["path"], content=rc["content"]) result["serial_number"] = cert.serial_number except OpenSSLObjectError as e: if rc["content"] is not None: @@ -631,7 +619,6 @@ class CRL(OpenSSLObject): path=self.privatekey_path, content=self.privatekey_content, passphrase=self.privatekey_passphrase, - backend="cryptography", ) except OpenSSLBadPassphraseError as exc: raise CRLError(exc) @@ -1011,11 +998,7 @@ def main(): add_file_common_args=True, ) - if not CRYPTOGRAPHY_FOUND: - module.fail_json( - msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"), - exception=CRYPTOGRAPHY_IMP_ERR, - ) + assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION) try: crl = CRL(module) diff --git a/tests/unit/plugins/module_utils/test_time.py b/tests/unit/plugins/module_utils/test_time.py index acef38dd..97f239f9 100644 --- a/tests/unit/plugins/module_utils/test_time.py +++ b/tests/unit/plugins/module_utils/test_time.py @@ -173,7 +173,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "+1d2h3m4s", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 2, 3, 4), @@ -181,7 +180,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "-1w10d24h", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2023, 12, 14, 0, 0, 0), @@ -189,7 +187,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "20240102040506Z", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 4, 5, 6), @@ -197,7 +194,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "202401020405Z", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 4, 5, 0), @@ -205,7 +201,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "+1d2h3m4s", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 2, 3, 4, tzinfo=UTC), @@ -213,7 +208,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "-1w10d24h", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2023, 12, 14, 0, 0, 0, tzinfo=UTC), @@ -221,7 +215,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "20240102040506Z", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 4, 5, 6, tzinfo=UTC), @@ -229,7 +222,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "202401020405Z", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 4, 5, 0, tzinfo=UTC), @@ -237,7 +229,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "20240102040506+0100", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 3, 5, 6), @@ -245,7 +236,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "202401020405+0100", "foo", - "cryptography", False, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 3, 5, 0), @@ -253,7 +243,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "20240102040506+0100", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC), @@ -261,7 +250,6 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( ( "202401020405+0100", "foo", - "cryptography", True, datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC), @@ -344,17 +332,16 @@ def test_convert_relative_to_datetime( @pytest.mark.parametrize( - "timezone, input_string, input_name, backend, with_timezone, now, expected", + "timezone, input_string, input_name, with_timezone, now, expected", TEST_GET_RELATIVE_TIME_OPTION, ) def test_get_relative_time_option( - timezone, input_string, input_name, backend, with_timezone, now, expected + timezone, input_string, input_name, with_timezone, now, expected ): with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): output = get_relative_time_option( input_string, input_name, - backend=backend, with_timezone=with_timezone, now=now, )