Improve type hinting. (#914)

This commit is contained in:
Felix Fontein
2025-06-08 20:48:58 +02:00
committed by GitHub
parent a6b5884fc6
commit e90d4d2b0f
6 changed files with 73 additions and 21 deletions

View File

@@ -1,3 +1,7 @@
minor_changes:
- "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905, https://github.com/ansible-collections/community.crypto/pull/909, https://github.com/ansible-collections/community.crypto/pull/911, https://github.com/ansible-collections/community.crypto/pull/913)."
- "Various code refactorings (https://github.com/ansible-collections/community.crypto/pull/905,
https://github.com/ansible-collections/community.crypto/pull/909,
https://github.com/ansible-collections/community.crypto/pull/911,
https://github.com/ansible-collections/community.crypto/pull/913,
https://github.com/ansible-collections/community.crypto/pull/914)."
- "Remove various no longer needed abstraction layers for multiple backends (https://github.com/ansible-collections/community.crypto/pull/912)."

View File

@@ -146,7 +146,15 @@ class ACMEDirectory:
self.directory_root = module.params["acme_directory"]
self.version = module.params["acme_version"]
self.directory, dummy = client.get_request(self.directory_root, get_only=True)
directory, info = client.get_request(self.directory_root, get_only=True)
if not isinstance(directory, dict):
raise ACMEProtocolException(
module=module,
msg=f"ACME directory is not a dictionary, but {type(directory)}",
info=info,
content_json=directory,
)
self.directory = directory
self.request_timeout = module.params["request_timeout"]
@@ -373,7 +381,7 @@ class ACMEClient:
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]: ...
) -> tuple[object | bytes, dict[str, t.Any]]: ...
@t.overload
def send_signed_request(
@@ -388,7 +396,7 @@ class ACMEClient:
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]: ...
) -> tuple[object | bytes, dict[str, t.Any]]: ...
@t.overload
def send_signed_request(
@@ -432,7 +440,7 @@ class ACMEClient:
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]:
) -> tuple[object | bytes, dict[str, t.Any]]:
"""
Sends a JWS signed HTTP POST request to the ACME server and returns
the response as dictionary (if parse_json_result is True) or in raw form
@@ -483,7 +491,7 @@ class ACMEClient:
failed_tries += 1
continue
_assert_fetch_url_success(module=self.module, response=resp, info=info)
result = {}
result: object | bytes = {}
try:
# In Python 2, reading from a closed response yields a TypeError.
@@ -504,13 +512,12 @@ class ACMEClient:
self._log("parsed result", data=decoded_result)
# In case of badNonce error, try again (up to 5 times)
# (https://tools.ietf.org/html/rfc8555#section-6.7)
if all(
(
400 <= info["status"] < 600,
decoded_result.get("type")
== "urn:ietf:params:acme:error:badNonce",
failed_tries <= 5,
)
if (
400 <= info["status"] < 600
and failed_tries <= 5
and isinstance(decoded_result, dict)
and decoded_result.get("type")
== "urn:ietf:params:acme:error:badNonce"
):
failed_tries += 1
continue
@@ -548,7 +555,7 @@ class ACMEClient:
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any], dict[str, t.Any]]: ...
) -> tuple[object, dict[str, t.Any]]: ...
@t.overload
def get_request(
@@ -573,7 +580,7 @@ class ACMEClient:
fail_on_error: bool = True,
error_msg: str | None = None,
expected_status_codes: t.Iterable[int] | None = None,
) -> tuple[dict[str, t.Any] | bytes, dict[str, t.Any]]:
) -> tuple[object | bytes, dict[str, t.Any]]:
"""
Perform a GET-like request. Will try POST-as-GET for ACMEv2, with fallback
to GET if server replies with a status code of 405.
@@ -623,7 +630,7 @@ class ACMEClient:
# Process result
parsed_json_result = False
result: dict[str, t.Any] | bytes
result: object | bytes
if parse_json_result:
result = {}
if content:
@@ -681,6 +688,13 @@ class ACMEClient:
data, info = self.get_request(
url, parse_json_result=True, fail_on_error=True, get_only=True
)
if not isinstance(data, dict):
raise ACMEProtocolException(
module=self.module,
msg="Unexpected renewal information",
info=info,
content_json=data,
)
# Include Retry-After header if asked for
if include_retry_after and "retry-after" in info:

View File

@@ -258,7 +258,14 @@ class Authorization:
return self.data.copy()
def refresh(self, *, client: ACMEClient) -> bool:
result, dummy = client.get_request(self.url)
result, info = client.get_request(self.url)
if not isinstance(result, dict):
raise ACMEProtocolException(
module=client.module,
msg="Unexpected authorization data",
info=info,
content_json=result,
)
changed = self.data != result
self._setup(client=client, data=result)
return changed

View File

@@ -72,7 +72,7 @@ class ACMEProtocolException(ModuleFailException):
info: dict[str, t.Any] | None = None,
response=None,
content: bytes | None = None,
content_json: dict[str, t.Any] | bytes | None = None,
content_json: object | bytes | None = None,
extras: dict[str, t.Any] | None = None,
):
# Try to get hold of content, if response is given and content is not provided
@@ -99,7 +99,9 @@ class ACMEProtocolException(ModuleFailException):
# Try to get hold of JSON decoded content, when content is given and JSON not provided
if content_json_json is None and content is not None and module is not None:
try:
content_json_json = module.from_json(to_text(content))
cjj = module.from_json(to_text(content))
if isinstance(cjj, dict):
content_json_json = cjj
except Exception:
pass

View File

@@ -173,7 +173,14 @@ class Order:
raise
def refresh(self, *, client: ACMEClient) -> bool:
result, dummy = client.get_request(self.url)
result, info = client.get_request(self.url)
if not isinstance(result, dict):
raise ACMEProtocolException(
module=client.module,
msg="Unexpected authorization data",
info=info,
content_json=result,
)
changed = self.data != result
self._setup(client=client, data=result)
return changed

View File

@@ -215,6 +215,7 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.acme import
create_default_argspec,
)
from ansible_collections.community.crypto.plugins.module_utils._acme.errors import (
ACMEProtocolException,
ModuleFailException,
)
from ansible_collections.community.crypto.plugins.module_utils._acme.utils import (
@@ -239,6 +240,13 @@ def get_orders_list(
res, info = client.get_request(
next_orders_url, parse_json_result=True, fail_on_error=True
)
if not isinstance(res, dict):
raise ACMEProtocolException(
module=module,
msg="Unexpected account information",
info=info,
content_json=res,
)
if not res.get("orders"):
if orders:
module.warn(
@@ -267,7 +275,17 @@ def get_order(client: ACMEClient, order_url: str) -> dict[str, t.Any]:
"""
Retrieve order data.
"""
return client.get_request(order_url, parse_json_result=True, fail_on_error=True)[0]
result, info = client.get_request(
order_url, parse_json_result=True, fail_on_error=True
)
if not isinstance(result, dict):
raise ACMEProtocolException(
module=client.module,
msg="Unexpected order data",
info=info,
content_json=result,
)
return result
def main() -> t.NoReturn: