mirror of
https://github.com/ansible-collections/kubernetes.core.git
synced 2026-03-27 05:43:02 +00:00
259 lines
9.8 KiB
Python
259 lines
9.8 KiB
Python
# Copyright [2017] [Red Hat, Inc.]
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
__metaclass__ = type
|
|
|
|
from collections import OrderedDict
|
|
import json
|
|
|
|
from ansible.module_utils.common.dict_transformations import dict_merge
|
|
from ansible_collections.kubernetes.core.plugins.module_utils.exceptions import ApplyException
|
|
|
|
try:
|
|
from kubernetes.dynamic.exceptions import NotFoundError
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
LAST_APPLIED_CONFIG_ANNOTATION = 'kubectl.kubernetes.io/last-applied-configuration'
|
|
|
|
POD_SPEC_SUFFIXES = {
|
|
'containers': 'name',
|
|
'initContainers': 'name',
|
|
'ephemeralContainers': 'name',
|
|
'volumes': 'name',
|
|
'imagePullSecrets': 'name',
|
|
'containers.volumeMounts': 'mountPath',
|
|
'containers.volumeDevices': 'devicePath',
|
|
'containers.env': 'name',
|
|
'containers.ports': 'containerPort',
|
|
'initContainers.volumeMounts': 'mountPath',
|
|
'initContainers.volumeDevices': 'devicePath',
|
|
'initContainers.env': 'name',
|
|
'initContainers.ports': 'containerPort',
|
|
'ephemeralContainers.volumeMounts': 'mountPath',
|
|
'ephemeralContainers.volumeDevices': 'devicePath',
|
|
'ephemeralContainers.env': 'name',
|
|
'ephemeralContainers.ports': 'containerPort',
|
|
}
|
|
|
|
POD_SPEC_PREFIXES = [
|
|
'Pod.spec',
|
|
'Deployment.spec.template.spec',
|
|
'DaemonSet.spec.template.spec',
|
|
'StatefulSet.spec.template.spec',
|
|
'Job.spec.template.spec',
|
|
'Cronjob.spec.jobTemplate.spec.template.spec',
|
|
]
|
|
|
|
# patch merge keys taken from generated.proto files under
|
|
# staging/src/k8s.io/api in kubernetes/kubernetes
|
|
STRATEGIC_MERGE_PATCH_KEYS = {
|
|
'Service.spec.ports': 'port',
|
|
'ServiceAccount.secrets': 'name',
|
|
'ValidatingWebhookConfiguration.webhooks': 'name',
|
|
'MutatingWebhookConfiguration.webhooks': 'name',
|
|
}
|
|
|
|
STRATEGIC_MERGE_PATCH_KEYS.update(
|
|
{"%s.%s" % (prefix, key): value
|
|
for prefix in POD_SPEC_PREFIXES
|
|
for key, value in POD_SPEC_SUFFIXES.items()}
|
|
)
|
|
|
|
|
|
def annotate(desired):
|
|
return dict(
|
|
metadata=dict(
|
|
annotations={
|
|
LAST_APPLIED_CONFIG_ANNOTATION: json.dumps(desired, separators=(',', ':'), indent=None, sort_keys=True)
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
def apply_patch(actual, desired):
|
|
last_applied = actual['metadata'].get('annotations', {}).get(LAST_APPLIED_CONFIG_ANNOTATION)
|
|
|
|
if last_applied:
|
|
# ensure that last_applied doesn't come back as a dict of unicode key/value pairs
|
|
# json.loads can be used if we stop supporting python 2
|
|
last_applied = json.loads(last_applied)
|
|
patch = merge(dict_merge(last_applied, annotate(last_applied)),
|
|
dict_merge(desired, annotate(desired)), actual)
|
|
if patch:
|
|
return actual, patch
|
|
else:
|
|
return actual, actual
|
|
else:
|
|
return actual, dict_merge(desired, annotate(desired))
|
|
|
|
|
|
def apply_object(resource, definition):
|
|
try:
|
|
actual = resource.get(name=definition['metadata']['name'], namespace=definition['metadata'].get('namespace'))
|
|
except NotFoundError:
|
|
return None, dict_merge(definition, annotate(definition))
|
|
return apply_patch(actual.to_dict(), definition)
|
|
|
|
|
|
def apply(resource, definition):
|
|
existing, desired = apply_object(resource, definition)
|
|
if not existing:
|
|
return resource.create(body=desired, namespace=definition['metadata'].get('namespace'))
|
|
if existing == desired:
|
|
return resource.get(name=definition['metadata']['name'], namespace=definition['metadata'].get('namespace'))
|
|
return resource.patch(body=desired,
|
|
name=definition['metadata']['name'],
|
|
namespace=definition['metadata'].get('namespace'),
|
|
content_type='application/merge-patch+json')
|
|
|
|
|
|
# The patch is the difference from actual to desired without deletions, plus deletions
|
|
# from last_applied to desired. To find it, we compute deletions, which are the deletions from
|
|
# last_applied to desired, and delta, which is the difference from actual to desired without
|
|
# deletions, and then apply delta to deletions as a patch, which should be strictly additive.
|
|
def merge(last_applied, desired, actual, position=None):
|
|
deletions = get_deletions(last_applied, desired)
|
|
delta = get_delta(last_applied, actual, desired, position or desired['kind'])
|
|
return dict_merge(deletions, delta)
|
|
|
|
|
|
def list_to_dict(lst, key, position):
|
|
result = OrderedDict()
|
|
for item in lst:
|
|
try:
|
|
result[item[key]] = item
|
|
except KeyError:
|
|
raise ApplyException("Expected key '%s' not found in position %s" % (key, position))
|
|
return result
|
|
|
|
|
|
# list_merge applies a strategic merge to a set of lists if the patchMergeKey is known
|
|
# each item in the list is compared based on the patchMergeKey - if two values with the
|
|
# same patchMergeKey differ, we take the keys that are in last applied, compare the
|
|
# actual and desired for those keys, and update if any differ
|
|
def list_merge(last_applied, actual, desired, position):
|
|
result = list()
|
|
if position in STRATEGIC_MERGE_PATCH_KEYS and last_applied:
|
|
patch_merge_key = STRATEGIC_MERGE_PATCH_KEYS[position]
|
|
last_applied_dict = list_to_dict(last_applied, patch_merge_key, position)
|
|
actual_dict = list_to_dict(actual, patch_merge_key, position)
|
|
desired_dict = list_to_dict(desired, patch_merge_key, position)
|
|
for key in desired_dict:
|
|
if key not in actual_dict or key not in last_applied_dict:
|
|
result.append(desired_dict[key])
|
|
else:
|
|
patch = merge(last_applied_dict[key], desired_dict[key], actual_dict[key], position)
|
|
result.append(dict_merge(actual_dict[key], patch))
|
|
for key in actual_dict:
|
|
if key not in desired_dict and key not in last_applied_dict:
|
|
result.append(actual_dict[key])
|
|
return result
|
|
else:
|
|
return desired
|
|
|
|
|
|
def recursive_list_diff(list1, list2, position=None):
|
|
result = (list(), list())
|
|
if position in STRATEGIC_MERGE_PATCH_KEYS:
|
|
patch_merge_key = STRATEGIC_MERGE_PATCH_KEYS[position]
|
|
dict1 = list_to_dict(list1, patch_merge_key, position)
|
|
dict2 = list_to_dict(list2, patch_merge_key, position)
|
|
dict1_keys = set(dict1.keys())
|
|
dict2_keys = set(dict2.keys())
|
|
for key in dict1_keys - dict2_keys:
|
|
result[0].append(dict1[key])
|
|
for key in dict2_keys - dict1_keys:
|
|
result[1].append(dict2[key])
|
|
for key in dict1_keys & dict2_keys:
|
|
diff = recursive_diff(dict1[key], dict2[key], position)
|
|
if diff:
|
|
# reinsert patch merge key to relate changes in other keys to
|
|
# a specific list element
|
|
diff[0].update({patch_merge_key: dict1[key][patch_merge_key]})
|
|
diff[1].update({patch_merge_key: dict2[key][patch_merge_key]})
|
|
result[0].append(diff[0])
|
|
result[1].append(diff[1])
|
|
if result[0] or result[1]:
|
|
return result
|
|
elif list1 != list2:
|
|
return (list1, list2)
|
|
return None
|
|
|
|
|
|
def recursive_diff(dict1, dict2, position=None):
|
|
if not position:
|
|
if 'kind' in dict1 and dict1.get('kind') == dict2.get('kind'):
|
|
position = dict1['kind']
|
|
left = dict((k, v) for (k, v) in dict1.items() if k not in dict2)
|
|
right = dict((k, v) for (k, v) in dict2.items() if k not in dict1)
|
|
for k in (set(dict1.keys()) & set(dict2.keys())):
|
|
if position:
|
|
this_position = "%s.%s" % (position, k)
|
|
if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
|
|
result = recursive_diff(dict1[k], dict2[k], this_position)
|
|
if result:
|
|
left[k] = result[0]
|
|
right[k] = result[1]
|
|
elif isinstance(dict1[k], list) and isinstance(dict2[k], list):
|
|
result = recursive_list_diff(dict1[k], dict2[k], this_position)
|
|
if result:
|
|
left[k] = result[0]
|
|
right[k] = result[1]
|
|
elif dict1[k] != dict2[k]:
|
|
left[k] = dict1[k]
|
|
right[k] = dict2[k]
|
|
if left or right:
|
|
return left, right
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_deletions(last_applied, desired):
|
|
patch = {}
|
|
for k, last_applied_value in last_applied.items():
|
|
desired_value = desired.get(k)
|
|
if isinstance(last_applied_value, dict) and isinstance(desired_value, dict):
|
|
p = get_deletions(last_applied_value, desired_value)
|
|
if p:
|
|
patch[k] = p
|
|
elif last_applied_value != desired_value:
|
|
patch[k] = desired_value
|
|
return patch
|
|
|
|
|
|
def get_delta(last_applied, actual, desired, position=None):
|
|
patch = {}
|
|
|
|
for k, desired_value in desired.items():
|
|
if position:
|
|
this_position = "%s.%s" % (position, k)
|
|
actual_value = actual.get(k)
|
|
if actual_value is None:
|
|
patch[k] = desired_value
|
|
elif isinstance(desired_value, dict):
|
|
p = get_delta(last_applied.get(k, {}), actual_value, desired_value, this_position)
|
|
if p:
|
|
patch[k] = p
|
|
elif isinstance(desired_value, list):
|
|
p = list_merge(last_applied.get(k, []), actual_value, desired_value, this_position)
|
|
if p:
|
|
patch[k] = [item for item in p if item is not None]
|
|
elif actual_value != desired_value:
|
|
patch[k] = desired_value
|
|
return patch
|