mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-05-06 13:22:58 +00:00
Get rid of backend parameter whenever possible (#883)
* Get rid of backend parameter whenever possible. * Always auto-detect if backend choices are 'cryptography' and 'auto', resp. always check cryptography version. * Improve error message. * Update documentation.
This commit is contained in:
@@ -102,7 +102,7 @@ class CryptoBackend:
|
||||
def parse_module_parameter(self, value, name):
|
||||
try:
|
||||
return get_relative_time_option(
|
||||
value, name, backend="cryptography", with_timezone=self._with_timezone
|
||||
value, name, with_timezone=self._with_timezone
|
||||
)
|
||||
except OpenSSLObjectError as exc:
|
||||
raise BackendException(str(exc))
|
||||
|
||||
@@ -6,10 +6,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.community.crypto.plugins.module_utils.argspec import (
|
||||
ArgumentSpec,
|
||||
)
|
||||
@@ -32,26 +30,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
CRYPTOGRAPHY_VERSION = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
class CertificateError(OpenSSLObjectError):
|
||||
@@ -60,9 +49,8 @@ class CertificateError(OpenSSLObjectError):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CertificateBackend:
|
||||
def __init__(self, module, backend):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
|
||||
self.force = module.params["force"]
|
||||
self.ignore_timestamps = module.params["ignore_timestamps"]
|
||||
@@ -98,7 +86,7 @@ class CertificateBackend:
|
||||
return dict()
|
||||
try:
|
||||
result = get_certificate_info(
|
||||
self.module, self.backend, data, prefer_one_fingerprint=True
|
||||
self.module, data, prefer_one_fingerprint=True
|
||||
)
|
||||
result["can_parse_certificate"] = True
|
||||
return result
|
||||
@@ -137,7 +125,6 @@ class CertificateBackend:
|
||||
path=self.privatekey_path,
|
||||
content=self.privatekey_content,
|
||||
passphrase=self.privatekey_passphrase,
|
||||
backend=self.backend,
|
||||
)
|
||||
except OpenSSLBadPassphraseError as exc:
|
||||
raise CertificateError(exc)
|
||||
@@ -151,7 +138,6 @@ class CertificateBackend:
|
||||
self.csr = load_certificate_request(
|
||||
path=self.csr_path,
|
||||
content=self.csr_content,
|
||||
backend=self.backend,
|
||||
)
|
||||
|
||||
def _ensure_existing_certificate_loaded(self):
|
||||
@@ -163,75 +149,72 @@ class CertificateBackend:
|
||||
self.existing_certificate = load_certificate(
|
||||
path=None,
|
||||
content=self.existing_certificate_bytes,
|
||||
backend=self.backend,
|
||||
)
|
||||
|
||||
def _check_privatekey(self):
|
||||
"""Check whether provided parameters match, assuming self.existing_certificate and self.privatekey have been populated."""
|
||||
if self.backend == "cryptography":
|
||||
return cryptography_compare_public_keys(
|
||||
self.existing_certificate.public_key(), self.privatekey.public_key()
|
||||
)
|
||||
return cryptography_compare_public_keys(
|
||||
self.existing_certificate.public_key(), self.privatekey.public_key()
|
||||
)
|
||||
|
||||
def _check_csr(self):
|
||||
"""Check whether provided parameters match, assuming self.existing_certificate and self.csr have been populated."""
|
||||
if self.backend == "cryptography":
|
||||
# Verify that CSR is signed by certificate's private key
|
||||
if not self.csr.is_signature_valid:
|
||||
return False
|
||||
if not cryptography_compare_public_keys(
|
||||
self.csr.public_key(), self.existing_certificate.public_key()
|
||||
):
|
||||
return False
|
||||
# Check subject
|
||||
if (
|
||||
self.check_csr_subject
|
||||
and self.csr.subject != self.existing_certificate.subject
|
||||
):
|
||||
return False
|
||||
# Check extensions
|
||||
if not self.check_csr_extensions:
|
||||
return True
|
||||
cert_exts = list(self.existing_certificate.extensions)
|
||||
csr_exts = list(self.csr.extensions)
|
||||
if self.create_subject_key_identifier != "never_create":
|
||||
# Filter out SubjectKeyIdentifier extension before comparison
|
||||
cert_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier),
|
||||
cert_exts,
|
||||
)
|
||||
)
|
||||
csr_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier),
|
||||
csr_exts,
|
||||
)
|
||||
)
|
||||
if self.create_authority_key_identifier:
|
||||
# Filter out AuthorityKeyIdentifier extension before comparison
|
||||
cert_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier),
|
||||
cert_exts,
|
||||
)
|
||||
)
|
||||
csr_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier),
|
||||
csr_exts,
|
||||
)
|
||||
)
|
||||
if len(cert_exts) != len(csr_exts):
|
||||
return False
|
||||
for cert_ext in cert_exts:
|
||||
try:
|
||||
csr_ext = self.csr.extensions.get_extension_for_oid(cert_ext.oid)
|
||||
if cert_ext != csr_ext:
|
||||
return False
|
||||
except cryptography.x509.ExtensionNotFound:
|
||||
return False
|
||||
# Verify that CSR is signed by certificate's private key
|
||||
if not self.csr.is_signature_valid:
|
||||
return False
|
||||
if not cryptography_compare_public_keys(
|
||||
self.csr.public_key(), self.existing_certificate.public_key()
|
||||
):
|
||||
return False
|
||||
# Check subject
|
||||
if (
|
||||
self.check_csr_subject
|
||||
and self.csr.subject != self.existing_certificate.subject
|
||||
):
|
||||
return False
|
||||
# Check extensions
|
||||
if not self.check_csr_extensions:
|
||||
return True
|
||||
cert_exts = list(self.existing_certificate.extensions)
|
||||
csr_exts = list(self.csr.extensions)
|
||||
if self.create_subject_key_identifier != "never_create":
|
||||
# Filter out SubjectKeyIdentifier extension before comparison
|
||||
cert_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier),
|
||||
cert_exts,
|
||||
)
|
||||
)
|
||||
csr_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.SubjectKeyIdentifier),
|
||||
csr_exts,
|
||||
)
|
||||
)
|
||||
if self.create_authority_key_identifier:
|
||||
# Filter out AuthorityKeyIdentifier extension before comparison
|
||||
cert_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier),
|
||||
cert_exts,
|
||||
)
|
||||
)
|
||||
csr_exts = list(
|
||||
filter(
|
||||
lambda x: not isinstance(x.value, x509.AuthorityKeyIdentifier),
|
||||
csr_exts,
|
||||
)
|
||||
)
|
||||
if len(cert_exts) != len(csr_exts):
|
||||
return False
|
||||
for cert_ext in cert_exts:
|
||||
try:
|
||||
csr_ext = self.csr.extensions.get_extension_for_oid(cert_ext.oid)
|
||||
if cert_ext != csr_ext:
|
||||
return False
|
||||
except cryptography.x509.ExtensionNotFound:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_subject_key_identifier(self):
|
||||
"""Check whether Subject Key Identifier matches, assuming self.existing_certificate has been populated."""
|
||||
@@ -336,53 +319,29 @@ class CertificateProvider:
|
||||
"""Whether the provider needs to create a version 2 certificate."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_backend(self, module, backend):
|
||||
def create_backend(self, module):
|
||||
"""Create an implementation for a backend.
|
||||
|
||||
Return value must be instance of CertificateBackend.
|
||||
"""
|
||||
|
||||
|
||||
def select_backend(module, backend, provider):
|
||||
def select_backend(module, provider):
|
||||
"""
|
||||
:type module: AnsibleModule
|
||||
:type backend: str
|
||||
:type provider: CertificateProvider
|
||||
"""
|
||||
provider.validate_module_args(module)
|
||||
|
||||
backend = module.params["select_crypto_backend"]
|
||||
if backend == "auto":
|
||||
# Detect what backend we can use
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
|
||||
if provider.needs_version_two_certs(module):
|
||||
# TODO: remove
|
||||
module.fail_json(
|
||||
msg="The cryptography backend does not support v2 certificates"
|
||||
)
|
||||
|
||||
# If cryptography is available we'll use it
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Fail if no backend has been found
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
if provider.needs_version_two_certs(module):
|
||||
module.fail_json(
|
||||
msg="The cryptography backend does not support v2 certificates"
|
||||
)
|
||||
|
||||
return provider.create_backend(module, backend)
|
||||
return provider.create_backend(module)
|
||||
|
||||
|
||||
def get_certificate_argument_spec():
|
||||
|
||||
@@ -18,8 +18,8 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
||||
|
||||
|
||||
class AcmeCertificateBackend(CertificateBackend):
|
||||
def __init__(self, module, backend):
|
||||
super(AcmeCertificateBackend, self).__init__(module, backend)
|
||||
def __init__(self, module):
|
||||
super(AcmeCertificateBackend, self).__init__(module)
|
||||
self.accountkey_path = module.params["acme_accountkey_path"]
|
||||
self.challenge_path = module.params["acme_challenge_path"]
|
||||
self.use_chain = module.params["acme_chain"]
|
||||
@@ -105,8 +105,8 @@ class AcmeCertificateProvider(CertificateProvider):
|
||||
def needs_version_two_certs(self, module):
|
||||
return False
|
||||
|
||||
def create_backend(self, module, backend):
|
||||
return AcmeCertificateBackend(module, backend)
|
||||
def create_backend(self, module):
|
||||
return AcmeCertificateBackend(module)
|
||||
|
||||
|
||||
def add_acme_provider_to_argument_spec(argument_spec):
|
||||
|
||||
@@ -39,13 +39,12 @@ except ImportError:
|
||||
|
||||
|
||||
class EntrustCertificateBackend(CertificateBackend):
|
||||
def __init__(self, module, backend):
|
||||
super(EntrustCertificateBackend, self).__init__(module, backend)
|
||||
def __init__(self, module):
|
||||
super(EntrustCertificateBackend, self).__init__(module)
|
||||
self.trackingId = None
|
||||
self.notAfter = get_relative_time_option(
|
||||
module.params["entrust_not_after"],
|
||||
"entrust_not_after",
|
||||
backend=self.backend,
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
|
||||
@@ -64,19 +63,18 @@ class EntrustCertificateBackend(CertificateBackend):
|
||||
# We want to always force behavior of trying to use the organization provided in the CSR.
|
||||
# To that end we need to parse out the organization from the CSR.
|
||||
self.csr_org = None
|
||||
if self.backend == "cryptography":
|
||||
csr_subject_orgs = self.csr.subject.get_attributes_for_oid(
|
||||
NameOID.ORGANIZATION_NAME
|
||||
)
|
||||
if len(csr_subject_orgs) == 1:
|
||||
self.csr_org = csr_subject_orgs[0].value
|
||||
elif len(csr_subject_orgs) > 1:
|
||||
self.module.fail_json(
|
||||
msg=(
|
||||
"Entrust provider does not currently support multiple validated organizations. Multiple organizations found in "
|
||||
f"Subject DN: '{self.csr.subject}'. "
|
||||
)
|
||||
csr_subject_orgs = self.csr.subject.get_attributes_for_oid(
|
||||
NameOID.ORGANIZATION_NAME
|
||||
)
|
||||
if len(csr_subject_orgs) == 1:
|
||||
self.csr_org = csr_subject_orgs[0].value
|
||||
elif len(csr_subject_orgs) > 1:
|
||||
self.module.fail_json(
|
||||
msg=(
|
||||
"Entrust provider does not currently support multiple validated organizations. Multiple organizations found in "
|
||||
f"Subject DN: '{self.csr.subject}'. "
|
||||
)
|
||||
)
|
||||
# If no organization in the CSR, explicitly tell ECS that it should be blank in issued cert, not defaulted to
|
||||
# organization tied to the account.
|
||||
if self.csr_org is None:
|
||||
@@ -136,7 +134,8 @@ class EntrustCertificateBackend(CertificateBackend):
|
||||
|
||||
self.cert_bytes = to_bytes(result.get("endEntityCert"))
|
||||
self.cert = load_certificate(
|
||||
path=None, content=self.cert_bytes, backend=self.backend
|
||||
path=None,
|
||||
content=self.cert_bytes,
|
||||
)
|
||||
|
||||
def get_certificate_data(self):
|
||||
@@ -175,11 +174,8 @@ class EntrustCertificateBackend(CertificateBackend):
|
||||
except Exception:
|
||||
return
|
||||
if self.existing_certificate:
|
||||
serial_number = None
|
||||
expiry = None
|
||||
if self.backend == "cryptography":
|
||||
serial_number = f"{self.existing_certificate.serial_number:X}"
|
||||
expiry = get_not_valid_after(self.existing_certificate)
|
||||
serial_number = f"{self.existing_certificate.serial_number:X}"
|
||||
expiry = get_not_valid_after(self.existing_certificate)
|
||||
|
||||
# get some information about the expiry of this certificate
|
||||
expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
|
||||
@@ -213,8 +209,8 @@ class EntrustCertificateProvider(CertificateProvider):
|
||||
def needs_version_two_certs(self, module):
|
||||
return False
|
||||
|
||||
def create_backend(self, module, backend):
|
||||
return EntrustCertificateBackend(module, backend)
|
||||
def create_backend(self, module):
|
||||
return EntrustCertificateBackend(module)
|
||||
|
||||
|
||||
def add_entrust_provider_to_argument_spec(argument_spec):
|
||||
|
||||
@@ -8,10 +8,8 @@ from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
@@ -30,29 +28,21 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_now_datetime,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
||||
@@ -60,10 +50,9 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CertificateInfoRetrieval:
|
||||
def __init__(self, module, backend, content):
|
||||
def __init__(self, module, content):
|
||||
# content must be a bytes string
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.content = content
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -151,7 +140,6 @@ class CertificateInfoRetrieval:
|
||||
self.cert = load_certificate(
|
||||
None,
|
||||
content=self.content,
|
||||
backend=self.backend,
|
||||
der_support_enabled=der_support_enabled,
|
||||
)
|
||||
|
||||
@@ -193,7 +181,6 @@ class CertificateInfoRetrieval:
|
||||
|
||||
public_key_info = get_publickey_info(
|
||||
self.module,
|
||||
self.backend,
|
||||
key=self._get_public_key_object(),
|
||||
prefer_one_fingerprint=prefer_one_fingerprint,
|
||||
)
|
||||
@@ -235,9 +222,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
|
||||
"""Validate the supplied cert, using the cryptography backend"""
|
||||
|
||||
def __init__(self, module, content):
|
||||
super(CertificateInfoRetrievalCryptography, self).__init__(
|
||||
module, "cryptography", content
|
||||
)
|
||||
super(CertificateInfoRetrievalCryptography, self).__init__(module, content)
|
||||
self.name_encoding = module.params.get("name_encoding", "ignore")
|
||||
|
||||
def _get_der_bytes(self):
|
||||
@@ -445,38 +430,11 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
|
||||
return None
|
||||
|
||||
|
||||
def get_certificate_info(module, backend, content, prefer_one_fingerprint=False):
|
||||
if backend == "cryptography":
|
||||
info = CertificateInfoRetrievalCryptography(module, content)
|
||||
def get_certificate_info(module, content, prefer_one_fingerprint=False):
|
||||
info = CertificateInfoRetrievalCryptography(module, content)
|
||||
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
|
||||
|
||||
|
||||
def select_backend(module, backend, content):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Try cryptography
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, CertificateInfoRetrievalCryptography(module, content)
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
def select_backend(module, content):
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return CertificateInfoRetrievalCryptography(module, content)
|
||||
|
||||
@@ -22,7 +22,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
|
||||
set_not_valid_before,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
|
||||
CRYPTOGRAPHY_VERSION,
|
||||
CertificateBackend,
|
||||
CertificateError,
|
||||
CertificateProvider,
|
||||
@@ -35,9 +34,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
@@ -50,9 +46,7 @@ except ImportError:
|
||||
|
||||
class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
def __init__(self, module):
|
||||
super(OwnCACertificateBackendCryptography, self).__init__(
|
||||
module, "cryptography"
|
||||
)
|
||||
super(OwnCACertificateBackendCryptography, self).__init__(module)
|
||||
|
||||
self.create_subject_key_identifier = module.params[
|
||||
"ownca_create_subject_key_identifier"
|
||||
@@ -63,13 +57,11 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
self.notBefore = get_relative_time_option(
|
||||
module.params["ownca_not_before"],
|
||||
"ownca_not_before",
|
||||
backend=self.backend,
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.notAfter = get_relative_time_option(
|
||||
module.params["ownca_not_after"],
|
||||
"ownca_not_after",
|
||||
backend=self.backend,
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.digest = select_message_digest(module.params["ownca_digest"])
|
||||
@@ -106,14 +98,14 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
|
||||
self._ensure_csr_loaded()
|
||||
self.ca_cert = load_certificate(
|
||||
path=self.ca_cert_path, content=self.ca_cert_content, backend=self.backend
|
||||
path=self.ca_cert_path,
|
||||
content=self.ca_cert_content,
|
||||
)
|
||||
try:
|
||||
self.ca_private_key = load_privatekey(
|
||||
path=self.ca_privatekey_path,
|
||||
content=self.ca_privatekey_content,
|
||||
passphrase=self.ca_privatekey_passphrase,
|
||||
backend=self.backend,
|
||||
)
|
||||
except OpenSSLBadPassphraseError as exc:
|
||||
module.fail_json(msg=str(exc))
|
||||
@@ -170,10 +162,6 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
|
||||
ext.value
|
||||
)
|
||||
if CRYPTOGRAPHY_VERSION >= LooseVersion("2.7")
|
||||
else x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
|
||||
ext
|
||||
)
|
||||
),
|
||||
critical=False,
|
||||
)
|
||||
@@ -224,10 +212,6 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
|
||||
ext.value
|
||||
)
|
||||
if CRYPTOGRAPHY_VERSION >= LooseVersion("2.7")
|
||||
else x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
|
||||
ext
|
||||
)
|
||||
)
|
||||
except cryptography.x509.ExtensionNotFound:
|
||||
expected_ext = x509.AuthorityKeyIdentifier.from_issuer_public_key(
|
||||
@@ -310,9 +294,8 @@ class OwnCACertificateProvider(CertificateProvider):
|
||||
def needs_version_two_certs(self, module):
|
||||
return module.params["ownca_version"] == 2
|
||||
|
||||
def create_backend(self, module, backend):
|
||||
if backend == "cryptography":
|
||||
return OwnCACertificateBackendCryptography(module)
|
||||
def create_backend(self, module):
|
||||
return OwnCACertificateBackendCryptography(module)
|
||||
|
||||
|
||||
def add_ownca_provider_to_argument_spec(argument_spec):
|
||||
|
||||
@@ -40,9 +40,7 @@ except ImportError:
|
||||
|
||||
class SelfSignedCertificateBackendCryptography(CertificateBackend):
|
||||
def __init__(self, module):
|
||||
super(SelfSignedCertificateBackendCryptography, self).__init__(
|
||||
module, "cryptography"
|
||||
)
|
||||
super(SelfSignedCertificateBackendCryptography, self).__init__(module)
|
||||
|
||||
self.create_subject_key_identifier = module.params[
|
||||
"selfsigned_create_subject_key_identifier"
|
||||
@@ -50,13 +48,11 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
|
||||
self.notBefore = get_relative_time_option(
|
||||
module.params["selfsigned_not_before"],
|
||||
"selfsigned_not_before",
|
||||
backend=self.backend,
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.notAfter = get_relative_time_option(
|
||||
module.params["selfsigned_not_after"],
|
||||
"selfsigned_not_after",
|
||||
backend=self.backend,
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.digest = select_message_digest(module.params["selfsigned_digest"])
|
||||
@@ -206,9 +202,8 @@ class SelfSignedCertificateProvider(CertificateProvider):
|
||||
def needs_version_two_certs(self, module):
|
||||
return module.params["selfsigned_version"] == 2
|
||||
|
||||
def create_backend(self, module, backend):
|
||||
if backend == "cryptography":
|
||||
return SelfSignedCertificateBackendCryptography(module)
|
||||
def create_backend(self, module):
|
||||
return SelfSignedCertificateBackendCryptography(module)
|
||||
|
||||
|
||||
def add_selfsigned_provider_to_argument_spec(argument_spec):
|
||||
|
||||
@@ -4,9 +4,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import (
|
||||
TIMESTAMP_FORMAT,
|
||||
cryptography_decode_revoked_certificate,
|
||||
@@ -21,9 +18,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
@@ -31,17 +26,10 @@ from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
class CRLInfoRetrieval:
|
||||
@@ -96,12 +84,7 @@ class CRLInfoRetrieval:
|
||||
|
||||
|
||||
def get_crl_info(module, content, list_revoked_certificates=True):
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
info = CRLInfoRetrieval(
|
||||
module, content, list_revoked_certificates=list_revoked_certificates
|
||||
)
|
||||
|
||||
@@ -7,10 +7,8 @@ from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils.argspec import (
|
||||
ArgumentSpec,
|
||||
@@ -42,15 +40,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
import cryptography.exceptions
|
||||
@@ -59,17 +54,8 @@ try:
|
||||
import cryptography.hazmat.primitives.serialization
|
||||
import cryptography.x509
|
||||
import cryptography.x509.oid
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier(
|
||||
"1.3.6.1.5.5.7.1.24"
|
||||
)
|
||||
CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05"
|
||||
pass
|
||||
|
||||
|
||||
class CertificateSigningRequestError(OpenSSLObjectError):
|
||||
@@ -85,9 +71,8 @@ class CertificateSigningRequestError(OpenSSLObjectError):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CertificateSigningRequestBackend:
|
||||
def __init__(self, module, backend):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.digest = module.params["digest"]
|
||||
self.privatekey_path = module.params["privatekey_path"]
|
||||
self.privatekey_content = module.params["privatekey_content"]
|
||||
@@ -202,7 +187,6 @@ class CertificateSigningRequestBackend:
|
||||
try:
|
||||
result = get_csr_info(
|
||||
self.module,
|
||||
self.backend,
|
||||
data,
|
||||
validate_signature=False,
|
||||
prefer_one_fingerprint=True,
|
||||
@@ -240,7 +224,6 @@ class CertificateSigningRequestBackend:
|
||||
path=self.privatekey_path,
|
||||
content=self.privatekey_content,
|
||||
passphrase=self.privatekey_passphrase,
|
||||
backend=self.backend,
|
||||
)
|
||||
except OpenSSLBadPassphraseError as exc:
|
||||
raise CertificateSigningRequestError(exc)
|
||||
@@ -256,7 +239,8 @@ class CertificateSigningRequestBackend:
|
||||
return True
|
||||
try:
|
||||
self.existing_csr = load_certificate_request(
|
||||
None, content=self.existing_csr_bytes, backend=self.backend
|
||||
None,
|
||||
content=self.existing_csr_bytes,
|
||||
)
|
||||
except Exception:
|
||||
return True
|
||||
@@ -340,9 +324,7 @@ def parse_crl_distribution_points(module, crl_distribution_points):
|
||||
# Implementation with using cryptography
|
||||
class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBackend):
|
||||
def __init__(self, module):
|
||||
super(CertificateSigningRequestCryptographyBackend, self).__init__(
|
||||
module, "cryptography"
|
||||
)
|
||||
super(CertificateSigningRequestCryptographyBackend, self).__init__(module)
|
||||
if self.version != 1:
|
||||
module.warn(
|
||||
"The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)"
|
||||
@@ -613,20 +595,16 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
|
||||
|
||||
def _check_ocspMustStaple(extensions):
|
||||
tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature)
|
||||
has_tlsfeature = True
|
||||
if self.ocspMustStaple:
|
||||
if (
|
||||
not tlsfeature_ext
|
||||
or tlsfeature_ext.critical != self.ocspMustStaple_critical
|
||||
):
|
||||
return False
|
||||
if has_tlsfeature:
|
||||
return (
|
||||
cryptography.x509.TLSFeatureType.status_request
|
||||
in tlsfeature_ext.value
|
||||
)
|
||||
else:
|
||||
return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE
|
||||
return (
|
||||
cryptography.x509.TLSFeatureType.status_request
|
||||
in tlsfeature_ext.value
|
||||
)
|
||||
else:
|
||||
return tlsfeature_ext is None
|
||||
|
||||
@@ -756,35 +734,9 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
|
||||
)
|
||||
|
||||
|
||||
def select_backend(module, backend):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Try cryptography
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, CertificateSigningRequestCryptographyBackend(module)
|
||||
else:
|
||||
raise Exception(f"Unsupported value for backend: {backend}")
|
||||
def select_backend(module):
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return CertificateSigningRequestCryptographyBackend(module)
|
||||
|
||||
|
||||
def get_csr_argument_spec():
|
||||
|
||||
@@ -8,10 +8,8 @@ from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
cryptography_decode_name,
|
||||
@@ -26,26 +24,18 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
||||
@@ -53,10 +43,9 @@ TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CSRInfoRetrieval:
|
||||
def __init__(self, module, backend, content, validate_signature):
|
||||
def __init__(self, module, content, validate_signature):
|
||||
# content must be a bytes string
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.content = content
|
||||
self.validate_signature = validate_signature
|
||||
|
||||
@@ -115,7 +104,8 @@ class CSRInfoRetrieval:
|
||||
def get_info(self, prefer_one_fingerprint=False):
|
||||
result = dict()
|
||||
self.csr = load_certificate_request(
|
||||
None, content=self.content, backend=self.backend
|
||||
None,
|
||||
content=self.content,
|
||||
)
|
||||
|
||||
subject = self._get_subject_ordered()
|
||||
@@ -146,7 +136,6 @@ class CSRInfoRetrieval:
|
||||
|
||||
public_key_info = get_publickey_info(
|
||||
self.module,
|
||||
self.backend,
|
||||
key=self._get_public_key_object(),
|
||||
prefer_one_fingerprint=prefer_one_fingerprint,
|
||||
)
|
||||
@@ -185,7 +174,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
|
||||
|
||||
def __init__(self, module, content, validate_signature):
|
||||
super(CSRInfoRetrievalCryptography, self).__init__(
|
||||
module, "cryptography", content, validate_signature
|
||||
module, content, validate_signature
|
||||
)
|
||||
self.name_encoding = module.params.get("name_encoding", "ignore")
|
||||
|
||||
@@ -352,43 +341,16 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
|
||||
|
||||
|
||||
def get_csr_info(
|
||||
module, backend, content, validate_signature=True, prefer_one_fingerprint=False
|
||||
module, content, validate_signature=True, prefer_one_fingerprint=False
|
||||
):
|
||||
if backend == "cryptography":
|
||||
info = CSRInfoRetrievalCryptography(
|
||||
module, content, validate_signature=validate_signature
|
||||
)
|
||||
info = CSRInfoRetrievalCryptography(
|
||||
module, content, validate_signature=validate_signature
|
||||
)
|
||||
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
|
||||
|
||||
|
||||
def select_backend(module, backend, content, validate_signature=True):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Try cryptography
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, CSRInfoRetrievalCryptography(
|
||||
module, content, validate_signature=validate_signature
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
def select_backend(module, content, validate_signature=True):
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return CSRInfoRetrievalCryptography(
|
||||
module, content, validate_signature=validate_signature
|
||||
)
|
||||
|
||||
@@ -10,7 +10,6 @@ import base64
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_bytes
|
||||
from ansible_collections.community.crypto.plugins.module_utils.argspec import (
|
||||
ArgumentSpec,
|
||||
@@ -31,15 +30,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
import cryptography.exceptions
|
||||
@@ -53,13 +49,8 @@ try:
|
||||
import cryptography.hazmat.primitives.asymmetric.x448
|
||||
import cryptography.hazmat.primitives.asymmetric.x25519
|
||||
import cryptography.hazmat.primitives.serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
class PrivateKeyError(OpenSSLObjectError):
|
||||
@@ -75,7 +66,7 @@ class PrivateKeyError(OpenSSLObjectError):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class PrivateKeyBackend:
|
||||
def __init__(self, module, backend):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.type = module.params["type"]
|
||||
self.size = module.params["size"]
|
||||
@@ -85,7 +76,6 @@ class PrivateKeyBackend:
|
||||
self.format = module.params["format"]
|
||||
self.format_mismatch = module.params.get("format_mismatch", "regenerate")
|
||||
self.regenerate = module.params.get("regenerate", "full_idempotence")
|
||||
self.backend = backend
|
||||
|
||||
self.private_key = None
|
||||
|
||||
@@ -103,7 +93,6 @@ class PrivateKeyBackend:
|
||||
result.update(
|
||||
get_privatekey_info(
|
||||
self.module,
|
||||
self.backend,
|
||||
data,
|
||||
passphrase=self.passphrase,
|
||||
return_private_key_data=False,
|
||||
@@ -219,16 +208,14 @@ class PrivateKeyBackend:
|
||||
|
||||
def _get_fingerprint(self):
|
||||
if self.private_key:
|
||||
return get_fingerprint_of_privatekey(self.private_key, backend=self.backend)
|
||||
return get_fingerprint_of_privatekey(self.private_key)
|
||||
try:
|
||||
self._ensure_existing_private_key_loaded()
|
||||
except Exception:
|
||||
# Ignore errors
|
||||
pass
|
||||
if self.existing_private_key:
|
||||
return get_fingerprint_of_privatekey(
|
||||
self.existing_private_key, backend=self.backend
|
||||
)
|
||||
return get_fingerprint_of_privatekey(self.existing_private_key)
|
||||
|
||||
def dump(self, include_key):
|
||||
"""Serialize the object into a dictionary."""
|
||||
@@ -297,9 +284,7 @@ class PrivateKeyCryptographyBackend(PrivateKeyBackend):
|
||||
}
|
||||
|
||||
def __init__(self, module):
|
||||
super(PrivateKeyCryptographyBackend, self).__init__(
|
||||
module=module, backend="cryptography"
|
||||
)
|
||||
super(PrivateKeyCryptographyBackend, self).__init__(module=module)
|
||||
|
||||
self.curves = dict()
|
||||
self._add_curve("secp224r1", "SECP224R1")
|
||||
@@ -558,34 +543,9 @@ class PrivateKeyCryptographyBackend(PrivateKeyBackend):
|
||||
return False
|
||||
|
||||
|
||||
def select_backend(module, backend):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Decision
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, PrivateKeyCryptographyBackend(module)
|
||||
else:
|
||||
raise Exception(f"Unsupported value for backend: {backend}")
|
||||
def select_backend(module):
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return PrivateKeyCryptographyBackend(module)
|
||||
|
||||
|
||||
def get_privatekey_argument_spec():
|
||||
|
||||
@@ -8,7 +8,6 @@ import abc
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_bytes
|
||||
from ansible_collections.community.crypto.plugins.module_utils.argspec import (
|
||||
ArgumentSpec,
|
||||
@@ -24,16 +23,13 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.io import load_file
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
import cryptography.exceptions
|
||||
@@ -47,13 +43,8 @@ try:
|
||||
import cryptography.hazmat.primitives.asymmetric.x448
|
||||
import cryptography.hazmat.primitives.asymmetric.x25519
|
||||
import cryptography.hazmat.primitives.serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
class PrivateKeyError(OpenSSLObjectError):
|
||||
@@ -69,14 +60,13 @@ class PrivateKeyError(OpenSSLObjectError):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class PrivateKeyConvertBackend:
|
||||
def __init__(self, module, backend):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.src_path = module.params["src_path"]
|
||||
self.src_content = module.params["src_content"]
|
||||
self.src_passphrase = module.params["src_passphrase"]
|
||||
self.format = module.params["format"]
|
||||
self.dest_passphrase = module.params["dest_passphrase"]
|
||||
self.backend = backend
|
||||
|
||||
self.src_private_key = None
|
||||
if self.src_path is not None:
|
||||
@@ -135,9 +125,7 @@ class PrivateKeyConvertBackend:
|
||||
# Implementation with using cryptography
|
||||
class PrivateKeyConvertCryptographyBackend(PrivateKeyConvertBackend):
|
||||
def __init__(self, module):
|
||||
super(PrivateKeyConvertCryptographyBackend, self).__init__(
|
||||
module=module, backend="cryptography"
|
||||
)
|
||||
super(PrivateKeyConvertCryptographyBackend, self).__init__(module=module)
|
||||
|
||||
def get_private_key_data(self):
|
||||
"""Return bytes for self.src_private_key in output format"""
|
||||
@@ -262,11 +250,7 @@ class PrivateKeyConvertCryptographyBackend(PrivateKeyConvertBackend):
|
||||
|
||||
|
||||
def select_backend(module):
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return PrivateKeyConvertCryptographyBackend(module)
|
||||
|
||||
|
||||
|
||||
@@ -7,10 +7,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
@@ -28,25 +26,17 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
SIGNATURE_TEST_DATA = b"1234"
|
||||
|
||||
@@ -187,7 +177,6 @@ class PrivateKeyInfoRetrieval:
|
||||
def __init__(
|
||||
self,
|
||||
module,
|
||||
backend,
|
||||
content,
|
||||
passphrase=None,
|
||||
return_private_key_data=False,
|
||||
@@ -195,7 +184,6 @@ class PrivateKeyInfoRetrieval:
|
||||
):
|
||||
# content must be a bytes string
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.content = content
|
||||
self.passphrase = passphrase
|
||||
self.return_private_key_data = return_private_key_data
|
||||
@@ -228,7 +216,6 @@ class PrivateKeyInfoRetrieval:
|
||||
if self.passphrase is not None
|
||||
else self.passphrase
|
||||
),
|
||||
backend=self.backend,
|
||||
)
|
||||
result["can_parse_key"] = True
|
||||
except OpenSSLObjectError as exc:
|
||||
@@ -269,7 +256,7 @@ class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval):
|
||||
|
||||
def __init__(self, module, content, **kwargs):
|
||||
super(PrivateKeyInfoRetrievalCryptography, self).__init__(
|
||||
module, "cryptography", content, **kwargs
|
||||
module, content, **kwargs
|
||||
)
|
||||
|
||||
def _get_public_key(self, binary):
|
||||
@@ -291,61 +278,32 @@ class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval):
|
||||
|
||||
def get_privatekey_info(
|
||||
module,
|
||||
backend,
|
||||
content,
|
||||
passphrase=None,
|
||||
return_private_key_data=False,
|
||||
prefer_one_fingerprint=False,
|
||||
):
|
||||
if backend == "cryptography":
|
||||
info = PrivateKeyInfoRetrievalCryptography(
|
||||
module,
|
||||
content,
|
||||
passphrase=passphrase,
|
||||
return_private_key_data=return_private_key_data,
|
||||
)
|
||||
info = PrivateKeyInfoRetrievalCryptography(
|
||||
module,
|
||||
content,
|
||||
passphrase=passphrase,
|
||||
return_private_key_data=return_private_key_data,
|
||||
)
|
||||
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
|
||||
|
||||
|
||||
def select_backend(
|
||||
module,
|
||||
backend,
|
||||
content,
|
||||
passphrase=None,
|
||||
return_private_key_data=False,
|
||||
check_consistency=False,
|
||||
):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Try cryptography
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect the required Python library cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, PrivateKeyInfoRetrievalCryptography(
|
||||
module,
|
||||
content,
|
||||
passphrase=passphrase,
|
||||
return_private_key_data=return_private_key_data,
|
||||
check_consistency=check_consistency,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return PrivateKeyInfoRetrievalCryptography(
|
||||
module,
|
||||
content,
|
||||
passphrase=passphrase,
|
||||
return_private_key_data=return_private_key_data,
|
||||
check_consistency=check_consistency,
|
||||
)
|
||||
|
||||
@@ -5,10 +5,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -18,15 +16,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.cryptography_dep import (
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
assert_required_cryptography_version,
|
||||
)
|
||||
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
import cryptography.hazmat.primitives.asymmetric.ed448
|
||||
@@ -34,13 +29,8 @@ try:
|
||||
import cryptography.hazmat.primitives.asymmetric.x448
|
||||
import cryptography.hazmat.primitives.asymmetric.x25519
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
pass
|
||||
|
||||
|
||||
def _get_cryptography_public_key_info(key):
|
||||
@@ -97,10 +87,9 @@ class PublicKeyParseError(OpenSSLObjectError):
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class PublicKeyInfoRetrieval:
|
||||
def __init__(self, module, backend, content=None, key=None):
|
||||
def __init__(self, module, content=None, key=None):
|
||||
# content must be a bytes string
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.content = content
|
||||
self.key = key
|
||||
|
||||
@@ -116,7 +105,7 @@ class PublicKeyInfoRetrieval:
|
||||
result = dict()
|
||||
if self.key is None:
|
||||
try:
|
||||
self.key = load_publickey(content=self.content, backend=self.backend)
|
||||
self.key = load_publickey(content=self.content)
|
||||
except OpenSSLObjectError as e:
|
||||
raise PublicKeyParseError(str(e), {})
|
||||
|
||||
@@ -138,7 +127,7 @@ class PublicKeyInfoRetrievalCryptography(PublicKeyInfoRetrieval):
|
||||
|
||||
def __init__(self, module, content=None, key=None):
|
||||
super(PublicKeyInfoRetrievalCryptography, self).__init__(
|
||||
module, "cryptography", content=content, key=key
|
||||
module, content=content, key=key
|
||||
)
|
||||
|
||||
def _get_public_key(self, binary):
|
||||
@@ -151,42 +140,11 @@ class PublicKeyInfoRetrievalCryptography(PublicKeyInfoRetrieval):
|
||||
return _get_cryptography_public_key_info(self.key)
|
||||
|
||||
|
||||
def get_publickey_info(
|
||||
module, backend, content=None, key=None, prefer_one_fingerprint=False
|
||||
):
|
||||
if backend == "cryptography":
|
||||
info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key)
|
||||
def get_publickey_info(module, content=None, key=None, prefer_one_fingerprint=False):
|
||||
info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key)
|
||||
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)
|
||||
|
||||
|
||||
def select_backend(module, backend, content=None, key=None):
|
||||
if backend == "auto":
|
||||
# Detection what is possible
|
||||
can_use_cryptography = (
|
||||
CRYPTOGRAPHY_FOUND
|
||||
and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
)
|
||||
|
||||
# Try cryptography
|
||||
if can_use_cryptography:
|
||||
backend = "cryptography"
|
||||
|
||||
# Success?
|
||||
if backend == "auto":
|
||||
module.fail_json(
|
||||
msg=f"Cannot detect any of the required Python libraries cryptography (>= {MINIMAL_CRYPTOGRAPHY_VERSION})"
|
||||
)
|
||||
|
||||
if backend == "cryptography":
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {MINIMAL_CRYPTOGRAPHY_VERSION}"
|
||||
),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
return backend, PublicKeyInfoRetrievalCryptography(
|
||||
module, content=content, key=key
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
def select_backend(module, content=None, key=None):
|
||||
assert_required_cryptography_version(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
return PublicKeyInfoRetrievalCryptography(module, content=content, key=key)
|
||||
|
||||
@@ -98,20 +98,17 @@ def get_fingerprint_of_bytes(source, prefer_one=False):
|
||||
return fingerprint
|
||||
|
||||
|
||||
def get_fingerprint_of_privatekey(privatekey, backend="cryptography", prefer_one=False):
|
||||
def get_fingerprint_of_privatekey(privatekey, prefer_one=False):
|
||||
"""Generate the fingerprint of the public key."""
|
||||
|
||||
if backend == "cryptography":
|
||||
publickey = privatekey.public_key().public_bytes(
|
||||
serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
publickey = privatekey.public_key().public_bytes(
|
||||
serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
|
||||
return get_fingerprint_of_bytes(publickey, prefer_one=prefer_one)
|
||||
|
||||
|
||||
def get_fingerprint(
|
||||
path, passphrase=None, content=None, backend="cryptography", prefer_one=False
|
||||
):
|
||||
def get_fingerprint(path, passphrase=None, content=None, prefer_one=False):
|
||||
"""Generate the fingerprint of the public key."""
|
||||
|
||||
privatekey = load_privatekey(
|
||||
@@ -119,16 +116,16 @@ def get_fingerprint(
|
||||
passphrase=passphrase,
|
||||
content=content,
|
||||
check_passphrase=False,
|
||||
backend=backend,
|
||||
)
|
||||
|
||||
return get_fingerprint_of_privatekey(
|
||||
privatekey, backend=backend, prefer_one=prefer_one
|
||||
)
|
||||
return get_fingerprint_of_privatekey(privatekey, prefer_one=prefer_one)
|
||||
|
||||
|
||||
def load_privatekey(
|
||||
path, passphrase=None, check_passphrase=True, content=None, backend="cryptography"
|
||||
path,
|
||||
passphrase=None,
|
||||
check_passphrase=True,
|
||||
content=None,
|
||||
):
|
||||
"""Load the specified OpenSSL private key.
|
||||
|
||||
@@ -145,23 +142,20 @@ def load_privatekey(
|
||||
except (IOError, OSError) as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
|
||||
if backend == "cryptography":
|
||||
try:
|
||||
result = load_pem_private_key(
|
||||
priv_key_detail,
|
||||
None if passphrase is None else to_bytes(passphrase),
|
||||
)
|
||||
except TypeError:
|
||||
raise OpenSSLBadPassphraseError(
|
||||
"Wrong or empty passphrase provided for private key"
|
||||
)
|
||||
except ValueError:
|
||||
raise OpenSSLBadPassphraseError("Wrong passphrase provided for private key")
|
||||
|
||||
return result
|
||||
try:
|
||||
return load_pem_private_key(
|
||||
priv_key_detail,
|
||||
None if passphrase is None else to_bytes(passphrase),
|
||||
)
|
||||
except TypeError:
|
||||
raise OpenSSLBadPassphraseError(
|
||||
"Wrong or empty passphrase provided for private key"
|
||||
)
|
||||
except ValueError:
|
||||
raise OpenSSLBadPassphraseError("Wrong passphrase provided for private key")
|
||||
|
||||
|
||||
def load_publickey(path=None, content=None, backend=None):
|
||||
def load_publickey(path=None, content=None):
|
||||
if content is None:
|
||||
if path is None:
|
||||
raise OpenSSLObjectError("Must provide either path or content")
|
||||
@@ -171,16 +165,13 @@ def load_publickey(path=None, content=None, backend=None):
|
||||
except (IOError, OSError) as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
|
||||
if backend == "cryptography":
|
||||
try:
|
||||
return serialization.load_pem_public_key(content)
|
||||
except Exception as e:
|
||||
raise OpenSSLObjectError(f"Error while deserializing key: {e}")
|
||||
try:
|
||||
return serialization.load_pem_public_key(content)
|
||||
except Exception as e:
|
||||
raise OpenSSLObjectError(f"Error while deserializing key: {e}")
|
||||
|
||||
|
||||
def load_certificate(
|
||||
path, content=None, backend="cryptography", der_support_enabled=False
|
||||
):
|
||||
def load_certificate(path, content=None, der_support_enabled=False):
|
||||
"""Load the specified certificate."""
|
||||
|
||||
try:
|
||||
@@ -191,20 +182,19 @@ def load_certificate(
|
||||
cert_content = content
|
||||
except (IOError, OSError) as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
if backend == "cryptography":
|
||||
if der_support_enabled is False or identify_pem_format(cert_content):
|
||||
try:
|
||||
return x509.load_pem_x509_certificate(cert_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
elif der_support_enabled:
|
||||
try:
|
||||
return x509.load_der_x509_certificate(cert_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(f"Cannot parse DER certificate: {exc}")
|
||||
if der_support_enabled is False or identify_pem_format(cert_content):
|
||||
try:
|
||||
return x509.load_pem_x509_certificate(cert_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
elif der_support_enabled:
|
||||
try:
|
||||
return x509.load_der_x509_certificate(cert_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(f"Cannot parse DER certificate: {exc}")
|
||||
|
||||
|
||||
def load_certificate_request(path, content=None, backend="cryptography"):
|
||||
def load_certificate_request(path, content=None):
|
||||
"""Load the specified certificate signing request."""
|
||||
try:
|
||||
if content is None:
|
||||
@@ -214,11 +204,10 @@ def load_certificate_request(path, content=None, backend="cryptography"):
|
||||
csr_content = content
|
||||
except (IOError, OSError) as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
if backend == "cryptography":
|
||||
try:
|
||||
return x509.load_pem_x509_csr(csr_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
try:
|
||||
return x509.load_pem_x509_csr(csr_content)
|
||||
except ValueError as exc:
|
||||
raise OpenSSLObjectError(exc)
|
||||
|
||||
|
||||
def parse_name_field(input_dict, name_field_name=None):
|
||||
|
||||
@@ -10,6 +10,53 @@ Must be kept in sync with plugins/doc_fragments/cryptography_dep.py.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import (
|
||||
LooseVersion,
|
||||
)
|
||||
|
||||
|
||||
_CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509 # noqa: F401, pylint: disable=unused-import
|
||||
|
||||
_CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
_CRYPTOGRAPHY_FILE = cryptography.__file__
|
||||
except ImportError:
|
||||
_CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
_CRYPTOGRAPHY_FOUND = False
|
||||
_CRYPTOGRAPHY_FILE = None
|
||||
else:
|
||||
_CRYPTOGRAPHY_FOUND = True
|
||||
|
||||
|
||||
# Corresponds to the community.crypto.cryptography_dep.minimum doc fragment
|
||||
COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION = "3.3"
|
||||
|
||||
|
||||
def assert_required_cryptography_version(
|
||||
module,
|
||||
*,
|
||||
minimum_cryptography_version: str = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION,
|
||||
) -> None:
|
||||
if not _CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(f"cryptography >= {minimum_cryptography_version}"),
|
||||
exception=_CRYPTOGRAPHY_IMP_ERR,
|
||||
)
|
||||
if _CRYPTOGRAPHY_VERSION < LooseVersion(minimum_cryptography_version):
|
||||
module.fail_json(
|
||||
msg=(
|
||||
f"Cannot detect the required Python library cryptography (>= {minimum_cryptography_version})."
|
||||
f" Only found a too old version ({_CRYPTOGRAPHY_VERSION}) at {_CRYPTOGRAPHY_FILE}."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
__all__ = (
|
||||
"COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION",
|
||||
"assert_required_cryptography_version",
|
||||
)
|
||||
|
||||
@@ -22,6 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.
|
||||
parse_private_key_format,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import (
|
||||
CRYPTOGRAPHY_VERSION,
|
||||
HAS_OPENSSH_SUPPORT,
|
||||
InvalidCommentError,
|
||||
InvalidPassphraseError,
|
||||
@@ -346,8 +347,7 @@ class KeypairBackendOpensshBin(KeypairBackend):
|
||||
|
||||
if self.module.params["private_key_format"] != "auto":
|
||||
self.module.fail_json(
|
||||
msg="'auto' is the only valid option for "
|
||||
+ "'private_key_format' when 'backend' is not 'cryptography'"
|
||||
msg="'auto' is the only valid option for 'private_key_format' when 'backend' is not 'cryptography'"
|
||||
)
|
||||
|
||||
self.ssh_keygen = KeygenCommand(self.module)
|
||||
@@ -531,7 +531,9 @@ class KeypairBackendCryptography(KeypairBackend):
|
||||
|
||||
|
||||
def select_backend(module, backend):
|
||||
can_use_cryptography = HAS_OPENSSH_SUPPORT
|
||||
can_use_cryptography = HAS_OPENSSH_SUPPORT and LooseVersion(
|
||||
CRYPTOGRAPHY_VERSION
|
||||
) >= LooseVersion(COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION)
|
||||
can_use_opensshbin = bool(module.get_bin_path("ssh-keygen"))
|
||||
|
||||
if backend == "auto":
|
||||
@@ -550,14 +552,13 @@ def select_backend(module, backend):
|
||||
if backend == "opensshbin":
|
||||
if not can_use_opensshbin:
|
||||
module.fail_json(msg="Cannot find the OpenSSH binary in the PATH")
|
||||
return backend, KeypairBackendOpensshBin(module)
|
||||
elif backend == "cryptography":
|
||||
return KeypairBackendOpensshBin(module)
|
||||
if backend == "cryptography":
|
||||
if not can_use_cryptography:
|
||||
module.fail_json(
|
||||
msg=missing_required_lib(
|
||||
f"cryptography >= {COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION}"
|
||||
)
|
||||
)
|
||||
return backend, KeypairBackendCryptography(module)
|
||||
else:
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
return KeypairBackendCryptography(module)
|
||||
raise ValueError(f"Unsupported value for backend: {backend}")
|
||||
|
||||
@@ -13,7 +13,6 @@ from socket import gethostname
|
||||
try:
|
||||
from cryptography import __version__ as CRYPTOGRAPHY_VERSION
|
||||
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import dsa, ec, padding, rsa
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||||
@@ -149,19 +148,16 @@ class AsymmetricKeypair:
|
||||
# if improper padding is used during signing
|
||||
public_exponent=65537,
|
||||
key_size=size,
|
||||
backend=backend,
|
||||
)
|
||||
elif keytype == "dsa":
|
||||
privatekey = dsa.generate_private_key(
|
||||
key_size=size,
|
||||
backend=backend,
|
||||
)
|
||||
elif keytype == "ed25519":
|
||||
privatekey = Ed25519PrivateKey.generate()
|
||||
elif keytype == "ecdsa":
|
||||
privatekey = ec.generate_private_key(
|
||||
_ALGORITHM_PARAMETERS["ecdsa"]["curves"][size],
|
||||
backend=backend,
|
||||
)
|
||||
|
||||
publickey = privatekey.public_key()
|
||||
@@ -574,7 +570,6 @@ def load_privatekey(path, passphrase, key_format):
|
||||
privatekey = privatekey_loader(
|
||||
data=content,
|
||||
password=passphrase,
|
||||
backend=backend,
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
@@ -584,7 +579,6 @@ def load_privatekey(path, passphrase, key_format):
|
||||
privatekey = privatekey_loaders["PEM"](
|
||||
data=content,
|
||||
password=passphrase,
|
||||
backend=backend,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise InvalidPrivateKeyFileError(e)
|
||||
@@ -625,7 +619,6 @@ def load_publickey(path, key_format):
|
||||
|
||||
publickey = publickey_loader(
|
||||
data=content,
|
||||
backend=backend,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise InvalidPublicKeyFileError(e)
|
||||
@@ -692,7 +685,7 @@ def extract_comment(path):
|
||||
|
||||
|
||||
def calculate_fingerprint(openssh_publickey):
|
||||
digest = hashes.Hash(hashes.SHA256(), backend=backend)
|
||||
digest = hashes.Hash(hashes.SHA256())
|
||||
decoded_pubkey = b64decode(openssh_publickey.split(b" ")[1])
|
||||
digest.update(decoded_pubkey)
|
||||
|
||||
|
||||
@@ -115,14 +115,12 @@ def convert_relative_to_datetime(relative_time_string, with_timezone=False, now=
|
||||
return now - offset
|
||||
|
||||
|
||||
def get_relative_time_option(
|
||||
input_string, input_name, backend="cryptography", with_timezone=False, now=None
|
||||
):
|
||||
def get_relative_time_option(input_string, input_name, with_timezone=False, now=None):
|
||||
"""
|
||||
Return an absolute timespec if a relative timespec or an ASN1 formatted
|
||||
string is provided.
|
||||
|
||||
The return value will be a datetime object for the cryptography backend.
|
||||
The return value will be a datetime object.
|
||||
"""
|
||||
result = to_native(input_string)
|
||||
if result is None:
|
||||
@@ -131,34 +129,31 @@ def get_relative_time_option(
|
||||
)
|
||||
# Relative time
|
||||
if result.startswith("+") or result.startswith("-"):
|
||||
result_datetime = convert_relative_to_datetime(
|
||||
return convert_relative_to_datetime(
|
||||
result, with_timezone=with_timezone, now=now
|
||||
)
|
||||
if backend == "cryptography":
|
||||
return result_datetime
|
||||
# Absolute time
|
||||
if backend == "cryptography":
|
||||
for date_fmt, length in [
|
||||
(
|
||||
"%Y%m%d%H%M%SZ",
|
||||
15,
|
||||
), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4)
|
||||
("%Y%m%d%H%MZ", 13),
|
||||
(
|
||||
"%Y%m%d%H%M%S%z",
|
||||
14 + 5,
|
||||
), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...)
|
||||
("%Y%m%d%H%M%z", 12 + 5),
|
||||
]:
|
||||
if len(result) != length:
|
||||
continue
|
||||
try:
|
||||
res = datetime.datetime.strptime(result, date_fmt)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
return add_or_remove_timezone(res, with_timezone=with_timezone)
|
||||
for date_fmt, length in [
|
||||
(
|
||||
"%Y%m%d%H%M%SZ",
|
||||
15,
|
||||
), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4)
|
||||
("%Y%m%d%H%MZ", 13),
|
||||
(
|
||||
"%Y%m%d%H%M%S%z",
|
||||
14 + 5,
|
||||
), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...)
|
||||
("%Y%m%d%H%M%z", 12 + 5),
|
||||
]:
|
||||
if len(result) != length:
|
||||
continue
|
||||
try:
|
||||
res = datetime.datetime.strptime(result, date_fmt)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
return add_or_remove_timezone(res, with_timezone=with_timezone)
|
||||
|
||||
raise OpenSSLObjectError(
|
||||
f'The time spec "{input_string}" for {input_name} is invalid'
|
||||
)
|
||||
raise OpenSSLObjectError(
|
||||
f'The time spec "{input_string}" for {input_name} is invalid'
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user