mirror of
https://github.com/ansible-collections/kubernetes.core.git
synced 2026-03-26 21:33:02 +00:00
k8s: Display warnings to users (#701)
k8s: Display warnings to users SUMMARY This changes K8sService and the k8s module so warnings returned by the K8S API are displayed to the user. Fixes kubevirt/kubevirt.core#30 Fixes kubevirt/kubevirt.core#31 ISSUE TYPE Feature Pull Request COMPONENT NAME k8s module K8sService ADDITIONAL INFORMATION Before: TASK [Create VM] ********************************************************************************************************************************************** ok: [localhost] After: TASK [Create VM] ********************************************************************************************************************************************** [WARNING]: unknown field "spec.template.spec.disk" [WARNING]: unknown field "spec.template.spec.domain.bogus" ok: [localhost] Reviewed-by: Adam Miller <admiller@redhat.com> Reviewed-by: Mike Graves <mgraves@redhat.com> Reviewed-by: Felix Matouschek <felix@matouschek.org>
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
---
|
||||
minor_changes:
|
||||
- k8s - The module and K8sService were changed so warnings returned by the K8S API are now displayed to the user.
|
||||
@@ -149,6 +149,7 @@ def k8s_apply(resource, definition, **kwargs):
|
||||
force_conflicts=kwargs.get("force_conflicts"),
|
||||
field_manager=kwargs.get("field_manager"),
|
||||
dry_run=kwargs.get("dry_run"),
|
||||
serialize=kwargs.get("serialize"),
|
||||
)
|
||||
if not existing:
|
||||
return resource.create(
|
||||
@@ -158,6 +159,7 @@ def k8s_apply(resource, definition, **kwargs):
|
||||
return resource.get(
|
||||
name=definition["metadata"]["name"],
|
||||
namespace=definition["metadata"].get("namespace"),
|
||||
**kwargs
|
||||
)
|
||||
return resource.patch(
|
||||
body=desired,
|
||||
|
||||
@@ -139,6 +139,7 @@ def perform_action(svc, definition: Dict, params: Dict) -> Dict:
|
||||
|
||||
result = {"changed": False, "result": {}}
|
||||
instance = {}
|
||||
warnings = []
|
||||
|
||||
resource = svc.find_resource(kind, api_version, fail=True)
|
||||
definition["kind"] = resource.kind
|
||||
@@ -172,7 +173,7 @@ def perform_action(svc, definition: Dict, params: Dict) -> Dict:
|
||||
return result
|
||||
|
||||
if params.get("apply"):
|
||||
instance = svc.apply(resource, definition, existing)
|
||||
instance, warnings = svc.apply(resource, definition, existing)
|
||||
result["method"] = "apply"
|
||||
elif not existing:
|
||||
if state == "patched":
|
||||
@@ -183,16 +184,19 @@ def perform_action(svc, definition: Dict, params: Dict) -> Dict:
|
||||
)
|
||||
)
|
||||
return result
|
||||
instance = svc.create(resource, definition)
|
||||
instance, warnings = svc.create(resource, definition)
|
||||
result["method"] = "create"
|
||||
result["changed"] = True
|
||||
elif params.get("force", False):
|
||||
instance = svc.replace(resource, definition, existing)
|
||||
instance, warnings = svc.replace(resource, definition, existing)
|
||||
result["method"] = "replace"
|
||||
else:
|
||||
instance = svc.update(resource, definition, existing)
|
||||
instance, warnings = svc.update(resource, definition, existing)
|
||||
result["method"] = "update"
|
||||
|
||||
if warnings:
|
||||
result["warnings"] = warnings
|
||||
|
||||
# If needed, wait and/or create diff
|
||||
success = True
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
import copy
|
||||
from json import loads
|
||||
from re import compile
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from ansible.module_utils.common.dict_transformations import dict_merge
|
||||
@@ -142,7 +144,7 @@ class K8sService:
|
||||
name: str,
|
||||
namespace: str,
|
||||
merge_type: str = None,
|
||||
) -> Dict:
|
||||
) -> Tuple[Dict, List[str]]:
|
||||
if merge_type == "json":
|
||||
self.module.deprecate(
|
||||
msg="json as a merge_type value is deprecated. Please use the k8s_json_patch module instead.",
|
||||
@@ -150,10 +152,10 @@ class K8sService:
|
||||
collection_name="kubernetes.core",
|
||||
)
|
||||
try:
|
||||
params = dict(name=name, namespace=namespace)
|
||||
params = dict(name=name, namespace=namespace, serialize=False)
|
||||
if merge_type:
|
||||
params["content_type"] = "application/{0}-patch+json".format(merge_type)
|
||||
return self.client.patch(resource, definition, **params).to_dict()
|
||||
return decode_response(self.client.patch(resource, definition, **params))
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to patch object: {0}".format(reason)
|
||||
@@ -330,123 +332,124 @@ class K8sService:
|
||||
result["resources"].append(hide_fields(res, hidden_fields))
|
||||
return result
|
||||
|
||||
def create(self, resource: Resource, definition: Dict) -> Dict:
|
||||
def create(self, resource: Resource, definition: Dict) -> Tuple[Dict, List[str]]:
|
||||
namespace = definition["metadata"].get("namespace")
|
||||
name = definition["metadata"].get("name")
|
||||
|
||||
if self._client_side_dry_run:
|
||||
k8s_obj = _encode_stringdata(definition)
|
||||
else:
|
||||
try:
|
||||
k8s_obj = self.client.create(
|
||||
resource, definition, namespace=namespace
|
||||
).to_dict()
|
||||
except ConflictError:
|
||||
# Some resources, like ProjectRequests, can't be created multiple times,
|
||||
# because the resources that they create don't match their kind
|
||||
# In this case we'll mark it as unchanged and warn the user
|
||||
self.module.warn(
|
||||
"{0} was not found, but creating it returned a 409 Conflict error. This can happen \
|
||||
if the resource you are creating does not directly create a resource of the same kind.".format(
|
||||
name
|
||||
)
|
||||
return _encode_stringdata(definition), []
|
||||
|
||||
try:
|
||||
return decode_response(
|
||||
self.client.create(
|
||||
resource, definition, namespace=namespace, serialize=False
|
||||
)
|
||||
return dict()
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to create object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
return k8s_obj
|
||||
)
|
||||
except ConflictError:
|
||||
# Some resources, like ProjectRequests, can't be created multiple times,
|
||||
# because the resources that they create don't match their kind
|
||||
# In this case we'll mark it as unchanged and warn the user
|
||||
self.module.warn(
|
||||
"{0} was not found, but creating it returned a 409 Conflict error. This can happen \
|
||||
if the resource you are creating does not directly create a resource of the same kind.".format(
|
||||
name
|
||||
)
|
||||
)
|
||||
return dict(), []
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to create object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
|
||||
def apply(
|
||||
self,
|
||||
resource: Resource,
|
||||
definition: Dict,
|
||||
existing: Optional[ResourceInstance] = None,
|
||||
) -> Dict:
|
||||
) -> Tuple[Dict, List[str]]:
|
||||
namespace = definition["metadata"].get("namespace")
|
||||
|
||||
server_side_apply = self.module.params.get("server_side_apply")
|
||||
if server_side_apply:
|
||||
requires("kubernetes", "19.15.0", reason="to use server side apply")
|
||||
|
||||
if self._client_side_dry_run:
|
||||
ignored, patch = apply_object(resource, _encode_stringdata(definition))
|
||||
if existing:
|
||||
k8s_obj = dict_merge(existing.to_dict(), patch)
|
||||
return dict_merge(existing.to_dict(), patch), []
|
||||
else:
|
||||
k8s_obj = patch
|
||||
else:
|
||||
try:
|
||||
params = {}
|
||||
if server_side_apply:
|
||||
params["server_side"] = True
|
||||
params.update(server_side_apply)
|
||||
k8s_obj = self.client.apply(
|
||||
resource, definition, namespace=namespace, **params
|
||||
).to_dict()
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to apply object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
return k8s_obj
|
||||
return patch, []
|
||||
|
||||
try:
|
||||
params = {}
|
||||
if server_side_apply:
|
||||
params["server_side"] = True
|
||||
params.update(server_side_apply)
|
||||
return decode_response(
|
||||
self.client.apply(
|
||||
resource, definition, namespace=namespace, serialize=False, **params
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to apply object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
|
||||
def replace(
|
||||
self,
|
||||
resource: Resource,
|
||||
definition: Dict,
|
||||
existing: ResourceInstance,
|
||||
) -> Dict:
|
||||
) -> Tuple[Dict, List[str]]:
|
||||
append_hash = self.module.params.get("append_hash", False)
|
||||
name = definition["metadata"].get("name")
|
||||
namespace = definition["metadata"].get("namespace")
|
||||
|
||||
if self._client_side_dry_run:
|
||||
k8s_obj = _encode_stringdata(definition)
|
||||
else:
|
||||
try:
|
||||
k8s_obj = self.client.replace(
|
||||
return _encode_stringdata(definition), []
|
||||
|
||||
try:
|
||||
return decode_response(
|
||||
self.client.replace(
|
||||
resource,
|
||||
definition,
|
||||
name=name,
|
||||
namespace=namespace,
|
||||
append_hash=append_hash,
|
||||
).to_dict()
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to replace object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
return k8s_obj
|
||||
serialize=False,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
reason = e.body if hasattr(e, "body") else e
|
||||
msg = "Failed to replace object: {0}".format(reason)
|
||||
raise CoreException(msg) from e
|
||||
|
||||
def update(
|
||||
self, resource: Resource, definition: Dict, existing: ResourceInstance
|
||||
) -> Dict:
|
||||
) -> Tuple[Dict, List[str]]:
|
||||
name = definition["metadata"].get("name")
|
||||
namespace = definition["metadata"].get("namespace")
|
||||
|
||||
if self._client_side_dry_run:
|
||||
k8s_obj = dict_merge(existing.to_dict(), _encode_stringdata(definition))
|
||||
else:
|
||||
exception = None
|
||||
for merge_type in self.module.params.get("merge_type") or [
|
||||
"strategic-merge",
|
||||
"merge",
|
||||
]:
|
||||
try:
|
||||
k8s_obj = self.patch_resource(
|
||||
resource,
|
||||
definition,
|
||||
name,
|
||||
namespace,
|
||||
merge_type=merge_type,
|
||||
)
|
||||
exception = None
|
||||
except CoreException as e:
|
||||
exception = e
|
||||
continue
|
||||
break
|
||||
if exception:
|
||||
raise exception
|
||||
return k8s_obj
|
||||
return dict_merge(existing.to_dict(), _encode_stringdata(definition)), []
|
||||
|
||||
exception = None
|
||||
for merge_type in self.module.params.get("merge_type") or [
|
||||
"strategic-merge",
|
||||
"merge",
|
||||
]:
|
||||
try:
|
||||
return self.patch_resource(
|
||||
resource,
|
||||
definition,
|
||||
name,
|
||||
namespace,
|
||||
merge_type=merge_type,
|
||||
)
|
||||
except CoreException as e:
|
||||
exception = e
|
||||
continue
|
||||
raise exception
|
||||
|
||||
def delete(
|
||||
self,
|
||||
@@ -543,3 +546,83 @@ def hide_field(definition: dict, hidden_field: str) -> dict:
|
||||
else:
|
||||
del definition[split[0]]
|
||||
return definition
|
||||
|
||||
|
||||
def decode_response(resp) -> Tuple[Dict, List[str]]:
|
||||
"""
|
||||
This function decodes unserialized responses from the Kubernetes python
|
||||
client and decodes the RFC2616 14.46 warnings found in the response
|
||||
headers.
|
||||
"""
|
||||
obj = ResourceInstance(None, loads(resp.data.decode("utf8"))).to_dict()
|
||||
warnings = []
|
||||
if (
|
||||
resp.headers is not None
|
||||
and "warning" in resp.headers
|
||||
and resp.headers["warning"] is not None
|
||||
):
|
||||
warnings = resp.headers["warning"].split(", ")
|
||||
return obj, decode_warnings(warnings)
|
||||
|
||||
|
||||
def decode_warnings(warnings: str) -> List[str]:
|
||||
"""
|
||||
This function decodes RFC2616 14.46 warnings in a simplified way, where
|
||||
only the warn-texts are returned in a list.
|
||||
"""
|
||||
p = compile('\\d{3} .+ (".+")')
|
||||
|
||||
decoded = []
|
||||
for warning in warnings:
|
||||
m = p.match(warning)
|
||||
if m:
|
||||
try:
|
||||
parsed, unused = parse_quoted_string(m.group(1))
|
||||
decoded.append(parsed)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
return decoded
|
||||
|
||||
|
||||
def parse_quoted_string(quoted_string: str) -> Tuple[str, str]:
|
||||
"""
|
||||
This function was adapted from:
|
||||
https://github.com/kubernetes/apimachinery/blob/bb8822152cabfb4f34dbc26270f874ce53db50de/pkg/util/net/http.go#L609
|
||||
"""
|
||||
if len(quoted_string) == 0:
|
||||
raise ValueError("invalid quoted string: 0-length")
|
||||
|
||||
if quoted_string[0] != '"':
|
||||
raise ValueError("invalid quoted string: missing initial quote")
|
||||
|
||||
quoted_string = quoted_string[1:]
|
||||
remainder = ""
|
||||
escaping = False
|
||||
closed_quote = False
|
||||
result = []
|
||||
|
||||
for i, b in enumerate(quoted_string):
|
||||
if b == '"':
|
||||
if escaping:
|
||||
result.append(b)
|
||||
escaping = False
|
||||
else:
|
||||
closed_quote = True
|
||||
remainder_start = i + 1
|
||||
remainder = quoted_string[remainder_start:].strip()
|
||||
break
|
||||
elif b == "\\":
|
||||
if escaping:
|
||||
result.append(b)
|
||||
escaping = False
|
||||
else:
|
||||
escaping = True
|
||||
else:
|
||||
result.append(b)
|
||||
escaping = False
|
||||
|
||||
if not closed_quote:
|
||||
raise ValueError("invalid quoted string: missing closing quote")
|
||||
|
||||
return "".join(result), remainder
|
||||
|
||||
@@ -31,7 +31,7 @@ modified_def["metadata"]["labels"]["environment"] = "testing"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"action, params, existing, instance, expected",
|
||||
"action, params, existing, instance_warnings, expected",
|
||||
[
|
||||
(
|
||||
"delete",
|
||||
@@ -51,14 +51,26 @@ modified_def["metadata"]["labels"]["environment"] = "testing"
|
||||
"apply",
|
||||
{"apply": "yes"},
|
||||
{},
|
||||
definition,
|
||||
(definition, []),
|
||||
{"changed": True, "method": "apply", "result": definition},
|
||||
),
|
||||
(
|
||||
"apply",
|
||||
{"apply": "yes"},
|
||||
{},
|
||||
(definition, ["test warning"]),
|
||||
{
|
||||
"changed": True,
|
||||
"method": "apply",
|
||||
"result": definition,
|
||||
"warnings": ["test warning"],
|
||||
},
|
||||
),
|
||||
(
|
||||
"create",
|
||||
{"state": "patched"},
|
||||
{},
|
||||
{},
|
||||
({}, []),
|
||||
{
|
||||
"changed": False,
|
||||
"result": {},
|
||||
@@ -71,42 +83,78 @@ modified_def["metadata"]["labels"]["environment"] = "testing"
|
||||
"create",
|
||||
{},
|
||||
{},
|
||||
definition,
|
||||
(definition, []),
|
||||
{"changed": True, "method": "create", "result": definition},
|
||||
),
|
||||
(
|
||||
"create",
|
||||
{},
|
||||
{},
|
||||
(definition, ["test warning"]),
|
||||
{
|
||||
"changed": True,
|
||||
"method": "create",
|
||||
"result": definition,
|
||||
"warnings": ["test warning"],
|
||||
},
|
||||
),
|
||||
(
|
||||
"replace",
|
||||
{"force": "yes"},
|
||||
definition,
|
||||
definition,
|
||||
(definition, []),
|
||||
{"changed": False, "method": "replace", "result": definition},
|
||||
),
|
||||
(
|
||||
"replace",
|
||||
{"force": "yes"},
|
||||
definition,
|
||||
modified_def,
|
||||
(modified_def, []),
|
||||
{"changed": True, "method": "replace", "result": modified_def},
|
||||
),
|
||||
(
|
||||
"replace",
|
||||
{"force": "yes"},
|
||||
definition,
|
||||
(modified_def, ["test warning"]),
|
||||
{
|
||||
"changed": True,
|
||||
"method": "replace",
|
||||
"result": modified_def,
|
||||
"warnings": ["test warning"],
|
||||
},
|
||||
),
|
||||
(
|
||||
"update",
|
||||
{},
|
||||
definition,
|
||||
definition,
|
||||
(definition, []),
|
||||
{"changed": False, "method": "update", "result": definition},
|
||||
),
|
||||
(
|
||||
"update",
|
||||
{},
|
||||
definition,
|
||||
modified_def,
|
||||
(modified_def, []),
|
||||
{"changed": True, "method": "update", "result": modified_def},
|
||||
),
|
||||
(
|
||||
"update",
|
||||
{},
|
||||
definition,
|
||||
(modified_def, ["test warning"]),
|
||||
{
|
||||
"changed": True,
|
||||
"method": "update",
|
||||
"result": modified_def,
|
||||
"warnings": ["test warning"],
|
||||
},
|
||||
),
|
||||
(
|
||||
"create",
|
||||
{"label_selectors": ["app=foo"]},
|
||||
{},
|
||||
definition,
|
||||
(definition, []),
|
||||
{
|
||||
"changed": False,
|
||||
"msg": "resource 'kind=Pod,name=foo,namespace=foo' filtered by label_selectors.",
|
||||
@@ -116,18 +164,18 @@ modified_def["metadata"]["labels"]["environment"] = "testing"
|
||||
"create",
|
||||
{"label_selectors": ["app=nginx"]},
|
||||
{},
|
||||
definition,
|
||||
(definition, []),
|
||||
{"changed": True, "method": "create", "result": definition},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_perform_action(action, params, existing, instance, expected):
|
||||
def test_perform_action(action, params, existing, instance_warnings, expected):
|
||||
svc = Mock()
|
||||
svc.find_resource.return_value = Mock(
|
||||
kind=definition["kind"], group_version=definition["apiVersion"]
|
||||
)
|
||||
svc.retrieve.return_value = ResourceInstance(None, existing) if existing else None
|
||||
spec = {action + ".return_value": instance}
|
||||
spec = {action + ".return_value": instance_warnings}
|
||||
svc.configure_mock(**spec)
|
||||
|
||||
result = perform_action(svc, definition, params)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from json import dumps
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.service import (
|
||||
K8sService,
|
||||
diff_objects,
|
||||
parse_quoted_string,
|
||||
)
|
||||
from kubernetes.dynamic.exceptions import NotFoundError
|
||||
from kubernetes.dynamic.resource import Resource, ResourceInstance
|
||||
@@ -57,6 +59,22 @@ def mock_pod_updated_resource_instance():
|
||||
return ResourceInstance(None, pod_definition_updated)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mock_pod_response():
|
||||
resp = Mock()
|
||||
resp.data.decode.return_value = dumps(pod_definition)
|
||||
resp.headers = {}
|
||||
return resp
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mock_pod_warnings_response():
|
||||
resp = Mock()
|
||||
resp.data.decode.return_value = dumps(pod_definition)
|
||||
resp.headers = {"warning": '299 - "test warning 1", 299 - "test warning 2"'}
|
||||
return resp
|
||||
|
||||
|
||||
def test_diff_objects_no_diff():
|
||||
match, diff = diff_objects(pod_definition, pod_definition)
|
||||
|
||||
@@ -159,16 +177,33 @@ def test_service_delete_existing_resource_check_mode(mock_pod_resource_instance)
|
||||
client.delete.assert_not_called()
|
||||
|
||||
|
||||
def test_service_create_resource(mock_pod_resource_instance):
|
||||
spec = {"create.side_effect": [mock_pod_resource_instance]}
|
||||
def test_service_create_resource(mock_pod_response, mock_pod_resource_instance):
|
||||
spec = {"create.side_effect": [mock_pod_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result = svc.create(Mock(), pod_definition)
|
||||
result, warnings = svc.create(Mock(), pod_definition)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert not warnings
|
||||
|
||||
|
||||
def test_service_create_resource_warnings(
|
||||
mock_pod_warnings_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"create.side_effect": [mock_pod_warnings_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result, warnings = svc.create(Mock(), pod_definition)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert warnings[0] == "test warning 1"
|
||||
assert warnings[1] == "test warning 2"
|
||||
|
||||
|
||||
def test_service_create_resource_check_mode():
|
||||
@@ -176,9 +211,10 @@ def test_service_create_resource_check_mode():
|
||||
client.create.return_value = mock_pod_resource_instance
|
||||
module = Mock(params={}, check_mode=True)
|
||||
svc = K8sService(client, module)
|
||||
result = svc.create(Mock(), pod_definition)
|
||||
result, warnings = svc.create(Mock(), pod_definition)
|
||||
|
||||
assert result == pod_definition
|
||||
assert not warnings
|
||||
client.create.assert_not_called()
|
||||
|
||||
|
||||
@@ -224,40 +260,99 @@ def test_create_project_request():
|
||||
assert results["result"] == project_definition
|
||||
|
||||
|
||||
def test_service_apply_existing_resource(mock_pod_resource_instance):
|
||||
spec = {"apply.side_effect": [mock_pod_resource_instance]}
|
||||
def test_service_apply_existing_resource(mock_pod_response, mock_pod_resource_instance):
|
||||
spec = {"apply.side_effect": [mock_pod_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {"apply": True}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result = svc.apply(Mock(), pod_definition_updated, mock_pod_resource_instance)
|
||||
result, warnings = svc.apply(
|
||||
Mock(), pod_definition_updated, mock_pod_resource_instance
|
||||
)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert not warnings
|
||||
|
||||
|
||||
def test_service_replace_existing_resource(mock_pod_resource_instance):
|
||||
spec = {"replace.side_effect": [mock_pod_resource_instance]}
|
||||
def test_service_apply_existing_resource_warnings(
|
||||
mock_pod_warnings_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"apply.side_effect": [mock_pod_warnings_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {"apply": True}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result, warnings = svc.apply(
|
||||
Mock(), pod_definition_updated, mock_pod_resource_instance
|
||||
)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert warnings[0] == "test warning 1"
|
||||
assert warnings[1] == "test warning 2"
|
||||
|
||||
|
||||
def test_service_replace_existing_resource(
|
||||
mock_pod_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"replace.side_effect": [mock_pod_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
result, warnings = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert not warnings
|
||||
|
||||
|
||||
def test_service_update_existing_resource(mock_pod_resource_instance):
|
||||
spec = {"replace.side_effect": [mock_pod_resource_instance]}
|
||||
def test_service_replace_existing_resource_warnings(
|
||||
mock_pod_warnings_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"replace.side_effect": [mock_pod_warnings_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
result, warnings = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert warnings[0] == "test warning 1"
|
||||
assert warnings[1] == "test warning 2"
|
||||
|
||||
|
||||
def test_service_update_existing_resource(
|
||||
mock_pod_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"replace.side_effect": [mock_pod_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result, warnings = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert not warnings
|
||||
|
||||
|
||||
def test_service_update_existing_resource_warnings(
|
||||
mock_pod_warnings_response, mock_pod_resource_instance
|
||||
):
|
||||
spec = {"replace.side_effect": [mock_pod_warnings_response]}
|
||||
client = Mock(**spec)
|
||||
module = Mock()
|
||||
module.params = {}
|
||||
module.check_mode = False
|
||||
svc = K8sService(client, module)
|
||||
result, warnings = svc.replace(Mock(), pod_definition, mock_pod_resource_instance)
|
||||
|
||||
assert result == mock_pod_resource_instance.to_dict()
|
||||
assert warnings[0] == "test warning 1"
|
||||
assert warnings[1] == "test warning 2"
|
||||
|
||||
|
||||
def test_service_find(mock_pod_resource_instance):
|
||||
@@ -288,3 +383,24 @@ def test_service_find_error():
|
||||
assert isinstance(results, dict)
|
||||
assert results["api_found"] is True
|
||||
assert results["resources"] == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"quoted_string,expected_val,expected_remainder",
|
||||
[
|
||||
(
|
||||
'"Response is stale" Tue, 15 Nov 1994 12:45:26 GMT',
|
||||
"Response is stale",
|
||||
"Tue, 15 Nov 1994 12:45:26 GMT",
|
||||
),
|
||||
(
|
||||
'"unknown field \\"spec.template.spec.disk\\""',
|
||||
'unknown field "spec.template.spec.disk"',
|
||||
"",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_parse_quoted_string(quoted_string, expected_val, expected_remainder):
|
||||
val, remainder = parse_quoted_string(quoted_string)
|
||||
assert val == expected_val
|
||||
assert remainder == expected_remainder
|
||||
|
||||
Reference in New Issue
Block a user