get_certificate: add get_certificate_chain option (#784)

* Implement get_certificate_chain option.

* Implement basic tests.

* Add compatibility for current Python 3.13 pre-releases.
This commit is contained in:
Felix Fontein
2024-07-10 21:51:30 +02:00
committed by GitHub
parent 4c26fada5e
commit d50c3cc944
3 changed files with 114 additions and 3 deletions

View File

@@ -107,12 +107,23 @@ options:
type: list
elements: raw
version_added: 2.21.0
get_certificate_chain:
description:
- If set to V(true), will obtain the certificate chain next to the certificate itself.
- The chain as returned by the server can be found in RV(unverified_chain), and the chain that passed validation
in RV(verified_chain).
- B(Note) that this needs B(Python 3.10 or newer). Also note that only Python 3.13 or newer officially supports this.
The module uses internal APIs of Python 3.10, 3.11, and 3.12 to achieve the same. It can be that future versions of
Python 3.10, 3.11, or 3.12 break this.
type: bool
default: false
version_added: 2.21.0
notes:
- When using ca_cert on OS X it has been reported that in some conditions the validate will always succeed.
requirements:
- "Python >= 2.7 when using O(proxy_host)"
- "Python >= 2.7 when using O(proxy_host), and Python >= 3.10 when O(get_certificate_chain=true)"
- "cryptography >= 1.6"
seealso:
@@ -189,6 +200,26 @@ version:
description: The version number of the certificate.
returned: success
type: str
verified_chain:
description:
- The verified certificate chain retrieved from the port.
- The first entry is always RV(cert).
- The last certificate the root certificate the chain is traced to. If O(ca_cert) is provided this certificate is part of that store;
otherwise it is part of the store used by default by Python.
- Note that RV(unverified_chain) generally does not contain the root certificate, and might contain other certificates that are not part
of the validated chain.
returned: success and O(get_certificate_chain=true)
type: list
elements: str
version_added: 2.21.0
unverified_chain:
description:
- The certificate chain retrieved from the port.
- The first entry is always RV(cert).
returned: success and O(get_certificate_chain=true)
type: list
elements: str
version_added: 2.21.0
'''
EXAMPLES = '''
@@ -240,6 +271,7 @@ import atexit
import base64
import traceback
import ssl
import sys
from os.path import isfile
from socket import create_connection, setdefaulttimeout, socket
@@ -317,6 +349,7 @@ def main():
ciphers=dict(type='list', elements='str'),
asn1_base64=dict(type='bool'),
tls_ctx_options=dict(type='list', elements='raw'),
get_certificate_chain=dict(type='bool', default=False),
),
)
@@ -330,7 +363,9 @@ def main():
start_tls_server_type = module.params.get('starttls')
ciphers = module.params.get('ciphers')
asn1_base64 = module.params['asn1_base64']
tls_ctx_options = module.params.get('tls_ctx_options')
tls_ctx_options = module.params['tls_ctx_options']
get_certificate_chain = module.params['get_certificate_chain']
if asn1_base64 is None:
module.deprecate(
'The default value `false` for asn1_base64 is deprecated and will change to `true` in '
@@ -341,6 +376,12 @@ def main():
)
asn1_base64 = False
if get_certificate_chain and sys.version_info < (3, 10):
module.fail_json(
msg='get_certificate_chain=true can only be used with Python 3.10 (Python 3.13+ officially supports this). '
'The Python version used to run the get_certificate module is %s' % sys.version
)
backend = module.params.get('select_crypto_backend')
if backend == 'auto':
# Detection what is possible
@@ -371,6 +412,9 @@ def main():
if not isfile(ca_cert):
module.fail_json(msg="ca_cert file does not exist")
verified_chain = None
unverified_chain = None
if not HAS_CREATE_DEFAULT_CONTEXT:
# Python < 2.7.9
if proxy_host:
@@ -450,8 +494,43 @@ def main():
except Exception as e:
module.fail_json(msg="Failed to add {0} to CTX options".format(tls_ctx_option_str or tls_ctx_option_int))
cert = ctx.wrap_socket(sock, server_hostname=server_name or host).getpeercert(True)
tls_sock = ctx.wrap_socket(sock, server_hostname=server_name or host)
cert = tls_sock.getpeercert(True)
cert = DER_cert_to_PEM_cert(cert)
if get_certificate_chain:
if sys.version_info < (3, 13):
# The official way to access this has been added in https://github.com/python/cpython/pull/109113/files.
# We're basically doing the same for older Python versions. The internal API needed for this was added
# in https://github.com/python/cpython/commit/666991fc598bc312d72aff0078ecb553f0a968f1, which was first
# released in Python 3.10.0.
def _convert_chain(chain):
if not chain:
return []
return [c.public_bytes(ssl._ssl.ENCODING_DER) for c in chain]
ssl_obj = tls_sock._sslobj # This is of type ssl._ssl._SSLSocket
verified_der_chain = _convert_chain(ssl_obj.get_verified_chain())
unverified_der_chain = _convert_chain(ssl_obj.get_unverified_chain())
else:
# This works with Python 3.13+
# Unfortunately due to a bug (https://github.com/python/cpython/issues/118658) some early pre-releases of
# Python 3.13 do not return lists of byte strings, but lists of _ssl.Certificate objects. This is going to
# be fixed by https://github.com/python/cpython/pull/118669. For now we convert the certificates ourselves
# if they are not byte strings to work around this.
def _convert_chain(chain):
return [
c if isinstance(c, bytes) else c.public_bytes(ssl._ssl.ENCODING_DER)
for c in chain
]
verified_der_chain = _convert_chain(tls_sock.get_verified_chain())
unverified_der_chain = _convert_chain(tls_sock.get_unverified_chain())
verified_chain = [DER_cert_to_PEM_cert(c) for c in verified_der_chain]
unverified_chain = [DER_cert_to_PEM_cert(c) for c in unverified_der_chain]
except Exception as e:
if proxy_host:
module.fail_json(msg="Failed to get cert via proxy {0}:{1} from {2}:{3}, error: {4}".format(
@@ -499,6 +578,11 @@ def main():
else:
result['version'] = "unknown"
if verified_chain is not None:
result['verified_chain'] = verified_chain
if unverified_chain is not None:
result['unverified_chain'] = unverified_chain
module.exit_json(**result)

View File

@@ -10,6 +10,8 @@
- set_fact:
skip_tests: false
has_get_certificate_chain: >-
{{ ansible_facts.python_version is version('3.10.0', '>=') }}
- block:

View File

@@ -123,6 +123,7 @@
port: 443
select_crypto_backend: "{{ select_crypto_backend }}"
asn1_base64: true
get_certificate_chain: "{{ has_get_certificate_chain }}"
register: result
- assert:
@@ -130,6 +131,30 @@
- result is not changed
- result is not failed
- name: Read CA cert
slurp:
src: '{{ remote_tmp_dir }}/temp.pem'
register: cacert
when: has_get_certificate_chain
- name: Validate get_certificate_chain=true results
assert:
that:
- result.verified_chain is sequence
- result.unverified_chain is sequence
- result.verified_chain[0] == result.cert
- result.unverified_chain[0] == result.cert
- result.verified_chain[-1] == cacert.content | b64decode
- result.verified_chain == result.unverified_chain + [cacert.content | b64decode]
when: has_get_certificate_chain
- name: Validate get_certificate_chain=false results
assert:
that:
- result.verified_chain is undefined
- result.unverified_chain is undefined
when: not has_get_certificate_chain
- name: Generate bogus CA privatekey
openssl_privatekey:
path: '{{ remote_tmp_dir }}/bogus_ca.key'