From 838bdd711bdb4dca5474036a16378a6d80b0d333 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Tue, 28 Sep 2021 18:15:38 +0200 Subject: [PATCH] Make Dirname (de)serialization conformant to RFC 4514 (#274) * Adjust dirName serialization to RFC 4514. * Adjust deserialization to RFC 4514. * Add changelog fragment. * Use Unicode strings, and work around Python 2 and Python 3 differences and problems with old cryptography versions. * Work with bytes, not Unicode strings, to handle escaping of Unicode endpoints correctly. --- changelogs/fragments/274-dirname-rfc4514.yml | 2 + .../crypto/cryptography_support.py | 109 ++++++++++++------ .../targets/openssl_csr/tasks/impl.yml | 12 +- .../targets/openssl_csr/tests/validate.yml | 4 +- .../crypto/test_cryptography_support.py | 78 ++++++++++++- 5 files changed, 161 insertions(+), 44 deletions(-) create mode 100644 changelogs/fragments/274-dirname-rfc4514.yml diff --git a/changelogs/fragments/274-dirname-rfc4514.yml b/changelogs/fragments/274-dirname-rfc4514.yml new file mode 100644 index 00000000..23750439 --- /dev/null +++ b/changelogs/fragments/274-dirname-rfc4514.yml @@ -0,0 +1,2 @@ +breaking_changes: +- "Adjust ``dirName`` text parsing and to text converting code to conform to `Sections 2 and 3 of RFC 4514 `_. This is similar to how `cryptography handles this `_ (https://github.com/ansible-collections/community.crypto/pull/274)." diff --git a/plugins/module_utils/crypto/cryptography_support.py b/plugins/module_utils/crypto/cryptography_support.py index fd05a226..6c576a76 100644 --- a/plugins/module_utils/crypto/cryptography_support.py +++ b/plugins/module_utils/crypto/cryptography_support.py @@ -22,6 +22,7 @@ __metaclass__ = type import base64 import binascii import re +import sys from ansible.module_utils.common.text.converters import to_text, to_bytes from ._asn1 import serialize_asn1_string_as_der @@ -161,34 +162,68 @@ def _parse_hex(bytesstr): return data -DN_COMPONENT_START_RE = re.compile(r'^ *([a-zA-z0-9]+) *= *') +DN_COMPONENT_START_RE = re.compile(b'^ *([a-zA-z0-9.]+) *= *') +DN_HEX_LETTER = b'0123456789abcdef' -def _parse_dn_component(name, sep=',', sep_str='\\', decode_remainder=True): +if sys.version_info[0] < 3: + _int_to_byte = chr +else: + def _int_to_byte(value): + return bytes((value, )) + + +def _parse_dn_component(name, sep=b',', decode_remainder=True): m = DN_COMPONENT_START_RE.match(name) if not m: - raise OpenSSLObjectError('cannot start part in "{0}"'.format(name)) - oid = cryptography_name_to_oid(m.group(1)) + raise OpenSSLObjectError(u'cannot start part in "{0}"'.format(to_text(name))) + oid = cryptography_name_to_oid(to_text(m.group(1))) idx = len(m.group(0)) decoded_name = [] + sep_str = sep + b'\\' if decode_remainder: length = len(name) - while idx < length: - i = idx - while i < length and name[i] not in sep_str: - i += 1 - if i > idx: - decoded_name.append(name[idx:i]) - idx = i - while idx + 1 < length and name[idx] == '\\': - decoded_name.append(name[idx + 1]) + if length > idx and name[idx:idx + 1] == b'#': + # Decoding a hex string + idx += 1 + while idx + 1 < length: + ch1 = name[idx:idx + 1] + ch2 = name[idx + 1:idx + 2] + idx1 = DN_HEX_LETTER.find(ch1.lower()) + idx2 = DN_HEX_LETTER.find(ch2.lower()) + if idx1 < 0 or idx2 < 0: + raise OpenSSLObjectError(u'Invalid hex sequence entry "{0}"'.format(to_text(ch1 + ch2))) idx += 2 - if idx < length and name[idx] == sep: - break + decoded_name.append(_int_to_byte(idx1 * 16 + idx2)) + else: + # Decoding a regular string + while idx < length: + i = idx + while i < length and name[i:i + 1] not in sep_str: + i += 1 + if i > idx: + decoded_name.append(name[idx:i]) + idx = i + while idx + 1 < length and name[idx:idx + 1] == b'\\': + ch = name[idx + 1:idx + 2] + idx1 = DN_HEX_LETTER.find(ch.lower()) + if idx1 >= 0: + if idx + 2 >= length: + raise OpenSSLObjectError(u'Hex escape sequence "\\{0}" incomplete at end of string'.format(to_text(ch))) + ch2 = name[idx + 2:idx + 3] + idx2 = DN_HEX_LETTER.find(ch2.lower()) + if idx2 < 0: + raise OpenSSLObjectError(u'Hex escape sequence "\\{0}" has invalid second letter'.format(to_text(ch + ch2))) + ch = _int_to_byte(idx1 * 16 + idx2) + idx += 1 + idx += 2 + decoded_name.append(ch) + if idx < length and name[idx:idx + 1] == sep: + break else: decoded_name.append(name[idx:]) idx = len(name) - return x509.NameAttribute(oid, ''.join(decoded_name)), name[idx:] + return x509.NameAttribute(oid, to_text(b''.join(decoded_name))), name[idx:] def _parse_dn(name): @@ -199,21 +234,20 @@ def _parse_dn(name): ''' original_name = name name = name.lstrip() - sep = ',' - if name.startswith('/'): - sep = '/' + sep = b',' + if name.startswith(b'/'): + sep = b'/' name = name[1:] - sep_str = sep + '\\' result = [] while name: try: - attribute, name = _parse_dn_component(name, sep=sep, sep_str=sep_str) + attribute, name = _parse_dn_component(name, sep=sep) except OpenSSLObjectError as e: - raise OpenSSLObjectError('Error while parsing distinguished name "{0}": {1}'.format(original_name, e)) + raise OpenSSLObjectError(u'Error while parsing distinguished name "{0}": {1}'.format(to_text(original_name), e)) result.append(attribute) if name: - if name[0] != sep or len(name) < 2: - raise OpenSSLObjectError('Error while parsing distinguished name "{0}": unexpected end of string'.format(original_name)) + if name[0:1] != sep or len(name) < 2: + raise OpenSSLObjectError(u'Error while parsing distinguished name "{0}": unexpected end of string'.format(to_text(original_name))) name = name[1:] return result @@ -222,9 +256,9 @@ def cryptography_parse_relative_distinguished_name(rdn): names = [] for part in rdn: try: - names.append(_parse_dn_component(to_text(part), decode_remainder=False)[0]) + names.append(_parse_dn_component(to_bytes(part), decode_remainder=False)[0]) except OpenSSLObjectError as e: - raise OpenSSLObjectError('Error while parsing relative distinguished name "{0}": {1}'.format(part, e)) + raise OpenSSLObjectError(u'Error while parsing relative distinguished name "{0}": {1}'.format(part, e)) return cryptography.x509.RelativeDistinguishedName(names) @@ -268,7 +302,7 @@ def cryptography_get_name(name, what='Subject Alternative Name'): b_value = serialize_asn1_string_as_der(value) return x509.OtherName(x509.ObjectIdentifier(oid), b_value) if name.startswith('dirName:'): - return x509.DirectoryName(x509.Name(_parse_dn(to_text(name[8:])))) + return x509.DirectoryName(x509.Name(reversed(_parse_dn(to_bytes(name[8:]))))) except Exception as e: raise OpenSSLObjectError('Cannot parse {what} "{name}": {error}'.format(name=name, what=what, error=e)) if ':' not in name: @@ -280,11 +314,14 @@ def _dn_escape_value(value): ''' Escape Distinguished Name's attribute value. ''' - value = value.replace('\\', '\\\\') - for ch in [',', '#', '+', '<', '>', ';', '"', '=', '/']: - value = value.replace(ch, '\\%s' % ch) - if value.startswith(' '): - value = r'\ ' + value[1:] + value = value.replace('\\', r'\\') + for ch in [',', '+', '<', '>', ';', '"']: + value = value.replace(ch, r'\%s' % ch) + value = value.replace('\0', r'\00') + if value.startswith((' ', '#')): + value = r'\%s' % value[0] + value[1:] + if value.endswith(' '): + value = value[:-1] + r'\ ' return value @@ -304,9 +341,11 @@ def cryptography_decode_name(name): if isinstance(name, x509.UniformResourceIdentifier): return 'URI:{0}'.format(name.value) if isinstance(name, x509.DirectoryName): - return 'dirName:' + ''.join([ - '/{0}={1}'.format(cryptography_oid_to_name(attribute.oid, short=True), _dn_escape_value(attribute.value)) - for attribute in name.value + # According to https://datatracker.ietf.org/doc/html/rfc4514.html#section-2.1 the + # list needs to be reversed, and joined by commas + return 'dirName:' + ','.join([ + '{0}={1}'.format(cryptography_oid_to_name(attribute.oid, short=True), _dn_escape_value(attribute.value)) + for attribute in reversed(list(name.value)) ]) if isinstance(name, x509.RegisteredID): return 'RID:{0}'.format(name.value.dotted_string) diff --git a/tests/integration/targets/openssl_csr/tasks/impl.yml b/tests/integration/targets/openssl_csr/tasks/impl.yml index e3e141bf..0d9041de 100644 --- a/tests/integration/targets/openssl_csr/tasks/impl.yml +++ b/tests/integration/targets/openssl_csr/tasks/impl.yml @@ -590,8 +590,8 @@ - "RID:1.2.3.4" - "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71" - "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost" - - "dirName:O = Example Net, CN = example.net" - - "dirName:/O=Example Com/CN=example.com" + - "dirName:CN = example.net, O = Example Net" + - "dirName:CN=example.com,O=Example Com" value_for_name_constraints_permitted: - "DNS:www.example.com" - "IP:1.2.3.0/24" @@ -675,8 +675,8 @@ - "RID:1.2.3.4" - "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71" - "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost" - - "dirName:O=Example Net,CN=example.net" - - "dirName:/O = Example Com/CN = example.com" + - "dirName:CN=example.net,O=Example Net" + - "dirName:CN = example.com,O = Example Com" value_for_name_constraints_permitted: - "DNS:www.example.com" - "IP:1.2.3.0/255.255.255.0" @@ -761,8 +761,8 @@ - "RID:1.2.3.4" - "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71" - "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost" - - "dirName:O =Example Net, CN= example.net" - - "dirName:/O =Example Com/CN= example.com" + - "dirName:CN= example.net, O =Example Net" + - "dirName:/CN= example.com/O =Example Com" value_for_name_constraints_permitted: - "DNS:www.example.com" - "IP:1.2.3.0/255.255.255.0" diff --git a/tests/integration/targets/openssl_csr/tests/validate.yml b/tests/integration/targets/openssl_csr/tests/validate.yml index 24638197..969ae0ef 100644 --- a/tests/integration/targets/openssl_csr/tests/validate.yml +++ b/tests/integration/targets/openssl_csr/tests/validate.yml @@ -258,8 +258,8 @@ "RID:1.2.3.4", "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71", "otherName:1.3.6.1.4.1.311.20.2.3;0c:0d:62:6f:62:40:6c:6f:63:61:6c:68:6f:73:74", - "dirName:/O=Example Net/CN=example.net", - "dirName:/O=Example Com/CN=example.com" + "dirName:CN=example.net,O=Example Net", + "dirName:CN=example.com,O=Example Com" ] - everything_info.subject_key_identifier == "00:11:22:33" - everything_info.extended_key_usage == [ diff --git a/tests/unit/plugins/module_utils/crypto/test_cryptography_support.py b/tests/unit/plugins/module_utils/crypto/test_cryptography_support.py index 6a5545b0..d44d0efb 100644 --- a/tests/unit/plugins/module_utils/crypto/test_cryptography_support.py +++ b/tests/unit/plugins/module_utils/crypto/test_cryptography_support.py @@ -6,6 +6,12 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import re +import sys + +from distutils.version import LooseVersion + +import cryptography import pytest from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( @@ -14,11 +20,15 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( cryptography_get_name, + _parse_dn_component, + _parse_dn, ) +from cryptography.x509 import NameAttribute, oid + def test_cryptography_get_name_invalid_prefix(): - with pytest.raises(OpenSSLObjectError, match="Cannot parse Subject Alternative Name"): + with pytest.raises(OpenSSLObjectError, match="^Cannot parse Subject Alternative Name"): cryptography_get_name('fake:value') @@ -31,3 +41,69 @@ def test_cryptography_get_name_other_name_utfstring(): actual = cryptography_get_name('otherName:1.3.6.1.4.1.311.20.2.3;UTF8:Hello World') assert actual.type_id.dotted_string == '1.3.6.1.4.1.311.20.2.3' assert actual.value == b'\x0c\x0bHello World' + + +@pytest.mark.parametrize('name, options, expected', [ + (b'CN=x ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x '), b'')), + (b'CN=\\ ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u' '), b'')), + (b'CN=\\#', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'#'), b'')), + (b'CN=#402032', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'@ 2'), b'')), + (b'CN = x ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x '), b'')), + (b'CN = x\\, ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x, '), b'')), + (b'CN = x\\40 ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x@ '), b'')), + (b'CN = \\ , / ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u' '), b', / ')), + (b'CN = \\ , / ', {'sep': b'/'}, (NameAttribute(oid.NameOID.COMMON_NAME, u' , '), b'/ ')), + (b'CN = \\ , / ', {'decode_remainder': False}, (NameAttribute(oid.NameOID.COMMON_NAME, u'\\ , / '), b'')), + # Some examples from https://datatracker.ietf.org/doc/html/rfc4514#section-4: + (b'CN=James \\"Jim\\" Smith\\, III', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'James "Jim" Smith, III'), b'')), + (b'CN=Before\\0dAfter', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'Before\x0dAfter'), b'')), + (b'1.3.6.1.4.1.1466.0=#04024869', {}, (NameAttribute(oid.ObjectIdentifier(u'1.3.6.1.4.1.1466.0'), u'\x04\x02Hi'), b'')), + (b'CN=Lu\\C4\\8Di\\C4\\87', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'Lučić'), b'')), +]) +def test_parse_dn_component(name, options, expected): + result = _parse_dn_component(name, **options) + print(result, expected) + assert result == expected + + +# Cryptography < 2.9 does not allow empty strings +# (https://github.com/pyca/cryptography/commit/87b2749c52e688c809f1861e55d958c64147493c) +if LooseVersion(cryptography.__version__) >= LooseVersion('2.9'): + @pytest.mark.parametrize('name, options, expected', [ + (b'CN=', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u''), b'')), + (b'CN= ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u''), b'')), + ]) + def test_parse_dn_component_not_py26(name, options, expected): + result = _parse_dn_component(name, **options) + print(result, expected) + assert result == expected + + +@pytest.mark.parametrize('name, options, message', [ + (b'CN=\\0', {}, u'Hex escape sequence "\\0" incomplete at end of string'), + (b'CN=\\0,', {}, u'Hex escape sequence "\\0," has invalid second letter'), + (b'CN=#0,', {}, u'Invalid hex sequence entry "0,"'), +]) +def test_parse_dn_component_failure(name, options, message): + with pytest.raises(OpenSSLObjectError, match=u'^%s$' % re.escape(message)): + result = _parse_dn_component(name, **options) + + +@pytest.mark.parametrize('name, expected', [ + (b'CN=foo', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo')]), + (b'CN=foo,CN=bar', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo'), NameAttribute(oid.NameOID.COMMON_NAME, u'bar')]), + (b'CN = foo , CN = bar', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo '), NameAttribute(oid.NameOID.COMMON_NAME, u'bar')]), +]) +def test_parse_dn(name, expected): + result = _parse_dn(name) + print(result, expected) + assert result == expected + + +@pytest.mark.parametrize('name, message', [ + (b'CN=\\0', u'Error while parsing distinguished name "CN=\\0": Hex escape sequence "\\0" incomplete at end of string'), + (b'CN=x,', u'Error while parsing distinguished name "CN=x,": unexpected end of string'), +]) +def test_parse_dn_failure(name, message): + with pytest.raises(OpenSSLObjectError, match=u'^%s$' % re.escape(message)): + result = _parse_dn(name)