openssh_keypair: Adding passphrase parameter (#225)

* Integrating openssh module utils with openssh_keypair

* Added explicit PEM formatting for OpenSSH < 7.8

* Adding changelog fragment

* Adding OpenSSL/cryptography dependency for integration tests

* Adding private_key_format option and removing forced cryptography update for CI

* Fixed version check for bcrypt and key_format option name

* Setting no_log=False for private_key_format

* Docs correction and simplification of control flow for private_key_format
This commit is contained in:
Ajpantuso
2021-05-10 08:47:01 -04:00
committed by GitHub
parent 37c1540ff4
commit 6100d9b4df
8 changed files with 395 additions and 69 deletions

View File

@@ -19,24 +19,23 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
from base64 import b64encode, b64decode
from distutils.version import LooseVersion
from getpass import getuser
from socket import gethostname
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
HAS_CRYPTOGRAPHY,
CRYPTOGRAPHY_HAS_ED25519,
)
try:
import cryptography as c
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, padding
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
except ImportError:
pass
if HAS_CRYPTOGRAPHY and CRYPTOGRAPHY_HAS_ED25519:
if LooseVersion(c.__version__) >= LooseVersion("3.0"):
HAS_OPENSSH_PRIVATE_FORMAT = True
else:
HAS_OPENSSH_PRIVATE_FORMAT = False
HAS_OPENSSH_SUPPORT = True
_ALGORITHM_PARAMETERS = {
@@ -76,7 +75,8 @@ if HAS_CRYPTOGRAPHY and CRYPTOGRAPHY_HAS_ED25519:
}
}
}
else:
except ImportError:
HAS_OPENSSH_PRIVATE_FORMAT = False
HAS_OPENSSH_SUPPORT = False
_ALGORITHM_PARAMETERS = {}
@@ -192,12 +192,14 @@ class Asymmetric_Keypair(object):
)
@classmethod
def load(cls, path, passphrase=None, key_format='PEM'):
def load(cls, path, passphrase=None, private_key_format='PEM', public_key_format='PEM', no_public_key=False):
"""Returns an Asymmetric_Keypair object loaded from the supplied file path
:path: A path to an existing private key to be loaded
:passphrase: Secret of type bytes used to decrypt the private key being loaded
:key_format: Format of key files to be loaded
:private_key_format: Format of private key to be loaded
:public_key_format: Format of public key to be loaded
:no_public_key: Set 'True' to only load a private key and automatically populate the matching public key
"""
if passphrase:
@@ -205,8 +207,11 @@ class Asymmetric_Keypair(object):
else:
encryption_algorithm = serialization.NoEncryption()
privatekey = load_privatekey(path, passphrase, key_format)
publickey = load_publickey(path + '.pub', key_format)
privatekey = load_privatekey(path, passphrase, private_key_format)
if no_public_key:
publickey = privatekey.public_key()
else:
publickey = load_publickey(path + '.pub', public_key_format)
# Ed25519 keys are always of size 256 and do not have a key_size attribute
if isinstance(privatekey, Ed25519PrivateKey):
@@ -352,11 +357,11 @@ class OpenSSH_Keypair(object):
:comment: Comment for a newly generated OpenSSH public key
"""
if not comment:
if comment is None:
comment = "%s@%s" % (getuser(), gethostname())
asym_keypair = Asymmetric_Keypair.generate(keytype, size, passphrase)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey)
@@ -365,20 +370,21 @@ class OpenSSH_Keypair(object):
openssh_privatekey=openssh_privatekey,
openssh_publickey=openssh_publickey,
fingerprint=fingerprint,
comment=comment
comment=comment,
)
@classmethod
def load(cls, path, passphrase=None):
def load(cls, path, passphrase=None, no_public_key=False):
"""Returns an Openssh_Keypair object loaded from the supplied file path
:path: A path to an existing private key to be loaded
:passphrase: Secret used to decrypt the private key being loaded
:no_public_key: Set 'True' to only load a private key and automatically populate the matching public key
"""
comment = extract_comment(path + '.pub')
asym_keypair = Asymmetric_Keypair.load(path, passphrase, 'SSH')
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair)
asym_keypair = Asymmetric_Keypair.load(path, passphrase, 'SSH', 'SSH', no_public_key)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey)
@@ -387,21 +393,31 @@ class OpenSSH_Keypair(object):
openssh_privatekey=openssh_privatekey,
openssh_publickey=openssh_publickey,
fingerprint=fingerprint,
comment=comment
comment=comment,
)
@staticmethod
def encode_openssh_privatekey(asym_keypair):
def encode_openssh_privatekey(asym_keypair, key_format):
"""Returns an OpenSSH encoded private key for a given keypair
:asym_keypair: Asymmetric_Keypair from the private key is extracted
:key_format: Format of the encoded private key.
"""
# OpenSSH formatted private keys are not available in Cryptography <3.0
try:
privatekey_format = serialization.PrivateFormat.OpenSSH
except AttributeError:
if key_format == 'SSH':
# Default to PEM format if SSH not available
if not HAS_OPENSSH_PRIVATE_FORMAT:
privatekey_format = serialization.PrivateFormat.PKCS8
else:
privatekey_format = serialization.PrivateFormat.OpenSSH
elif key_format == 'PKCS8':
privatekey_format = serialization.PrivateFormat.PKCS8
elif key_format == 'PKCS1':
if asym_keypair.key_type == 'ed25519':
raise InvalidKeyFormatError("ed25519 keys cannot be represented in PKCS1 format")
privatekey_format = serialization.PrivateFormat.TraditionalOpenSSL
else:
raise InvalidKeyFormatError("The accepted private key formats are SSH, PKCS8, and PKCS1")
encoded_privatekey = asym_keypair.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
@@ -425,7 +441,7 @@ class OpenSSH_Keypair(object):
validate_comment(comment)
encoded_publickey += (" %s" % comment).encode(encoding=_TEXT_ENCODING)
encoded_publickey += (" %s" % comment).encode(encoding=_TEXT_ENCODING) if comment else b''
return encoded_publickey
@@ -502,7 +518,7 @@ class OpenSSH_Keypair(object):
validate_comment(comment)
self.__comment = comment
encoded_comment = (" %s" % self.__comment).encode(encoding=_TEXT_ENCODING)
encoded_comment = (" %s" % self.__comment).encode(encoding=_TEXT_ENCODING) if self.__comment else b''
self.__openssh_publickey = b' '.join(self.__openssh_publickey.split(b' ', 2)[:2]) + encoded_comment
return self.__openssh_publickey
@@ -513,7 +529,7 @@ class OpenSSH_Keypair(object):
"""
self.__asym_keypair.update_passphrase(passphrase)
self.__openssh_privatekey = OpenSSH_Keypair.encode_openssh_privatekey(self.__asym_keypair)
self.__openssh_privatekey = OpenSSH_Keypair.encode_openssh_privatekey(self.__asym_keypair, 'SSH')
def load_privatekey(path, passphrase, key_format):
@@ -549,7 +565,22 @@ def load_privatekey(path, passphrase, key_format):
)
except ValueError as e:
raise InvalidPrivateKeyFileError(e)
# Revert to PEM if key could not be loaded in SSH format
if key_format == 'SSH':
try:
privatekey = privatekey_loaders['PEM'](
data=content,
password=passphrase,
backend=backend,
)
except ValueError as e:
raise InvalidPrivateKeyFileError(e)
except TypeError as e:
raise InvalidPassphraseError(e)
except UnsupportedAlgorithm as e:
raise InvalidAlgorithmError(e)
else:
raise InvalidPrivateKeyFileError(e)
except TypeError as e:
raise InvalidPassphraseError(e)
except UnsupportedAlgorithm as e:
@@ -645,4 +676,4 @@ def calculate_fingerprint(openssh_publickey):
decoded_pubkey = b64decode(openssh_publickey.split(b' ')[1])
digest.update(decoded_pubkey)
return b64encode(digest.finalize()).decode(encoding=_TEXT_ENCODING).rstrip('=')
return 'SHA256:%s' % b64encode(digest.finalize()).decode(encoding=_TEXT_ENCODING).rstrip('=')