Improve handling of IDNA/Unicode domains (#436)

* Prepare IDNA/Unicode conversion code. Use to normalize input.

* Use IDNA library first (IDNA2008) and Python's IDNA2003 implementation as a fallback.

* Make sure idna is installed.

* Add changelog fragment.

* 'punycode' → 'idna'.

* Add name_encoding options and tests.

* Avoid invalid character for IDNA2008.

* Linting.

* Forgot to upate value.

* Work around cryptography bug. Fix port handling for URIs.

* Forgot other place sensitive to cryptography bug.

* Forgot one. (Will likely still fail.)

* Decode IDNA in _compress_entry() to avoid comparison screw-ups.

* Work around Python 3.5 problem in Ansible 2.9's default test container.

* Update changelog fragment.

* Fix error, add tests.

* Python 2 compatibility.

* Update requirements.
This commit is contained in:
Felix Fontein
2022-05-09 19:57:14 +02:00
committed by GitHub
parent 90efcc1ca7
commit 4cf951596f
20 changed files with 479 additions and 31 deletions

View File

@@ -95,12 +95,12 @@ def cryptography_decode_revoked_certificate(cert):
return result
def cryptography_dump_revoked(entry):
def cryptography_dump_revoked(entry, idn_rewrite='ignore'):
return {
'serial_number': entry['serial_number'],
'revocation_date': entry['revocation_date'].strftime(TIMESTAMP_FORMAT),
'issuer':
[cryptography_decode_name(issuer) for issuer in entry['issuer']]
[cryptography_decode_name(issuer, idn_rewrite=idn_rewrite) for issuer in entry['issuer']]
if entry['issuer'] is not None else None,
'issuer_critical': entry['issuer_critical'],
'reason': REVOCATION_REASON_MAP_INVERSE.get(entry['reason']) if entry['reason'] is not None else None,

View File

@@ -23,8 +23,11 @@ import base64
import binascii
import re
import sys
import traceback
from ansible.module_utils.common.text.converters import to_text, to_bytes
from ansible.module_utils.six.moves.urllib.parse import urlparse, urlunparse, ParseResult
from ._asn1 import serialize_asn1_string_as_der
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
@@ -80,6 +83,16 @@ except ImportError:
# Error handled in the calling module.
_load_pkcs12 = None
try:
import idna
HAS_IDNA = True
except ImportError:
HAS_IDNA = False
IDNA_IMP_ERROR = traceback.format_exc()
from ansible.module_utils.basic import missing_required_lib
from .basic import (
CRYPTOGRAPHY_HAS_DSA_SIGN,
CRYPTOGRAPHY_HAS_EC_SIGN,
@@ -359,6 +372,80 @@ def cryptography_parse_relative_distinguished_name(rdn):
return cryptography.x509.RelativeDistinguishedName(names)
def _is_ascii(value):
'''Check whether the Unicode string `value` contains only ASCII characters.'''
try:
value.encode("ascii")
return True
except UnicodeEncodeError:
return False
def _adjust_idn(value, idn_rewrite):
if idn_rewrite == 'ignore' or not value:
return value
if idn_rewrite == 'idna' and _is_ascii(value):
return value
if idn_rewrite not in ('idna', 'unicode'):
raise ValueError('Invalid value for idn_rewrite: "{0}"'.format(idn_rewrite))
if not HAS_IDNA:
raise OpenSSLObjectError(
missing_required_lib('idna', reason='to transform {what} DNS name "{name}" to {dest}'.format(
name=value,
what='IDNA' if idn_rewrite == 'unicode' else 'Unicode',
dest='Unicode' if idn_rewrite == 'unicode' else 'IDNA',
)))
# Since IDNA does not like '*' or empty labels (except one empty label at the end),
# we split and let IDNA only handle labels that are neither empty or '*'.
parts = value.split(u'.')
for index, part in enumerate(parts):
if part in (u'', u'*'):
continue
try:
if idn_rewrite == 'idna':
parts[index] = idna.encode(part).decode('ascii')
elif idn_rewrite == 'unicode' and part.startswith(u'xn--'):
parts[index] = idna.decode(part)
except idna.IDNAError as exc2008:
try:
if idn_rewrite == 'idna':
parts[index] = part.encode('idna').decode('ascii')
elif idn_rewrite == 'unicode' and part.startswith(u'xn--'):
parts[index] = part.encode('ascii').decode('idna')
except Exception as exc2003:
raise OpenSSLObjectError(
u'Error while transforming part "{part}" of {what} DNS name "{name}" to {dest}.'
u' IDNA2008 transformation resulted in "{exc2008}", IDNA2003 transformation resulted in "{exc2003}".'.format(
part=part,
name=value,
what='IDNA' if idn_rewrite == 'unicode' else 'Unicode',
dest='Unicode' if idn_rewrite == 'unicode' else 'IDNA',
exc2003=exc2003,
exc2008=exc2008,
))
return u'.'.join(parts)
def _adjust_idn_email(value, idn_rewrite):
idx = value.find(u'@')
if idx < 0:
return value
return u'{0}@{1}'.format(value[:idx], _adjust_idn(value[idx + 1:], idn_rewrite))
def _adjust_idn_url(value, idn_rewrite):
url = urlparse(value)
host = _adjust_idn(url.hostname, idn_rewrite)
if url.username is not None and url.password is not None:
host = u'{0}:{1}@{2}'.format(url.username, url.password, host)
elif url.username is not None:
host = u'{0}@{1}'.format(url.username, host)
if url.port is not None:
host = u'{0}:{1}'.format(host, url.port)
return urlunparse(
ParseResult(scheme=url.scheme, netloc=host, path=url.path, params=url.params, query=url.query, fragment=url.fragment))
def cryptography_get_name(name, what='Subject Alternative Name'):
'''
Given a name string, returns a cryptography x509.GeneralName object.
@@ -366,16 +453,16 @@ def cryptography_get_name(name, what='Subject Alternative Name'):
'''
try:
if name.startswith('DNS:'):
return x509.DNSName(to_text(name[4:]))
return x509.DNSName(_adjust_idn(to_text(name[4:]), 'idna'))
if name.startswith('IP:'):
address = to_text(name[3:])
if '/' in address:
return x509.IPAddress(ipaddress.ip_network(address))
return x509.IPAddress(ipaddress.ip_address(address))
if name.startswith('email:'):
return x509.RFC822Name(to_text(name[6:]))
return x509.RFC822Name(_adjust_idn_email(to_text(name[6:]), 'idna'))
if name.startswith('URI:'):
return x509.UniformResourceIdentifier(to_text(name[4:]))
return x509.UniformResourceIdentifier(_adjust_idn_url(to_text(name[4:]), 'idna'))
if name.startswith('RID:'):
m = re.match(r'^([0-9]+(?:\.[0-9]+)*)$', to_text(name[4:]))
if not m:
@@ -422,21 +509,23 @@ def _dn_escape_value(value):
return value
def cryptography_decode_name(name):
def cryptography_decode_name(name, idn_rewrite='ignore'):
'''
Given a cryptography x509.GeneralName object, returns a string.
Raises an OpenSSLObjectError if the name is not supported.
'''
if idn_rewrite not in ('ignore', 'idna', 'unicode'):
raise AssertionError('idn_rewrite must be one of "ignore", "idna", or "unicode"')
if isinstance(name, x509.DNSName):
return u'DNS:{0}'.format(name.value)
return u'DNS:{0}'.format(_adjust_idn(name.value, idn_rewrite))
if isinstance(name, x509.IPAddress):
if isinstance(name.value, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
return u'IP:{0}/{1}'.format(name.value.network_address.compressed, name.value.prefixlen)
return u'IP:{0}'.format(name.value.compressed)
if isinstance(name, x509.RFC822Name):
return u'email:{0}'.format(name.value)
return u'email:{0}'.format(_adjust_idn_email(name.value, idn_rewrite))
if isinstance(name, x509.UniformResourceIdentifier):
return u'URI:{0}'.format(name.value)
return u'URI:{0}'.format(_adjust_idn_url(name.value, idn_rewrite))
if isinstance(name, x509.DirectoryName):
# According to https://datatracker.ietf.org/doc/html/rfc4514.html#section-2.1 the
# list needs to be reversed, and joined by commas

View File

@@ -207,6 +207,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
"""Validate the supplied cert, using the cryptography backend"""
def __init__(self, module, content):
super(CertificateInfoRetrievalCryptography, self).__init__(module, 'cryptography', content)
self.name_encoding = module.params.get('name_encoding', 'ignore')
def _get_der_bytes(self):
return self.cert.public_bytes(serialization.Encoding.DER)
@@ -309,7 +310,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
def _get_subject_alt_name(self):
try:
san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
result = [cryptography_decode_name(san) for san in san_ext.value]
result = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
@@ -341,7 +342,7 @@ class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval):
ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
issuer = None
if ext.value.authority_cert_issuer is not None:
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
issuer = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer]
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
except cryptography.x509.ExtensionNotFound:
return None, None, None

View File

@@ -51,6 +51,7 @@ class CRLInfoRetrieval(object):
self.module = module
self.content = content
self.list_revoked_certificates = list_revoked_certificates
self.name_encoding = module.params.get('name_encoding', 'ignore')
def get_info(self):
self.crl_pem = identify_pem_format(self.content)
@@ -86,7 +87,7 @@ class CRLInfoRetrieval(object):
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))
result['revoked_certificates'].append(cryptography_dump_revoked(entry, idn_rewrite=self.name_encoding))
return result

View File

@@ -174,6 +174,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
"""Validate the supplied CSR, using the cryptography backend"""
def __init__(self, module, content, validate_signature):
super(CSRInfoRetrievalCryptography, self).__init__(module, 'cryptography', content, validate_signature)
self.name_encoding = module.params.get('name_encoding', 'ignore')
def _get_subject_ordered(self):
result = []
@@ -256,7 +257,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
def _get_subject_alt_name(self):
try:
san_ext = self.csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
result = [cryptography_decode_name(san) for san in san_ext.value]
result = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in san_ext.value]
return result, san_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, False
@@ -264,8 +265,8 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
def _get_name_constraints(self):
try:
nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints)
permitted = [cryptography_decode_name(san) for san in nc_ext.value.permitted_subtrees or []]
excluded = [cryptography_decode_name(san) for san in nc_ext.value.excluded_subtrees or []]
permitted = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.permitted_subtrees or []]
excluded = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in nc_ext.value.excluded_subtrees or []]
return permitted, excluded, nc_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, None, False
@@ -291,7 +292,7 @@ class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
ext = self.csr.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
issuer = None
if ext.value.authority_cert_issuer is not None:
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
issuer = [cryptography_decode_name(san, idn_rewrite=self.name_encoding) for san in ext.value.authority_cert_issuer]
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
except cryptography.x509.ExtensionNotFound:
return None, None, None