Allow to specify subject (for CSRs) and issuer (for CRLs) ordered (#316)

* Allow to specify subject (for CSRs) and issuer (for CRLs) ordered.

* Forgot import.

* Apply suggestions from code review

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>

* Apply suggestions from code review

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>

* Fix typo.

* Simplify error handling, reject empty values outright.

* Document d497231e1c.

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>
This commit is contained in:
Felix Fontein
2021-10-31 15:05:04 +01:00
committed by GitHub
parent ecbd44df22
commit 589e7c72ef
11 changed files with 391 additions and 108 deletions

View File

@@ -52,7 +52,20 @@ options:
description:
- Key/value pairs that will be present in the subject name field of the certificate signing request.
- If you need to specify more than one value with the same key, use a list as value.
- If the order of the components is important, use I(subject_ordered).
- Mutually exclusive with I(subject_ordered).
type: dict
subject_ordered:
description:
- A list of dictionaries, where every dictionary must contain one key/value pair. This key/value pair
will be present in the subject name field of the certificate signing request.
- If you want to specify more than one value with the same key in a row, you can use a list as value.
- Mutually exclusive with I(subject), and any other subject field option, such as I(country_name),
I(state_or_province_name), I(locality_name), I(organization_name), I(organizational_unit_name),
I(common_name), or I(email_address).
type: list
elements: dict
version_added: 2.0.0
country_name:
description:
- The countryName field of the certificate signing request subject.

View File

@@ -118,11 +118,11 @@ class CryptographyChainMatcher(ChainMatcher):
self.issuer = []
if criterium.subject:
self.subject = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject)
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject, 'subject')
]
if criterium.issuer:
self.issuer = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer)
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer, 'issuer')
]
self.subject_key_identifier = CryptographyChainMatcher._parse_key_identifier(
criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, module)

View File

@@ -16,7 +16,7 @@ from distutils.version import LooseVersion
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
@@ -27,6 +27,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
load_privatekey,
load_certificate_request,
parse_name_field,
parse_ordered_name_field,
select_message_digest,
)
@@ -119,6 +120,7 @@ class CertificateSigningRequestBackend(object):
if self.create_subject_key_identifier and self.subject_key_identifier is not None:
module.fail_json(msg='subject_key_identifier cannot be specified if create_subject_key_identifier is true')
self.ordered_subject = False
self.subject = [
('C', module.params['country_name']),
('ST', module.params['state_or_province_name']),
@@ -128,11 +130,19 @@ class CertificateSigningRequestBackend(object):
('CN', module.params['common_name']),
('emailAddress', module.params['email_address']),
]
if module.params['subject']:
self.subject = self.subject + parse_name_field(module.params['subject'])
self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]]
try:
if module.params['subject']:
self.subject = self.subject + parse_name_field(module.params['subject'], 'subject')
if module.params['subject_ordered']:
if self.subject:
raise CertificateSigningRequestError('subject_ordered cannot be combined with any other subject field')
self.subject = parse_ordered_name_field(module.params['subject_ordered'], 'subject_ordered')
self.ordered_subject = True
except ValueError as exc:
raise CertificateSigningRequestError(to_native(exc))
self.using_common_name_for_san = False
if not self.subjectAltName and module.params['use_common_name_for_san']:
for sub in self.subject:
@@ -401,7 +411,10 @@ class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBack
def _check_subject(csr):
subject = [(cryptography_name_to_oid(entry[0]), to_text(entry[1])) for entry in self.subject]
current_subject = [(sub.oid, sub.value) for sub in csr.subject]
return set(subject) == set(current_subject)
if self.ordered_subject:
return subject == current_subject
else:
return set(subject) == set(current_subject)
def _find_extension(extensions, exttype):
return next(
@@ -592,6 +605,7 @@ def get_csr_argument_spec():
privatekey_passphrase=dict(type='str', no_log=True),
version=dict(type='int', default=1, choices=[1]),
subject=dict(type='dict'),
subject_ordered=dict(type='list', elements='dict'),
country_name=dict(type='str', aliases=['C', 'countryName']),
state_or_province_name=dict(type='str', aliases=['ST', 'stateOrProvinceName']),
locality_name=dict(type='str', aliases=['L', 'localityName']),
@@ -645,6 +659,7 @@ def get_csr_argument_spec():
],
mutually_exclusive=[
['privatekey_path', 'privatekey_content'],
['subject', 'subject_ordered'],
],
required_one_of=[
['privatekey_path', 'privatekey_content'],

View File

@@ -237,16 +237,43 @@ def load_certificate_request(path, content=None, backend='cryptography'):
raise OpenSSLObjectError(exc)
def parse_name_field(input_dict):
def parse_name_field(input_dict, name_field_name=None):
"""Take a dict with key: value or key: list_of_values mappings and return a list of tuples"""
error_str = '{key}' if name_field_name is None else '{key} in {name}'
result = []
for key, value in input_dict.items():
if isinstance(value, list):
for entry in value:
if not isinstance(entry, six.string_types):
raise TypeError(('Values %s must be strings' % error_str).format(key=key, name=name_field_name))
if not entry:
raise ValueError(('Values for %s must not be empty strings' % error_str).format(key=key))
result.append((key, entry))
elif isinstance(value, six.string_types):
if not value:
raise ValueError(('Value for %s must not be an empty string' % error_str).format(key=key))
result.append((key, value))
else:
raise TypeError(('Value for %s must be either a string or a list of strings' % error_str).format(key=key))
return result
def parse_ordered_name_field(input_list, name_field_name):
"""Take a dict with key: value or key: list_of_values mappings and return a list of tuples"""
result = []
for key in input_dict:
if isinstance(input_dict[key], list):
for entry in input_dict[key]:
result.append((key, entry))
else:
result.append((key, input_dict[key]))
for index, entry in enumerate(input_list):
if len(entry) != 1:
raise ValueError(
'Entry #{index} in {name} must be a dictionary with exactly one key-value pair'.format(
name=name_field_name, index=index + 1))
try:
result.extend(parse_name_field(entry, name_field_name=name_field_name))
except (TypeError, ValueError) as exc:
raise ValueError(
'Error while processing entry #{index} in {name}: {error}'.format(
name=name_field_name, index=index + 1, error=exc))
return result

View File

@@ -580,9 +580,12 @@ class ACMECertificateClient(object):
if self.module.params['select_chain']:
for criterium_idx, criterium in enumerate(self.module.params['select_chain']):
self.select_chain_matcher.append(
self.client.backend.create_chain_matcher(
Criterium(criterium, index=criterium_idx)))
try:
self.select_chain_matcher.append(
self.client.backend.create_chain_matcher(
Criterium(criterium, index=criterium_idx)))
except ValueError as exc:
self.module.warn('Error while parsing criterium: {error}. Ignoring criterium.'.format(error=exc))
# Make sure account exists
modify_account = module.params['modify_account']

View File

@@ -92,8 +92,21 @@ options:
description:
- Key/value pairs that will be present in the issuer name field of the CRL.
- If you need to specify more than one value with the same key, use a list as value.
- Required if I(state) is C(present).
- If the order of the components is important, use I(issuer_ordered).
- One of I(issuer) and I(issuer_ordered) is required if I(state) is C(present).
- Mutually exclusive with I(issuer_ordered).
type: dict
issuer_ordered:
description:
- A list of dictionaries, where every dictionary must contain one key/value pair.
This key/value pair will be present in the issuer name field of the CRL.
- If you want to specify more than one value with the same key in a row, you can
use a list as value.
- One of I(issuer) and I(issuer_ordered) is required if I(state) is C(present).
- Mutually exclusive with I(issuer).
type: list
elements: dict
version_added: 2.0.0
last_update:
description:
@@ -386,6 +399,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
load_privatekey,
load_certificate,
parse_name_field,
parse_ordered_name_field,
get_relative_time_option,
select_message_digest,
)
@@ -462,8 +476,15 @@ class CRL(OpenSSLObject):
self.privatekey_content = self.privatekey_content.encode('utf-8')
self.privatekey_passphrase = module.params['privatekey_passphrase']
self.issuer = parse_name_field(module.params['issuer'])
self.issuer = [(entry[0], entry[1]) for entry in self.issuer if entry[1]]
try:
if module.params['issuer_ordered']:
self.issuer_ordered = True
self.issuer = parse_ordered_name_field(module.params['issuer_ordered'], 'issuer_ordered')
else:
self.issuer_ordered = False
self.issuer = parse_name_field(module.params['issuer'], 'issuer')
except (TypeError, ValueError) as exc:
module.fail_json(msg=to_native(exc))
self.last_update = get_relative_time_option(module.params['last_update'], 'last_update')
self.next_update = get_relative_time_option(module.params['next_update'], 'next_update')
@@ -616,7 +637,11 @@ class CRL(OpenSSLObject):
return False
want_issuer = [(cryptography_name_to_oid(entry[0]), entry[1]) for entry in self.issuer]
if want_issuer != [(sub.oid, sub.value) for sub in self.crl.issuer]:
is_issuer = [(sub.oid, sub.value) for sub in self.crl.issuer]
if not self.issuer_ordered:
want_issuer = set(want_issuer)
is_issuer = set(is_issuer)
if want_issuer != is_issuer:
return False
old_entries = [self._compress_entry(cryptography_decode_revoked_certificate(cert)) for cert in self.crl]
@@ -782,6 +807,7 @@ def main():
privatekey_content=dict(type='str', no_log=True),
privatekey_passphrase=dict(type='str', no_log=True),
issuer=dict(type='dict'),
issuer_ordered=dict(type='list', elements='dict'),
last_update=dict(type='str', default='+0s'),
next_update=dict(type='str'),
digest=dict(type='str', default='sha256'),
@@ -815,10 +841,12 @@ def main():
),
required_if=[
('state', 'present', ['privatekey_path', 'privatekey_content'], True),
('state', 'present', ['issuer', 'next_update', 'revoked_certificates'], False),
('state', 'present', ['issuer', 'issuer_ordered'], True),
('state', 'present', ['next_update', 'revoked_certificates'], False),
],
mutually_exclusive=(
['privatekey_path', 'privatekey_content'],
['issuer', 'issuer_ordered'],
),
supports_check_mode=True,
add_file_common_args=True,