#!/usr/bin/env python from __future__ import (absolute_import, division, print_function) __metaclass__ = type import traceback from urllib.parse import urlparse from ansible_collections.community.okd.plugins.module_utils.openshift_common import AnsibleOpenshiftModule from ansible_collections.community.okd.plugins.module_utils.openshift_docker_image import ( parse_docker_image_ref, ) try: from requests import request from requests.auth import HTTPBasicAuth HAS_REQUESTS_MODULE = True requests_import_exception = None except ImportError as e: HAS_REQUESTS_MODULE = False requests_import_exception = e REQUESTS_MODULE_ERROR = traceback.format_exc() class OpenShiftRegistry(AnsibleOpenshiftModule): def __init__(self, **kwargs): super(OpenShiftRegistry, self).__init__(**kwargs) self.check = self.params.get("check") def list_image_streams(self, namespace=None): kind = "ImageStream" api_version = "image.openshift.io/v1" params = dict( kind=kind, api_version=api_version, namespace=namespace ) result = self.kubernetes_facts(**params) imagestream = [] if len(result["resources"]) > 0: imagestream = result["resources"] return imagestream def find_registry_info(self): def _determine_registry(image_stream): public, internal = None, None docker_repo = image_stream["status"].get("publicDockerImageRepository") if docker_repo: ref, err = parse_docker_image_ref(docker_repo, self.module) public = ref["hostname"] docker_repo = image_stream["status"].get("dockerImageRepository") if docker_repo: ref, err = parse_docker_image_ref(docker_repo, self.module) internal = ref["hostname"] return internal, public # Try to determine registry hosts from Image Stream from 'openshift' namespace for stream in self.list_image_streams(namespace="openshift"): internal, public = _determine_registry(stream) if not public and not internal: self.fail_json(msg="The integrated registry has not been configured") return internal, public # Unable to determine registry from 'openshift' namespace, trying with all namespace for stream in self.list_image_streams(): internal, public = _determine_registry(stream) if not public and not internal: self.fail_json(msg="The integrated registry has not been configured") return internal, public self.fail_json(msg="No Image Streams could be located to retrieve registry info.") def execute_module(self): result = {} result["internal_hostname"], result["public_hostname"] = self.find_registry_info() if self.check: public_registry = result["public_hostname"] if not public_registry: result["check"] = dict( reached=False, msg="Registry does not have a public hostname." ) else: headers = { 'Content-Type': 'application/json' } params = { 'method': 'GET', 'verify': False } if self.client.configuration.api_key: headers.update(self.client.configuration.api_key) elif self.client.configuration.username and self.client.configuration.password: if not HAS_REQUESTS_MODULE: result["check"] = dict( reached=False, msg="The requests python package is missing, try `pip install requests`", error=requests_import_exception ) self.exit_json(**result) params.update( dict(auth=HTTPBasicAuth(self.client.configuration.username, self.client.configuration.password)) ) # verify ssl host = urlparse(public_registry) if len(host.scheme) == 0: registry_url = "https://" + public_registry if registry_url.startswith("https://") and self.client.configuration.ssl_ca_cert: params.update( dict(verify=self.client.configuration.ssl_ca_cert) ) params.update( dict(headers=headers) ) last_bad_status, last_bad_reason = None, None for path in ("/", "/healthz"): params.update( dict(url=registry_url + path) ) response = request(**params) if response.status_code == 200: result["check"] = dict( reached=True, msg="The local client can contact the integrated registry." ) self.exit_json(**result) last_bad_reason = response.reason last_bad_status = response.status_code result["check"] = dict( reached=False, msg="Unable to contact the integrated registry using local client. Status=%d, Reason=%s" % ( last_bad_status, last_bad_reason ) ) self.exit_json(**result)