cleanup(inventory): Consolidate fetching resources with K8SClient

This consolidates fetching of resources with K8SClient by introducing
the get_resources method, which is can be called by getters for certain
kinds of objects.

The former get_vmis_for_namespace method, which contains the main logic
of the inventory is renamed to populate_inventory_from_namespace.

This refactors the following getters:

- get_available_namespaces
- get_ssh_services_for_namespace

This introduces the following getters:

- get_vms_for_namespace (to be used in the following commit)
- get_vmis_for_namespace

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
This commit is contained in:
Felix Matouschek
2024-06-27 10:43:32 +02:00
parent 01a0e535e2
commit 8ae9a395e9

View File

@@ -439,7 +439,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.host_format,
)
for namespace in namespaces:
self.get_vmis_for_namespace(client, name, namespace, opts)
self.populate_inventory_from_namespace(
client, name, namespace, opts
)
else:
client = get_api_client()
name = self.get_default_host_name(client.configuration.host)
@@ -449,7 +451,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
base_domain=self.get_cluster_domain(client),
)
for namespace in namespaces:
self.get_vmis_for_namespace(client, name, namespace, opts)
self.populate_inventory_from_namespace(client, name, namespace, opts)
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
"""
@@ -471,40 +473,106 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return None
return obj.get("spec", {}).get("baseDomain")
def get_available_namespaces(self, client: K8SClient) -> List:
def get_resources(
self, client: K8SClient, api_version: str, kind: str, **kwargs
) -> List[ResourceField]:
"""
get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
"""
client = client.resources.get(api_version=api_version, kind=kind)
try:
result = client.get(**kwargs)
except DynamicApiError as exc:
self.display.debug(exc)
raise KubeVirtInventoryException(
f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}"
) from exc
return result.items
def get_available_namespaces(self, client: K8SClient) -> List[str]:
"""
get_available_namespaces lists all namespaces accessible with the
configured credentials and returns them.
"""
v1_namespace = client.resources.get(api_version="v1", kind="Namespace")
try:
obj = v1_namespace.get()
except DynamicApiError as exc:
self.display.debug(exc)
raise KubeVirtInventoryException(
f"Error fetching Namespace list: {self.format_dynamic_api_exc(exc)}"
) from exc
return [namespace.metadata.name for namespace in obj.items]
return [
namespace.metadata.name
for namespace in self.get_resources(client, "v1", "Namespace")
]
def get_vms_for_namespace(
self, client: K8SClient, namespace: str, opts: InventoryOptions
) -> List[ResourceField]:
"""
get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
"""
return self.get_resources(
client,
opts.api_version,
"VirtualMachine",
namespace=namespace,
label_selector=opts.label_selector,
)
def get_vmis_for_namespace(
self, client: K8SClient, namespace: str, opts: InventoryOptions
) -> List[ResourceField]:
"""
get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
"""
return self.get_resources(
client,
opts.api_version,
"VirtualMachineInstance",
namespace=namespace,
label_selector=opts.label_selector,
)
def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict:
"""
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
The services are mapped to the name of the corresponding domain.
"""
service_list = self.get_resources(
client,
"v1",
"Service",
namespace=namespace,
)
services = {}
for service in service_list:
# Continue if service is not of type LoadBalancer or NodePort
if service.get("spec") is None:
continue
if service["spec"].get("type") not in (
TYPE_LOADBALANCER,
TYPE_NODEPORT,
):
continue
# Continue if ports are not defined, there are more than one port mapping
# or the target port is not port 22/ssh
ports = service["spec"].get("ports")
if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22:
continue
# Only add the service to the dict if the domain selector is present
domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
if domain is not None:
services[domain] = service
return services
def populate_inventory_from_namespace(
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
) -> None:
"""
get_vmis_for_namespace lists all VirtualMachineInstances in a namespace
and adds groups and hosts to the inventory.
populate_inventory_from_namespace adds groups and hosts from a
namespace to the inventory.
"""
vmi_client = client.resources.get(
api_version=opts.api_version, kind="VirtualMachineInstance"
)
try:
vmi_list = vmi_client.get(
namespace=namespace, label_selector=opts.label_selector
)
except DynamicApiError as exc:
self.display.debug(exc)
raise KubeVirtInventoryException(
f"Error fetching VirtualMachineInstance list: {self.format_dynamic_api_exc(exc)}"
) from exc
vmi_list = self.get_vmis_for_namespace(client, namespace, opts)
if not vmi_list.items:
# Return early if no VMIs were found to avoid adding empty groups.
@@ -658,47 +726,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.set_composable_vars(vmi_name)
def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict:
"""
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
The services are mapped to the name of the corresponding domain.
"""
v1_service = client.resources.get(api_version="v1", kind="Service")
try:
service_list = v1_service.get(
namespace=namespace,
)
except DynamicApiError as exc:
self.display.debug(exc)
raise KubeVirtInventoryException(
f"Error fetching Service list: {self.format_dynamic_api_exc(exc)}"
) from exc
services = {}
for service in service_list.items:
# Continue if service is not of type LoadBalancer or NodePort
if service.get("spec") is None:
continue
if service["spec"].get("type") not in (
TYPE_LOADBALANCER,
TYPE_NODEPORT,
):
continue
# Continue if ports are not defined, there are more than one port mapping
# or the target port is not port 22/ssh
ports = service["spec"].get("ports")
if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22:
continue
# Only add the service to the dict if the domain selector is present
domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
if domain is not None:
services[domain] = service
return services
def set_ansible_host_and_port(
self,
vmi: ResourceField,