From 241ca75b28731e891b20122252d7555b62f0b0c0 Mon Sep 17 00:00:00 2001 From: Felix Matouschek Date: Thu, 27 Jun 2024 10:21:19 +0200 Subject: [PATCH] cleanup(inventory): Make methods more robust Make the logic in the following methods more robust and add type hints where appropriate. - get_host_from_service - get_port_from_service - is_windows - setup - fetch_objects - set_ansible_host_and_port - set_composable_vars Signed-off-by: Felix Matouschek --- plugins/inventory/kubevirt.py | 72 ++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/plugins/inventory/kubevirt.py b/plugins/inventory/kubevirt.py index c0d8714..6423748 100644 --- a/plugins/inventory/kubevirt.py +++ b/plugins/inventory/kubevirt.py @@ -185,6 +185,7 @@ from typing import ( # potentially print a warning to the user if the client is missing. try: from kubernetes.dynamic.exceptions import DynamicApiError + from kubernetes.dynamic.resource import ResourceField HAS_K8S_MODULE_HELPER = True K8S_IMPORT_EXCEPTION = None @@ -195,6 +196,11 @@ except ImportError as e: Dummy class, mainly used for ansible-test sanity. """ + class ResourceField: + """ + Dummy class, mainly used for ansible-test sanity. + """ + HAS_K8S_MODULE_HELPER = False K8S_IMPORT_EXCEPTION = e @@ -289,21 +295,21 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): return f"{exc.status} Reason: {exc.reason}" @staticmethod - def get_host_from_service(service: Dict, node_name: str) -> Optional[str]: + def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]: """ get_host_from_service extracts the hostname to be used from the passed in service. """ - # LoadBalancer services can return a hostname or an IP address - if service["spec"]["type"] == TYPE_LOADBALANCER: - ingress = service["status"]["loadBalancer"].get("ingress") + service_type = service.get("spec", {}).get("type") + if service_type == TYPE_LOADBALANCER: + # LoadBalancer services can return a hostname or an IP address + ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress") if ingress is not None and len(ingress) > 0: hostname = ingress[0].get("hostname") ip_address = ingress[0].get("ip") return hostname if hostname is not None else ip_address - - # NodePort services use the node name as host - if service["spec"]["type"] == TYPE_NODEPORT: + elif service_type == TYPE_NODEPORT: + # NodePort services use the node name as host return node_name return None @@ -314,25 +320,32 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): get_port_from_service extracts the port to be used from the passed in service. """ - # LoadBalancer services use the port attribute - if service["spec"]["type"] == TYPE_LOADBALANCER: - return service["spec"]["ports"][0]["port"] + ports = service.get("spec", {}).get("ports", []) + if not ports: + return None - # NodePort services use the nodePort attribute - if service["spec"]["type"] == TYPE_NODEPORT: - return service["spec"]["ports"][0]["nodePort"] + service_type = service.get("spec", {}).get("type") + if service_type == TYPE_LOADBALANCER: + # LoadBalancer services use the port attribute + return ports[0].get("port") + if service_type == TYPE_NODEPORT: + # NodePort services use the nodePort attribute + return ports[0].get("nodePort") return None @staticmethod - def is_windows(guest_os_info: Dict, annotations: Dict) -> bool: + def is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool: """ is_windows checkes whether a given VM is running a Windows guest by checking its GuestOSInfo and annotations. """ - if "id" in guest_os_info: + if guest_os_info and "id" in guest_os_info: return guest_os_info["id"] == ID_MSWINDOWS + if not annotations: + return False + if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations: return annotations[ ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME @@ -376,8 +389,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): gets the configured connections and runs fetch_objects on them. If there is a cache it is returned instead. """ - connections = config_data.get("connections") - if not HAS_K8S_MODULE_HELPER: raise KubeVirtInventoryException( "This module requires the Kubernetes Python client. " @@ -392,9 +403,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): pass if not source_data: - self.fetch_objects(connections) + self.fetch_objects(config_data.get("connections")) - def fetch_objects(self, connections: Dict) -> None: + def fetch_objects(self, connections: Optional[List[Dict]]) -> None: """ fetch_objects populates the inventory with every configured connection. """ @@ -644,7 +655,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): self.set_composable_vars(vmi_name) - def set_composable_vars(self, vmi_name): + def set_composable_vars(self, vmi_name: str) -> None: """ set_composable_vars sets vars per https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html @@ -680,7 +691,10 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): services = {} for service in service_list.items: # Continue if service is not of type LoadBalancer or NodePort - if service.get("spec", {}).get("type") not in ( + if service.get("spec") is None: + continue + + if service["spec"].get("type") not in ( TYPE_LOADBALANCER, TYPE_NODEPORT, ): @@ -701,7 +715,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): def set_ansible_host_and_port( self, - vmi: Dict, + vmi: ResourceField, vmi_name: str, ip_address: str, service: Optional[Dict], @@ -713,22 +727,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): """ ansible_host = None ansible_port = None - if opts.kube_secondary_dns and opts.network_name is not None: + if opts.kube_secondary_dns and opts.network_name: # Set ansible_host to the kubesecondarydns derived host name if enabled # See https://github.com/kubevirt/kubesecondarydns#parameters ansible_host = ( f"{opts.network_name}.{vmi.metadata.name}.{vmi.metadata.namespace}.vm" ) - if opts.base_domain is not None: + if opts.base_domain: ansible_host += f".{opts.base_domain}" - elif opts.use_service and service is not None and opts.network_name is None: + elif opts.use_service and service and not opts.network_name: # Set ansible_host and ansible_port to the host and port from the LoadBalancer # or NodePort service exposing SSH - node_name = ( - f"{vmi.status.nodeName}.{opts.base_domain}" - if opts.append_base_domain - else vmi.status.nodeName - ) + node_name = vmi.status.nodeName + if node_name and opts.append_base_domain and opts.base_domain: + node_name += f".{opts.base_domain}" host = self.get_host_from_service(service, node_name) port = self.get_port_from_service(service) if host is not None and port is not None: