From d83a9233252602e43d67a8735ce00ff7007fdceb Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 9 Jun 2025 10:10:19 +0200 Subject: [PATCH] Ensure that *everything* is typed in community.crypto (#917) * Ensure that *everything* is typed in community.crypto. * Fix comment. * Ignore type definitions/imports and AssertionErrors for code coverage. --- .mypy.ini | 5 +- changelogs/fragments/refactoring.yml | 3 +- plugins/action/openssl_privatekey_pipe.py | 6 +- plugins/lookup/gpg_fingerprint.py | 8 ++- plugins/module_utils/_acme/account.py | 2 +- plugins/module_utils/_acme/acme.py | 22 +++++--- .../_acme/backend_cryptography.py | 6 +- .../module_utils/_acme/backend_openssl_cli.py | 4 +- plugins/module_utils/_acme/backends.py | 30 +++++++--- plugins/module_utils/_acme/certificate.py | 8 +-- plugins/module_utils/_acme/certificates.py | 2 +- plugins/module_utils/_acme/challenges.py | 4 +- plugins/module_utils/_acme/errors.py | 11 ++-- plugins/module_utils/_acme/io.py | 2 +- plugins/module_utils/_acme/orders.py | 2 +- plugins/module_utils/_acme/utils.py | 2 +- plugins/module_utils/_argspec.py | 4 +- plugins/module_utils/_crypto/_obj2txt.py | 4 +- plugins/module_utils/_crypto/_objects.py | 4 +- .../module_utils/_crypto/cryptography_crl.py | 2 +- .../_crypto/cryptography_support.py | 27 +++++---- .../_crypto/module_backends/certificate.py | 24 ++++---- .../module_backends/certificate_acme.py | 8 ++- .../module_backends/certificate_info.py | 18 +++--- .../module_backends/certificate_ownca.py | 16 ++++-- .../module_backends/certificate_selfsigned.py | 18 +++--- .../_crypto/module_backends/crl_info.py | 12 ++-- .../_crypto/module_backends/csr.py | 20 ++++--- .../_crypto/module_backends/csr_info.py | 12 ++-- .../_crypto/module_backends/privatekey.py | 24 +++++--- .../module_backends/privatekey_convert.py | 8 +-- .../module_backends/privatekey_info.py | 12 ++-- .../_crypto/module_backends/publickey_info.py | 16 +++--- plugins/module_utils/_crypto/support.py | 6 +- plugins/module_utils/_cryptography_dep.py | 10 ++-- plugins/module_utils/_io.py | 2 +- .../module_utils/_openssh/backends/common.py | 55 +++++++++++++------ .../_openssh/backends/keypair_backend.py | 4 +- plugins/module_utils/_openssh/certificate.py | 47 +++++++++++++--- plugins/module_utils/_openssh/cryptography.py | 12 ++-- plugins/modules/acme_account.py | 4 +- plugins/modules/acme_account_info.py | 4 +- plugins/modules/acme_certificate.py | 6 +- .../acme_certificate_order_finalize.py | 2 +- .../acme_certificate_order_validate.py | 2 +- .../modules/acme_certificate_renewal_info.py | 2 +- plugins/modules/acme_certificate_revoke.py | 2 +- plugins/modules/certificate_complete_chain.py | 4 +- plugins/modules/openssh_cert.py | 36 +++++++++--- plugins/modules/openssl_csr.py | 4 +- plugins/modules/openssl_csr_pipe.py | 4 +- plugins/modules/openssl_dhparam.py | 2 +- plugins/modules/openssl_pkcs12.py | 6 +- plugins/modules/openssl_privatekey.py | 4 +- plugins/modules/openssl_privatekey_convert.py | 8 ++- plugins/modules/openssl_publickey.py | 2 +- plugins/modules/x509_certificate.py | 4 +- plugins/modules/x509_certificate_pipe.py | 4 +- plugins/modules/x509_crl.py | 2 +- plugins/plugin_utils/_action_module.py | 10 ++-- plugins/plugin_utils/_filter_module.py | 2 +- .../module_utils/_acme/backend_data.py | 34 ++++++++---- .../_acme/test_backend_cryptography.py | 40 ++++++++------ .../_acme/test_backend_openssl_cli.py | 40 ++++++++------ .../plugins/module_utils/_acme/test_io.py | 13 +++-- .../plugins/module_utils/_acme/test_utils.py | 11 ++-- .../plugins/module_utils/_crypto/test_asn1.py | 3 +- .../_crypto/test_cryptography_support.py | 2 +- .../plugins/module_utils/_crypto/test_pem.py | 2 +- .../module_utils/_openssh/test_certificate.py | 32 ++++++----- .../_openssh/test_cryptography.py | 2 +- tests/unit/plugins/module_utils/test__time.py | 4 +- .../unit/plugins/modules/test_luks_device.py | 32 ++++++----- 73 files changed, 494 insertions(+), 317 deletions(-) diff --git a/.mypy.ini b/.mypy.ini index 82321070..5104700a 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -4,7 +4,7 @@ [mypy] check_untyped_defs = True -# disallow_untyped_defs = True -- not yet feasible +disallow_untyped_defs = True # strict = True -- only try to enable once everything (including dependencies!) is typed strict_equality = True @@ -15,8 +15,7 @@ warn_redundant_casts = True warn_unreachable = True [mypy-ansible.*] -# ansible-core has no typing information -# ignore_missing_imports = True +# ansible-core has partial typing information follow_untyped_imports = True [mypy-ansible_collections.community.internal_test_tools.*] diff --git a/changelogs/fragments/refactoring.yml b/changelogs/fragments/refactoring.yml index 5554127b..2d123059 100644 --- a/changelogs/fragments/refactoring.yml +++ b/changelogs/fragments/refactoring.yml @@ -3,5 +3,6 @@ minor_changes: https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911, https://github.com/ansible-collections/community.crypto/pull/913, - https://github.com/ansible-collections/community.crypto/pull/914)." + https://github.com/ansible-collections/community.crypto/pull/914, + https://github.com/ansible-collections/community.crypto/pull/917)." - "Remove various no longer needed abstraction layers for multiple backends (https://github.com/ansible-collections/community.crypto/pull/912)." diff --git a/plugins/action/openssl_privatekey_pipe.py b/plugins/action/openssl_privatekey_pipe.py index 33842e6b..b7c5634c 100644 --- a/plugins/action/openssl_privatekey_pipe.py +++ b/plugins/action/openssl_privatekey_pipe.py @@ -21,13 +21,13 @@ from ansible_collections.community.crypto.plugins.plugin_utils._action_module im if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._argspec import ( + from ansible_collections.community.crypto.plugins.module_utils._argspec import ( # pragma: no cover ArgumentSpec, ) - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey import ( + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey import ( # pragma: no cover PrivateKeyBackend, ) - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) diff --git a/plugins/lookup/gpg_fingerprint.py b/plugins/lookup/gpg_fingerprint.py index 3b428c20..5ac1050e 100644 --- a/plugins/lookup/gpg_fingerprint.py +++ b/plugins/lookup/gpg_fingerprint.py @@ -58,10 +58,14 @@ from ansible_collections.community.crypto.plugins.plugin_utils._gnupg import ( class LookupModule(LookupBase): - def run(self, terms: list[t.Any], variables=None, **kwargs) -> list[str]: + def run( + self, terms: list[t.Any], variables: None = None, **kwargs: t.Any + ) -> list[str]: self.set_options(direct=kwargs) if self._loader is None: - raise AssertionError("Contract violation: self._loader is None") + raise AssertionError( + "Contract violation: self._loader is None" + ) # pragma: no cover try: gpg = PluginGPGRunner(cwd=self._loader.get_basedir()) diff --git a/plugins/module_utils/_acme/account.py b/plugins/module_utils/_acme/account.py index 498bcbaa..d5085c9c 100644 --- a/plugins/module_utils/_acme/account.py +++ b/plugins/module_utils/_acme/account.py @@ -18,7 +18,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.errors impo if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( # pragma: no cover ACMEClient, ) diff --git a/plugins/module_utils/_acme/acme.py b/plugins/module_utils/_acme/acme.py index acc2cfca..42cabd86 100644 --- a/plugins/module_utils/_acme/acme.py +++ b/plugins/module_utils/_acme/acme.py @@ -49,13 +49,15 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - import os + import http.client # pragma: no cover + import os # pragma: no cover + import urllib.error # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.account import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.account import ( # pragma: no cover ACMEAccount, ) - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CertificateInformation, CryptoBackend, ) @@ -68,7 +70,11 @@ RETRY_COUNT = 10 def _decode_retry( - *, module: AnsibleModule, response: t.Any, info: dict[str, t.Any], retry_count: int + *, + module: AnsibleModule, + response: urllib.error.HTTPError | http.client.HTTPResponse | None, + info: dict[str, t.Any], + retry_count: int, ) -> bool: if info["status"] not in RETRY_STATUS_CODES: return False @@ -102,7 +108,7 @@ def _decode_retry( def _assert_fetch_url_success( *, module: AnsibleModule, - response: t.Any, + response: urllib.error.HTTPError | http.client.HTTPResponse | None, info: dict[str, t.Any], allow_redirect: bool = False, allow_client_error: bool = True, @@ -288,7 +294,9 @@ class ACMEClient: In case of an error, raises KeyParsingError. """ if key_file is None and key_content is None: - raise AssertionError("One of key_file and key_content must be specified!") + raise AssertionError( + "One of key_file and key_content must be specified!" + ) # pragma: no cover return self.backend.parse_key( key_file=key_file, key_content=key_content, passphrase=passphrase ) diff --git a/plugins/module_utils/_acme/backend_cryptography.py b/plugins/module_utils/_acme/backend_cryptography.py index 003f5e37..52639d40 100644 --- a/plugins/module_utils/_acme/backend_cryptography.py +++ b/plugins/module_utils/_acme/backend_cryptography.py @@ -80,10 +80,10 @@ else: ) if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( # pragma: no cover CertificateChain, Criterium, ) diff --git a/plugins/module_utils/_acme/backend_openssl_cli.py b/plugins/module_utils/_acme/backend_openssl_cli.py index 53a0e5be..f73b03c9 100644 --- a/plugins/module_utils/_acme/backend_openssl_cli.py +++ b/plugins/module_utils/_acme/backend_openssl_cli.py @@ -39,8 +39,8 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( # pragma: no cover Criterium, ) diff --git a/plugins/module_utils/_acme/backends.py b/plugins/module_utils/_acme/backends.py index 7ccbf796..d13423b5 100644 --- a/plugins/module_utils/_acme/backends.py +++ b/plugins/module_utils/_acme/backends.py @@ -31,10 +31,10 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - import os + import os # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( # pragma: no cover ChainMatcher, Criterium, ) @@ -132,12 +132,24 @@ class CryptoBackend(metaclass=abc.ABCMeta): start + percentage * (end - start), with_timezone=self._with_timezone ) - def get_utc_datetime(self, *args, **kwargs) -> datetime.datetime: - kwargs_ext: dict[str, t.Any] = dict(kwargs) - if self._with_timezone and ("tzinfo" not in kwargs_ext and len(args) < 8): - kwargs_ext["tzinfo"] = UTC - result = datetime.datetime(*args, **kwargs_ext) - if self._with_timezone and ("tzinfo" in kwargs or len(args) >= 8): + def get_utc_datetime( + self, + year: int, + month: int, + day: int, + hour: int = 0, + minute: int = 0, + second: int = 0, + microsecond: int = 0, + tzinfo: datetime.timezone | None = None, + ) -> datetime.datetime: + has_tzinfo = tzinfo is not None + if self._with_timezone and not has_tzinfo: + tzinfo = UTC + result = datetime.datetime( + year, month, day, hour, minute, second, microsecond, tzinfo + ) + if self._with_timezone and has_tzinfo: result = ensure_utc_timezone(result) return result diff --git a/plugins/module_utils/_acme/certificate.py b/plugins/module_utils/_acme/certificate.py index 765d23c1..06ff439f 100644 --- a/plugins/module_utils/_acme/certificate.py +++ b/plugins/module_utils/_acme/certificate.py @@ -37,14 +37,14 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CryptoBackend, ) - from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( # pragma: no cover ChainMatcher, ) - from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( # pragma: no cover Challenge, ) diff --git a/plugins/module_utils/_acme/certificates.py b/plugins/module_utils/_acme/certificates.py index df189389..196fcacf 100644 --- a/plugins/module_utils/_acme/certificates.py +++ b/plugins/module_utils/_acme/certificates.py @@ -24,7 +24,7 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.pem impor if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( # pragma: no cover ACMEClient, ) diff --git a/plugins/module_utils/_acme/challenges.py b/plugins/module_utils/_acme/challenges.py index f6306ebd..ebeca50c 100644 --- a/plugins/module_utils/_acme/challenges.py +++ b/plugins/module_utils/_acme/challenges.py @@ -28,8 +28,8 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( # pragma: no cover ACMEClient, ) diff --git a/plugins/module_utils/_acme/errors.py b/plugins/module_utils/_acme/errors.py index 185db98f..f157fbb9 100644 --- a/plugins/module_utils/_acme/errors.py +++ b/plugins/module_utils/_acme/errors.py @@ -15,7 +15,10 @@ from ansible.module_utils.common.text.converters import to_text if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule + import http.client # pragma: no cover + import urllib.error # pragma: no cover + + from ansible.module_utils.basic import AnsibleModule # pragma: no cover def format_http_status(status_code: int) -> str: @@ -59,7 +62,7 @@ class ModuleFailException(Exception): self.msg = msg self.module_fail_args = args - def do_fail(self, *, module: AnsibleModule, **arguments) -> t.NoReturn: + def do_fail(self, *, module: AnsibleModule, **arguments: t.Any) -> t.NoReturn: module.fail_json(msg=self.msg, other=self.module_fail_args, **arguments) @@ -70,11 +73,11 @@ class ACMEProtocolException(ModuleFailException): module: AnsibleModule, msg: str | None = None, info: dict[str, t.Any] | None = None, - response=None, + response: urllib.error.HTTPError | http.client.HTTPResponse | None = None, content: bytes | None = None, content_json: object | bytes | None = None, extras: dict[str, t.Any] | None = None, - ): + ) -> None: # Try to get hold of content, if response is given and content is not provided if content is None and content_json is None and response is not None: try: diff --git a/plugins/module_utils/_acme/io.py b/plugins/module_utils/_acme/io.py index c268d342..3ab0bd27 100644 --- a/plugins/module_utils/_acme/io.py +++ b/plugins/module_utils/_acme/io.py @@ -21,7 +21,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.errors impo if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule + from ansible.module_utils.basic import AnsibleModule # pragma: no cover def read_file(fn: str | os.PathLike) -> bytes: diff --git a/plugins/module_utils/_acme/orders.py b/plugins/module_utils/_acme/orders.py index 9c10edb5..d5421d13 100644 --- a/plugins/module_utils/_acme/orders.py +++ b/plugins/module_utils/_acme/orders.py @@ -25,7 +25,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.acme import ( # pragma: no cover ACMEClient, ) diff --git a/plugins/module_utils/_acme/utils.py b/plugins/module_utils/_acme/utils.py index f7e253e2..4f277c76 100644 --- a/plugins/module_utils/_acme/utils.py +++ b/plugins/module_utils/_acme/utils.py @@ -29,7 +29,7 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CertificateInformation, CryptoBackend, ) diff --git a/plugins/module_utils/_argspec.py b/plugins/module_utils/_argspec.py index 2dbab70e..266345cb 100644 --- a/plugins/module_utils/_argspec.py +++ b/plugins/module_utils/_argspec.py @@ -45,7 +45,7 @@ class ArgumentSpec: self.required_if = _ensure_list(required_if) self.required_by = required_by or {} - def update_argspec(self, **kwargs) -> t.Self: + def update_argspec(self, **kwargs: t.Any) -> t.Self: self.argument_spec.update(kwargs) return self @@ -63,7 +63,7 @@ class ArgumentSpec: | None ) = None, required_by: dict[str, tuple[str, ...] | list[str]] | None = None, - ): + ) -> t.Self: if mutually_exclusive: self.mutually_exclusive.extend(mutually_exclusive) if required_together: diff --git a/plugins/module_utils/_crypto/_obj2txt.py b/plugins/module_utils/_crypto/_obj2txt.py index 5a356572..4e5768af 100644 --- a/plugins/module_utils/_crypto/_obj2txt.py +++ b/plugins/module_utils/_crypto/_obj2txt.py @@ -31,13 +31,15 @@ from __future__ import annotations +import typing as t + # WARNING: this function no longer works with cryptography 35.0.0 and newer! # It must **ONLY** be used in compatibility code for older # cryptography versions! -def obj2txt(openssl_lib, openssl_ffi, obj) -> str: +def obj2txt(openssl_lib: t.Any, openssl_ffi: t.Any, obj: t.Any) -> str: # Set to 80 on the recommendation of # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values # diff --git a/plugins/module_utils/_crypto/_objects.py b/plugins/module_utils/_crypto/_objects.py index d15a4b5b..464983f4 100644 --- a/plugins/module_utils/_crypto/_objects.py +++ b/plugins/module_utils/_crypto/_objects.py @@ -19,7 +19,7 @@ NORMALIZE_NAMES_SHORT: dict[str, str] = {} for dotted, names in OID_MAP.items(): for name in names: if name in NORMALIZE_NAMES and OID_LOOKUP[name] != dotted: - raise AssertionError( + raise AssertionError( # pragma: no cover f'Name collision during setup: "{name}" for OIDs {dotted} and {OID_LOOKUP[name]}' ) NORMALIZE_NAMES[name] = names[0] @@ -27,7 +27,7 @@ for dotted, names in OID_MAP.items(): OID_LOOKUP[name] = dotted for alias, original in [("userID", "userId")]: if alias in NORMALIZE_NAMES: - raise AssertionError( + raise AssertionError( # pragma: no cover f'Name collision during adding aliases: "{alias}" (alias for "{original}") is already mapped to OID {OID_LOOKUP[alias]}' ) NORMALIZE_NAMES[alias] = original diff --git a/plugins/module_utils/_crypto/cryptography_crl.py b/plugins/module_utils/_crypto/cryptography_crl.py index a2158697..8821f2ae 100644 --- a/plugins/module_utils/_crypto/cryptography_crl.py +++ b/plugins/module_utils/_crypto/cryptography_crl.py @@ -34,7 +34,7 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptogra if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover # TODO: once cryptography has a _utc variant of InvalidityDate.invalidity_date, set this diff --git a/plugins/module_utils/_crypto/cryptography_support.py b/plugins/module_utils/_crypto/cryptography_support.py index 8c4c648a..2f2129dc 100644 --- a/plugins/module_utils/_crypto/cryptography_support.py +++ b/plugins/module_utils/_crypto/cryptography_support.py @@ -86,30 +86,33 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.basic imp if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives.asymmetric.dh import DHPrivateKey, DHPublicKey - from cryptography.hazmat.primitives.asymmetric.dsa import ( + from cryptography.hazmat.primitives import hashes # pragma: no cover + from cryptography.hazmat.primitives.asymmetric.dh import ( # pragma: no cover + DHPrivateKey, + DHPublicKey, + ) + from cryptography.hazmat.primitives.asymmetric.dsa import ( # pragma: no cover DSAPrivateKey, DSAPublicKey, ) - from cryptography.hazmat.primitives.asymmetric.ec import ( + from cryptography.hazmat.primitives.asymmetric.ec import ( # pragma: no cover EllipticCurvePrivateKey, EllipticCurvePublicKey, ) - from cryptography.hazmat.primitives.asymmetric.rsa import ( + from cryptography.hazmat.primitives.asymmetric.rsa import ( # pragma: no cover RSAPrivateKey, RSAPublicKey, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, CertificateIssuerPublicKeyTypes, CertificatePublicKeyTypes, PrivateKeyTypes, PublicKeyTypes, ) - from cryptography.hazmat.primitives.serialization.pkcs12 import ( + from cryptography.hazmat.primitives.serialization.pkcs12 import ( # pragma: no cover PKCS12KeyAndCertificates, ) @@ -117,13 +120,13 @@ if t.TYPE_CHECKING: CertificateIssuerPrivateKeyTypes, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey, - ] + ] # pragma: no cover PublicKeyTypesWOEdwards = t.Union[ # pylint: disable=invalid-name DHPublicKey, DSAPublicKey, EllipticCurvePublicKey, RSAPublicKey - ] + ] # pragma: no cover PrivateKeyTypesWOEdwards = t.Union[ # pylint: disable=invalid-name DHPrivateKey, DSAPrivateKey, EllipticCurvePrivateKey, RSAPrivateKey - ] + ] # pragma: no cover else: PublicKeyTypesWOEdwards = None # pylint: disable=invalid-name PrivateKeyTypesWOEdwards = None # pylint: disable=invalid-name @@ -602,7 +605,7 @@ def cryptography_decode_name( Raises an OpenSSLObjectError if the name is not supported. """ if idn_rewrite not in ("ignore", "idna", "unicode"): - raise AssertionError( + raise AssertionError( # pragma: no cover 'idn_rewrite must be one of "ignore", "idna", or "unicode"' ) if isinstance(name, x509.DNSName): diff --git a/plugins/module_utils/_crypto/module_backends/certificate.py b/plugins/module_utils/_crypto/module_backends/certificate.py index c5f2c332..b3376ad8 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate.py +++ b/plugins/module_utils/_crypto/module_backends/certificate.py @@ -38,13 +38,13 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( # pragma: no cover CertificatePrivateKeyTypes, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, ) @@ -170,11 +170,11 @@ class CertificateBackend(metaclass=abc.ABCMeta): def _check_privatekey(self) -> bool: """Check whether provided parameters match, assuming self.existing_certificate and self.privatekey have been populated.""" if self.existing_certificate is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "Contract violation: existing_certificate has not been populated" ) if self.privatekey is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "Contract violation: privatekey has not been populated" ) return cryptography_compare_public_keys( @@ -184,11 +184,13 @@ class CertificateBackend(metaclass=abc.ABCMeta): def _check_csr(self) -> bool: """Check whether provided parameters match, assuming self.existing_certificate and self.csr have been populated.""" if self.existing_certificate is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "Contract violation: existing_certificate has not been populated" ) if self.csr is None: - raise AssertionError("Contract violation: csr has not been populated") + raise AssertionError( + "Contract violation: csr has not been populated" + ) # pragma: no cover # Verify that CSR is signed by certificate's private key if not self.csr.is_signature_valid: return False @@ -249,11 +251,13 @@ class CertificateBackend(metaclass=abc.ABCMeta): def _check_subject_key_identifier(self) -> bool: """Check whether Subject Key Identifier matches, assuming self.existing_certificate and self.csr have been populated.""" if self.existing_certificate is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "Contract violation: existing_certificate has not been populated" ) if self.csr is None: - raise AssertionError("Contract violation: csr has not been populated") + raise AssertionError( + "Contract violation: csr has not been populated" + ) # pragma: no cover # Get hold of certificate's SKI try: ext = self.existing_certificate.extensions.get_extension_for_class( diff --git a/plugins/module_utils/_crypto/module_backends/certificate_acme.py b/plugins/module_utils/_crypto/module_backends/certificate_acme.py index 300efaa1..3811984b 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate_acme.py +++ b/plugins/module_utils/_crypto/module_backends/certificate_acme.py @@ -22,8 +22,8 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.module_ba if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._argspec import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._argspec import ( # pragma: no cover ArgumentSpec, ) @@ -100,7 +100,9 @@ class AcmeCertificateBackend(CertificateBackend): def get_certificate_data(self) -> bytes: """Return bytes for self.cert.""" if self.cert_bytes is None: - raise AssertionError("Contract violation: cert_bytes is None") + raise AssertionError( + "Contract violation: cert_bytes is None" + ) # pragma: no cover return self.cert_bytes def dump(self, *, include_certificate: bool) -> dict[str, t.Any]: diff --git a/plugins/module_utils/_crypto/module_backends/certificate_info.py b/plugins/module_utils/_crypto/module_backends/certificate_info.py index be5f5922..4e038eb0 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate_info.py +++ b/plugins/module_utils/_crypto/module_backends/certificate_info.py @@ -38,21 +38,25 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._argspec import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._argspec import ( # pragma: no cover ArgumentSpec, ) - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - from cryptography.hazmat.primitives.asymmetric.types import PublicKeyTypes + from cryptography.hazmat.primitives.asymmetric.types import ( + PublicKeyTypes, # pragma: no cover + ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION diff --git a/plugins/module_utils/_crypto/module_backends/certificate_ownca.py b/plugins/module_utils/_crypto/module_backends/certificate_ownca.py index d0fab608..d3d5d0a1 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate_ownca.py +++ b/plugins/module_utils/_crypto/module_backends/certificate_ownca.py @@ -42,13 +42,13 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._argspec import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._argspec import ( # pragma: no cover ArgumentSpec, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, ) @@ -157,7 +157,9 @@ class OwnCACertificateBackendCryptography(CertificateBackend): def generate_certificate(self) -> None: """(Re-)Generate certificate.""" if self.csr is None: - raise AssertionError("Contract violation: csr has not been populated") + raise AssertionError( + "Contract violation: csr has not been populated" + ) # pragma: no cover cert_builder = x509.CertificateBuilder() cert_builder = cert_builder.subject_name(self.csr.subject) cert_builder = cert_builder.issuer_name(self.ca_cert.subject) @@ -214,7 +216,9 @@ class OwnCACertificateBackendCryptography(CertificateBackend): def get_certificate_data(self) -> bytes: """Return bytes for self.cert.""" if self.cert is None: - raise AssertionError("Contract violation: cert has not been populated") + raise AssertionError( + "Contract violation: cert has not been populated" + ) # pragma: no cover return self.cert.public_bytes(Encoding.PEM) def needs_regeneration( diff --git a/plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py b/plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py index 8c9a4bf6..4374d4a4 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py +++ b/plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py @@ -36,13 +36,13 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._argspec import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._argspec import ( # pragma: no cover ArgumentSpec, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, ) @@ -114,9 +114,11 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend): def generate_certificate(self) -> None: """(Re-)Generate certificate.""" if self.csr is None: - raise AssertionError("Contract violation: csr has not been populated") - if self.privatekey is None: raise AssertionError( + "Contract violation: csr has not been populated" + ) # pragma: no cover + if self.privatekey is None: + raise AssertionError( # pragma: no cover "Contract violation: privatekey has not been populated" ) try: @@ -156,7 +158,9 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend): def get_certificate_data(self) -> bytes: """Return bytes for self.cert.""" if self.cert is None: - raise AssertionError("Contract violation: cert has not been populated") + raise AssertionError( + "Contract violation: cert has not been populated" + ) # pragma: no cover return self.cert.public_bytes(Encoding.PEM) def needs_regeneration( diff --git a/plugins/module_utils/_crypto/module_backends/crl_info.py b/plugins/module_utils/_crypto/module_backends/crl_info.py index fb902137..a6afc3cc 100644 --- a/plugins/module_utils/_crypto/module_backends/crl_info.py +++ b/plugins/module_utils/_crypto/module_backends/crl_info.py @@ -28,18 +28,20 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PrivateKeyTypes, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover # crypto_utils diff --git a/plugins/module_utils/_crypto/module_backends/csr.py b/plugins/module_utils/_crypto/module_backends/csr.py index 37c9d23b..56501dd9 100644 --- a/plugins/module_utils/_crypto/module_backends/csr.py +++ b/plugins/module_utils/_crypto/module_backends/csr.py @@ -48,16 +48,16 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( # pragma: no cover CertificatePrivateKeyTypes, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, PrivateKeyTypes, ) - _ET = t.TypeVar("_ET", bound="cryptography.x509.ExtensionType") + _ET = t.TypeVar("_ET", bound="cryptography.x509.ExtensionType") # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION @@ -453,7 +453,9 @@ class CertificateSigningRequestBackend: def get_csr_data(self) -> bytes: """Return bytes for self.csr.""" if self.csr is None: - raise AssertionError("Violated contract: csr is not populated") + raise AssertionError( + "Violated contract: csr is not populated" + ) # pragma: no cover return self.csr.public_bytes( cryptography.hazmat.primitives.serialization.Encoding.PEM ) @@ -485,9 +487,13 @@ class CertificateSigningRequestBackend: def _check_csr(self) -> bool: """Check whether provided parameters, assuming self.existing_csr and self.privatekey have been populated.""" if self.existing_csr is None: - raise AssertionError("Violated contract: existing_csr is not populated") + raise AssertionError( + "Violated contract: existing_csr is not populated" + ) # pragma: no cover if self.privatekey is None: - raise AssertionError("Violated contract: privatekey is not populated") + raise AssertionError( + "Violated contract: privatekey is not populated" + ) # pragma: no cover def _check_subject(csr: cryptography.x509.CertificateSigningRequest) -> bool: subject = [ diff --git a/plugins/module_utils/_crypto/module_backends/csr_info.py b/plugins/module_utils/_crypto/module_backends/csr_info.py index a38c2405..a0726f2d 100644 --- a/plugins/module_utils/_crypto/module_backends/csr_info.py +++ b/plugins/module_utils/_crypto/module_backends/csr_info.py @@ -31,19 +31,21 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificatePublicKeyTypes, PrivateKeyTypes, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION diff --git a/plugins/module_utils/_crypto/module_backends/privatekey.py b/plugins/module_utils/_crypto/module_backends/privatekey.py index 9b9b3ed1..c4cd5ecd 100644 --- a/plugins/module_utils/_crypto/module_backends/privatekey.py +++ b/plugins/module_utils/_crypto/module_backends/privatekey.py @@ -37,15 +37,17 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PrivateKeyTypes, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule + ] # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION @@ -267,7 +269,7 @@ class PrivateKeyBackend: def get_private_key_data(self) -> bytes: """Return bytes for self.private_key""" if self.private_key is None: - raise AssertionError("private_key not set") + raise AssertionError("private_key not set") # pragma: no cover # Select export format and encoding try: export_format_txt = self._get_wanted_format() @@ -341,7 +343,9 @@ class PrivateKeyBackend: def _load_privatekey(self) -> PrivateKeyTypes: data = self.existing_private_key_bytes if data is None: - raise AssertionError("existing_private_key_bytes not set") + raise AssertionError( + "existing_private_key_bytes not set" + ) # pragma: no cover try: # Interpret bytes depending on format. key_format = identify_private_key_format(data) @@ -388,7 +392,9 @@ class PrivateKeyBackend: def _check_passphrase(self) -> bool: """Check whether provided passphrase matches, assuming self.existing_private_key_bytes has been populated.""" if self.existing_private_key_bytes is None: - raise AssertionError("existing_private_key_bytes not set") + raise AssertionError( + "existing_private_key_bytes not set" + ) # pragma: no cover try: key_format = identify_private_key_format(self.existing_private_key_bytes) if key_format == "raw": @@ -460,7 +466,9 @@ class PrivateKeyBackend: def _check_format(self) -> bool: """Check whether the key file format, assuming self.existing_private_key and self.existing_private_key_bytes has been populated.""" if self.existing_private_key_bytes is None: - raise AssertionError("existing_private_key_bytes not set") + raise AssertionError( + "existing_private_key_bytes not set" + ) # pragma: no cover if self.format == "auto_ignore": return True try: diff --git a/plugins/module_utils/_crypto/module_backends/privatekey_convert.py b/plugins/module_utils/_crypto/module_backends/privatekey_convert.py index b4a22d1e..3bd0f718 100644 --- a/plugins/module_utils/_crypto/module_backends/privatekey_convert.py +++ b/plugins/module_utils/_crypto/module_backends/privatekey_convert.py @@ -31,8 +31,8 @@ from ansible_collections.community.crypto.plugins.module_utils._io import load_f if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from cryptography.hazmat.primitives.asymmetric.types import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PrivateKeyTypes, ) @@ -81,7 +81,7 @@ class PrivateKeyConvertBackend: self.src_private_key_bytes = load_file(path=self.src_path, module=module) else: if self.src_content is None: - raise AssertionError("src_content is None") + raise AssertionError("src_content is None") # pragma: no cover self.src_private_key_bytes = self.src_content.encode("utf-8") self.dest_private_key: PrivateKeyTypes | None = None @@ -90,7 +90,7 @@ class PrivateKeyConvertBackend: def get_private_key_data(self) -> bytes: """Return bytes for self.src_private_key in output format""" if self.src_private_key is None: - raise AssertionError("src_private_key not set") + raise AssertionError("src_private_key not set") # pragma: no cover # Select export format and encoding try: export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM diff --git a/plugins/module_utils/_crypto/module_backends/privatekey_info.py b/plugins/module_utils/_crypto/module_backends/privatekey_info.py index 23ff2444..85a36b49 100644 --- a/plugins/module_utils/_crypto/module_backends/privatekey_info.py +++ b/plugins/module_utils/_crypto/module_backends/privatekey_info.py @@ -33,18 +33,20 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PrivateKeyTypes, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION diff --git a/plugins/module_utils/_crypto/module_backends/publickey_info.py b/plugins/module_utils/_crypto/module_backends/publickey_info.py index a07b8412..38ca68b7 100644 --- a/plugins/module_utils/_crypto/module_backends/publickey_info.py +++ b/plugins/module_utils/_crypto/module_backends/publickey_info.py @@ -23,18 +23,20 @@ from ansible_collections.community.crypto.plugins.module_utils._cryptography_dep if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PublicKeyTypes, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover MINIMAL_CRYPTOGRAPHY_VERSION = COLLECTION_MINIMUM_CRYPTOGRAPHY_VERSION @@ -119,7 +121,7 @@ class PublicKeyInfoRetrieval: def _get_public_key(self, binary: bool) -> bytes: if self.key is None: - raise AssertionError("key must be set") + raise AssertionError("key must be set") # pragma: no cover return self.key.public_bytes( serialization.Encoding.DER if binary else serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo, @@ -127,7 +129,7 @@ class PublicKeyInfoRetrieval: def _get_key_info(self) -> tuple[str, dict[str, t.Any]]: if self.key is None: - raise AssertionError("key must be set") + raise AssertionError("key must be set") # pragma: no cover return _get_cryptography_public_key_info(self.key) def get_info(self, *, prefer_one_fingerprint: bool = False) -> dict[str, t.Any]: diff --git a/plugins/module_utils/_crypto/support.py b/plugins/module_utils/_crypto/support.py index ae6397b5..8e473ffe 100644 --- a/plugins/module_utils/_crypto/support.py +++ b/plugins/module_utils/_crypto/support.py @@ -38,11 +38,11 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.basic imp if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( # pragma: no cover CertificatePrivateKeyTypes, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, PrivateKeyTypes, PublicKeyTypes, diff --git a/plugins/module_utils/_cryptography_dep.py b/plugins/module_utils/_cryptography_dep.py index 5f67d3bf..5150c3bf 100644 --- a/plugins/module_utils/_cryptography_dep.py +++ b/plugins/module_utils/_cryptography_dep.py @@ -23,15 +23,17 @@ from ansible_collections.community.crypto.plugins.module_utils._version import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.plugin_utils._action_module import ( # pragma: no cover AnsibleActionModule, ) - from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( + from ansible_collections.community.crypto.plugins.plugin_utils._filter_module import ( # pragma: no cover FilterModuleMock, ) - GeneralAnsibleModule = t.Union[AnsibleModule, AnsibleActionModule, FilterModuleMock] + GeneralAnsibleModule = t.Union[ + AnsibleModule, AnsibleActionModule, FilterModuleMock + ] # pragma: no cover _CRYPTOGRAPHY_IMP_ERR: str | None = None diff --git a/plugins/module_utils/_io.py b/plugins/module_utils/_io.py index 149caf21..14c9e168 100644 --- a/plugins/module_utils/_io.py +++ b/plugins/module_utils/_io.py @@ -14,7 +14,7 @@ import typing as t if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule + from ansible.module_utils.basic import AnsibleModule # pragma: no cover def load_file(*, path: str | os.PathLike, module: AnsibleModule | None = None) -> bytes: diff --git a/plugins/module_utils/_openssh/backends/common.py b/plugins/module_utils/_openssh/backends/common.py index f146ca4b..06b991bc 100644 --- a/plugins/module_utils/_openssh/backends/common.py +++ b/plugins/module_utils/_openssh/backends/common.py @@ -19,23 +19,26 @@ from ansible_collections.community.crypto.plugins.module_utils._openssh.utils im if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._openssh.certificate import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._openssh.certificate import ( # pragma: no cover OpensshCertificateTimeParameters, ) - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, PrivateKeyTypes, ) - Param = t.ParamSpec("Param") + Param = t.ParamSpec("Param") # pragma: no cover def restore_on_failure( f: t.Callable[t.Concatenate[AnsibleModule, str | os.PathLike, Param], None], ) -> t.Callable[t.Concatenate[AnsibleModule, str | os.PathLike, Param], None]: def backup_and_restore( - module: AnsibleModule, path: str | os.PathLike, *args, **kwargs + module: AnsibleModule, + path: str | os.PathLike, + *args: Param.args, + **kwargs: Param.kwargs, ) -> None: backup_file = module.backup_local(path) if os.path.exists(path) else None @@ -74,8 +77,8 @@ def _restore_all_on_failure( def backup_and_restore( self: OpensshModule, sources_and_destinations: list[tuple[str | os.PathLike, str | os.PathLike]], - *args, - **kwargs, + *args: Param.args, + **kwargs: Param.kwargs, ) -> None: backups = [ (d, self.module.backup_local(d)) @@ -97,6 +100,9 @@ def _restore_all_on_failure( return backup_and_restore +_OpensshModule = t.TypeVar("_OpensshModule", bound="OpensshModule") + + class OpensshModule(metaclass=abc.ABCMeta): def __init__(self, *, module: AnsibleModule) -> None: self.module = module @@ -141,16 +147,24 @@ class OpensshModule(metaclass=abc.ABCMeta): pass @staticmethod - def skip_if_check_mode(f: t.Callable[Param, None]) -> t.Callable[Param, None]: - def wrapper(self, *args, **kwargs) -> None: + def skip_if_check_mode( + f: t.Callable[t.Concatenate[_OpensshModule, Param], None], + ) -> t.Callable[t.Concatenate[_OpensshModule, Param], None]: + def wrapper( + self: _OpensshModule, *args: Param.args, **kwargs: Param.kwargs + ) -> None: if not self.check_mode: f(self, *args, **kwargs) return wrapper # type: ignore @staticmethod - def trigger_change(f: t.Callable[Param, None]) -> t.Callable[Param, None]: - def wrapper(self, *args, **kwargs) -> None: + def trigger_change( + f: t.Callable[t.Concatenate[_OpensshModule, Param], None], + ) -> t.Callable[t.Concatenate[_OpensshModule, Param], None]: + def wrapper( + self: _OpensshModule, *args: Param.args, **kwargs: Param.kwargs + ) -> None: f(self, *args, **kwargs) self.changed = True @@ -202,6 +216,13 @@ class OpensshModule(metaclass=abc.ABCMeta): self.changed = True +if t.TYPE_CHECKING: + + class _RunCommandKwarg(t.TypedDict): + check_rc: t.NotRequired[bool] + environ_update: t.NotRequired[dict[str, str] | None] + + class KeygenCommand: def __init__(self, module: AnsibleModule) -> None: self._bin_path = module.get_bin_path("ssh-keygen", True) @@ -221,7 +242,7 @@ class KeygenCommand: cert_type: t.Literal["host", "user"] | None, time_parameters: OpensshCertificateTimeParameters, use_agent: bool, - **kwargs, + **kwargs: t.Unpack[_RunCommandKwarg], ) -> tuple[int, str, str]: args = [self._bin_path, "-s", signing_key_path, "-P", "", "-I", identifier] @@ -253,7 +274,7 @@ class KeygenCommand: size: int, key_type: str, comment: str | None, - **kwargs, + **kwargs: t.Unpack[_RunCommandKwarg], ) -> tuple[int, str, str]: args = [ self._bin_path, @@ -276,21 +297,21 @@ class KeygenCommand: return self._run_command(args, data=data, **kwargs) def get_certificate_info( - self, *, certificate_path: str, **kwargs + self, *, certificate_path: str, **kwargs: t.Unpack[_RunCommandKwarg] ) -> tuple[int, str, str]: return self._run_command( [self._bin_path, "-L", "-f", certificate_path], **kwargs ) def get_matching_public_key( - self, *, private_key_path: str, **kwargs + self, *, private_key_path: str, **kwargs: t.Unpack[_RunCommandKwarg] ) -> tuple[int, str, str]: return self._run_command( [self._bin_path, "-P", "", "-y", "-f", private_key_path], **kwargs ) def get_private_key( - self, *, private_key_path: str, **kwargs + self, *, private_key_path: str, **kwargs: t.Unpack[_RunCommandKwarg] ) -> tuple[int, str, str]: return self._run_command( [self._bin_path, "-l", "-f", private_key_path], **kwargs @@ -302,7 +323,7 @@ class KeygenCommand: private_key_path: str, comment: str, force_new_format: bool = True, - **kwargs, + **kwargs: t.Unpack[_RunCommandKwarg], ) -> tuple[int, str, str]: if os.path.exists(private_key_path) and not os.access( private_key_path, os.W_OK diff --git a/plugins/module_utils/_openssh/backends/keypair_backend.py b/plugins/module_utils/_openssh/backends/keypair_backend.py index 22e4ea1b..362139f8 100644 --- a/plugins/module_utils/_openssh/backends/keypair_backend.py +++ b/plugins/module_utils/_openssh/backends/keypair_backend.py @@ -44,8 +44,8 @@ from ansible_collections.community.crypto.plugins.module_utils._version import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from cryptography.hazmat.primitives.asymmetric.types import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, PrivateKeyTypes, ) diff --git a/plugins/module_utils/_openssh/certificate.py b/plugins/module_utils/_openssh/certificate.py index 21478145..e5cfb17a 100644 --- a/plugins/module_utils/_openssh/certificate.py +++ b/plugins/module_utils/_openssh/certificate.py @@ -31,13 +31,13 @@ from ansible_collections.community.crypto.plugins.module_utils._time import ( if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._openssh.cryptography import ( + from ansible_collections.community.crypto.plugins.module_utils._openssh.cryptography import ( # pragma: no cover KeyType, ) - DateFormat = t.Literal["human_readable", "openssh", "timestamp"] - DateFormatStr = t.Literal["human_readable", "openssh"] - DateFormatInt = t.Literal["timestamp"] + DateFormat = t.Literal["human_readable", "openssh", "timestamp"] # pragma: no cover + DateFormatStr = t.Literal["human_readable", "openssh"] # pragma: no cover + DateFormatInt = t.Literal["timestamp"] # pragma: no cover else: KeyType = None # pylint: disable=invalid-name @@ -338,6 +338,22 @@ class OpensshCertificateOption: ) +if t.TYPE_CHECKING: + + class _OpensshCertificateInfoKwarg(t.TypedDict): + nonce: t.NotRequired[bytes | None] + serial: t.NotRequired[int | None] + cert_type: t.NotRequired[int | None] + key_id: t.NotRequired[bytes | None] + principals: t.NotRequired[list[bytes] | None] + valid_after: t.NotRequired[int | None] + valid_before: t.NotRequired[int | None] + critical_options: t.NotRequired[list[tuple[bytes, bytes]] | None] + extensions: t.NotRequired[list[tuple[bytes, bytes]] | None] + reserved: t.NotRequired[bytes | None] + signing_key: t.NotRequired[bytes | None] + + class OpensshCertificateInfo(metaclass=abc.ABCMeta): """Encapsulates all certificate information which is signed by a CA key""" @@ -402,7 +418,13 @@ class OpensshCertificateInfo(metaclass=abc.ABCMeta): class OpensshRSACertificateInfo(OpensshCertificateInfo): - def __init__(self, *, e: int | None = None, n: int | None = None, **kwargs) -> None: + def __init__( + self, + *, + e: int | None = None, + n: int | None = None, + **kwargs: t.Unpack[_OpensshCertificateInfoKwarg], + ) -> None: super().__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS["rsa"] + _CERT_SUFFIX_V01 self.e = e @@ -433,7 +455,7 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo): q: int | None = None, g: int | None = None, y: int | None = None, - **kwargs, + **kwargs: t.Unpack[_OpensshCertificateInfoKwarg], ) -> None: super().__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS["dsa"] + _CERT_SUFFIX_V01 @@ -465,7 +487,11 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo): class OpensshECDSACertificateInfo(OpensshCertificateInfo): def __init__( - self, *, curve: bytes | None = None, public_key: bytes | None = None, **kwargs + self, + *, + curve: bytes | None = None, + public_key: bytes | None = None, + **kwargs: t.Unpack[_OpensshCertificateInfoKwarg], ): super().__init__(**kwargs) self._curve: bytes | None = None @@ -509,7 +535,12 @@ class OpensshECDSACertificateInfo(OpensshCertificateInfo): class OpensshED25519CertificateInfo(OpensshCertificateInfo): - def __init__(self, *, pk: bytes | None = None, **kwargs) -> None: + def __init__( + self, + *, + pk: bytes | None = None, + **kwargs: t.Unpack[_OpensshCertificateInfoKwarg], + ) -> None: super().__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS["ed25519"] + _CERT_SUFFIX_V01 self.pk = pk diff --git a/plugins/module_utils/_openssh/cryptography.py b/plugins/module_utils/_openssh/cryptography.py index bde1bdf8..2c7bb647 100644 --- a/plugins/module_utils/_openssh/cryptography.py +++ b/plugins/module_utils/_openssh/cryptography.py @@ -75,22 +75,22 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptogra if t.TYPE_CHECKING: - KeyFormat = t.Literal["SSH", "PKCS8", "PKCS1"] - KeySerializationFormat = t.Literal["PEM", "DER", "SSH"] - KeyType = t.Literal["rsa", "dsa", "ed25519", "ecdsa"] + KeyFormat = t.Literal["SSH", "PKCS8", "PKCS1"] # pragma: no cover + KeySerializationFormat = t.Literal["PEM", "DER", "SSH"] # pragma: no cover + KeyType = t.Literal["rsa", "dsa", "ed25519", "ecdsa"] # pragma: no cover PrivateKeyTypes = t.Union[ rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey, Ed25519PrivateKey, - ] + ] # pragma: no cover PublicKeyTypes = t.Union[ rsa.RSAPublicKey, dsa.DSAPublicKey, ec.EllipticCurvePublicKey, Ed25519PublicKey - ] + ] # pragma: no cover from cryptography.hazmat.primitives.asymmetric.types import ( - PublicKeyTypes as AllPublicKeyTypes, + PublicKeyTypes as AllPublicKeyTypes, # pragma: no cover ) diff --git a/plugins/modules/acme_account.py b/plugins/modules/acme_account.py index b4716f91..e1980bfa 100644 --- a/plugins/modules/acme_account.py +++ b/plugins/modules/acme_account.py @@ -252,7 +252,7 @@ def main() -> t.NoReturn: if client.account_key_data: diff_before["public_account_key"] = client.account_key_data["jwk"] if created: - raise AssertionError("Unwanted account creation") + raise AssertionError("Unwanted account creation") # pragma: no cover if account_data is not None: # Account is not yet deactivated if not module.check_mode: @@ -310,7 +310,7 @@ def main() -> t.NoReturn: # Verify that the account exists and has not been deactivated created, account_data = account.setup_account(allow_creation=False) if created: - raise AssertionError("Unwanted account creation") + raise AssertionError("Unwanted account creation") # pragma: no cover if account_data is None: raise ModuleFailException( msg="Account does not exist or is deactivated." diff --git a/plugins/modules/acme_account_info.py b/plugins/modules/acme_account_info.py index 3390c1e8..e37a0007 100644 --- a/plugins/modules/acme_account_info.py +++ b/plugins/modules/acme_account_info.py @@ -224,7 +224,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule + from ansible.module_utils.basic import AnsibleModule # pragma: no cover def get_orders_list( @@ -310,7 +310,7 @@ def main() -> t.NoReturn: remove_account_uri_if_not_exists=True, ) if created: - raise AssertionError("Unwanted account creation") + raise AssertionError("Unwanted account creation") # pragma: no cover result: dict[str, t.Any] = { "changed": False, "exists": False, diff --git a/plugins/modules/acme_certificate.py b/plugins/modules/acme_certificate.py index 1276ab0e..4e054083 100644 --- a/plugins/modules/acme_certificate.py +++ b/plugins/modules/acme_certificate.py @@ -594,12 +594,12 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.utils impor if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CertificateInformation, CryptoBackend, ) - from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( # pragma: no cover Authorization, ) diff --git a/plugins/modules/acme_certificate_order_finalize.py b/plugins/modules/acme_certificate_order_finalize.py index ea42333c..6db4419d 100644 --- a/plugins/modules/acme_certificate_order_finalize.py +++ b/plugins/modules/acme_certificate_order_finalize.py @@ -332,7 +332,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.errors impo if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.certificates import ( # pragma: no cover CertificateChain, ) diff --git a/plugins/modules/acme_certificate_order_validate.py b/plugins/modules/acme_certificate_order_validate.py index 487e6135..9dfbefad 100644 --- a/plugins/modules/acme_certificate_order_validate.py +++ b/plugins/modules/acme_certificate_order_validate.py @@ -244,7 +244,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.errors impo if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.challenges import ( # pragma: no cover Authorization, ) diff --git a/plugins/modules/acme_certificate_renewal_info.py b/plugins/modules/acme_certificate_renewal_info.py index 77b66fa1..bd57b63c 100644 --- a/plugins/modules/acme_certificate_renewal_info.py +++ b/plugins/modules/acme_certificate_renewal_info.py @@ -206,7 +206,7 @@ def main() -> t.NoReturn: "supports_ari": False, } - def complete(should_renew: bool, **kwargs) -> t.NoReturn: + def complete(should_renew: bool, **kwargs: t.Any) -> t.NoReturn: result["should_renew"] = should_renew result.update(kwargs) module.exit_json(**result) diff --git a/plugins/modules/acme_certificate_revoke.py b/plugins/modules/acme_certificate_revoke.py index a6a42ee9..c2a54af2 100644 --- a/plugins/modules/acme_certificate_revoke.py +++ b/plugins/modules/acme_certificate_revoke.py @@ -205,7 +205,7 @@ def main() -> t.NoReturn: # Step 1: get hold of account URI created, account_data = account.setup_account(allow_creation=False) if created: - raise AssertionError("Unwanted account creation") + raise AssertionError("Unwanted account creation") # pragma: no cover if account_data is None: raise ModuleFailException( msg="Account does not exist or is deactivated." diff --git a/plugins/modules/certificate_complete_chain.py b/plugins/modules/certificate_complete_chain.py index 06c1733c..63eca227 100644 --- a/plugins/modules/certificate_complete_chain.py +++ b/plugins/modules/certificate_complete_chain.py @@ -179,7 +179,7 @@ def is_parent( public_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey ): if cert.cert.signature_hash_algorithm is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "signature_hash_algorithm should be present for RSA certificates" ) public_key.verify( @@ -193,7 +193,7 @@ def is_parent( cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, ): if cert.cert.signature_hash_algorithm is None: - raise AssertionError( + raise AssertionError( # pragma: no cover "signature_hash_algorithm should be present for EC certificates" ) public_key.verify( diff --git a/plugins/modules/openssh_cert.py b/plugins/modules/openssh_cert.py index 2ad5b5f4..5310fe22 100644 --- a/plugins/modules/openssh_cert.py +++ b/plugins/modules/openssh_cert.py @@ -421,11 +421,17 @@ class Certificate(OpensshModule): def _is_fully_valid(self) -> bool: if self.original_data is None: - raise AssertionError("Contract violation original_data not provided") + raise AssertionError( + "Contract violation original_data not provided" + ) # pragma: no cover if self.public_key is None: - raise AssertionError("Contract violation public_key not provided") + raise AssertionError( + "Contract violation public_key not provided" + ) # pragma: no cover if self.signing_key is None: - raise AssertionError("Contract violation signing_key not provided") + raise AssertionError( + "Contract violation signing_key not provided" + ) # pragma: no cover return self._is_partially_valid() and all( [ self._compare_options() if self.original_data.type == "user" else True, @@ -439,7 +445,9 @@ class Certificate(OpensshModule): def _is_partially_valid(self) -> bool: if self.original_data is None: - raise AssertionError("Contract violation original_data not provided") + raise AssertionError( + "Contract violation original_data not provided" + ) # pragma: no cover return all( [ set(self.original_data.principals) == set(self.principals), @@ -460,7 +468,9 @@ class Certificate(OpensshModule): def _compare_time_parameters(self) -> bool: if self.original_data is None: - raise AssertionError("Contract violation original_data not provided") + raise AssertionError( + "Contract violation original_data not provided" + ) # pragma: no cover try: original_time_parameters = OpensshCertificateTimeParameters( valid_from=self.original_data.valid_after, @@ -481,7 +491,9 @@ class Certificate(OpensshModule): def _compare_options(self) -> bool: if self.original_data is None: - raise AssertionError("Contract violation original_data not provided") + raise AssertionError( + "Contract violation original_data not provided" + ) # pragma: no cover try: critical_options, extensions = parse_option_list(self.options) except ValueError as e: @@ -518,11 +530,17 @@ class Certificate(OpensshModule): def _generate_temp_certificate(self) -> str: if self.public_key is None: - raise AssertionError("Contract violation public_key not provided") + raise AssertionError( + "Contract violation public_key not provided" + ) # pragma: no cover if self.signing_key is None: - raise AssertionError("Contract violation signing_key not provided") + raise AssertionError( + "Contract violation signing_key not provided" + ) # pragma: no cover if self.time_parameters is None: - raise AssertionError("Contract violation time_parameters not provided") + raise AssertionError( + "Contract violation time_parameters not provided" + ) # pragma: no cover key_copy = os.path.join(self.module.tmpdir, os.path.basename(self.public_key)) diff --git a/plugins/modules/openssl_csr.py b/plugins/modules/openssl_csr.py index ae6ad4c3..99a46243 100644 --- a/plugins/modules/openssl_csr.py +++ b/plugins/modules/openssl_csr.py @@ -258,8 +258,8 @@ from ansible_collections.community.crypto.plugins.module_utils._io import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.csr import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.csr import ( # pragma: no cover CertificateSigningRequestBackend, ) diff --git a/plugins/modules/openssl_csr_pipe.py b/plugins/modules/openssl_csr_pipe.py index 695c2441..76c7fb07 100644 --- a/plugins/modules/openssl_csr_pipe.py +++ b/plugins/modules/openssl_csr_pipe.py @@ -139,8 +139,8 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.module_ba if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.csr import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.csr import ( # pragma: no cover CertificateSigningRequestBackend, ) diff --git a/plugins/modules/openssl_dhparam.py b/plugins/modules/openssl_dhparam.py index ec753aa6..876be6ca 100644 --- a/plugins/modules/openssl_dhparam.py +++ b/plugins/modules/openssl_dhparam.py @@ -416,7 +416,7 @@ def main() -> t.NoReturn: ) dhparam = DHParameterCryptography(module) else: - raise AssertionError("Internal error: unknown backend") + raise AssertionError("Internal error: unknown backend") # pragma: no cover if module.check_mode: result = dhparam.dump() diff --git a/plugins/modules/openssl_pkcs12.py b/plugins/modules/openssl_pkcs12.py index 98ff2965..ada150bd 100644 --- a/plugins/modules/openssl_pkcs12.py +++ b/plugins/modules/openssl_pkcs12.py @@ -334,7 +334,7 @@ else: CRYPTOGRAPHY_HAS_COMPATIBILITY2022 = True if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( + from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import ( # pragma: no cover CertificateIssuerPrivateKeyTypes, ) @@ -343,7 +343,7 @@ if t.TYPE_CHECKING: t.Union[cryptography.x509.Certificate, None], list[cryptography.x509.Certificate], t.Union[bytes, None], - ] + ] # pragma: no cover def load_certificate_set( @@ -688,7 +688,7 @@ class Pkcs(OpenSSLObject): ]: """Read PKCS#12 file.""" if self.src is None: - raise AssertionError("Contract violation: src is None") + raise AssertionError("Contract violation: src is None") # pragma: no cover try: with open(self.src, "rb") as pkcs12_fh: diff --git a/plugins/modules/openssl_privatekey.py b/plugins/modules/openssl_privatekey.py index da0f5a0c..41dc5a7f 100644 --- a/plugins/modules/openssl_privatekey.py +++ b/plugins/modules/openssl_privatekey.py @@ -174,8 +174,8 @@ from ansible_collections.community.crypto.plugins.module_utils._io import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey import ( # pragma: no cover PrivateKeyBackend, ) diff --git a/plugins/modules/openssl_privatekey_convert.py b/plugins/modules/openssl_privatekey_convert.py index 2f01640b..f1070535 100644 --- a/plugins/modules/openssl_privatekey_convert.py +++ b/plugins/modules/openssl_privatekey_convert.py @@ -79,8 +79,8 @@ from ansible_collections.community.crypto.plugins.module_utils._io import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey_convert import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.privatekey_convert import ( # pragma: no cover PrivateKeyConvertBackend, ) @@ -115,7 +115,9 @@ class PrivateKeyConvertModule(OpenSSLObject): # Convert privatekey_data = self.module_backend.get_private_key_data() if privatekey_data is None: - raise AssertionError("Contract violation: privatekey_data is None") + raise AssertionError( + "Contract violation: privatekey_data is None" + ) # pragma: no cover if not self.check_mode: if self.backup: self.backup_file = module.backup_local(self.path) diff --git a/plugins/modules/openssl_publickey.py b/plugins/modules/openssl_publickey.py index c82f75c9..bd2a405d 100644 --- a/plugins/modules/openssl_publickey.py +++ b/plugins/modules/openssl_publickey.py @@ -220,7 +220,7 @@ except ImportError: pass if t.TYPE_CHECKING: - from cryptography.hazmat.primitives.asymmetric.types import ( + from cryptography.hazmat.primitives.asymmetric.types import ( # pragma: no cover PrivateKeyTypes, PublicKeyTypes, ) diff --git a/plugins/modules/x509_certificate.py b/plugins/modules/x509_certificate.py index 7eda271c..53f7acf7 100644 --- a/plugins/modules/x509_certificate.py +++ b/plugins/modules/x509_certificate.py @@ -238,8 +238,8 @@ from ansible_collections.community.crypto.plugins.module_utils._io import ( if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.certificate import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.certificate import ( # pragma: no cover CertificateBackend, ) diff --git a/plugins/modules/x509_certificate_pipe.py b/plugins/modules/x509_certificate_pipe.py index 32946173..589f2c03 100644 --- a/plugins/modules/x509_certificate_pipe.py +++ b/plugins/modules/x509_certificate_pipe.py @@ -136,8 +136,8 @@ from ansible_collections.community.crypto.plugins.module_utils._crypto.module_ba if t.TYPE_CHECKING: - from ansible.module_utils.basic import AnsibleModule - from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.certificate import ( + from ansible.module_utils.basic import AnsibleModule # pragma: no cover + from ansible_collections.community.crypto.plugins.module_utils._crypto.module_backends.certificate import ( # pragma: no cover CertificateBackend, ) diff --git a/plugins/modules/x509_crl.py b/plugins/modules/x509_crl.py index e659e5ad..b6c1773f 100644 --- a/plugins/modules/x509_crl.py +++ b/plugins/modules/x509_crl.py @@ -497,7 +497,7 @@ except ImportError: pass if t.TYPE_CHECKING: - import datetime + import datetime # pragma: no cover class CRLError(OpenSSLObjectError): diff --git a/plugins/plugin_utils/_action_module.py b/plugins/plugin_utils/_action_module.py index 7586c1cb..e94c1482 100644 --- a/plugins/plugin_utils/_action_module.py +++ b/plugins/plugin_utils/_action_module.py @@ -141,7 +141,7 @@ class AnsibleActionModule: collection_name: str | None = None, ) -> None: if version is not None and date is not None: - raise AssertionError( + raise AssertionError( # pragma: no cover "implementation error -- version and date must not both be set" ) @@ -203,13 +203,13 @@ class AnsibleActionModule: kwargs = remove_values(kwargs, self.no_log_values) raise _ModuleExitException(kwargs) - def exit_json(self, **kwargs) -> t.NoReturn: + def exit_json(self, **kwargs: t.Any) -> t.NoReturn: result = dict(kwargs) if "failed" not in result: result["failed"] = False self._return_formatted(result) - def fail_json(self, msg: str, **kwargs) -> t.NoReturn: + def fail_json(self, msg: str, **kwargs: t.Any) -> t.NoReturn: result = dict(kwargs) result["failed"] = True result["msg"] = msg @@ -226,7 +226,9 @@ class ActionModuleBase(ActionBase, metaclass=abc.ABCMeta): """Run module code""" module.fail_json(msg="Not implemented.") - def run(self, tmp=None, task_vars=None) -> dict[str, t.Any]: + def run( + self, tmp: None = None, task_vars: dict[str, t.Any] | None = None + ) -> dict[str, t.Any]: if task_vars is None: task_vars = {} diff --git a/plugins/plugin_utils/_filter_module.py b/plugins/plugin_utils/_filter_module.py index 767ba592..e9cad339 100644 --- a/plugins/plugin_utils/_filter_module.py +++ b/plugins/plugin_utils/_filter_module.py @@ -24,7 +24,7 @@ class FilterModuleMock: self.params = params self._diff = False - def fail_json(self, msg: str, **kwargs) -> t.NoReturn: + def fail_json(self, msg: str, **kwargs: t.Any) -> t.NoReturn: raise AnsibleFilterError(msg) def warn(self, warning: str) -> None: diff --git a/tests/unit/plugins/module_utils/_acme/backend_data.py b/tests/unit/plugins/module_utils/_acme/backend_data.py index d1a147e8..e008e75b 100644 --- a/tests/unit/plugins/module_utils/_acme/backend_data.py +++ b/tests/unit/plugins/module_utils/_acme/backend_data.py @@ -21,10 +21,20 @@ from ..test__time import TIMEZONES, cartesian_product if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover Criterium, ) + class DatetimeKwarg(t.TypedDict): # pragma: no cover + year: int + month: int + day: int + hour: t.NotRequired[int] + minute: t.NotRequired[int] + second: t.NotRequired[int] + microsecond: t.NotRequired[int] + tzinfo: t.NotRequired[datetime.timezone | None] + def load_fixture(name: str) -> str: with open( @@ -133,7 +143,7 @@ TEST_CERT_INFO: list[tuple[str, CertificateInformation, str]] = [ ] -TEST_PARSE_ACME_TIMESTAMP: list[tuple[datetime.timedelta, str, dict[str, int]]] = ( +TEST_PARSE_ACME_TIMESTAMP: list[tuple[datetime.timedelta, str, DatetimeKwarg]] = ( cartesian_product( TIMEZONES, [ @@ -201,7 +211,7 @@ TEST_PARSE_ACME_TIMESTAMP: list[tuple[datetime.timedelta, str, dict[str, int]]] TEST_INTERPOLATE_TIMESTAMP: list[ - tuple[datetime.timedelta, dict[str, int], dict[str, int], float, dict[str, int]] + tuple[datetime.timedelta, DatetimeKwarg, DatetimeKwarg, float, DatetimeKwarg] ] = cartesian_product( TIMEZONES, [ @@ -233,17 +243,17 @@ class FakeBackend(CryptoBackend): *, key_file: str | os.PathLike | None = None, key_content: str | None = None, - passphrase=None, + passphrase: str | None = None, ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def sign( self, *, payload64: str, protected64: str, key_data: dict[str, t.Any] | None ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def create_mac_key(self, *, alg: str, key: str) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def get_ordered_csr_identifiers( self, @@ -251,7 +261,7 @@ class FakeBackend(CryptoBackend): csr_filename: str | os.PathLike | None = None, csr_content: str | bytes | None = None, ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def get_csr_identifiers( self, @@ -259,7 +269,7 @@ class FakeBackend(CryptoBackend): csr_filename: str | os.PathLike | None = None, csr_content: str | bytes | None = None, ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def get_cert_days( self, @@ -268,10 +278,10 @@ class FakeBackend(CryptoBackend): cert_content: str | bytes | None = None, now: datetime.datetime | None = None, ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def create_chain_matcher(self, *, criterium: Criterium) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover def get_cert_information( self, @@ -279,4 +289,4 @@ class FakeBackend(CryptoBackend): cert_filename: str | os.PathLike | None = None, cert_content: str | bytes | None = None, ) -> t.NoReturn: - raise BackendException("Not implemented in fake backend") + raise BackendException("Not implemented in fake backend") # pragma: no cover diff --git a/tests/unit/plugins/module_utils/_acme/test_backend_cryptography.py b/tests/unit/plugins/module_utils/_acme/test_backend_cryptography.py index acd67390..0932f1be 100644 --- a/tests/unit/plugins/module_utils/_acme/test_backend_cryptography.py +++ b/tests/unit/plugins/module_utils/_acme/test_backend_cryptography.py @@ -5,6 +5,7 @@ from __future__ import annotations import datetime +import pathlib import typing as t from unittest.mock import ( MagicMock, @@ -37,10 +38,12 @@ from .backend_data import ( if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CertificateInformation, ) + from .backend_data import DatetimeKwarg # pragma: no cover + if not HAS_CURRENT_CRYPTOGRAPHY: pytest.skip("cryptography not found") @@ -48,10 +51,10 @@ if not HAS_CURRENT_CRYPTOGRAPHY: @pytest.mark.parametrize("pem, result, dummy", TEST_KEYS) def test_eckeyparse_cryptography( - pem: str, result: dict[str, t.Any], dummy: str, tmpdir + pem: str, result: dict[str, t.Any], dummy: str, tmp_path: pathlib.Path ) -> None: - fn = tmpdir / "test.pem" - fn.write(pem) + fn = tmp_path / "test.pem" + fn.write_text(pem) module = MagicMock() backend = CryptographyBackend(module=module) key = backend.parse_key(key_file=str(fn)) @@ -64,10 +67,10 @@ def test_eckeyparse_cryptography( @pytest.mark.parametrize("csr, result, openssl_output", TEST_CSRS) def test_csridentifiers_cryptography( - csr: str, result: set[tuple[str, str]], openssl_output: str, tmpdir + csr: str, result: set[tuple[str, str]], openssl_output: str, tmp_path: pathlib.Path ) -> None: - fn = tmpdir / "test.csr" - fn.write(csr) + fn = tmp_path / "test.csr" + fn.write_text(csr) module = MagicMock() backend = CryptographyBackend(module=module) identifiers = backend.get_csr_identifiers(csr_filename=str(fn)) @@ -78,11 +81,14 @@ def test_csridentifiers_cryptography( @pytest.mark.parametrize("timezone, now, expected_days", TEST_CERT_DAYS) def test_certdays_cryptography( - timezone: datetime.timedelta, now: datetime.datetime, expected_days: int, tmpdir + timezone: datetime.timedelta, + now: datetime.datetime, + expected_days: int, + tmp_path: pathlib.Path, ) -> None: with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): - fn = tmpdir / "test-cert.pem" - fn.write(TEST_CERT) + fn = tmp_path / "test-cert.pem" + fn.write_text(TEST_CERT) module = MagicMock() backend = CryptographyBackend(module=module) days = backend.get_cert_days(cert_filename=str(fn), now=now) @@ -98,10 +104,10 @@ def test_get_cert_information( cert_content: str, expected_cert_info: CertificateInformation, openssl_output: str, - tmpdir, + tmp_path: pathlib.Path, ) -> None: - fn = tmpdir / "test-cert.pem" - fn.write(cert_content) + fn = tmp_path / "test-cert.pem" + fn.write_text(cert_content) module = MagicMock() backend = CryptographyBackend(module=module) @@ -138,7 +144,7 @@ def test_now(timezone: datetime.timedelta) -> None: @pytest.mark.parametrize("timezone, timestamp_str, expected", TEST_PARSE_ACME_TIMESTAMP) def test_parse_acme_timestamp( - timezone: datetime.timedelta, timestamp_str: str, expected: dict[str, int] + timezone: datetime.timedelta, timestamp_str: str, expected: DatetimeKwarg ) -> None: with freeze_time("2024-02-03 04:05:06 +00:00", tz_offset=timezone): module = MagicMock() @@ -153,10 +159,10 @@ def test_parse_acme_timestamp( ) def test_interpolate_timestamp( timezone: datetime.timedelta, - start: dict[str, int], - end: dict[str, int], + start: DatetimeKwarg, + end: DatetimeKwarg, percentage: float, - expected: dict[str, int], + expected: DatetimeKwarg, ) -> None: with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): module = MagicMock() diff --git a/tests/unit/plugins/module_utils/_acme/test_backend_openssl_cli.py b/tests/unit/plugins/module_utils/_acme/test_backend_openssl_cli.py index fab56af5..9804ebe8 100644 --- a/tests/unit/plugins/module_utils/_acme/test_backend_openssl_cli.py +++ b/tests/unit/plugins/module_utils/_acme/test_backend_openssl_cli.py @@ -5,6 +5,7 @@ from __future__ import annotations import datetime +import pathlib import typing as t from unittest.mock import ( MagicMock, @@ -33,10 +34,12 @@ from .backend_data import ( if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( + from ansible_collections.community.crypto.plugins.module_utils._acme.backends import ( # pragma: no cover CertificateInformation, ) + from .backend_data import DatetimeKwarg # pragma: no cover + # from ..test_time import TIMEZONES @@ -55,10 +58,10 @@ TEST_IPS = [ @pytest.mark.parametrize("pem, result, openssl_output", TEST_KEYS) def test_eckeyparse_openssl( - pem: str, result: dict[str, t.Any], openssl_output: str, tmpdir + pem: str, result: dict[str, t.Any], openssl_output: str, tmp_path: pathlib.Path ) -> None: - fn = tmpdir / "test.key" - fn.write(pem) + fn = tmp_path / "test.key" + fn.write_text(pem) module = MagicMock() module.run_command = MagicMock(return_value=(0, openssl_output, 0)) backend = OpenSSLCLIBackend(module=module, openssl_binary="openssl") @@ -69,10 +72,10 @@ def test_eckeyparse_openssl( @pytest.mark.parametrize("csr, result, openssl_output", TEST_CSRS) def test_csridentifiers_openssl( - csr: str, result: set[tuple[str, str]], openssl_output: str, tmpdir + csr: str, result: set[tuple[str, str]], openssl_output: str, tmp_path: pathlib.Path ) -> None: - fn = tmpdir / "test.csr" - fn.write(csr) + fn = tmp_path / "test.csr" + fn.write_text(csr) module = MagicMock() module.run_command = MagicMock(return_value=(0, openssl_output, 0)) backend = OpenSSLCLIBackend(module=module, openssl_binary="openssl") @@ -89,11 +92,14 @@ def test_normalize_ip(ip: str, result: str) -> None: @pytest.mark.parametrize("timezone, now, expected_days", TEST_CERT_DAYS) def test_certdays_cryptography( - timezone: datetime.timedelta, now: datetime.datetime, expected_days: int, tmpdir + timezone: datetime.timedelta, + now: datetime.datetime, + expected_days: int, + tmp_path: pathlib.Path, ) -> None: with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): - fn = tmpdir / "test-cert.pem" - fn.write(TEST_CERT) + fn = tmp_path / "test-cert.pem" + fn.write_text(TEST_CERT) module = MagicMock() module.run_command = MagicMock(return_value=(0, TEST_CERT_OPENSSL_OUTPUT, 0)) backend = OpenSSLCLIBackend(module=module, openssl_binary="openssl") @@ -110,10 +116,10 @@ def test_get_cert_information( cert_content: str, expected_cert_info: CertificateInformation, openssl_output: str, - tmpdir, + tmp_path: pathlib.Path, ) -> None: - fn = tmpdir / "test-cert.pem" - fn.write(cert_content) + fn = tmp_path / "test-cert.pem" + fn.write_text(cert_content) module = MagicMock() module.run_command = MagicMock(return_value=(0, openssl_output, 0)) backend = OpenSSLCLIBackend(module=module, openssl_binary="openssl") @@ -144,7 +150,7 @@ def test_now(timezone: datetime.timedelta) -> None: @pytest.mark.parametrize("timezone, timestamp_str, expected", TEST_PARSE_ACME_TIMESTAMP) def test_parse_acme_timestamp( - timezone: datetime.timedelta, timestamp_str: str, expected: dict[str, int] + timezone: datetime.timedelta, timestamp_str: str, expected: DatetimeKwarg ) -> None: with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): module = MagicMock() @@ -159,10 +165,10 @@ def test_parse_acme_timestamp( ) def test_interpolate_timestamp( timezone: datetime.timedelta, - start: dict[str, int], - end: dict[str, int], + start: DatetimeKwarg, + end: DatetimeKwarg, percentage: float, - expected: dict[str, int], + expected: DatetimeKwarg, ) -> None: with freeze_time("2024-02-03 04:05:06", tz_offset=timezone): module = MagicMock() diff --git a/tests/unit/plugins/module_utils/_acme/test_io.py b/tests/unit/plugins/module_utils/_acme/test_io.py index 3a194200..2c61707b 100644 --- a/tests/unit/plugins/module_utils/_acme/test_io.py +++ b/tests/unit/plugins/module_utils/_acme/test_io.py @@ -4,6 +4,7 @@ from __future__ import annotations +import pathlib from unittest.mock import ( MagicMock, ) @@ -18,14 +19,14 @@ TEST_TEXT = r"""1234 5678""" -def test_read_file(tmpdir) -> None: - fn = tmpdir / "test.txt" - fn.write(TEST_TEXT) +def test_read_file(tmp_path: pathlib.Path) -> None: + fn = tmp_path / "test.txt" + fn.write_text(TEST_TEXT) assert read_file(str(fn)) == TEST_TEXT.encode("utf-8") -def test_write_file(tmpdir) -> None: - fn = tmpdir / "test.txt" +def test_write_file(tmp_path: pathlib.Path) -> None: + fn = tmp_path / "test.txt" module = MagicMock() write_file(module=module, dest=str(fn), content=TEST_TEXT.encode("utf-8")) - assert fn.read() == TEST_TEXT + assert fn.read_text() == TEST_TEXT diff --git a/tests/unit/plugins/module_utils/_acme/test_utils.py b/tests/unit/plugins/module_utils/_acme/test_utils.py index 8f8b84de..79e376cc 100644 --- a/tests/unit/plugins/module_utils/_acme/test_utils.py +++ b/tests/unit/plugins/module_utils/_acme/test_utils.py @@ -5,6 +5,7 @@ from __future__ import annotations import datetime +import pathlib import typing as t import pytest @@ -100,9 +101,9 @@ def test_nopad_b64(value: str, result: str) -> None: @pytest.mark.parametrize("pem, der", TEST_PEM_DERS) -def test_pem_to_der(pem: str, der: bytes, tmpdir): - fn = tmpdir / "test.pem" - fn.write(pem) +def test_pem_to_der(pem: str, der: bytes, tmp_path: pathlib.Path) -> None: + fn = tmp_path / "test.pem" + fn.write_text(pem) assert pem_to_der(pem_filename=str(fn)) == der assert pem_to_der(pem_content=pem) == der @@ -111,9 +112,9 @@ def test_pem_to_der(pem: str, der: bytes, tmpdir): def test_process_links( value: dict[str, t.Any], expected_result: list[tuple[str, str]] ) -> None: - data = [] + data: list[tuple[str, str]] = [] - def callback(url, rel): + def callback(url: str, rel: str) -> None: data.append((url, rel)) process_links(info=value, callback=callback) diff --git a/tests/unit/plugins/module_utils/_crypto/test_asn1.py b/tests/unit/plugins/module_utils/_crypto/test_asn1.py index a1de18e3..811ec4f6 100644 --- a/tests/unit/plugins/module_utils/_crypto/test_asn1.py +++ b/tests/unit/plugins/module_utils/_crypto/test_asn1.py @@ -5,6 +5,7 @@ from __future__ import annotations import base64 +import pathlib import re import subprocess @@ -104,7 +105,7 @@ def test_serialize_asn1_string_as_der_invalid_type() -> None: @pytest.mark.skip() # This is to just to build the test case assertions and shouldn't run normally. @pytest.mark.parametrize("value, expected", TEST_CASES) -def test_test_cases(value: str, expected: bytes, tmp_path) -> None: +def test_test_cases(value: str, expected: bytes, tmp_path: pathlib.Path) -> None: test_file = tmp_path / "test.der" subprocess.run( ["openssl", "asn1parse", "-genstr", value, "-noout", "-out", test_file], diff --git a/tests/unit/plugins/module_utils/_crypto/test_cryptography_support.py b/tests/unit/plugins/module_utils/_crypto/test_cryptography_support.py index 7979382e..9183c6ff 100644 --- a/tests/unit/plugins/module_utils/_crypto/test_cryptography_support.py +++ b/tests/unit/plugins/module_utils/_crypto/test_cryptography_support.py @@ -254,6 +254,6 @@ def test_parse_dn(name: bytes, expected: list[NameAttribute]) -> None: ), ], ) -def test_parse_dn_failure(name: bytes, message: str): +def test_parse_dn_failure(name: bytes, message: str) -> None: with pytest.raises(OpenSSLObjectError, match=f"^{re.escape(message)}$"): _parse_dn(name) diff --git a/tests/unit/plugins/module_utils/_crypto/test_pem.py b/tests/unit/plugins/module_utils/_crypto/test_pem.py index 0597162c..60872784 100644 --- a/tests/unit/plugins/module_utils/_crypto/test_pem.py +++ b/tests/unit/plugins/module_utils/_crypto/test_pem.py @@ -60,7 +60,7 @@ def test_pem_handling( pems: list[str], is_pem: bool, private_key_type: t.Literal["raw", "pkcs1", "pkcs8", "unknown-pem"], -): +) -> None: assert identify_pem_format(data) == is_pem assert identify_private_key_format(data) == private_key_type try: diff --git a/tests/unit/plugins/module_utils/_openssh/test_certificate.py b/tests/unit/plugins/module_utils/_openssh/test_certificate.py index 4b27ce37..f3109a29 100644 --- a/tests/unit/plugins/module_utils/_openssh/test_certificate.py +++ b/tests/unit/plugins/module_utils/_openssh/test_certificate.py @@ -4,6 +4,8 @@ from __future__ import annotations +import pathlib + import pytest from ansible_collections.community.crypto.plugins.module_utils._openssh.certificate import ( OpensshCertificate, @@ -303,9 +305,9 @@ INVALID_OPTIONS: list[str | list] = [ ] -def test_rsa_certificate(tmpdir) -> None: - cert_file = tmpdir / "id_rsa-cert.pub" - cert_file.write(RSA_CERT_SIGNED_BY_DSA, mode="wb") +def test_rsa_certificate(tmp_path: pathlib.Path) -> None: + cert_file = tmp_path / "id_rsa-cert.pub" + cert_file.write_bytes(RSA_CERT_SIGNED_BY_DSA) cert = OpensshCertificate.load(str(cert_file)) assert cert.key_id == "test" @@ -315,9 +317,9 @@ def test_rsa_certificate(tmpdir) -> None: assert cert.signing_key == DSA_FINGERPRINT -def test_dsa_certificate(tmpdir) -> None: - cert_file = tmpdir / "id_dsa-cert.pub" - cert_file.write(DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS) +def test_dsa_certificate(tmp_path: pathlib.Path) -> None: + cert_file = tmp_path / "id_dsa-cert.pub" + cert_file.write_bytes(DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS) cert = OpensshCertificate.load(str(cert_file)) @@ -328,9 +330,9 @@ def test_dsa_certificate(tmpdir) -> None: assert cert.extensions == [] -def test_ecdsa_certificate(tmpdir) -> None: - cert_file = tmpdir / "id_ecdsa-cert.pub" - cert_file.write(ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS) +def test_ecdsa_certificate(tmp_path: pathlib.Path) -> None: + cert_file = tmp_path / "id_ecdsa-cert.pub" + cert_file.write_bytes(ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS) cert = OpensshCertificate.load(str(cert_file)) assert cert.type_string == "ecdsa-sha2-nistp256-cert-v01@openssh.com" @@ -340,9 +342,9 @@ def test_ecdsa_certificate(tmpdir) -> None: assert cert.extensions == VALID_EXTENSIONS -def test_ed25519_certificate(tmpdir) -> None: - cert_file = tmpdir / "id_ed25519-cert.pub" - cert_file.write(ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS) +def test_ed25519_certificate(tmp_path: pathlib.Path) -> None: + cert_file = tmp_path / "id_ed25519-cert.pub" + cert_file.write_bytes(ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS) cert = OpensshCertificate.load(str(cert_file)) assert cert.type_string == "ssh-ed25519-cert-v01@openssh.com" @@ -352,10 +354,10 @@ def test_ed25519_certificate(tmpdir) -> None: assert cert.extensions == INVALID_EXTENSIONS -def test_invalid_data(tmpdir) -> None: +def test_invalid_data(tmp_path: pathlib.Path) -> None: result = False - cert_file = tmpdir / "invalid-cert.pub" - cert_file.write(INVALID_DATA) + cert_file = tmp_path / "invalid-cert.pub" + cert_file.write_bytes(INVALID_DATA) try: OpensshCertificate.load(str(cert_file)) diff --git a/tests/unit/plugins/module_utils/_openssh/test_cryptography.py b/tests/unit/plugins/module_utils/_openssh/test_cryptography.py index c77cb74b..eb16b4f1 100644 --- a/tests/unit/plugins/module_utils/_openssh/test_cryptography.py +++ b/tests/unit/plugins/module_utils/_openssh/test_cryptography.py @@ -25,7 +25,7 @@ from ansible_collections.community.crypto.plugins.module_utils._openssh.cryptogr if t.TYPE_CHECKING: - from ansible_collections.community.crypto.plugins.module_utils._openssh.cryptography import ( + from ansible_collections.community.crypto.plugins.module_utils._openssh.cryptography import ( # pragma: no cover KeyType, ) diff --git a/tests/unit/plugins/module_utils/test__time.py b/tests/unit/plugins/module_utils/test__time.py index 5b432725..4cd77235 100644 --- a/tests/unit/plugins/module_utils/test__time.py +++ b/tests/unit/plugins/module_utils/test__time.py @@ -31,8 +31,8 @@ TIMEZONES = [ if t.TYPE_CHECKING: - _S = t.TypeVar("_S") - _Ts = t.TypeVarTuple("_Ts") + _S = t.TypeVar("_S") # pragma: no cover + _Ts = t.TypeVarTuple("_Ts") # pragma: no cover def cartesian_product( diff --git a/tests/unit/plugins/modules/test_luks_device.py b/tests/unit/plugins/modules/test_luks_device.py index b703f701..7be2abb5 100644 --- a/tests/unit/plugins/modules/test_luks_device.py +++ b/tests/unit/plugins/modules/test_luks_device.py @@ -12,20 +12,20 @@ from ansible_collections.community.crypto.plugins.modules import luks_device class DummyModule: # module to mock AnsibleModule class - def __init__(self): - self.params = {} + def __init__(self) -> None: + self.params: dict[str, t.Any] = {} - def fail_json(self, msg=""): + def fail_json(self, msg: str = "") -> t.NoReturn: raise ValueError(msg) - def get_bin_path(self, command, dummy): + def get_bin_path(self, command: str, dummy: bool) -> str | None: return command # ===== Handler & CryptHandler methods tests ===== -def test_generate_luks_name(monkeypatch) -> None: +def test_generate_luks_name(monkeypatch: pytest.MonkeyPatch) -> None: module = DummyModule() module.params["passphrase_encoding"] = "text" monkeypatch.setattr( @@ -35,7 +35,7 @@ def test_generate_luks_name(monkeypatch) -> None: assert crypt.generate_luks_name("/dev/dummy") == "luks-UUID" -def test_get_container_name_by_device(monkeypatch) -> None: +def test_get_container_name_by_device(monkeypatch: pytest.MonkeyPatch) -> None: module = DummyModule() module.params["passphrase_encoding"] = "text" monkeypatch.setattr( @@ -47,7 +47,7 @@ def test_get_container_name_by_device(monkeypatch) -> None: assert crypt.get_container_name_by_device("/dev/dummy") == "container_name" -def test_get_container_device_by_name(monkeypatch) -> None: +def test_get_container_device_by_name(monkeypatch: pytest.MonkeyPatch) -> None: module = DummyModule() module.params["passphrase_encoding"] = "text" monkeypatch.setattr( @@ -59,8 +59,10 @@ def test_get_container_device_by_name(monkeypatch) -> None: assert crypt.get_container_device_by_name("dummy") == "/dev/luksdevice" -def test_run_luks_remove(monkeypatch) -> None: - def run_command_check(self, command: list[str]) -> tuple[int, str, str]: +def test_run_luks_remove(monkeypatch: pytest.MonkeyPatch) -> None: + def run_command_check( + self: luks_device.Handler, command: list[str] + ) -> tuple[int, str, str]: # check that wipefs command is actually called assert command[0] == "wipefs" return 0, "", "" @@ -249,7 +251,7 @@ def test_luks_create( cipher: str | None, hash_: str | None, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() @@ -286,7 +288,7 @@ def test_luks_remove( state: t.Literal["present", "absent", "opened", "closed"], is_luks: bool, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() @@ -315,7 +317,7 @@ def test_luks_open( name: str | None, name_by_dev: str | None, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() module.params["device"] = device @@ -356,7 +358,7 @@ def test_luks_close( state: t.Literal["present", "absent", "opened", "closed"], label: str | None, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() module.params["device"] = device @@ -397,7 +399,7 @@ def test_luks_add_key( state: t.Literal["present", "absent", "opened", "closed"], label: str | None, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() module.params["device"] = device @@ -438,7 +440,7 @@ def test_luks_remove_key( state: t.Literal["present", "absent", "opened", "closed"], label: str | None, expected: bool | t.Literal["exception"], - monkeypatch, + monkeypatch: pytest.MonkeyPatch, ) -> None: module = DummyModule() module.params["device"] = device