openshift adm prune images (#133)

abikouo
2022-01-24 15:46:23 +01:00
committed by GitHub
parent 0a1a647e37
commit fc4a979762
12 changed files with 2618 additions and 0 deletions


@@ -0,0 +1,513 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from datetime import datetime, timezone, timedelta
import traceback
import copy
from ansible.module_utils._text import to_native
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import iteritems
try:
from ansible_collections.kubernetes.core.plugins.module_utils.common import (
K8sAnsibleMixin,
get_api_client,
)
HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
HAS_KUBERNETES_COLLECTION = False
k8s_collection_import_exception = e
K8S_COLLECTION_ERROR = traceback.format_exc()
from ansible_collections.community.okd.plugins.module_utils.openshift_images_common import (
OpenShiftAnalyzeImageStream,
get_image_blobs,
is_too_young_object,
is_created_after,
)
from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
parse_docker_image_ref,
convert_storage_to_bytes,
)
try:
from kubernetes import client
from kubernetes.client import rest
from kubernetes.dynamic.exceptions import (
DynamicApiError,
NotFoundError,
ApiException
)
except ImportError:
pass
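# Maps each kind the pruner consults to the apiVersion used when listing it
# with the dynamic client: pods, workload controllers and builds are scanned
# for image references, LimitRange feeds the prune_over_size_limit check, and
# Image/ImageStream are the objects being pruned.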
ApiConfiguration = {
"LimitRange": "v1",
"Pod": "v1",
"ReplicationController": "v1",
"DaemonSet": "apps/v1",
"Deployment": "apps/v1",
"ReplicaSet": "apps/v1",
"StatefulSet": "apps/v1",
"Job": "batch/v1",
"CronJob": "batch/v1beta1",
"DeploymentConfig": "apps.openshift.io/v1",
"BuildConfig": "build.openshift.io/v1",
"Build": "build.openshift.io/v1",
"Image": "image.openshift.io/v1",
"ImageStream": "image.openshift.io/v1",
}
def read_object_annotation(obj, name):
return obj["metadata"]["annotations"].get(name)
def determine_host_registry(module, images, image_streams):
# filter managed images
def _f_managed_images(obj):
value = read_object_annotation(obj, "openshift.io/image.managed")
return boolean(value) if value is not None else False
managed_images = list(filter(_f_managed_images, images))
    # Be sure to pick the newest managed image, which should have up-to-date information
sorted_images = sorted(managed_images,
key=lambda x: x["metadata"]["creationTimestamp"],
reverse=True)
docker_image_ref = ""
if len(sorted_images) > 0:
docker_image_ref = sorted_images[0].get("dockerImageReference", "")
else:
        # Second, try to get the pull spec from any image stream.
        # Sorting by creation timestamp may not give us up-to-date info;
        # modification time would be a better sort key, but it is not available.
sorted_image_streams = sorted(image_streams,
key=lambda x: x["metadata"]["creationTimestamp"],
reverse=True)
for i_stream in sorted_image_streams:
docker_image_ref = i_stream["status"].get("dockerImageRepository", "")
if len(docker_image_ref) > 0:
break
if len(docker_image_ref) == 0:
module.exit_json(changed=False, result="no managed image found")
result, error = parse_docker_image_ref(docker_image_ref, module)
return result['hostname']
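# Illustrative (hypothetical values): for a managed image whose
# dockerImageReference is
#   "image-registry.openshift-image-registry.svc:5000/ns/app@sha256:...",
# this returns the hostname part,
#   "image-registry.openshift-image-registry.svc:5000".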
class OpenShiftAdmPruneImages(K8sAnsibleMixin):
def __init__(self, module):
self.module = module
self.fail_json = self.module.fail_json
self.exit_json = self.module.exit_json
if not HAS_KUBERNETES_COLLECTION:
self.fail_json(
msg="The kubernetes.core collection must be installed",
exception=K8S_COLLECTION_ERROR,
error=to_native(k8s_collection_import_exception),
)
super(OpenShiftAdmPruneImages, self).__init__(self.module)
self.params = self.module.params
self.check_mode = self.module.check_mode
try:
self.client = get_api_client(self.module)
except DynamicApiError as e:
self.fail_json(
msg="Failed to get kubernetes client.",
reason=e.reason,
status=e.status,
)
except Exception as e:
self.fail_json(
msg="Failed to get kubernetes client.",
error=to_native(e)
)
self.max_creation_timestamp = self.get_max_creation_timestamp()
self._rest_client = None
self.registryhost = self.params.get('registry_url')
self.changed = False
def list_objects(self):
result = {}
for kind, version in iteritems(ApiConfiguration):
namespace = None
if self.params.get("namespace") and kind.lower() == "imagestream":
namespace = self.params.get("namespace")
try:
result[kind] = self.kubernetes_facts(kind=kind,
api_version=version,
namespace=namespace).get('resources')
except DynamicApiError as e:
self.fail_json(
msg="An error occurred while trying to list objects.",
reason=e.reason,
status=e.status,
)
except Exception as e:
self.fail_json(
msg="An error occurred while trying to list objects.",
error=to_native(e)
)
return result
def get_max_creation_timestamp(self):
result = None
if self.params.get("keep_younger_than"):
dt_now = datetime.now(timezone.utc).replace(tzinfo=None)
result = dt_now - timedelta(minutes=self.params.get("keep_younger_than"))
return result
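    # e.g. keep_younger_than=60 yields a naive-UTC cutoff of now minus 60
    # minutes; objects created after the cutoff are protected from pruning.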
@property
def rest_client(self):
if not self._rest_client:
configuration = copy.deepcopy(self.client.configuration)
validate_certs = self.params.get('registry_validate_certs')
ssl_ca_cert = self.params.get('registry_ca_cert')
if validate_certs is not None:
configuration.verify_ssl = validate_certs
if ssl_ca_cert is not None:
configuration.ssl_ca_cert = ssl_ca_cert
self._rest_client = rest.RESTClientObject(configuration)
return self._rest_client
def delete_from_registry(self, url):
try:
response = self.rest_client.DELETE(url=url, headers=self.client.configuration.api_key)
if response.status == 404:
# Unable to delete layer
return None
            # a non-2xx/3xx response does not raise an exception, so it has to
            # be checked manually
            if response.status < 200 or response.status >= 400:
                return None
            if response.status not in (202, 204):
                self.fail_json(
                    msg="Delete URL {0}: Unexpected status code in response: {1}".format(
                        url, response.status),
                    reason=response.reason
                )
return None
except ApiException as e:
if e.status != 404:
self.fail_json(
msg="Failed to delete URL: %s" % url,
reason=e.reason,
status=e.status,
)
except Exception as e:
self.fail_json(msg="Delete URL {0}: {1}".format(url, type(e)))
def delete_layers_links(self, path, layers):
for layer in layers:
url = "%s/v2/%s/blobs/%s" % (self.registryhost, path, layer)
self.changed = True
if not self.check_mode:
self.delete_from_registry(url=url)
def delete_manifests(self, path, digests):
for digest in digests:
url = "%s/v2/%s/manifests/%s" % (self.registryhost, path, digest)
self.changed = True
if not self.check_mode:
self.delete_from_registry(url=url)
def delete_blobs(self, blobs):
for blob in blobs:
self.changed = True
url = "%s/admin/blobs/%s" % (self.registryhost, blob)
if not self.check_mode:
self.delete_from_registry(url=url)
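    # For reference, the three registry endpoints used above:
    #   layer links: <registry>/v2/<namespace>/<name>/blobs/<digest>
    #   manifests:   <registry>/v2/<namespace>/<name>/manifests/<digest>
    #   blobs:       <registry>/admin/blobs/<digest>  (hard-prune admin API)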
def update_image_stream_status(self, definition):
kind = definition["kind"]
api_version = definition["apiVersion"]
namespace = definition["metadata"]["namespace"]
name = definition["metadata"]["name"]
self.changed = True
result = definition
if not self.check_mode:
try:
result = self.client.request(
"PUT",
"/apis/{api_version}/namespaces/{namespace}/imagestreams/{name}/status".format(
api_version=api_version,
namespace=namespace,
name=name
),
body=definition,
content_type="application/json",
).to_dict()
except DynamicApiError as exc:
msg = "Failed to patch object: kind={0} {1}/{2}".format(
kind, namespace, name
)
self.fail_json(msg=msg, status=exc.status, reason=exc.reason)
except Exception as exc:
msg = "Failed to patch object kind={0} {1}/{2} due to: {3}".format(
kind, namespace, name, exc
)
self.fail_json(msg=msg, error=to_native(exc))
return result
def delete_image(self, image):
kind = "Image"
api_version = "image.openshift.io/v1"
resource = self.find_resource(kind=kind, api_version=api_version)
name = image["metadata"]["name"]
self.changed = True
if not self.check_mode:
try:
delete_options = client.V1DeleteOptions(grace_period_seconds=0)
return resource.delete(name=name, body=delete_options).to_dict()
except NotFoundError:
pass
except DynamicApiError as exc:
self.fail_json(
msg="Failed to delete object %s/%s due to: %s" % (
kind, name, exc.body
),
reason=exc.reason,
status=exc.status
)
else:
existing = resource.get(name=name)
if existing:
existing = existing.to_dict()
return existing
def exceeds_limits(self, namespace, image):
if namespace not in self.limit_range:
return False
docker_image_metadata = image.get("dockerImageMetadata")
if not docker_image_metadata:
return False
docker_image_size = docker_image_metadata["Size"]
for limit in self.limit_range.get(namespace):
for item in limit["spec"]["limits"]:
if item["type"] != "openshift.io/Image":
continue
limit_max = item["max"]
if not limit_max:
continue
storage = limit_max["storage"]
if not storage:
continue
if convert_storage_to_bytes(storage) < docker_image_size:
# image size is larger than the permitted limit range max size
return True
return False
def prune_image_stream_tag(self, stream, tag_event_list):
manifests_to_delete, images_to_delete = [], []
filtered_items = []
tag_event_items = tag_event_list["items"] or []
prune_over_size_limit = self.params.get("prune_over_size_limit")
stream_namespace = stream["metadata"]["namespace"]
stream_name = stream["metadata"]["name"]
for idx, item in enumerate(tag_event_items):
if is_created_after(item["created"], self.max_creation_timestamp):
filtered_items.append(item)
continue
if idx == 0:
istag = "%s/%s:%s" % (stream_namespace,
stream_name,
tag_event_list["tag"])
if istag in self.used_tags:
# keeping because tag is used
filtered_items.append(item)
continue
if item["image"] not in self.image_mapping:
                # There are a few reasons why the image may not be found:
# 1. the image is deleted manually and this record is no longer valid
# 2. the imagestream was observed before the image creation, i.e.
# this record was created recently and it should be protected by keep_younger_than
continue
image = self.image_mapping[item["image"]]
# check prune over limit size
if prune_over_size_limit and not self.exceeds_limits(stream_namespace, image):
filtered_items.append(item)
continue
image_ref = "%s/%s@%s" % (stream_namespace,
stream_name,
item["image"])
if image_ref in self.used_images:
                # keeping because the image is used
filtered_items.append(item)
continue
images_to_delete.append(item["image"])
if self.params.get('prune_registry'):
manifests_to_delete.append(image["metadata"]["name"])
path = stream_namespace + "/" + stream_name
image_blobs, err = get_image_blobs(image)
if not err:
self.delete_layers_links(path, image_blobs)
return filtered_items, manifests_to_delete, images_to_delete
def prune_image_streams(self, stream):
name = stream['metadata']['namespace'] + "/" + stream['metadata']['name']
if is_too_young_object(stream, self.max_creation_timestamp):
# keeping all images because of image stream too young
return None, []
facts = self.kubernetes_facts(kind="ImageStream",
api_version=ApiConfiguration.get("ImageStream"),
name=stream["metadata"]["name"],
namespace=stream["metadata"]["namespace"])
image_stream = facts.get('resources')
if len(image_stream) != 1:
# skipping because it does not exist anymore
return None, []
stream = image_stream[0]
namespace = self.params.get("namespace")
stream_to_update = not namespace or (stream["metadata"]["namespace"] == namespace)
manifests_to_delete, images_to_delete = [], []
deleted_items = False
# Update Image stream tag
if stream_to_update:
tags = stream["status"].get("tags", [])
for idx, tag_event_list in enumerate(tags):
(
filtered_tag_event,
tag_manifests_to_delete,
tag_images_to_delete
) = self.prune_image_stream_tag(stream, tag_event_list)
stream['status']['tags'][idx]['items'] = filtered_tag_event
manifests_to_delete += tag_manifests_to_delete
images_to_delete += tag_images_to_delete
deleted_items = deleted_items or (len(tag_images_to_delete) > 0)
# Deleting tags without items
tags = []
for tag in stream["status"].get("tags", []):
if tag['items'] is None or len(tag['items']) == 0:
continue
tags.append(tag)
stream['status']['tags'] = tags
result = None
# Update ImageStream
if stream_to_update:
if deleted_items:
result = self.update_image_stream_status(stream)
if self.params.get("prune_registry"):
self.delete_manifests(name, manifests_to_delete)
return result, images_to_delete
def prune_images(self, image):
if not self.params.get("all_images"):
if read_object_annotation(image, "openshift.io/image.managed") != "true":
# keeping external image because all_images is set to false
# pruning only managed images
return None
if is_too_young_object(image, self.max_creation_timestamp):
# keeping because of keep_younger_than
return None
# Deleting image from registry
if self.params.get("prune_registry"):
image_blobs, err = get_image_blobs(image)
if err:
self.fail_json(msg=err)
# add blob for image name
image_blobs.append(image["metadata"]["name"])
self.delete_blobs(image_blobs)
# Delete image from cluster
return self.delete_image(image)
def execute_module(self):
resources = self.list_objects()
if not self.check_mode and self.params.get('prune_registry'):
if not self.registryhost:
self.registryhost = determine_host_registry(self.module, resources['Image'], resources['ImageStream'])
# validate that host has a scheme
if "://" not in self.registryhost:
self.registryhost = "https://" + self.registryhost
# Analyze Image Streams
analyze_ref = OpenShiftAnalyzeImageStream(
ignore_invalid_refs=self.params.get('ignore_invalid_refs'),
max_creation_timestamp=self.max_creation_timestamp,
module=self.module
)
self.used_tags, self.used_images, error = analyze_ref.analyze_image_stream(resources)
if error:
self.fail_json(msg=error)
# Create image mapping
self.image_mapping = {}
for m in resources["Image"]:
self.image_mapping[m["metadata"]["name"]] = m
# Create limit range mapping
self.limit_range = {}
for limit in resources["LimitRange"]:
namespace = limit["metadata"]["namespace"]
if namespace not in self.limit_range:
self.limit_range[namespace] = []
self.limit_range[namespace].append(limit)
# Stage 1: delete history from image streams
updated_image_streams = []
deleted_tags_images = []
updated_is_mapping = {}
for stream in resources['ImageStream']:
result, images_to_delete = self.prune_image_streams(stream)
if result:
updated_is_mapping[result["metadata"]["namespace"] + "/" + result["metadata"]["name"]] = result
updated_image_streams.append(result)
deleted_tags_images += images_to_delete
# Create a list with images referenced on image stream
self.referenced_images = []
for item in self.kubernetes_facts(kind="ImageStream", api_version="image.openshift.io/v1")["resources"]:
name = "%s/%s" % (item["metadata"]["namespace"], item["metadata"]["name"])
if name in updated_is_mapping:
item = updated_is_mapping[name]
for tag in item["status"].get("tags", []):
self.referenced_images += [t["image"] for t in tag["items"] or []]
# Stage 2: delete images
images = []
images_to_delete = [x["metadata"]["name"] for x in resources['Image']]
if self.params.get("namespace") is not None:
# When namespace is defined, prune only images that were referenced by ImageStream
# from the corresponding namespace
images_to_delete = deleted_tags_images
for name in images_to_delete:
if name in self.referenced_images:
                # The image is referenced by one or more image streams
continue
if name not in self.image_mapping:
                # The image no longer exists
continue
result = self.prune_images(self.image_mapping[name])
if result:
images.append(result)
result = {
"changed": self.changed,
"deleted_images": images,
"updated_image_streams": updated_image_streams,
}
self.exit_json(**result)


@@ -0,0 +1,103 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import re
def convert_storage_to_bytes(value):
keys = {
"Ki": 1024,
"Mi": 1024 * 1024,
"Gi": 1024 * 1024 * 1024,
"Ti": 1024 * 1024 * 1024 * 1024,
"Pi": 1024 * 1024 * 1024 * 1024 * 1024,
"Ei": 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
}
for k in keys:
if value.endswith(k) or value.endswith(k[0]):
idx = value.find(k[0])
return keys.get(k) * int(value[:idx])
return int(value)
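# Illustrative behaviour - both the binary suffix and its single-letter
# shorthand are accepted, and both are treated as powers of 1024:
#   convert_storage_to_bytes("500")  # -> 500
#   convert_storage_to_bytes("1Ki")  # -> 1024
#   convert_storage_to_bytes("2M")   # -> 2097152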
def is_valid_digest(digest):
digest_algorithm_size = dict(
sha256=64, sha384=96, sha512=128,
)
m = re.match(r'[a-zA-Z0-9-_+.]+:[a-fA-F0-9]+', digest)
if not m:
return "Docker digest does not match expected format %s" % digest
idx = digest.find(':')
# case: "sha256:" with no hex.
if idx < 0 or idx == (len(digest) - 1):
return "Invalid docker digest %s, no hex value define" % digest
algorithm = digest[:idx]
if algorithm not in digest_algorithm_size:
return "Unsupported digest algorithm value %s for digest %s" % (algorithm, digest)
hex_value = digest[idx + 1:]
if len(hex_value) != digest_algorithm_size.get(algorithm):
return "Invalid length for digest hex expected %d found %d (digest is %s)" % (
digest_algorithm_size.get(algorithm), len(hex_value), digest
)
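# Illustrative checks (a return value of None means the digest is valid):
#   is_valid_digest("sha256:" + "a" * 64)  # -> None
#   is_valid_digest("sha256:abc")          # -> invalid-length message
#   is_valid_digest("md5:" + "a" * 32)     # -> unsupported-algorithm message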
def parse_docker_image_ref(image_ref, module=None):
"""
Docker Grammar Reference
Reference => name [ ":" tag ] [ "@" digest ]
name => [hostname '/'] component ['/' component]*
hostname => hostcomponent ['.' hostcomponent]* [':' port-number]
hostcomponent => /([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9])/
port-number => /[0-9]+/
component => alpha-numeric [separator alpha-numeric]*
alpha-numeric => /[a-z0-9]+/
separator => /[_.]|__|[-]*/
"""
idx = image_ref.find("/")
def _contains_any(src, values):
return any(x in src for x in values)
result = {
"tag": None, "digest": None
}
default_domain = "docker.io"
if idx < 0 or (not _contains_any(image_ref[:idx], ":.") and image_ref[:idx] != "localhost"):
result["hostname"], remainder = default_domain, image_ref
else:
result["hostname"], remainder = image_ref[:idx], image_ref[idx + 1:]
# Parse remainder information
idx = remainder.find("@")
if idx > 0 and len(remainder) > (idx + 1):
# docker image reference with digest
component, result["digest"] = remainder[:idx], remainder[idx + 1:]
err = is_valid_digest(result["digest"])
if err:
if module:
module.fail_json(msg=err)
return None, err
else:
idx = remainder.find(":")
if idx > 0 and len(remainder) > (idx + 1):
# docker image reference with tag
component, result["tag"] = remainder[:idx], remainder[idx + 1:]
else:
# name only
component = remainder
v = component.split("/")
namespace = None
if len(v) > 1:
namespace = v[0]
result.update({
"namespace": namespace, "name": v[-1]
})
return result, None
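# Illustrative parses:
#   parse_docker_image_ref("quay.io/ansible/app:v1")
#     -> ({'hostname': 'quay.io', 'namespace': 'ansible', 'name': 'app',
#          'tag': 'v1', 'digest': None}, None)
#   parse_docker_image_ref("nginx")
#     -> ({'hostname': 'docker.io', 'namespace': None, 'name': 'nginx',
#          'tag': None, 'digest': None}, None)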


@@ -0,0 +1,232 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from datetime import datetime
from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
parse_docker_image_ref,
)
from ansible.module_utils.six import iteritems
def get_image_blobs(image):
blobs = [layer["image"] for layer in image["dockerImageLayers"]]
docker_image_metadata = image.get("dockerImageMetadata")
if not docker_image_metadata:
return blobs, "failed to read metadata for image %s" % image["metadata"]["name"]
media_type_manifest = (
"application/vnd.docker.distribution.manifest.v2+json",
"application/vnd.oci.image.manifest.v1+json"
)
media_type_has_config = image['dockerImageManifestMediaType'] in media_type_manifest
docker_image_id = docker_image_metadata.get("Id")
if media_type_has_config and docker_image_id and len(docker_image_id) > 0:
blobs.append(docker_image_id)
return blobs, None
def is_created_after(creation_timestamp, max_creation_timestamp):
if not max_creation_timestamp:
return False
    created = datetime.strptime(creation_timestamp, '%Y-%m-%dT%H:%M:%SZ')
    return created > max_creation_timestamp
def is_too_young_object(obj, max_creation_timestamp):
return is_created_after(obj['metadata']['creationTimestamp'],
max_creation_timestamp)
class OpenShiftAnalyzeImageStream(object):
def __init__(self, ignore_invalid_refs, max_creation_timestamp, module):
self.max_creationTimestamp = max_creation_timestamp
self.used_tags = {}
self.used_images = {}
self.ignore_invalid_refs = ignore_invalid_refs
self.module = module
def analyze_reference_image(self, image, referrer):
result, error = parse_docker_image_ref(image, self.module)
if error:
return error
        if not result['hostname'] or not result['namespace']:
            # image reference does not match hostname/namespace/name pattern - skipping
            return None
if not result['digest']:
# Attempt to dereference istag. Since we cannot be sure whether the reference refers to the
# integrated registry or not, we ignore the host part completely. As a consequence, we may keep
# image otherwise sentenced for a removal just because its pull spec accidentally matches one of
# our imagestreamtags.
            # set the tag if empty
            if not result['tag']:
                result['tag'] = 'latest'
key = "%s/%s:%s" % (result['namespace'], result['name'], result['tag'])
if key not in self.used_tags:
self.used_tags[key] = []
self.used_tags[key].append(referrer)
else:
key = "%s/%s@%s" % (result['namespace'], result['name'], result['digest'])
if key not in self.used_images:
self.used_images[key] = []
self.used_images[key].append(referrer)
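        # e.g. (hypothetical) a reference "registry.example.com/myns/app:v1"
        # records the referrer under used_tags["myns/app:v1"], while a
        # by-digest reference "registry.example.com/myns/app@sha256:..."
        # lands in used_images["myns/app@sha256:..."]; the host part is
        # deliberately ignored.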
def analyze_refs_from_pod_spec(self, podSpec, referrer):
for container in podSpec.get('initContainers', []) + podSpec.get('containers', []):
image = container.get('image')
if len(image.strip()) == 0:
# Ignoring container because it has no reference to image
continue
err = self.analyze_reference_image(image, referrer)
if err:
return err
return None
def analyze_refs_from_pods(self, pods):
for pod in pods:
# A pod is only *excluded* from being added to the graph if its phase is not
# pending or running. Additionally, it has to be at least as old as the minimum
# age threshold defined by the algorithm.
too_young = is_too_young_object(pod, self.max_creationTimestamp)
            if pod['status']['phase'] not in ("Running", "Pending") and not too_young:
continue
referrer = {
"kind": pod["kind"],
"namespace": pod["metadata"]["namespace"],
"name": pod["metadata"]["name"],
}
err = self.analyze_refs_from_pod_spec(pod['spec'], referrer)
if err:
return err
return None
def analyze_refs_pod_creators(self, resources):
keys = (
"ReplicationController", "DeploymentConfig", "DaemonSet",
"Deployment", "ReplicaSet", "StatefulSet", "Job", "CronJob"
)
for k, objects in iteritems(resources):
if k not in keys:
continue
for obj in objects:
if k == 'CronJob':
spec = obj["spec"]["jobTemplate"]["spec"]["template"]["spec"]
else:
spec = obj["spec"]["template"]["spec"]
referrer = {
"kind": obj["kind"],
"namespace": obj["metadata"]["namespace"],
"name": obj["metadata"]["name"],
}
err = self.analyze_refs_from_pod_spec(spec, referrer)
if err:
return err
return None
def analyze_refs_from_strategy(self, build_strategy, namespace, referrer):
# Determine 'from' reference
def _determine_source_strategy():
for src in ('sourceStrategy', 'dockerStrategy', 'customStrategy'):
strategy = build_strategy.get(src)
if strategy:
return strategy.get('from')
return None
def _parse_image_stream_image_name(name):
v = name.split('@')
if len(v) != 2:
return None, None, "expected exactly one @ in the isimage name %s" % name
name = v[0]
tag = v[1]
if len(name) == 0 or len(tag) == 0:
return None, None, "image stream image name %s must have a name and ID" % name
return name, tag, None
def _parse_image_stream_tag_name(name):
if "@" in name:
return None, None, "%s is an image stream image, not an image stream tag" % name
v = name.split(":")
if len(v) != 2:
return None, None, "expected exactly one : delimiter in the istag %s" % name
name = v[0]
tag = v[1]
if len(name) == 0 or len(tag) == 0:
return None, None, "image stream tag name %s must have a name and a tag" % name
return name, tag, None
from_strategy = _determine_source_strategy()
if not from_strategy:
# Build strategy not found
return
if from_strategy.get('kind') == "DockerImage":
docker_image_ref = from_strategy.get('name').strip()
if len(docker_image_ref) > 0:
err = self.analyze_reference_image(docker_image_ref, referrer)
elif from_strategy.get('kind') == "ImageStreamImage":
name, tag, error = _parse_image_stream_image_name(from_strategy.get('name'))
if error:
if not self.ignore_invalid_refs:
return error
else:
                namespace = from_strategy.get('namespace') or namespace
                # record the referrer under the same key format used by
                # analyze_reference_image (used_images maps keys to referrers)
                key = "%s/%s@%s" % (namespace, name, tag)
                if key not in self.used_images:
                    self.used_images[key] = []
                self.used_images[key].append(referrer)
elif from_strategy.get('kind') == "ImageStreamTag":
name, tag, error = _parse_image_stream_tag_name(from_strategy.get('name'))
if error:
if not self.ignore_invalid_refs:
return error
else:
                namespace = from_strategy.get('namespace') or namespace
                # record the referrer under the same key format used by
                # analyze_reference_image (used_tags maps keys to referrers)
                key = "%s/%s:%s" % (namespace, name, tag)
                if key not in self.used_tags:
                    self.used_tags[key] = []
                self.used_tags[key].append(referrer)
return None
def analyze_refs_from_build_strategy(self, resources):
# Json Path is always spec.strategy
keys = ("BuildConfig", "Build")
for k, objects in iteritems(resources):
if k not in keys:
continue
for obj in objects:
referrer = {
"kind": obj["kind"],
"namespace": obj["metadata"]["namespace"],
"name": obj["metadata"]["name"],
}
error = self.analyze_refs_from_strategy(obj['spec']['strategy'],
obj['metadata']['namespace'],
referrer)
                if error is not None:
                    return "%s/%s/%s: %s" % (referrer["kind"], referrer["namespace"], referrer["name"], error)
        return None
def analyze_image_stream(self, resources):
# Analyze image reference from Pods
error = self.analyze_refs_from_pods(resources['Pod'])
if error:
return None, None, error
# Analyze image reference from Resources creating Pod
error = self.analyze_refs_pod_creators(resources)
if error:
return None, None, error
# Analyze image reference from Build/BuildConfig
error = self.analyze_refs_from_build_strategy(resources)
return self.used_tags, self.used_images, error


@@ -0,0 +1,414 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import traceback
import copy
from ansible.module_utils._text import to_native
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import string_types
try:
from ansible_collections.kubernetes.core.plugins.module_utils.common import (
K8sAnsibleMixin,
get_api_client,
)
HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
HAS_KUBERNETES_COLLECTION = False
k8s_collection_import_exception = e
K8S_COLLECTION_ERROR = traceback.format_exc()
try:
from kubernetes.dynamic.exceptions import DynamicApiError
except ImportError:
pass
from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
parse_docker_image_ref,
)
err_stream_not_found_ref = "NotFound reference"
def follow_imagestream_tag_reference(stream, tag):
multiple = False
def _imagestream_has_tag():
for ref in stream["spec"].get("tags", []):
if ref["name"] == tag:
return ref
return None
def _imagestream_split_tag(name):
parts = name.split(":")
name = parts[0]
tag = ""
if len(parts) > 1:
tag = parts[1]
if len(tag) == 0:
tag = "latest"
return name, tag, len(parts) == 2
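    # e.g. _imagestream_split_tag("sameis:sometag") -> ("sameis", "sometag", True)
    #      _imagestream_split_tag("sameis")         -> ("sameis", "latest", False)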
content = []
err_cross_stream_ref = "tag %s points to an imagestreamtag from another ImageStream" % tag
while True:
if tag in content:
            return tag, None, multiple, "tag %s on the image stream is a circular reference" % tag
content.append(tag)
tag_ref = _imagestream_has_tag()
if not tag_ref:
return None, None, multiple, err_stream_not_found_ref
if not tag_ref.get("from") or tag_ref["from"]["kind"] != "ImageStreamTag":
return tag, tag_ref, multiple, None
if tag_ref["from"]["namespace"] != "" and tag_ref["from"]["namespace"] != stream["metadata"]["namespace"]:
return tag, None, multiple, err_cross_stream_ref
# The reference needs to be followed with two format patterns:
# a) sameis:sometag and b) sometag
if ":" in tag_ref["from"]["name"]:
name, tagref, result = _imagestream_split_tag(tag_ref["from"]["name"])
if not result:
return tag, None, multiple, "tag %s points to an invalid imagestreamtag" % tag
            if name != stream["metadata"]["name"]:
                # anotheris:sometag - this should not happen.
                return tag, None, multiple, err_cross_stream_ref
# sameis:sometag - follow the reference as sometag
tag = tagref
else:
tag = tag_ref["from"]["name"]
multiple = True
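# Illustrative call (hypothetical stream): if spec tag "stable" is an
# ImageStreamTag reference to "latest" and "latest" points at a DockerImage,
# follow_imagestream_tag_reference(stream, "stable") returns
# ("latest", <ref for "latest">, True, None); a dangling tag yields
# (None, None, False, err_stream_not_found_ref).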
class OpenShiftImportImage(K8sAnsibleMixin):
def __init__(self, module):
self.module = module
self.fail_json = self.module.fail_json
self.exit_json = self.module.exit_json
if not HAS_KUBERNETES_COLLECTION:
self.fail_json(
msg="The kubernetes.core collection must be installed",
exception=K8S_COLLECTION_ERROR,
error=to_native(k8s_collection_import_exception),
)
super(OpenShiftImportImage, self).__init__(self.module)
self.params = self.module.params
self.check_mode = self.module.check_mode
self.client = get_api_client(self.module)
self._rest_client = None
self.registryhost = self.params.get('registry_url')
self.changed = False
ref_policy = self.params.get("reference_policy")
ref_policy_type = None
if ref_policy == "source":
ref_policy_type = "Source"
elif ref_policy == "local":
ref_policy_type = "Local"
self.ref_policy = {
"type": ref_policy_type
}
self.validate_certs = self.params.get("validate_registry_certs")
self.cluster_resources = {}
def create_image_stream_import(self, stream):
isi = {
"apiVersion": "image.openshift.io/v1",
"kind": "ImageStreamImport",
"metadata": {
"name": stream["metadata"]["name"],
"namespace": stream["metadata"]["namespace"],
"resourceVersion": stream["metadata"].get("resourceVersion")
},
"spec": {
"import": True
}
}
        annotations = stream["metadata"].get("annotations") or {}
        insecure = boolean(annotations.get("openshift.io/image.insecureRepository", True))
if self.validate_certs is not None:
insecure = not self.validate_certs
return isi, insecure
def create_image_stream_import_all(self, stream, source):
isi, insecure = self.create_image_stream_import(stream)
isi["spec"]["repository"] = {
"from": {
"kind": "DockerImage",
"name": source,
},
"importPolicy": {
"insecure": insecure,
"scheduled": self.params.get("scheduled")
},
"referencePolicy": self.ref_policy,
}
return isi
def create_image_stream_import_tags(self, stream, tags):
isi, streamInsecure = self.create_image_stream_import(stream)
for k in tags:
insecure = streamInsecure
scheduled = self.params.get("scheduled")
old_tag = None
for t in stream.get("spec", {}).get("tags", []):
if t["name"] == k:
old_tag = t
break
            if old_tag:
                insecure = insecure or old_tag.get("importPolicy", {}).get("insecure")
                scheduled = scheduled or old_tag.get("importPolicy", {}).get("scheduled")
images = isi["spec"].get("images", [])
images.append({
"from": {
"kind": "DockerImage",
"name": tags.get(k),
},
"to": {
"name": k
},
"importPolicy": {
"insecure": insecure,
"scheduled": scheduled
},
"referencePolicy": self.ref_policy,
})
isi["spec"]["images"] = images
return isi
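    # Illustrative (hypothetical) result for tags={"v1": "quay.io/ns/app:v1"}:
    # isi["spec"]["images"] gains one entry with from.kind="DockerImage",
    # from.name="quay.io/ns/app:v1", to.name="v1", plus the computed
    # importPolicy and the configured referencePolicy.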
def create_image_stream(self, ref):
"""
Create new ImageStream and accompanying ImageStreamImport
"""
source = self.params.get("source")
if not source:
source = ref["source"]
stream = dict(
apiVersion="image.openshift.io/v1",
kind="ImageStream",
metadata=dict(
name=ref["name"],
namespace=self.params.get("namespace"),
),
)
if self.params.get("all") and not ref["tag"]:
spec = dict(
dockerImageRepository=source
)
isi = self.create_image_stream_import_all(stream, source)
else:
spec = dict(
tags=[
{
"from": {
"kind": "DockerImage",
"name": source
},
"referencePolicy": self.ref_policy
}
]
)
tags = {ref["tag"]: source}
isi = self.create_image_stream_import_tags(stream, tags)
stream.update(
dict(spec=spec)
)
return stream, isi
def import_all(self, istream):
stream = copy.deepcopy(istream)
# Update ImageStream appropriately
source = self.params.get("source")
docker_image_repo = stream["spec"].get("dockerImageRepository")
if not source:
if docker_image_repo:
source = docker_image_repo
else:
tags = {}
for t in stream["spec"].get("tags", []):
if t.get("from") and t["from"].get("kind") == "DockerImage":
tags[t.get("name")] = t["from"].get("name")
if tags == {}:
msg = "image stream %s/%s does not have tags pointing to external container images" % (
stream["metadata"]["namespace"], stream["metadata"]["name"]
)
self.fail_json(msg=msg)
isi = self.create_image_stream_import_tags(stream, tags)
return stream, isi
if source != docker_image_repo:
stream["spec"]["dockerImageRepository"] = source
isi = self.create_image_stream_import_all(stream, source)
return stream, isi
def import_tag(self, stream, tag):
source = self.params.get("source")
# Follow any referential tags to the destination
final_tag, existing, multiple, err = follow_imagestream_tag_reference(stream, tag)
if err:
if err == err_stream_not_found_ref:
# Create a new tag
if not source and tag == "latest":
source = stream["spec"].get("dockerImageRepository")
                # if the source is still empty, there is no such tag defined,
                # nor can one be derived from .spec.dockerImageRepository
if not source:
msg = "the tag %s does not exist on the image stream - choose an existing tag to import" % tag
self.fail_json(msg=msg)
existing = {
"from": {
"kind": "DockerImage",
"name": source,
}
}
else:
self.fail_json(msg=err)
else:
            # Disallow re-importing anything other than DockerImage
            if existing.get("from", {}) and existing["from"].get("kind") != "DockerImage":
                msg = "tag {tag} points to existing {kind}/{name}, it cannot be re-imported.".format(
                    tag=tag, kind=existing["from"]["kind"], name=existing["from"]["name"]
                )
                self.fail_json(msg=msg)
            # disallow changing an existing tag
            if not existing.get("from", {}):
                msg = "tag %s already exists - you cannot change the source using this module." % tag
                self.fail_json(msg=msg)
if source and source != existing["from"]["name"]:
if multiple:
msg = "the tag {0} points to the tag {1} which points to {2} you cannot change the source using this module".format(
tag, final_tag, existing["from"]["name"]
)
else:
msg = "the tag %s points to %s you cannot change the source using this module." % (tag, final_tag)
self.fail_json(msg=msg)
# Set the target item to import
source = existing["from"].get("name")
if multiple:
tag = final_tag
# Clear the legacy annotation
tag_to_delete = "openshift.io/image.dockerRepositoryCheck"
if existing["annotations"] and tag_to_delete in existing["annotations"]:
del existing["annotations"][tag_to_delete]
# Reset the generation
existing["generation"] = 0
new_stream = copy.deepcopy(stream)
new_stream["spec"]["tags"] = []
for t in stream["spec"]["tags"]:
if t["name"] == tag:
new_stream["spec"]["tags"].append(existing)
else:
new_stream["spec"]["tags"].append(t)
# Create accompanying ImageStreamImport
tags = {tag: source}
isi = self.create_image_stream_import_tags(new_stream, tags)
return new_stream, isi
def create_image_import(self, ref):
kind = "ImageStream"
api_version = "image.openshift.io/v1"
# Find existing Image Stream
params = dict(
kind=kind,
api_version=api_version,
name=ref.get("name"),
namespace=self.params.get("namespace")
)
result = self.kubernetes_facts(**params)
if not result["api_found"]:
            msg = 'Failed to find API for resource with apiVersion "{0}" and kind "{1}"'.format(
                api_version, kind
            )
self.fail_json(msg=msg)
imagestream = None
if len(result["resources"]) > 0:
imagestream = result["resources"][0]
stream, isi = None, None
if not imagestream:
stream, isi = self.create_image_stream(ref)
elif self.params.get("all") and not ref["tag"]:
# importing the entire repository
stream, isi = self.import_all(imagestream)
else:
# importing a single tag
stream, isi = self.import_tag(imagestream, ref["tag"])
return isi
def parse_image_reference(self, image_ref):
result, err = parse_docker_image_ref(image_ref, self.module)
if result.get("digest"):
self.fail_json(msg="Cannot import by ID, error with definition: %s" % image_ref)
tag = result.get("tag") or None
if not self.params.get("all") and not tag:
tag = "latest"
source = self.params.get("source")
if not source:
source = image_ref
        return dict(name=result.get("name"), tag=tag, source=source)
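    # e.g. (hypothetical, with all=false and no source parameter):
    # parse_image_reference("quay.io/ns/app") returns
    # {'name': 'app', 'tag': 'latest', 'source': 'quay.io/ns/app'}.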
def execute_module(self):
names = []
name = self.params.get("name")
if isinstance(name, string_types):
names.append(name)
elif isinstance(name, list):
names = name
else:
self.fail_json(msg="Parameter name should be provided as list or string.")
images_refs = [self.parse_image_reference(x) for x in names]
images_imports = []
for ref in images_refs:
isi = self.create_image_import(ref)
images_imports.append(isi)
# Create image import
kind = "ImageStreamImport"
api_version = "image.openshift.io/v1"
namespace = self.params.get("namespace")
try:
resource = self.find_resource(kind=kind, api_version=api_version, fail=True)
result = []
for isi in images_imports:
if not self.check_mode:
isi = resource.create(isi, namespace=namespace).to_dict()
result.append(isi)
self.exit_json(changed=True, result=result)
except DynamicApiError as exc:
msg = "Failed to create object {kind}/{namespace}/{name} due to: {error}".format(
kind=kind, namespace=namespace, name=isi["metadata"]["name"], error=exc
)
self.fail_json(
msg=msg,
error=exc.status,
status=exc.status,
reason=exc.reason,
)
except Exception as exc:
msg = "Failed to create object {kind}/{namespace}/{name} due to: {error}".format(
kind=kind, namespace=namespace, name=isi["metadata"]["name"], error=exc
)
self.fail_json(msg=msg)


@@ -0,0 +1,170 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import traceback
from urllib.parse import urlparse
from ansible.module_utils._text import to_native
try:
from ansible_collections.kubernetes.core.plugins.module_utils.common import (
K8sAnsibleMixin,
get_api_client,
)
HAS_KUBERNETES_COLLECTION = True
except ImportError as e:
HAS_KUBERNETES_COLLECTION = False
k8s_collection_import_exception = e
K8S_COLLECTION_ERROR = traceback.format_exc()
from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import (
parse_docker_image_ref,
)
try:
from requests import request
from requests.auth import HTTPBasicAuth
HAS_REQUESTS_MODULE = True
except ImportError as e:
HAS_REQUESTS_MODULE = False
requests_import_exception = e
REQUESTS_MODULE_ERROR = traceback.format_exc()
class OpenShiftRegistry(K8sAnsibleMixin):
def __init__(self, module):
self.module = module
self.fail_json = self.module.fail_json
self.exit_json = self.module.exit_json
if not HAS_KUBERNETES_COLLECTION:
self.fail_json(
msg="The kubernetes.core collection must be installed",
exception=K8S_COLLECTION_ERROR,
error=to_native(k8s_collection_import_exception),
)
super(OpenShiftRegistry, self).__init__(self.module)
self.params = self.module.params
self.check_mode = self.module.check_mode
self.client = get_api_client(self.module)
self.check = self.params.get("check")
def list_image_streams(self, namespace=None):
kind = "ImageStream"
api_version = "image.openshift.io/v1"
params = dict(
kind=kind,
api_version=api_version,
namespace=namespace
)
result = self.kubernetes_facts(**params)
imagestream = []
if len(result["resources"]) > 0:
imagestream = result["resources"]
return imagestream
def find_registry_info(self):
def _determine_registry(image_stream):
public, internal = None, None
docker_repo = image_stream["status"].get("publicDockerImageRepository")
if docker_repo:
ref, err = parse_docker_image_ref(docker_repo, self.module)
public = ref["hostname"]
docker_repo = image_stream["status"].get("dockerImageRepository")
if docker_repo:
ref, err = parse_docker_image_ref(docker_repo, self.module)
internal = ref["hostname"]
return internal, public
# Try to determine registry hosts from Image Stream from 'openshift' namespace
for stream in self.list_image_streams(namespace="openshift"):
internal, public = _determine_registry(stream)
if not public and not internal:
self.fail_json(msg="The integrated registry has not been configured")
return internal, public
        # Unable to determine the registry from the 'openshift' namespace, trying all namespaces
for stream in self.list_image_streams():
internal, public = _determine_registry(stream)
if not public and not internal:
self.fail_json(msg="The integrated registry has not been configured")
return internal, public
self.fail_json(msg="No Image Streams could be located to retrieve registry info.")
def info(self):
result = {}
result["internal_hostname"], result["public_hostname"] = self.find_registry_info()
if self.check:
public_registry = result["public_hostname"]
if not public_registry:
result["check"] = dict(
reached=False,
msg="Registry does not have a public hostname."
)
else:
headers = {
'Content-Type': 'application/json'
}
params = {
'method': 'GET',
'verify': False
}
if self.client.configuration.api_key:
headers.update(self.client.configuration.api_key)
elif self.client.configuration.username and self.client.configuration.password:
if not HAS_REQUESTS_MODULE:
result["check"] = dict(
reached=False,
msg="The requests python package is missing, try `pip install requests`",
error=requests_import_exception
)
self.exit_json(**result)
params.update(
dict(auth=HTTPBasicAuth(self.client.configuration.username, self.client.configuration.password))
)
# verify ssl
                host = urlparse(public_registry)
                registry_url = public_registry if host.scheme else "https://" + public_registry
                if registry_url.startswith("https://") and self.client.configuration.ssl_ca_cert:
params.update(
dict(verify=self.client.configuration.ssl_ca_cert)
)
params.update(
dict(headers=headers)
)
last_bad_status, last_bad_reason = None, None
for path in ("/", "/healthz"):
params.update(
dict(url=registry_url + path)
)
response = request(**params)
if response.status_code == 200:
result["check"] = dict(
reached=True,
msg="The local client can contact the integrated registry."
)
self.exit_json(**result)
last_bad_reason = response.reason
last_bad_status = response.status_code
result["check"] = dict(
reached=False,
msg="Unable to contact the integrated registry using local client. Status=%d, Reason=%s" % (
last_bad_status, last_bad_reason
)
)
self.exit_json(**result)