mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-05-06 13:22:58 +00:00
openssh_cert - cleanup and diff support (#255)
* Initial commit * Fixing units * Adding changelog fragment * Enhanced encapsulation of certificate data * Avoiding failure when path is not parseable * Diff refactor * Applying initial review suggestions
This commit is contained in:
@@ -35,9 +35,11 @@ import abc
|
||||
import binascii
|
||||
import os
|
||||
from base64 import b64encode
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
|
||||
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
|
||||
OpensshParser,
|
||||
_OpensshWriter,
|
||||
@@ -69,6 +71,108 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
|
||||
b'nistp521': 'ecdsa-nistp521',
|
||||
}
|
||||
|
||||
_ALWAYS = datetime(1970, 1, 1)
|
||||
_FOREVER = datetime.max
|
||||
|
||||
if six.PY3:
|
||||
long = int
|
||||
|
||||
|
||||
class OpensshCertificateTimeParameters(object):
|
||||
def __init__(self, valid_from, valid_to):
|
||||
self._valid_from = self.to_datetime(valid_from)
|
||||
self._valid_to = self.to_datetime(valid_to)
|
||||
|
||||
if self._valid_from > self._valid_to:
|
||||
raise ValueError("Valid from: %s must not be greater than Valid to: %s" % (valid_from, valid_to))
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, type(self)):
|
||||
return NotImplemented
|
||||
else:
|
||||
return self._valid_from == other._valid_from and self._valid_to == other._valid_to
|
||||
|
||||
@property
|
||||
def validity_string(self):
|
||||
if not (self._valid_from == _ALWAYS and self._valid_to == _FOREVER):
|
||||
return "%s:%s" % (
|
||||
self.valid_from(date_format='openssh'), self.valid_to(date_format='openssh')
|
||||
)
|
||||
return ""
|
||||
|
||||
def valid_from(self, date_format):
|
||||
return self.format_datetime(self._valid_from, date_format)
|
||||
|
||||
def valid_to(self, date_format):
|
||||
return self.format_datetime(self._valid_to, date_format)
|
||||
|
||||
def within_range(self, valid_at):
|
||||
if valid_at is not None:
|
||||
valid_at_datetime = self.to_datetime(valid_at)
|
||||
return self._valid_from <= valid_at_datetime <= self._valid_to
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def format_datetime(dt, date_format):
|
||||
if date_format in ('human_readable', 'openssh'):
|
||||
if dt == _ALWAYS:
|
||||
result = 'always'
|
||||
elif dt == _FOREVER:
|
||||
result = 'forever'
|
||||
else:
|
||||
result = dt.isoformat() if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
|
||||
elif date_format == 'timestamp':
|
||||
td = dt - _ALWAYS
|
||||
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
|
||||
else:
|
||||
raise ValueError("%s is not a valid format" % date_format)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def to_datetime(time_string_or_timestamp):
|
||||
try:
|
||||
if isinstance(time_string_or_timestamp, str):
|
||||
result = OpensshCertificateTimeParameters._time_string_to_datetime(time_string_or_timestamp.strip())
|
||||
elif isinstance(time_string_or_timestamp, (long, int)):
|
||||
result = OpensshCertificateTimeParameters._timestamp_to_datetime(time_string_or_timestamp)
|
||||
else:
|
||||
raise ValueError("Value must be of type (str, int, long) not %s" % type(time_string_or_timestamp))
|
||||
except ValueError:
|
||||
raise
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _timestamp_to_datetime(timestamp):
|
||||
if timestamp == 0x0:
|
||||
result = _ALWAYS
|
||||
elif timestamp == 0xFFFFFFFFFFFFFFFF:
|
||||
result = _FOREVER
|
||||
else:
|
||||
try:
|
||||
result = datetime.utcfromtimestamp(timestamp)
|
||||
except OverflowError as e:
|
||||
raise ValueError
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _time_string_to_datetime(time_string):
|
||||
result = None
|
||||
if time_string == 'always':
|
||||
result = _ALWAYS
|
||||
elif time_string == 'forever':
|
||||
result = _FOREVER
|
||||
elif is_relative_time_string(time_string):
|
||||
result = convert_relative_to_datetime(time_string)
|
||||
else:
|
||||
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||||
try:
|
||||
result = datetime.strptime(time_string, time_format)
|
||||
except ValueError:
|
||||
pass
|
||||
if result is None:
|
||||
raise ValueError
|
||||
return result
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class OpensshCertificateInfo:
|
||||
@@ -83,7 +187,8 @@ class OpensshCertificateInfo:
|
||||
valid_before=None,
|
||||
critical_options=None,
|
||||
extensions=None,
|
||||
reserved=None):
|
||||
reserved=None,
|
||||
signing_key=None):
|
||||
self.nonce = nonce
|
||||
self.serial = serial
|
||||
self._cert_type = cert_type
|
||||
@@ -94,6 +199,7 @@ class OpensshCertificateInfo:
|
||||
self.critical_options = critical_options
|
||||
self.extensions = extensions
|
||||
self.reserved = reserved
|
||||
self.signing_key = signing_key
|
||||
|
||||
self.type_string = None
|
||||
|
||||
@@ -115,6 +221,9 @@ class OpensshCertificateInfo:
|
||||
else:
|
||||
raise ValueError("%s is not a valid certificate type" % cert_type)
|
||||
|
||||
def signing_key_fingerprint(self):
|
||||
return fingerprint(self.signing_key)
|
||||
|
||||
@abc.abstractmethod
|
||||
def public_key_fingerprint(self):
|
||||
pass
|
||||
@@ -241,10 +350,9 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo):
|
||||
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
|
||||
class OpensshCertificate(object):
|
||||
"""Encapsulates a formatted OpenSSH certificate including signature and signing key"""
|
||||
def __init__(self, cert_info, signing_key, signature):
|
||||
def __init__(self, cert_info, signature):
|
||||
|
||||
self.cert_info = cert_info
|
||||
self.signing_key = signing_key
|
||||
self._cert_info = cert_info
|
||||
self.signature = signature
|
||||
|
||||
@classmethod
|
||||
@@ -278,7 +386,6 @@ class OpensshCertificate(object):
|
||||
|
||||
try:
|
||||
cert_info = cls._parse_cert_info(pub_key_type, parser)
|
||||
signing_key = parser.string()
|
||||
signature = parser.string()
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError("Invalid certificate data: %s" % e)
|
||||
@@ -290,12 +397,60 @@ class OpensshCertificate(object):
|
||||
|
||||
return cls(
|
||||
cert_info=cert_info,
|
||||
signing_key=signing_key,
|
||||
signature=signature,
|
||||
)
|
||||
|
||||
def signing_key_fingerprint(self):
|
||||
return fingerprint(self.signing_key)
|
||||
@property
|
||||
def type_string(self):
|
||||
return self._cert_info.type_string
|
||||
|
||||
@property
|
||||
def nonce(self):
|
||||
return self._cert_info.nonce
|
||||
|
||||
@property
|
||||
def public_key(self):
|
||||
return self._cert_info.public_key_fingerprint()
|
||||
|
||||
@property
|
||||
def serial(self):
|
||||
return self._cert_info.serial
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
return self._cert_info.cert_type
|
||||
|
||||
@property
|
||||
def key_id(self):
|
||||
return self._cert_info.key_id
|
||||
|
||||
@property
|
||||
def principals(self):
|
||||
return self._cert_info.principals
|
||||
|
||||
@property
|
||||
def valid_after(self):
|
||||
return self._cert_info.valid_after
|
||||
|
||||
@property
|
||||
def valid_before(self):
|
||||
return self._cert_info.valid_before
|
||||
|
||||
@property
|
||||
def critical_options(self):
|
||||
return self._cert_info.critical_options
|
||||
|
||||
@property
|
||||
def extensions(self):
|
||||
return self._cert_info.extensions
|
||||
|
||||
@property
|
||||
def reserved(self):
|
||||
return self._cert_info.reserved
|
||||
|
||||
@property
|
||||
def signing_key(self):
|
||||
return self._cert_info.signing_key_fingerprint()
|
||||
|
||||
@staticmethod
|
||||
def _parse_cert_info(pub_key_type, parser):
|
||||
@@ -311,9 +466,31 @@ class OpensshCertificate(object):
|
||||
cert_info.critical_options = parser.option_list()
|
||||
cert_info.extensions = parser.option_list()
|
||||
cert_info.reserved = parser.string()
|
||||
cert_info.signing_key = parser.string()
|
||||
|
||||
return cert_info
|
||||
|
||||
def to_dict(self):
|
||||
time_parameters = OpensshCertificateTimeParameters(
|
||||
valid_from=self.valid_after,
|
||||
valid_to=self.valid_before
|
||||
)
|
||||
return {
|
||||
'type_string': self.type_string,
|
||||
'nonce': self.nonce,
|
||||
'serial': self.serial,
|
||||
'cert_type': self.type,
|
||||
'identifier': self.key_id,
|
||||
'principals': self.principals,
|
||||
'valid_after': time_parameters.valid_from(date_format='human_readable'),
|
||||
'valid_before': time_parameters.valid_to(date_format='human_readable'),
|
||||
'critical_options': self.critical_options,
|
||||
'extensions': [e[0] for e in self.extensions],
|
||||
'reserved': self.reserved,
|
||||
'public_key': self.public_key,
|
||||
'signing_key': self.signing_key,
|
||||
}
|
||||
|
||||
|
||||
def fingerprint(public_key):
|
||||
"""Generates a SHA256 hash and formats output to resemble ``ssh-keygen``"""
|
||||
@@ -335,3 +512,7 @@ def get_cert_info_object(key_type):
|
||||
raise ValueError("%s is not a valid key type" % key_type)
|
||||
|
||||
return cert_info
|
||||
|
||||
|
||||
def is_relative_time_string(time_string):
|
||||
return time_string.startswith("+") or time_string.startswith("-")
|
||||
|
||||
Reference in New Issue
Block a user