Fix scale wait and add tests

Move wait logic out of raw and into common and use that
logic in scale

Fix a few broken wait condition cases highlighted by scaling
up and down

Move scale-related tests into dedicated test task file

Additional service related tests
This commit is contained in:
Will Thames
2020-05-22 12:39:43 +10:00
parent 3bdfb4745a
commit beebe98ce6
7 changed files with 486 additions and 170 deletions

View File

@@ -20,8 +20,6 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
import math
import time
from ansible_collections.community.kubernetes.plugins.module_utils.common import AUTH_ARG_SPEC, COMMON_ARG_SPEC
from ansible_collections.community.kubernetes.plugins.module_utils.common import KubernetesAnsibleModule
@@ -29,17 +27,9 @@ from ansible.module_utils.six import string_types
try:
import yaml
from openshift.dynamic.client import ResourceInstance
from openshift.dynamic.exceptions import NotFoundError
except ImportError:
pass
try:
from openshift import watch
except ImportError:
try:
from openshift.dynamic.client import watch
except ImportError:
pass
SCALE_ARG_SPEC = {
@@ -112,7 +102,9 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
wait_time = self.params.get('wait_timeout')
existing = None
existing_count = None
return_attributes = dict(changed=False, result=dict())
return_attributes = dict(changed=False, result=dict(), diff=dict())
if wait:
return_attributes['duration'] = 0
resource = self.find_resource(kind, api_version, fail=True)
@@ -142,10 +134,9 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
if not self.check_mode:
if self.kind == 'job':
existing.spec.parallelism = replicas
k8s_obj = resource.patch(existing.to_dict())
return_attributes['result'] = resource.patch(existing.to_dict()).to_dict()
else:
k8s_obj = self.scale(resource, existing, replicas, wait, wait_time)
return_attributes['result'] = k8s_obj.to_dict()
return_attributes = self.scale(resource, existing, replicas, wait, wait_time)
self.exit_json(**return_attributes)
@@ -161,86 +152,31 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
def scale(self, resource, existing_object, replicas, wait, wait_time):
name = existing_object.metadata.name
namespace = existing_object.metadata.namespace
kind = existing_object.kind
if not hasattr(resource, 'scale'):
self.fail_json(
msg="Cannot perform scale on resource of kind {0}".format(resource.kind)
)
scale_obj = {'metadata': {'name': name, 'namespace': namespace}, 'spec': {'replicas': replicas}}
scale_obj = {'kind': kind, 'metadata': {'name': name, 'namespace': namespace}, 'spec': {'replicas': replicas}}
return_obj = None
stream = None
if wait:
w, stream = self._create_stream(resource, namespace, wait_time)
existing = resource.get(name=name, namespace=namespace)
try:
resource.scale.patch(body=scale_obj)
except Exception as exc:
self.fail_json(
msg="Scale request failed: {0}".format(exc)
)
self.fail_json(msg="Scale request failed: {0}".format(exc))
if wait and stream is not None:
return_obj = self._read_stream(resource, w, stream, name, replicas)
k8s_obj = resource.get(name=name, namespace=namespace).to_dict()
match, diffs = self.diff_objects(existing.to_dict(), k8s_obj)
result = dict()
result['result'] = k8s_obj
result['changed'] = not match
result['diff'] = diffs
if not return_obj:
return_obj = self._wait_for_response(resource, name, namespace)
return return_obj
def _create_stream(self, resource, namespace, wait_time):
""" Create a stream of events for the object """
w = None
stream = None
w = watch.Watch()
w._api_client = self.client.client
if namespace:
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
else:
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
return w, stream
def _read_stream(self, resource, watcher, stream, name, replicas):
""" Wait for ready_replicas to equal the requested number of replicas. """
return_obj = None
try:
for event in stream:
if event.get('object'):
obj = ResourceInstance(resource, event['object'])
if obj.metadata.name == name and hasattr(obj, 'status'):
if replicas == 0:
if not hasattr(obj.status, 'readyReplicas') or not obj.status.readyReplicas:
return_obj = obj
watcher.stop()
break
if hasattr(obj.status, 'readyReplicas') and obj.status.readyReplicas == replicas:
return_obj = obj
watcher.stop()
break
except Exception as exc:
self.fail_json(msg="Exception reading event stream: {0}".format(exc))
if not return_obj:
self.fail_json(msg="Error fetching the patched object. Try a higher wait_timeout value.")
if replicas and return_obj.status.readyReplicas is None:
self.fail_json(msg="Failed to fetch the number of ready replicas. Try a higher wait_timeout value.")
if replicas and return_obj.status.readyReplicas != replicas:
self.fail_json(msg="Number of ready replicas is {0}. Failed to reach {1} ready replicas within "
"the wait_timeout period.".format(return_obj.status.ready_replicas, replicas))
return return_obj
def _wait_for_response(self, resource, name, namespace):
""" Wait for an API response """
tries = 0
half = math.ceil(20 / 2)
obj = None
while tries <= half:
obj = resource.get(name=name, namespace=namespace)
if obj:
break
tries += 2
time.sleep(2)
return obj
if wait:
success, result['result'], result['duration'] = self.wait(resource, scale_obj, 5, wait_time)
if not success:
self.fail_json(msg="Resource scaling timed out", **result)
return result