mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-03-26 21:33:25 +00:00
More refactorings (#890)
* Improve typing. * Improve version parameter validation for x509_certificate* modules. * Use utils for parsing retry-after.
This commit is contained in:
@@ -265,6 +265,8 @@ options:
|
||||
- This is only used by the V(ownca) provider.
|
||||
type: int
|
||||
default: 3
|
||||
choices:
|
||||
- 3
|
||||
|
||||
ownca_not_before:
|
||||
description:
|
||||
@@ -351,6 +353,8 @@ options:
|
||||
- This is only used by the V(selfsigned) provider.
|
||||
type: int
|
||||
default: 3
|
||||
choices:
|
||||
- 3
|
||||
|
||||
selfsigned_digest:
|
||||
description:
|
||||
|
||||
@@ -43,6 +43,9 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor
|
||||
from ansible_collections.community.crypto.plugins.module_utils._argspec import (
|
||||
ArgumentSpec,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils._time import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
@@ -79,9 +82,13 @@ def _decode_retry(
|
||||
)
|
||||
|
||||
# 429 and 503 should have a Retry-After header (https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After)
|
||||
now = get_now_datetime(with_timezone=True)
|
||||
try:
|
||||
# TODO: use utils.parse_retry_after()
|
||||
retry_after = min(max(1, int(info.get("retry-after", "10"))), 60)
|
||||
then = parse_retry_after(
|
||||
info.get("retry-after", "10"), relative_with_timezone=True, now=now
|
||||
)
|
||||
retry_after = (then - now).total_seconds()
|
||||
retry_after = min(max(1, retry_after), 60)
|
||||
except (TypeError, ValueError):
|
||||
retry_after = 10
|
||||
module.log(
|
||||
|
||||
@@ -360,10 +360,6 @@ class CertificateProvider(metaclass=abc.ABCMeta):
|
||||
def validate_module_args(self, module: AnsibleModule) -> None:
|
||||
"""Check module arguments"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def needs_version_two_certs(self, module: AnsibleModule) -> bool:
|
||||
"""Whether the provider needs to create a version 2 certificate."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_backend(self, module: AnsibleModule) -> CertificateBackend:
|
||||
"""Create an implementation for a backend.
|
||||
@@ -381,12 +377,6 @@ def select_backend(
|
||||
module, minimum_cryptography_version=MINIMAL_CRYPTOGRAPHY_VERSION
|
||||
)
|
||||
|
||||
if provider.needs_version_two_certs(module):
|
||||
# TODO: remove
|
||||
module.fail_json(
|
||||
msg="The cryptography backend does not support v2 certificates"
|
||||
)
|
||||
|
||||
return provider.create_backend(module)
|
||||
|
||||
|
||||
|
||||
@@ -121,9 +121,6 @@ class AcmeCertificateProvider(CertificateProvider):
|
||||
msg="The acme_challenge_path option must be specified for the acme provider."
|
||||
)
|
||||
|
||||
def needs_version_two_certs(self, module: AnsibleModule) -> bool:
|
||||
return False
|
||||
|
||||
def create_backend(self, module: AnsibleModule) -> AcmeCertificateBackend:
|
||||
return AcmeCertificateBackend(module=module)
|
||||
|
||||
|
||||
@@ -226,9 +226,6 @@ class EntrustCertificateProvider(CertificateProvider):
|
||||
def validate_module_args(self, module: AnsibleModule) -> None:
|
||||
pass
|
||||
|
||||
def needs_version_two_certs(self, module: AnsibleModule) -> t.Literal[False]:
|
||||
return False
|
||||
|
||||
def create_backend(self, module: AnsibleModule) -> EntrustCertificateBackend:
|
||||
return EntrustCertificateBackend(module=module)
|
||||
|
||||
|
||||
@@ -82,7 +82,6 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.digest = select_message_digest(module.params["ownca_digest"])
|
||||
self.version: int = module.params["ownca_version"]
|
||||
self.serial_number = x509.random_serial_number()
|
||||
self.ca_cert_path: str | None = module.params["ownca_path"]
|
||||
ca_cert_content: str | None = module.params["ownca_content"]
|
||||
@@ -335,9 +334,6 @@ class OwnCACertificateProvider(CertificateProvider):
|
||||
msg="One of ownca_privatekey_path and ownca_privatekey_content must be specified for the ownca provider."
|
||||
)
|
||||
|
||||
def needs_version_two_certs(self, module: AnsibleModule) -> bool:
|
||||
return module.params["ownca_version"] == 2
|
||||
|
||||
def create_backend(
|
||||
self, module: AnsibleModule
|
||||
) -> OwnCACertificateBackendCryptography:
|
||||
@@ -354,7 +350,7 @@ def add_ownca_provider_to_argument_spec(argument_spec: ArgumentSpec) -> None:
|
||||
ownca_privatekey_content=dict(type="str", no_log=True),
|
||||
ownca_privatekey_passphrase=dict(type="str", no_log=True),
|
||||
ownca_digest=dict(type="str", default="sha256"),
|
||||
ownca_version=dict(type="int", default=3),
|
||||
ownca_version=dict(type="int", default=3, choices=[3]), # not used
|
||||
ownca_not_before=dict(type="str", default="+0s"),
|
||||
ownca_not_after=dict(type="str", default="+3650d"),
|
||||
ownca_create_subject_key_identifier=dict(
|
||||
|
||||
@@ -75,7 +75,6 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
self.digest = select_message_digest(module.params["selfsigned_digest"])
|
||||
self.version: int = module.params["selfsigned_version"]
|
||||
self.serial_number = x509.random_serial_number()
|
||||
|
||||
if self.csr_path is not None and not os.path.exists(self.csr_path):
|
||||
@@ -235,9 +234,6 @@ class SelfSignedCertificateProvider(CertificateProvider):
|
||||
msg="One of privatekey_path and privatekey_content must be specified for the selfsigned provider."
|
||||
)
|
||||
|
||||
def needs_version_two_certs(self, module: AnsibleModule) -> bool:
|
||||
return module.params["selfsigned_version"] == 2
|
||||
|
||||
def create_backend(
|
||||
self, module: AnsibleModule
|
||||
) -> SelfSignedCertificateBackendCryptography:
|
||||
@@ -248,7 +244,7 @@ def add_selfsigned_provider_to_argument_spec(argument_spec: ArgumentSpec) -> Non
|
||||
argument_spec.argument_spec["provider"]["choices"].append("selfsigned")
|
||||
argument_spec.argument_spec.update(
|
||||
dict(
|
||||
selfsigned_version=dict(type="int", default=3),
|
||||
selfsigned_version=dict(type="int", default=3, choices=[3]), # not used
|
||||
selfsigned_digest=dict(type="str", default="sha256"),
|
||||
selfsigned_not_before=dict(
|
||||
type="str", default="+0s", aliases=["selfsigned_notBefore"]
|
||||
|
||||
@@ -233,12 +233,14 @@ class AsymmetricKeypair:
|
||||
privatekey = load_privatekey(
|
||||
path=path, passphrase=passphrase, key_format=private_key_format
|
||||
)
|
||||
publickey: AllPublicKeyTypes
|
||||
if no_public_key:
|
||||
publickey = privatekey.public_key()
|
||||
else:
|
||||
# TODO: BUG: load_publickey() can return unsupported key types
|
||||
# (Also we should check whether the public key fits the private key...)
|
||||
publickey = load_publickey(path=path + ".pub", key_format=public_key_format) # type: ignore
|
||||
# TODO: Maybe we should check whether the public key actually fits the private key?
|
||||
publickey = load_publickey(
|
||||
path=str(path) + ".pub", key_format=public_key_format
|
||||
)
|
||||
|
||||
# Ed25519 keys are always of size 256 and do not have a key_size attribute
|
||||
if isinstance(privatekey, Ed25519PrivateKey):
|
||||
@@ -249,12 +251,28 @@ class AsymmetricKeypair:
|
||||
keytype: KeyType
|
||||
if isinstance(privatekey, rsa.RSAPrivateKey):
|
||||
keytype = "rsa"
|
||||
if not isinstance(publickey, rsa.RSAPublicKey):
|
||||
raise InvalidKeyTypeError(
|
||||
f"Private key is an RSA key, but public key is of type '{type(publickey)}'"
|
||||
)
|
||||
elif isinstance(privatekey, dsa.DSAPrivateKey):
|
||||
keytype = "dsa"
|
||||
if not isinstance(publickey, dsa.DSAPublicKey):
|
||||
raise InvalidKeyTypeError(
|
||||
f"Private key is a DSA key, but public key is of type '{type(publickey)}'"
|
||||
)
|
||||
elif isinstance(privatekey, ec.EllipticCurvePrivateKey):
|
||||
keytype = "ecdsa"
|
||||
if not isinstance(publickey, ec.EllipticCurvePublicKey):
|
||||
raise InvalidKeyTypeError(
|
||||
f"Private key is an Elliptic Curve key, but public key is of type '{type(publickey)}'"
|
||||
)
|
||||
elif isinstance(privatekey, Ed25519PrivateKey):
|
||||
keytype = "ed25519"
|
||||
if not isinstance(publickey, Ed25519PublicKey):
|
||||
raise InvalidKeyTypeError(
|
||||
f"Private key is an Ed25519 key, but public key is of type '{type(publickey)}'"
|
||||
)
|
||||
else:
|
||||
raise InvalidKeyTypeError(f"Key type '{type(privatekey)}' is not supported")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user