Improve type hints. (#913)

This commit is contained in:
Felix Fontein
2025-06-01 21:33:20 +02:00
committed by GitHub
parent 576a06b5b2
commit f68b0d0c08
13 changed files with 95 additions and 22 deletions

View File

@@ -1,3 +1,3 @@
minor_changes:
- "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905, https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911)."
- "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905, https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911, https://github.com/ansible-collections/community.crypto/pull/913)."
- "Remove various no longer needed abstraction layers for multiple backends (https://github.com/ansible-collections/community.crypto/pull/912)."

View File

@@ -301,7 +301,9 @@ def openssl_csr_info_filter(
raise AnsibleFilterError(
f"The name_encoding option must be of a text type, not {type(name_encoding)}"
)
name_encoding = to_text(name_encoding)
name_encoding = t.cast(
t.Literal["ignore", "idna", "unicode"], to_text(name_encoding)
)
if name_encoding not in ("ignore", "idna", "unicode"):
raise AnsibleFilterError(
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'

View File

@@ -335,7 +335,9 @@ def x509_certificate_info_filter(
raise AnsibleFilterError(
f"The name_encoding option must be of a text type, not {type(name_encoding)}"
)
name_encoding = to_text(name_encoding)
name_encoding = t.cast(
t.Literal["ignore", "idna", "unicode"], to_text(name_encoding)
)
if name_encoding not in ("ignore", "idna", "unicode"):
raise AnsibleFilterError(
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'

View File

@@ -191,7 +191,9 @@ def x509_crl_info_filter(
raise AnsibleFilterError(
f"The list_revoked_certificates option must be a boolean, not {type(list_revoked_certificates)}"
)
name_encoding = to_text(name_encoding)
name_encoding = t.cast(
t.Literal["ignore", "idna", "unicode"], to_text(name_encoding)
)
if name_encoding not in ("ignore", "idna", "unicode"):
raise AnsibleFilterError(
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'

View File

@@ -285,11 +285,41 @@ class ACMEClient:
key_file=key_file, key_content=key_content, passphrase=passphrase
)
@t.overload
def sign_request(
self,
*,
protected: dict[str, t.Any],
payload: str | dict[str, t.Any] | None,
payload: dict[str, t.Any] | None,
key_data: dict[str, t.Any],
encode_payload: t.Literal[True] = True,
) -> dict[str, t.Any]: ...
@t.overload
def sign_request(
self,
*,
protected: dict[str, t.Any],
payload: str | bytes | None,
key_data: dict[str, t.Any],
encode_payload: t.Literal[False],
) -> dict[str, t.Any]: ...
@t.overload
def sign_request(
self,
*,
protected: dict[str, t.Any],
payload: str | bytes | dict[str, t.Any] | None,
key_data: dict[str, t.Any],
encode_payload: bool = True,
) -> dict[str, t.Any]: ...
def sign_request(
self,
*,
protected: dict[str, t.Any],
payload: str | bytes | dict[str, t.Any] | None,
key_data: dict[str, t.Any],
encode_payload: bool = True,
) -> dict[str, t.Any]:
@@ -334,12 +364,12 @@ class ACMEClient:
def send_signed_request(
self,
url: str,
payload: str | dict[str, t.Any] | None,
payload: dict[str, t.Any] | None,
*,
key_data: dict[str, t.Any] | None = None,
jws_header: dict[str, t.Any] | None = None,
parse_json_result: t.Literal[True] = True,
encode_payload: bool = True,
encode_payload: t.Literal[True] = True,
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
@@ -349,12 +379,42 @@ class ACMEClient:
def send_signed_request(
self,
url: str,
payload: str | dict[str, t.Any] | None,
payload: str | bytes | None,
*,
key_data: dict[str, t.Any] | None = None,
jws_header: dict[str, t.Any] | None = None,
parse_json_result: t.Literal[True] = True,
encode_payload: t.Literal[False],
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]: ...
@t.overload
def send_signed_request(
self,
url: str,
payload: dict[str, t.Any] | None,
*,
key_data: dict[str, t.Any] | None = None,
jws_header: dict[str, t.Any] | None = None,
parse_json_result: t.Literal[False],
encode_payload: bool = True,
encode_payload: t.Literal[True] = True,
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[bytes, dict[str, t.Any]]: ...
@t.overload
def send_signed_request(
self,
url: str,
payload: str | bytes | None,
*,
key_data: dict[str, t.Any] | None = None,
jws_header: dict[str, t.Any] | None = None,
parse_json_result: t.Literal[False],
encode_payload: t.Literal[False],
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
@@ -363,7 +423,7 @@ class ACMEClient:
def send_signed_request(
self,
url: str,
payload: str | dict[str, t.Any] | None,
payload: str | bytes | dict[str, t.Any] | None,
*,
key_data: dict[str, t.Any] | None = None,
jws_header: dict[str, t.Any] | None = None,
@@ -404,7 +464,7 @@ class ACMEClient:
encode_payload=encode_payload,
)
self._log("signed request", data=data)
data = self.module.jsonify(data)
data_str = self.module.jsonify(data)
headers = {
"Content-Type": "application/jose+json",
@@ -412,7 +472,7 @@ class ACMEClient:
resp, info = fetch_url(
self.module,
url,
data=data,
data=data_str,
headers=headers,
method="POST",
timeout=self.request_timeout,

View File

@@ -313,6 +313,8 @@ class OpenSSLCLIBackend(CryptoBackend):
f"-{key_data['hash']}",
] + cmd_postfix
out: bytes | str
rc, out, err = self.module.run_command(
openssl_sign_cmd,
data=sign_payload,
@@ -326,7 +328,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if key_data["type"] == "ec":
dummy, der_out, dummy = self.module.run_command(
dummy, der_out, dummy2 = self.module.run_command(
[self.openssl_binary, "asn1parse", "-inform", "DER"],
data=out,
binary_data=True,

View File

@@ -34,7 +34,7 @@ def read_file(fn: str | os.PathLike) -> bytes:
# This function was adapted from an earlier version of https://github.com/ansible/ansible/blob/devel/lib/ansible/modules/uri.py
def write_file(
*, module: AnsibleModule, dest: str | os.PathLike, content: bytes
*, module: AnsibleModule, dest: str | os.PathLike[str], content: bytes
) -> bool:
"""
Write content to destination file dest, only if the content
@@ -79,7 +79,7 @@ def write_file(
if not os.access(dest, os.R_OK):
os.remove(tmpsrc)
raise ModuleFailException(f"Destination {dest} not readable")
checksum_dest = module.sha1(dest)
checksum_dest = module.sha1(str(dest))
else:
dirname = os.path.dirname(dest) or "."
if not os.access(dirname, os.W_OK):

View File

@@ -85,6 +85,7 @@ class AcmeCertificateBackend(CertificateBackend):
f.close()
command.extend(["--csr", tmpsrc])
else:
assert self.csr_path is not None
command.extend(["--csr", self.csr_path])
command.extend(["--acme-dir", self.challenge_path])
command.extend(["--directory-url", self.acme_directory])

View File

@@ -45,7 +45,8 @@ def restore_on_failure(
if backup_file is not None:
module.atomic_move(os.path.abspath(backup_file), os.path.abspath(path))
raise
module.add_cleanup_file(backup_file)
if backup_file is not None:
module.add_cleanup_file(backup_file)
return backup_and_restore

View File

@@ -124,7 +124,7 @@ import os
import typing as t
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible_collections.community.crypto.plugins.module_utils._crypto.pem import (
split_pem_list,
)
@@ -229,7 +229,7 @@ def is_parent(
def parse_pem_list(
module: AnsibleModule,
text: str,
source: str | os.PathLike,
source: bytes | str | os.PathLike,
fail_on_error: bool = True,
) -> list[Certificate]:
"""
@@ -242,7 +242,7 @@ def parse_pem_list(
cert = cryptography.x509.load_pem_x509_certificate(to_bytes(cert_pem))
result.append(Certificate(cert_pem, cert))
except Exception as e:
msg = f"Cannot parse certificate #{len(result) + 1} from {source}: {e}"
msg = f"Cannot parse certificate #{len(result) + 1} from {to_text(source)!r}: {e}"
if fail_on_error:
module.fail_json(msg=msg)
else:
@@ -251,7 +251,7 @@ def parse_pem_list(
def load_pem_list(
module: AnsibleModule, path: str | os.PathLike, fail_on_error: bool = True
module: AnsibleModule, path: bytes | str | os.PathLike, fail_on_error: bool = True
) -> list[Certificate]:
"""
Load concatenated PEM certificates from file. Return list of ``Certificate`` objects.
@@ -265,7 +265,7 @@ def load_pem_list(
fail_on_error=fail_on_error,
)
except Exception as e:
msg = f"Cannot read certificate file {path}: {e}"
msg = f"Cannot read certificate file {to_text(path)!r}: {e}"
if fail_on_error:
module.fail_json(msg=msg)
else:
@@ -286,7 +286,7 @@ class CertificateSet:
)
self.certificate_by_cert: dict[cryptography.x509.Certificate, Certificate] = {}
def _load_file(self, path: str | os.PathLike) -> None:
def _load_file(self, path: bytes | str | os.PathLike) -> None:
certs = load_pem_list(self.module, path, fail_on_error=False)
for cert in certs:
self.certificates.add(cert)

View File

@@ -6,6 +6,7 @@ plugins/module_utils/_acme/backend_openssl_cli.py pep8:E704
plugins/module_utils/_acme/certificate.py pep8:E704
plugins/module_utils/_crypto/cryptography_support.py pep8:E704
plugins/module_utils/_crypto/module_backends/certificate.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert
plugins/module_utils/_crypto/module_backends/csr.py no-assert

View File

@@ -5,6 +5,7 @@ plugins/module_utils/_acme/backend_openssl_cli.py pep8:E704
plugins/module_utils/_acme/certificate.py pep8:E704
plugins/module_utils/_crypto/cryptography_support.py pep8:E704
plugins/module_utils/_crypto/module_backends/certificate.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert
plugins/module_utils/_crypto/module_backends/csr.py no-assert

View File

@@ -1,5 +1,6 @@
meta/runtime.yml runtime-metadata # Bug in ansible-test: https://github.com/ansible/ansible/pull/85198
plugins/module_utils/_crypto/module_backends/certificate.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_acme.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_ownca.py no-assert
plugins/module_utils/_crypto/module_backends/certificate_selfsigned.py no-assert
plugins/module_utils/_crypto/module_backends/csr.py no-assert