# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

from io import StringIO
from itertools import count
from urllib.error import HTTPError

import pytest

from ansible_collections.community.general.plugins.module_utils._keycloak import (
    KeycloakError,
    get_token,
)

# Module parameters for authenticating with a username and password.
module_params_creds = {
    "auth_keycloak_url": "http://keycloak.url/auth",
    "validate_certs": True,
    "auth_realm": "master",
    "client_id": "admin-cli",
    "auth_username": "admin",
    "auth_password": "admin",
    "client_secret": None,
}

# Dotted path of the function replaced by every fixture in this module.
_OPEN_URL_TARGET = "ansible_collections.community.general.plugins.module_utils._keycloak.open_url"

# Token endpoint the fixtures register a canned response for.
_TOKEN_URL = "http://keycloak.url/auth/realms/master/protocol/openid-connect/token"


def build_mocked_request(get_id_user_count, response_dict):
    """Build a ``side_effect`` callable that serves canned responses per URL.

    :param get_id_user_count: iterator yielding the index to use when the
        canned response for a URL is a list.
    :param response_dict: mapping of URL to a canned response, in any of the
        forms accepted by :func:`get_response`.
    :return: a function that reads the URL from its first positional argument
        and the HTTP method from its ``method`` keyword argument.
    """

    def _mocked_requests(*args, **kwargs):
        url = args[0]
        method = kwargs["method"]
        # An unknown URL resolves to None, which get_response returns unchanged.
        future_response = response_dict.get(url)
        return get_response(future_response, method, get_id_user_count)

    return _mocked_requests


def get_response(object_with_future_response, method, get_id_call_count):
    """Resolve a canned response description into the value to return.

    The description is unwrapped recursively:

    * a callable is called and its result returned (it may also raise);
    * a dict is indexed by ``method``;
    * a list is indexed by the next value taken from ``get_id_call_count``;
    * anything else is returned as is.

    :param object_with_future_response: the canned response description.
    :param method: HTTP method used as key for dict descriptions.
    :param get_id_call_count: iterator providing list indexes.
    """
    if callable(object_with_future_response):
        return object_with_future_response()
    if isinstance(object_with_future_response, dict):
        return get_response(object_with_future_response[method], method, get_id_call_count)
    if isinstance(object_with_future_response, list):
        # Use the builtin instead of calling the dunder method directly.
        call_number = next(get_id_call_count)
        return get_response(object_with_future_response[call_number], method, get_id_call_count)
    return object_with_future_response


def create_wrapper(text_as_string):
    """Allow the same address to be mocked several times.

    Without this factory a single StringIO would be shared between calls and
    would be empty for the second call. The returned callable creates a fresh
    StringIO holding ``text_as_string`` each time it is invoked.
    """

    def _create_wrapper():
        return StringIO(text_as_string)

    return _create_wrapper


def _patch_open_url(mocker, token_response):
    """Patch ``open_url`` in the keycloak module utils with canned responses.

    :param mocker: the ``mocker`` fixture.
    :param token_response: mapping of URL to canned response.
    :return: whatever ``mocker.patch`` returns for the patched ``open_url``.
    """
    return mocker.patch(
        _OPEN_URL_TARGET,
        side_effect=build_mocked_request(count(), token_response),
        autospec=True,
    )


@pytest.fixture()
def mock_good_connection(mocker):
    """Token endpoint answers with a JSON body containing an access token."""
    token_response = {
        _TOKEN_URL: create_wrapper('{"access_token": "alongtoken"}'),
    }
    return _patch_open_url(mocker, token_response)


def test_connect_to_keycloak_with_creds(mock_good_connection):
    keycloak_header = get_token(module_params_creds)
    assert keycloak_header == {"Authorization": "Bearer alongtoken", "Content-Type": "application/json"}


def test_connect_to_keycloak_with_token(mock_good_connection):
    module_params_token = {
        "auth_keycloak_url": "http://keycloak.url/auth",
        "validate_certs": True,
        "client_id": "admin-cli",
        "token": "alongtoken",
    }
    keycloak_header = get_token(module_params_token)
    assert keycloak_header == {"Authorization": "Bearer alongtoken", "Content-Type": "application/json"}


@pytest.fixture()
def mock_bad_json_returned(mocker):
    """Token endpoint answers with a truncated, unparsable JSON body."""
    token_response = {
        _TOKEN_URL: create_wrapper('{"access_token":'),
    }
    return _patch_open_url(mocker, token_response)


def test_bad_json_returned(mock_bad_json_returned):
    with pytest.raises(KeycloakError) as raised_error:
        get_token(module_params_creds)
    # Only the fixed prefix is checked: the text that follows it comes from
    # the underlying value error and is not the same on every Python version.
    assert (
        "API returned invalid JSON when trying to obtain access token from "
        "http://keycloak.url/auth/realms/master/protocol/openid-connect/token: "
    ) in str(raised_error.value)


def raise_401(url):
    """Return a callable that raises an HTTP 401 ``HTTPError`` for ``url``."""

    def _raise_401():
        raise HTTPError(url=url, code=401, msg="Unauthorized", hdrs="", fp=StringIO(""))

    return _raise_401


@pytest.fixture()
def mock_401_returned(mocker):
    """Token endpoint raises an HTTP 401 error."""
    token_response = {
        _TOKEN_URL: raise_401(_TOKEN_URL),
    }
    return _patch_open_url(mocker, token_response)


def test_error_returned(mock_401_returned):
    with pytest.raises(KeycloakError) as raised_error:
        get_token(module_params_creds)
    assert str(raised_error.value) == (
        "Could not obtain access token from http://keycloak.url"
        "/auth/realms/master/protocol/openid-connect/token: "
        "HTTP Error 401: Unauthorized"
    )


@pytest.fixture()
def mock_json_without_token_returned(mocker):
    """Token endpoint answers with valid JSON lacking ``access_token``."""
    token_response = {
        _TOKEN_URL: create_wrapper('{"not_token": "It is not a token"}'),
    }
    return _patch_open_url(mocker, token_response)


def test_json_without_token_returned(mock_json_without_token_returned):
    with pytest.raises(KeycloakError) as raised_error:
        get_token(module_params_creds)
    assert str(raised_error.value) == (
        "API did not include access_token field in response from "
        "http://keycloak.url/auth/realms/master/protocol/openid-connect/token"
    )