mirror of
https://github.com/ansible-middleware/keycloak.git
synced 2026-06-13 12:05:54 +00:00
This commit is contained in:
@@ -59,6 +59,9 @@ URL_GROUP_CHILDREN = "{url}/admin/realms/{realm}/groups/{groupid}/children"
|
||||
|
||||
URL_CLIENTSCOPES = "{url}/admin/realms/{realm}/client-scopes"
|
||||
URL_CLIENTSCOPE = "{url}/admin/realms/{realm}/client-scopes/{id}"
|
||||
URL_CLIENTSCOPE_SCOPE_MAPPINGS = "{url}/admin/realms/{realm}/client-scopes/{id}/scope-mappings"
|
||||
URL_CLIENTSCOPE_SCOPE_MAPPINGS_REALM = "{url}/admin/realms/{realm}/client-scopes/{id}/scope-mappings/realm"
|
||||
URL_CLIENTSCOPE_SCOPE_MAPPINGS_CLIENT = "{url}/admin/realms/{realm}/client-scopes/{id}/scope-mappings/clients/{client}"
|
||||
URL_CLIENTSCOPE_PROTOCOLMAPPERS = "{url}/admin/realms/{realm}/client-scopes/{id}/protocol-mappers/models"
|
||||
URL_CLIENTSCOPE_PROTOCOLMAPPER = "{url}/admin/realms/{realm}/client-scopes/{id}/protocol-mappers/models/{mapper_id}"
|
||||
|
||||
@@ -331,41 +334,44 @@ def get_token(module_params: dict[str, t.Any]) -> dict[str, str]:
|
||||
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
||||
|
||||
|
||||
def is_struct_included(struct1: object, struct2: object, exclude: Sequence[str] | None = None) -> bool:
|
||||
def is_struct_included(
|
||||
struct1: dict | list | bool | int | str,
|
||||
struct2: dict | list | bool | int | str,
|
||||
exclude: Sequence[str] | None = None,
|
||||
empty_list_result: bool = True,
|
||||
) -> bool:
|
||||
"""
|
||||
This function compare if the first parameter structure is included in the second.
|
||||
The function use every elements of struct1 and validates they are present in the struct2 structure.
|
||||
The two structure does not need to be equals for that function to return true.
|
||||
Each elements are compared recursively.
|
||||
:param struct1:
|
||||
type:
|
||||
dict for the initial call, can be dict, list, bool, int or str for recursive calls
|
||||
description:
|
||||
reference structure
|
||||
:param struct2:
|
||||
type:
|
||||
dict for the initial call, can be dict, list, bool, int or str for recursive calls
|
||||
description:
|
||||
structure to compare with first parameter.
|
||||
:param exclude:
|
||||
type:
|
||||
list
|
||||
description:
|
||||
Key to exclude from the comparison.
|
||||
default: None
|
||||
:param empty_list_result:
|
||||
description:
|
||||
Return this value, when struct1 is an empty list.
|
||||
:return:
|
||||
type:
|
||||
bool
|
||||
description:
|
||||
Return True if all element of dict 1 are present in dict 2, return false otherwise.
|
||||
"""
|
||||
if isinstance(struct1, list) and isinstance(struct2, list):
|
||||
if not struct1 and not struct2:
|
||||
return True
|
||||
|
||||
if not struct1:
|
||||
return empty_list_result
|
||||
|
||||
for item1 in struct1:
|
||||
if isinstance(item1, (list, dict)):
|
||||
for item2 in struct2:
|
||||
if is_struct_included(item1, item2, exclude):
|
||||
if is_struct_included(item1, item2, exclude, empty_list_result):
|
||||
break
|
||||
else:
|
||||
return False
|
||||
@@ -379,7 +385,7 @@ def is_struct_included(struct1: object, struct2: object, exclude: Sequence[str]
|
||||
try:
|
||||
for key in struct1:
|
||||
if not (exclude and key in exclude):
|
||||
if not is_struct_included(struct1[key], struct2[key], exclude):
|
||||
if not is_struct_included(struct1[key], struct2[key], exclude, empty_list_result):
|
||||
return False
|
||||
except KeyError:
|
||||
return False
|
||||
@@ -2937,7 +2943,7 @@ class KeycloakAPI:
|
||||
:return: Representation of the user.
|
||||
"""
|
||||
try:
|
||||
user_url = URL_USER.format(url=self.baseurl, realm=realm, id=user_id)
|
||||
user_url = URL_USER.format(url=self.baseurl, realm=realm, id=user_id) + "?userProfileMetadata=True"
|
||||
userrep = json.load(self._request(user_url, method="GET"))
|
||||
return userrep
|
||||
except Exception as e:
|
||||
@@ -3108,11 +3114,19 @@ class KeycloakAPI:
|
||||
realm_group = self.find_group_by_path(group_to_add, realm=realm)
|
||||
if realm_group:
|
||||
self.add_user_to_group(user_id=userrep["id"], group_id=realm_group["id"], realm=realm)
|
||||
else:
|
||||
self.module.fail_json(
|
||||
msg=f"Could not update group membership for user {userrep['username']} in realm {realm}: group not found {group_to_add}"
|
||||
)
|
||||
|
||||
for group_to_remove in groups_to_remove:
|
||||
realm_group = self.find_group_by_path(group_to_remove, realm=realm)
|
||||
if realm_group:
|
||||
self.remove_user_from_group(user_id=userrep["id"], group_id=realm_group["id"], realm=realm)
|
||||
else:
|
||||
self.module.fail_json(
|
||||
msg=f"Could not update group membership for user {userrep['username']} in realm {realm}: group not found {group_to_remove}"
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
@@ -3257,6 +3271,52 @@ class KeycloakAPI:
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def get_all_clientscope_scope_mappings(self, clientscope_id, realm: str = "master"):
|
||||
"""Fetch all (realm and client) roles (scope-mappings) associated with the clientscope for a specific clientscope on the Keycloak server.
|
||||
:param clientscope_id: ID of the clientscope from which to obtain the associated roles.
|
||||
:param realm: Realm from which to obtain the scope.
|
||||
:return: The clientscope scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS.format(url=self.baseurl, realm=realm, id=clientscope_id)
|
||||
try:
|
||||
return self._request_and_deserialize(client_role_scope_url, method="GET")
|
||||
except Exception as e:
|
||||
self.fail_request(e, msg=f"Could not fetch roles for client-scope {clientscope_id} in realm {realm}: {e}")
|
||||
|
||||
def get_clientscope_scope_mappings_realm(self, clientscope_id, realm: str = "master"):
|
||||
"""Fetch the realm roles (scope-mappings) associated with the clientscope for a specific clientscope on the Keycloak server.
|
||||
:param clientscope_id: ID of the clientscope from which to obtain the associated roles.
|
||||
:param realm: Realm from which to obtain the scope.
|
||||
:return: The clientscope realm scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_REALM.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id
|
||||
)
|
||||
try:
|
||||
return self._request_and_deserialize(client_role_scope_url, method="GET")
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e, msg=f"Could not fetch realm roles for client-scope {clientscope_id} in realm {realm}: {e}"
|
||||
)
|
||||
|
||||
def get_clientscope_scope_mappings_client(self, clientscope_id, client_id, realm: str = "master"):
|
||||
"""Fetch the client roles (scope-mappings) associated with the clientscope for a specific clientscope and client on the Keycloak server.
|
||||
:param clientscope_id: ID of the clientscope from which to obtain the associated roles.
|
||||
:param clientid: ID of the client from which to obtain the associated roles.
|
||||
:param realm: Realm from which to obtain the scope.
|
||||
:return: The clientscope client scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_CLIENT.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id, client=client_id
|
||||
)
|
||||
try:
|
||||
return self._request_and_deserialize(client_role_scope_url, method="GET")
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e,
|
||||
msg=f"Could not fetch client roles from client {client_id} for client-scope {clientscope_id} in realm {realm}: {e}",
|
||||
)
|
||||
|
||||
def get_client_role_scope_from_client(self, clientid, clientscopeid, realm: str = "master"):
|
||||
"""Fetch the roles associated with the client's scope for a specific client on the Keycloak server.
|
||||
:param clientid: ID of the client from which to obtain the associated roles.
|
||||
@@ -3310,6 +3370,94 @@ class KeycloakAPI:
|
||||
|
||||
return self.get_client_role_scope_from_client(clientid, clientscopeid, realm)
|
||||
|
||||
def update_clientscope_scope_mappings_client(
|
||||
self, payload: list[dict], clientscope_id: str, client_id: str, realm: str = "master"
|
||||
):
|
||||
"""Update and fetch the client roles (scope-mappings) associated with the clientscope on the Keycloak server.
|
||||
:param payload: List of client roles to be added to the scope.
|
||||
:param clientscope_id: ID of the clientscope to update scope-mappings.
|
||||
:param clientid: ID of the client from which to obtain the associated roles.
|
||||
:param realm: Realm from which to obtain the client.
|
||||
:return: The clientscope client scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_CLIENT.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id, client=client_id
|
||||
)
|
||||
try:
|
||||
self._request(client_role_scope_url, method="POST", data=json.dumps(payload))
|
||||
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e,
|
||||
msg=f"Could not update scope mappings for client-scope {client_id}.{clientscope_id} in realm {realm}: {e}",
|
||||
)
|
||||
|
||||
return self.get_clientscope_scope_mappings_client(clientscope_id, client_id, realm)
|
||||
|
||||
def update_clientscope_scope_mappings_realm(self, payload: list[dict], clientscope_id: str, realm: str = "master"):
|
||||
"""Update and fetch the realm roles (scope-mappings) associated with the clientscope on the Keycloak server.
|
||||
:param payload: List of realm roles to be added to the scope.
|
||||
:param clientscope_id: ID of the clientscope to update scope-mappings.
|
||||
:param realm: Realm from which to obtain the roles.
|
||||
:return: The clientscope realm scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_REALM.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id
|
||||
)
|
||||
try:
|
||||
self._request(client_role_scope_url, method="POST", data=json.dumps(payload))
|
||||
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e, msg=f"Could not update scope mappings for client-scope {clientscope_id} in realm {realm}: {e}"
|
||||
)
|
||||
|
||||
return self.get_clientscope_scope_mappings_realm(clientscope_id, realm)
|
||||
|
||||
def delete_clientscope_scope_mappings_client(
|
||||
self, payload: list[dict], clientscope_id: str, client_id: str, realm: str = "master"
|
||||
):
|
||||
"""Delete the client roles (scope_mappings) contained in the payload from the clientscope on the Keycloak server.
|
||||
:param payload: List of roles to be deleted.
|
||||
:param clientscope_id: ID of the clientscope to delete roles from scope-mappings.
|
||||
:param clientid: ID of the client who owns the roles.
|
||||
:param realm: Realm from which to obtain the client.
|
||||
:return: The clientscope client scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_CLIENT.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id, client=client_id
|
||||
)
|
||||
try:
|
||||
self._request(client_role_scope_url, method="DELETE", data=json.dumps(payload))
|
||||
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e,
|
||||
msg=f"Could not delete scope mappings for client-scope {client_id}.{clientscope_id} in realm {realm}: {e}",
|
||||
)
|
||||
|
||||
return self.get_clientscope_scope_mappings_client(clientscope_id, client_id, realm)
|
||||
|
||||
def delete_clientscope_scope_mappings_realm(self, payload: list[dict], clientscope_id: str, realm: str = "master"):
|
||||
"""Delete the realm roles (scope_mappings) contained in the payload from the clientscope on the Keycloak server.
|
||||
:param payload: List of roles to be deleted.
|
||||
:param clientscope_id: ID of the clientscope to delete roles from scope-mappings.
|
||||
:param realm: Realm from which to obtain the roles.
|
||||
:return: The clientscope realm scope-mappings.
|
||||
"""
|
||||
client_role_scope_url = URL_CLIENTSCOPE_SCOPE_MAPPINGS_REALM.format(
|
||||
url=self.baseurl, realm=realm, id=clientscope_id
|
||||
)
|
||||
try:
|
||||
self._request(client_role_scope_url, method="DELETE", data=json.dumps(payload))
|
||||
|
||||
except Exception as e:
|
||||
self.fail_request(
|
||||
e, msg=f"Could not delete scope mappings for client-scope {clientscope_id} in realm {realm}: {e}"
|
||||
)
|
||||
|
||||
return self.get_clientscope_scope_mappings_realm(clientscope_id, realm)
|
||||
|
||||
def get_client_role_scope_from_realm(self, clientid, realm: str = "master"):
|
||||
"""Fetch the realm roles from the client's scope on the Keycloak server.
|
||||
:param clientid: ID of the client from which to obtain the associated realm roles.
|
||||
|
||||
Reference in New Issue
Block a user