Files
kubernetes.core/plugins/module_utils/helm.py
Mike Graves 29c75fa1c6 Update for Ansible 2.15 sanity tests (#515)
Update for Ansible 2.15 sanity tests

Depends-On: ansible/ansible-zuul-jobs#1639
SUMMARY

Update for Ansible 2.15 sanity tests

Fixes #519
ISSUE TYPE

COMPONENT NAME

ADDITIONAL INFORMATION

Reviewed-by: Bikouo Aubin <None>
2022-10-06 09:50:26 +00:00

175 lines
5.2 KiB
Python

# -*- coding: utf-8 -*-
# Copyright: (c) 2020, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
import tempfile
import traceback
import re
import json
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.six import string_types
try:
import yaml
HAS_YAML = True
YAML_IMP_ERR = None
except ImportError:
YAML_IMP_ERR = traceback.format_exc()
HAS_YAML = False
def prepare_helm_environ_update(module):
environ_update = {}
kubeconfig_path = None
if module.params.get("kubeconfig") is not None:
kubeconfig = module.params.get("kubeconfig")
if isinstance(kubeconfig, string_types):
kubeconfig_path = kubeconfig
elif isinstance(kubeconfig, dict):
fd, kubeconfig_path = tempfile.mkstemp()
with os.fdopen(fd, "w") as fp:
json.dump(kubeconfig, fp)
module.add_cleanup_file(kubeconfig_path)
if module.params.get("context") is not None:
environ_update["HELM_KUBECONTEXT"] = module.params.get("context")
if module.params.get("release_namespace"):
environ_update["HELM_NAMESPACE"] = module.params.get("release_namespace")
if module.params.get("api_key"):
environ_update["HELM_KUBETOKEN"] = module.params["api_key"]
if module.params.get("host"):
environ_update["HELM_KUBEAPISERVER"] = module.params["host"]
if module.params.get("validate_certs") is False or module.params.get("ca_cert"):
kubeconfig_path = write_temp_kubeconfig(
module.params["host"],
validate_certs=module.params["validate_certs"],
ca_cert=module.params["ca_cert"],
)
module.add_cleanup_file(kubeconfig_path)
if kubeconfig_path is not None:
environ_update["KUBECONFIG"] = kubeconfig_path
return environ_update
def run_helm(module, command, fails_on_error=True):
if not HAS_YAML:
module.fail_json(msg=missing_required_lib("PyYAML"), exception=YAML_IMP_ERR)
environ_update = prepare_helm_environ_update(module)
rc, out, err = module.run_command(command, environ_update=environ_update)
if fails_on_error and rc != 0:
module.fail_json(
msg="Failure when executing Helm command. Exited {0}.\nstdout: {1}\nstderr: {2}".format(
rc, out, err
),
stdout=out,
stderr=err,
command=command,
)
return rc, out, err
def get_values(module, command, release_name):
"""
Get Values from deployed release
"""
if not HAS_YAML:
module.fail_json(msg=missing_required_lib("PyYAML"), exception=YAML_IMP_ERR)
get_command = command + " get values --output=yaml " + release_name
rc, out, err = run_helm(module, get_command)
# Helm 3 return "null" string when no values are set
if out.rstrip("\n") == "null":
return {}
return yaml.safe_load(out)
def write_temp_kubeconfig(server, validate_certs=True, ca_cert=None):
# Workaround until https://github.com/helm/helm/pull/8622 is merged
content = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [{"cluster": {"server": server}, "name": "generated-cluster"}],
"contexts": [
{"context": {"cluster": "generated-cluster"}, "name": "generated-context"}
],
"current-context": "generated-context",
}
if not validate_certs:
content["clusters"][0]["cluster"]["insecure-skip-tls-verify"] = True
if ca_cert:
content["clusters"][0]["cluster"]["certificate-authority"] = ca_cert
_fd, file_name = tempfile.mkstemp()
with os.fdopen(_fd, "w") as fp:
yaml.dump(content, fp)
return file_name
def get_helm_plugin_list(module, helm_bin=None):
"""
Return `helm plugin list`
"""
if not helm_bin:
return []
helm_plugin_list = helm_bin + " plugin list"
rc, out, err = run_helm(module, helm_plugin_list)
if rc != 0 or (out == "" and err == ""):
module.fail_json(
msg="Failed to get Helm plugin info",
command=helm_plugin_list,
stdout=out,
stderr=err,
rc=rc,
)
return (rc, out, err)
def parse_helm_plugin_list(module, output=None):
"""
Parse `helm plugin list`, return list of plugins
"""
ret = []
if not output:
return ret
for line in output:
if line.startswith("NAME"):
continue
name, version, description = line.split("\t", 3)
name = name.strip()
version = version.strip()
description = description.strip()
if name == "":
continue
ret.append((name, version, description))
return ret
def get_helm_version(module, helm_bin):
helm_version_command = helm_bin + " version"
rc, out, err = module.run_command(helm_version_command)
if rc == 0:
m = re.match(r'version.BuildInfo{Version:"v([0-9\.]*)",', out)
if m:
return m.group(1)
return None
def get_helm_binary(module):
return module.params.get("binary_path") or module.get_bin_path(
"helm", required=True
)