mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-05-06 13:22:58 +00:00
Make Dirname (de)serialization conformant to RFC 4514 (#274)
* Adjust dirName serialization to RFC 4514. * Adjust deserialization to RFC 4514. * Add changelog fragment. * Use Unicode strings, and work around Python 2 and Python 3 differences and problems with old cryptography versions. * Work with bytes, not Unicode strings, to handle escaping of Unicode endpoints correctly.
This commit is contained in:
2
changelogs/fragments/274-dirname-rfc4514.yml
Normal file
2
changelogs/fragments/274-dirname-rfc4514.yml
Normal file
@@ -0,0 +1,2 @@
|
||||
breaking_changes:
|
||||
- "Adjust ``dirName`` text parsing and to text converting code to conform to `Sections 2 and 3 of RFC 4514 <https://datatracker.ietf.org/doc/html/rfc4514.html>`_. This is similar to how `cryptography handles this <https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Name.rfc4514_string>`_ (https://github.com/ansible-collections/community.crypto/pull/274)."
|
||||
@@ -22,6 +22,7 @@ __metaclass__ = type
|
||||
import base64
|
||||
import binascii
|
||||
import re
|
||||
import sys
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_text, to_bytes
|
||||
from ._asn1 import serialize_asn1_string_as_der
|
||||
@@ -161,34 +162,68 @@ def _parse_hex(bytesstr):
|
||||
return data
|
||||
|
||||
|
||||
DN_COMPONENT_START_RE = re.compile(r'^ *([a-zA-z0-9]+) *= *')
|
||||
DN_COMPONENT_START_RE = re.compile(b'^ *([a-zA-z0-9.]+) *= *')
|
||||
DN_HEX_LETTER = b'0123456789abcdef'
|
||||
|
||||
|
||||
def _parse_dn_component(name, sep=',', sep_str='\\', decode_remainder=True):
|
||||
if sys.version_info[0] < 3:
|
||||
_int_to_byte = chr
|
||||
else:
|
||||
def _int_to_byte(value):
|
||||
return bytes((value, ))
|
||||
|
||||
|
||||
def _parse_dn_component(name, sep=b',', decode_remainder=True):
|
||||
m = DN_COMPONENT_START_RE.match(name)
|
||||
if not m:
|
||||
raise OpenSSLObjectError('cannot start part in "{0}"'.format(name))
|
||||
oid = cryptography_name_to_oid(m.group(1))
|
||||
raise OpenSSLObjectError(u'cannot start part in "{0}"'.format(to_text(name)))
|
||||
oid = cryptography_name_to_oid(to_text(m.group(1)))
|
||||
idx = len(m.group(0))
|
||||
decoded_name = []
|
||||
sep_str = sep + b'\\'
|
||||
if decode_remainder:
|
||||
length = len(name)
|
||||
while idx < length:
|
||||
i = idx
|
||||
while i < length and name[i] not in sep_str:
|
||||
i += 1
|
||||
if i > idx:
|
||||
decoded_name.append(name[idx:i])
|
||||
idx = i
|
||||
while idx + 1 < length and name[idx] == '\\':
|
||||
decoded_name.append(name[idx + 1])
|
||||
if length > idx and name[idx:idx + 1] == b'#':
|
||||
# Decoding a hex string
|
||||
idx += 1
|
||||
while idx + 1 < length:
|
||||
ch1 = name[idx:idx + 1]
|
||||
ch2 = name[idx + 1:idx + 2]
|
||||
idx1 = DN_HEX_LETTER.find(ch1.lower())
|
||||
idx2 = DN_HEX_LETTER.find(ch2.lower())
|
||||
if idx1 < 0 or idx2 < 0:
|
||||
raise OpenSSLObjectError(u'Invalid hex sequence entry "{0}"'.format(to_text(ch1 + ch2)))
|
||||
idx += 2
|
||||
if idx < length and name[idx] == sep:
|
||||
break
|
||||
decoded_name.append(_int_to_byte(idx1 * 16 + idx2))
|
||||
else:
|
||||
# Decoding a regular string
|
||||
while idx < length:
|
||||
i = idx
|
||||
while i < length and name[i:i + 1] not in sep_str:
|
||||
i += 1
|
||||
if i > idx:
|
||||
decoded_name.append(name[idx:i])
|
||||
idx = i
|
||||
while idx + 1 < length and name[idx:idx + 1] == b'\\':
|
||||
ch = name[idx + 1:idx + 2]
|
||||
idx1 = DN_HEX_LETTER.find(ch.lower())
|
||||
if idx1 >= 0:
|
||||
if idx + 2 >= length:
|
||||
raise OpenSSLObjectError(u'Hex escape sequence "\\{0}" incomplete at end of string'.format(to_text(ch)))
|
||||
ch2 = name[idx + 2:idx + 3]
|
||||
idx2 = DN_HEX_LETTER.find(ch2.lower())
|
||||
if idx2 < 0:
|
||||
raise OpenSSLObjectError(u'Hex escape sequence "\\{0}" has invalid second letter'.format(to_text(ch + ch2)))
|
||||
ch = _int_to_byte(idx1 * 16 + idx2)
|
||||
idx += 1
|
||||
idx += 2
|
||||
decoded_name.append(ch)
|
||||
if idx < length and name[idx:idx + 1] == sep:
|
||||
break
|
||||
else:
|
||||
decoded_name.append(name[idx:])
|
||||
idx = len(name)
|
||||
return x509.NameAttribute(oid, ''.join(decoded_name)), name[idx:]
|
||||
return x509.NameAttribute(oid, to_text(b''.join(decoded_name))), name[idx:]
|
||||
|
||||
|
||||
def _parse_dn(name):
|
||||
@@ -199,21 +234,20 @@ def _parse_dn(name):
|
||||
'''
|
||||
original_name = name
|
||||
name = name.lstrip()
|
||||
sep = ','
|
||||
if name.startswith('/'):
|
||||
sep = '/'
|
||||
sep = b','
|
||||
if name.startswith(b'/'):
|
||||
sep = b'/'
|
||||
name = name[1:]
|
||||
sep_str = sep + '\\'
|
||||
result = []
|
||||
while name:
|
||||
try:
|
||||
attribute, name = _parse_dn_component(name, sep=sep, sep_str=sep_str)
|
||||
attribute, name = _parse_dn_component(name, sep=sep)
|
||||
except OpenSSLObjectError as e:
|
||||
raise OpenSSLObjectError('Error while parsing distinguished name "{0}": {1}'.format(original_name, e))
|
||||
raise OpenSSLObjectError(u'Error while parsing distinguished name "{0}": {1}'.format(to_text(original_name), e))
|
||||
result.append(attribute)
|
||||
if name:
|
||||
if name[0] != sep or len(name) < 2:
|
||||
raise OpenSSLObjectError('Error while parsing distinguished name "{0}": unexpected end of string'.format(original_name))
|
||||
if name[0:1] != sep or len(name) < 2:
|
||||
raise OpenSSLObjectError(u'Error while parsing distinguished name "{0}": unexpected end of string'.format(to_text(original_name)))
|
||||
name = name[1:]
|
||||
return result
|
||||
|
||||
@@ -222,9 +256,9 @@ def cryptography_parse_relative_distinguished_name(rdn):
|
||||
names = []
|
||||
for part in rdn:
|
||||
try:
|
||||
names.append(_parse_dn_component(to_text(part), decode_remainder=False)[0])
|
||||
names.append(_parse_dn_component(to_bytes(part), decode_remainder=False)[0])
|
||||
except OpenSSLObjectError as e:
|
||||
raise OpenSSLObjectError('Error while parsing relative distinguished name "{0}": {1}'.format(part, e))
|
||||
raise OpenSSLObjectError(u'Error while parsing relative distinguished name "{0}": {1}'.format(part, e))
|
||||
return cryptography.x509.RelativeDistinguishedName(names)
|
||||
|
||||
|
||||
@@ -268,7 +302,7 @@ def cryptography_get_name(name, what='Subject Alternative Name'):
|
||||
b_value = serialize_asn1_string_as_der(value)
|
||||
return x509.OtherName(x509.ObjectIdentifier(oid), b_value)
|
||||
if name.startswith('dirName:'):
|
||||
return x509.DirectoryName(x509.Name(_parse_dn(to_text(name[8:]))))
|
||||
return x509.DirectoryName(x509.Name(reversed(_parse_dn(to_bytes(name[8:])))))
|
||||
except Exception as e:
|
||||
raise OpenSSLObjectError('Cannot parse {what} "{name}": {error}'.format(name=name, what=what, error=e))
|
||||
if ':' not in name:
|
||||
@@ -280,11 +314,14 @@ def _dn_escape_value(value):
|
||||
'''
|
||||
Escape Distinguished Name's attribute value.
|
||||
'''
|
||||
value = value.replace('\\', '\\\\')
|
||||
for ch in [',', '#', '+', '<', '>', ';', '"', '=', '/']:
|
||||
value = value.replace(ch, '\\%s' % ch)
|
||||
if value.startswith(' '):
|
||||
value = r'\ ' + value[1:]
|
||||
value = value.replace('\\', r'\\')
|
||||
for ch in [',', '+', '<', '>', ';', '"']:
|
||||
value = value.replace(ch, r'\%s' % ch)
|
||||
value = value.replace('\0', r'\00')
|
||||
if value.startswith((' ', '#')):
|
||||
value = r'\%s' % value[0] + value[1:]
|
||||
if value.endswith(' '):
|
||||
value = value[:-1] + r'\ '
|
||||
return value
|
||||
|
||||
|
||||
@@ -304,9 +341,11 @@ def cryptography_decode_name(name):
|
||||
if isinstance(name, x509.UniformResourceIdentifier):
|
||||
return 'URI:{0}'.format(name.value)
|
||||
if isinstance(name, x509.DirectoryName):
|
||||
return 'dirName:' + ''.join([
|
||||
'/{0}={1}'.format(cryptography_oid_to_name(attribute.oid, short=True), _dn_escape_value(attribute.value))
|
||||
for attribute in name.value
|
||||
# According to https://datatracker.ietf.org/doc/html/rfc4514.html#section-2.1 the
|
||||
# list needs to be reversed, and joined by commas
|
||||
return 'dirName:' + ','.join([
|
||||
'{0}={1}'.format(cryptography_oid_to_name(attribute.oid, short=True), _dn_escape_value(attribute.value))
|
||||
for attribute in reversed(list(name.value))
|
||||
])
|
||||
if isinstance(name, x509.RegisteredID):
|
||||
return 'RID:{0}'.format(name.value.dotted_string)
|
||||
|
||||
@@ -590,8 +590,8 @@
|
||||
- "RID:1.2.3.4"
|
||||
- "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71"
|
||||
- "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost"
|
||||
- "dirName:O = Example Net, CN = example.net"
|
||||
- "dirName:/O=Example Com/CN=example.com"
|
||||
- "dirName:CN = example.net, O = Example Net"
|
||||
- "dirName:CN=example.com,O=Example Com"
|
||||
value_for_name_constraints_permitted:
|
||||
- "DNS:www.example.com"
|
||||
- "IP:1.2.3.0/24"
|
||||
@@ -675,8 +675,8 @@
|
||||
- "RID:1.2.3.4"
|
||||
- "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71"
|
||||
- "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost"
|
||||
- "dirName:O=Example Net,CN=example.net"
|
||||
- "dirName:/O = Example Com/CN = example.com"
|
||||
- "dirName:CN=example.net,O=Example Net"
|
||||
- "dirName:CN = example.com,O = Example Com"
|
||||
value_for_name_constraints_permitted:
|
||||
- "DNS:www.example.com"
|
||||
- "IP:1.2.3.0/255.255.255.0"
|
||||
@@ -761,8 +761,8 @@
|
||||
- "RID:1.2.3.4"
|
||||
- "otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71"
|
||||
- "otherName:1.3.6.1.4.1.311.20.2.3;UTF8:bob@localhost"
|
||||
- "dirName:O =Example Net, CN= example.net"
|
||||
- "dirName:/O =Example Com/CN= example.com"
|
||||
- "dirName:CN= example.net, O =Example Net"
|
||||
- "dirName:/CN= example.com/O =Example Com"
|
||||
value_for_name_constraints_permitted:
|
||||
- "DNS:www.example.com"
|
||||
- "IP:1.2.3.0/255.255.255.0"
|
||||
|
||||
@@ -258,8 +258,8 @@
|
||||
"RID:1.2.3.4",
|
||||
"otherName:1.2.3.4;0c:07:63:65:72:74:72:65:71",
|
||||
"otherName:1.3.6.1.4.1.311.20.2.3;0c:0d:62:6f:62:40:6c:6f:63:61:6c:68:6f:73:74",
|
||||
"dirName:/O=Example Net/CN=example.net",
|
||||
"dirName:/O=Example Com/CN=example.com"
|
||||
"dirName:CN=example.net,O=Example Net",
|
||||
"dirName:CN=example.com,O=Example Com"
|
||||
]
|
||||
- everything_info.subject_key_identifier == "00:11:22:33"
|
||||
- everything_info.extended_key_usage == [
|
||||
|
||||
@@ -6,6 +6,12 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
import re
|
||||
import sys
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
import cryptography
|
||||
import pytest
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
@@ -14,11 +20,15 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
cryptography_get_name,
|
||||
_parse_dn_component,
|
||||
_parse_dn,
|
||||
)
|
||||
|
||||
from cryptography.x509 import NameAttribute, oid
|
||||
|
||||
|
||||
def test_cryptography_get_name_invalid_prefix():
|
||||
with pytest.raises(OpenSSLObjectError, match="Cannot parse Subject Alternative Name"):
|
||||
with pytest.raises(OpenSSLObjectError, match="^Cannot parse Subject Alternative Name"):
|
||||
cryptography_get_name('fake:value')
|
||||
|
||||
|
||||
@@ -31,3 +41,69 @@ def test_cryptography_get_name_other_name_utfstring():
|
||||
actual = cryptography_get_name('otherName:1.3.6.1.4.1.311.20.2.3;UTF8:Hello World')
|
||||
assert actual.type_id.dotted_string == '1.3.6.1.4.1.311.20.2.3'
|
||||
assert actual.value == b'\x0c\x0bHello World'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('name, options, expected', [
|
||||
(b'CN=x ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x '), b'')),
|
||||
(b'CN=\\ ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u' '), b'')),
|
||||
(b'CN=\\#', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'#'), b'')),
|
||||
(b'CN=#402032', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'@ 2'), b'')),
|
||||
(b'CN = x ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x '), b'')),
|
||||
(b'CN = x\\, ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x, '), b'')),
|
||||
(b'CN = x\\40 ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'x@ '), b'')),
|
||||
(b'CN = \\ , / ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u' '), b', / ')),
|
||||
(b'CN = \\ , / ', {'sep': b'/'}, (NameAttribute(oid.NameOID.COMMON_NAME, u' , '), b'/ ')),
|
||||
(b'CN = \\ , / ', {'decode_remainder': False}, (NameAttribute(oid.NameOID.COMMON_NAME, u'\\ , / '), b'')),
|
||||
# Some examples from https://datatracker.ietf.org/doc/html/rfc4514#section-4:
|
||||
(b'CN=James \\"Jim\\" Smith\\, III', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'James "Jim" Smith, III'), b'')),
|
||||
(b'CN=Before\\0dAfter', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'Before\x0dAfter'), b'')),
|
||||
(b'1.3.6.1.4.1.1466.0=#04024869', {}, (NameAttribute(oid.ObjectIdentifier(u'1.3.6.1.4.1.1466.0'), u'\x04\x02Hi'), b'')),
|
||||
(b'CN=Lu\\C4\\8Di\\C4\\87', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u'Lučić'), b'')),
|
||||
])
|
||||
def test_parse_dn_component(name, options, expected):
|
||||
result = _parse_dn_component(name, **options)
|
||||
print(result, expected)
|
||||
assert result == expected
|
||||
|
||||
|
||||
# Cryptography < 2.9 does not allow empty strings
|
||||
# (https://github.com/pyca/cryptography/commit/87b2749c52e688c809f1861e55d958c64147493c)
|
||||
if LooseVersion(cryptography.__version__) >= LooseVersion('2.9'):
|
||||
@pytest.mark.parametrize('name, options, expected', [
|
||||
(b'CN=', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u''), b'')),
|
||||
(b'CN= ', {}, (NameAttribute(oid.NameOID.COMMON_NAME, u''), b'')),
|
||||
])
|
||||
def test_parse_dn_component_not_py26(name, options, expected):
|
||||
result = _parse_dn_component(name, **options)
|
||||
print(result, expected)
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize('name, options, message', [
|
||||
(b'CN=\\0', {}, u'Hex escape sequence "\\0" incomplete at end of string'),
|
||||
(b'CN=\\0,', {}, u'Hex escape sequence "\\0," has invalid second letter'),
|
||||
(b'CN=#0,', {}, u'Invalid hex sequence entry "0,"'),
|
||||
])
|
||||
def test_parse_dn_component_failure(name, options, message):
|
||||
with pytest.raises(OpenSSLObjectError, match=u'^%s$' % re.escape(message)):
|
||||
result = _parse_dn_component(name, **options)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('name, expected', [
|
||||
(b'CN=foo', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo')]),
|
||||
(b'CN=foo,CN=bar', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo'), NameAttribute(oid.NameOID.COMMON_NAME, u'bar')]),
|
||||
(b'CN = foo , CN = bar', [NameAttribute(oid.NameOID.COMMON_NAME, u'foo '), NameAttribute(oid.NameOID.COMMON_NAME, u'bar')]),
|
||||
])
|
||||
def test_parse_dn(name, expected):
|
||||
result = _parse_dn(name)
|
||||
print(result, expected)
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize('name, message', [
|
||||
(b'CN=\\0', u'Error while parsing distinguished name "CN=\\0": Hex escape sequence "\\0" incomplete at end of string'),
|
||||
(b'CN=x,', u'Error while parsing distinguished name "CN=x,": unexpected end of string'),
|
||||
])
|
||||
def test_parse_dn_failure(name, message):
|
||||
with pytest.raises(OpenSSLObjectError, match=u'^%s$' % re.escape(message)):
|
||||
result = _parse_dn(name)
|
||||
|
||||
Reference in New Issue
Block a user