diff --git a/changelogs/fragments/refactoring.yml b/changelogs/fragments/refactoring.yml new file mode 100644 index 00000000..95ff48ab --- /dev/null +++ b/changelogs/fragments/refactoring.yml @@ -0,0 +1,4 @@ +minor_changes: + - "Python code modernization: use f-strings instead of ``%`` and ``str.format()`` (https://github.com/ansible-collections/community.crypto/pull/875)." + - "Python code modernization: update ``__future__`` imports, remove Python 2 specific boilerplates (https://github.com/ansible-collections/community.crypto/pull/876)." + - "Python code modernization: remove Python 3 specific code (https://github.com/ansible-collections/community.crypto/pull/877)." diff --git a/plugins/doc_fragments/acme.py b/plugins/doc_fragments/acme.py index 3eab486d..b47394cc 100644 --- a/plugins/doc_fragments/acme.py +++ b/plugins/doc_fragments/acme.py @@ -19,7 +19,6 @@ notes: to help us supporting it. Feedback that an ACME server not mentioned does work is also appreciated. requirements: - either openssl or L(cryptography,https://cryptography.io/) >= 1.5 - - ipaddress options: acme_version: description: diff --git a/plugins/filter/gpg_fingerprint.py b/plugins/filter/gpg_fingerprint.py index 147fcb26..a34531b4 100644 --- a/plugins/filter/gpg_fingerprint.py +++ b/plugins/filter/gpg_fingerprint.py @@ -41,7 +41,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.gnupg.cli import ( GPGError, get_fingerprint_from_bytes, @@ -52,7 +51,7 @@ from ansible_collections.community.crypto.plugins.plugin_utils.gnupg import ( def gpg_fingerprint(input): - if not isinstance(input, string_types): + if not isinstance(input, (str, bytes)): raise AnsibleFilterError( f"The input for the community.crypto.gpg_fingerprint filter must be a string; got {type(input)} instead" ) diff --git a/plugins/filter/openssl_csr_info.py b/plugins/filter/openssl_csr_info.py index 86f36053..ea63590c 100644 --- a/plugins/filter/openssl_csr_info.py +++ b/plugins/filter/openssl_csr_info.py @@ -276,7 +276,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -290,11 +289,11 @@ from ansible_collections.community.crypto.plugins.plugin_utils.filter_module imp def openssl_csr_info_filter(data, name_encoding="ignore"): """Extract information from X.509 PEM certificate.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.openssl_csr_info input must be a text type, not {type(data)}" ) - if not isinstance(name_encoding, string_types): + if not isinstance(name_encoding, (str, bytes)): raise AnsibleFilterError( f"The name_encoding option must be of a text type, not {type(name_encoding)}" ) diff --git a/plugins/filter/openssl_privatekey_info.py b/plugins/filter/openssl_privatekey_info.py index 303194cd..50eef31c 100644 --- a/plugins/filter/openssl_privatekey_info.py +++ b/plugins/filter/openssl_privatekey_info.py @@ -148,7 +148,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -165,11 +164,11 @@ def openssl_privatekey_info_filter( data, passphrase=None, return_private_key_data=False ): """Extract information from X.509 PEM certificate.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.openssl_privatekey_info input must be a text type, not {type(data)}" ) - if passphrase is not None and not isinstance(passphrase, string_types): + if passphrase is not None and not isinstance(passphrase, (str, bytes)): raise AnsibleFilterError( f"The passphrase option must be a text type, not {type(passphrase)}" ) diff --git a/plugins/filter/openssl_publickey_info.py b/plugins/filter/openssl_publickey_info.py index b8a66790..33647b67 100644 --- a/plugins/filter/openssl_publickey_info.py +++ b/plugins/filter/openssl_publickey_info.py @@ -125,7 +125,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -140,7 +139,7 @@ from ansible_collections.community.crypto.plugins.plugin_utils.filter_module imp def openssl_publickey_info_filter(data): """Extract information from OpenSSL PEM public key.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.openssl_publickey_info input must be a text type, not {type(data)}" ) diff --git a/plugins/filter/parse_serial.py b/plugins/filter/parse_serial.py index 9a785557..fd814dbf 100644 --- a/plugins/filter/parse_serial.py +++ b/plugins/filter/parse_serial.py @@ -41,14 +41,13 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.serial import ( parse_serial, ) def parse_serial_filter(input): - if not isinstance(input, string_types): + if not isinstance(input, (str, bytes)): raise AnsibleFilterError( f"The input for the community.crypto.parse_serial filter must be a string; got {type(input)} instead" ) diff --git a/plugins/filter/split_pem.py b/plugins/filter/split_pem.py index 79d2d448..4112e988 100644 --- a/plugins/filter/split_pem.py +++ b/plugins/filter/split_pem.py @@ -40,7 +40,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_text -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( split_pem_list, ) @@ -48,7 +47,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import def split_pem_filter(data): """Split PEM file.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.split_pem input must be a text type, not {type(data)}" ) diff --git a/plugins/filter/to_serial.py b/plugins/filter/to_serial.py index ec3d704c..9675218c 100644 --- a/plugins/filter/to_serial.py +++ b/plugins/filter/to_serial.py @@ -41,12 +41,11 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_native -from ansible.module_utils.six import integer_types from ansible_collections.community.crypto.plugins.module_utils.serial import to_serial def to_serial_filter(input): - if not isinstance(input, integer_types): + if not isinstance(input, int): raise AnsibleFilterError( f"The input for the community.crypto.to_serial filter must be an integer; got {type(input)} instead" ) diff --git a/plugins/filter/x509_certificate_info.py b/plugins/filter/x509_certificate_info.py index 1fda7ff9..0a832483 100644 --- a/plugins/filter/x509_certificate_info.py +++ b/plugins/filter/x509_certificate_info.py @@ -310,7 +310,6 @@ _value: from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -324,11 +323,11 @@ from ansible_collections.community.crypto.plugins.plugin_utils.filter_module imp def x509_certificate_info_filter(data, name_encoding="ignore"): """Extract information from X.509 PEM certificate.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.x509_certificate_info input must be a text type, not {type(data)}" ) - if not isinstance(name_encoding, string_types): + if not isinstance(name_encoding, (str, bytes)): raise AnsibleFilterError( f"The name_encoding option must be of a text type, not {type(name_encoding)}" ) diff --git a/plugins/filter/x509_crl_info.py b/plugins/filter/x509_crl_info.py index 5aa57342..3a4e6100 100644 --- a/plugins/filter/x509_crl_info.py +++ b/plugins/filter/x509_crl_info.py @@ -158,7 +158,6 @@ import binascii from ansible.errors import AnsibleFilterError from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -175,11 +174,11 @@ from ansible_collections.community.crypto.plugins.plugin_utils.filter_module imp def x509_crl_info_filter(data, name_encoding="ignore", list_revoked_certificates=True): """Extract information from X.509 PEM certificate.""" - if not isinstance(data, string_types): + if not isinstance(data, (str, bytes)): raise AnsibleFilterError( f"The community.crypto.x509_crl_info input must be a text type, not {type(data)}" ) - if not isinstance(name_encoding, string_types): + if not isinstance(name_encoding, (str, bytes)): raise AnsibleFilterError( f"The name_encoding option must be of a text type, not {type(name_encoding)}" ) diff --git a/plugins/module_utils/acme/acme.py b/plugins/module_utils/acme/acme.py index d93ad984..b5bdba25 100644 --- a/plugins/module_utils/acme/acme.py +++ b/plugins/module_utils/acme/acme.py @@ -10,11 +10,9 @@ import datetime import json import locale import time -import traceback from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_bytes -from ansible.module_utils.six import PY3 from ansible.module_utils.urls import fetch_url from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import ( CRYPTOGRAPHY_ERROR, @@ -43,16 +41,6 @@ from ansible_collections.community.crypto.plugins.module_utils.argspec import ( ) -try: - import ipaddress # noqa: F401, pylint: disable=unused-import -except ImportError: - HAS_IPADDRESS = False - IPADDRESS_IMPORT_ERROR = traceback.format_exc() -else: - HAS_IPADDRESS = True - IPADDRESS_IMPORT_ERROR = None - - # -1 usually means connection problems RETRY_STATUS_CODES = (-1, 408, 429, 503) @@ -345,7 +333,7 @@ class ACMEClient: try: # In Python 2, reading from a closed response yields a TypeError. # In Python 3, read() simply returns '' - if PY3 and resp.closed: + if resp.closed: raise TypeError content = resp.read() except (AttributeError, TypeError): @@ -440,7 +428,7 @@ class ACMEClient: try: # In Python 2, reading from a closed response yields a TypeError. # In Python 3, read() simply returns '' - if PY3 and resp.closed: + if resp.closed: raise TypeError content = resp.read() except (AttributeError, TypeError): @@ -557,11 +545,6 @@ def create_default_argspec( def create_backend(module, needs_acme_v2=True): - if not HAS_IPADDRESS: - module.fail_json( - msg=missing_required_lib("ipaddress"), exception=IPADDRESS_IMPORT_ERROR - ) - backend = module.params["select_crypto_backend"] # Backend autodetect diff --git a/plugins/module_utils/acme/backend_openssl_cli.py b/plugins/module_utils/acme/backend_openssl_cli.py index b225da96..069ce350 100644 --- a/plugins/module_utils/acme/backend_openssl_cli.py +++ b/plugins/module_utils/acme/backend_openssl_cli.py @@ -8,6 +8,7 @@ from __future__ import annotations import base64 import binascii import datetime +import ipaddress import os import re import tempfile @@ -33,12 +34,6 @@ from ansible_collections.community.crypto.plugins.module_utils.time import ( ) -try: - import ipaddress -except ImportError: - pass - - _OPENSSL_ENVIRONMENT_UPDATE = dict(LANG="C", LC_ALL="C", LC_MESSAGES="C", LC_CTYPE="C") diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 3127eb11..fb8a2cf2 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -74,7 +74,6 @@ def _parse_acme_timestamp(timestamp_str, with_timezone): "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%f%z", ): - # Note that %z will not work with Python 2... https://stackoverflow.com/a/27829491 try: result = datetime.datetime.strptime(timestamp_str, format) except ValueError: diff --git a/plugins/module_utils/acme/challenges.py b/plugins/module_utils/acme/challenges.py index c1fcc43c..35ff5d84 100644 --- a/plugins/module_utils/acme/challenges.py +++ b/plugins/module_utils/acme/challenges.py @@ -7,6 +7,7 @@ from __future__ import annotations import base64 import hashlib +import ipaddress import json import re import time @@ -22,12 +23,6 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.utils import ) -try: - import ipaddress -except ImportError: - pass - - def create_key_authorization(client, token): """ Returns the key authorization for the given token diff --git a/plugins/module_utils/acme/errors.py b/plugins/module_utils/acme/errors.py index 61a59efa..dfc7cfac 100644 --- a/plugins/module_utils/acme/errors.py +++ b/plugins/module_utils/acme/errors.py @@ -5,9 +5,9 @@ from __future__ import annotations +from http.client import responses as http_responses + from ansible.module_utils.common.text.converters import to_text -from ansible.module_utils.six import PY3, binary_type -from ansible.module_utils.six.moves.http_client import responses as http_responses def format_http_status(status_code): @@ -67,7 +67,7 @@ class ACMEProtocolException(ModuleFailException): try: # In Python 2, reading from a closed response yields a TypeError. # In Python 3, read() simply returns '' - if PY3 and response.closed: + if response.closed: raise TypeError content = response.read() except (AttributeError, TypeError): @@ -75,7 +75,7 @@ class ACMEProtocolException(ModuleFailException): # Make sure that content_json is None or a dictionary if content_json is not None and not isinstance(content_json, dict): - if content is None and isinstance(content_json, binary_type): + if content is None and isinstance(content_json, bytes): content = content_json content_json = None diff --git a/plugins/module_utils/acme/utils.py b/plugins/module_utils/acme/utils.py index d5881fdd..23a7caa1 100644 --- a/plugins/module_utils/acme/utils.py +++ b/plugins/module_utils/acme/utils.py @@ -10,9 +10,9 @@ import datetime import re import textwrap import traceback +from urllib.parse import unquote from ansible.module_utils.common.text.converters import to_native -from ansible.module_utils.six.moves.urllib.parse import unquote from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( ModuleFailException, ) diff --git a/plugins/module_utils/crypto/cryptography_support.py b/plugins/module_utils/crypto/cryptography_support.py index ef5c0738..ade63281 100644 --- a/plugins/module_utils/crypto/cryptography_support.py +++ b/plugins/module_utils/crypto/cryptography_support.py @@ -6,16 +6,16 @@ from __future__ import annotations import base64 import binascii +import ipaddress import re -import sys import traceback - -from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text -from ansible.module_utils.six.moves.urllib.parse import ( +from urllib.parse import ( ParseResult, urlparse, urlunparse, ) + +from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible_collections.community.crypto.plugins.module_utils.version import ( LooseVersion, ) @@ -24,8 +24,6 @@ from ._asn1 import serialize_asn1_string_as_der try: - import ipaddress - import cryptography from cryptography import x509 from cryptography.exceptions import InvalidSignature @@ -286,12 +284,8 @@ DN_COMPONENT_START_RE = re.compile(b"^ *([a-zA-z0-9.]+) *= *") DN_HEX_LETTER = b"0123456789abcdef" -if sys.version_info[0] < 3: - _int_to_byte = chr -else: - - def _int_to_byte(value): - return bytes((value,)) +def _int_to_byte(value): + return bytes((value,)) def _parse_dn_component(name, sep=b",", decode_remainder=True): diff --git a/plugins/module_utils/crypto/math.py b/plugins/module_utils/crypto/math.py index 85046857..6b3e3434 100644 --- a/plugins/module_utils/crypto/math.py +++ b/plugins/module_utils/crypto/math.py @@ -4,8 +4,6 @@ from __future__ import annotations -import sys - def binary_exp_mod(f, e, m): """Computes f^e mod m in O(log e) multiplications modulo m.""" @@ -99,82 +97,36 @@ def quick_is_not_prime(n): return False -python_version = (sys.version_info[0], sys.version_info[1]) -if python_version >= (2, 7) or python_version >= (3, 1): - # Ansible still supports Python 2.6 on remote nodes - - def count_bytes(no): - """ - Given an integer, compute the number of bytes necessary to store its absolute value. - """ - no = abs(no) - if no == 0: - return 0 - return (no.bit_length() + 7) // 8 - - def count_bits(no): - """ - Given an integer, compute the number of bits necessary to store its absolute value. - """ - no = abs(no) - if no == 0: - return 0 - return no.bit_length() - -else: - # Slow, but works - def count_bytes(no): - """ - Given an integer, compute the number of bytes necessary to store its absolute value. - """ - no = abs(no) - count = 0 - while no > 0: - no >>= 8 - count += 1 - return count - - def count_bits(no): - """ - Given an integer, compute the number of bits necessary to store its absolute value. - """ - no = abs(no) - count = 0 - while no > 0: - no >>= 1 - count += 1 - return count +def count_bytes(no): + """ + Given an integer, compute the number of bytes necessary to store its absolute value. + """ + no = abs(no) + if no == 0: + return 0 + return (no.bit_length() + 7) // 8 -if sys.version_info[0] >= 3: - # Python 3 (and newer) - def _convert_int_to_bytes(count, no): - return no.to_bytes(count, byteorder="big") +def count_bits(no): + """ + Given an integer, compute the number of bits necessary to store its absolute value. + """ + no = abs(no) + if no == 0: + return 0 + return no.bit_length() - def _convert_bytes_to_int(data): - return int.from_bytes(data, byteorder="big", signed=False) - def _to_hex(no): - return hex(no)[2:] +def _convert_int_to_bytes(count, no): + return no.to_bytes(count, byteorder="big") -else: - # Python 2 - def _convert_int_to_bytes(count, n): - if n == 0 and count == 0: - return "" - h = f"{n:x}" - if len(h) > 2 * count: - raise Exception(f"Number {n} needs more than {count} bytes!") - return ("0" * (2 * count - len(h)) + h).decode("hex") - def _convert_bytes_to_int(data): - v = 0 - for x in data: - v = (v << 8) | ord(x) - return v +def _convert_bytes_to_int(data): + return int.from_bytes(data, byteorder="big", signed=False) - def _to_hex(no): - return f"{no:x}" + +def _to_hex(no): + return f"{no:x}" def convert_int_to_bytes(no, count=None): diff --git a/plugins/module_utils/ecs/api.py b/plugins/module_utils/ecs/api.py index 146d8867..e43d581e 100644 --- a/plugins/module_utils/ecs/api.py +++ b/plugins/module_utils/ecs/api.py @@ -14,11 +14,11 @@ import json import os import re import traceback +from urllib.error import HTTPError +from urllib.parse import urlencode from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_native, to_text -from ansible.module_utils.six.moves.urllib.error import HTTPError -from ansible.module_utils.six.moves.urllib.parse import urlencode from ansible.module_utils.urls import Request diff --git a/plugins/module_utils/openssh/certificate.py b/plugins/module_utils/openssh/certificate.py index e9035f07..fe67d88b 100644 --- a/plugins/module_utils/openssh/certificate.py +++ b/plugins/module_utils/openssh/certificate.py @@ -8,7 +8,6 @@ import abc import binascii import datetime as _datetime import os -import sys from base64 import b64encode from datetime import datetime from hashlib import sha256 @@ -68,13 +67,8 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = { b"nistp521": "ecdsa-nistp521", } -_USE_TIMEZONE = sys.version_info >= (3, 6) - - -_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE) -_FOREVER = ( - datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max -) +_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=True) +_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) _CRITICAL_OPTIONS = ( "force-command", @@ -99,9 +93,6 @@ _EXTENSIONS = ( "permit-user-rc", ) -if six.PY3: - long = int - class OpensshCertificateTimeParameters: def __init__(self, valid_from, valid_to): @@ -172,13 +163,13 @@ class OpensshCertificateTimeParameters: result = OpensshCertificateTimeParameters._time_string_to_datetime( time_string_or_timestamp.strip() ) - elif isinstance(time_string_or_timestamp, (long, int)): + elif isinstance(time_string_or_timestamp, int): result = OpensshCertificateTimeParameters._timestamp_to_datetime( time_string_or_timestamp ) else: raise ValueError( - f"Value must be of type (str, unicode, int, long) not {type(time_string_or_timestamp)}" + f"Value must be of type (str, unicode, int) not {type(time_string_or_timestamp)}" ) except ValueError: raise @@ -192,12 +183,7 @@ class OpensshCertificateTimeParameters: result = _FOREVER else: try: - if _USE_TIMEZONE: - result = datetime.fromtimestamp( - timestamp, tz=_datetime.timezone.utc - ) - else: - result = datetime.utcfromtimestamp(timestamp) + result = datetime.fromtimestamp(timestamp, tz=_datetime.timezone.utc) except OverflowError: raise ValueError return result @@ -210,15 +196,13 @@ class OpensshCertificateTimeParameters: elif time_string == "forever": result = _FOREVER elif is_relative_time_string(time_string): - result = convert_relative_to_datetime( - time_string, with_timezone=_USE_TIMEZONE - ) + result = convert_relative_to_datetime(time_string, with_timezone=True) else: for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: result = _add_or_remove_timezone( datetime.strptime(time_string, time_format), - with_timezone=_USE_TIMEZONE, + with_timezone=True, ) except ValueError: pass diff --git a/plugins/module_utils/openssh/utils.py b/plugins/module_utils/openssh/utils.py index 63d45710..d58c917d 100644 --- a/plugins/module_utils/openssh/utils.py +++ b/plugins/module_utils/openssh/utils.py @@ -10,8 +10,6 @@ import re from contextlib import contextmanager from struct import Struct -from ansible.module_utils.six import PY3 - # Protocol References # ------------------- @@ -25,9 +23,6 @@ from ansible.module_utils.six import PY3 # https://github.com/pyca/cryptography/blob/main/src/cryptography/hazmat/primitives/serialization/ssh.py # https://github.com/paramiko/paramiko/blob/master/paramiko/message.py -if PY3: - long = int - # 0 (False) or 1 (True) encoded as a single byte _BOOLEAN = Struct(b"?") # Unsigned 8-bit integer in network-byte-order @@ -93,7 +88,7 @@ class OpensshParser: if not isinstance(data, (bytes, bytearray)): raise TypeError(f"Data must be bytes-like not {type(data)}") - self._data = memoryview(data) if PY3 else data + self._data = memoryview(data) self._pos = 0 def boolean(self): @@ -125,7 +120,7 @@ class OpensshParser: value = self._data[self._pos : next_pos] self._pos = next_pos # Cast to bytes is required as a memoryview slice is itself a memoryview - return value if not PY3 else bytes(value) + return bytes(value) def mpint(self): return self._big_int(self.string(), "big", signed=True) @@ -223,47 +218,7 @@ class OpensshParser: f"Byte_order must be one of (big, little) not {byte_order}" ) - if PY3: - return int.from_bytes(raw_string, byte_order, signed=signed) - - result = 0 - byte_length = len(raw_string) - - if byte_length > 0: - # Check sign-bit - msb = raw_string[0] if byte_order == "big" else raw_string[-1] - negative = bool(ord(msb) & 0x80) - # Match pad value for two's complement - pad = b"\xff" if signed and negative else b"\x00" - # The definition of ``mpint`` enforces that unnecessary bytes are not encoded so they are added back - pad_length = 4 - byte_length % 4 - if pad_length < 4: - raw_string = ( - pad * pad_length + raw_string - if byte_order == "big" - else raw_string + pad * pad_length - ) - byte_length += pad_length - # Accumulate arbitrary precision integer bytes in the appropriate order - if byte_order == "big": - for i in range(0, byte_length, cls.UINT32_OFFSET): - left_shift = result << cls.UINT32_OFFSET * 8 - result = ( - left_shift - + _UINT32.unpack(raw_string[i : i + cls.UINT32_OFFSET])[0] - ) - else: - for i in range(byte_length, 0, -cls.UINT32_OFFSET): - left_shift = result << cls.UINT32_OFFSET * 8 - result = ( - left_shift - + _UINT32_LE.unpack(raw_string[i - cls.UINT32_OFFSET : i])[0] - ) - # Adjust for two's complement - if signed and negative: - result -= 1 << (8 * byte_length) - - return result + return int.from_bytes(raw_string, byte_order, signed=signed) class _OpensshWriter: @@ -307,8 +262,8 @@ class _OpensshWriter: return self def uint64(self, value): - if not isinstance(value, (long, int)): - raise TypeError(f"Value must be of type (long, int) not {type(value)}") + if not isinstance(value, int): + raise TypeError(f"Value must be of type int not {type(value)}") if value < 0 or value > _UINT64_MAX: raise ValueError( f"Value must be a positive integer less than {_UINT64_MAX}" @@ -327,8 +282,8 @@ class _OpensshWriter: return self def mpint(self, value): - if not isinstance(value, (int, long)): - raise TypeError(f"Value must be of type (long, int) not {type(value)}") + if not isinstance(value, int): + raise TypeError(f"Value must be of type int not {type(value)}") self.string(self._int_to_mpint(value)) @@ -373,42 +328,12 @@ class _OpensshWriter: @staticmethod def _int_to_mpint(num): - if PY3: - byte_length = (num.bit_length() + 7) // 8 - try: - result = num.to_bytes(byte_length, "big", signed=True) - # Handles values which require \x00 or \xFF to pad sign-bit - except OverflowError: - result = num.to_bytes(byte_length + 1, "big", signed=True) - else: - result = bytes() - # 0 and -1 are treated as special cases since they are used as sentinels for all other values - if num == 0: - result += b"\x00" - elif num == -1: - result += b"\xff" - elif num > 0: - while num >> 32: - result = _UINT32.pack(num & _UINT32_MAX) + result - num = num >> 32 - # Pack last 4 bytes individually to discard insignificant bytes - while num: - result = _UBYTE.pack(num & _UBYTE_MAX) + result - num = num >> 8 - # Zero pad final byte if most-significant bit is 1 as per mpint definition - if ord(result[0]) & 0x80: - result = b"\x00" + result - else: - while (num >> 32) < -1: - result = _UINT32.pack(num & _UINT32_MAX) + result - num = num >> 32 - while num < -1: - result = _UBYTE.pack(num & _UBYTE_MAX) + result - num = num >> 8 - if not ord(result[0]) & 0x80: - result = b"\xff" + result - - return result + byte_length = (num.bit_length() + 7) // 8 + try: + return num.to_bytes(byte_length, "big", signed=True) + # Handles values which require \x00 or \xFF to pad sign-bit + except OverflowError: + return num.to_bytes(byte_length + 1, "big", signed=True) def bytes(self): return bytes(self._buff) diff --git a/plugins/module_utils/time.py b/plugins/module_utils/time.py index 18d1c48b..1d36295d 100644 --- a/plugins/module_utils/time.py +++ b/plugins/module_utils/time.py @@ -6,7 +6,6 @@ from __future__ import annotations import datetime import re -import sys from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( @@ -68,29 +67,11 @@ def add_or_remove_timezone(timestamp, with_timezone): ) -if sys.version_info < (3, 3): - - def get_epoch_seconds(timestamp): - epoch = datetime.datetime( - 1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None - ) - delta = timestamp - epoch - try: - return delta.total_seconds() - except AttributeError: - # Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from - # https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds - return ( - delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6 - ) / 10**6 - -else: - - def get_epoch_seconds(timestamp): - if timestamp.tzinfo is None: - # timestamp.timestamp() is offset by the local timezone if timestamp has no timezone - timestamp = ensure_utc_timezone(timestamp) - return timestamp.timestamp() +def get_epoch_seconds(timestamp): + if timestamp.tzinfo is None: + # timestamp.timestamp() is offset by the local timezone if timestamp has no timezone + timestamp = ensure_utc_timezone(timestamp) + return timestamp.timestamp() def from_epoch_seconds(timestamp, with_timezone): diff --git a/plugins/modules/acme_challenge_cert_helper.py b/plugins/modules/acme_challenge_cert_helper.py index 66ee3c04..eef5c1d4 100644 --- a/plugins/modules/acme_challenge_cert_helper.py +++ b/plugins/modules/acme_challenge_cert_helper.py @@ -149,7 +149,7 @@ regular_certificate: import base64 import datetime -import sys +import ipaddress import traceback from ansible.module_utils.basic import AnsibleModule, missing_required_lib @@ -173,8 +173,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( CRYPTOGRAPHY_IMP_ERR = None try: - import ipaddress - import cryptography import cryptography.hazmat.backends import cryptography.hazmat.primitives.asymmetric.ec @@ -194,23 +192,12 @@ except ImportError: # Convert byte string to ASN1 encoded octet string -if sys.version_info[0] >= 3: - - def encode_octet_string(octet_string): - if len(octet_string) >= 128: - raise ModuleFailException( - "Cannot handle octet strings with more than 128 bytes" - ) - return bytes([0x4, len(octet_string)]) + octet_string - -else: - - def encode_octet_string(octet_string): - if len(octet_string) >= 128: - raise ModuleFailException( - "Cannot handle octet strings with more than 128 bytes" - ) - return b"\x04" + chr(len(octet_string)) + octet_string +def encode_octet_string(octet_string): + if len(octet_string) >= 128: + raise ModuleFailException( + "Cannot handle octet strings with more than 128 bytes" + ) + return bytes([0x4, len(octet_string)]) + octet_string def main(): diff --git a/plugins/modules/get_certificate.py b/plugins/modules/get_certificate.py index d72486f6..c5e65a86 100644 --- a/plugins/modules/get_certificate.py +++ b/plugins/modules/get_certificate.py @@ -13,8 +13,6 @@ short_description: Get a certificate from a host:port description: - Makes a secure connection and returns information about the presented certificate. - The module uses the cryptography Python library. - - Support SNI (L(Server Name Indication,https://en.wikipedia.org/wiki/Server_Name_Indication)) only with Python 2.7 and - newer. extends_documentation_fragment: - community.crypto.attributes - community.crypto.attributes.idempotent_not_modify_state @@ -122,7 +120,7 @@ options: notes: - When using ca_cert on OS X it has been reported that in some conditions the validate will always succeed. requirements: - - "Python >= 2.7 when using O(proxy_host), and Python >= 3.10 when O(get_certificate_chain=true)" + - "Python >= 3.10 when O(get_certificate_chain=true)" - "cryptography >= 1.6" seealso: @@ -270,11 +268,15 @@ import sys import traceback from os.path import isfile from socket import create_connection, setdefaulttimeout, socket -from ssl import CERT_NONE, CERT_REQUIRED, DER_cert_to_PEM_cert, get_server_certificate +from ssl import ( + CERT_NONE, + CERT_REQUIRED, + DER_cert_to_PEM_cert, + create_default_context, +) from ansible.module_utils.basic import AnsibleModule, missing_required_lib from ansible.module_utils.common.text.converters import to_bytes, to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_get_extensions_from_cert, @@ -292,15 +294,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import ( MINIMAL_CRYPTOGRAPHY_VERSION = "1.6" -CREATE_DEFAULT_CONTEXT_IMP_ERR = None -try: - from ssl import create_default_context -except ImportError: - CREATE_DEFAULT_CONTEXT_IMP_ERR = traceback.format_exc() - HAS_CREATE_DEFAULT_CONTEXT = False -else: - HAS_CREATE_DEFAULT_CONTEXT = True - CRYPTOGRAPHY_IMP_ERR = None try: import cryptography @@ -413,156 +406,124 @@ def main(): verified_chain = None unverified_chain = None - if not HAS_CREATE_DEFAULT_CONTEXT: - # Python < 2.7.9 + try: if proxy_host: - module.fail_json( - msg="To use proxy_host, you must run the get_certificate module with Python 2.7 or newer.", - exception=CREATE_DEFAULT_CONTEXT_IMP_ERR, - ) + connect = f"CONNECT {host}:{port} HTTP/1.0\r\n\r\n" + sock = socket() + atexit.register(sock.close) + sock.connect((proxy_host, proxy_port)) + sock.send(connect.encode()) + sock.recv(8192) + else: + sock = create_connection((host, port)) + atexit.register(sock.close) + + if ca_cert: + ctx = create_default_context(cafile=ca_cert) + ctx.check_hostname = False + ctx.verify_mode = CERT_REQUIRED + else: + ctx = create_default_context() + ctx.check_hostname = False + ctx.verify_mode = CERT_NONE + + if start_tls_server_type is not None: + send_starttls_packet(sock, start_tls_server_type) + if ciphers is not None: - module.fail_json( - msg="To use ciphers, you must run the get_certificate module with Python 2.7 or newer.", - exception=CREATE_DEFAULT_CONTEXT_IMP_ERR, - ) + ciphers_joined = ":".join(ciphers) + ctx.set_ciphers(ciphers_joined) + if tls_ctx_options is not None: - module.fail_json( - msg="To use tls_ctx_options, you must run the get_certificate module with Python 2.7 or newer.", - exception=CREATE_DEFAULT_CONTEXT_IMP_ERR, - ) - try: - # Note: get_server_certificate does not support SNI! - cert = get_server_certificate((host, port), ca_certs=ca_cert) - except Exception as e: - module.fail_json(msg=f"Failed to get cert from {host}:{port}, error: {e}") - else: - # Python >= 2.7.9 - try: - if proxy_host: - connect = f"CONNECT {host}:{port} HTTP/1.0\r\n\r\n" - sock = socket() - atexit.register(sock.close) - sock.connect((proxy_host, proxy_port)) - sock.send(connect.encode()) - sock.recv(8192) - else: - sock = create_connection((host, port)) - atexit.register(sock.close) + # Clear default ctx options + ctx.options = 0 - if ca_cert: - ctx = create_default_context(cafile=ca_cert) - ctx.check_hostname = False - ctx.verify_mode = CERT_REQUIRED - else: - ctx = create_default_context() - ctx.check_hostname = False - ctx.verify_mode = CERT_NONE - - if start_tls_server_type is not None: - send_starttls_packet(sock, start_tls_server_type) - - if ciphers is not None: - ciphers_joined = ":".join(ciphers) - ctx.set_ciphers(ciphers_joined) - - if tls_ctx_options is not None: - # Clear default ctx options - ctx.options = 0 - - # For each item in the tls_ctx_options list - for tls_ctx_option in tls_ctx_options: - # If the item is a string_type - if isinstance(tls_ctx_option, string_types): - # Convert tls_ctx_option to a native string - tls_ctx_option_str = to_native(tls_ctx_option) - # Get the tls_ctx_option_str attribute from ssl - tls_ctx_option_attr = getattr(ssl, tls_ctx_option_str, None) - # If tls_ctx_option_attr is an integer - if isinstance(tls_ctx_option_attr, int): - # Set tls_ctx_option_int to the attribute value - tls_ctx_option_int = tls_ctx_option_attr - # If tls_ctx_option_attr is not an integer - else: - module.fail_json( - msg=f"Failed to determine the numeric value for {tls_ctx_option_str}" - ) - # If the item is an integer - elif isinstance(tls_ctx_option, int): - # Set tls_ctx_option_int to the item value - tls_ctx_option_int = tls_ctx_option - # If the item is not a string nor integer + # For each item in the tls_ctx_options list + for tls_ctx_option in tls_ctx_options: + # If the item is a string_type + if isinstance(tls_ctx_option, (str, bytes)): + # Convert tls_ctx_option to a native string + tls_ctx_option_str = to_native(tls_ctx_option) + # Get the tls_ctx_option_str attribute from ssl + tls_ctx_option_attr = getattr(ssl, tls_ctx_option_str, None) + # If tls_ctx_option_attr is an integer + if isinstance(tls_ctx_option_attr, int): + # Set tls_ctx_option_int to the attribute value + tls_ctx_option_int = tls_ctx_option_attr + # If tls_ctx_option_attr is not an integer else: module.fail_json( - msg=f"tls_ctx_options must be a string or integer, got {tls_ctx_option!r}" + msg=f"Failed to determine the numeric value for {tls_ctx_option_str}" ) - tls_ctx_option_int = ( - 0 # make pylint happy; this code is actually unreachable - ) - - try: - # Add the int value of the item to ctx options - ctx.options |= tls_ctx_option_int - except Exception: - module.fail_json( - msg=f"Failed to add {tls_ctx_option_str or tls_ctx_option_int} to CTX options" - ) - - tls_sock = ctx.wrap_socket(sock, server_hostname=server_name or host) - cert = tls_sock.getpeercert(True) - cert = DER_cert_to_PEM_cert(cert) - - if get_certificate_chain: - if sys.version_info < (3, 13): - # The official way to access this has been added in https://github.com/python/cpython/pull/109113/files. - # We are basically doing the same for older Python versions. The internal API needed for this was added - # in https://github.com/python/cpython/commit/666991fc598bc312d72aff0078ecb553f0a968f1, which was first - # released in Python 3.10.0. - def _convert_chain(chain): - if not chain: - return [] - return [c.public_bytes(ssl._ssl.ENCODING_DER) for c in chain] - - ssl_obj = tls_sock._sslobj # This is of type ssl._ssl._SSLSocket - verified_der_chain = _convert_chain(ssl_obj.get_verified_chain()) - unverified_der_chain = _convert_chain( - ssl_obj.get_unverified_chain() - ) + # If the item is an integer + elif isinstance(tls_ctx_option, int): + # Set tls_ctx_option_int to the item value + tls_ctx_option_int = tls_ctx_option + # If the item is not a string nor integer else: - # This works with Python 3.13+ - - # Unfortunately due to a bug (https://github.com/python/cpython/issues/118658) some early pre-releases of - # Python 3.13 do not return lists of byte strings, but lists of _ssl.Certificate objects. This is going to - # be fixed by https://github.com/python/cpython/pull/118669. For now we convert the certificates ourselves - # if they are not byte strings to work around this. - def _convert_chain(chain): - return [ - ( - c - if isinstance(c, bytes) - else c.public_bytes(ssl._ssl.ENCODING_DER) - ) - for c in chain - ] - - verified_der_chain = _convert_chain(tls_sock.get_verified_chain()) - unverified_der_chain = _convert_chain( - tls_sock.get_unverified_chain() + module.fail_json( + msg=f"tls_ctx_options must be a string or integer, got {tls_ctx_option!r}" + ) + tls_ctx_option_int = ( + 0 # make pylint happy; this code is actually unreachable ) - verified_chain = [DER_cert_to_PEM_cert(c) for c in verified_der_chain] - unverified_chain = [ - DER_cert_to_PEM_cert(c) for c in unverified_der_chain - ] + try: + # Add the int value of the item to ctx options + ctx.options |= tls_ctx_option_int + except Exception: + module.fail_json( + msg=f"Failed to add {tls_ctx_option_str or tls_ctx_option_int} to CTX options" + ) - except Exception as e: - if proxy_host: - module.fail_json( - msg=f"Failed to get cert via proxy {proxy_host}:{proxy_port} from {host}:{port}, error: {e}" - ) + tls_sock = ctx.wrap_socket(sock, server_hostname=server_name or host) + cert = tls_sock.getpeercert(True) + cert = DER_cert_to_PEM_cert(cert) + + if get_certificate_chain: + if sys.version_info < (3, 13): + # The official way to access this has been added in https://github.com/python/cpython/pull/109113/files. + # We are basically doing the same for older Python versions. The internal API needed for this was added + # in https://github.com/python/cpython/commit/666991fc598bc312d72aff0078ecb553f0a968f1, which was first + # released in Python 3.10.0. + def _convert_chain(chain): + if not chain: + return [] + return [c.public_bytes(ssl._ssl.ENCODING_DER) for c in chain] + + ssl_obj = tls_sock._sslobj # This is of type ssl._ssl._SSLSocket + verified_der_chain = _convert_chain(ssl_obj.get_verified_chain()) + unverified_der_chain = _convert_chain(ssl_obj.get_unverified_chain()) else: - module.fail_json( - msg=f"Failed to get cert from {host}:{port}, error: {e}" - ) + # This works with Python 3.13+ + + # Unfortunately due to a bug (https://github.com/python/cpython/issues/118658) some early pre-releases of + # Python 3.13 do not return lists of byte strings, but lists of _ssl.Certificate objects. This is going to + # be fixed by https://github.com/python/cpython/pull/118669. For now we convert the certificates ourselves + # if they are not byte strings to work around this. + def _convert_chain(chain): + return [ + ( + c + if isinstance(c, bytes) + else c.public_bytes(ssl._ssl.ENCODING_DER) + ) + for c in chain + ] + + verified_der_chain = _convert_chain(tls_sock.get_verified_chain()) + unverified_der_chain = _convert_chain(tls_sock.get_unverified_chain()) + + verified_chain = [DER_cert_to_PEM_cert(c) for c in verified_der_chain] + unverified_chain = [DER_cert_to_PEM_cert(c) for c in unverified_der_chain] + + except Exception as e: + if proxy_host: + module.fail_json( + msg=f"Failed to get cert via proxy {proxy_host}:{proxy_port} from {host}:{port}, error: {e}" + ) + else: + module.fail_json(msg=f"Failed to get cert from {host}:{port}, error: {e}") result["cert"] = cert diff --git a/plugins/modules/x509_certificate_info.py b/plugins/modules/x509_certificate_info.py index add2691c..5c5a3827 100644 --- a/plugins/modules/x509_certificate_info.py +++ b/plugins/modules/x509_certificate_info.py @@ -12,7 +12,7 @@ module: x509_certificate_info short_description: Provide information of OpenSSL X.509 certificates description: - This module allows one to query information on OpenSSL certificates. - - It uses the cryptography python library to interact with OpenSSL. + - It uses the cryptography Python library to interact with OpenSSL. - Note that this module was called C(openssl_certificate_info) when included directly in Ansible up to version 2.9. When moved to the collection C(community.crypto), it was renamed to M(community.crypto.x509_certificate_info). From Ansible 2.10 on, it can still be used by the old short name (or by C(ansible.builtin.openssl_certificate_info)), which redirects @@ -391,7 +391,6 @@ issuer_uri: from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_native -from ansible.module_utils.six import string_types from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) @@ -440,7 +439,7 @@ def main(): valid_at = module.params["valid_at"] if valid_at: for k, v in valid_at.items(): - if not isinstance(v, string_types): + if not isinstance(v, (str, bytes)): module.fail_json( msg=f"The value for valid_at.{k} must be of type string (got {type(v)})" ) diff --git a/plugins/plugin_utils/action_module.py b/plugins/plugin_utils/action_module.py index da923293..cc246fe4 100644 --- a/plugins/plugin_utils/action_module.py +++ b/plugins/plugin_utils/action_module.py @@ -17,7 +17,6 @@ import copy import traceback from ansible.errors import AnsibleError -from ansible.module_utils import six from ansible.module_utils.basic import SEQUENCETYPE, remove_values from ansible.module_utils.common._collections_compat import Mapping from ansible.module_utils.common.arg_spec import ArgumentSpecValidator @@ -25,7 +24,6 @@ from ansible.module_utils.common.validation import ( safe_eval, ) from ansible.module_utils.errors import UnsupportedError -from ansible.module_utils.six import string_types from ansible.plugins.action import ActionBase @@ -129,7 +127,7 @@ class AnsibleActionModule: def warn(self, warning): # Copied from ansible.module_utils.common.warnings: - if isinstance(warning, string_types): + if isinstance(warning, (str, bytes)): self.__warnings.append(warning) else: raise TypeError(f"warn requires a string not a {type(warning)}") @@ -141,7 +139,7 @@ class AnsibleActionModule: ) # Copied from ansible.module_utils.common.warnings: - if isinstance(msg, string_types): + if isinstance(msg, (str, bytes)): # For compatibility, we accept that neither version nor date is set, # and treat that the same as if version would haven been set if date is not None: @@ -209,8 +207,7 @@ class AnsibleActionModule: self._return_formatted(result) -@six.add_metaclass(abc.ABCMeta) -class ActionModuleBase(ActionBase): +class ActionModuleBase(ActionBase, metaclass=abc.ABCMeta): @abc.abstractmethod def setup_module(self): """Return pair (ArgumentSpec, kwargs).""" diff --git a/tests/unit/plugins/module_utils/acme/backend_data.py b/tests/unit/plugins/module_utils/acme/backend_data.py index f2ea900e..4acad56b 100644 --- a/tests/unit/plugins/module_utils/acme/backend_data.py +++ b/tests/unit/plugins/module_utils/acme/backend_data.py @@ -7,7 +7,6 @@ from __future__ import annotations import base64 import datetime import os -import sys from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CertificateInformation, @@ -154,34 +153,25 @@ TEST_PARSE_ACME_TIMESTAMP = cartesian_product( microsecond=333333, ), ), + ( + "2024-01-01T00:11:22+0100", + dict(year=2023, month=12, day=31, hour=23, minute=11, second=22), + ), + ( + "2024-01-01T00:11:22.123+0100", + dict( + year=2023, + month=12, + day=31, + hour=23, + minute=11, + second=22, + microsecond=123000, + ), + ), ], ) -if sys.version_info >= (3, 5): - TEST_PARSE_ACME_TIMESTAMP.extend( - cartesian_product( - TIMEZONES, - [ - ( - "2024-01-01T00:11:22+0100", - dict(year=2023, month=12, day=31, hour=23, minute=11, second=22), - ), - ( - "2024-01-01T00:11:22.123+0100", - dict( - year=2023, - month=12, - day=31, - hour=23, - minute=11, - second=22, - microsecond=123000, - ), - ), - ], - ) - ) - TEST_INTERPOLATE_TIMESTAMP = cartesian_product( TIMEZONES, diff --git a/tests/unit/plugins/module_utils/test_time.py b/tests/unit/plugins/module_utils/test_time.py index 99805e35..acef38dd 100644 --- a/tests/unit/plugins/module_utils/test_time.py +++ b/tests/unit/plugins/module_utils/test_time.py @@ -5,7 +5,6 @@ from __future__ import annotations import datetime -import sys import pytest from ansible.module_utils.common.collections import is_sequence @@ -47,6 +46,8 @@ def cartesian_product(list1, list2): return result +ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1)) + TEST_REMOVE_TIMEZONE = cartesian_product( TIMEZONES, [ @@ -58,6 +59,10 @@ TEST_REMOVE_TIMEZONE = cartesian_product( datetime.datetime(2024, 1, 1, 0, 1, 2), datetime.datetime(2024, 1, 1, 0, 1, 2), ), + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), + datetime.datetime(2023, 12, 31, 23, 1, 2), + ), ], ) @@ -72,6 +77,10 @@ TEST_UTC_TIMEZONE = cartesian_product( datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), ), + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), + datetime.datetime(2023, 12, 31, 23, 1, 2, tzinfo=UTC), + ), ], ) @@ -109,6 +118,10 @@ TEST_EPOCH_TO_SECONDS = cartesian_product( [ (datetime.datetime(1970, 1, 1, 0, 1, 2, 0), 62), (datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=UTC), 62), + ( + datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=ONE_HOUR_PLUS), + 62 - 3600, + ), ], ) @@ -221,87 +234,42 @@ TEST_GET_RELATIVE_TIME_OPTION = cartesian_product( datetime.datetime(2024, 1, 1, 0, 0, 0), datetime.datetime(2024, 1, 2, 4, 5, 0, tzinfo=UTC), ), + ( + "20240102040506+0100", + "foo", + "cryptography", + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 6), + ), + ( + "202401020405+0100", + "foo", + "cryptography", + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 0), + ), + ( + "20240102040506+0100", + "foo", + "cryptography", + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC), + ), + ( + "202401020405+0100", + "foo", + "cryptography", + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC), + ), ], ) -if sys.version_info >= (3, 5): - ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1)) - - TEST_REMOVE_TIMEZONE.extend( - cartesian_product( - TIMEZONES, - [ - ( - datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), - datetime.datetime(2023, 12, 31, 23, 1, 2), - ), - ], - ) - ) - TEST_UTC_TIMEZONE.extend( - cartesian_product( - TIMEZONES, - [ - ( - datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), - datetime.datetime(2023, 12, 31, 23, 1, 2, tzinfo=UTC), - ), - ], - ) - ) - TEST_EPOCH_TO_SECONDS.extend( - cartesian_product( - TIMEZONES, - [ - ( - datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=ONE_HOUR_PLUS), - 62 - 3600, - ), - ], - ) - ) - TEST_GET_RELATIVE_TIME_OPTION.extend( - cartesian_product( - TIMEZONES, - [ - ( - "20240102040506+0100", - "foo", - "cryptography", - False, - datetime.datetime(2024, 1, 1, 0, 0, 0), - datetime.datetime(2024, 1, 2, 3, 5, 6), - ), - ( - "202401020405+0100", - "foo", - "cryptography", - False, - datetime.datetime(2024, 1, 1, 0, 0, 0), - datetime.datetime(2024, 1, 2, 3, 5, 0), - ), - ( - "20240102040506+0100", - "foo", - "cryptography", - True, - datetime.datetime(2024, 1, 1, 0, 0, 0), - datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC), - ), - ( - "202401020405+0100", - "foo", - "cryptography", - True, - datetime.datetime(2024, 1, 1, 0, 0, 0), - datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC), - ), - ], - ) - ) - - @pytest.mark.parametrize("timezone, input, expected", TEST_REMOVE_TIMEZONE) def test_remove_timezone(timezone, input, expected): with freeze_time("2024-02-03 04:05:06", tz_offset=timezone):