Use timezone aware functionality when using cryptography >= 42.0.0 (#727)

* Use timezone aware functionality when using cryptography >= 42.0.0.

* Adjust OpenSSH certificate code to avoid functions deprecated in Python 3.12.

* Strip timezone info from isoformat() output.

* InvalidityDate.invalidity_date currently has no _utc variant.
This commit is contained in:
Felix Fontein
2024-04-18 07:49:53 +02:00
committed by GitHub
parent 1b75f1aa9c
commit ae548de502
15 changed files with 215 additions and 64 deletions

View File

@@ -11,7 +11,6 @@ __metaclass__ = type
import base64
import binascii
import datetime
import os
import traceback
@@ -42,11 +41,15 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_now_datetime,
ensure_utc_timezone,
parse_name_field,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_name_to_oid,
get_not_valid_after,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
@@ -373,8 +376,10 @@ class CryptographyBackend(CryptoBackend):
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
if now is None:
now = datetime.datetime.now()
return (cert.not_valid_after - now).days
now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
elif CRYPTOGRAPHY_TIMEZONE:
now = ensure_utc_timezone(now)
return (get_not_valid_after(cert) - now).days
def create_chain_matcher(self, criterium):
'''

View File

@@ -19,6 +19,7 @@ from .basic import (
)
from .cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
)
@@ -27,6 +28,11 @@ from ._obj2txt import (
)
# TODO: once cryptography has a _utc variant of InvalidityDate.invalidity_date, set this
# to True and adjust get_invalidity_date() accordingly.
# (https://github.com/pyca/cryptography/issues/10818)
CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE = False
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
@@ -55,7 +61,7 @@ else:
def cryptography_decode_revoked_certificate(cert):
result = {
'serial_number': cert.serial_number,
'revocation_date': cert.revocation_date,
'revocation_date': get_revocation_date(cert),
'issuer': None,
'issuer_critical': False,
'reason': None,
@@ -77,7 +83,7 @@ def cryptography_decode_revoked_certificate(cert):
pass
try:
ext = cert.extensions.get_extension_for_class(x509.InvalidityDate)
result['invalidity_date'] = ext.value.invalidity_date
result['invalidity_date'] = get_invalidity_date(ext.value)
result['invalidity_date_critical'] = ext.critical
except x509.ExtensionNotFound:
pass
@@ -112,3 +118,38 @@ def cryptography_get_signature_algorithm_oid_from_crl(crl):
crl._x509_crl.sig_alg.algorithm
)
return x509.oid.ObjectIdentifier(dotted)
def get_next_update(obj):
if CRYPTOGRAPHY_TIMEZONE:
return obj.next_update_utc
return obj.next_update
def get_last_update(obj):
if CRYPTOGRAPHY_TIMEZONE:
return obj.last_update_utc
return obj.last_update
def get_revocation_date(obj):
if CRYPTOGRAPHY_TIMEZONE:
return obj.revocation_date_utc
return obj.revocation_date
def get_invalidity_date(obj):
# TODO: special handling if CRYPTOGRAPHY_TIMEZONE_INVALIDITY_DATE is True
return obj.invalidity_date
def set_next_update(builder, value):
return builder.next_update(value)
def set_last_update(builder, value):
return builder.last_update(value)
def set_revocation_date(builder, value):
return builder.revocation_date(value)

View File

@@ -29,7 +29,9 @@ try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
import ipaddress
_HAS_CRYPTOGRAPHY = True
except ImportError:
_HAS_CRYPTOGRAPHY = False
# Error handled in the calling module.
pass
@@ -106,6 +108,11 @@ from ._objects import (
from ._obj2txt import obj2txt
CRYPTOGRAPHY_TIMEZONE = False
if _HAS_CRYPTOGRAPHY:
CRYPTOGRAPHY_TIMEZONE = LooseVersion(cryptography.__version__) >= LooseVersion('42.0.0')
DOTTED_OID = re.compile(r'^\d+(?:\.\d+)+$')
@@ -807,3 +814,23 @@ def cryptography_verify_certificate_signature(certificate, signer_public_key):
certificate.signature_hash_algorithm,
signer_public_key
)
def get_not_valid_after(obj):
if CRYPTOGRAPHY_TIMEZONE:
return obj.not_valid_after_utc
return obj.not_valid_after
def get_not_valid_before(obj):
if CRYPTOGRAPHY_TIMEZONE:
return obj.not_valid_before_utc
return obj.not_valid_before
def set_not_valid_after(builder, value):
return builder.not_valid_after(value)
def set_not_valid_before(builder, value):
return builder.not_valid_before(value)

View File

@@ -32,6 +32,8 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_compare_public_keys,
get_not_valid_after,
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import (
@@ -251,12 +253,12 @@ class CertificateBackend(object):
# Check not before
if not_before is not None and not self.ignore_timestamps:
if self.existing_certificate.not_valid_before != not_before:
if get_not_valid_before(self.existing_certificate) != not_before:
return True
# Check not after
if not_after is not None and not self.ignore_timestamps:
if self.existing_certificate.not_valid_after != not_after:
if get_not_valid_after(self.existing_certificate) != not_after:
return True
return False

View File

@@ -10,7 +10,6 @@ __metaclass__ = type
import datetime
import time
import os
from ansible.module_utils.common.text.converters import to_native, to_bytes
@@ -19,11 +18,14 @@ from ansible_collections.community.crypto.plugins.module_utils.ecs.api import EC
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_now_datetime,
get_relative_time_option,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_serial_number_of_cert,
get_not_valid_after,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -99,7 +101,7 @@ class EntrustCertificateBackend(CertificateBackend):
# Handle expiration (30 days if not specified)
expiry = self.notAfter
if not expiry:
gmt_now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
gmt_now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
expiry = gmt_now + datetime.timedelta(days=365)
expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")
@@ -154,7 +156,7 @@ class EntrustCertificateBackend(CertificateBackend):
expiry = None
if self.backend == 'cryptography':
serial_number = "{0:X}".format(cryptography_serial_number_of_cert(self.existing_certificate))
expiry = self.existing_certificate.not_valid_after
expiry = get_not_valid_after(self.existing_certificate)
# get some information about the expiry of this certificate
expiry_iso3339 = expiry.strftime("%Y-%m-%dT%H:%M:%S.00Z")

View File

@@ -12,7 +12,6 @@ __metaclass__ = type
import abc
import binascii
import datetime
import traceback
from ansible.module_utils import six
@@ -24,13 +23,17 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_fingerprint_of_bytes,
get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_decode_name,
cryptography_get_extensions_from_cert,
cryptography_oid_to_name,
cryptography_serial_number_of_cert,
get_not_valid_after,
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import (
@@ -169,7 +172,7 @@ class CertificateInfoRetrieval(object):
not_after = self.get_not_after()
result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT)
result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT)
result['expired'] = not_after < datetime.datetime.utcnow()
result['expired'] = not_after < get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
result['public_key'] = to_native(self._get_public_key_pem())
@@ -322,10 +325,10 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
return None, False
def get_not_before(self):
return self.cert.not_valid_before
return get_not_valid_before(self.cert)
def get_not_after(self):
return self.cert.not_valid_after
return get_not_valid_after(self.cert)
def _get_public_key_pem(self):
return self.cert.public_key().public_bytes(

View File

@@ -31,6 +31,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
cryptography_key_needs_digest_for_signing,
cryptography_serial_number_of_cert,
cryptography_verify_certificate_signature,
get_not_valid_after,
get_not_valid_before,
set_not_valid_after,
set_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -120,8 +124,8 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
cert_builder = cert_builder.subject_name(self.csr.subject)
cert_builder = cert_builder.issuer_name(self.ca_cert.subject)
cert_builder = cert_builder.serial_number(self.serial_number)
cert_builder = cert_builder.not_valid_before(self.notBefore)
cert_builder = cert_builder.not_valid_after(self.notAfter)
cert_builder = set_not_valid_before(cert_builder, self.notBefore)
cert_builder = set_not_valid_after(cert_builder, self.notAfter)
cert_builder = cert_builder.public_key(self.csr.public_key())
has_ski = False
for extension in self.csr.extensions:
@@ -220,8 +224,8 @@ class OwnCACertificateBackendCryptography(CertificateBackend):
if self.cert is None:
self.cert = self.existing_certificate
result.update({
'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
'notBefore': get_not_valid_before(self.cert).strftime("%Y%m%d%H%M%SZ"),
'notAfter': get_not_valid_after(self.cert).strftime("%Y%m%d%H%M%SZ"),
'serial_number': cryptography_serial_number_of_cert(self.cert),
})

View File

@@ -22,6 +22,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
cryptography_key_needs_digest_for_signing,
cryptography_serial_number_of_cert,
cryptography_verify_certificate_signature,
get_not_valid_after,
get_not_valid_before,
set_not_valid_after,
set_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate import (
@@ -95,8 +99,8 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
cert_builder = cert_builder.subject_name(self.csr.subject)
cert_builder = cert_builder.issuer_name(self.csr.subject)
cert_builder = cert_builder.serial_number(self.serial_number)
cert_builder = cert_builder.not_valid_before(self.notBefore)
cert_builder = cert_builder.not_valid_after(self.notAfter)
cert_builder = set_not_valid_before(cert_builder, self.notBefore)
cert_builder = set_not_valid_after(cert_builder, self.notAfter)
cert_builder = cert_builder.public_key(self.privatekey.public_key())
has_ski = False
for extension in self.csr.extensions:
@@ -154,8 +158,8 @@ class SelfSignedCertificateBackendCryptography(CertificateBackend):
if self.cert is None:
self.cert = self.existing_certificate
result.update({
'notBefore': self.cert.not_valid_before.strftime("%Y%m%d%H%M%SZ"),
'notAfter': self.cert.not_valid_after.strftime("%Y%m%d%H%M%SZ"),
'notBefore': get_not_valid_before(self.cert).strftime("%Y%m%d%H%M%SZ"),
'notAfter': get_not_valid_after(self.cert).strftime("%Y%m%d%H%M%SZ"),
'serial_number': cryptography_serial_number_of_cert(self.cert),
})

View File

@@ -279,7 +279,19 @@ def parse_ordered_name_field(input_list, name_field_name):
return result
def convert_relative_to_datetime(relative_time_string):
def get_now_datetime(with_timezone):
if with_timezone:
return datetime.datetime.now(tz=datetime.timezone.utc)
return datetime.datetime.utcnow()
def ensure_utc_timezone(timestamp):
if timestamp.tzinfo is not None:
return timestamp
return timestamp.astimezone(datetime.timezone.utc)
def convert_relative_to_datetime(relative_time_string, with_timezone=False):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
parsed_result = re.match(
@@ -304,13 +316,14 @@ def convert_relative_to_datetime(relative_time_string):
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))
now = get_now_datetime(with_timezone=with_timezone)
if parsed_result.group("prefix") == "+":
return datetime.datetime.utcnow() + offset
return now + offset
else:
return datetime.datetime.utcnow() - offset
return now - offset
def get_relative_time_option(input_string, input_name, backend='cryptography'):
def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
@@ -323,7 +336,7 @@ def get_relative_time_option(input_string, input_name, backend='cryptography'):
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
result_datetime = convert_relative_to_datetime(result)
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
@@ -332,9 +345,13 @@ def get_relative_time_option(input_string, input_name, backend='cryptography'):
if backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
return datetime.datetime.strptime(result, date_fmt)
res = datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
else:
if with_timezone:
res = res.astimezone(datetime.timezone.utc)
return res
raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %

View File

@@ -22,7 +22,9 @@ __metaclass__ = type
import abc
import binascii
import datetime as _datetime
import os
import sys
from base64 import b64encode
from datetime import datetime
from hashlib import sha256
@@ -61,8 +63,17 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
b'nistp521': 'ecdsa-nistp521',
}
_ALWAYS = datetime(1970, 1, 1)
_FOREVER = datetime.max
_USE_TIMEZONE = sys.version_info >= (3, 6)
def _ensure_utc_timezone_if_use_timezone(value):
if not _USE_TIMEZONE or value.tzinfo is not None:
return value
return value.astimezone(_datetime.timezone.utc)
_ALWAYS = _ensure_utc_timezone_if_use_timezone(datetime(1970, 1, 1))
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _datetime.timezone.utc) if _USE_TIMEZONE else datetime.max
_CRITICAL_OPTIONS = (
'force-command',
@@ -136,7 +147,7 @@ class OpensshCertificateTimeParameters(object):
elif dt == _FOREVER:
result = 'forever'
else:
result = dt.isoformat() if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
elif date_format == 'timestamp':
td = dt - _ALWAYS
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
@@ -167,7 +178,10 @@ class OpensshCertificateTimeParameters(object):
result = _FOREVER
else:
try:
result = datetime.utcfromtimestamp(timestamp)
if _USE_TIMEZONE:
result = datetime.fromtimestamp(timestamp, tz=_datetime.timezone.utc)
else:
result = datetime.utcfromtimestamp(timestamp)
except OverflowError as e:
raise ValueError
return result
@@ -180,11 +194,11 @@ class OpensshCertificateTimeParameters(object):
elif time_string == 'forever':
result = _FOREVER
elif is_relative_time_string(time_string):
result = convert_relative_to_datetime(time_string)
result = convert_relative_to_datetime(time_string, with_timezone=_USE_TIMEZONE)
else:
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
result = datetime.strptime(time_string, time_format)
result = _ensure_utc_timezone_if_use_timezone(datetime.strptime(time_string, time_format))
except ValueError:
pass
if result is None: