mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-03-26 21:33:25 +00:00
openssl_pkcs12: Add support for certificate_content and other_certificates_content (#848)
* openssl_pkcs12: Add support for `certificate_content` and `other_certificates_content` Co-authored-by: Felix Fontein <felix@fontein.de> * Added minimal tests. The tests are minimal because internally it always ends up with the _content variants, so even when supplying a file most of the internal code paths then use the content. --------- Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
committed by
GitHub
parent
260bdb1572
commit
ba55ba7381
@@ -48,13 +48,22 @@ options:
|
||||
- List of other certificates to include. Pre Ansible 2.8 this parameter was called O(ca_certificates).
|
||||
- Assumes there is one PEM-encoded certificate per file. If a file contains multiple PEM certificates, set O(other_certificates_parse_all)
|
||||
to V(true).
|
||||
- Mutually exclusive with O(other_certificates_content).
|
||||
type: list
|
||||
elements: path
|
||||
aliases: [ca_certificates]
|
||||
other_certificates_content:
|
||||
description:
|
||||
- List of other certificates to include.
|
||||
- Assumes there is one PEM-encoded certificate per item. If an item contains multiple PEM certificates, set O(other_certificates_parse_all)
|
||||
- Mutually exclusive with O(other_certificates).
|
||||
type: list
|
||||
elements: str
|
||||
version_added: "2.26.0"
|
||||
other_certificates_parse_all:
|
||||
description:
|
||||
- If set to V(true), assumes that the files mentioned in O(other_certificates) can contain more than one certificate
|
||||
per file (or even none per file).
|
||||
- If set to V(true), assumes that the files mentioned in O(other_certificates)/O(other_certificates_content) can contain more than one
|
||||
certificate per file/item (or even none per file/item).
|
||||
type: bool
|
||||
default: false
|
||||
version_added: 1.4.0
|
||||
@@ -62,7 +71,14 @@ options:
|
||||
description:
|
||||
- The path to read certificates and private keys from.
|
||||
- Must be in PEM format.
|
||||
- Mutually exclusive with O(certificate_content).
|
||||
type: path
|
||||
certificate_content:
|
||||
description:
|
||||
- Content of the certificate file in PEM format.
|
||||
- Mutually exclusive with O(certificate_path).
|
||||
type: str
|
||||
version_added: "2.26.0"
|
||||
force:
|
||||
description:
|
||||
- Should the file be regenerated even if it already exists.
|
||||
@@ -264,6 +280,7 @@ pkcs12:
|
||||
|
||||
import abc
|
||||
import base64
|
||||
import itertools
|
||||
import os
|
||||
import stat
|
||||
import traceback
|
||||
@@ -363,7 +380,9 @@ class Pkcs(OpenSSLObject):
|
||||
self.action = module.params['action']
|
||||
self.other_certificates = module.params['other_certificates']
|
||||
self.other_certificates_parse_all = module.params['other_certificates_parse_all']
|
||||
self.other_certificates_content = module.params['other_certificates_content']
|
||||
self.certificate_path = module.params['certificate_path']
|
||||
self.certificate_content = module.params['certificate_content']
|
||||
self.friendly_name = module.params['friendly_name']
|
||||
self.iter_size = module.params['iter_size'] or iter_size_default
|
||||
self.maciter_size = module.params['maciter_size'] or 1
|
||||
@@ -383,6 +402,15 @@ class Pkcs(OpenSSLObject):
|
||||
self.backup = module.params['backup']
|
||||
self.backup_file = None
|
||||
|
||||
if self.certificate_path is not None:
|
||||
try:
|
||||
with open(self.certificate_path, 'rb') as fh:
|
||||
self.certificate_content = fh.read()
|
||||
except (IOError, OSError) as exc:
|
||||
raise PkcsError(exc)
|
||||
elif self.certificate_content is not None:
|
||||
self.certificate_content = to_bytes(self.certificate_content)
|
||||
|
||||
if self.privatekey_path is not None:
|
||||
try:
|
||||
with open(self.privatekey_path, 'rb') as fh:
|
||||
@@ -402,6 +430,13 @@ class Pkcs(OpenSSLObject):
|
||||
self.other_certificates = [
|
||||
load_certificate(other_cert, backend=self.backend) for other_cert in self.other_certificates
|
||||
]
|
||||
elif self.other_certificates_content:
|
||||
certs = self.other_certificates_content
|
||||
if self.other_certificates_parse_all:
|
||||
certs = list(itertools.chain.from_iterable(split_pem_list(content) for content in certs))
|
||||
self.other_certificates = [
|
||||
load_certificate(None, content=to_bytes(other_cert), backend=self.backend) for other_cert in certs
|
||||
]
|
||||
|
||||
@abc.abstractmethod
|
||||
def generate_bytes(self, module):
|
||||
@@ -458,11 +493,11 @@ class Pkcs(OpenSSLObject):
|
||||
elif bool(pkcs12_privatekey) != bool(self.privatekey_content):
|
||||
return False
|
||||
|
||||
if (pkcs12_certificate is not None) and (self.certificate_path is not None):
|
||||
if (pkcs12_certificate is not None) and (self.certificate_content is not None):
|
||||
expected_cert = self._dump_certificate(self.pkcs12)
|
||||
if pkcs12_certificate != expected_cert:
|
||||
return False
|
||||
elif bool(pkcs12_certificate) != bool(self.certificate_path):
|
||||
elif bool(pkcs12_certificate) != bool(self.certificate_content):
|
||||
return False
|
||||
|
||||
if (pkcs12_other_certificates is not None) and (self.other_certificates is not None):
|
||||
@@ -554,8 +589,8 @@ class PkcsPyOpenSSL(Pkcs):
|
||||
if self.other_certificates:
|
||||
self.pkcs12.set_ca_certificates(self.other_certificates)
|
||||
|
||||
if self.certificate_path:
|
||||
self.pkcs12.set_certificate(load_certificate(self.certificate_path, backend=self.backend))
|
||||
if self.certificate_content:
|
||||
self.pkcs12.set_certificate(load_certificate(None, content=self.certificate_content, backend=self.backend))
|
||||
|
||||
if self.friendly_name:
|
||||
self.pkcs12.set_friendlyname(to_bytes(self.friendly_name))
|
||||
@@ -628,8 +663,8 @@ class PkcsCryptography(Pkcs):
|
||||
raise PkcsError(exc)
|
||||
|
||||
cert = None
|
||||
if self.certificate_path:
|
||||
cert = load_certificate(self.certificate_path, backend=self.backend)
|
||||
if self.certificate_content:
|
||||
cert = load_certificate(None, content=self.certificate_content, backend=self.backend)
|
||||
|
||||
friendly_name = to_bytes(self.friendly_name) if self.friendly_name is not None else None
|
||||
|
||||
@@ -759,7 +794,9 @@ def main():
|
||||
action=dict(type='str', default='export', choices=['export', 'parse']),
|
||||
other_certificates=dict(type='list', elements='path', aliases=['ca_certificates']),
|
||||
other_certificates_parse_all=dict(type='bool', default=False),
|
||||
other_certificates_content=dict(type='list', elements='str'),
|
||||
certificate_path=dict(type='path'),
|
||||
certificate_content=dict(type='str'),
|
||||
force=dict(type='bool', default=False),
|
||||
friendly_name=dict(type='str', aliases=['name']),
|
||||
encryption_level=dict(type='str', choices=['auto', 'compatibility2022'], default='auto'),
|
||||
@@ -783,6 +820,8 @@ def main():
|
||||
|
||||
mutually_exclusive = [
|
||||
['privatekey_path', 'privatekey_content'],
|
||||
['certificate_path', 'certificate_content'],
|
||||
['other_certificates', 'other_certificates_content'],
|
||||
]
|
||||
|
||||
module = AnsibleModule(
|
||||
|
||||
Reference in New Issue
Block a user