openssh_cert - Adding regenerate option (#256)

* Initial commit

* Fixing unit tests

* More unit fixes

* Adding changelog fragment

* Minor refactor in Certificate.generate()

* Addressing option case-sensitivity and directive overrides

* Renaming idempotency to regenerate

* updating changelog

* Minor refactoring of default options

* Cleaning up with inline functions

* Fixing false failures when regenerate=fail and improving clarity

* Applying second round of review suggestions

* adding helper for safe atomic moves
This commit is contained in:
Ajpantuso
2021-07-31 05:36:03 -04:00
committed by GitHub
parent d6403ace6e
commit aaba87ac57
10 changed files with 704 additions and 76 deletions

View File

@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
def restore_on_failure(f):
def backup_and_restore(module, path, *args, **kwargs):
backup_file = module.backup_local(path) if os.path.exists(path) else None
try:
f(module, path, *args, **kwargs)
except Exception:
if backup_file is not None:
module.atomic_move(backup_file, path)
raise
else:
module.add_cleanup_file(backup_file)
return backup_and_restore
@restore_on_failure
def safe_atomic_move(module, path, destination):
module.atomic_move(path, destination)

View File

@@ -39,6 +39,7 @@ from datetime import datetime
from hashlib import sha256
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
OpensshParser,
@@ -74,6 +75,29 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
_ALWAYS = datetime(1970, 1, 1)
_FOREVER = datetime.max
_CRITICAL_OPTIONS = (
'force-command',
'source-address',
'verify-required',
)
_DIRECTIVES = (
'clear',
'no-x11-forwarding',
'no-agent-forwarding',
'no-port-forwarding',
'no-pty',
'no-user-rc',
)
_EXTENSIONS = (
'permit-x11-forwarding',
'permit-agent-forwarding',
'permit-port-forwarding',
'permit-pty',
'permit-user-rc'
)
if six.PY3:
long = int
@@ -92,6 +116,9 @@ class OpensshCertificateTimeParameters(object):
else:
return self._valid_from == other._valid_from and self._valid_to == other._valid_to
def __ne__(self, other):
return not self == other
@property
def validity_string(self):
if not (self._valid_from == _ALWAYS and self._valid_to == _FOREVER):
@@ -131,12 +158,14 @@ class OpensshCertificateTimeParameters(object):
@staticmethod
def to_datetime(time_string_or_timestamp):
try:
if isinstance(time_string_or_timestamp, str):
if isinstance(time_string_or_timestamp, six.string_types):
result = OpensshCertificateTimeParameters._time_string_to_datetime(time_string_or_timestamp.strip())
elif isinstance(time_string_or_timestamp, (long, int)):
result = OpensshCertificateTimeParameters._timestamp_to_datetime(time_string_or_timestamp)
else:
raise ValueError("Value must be of type (str, int, long) not %s" % type(time_string_or_timestamp))
raise ValueError(
"Value must be of type (str, unicode, int, long) not %s" % type(time_string_or_timestamp)
)
except ValueError:
raise
return result
@@ -174,6 +203,78 @@ class OpensshCertificateTimeParameters(object):
return result
class OpensshCertificateOption(object):
def __init__(self, option_type, name, data):
if option_type not in ('critical', 'extension'):
raise ValueError("type must be either 'critical' or 'extension'")
if not isinstance(name, six.string_types):
raise TypeError("name must be a string not %s" % type(name))
if not isinstance(data, six.string_types):
raise TypeError("data must be a string not %s" % type(data))
self._option_type = option_type
self._name = name.lower()
self._data = data
def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
return all([
self._option_type == other._option_type,
self._name == other._name,
self._data == other._data,
])
def __hash__(self):
return hash((self._option_type, self._name, self._data))
def __ne__(self, other):
return not self == other
def __str__(self):
if self._data:
return "%s=%s" % (self._name, self._data)
return self._name
@property
def data(self):
return self._data
@property
def name(self):
return self._name
@property
def type(self):
return self._option_type
@classmethod
def from_string(cls, option_string):
if not isinstance(option_string, six.string_types):
raise ValueError("option_string must be a string not %s" % type(option_string))
option_type = None
if ':' in option_string:
option_type, value = option_string.strip().split(':', 1)
if '=' in value:
name, data = value.split('=', 1)
else:
name, data = value, ''
elif '=' in option_string:
name, data = option_string.strip().split('=', 1)
else:
name, data = option_string.strip(), ''
return cls(
option_type=option_type or get_option_type(name.lower()),
name=name,
data=data
)
@six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key"""
@@ -402,7 +503,7 @@ class OpensshCertificate(object):
@property
def type_string(self):
return self._cert_info.type_string
return to_text(self._cert_info.type_string)
@property
def nonce(self):
@@ -410,7 +511,7 @@ class OpensshCertificate(object):
@property
def public_key(self):
return self._cert_info.public_key_fingerprint()
return to_text(self._cert_info.public_key_fingerprint())
@property
def serial(self):
@@ -422,11 +523,11 @@ class OpensshCertificate(object):
@property
def key_id(self):
return self._cert_info.key_id
return to_text(self._cert_info.key_id)
@property
def principals(self):
return self._cert_info.principals
return [to_text(p) for p in self._cert_info.principals]
@property
def valid_after(self):
@@ -438,11 +539,13 @@ class OpensshCertificate(object):
@property
def critical_options(self):
return self._cert_info.critical_options
return [
OpensshCertificateOption('critical', to_text(n), to_text(d)) for n, d in self._cert_info.critical_options
]
@property
def extensions(self):
return self._cert_info.extensions
return [OpensshCertificateOption('extension', to_text(n), to_text(d)) for n, d in self._cert_info.extensions]
@property
def reserved(self):
@@ -450,7 +553,7 @@ class OpensshCertificate(object):
@property
def signing_key(self):
return self._cert_info.signing_key_fingerprint()
return to_text(self._cert_info.signing_key_fingerprint())
@staticmethod
def _parse_cert_info(pub_key_type, parser):
@@ -484,14 +587,36 @@ class OpensshCertificate(object):
'principals': self.principals,
'valid_after': time_parameters.valid_from(date_format='human_readable'),
'valid_before': time_parameters.valid_to(date_format='human_readable'),
'critical_options': self.critical_options,
'extensions': [e[0] for e in self.extensions],
'critical_options': [str(critical_option) for critical_option in self.critical_options],
'extensions': [str(extension) for extension in self.extensions],
'reserved': self.reserved,
'public_key': self.public_key,
'signing_key': self.signing_key,
}
def apply_directives(directives):
if any(d not in _DIRECTIVES for d in directives):
raise ValueError("directives must be one of %s" % ", ".join(_DIRECTIVES))
directive_to_option = {
'no-x11-forwarding': OpensshCertificateOption('extension', 'permit-x11-forwarding', ''),
'no-agent-forwarding': OpensshCertificateOption('extension', 'permit-agent-forwarding', ''),
'no-port-forwarding': OpensshCertificateOption('extension', 'permit-port-forwarding', ''),
'no-pty': OpensshCertificateOption('extension', 'permit-pty', ''),
'no-user-rc': OpensshCertificateOption('extension', 'permit-user-rc', ''),
}
if 'clear' in directives:
return []
else:
return list(set(default_options()) - set(directive_to_option[d] for d in directives))
def default_options():
return [OpensshCertificateOption('extension', name, '') for name in _EXTENSIONS]
def fingerprint(public_key):
"""Generates a SHA256 hash and formats output to resemble ``ssh-keygen``"""
h = sha256()
@@ -514,5 +639,34 @@ def get_cert_info_object(key_type):
return cert_info
def get_option_type(name):
if name in _CRITICAL_OPTIONS:
result = 'critical'
elif name in _EXTENSIONS:
result = 'extension'
else:
raise ValueError("%s is not a valid option. " % name +
"Custom options must start with 'critical:' or 'extension:' to indicate type")
return result
def is_relative_time_string(time_string):
return time_string.startswith("+") or time_string.startswith("-")
def parse_option_list(option_list):
critical_options = []
directives = []
extensions = []
for option in option_list:
if option.lower() in _DIRECTIVES:
directives.append(option.lower())
else:
option_object = OpensshCertificateOption.from_string(option)
if option_object.type == 'critical':
critical_options.append(option_object)
else:
extensions.append(option_object)
return critical_options, list(set(extensions + apply_directives(directives)))