mirror of
https://github.com/kubevirt/kubevirt.core.git
synced 2026-03-26 19:03:16 +00:00
If no namespaces were specified in the inventory config try to get all available namespaces by trying to list OCP projects first. If the resource was not found (no OCP cluster) fall back to regular namespaces. Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
937 lines
33 KiB
Python
937 lines
33 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2023 Red Hat, Inc.
|
|
# Based on the kubernetes.core.k8s inventory
|
|
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
__metaclass__ = type
|
|
|
|
DOCUMENTATION = """
|
|
name: kubevirt
|
|
|
|
short_description: Inventory source for KubeVirt VirtualMachines and VirtualMachineInstances
|
|
|
|
author:
|
|
- "KubeVirt.io Project (!UNKNOWN)"
|
|
|
|
description:
|
|
- Fetch virtual machines from one or more namespaces with an optional label selector.
|
|
- Groups by cluster name, namespaces and labels.
|
|
- Uses V(*.kubevirt.[yml|yaml]) YAML configuration file to set parameter values.
|
|
- By default it uses the active context in I(~/.kube/config) and will return all virtual machines
|
|
for all namespaces the active user is authorized to access.
|
|
|
|
extends_documentation_fragment:
|
|
- kubevirt.core.kubevirt_auth_options
|
|
- inventory_cache
|
|
- constructed
|
|
|
|
options:
|
|
plugin:
|
|
description: Token that ensures this is a source file for the P(kubevirt.core.kubevirt#inventory) plugin.
|
|
required: True
|
|
choices: ["kubevirt", "kubevirt.core.kubevirt"]
|
|
host_format:
|
|
description:
|
|
- 'Specify the format of the host in the inventory group. Available specifiers: V(name), V(namespace) and V(uid).'
|
|
default: "{namespace}-{name}"
|
|
name:
|
|
description:
|
|
- Optional name to assign to the cluster. If not provided, a name is constructed from the server
|
|
and port.
|
|
namespaces:
|
|
description:
|
|
- List of namespaces. If not specified, will fetch virtual machines from all namespaces
|
|
the user is authorized to access.
|
|
label_selector:
|
|
description:
|
|
- Define a label selector to select a subset of the fetched virtual machines.
|
|
network_name:
|
|
description:
|
|
- In case multiple networks are attached to a virtual machine, define which interface should
|
|
be returned as primary IP address.
|
|
aliases: [ interface_name ]
|
|
kube_secondary_dns:
|
|
description:
|
|
- Enable C(kubesecondarydns) derived host names when using a secondary network interface.
|
|
type: bool
|
|
default: False
|
|
use_service:
|
|
description:
|
|
- Enable the use of C(Services) to establish an SSH connection to a virtual machine.
|
|
- Services are only used if no O(network_name) was provided.
|
|
type: bool
|
|
default: True
|
|
unset_ansible_port:
|
|
description:
|
|
- Try to unset the value of C(ansible_port) if no non-default value was found.
|
|
type: bool
|
|
default: True
|
|
version_added: 2.2.0
|
|
create_groups:
|
|
description:
|
|
- Enable the creation of groups from labels on C(VirtualMachines) and C(VirtualMachineInstances).
|
|
type: bool
|
|
default: False
|
|
base_domain:
|
|
description:
|
|
- Override the base domain used to construct host names. Used in case of
|
|
C(kubesecondarydns) or C(Services) of type C(NodePort) if O(append_base_domain) is set.
|
|
append_base_domain:
|
|
description:
|
|
- Append the base domain of the cluster to host names constructed from SSH C(Services) of type C(NodePort).
|
|
type: bool
|
|
default: False
|
|
api_version:
|
|
description:
|
|
- Specify the used KubeVirt API version.
|
|
default: "kubevirt.io/v1"
|
|
connections:
|
|
description:
|
|
- Optional list of cluster connection settings.
|
|
- This parameter is deprecated. Split your connections into multiple configuration files and move
|
|
parameters of each connection to the configuration top level.
|
|
- Deprecated in version C(1.5.0), will be removed in version C(3.0.0).
|
|
|
|
requirements:
|
|
- "python >= 3.9"
|
|
- "kubernetes >= 28.1.0"
|
|
- "PyYAML >= 3.11"
|
|
"""
|
|
|
|
EXAMPLES = """
|
|
# Filename must end with kubevirt.[yml|yaml]
|
|
|
|
# Authenticate with token and return all virtual machines from all accessible namespaces
|
|
- plugin: kubevirt.core.kubevirt
|
|
host: https://192.168.64.4:8443
|
|
api_key: xxxxxxxxxxxxxxxx
|
|
validate_certs: false
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing connected to network bridge-network
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
network_name: bridge-network
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing with label app=test
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
label_selector: app=test
|
|
|
|
# Use a custom config file and a specific context
|
|
- plugin: kubevirt.core.kubevirt
|
|
kubeconfig: /path/to/config
|
|
context: 'awx/192-168-64-4:8443/developer'
|
|
"""
|
|
|
|
from dataclasses import dataclass, InitVar
|
|
from json import loads
|
|
from re import compile as re_compile
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
)
|
|
|
|
# Handle import errors of python kubernetes client.
|
|
# Set HAS_K8S_MODULE_HELPER and k8s_import exception accordingly to
|
|
# potentially print a warning to the user if the client is missing.
|
|
try:
|
|
from kubernetes.dynamic.exceptions import DynamicApiError, ResourceNotFoundError
|
|
|
|
HAS_K8S_MODULE_HELPER = True
|
|
K8S_IMPORT_EXCEPTION = None
|
|
except ImportError as e:
|
|
|
|
class DynamicApiError(Exception):
|
|
"""
|
|
Dummy class, mainly used for ansible-test sanity.
|
|
"""
|
|
|
|
class ResourceNotFoundError(Exception):
|
|
"""
|
|
Dummy class, mainly used for ansible-test sanity.
|
|
"""
|
|
|
|
HAS_K8S_MODULE_HELPER = False
|
|
K8S_IMPORT_EXCEPTION = e
|
|
|
|
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
|
|
|
|
# Handle import errors of trust_as_template.
|
|
# It is only available on ansible-core >=2.19.
|
|
try:
|
|
from ansible.template import trust_as_template
|
|
except ImportError:
|
|
trust_as_template = None
|
|
|
|
|
|
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
|
|
get_api_client,
|
|
K8SClient,
|
|
)
|
|
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME = "kubevirt.io/cluster-preference-name"
|
|
ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME = "kubevirt.io/preference-name"
|
|
ANNOTATION_VM_KUBEVIRT_IO_OS = "vm.kubevirt.io/os"
|
|
LABEL_KUBEVIRT_IO_DOMAIN = "kubevirt.io/domain"
|
|
TYPE_LOADBALANCER = "LoadBalancer"
|
|
TYPE_NODEPORT = "NodePort"
|
|
ID_MSWINDOWS = "mswindows"
|
|
SERVICE_TARGET_PORT_SSH = 22
|
|
SERVICE_TARGET_PORT_WINRM_HTTP = 5985
|
|
SERVICE_TARGET_PORT_WINRM_HTTPS = 5986
|
|
|
|
|
|
class KubeVirtInventoryException(Exception):
|
|
"""
|
|
This class is used for exceptions raised by this inventory.
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class InventoryOptions:
|
|
"""
|
|
This class holds the options defined by the user.
|
|
"""
|
|
|
|
api_version: Optional[str] = None
|
|
label_selector: Optional[str] = None
|
|
network_name: Optional[str] = None
|
|
kube_secondary_dns: Optional[bool] = None
|
|
use_service: Optional[bool] = None
|
|
unset_ansible_port: Optional[bool] = None
|
|
create_groups: Optional[bool] = None
|
|
base_domain: Optional[str] = None
|
|
append_base_domain: Optional[bool] = None
|
|
host_format: Optional[str] = None
|
|
namespaces: Optional[List[str]] = None
|
|
name: Optional[str] = None
|
|
config_data: InitVar[Optional[Dict]] = None
|
|
|
|
def __post_init__(self, config_data: Optional[Dict]) -> None:
|
|
if not config_data or not isinstance(config_data, dict):
|
|
config_data = {}
|
|
|
|
# Copy values from config_data and set defaults for keys not present
|
|
self.api_version = (
|
|
self.api_version
|
|
if self.api_version is not None
|
|
else config_data.get("api_version", "kubevirt.io/v1")
|
|
)
|
|
self.label_selector = (
|
|
self.label_selector
|
|
if self.label_selector is not None
|
|
else config_data.get("label_selector")
|
|
)
|
|
self.network_name = (
|
|
self.network_name
|
|
if self.network_name is not None
|
|
else config_data.get("network_name", config_data.get("interface_name"))
|
|
)
|
|
self.kube_secondary_dns = (
|
|
self.kube_secondary_dns
|
|
if self.kube_secondary_dns is not None
|
|
else config_data.get("kube_secondary_dns", False)
|
|
)
|
|
self.use_service = (
|
|
self.use_service
|
|
if self.use_service is not None
|
|
else config_data.get("use_service", True)
|
|
)
|
|
self.unset_ansible_port = (
|
|
self.unset_ansible_port
|
|
if self.unset_ansible_port is not None
|
|
else config_data.get("unset_ansible_port", True)
|
|
)
|
|
self.create_groups = (
|
|
self.create_groups
|
|
if self.create_groups is not None
|
|
else config_data.get("create_groups", False)
|
|
)
|
|
self.base_domain = (
|
|
self.base_domain
|
|
if self.base_domain is not None
|
|
else config_data.get("base_domain")
|
|
)
|
|
self.append_base_domain = (
|
|
self.append_base_domain
|
|
if self.append_base_domain is not None
|
|
else config_data.get("append_base_domain", False)
|
|
)
|
|
self.host_format = (
|
|
self.host_format
|
|
if self.host_format is not None
|
|
else config_data.get("host_format", "{namespace}-{name}")
|
|
)
|
|
self.namespaces = (
|
|
self.namespaces
|
|
if self.namespaces is not None
|
|
else config_data.get("namespaces")
|
|
)
|
|
self.name = self.name if self.name is not None else config_data.get("name")
|
|
|
|
|
|
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
|
"""
|
|
This class implements the actual inventory module.
|
|
"""
|
|
|
|
NAME = "kubevirt.core.kubevirt"
|
|
|
|
# Used to convert camel case variable names into snake case
|
|
_snake_case_pattern = re_compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
|
|
|
|
@staticmethod
|
|
def _get_default_hostname(host: str) -> str:
|
|
"""
|
|
_get_default_host_name strips URL schemes from the host name and
|
|
replaces invalid characters.
|
|
"""
|
|
return (
|
|
host.replace("https://", "")
|
|
.replace("http://", "")
|
|
.replace(".", "-")
|
|
.replace(":", "_")
|
|
)
|
|
|
|
@staticmethod
|
|
def _format_dynamic_api_exc(exc: DynamicApiError) -> str:
|
|
"""
|
|
_format_dynamic_api_exc tries to extract the message from the JSON body
|
|
of a DynamicApiError.
|
|
"""
|
|
if exc.body:
|
|
if exc.headers and exc.headers.get("Content-Type") == "application/json":
|
|
message = loads(exc.body).get("message")
|
|
if message:
|
|
return message
|
|
return exc.body
|
|
|
|
return f"{exc.status} Reason: {exc.reason}"
|
|
|
|
@staticmethod
|
|
def _format_var_name(name: str) -> str:
|
|
"""
|
|
_format_var_name formats a CamelCase variable name into a snake_case name
|
|
suitable for use as a inventory variable name.
|
|
"""
|
|
return InventoryModule._snake_case_pattern.sub("_", name).lower()
|
|
|
|
@staticmethod
|
|
def _obj_is_valid(obj: Dict) -> bool:
|
|
"""
|
|
_obj_is_valid ensures commonly used keys are present in the passed object.
|
|
"""
|
|
return bool(
|
|
"spec" in obj
|
|
and "status" in obj
|
|
and "metadata" in obj
|
|
and obj["metadata"].get("name")
|
|
and obj["metadata"].get("namespace")
|
|
and obj["metadata"].get("uid")
|
|
)
|
|
|
|
@staticmethod
|
|
def _find_service_with_target_port(
|
|
services: List[Dict], target_port: int
|
|
) -> Optional[Dict]:
|
|
"""
|
|
_find_service_with_target_port returns the first found service with a given
|
|
target port in the passed in list of services or otherwise None.
|
|
"""
|
|
for service in services:
|
|
if (
|
|
(ports := service.get("spec", {}).get("ports")) is not None
|
|
and len(ports) == 1
|
|
and ports[0].get("targetPort", 0) == target_port
|
|
):
|
|
return service
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _get_host_from_service(
|
|
service: Dict, node_name: Optional[str]
|
|
) -> Optional[str]:
|
|
"""
|
|
_get_host_from_service extracts the hostname to be used from the
|
|
passed in service.
|
|
"""
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services can return a hostname or an IP address
|
|
ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress")
|
|
if ingress is not None and len(ingress) > 0:
|
|
hostname = ingress[0].get("hostname")
|
|
ip_address = ingress[0].get("ip")
|
|
return hostname if hostname is not None else ip_address
|
|
elif service_type == TYPE_NODEPORT:
|
|
# NodePort services use the node name as host
|
|
return node_name
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _get_port_from_service(service: Dict) -> Optional[str]:
|
|
"""
|
|
_get_port_from_service extracts the port to be used from the
|
|
passed in service.
|
|
"""
|
|
ports = service.get("spec", {}).get("ports", [])
|
|
if not ports:
|
|
return None
|
|
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services use the port attribute
|
|
return ports[0].get("port")
|
|
if service_type == TYPE_NODEPORT:
|
|
# NodePort services use the nodePort attribute
|
|
return ports[0].get("nodePort")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def _is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool:
|
|
"""
|
|
_is_windows checks whether a given VM is running a Windows guest
|
|
by checking its GuestOSInfo and annotations.
|
|
"""
|
|
if guest_os_info and "id" in guest_os_info:
|
|
return guest_os_info["id"] == ID_MSWINDOWS
|
|
|
|
if not annotations:
|
|
return False
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
|
|
return annotations[
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
|
|
].startswith("windows")
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME in annotations:
|
|
return annotations[ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME].startswith(
|
|
"windows"
|
|
)
|
|
|
|
if ANNOTATION_VM_KUBEVIRT_IO_OS in annotations:
|
|
return annotations[ANNOTATION_VM_KUBEVIRT_IO_OS].startswith("windows")
|
|
|
|
return False
|
|
|
|
def verify_file(self, path: str) -> None:
|
|
"""
|
|
verify_file ensures the inventory file is compatible with this plugin.
|
|
"""
|
|
return super().verify_file(path) and path.endswith(
|
|
("kubevirt.yml", "kubevirt.yaml")
|
|
)
|
|
|
|
def parse(self, inventory: Any, loader: Any, path: str, cache: bool = True) -> None:
|
|
"""
|
|
parse is the main entry point of the inventory.
|
|
It checks for availability of the Kubernetes Python client,
|
|
gets the configuration, retrieves the cache or runs fetch_objects and
|
|
populates the inventory.
|
|
"""
|
|
if not HAS_K8S_MODULE_HELPER:
|
|
raise KubeVirtInventoryException(
|
|
"This module requires the Kubernetes Python client. "
|
|
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
|
|
)
|
|
|
|
super().parse(inventory, loader, path)
|
|
|
|
config_data = self._read_config_data(path)
|
|
cache_key = self.get_cache_key(path)
|
|
user_cache_setting = self.get_option("cache")
|
|
attempt_to_read_cache = user_cache_setting and cache
|
|
cache_needs_update = user_cache_setting and not cache
|
|
|
|
self._connections_compatibility(config_data)
|
|
opts = InventoryOptions(config_data=config_data)
|
|
|
|
results = {}
|
|
if attempt_to_read_cache:
|
|
try:
|
|
results = self.cache[cache_key]
|
|
except KeyError:
|
|
cache_needs_update = True
|
|
if not attempt_to_read_cache or cache_needs_update:
|
|
results = self._fetch_objects(get_api_client(**config_data), opts)
|
|
if cache_needs_update:
|
|
self.cache[cache_key] = results
|
|
|
|
self._populate_inventory(results, opts)
|
|
|
|
def _connections_compatibility(self, config_data: Dict) -> None:
|
|
"""
|
|
_connections_compatibility ensures compatibility with the connection
|
|
parameter found in earlier versions of this inventory plugin (<1.5.0).
|
|
"""
|
|
collection_name = "kubevirt.core"
|
|
version_removed_in = "3.0.0"
|
|
|
|
if (connections := config_data.get("connections")) is None:
|
|
return
|
|
|
|
self.display.deprecated(
|
|
msg="The 'connections' parameter is deprecated and now supports only a single list entry.",
|
|
version=version_removed_in,
|
|
collection_name=collection_name,
|
|
)
|
|
|
|
if not isinstance(connections, list):
|
|
raise KubeVirtInventoryException("Expecting connections to be a list.")
|
|
|
|
if len(connections) == 1:
|
|
if not isinstance(connections[0], dict):
|
|
raise KubeVirtInventoryException(
|
|
"Expecting connection to be a dictionary."
|
|
)
|
|
# Copy the single connections entry into the top level
|
|
for k, v in connections[0].items():
|
|
config_data[k] = v
|
|
self.display.deprecated(
|
|
msg="Move all of your connection parameters to the configuration top level.",
|
|
version=version_removed_in,
|
|
collection_name=collection_name,
|
|
)
|
|
elif len(connections) > 1:
|
|
self.display.deprecated(
|
|
msg="Split your connections into multiple configuration files.",
|
|
version="2.0.0",
|
|
collection_name=collection_name,
|
|
removed=True,
|
|
)
|
|
|
|
def _fetch_objects(self, client: Any, opts: InventoryOptions) -> Dict:
|
|
"""
|
|
fetch_objects fetches all relevant objects from the K8S API.
|
|
"""
|
|
namespaces = {}
|
|
for namespace in (
|
|
opts.namespaces
|
|
if opts.namespaces
|
|
else self._get_available_namespaces(client)
|
|
):
|
|
vms = self._get_vms_for_namespace(client, namespace, opts)
|
|
vmis = self._get_vmis_for_namespace(client, namespace, opts)
|
|
|
|
if not vms and not vmis:
|
|
# Continue if no VMs and VMIs were found to avoid adding empty groups.
|
|
continue
|
|
|
|
namespaces[namespace] = {
|
|
"vms": vms,
|
|
"vmis": vmis,
|
|
"services": self._get_services_for_namespace(client, namespace),
|
|
}
|
|
|
|
return {
|
|
"default_hostname": self._get_default_hostname(client.configuration.host),
|
|
"cluster_domain": self._get_cluster_domain(client),
|
|
"namespaces": namespaces,
|
|
}
|
|
|
|
def _get_cluster_domain(self, client: K8SClient) -> Optional[str]:
|
|
"""
|
|
_get_cluster_domain tries to get the base domain of an OpenShift cluster.
|
|
"""
|
|
try:
|
|
v1_dns = client.resources.get(
|
|
api_version="config.openshift.io/v1", kind="DNS"
|
|
)
|
|
except Exception:
|
|
# If resource not found return None
|
|
return None
|
|
try:
|
|
obj = v1_dns.get(name="cluster")
|
|
except DynamicApiError as exc:
|
|
self.display.debug(
|
|
f"Failed to fetch cluster DNS config: {self._format_dynamic_api_exc(exc)}"
|
|
)
|
|
return None
|
|
return obj.get("spec", {}).get("baseDomain")
|
|
|
|
def _get_resources(
|
|
self, client: K8SClient, api_version: str, kind: str, **kwargs
|
|
) -> List[Dict]:
|
|
"""
|
|
_get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
|
|
"""
|
|
client = client.resources.get(api_version=api_version, kind=kind)
|
|
try:
|
|
result = client.get(**kwargs)
|
|
except DynamicApiError as exc:
|
|
self.display.debug(exc)
|
|
raise KubeVirtInventoryException(
|
|
f"Error fetching {kind} list: {self._format_dynamic_api_exc(exc)}"
|
|
) from exc
|
|
|
|
return [item.to_dict() for item in result.items]
|
|
|
|
def _get_available_namespaces(self, client: K8SClient) -> List[str]:
|
|
"""
|
|
_get_available_namespaces lists all namespaces accessible with the
|
|
configured credentials and returns them.
|
|
"""
|
|
|
|
namespaces = []
|
|
try:
|
|
namespaces = self._get_resources(
|
|
client, "project.openshift.io/v1", "Project"
|
|
)
|
|
except ResourceNotFoundError:
|
|
namespaces = self._get_resources(client, "v1", "Namespace")
|
|
|
|
return [
|
|
namespace["metadata"]["name"]
|
|
for namespace in namespaces
|
|
if "metadata" in namespace and "name" in namespace["metadata"]
|
|
]
|
|
|
|
def _get_vms_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[Dict]:
|
|
"""
|
|
_get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
|
|
"""
|
|
return self._get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachine",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def _get_vmis_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[Dict]:
|
|
"""
|
|
_get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
|
|
"""
|
|
return self._get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachineInstance",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def _get_services_for_namespace(
|
|
self, client: K8SClient, namespace: str
|
|
) -> Dict[str, List[Dict]]:
|
|
"""
|
|
_get_services_for_namespace retrieves all services of a namespace exposing ssh or winrm.
|
|
The services are mapped to the name of the corresponding domain.
|
|
"""
|
|
items = self._get_resources(
|
|
client,
|
|
"v1",
|
|
"Service",
|
|
namespace=namespace,
|
|
)
|
|
|
|
services = {}
|
|
for service in items:
|
|
# Continue if service is not of type LoadBalancer or NodePort
|
|
if not (spec := service.get("spec")):
|
|
continue
|
|
|
|
if spec.get("type") not in (
|
|
TYPE_LOADBALANCER,
|
|
TYPE_NODEPORT,
|
|
):
|
|
continue
|
|
|
|
# Continue if ports are not defined, there are more than one port mapping
|
|
# or the target port is not port 22 (ssh) or port 5985 or 5986 (winrm).
|
|
if (
|
|
(ports := spec.get("ports")) is None
|
|
or len(ports) != 1
|
|
or ports[0].get("targetPort")
|
|
not in [
|
|
SERVICE_TARGET_PORT_SSH,
|
|
SERVICE_TARGET_PORT_WINRM_HTTP,
|
|
SERVICE_TARGET_PORT_WINRM_HTTPS,
|
|
]
|
|
):
|
|
continue
|
|
|
|
# Only add the service to the list if the domain selector is present
|
|
if domain := spec.get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN):
|
|
if domain not in services:
|
|
services[domain] = []
|
|
services[domain].append(service)
|
|
|
|
return services
|
|
|
|
def _populate_inventory(self, results: Dict, opts: InventoryOptions) -> None:
|
|
"""
|
|
_populate_inventory populates the inventory by completing the InventoryOptions
|
|
and invoking populate_inventory_from_namespace for every namespace in results.
|
|
"""
|
|
if opts.base_domain is None:
|
|
opts.base_domain = results["cluster_domain"]
|
|
if opts.name is None:
|
|
opts.name = results["default_hostname"]
|
|
for namespace, data in results["namespaces"].items():
|
|
self._populate_inventory_from_namespace(namespace, data, opts)
|
|
|
|
def _populate_inventory_from_namespace(
|
|
self, namespace: str, data: Dict, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
_populate_inventory_from_namespace adds groups and hosts from a
|
|
namespace to the inventory.
|
|
"""
|
|
vms = {
|
|
vm["metadata"]["name"]: vm for vm in data["vms"] if self._obj_is_valid(vm)
|
|
}
|
|
vmis = {
|
|
vmi["metadata"]["name"]: vmi
|
|
for vmi in data["vmis"]
|
|
if self._obj_is_valid(vmi)
|
|
}
|
|
|
|
if not vms and not vmis:
|
|
# Return early if no VMs and VMIs were found to avoid adding empty groups.
|
|
return
|
|
|
|
services = {
|
|
domain: [service for service in services if self._obj_is_valid(service)]
|
|
for domain, services in data["services"].items()
|
|
}
|
|
|
|
name = self._sanitize_group_name(opts.name)
|
|
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
|
|
|
self.inventory.add_group(name)
|
|
self.inventory.add_group(namespace_group)
|
|
self.inventory.add_child(name, namespace_group)
|
|
|
|
# Add found VMs and optionally enhance with VMI data
|
|
for name, vm in vms.items():
|
|
hostname = self._add_host(vm["metadata"], opts.host_format, namespace_group)
|
|
self._set_vars_from_vm(hostname, vm, opts)
|
|
if name in vmis:
|
|
self._set_vars_from_vmi(hostname, vmis[name], services, opts)
|
|
self._set_composable_vars(hostname)
|
|
|
|
# Add remaining VMIs without VM
|
|
for name, vmi in vmis.items():
|
|
if name in vms:
|
|
continue
|
|
hostname = self._add_host(
|
|
vmi["metadata"], opts.host_format, namespace_group
|
|
)
|
|
self._set_vars_from_vmi(hostname, vmi, services, opts)
|
|
self._set_composable_vars(hostname)
|
|
|
|
def _add_host(self, metadata: Dict, host_format: str, namespace_group: str) -> str:
|
|
"""
|
|
_add_host adds a host to the inventory.
|
|
"""
|
|
hostname = host_format.format(
|
|
namespace=metadata["namespace"],
|
|
name=metadata["name"],
|
|
uid=metadata["uid"],
|
|
)
|
|
self.inventory.add_host(hostname)
|
|
self.inventory.add_child(namespace_group, hostname)
|
|
|
|
return hostname
|
|
|
|
def _set_vars_from_vm(
|
|
self, hostname: str, vm: Dict, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
_set_vars_from_vm sets inventory variables from a VM prefixed with vm_.
|
|
"""
|
|
self._set_common_vars(hostname, "vm", vm, opts)
|
|
|
|
def _set_vars_from_vmi(
|
|
self,
|
|
hostname: str,
|
|
vmi: Dict,
|
|
services: Dict[str, List[Dict]],
|
|
opts: InventoryOptions,
|
|
) -> None:
|
|
"""
|
|
_set_vars_from_vmi sets inventory variables from a VMI prefixed with vmi_ and
|
|
looks up the interface to set ansible_host and ansible_port.
|
|
"""
|
|
self._set_common_vars(hostname, "vmi", vmi, opts)
|
|
|
|
if not (interfaces := vmi["status"].get("interfaces")):
|
|
return
|
|
|
|
if opts.network_name is None:
|
|
# Use first interface
|
|
interface = interfaces[0]
|
|
else:
|
|
# Find interface by its name
|
|
interface = next(
|
|
(i for i in interfaces if i.get("name") == opts.network_name),
|
|
None,
|
|
)
|
|
|
|
# If interface is not found or IP address is not reported skip this VMI
|
|
if not interface or not interface.get("ipAddress"):
|
|
return
|
|
|
|
_services = services.get(
|
|
vmi["metadata"].get("labels", {}).get(LABEL_KUBEVIRT_IO_DOMAIN), []
|
|
)
|
|
|
|
# Set up the connection
|
|
service = None
|
|
if self._is_windows(
|
|
vmi["status"].get("guestOSInfo", {}),
|
|
vmi["metadata"].get("annotations", {}),
|
|
):
|
|
self.inventory.set_variable(hostname, "ansible_connection", "winrm")
|
|
service = self._find_service_with_target_port(
|
|
_services, SERVICE_TARGET_PORT_WINRM_HTTPS
|
|
)
|
|
if service is None:
|
|
service = self._find_service_with_target_port(
|
|
_services, SERVICE_TARGET_PORT_WINRM_HTTP
|
|
)
|
|
else:
|
|
service = self._find_service_with_target_port(
|
|
_services, SERVICE_TARGET_PORT_SSH
|
|
)
|
|
|
|
self._set_ansible_host_and_port(
|
|
vmi,
|
|
hostname,
|
|
interface["ipAddress"],
|
|
service,
|
|
opts,
|
|
)
|
|
|
|
def _set_common_vars(
|
|
self, hostname: str, prefix: str, obj: Dict, opts: InventoryOptions
|
|
):
|
|
"""
|
|
_set_common_vars sets common inventory variables from VMs or VMIs.
|
|
"""
|
|
# Add hostvars from metadata
|
|
if annotations := obj["metadata"].get("annotations"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_annotations", annotations)
|
|
if labels := obj["metadata"].get("labels"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_labels", labels)
|
|
# Create label groups and add vm to it if enabled
|
|
if opts.create_groups:
|
|
self._set_groups_from_labels(hostname, labels)
|
|
if resource_version := obj["metadata"].get("resourceVersion"):
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_resource_version", resource_version
|
|
)
|
|
if uid := obj["metadata"].get("uid"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_uid", uid)
|
|
|
|
# Add hostvars from status
|
|
for key, value in obj["status"].items():
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_{self._format_var_name(key)}", value
|
|
)
|
|
|
|
def _set_groups_from_labels(self, hostname: str, labels: Dict) -> None:
|
|
"""
|
|
_set_groups_from_labels adds groups for each label of a VM or VMI and
|
|
adds the host to each group.
|
|
"""
|
|
groups = []
|
|
for key, value in labels.items():
|
|
group_name = self._sanitize_group_name(f"label_{key}_{value}")
|
|
if group_name not in groups:
|
|
groups.append(group_name)
|
|
# Add host to each label_value group
|
|
for group in groups:
|
|
self.inventory.add_group(group)
|
|
self.inventory.add_child(group, hostname)
|
|
|
|
def _set_ansible_host_and_port(
|
|
self,
|
|
vmi: Dict,
|
|
hostname: str,
|
|
ip_address: str,
|
|
service: Optional[Dict],
|
|
opts: InventoryOptions,
|
|
) -> None:
|
|
"""
|
|
_set_ansible_host_and_port sets the ansible_host and possibly the ansible_port var.
|
|
Secondary interfaces have priority over a service exposing SSH
|
|
"""
|
|
ansible_host = None
|
|
ansible_port = None
|
|
if opts.kube_secondary_dns and opts.network_name:
|
|
# Set ansible_host to the kubesecondarydns derived host name if enabled
|
|
# See https://github.com/kubevirt/kubesecondarydns#parameters
|
|
ansible_host = f"{opts.network_name}.{vmi['metadata']['name']}.{vmi['metadata']['namespace']}.vm"
|
|
if opts.base_domain:
|
|
ansible_host += f".{opts.base_domain}"
|
|
elif opts.use_service and service and not opts.network_name:
|
|
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
|
|
# or NodePort service exposing SSH
|
|
node_name = vmi["status"].get("nodeName")
|
|
if node_name and opts.append_base_domain and opts.base_domain:
|
|
node_name += f".{opts.base_domain}"
|
|
host = self._get_host_from_service(service, node_name)
|
|
port = self._get_port_from_service(service)
|
|
if host is not None and port is not None:
|
|
ansible_host = host
|
|
ansible_port = port
|
|
|
|
# Default to the IP address of the interface if ansible_host was not set prior
|
|
if ansible_host is None:
|
|
ansible_host = ip_address
|
|
|
|
self.inventory.set_variable(hostname, "ansible_host", ansible_host)
|
|
if opts.unset_ansible_port or ansible_port is not None:
|
|
self.inventory.set_variable(hostname, "ansible_port", ansible_port)
|
|
|
|
def _set_composable_vars(self, hostname: str) -> None:
|
|
"""
|
|
_set_composable_vars sets vars per
|
|
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
|
|
"""
|
|
hostvars = self.inventory.get_host(hostname).get_vars()
|
|
strict = self.get_option("strict")
|
|
|
|
def trust_compose_groups(data: Dict) -> Dict:
|
|
if trust_as_template is not None:
|
|
return {k: trust_as_template(v) for k, v in data.items()}
|
|
return data
|
|
|
|
def trust_keyed_groups(data: List) -> List:
|
|
if trust_as_template is not None:
|
|
return [{**d, "key": trust_as_template(d["key"])} for d in data]
|
|
return data
|
|
|
|
self._set_composite_vars(
|
|
trust_compose_groups(self.get_option("compose")),
|
|
hostvars,
|
|
hostname,
|
|
strict=True,
|
|
)
|
|
self._add_host_to_composed_groups(
|
|
trust_compose_groups(self.get_option("groups")),
|
|
hostvars,
|
|
hostname,
|
|
strict=strict,
|
|
)
|
|
self._add_host_to_keyed_groups(
|
|
trust_keyed_groups(self.get_option("keyed_groups")),
|
|
hostvars,
|
|
hostname,
|
|
strict=strict,
|
|
)
|