cleanup(inventory): Make methods more robust

Make the logic in the following methods more robust and add type hints
where appropriate.

- get_host_from_service
- get_port_from_service
- is_windows
- setup
- fetch_objects
- set_ansible_host_and_port
- set_composable_vars

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
This commit is contained in:
Felix Matouschek
2024-06-27 10:21:19 +02:00
parent 83bcffedd7
commit 241ca75b28

View File

@@ -185,6 +185,7 @@ from typing import (
# potentially print a warning to the user if the client is missing.
try:
from kubernetes.dynamic.exceptions import DynamicApiError
from kubernetes.dynamic.resource import ResourceField
HAS_K8S_MODULE_HELPER = True
K8S_IMPORT_EXCEPTION = None
@@ -195,6 +196,11 @@ except ImportError as e:
Dummy class, mainly used for ansible-test sanity.
"""
class ResourceField:
"""
Dummy class, mainly used for ansible-test sanity.
"""
HAS_K8S_MODULE_HELPER = False
K8S_IMPORT_EXCEPTION = e
@@ -289,21 +295,21 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return f"{exc.status} Reason: {exc.reason}"
@staticmethod
def get_host_from_service(service: Dict, node_name: str) -> Optional[str]:
def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]:
"""
get_host_from_service extracts the hostname to be used from the
passed in service.
"""
# LoadBalancer services can return a hostname or an IP address
if service["spec"]["type"] == TYPE_LOADBALANCER:
ingress = service["status"]["loadBalancer"].get("ingress")
service_type = service.get("spec", {}).get("type")
if service_type == TYPE_LOADBALANCER:
# LoadBalancer services can return a hostname or an IP address
ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress")
if ingress is not None and len(ingress) > 0:
hostname = ingress[0].get("hostname")
ip_address = ingress[0].get("ip")
return hostname if hostname is not None else ip_address
# NodePort services use the node name as host
if service["spec"]["type"] == TYPE_NODEPORT:
elif service_type == TYPE_NODEPORT:
# NodePort services use the node name as host
return node_name
return None
@@ -314,25 +320,32 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
get_port_from_service extracts the port to be used from the
passed in service.
"""
# LoadBalancer services use the port attribute
if service["spec"]["type"] == TYPE_LOADBALANCER:
return service["spec"]["ports"][0]["port"]
ports = service.get("spec", {}).get("ports", [])
if not ports:
return None
# NodePort services use the nodePort attribute
if service["spec"]["type"] == TYPE_NODEPORT:
return service["spec"]["ports"][0]["nodePort"]
service_type = service.get("spec", {}).get("type")
if service_type == TYPE_LOADBALANCER:
# LoadBalancer services use the port attribute
return ports[0].get("port")
if service_type == TYPE_NODEPORT:
# NodePort services use the nodePort attribute
return ports[0].get("nodePort")
return None
@staticmethod
def is_windows(guest_os_info: Dict, annotations: Dict) -> bool:
def is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool:
"""
is_windows checkes whether a given VM is running a Windows guest
by checking its GuestOSInfo and annotations.
"""
if "id" in guest_os_info:
if guest_os_info and "id" in guest_os_info:
return guest_os_info["id"] == ID_MSWINDOWS
if not annotations:
return False
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
return annotations[
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
@@ -376,8 +389,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
gets the configured connections and runs fetch_objects on them.
If there is a cache it is returned instead.
"""
connections = config_data.get("connections")
if not HAS_K8S_MODULE_HELPER:
raise KubeVirtInventoryException(
"This module requires the Kubernetes Python client. "
@@ -392,9 +403,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
pass
if not source_data:
self.fetch_objects(connections)
self.fetch_objects(config_data.get("connections"))
def fetch_objects(self, connections: Dict) -> None:
def fetch_objects(self, connections: Optional[List[Dict]]) -> None:
"""
fetch_objects populates the inventory with every configured connection.
"""
@@ -644,7 +655,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.set_composable_vars(vmi_name)
def set_composable_vars(self, vmi_name):
def set_composable_vars(self, vmi_name: str) -> None:
"""
set_composable_vars sets vars per
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
@@ -680,7 +691,10 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
services = {}
for service in service_list.items:
# Continue if service is not of type LoadBalancer or NodePort
if service.get("spec", {}).get("type") not in (
if service.get("spec") is None:
continue
if service["spec"].get("type") not in (
TYPE_LOADBALANCER,
TYPE_NODEPORT,
):
@@ -701,7 +715,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def set_ansible_host_and_port(
self,
vmi: Dict,
vmi: ResourceField,
vmi_name: str,
ip_address: str,
service: Optional[Dict],
@@ -713,22 +727,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
"""
ansible_host = None
ansible_port = None
if opts.kube_secondary_dns and opts.network_name is not None:
if opts.kube_secondary_dns and opts.network_name:
# Set ansible_host to the kubesecondarydns derived host name if enabled
# See https://github.com/kubevirt/kubesecondarydns#parameters
ansible_host = (
f"{opts.network_name}.{vmi.metadata.name}.{vmi.metadata.namespace}.vm"
)
if opts.base_domain is not None:
if opts.base_domain:
ansible_host += f".{opts.base_domain}"
elif opts.use_service and service is not None and opts.network_name is None:
elif opts.use_service and service and not opts.network_name:
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
# or NodePort service exposing SSH
node_name = (
f"{vmi.status.nodeName}.{opts.base_domain}"
if opts.append_base_domain
else vmi.status.nodeName
)
node_name = vmi.status.nodeName
if node_name and opts.append_base_domain and opts.base_domain:
node_name += f".{opts.base_domain}"
host = self.get_host_from_service(service, node_name)
port = self.get_port_from_service(service)
if host is not None and port is not None: