#!/usr/bin/env python from __future__ import (absolute_import, division, print_function) __metaclass__ = type from ansible.module_utils._text import to_native from ansible_collections.community.okd.plugins.module_utils.openshift_common import AnsibleOpenshiftModule try: from kubernetes import client from kubernetes.dynamic.exceptions import DynamicApiError, NotFoundError except ImportError: pass class OpenShiftAdmPruneAuth(AnsibleOpenshiftModule): def __init__(self, **kwargs): super(OpenShiftAdmPruneAuth, self).__init__(**kwargs) def prune_resource_binding(self, kind, api_version, ref_kind, ref_namespace_names, propagation_policy=None): resource = self.find_resource(kind=kind, api_version=api_version, fail=True) candidates = [] for ref_namespace, ref_name in ref_namespace_names: try: result = resource.get(name=None, namespace=ref_namespace) result = result.to_dict() result = result.get('items') if 'items' in result else [result] for obj in result: namespace = obj['metadata'].get('namespace', None) name = obj['metadata'].get('name') if ref_kind and obj['roleRef']['kind'] != ref_kind: # skip this binding as the roleRef.kind does not match continue if obj['roleRef']['name'] == ref_name: # select this binding as the roleRef.name match candidates.append((namespace, name)) except NotFoundError: continue except DynamicApiError as exc: msg = "Failed to get {kind} resource due to: {msg}".format(kind=kind, msg=exc.body) self.fail_json(msg=msg) except Exception as e: msg = "Failed to get {kind} due to: {msg}".format(kind=kind, msg=to_native(e)) self.fail_json(msg=msg) if len(candidates) == 0 or self.check_mode: return [y if x is None else x + "/" + y for x, y in candidates] delete_options = client.V1DeleteOptions() if propagation_policy: delete_options.propagation_policy = propagation_policy for namespace, name in candidates: try: result = resource.delete(name=name, namespace=namespace, body=delete_options) except DynamicApiError as exc: msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(kind=kind, namespace=namespace, name=name, msg=exc.body) self.fail_json(msg=msg) except Exception as e: msg = "Failed to delete {kind} {namespace}/{name} due to: {msg}".format(kind=kind, namespace=namespace, name=name, msg=to_native(e)) self.fail_json(msg=msg) return [y if x is None else x + "/" + y for x, y in candidates] def update_resource_binding(self, ref_kind, ref_names, namespaced=False): kind = 'ClusterRoleBinding' api_version = "rbac.authorization.k8s.io/v1" if namespaced: kind = "RoleBinding" resource = self.find_resource(kind=kind, api_version=api_version, fail=True) result = resource.get(name=None, namespace=None).to_dict() result = result.get('items') if 'items' in result else [result] if len(result) == 0: return [], False def _update_user_group(binding_namespace, subjects): users, groups = [], [] for x in subjects: if x['kind'] == 'User': users.append(x['name']) elif x['kind'] == 'Group': groups.append(x['name']) elif x['kind'] == 'ServiceAccount': namespace = binding_namespace if x.get('namespace') is not None: namespace = x.get('namespace') if namespace is not None: users.append("system:serviceaccount:%s:%s" % (namespace, x['name'])) return users, groups candidates = [] changed = False for item in result: subjects = item.get('subjects', []) retainedSubjects = [x for x in subjects if x['kind'] == ref_kind and x['name'] in ref_names] if len(subjects) != len(retainedSubjects): updated_binding = item updated_binding['subjects'] = retainedSubjects binding_namespace = item['metadata'].get('namespace', None) updated_binding['userNames'], updated_binding['groupNames'] = _update_user_group(binding_namespace, retainedSubjects) candidates.append(binding_namespace + "/" + item['metadata']['name'] if binding_namespace else item['metadata']['name']) changed = True if not self.check_mode: try: resource.apply(updated_binding, namespace=binding_namespace) except DynamicApiError as exc: msg = "Failed to apply object due to: {0}".format(exc.body) self.fail_json(msg=msg) return candidates, changed def update_security_context(self, ref_names, key): params = {'kind': 'SecurityContextConstraints', 'api_version': 'security.openshift.io/v1'} sccs = self.kubernetes_facts(**params) if not sccs['api_found']: self.fail_json(msg=sccs['msg']) sccs = sccs.get('resources') candidates = [] changed = False resource = self.find_resource(kind="SecurityContextConstraints", api_version="security.openshift.io/v1") for item in sccs: subjects = item.get(key, []) retainedSubjects = [x for x in subjects if x not in ref_names] if len(subjects) != len(retainedSubjects): candidates.append(item['metadata']['name']) changed = True if not self.check_mode: upd_sec_ctx = item upd_sec_ctx.update({key: retainedSubjects}) try: resource.apply(upd_sec_ctx, namespace=None) except DynamicApiError as exc: msg = "Failed to apply object due to: {0}".format(exc.body) self.fail_json(msg=msg) return candidates, changed def auth_prune_roles(self): params = {'kind': 'Role', 'api_version': 'rbac.authorization.k8s.io/v1', 'namespace': self.params.get('namespace')} for attr in ('name', 'label_selectors'): if self.params.get(attr): params[attr] = self.params.get(attr) result = self.kubernetes_facts(**params) if not result['api_found']: self.fail_json(msg=result['msg']) roles = result.get('resources') if len(roles) == 0: self.exit_json(changed=False, msg="No candidate rolebinding to prune from namespace %s." % self.params.get('namespace')) ref_roles = [(x['metadata']['namespace'], x['metadata']['name']) for x in roles] candidates = self.prune_resource_binding(kind="RoleBinding", api_version="rbac.authorization.k8s.io/v1", ref_kind="Role", ref_namespace_names=ref_roles, propagation_policy='Foreground') if len(candidates) == 0: self.exit_json(changed=False, role_binding=candidates) self.exit_json(changed=True, role_binding=candidates) def auth_prune_clusterroles(self): params = {'kind': 'ClusterRole', 'api_version': 'rbac.authorization.k8s.io/v1'} for attr in ('name', 'label_selectors'): if self.params.get(attr): params[attr] = self.params.get(attr) result = self.kubernetes_facts(**params) if not result['api_found']: self.fail_json(msg=result['msg']) clusterroles = result.get('resources') if len(clusterroles) == 0: self.exit_json(changed=False, msg="No clusterroles found matching input criteria.") ref_clusterroles = [(None, x['metadata']['name']) for x in clusterroles] # Prune ClusterRoleBinding candidates_cluster_binding = self.prune_resource_binding(kind="ClusterRoleBinding", api_version="rbac.authorization.k8s.io/v1", ref_kind=None, ref_namespace_names=ref_clusterroles) # Prune Role Binding candidates_namespaced_binding = self.prune_resource_binding(kind="RoleBinding", api_version="rbac.authorization.k8s.io/v1", ref_kind='ClusterRole', ref_namespace_names=ref_clusterroles) self.exit_json(changed=True, cluster_role_binding=candidates_cluster_binding, role_binding=candidates_namespaced_binding) def list_groups(self, params=None): options = {'kind': 'Group', 'api_version': 'user.openshift.io/v1'} if params: for attr in ('name', 'label_selectors'): if params.get(attr): options[attr] = params.get(attr) return self.kubernetes_facts(**options) def auth_prune_users(self): params = {'kind': 'User', 'api_version': 'user.openshift.io/v1'} for attr in ('name', 'label_selectors'): if self.params.get(attr): params[attr] = self.params.get(attr) users = self.kubernetes_facts(**params) if len(users) == 0: self.exit_json(changed=False, msg="No resource type 'User' found matching input criteria.") names = [x['metadata']['name'] for x in users] changed = False # Remove the user role binding rolebinding, changed_role = self.update_resource_binding(ref_kind="User", ref_names=names, namespaced=True) changed = changed or changed_role # Remove the user cluster role binding clusterrolesbinding, changed_cr = self.update_resource_binding(ref_kind="User", ref_names=names) changed = changed or changed_cr # Remove the user from security context constraints sccs, changed_sccs = self.update_security_context(names, 'users') changed = changed or changed_sccs # Remove the user from groups groups = self.list_groups() deleted_groups = [] resource = self.find_resource(kind="Group", api_version="user.openshift.io/v1") for grp in groups: subjects = grp.get('users', []) retainedSubjects = [x for x in subjects if x not in names] if len(subjects) != len(retainedSubjects): deleted_groups.append(grp['metadata']['name']) changed = True if not self.check_mode: upd_group = grp upd_group.update({'users': retainedSubjects}) try: resource.apply(upd_group, namespace=None) except DynamicApiError as exc: msg = "Failed to apply object due to: {0}".format(exc.body) self.fail_json(msg=msg) # Remove the user's OAuthClientAuthorizations oauth = self.kubernetes_facts(kind='OAuthClientAuthorization', api_version='oauth.openshift.io/v1') deleted_auths = [] resource = self.find_resource(kind="OAuthClientAuthorization", api_version="oauth.openshift.io/v1") for authorization in oauth: if authorization.get('userName', None) in names: auth_name = authorization['metadata']['name'] deleted_auths.append(auth_name) changed = True if not self.check_mode: try: resource.delete(name=auth_name, namespace=None, body=client.V1DeleteOptions()) except DynamicApiError as exc: msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(name=auth_name, msg=exc.body) self.fail_json(msg=msg) except Exception as e: msg = "Failed to delete OAuthClientAuthorization {name} due to: {msg}".format(name=auth_name, msg=to_native(e)) self.fail_json(msg=msg) self.exit_json(changed=changed, cluster_role_binding=clusterrolesbinding, role_binding=rolebinding, security_context_constraints=sccs, authorization=deleted_auths, group=deleted_groups) def auth_prune_groups(self): groups = self.list_groups(params=self.params) if len(groups) == 0: self.exit_json(changed=False, result="No resource type 'Group' found matching input criteria.") names = [x['metadata']['name'] for x in groups] changed = False # Remove the groups role binding rolebinding, changed_role = self.update_resource_binding(ref_kind="Group", ref_names=names, namespaced=True) changed = changed or changed_role # Remove the groups cluster role binding clusterrolesbinding, changed_cr = self.update_resource_binding(ref_kind="Group", ref_names=names) changed = changed or changed_cr # Remove the groups security context constraints sccs, changed_sccs = self.update_security_context(names, 'groups') changed = changed or changed_sccs self.exit_json(changed=changed, cluster_role_binding=clusterrolesbinding, role_binding=rolebinding, security_context_constraints=sccs) def execute_module(self): auth_prune = { 'roles': self.auth_prune_roles, 'clusterroles': self.auth_prune_clusterroles, 'users': self.auth_prune_users, 'groups': self.auth_prune_groups, } auth_prune[self.params.get('resource')]()