Merge remote-tracking branch 'upstream/main' into merge-upstream

This commit is contained in:
Shaun Smiley
2020-09-18 13:22:53 -07:00
123 changed files with 3666 additions and 988 deletions

View File

@@ -18,14 +18,13 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
import json
from datetime import datetime
import time
import os
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.dict_transformations import recursive_diff
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_native
@@ -34,7 +33,7 @@ try:
import kubernetes
import openshift
from openshift.dynamic import DynamicClient
from openshift.dynamic.exceptions import ResourceNotFoundError, ResourceNotUniqueError
from openshift.dynamic.exceptions import ResourceNotFoundError, ResourceNotUniqueError, NotFoundError
HAS_K8S_MODULE_HELPER = True
k8s_import_exception = None
except ImportError as e:
@@ -56,13 +55,14 @@ try:
except ImportError:
pass
try:
from openshift.dynamic.apply import recursive_diff
except ImportError:
from ansible.module_utils.common.dict_transformations import recursive_diff
def list_dict_str(value):
if isinstance(value, list):
return value
elif isinstance(value, dict):
return value
elif isinstance(value, string_types):
if isinstance(value, (list, dict, string_types)):
return value
raise TypeError
@@ -78,6 +78,9 @@ COMMON_ARG_SPEC = {
'type': 'bool',
'default': False,
},
}
RESOURCE_ARG_SPEC = {
'resource_definition': {
'type': list_dict_str,
'aliases': ['definition', 'inline']
@@ -85,6 +88,9 @@ COMMON_ARG_SPEC = {
'src': {
'type': 'path',
},
}
NAME_ARG_SPEC = {
'kind': {},
'name': {},
'namespace': {},
@@ -149,20 +155,15 @@ AUTH_ARG_MAP = {
class K8sAnsibleMixin(object):
_argspec_cache = None
@property
def argspec(self):
"""
Introspect the model properties, and return an Ansible module arg_spec dict.
:return: dict
"""
if self._argspec_cache:
return self._argspec_cache
argument_spec = copy.deepcopy(COMMON_ARG_SPEC)
argument_spec.update(copy.deepcopy(AUTH_ARG_SPEC))
self._argspec_cache = argument_spec
return self._argspec_cache
def __init__(self, *args, **kwargs):
if not HAS_K8S_MODULE_HELPER:
self.fail_json(msg=missing_required_lib('openshift'), exception=K8S_IMP_ERR,
error=to_native(k8s_import_exception))
self.openshift_version = openshift.__version__
if not HAS_YAML:
self.fail_json(msg=missing_required_lib("PyYAML"), exception=YAML_IMP_ERR)
def get_api_client(self, **auth_params):
auth_params = auth_params or getattr(self, 'params', {})
@@ -186,13 +187,19 @@ class K8sAnsibleMixin(object):
# We have enough in the parameters to authenticate, no need to load incluster or kubeconfig
pass
elif auth_set('kubeconfig') or auth_set('context'):
kubernetes.config.load_kube_config(auth.get('kubeconfig'), auth.get('context'), persist_config=auth.get('persist_config'))
try:
kubernetes.config.load_kube_config(auth.get('kubeconfig'), auth.get('context'), persist_config=auth.get('persist_config'))
except Exception as err:
self.fail(msg='Failed to load kubeconfig due to %s' % to_native(err))
else:
# First try to do incluster config, then kubeconfig
try:
kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
kubernetes.config.load_kube_config(auth.get('kubeconfig'), auth.get('context'), persist_config=auth.get('persist_config'))
try:
kubernetes.config.load_kube_config(auth.get('kubeconfig'), auth.get('context'), persist_config=auth.get('persist_config'))
except Exception as err:
self.fail(msg='Failed to load kubeconfig due to %s' % to_native(err))
# Override any values in the default configuration with Ansible parameters
configuration = kubernetes.client.Configuration()
@@ -204,7 +211,10 @@ class K8sAnsibleMixin(object):
setattr(configuration, key, value)
kubernetes.client.Configuration.set_default(configuration)
return DynamicClient(kubernetes.client.ApiClient(configuration))
try:
return DynamicClient(kubernetes.client.ApiClient(configuration))
except Exception as err:
self.fail(msg='Failed to get client due to %s' % to_native(err))
def find_resource(self, kind, api_version, fail=False):
for attribute in ['kind', 'name', 'singular_name']:
@@ -258,36 +268,165 @@ class K8sAnsibleMixin(object):
self.fail(msg="Error loading resource_definition: {0}".format(exc))
return result
@staticmethod
def diff_objects(existing, new):
def diff_objects(self, existing, new):
result = dict()
diff = recursive_diff(existing, new)
if diff:
result['before'] = diff[0]
result['after'] = diff[1]
return not diff, result
if not diff:
return True, result
result['before'] = diff[0]
result['after'] = diff[1]
class KubernetesAnsibleModule(AnsibleModule, K8sAnsibleMixin):
resource_definition = None
api_version = None
kind = None
# If only metadata.generation and metadata.resourceVersion changed, ignore it
ignored_keys = set(['generation', 'resourceVersion'])
def __init__(self, *args, **kwargs):
if list(result['after'].keys()) != ['metadata'] or list(result['before'].keys()) != ['metadata']:
return False, result
kwargs['argument_spec'] = self.argspec
AnsibleModule.__init__(self, *args, **kwargs)
if not set(result['after']['metadata'].keys()).issubset(ignored_keys):
return False, result
if not set(result['before']['metadata'].keys()).issubset(ignored_keys):
return False, result
if not HAS_K8S_MODULE_HELPER:
self.fail_json(msg=missing_required_lib('openshift'), exception=K8S_IMP_ERR,
error=to_native(k8s_import_exception))
self.openshift_version = openshift.__version__
if hasattr(self, 'warn'):
self.warn('No meaningful diff was generated, but the API may not be idempotent (only metadata.generation or metadata.resourceVersion were changed)')
if not HAS_YAML:
self.fail_json(msg=missing_required_lib("PyYAML"), exception=YAML_IMP_ERR)
def execute_module(self):
raise NotImplementedError()
return True, result
def fail(self, msg=None):
    # Thin adapter: some call sites use fail() while AnsibleModule provides
    # fail_json(); route everything through fail_json so both spellings
    # terminate the module the same way.
    self.fail_json(msg=msg)
def _wait_for(self, resource, name, namespace, predicate, sleep, timeout, state):
    """Poll an API object until *predicate* is satisfied or *timeout* elapses.

    :param resource: dynamic-client resource used to GET the object
    :param name: object name
    :param namespace: object namespace (None for cluster-scoped kinds)
    :param predicate: callable(response) -> truthy when the wait is over
    :param sleep: seconds to pause between polls
    :param timeout: overall budget, in whole seconds
    :param state: 'present' or 'absent'; a NotFoundError counts as success
        only when waiting for 'absent'
    :return: tuple (success, result dict or None, elapsed seconds)
    """
    started = datetime.now()

    def elapsed():
        # NOTE(review): timedelta.seconds excludes the days component, so this
        # presumes timeouts well under 24h -- true for module wait_timeout use.
        return (datetime.now() - started).seconds

    last_response = None
    while elapsed() < timeout:
        try:
            last_response = resource.get(name=name, namespace=namespace)
            if predicate(last_response):
                return (True,
                        last_response.to_dict() if last_response else {},
                        elapsed())
            time.sleep(sleep)
        except NotFoundError:
            # Object is gone: that is the goal when waiting for deletion;
            # otherwise keep polling until the deadline.
            if state == 'absent':
                return True, {}, elapsed()
    if last_response:
        last_response = last_response.to_dict()
    return False, last_response, elapsed()
def wait(self, resource, definition, sleep, timeout, state='present', condition=None):
    """Wait for the object described by *definition* to reach the desired state.

    Picks a readiness predicate based on kind/state/condition, then delegates
    the polling loop to self._wait_for. Returns (success, result, duration).
    """

    def _deployment_ready(deployment):
        # FIXME: frustratingly bool(deployment.status) is True even if status is empty
        # Furthermore deployment.status.availableReplicas == deployment.status.replicas == None if status is empty
        # deployment.status.replicas is None is perfectly ok if desired replicas == 0
        # Scaling up means that we also need to check that we're not in a
        # situation where status.replicas == status.availableReplicas
        # but spec.replicas != status.replicas
        return (deployment.status
                and deployment.spec.replicas == (deployment.status.replicas or 0)
                and deployment.status.availableReplicas == deployment.status.replicas
                and deployment.status.observedGeneration == deployment.metadata.generation
                and not deployment.status.unavailableReplicas)

    def _pod_ready(pod):
        # Ready only when every container reports ready; an absent
        # containerStatuses list means the pod is still being scheduled.
        return (pod.status and pod.status.containerStatuses is not None
                and all([container.ready for container in pod.status.containerStatuses]))

    def _daemonset_ready(daemonset):
        return (daemonset.status and daemonset.status.desiredNumberScheduled is not None
                and daemonset.status.numberReady == daemonset.status.desiredNumberScheduled
                and daemonset.status.observedGeneration == daemonset.metadata.generation
                and not daemonset.status.unavailableReplicas)

    def _custom_condition(resource):
        # Match the user-supplied condition dict ({'type', 'status', 'reason'})
        # against the object's status.conditions list.
        if not resource.status or not resource.status.conditions:
            return False
        match = [x for x in resource.status.conditions if x.type == condition['type']]
        if not match:
            return False
        # There should never be more than one condition of a specific type
        match = match[0]
        if match.status == 'Unknown':
            # 'Unknown' only satisfies the wait when the caller explicitly
            # asked for status 'Unknown' (and the reason matches, if given).
            if match.status == condition['status']:
                if 'reason' not in condition:
                    return True
                if condition['reason']:
                    return match.reason == condition['reason']
            return False
        # API reports 'True'/'False' as strings; normalise before comparing.
        status = True if match.status == 'True' else False
        if status == condition['status']:
            if condition.get('reason'):
                return match.reason == condition['reason']
            return True
        return False

    def _resource_absent(resource):
        return not resource

    # Kind-specific readiness checks; anything else is considered ready as
    # soon as a (truthy) object comes back from the API.
    waiter = dict(
        Deployment=_deployment_ready,
        DaemonSet=_daemonset_ready,
        Pod=_pod_ready
    )
    kind = definition['kind']
    if state == 'present' and not condition:
        predicate = waiter.get(kind, lambda x: x)
    elif state == 'present' and condition:
        predicate = _custom_condition
    else:
        predicate = _resource_absent
    return self._wait_for(resource, definition['metadata']['name'], definition['metadata'].get('namespace'), predicate, sleep, timeout, state)
def set_resource_definitions(self):
    """Populate ``self.resource_definitions`` from module parameters.

    Sources, in order of application: the inline ``resource_definition``
    parameter (raw YAML text, a list, or a single dict), then the ``src``
    file (which replaces anything parsed inline), and finally -- when
    neither was supplied -- an implicit single definition built from
    kind/api_version/name/namespace.
    """
    raw_definition = self.params.get('resource_definition')
    self.resource_definitions = []

    if raw_definition:
        if isinstance(raw_definition, string_types):
            # Raw YAML text may contain several '---'-separated documents.
            try:
                self.resource_definitions = yaml.safe_load_all(raw_definition)
            except (IOError, yaml.YAMLError) as exc:
                self.fail(msg="Error loading resource_definition: {0}".format(exc))
        elif isinstance(raw_definition, list):
            self.resource_definitions = raw_definition
        else:
            self.resource_definitions = [raw_definition]

    src_path = self.params.get('src')
    if src_path:
        # A definitions file replaces anything parsed from the inline param.
        self.resource_definitions = self.load_resource_definitions(src_path)

    try:
        # Drop empty documents (e.g. a trailing '---' in a YAML stream).
        self.resource_definitions = [doc for doc in self.resource_definitions if doc]
    except AttributeError:
        # NOTE(review): presumably guards a non-iterable value slipping
        # through; keep the original best-effort behaviour -- confirm.
        pass

    if not raw_definition and not src_path:
        fallback = dict(
            kind=self.kind,
            apiVersion=self.api_version,
            metadata=dict(name=self.name)
        )
        if self.namespace:
            fallback['metadata']['namespace'] = self.namespace
        self.resource_definitions = [fallback]
class KubernetesAnsibleModule(AnsibleModule, K8sAnsibleMixin):
    """Deprecated compatibility shim combining AnsibleModule and K8sAnsibleMixin."""
    # NOTE: This class KubernetesAnsibleModule is deprecated in favor of
    # class K8sAnsibleMixin and will be removed 2.0.0 release.
    # Please use K8sAnsibleMixin instead.

    def __init__(self, *args, **kwargs):
        # argument_spec must be injected before AnsibleModule.__init__
        # parses the task's parameters.
        kwargs['argument_spec'] = self.argspec
        AnsibleModule.__init__(self, *args, **kwargs)
        K8sAnsibleMixin.__init__(self, *args, **kwargs)

        # Emit the deprecation warning at construction time so existing
        # users see it on every run.
        self.warn("class KubernetesAnsibleModule is deprecated"
                  " and will be removed in 2.0.0. Please use K8sAnsibleMixin instead.")

View File

@@ -20,32 +20,24 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
from datetime import datetime
from distutils.version import LooseVersion
import time
import sys
import traceback
from ansible.module_utils.basic import missing_required_lib
from ansible_collections.community.kubernetes.plugins.module_utils.common import AUTH_ARG_SPEC, COMMON_ARG_SPEC
from ansible.module_utils.six import string_types
from ansible_collections.community.kubernetes.plugins.module_utils.common import KubernetesAnsibleModule
from ansible.module_utils.basic import missing_required_lib, AnsibleModule
from ansible.module_utils._text import to_native
from ansible.module_utils.common.dict_transformations import dict_merge
from ansible_collections.community.kubernetes.plugins.module_utils.common import (
AUTH_ARG_SPEC, COMMON_ARG_SPEC, RESOURCE_ARG_SPEC, NAME_ARG_SPEC, K8sAnsibleMixin)
try:
import yaml
from openshift.dynamic.exceptions import DynamicApiError, NotFoundError, ConflictError, ForbiddenError, KubernetesValidateMissing
import urllib3
except ImportError:
# Exceptions handled in common
pass
try:
import kubernetes_validate
HAS_KUBERNETES_VALIDATE = True
except ImportError:
HAS_KUBERNETES_VALIDATE = False
K8S_CONFIG_HASH_IMP_ERR = None
try:
@@ -63,7 +55,7 @@ except ImportError:
HAS_K8S_APPLY = False
class KubernetesRawModule(KubernetesAnsibleModule):
class KubernetesRawModule(K8sAnsibleMixin):
@property
def validate_spec(self):
@@ -84,6 +76,8 @@ class KubernetesRawModule(KubernetesAnsibleModule):
@property
def argspec(self):
argument_spec = copy.deepcopy(COMMON_ARG_SPEC)
argument_spec.update(copy.deepcopy(NAME_ARG_SPEC))
argument_spec.update(copy.deepcopy(RESOURCE_ARG_SPEC))
argument_spec.update(copy.deepcopy(AUTH_ARG_SPEC))
argument_spec['merge_type'] = dict(type='list', elements='str', choices=['json', 'merge', 'strategic-merge'])
argument_spec['wait'] = dict(type='bool', default=False)
@@ -104,15 +98,25 @@ class KubernetesRawModule(KubernetesAnsibleModule):
('merge_type', 'apply'),
]
KubernetesAnsibleModule.__init__(self, *args,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True,
**kwargs)
module = AnsibleModule(
argument_spec=self.argspec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True,
)
self.module = module
self.check_mode = self.module.check_mode
self.params = self.module.params
self.fail_json = self.module.fail_json
self.fail = self.module.fail_json
self.exit_json = self.module.exit_json
super(KubernetesRawModule, self).__init__()
self.kind = k8s_kind or self.params.get('kind')
self.api_version = self.params.get('api_version')
self.name = self.params.get('name')
self.namespace = self.params.get('namespace')
resource_definition = self.params.get('resource_definition')
validate = self.params.get('validate')
if validate:
if LooseVersion(self.openshift_version) < LooseVersion("0.8.0"):
@@ -129,34 +133,7 @@ class KubernetesRawModule(KubernetesAnsibleModule):
if self.apply:
if not HAS_K8S_APPLY:
self.fail_json(msg=missing_required_lib("openshift >= 0.9.2", reason="for apply"))
if resource_definition:
if isinstance(resource_definition, string_types):
try:
self.resource_definitions = yaml.safe_load_all(resource_definition)
except (IOError, yaml.YAMLError) as exc:
self.fail(msg="Error loading resource_definition: {0}".format(exc))
elif isinstance(resource_definition, list):
self.resource_definitions = resource_definition
else:
self.resource_definitions = [resource_definition]
src = self.params.get('src')
if src:
self.resource_definitions = self.load_resource_definitions(src)
try:
self.resource_definitions = [item for item in self.resource_definitions if item]
except AttributeError:
pass
if not resource_definition and not src:
implicit_definition = dict(
kind=self.kind,
apiVersion=self.api_version,
metadata=dict(name=self.name)
)
if self.namespace:
implicit_definition['metadata']['namespace'] = self.namespace
self.resource_definitions = [implicit_definition]
self.set_resource_definitions()
def flatten_list_kind(self, list_resource, definitions):
flattened = []
@@ -178,9 +155,11 @@ class KubernetesRawModule(KubernetesAnsibleModule):
flattened_definitions = []
for definition in self.resource_definitions:
if definition is None:
continue
kind = definition.get('kind', self.kind)
api_version = definition.get('apiVersion', self.api_version)
if kind.endswith('List'):
if kind and kind.endswith('List'):
resource = self.find_resource(kind, api_version, fail=False)
flattened_definitions.extend(self.flatten_list_kind(resource, definition))
else:
@@ -274,6 +253,9 @@ class KubernetesRawModule(KubernetesAnsibleModule):
except DynamicApiError as exc:
self.fail_json(msg='Failed to retrieve requested object: {0}'.format(exc.body),
error=exc.status, status=exc.status, reason=exc.reason)
except Exception as exc:
self.fail_json(msg='Failed to retrieve requested object: {0}'.format(to_native(exc)),
error='', status='', reason='')
if state == 'absent':
result['method'] = "delete"
@@ -299,7 +281,11 @@ class KubernetesRawModule(KubernetesAnsibleModule):
else:
if self.apply:
if self.check_mode:
ignored, k8s_obj = apply_object(resource, definition)
ignored, patch = apply_object(resource, definition)
if existing:
k8s_obj = dict_merge(existing.to_dict(), patch)
else:
k8s_obj = patch
else:
try:
k8s_obj = resource.apply(definition, namespace=namespace).to_dict()
@@ -310,7 +296,7 @@ class KubernetesRawModule(KubernetesAnsibleModule):
self.fail_json(msg=msg, error=exc.status, status=exc.status, reason=exc.reason)
success = True
result['result'] = k8s_obj
if wait:
if wait and not self.check_mode:
success, result['result'], result['duration'] = self.wait(resource, definition, wait_sleep, wait_timeout, condition=wait_condition)
if existing:
existing = existing.to_dict()
@@ -369,7 +355,7 @@ class KubernetesRawModule(KubernetesAnsibleModule):
match, diffs = self.diff_objects(existing.to_dict(), k8s_obj)
success = True
result['result'] = k8s_obj
if wait:
if wait and not self.check_mode:
success, result['result'], result['duration'] = self.wait(resource, definition, wait_sleep, wait_timeout, condition=wait_condition)
match, diffs = self.diff_objects(existing.to_dict(), result['result'])
result['changed'] = not match
@@ -397,7 +383,7 @@ class KubernetesRawModule(KubernetesAnsibleModule):
success = True
result['result'] = k8s_obj
if wait:
if wait and not self.check_mode:
success, result['result'], result['duration'] = self.wait(resource, definition, wait_sleep, wait_timeout, condition=wait_condition)
match, diffs = self.diff_objects(existing.to_dict(), result['result'])
result['changed'] = not match
@@ -428,6 +414,12 @@ class KubernetesRawModule(KubernetesAnsibleModule):
msg += "\n" + "\n ".join(self.warnings)
error = dict(msg=msg, error=exc.status, status=exc.status, reason=exc.reason, warnings=self.warnings)
return None, error
except Exception as exc:
msg = "Failed to patch object: {0}".format(exc)
if self.warnings:
msg += "\n" + "\n ".join(self.warnings)
error = dict(msg=msg, error=to_native(exc), status='', reason='', warnings=self.warnings)
return None, error
def create_project_request(self, definition):
definition['kind'] = 'ProjectRequest'
@@ -443,83 +435,3 @@ class KubernetesRawModule(KubernetesAnsibleModule):
result['changed'] = True
result['method'] = 'create'
return result
def _wait_for(self, resource, name, namespace, predicate, sleep, timeout, state):
start = datetime.now()
def _wait_for_elapsed():
return (datetime.now() - start).seconds
response = None
while _wait_for_elapsed() < timeout:
try:
response = resource.get(name=name, namespace=namespace)
if predicate(response):
if response:
return True, response.to_dict(), _wait_for_elapsed()
else:
return True, {}, _wait_for_elapsed()
time.sleep(sleep)
except NotFoundError:
if state == 'absent':
return True, {}, _wait_for_elapsed()
if response:
response = response.to_dict()
return False, response, _wait_for_elapsed()
def wait(self, resource, definition, sleep, timeout, state='present', condition=None):
def _deployment_ready(deployment):
# FIXME: frustratingly bool(deployment.status) is True even if status is empty
# Furthermore deployment.status.availableReplicas == deployment.status.replicas == None if status is empty
return (deployment.status and deployment.status.replicas is not None and
deployment.status.availableReplicas == deployment.status.replicas and
deployment.status.observedGeneration == deployment.metadata.generation)
def _pod_ready(pod):
return (pod.status and pod.status.containerStatuses is not None and
all([container.ready for container in pod.status.containerStatuses]))
def _daemonset_ready(daemonset):
return (daemonset.status and daemonset.status.desiredNumberScheduled is not None and
daemonset.status.numberReady == daemonset.status.desiredNumberScheduled and
daemonset.status.observedGeneration == daemonset.metadata.generation)
def _custom_condition(resource):
if not resource.status or not resource.status.conditions:
return False
match = [x for x in resource.status.conditions if x.type == condition['type']]
if not match:
return False
# There should never be more than one condition of a specific type
match = match[0]
if match.status == 'Unknown':
if match.status == condition['status']:
if 'reason' not in condition:
return True
if condition['reason']:
return match.reason == condition['reason']
return False
status = True if match.status == 'True' else False
if status == condition['status']:
if condition.get('reason'):
return match.reason == condition['reason']
return True
return False
def _resource_absent(resource):
return not resource
waiter = dict(
Deployment=_deployment_ready,
DaemonSet=_daemonset_ready,
Pod=_pod_ready
)
kind = definition['kind']
if state == 'present' and not condition:
predicate = waiter.get(kind, lambda x: x)
elif state == 'present' and condition:
predicate = _custom_condition
else:
predicate = _resource_absent
return self._wait_for(resource, definition['metadata']['name'], definition['metadata'].get('namespace'), predicate, sleep, timeout, state)

View File

@@ -20,21 +20,15 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import copy
import math
import time
from ansible_collections.community.kubernetes.plugins.module_utils.common import AUTH_ARG_SPEC, COMMON_ARG_SPEC
from ansible_collections.community.kubernetes.plugins.module_utils.common import KubernetesAnsibleModule
from ansible.module_utils.six import string_types
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.kubernetes.plugins.module_utils.common import (
AUTH_ARG_SPEC, RESOURCE_ARG_SPEC, NAME_ARG_SPEC, K8sAnsibleMixin)
try:
import yaml
from openshift import watch
from openshift.dynamic.client import ResourceInstance
from openshift.helper.exceptions import KubernetesException
except ImportError as exc:
class KubernetesException(Exception):
pass
from openshift.dynamic.exceptions import NotFoundError
except ImportError:
pass
SCALE_ARG_SPEC = {
@@ -46,7 +40,7 @@ SCALE_ARG_SPEC = {
}
class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
class KubernetesAnsibleScaleModule(K8sAnsibleMixin):
def __init__(self, k8s_kind=None, *args, **kwargs):
self.client = None
@@ -56,39 +50,25 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
('resource_definition', 'src'),
]
KubernetesAnsibleModule.__init__(self, *args,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True,
**kwargs)
module = AnsibleModule(
argument_spec=self.argspec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True,
)
self.module = module
self.params = self.module.params
self.check_mode = self.module.check_mode
self.fail_json = self.module.fail_json
self.fail = self.module.fail_json
self.exit_json = self.module.exit_json
super(KubernetesAnsibleScaleModule, self).__init__()
self.kind = k8s_kind or self.params.get('kind')
self.api_version = self.params.get('api_version')
self.name = self.params.get('name')
self.namespace = self.params.get('namespace')
resource_definition = self.params.get('resource_definition')
if resource_definition:
if isinstance(resource_definition, string_types):
try:
self.resource_definitions = yaml.safe_load_all(resource_definition)
except (IOError, yaml.YAMLError) as exc:
self.fail(msg="Error loading resource_definition: {0}".format(exc))
elif isinstance(resource_definition, list):
self.resource_definitions = resource_definition
else:
self.resource_definitions = [resource_definition]
src = self.params.get('src')
if src:
self.resource_definitions = self.load_resource_definitions(src)
if not resource_definition and not src:
implicit_definition = dict(
kind=self.kind,
apiVersion=self.api_version,
metadata=dict(name=self.name)
)
if self.namespace:
implicit_definition['metadata']['namespace'] = self.namespace
self.resource_definitions = [implicit_definition]
self.set_resource_definitions()
def execute_module(self):
definition = self.resource_definitions[0]
@@ -107,14 +87,16 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
wait_time = self.params.get('wait_timeout')
existing = None
existing_count = None
return_attributes = dict(changed=False, result=dict())
return_attributes = dict(changed=False, result=dict(), diff=dict())
if wait:
return_attributes['duration'] = 0
resource = self.find_resource(kind, api_version, fail=True)
try:
existing = resource.get(name=name, namespace=namespace)
return_attributes['result'] = existing.to_dict()
except KubernetesException as exc:
except NotFoundError as exc:
self.fail_json(msg='Failed to retrieve requested object: {0}'.format(exc),
error=exc.value.get('status'))
@@ -137,108 +119,48 @@ class KubernetesAnsibleScaleModule(KubernetesAnsibleModule):
if not self.check_mode:
if self.kind == 'job':
existing.spec.parallelism = replicas
k8s_obj = resource.patch(existing.to_dict())
return_attributes['result'] = resource.patch(existing.to_dict()).to_dict()
else:
k8s_obj = self.scale(resource, existing, replicas, wait, wait_time)
return_attributes['result'] = k8s_obj.to_dict()
return_attributes = self.scale(resource, existing, replicas, wait, wait_time)
self.exit_json(**return_attributes)
@property
def argspec(self):
args = copy.deepcopy(COMMON_ARG_SPEC)
args.pop('state')
args.pop('force')
args = copy.deepcopy(SCALE_ARG_SPEC)
args.update(RESOURCE_ARG_SPEC)
args.update(NAME_ARG_SPEC)
args.update(AUTH_ARG_SPEC)
args.update(SCALE_ARG_SPEC)
return args
def scale(self, resource, existing_object, replicas, wait, wait_time):
name = existing_object.metadata.name
namespace = existing_object.metadata.namespace
kind = existing_object.kind
if not hasattr(resource, 'scale'):
self.fail_json(
msg="Cannot perform scale on resource of kind {0}".format(resource.kind)
)
scale_obj = {'metadata': {'name': name, 'namespace': namespace}, 'spec': {'replicas': replicas}}
scale_obj = {'kind': kind, 'metadata': {'name': name, 'namespace': namespace}, 'spec': {'replicas': replicas}}
return_obj = None
stream = None
if wait:
w, stream = self._create_stream(resource, namespace, wait_time)
existing = resource.get(name=name, namespace=namespace)
try:
resource.scale.patch(body=scale_obj)
except Exception as exc:
self.fail_json(
msg="Scale request failed: {0}".format(exc)
)
self.fail_json(msg="Scale request failed: {0}".format(exc))
if wait and stream is not None:
return_obj = self._read_stream(resource, w, stream, name, replicas)
k8s_obj = resource.get(name=name, namespace=namespace).to_dict()
match, diffs = self.diff_objects(existing.to_dict(), k8s_obj)
result = dict()
result['result'] = k8s_obj
result['changed'] = not match
result['diff'] = diffs
if not return_obj:
return_obj = self._wait_for_response(resource, name, namespace)
return return_obj
def _create_stream(self, resource, namespace, wait_time):
""" Create a stream of events for the object """
w = None
stream = None
try:
w = watch.Watch()
w._api_client = self.client.client
if namespace:
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
else:
stream = w.stream(resource.get, serialize=False, namespace=namespace, timeout_seconds=wait_time)
except KubernetesException:
pass
return w, stream
def _read_stream(self, resource, watcher, stream, name, replicas):
""" Wait for ready_replicas to equal the requested number of replicas. """
return_obj = None
try:
for event in stream:
if event.get('object'):
obj = ResourceInstance(resource, event['object'])
if obj.metadata.name == name and hasattr(obj, 'status'):
if replicas == 0:
if not hasattr(obj.status, 'readyReplicas') or not obj.status.readyReplicas:
return_obj = obj
watcher.stop()
break
if hasattr(obj.status, 'readyReplicas') and obj.status.readyReplicas == replicas:
return_obj = obj
watcher.stop()
break
except Exception as exc:
self.fail_json(msg="Exception reading event stream: {0}".format(exc))
if not return_obj:
self.fail_json(msg="Error fetching the patched object. Try a higher wait_timeout value.")
if replicas and return_obj.status.readyReplicas is None:
self.fail_json(msg="Failed to fetch the number of ready replicas. Try a higher wait_timeout value.")
if replicas and return_obj.status.readyReplicas != replicas:
self.fail_json(msg="Number of ready replicas is {0}. Failed to reach {1} ready replicas within "
"the wait_timeout period.".format(return_obj.status.ready_replicas, replicas))
return return_obj
def _wait_for_response(self, resource, name, namespace):
""" Wait for an API response """
tries = 0
half = math.ceil(20 / 2)
obj = None
while tries <= half:
obj = resource.get(name=name, namespace=namespace)
if obj:
break
tries += 2
time.sleep(2)
return obj
if wait:
success, result['result'], result['duration'] = self.wait(resource, scale_obj, 5, wait_time)
if not success:
self.fail_json(msg="Resource scaling timed out", **result)
return result