mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-03-26 21:33:25 +00:00
Replace to_native with to_text. (#897)
This commit is contained in:
@@ -277,7 +277,7 @@ _value:
|
||||
import typing as t
|
||||
|
||||
from ansible.errors import AnsibleFilterError
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -301,7 +301,7 @@ def openssl_csr_info_filter(
|
||||
raise AnsibleFilterError(
|
||||
f"The name_encoding option must be of a text type, not {type(name_encoding)}"
|
||||
)
|
||||
name_encoding = to_native(name_encoding)
|
||||
name_encoding = to_text(name_encoding)
|
||||
if name_encoding not in ("ignore", "idna", "unicode"):
|
||||
raise AnsibleFilterError(
|
||||
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'
|
||||
|
||||
@@ -42,7 +42,7 @@ _value:
|
||||
import typing as t
|
||||
|
||||
from ansible.errors import AnsibleFilterError
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._serial import (
|
||||
parse_serial,
|
||||
)
|
||||
@@ -54,7 +54,7 @@ def parse_serial_filter(serial_str: str | bytes) -> int:
|
||||
f"The input for the community.crypto.parse_serial filter must be a string; got {type(serial_str)} instead"
|
||||
)
|
||||
try:
|
||||
return parse_serial(to_native(serial_str))
|
||||
return parse_serial(to_text(serial_str))
|
||||
except ValueError as exc:
|
||||
raise AnsibleFilterError(str(exc)) from exc
|
||||
|
||||
|
||||
@@ -311,7 +311,7 @@ _value:
|
||||
import typing as t
|
||||
|
||||
from ansible.errors import AnsibleFilterError
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -335,7 +335,7 @@ def x509_certificate_info_filter(
|
||||
raise AnsibleFilterError(
|
||||
f"The name_encoding option must be of a text type, not {type(name_encoding)}"
|
||||
)
|
||||
name_encoding = to_native(name_encoding)
|
||||
name_encoding = to_text(name_encoding)
|
||||
if name_encoding not in ("ignore", "idna", "unicode"):
|
||||
raise AnsibleFilterError(
|
||||
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'
|
||||
|
||||
@@ -158,7 +158,7 @@ import binascii
|
||||
import typing as t
|
||||
|
||||
from ansible.errors import AnsibleFilterError
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -191,7 +191,7 @@ def x509_crl_info_filter(
|
||||
raise AnsibleFilterError(
|
||||
f"The list_revoked_certificates option must be a boolean, not {type(list_revoked_certificates)}"
|
||||
)
|
||||
name_encoding = to_native(name_encoding)
|
||||
name_encoding = to_text(name_encoding)
|
||||
if name_encoding not in ("ignore", "idna", "unicode"):
|
||||
raise AnsibleFilterError(
|
||||
f'The name_encoding option must be one of the values "ignore", "idna", or "unicode", not "{name_encoding}"'
|
||||
@@ -200,7 +200,7 @@ def x509_crl_info_filter(
|
||||
data_bytes = to_bytes(data)
|
||||
if not identify_pem_format(data_bytes):
|
||||
try:
|
||||
data_bytes = base64.b64decode(to_native(data_bytes))
|
||||
data_bytes = base64.b64decode(to_text(data_bytes))
|
||||
except (binascii.Error, TypeError, ValueError, UnicodeEncodeError):
|
||||
pass
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ import os
|
||||
import typing as t
|
||||
|
||||
from ansible.errors import AnsibleLookupError
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible.plugins.lookup import LookupBase
|
||||
from ansible_collections.community.crypto.plugins.module_utils._gnupg.cli import (
|
||||
GPGError,
|
||||
@@ -72,7 +72,7 @@ class LookupModule(LookupBase):
|
||||
f"Lookup parameter #{i} should be string or a path object, but got {type(path)}"
|
||||
)
|
||||
result.append(
|
||||
get_fingerprint_from_file(gpg_runner=gpg, path=to_native(path))
|
||||
get_fingerprint_from_file(gpg_runner=gpg, path=to_text(path))
|
||||
)
|
||||
return result
|
||||
except GPGError as exc:
|
||||
|
||||
@@ -14,7 +14,7 @@ import os
|
||||
import traceback
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._acme.backends import (
|
||||
CertificateInformation,
|
||||
CryptoBackend,
|
||||
@@ -120,14 +120,14 @@ class CryptographyChainMatcher(ChainMatcher):
|
||||
self.issuer: list[tuple[cryptography.x509.oid.ObjectIdentifier, str]] = []
|
||||
if criterium.subject:
|
||||
self.subject = [
|
||||
(cryptography_name_to_oid(k), to_native(v))
|
||||
(cryptography_name_to_oid(k), to_text(v))
|
||||
for k, v in parse_name_field(
|
||||
criterium.subject, name_field_name="subject"
|
||||
)
|
||||
]
|
||||
if criterium.issuer:
|
||||
self.issuer = [
|
||||
(cryptography_name_to_oid(k), to_native(v))
|
||||
(cryptography_name_to_oid(k), to_text(v))
|
||||
for k, v in parse_name_field(criterium.issuer, name_field_name="issuer")
|
||||
]
|
||||
self.subject_key_identifier = CryptographyChainMatcher._parse_key_identifier(
|
||||
@@ -153,7 +153,7 @@ class CryptographyChainMatcher(ChainMatcher):
|
||||
for oid, value in match_subject:
|
||||
found = False
|
||||
for attribute in x509_subject:
|
||||
if attribute.oid == oid and value == to_native(attribute.value):
|
||||
if attribute.oid == oid and value == to_text(attribute.value):
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
|
||||
@@ -12,7 +12,7 @@ import datetime
|
||||
import os
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
get_not_valid_after,
|
||||
@@ -117,7 +117,7 @@ class EntrustCertificateBackend(CertificateBackend):
|
||||
# Read the CSR that was generated for us
|
||||
if self.csr_content is not None:
|
||||
# csr_content contains bytes
|
||||
body["csr"] = to_native(self.csr_content)
|
||||
body["csr"] = to_text(self.csr_content)
|
||||
else:
|
||||
assert self.csr_path is not None
|
||||
with open(self.csr_path, "r", encoding="utf-8") as csr_file:
|
||||
|
||||
@@ -13,7 +13,7 @@ import abc
|
||||
import binascii
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
cryptography_decode_name,
|
||||
@@ -202,7 +202,7 @@ class CertificateInfoRetrieval(metaclass=abc.ABCMeta):
|
||||
with_timezone=CRYPTOGRAPHY_TIMEZONE
|
||||
)
|
||||
|
||||
result["public_key"] = to_native(self._get_public_key_pem())
|
||||
result["public_key"] = to_text(self._get_public_key_pem())
|
||||
|
||||
public_key_info = get_publickey_info(
|
||||
module=self.module,
|
||||
@@ -264,7 +264,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
|
||||
result: list[list[str]] = []
|
||||
for attribute in self.cert.subject:
|
||||
result.append(
|
||||
[cryptography_oid_to_name(attribute.oid), to_native(attribute.value)]
|
||||
[cryptography_oid_to_name(attribute.oid), to_text(attribute.value)]
|
||||
)
|
||||
return result
|
||||
|
||||
@@ -272,7 +272,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
|
||||
result = []
|
||||
for attribute in self.cert.issuer:
|
||||
result.append(
|
||||
[cryptography_oid_to_name(attribute.oid), to_native(attribute.value)]
|
||||
[cryptography_oid_to_name(attribute.oid), to_text(attribute.value)]
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import abc
|
||||
import binascii
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import (
|
||||
cryptography_decode_name,
|
||||
cryptography_get_extensions_from_csr,
|
||||
@@ -154,7 +154,7 @@ class CSRInfoRetrieval(metaclass=abc.ABCMeta):
|
||||
result["name_constraints_critical"],
|
||||
) = self._get_name_constraints()
|
||||
|
||||
result["public_key"] = to_native(self._get_public_key_pem())
|
||||
result["public_key"] = to_text(self._get_public_key_pem())
|
||||
|
||||
public_key_info = get_publickey_info(
|
||||
module=self.module,
|
||||
@@ -210,7 +210,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
|
||||
result: list[list[str]] = []
|
||||
for attribute in self.csr.subject:
|
||||
result.append(
|
||||
[cryptography_oid_to_name(attribute.oid), to_native(attribute.value)]
|
||||
[cryptography_oid_to_name(attribute.oid), to_text(attribute.value)]
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from __future__ import annotations
|
||||
import abc
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -261,7 +261,7 @@ class PrivateKeyInfoRetrieval(metaclass=abc.ABCMeta):
|
||||
except OpenSSLObjectError as exc:
|
||||
raise PrivateKeyParseError(str(exc), result=result) from exc
|
||||
|
||||
result["public_key"] = to_native(self._get_public_key(binary=False))
|
||||
result["public_key"] = to_text(self._get_public_key(binary=False))
|
||||
pk = self._get_public_key(binary=True)
|
||||
result["public_key_fingerprints"] = (
|
||||
get_fingerprint_of_bytes(pk, prefer_one=prefer_one_fingerprint)
|
||||
|
||||
@@ -22,7 +22,7 @@ from urllib.error import HTTPError
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils.common.text.converters import to_native, to_text
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible.module_utils.urls import Request
|
||||
|
||||
|
||||
@@ -67,8 +67,8 @@ class RestOperationException(Exception):
|
||||
"""Encapsulate a REST API error"""
|
||||
|
||||
def __init__(self, error: dict[str, t.Any]) -> None:
|
||||
self.status = to_native(error.get("status", None))
|
||||
self.errors = [to_native(err.get("message")) for err in error.get("errors", {})]
|
||||
self.status = to_text(error.get("status", None))
|
||||
self.errors = [to_text(err.get("message")) for err in error.get("errors", {})]
|
||||
self.message = " ".join(self.errors)
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.math import (
|
||||
convert_int_to_hex,
|
||||
)
|
||||
@@ -31,7 +31,7 @@ def parse_serial(value: str | bytes) -> int:
|
||||
"""
|
||||
Given a colon-separated string of hexadecimal byte values, converts it to an integer.
|
||||
"""
|
||||
value_str = to_native(value)
|
||||
value_str = to_text(value)
|
||||
result = 0
|
||||
for i, part in enumerate(value_str.split(":")):
|
||||
try:
|
||||
|
||||
@@ -10,7 +10,7 @@ from __future__ import annotations
|
||||
import datetime
|
||||
import re
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
@@ -118,7 +118,7 @@ def get_relative_time_option(
|
||||
|
||||
The return value will be a datetime object.
|
||||
"""
|
||||
result = to_native(input_string)
|
||||
result = to_text(input_string)
|
||||
if result is None:
|
||||
raise OpenSSLObjectError(
|
||||
f'The timespec "{input_string}" for {input_name} is not valid'
|
||||
|
||||
@@ -225,7 +225,7 @@ output_json:
|
||||
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._acme.acme import (
|
||||
ACMEClient,
|
||||
create_backend,
|
||||
@@ -291,7 +291,7 @@ def main() -> t.NoReturn:
|
||||
result.update(
|
||||
{
|
||||
"headers": info,
|
||||
"output_text": to_native(data),
|
||||
"output_text": to_text(data),
|
||||
}
|
||||
)
|
||||
# See if we can parse the result as JSON
|
||||
|
||||
@@ -279,7 +279,7 @@ from ssl import (
|
||||
)
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
cryptography_get_extensions_from_cert,
|
||||
@@ -419,7 +419,7 @@ def main() -> t.NoReturn:
|
||||
# If the item is a string_type
|
||||
if isinstance(tls_ctx_option, (str, bytes)):
|
||||
# Convert tls_ctx_option to a native string
|
||||
tls_ctx_option_str = to_native(tls_ctx_option)
|
||||
tls_ctx_option_str = to_text(tls_ctx_option)
|
||||
# Get the tls_ctx_option_str attribute from ssl
|
||||
tls_ctx_option_attr = getattr(ssl, tls_ctx_option_str, None)
|
||||
# If tls_ctx_option_attr is an integer
|
||||
|
||||
@@ -424,7 +424,7 @@ import typing as t
|
||||
from base64 import b64decode
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
|
||||
|
||||
# used to get <luks-name> out of lsblk output in format 'crypt <luks-name>'
|
||||
@@ -487,7 +487,7 @@ class Handler:
|
||||
if self._passphrase_encoding == "text":
|
||||
return to_bytes(passphrase)
|
||||
try:
|
||||
return b64decode(to_native(passphrase))
|
||||
return b64decode(to_text(passphrase))
|
||||
except Exception as exc:
|
||||
self._module.fail_json(
|
||||
f"Error while base64-decoding '{parameter_name}': {exc}"
|
||||
|
||||
@@ -135,7 +135,7 @@ import tempfile
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.math import (
|
||||
count_bits,
|
||||
)
|
||||
@@ -298,7 +298,7 @@ class DHParameterOpenSSL(DHParameterBase):
|
||||
self.path,
|
||||
]
|
||||
rc, out, err = module.run_command(command, check_rc=False)
|
||||
result = to_native(out)
|
||||
result = to_text(out)
|
||||
if rc != 0:
|
||||
# If the call failed the file probably does not exist or is
|
||||
# unreadable
|
||||
@@ -311,7 +311,7 @@ class DHParameterOpenSSL(DHParameterBase):
|
||||
bits = int(match.group(1))
|
||||
|
||||
# if output contains "WARNING" we've got a problem
|
||||
if "WARNING" in result or "WARNING" in to_native(err):
|
||||
if "WARNING" in result or "WARNING" in to_text(err):
|
||||
return False
|
||||
|
||||
return bits == self.size
|
||||
|
||||
@@ -282,7 +282,7 @@ import traceback
|
||||
import typing as t
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._crypto.basic import (
|
||||
OpenSSLBadPassphraseError,
|
||||
OpenSSLObjectError,
|
||||
@@ -559,7 +559,7 @@ class Pkcs(OpenSSLObject):
|
||||
expected_content = to_bytes(
|
||||
"".join(
|
||||
[
|
||||
to_native(pem)
|
||||
to_text(pem)
|
||||
for pem in [pkey, cert] + other_certs
|
||||
if pem is not None
|
||||
]
|
||||
@@ -848,7 +848,7 @@ def main() -> t.NoReturn:
|
||||
pkey, cert, other_certs, _friendly_name = pkcs12.parse()
|
||||
dump_content = "".join(
|
||||
[
|
||||
to_native(pem)
|
||||
to_text(pem)
|
||||
for pem in [pkey, cert] + other_certs
|
||||
if pem is not None
|
||||
]
|
||||
|
||||
@@ -11,7 +11,7 @@ import typing as t
|
||||
from subprocess import PIPE, Popen
|
||||
|
||||
from ansible.module_utils.common.process import get_bin_path
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils._gnupg.cli import (
|
||||
GPGError,
|
||||
GPGRunner,
|
||||
@@ -51,8 +51,8 @@ class PluginGPGRunner(GPGRunner):
|
||||
command, shell=False, cwd=self.cwd, stdin=PIPE, stdout=PIPE, stderr=PIPE
|
||||
) as p:
|
||||
stdout, stderr = p.communicate(input=data)
|
||||
stdout_n = to_native(stdout, errors="surrogate_or_replace")
|
||||
stderr_n = to_native(stderr, errors="surrogate_or_replace")
|
||||
stdout_n = to_text(stdout, errors="surrogate_or_replace")
|
||||
stderr_n = to_text(stderr, errors="surrogate_or_replace")
|
||||
if check_rc and p.returncode != 0:
|
||||
raise GPGError(
|
||||
f'Running {" ".join(command)} yielded return code {p.returncode} with stdout: "{stdout_n}" and stderr: "{stderr_n}")'
|
||||
|
||||
Reference in New Issue
Block a user