mirror of
https://github.com/kubevirt/kubevirt.core.git
synced 2026-03-27 03:13:10 +00:00
Use dicts instead of ResourceFields where possible to allow easier serialization/deserialization of objects fetched from the K8S API. Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
790 lines
28 KiB
Python
790 lines
28 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2023 Red Hat, Inc.
|
|
# Based on the kubernetes.core.k8s inventory
|
|
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
__metaclass__ = type
|
|
|
|
DOCUMENTATION = """
|
|
name: kubevirt
|
|
|
|
short_description: Inventory source for KubeVirt VirtualMachines and VirtualMachineInstances
|
|
|
|
author:
|
|
- "KubeVirt.io Project (!UNKNOWN)"
|
|
|
|
description:
|
|
- Fetch virtual machines from one or more namespaces with an optional label selector.
|
|
- Groups by cluster name, namespaces and labels.
|
|
- Uses V(*.kubevirt.[yml|yaml]) YAML configuration file to set parameter values.
|
|
- By default it uses the active context in I(~/.kube/config) and will return all virtual machines
|
|
for all namespaces the active user is authorized to access.
|
|
|
|
extends_documentation_fragment:
|
|
- kubevirt.core.kubevirt_auth_options
|
|
- inventory_cache
|
|
- constructed
|
|
|
|
options:
|
|
plugin:
|
|
description: Token that ensures this is a source file for the P(kubevirt.core.kubevirt#inventory) plugin.
|
|
required: True
|
|
choices: ["kubevirt", "kubevirt.core.kubevirt"]
|
|
host_format:
|
|
description:
|
|
- 'Specify the format of the host in the inventory group. Available specifiers: V(name), V(namespace) and V(uid).'
|
|
default: "{namespace}-{name}"
|
|
name:
|
|
description:
|
|
- Optional name to assign to the cluster. If not provided, a name is constructed from the server
|
|
and port.
|
|
namespaces:
|
|
description:
|
|
- List of namespaces. If not specified, will fetch virtual machines from all namespaces
|
|
the user is authorized to access.
|
|
label_selector:
|
|
description:
|
|
- Define a label selector to select a subset of the fetched virtual machines.
|
|
network_name:
|
|
description:
|
|
- In case multiple networks are attached to a virtual machine, define which interface should
|
|
be returned as primary IP address.
|
|
aliases: [ interface_name ]
|
|
kube_secondary_dns:
|
|
description:
|
|
- Enable C(kubesecondarydns) derived host names when using a secondary network interface.
|
|
type: bool
|
|
default: False
|
|
use_service:
|
|
description:
|
|
- Enable the use of C(Services) to establish an SSH connection to a virtual machine.
|
|
- Services are only used if no O(network_name) was provided.
|
|
type: bool
|
|
default: True
|
|
create_groups:
|
|
description:
|
|
- Enable the creation of groups from labels on C(VirtualMachines) and C(VirtualMachineInstances).
|
|
type: bool
|
|
default: False
|
|
base_domain:
|
|
description:
|
|
- Override the base domain used to construct host names. Used in case of
|
|
C(kubesecondarydns) or C(Services) of type C(NodePort) if O(append_base_domain) is set.
|
|
append_base_domain:
|
|
description:
|
|
- Append the base domain of the cluster to host names constructed from SSH C(Services) of type C(NodePort).
|
|
type: bool
|
|
default: False
|
|
api_version:
|
|
description:
|
|
- Specify the used KubeVirt API version.
|
|
default: "kubevirt.io/v1"
|
|
connections:
|
|
description:
|
|
- Optional list of cluster connection settings.
|
|
- This parameter is deprecated. Split your connections into multiple configuration files and move
|
|
parameters of each connection to the configuration top level.
|
|
- Deprecated in version C(1.5.0), will be removed in version C(3.0.0).
|
|
|
|
requirements:
|
|
- "python >= 3.9"
|
|
- "kubernetes >= 28.1.0"
|
|
- "PyYAML >= 3.11"
|
|
"""
|
|
|
|
EXAMPLES = """
|
|
# Filename must end with kubevirt.[yml|yaml]
|
|
|
|
# Authenticate with token and return all virtual machines from all accessible namespaces
|
|
- plugin: kubevirt.core.kubevirt
|
|
host: https://192.168.64.4:8443
|
|
api_key: xxxxxxxxxxxxxxxx
|
|
validate_certs: false
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing connected to network bridge-network
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
network_name: bridge-network
|
|
|
|
# Use default ~/.kube/config and return virtual machines from namespace testing with label app=test
|
|
- plugin: kubevirt.core.kubevirt
|
|
namespaces:
|
|
- testing
|
|
label_selector: app=test
|
|
|
|
# Use a custom config file and a specific context
|
|
- plugin: kubevirt.core.kubevirt
|
|
kubeconfig: /path/to/config
|
|
context: 'awx/192-168-64-4:8443/developer'
|
|
"""
|
|
|
|
from dataclasses import dataclass, InitVar
|
|
from json import loads
|
|
from re import compile as re_compile
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
)
|
|
|
|
# Handle import errors of python kubernetes client.
|
|
# Set HAS_K8S_MODULE_HELPER and k8s_import exception accordingly to
|
|
# potentially print a warning to the user if the client is missing.
|
|
try:
|
|
from kubernetes.dynamic.exceptions import DynamicApiError
|
|
|
|
HAS_K8S_MODULE_HELPER = True
|
|
K8S_IMPORT_EXCEPTION = None
|
|
except ImportError as e:
|
|
|
|
class DynamicApiError(Exception):
|
|
"""
|
|
Dummy class, mainly used for ansible-test sanity.
|
|
"""
|
|
|
|
HAS_K8S_MODULE_HELPER = False
|
|
K8S_IMPORT_EXCEPTION = e
|
|
|
|
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
|
|
|
|
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
|
|
get_api_client,
|
|
K8SClient,
|
|
)
|
|
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME = "kubevirt.io/cluster-preference-name"
|
|
ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME = "kubevirt.io/preference-name"
|
|
ANNOTATION_VM_KUBEVIRT_IO_OS = "vm.kubevirt.io/os"
|
|
LABEL_KUBEVIRT_IO_DOMAIN = "kubevirt.io/domain"
|
|
TYPE_LOADBALANCER = "LoadBalancer"
|
|
TYPE_NODEPORT = "NodePort"
|
|
ID_MSWINDOWS = "mswindows"
|
|
|
|
|
|
class KubeVirtInventoryException(Exception):
|
|
"""
|
|
This class is used for exceptions raised by this inventory.
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class InventoryOptions:
|
|
"""
|
|
This class holds the options defined by the user.
|
|
"""
|
|
|
|
api_version: Optional[str] = None
|
|
label_selector: Optional[str] = None
|
|
network_name: Optional[str] = None
|
|
kube_secondary_dns: Optional[bool] = None
|
|
use_service: Optional[bool] = None
|
|
create_groups: Optional[bool] = None
|
|
base_domain: Optional[str] = None
|
|
append_base_domain: Optional[bool] = None
|
|
host_format: Optional[str] = None
|
|
config_data: InitVar[Optional[Dict]] = None
|
|
|
|
def __post_init__(self, config_data: Optional[Dict]) -> None:
|
|
if not config_data or not isinstance(config_data, dict):
|
|
config_data = {}
|
|
|
|
# Copy values from config_data and set defaults for keys not present
|
|
self.api_version = (
|
|
self.api_version
|
|
if self.api_version is not None
|
|
else config_data.get("api_version", "kubevirt.io/v1")
|
|
)
|
|
self.label_selector = (
|
|
self.label_selector
|
|
if self.label_selector is not None
|
|
else config_data.get("label_selector")
|
|
)
|
|
self.network_name = (
|
|
self.network_name
|
|
if self.network_name is not None
|
|
else config_data.get("network_name", config_data.get("interface_name"))
|
|
)
|
|
self.kube_secondary_dns = (
|
|
self.kube_secondary_dns
|
|
if self.kube_secondary_dns is not None
|
|
else config_data.get("kube_secondary_dns", False)
|
|
)
|
|
self.use_service = (
|
|
self.use_service
|
|
if self.use_service is not None
|
|
else config_data.get("use_service", True)
|
|
)
|
|
self.create_groups = (
|
|
self.create_groups
|
|
if self.create_groups is not None
|
|
else config_data.get("create_groups", False)
|
|
)
|
|
self.base_domain = (
|
|
self.base_domain
|
|
if self.base_domain is not None
|
|
else config_data.get("base_domain")
|
|
)
|
|
self.append_base_domain = (
|
|
self.append_base_domain
|
|
if self.append_base_domain is not None
|
|
else config_data.get("append_base_domain", False)
|
|
)
|
|
self.host_format = (
|
|
self.host_format
|
|
if self.host_format is not None
|
|
else config_data.get("host_format", "{namespace}-{name}")
|
|
)
|
|
|
|
|
|
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
|
"""
|
|
This class implements the actual inventory module.
|
|
"""
|
|
|
|
NAME = "kubevirt.core.kubevirt"
|
|
|
|
# Used to convert camel case variable names into snake case
|
|
snake_case_pattern = re_compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
|
|
|
|
@staticmethod
|
|
def get_default_host_name(host: str) -> str:
|
|
"""
|
|
get_default_host_name strips URL schemes from the host name and
|
|
replaces invalid characters.
|
|
"""
|
|
return (
|
|
host.replace("https://", "")
|
|
.replace("http://", "")
|
|
.replace(".", "-")
|
|
.replace(":", "_")
|
|
)
|
|
|
|
@staticmethod
|
|
def format_dynamic_api_exc(exc: DynamicApiError) -> str:
|
|
"""
|
|
format_dynamic_api_exc tries to extract the message from the JSON body
|
|
of a DynamicApiError.
|
|
"""
|
|
if exc.body:
|
|
if exc.headers and exc.headers.get("Content-Type") == "application/json":
|
|
message = loads(exc.body).get("message")
|
|
if message:
|
|
return message
|
|
return exc.body
|
|
|
|
return f"{exc.status} Reason: {exc.reason}"
|
|
|
|
@staticmethod
|
|
def format_var_name(name: str) -> str:
|
|
"""
|
|
format_var_name formats a CamelCase variable name into a snake_case name
|
|
suitable for use as a inventory variable name.
|
|
"""
|
|
return InventoryModule.snake_case_pattern.sub("_", name).lower()
|
|
|
|
@staticmethod
|
|
def obj_is_valid(obj: Dict) -> bool:
|
|
"""
|
|
obj_is_valid ensures commonly used keys are present in the passed object.
|
|
"""
|
|
return bool(
|
|
"spec" in obj
|
|
and "status" in obj
|
|
and "metadata" in obj
|
|
and obj["metadata"].get("name")
|
|
and obj["metadata"].get("namespace")
|
|
and obj["metadata"].get("uid")
|
|
)
|
|
|
|
@staticmethod
|
|
def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]:
|
|
"""
|
|
get_host_from_service extracts the hostname to be used from the
|
|
passed in service.
|
|
"""
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services can return a hostname or an IP address
|
|
ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress")
|
|
if ingress is not None and len(ingress) > 0:
|
|
hostname = ingress[0].get("hostname")
|
|
ip_address = ingress[0].get("ip")
|
|
return hostname if hostname is not None else ip_address
|
|
elif service_type == TYPE_NODEPORT:
|
|
# NodePort services use the node name as host
|
|
return node_name
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_port_from_service(service: Dict) -> Optional[str]:
|
|
"""
|
|
get_port_from_service extracts the port to be used from the
|
|
passed in service.
|
|
"""
|
|
ports = service.get("spec", {}).get("ports", [])
|
|
if not ports:
|
|
return None
|
|
|
|
service_type = service.get("spec", {}).get("type")
|
|
if service_type == TYPE_LOADBALANCER:
|
|
# LoadBalancer services use the port attribute
|
|
return ports[0].get("port")
|
|
if service_type == TYPE_NODEPORT:
|
|
# NodePort services use the nodePort attribute
|
|
return ports[0].get("nodePort")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool:
|
|
"""
|
|
is_windows checkes whether a given VM is running a Windows guest
|
|
by checking its GuestOSInfo and annotations.
|
|
"""
|
|
if guest_os_info and "id" in guest_os_info:
|
|
return guest_os_info["id"] == ID_MSWINDOWS
|
|
|
|
if not annotations:
|
|
return False
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
|
|
return annotations[
|
|
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
|
|
].startswith("windows")
|
|
|
|
if ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME in annotations:
|
|
return annotations[ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME].startswith(
|
|
"windows"
|
|
)
|
|
|
|
if ANNOTATION_VM_KUBEVIRT_IO_OS in annotations:
|
|
return annotations[ANNOTATION_VM_KUBEVIRT_IO_OS].startswith("windows")
|
|
|
|
return False
|
|
|
|
def verify_file(self, path: str) -> None:
|
|
"""
|
|
verify_file ensures the inventory file is compatible with this plugin.
|
|
"""
|
|
return super().verify_file(path) and path.endswith(
|
|
("kubevirt.yml", "kubevirt.yaml")
|
|
)
|
|
|
|
def parse(self, inventory: Any, loader: Any, path: str, cache: bool = True) -> None:
|
|
"""
|
|
parse is the main entry point of the inventory.
|
|
It checks for availability of the Kubernetes Python client,
|
|
gets the configuration and runs fetch_objects or
|
|
if there is a cache it is used instead.
|
|
"""
|
|
super().parse(inventory, loader, path)
|
|
cache_key = self._get_cache_prefix(path)
|
|
config_data = self._read_config_data(path)
|
|
|
|
if not HAS_K8S_MODULE_HELPER:
|
|
raise KubeVirtInventoryException(
|
|
"This module requires the Kubernetes Python client. "
|
|
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
|
|
)
|
|
|
|
source_data = None
|
|
if cache and cache_key in self._cache:
|
|
try:
|
|
source_data = self._cache[cache_key]
|
|
except KeyError:
|
|
pass
|
|
|
|
if not source_data:
|
|
self.fetch_objects(config_data)
|
|
|
|
def fetch_objects(self, config_data: Dict) -> None:
|
|
"""
|
|
fetch_objects populates the inventory with the specified parameters.
|
|
"""
|
|
if not config_data or not isinstance(config_data, dict):
|
|
config_data = {}
|
|
|
|
self.connections_compatibility(config_data)
|
|
client = get_api_client(**config_data)
|
|
name = config_data.get(
|
|
"name", self.get_default_host_name(client.configuration.host)
|
|
)
|
|
namespaces = (
|
|
config_data["namespaces"]
|
|
if config_data.get("namespaces")
|
|
else self.get_available_namespaces(client)
|
|
)
|
|
opts = InventoryOptions(config_data=config_data)
|
|
if opts.base_domain is None:
|
|
opts.base_domain = self.get_cluster_domain(client)
|
|
for namespace in namespaces:
|
|
self.populate_inventory_from_namespace(client, name, namespace, opts)
|
|
|
|
def connections_compatibility(self, config_data: Dict) -> None:
|
|
collection_name = "kubevirt.core"
|
|
version_removed_in = "3.0.0"
|
|
|
|
if (connections := config_data.get("connections")) is None:
|
|
return
|
|
|
|
self.display.deprecated(
|
|
msg="The 'connections' parameter is deprecated and now supports only a single list entry.",
|
|
version=version_removed_in,
|
|
collection_name=collection_name,
|
|
)
|
|
|
|
if not isinstance(connections, list):
|
|
raise KubeVirtInventoryException("Expecting connections to be a list.")
|
|
|
|
if len(connections) == 1:
|
|
if not isinstance(connections[0], dict):
|
|
raise KubeVirtInventoryException(
|
|
"Expecting connection to be a dictionary."
|
|
)
|
|
# Copy the single connections entry into the top level
|
|
for k, v in connections[0].items():
|
|
config_data[k] = v
|
|
self.display.deprecated(
|
|
msg="Move all of your connection parameters to the configuration top level.",
|
|
version=version_removed_in,
|
|
collection_name=collection_name,
|
|
)
|
|
elif len(connections) > 1:
|
|
self.display.deprecated(
|
|
msg="Split your connections into multiple configuration files.",
|
|
version="2.0.0",
|
|
collection_name=collection_name,
|
|
removed=True,
|
|
)
|
|
|
|
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
|
|
"""
|
|
get_cluster_domain tries to get the base domain of an OpenShift cluster.
|
|
"""
|
|
try:
|
|
v1_dns = client.resources.get(
|
|
api_version="config.openshift.io/v1", kind="DNS"
|
|
)
|
|
except Exception:
|
|
# If resource not found return None
|
|
return None
|
|
try:
|
|
obj = v1_dns.get(name="cluster")
|
|
except DynamicApiError as exc:
|
|
self.display.debug(
|
|
f"Failed to fetch cluster DNS config: {self.format_dynamic_api_exc(exc)}"
|
|
)
|
|
return None
|
|
return obj.get("spec", {}).get("baseDomain")
|
|
|
|
def get_resources(
|
|
self, client: K8SClient, api_version: str, kind: str, **kwargs
|
|
) -> List[Dict]:
|
|
"""
|
|
get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
|
|
"""
|
|
client = client.resources.get(api_version=api_version, kind=kind)
|
|
try:
|
|
result = client.get(**kwargs)
|
|
except DynamicApiError as exc:
|
|
self.display.debug(exc)
|
|
raise KubeVirtInventoryException(
|
|
f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}"
|
|
) from exc
|
|
|
|
return [item.to_dict() for item in result.items]
|
|
|
|
def get_available_namespaces(self, client: K8SClient) -> List[str]:
|
|
"""
|
|
get_available_namespaces lists all namespaces accessible with the
|
|
configured credentials and returns them.
|
|
"""
|
|
return [
|
|
namespace["metadata"]["name"]
|
|
for namespace in self.get_resources(client, "v1", "Namespace")
|
|
if "metadata" in namespace and "name" in namespace["metadata"]
|
|
]
|
|
|
|
def get_vms_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[Dict]:
|
|
"""
|
|
get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
|
|
"""
|
|
return self.get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachine",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def get_vmis_for_namespace(
|
|
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
|
) -> List[Dict]:
|
|
"""
|
|
get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
|
|
"""
|
|
return self.get_resources(
|
|
client,
|
|
opts.api_version,
|
|
"VirtualMachineInstance",
|
|
namespace=namespace,
|
|
label_selector=opts.label_selector,
|
|
)
|
|
|
|
def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict:
|
|
"""
|
|
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
|
|
The services are mapped to the name of the corresponding domain.
|
|
"""
|
|
items = self.get_resources(
|
|
client,
|
|
"v1",
|
|
"Service",
|
|
namespace=namespace,
|
|
)
|
|
|
|
services = {}
|
|
for service in items:
|
|
# Continue if service is not of type LoadBalancer or NodePort
|
|
if not (spec := service.get("spec")):
|
|
continue
|
|
|
|
if spec.get("type") not in (
|
|
TYPE_LOADBALANCER,
|
|
TYPE_NODEPORT,
|
|
):
|
|
continue
|
|
|
|
# Continue if ports are not defined, there are more than one port mapping
|
|
# or the target port is not port 22/ssh
|
|
if (
|
|
(ports := spec.get("ports")) is None
|
|
or len(ports) != 1
|
|
or ports[0].get("targetPort") != 22
|
|
):
|
|
continue
|
|
|
|
# Only add the service to the dict if the domain selector is present
|
|
if domain := spec.get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN):
|
|
services[domain] = service
|
|
|
|
return services
|
|
|
|
|
|
def populate_inventory_from_namespace(
|
|
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
populate_inventory_from_namespace adds groups and hosts from a
|
|
namespace to the inventory.
|
|
"""
|
|
vms = {
|
|
vm["metadata"]["name"]: vm
|
|
for vm in self.get_vms_for_namespace(client, namespace, opts)
|
|
if self.obj_is_valid(vm)
|
|
}
|
|
vmis = {
|
|
vmi["metadata"]["name"]: vmi
|
|
for vmi in self.get_vmis_for_namespace(client, namespace, opts)
|
|
if self.obj_is_valid(vmi)
|
|
}
|
|
|
|
if not vms and not vmis:
|
|
# Return early if no VMs and VMIs were found to avoid adding empty groups.
|
|
return
|
|
|
|
services = self.get_ssh_services_for_namespace(client, namespace)
|
|
|
|
name = self._sanitize_group_name(name)
|
|
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
|
|
|
self.inventory.add_group(name)
|
|
self.inventory.add_group(namespace_group)
|
|
self.inventory.add_child(name, namespace_group)
|
|
|
|
# Add found VMs and optionally enhance with VMI data
|
|
for name, vm in vms.items():
|
|
hostname = self.add_host(vm["metadata"], opts.host_format, namespace_group)
|
|
self.set_vars_from_vm(hostname, vm, opts)
|
|
if name in vmis:
|
|
self.set_vars_from_vmi(hostname, vmis[name], services, opts)
|
|
self.set_composable_vars(hostname)
|
|
|
|
# Add remaining VMIs without VM
|
|
for name, vmi in vmis.items():
|
|
if name in vms:
|
|
continue
|
|
hostname = self.add_host(vmi["metadata"], opts.host_format, namespace_group)
|
|
self.set_vars_from_vmi(hostname, vmi, services, opts)
|
|
self.set_composable_vars(hostname)
|
|
|
|
def add_host(self, metadata: Dict, host_format: str, namespace_group: str) -> str:
|
|
"""
|
|
add_hosts adds a host to the inventory.
|
|
"""
|
|
hostname = host_format.format(
|
|
namespace=metadata["namespace"],
|
|
name=metadata["name"],
|
|
uid=metadata["uid"],
|
|
)
|
|
self.inventory.add_host(hostname)
|
|
self.inventory.add_child(namespace_group, hostname)
|
|
|
|
return hostname
|
|
|
|
def set_vars_from_vm(self, hostname: str, vm: Dict, opts: InventoryOptions) -> None:
|
|
"""
|
|
set_vars_from_vm sets inventory variables from a VM prefixed with vm_.
|
|
"""
|
|
self.set_common_vars(hostname, "vm", vm, opts)
|
|
|
|
def set_vars_from_vmi(
|
|
self, hostname: str, vmi: Dict, services: Dict, opts: InventoryOptions
|
|
) -> None:
|
|
"""
|
|
set_vars_from_vmi sets inventory variables from a VMI prefixed with vmi_ and
|
|
looks up the interface to set ansible_host and ansible_port.
|
|
"""
|
|
self.set_common_vars(hostname, "vmi", vmi, opts)
|
|
|
|
if not (interfaces := vmi["status"].get("interfaces")):
|
|
return
|
|
|
|
if opts.network_name is None:
|
|
# Use first interface
|
|
interface = interfaces[0]
|
|
else:
|
|
# Find interface by its name
|
|
interface = next(
|
|
(i for i in interfaces if i.get("name") == opts.network_name),
|
|
None,
|
|
)
|
|
|
|
# If interface is not found or IP address is not reported skip this VMI
|
|
if not interface or not interface.get("ipAddress"):
|
|
return
|
|
|
|
# Set up the connection
|
|
service = None
|
|
if self.is_windows(
|
|
vmi["status"].get("guestOSInfo", {}),
|
|
vmi["metadata"].get("annotations", {}),
|
|
):
|
|
self.inventory.set_variable(hostname, "ansible_connection", "winrm")
|
|
else:
|
|
service = services.get(
|
|
vmi["metadata"].get("labels", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
|
|
)
|
|
self.set_ansible_host_and_port(
|
|
vmi,
|
|
hostname,
|
|
interface["ipAddress"],
|
|
service,
|
|
opts,
|
|
)
|
|
|
|
def set_common_vars(
|
|
self, hostname: str, prefix: str, obj: Dict, opts: InventoryOptions
|
|
):
|
|
"""
|
|
set_common_vars sets common inventory variables from VMs or VMIs.
|
|
"""
|
|
# Add hostvars from metadata
|
|
if annotations := obj["metadata"].get("annotations"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_annotations", annotations)
|
|
if labels := obj["metadata"].get("labels"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_labels", labels)
|
|
# Create label groups and add vm to it if enabled
|
|
if opts.create_groups:
|
|
self.set_groups_from_labels(hostname, labels)
|
|
if resource_version := obj["metadata"].get("resourceVersion"):
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_resource_version", resource_version
|
|
)
|
|
if uid := obj["metadata"].get("uid"):
|
|
self.inventory.set_variable(hostname, f"{prefix}_uid", uid)
|
|
|
|
# Add hostvars from status
|
|
for key, value in obj["status"].items():
|
|
self.inventory.set_variable(
|
|
hostname, f"{prefix}_{self.format_var_name(key)}", value
|
|
)
|
|
|
|
def set_groups_from_labels(self, hostname: str, labels: Dict) -> None:
|
|
"""
|
|
set_groups_from_labels adds groups for each label of a VM or VMI and
|
|
adds the host to each group.
|
|
"""
|
|
groups = []
|
|
for key, value in labels.items():
|
|
group_name = self._sanitize_group_name(f"label_{key}_{value}")
|
|
if group_name not in groups:
|
|
groups.append(group_name)
|
|
# Add host to each label_value group
|
|
for group in groups:
|
|
self.inventory.add_group(group)
|
|
self.inventory.add_child(group, hostname)
|
|
|
|
def set_ansible_host_and_port(
|
|
self,
|
|
vmi: Dict,
|
|
hostname: str,
|
|
ip_address: str,
|
|
service: Optional[Dict],
|
|
opts: InventoryOptions,
|
|
) -> None:
|
|
"""
|
|
set_ansible_host_and_port sets the ansible_host and possibly the ansible_port var.
|
|
Secondary interfaces have priority over a service exposing SSH
|
|
"""
|
|
ansible_host = None
|
|
ansible_port = None
|
|
if opts.kube_secondary_dns and opts.network_name:
|
|
# Set ansible_host to the kubesecondarydns derived host name if enabled
|
|
# See https://github.com/kubevirt/kubesecondarydns#parameters
|
|
ansible_host = f"{opts.network_name}.{vmi['metadata']['name']}.{vmi['metadata']['namespace']}.vm"
|
|
if opts.base_domain:
|
|
ansible_host += f".{opts.base_domain}"
|
|
elif opts.use_service and service and not opts.network_name:
|
|
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
|
|
# or NodePort service exposing SSH
|
|
node_name = vmi["status"].get("nodeName")
|
|
if node_name and opts.append_base_domain and opts.base_domain:
|
|
node_name += f".{opts.base_domain}"
|
|
host = self.get_host_from_service(service, node_name)
|
|
port = self.get_port_from_service(service)
|
|
if host is not None and port is not None:
|
|
ansible_host = host
|
|
ansible_port = port
|
|
|
|
# Default to the IP address of the interface if ansible_host was not set prior
|
|
if ansible_host is None:
|
|
ansible_host = ip_address
|
|
|
|
self.inventory.set_variable(hostname, "ansible_host", ansible_host)
|
|
self.inventory.set_variable(hostname, "ansible_port", ansible_port)
|
|
|
|
def set_composable_vars(self, hostname: str) -> None:
|
|
"""
|
|
set_composable_vars sets vars per
|
|
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
|
|
"""
|
|
hostvars = self.inventory.get_host(hostname).get_vars()
|
|
strict = self.get_option("strict")
|
|
self._set_composite_vars(
|
|
self.get_option("compose"), hostvars, hostname, strict=True
|
|
)
|
|
self._add_host_to_composed_groups(
|
|
self.get_option("groups"), hostvars, hostname, strict=strict
|
|
)
|
|
self._add_host_to_keyed_groups(
|
|
self.get_option("keyed_groups"), hostvars, hostname, strict=strict
|
|
)
|