openssl_csr: allow to specify CRL distribution endpoints (#167)

* Improve error messages for name decoding (not all names appear in SANs).

* Refactor DN parsing, add relative DN parsing code.

* Allow to specify CRL distribution points.

* Add changelog fragment.

* Fix typo.

* Make sure value argument to x509.NameAttribute is a text.

* Update changelogs/fragments/167-openssl_csr-crl-distribution-points.yml

Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru>

* Add example.

Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru>
This commit is contained in:
Felix Fontein
2021-01-26 09:57:40 +01:00
committed by GitHub
parent a7c06b2ec4
commit d8ccebce60
8 changed files with 306 additions and 43 deletions

View File

@@ -36,6 +36,11 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
cryptography_name_to_oid,
cryptography_key_needs_digest_for_signing,
cryptography_parse_key_usage_params,
cryptography_parse_relative_distinguished_name,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import (
REVOCATION_REASON_MAP,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
@@ -128,6 +133,7 @@ class CertificateSigningRequestBackend(object):
self.authority_key_identifier = module.params['authority_key_identifier']
self.authority_cert_issuer = module.params['authority_cert_issuer']
self.authority_cert_serial_number = module.params['authority_cert_serial_number']
self.crl_distribution_points = module.params['crl_distribution_points']
self.csr = None
self.privatekey = None
@@ -245,9 +251,10 @@ class CertificateSigningRequestBackend(object):
# Implementation with using pyOpenSSL
class CertificateSigningRequestPyOpenSSLBackend(CertificateSigningRequestBackend):
def __init__(self, module):
if module.params['create_subject_key_identifier']:
module.fail_json(msg='You cannot use create_subject_key_identifier with the pyOpenSSL backend!')
for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number'):
for o in ('create_subject_key_identifier', ):
if module.params[o]:
module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o))
for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number', 'crl_distribution_points'):
if module.params[o] is not None:
module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o))
super(CertificateSigningRequestPyOpenSSLBackend, self).__init__(module, 'pyopenssl')
@@ -409,6 +416,39 @@ class CertificateSigningRequestPyOpenSSLBackend(CertificateSigningRequestBackend
return _check_subject(self.existing_csr) and _check_extensions(self.existing_csr) and _check_signature(self.existing_csr)
def parse_crl_distribution_points(module, crl_distribution_points):
result = []
for index, parse_crl_distribution_point in enumerate(crl_distribution_points):
try:
params = dict(
full_name=None,
relative_name=None,
crl_issuer=None,
reasons=None,
)
if parse_crl_distribution_point['full_name'] is not None:
params['full_name'] = [cryptography_get_name(name, 'full name') for name in parse_crl_distribution_point['full_name']]
if parse_crl_distribution_point['relative_name'] is not None:
try:
params['relative_name'] = cryptography_parse_relative_distinguished_name(parse_crl_distribution_point['relative_name'])
except Exception:
# If cryptography's version is < 1.6, the error is probably caused by that
if CRYPTOGRAPHY_VERSION < LooseVersion('1.6'):
raise OpenSSLObjectError('Cannot specify relative_name for cryptography < 1.6')
raise
if parse_crl_distribution_point['crl_issuer'] is not None:
params['crl_issuer'] = [cryptography_get_name(name, 'CRL issuer') for name in parse_crl_distribution_point['crl_issuer']]
if parse_crl_distribution_point['reasons'] is not None:
reasons = []
for reason in parse_crl_distribution_point['reasons']:
reasons.append(REVOCATION_REASON_MAP[reason])
params['reasons'] = frozenset(reasons)
result.append(cryptography.x509.DistributionPoint(**params))
except OpenSSLObjectError as e:
raise OpenSSLObjectError('Error while parsing CRL distribution point #{index}: {error}'.format(index=index, error=e))
return result
# Implementation with using cryptography
class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBackend):
def __init__(self, module):
@@ -417,6 +457,9 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
if self.version != 1:
module.warn('The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)')
if self.crl_distribution_points:
self.crl_distribution_points = parse_crl_distribution_points(module, self.crl_distribution_points)
def generate_csr(self):
"""(Re-)Generate CSR."""
self._ensure_private_key_loaded()
@@ -460,8 +503,8 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
if self.name_constraints_permitted or self.name_constraints_excluded:
try:
csr = csr.add_extension(cryptography.x509.NameConstraints(
[cryptography_get_name(name) for name in self.name_constraints_permitted],
[cryptography_get_name(name) for name in self.name_constraints_excluded],
[cryptography_get_name(name, 'name constraints permitted') for name in self.name_constraints_permitted],
[cryptography_get_name(name, 'name constraints excluded') for name in self.name_constraints_excluded],
), critical=self.name_constraints_critical)
except TypeError as e:
raise OpenSSLObjectError('Error while parsing name constraint: {0}'.format(e))
@@ -477,12 +520,18 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None:
issuers = None
if self.authority_cert_issuer is not None:
issuers = [cryptography_get_name(n) for n in self.authority_cert_issuer]
issuers = [cryptography_get_name(n, 'authority cert issuer') for n in self.authority_cert_issuer]
csr = csr.add_extension(
cryptography.x509.AuthorityKeyIdentifier(self.authority_key_identifier, issuers, self.authority_cert_serial_number),
critical=False
)
if self.crl_distribution_points:
csr = csr.add_extension(
cryptography.x509.CRLDistributionPoints(self.crl_distribution_points),
critical=False
)
digest = None
if cryptography_key_needs_digest_for_signing(self.privatekey):
digest = select_message_digest(self.digest)
@@ -606,8 +655,8 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
current_nc_ext = _find_extension(extensions, cryptography.x509.NameConstraints)
current_nc_perm = [str(altname) for altname in current_nc_ext.value.permitted_subtrees] if current_nc_ext else []
current_nc_excl = [str(altname) for altname in current_nc_ext.value.excluded_subtrees] if current_nc_ext else []
nc_perm = [str(cryptography_get_name(altname)) for altname in self.name_constraints_permitted]
nc_excl = [str(cryptography_get_name(altname)) for altname in self.name_constraints_excluded]
nc_perm = [str(cryptography_get_name(altname, 'name constraints permitted')) for altname in self.name_constraints_permitted]
nc_excl = [str(cryptography_get_name(altname, 'name constraints excluded')) for altname in self.name_constraints_excluded]
if set(nc_perm) != set(current_nc_perm) or set(nc_excl) != set(current_nc_excl):
return False
if nc_perm or nc_excl:
@@ -636,7 +685,7 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
aci = None
csr_aci = None
if self.authority_cert_issuer is not None:
aci = [str(cryptography_get_name(n)) for n in self.authority_cert_issuer]
aci = [str(cryptography_get_name(n, 'authority cert issuer')) for n in self.authority_cert_issuer]
if ext.value.authority_cert_issuer is not None:
csr_aci = [str(n) for n in ext.value.authority_cert_issuer]
return (ext.value.key_identifier == self.authority_key_identifier
@@ -645,12 +694,21 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
else:
return ext is None
def _check_crl_distribution_points(extensions):
ext = _find_extension(extensions, cryptography.x509.CRLDistributionPoints)
if self.crl_distribution_points is None:
return ext is None
if not ext:
return False
return list(ext.value) == self.crl_distribution_points
def _check_extensions(csr):
extensions = csr.extensions
return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
_check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
_check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and
_check_authority_key_identifier(extensions) and _check_nameConstraints(extensions))
_check_authority_key_identifier(extensions) and _check_nameConstraints(extensions) and
_check_crl_distribution_points(extensions))
def _check_signature(csr):
if not csr.is_signature_valid:
@@ -750,6 +808,26 @@ def get_csr_argument_spec():
authority_key_identifier=dict(type='str'),
authority_cert_issuer=dict(type='list', elements='str'),
authority_cert_serial_number=dict(type='int'),
crl_distribution_points=dict(
type='list',
elements='dict',
options=dict(
full_name=dict(type='list', elements='str'),
relative_name=dict(type='list', elements='str'),
crl_issuer=dict(type='list', elements='str'),
reasons=dict(type='list', elements='str', choices=[
'key_compromise',
'ca_compromise',
'affiliation_changed',
'superseded',
'cessation_of_operation',
'certificate_hold',
'privilege_withdrawn',
'aa_compromise',
]),
),
mutually_exclusive=[('full_name', 'relative_name')]
),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
required_together=[