mirror of
https://github.com/kubevirt/kubevirt.core.git
synced 2026-03-26 19:03:16 +00:00
The support for connections to multiple clusters in the inventory plugin is dropped to better align with user expectations and how other inventories work. If inventories of multiple clusters are needed the inventory can be run multiple times with different configurations. This also helps to clean up the code and make it simpler. For now this adds a compatibility helper so that configurations with a single connection entry remain supported and a warning is emitted. Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
785 lines
28 KiB
Python
785 lines
28 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2023 Red Hat, Inc.
|
|
# Based on the kubernetes.core.k8s inventory
|
|
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
__metaclass__ = type
|
|
|
|
DOCUMENTATION = """
|
|
name: kubevirt
|
|
|
|
short_description: Inventory source for KubeVirt VirtualMachines and VirtualMachineInstances
|
|
|
|
author:
|
|
- "KubeVirt.io Project (!UNKNOWN)"
|
|
|
|
description:
|
|
- Fetch virtual machines from one or more namespaces with an optional label selector.
|
|
- Groups by cluster name, namespaces and labels.
|
|
- Uses V(*.kubevirt.[yml|yaml]) YAML configuration file to set parameter values.
|
|
- By default it uses the active context in I(~/.kube/config) and will return all virtual machines
|
|
for all namespaces the active user is authorized to access.
|
|
|
|
extends_documentation_fragment:
|
|
- kubevirt.core.kubevirt_auth_options
|
|
- inventory_cache
|
|
- constructed
|
|
|
|
options:
|
|
plugin:
|
|
description: Token that ensures this is a source file for the P(kubevirt.core.kubevirt#inventory) plugin.
|
|
required: True
|
|
choices: ["kubevirt", "kubevirt.core.kubevirt"]
|
|
host_format:
|
|
description:
|
|
- 'Specify the format of the host in the inventory group. Available specifiers: V(name), V(namespace) and V(uid).'
|
|
default: "{namespace}-{name}"
|
|
name:
|
|
description:
|
|
- Optional name to assign to the cluster. If not provided, a name is constructed from the server
|
|
and port.
|
|
namespaces:
|
|
description:
|
|
- List of namespaces. If not specified, will fetch virtual machines from all namespaces
|
|
the user is authorized to access.
|
|
label_selector:
|
|
description:
|
|
- Define a label selector to select a subset of the fetched virtual machines.
|
|
network_name:
|
|
description:
|
|
- In case multiple networks are attached to a virtual machine, define which interface should
|
|
be returned as primary IP address.
|
|
aliases: [ interface_name ]
|
|
kube_secondary_dns:
|
|
description:
|
|
- Enable C(kubesecondarydns) derived host names when using a secondary network interface.
|
|
type: bool
|
|
default: False
|
|
use_service:
|
|
description:
|
|
- Enable the use of C(Services) to establish an SSH connection to a virtual machine.
|
|
- Services are only used if no O(network_name) was provided.
|
|
type: bool
|
|
default: True
|
|
create_groups:
|
|
description:
|
|
- Enable the creation of groups from labels on C(VirtualMachines) and C(VirtualMachineInstances).
|
|
type: bool
|
|
default: False
|
|
base_domain:
|
|
description:
|
|
- Override the base domain used to construct host names. Used in case of
|
|
C(kubesecondarydns) or C(Services) of type C(NodePort) if O(append_base_domain) is set.
|
|
append_base_domain:
|
|
description:
|
|
- Append the base domain of the cluster to host names constructed from SSH C(Services) of type C(NodePort).
|
|
type: bool
|
|
default: False
|
|
api_version:
|
|
description:
|
|
- Specify the used KubeVirt API version.
|
|
default: "kubevirt.io/v1"
|
|
connections:
|
|
description:
|
|
- Optional list of cluster connection settings.
|
|
- This parameter is deprecated. Split your connections into multiple configuration files and move
|
|
parameters of each connection to the configuration top level.
|
|
- Deprecated in version C(1.5.0), will be removed in version C(3.0.0).
|
|
|
|
requirements:
|
|
- "python >= 3.9"
|
|
- "kubernetes >= 28.1.0"
|
|
- "PyYAML >= 3.11"
|
|
"""
|
|
|
|
EXAMPLES = """
|
|
# Filename must end with kubevirt.[yml|yaml]
|
|
|
|
# Authenticate with token and return all virtual machines from all accessible namespaces
|
|
- plugin: kubevirt.core.kubevirt
|
|
host: https://192.168.64.4:8443
|
|
api_key: xxxxxxxxxxxxxxxx
|
|
validate_certs: false
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing connected to network bridge-network
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
network_name: bridge-network
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing with label app=test
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
label_selector: app=test
|
|
|
|
# Use a custom config file and a specific context
|
|
- plugin: kubevirt.core.kubevirt
|
|
kubeconfig: /path/to/config
|
|
context: 'awx/192-168-64-4:8443/developer'
|
|
"""
|
|
|
|
from dataclasses import dataclass, InitVar
|
|
from json import loads
|
|
from re import compile as re_compile
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
)
|
|
|
|
# Handle import errors of python kubernetes client.
|
|
# Set HAS_K8S_MODULE_HELPER and k8s_import exception accordingly to
|
|
# potentially print a warning to the user if the client is missing.
|
|
try:
|
|
from kubernetes.dynamic.exceptions import DynamicApiError
|
|
from kubernetes.dynamic.resource import ResourceField
|
|
|
|
HAS_K8S_MODULE_HELPER = True
|
|
K8S_IMPORT_EXCEPTION = None
|
|
except ImportError as e:
|
|
|
|
class DynamicApiError(Exception):
|
|
"""
|
|
Dummy class, mainly used for ansible-test sanity.
|
|
"""
|
|
|
|
class ResourceField:
|
|
"""
|
|
Dummy class, mainly used for ansible-test sanity.
|
|
"""
|
|
|
|
HAS_K8S_MODULE_HELPER = False
|
|
K8S_IMPORT_EXCEPTION = e
|
|
|
|
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
|
|
|
|
|
|
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
|
|
get_api_client,
|
|
K8SClient,
|
|
)
|
|
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME = "kubevirt.io/cluster-preference-name"
|
|
ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME = "kubevirt.io/preference-name"
|
|
ANNOTATION_VM_KUBEVIRT_IO_OS = "vm.kubevirt.io/os"
|
|
LABEL_KUBEVIRT_IO_DOMAIN = "kubevirt.io/domain"
|
|
TYPE_LOADBALANCER = "LoadBalancer"
|
|
TYPE_NODEPORT = "NodePort"
|
|
ID_MSWINDOWS = "mswindows"
|
|
|
|
|
|
class KubeVirtInventoryException(Exception):
|
|
"""
|
|
This class is used for exceptions raised by this inventory.
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class InventoryOptions:
|
|
"""
|
|
This class holds the options defined by the user.
|
|
"""
|
|
|
|
api_version: Optional[str] = None
|
|
label_selector: Optional[str] = None
|
|
network_name: Optional[str] = None
|
|
kube_secondary_dns: Optional[bool] = None
|
|
use_service: Optional[bool] = None
|
|
create_groups: Optional[bool] = None
|
|
base_domain: Optional[str] = None
|
|
append_base_domain: Optional[bool] = None
|
|
host_format: Optional[str] = None
|
|
config_data: InitVar[Optional[Dict]] = None
|
|
|
|
def __post_init__(self, config_data: Optional[Dict]) -> None:
|
|
if not config_data or not isinstance(config_data, dict):
|
|
config_data = {}
|
|
|
|
# Copy values from config_data and set defaults for keys not present
|
|
self.api_version = (
|
|
self.api_version
|
|
if self.api_version is not None
|
|
else config_data.get("api_version", "kubevirt.io/v1")
|
|
)
|
|
self.label_selector = (
|
|
self.label_selector
|
|
if self.label_selector is not None
|
|
else config_data.get("label_selector")
|
|
)
|
|
self.network_name = (
|
|
self.network_name
|
|
if self.network_name is not None
|
|
else config_data.get("network_name", config_data.get("interface_name"))
|
|
)
|
|
self.kube_secondary_dns = (
|
|
self.kube_secondary_dns
|
|
if self.kube_secondary_dns is not None
|
|
else config_data.get("kube_secondary_dns", False)
|
|
)
|
|
self.use_service = (
|
|
self.use_service
|
|
if self.use_service is not None
|
|
else config_data.get("use_service", True)
|
|
)
|
|
self.create_groups = (
|
|
self.create_groups
|
|
if self.create_groups is not None
|
|
else config_data.get("create_groups", False)
|
|
)
|
|
self.base_domain = (
|
|
self.base_domain
|
|
if self.base_domain is not None
|
|
else config_data.get("base_domain")
|
|
)
|
|
self.append_base_domain = (
|
|
self.append_base_domain
|
|
if self.append_base_domain is not None
|
|
else config_data.get("append_base_domain", False)
|
|
)
|
|
self.host_format = (
|
|
self.host_format
|
|
if self.host_format is not None
|
|
else config_data.get("host_format", "{namespace}-{name}")
|
|
)
|
|
|
|
|
|
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
|
"""
|
|
This class implements the actual inventory module.
|
|
"""
|
|
|
|
NAME = "kubevirt.core.kubevirt"
|
|
|
|
# Used to convert camel case variable names into snake case
|
|
snake_case_pattern = re_compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
|
|
|
|
@staticmethod
|
|
def get_default_host_name(host: str) -> str:
|
|
"""
|
|
get_default_host_name strips URL schemes from the host name and
|
|
replaces invalid characters.
|
|
"""
|
|
return (
|
|
host.replace("https://", "")
|
|
.replace("http://", "")
|
|
.replace(".", "-")
|
|
.replace(":", "_")
|
|
)
|
|
|
|
@staticmethod
|
|
def format_dynamic_api_exc(exc: DynamicApiError) -> str:
|
|
"""
|
|
format_dynamic_api_exc tries to extract the message from the JSON body
|
|
of a DynamicApiError.
|
|
"""
|
|
if exc.body:
|
|
if exc.headers and exc.headers.get("Content-Type") == "application/json":
|
|
message = loads(exc.body).get("message")
|
|
if message:
|
|
return message
|
|
return exc.body
|
|
|
|
return f"{exc.status} Reason: {exc.reason}"
|
|
|
|
@staticmethod
|
|
def format_var_name(name: str) -> str:
|
|
"""
|
|
format_var_name formats a CamelCase variable name into a snake_case name
|
|
suitable for use as a inventory variable name.
|
|
"""
|
|
return InventoryModule.snake_case_pattern.sub("_", name).lower()
|
|
|
|
@staticmethod
|
|
def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]:
|
|
"""
|
|
get_host_from_service extracts the hostname to be used from the
|
|
passed in service.
|
|
"""
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services can return a hostname or an IP address
|
|
ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress")
|
|
if ingress is not None and len(ingress) > 0:
|
|
hostname = ingress[0].get("hostname")
|
|
ip_address = ingress[0].get("ip")
|
|
return hostname if hostname is not None else ip_address
|
|
elif service_type == TYPE_NODEPORT:
|
|
# NodePort services use the node name as host
|
|
return node_name
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_port_from_service(service: Dict) -> Optional[str]:
|
|
"""
|
|
get_port_from_service extracts the port to be used from the
|
|
passed in service.
|
|
"""
|
|
ports = service.get("spec", {}).get("ports", [])
|
|
if not ports:
|
|
return None
|
|
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services use the port attribute
|
|
return ports[0].get("port")
|
|
if service_type == TYPE_NODEPORT:
|
|
# NodePort services use the nodePort attribute
|
|
return ports[0].get("nodePort")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool:
|
|
"""
|
|
is_windows checkes whether a given VM is running a Windows guest
|
|
by checking its GuestOSInfo and annotations.
|
|
"""
|
|
if guest_os_info and "id" in guest_os_info:
|
|
return guest_os_info["id"] == ID_MSWINDOWS
|
|
|
|
if not annotations:
|
|
return False
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
|
|
return annotations[
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
|
|
].startswith("windows")
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME in annotations:
|
|
return annotations[ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME].startswith(
|
|
"windows"
|
|
)
|
|
|
|
if ANNOTATION_VM_KUBEVIRT_IO_OS in annotations:
|
|
return annotations[ANNOTATION_VM_KUBEVIRT_IO_OS].startswith("windows")
|
|
|
|
return False
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
|
|
def verify_file(self, path: str) -> None:
|
|
"""
|
|
verify_file ensures the inventory file is compatible with this plugin.
|
|
"""
|
|
return super().verify_file(path) and path.endswith(
|
|
("kubevirt.yml", "kubevirt.yaml")
|
|
)
|
|
|
|
def parse(self, inventory: Any, loader: Any, path: str, cache: bool = True) -> None:
|
|
"""
|
|
parse is the main entry point of the inventory.
|
|
It checks for availability of the Kubernetes Python client,
|
|
gets the configuration and runs fetch_objects or
|
|
if there is a cache it is used instead.
|
|
"""
|
|
super().parse(inventory, loader, path)
|
|
cache_key = self._get_cache_prefix(path)
|
|
config_data = self._read_config_data(path)
|
|
|
|
if not HAS_K8S_MODULE_HELPER:
|
|
raise KubeVirtInventoryException(
|
|
"This module requires the Kubernetes Python client. "
|
|
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
|
|
)
|
|
|
|
source_data = None
|
|
if cache and cache_key in self._cache:
|
|
try:
|
|
source_data = self._cache[cache_key]
|
|
except KeyError:
|
|
pass
|
|
|
|
if not source_data:
|
|
self.fetch_objects(config_data)
|
|
|
|
def fetch_objects(self, config_data: Dict) -> None:
|
|
"""
|
|
fetch_objects populates the inventory with the specified parameters.
|
|
"""
|
|
if not config_data or not isinstance(config_data, dict):
|
|
config_data = {}
|
|
|
|
self.connections_compatibility(config_data)
|
|
client = get_api_client(**config_data)
|
|
name = config_data.get(
|
|
"name", self.get_default_host_name(client.configuration.host)
|
|
)
|
|
namespaces = (
|
|
config_data["namespaces"]
|
|
if config_data.get("namespaces")
|
|
else self.get_available_namespaces(client)
|
|
)
|
|
opts = InventoryOptions(config_data=config_data)
|
|
if opts.base_domain is None:
|
|
opts.base_domain = self.get_cluster_domain(client)
|
|
for namespace in namespaces:
|
|
self.populate_inventory_from_namespace(client, name, namespace, opts)
|
|
|
|
def connections_compatibility(self, config_data: Dict) -> None:
|
|
collection_name = "kubevirt.core"
|
|
|
|
if (connections := config_data.get("connections")) is None:
|
|
return
|
|
|
|
self.display.deprecated(
|
|
msg="The 'connections' parameter is deprecated and now supports only a single list entry.",
|
|
version="2.0.0",
|
|
collection_name=collection_name,
|
|
)
|
|
|
|
if not isinstance(connections, list):
|
|
raise KubeVirtInventoryException("Expecting connections to be a list.")
|
|
|
|
if len(connections) == 1:
|
|
if not isinstance(connections[0], dict):
|
|
raise KubeVirtInventoryException(
|
|
"Expecting connection to be a dictionary."
|
|
)
|
|
# Copy the single connections entry into the top level
|
|
for k, v in connections[0].items():
|
|
config_data[k] = v
|
|
self.display.deprecated(
|
|
msg="Move all of your connection parameters to the configuration top level.",
|
|
version="3.0.0",
|
|
collection_name=collection_name,
|
|
)
|
|
elif len(connections) > 1:
|
|
self.display.deprecated(
|
|
msg="Split your connections into multiple configuration files.",
|
|
version="2.0.0",
|
|
collection_name=collection_name,
|
|
removed=True,
|
|
)
|
|
|
|
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
|
|
"""
|
|
get_cluster_domain tries to get the base domain of an OpenShift cluster.
|
|
"""
|
|
try:
|
|
v1_dns = client.resources.get(
|
|
api_version="config.openshift.io/v1", kind="DNS"
|
|
)
|
|
except Exception:
|
|
# If resource not found return None
|
|
return None
|
|
try:
|
|
obj = v1_dns.get(name="cluster")
|
|
except DynamicApiError as exc:
|
|
self.display.debug(
|
|
f"Failed to fetch cluster DNS config: {self.format_dynamic_api_exc(exc)}"
|
|
)
|
|
return None
|
|
return obj.get("spec", {}).get("baseDomain")
|
|
|
|
def get_resources(
|
|
self, client: K8SClient, api_version: str, kind: str, **kwargs
|
|
) -> List[ResourceField]:
|
|
"""
|
|
get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
|
|
"""
|
|
client = client.resources.get(api_version=api_version, kind=kind)
|
|
try:
|
|
result = client.get(**kwargs)
|
|
except DynamicApiError as exc:
|
|
self.display.debug(exc)
|
|
raise KubeVirtInventoryException(
|
|
f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}"
|
|
) from exc
|
|
|
|
return result.items
|
|
|
|
def get_available_namespaces(self, client: K8SClient) -> List[str]:
|
|
"""
|
|
get_available_namespaces lists all namespaces accessible with the
|
|
configured credentials and returns them.
|
|
"""
|
|
return [
|
|
namespace.metadata.name
|
|
for namespace in self.get_resources(client, "v1", "Namespace")
|
|
]
|
|
|
|
def get_vms_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[ResourceField]:
|
|
"""
|
|
get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
|
|
"""
|
|
return self.get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachine",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def get_vmis_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[ResourceField]:
|
|
"""
|
|
get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
|
|
"""
|
|
return self.get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachineInstance",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict:
|
|
"""
|
|
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
|
|
The services are mapped to the name of the corresponding domain.
|
|
"""
|
|
service_list = self.get_resources(
|
|
client,
|
|
"v1",
|
|
"Service",
|
|
namespace=namespace,
|
|
)
|
|
|
|
services = {}
|
|
for service in service_list:
|
|
# Continue if service is not of type LoadBalancer or NodePort
|
|
if service.get("spec") is None:
|
|
continue
|
|
|
|
if service["spec"].get("type") not in (
|
|
TYPE_LOADBALANCER,
|
|
TYPE_NODEPORT,
|
|
):
|
|
continue
|
|
|
|
# Continue if ports are not defined, there are more than one port mapping
|
|
# or the target port is not port 22/ssh
|
|
ports = service["spec"].get("ports")
|
|
if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22:
|
|
continue
|
|
|
|
# Only add the service to the dict if the domain selector is present
|
|
domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
|
|
if domain is not None:
|
|
services[domain] = service
|
|
|
|
return services
|
|
|
|
def populate_inventory_from_namespace(
|
|
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
populate_inventory_from_namespace adds groups and hosts from a
|
|
namespace to the inventory.
|
|
"""
|
|
vms = {
|
|
vm.metadata.name: vm
|
|
for vm in self.get_vms_for_namespace(client, namespace, opts)
|
|
}
|
|
vmis = {
|
|
vmi.metadata.name: vmi
|
|
for vmi in self.get_vmis_for_namespace(client, namespace, opts)
|
|
}
|
|
|
|
if not vms and not vmis:
|
|
# Return early if no VMs and VMIs were found to avoid adding empty groups.
|
|
return
|
|
|
|
services = self.get_ssh_services_for_namespace(client, namespace)
|
|
|
|
name = self._sanitize_group_name(name)
|
|
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
|
|
|
self.inventory.add_group(name)
|
|
self.inventory.add_group(namespace_group)
|
|
self.inventory.add_child(name, namespace_group)
|
|
|
|
# Add found VMs and optionally enhance with VMI data
|
|
for name, vm in vms.items():
|
|
hostname = self.add_host(vm, opts.host_format, namespace_group)
|
|
self.set_vars_from_vm(hostname, vm, opts)
|
|
if name in vmis:
|
|
self.set_vars_from_vmi(hostname, vmis[name], services, opts)
|
|
self.set_composable_vars(hostname)
|
|
|
|
# Add remaining VMIs without VM
|
|
for name, vmi in vmis.items():
|
|
if name in vms:
|
|
continue
|
|
hostname = self.add_host(vmi, opts.host_format, namespace_group)
|
|
self.set_vars_from_vmi(hostname, vmi, services, opts)
|
|
self.set_composable_vars(hostname)
|
|
|
|
def add_host(
|
|
self, obj: ResourceField, host_format: str, namespace_group: str
|
|
) -> str:
|
|
"""
|
|
add_hosts adds a host to the inventory.
|
|
"""
|
|
hostname = host_format.format(
|
|
namespace=obj.metadata.namespace,
|
|
name=obj.metadata.name,
|
|
uid=obj.metadata.uid,
|
|
)
|
|
self.inventory.add_host(hostname)
|
|
self.inventory.add_child(namespace_group, hostname)
|
|
|
|
return hostname
|
|
|
|
def set_vars_from_vm(
|
|
self, hostname: str, vm: ResourceField, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
set_vars_from_vm sets inventory variables from a VM prefixed with vm_.
|
|
"""
|
|
self.set_common_vars(hostname, "vm", vm, opts)
|
|
|
|
def set_vars_from_vmi(
|
|
self, hostname: str, vmi: ResourceField, services: Dict, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
set_vars_from_vmi sets inventory variables from a VMI prefixed with vmi_ and
|
|
looks up the interface to set ansible_host and ansible_port.
|
|
"""
|
|
self.set_common_vars(hostname, "vmi", vmi, opts)
|
|
|
|
if opts.network_name is None:
|
|
# Use first interface
|
|
interface = vmi.status.interfaces[0] if vmi.status.interfaces else None
|
|
else:
|
|
# Find interface by its name
|
|
interface = next(
|
|
(i for i in vmi.status.interfaces if i.name == opts.network_name),
|
|
None,
|
|
)
|
|
|
|
# If interface is not found or IP address is not reported skip this VMI
|
|
if interface is None or interface.ipAddress is None:
|
|
return
|
|
|
|
# Set up the connection
|
|
service = None
|
|
if self.is_windows(
|
|
{} if not vmi.status.guestOSInfo else vmi.status.guestOSInfo.to_dict(),
|
|
{} if not vmi.metadata.annotations else vmi.metadata.annotations.to_dict(),
|
|
):
|
|
self.inventory.set_variable(hostname, "ansible_connection", "winrm")
|
|
else:
|
|
service = services.get(vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN))
|
|
self.set_ansible_host_and_port(
|
|
vmi,
|
|
hostname,
|
|
interface.ipAddress,
|
|
service,
|
|
opts,
|
|
)
|
|
|
|
def set_common_vars(
|
|
self, hostname: str, prefix: str, obj: ResourceField, opts: InventoryOptions
|
|
):
|
|
"""
|
|
set_common_vars sets common inventory variables from VMs or VMIs.
|
|
"""
|
|
# Add hostvars from metadata
|
|
if metadata := obj.metadata:
|
|
if metadata.annotations:
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_annotations", metadata.annotations.to_dict()
|
|
)
|
|
if metadata.labels:
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_labels", metadata.labels.to_dict()
|
|
)
|
|
# Create label groups and add vm to it if enabled
|
|
if opts.create_groups:
|
|
self.set_groups_from_labels(hostname, metadata.labels)
|
|
if metadata.resourceVersion:
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_resource_version", metadata.resourceVersion
|
|
)
|
|
if metadata.uid:
|
|
self.inventory.set_variable(hostname, f"{prefix}_uid", metadata.uid)
|
|
|
|
# Add hostvars from status
|
|
if obj.status:
|
|
for key, value in obj.status.to_dict().items():
|
|
name = self.format_var_name(key)
|
|
self.inventory.set_variable(hostname, f"{prefix}_{name}", value)
|
|
|
|
def set_groups_from_labels(self, hostname: str, labels: ResourceField) -> None:
|
|
"""
|
|
set_groups_from_labels adds groups for each label of a VM or VMI and
|
|
adds the host to each group.
|
|
"""
|
|
groups = []
|
|
for key, value in labels.to_dict().items():
|
|
group_name = self._sanitize_group_name(f"label_{key}_{value}")
|
|
if group_name not in groups:
|
|
groups.append(group_name)
|
|
# Add host to each label_value group
|
|
for group in groups:
|
|
self.inventory.add_group(group)
|
|
self.inventory.add_child(group, hostname)
|
|
|
|
def set_ansible_host_and_port(
|
|
self,
|
|
vmi: ResourceField,
|
|
hostname: str,
|
|
ip_address: str,
|
|
service: Optional[Dict],
|
|
opts: InventoryOptions,
|
|
) -> None:
|
|
"""
|
|
set_ansible_host_and_port sets the ansible_host and possibly the ansible_port var.
|
|
Secondary interfaces have priority over a service exposing SSH
|
|
"""
|
|
ansible_host = None
|
|
ansible_port = None
|
|
if opts.kube_secondary_dns and opts.network_name:
|
|
# Set ansible_host to the kubesecondarydns derived host name if enabled
|
|
# See https://github.com/kubevirt/kubesecondarydns#parameters
|
|
ansible_host = (
|
|
f"{opts.network_name}.{vmi.metadata.name}.{vmi.metadata.namespace}.vm"
|
|
)
|
|
if opts.base_domain:
|
|
ansible_host += f".{opts.base_domain}"
|
|
elif opts.use_service and service and not opts.network_name:
|
|
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
|
|
# or NodePort service exposing SSH
|
|
node_name = vmi.status.nodeName
|
|
if node_name and opts.append_base_domain and opts.base_domain:
|
|
node_name += f".{opts.base_domain}"
|
|
host = self.get_host_from_service(service, node_name)
|
|
port = self.get_port_from_service(service)
|
|
if host is not None and port is not None:
|
|
ansible_host = host
|
|
ansible_port = port
|
|
|
|
# Default to the IP address of the interface if ansible_host was not set prior
|
|
if ansible_host is None:
|
|
ansible_host = ip_address
|
|
|
|
self.inventory.set_variable(hostname, "ansible_host", ansible_host)
|
|
self.inventory.set_variable(hostname, "ansible_port", ansible_port)
|
|
|
|
def set_composable_vars(self, hostname: str) -> None:
|
|
"""
|
|
set_composable_vars sets vars per
|
|
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
|
|
"""
|
|
hostvars = self.inventory.get_host(hostname).get_vars()
|
|
strict = self.get_option("strict")
|
|
self._set_composite_vars(
|
|
self.get_option("compose"), hostvars, hostname, strict=True
|
|
)
|
|
self._add_host_to_composed_groups(
|
|
self.get_option("groups"), hostvars, hostname, strict=strict
|
|
)
|
|
self._add_host_to_keyed_groups(
|
|
self.get_option("keyed_groups"), hostvars, hostname, strict=strict
|
|
)
|