Add kubeconfig module for managing Kubernetes config files (#1104)

* Add kubeconfig module for managing Kubernetes config files

* Remove unnecessary requirement & Change version

* Move functions to module_utils

* Add unit tests

* Add kubeconfig module for managing Kubernetes config files

* Remove unnecessary requirement & Change version

* Move functions to module_utils

* Add unit tests

* Avoid linter errors

* Improve documentation clarity

* Redact sensitive kubeconfig information

* Imprvoe verbosity

* Move import statement for to_native to avoid linters check failure

* Fix linting error

---------

Co-authored-by: Bianca Henderson <bianca@redhat.com>
This commit is contained in:
Youssef Ali
2026-05-06 14:56:22 +03:00
committed by GitHub
parent 4d7dc2a7d1
commit e79ed52a4d
5 changed files with 886 additions and 0 deletions

View File

@@ -0,0 +1,91 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import hashlib
import os
import traceback
try:
import yaml
IMP_YAML = True
IMP_YAML_ERR = None
except ImportError:
IMP_YAML = False
IMP_YAML_ERR = traceback.format_exc()
def load_yaml_file(path):
if not path or not os.path.exists(path):
return {}
with open(path, "r") as f:
return yaml.safe_load(f) or {}
def deep_merge(base, updates):
result = base.copy()
for key, value in updates.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = deep_merge(result[key], value)
else:
result[key] = value
return result
def merge_by_name(existing, new):
merged = {}
for item in existing:
if isinstance(item, dict) and "name" in item:
merged[item["name"]] = item
for item in new:
if not isinstance(item, dict) or "name" not in item:
continue
name = item["name"]
behavior = item.get("behavior", "merge")
item_copy = {k: v for k, v in item.items() if k != "behavior"}
if name in merged:
if behavior == "keep":
continue
elif behavior == "replace":
merged[name] = item_copy
else:
result = {"name": name}
for key in ["cluster", "user", "context"]:
if key in merged[name] or key in item_copy:
existing_config = merged[name].get(key, {})
new_config = item_copy.get(key, {})
result[key] = deep_merge(existing_config, new_config)
for key in merged[name]:
if key not in ["name", "cluster", "user", "context"]:
result[key] = merged[name][key]
for key in item_copy:
if (
key not in ["name", "cluster", "user", "context"]
and key not in result
):
result[key] = item_copy[key]
merged[name] = result
else:
merged[name] = item_copy
return list(merged.values())
def hash_data(data):
"""Generate SHA-256 hash for idempotency checking."""
return hashlib.sha256(yaml.safe_dump(data, sort_keys=True).encode()).hexdigest()
def write_file(dest, data):
if not dest:
return False
with open(dest, "w") as f:
yaml.safe_dump(data, f, sort_keys=False)
return True

View File

@@ -0,0 +1,441 @@
#!/usr/bin/python
#
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
DOCUMENTATION = r"""
---
module: kubeconfig
short_description: Generate, update, and optionally write Kubernetes kubeconfig files
version_added: "6.5.0"
author: "Youssef Khalid Ali (@YoussefKhalidAli)"
description:
- Build, update, and manage Kubernetes kubeconfig files using structured input.
- Supports loading an existing kubeconfig file and merging clusters, users, and contexts.
- Can optionally write the resulting kubeconfig to a destination path.
- Ensures idempotent behavior by only updating files when changes occur.
requirements:
- "PyYAML >= 5.1"
notes:
- Input data is merged by resource name (cluster, user, context).
- Updates under O(clusters), O(users), and O(contexts) are matched by C(name) against the kubeconfig loaded from O(path).
- For an existing C(name), each entry's C(behavior) suboption controls the update.
- The default is V(merge), which merges nested C(cluster), C(user), and C(context) data so unspecified keys are preserved.
- With V(replace), the previous entry for that name is dropped and only the new definition is used.
- With V(keep), the existing entry is left unchanged.
- This can be used to move kubeconfig files to a different location with different content.
- This module does not validate cluster connectivity or authentication.
- The module supports C(check_mode) and will not write files when enabled.
- The structure follows standard Kubernetes kubeconfig format as defined in the Kubernetes documentation.
- Tokens and sensitive data should be protected using ansible-vault or environment variables.
options:
path:
description:
- Path to an existing kubeconfig file to load and merge from.
- If the file does not exist, a new kubeconfig will be created.
- This becomes the default destination if O(dest) is not specified.
type: str
required: true
dest:
description:
- Destination path where the final kubeconfig should be written.
- If not specified, the kubeconfig will be saved to O(path).
- Allows copying and modifying a kubeconfig to a new location.
type: str
required: false
clusters:
description:
- List of cluster definitions to merge into the kubeconfig.
- Each cluster is identified by its C(name).
- When C(name) matches an existing cluster, the default C(behavior) is V(merge).
- See the C(behavior) suboption for V(replace) and V(keep).
type: list
elements: dict
required: false
default: []
suboptions:
name:
description:
- Unique name identifier for the cluster.
type: str
required: true
behavior:
description:
- How to handle merging if a cluster with this name already exists.
- C(merge) - Update only the specified fields, preserve others (default).
- C(replace) - Replace the entire cluster definition.
- C(keep) - Keep existing cluster, skip this entry.
type: str
choices: ['merge', 'replace', 'keep']
default: merge
cluster:
description:
- Cluster configuration details.
type: dict
required: true
suboptions:
server:
description:
- Kubernetes API server URL (e.g., C(https://k8s.example.com:6443)).
type: str
required: true
certificate-authority:
description:
- Path to a CA certificate file for validating the API server certificate.
type: str
certificate-authority-data:
description:
- Base64 encoded CA certificate data.
- Use this instead of C(certificate-authority) for embedded certificates.
type: str
insecure-skip-tls-verify:
description:
- If true, the server's certificate will not be validated.
type: bool
proxy-url:
description:
- Optional proxy URL for cluster connections.
type: str
tls-server-name:
description:
- Server name to use for server certificate validation.
type: str
users:
description:
- List of user authentication configurations.
- Each user is identified by its C(name).
- When C(name) matches an existing user, the default C(behavior) is V(merge).
- See the C(behavior) suboption for V(replace) and V(keep).
type: list
elements: dict
required: false
default: []
suboptions:
name:
description:
- Unique name identifier for the user.
type: str
required: true
behavior:
description:
- How to handle merging if a user with this name already exists.
- C(merge) - Update only the specified fields, preserve others (default).
- C(replace) - Replace the entire user definition.
- C(keep) - Keep existing user, skip this entry.
type: str
choices: ['merge', 'replace', 'keep']
default: merge
user:
description:
- User authentication configuration.
type: dict
required: true
suboptions:
token:
description:
- Bearer token for authentication.
type: str
username:
description:
- Username for basic authentication.
type: str
password:
description:
- Password for basic authentication.
type: str
client-certificate:
description:
- Path to client certificate file.
- Used for certificate-based authentication.
type: str
client-key:
description:
- Path to client private key file.
- Must be provided with C(client-certificate).
type: str
client-certificate-data:
description:
- Base64 encoded client certificate.
- Use instead of C(client-certificate) for embedded certificates.
type: str
client-key-data:
description:
- Base64 encoded client private key.
- Use instead of C(client-key) for embedded keys.
type: str
auth-provider:
description:
- Authentication provider configuration (e.g., for GCP, Azure).
type: dict
exec:
description:
- Exec-based credential plugin configuration.
- Used for external authentication providers.
type: dict
contexts:
description:
- List of context definitions linking users and clusters.
- Each context is identified by its C(name).
- When C(name) matches an existing context, the default C(behavior) is V(merge).
- See the C(behavior) suboption for V(replace) and V(keep).
type: list
elements: dict
required: false
default: []
suboptions:
name:
description:
- Unique name identifier for the context.
type: str
required: true
behavior:
description:
- How to handle merging if a context with this name already exists.
- C(merge) - Update only the specified fields, preserve others (default).
- C(replace) - Replace the entire context definition.
- C(keep) - Keep existing context, skip this entry.
type: str
choices: ['merge', 'replace', 'keep']
default: merge
context:
description:
- Context configuration linking cluster and user.
type: dict
required: true
suboptions:
cluster:
description:
- Name of the cluster to use (must match a cluster name in O(clusters)).
type: str
required: true
user:
description:
- Name of the user to authenticate as (must match a user name in O(users)).
type: str
required: true
namespace:
description:
- Default namespace to use for this context.
- If not specified, defaults to C(default).
type: str
preferences:
description:
- Kubeconfig preferences.
- Used for client-side settings like color output, default editor, etc.
type: dict
required: false
default: {}
current_context:
description:
- Name of the context to set as current/active.
- This context will be used by default when using kubectl.
- Must match one of the context names defined in O(contexts).
type: str
required: false
seealso:
- name: Kubernetes kubeconfig documentation
description: Official Kubernetes documentation for kubeconfig files
link: https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/
- name: kubectl config documentation
description: kubectl commands for working with kubeconfig files
link: https://kubernetes.io/docs/reference/kubectl/generated/kubectl_config/
"""
EXAMPLES = r"""
# Create a new kubeconfig file with a single cluster
- name: Create basic kubeconfig
kubernetes.core.kubeconfig:
path: /home/user/.kube/config
clusters:
- name: production-cluster
cluster:
server: https://prod.k8s.example.com:6443
certificate-authority-data: LS0tLS1CRUdJTi...
users:
- name: admin-user
user:
token: eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9...
contexts:
- name: prod-admin
context:
cluster: production-cluster
user: admin-user
namespace: production
current_context: prod-admin
- name: Copy and modify kubeconfig
kubernetes.core.kubeconfig:
path: /home/user/.kube/config
dest: /home/user/.kube/config-backup
clusters:
- name: new-cluster
cluster:
server: https://new.example.com:6443
- name: Switch current context
kubernetes.core.kubeconfig:
path: ~/.kube/config
current_context: prod-context
- name: Update user credentials
kubernetes.core.kubeconfig:
path: ~/.kube/config
users:
- name: admin-user
user:
token: "{{ new_admin_token }}"
"""
RETURN = r"""
kubeconfig:
description: The complete kubeconfig data structure.
type: dict
returned: always
dest:
description: The path where the kubeconfig was written.
type: str
returned: always
sample: /home/user/.kube/config
"""
import os
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.kubernetes.core.plugins.module_utils.args_common import (
extract_sensitive_values_from_kubeconfig,
)
from ansible_collections.kubernetes.core.plugins.module_utils.kubeconfig import (
hash_data,
load_yaml_file,
merge_by_name,
write_file,
)
try:
import yaml
IMP_YAML = True
IMP_YAML_ERR = None
except ImportError:
IMP_YAML = False
IMP_YAML_ERR = traceback.format_exc()
def run_module():
module_args = dict(
path=dict(type="str", required=True),
dest=dict(type="str", required=False),
clusters=dict(type="list", elements="dict", required=False, default=[]),
users=dict(type="list", elements="dict", required=False, default=[]),
contexts=dict(type="list", elements="dict", required=False, default=[]),
preferences=dict(type="dict", required=False, default={}),
current_context=dict(type="str", required=False),
)
module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)
path = module.params["path"]
dest = module.params["dest"] or path
clusters_input = module.params["clusters"]
users_input = module.params["users"]
contexts_input = module.params["contexts"]
preferences = module.params["preferences"]
current_context = module.params["current_context"]
# Load existing kubeconfig
try:
if not IMP_YAML:
module.fail_json(
msg=missing_required_lib("pyyaml"),
exception=IMP_YAML_ERR,
)
existing = load_yaml_file(path) if path else {}
except Exception as e:
module.fail_json(
msg="Failed to load existing kubeconfig: %s" % to_native(e),
exception=traceback.format_exc(),
)
clusters = merge_by_name(existing.get("clusters", []), clusters_input)
users = merge_by_name(existing.get("users", []), users_input)
contexts = merge_by_name(existing.get("contexts", []), contexts_input)
# Build final kubeconfig
kubeconfig = {
"apiVersion": "v1",
"kind": "Config",
"preferences": preferences or existing.get("preferences", {}),
"clusters": clusters,
"users": users,
"contexts": contexts,
"current-context": current_context or existing.get("current-context") or "",
}
changed = False
old_data = {}
if os.path.exists(dest):
try:
with open(dest, "r") as f:
old_data = yaml.safe_load(f) or {}
except Exception as e:
module.fail_json(
msg="Failed to read destination file: %s" % to_native(e),
exception=traceback.format_exc(),
)
old_hash = hash_data(old_data)
new_hash = hash_data(kubeconfig)
if old_hash != new_hash:
if not module.check_mode:
try:
write_file(dest, kubeconfig)
except Exception as e:
module.fail_json(
msg="Failed to write kubeconfig: %s" % to_native(e),
exception=traceback.format_exc(),
)
changed = True
if isinstance(kubeconfig, dict):
module.no_log_values.update(
extract_sensitive_values_from_kubeconfig(kubeconfig)
)
module.exit_json(
changed=changed,
kubeconfig=kubeconfig,
dest=dest,
msg=(
"Kubeconfig file has been updated."
if changed
else "Kubeconfig file is already up to date."
),
)
def main():
run_module()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1 @@
test_directory: /tmp

View File

@@ -0,0 +1,122 @@
---
- name: Set test variables
set_fact:
test_config_path: /tmp/test-kubeconfig
test_cluster_name: test-cluster
test_user_name: test-user
test_context_name: test-context
# Test 1: Create new kubeconfig
- name: Create new kubeconfig file
kubernetes.core.kubeconfig:
path: "{{ test_config_path }}"
clusters:
- name: "{{ test_cluster_name }}"
cluster:
server: https://test.example.com:6443
insecure-skip-tls-verify: true
users:
- name: "{{ test_user_name }}"
user:
token: test-token-123
contexts:
- name: "{{ test_context_name }}"
context:
cluster: "{{ test_cluster_name }}"
user: "{{ test_user_name }}"
namespace: default
current_context: "{{ test_context_name }}"
register: create_result
- name: Verify file was created
assert:
that:
- create_result is changed
- create_result.kubeconfig.clusters | length == 1
- create_result.kubeconfig['current-context'] == test_context_name
# Test 2: Idempotency check
- name: Run same configuration again
kubernetes.core.kubeconfig:
path: "{{ test_config_path }}"
clusters:
- name: "{{ test_cluster_name }}"
cluster:
server: https://test.example.com:6443
insecure-skip-tls-verify: true
users:
- name: "{{ test_user_name }}"
user:
token: test-token-123
contexts:
- name: "{{ test_context_name }}"
context:
cluster: "{{ test_cluster_name }}"
user: "{{ test_user_name }}"
namespace: default
current_context: "{{ test_context_name }}"
register: idempotent_result
- name: Verify idempotency
assert:
that:
- idempotent_result is not changed
# Test 3: Merge new cluster
- name: Add second cluster
kubernetes.core.kubeconfig:
path: "{{ test_config_path }}"
clusters:
- name: cluster-2
cluster:
server: https://cluster2.example.com:6443
users:
- name: user-2
user:
token: token-2
contexts:
- name: context-2
context:
cluster: cluster-2
user: user-2
register: merge_result
- name: Verify merge
assert:
that:
- merge_result is changed
- merge_result.kubeconfig.clusters | length == 2
# Test 4: Update existing entry
- name: Update cluster server
kubernetes.core.kubeconfig:
path: "{{ test_config_path }}"
clusters:
- name: "{{ test_cluster_name }}"
cluster:
server: https://updated.example.com:6443
insecure-skip-tls-verify: true
register: update_result
- name: Verify update
assert:
that:
- update_result is changed
- update_result.kubeconfig.clusters[0].cluster.server == "https://updated.example.com:6443"
# Test 5: Check mode
- name: Test check mode
kubernetes.core.kubeconfig:
path: "{{ test_config_path }}"
clusters:
- name: check-mode-cluster
cluster:
server: https://check.example.com:6443
check_mode: true
register: check_mode_result
- name: Verify check mode didn't write
assert:
that:
- check_mode_result is changed
- check_mode_result.kubeconfig.clusters | length == 3 # Includes new cluster in output

View File

@@ -0,0 +1,231 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import yaml
from ansible_collections.kubernetes.core.plugins.module_utils.kubeconfig import (
deep_merge,
hash_data,
load_yaml_file,
merge_by_name,
write_file,
)
# load_yaml_file
def test_load_yaml_file_returns_empty_dict_for_missing_file():
assert load_yaml_file("/nonexistent/path/config") == {}
def test_load_yaml_file_returns_empty_dict_for_none():
assert load_yaml_file(None) == {}
def test_load_yaml_file_returns_empty_dict_for_empty_string():
assert load_yaml_file("") == {}
def test_load_yaml_file_loads_valid_yaml(tmp_path):
config = {"apiVersion": "v1", "kind": "Config", "clusters": []}
f = tmp_path / "config"
f.write_text(yaml.safe_dump(config))
assert load_yaml_file(str(f)) == config
def test_load_yaml_file_returns_empty_dict_for_empty_file(tmp_path):
f = tmp_path / "config"
f.write_text("")
assert load_yaml_file(str(f)) == {}
# deep_merge
def test_deep_merge_adds_new_keys():
base = {"a": 1}
updates = {"b": 2}
assert deep_merge(base, updates) == {"a": 1, "b": 2}
def test_deep_merge_overwrites_scalar():
base = {"a": 1}
updates = {"a": 99}
assert deep_merge(base, updates) == {"a": 99}
def test_deep_merge_recursively_merges_dicts():
base = {
"cluster": {
"server": "https://old.example.com",
"insecure-skip-tls-verify": True,
}
}
updates = {"cluster": {"server": "https://new.example.com"}}
result = deep_merge(base, updates)
assert result["cluster"]["server"] == "https://new.example.com"
assert result["cluster"]["insecure-skip-tls-verify"] is True
def test_deep_merge_does_not_mutate_base():
base = {"a": {"b": 1}}
updates = {"a": {"c": 2}}
deep_merge(base, updates)
assert base == {"a": {"b": 1}}
def test_deep_merge_overwrites_dict_with_scalar():
base = {"a": {"nested": 1}}
updates = {"a": "flat"}
assert deep_merge(base, updates) == {"a": "flat"}
# merge_by_name
def test_merge_by_name_adds_new_entry():
existing = []
new = [{"name": "cluster-a", "cluster": {"server": "https://a.example.com"}}]
result = merge_by_name(existing, new)
assert len(result) == 1
assert result[0]["name"] == "cluster-a"
def test_merge_by_name_preserves_existing_when_no_new():
existing = [{"name": "cluster-a", "cluster": {"server": "https://a.example.com"}}]
result = merge_by_name(existing, [])
assert len(result) == 1
assert result[0]["name"] == "cluster-a"
def test_merge_by_name_default_behavior_merges_fields():
existing = [
{
"name": "cluster-a",
"cluster": {"server": "https://old.com", "insecure-skip-tls-verify": True},
}
]
new = [{"name": "cluster-a", "cluster": {"server": "https://new.com"}}]
result = merge_by_name(existing, new)
assert len(result) == 1
assert result[0]["cluster"]["server"] == "https://new.com"
assert result[0]["cluster"]["insecure-skip-tls-verify"] is True
def test_merge_by_name_replace_behavior_replaces_entire_entry():
existing = [
{
"name": "cluster-a",
"cluster": {"server": "https://old.com", "insecure-skip-tls-verify": True},
}
]
new = [
{
"name": "cluster-a",
"behavior": "replace",
"cluster": {"server": "https://new.com"},
}
]
result = merge_by_name(existing, new)
assert result[0]["cluster"] == {"server": "https://new.com"}
assert "insecure-skip-tls-verify" not in result[0]["cluster"]
def test_merge_by_name_keep_behavior_preserves_existing():
existing = [{"name": "cluster-a", "cluster": {"server": "https://old.com"}}]
new = [
{
"name": "cluster-a",
"behavior": "keep",
"cluster": {"server": "https://new.com"},
}
]
result = merge_by_name(existing, new)
assert result[0]["cluster"]["server"] == "https://old.com"
def test_merge_by_name_behavior_key_not_in_output():
existing = []
new = [
{
"name": "cluster-a",
"behavior": "replace",
"cluster": {"server": "https://a.com"},
}
]
result = merge_by_name(existing, new)
assert "behavior" not in result[0]
def test_merge_by_name_skips_items_without_name():
existing = []
new = [{"cluster": {"server": "https://a.com"}}]
result = merge_by_name(existing, new)
assert result == []
def test_merge_by_name_skips_non_dict_items():
existing = []
new = ["not-a-dict", 42]
result = merge_by_name(existing, new)
assert result == []
def test_merge_by_name_adds_multiple_new_entries():
existing = []
new = [
{"name": "cluster-a", "cluster": {"server": "https://a.com"}},
{"name": "cluster-b", "cluster": {"server": "https://b.com"}},
]
result = merge_by_name(existing, new)
names = [r["name"] for r in result]
assert "cluster-a" in names
assert "cluster-b" in names
def test_merge_by_name_existing_non_dict_items_are_skipped():
existing = ["not-a-dict", {"cluster": {"server": "https://a.com"}}]
new = [{"name": "cluster-b", "cluster": {"server": "https://b.com"}}]
result = merge_by_name(existing, new)
assert len(result) == 1
assert result[0]["name"] == "cluster-b"
# hash_data
def test_hash_data_returns_string():
assert isinstance(hash_data({}), str)
def test_hash_data_different_input_different_hash():
assert hash_data({"a": 1}) != hash_data({"a": 2})
def test_hash_data_order_independent():
a = {"x": 1, "y": 2}
b = {"y": 2, "x": 1}
assert hash_data(a) == hash_data(b)
# write_file
def test_write_file_returns_false_for_empty_dest():
assert write_file("", {"apiVersion": "v1"}) is False
def test_write_file_returns_false_for_none_dest():
assert write_file(None, {"apiVersion": "v1"}) is False
def test_write_file_writes_valid_yaml(tmp_path):
dest = str(tmp_path / "config")
data = {"apiVersion": "v1", "kind": "Config"}
result = write_file(dest, data)
assert result is True
with open(dest, "r") as f:
written = yaml.safe_load(f)
assert written == data
def test_write_file_overwrites_existing_file(tmp_path):
dest = str(tmp_path / "config")
write_file(dest, {"apiVersion": "v1"})
write_file(dest, {"apiVersion": "v2"})
with open(dest, "r") as f:
written = yaml.safe_load(f)
assert written["apiVersion"] == "v2"