openssl_*: improve passphrase handling for private keys in PyOpenSSL (#53489)

* Raise OpenSSLBadPassphraseError if passphrase is wrong.

* Improve handling of passphrase errors.

Current behavior for modules is: if passphrase is wrong (or wrongly specified), fail.
Current behavior for openssl_privatekey is: if passphrase is worng (or wrongly specified), regenerate.

* Add changelog.

* Add tests.

* Adjustments for some versions of PyOpenSSL.

* Update lib/ansible/modules/crypto/openssl_certificate.py

Improve text.

Co-Authored-By: felixfontein <felix@fontein.de>
This commit is contained in:
Felix Fontein
2019-03-08 17:21:18 +01:00
committed by John R Barker
parent 1d91e03119
commit caf7fd2245
20 changed files with 427 additions and 36 deletions

View File

@@ -38,6 +38,10 @@ class OpenSSLObjectError(Exception):
pass
class OpenSSLBadPassphraseError(OpenSSLObjectError):
pass
def get_fingerprint_of_bytes(source):
"""Generate the fingerprint of the given bytes."""
@@ -67,7 +71,7 @@ def get_fingerprint_of_bytes(source):
def get_fingerprint(path, passphrase=None):
"""Generate the fingerprint of the public key. """
privatekey = load_privatekey(path, passphrase)
privatekey = load_privatekey(path, passphrase, check_passphrase=False)
try:
publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey)
return get_fingerprint_of_bytes(publickey)
@@ -78,20 +82,46 @@ def get_fingerprint(path, passphrase=None):
return None
def load_privatekey(path, passphrase=None):
def load_privatekey(path, passphrase=None, check_passphrase=True):
"""Load the specified OpenSSL private key."""
try:
with open(path, 'rb') as b_priv_key_fh:
priv_key_detail = b_priv_key_fh.read()
if passphrase:
return crypto.load_privatekey(crypto.FILETYPE_PEM,
priv_key_detail,
to_bytes(passphrase))
else:
return crypto.load_privatekey(crypto.FILETYPE_PEM,
priv_key_detail)
# First try: try to load with real passphrase (resp. empty string)
# Will work if this is the correct passphrase, or the key is not
# password-protected.
try:
result = crypto.load_privatekey(crypto.FILETYPE_PEM,
priv_key_detail,
to_bytes(passphrase or ''))
except crypto.Error as e:
if len(e.args) > 0 and len(e.args[0]) > 0 and e.args[0][0][2] == 'bad decrypt':
# This happens in case we have the wrong passphrase.
if passphrase is not None:
raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key!')
else:
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
raise
if check_passphrase:
# Next we want to make sure that the key is actually protected by
# a passphrase (in case we did try the empty string before, make
# sure that the key is not protected by the empty string)
try:
crypto.load_privatekey(crypto.FILETYPE_PEM,
priv_key_detail,
to_bytes('y' if passphrase == 'x' else 'x'))
if passphrase is not None:
# Since we can load the key without an exception, the
# key isn't password-protected
raise OpenSSLBadPassphraseError('Passphrase provided, but private key is not password-protected!')
except crypto.Error:
if passphrase is None and len(e.args) > 0 and len(e.args[0]) > 0 and e.args[0][0][2] == 'bad decrypt':
# The key is obviously protected by the empty string.
# Don't do this at home (if it's possible at all)...
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
return result
except (IOError, OSError) as exc:
raise OpenSSLObjectError(exc)

View File

@@ -596,10 +596,13 @@ class Certificate(crypto_utils.OpenSSLObject):
self.cert = crypto_utils.load_certificate(self.path)
if self.privatekey_path:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase
)
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise CertificateError(exc)
return _validate_privatekey()
if self.csr_path:
@@ -621,9 +624,12 @@ class SelfSignedCertificate(Certificate):
self.version = module.params['selfsigned_version']
self.serial_number = randint(1000, 99999)
self.csr = crypto_utils.load_certificate_request(self.csr_path)
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path, self.privatekey_passphrase
)
try:
self.privatekey = crypto_utils.load_privatekey(
self.privatekey_path, self.privatekey_passphrase
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=str(exc))
def generate(self, module):
@@ -703,9 +709,12 @@ class OwnCACertificate(Certificate):
self.ca_privatekey_passphrase = module.params['ownca_privatekey_passphrase']
self.csr = crypto_utils.load_certificate_request(self.csr_path)
self.ca_cert = crypto_utils.load_certificate(self.ca_cert_path)
self.ca_privatekey = crypto_utils.load_privatekey(
self.ca_privatekey_path, self.ca_privatekey_passphrase
)
try:
self.ca_privatekey = crypto_utils.load_privatekey(
self.ca_privatekey_path, self.ca_privatekey_passphrase
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
module.fail_json(msg=str(exc))
def generate(self, module):
@@ -1003,10 +1012,15 @@ class AssertOnlyCertificate(Certificate):
self.assertonly()
if self.privatekey_path and \
not super(AssertOnlyCertificate, self).check(module, perms_required=False):
try:
if self.privatekey_path and \
not super(AssertOnlyCertificate, self).check(module, perms_required=False):
self.message.append(
'Certificate %s and private key %s do not match' % (self.path, self.privatekey_path)
)
except CertificateError as e:
self.message.append(
'Certificate %s and private key %s does not match' % (self.path, self.privatekey_path)
'Error while reading private key %s: %s' % (self.privatekey_path, str(e))
)
if len(self.message):

View File

@@ -528,7 +528,10 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase):
return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
def _load_private_key(self):
self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
try:
self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise CertificateSigningRequestError(exc)
def _check_csr(self):
def _check_subject(csr):
@@ -776,7 +779,7 @@ class CertificateSigningRequestCryptography(CertificateSigningRequestBase):
cryptography.x509.NameAttribute(self._get_name_oid(entry[0]), to_text(entry[1])) for entry in self.subject
]))
except ValueError as e:
raise CertificateSigningRequestError(str(e))
raise CertificateSigningRequestError(e)
if self.subjectAltName:
csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([

View File

@@ -215,6 +215,8 @@ class Pkcs(crypto_utils.OpenSSLObject):
self.privatekey_passphrase)
except crypto.Error:
return False
except crypto_utils.OpenSSLBadPassphraseError:
return False
return True
if not state_and_perms:
@@ -256,10 +258,13 @@ class Pkcs(crypto_utils.OpenSSLObject):
self.pkcs12.set_friendlyname(to_bytes(self.friendly_name))
if self.privatekey_path:
self.pkcs12.set_privatekey(crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase)
)
try:
self.pkcs12.set_privatekey(crypto_utils.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase)
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise PkcsError(exc)
try:
pkcs12_file = os.open(self.path,

View File

@@ -365,6 +365,8 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase):
return True
except crypto.Error:
return False
except crypto_utils.OpenSSLBadPassphraseError as exc:
return False
def _check_size_and_type(self):
def _check_size(privatekey):
@@ -373,7 +375,10 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase):
def _check_type(privatekey):
return self.type == privatekey.type()
privatekey = crypto_utils.load_privatekey(self.path, self.passphrase)
try:
privatekey = crypto_utils.load_privatekey(self.path, self.passphrase)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise PrivateKeyError(exc)
return _check_size(privatekey) and _check_type(privatekey)
@@ -513,10 +518,19 @@ class PrivateKeyCryptography(PrivateKeyBase):
def _check_passphrase(self):
try:
self._load_privatekey()
with open(self.path, 'rb') as f:
return cryptography.hazmat.primitives.serialization.load_pem_private_key(
f.read(),
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.cryptography_backend
)
return True
except crypto.Error:
return False
except TypeError as e:
if 'Password' in str(e) and 'encrypted' in str(e):
return False
raise PrivateKeyError(e)
except Exception as e:
raise PrivateKeyError(e)
def _check_size_and_type(self):
privatekey = self._load_privatekey()

View File

@@ -200,6 +200,8 @@ class PublicKey(crypto_utils.OpenSSLObject):
publickey_file.write(publickey_content)
self.changed = True
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise PublicKeyError(exc)
except (IOError, OSError) as exc:
raise PublicKeyError(exc)
except AttributeError as exc:
@@ -237,10 +239,13 @@ class PublicKey(crypto_utils.OpenSSLObject):
except (crypto.Error, ValueError):
return False
desired_publickey = crypto.dump_publickey(
crypto.FILETYPE_ASN1,
crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
)
try:
desired_publickey = crypto.dump_publickey(
crypto.FILETYPE_ASN1,
crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
)
except crypto_utils.OpenSSLBadPassphraseError as exc:
raise PublicKeyError(exc)
return current_publickey == desired_publickey