Get rid of some to_native and to_text calls. (#880)

This commit is contained in:
Felix Fontein
2025-05-02 15:58:39 +02:00
committed by GitHub
parent 0b8f3306c7
commit 86db561193
46 changed files with 100 additions and 140 deletions

View File

@@ -14,7 +14,7 @@ import re
import tempfile
import traceback
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
@@ -140,7 +140,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if rc != 0:
raise BackendException(
f"Error while running {' '.join(openssl_keydump_cmd)}: {to_text(err)}"
f"Error while running {' '.join(openssl_keydump_cmd)}: {err}"
)
out_text = to_text(out, errors="surrogate_or_strict")
@@ -227,9 +227,9 @@ class OpenSSLCLIBackend(CryptoBackend):
def sign(self, payload64, protected64, key_data):
sign_payload = f"{protected64}.{payload64}".encode("utf8")
if key_data["type"] == "hmac":
hex_key = to_native(
hex_key = (
binascii.hexlify(base64.urlsafe_b64decode(key_data["jwk"]["k"]))
)
).decode("ascii")
cmd_postfix = [
"-mac",
"hmac",
@@ -254,7 +254,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if rc != 0:
raise BackendException(
f"Error while running {' '.join(openssl_sign_cmd)}: {to_text(err)}"
f"Error while running {' '.join(openssl_sign_cmd)}: {err}"
)
if key_data["type"] == "ec":
@@ -317,7 +317,7 @@ class OpenSSLCLIBackend(CryptoBackend):
@staticmethod
def _normalize_ip(ip):
try:
return to_native(ipaddress.ip_address(to_text(ip)).compressed)
return ipaddress.ip_address(to_text(ip)).compressed
except ValueError:
# We do not want to error out on something IPAddress() cannot parse
return ip
@@ -354,7 +354,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if rc != 0:
raise BackendException(
f"Error while running {' '.join(openssl_csr_cmd)}: {to_text(err)}"
f"Error while running {' '.join(openssl_csr_cmd)}: {err}"
)
identifiers = set()
@@ -439,7 +439,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if rc != 0:
raise BackendException(
f"Error while running {' '.join(openssl_cert_cmd)}: {to_text(err)}"
f"Error while running {' '.join(openssl_cert_cmd)}: {err}"
)
out_text = to_text(out, errors="surrogate_or_strict")
@@ -490,7 +490,7 @@ class OpenSSLCLIBackend(CryptoBackend):
)
if rc != 0:
raise BackendException(
f"Error while running {' '.join(openssl_cert_cmd)}: {to_text(err)}"
f"Error while running {' '.join(openssl_cert_cmd)}: {err}"
)
out_text = to_text(out, errors="surrogate_or_strict")

View File

@@ -11,7 +11,6 @@ import re
from collections import namedtuple
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)
@@ -106,7 +105,7 @@ class CryptoBackend:
value, name, backend="cryptography", with_timezone=self._with_timezone
)
except OpenSSLObjectError as exc:
raise BackendException(to_native(exc))
raise BackendException(str(exc))
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)

View File

@@ -11,7 +11,6 @@ import shutil
import tempfile
import traceback
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ModuleFailException,
)
@@ -83,7 +82,7 @@ def write_file(module, dest, content):
except Exception as err:
os.remove(tmpsrc)
raise ModuleFailException(
f"failed to copy {tmpsrc} to {dest}: {to_native(err)}",
f"failed to copy {tmpsrc} to {dest}: {err}",
exception=traceback.format_exc(),
)
os.remove(tmpsrc)

View File

@@ -12,7 +12,6 @@ import textwrap
import traceback
from urllib.parse import unquote
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ModuleFailException,
)
@@ -51,7 +50,7 @@ def pem_to_der(pem_filename=None, pem_content=None):
lines = list(f)
except Exception as err:
raise ModuleFailException(
f"cannot load PEM file {pem_filename}: {to_native(err)}",
f"cannot load PEM file {pem_filename}: {err}",
exception=traceback.format_exc(),
)
else:
@@ -126,15 +125,17 @@ def compute_cert_id(
raise ModuleFailException(
"Certificate has no Authority Key Identifier extension"
)
aki = to_native(
base64.urlsafe_b64encode(cert_info.authority_key_identifier)
).replace("=", "")
aki = (
(base64.urlsafe_b64encode(cert_info.authority_key_identifier))
.decode("ascii")
.replace("=", "")
)
# Convert serial number to string
serial_bytes = convert_int_to_bytes(cert_info.serial_number)
if ord(serial_bytes[:1]) >= 128:
serial_bytes = b"\x00" + serial_bytes
serial = to_native(base64.urlsafe_b64encode(serial_bytes)).replace("=", "")
serial = (base64.urlsafe_b64encode(serial_bytes)).decode("ascii").replace("=", "")
# Compose cert ID
return f"{aki}.{serial}"

View File

@@ -15,7 +15,7 @@ from urllib.parse import (
urlunparse,
)
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible_collections.community.crypto.plugins.module_utils.version import (
LooseVersion,
)
@@ -95,7 +95,7 @@ def cryptography_get_extensions_from_cert(cert):
for ext in cert.extensions:
result[ext.oid.dotted_string] = dict(
critical=ext.critical,
value=to_native(base64.b64encode(ext.value.public_bytes())),
value=base64.b64encode(ext.value.public_bytes()).decode("ascii"),
)
else:
# Since cryptography will not give us the DER value for an extension
@@ -120,7 +120,7 @@ def cryptography_get_extensions_from_cert(cert):
der = backend._ffi.buffer(data.data, data.length)[:]
entry = dict(
critical=(crit == 1),
value=to_native(base64.b64encode(der)),
value=base64.b64encode(der).decode("ascii"),
)
try:
oid = obj2txt(
@@ -142,7 +142,7 @@ def cryptography_get_extensions_from_csr(csr):
for ext in csr.extensions:
result[ext.oid.dotted_string] = dict(
critical=ext.critical,
value=to_native(base64.b64encode(ext.value.public_bytes())),
value=base64.b64encode(ext.value.public_bytes()).decode("ascii"),
)
else:
@@ -178,7 +178,7 @@ def cryptography_get_extensions_from_csr(csr):
der = backend._ffi.buffer(data.data, data.length)[:]
entry = dict(
critical=(crit == 1),
value=to_native(base64.b64encode(der)),
value=base64.b64encode(der).decode("ascii"),
)
try:
oid = obj2txt(

View File

@@ -9,7 +9,7 @@ import os
import tempfile
import traceback
from ansible.module_utils.common.text.converters import to_bytes, to_native
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
CertificateBackend,
CertificateError,
@@ -66,7 +66,7 @@ class AcmeCertificateBackend(CertificateBackend):
except Exception:
pass
self.module.fail_json(
msg=f"failed to create temporary CSR file: {to_native(err)}",
msg=f"failed to create temporary CSR file: {err}",
exception=traceback.format_exc(),
)
f.close()

View File

@@ -208,13 +208,13 @@ class CertificateInfoRetrieval:
ski = self._get_subject_key_identifier()
if ski is not None:
ski = to_native(binascii.hexlify(ski))
ski = binascii.hexlify(ski).decode("ascii")
ski = ":".join([ski[i : i + 2] for i in range(0, len(ski), 2)])
result["subject_key_identifier"] = ski
aki, aci, acsn = self._get_authority_key_identifier()
if aki is not None:
aki = to_native(binascii.hexlify(aki))
aki = binascii.hexlify(aki).decode("ascii")
aki = ":".join([aki[i : i + 2] for i in range(0, len(aki), 2)])
result["authority_key_identifier"] = aki
result["authority_cert_issuer"] = aci

View File

@@ -11,7 +11,7 @@ import traceback
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native, to_text
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.crypto.plugins.module_utils.argspec import (
ArgumentSpec,
)
@@ -157,7 +157,7 @@ class CertificateSigningRequestBackend:
)
self.ordered_subject = True
except ValueError as exc:
raise CertificateSigningRequestError(to_native(exc))
raise CertificateSigningRequestError(str(exc))
self.using_common_name_for_san = False
if not self.subjectAltName and module.params["use_common_name_for_san"]:

View File

@@ -157,13 +157,13 @@ class CSRInfoRetrieval:
ski = self._get_subject_key_identifier()
if ski is not None:
ski = to_native(binascii.hexlify(ski))
ski = binascii.hexlify(ski).decode("ascii")
ski = ":".join([ski[i : i + 2] for i in range(0, len(ski), 2)])
result["subject_key_identifier"] = ski
aki, aci, acsn = self._get_authority_key_identifier()
if aki is not None:
aki = to_native(binascii.hexlify(aki))
aki = binascii.hexlify(aki).decode("ascii")
aki = ":".join([aki[i : i + 2] for i in range(0, len(aki), 2)])
result["authority_key_identifier"] = aki
result["authority_cert_issuer"] = aci

View File

@@ -229,7 +229,7 @@ class PrivateKeyInfoRetrieval:
)
result["can_parse_key"] = True
except OpenSSLObjectError as exc:
raise PrivateKeyParseError(to_native(exc), result)
raise PrivateKeyParseError(str(exc), result)
result["public_key"] = to_native(self._get_public_key(binary=False))
pk = self._get_public_key(binary=True)

View File

@@ -9,7 +9,6 @@ import traceback
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
@@ -116,7 +115,7 @@ class PublicKeyInfoRetrieval:
try:
self.key = load_publickey(content=self.content, backend=self.backend)
except OpenSSLObjectError as e:
raise PublicKeyParseError(to_native(e), {})
raise PublicKeyParseError(str(e), {})
pk = self._get_public_key(binary=True)
result["fingerprints"] = (

View File

@@ -59,7 +59,7 @@ class RestOperationException(Exception):
def __init__(self, error):
self.status = to_native(error.get("status", None))
self.errors = [to_native(err.get("message")) for err in error.get("errors", {})]
self.message = to_native(" ".join(self.errors))
self.message = " ".join(self.errors)
def generate_docstring(operation_spec):
@@ -197,7 +197,7 @@ class Resource:
operation_name = "Patch"
else:
raise SessionConfigurationException(
to_native(f"Invalid REST method type {method}")
f"Invalid REST method type {method}"
)
# Get the non-parameter parts of the URL and append to the operation name
@@ -242,7 +242,7 @@ class ECSSession:
if self._config:
break
if self._config is None:
raise SessionConfigurationException(to_native("No Configuration Found."))
raise SessionConfigurationException("No Configuration Found.")
# set up auth if passed
entrust_api_user = self.get_config("entrust_api_user")
@@ -251,9 +251,7 @@ class ECSSession:
self.request.url_username = entrust_api_user
self.request.url_password = entrust_api_key
else:
raise SessionConfigurationException(
to_native("User and key must be provided.")
)
raise SessionConfigurationException("User and key must be provided.")
# set up client certificate if passed (support all-in one or cert + key)
entrust_api_cert = self.get_config("entrust_api_cert")
@@ -264,9 +262,7 @@ class ECSSession:
self.request.client_key = entrust_api_cert_key
else:
raise SessionConfigurationException(
to_native(
"Client certificate for authentication to the API must be provided."
)
"Client certificate for authentication to the API must be provided."
)
# set up the spec
@@ -278,15 +274,11 @@ class ECSSession:
entrust_api_specification_path
):
raise SessionConfigurationException(
to_native(
f"OpenAPI specification was not found at location {entrust_api_specification_path}."
)
f"OpenAPI specification was not found at location {entrust_api_specification_path}."
)
if not valid_file_format.match(entrust_api_specification_path):
raise SessionConfigurationException(
to_native(
"OpenAPI specification filename must end in .json, .yml or .yaml"
)
"OpenAPI specification filename must end in .json, .yml or .yaml"
)
self.verify = True
@@ -305,9 +297,7 @@ class ECSSession:
self._spec = yaml.safe_load(http_response_contents)
except HTTPError as e:
raise SessionConfigurationException(
to_native(
f"Error downloading specification from address '{entrust_api_specification_path}', received error code '{e.getcode()}'"
)
f"Error downloading specification from address '{entrust_api_specification_path}', received error code '{e.getcode()}'"
)
else:
with open(entrust_api_specification_path) as f:
@@ -332,25 +322,21 @@ class ECSSession:
and not os.path.isfile(entrust_api_specification_path)
):
raise SessionConfigurationException(
to_native(
f"Parameter provided for entrust_api_specification_path of value '{entrust_api_specification_path}'"
" was not a valid file path or HTTPS address."
)
f"Parameter provided for entrust_api_specification_path of value '{entrust_api_specification_path}'"
" was not a valid file path or HTTPS address."
)
for required_file in ["entrust_api_cert", "entrust_api_cert_key"]:
file_path = kwargs.get(required_file)
if not file_path or not os.path.isfile(file_path):
raise SessionConfigurationException(
to_native(
f"Parameter provided for {required_file} of value '{file_path}' was not a valid file path."
)
f"Parameter provided for {required_file} of value '{file_path}' was not a valid file path."
)
for required_var in ["entrust_api_user", "entrust_api_key"]:
if not kwargs.get(required_var):
raise SessionConfigurationException(
to_native(f"Parameter provided for {required_var} was missing.")
f"Parameter provided for {required_var} was missing."
)
config["entrust_api_cert"] = kwargs.get("entrust_api_cert")

View File

@@ -10,7 +10,6 @@ import stat
import traceback
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
parse_openssh_version,
)
@@ -73,7 +72,7 @@ class OpensshModule:
self._execute()
except Exception as e:
self.module.fail_json(
msg=f"unexpected error occurred: {to_native(e)}",
msg=f"unexpected error occurred: {e}",
exception=traceback.format_exc(),
)

View File

@@ -10,7 +10,7 @@ import os
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import (
KeygenCommand,
OpensshModule,
@@ -216,7 +216,7 @@ class KeypairBackend(OpensshModule):
]
)
except OSError as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
def _generate_temp_keypair(self):
temp_private_key = os.path.join(
@@ -227,7 +227,7 @@ class KeypairBackend(OpensshModule):
try:
self._generate_keypair(temp_private_key)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
for f in (temp_private_key, temp_public_key):
self.module.add_cleanup_file(f)
@@ -283,7 +283,7 @@ class KeypairBackend(OpensshModule):
to_bytes(content),
)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
self.module.add_cleanup_file(temp_public_key)
return temp_public_key
@@ -304,7 +304,7 @@ class KeypairBackend(OpensshModule):
if self._public_key_exists():
os.remove(self.public_key_path)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
@property
def _result(self):
@@ -396,7 +396,7 @@ class KeypairBackendOpensshBin(KeypairBackend):
check_rc=True,
)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
def _private_key_valid_backend(self):
return True
@@ -520,13 +520,13 @@ class KeypairBackendCryptography(KeypairBackend):
try:
keypair.comment = self.comment
except InvalidCommentError as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
try:
temp_public_key = self._create_temp_public_key(keypair.public_key + b"\n")
self._safe_secure_move([(temp_public_key, self.public_key_path)])
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=str(e))
def _private_key_valid_backend(self):
# avoids breaking behavior and prevents