mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-05-08 06:12:51 +00:00
ACME: add support for IP identifiers (#53660)
* Adding support for IP identifiers according to https://tools.ietf.org/html/draft-ietf-acme-ip-05. * Add changelog. * Make sure that the authorizations return value is unchanged for CSRs with DNS-only SANs. * Remove unneeded import. * type -> identifier_type * Python 2.6 compatibility. * Fix unit tests. * Add IP address normalization. * Extend tests. * Move data into fixtures. * Adjust BOTMETA.
This commit is contained in:
committed by
René Moser
parent
028facdfed
commit
c2cb82ec14
@@ -822,21 +822,97 @@ class ACMEAccount(object):
|
||||
return True, account_data
|
||||
|
||||
|
||||
def cryptography_get_csr_domains(module, csr_filename):
|
||||
def _normalize_ip(ip):
|
||||
if ':' not in ip:
|
||||
# For IPv4 addresses: remove trailing zeros per nibble
|
||||
ip = '.'.join([nibble.lstrip('0') or '0' for nibble in ip.split('.')])
|
||||
return ip
|
||||
# For IPv6 addresses:
|
||||
# 1. Make them lowercase and split
|
||||
ip = ip.lower()
|
||||
i = ip.find('::')
|
||||
if i >= 0:
|
||||
front = ip[:i].split(':') or []
|
||||
back = ip[i + 2:].split(':') or []
|
||||
ip = front + ['0'] * (8 - len(front) - len(back)) + back
|
||||
else:
|
||||
ip = ip.split(':')
|
||||
# 2. Remove trailing zeros per nibble
|
||||
ip = [nibble.lstrip('0') or '0' for nibble in ip]
|
||||
# 3. Find longest consecutive sequence of zeros
|
||||
zeros_start = -1
|
||||
zeros_length = -1
|
||||
current_start = -1
|
||||
for i, nibble in enumerate(ip):
|
||||
if nibble == '0':
|
||||
if current_start < 0:
|
||||
current_start = i
|
||||
elif current_start >= 0:
|
||||
if i - current_start > zeros_length:
|
||||
zeros_start = current_start
|
||||
zeros_length = i - current_start
|
||||
current_start = -1
|
||||
if current_start >= 0:
|
||||
if 8 - current_start > zeros_length:
|
||||
zeros_start = current_start
|
||||
zeros_length = 8 - current_start
|
||||
# 4. If the sequence has at least two elements, contract
|
||||
if zeros_length >= 2:
|
||||
return ':'.join(ip[:zeros_start]) + '::' + ':'.join(ip[zeros_start + zeros_length:])
|
||||
# 5. If not, return full IP
|
||||
return ':'.join(ip)
|
||||
|
||||
|
||||
def openssl_get_csr_identifiers(openssl_binary, module, csr_filename):
|
||||
'''
|
||||
Return a set of requested domains (CN and SANs) for the CSR.
|
||||
Return a set of requested identifiers (CN and SANs) for the CSR.
|
||||
Each identifier is a pair (type, identifier), where type is either
|
||||
'dns' or 'ip'.
|
||||
'''
|
||||
domains = set([])
|
||||
openssl_csr_cmd = [openssl_binary, "req", "-in", csr_filename, "-noout", "-text"]
|
||||
dummy, out, dummy = module.run_command(openssl_csr_cmd, check_rc=True)
|
||||
|
||||
identifiers = set([])
|
||||
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
|
||||
if common_name is not None:
|
||||
identifiers.add(('dns', common_name.group(1)))
|
||||
subject_alt_names = re.search(
|
||||
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
|
||||
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
|
||||
if subject_alt_names is not None:
|
||||
for san in subject_alt_names.group(1).split(", "):
|
||||
if san.lower().startswith("dns:"):
|
||||
identifiers.add(('dns', san[4:]))
|
||||
elif san.lower().startswith("ip:"):
|
||||
identifiers.add(('ip', _normalize_ip(san[3:])))
|
||||
elif san.lower().startswith("ip address:"):
|
||||
identifiers.add(('ip', _normalize_ip(san[11:])))
|
||||
else:
|
||||
raise ModuleFailException('Found unsupported SAN identifier "{0}"'.format(san))
|
||||
return identifiers
|
||||
|
||||
|
||||
def cryptography_get_csr_identifiers(module, csr_filename):
|
||||
'''
|
||||
Return a set of requested identifiers (CN and SANs) for the CSR.
|
||||
Each identifier is a pair (type, identifier), where type is either
|
||||
'dns' or 'ip'.
|
||||
'''
|
||||
identifiers = set([])
|
||||
csr = cryptography.x509.load_pem_x509_csr(read_file(csr_filename), _cryptography_backend)
|
||||
for sub in csr.subject:
|
||||
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
|
||||
domains.add(sub.value)
|
||||
identifiers.add(('dns', sub.value))
|
||||
for extension in csr.extensions:
|
||||
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
||||
for name in extension.value:
|
||||
if isinstance(name, cryptography.x509.DNSName):
|
||||
domains.add(name.value)
|
||||
return domains
|
||||
identifiers.add(('dns', name.value))
|
||||
elif isinstance(name, cryptography.x509.IPAddress):
|
||||
identifiers.add(('ip', _normalize_ip(str(name.value))))
|
||||
else:
|
||||
raise ModuleFailException('Found unsupported SAN identifier {0}'.format(name))
|
||||
return identifiers
|
||||
|
||||
|
||||
def cryptography_get_cert_days(module, cert_file, now=None):
|
||||
|
||||
Reference in New Issue
Block a user