From f68b0d0c08bc927497bf75667562e6472889da78 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Sun, 1 Jun 2025 21:33:20 +0200 Subject: [PATCH] Improve type hints. (#913) --- changelogs/fragments/refactoring.yml | 2 +- plugins/filter/openssl_csr_info.py | 4 +- plugins/filter/x509_certificate_info.py | 4 +- plugins/filter/x509_crl_info.py | 4 +- plugins/module_utils/_acme/acme.py | 76 +++++++++++++++++-- .../module_utils/_acme/backend_openssl_cli.py | 4 +- plugins/module_utils/_acme/io.py | 4 +- .../module_backends/certificate_acme.py | 1 + .../module_utils/_openssh/backends/common.py | 3 +- plugins/modules/certificate_complete_chain.py | 12 +-- tests/sanity/ignore-2.17.txt | 1 + tests/sanity/ignore-2.18.txt | 1 + tests/sanity/ignore-2.19.txt | 1 + 13 files changed, 95 insertions(+), 22 deletions(-) diff --git a/changelogs/fragments/refactoring.yml b/changelogs/fragments/refactoring.yml index 19928e66..729c991b 100644 --- a/changelogs/fragments/refactoring.yml +++ b/changelogs/fragments/refactoring.yml @@ -1,3 +1,3 @@ minor_changes: - - "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905, https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911)." + - "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905, https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911, https://github.com/ansible-collections/community.crypto/pull/913)." - "Remove various no longer needed abstraction layers for multiple backends (https://github.com/ansible-collections/community.crypto/pull/912)." diff --git a/plugins/filter/openssl_csr_info.py b/plugins/filter/openssl_csr_info.py index 13035694..d71bea25 100644 --- a/plugins/filter/openssl_csr_info.py +++ b/plugins/filter/openssl_csr_info.py @@ -301,7 +301,9 @@ def openssl_csr_info_filter( raise AnsibleFilterError( f"The name_encoding option must be of a text type, not {type(name_encoding)}" ) - name_encoding = to_text(name_encoding) + name_encoding = t.cast( + t.Literal["ignore", "idna", "unicode"], to_text(name_encoding) + ) if name_encoding not in ("ignore", "idna", "unicode"): raise AnsibleFilterError( f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"' diff --git a/plugins/filter/x509_certificate_info.py b/plugins/filter/x509_certificate_info.py index 639b8785..e70957c5 100644 --- a/plugins/filter/x509_certificate_info.py +++ b/plugins/filter/x509_certificate_info.py @@ -335,7 +335,9 @@ def x509_certificate_info_filter( raise AnsibleFilterError( f"The name_encoding option must be of a text type, not {type(name_encoding)}" ) - name_encoding = to_text(name_encoding) + name_encoding = t.cast( + t.Literal["ignore", "idna", "unicode"], to_text(name_encoding) + ) if name_encoding not in ("ignore", "idna", "unicode"): raise AnsibleFilterError( f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"' diff --git a/plugins/filter/x509_crl_info.py b/plugins/filter/x509_crl_info.py index 03a5980d..a6901e5c 100644 --- a/plugins/filter/x509_crl_info.py +++ b/plugins/filter/x509_crl_info.py @@ -191,7 +191,9 @@ def x509_crl_info_filter( raise AnsibleFilterError( f"The list_revoked_certificates option must be a boolean, not {type(list_revoked_certificates)}" ) - name_encoding = to_text(name_encoding) + name_encoding = t.cast( + t.Literal["ignore", "idna", "unicode"], to_text(name_encoding) + ) if name_encoding not in ("ignore", "idna", "unicode"): raise AnsibleFilterError( f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"' diff --git a/plugins/module_utils/_acme/acme.py b/plugins/module_utils/_acme/acme.py index b09c2e8c..cc10aecd 100644 --- a/plugins/module_utils/_acme/acme.py +++ b/plugins/module_utils/_acme/acme.py @@ -285,11 +285,41 @@ class ACMEClient: key_file=key_file, key_content=key_content, passphrase=passphrase ) + @t.overload def sign_request( self, *, protected: dict[str, t.Any], - payload: str | dict[str, t.Any] | None, + payload: dict[str, t.Any] | None, + key_data: dict[str, t.Any], + encode_payload: t.Literal[True] = True, + ) -> dict[str, t.Any]: ... + + @t.overload + def sign_request( + self, + *, + protected: dict[str, t.Any], + payload: str | bytes | None, + key_data: dict[str, t.Any], + encode_payload: t.Literal[False], + ) -> dict[str, t.Any]: ... + + @t.overload + def sign_request( + self, + *, + protected: dict[str, t.Any], + payload: str | bytes | dict[str, t.Any] | None, + key_data: dict[str, t.Any], + encode_payload: bool = True, + ) -> dict[str, t.Any]: ... + + def sign_request( + self, + *, + protected: dict[str, t.Any], + payload: str | bytes | dict[str, t.Any] | None, key_data: dict[str, t.Any], encode_payload: bool = True, ) -> dict[str, t.Any]: @@ -334,12 +364,12 @@ class ACMEClient: def send_signed_request( self, url: str, - payload: str | dict[str, t.Any] | None, + payload: dict[str, t.Any] | None, *, key_data: dict[str, t.Any] | None = None, jws_header: dict[str, t.Any] | None = None, parse_json_result: t.Literal[True] = True, - encode_payload: bool = True, + encode_payload: t.Literal[True] = True, fail_on_error: bool = True, error_msg: str | None = None, expected_status_codes: t.Iterable[int] | None = None, @@ -349,12 +379,42 @@ class ACMEClient: def send_signed_request( self, url: str, - payload: str | dict[str, t.Any] | None, + payload: str | bytes | None, + *, + key_data: dict[str, t.Any] | None = None, + jws_header: dict[str, t.Any] | None = None, + parse_json_result: t.Literal[True] = True, + encode_payload: t.Literal[False], + fail_on_error: bool = True, + error_msg: str | None = None, + expected_status_codes: t.Iterable[int] | None = None, + ) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]: ... + + @t.overload + def send_signed_request( + self, + url: str, + payload: dict[str, t.Any] | None, *, key_data: dict[str, t.Any] | None = None, jws_header: dict[str, t.Any] | None = None, parse_json_result: t.Literal[False], - encode_payload: bool = True, + encode_payload: t.Literal[True] = True, + fail_on_error: bool = True, + error_msg: str | None = None, + expected_status_codes: t.Iterable[int] | None = None, + ) -> tuple[bytes, dict[str, t.Any]]: ... + + @t.overload + def send_signed_request( + self, + url: str, + payload: str | bytes | None, + *, + key_data: dict[str, t.Any] | None = None, + jws_header: dict[str, t.Any] | None = None, + parse_json_result: t.Literal[False], + encode_payload: t.Literal[False], fail_on_error: bool = True, error_msg: str | None = None, expected_status_codes: t.Iterable[int] | None = None, @@ -363,7 +423,7 @@ class ACMEClient: def send_signed_request( self, url: str, - payload: str | dict[str, t.Any] | None, + payload: str | bytes | dict[str, t.Any] | None, *, key_data: dict[str, t.Any] | None = None, jws_header: dict[str, t.Any] | None = None, @@ -404,7 +464,7 @@ class ACMEClient: encode_payload=encode_payload, ) self._log("signed request", data=data) - data = self.module.jsonify(data) + data_str = self.module.jsonify(data) headers = { "Content-Type": "application/jose+json", @@ -412,7 +472,7 @@ class ACMEClient: resp, info = fetch_url( self.module, url, - data=data, + data=data_str, headers=headers, method="POST", timeout=self.request_timeout, diff --git a/plugins/module_utils/_acme/backend_openssl_cli.py b/plugins/module_utils/_acme/backend_openssl_cli.py index 10df3440..53a0e5be 100644 --- a/plugins/module_utils/_acme/backend_openssl_cli.py +++ b/plugins/module_utils/_acme/backend_openssl_cli.py @@ -313,6 +313,8 @@ class OpenSSLCLIBackend(CryptoBackend): f"-{key_data['hash']}", ] + cmd_postfix + out: bytes | str + rc, out, err = self.module.run_command( openssl_sign_cmd, data=sign_payload, @@ -326,7 +328,7 @@ class OpenSSLCLIBackend(CryptoBackend): ) if key_data["type"] == "ec": - dummy, der_out, dummy = self.module.run_command( + dummy, der_out, dummy2 = self.module.run_command( [self.openssl_binary, "asn1parse", "-inform", "DER"], data=out, binary_data=True, diff --git a/plugins/module_utils/_acme/io.py b/plugins/module_utils/_acme/io.py index 20ae1930..c268d342 100644 --- a/plugins/module_utils/_acme/io.py +++ b/plugins/module_utils/_acme/io.py @@ -34,7 +34,7 @@ def read_file(fn: str | os.PathLike) -> bytes: # This function was adapted from an earlier version of https://github.com/ansible/ansible/blob/devel/lib/ansible/modules/uri.py def write_file( - *, module: AnsibleModule, dest: str | os.PathLike, content: bytes + *, module: AnsibleModule, dest: str | os.PathLike[str], content: bytes ) -> bool: """ Write content to destination file dest, only if the content @@ -79,7 +79,7 @@ def write_file( if not os.access(dest, os.R_OK): os.remove(tmpsrc) raise ModuleFailException(f"Destination {dest} not readable") - checksum_dest = module.sha1(dest) + checksum_dest = module.sha1(str(dest)) else: dirname = os.path.dirname(dest) or "." if not os.access(dirname, os.W_OK): diff --git a/plugins/module_utils/_crypto/module_backends/certificate_acme.py b/plugins/module_utils/_crypto/module_backends/certificate_acme.py index 69c7c425..300efaa1 100644 --- a/plugins/module_utils/_crypto/module_backends/certificate_acme.py +++ b/plugins/module_utils/_crypto/module_backends/certificate_acme.py @@ -85,6 +85,7 @@ class AcmeCertificateBackend(CertificateBackend): f.close() command.extend(["--csr", tmpsrc]) else: + assert self.csr_path is not None command.extend(["--csr", self.csr_path]) command.extend(["--acme-dir", self.challenge_path]) command.extend(["--directory-url", self.acme_directory]) diff --git a/plugins/module_utils/_openssh/backends/common.py b/plugins/module_utils/_openssh/backends/common.py index 72d58536..f146ca4b 100644 --- a/plugins/module_utils/_openssh/backends/common.py +++ b/plugins/module_utils/_openssh/backends/common.py @@ -45,7 +45,8 @@ def restore_on_failure( if backup_file is not None: module.atomic_move(os.path.abspath(backup_file), os.path.abspath(path)) raise - module.add_cleanup_file(backup_file) + if backup_file is not None: + module.add_cleanup_file(backup_file) return backup_and_restore diff --git a/plugins/modules/certificate_complete_chain.py b/plugins/modules/certificate_complete_chain.py index d20fef51..06c1733c 100644 --- a/plugins/modules/certificate_complete_chain.py +++ b/plugins/modules/certificate_complete_chain.py @@ -124,7 +124,7 @@ import os import typing as t from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.common.text.converters import to_bytes +from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible_collections.community.crypto.plugins.module_utils._crypto.pem import ( split_pem_list, ) @@ -229,7 +229,7 @@ def is_parent( def parse_pem_list( module: AnsibleModule, text: str, - source: str | os.PathLike, + source: bytes | str | os.PathLike, fail_on_error: bool = True, ) -> list[Certificate]: """ @@ -242,7 +242,7 @@ def parse_pem_list( cert = cryptography.x509.load_pem_x509_certificate(to_bytes(cert_pem)) result.append(Certificate(cert_pem, cert)) except Exception as e: - msg = f"Cannot parse certificate #{len(result) + 1} from {source}: {e}" + msg = f"Cannot parse certificate #{len(result) + 1} from {to_text(source)!r}: {e}" if fail_on_error: module.fail_json(msg=msg) else: @@ -251,7 +251,7 @@ def parse_pem_list( def load_pem_list( - module: AnsibleModule, path: str | os.PathLike, fail_on_error: bool = True + module: AnsibleModule, path: bytes | str | os.PathLike, fail_on_error: bool = True ) -> list[Certificate]: """ Load concatenated PEM certificates from file. Return list of ``Certificate`` objects. @@ -265,7 +265,7 @@ def load_pem_list( fail_on_error=fail_on_error, ) except Exception as e: - msg = f"Cannot read certificate file {path}: {e}" + msg = f"Cannot read certificate file {to_text(path)!r}: {e}" if fail_on_error: module.fail_json(msg=msg) else: @@ -286,7 +286,7 @@ class CertificateSet: ) self.certificate_by_cert: dict[cryptography.x509.Certificate, Certificate] = {} - def _load_file(self, path: str | os.PathLike) -> None: + def _load_file(self, path: bytes | str | os.PathLike) -> None: certs = load_pem_list(self.module, path, fail_on_error=False) for cert in certs: self.certificates.add(cert) diff --git a/tests/sanity/ignore-2.17.txt b/tests/sanity/ignore-2.17.txt index 93d9a8be..87cc4123 100644 --- a/tests/sanity/ignore-2.17.txt +++ b/tests/sanity/ignore-2.17.txt @@ -6,6 +6,7 @@ plugins/module_utils/_acme/backend_openssl_cli.py pep8:E704 plugins/module_utils/_acme/certificate.py pep8:E704 plugins/module_utils/_crypto/cryptography_support.py pep8:E704 plugins/module_utils/_crypto/module_backends/certificate.py no-assert +plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert plugins/module_utils/_crypto/module_backends/csr.py no-assert diff --git a/tests/sanity/ignore-2.18.txt b/tests/sanity/ignore-2.18.txt index 67340837..755fbff0 100644 --- a/tests/sanity/ignore-2.18.txt +++ b/tests/sanity/ignore-2.18.txt @@ -5,6 +5,7 @@ plugins/module_utils/_acme/backend_openssl_cli.py pep8:E704 plugins/module_utils/_acme/certificate.py pep8:E704 plugins/module_utils/_crypto/cryptography_support.py pep8:E704 plugins/module_utils/_crypto/module_backends/certificate.py no-assert +plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert plugins/module_utils/_crypto/module_backends/csr.py no-assert diff --git a/tests/sanity/ignore-2.19.txt b/tests/sanity/ignore-2.19.txt index b796d021..12b1de23 100644 --- a/tests/sanity/ignore-2.19.txt +++ b/tests/sanity/ignore-2.19.txt @@ -1,5 +1,6 @@ meta/runtime.yml runtime-metadata # Bug in ansible-test: https://github.com/ansible/ansible/pull/85198 plugins/module_utils/_crypto/module_backends/certificate.py no-assert +plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert plugins/module_utils/_crypto/module_backends/csr.py no-assert