Files
community.okd/plugins/module_utils/openshift_adm_prune_auth.py
Bikouo Aubin a63e5b7b36 Update CI - Continue work from #195 (#202)
* Upgrade Ansible and OKD versions for CI

* Use ubi9 and fix sanity

* Use correct pip install

* Try using quotes

* Ensure python3.9

* Upgrade ansible and molecule versions

* Remove DeploymentConfig

DeploymentConfigs are deprecated and seem to now be causing idempotence
problems. Replacing them with Deployments fixes it.

* Attempt to fix ldap integration tests

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Move sanity and unit tests to GH actions

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Firt round of sanity fixes

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add kubernetes.core collection as sanity requirement

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add ignore-2.16.txt

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Attempt to fix units

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add ignore-2.17

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Attempt to fix unit tests

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add pytest-ansible to test-requirements.txt

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add changelog fragment

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add workflow for ansible-lint

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Apply black

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Fix linters

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Add # fmt: skip

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Yet another round of linting

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Yet another round of linting

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Remove setup.cfg

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Revert #fmt

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Use ansible-core 2.14

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Cleanup ansible-lint ignores

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>

* Try using service instead of pod IP

* Fix typo

* Actually use the correct port

* See if NetworkPolicy is preventing connection

* using Pod internal IP

* fix adm prune auth roles syntax

* adding some retry steps

* fix: openshift_builds target

* add flag --force-with-deps when building downstream collection

* Remove yamllint from tox linters, bump minimum python supported version to 3.9, Remove support for ansible-core < 2.14

---------

Signed-off-by: Alina Buzachis <abuzachis@redhat.com>
Co-authored-by: Mike Graves <mgraves@redhat.com>
Co-authored-by: Alina Buzachis <abuzachis@redhat.com>
2023-11-15 17:00:38 +00:00

381 lines
15 KiB
Python

#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.module_utils._text import to_native
from ansible_collections.community.okd.plugins.module_utils.openshift_common import (
AnsibleOpenshiftModule,
)
try:
from kubernetes import client
from kubernetes.dynamic.exceptions import DynamicApiError, NotFoundError
except ImportError:
pass
class OpenShiftAdmPruneAuth(AnsibleOpenshiftModule):
def __init__(self, **kwargs):
super(OpenShiftAdmPruneAuth, self).__init__(**kwargs)
def prune_resource_binding(
self, kind, api_version, ref_kind, ref_namespace_names, propagation_policy=None
):
resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
candidates = []
for ref_namespace, ref_name in ref_namespace_names:
try:
result = resource.get(name=None, namespace=ref_namespace)
result = result.to_dict()
result = result.get("items") if "items" in result else [result]
for obj in result:
namespace = obj["metadata"].get("namespace", None)
name = obj["metadata"].get("name")
if ref_kind and obj["roleRef"]["kind"] != ref_kind:
# skip this binding as the roleRef.kind does not match
continue
if obj["roleRef"]["name"] == ref_name:
# select this binding as the roleRef.name match
candidates.append((namespace, name))
except NotFoundError:
continue
except DynamicApiError as exc:
msg = "Failed to get {kind} resource due to: {msg}".format(
kind=kind, msg=exc.body
)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to get {kind} due to: {msg}".format(
kind=kind, msg=to_native(e)
)
self.fail_json(msg=msg)
if len(candidates) == 0 or self.check_mode:
return [y if x is None else x + "/" + y for x, y in candidates]
delete_options = client.V1DeleteOptions()
if propagation_policy:
delete_options.propagation_policy = propagation_policy
for namespace, name in candidates:
try:
result = resource.delete(
name=name, namespace=namespace, body=delete_options
)
except DynamicApiError as exc:
msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(
kind=kind, namespace=namespace, name=name, msg=exc.body
)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(
kind=kind, namespace=namespace, name=name, msg=to_native(e)
)
self.fail_json(msg=msg)
return [y if x is None else x + "/" + y for x, y in candidates]
def update_resource_binding(self, ref_kind, ref_names, namespaced=False):
kind = "ClusterRoleBinding"
api_version = "rbac.authorization.k8s.io/v1"
if namespaced:
kind = "RoleBinding"
resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
result = resource.get(name=None, namespace=None).to_dict()
result = result.get("items") if "items" in result else [result]
if len(result) == 0:
return [], False
def _update_user_group(binding_namespace, subjects):
users, groups = [], []
for x in subjects:
if x["kind"] == "User":
users.append(x["name"])
elif x["kind"] == "Group":
groups.append(x["name"])
elif x["kind"] == "ServiceAccount":
namespace = binding_namespace
if x.get("namespace") is not None:
namespace = x.get("namespace")
if namespace is not None:
users.append(
"system:serviceaccount:%s:%s" % (namespace, x["name"])
)
return users, groups
candidates = []
changed = False
for item in result:
subjects = item.get("subjects", [])
retainedSubjects = [
x for x in subjects if x["kind"] == ref_kind and x["name"] in ref_names
]
if len(subjects) != len(retainedSubjects):
updated_binding = item
updated_binding["subjects"] = retainedSubjects
binding_namespace = item["metadata"].get("namespace", None)
(
updated_binding["userNames"],
updated_binding["groupNames"],
) = _update_user_group(binding_namespace, retainedSubjects)
candidates.append(
binding_namespace + "/" + item["metadata"]["name"]
if binding_namespace
else item["metadata"]["name"]
)
changed = True
if not self.check_mode:
try:
resource.apply(updated_binding, namespace=binding_namespace)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
return candidates, changed
def update_security_context(self, ref_names, key):
params = {
"kind": "SecurityContextConstraints",
"api_version": "security.openshift.io/v1",
}
sccs = self.kubernetes_facts(**params)
if not sccs["api_found"]:
self.fail_json(msg=sccs["msg"])
sccs = sccs.get("resources")
candidates = []
changed = False
resource = self.find_resource(
kind="SecurityContextConstraints", api_version="security.openshift.io/v1"
)
for item in sccs:
subjects = item.get(key, [])
retainedSubjects = [x for x in subjects if x not in ref_names]
if len(subjects) != len(retainedSubjects):
candidates.append(item["metadata"]["name"])
changed = True
if not self.check_mode:
upd_sec_ctx = item
upd_sec_ctx.update({key: retainedSubjects})
try:
resource.apply(upd_sec_ctx, namespace=None)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
return candidates, changed
def auth_prune_roles(self):
params = {
"kind": "Role",
"api_version": "rbac.authorization.k8s.io/v1",
"namespace": self.params.get("namespace"),
}
for attr in ("name", "label_selectors"):
if self.params.get(attr):
params[attr] = self.params.get(attr)
result = self.kubernetes_facts(**params)
if not result["api_found"]:
self.fail_json(msg=result["msg"])
roles = result.get("resources")
if len(roles) == 0:
self.exit_json(
changed=False,
msg="No candidate rolebinding to prune from namespace %s."
% self.params.get("namespace"),
)
ref_roles = [(x["metadata"]["namespace"], x["metadata"]["name"]) for x in roles]
candidates = self.prune_resource_binding(
kind="RoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind="Role",
ref_namespace_names=ref_roles,
propagation_policy="Foreground",
)
if len(candidates) == 0:
self.exit_json(changed=False, role_binding=candidates)
self.exit_json(changed=True, role_binding=candidates)
def auth_prune_clusterroles(self):
params = {"kind": "ClusterRole", "api_version": "rbac.authorization.k8s.io/v1"}
for attr in ("name", "label_selectors"):
if self.params.get(attr):
params[attr] = self.params.get(attr)
result = self.kubernetes_facts(**params)
if not result["api_found"]:
self.fail_json(msg=result["msg"])
clusterroles = result.get("resources")
if len(clusterroles) == 0:
self.exit_json(
changed=False, msg="No clusterroles found matching input criteria."
)
ref_clusterroles = [(None, x["metadata"]["name"]) for x in clusterroles]
# Prune ClusterRoleBinding
candidates_cluster_binding = self.prune_resource_binding(
kind="ClusterRoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind=None,
ref_namespace_names=ref_clusterroles,
)
# Prune Role Binding
candidates_namespaced_binding = self.prune_resource_binding(
kind="RoleBinding",
api_version="rbac.authorization.k8s.io/v1",
ref_kind="ClusterRole",
ref_namespace_names=ref_clusterroles,
)
self.exit_json(
changed=True,
cluster_role_binding=candidates_cluster_binding,
role_binding=candidates_namespaced_binding,
)
def list_groups(self, params=None):
options = {"kind": "Group", "api_version": "user.openshift.io/v1"}
if params:
for attr in ("name", "label_selectors"):
if params.get(attr):
options[attr] = params.get(attr)
return self.kubernetes_facts(**options)
def auth_prune_users(self):
params = {"kind": "User", "api_version": "user.openshift.io/v1"}
for attr in ("name", "label_selectors"):
if self.params.get(attr):
params[attr] = self.params.get(attr)
users = self.kubernetes_facts(**params)
if len(users) == 0:
self.exit_json(
changed=False,
msg="No resource type 'User' found matching input criteria.",
)
names = [x["metadata"]["name"] for x in users]
changed = False
# Remove the user role binding
rolebinding, changed_role = self.update_resource_binding(
ref_kind="User", ref_names=names, namespaced=True
)
changed = changed or changed_role
# Remove the user cluster role binding
clusterrolesbinding, changed_cr = self.update_resource_binding(
ref_kind="User", ref_names=names
)
changed = changed or changed_cr
# Remove the user from security context constraints
sccs, changed_sccs = self.update_security_context(names, "users")
changed = changed or changed_sccs
# Remove the user from groups
groups = self.list_groups()
deleted_groups = []
resource = self.find_resource(kind="Group", api_version="user.openshift.io/v1")
for grp in groups:
subjects = grp.get("users", [])
retainedSubjects = [x for x in subjects if x not in names]
if len(subjects) != len(retainedSubjects):
deleted_groups.append(grp["metadata"]["name"])
changed = True
if not self.check_mode:
upd_group = grp
upd_group.update({"users": retainedSubjects})
try:
resource.apply(upd_group, namespace=None)
except DynamicApiError as exc:
msg = "Failed to apply object due to: {0}".format(exc.body)
self.fail_json(msg=msg)
# Remove the user's OAuthClientAuthorizations
oauth = self.kubernetes_facts(
kind="OAuthClientAuthorization", api_version="oauth.openshift.io/v1"
)
deleted_auths = []
resource = self.find_resource(
kind="OAuthClientAuthorization", api_version="oauth.openshift.io/v1"
)
for authorization in oauth:
if authorization.get("userName", None) in names:
auth_name = authorization["metadata"]["name"]
deleted_auths.append(auth_name)
changed = True
if not self.check_mode:
try:
resource.delete(
name=auth_name,
namespace=None,
body=client.V1DeleteOptions(),
)
except DynamicApiError as exc:
msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(
name=auth_name, msg=exc.body
)
self.fail_json(msg=msg)
except Exception as e:
msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(
name=auth_name, msg=to_native(e)
)
self.fail_json(msg=msg)
self.exit_json(
changed=changed,
cluster_role_binding=clusterrolesbinding,
role_binding=rolebinding,
security_context_constraints=sccs,
authorization=deleted_auths,
group=deleted_groups,
)
def auth_prune_groups(self):
groups = self.list_groups(params=self.params)
if len(groups) == 0:
self.exit_json(
changed=False,
result="No resource type 'Group' found matching input criteria.",
)
names = [x["metadata"]["name"] for x in groups]
changed = False
# Remove the groups role binding
rolebinding, changed_role = self.update_resource_binding(
ref_kind="Group", ref_names=names, namespaced=True
)
changed = changed or changed_role
# Remove the groups cluster role binding
clusterrolesbinding, changed_cr = self.update_resource_binding(
ref_kind="Group", ref_names=names
)
changed = changed or changed_cr
# Remove the groups security context constraints
sccs, changed_sccs = self.update_security_context(names, "groups")
changed = changed or changed_sccs
self.exit_json(
changed=changed,
cluster_role_binding=clusterrolesbinding,
role_binding=rolebinding,
security_context_constraints=sccs,
)
def execute_module(self):
auth_prune = {
"roles": self.auth_prune_roles,
"clusterroles": self.auth_prune_clusterroles,
"users": self.auth_prune_users,
"groups": self.auth_prune_groups,
}
auth_prune[self.params.get("resource")]()