From 8ae9a395e99222b3734507ab3fd652d6295cede5 Mon Sep 17 00:00:00 2001 From: Felix Matouschek Date: Thu, 27 Jun 2024 10:43:32 +0200 Subject: [PATCH] cleanup(inventory): Consolidate fetching resources with K8SClient This consolidates fetching of resources with K8SClient by introducing the get_resources method, which is can be called by getters for certain kinds of objects. The former get_vmis_for_namespace method, which contains the main logic of the inventory is renamed to populate_inventory_from_namespace. This refactors the following getters: - get_available_namespaces - get_ssh_services_for_namespace This introduces the following getters: - get_vms_for_namespace (to be used in the following commit) - get_vmis_for_namespace Signed-off-by: Felix Matouschek --- plugins/inventory/kubevirt.py | 161 ++++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 67 deletions(-) diff --git a/plugins/inventory/kubevirt.py b/plugins/inventory/kubevirt.py index 8e360fb..6bf7796 100644 --- a/plugins/inventory/kubevirt.py +++ b/plugins/inventory/kubevirt.py @@ -439,7 +439,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): self.host_format, ) for namespace in namespaces: - self.get_vmis_for_namespace(client, name, namespace, opts) + self.populate_inventory_from_namespace( + client, name, namespace, opts + ) else: client = get_api_client() name = self.get_default_host_name(client.configuration.host) @@ -449,7 +451,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): base_domain=self.get_cluster_domain(client), ) for namespace in namespaces: - self.get_vmis_for_namespace(client, name, namespace, opts) + self.populate_inventory_from_namespace(client, name, namespace, opts) def get_cluster_domain(self, client: K8SClient) -> Optional[str]: """ @@ -471,40 +473,106 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): return None return obj.get("spec", {}).get("baseDomain") - def get_available_namespaces(self, client: K8SClient) -> List: + def get_resources( + self, client: K8SClient, api_version: str, kind: str, **kwargs + ) -> List[ResourceField]: + """ + get_resources uses a dynamic K8SClient to fetch resources from the K8S API. + """ + client = client.resources.get(api_version=api_version, kind=kind) + try: + result = client.get(**kwargs) + except DynamicApiError as exc: + self.display.debug(exc) + raise KubeVirtInventoryException( + f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}" + ) from exc + + return result.items + + def get_available_namespaces(self, client: K8SClient) -> List[str]: """ get_available_namespaces lists all namespaces accessible with the configured credentials and returns them. """ - v1_namespace = client.resources.get(api_version="v1", kind="Namespace") - try: - obj = v1_namespace.get() - except DynamicApiError as exc: - self.display.debug(exc) - raise KubeVirtInventoryException( - f"Error fetching Namespace list: {self.format_dynamic_api_exc(exc)}" - ) from exc - return [namespace.metadata.name for namespace in obj.items] + return [ + namespace.metadata.name + for namespace in self.get_resources(client, "v1", "Namespace") + ] + + def get_vms_for_namespace( + self, client: K8SClient, namespace: str, opts: InventoryOptions + ) -> List[ResourceField]: + """ + get_vms_for_namespace returns a list of all VirtualMachines in a namespace. + """ + return self.get_resources( + client, + opts.api_version, + "VirtualMachine", + namespace=namespace, + label_selector=opts.label_selector, + ) def get_vmis_for_namespace( + self, client: K8SClient, namespace: str, opts: InventoryOptions + ) -> List[ResourceField]: + """ + get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace. + """ + return self.get_resources( + client, + opts.api_version, + "VirtualMachineInstance", + namespace=namespace, + label_selector=opts.label_selector, + ) + + def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict: + """ + get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh. + The services are mapped to the name of the corresponding domain. + """ + service_list = self.get_resources( + client, + "v1", + "Service", + namespace=namespace, + ) + + services = {} + for service in service_list: + # Continue if service is not of type LoadBalancer or NodePort + if service.get("spec") is None: + continue + + if service["spec"].get("type") not in ( + TYPE_LOADBALANCER, + TYPE_NODEPORT, + ): + continue + + # Continue if ports are not defined, there are more than one port mapping + # or the target port is not port 22/ssh + ports = service["spec"].get("ports") + if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22: + continue + + # Only add the service to the dict if the domain selector is present + domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN) + if domain is not None: + services[domain] = service + + return services + + def populate_inventory_from_namespace( self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions ) -> None: """ - get_vmis_for_namespace lists all VirtualMachineInstances in a namespace - and adds groups and hosts to the inventory. + populate_inventory_from_namespace adds groups and hosts from a + namespace to the inventory. """ - vmi_client = client.resources.get( - api_version=opts.api_version, kind="VirtualMachineInstance" - ) - try: - vmi_list = vmi_client.get( - namespace=namespace, label_selector=opts.label_selector - ) - except DynamicApiError as exc: - self.display.debug(exc) - raise KubeVirtInventoryException( - f"Error fetching VirtualMachineInstance list: {self.format_dynamic_api_exc(exc)}" - ) from exc + vmi_list = self.get_vmis_for_namespace(client, namespace, opts) if not vmi_list.items: # Return early if no VMIs were found to avoid adding empty groups. @@ -658,47 +726,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable): self.set_composable_vars(vmi_name) - def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict: - """ - get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh. - The services are mapped to the name of the corresponding domain. - """ - v1_service = client.resources.get(api_version="v1", kind="Service") - try: - service_list = v1_service.get( - namespace=namespace, - ) - except DynamicApiError as exc: - self.display.debug(exc) - raise KubeVirtInventoryException( - f"Error fetching Service list: {self.format_dynamic_api_exc(exc)}" - ) from exc - - services = {} - for service in service_list.items: - # Continue if service is not of type LoadBalancer or NodePort - if service.get("spec") is None: - continue - - if service["spec"].get("type") not in ( - TYPE_LOADBALANCER, - TYPE_NODEPORT, - ): - continue - - # Continue if ports are not defined, there are more than one port mapping - # or the target port is not port 22/ssh - ports = service["spec"].get("ports") - if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22: - continue - - # Only add the service to the dict if the domain selector is present - domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN) - if domain is not None: - services[domain] = service - - return services - def set_ansible_host_and_port( self, vmi: ResourceField,