openssl_pkcs12: add cryptography backend (#234)

* Began refactoring.

* Continue.

* Factor PyOpenSSL backend out.

* Add basic cryptography backend.

* Update plugins/modules/openssl_pkcs12.py

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>

* Only run tests when new enough pyOpenSSL or cryptography is around.

* Reduce required pyOpenSSL version from 17.1.0 to 0.15.

I have no idea why 17.1.0 was there (in the tests), and not something smaller.
The module itself did not mention any version.

* Linting.

* Linting.

* Increase compatibility by selecting pyopenssl backend when iter_size or maciter_size is used.

* Improve docs, add changelog fragment.

* Move hackish code to cryptography_support.

* Update plugins/modules/openssl_pkcs12.py

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>

* Update plugins/modules/openssl_pkcs12.py

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>

* Streamline cert creation.

* Convert range to list.

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>
This commit is contained in:
Felix Fontein
2021-05-20 19:36:07 +02:00
committed by GitHub
parent 0a0d0f2bdf
commit e9bc7c7163
6 changed files with 504 additions and 187 deletions

View File

@@ -16,8 +16,14 @@ author:
short_description: Generate OpenSSL PKCS#12 archive
description:
- This module allows one to (re-)generate PKCS#12.
- The module can use the cryptography Python library, or the pyOpenSSL Python
library. By default, it tries to detect which one is available, assuming none of the
I(iter_size) and I(maciter_size) options are used. This can be overridden with the
I(select_crypto_backend) option.
# Please note that the C(pyopenssl) backend has been deprecated in community.crypto x.y.0,
# and will be removed in community.crypto (x+1).0.0.
requirements:
- python-pyOpenSSL
- PyOpenSSL >= 0.15 or cryptography >= 3.0
options:
action:
description:
@@ -58,16 +64,21 @@ options:
iter_size:
description:
- Number of times to repeat the encryption step.
- This is not considered during idempotency checks.
- This is only used by the C(pyopenssl) backend. When using it, the default is C(2048).
type: int
default: 2048
maciter_size:
description:
- Number of times to repeat the MAC step.
- This is not considered during idempotency checks.
- This is only used by the C(pyopenssl) backend. When using it, the default is C(1).
type: int
default: 1
passphrase:
description:
- The PKCS#12 password.
- "B(Note:) PKCS12 encryption is not secure and should not be used as a security mechanism.
If you need to store or send a PKCS12 file safely, you should additionally encrypt it
with something else."
type: str
path:
description:
@@ -105,6 +116,21 @@ options:
type: bool
default: no
version_added: "1.0.0"
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
If one of I(iter_size) or I(maciter_size) is used, C(auto) will always result in C(pyopenssl) to be chosen
for backwards compatibility.
- If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
# - Please note that the C(pyopenssl) backend has been deprecated in community.crypto x.y.0, and will be
# removed in community.crypto (x+1).0.0.
# From that point on, only the C(cryptography) backend will be available.
type: str
default: auto
choices: [ auto, cryptography, pyopenssl ]
version_added: 1.7.0
extends_documentation_fragment:
- files
seealso:
@@ -207,11 +233,14 @@ pkcs12:
version_added: "1.0.0"
'''
import abc
import base64
import os
import stat
import traceback
from distutils.version import LooseVersion
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils._text import to_bytes, to_native
@@ -225,6 +254,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
OpenSSLBadPassphraseError,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
parse_pkcs12,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
OpenSSLObject,
load_privatekey,
@@ -235,23 +268,40 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import
split_pem_list,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '3.0'
MINIMAL_PYOPENSSL_VERSION = '0.15'
PYOPENSSL_IMP_ERR = None
try:
import OpenSSL
from OpenSSL import crypto
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
except ImportError:
PYOPENSSL_IMP_ERR = traceback.format_exc()
pyopenssl_found = False
PYOPENSSL_FOUND = False
else:
pyopenssl_found = True
PYOPENSSL_FOUND = True
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
def load_certificate_set(filename):
def load_certificate_set(filename, backend):
'''
Load list of concatenated PEM files, and return a list of parsed certificates.
'''
with open(filename, 'rb') as f:
data = f.read().decode('utf-8')
return [load_certificate(None, content=cert) for cert in split_pem_list(data)]
return [load_certificate(None, content=cert.encode('utf-8'), backend=backend) for cert in split_pem_list(data)]
class PkcsError(OpenSSLObjectError):
@@ -259,21 +309,21 @@ class PkcsError(OpenSSLObjectError):
class Pkcs(OpenSSLObject):
def __init__(self, module):
def __init__(self, module, backend):
super(Pkcs, self).__init__(
module.params['path'],
module.params['state'],
module.params['force'],
module.check_mode
)
self.backend = backend
self.action = module.params['action']
self.other_certificates = module.params['other_certificates']
self.other_certificates_parse_all = module.params['other_certificates_parse_all']
self.certificate_path = module.params['certificate_path']
self.friendly_name = module.params['friendly_name']
self.iter_size = module.params['iter_size']
self.maciter_size = module.params['maciter_size']
self.iter_size = module.params['iter_size'] or 2048
self.maciter_size = module.params['maciter_size'] or 1
self.passphrase = module.params['passphrase']
self.pkcs12 = None
self.privatekey_passphrase = module.params['privatekey_passphrase']
@@ -293,12 +343,37 @@ class Pkcs(OpenSSLObject):
filenames = list(self.other_certificates)
self.other_certificates = []
for other_cert_bundle in filenames:
self.other_certificates.extend(load_certificate_set(other_cert_bundle))
self.other_certificates.extend(load_certificate_set(other_cert_bundle, self.backend))
else:
self.other_certificates = [
load_certificate(other_cert) for other_cert in self.other_certificates
load_certificate(other_cert, backend=self.backend) for other_cert in self.other_certificates
]
@abc.abstractmethod
def generate_bytes(self, module):
"""Generate PKCS#12 file archive."""
pass
@abc.abstractmethod
def parse_bytes(self, pkcs12_content):
pass
@abc.abstractmethod
def _dump_privatekey(self, pkcs12):
pass
@abc.abstractmethod
def _dump_certificate(self, pkcs12):
pass
@abc.abstractmethod
def _dump_other_certificates(self, pkcs12):
pass
@abc.abstractmethod
def _get_friendly_name(self, pkcs12):
pass
def check(self, module, perms_required=True):
"""Ensure the resource is in its desired state."""
@@ -307,10 +382,8 @@ class Pkcs(OpenSSLObject):
def _check_pkey_passphrase():
if self.privatekey_passphrase:
try:
load_privatekey(self.privatekey_path, self.privatekey_passphrase)
except crypto.Error:
return False
except OpenSSLBadPassphraseError:
load_privatekey(self.privatekey_path, self.privatekey_passphrase, backend=self.backend)
except OpenSSLObjectError:
return False
return True
@@ -318,32 +391,28 @@ class Pkcs(OpenSSLObject):
return state_and_perms
if os.path.exists(self.path) and module.params['action'] == 'export':
dummy = self.generate(module)
dummy = self.generate_bytes(module)
self.src = self.path
try:
pkcs12_privatekey, pkcs12_certificate, pkcs12_other_certificates, pkcs12_friendly_name = self.parse()
except crypto.Error:
except OpenSSLObjectError:
return False
if (pkcs12_privatekey is not None) and (self.privatekey_path is not None):
expected_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM,
self.pkcs12.get_privatekey())
expected_pkey = self._dump_privatekey(self.pkcs12)
if pkcs12_privatekey != expected_pkey:
return False
elif bool(pkcs12_privatekey) != bool(self.privatekey_path):
return False
if (pkcs12_certificate is not None) and (self.certificate_path is not None):
expected_cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
self.pkcs12.get_certificate())
expected_cert = self._dump_certificate(self.pkcs12)
if pkcs12_certificate != expected_cert:
return False
elif bool(pkcs12_certificate) != bool(self.certificate_path):
return False
if (pkcs12_other_certificates is not None) and (self.other_certificates is not None):
expected_other_certs = [crypto.dump_certificate(crypto.FILETYPE_PEM,
other_cert) for other_cert in self.pkcs12.get_ca_certificates()]
expected_other_certs = self._dump_other_certificates(self.pkcs12)
if set(pkcs12_other_certificates) != set(expected_other_certs):
return False
elif bool(pkcs12_other_certificates) != bool(self.other_certificates):
@@ -352,15 +421,16 @@ class Pkcs(OpenSSLObject):
if pkcs12_privatekey:
# This check is required because pyOpenSSL will not return a friendly name
# if the private key is not set in the file
if ((self.pkcs12.get_friendlyname() is not None) and (pkcs12_friendly_name is not None)):
if self.pkcs12.get_friendlyname() != pkcs12_friendly_name:
friendly_name = self._get_friendly_name(self.pkcs12)
if ((friendly_name is not None) and (pkcs12_friendly_name is not None)):
if friendly_name != pkcs12_friendly_name:
return False
elif bool(self.pkcs12.get_friendlyname()) != bool(pkcs12_friendly_name):
elif bool(friendly_name) != bool(pkcs12_friendly_name):
return False
elif module.params['action'] == 'parse' and os.path.exists(self.src) and os.path.exists(self.path):
try:
pkey, cert, other_certs, friendly_name = self.parse()
except crypto.Error:
except OpenSSLObjectError:
return False
expected_content = to_bytes(
''.join([to_native(pem) for pem in [pkey, cert] + other_certs if pem is not None])
@@ -390,27 +460,6 @@ class Pkcs(OpenSSLObject):
return result
def generate(self, module):
"""Generate PKCS#12 file archive."""
self.pkcs12 = crypto.PKCS12()
if self.other_certificates:
self.pkcs12.set_ca_certificates(self.other_certificates)
if self.certificate_path:
self.pkcs12.set_certificate(load_certificate(self.certificate_path))
if self.friendly_name:
self.pkcs12.set_friendlyname(to_bytes(self.friendly_name))
if self.privatekey_path:
try:
self.pkcs12.set_privatekey(load_privatekey(self.privatekey_path, self.privatekey_passphrase))
except OpenSSLBadPassphraseError as exc:
raise PkcsError(exc)
return self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size)
def remove(self, module):
if self.backup:
self.backup_file = module.backup_local(self.path)
@@ -422,8 +471,51 @@ class Pkcs(OpenSSLObject):
try:
with open(self.src, 'rb') as pkcs12_fh:
pkcs12_content = pkcs12_fh.read()
p12 = crypto.load_pkcs12(pkcs12_content,
self.passphrase)
return self.parse_bytes(pkcs12_content)
except IOError as exc:
raise PkcsError(exc)
def generate(self):
pass
def write(self, module, content, mode=None):
"""Write the PKCS#12 file."""
if self.backup:
self.backup_file = module.backup_local(self.path)
write_file(module, content, mode)
if self.return_content:
self.pkcs12_bytes = content
class PkcsPyOpenSSL(Pkcs):
def __init__(self, module):
super(PkcsPyOpenSSL, self).__init__(module, 'pyopenssl')
def generate_bytes(self, module):
"""Generate PKCS#12 file archive."""
self.pkcs12 = crypto.PKCS12()
if self.other_certificates:
self.pkcs12.set_ca_certificates(self.other_certificates)
if self.certificate_path:
self.pkcs12.set_certificate(load_certificate(self.certificate_path, backend=self.backend))
if self.friendly_name:
self.pkcs12.set_friendlyname(to_bytes(self.friendly_name))
if self.privatekey_path:
try:
self.pkcs12.set_privatekey(
load_privatekey(self.privatekey_path, self.privatekey_passphrase, backend=self.backend))
except OpenSSLBadPassphraseError as exc:
raise PkcsError(exc)
return self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size)
def parse_bytes(self, pkcs12_content):
try:
p12 = crypto.load_pkcs12(pkcs12_content, self.passphrase)
pkey = p12.get_privatekey()
if pkey is not None:
pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
@@ -438,17 +530,143 @@ class Pkcs(OpenSSLObject):
friendly_name = p12.get_friendlyname()
return (pkey, crt, other_certs, friendly_name)
except IOError as exc:
except crypto.Error as exc:
raise PkcsError(exc)
def write(self, module, content, mode=None):
"""Write the PKCS#12 file."""
if self.backup:
self.backup_file = module.backup_local(self.path)
write_file(module, content, mode)
if self.return_content:
self.pkcs12_bytes = content
def _dump_privatekey(self, pkcs12):
pk = pkcs12.get_privatekey()
return crypto.dump_privatekey(crypto.FILETYPE_PEM, pk) if pk else None
def _dump_certificate(self, pkcs12):
cert = pkcs12.get_certificate()
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert) if cert else None
def _dump_other_certificates(self, pkcs12):
return [
crypto.dump_certificate(crypto.FILETYPE_PEM, other_cert)
for other_cert in pkcs12.get_ca_certificates()
]
def _get_friendly_name(self, pkcs12):
return pkcs12.get_friendlyname()
class PkcsCryptography(Pkcs):
def __init__(self, module):
super(PkcsCryptography, self).__init__(module, 'cryptography')
def generate_bytes(self, module):
"""Generate PKCS#12 file archive."""
pkey = None
if self.privatekey_path:
try:
pkey = load_privatekey(self.privatekey_path, self.privatekey_passphrase, backend=self.backend)
except OpenSSLBadPassphraseError as exc:
raise PkcsError(exc)
cert = None
if self.certificate_path:
cert = load_certificate(self.certificate_path, backend=self.backend)
friendly_name = to_bytes(self.friendly_name) if self.friendly_name is not None else None
# Store fake object which can be used to retrieve the components back
self.pkcs12 = (pkey, cert, self.other_certificates, friendly_name)
return serialize_key_and_certificates(
friendly_name,
pkey,
cert,
self.other_certificates,
serialization.BestAvailableEncryption(to_bytes(self.passphrase))
if self.passphrase else serialization.NoEncryption(),
)
def parse_bytes(self, pkcs12_content):
try:
private_key, certificate, additional_certificates, friendly_name = parse_pkcs12(
pkcs12_content, self.passphrase)
pkey = None
if private_key is not None:
pkey = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
crt = None
if certificate is not None:
crt = certificate.public_bytes(serialization.Encoding.PEM)
other_certs = []
if additional_certificates is not None:
other_certs = [
other_cert.public_bytes(serialization.Encoding.PEM)
for other_cert in additional_certificates
]
return (pkey, crt, other_certs, friendly_name)
except ValueError as exc:
raise PkcsError(exc)
# The following methods will get self.pkcs12 passed, which is computed as:
#
# self.pkcs12 = (pkey, cert, self.other_certificates, self.friendly_name)
def _dump_privatekey(self, pkcs12):
return pkcs12[0].private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
) if pkcs12[0] else None
def _dump_certificate(self, pkcs12):
return pkcs12[1].public_bytes(serialization.Encoding.PEM) if pkcs12[1] else None
def _dump_other_certificates(self, pkcs12):
return [other_cert.public_bytes(serialization.Encoding.PEM) for other_cert in pkcs12[2]]
def _get_friendly_name(self, pkcs12):
return pkcs12[3]
def select_backend(module, backend):
if backend == 'auto':
# Detection what is possible
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
# If no restrictions are provided, first try cryptography, then pyOpenSSL
if module.params['iter_size'] is not None or module.params['maciter_size'] is not None:
# If iter_size or maciter_size is specified, use pyOpenSSL backend
backend = 'pyopenssl'
elif can_use_cryptography:
backend = 'cryptography'
elif can_use_pyopenssl:
backend = 'pyopenssl'
# Success?
if backend == 'auto':
module.fail_json(msg=("Can't detect any of the required Python libraries "
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION))
if backend == 'pyopenssl':
if not PYOPENSSL_FOUND:
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
exception=PYOPENSSL_IMP_ERR)
# module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
# version='x.0.0', collection_name='community.crypto')
return backend, PkcsPyOpenSSL(module)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
return backend, PkcsCryptography(module)
else:
raise ValueError('Unsupported value for backend: {0}'.format(backend))
def main():
@@ -459,8 +677,8 @@ def main():
certificate_path=dict(type='path'),
force=dict(type='bool', default=False),
friendly_name=dict(type='str', aliases=['name']),
iter_size=dict(type='int', default=2048),
maciter_size=dict(type='int', default=1),
iter_size=dict(type='int'),
maciter_size=dict(type='int'),
passphrase=dict(type='str', no_log=True),
path=dict(type='path', required=True),
privatekey_passphrase=dict(type='str', no_log=True),
@@ -469,6 +687,7 @@ def main():
src=dict(type='path'),
backup=dict(type='bool', default=False),
return_content=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
)
required_if = [
@@ -482,8 +701,7 @@ def main():
supports_check_mode=True,
)
if not pyopenssl_found:
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
backend, pkcs12 = select_backend(module, module.params['select_crypto_backend'])
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
@@ -493,7 +711,6 @@ def main():
)
try:
pkcs12 = Pkcs(module)
changed = False
if module.params['state'] == 'present':
@@ -506,7 +723,7 @@ def main():
if module.params['action'] == 'export':
if not module.params['friendly_name']:
module.fail_json(msg='Friendly_name is required')
pkcs12_content = pkcs12.generate(module)
pkcs12_content = pkcs12.generate_bytes(module)
pkcs12.write(module, pkcs12_content, 0o600)
changed = True
else: