mirror of
https://github.com/kubevirt/kubevirt.core.git
synced 2026-03-26 19:03:16 +00:00
Merge pull request #114 from 0xFelix/vm-vmi-2
feat,test(inventory): Support listing stopped VMs and major rework of unit tests
This commit is contained in:
2
examples/default.kubevirt.yml
Normal file
2
examples/default.kubevirt.yml
Normal file
@@ -0,0 +1,2 @@
|
||||
---
|
||||
plugin: kubevirt.core.kubevirt
|
||||
29
examples/play-create-stopped.yml
Normal file
29
examples/play-create-stopped.yml
Normal file
@@ -0,0 +1,29 @@
|
||||
---
|
||||
- name: Playbook creating a stopped virtual machine
|
||||
hosts: localhost
|
||||
tasks:
|
||||
- name: Create VM
|
||||
kubevirt.core.kubevirt_vm:
|
||||
state: present
|
||||
name: testvm-stopped
|
||||
namespace: default
|
||||
running: false
|
||||
instancetype:
|
||||
name: u1.medium
|
||||
preference:
|
||||
name: fedora
|
||||
spec:
|
||||
domain:
|
||||
devices: {}
|
||||
volumes:
|
||||
- containerDisk:
|
||||
image: quay.io/containerdisks/fedora:latest
|
||||
name: containerdisk
|
||||
- cloudInitNoCloud:
|
||||
userData: |-
|
||||
#cloud-config
|
||||
# The default username is: fedora
|
||||
ssh_authorized_keys:
|
||||
- ssh-ed25519 AAAA...
|
||||
name: cloudinit
|
||||
wait: true
|
||||
@@ -10,13 +10,13 @@ __metaclass__ = type
|
||||
DOCUMENTATION = """
|
||||
name: kubevirt
|
||||
|
||||
short_description: Inventory source for KubeVirt VirtualMachines
|
||||
short_description: Inventory source for KubeVirt VirtualMachines and VirtualMachineInstances
|
||||
|
||||
author:
|
||||
- "KubeVirt.io Project (!UNKNOWN)"
|
||||
|
||||
description:
|
||||
- Fetch running C(VirtualMachineInstances) for one or more namespaces with an optional label selector.
|
||||
- Fetch virtual machines from one or more namespaces with an optional label selector.
|
||||
- Groups by cluster name, namespace and labels.
|
||||
- Uses the M(kubernetes.core.kubectl) connection plugin to access the Kubernetes cluster.
|
||||
- Uses V(*.kubevirt.[yml|yaml]) YAML configuration file to set parameter values.
|
||||
@@ -94,14 +94,14 @@ options:
|
||||
aliases: [ verify_ssl ]
|
||||
namespaces:
|
||||
description:
|
||||
- List of namespaces. If not specified, will fetch all C(VirtualMachineInstances) for all namespaces
|
||||
- List of namespaces. If not specified, will fetch virtual machines from all namespaces
|
||||
the user is authorized to access.
|
||||
label_selector:
|
||||
description:
|
||||
- Define a label selector to select a subset of the fetched C(VirtualMachineInstances).
|
||||
- Define a label selector to select a subset of the fetched virtual machines.
|
||||
network_name:
|
||||
description:
|
||||
- In case multiple networks are attached to a C(VirtualMachineInstance), define which interface should
|
||||
- In case multiple networks are attached to a virtual machine, define which interface should
|
||||
be returned as primary IP address.
|
||||
aliases: [ interface_name ]
|
||||
kube_secondary_dns:
|
||||
@@ -111,18 +111,18 @@ options:
|
||||
default: False
|
||||
use_service:
|
||||
description:
|
||||
- Enable the use of C(Services) to establish an SSH connection to the C(VirtualMachine).
|
||||
- Enable the use of C(Services) to establish an SSH connection to a virtual machine.
|
||||
- Services are only used if no O(connections.network_name) was provided.
|
||||
type: bool
|
||||
default: True
|
||||
create_groups:
|
||||
description:
|
||||
- Enable the creation of groups from labels on C(VirtualMachines).
|
||||
- Enable the creation of groups from labels on C(VirtualMachines) and C(VirtualMachineInstances).
|
||||
type: bool
|
||||
default: False
|
||||
base_domain:
|
||||
description:
|
||||
- Override the base domain used to construct host names of C(VirtualMachines). Used in case of
|
||||
- Override the base domain used to construct host names. Used in case of
|
||||
C(kubesecondarydns) or C(Services) of type C(NodePort) if O(connections.append_base_domain) is set.
|
||||
append_base_domain:
|
||||
description:
|
||||
@@ -143,21 +143,21 @@ requirements:
|
||||
EXAMPLES = """
|
||||
# Filename must end with kubevirt.[yml|yaml]
|
||||
|
||||
- name: Authenticate with token and return all VirtualMachineInstances for all accessible namespaces
|
||||
- name: Authenticate with token and return all virtual machines from all accessible namespaces
|
||||
plugin: kubevirt.core.kubevirt
|
||||
connections:
|
||||
- host: https://192.168.64.4:8443
|
||||
api_key: xxxxxxxxxxxxxxxx
|
||||
validate_certs: false
|
||||
|
||||
- name: Use default ~/.kube/config and return VirtualMachineInstances from namespace testing connected to network bridge-network
|
||||
- name: Use default ~/.kube/config and return virtual machines from namespace testing connected to network bridge-network
|
||||
plugin: kubevirt.core.kubevirt
|
||||
connections:
|
||||
- namespaces:
|
||||
- testing
|
||||
network_name: bridge-network
|
||||
|
||||
- name: Use default ~/.kube/config and return VirtualMachineInstances from namespace testing with label app=test
|
||||
- name: Use default ~/.kube/config and return virtual machines from namespace testing with label app=test
|
||||
plugin: kubevirt.core.kubevirt
|
||||
connections:
|
||||
- namespaces:
|
||||
@@ -173,6 +173,7 @@ EXAMPLES = """
|
||||
|
||||
from dataclasses import dataclass
|
||||
from json import loads
|
||||
from re import compile as re_compile
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
@@ -180,22 +181,29 @@ from typing import (
|
||||
Optional,
|
||||
)
|
||||
|
||||
|
||||
# Handle import errors of python kubernetes client.
|
||||
# Set HAS_K8S_MODULE_HELPER and k8s_import exception accordingly to
|
||||
# potentially print a warning to the user if the client is missing.
|
||||
try:
|
||||
from kubernetes.dynamic.exceptions import DynamicApiError
|
||||
from kubernetes.dynamic.resource import ResourceField
|
||||
|
||||
HAS_K8S_MODULE_HELPER = True
|
||||
k8s_import_exception = None
|
||||
K8S_IMPORT_EXCEPTION = None
|
||||
except ImportError as e:
|
||||
|
||||
class DynamicApiError(Exception):
|
||||
pass
|
||||
"""
|
||||
Dummy class, mainly used for ansible-test sanity.
|
||||
"""
|
||||
|
||||
class ResourceField:
|
||||
"""
|
||||
Dummy class, mainly used for ansible-test sanity.
|
||||
"""
|
||||
|
||||
HAS_K8S_MODULE_HELPER = False
|
||||
k8s_import_exception = e
|
||||
K8S_IMPORT_EXCEPTION = e
|
||||
|
||||
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
|
||||
|
||||
@@ -215,11 +223,13 @@ ID_MSWINDOWS = "mswindows"
|
||||
|
||||
|
||||
class KubeVirtInventoryException(Exception):
|
||||
pass
|
||||
"""
|
||||
This class is used for exceptions raised by this inventory.
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetVmiOptions:
|
||||
class InventoryOptions:
|
||||
"""
|
||||
This class holds the options defined by the user.
|
||||
"""
|
||||
@@ -257,8 +267,8 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
|
||||
NAME = "kubevirt.core.kubevirt"
|
||||
|
||||
connection_plugin = "kubernetes.core.kubectl"
|
||||
transport = "kubectl"
|
||||
# Used to convert camel case variable names into snake case
|
||||
snake_case_pattern = re_compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
|
||||
|
||||
@staticmethod
|
||||
def get_default_host_name(host: str) -> str:
|
||||
@@ -289,21 +299,29 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
return f"{exc.status} Reason: {exc.reason}"
|
||||
|
||||
@staticmethod
|
||||
def get_host_from_service(service: Dict, node_name: str) -> Optional[str]:
|
||||
def format_var_name(name: str) -> str:
|
||||
"""
|
||||
format_var_name formats a CamelCase variable name into a snake_case name
|
||||
suitable for use as a inventory variable name.
|
||||
"""
|
||||
return InventoryModule.snake_case_pattern.sub("_", name).lower()
|
||||
|
||||
@staticmethod
|
||||
def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
get_host_from_service extracts the hostname to be used from the
|
||||
passed in service.
|
||||
"""
|
||||
# LoadBalancer services can return a hostname or an IP address
|
||||
if service["spec"]["type"] == TYPE_LOADBALANCER:
|
||||
ingress = service["status"]["loadBalancer"].get("ingress")
|
||||
service_type = service.get("spec", {}).get("type")
|
||||
if service_type == TYPE_LOADBALANCER:
|
||||
# LoadBalancer services can return a hostname or an IP address
|
||||
ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress")
|
||||
if ingress is not None and len(ingress) > 0:
|
||||
hostname = ingress[0].get("hostname")
|
||||
ip_address = ingress[0].get("ip")
|
||||
return hostname if hostname is not None else ip_address
|
||||
|
||||
# NodePort services use the node name as host
|
||||
if service["spec"]["type"] == TYPE_NODEPORT:
|
||||
elif service_type == TYPE_NODEPORT:
|
||||
# NodePort services use the node name as host
|
||||
return node_name
|
||||
|
||||
return None
|
||||
@@ -314,21 +332,32 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
get_port_from_service extracts the port to be used from the
|
||||
passed in service.
|
||||
"""
|
||||
# LoadBalancer services use the port attribute
|
||||
if service["spec"]["type"] == TYPE_LOADBALANCER:
|
||||
return service["spec"]["ports"][0]["port"]
|
||||
ports = service.get("spec", {}).get("ports", [])
|
||||
if not ports:
|
||||
return None
|
||||
|
||||
# NodePort services use the nodePort attribute
|
||||
if service["spec"]["type"] == TYPE_NODEPORT:
|
||||
return service["spec"]["ports"][0]["nodePort"]
|
||||
service_type = service.get("spec", {}).get("type")
|
||||
if service_type == TYPE_LOADBALANCER:
|
||||
# LoadBalancer services use the port attribute
|
||||
return ports[0].get("port")
|
||||
if service_type == TYPE_NODEPORT:
|
||||
# NodePort services use the nodePort attribute
|
||||
return ports[0].get("nodePort")
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def is_windows(guest_os_info: Dict, annotations: Dict) -> bool:
|
||||
if "id" in guest_os_info:
|
||||
def is_windows(guest_os_info: Optional[Dict], annotations: Optional[Dict]) -> bool:
|
||||
"""
|
||||
is_windows checkes whether a given VM is running a Windows guest
|
||||
by checking its GuestOSInfo and annotations.
|
||||
"""
|
||||
if guest_os_info and "id" in guest_os_info:
|
||||
return guest_os_info["id"] == ID_MSWINDOWS
|
||||
|
||||
if not annotations:
|
||||
return False
|
||||
|
||||
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
|
||||
return annotations[
|
||||
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
|
||||
@@ -372,12 +401,10 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
gets the configured connections and runs fetch_objects on them.
|
||||
If there is a cache it is returned instead.
|
||||
"""
|
||||
connections = config_data.get("connections")
|
||||
|
||||
if not HAS_K8S_MODULE_HELPER:
|
||||
raise KubeVirtInventoryException(
|
||||
"This module requires the Kubernetes Python client. "
|
||||
+ f"Try `pip install kubernetes`. Detail: {k8s_import_exception}"
|
||||
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
|
||||
)
|
||||
|
||||
source_data = None
|
||||
@@ -388,9 +415,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
pass
|
||||
|
||||
if not source_data:
|
||||
self.fetch_objects(connections)
|
||||
self.fetch_objects(config_data.get("connections"))
|
||||
|
||||
def fetch_objects(self, connections: Dict) -> None:
|
||||
def fetch_objects(self, connections: Optional[List[Dict]]) -> None:
|
||||
"""
|
||||
fetch_objects populates the inventory with every configured connection.
|
||||
"""
|
||||
@@ -412,7 +439,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
else:
|
||||
namespaces = self.get_available_namespaces(client)
|
||||
|
||||
opts = GetVmiOptions(
|
||||
opts = InventoryOptions(
|
||||
connection.get("api_version"),
|
||||
connection.get("label_selector"),
|
||||
connection.get("network_name", connection.get("interface_name")),
|
||||
@@ -424,14 +451,19 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
self.host_format,
|
||||
)
|
||||
for namespace in namespaces:
|
||||
self.get_vmis_for_namespace(client, name, namespace, opts)
|
||||
self.populate_inventory_from_namespace(
|
||||
client, name, namespace, opts
|
||||
)
|
||||
else:
|
||||
client = get_api_client()
|
||||
name = self.get_default_host_name(client.configuration.host)
|
||||
namespaces = self.get_available_namespaces(client)
|
||||
opts = GetVmiOptions(host_format=self.host_format)
|
||||
opts = InventoryOptions(
|
||||
host_format=self.host_format,
|
||||
base_domain=self.get_cluster_domain(client),
|
||||
)
|
||||
for namespace in namespaces:
|
||||
self.get_vmis_for_namespace(client, name, namespace, opts)
|
||||
self.populate_inventory_from_namespace(client, name, namespace, opts)
|
||||
|
||||
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
|
||||
"""
|
||||
@@ -453,208 +485,59 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
return None
|
||||
return obj.get("spec", {}).get("baseDomain")
|
||||
|
||||
def get_available_namespaces(self, client: K8SClient) -> List:
|
||||
def get_resources(
|
||||
self, client: K8SClient, api_version: str, kind: str, **kwargs
|
||||
) -> List[ResourceField]:
|
||||
"""
|
||||
get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
|
||||
"""
|
||||
client = client.resources.get(api_version=api_version, kind=kind)
|
||||
try:
|
||||
result = client.get(**kwargs)
|
||||
except DynamicApiError as exc:
|
||||
self.display.debug(exc)
|
||||
raise KubeVirtInventoryException(
|
||||
f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}"
|
||||
) from exc
|
||||
|
||||
return result.items
|
||||
|
||||
def get_available_namespaces(self, client: K8SClient) -> List[str]:
|
||||
"""
|
||||
get_available_namespaces lists all namespaces accessible with the
|
||||
configured credentials and returns them.
|
||||
"""
|
||||
v1_namespace = client.resources.get(api_version="v1", kind="Namespace")
|
||||
try:
|
||||
obj = v1_namespace.get()
|
||||
except DynamicApiError as exc:
|
||||
self.display.debug(exc)
|
||||
raise KubeVirtInventoryException(
|
||||
f"Error fetching Namespace list: {self.format_dynamic_api_exc(exc)}"
|
||||
) from exc
|
||||
return [namespace.metadata.name for namespace in obj.items]
|
||||
return [
|
||||
namespace.metadata.name
|
||||
for namespace in self.get_resources(client, "v1", "Namespace")
|
||||
]
|
||||
|
||||
def get_vms_for_namespace(
|
||||
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
||||
) -> List[ResourceField]:
|
||||
"""
|
||||
get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
|
||||
"""
|
||||
return self.get_resources(
|
||||
client,
|
||||
opts.api_version,
|
||||
"VirtualMachine",
|
||||
namespace=namespace,
|
||||
label_selector=opts.label_selector,
|
||||
)
|
||||
|
||||
def get_vmis_for_namespace(
|
||||
self, client: K8SClient, name: str, namespace: str, opts: GetVmiOptions
|
||||
) -> None:
|
||||
self, client: K8SClient, namespace: str, opts: InventoryOptions
|
||||
) -> List[ResourceField]:
|
||||
"""
|
||||
get_vmis_for_namespace lists all VirtualMachineInstances in a namespace
|
||||
and adds groups and hosts to the inventory.
|
||||
get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
|
||||
"""
|
||||
vmi_client = client.resources.get(
|
||||
api_version=opts.api_version, kind="VirtualMachineInstance"
|
||||
)
|
||||
try:
|
||||
vmi_list = vmi_client.get(
|
||||
namespace=namespace, label_selector=opts.label_selector
|
||||
)
|
||||
except DynamicApiError as exc:
|
||||
self.display.debug(exc)
|
||||
raise KubeVirtInventoryException(
|
||||
f"Error fetching VirtualMachineInstance list: {self.format_dynamic_api_exc(exc)}"
|
||||
) from exc
|
||||
|
||||
if not vmi_list.items:
|
||||
# Return early if no VMIs were found to avoid adding empty groups.
|
||||
return
|
||||
|
||||
services = self.get_ssh_services_for_namespace(client, namespace)
|
||||
|
||||
name = self._sanitize_group_name(name)
|
||||
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
||||
|
||||
self.inventory.add_group(name)
|
||||
self.inventory.add_group(namespace_group)
|
||||
self.inventory.add_child(name, namespace_group)
|
||||
|
||||
for vmi in vmi_list.items:
|
||||
if not (vmi.status and vmi.status.interfaces):
|
||||
continue
|
||||
|
||||
# Find interface by its name:
|
||||
if opts.network_name is None:
|
||||
interface = vmi.status.interfaces[0]
|
||||
else:
|
||||
interface = next(
|
||||
(i for i in vmi.status.interfaces if i.name == opts.network_name),
|
||||
None,
|
||||
)
|
||||
|
||||
# If interface is not found or IP address is not reported skip this VM:
|
||||
if interface is None or interface.ipAddress is None:
|
||||
continue
|
||||
|
||||
vmi_name = opts.host_format.format(
|
||||
namespace=vmi.metadata.namespace,
|
||||
name=vmi.metadata.name,
|
||||
uid=vmi.metadata.uid,
|
||||
)
|
||||
vmi_annotations = (
|
||||
{}
|
||||
if not vmi.metadata.annotations
|
||||
else vmi.metadata.annotations.to_dict()
|
||||
)
|
||||
vmi_labels = (
|
||||
{} if not vmi.metadata.labels else vmi.metadata.labels.to_dict()
|
||||
)
|
||||
|
||||
# Add vmi to the namespace group
|
||||
self.inventory.add_host(vmi_name)
|
||||
self.inventory.add_child(namespace_group, vmi_name)
|
||||
|
||||
# Create label groups and add vmi to it if enabled
|
||||
if vmi.metadata.labels and opts.create_groups:
|
||||
# Create a group for each label_value
|
||||
vmi_groups = []
|
||||
for key, value in vmi.metadata.labels.items():
|
||||
group_name = self._sanitize_group_name(f"label_{key}_{value}")
|
||||
if group_name not in vmi_groups:
|
||||
vmi_groups.append(group_name)
|
||||
# Add vmi to each label_value group
|
||||
for group in vmi_groups:
|
||||
self.inventory.add_group(group)
|
||||
self.inventory.add_child(group, vmi_name)
|
||||
|
||||
# Add hostvars from metadata
|
||||
self.inventory.set_variable(vmi_name, "object_type", "vmi")
|
||||
self.inventory.set_variable(vmi_name, "labels", vmi_labels)
|
||||
self.inventory.set_variable(vmi_name, "annotations", vmi_annotations)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "cluster_name", vmi.metadata.clusterName
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "resource_version", vmi.metadata.resourceVersion
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "uid", vmi.metadata.uid)
|
||||
|
||||
# Add hostvars from status
|
||||
vmi_active_pods = (
|
||||
{} if not vmi.status.activePods else vmi.status.activePods.to_dict()
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "vmi_active_pods", vmi_active_pods)
|
||||
vmi_conditions = (
|
||||
[]
|
||||
if not vmi.status.conditions
|
||||
else [c.to_dict() for c in vmi.status.conditions]
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "vmi_conditions", vmi_conditions)
|
||||
vmi_guest_os_info = (
|
||||
{} if not vmi.status.guestOSInfo else vmi.status.guestOSInfo.to_dict()
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "vmi_guest_os_info", vmi_guest_os_info
|
||||
)
|
||||
vmi_interfaces = (
|
||||
[]
|
||||
if not vmi.status.interfaces
|
||||
else [i.to_dict() for i in vmi.status.interfaces]
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "vmi_interfaces", vmi_interfaces)
|
||||
self.inventory.set_variable(
|
||||
vmi_name,
|
||||
"vmi_launcher_container_image_version",
|
||||
vmi.status.launcherContainerImageVersion,
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "vmi_migration_method", vmi.status.migrationMethod
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "vmi_migration_transport", vmi.status.migrationTransport
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "vmi_node_name", vmi.status.nodeName)
|
||||
self.inventory.set_variable(vmi_name, "vmi_phase", vmi.status.phase)
|
||||
vmi_phase_transition_timestamps = (
|
||||
[]
|
||||
if not vmi.status.phaseTransitionTimestamps
|
||||
else [p.to_dict() for p in vmi.status.phaseTransitionTimestamps]
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name,
|
||||
"vmi_phase_transition_timestamps",
|
||||
vmi_phase_transition_timestamps,
|
||||
)
|
||||
self.inventory.set_variable(vmi_name, "vmi_qos_class", vmi.status.qosClass)
|
||||
self.inventory.set_variable(
|
||||
vmi_name,
|
||||
"vmi_virtual_machine_revision_name",
|
||||
vmi.status.virtualMachineRevisionName,
|
||||
)
|
||||
vmi_volume_status = (
|
||||
[]
|
||||
if not vmi.status.volumeStatus
|
||||
else [v.to_dict() for v in vmi.status.volumeStatus]
|
||||
)
|
||||
self.inventory.set_variable(
|
||||
vmi_name, "vmi_volume_status", vmi_volume_status
|
||||
)
|
||||
|
||||
# Set up the connection
|
||||
service = None
|
||||
if self.is_windows(vmi_guest_os_info, vmi_annotations):
|
||||
self.inventory.set_variable(vmi_name, "ansible_connection", "winrm")
|
||||
else:
|
||||
service = services.get(
|
||||
vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN)
|
||||
)
|
||||
self.set_ansible_host_and_port(
|
||||
vmi,
|
||||
vmi_name,
|
||||
interface.ipAddress,
|
||||
service,
|
||||
opts,
|
||||
)
|
||||
|
||||
self.set_composable_vars(vmi_name)
|
||||
|
||||
def set_composable_vars(self, vmi_name):
|
||||
"""
|
||||
set_composable_vars sets vars per
|
||||
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
|
||||
"""
|
||||
host_vars = self.inventory.get_host(vmi_name).get_vars()
|
||||
strict = self.get_option("strict")
|
||||
self._set_composite_vars(
|
||||
self.get_option("compose"), host_vars, vmi_name, strict=True
|
||||
)
|
||||
self._add_host_to_composed_groups(
|
||||
self.get_option("groups"), host_vars, vmi_name, strict=strict
|
||||
)
|
||||
self._add_host_to_keyed_groups(
|
||||
self.get_option("keyed_groups"), host_vars, vmi_name, strict=strict
|
||||
return self.get_resources(
|
||||
client,
|
||||
opts.api_version,
|
||||
"VirtualMachineInstance",
|
||||
namespace=namespace,
|
||||
label_selector=opts.label_selector,
|
||||
)
|
||||
|
||||
def get_ssh_services_for_namespace(self, client: K8SClient, namespace: str) -> Dict:
|
||||
@@ -662,21 +545,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
|
||||
The services are mapped to the name of the corresponding domain.
|
||||
"""
|
||||
v1_service = client.resources.get(api_version="v1", kind="Service")
|
||||
try:
|
||||
service_list = v1_service.get(
|
||||
namespace=namespace,
|
||||
)
|
||||
except DynamicApiError as exc:
|
||||
self.display.debug(exc)
|
||||
raise KubeVirtInventoryException(
|
||||
f"Error fetching Service list: {self.format_dynamic_api_exc(exc)}"
|
||||
) from exc
|
||||
service_list = self.get_resources(
|
||||
client,
|
||||
"v1",
|
||||
"Service",
|
||||
namespace=namespace,
|
||||
)
|
||||
|
||||
services = {}
|
||||
for service in service_list.items:
|
||||
for service in service_list:
|
||||
# Continue if service is not of type LoadBalancer or NodePort
|
||||
if service.get("spec", {}).get("type") not in (
|
||||
if service.get("spec") is None:
|
||||
continue
|
||||
|
||||
if service["spec"].get("type") not in (
|
||||
TYPE_LOADBALANCER,
|
||||
TYPE_NODEPORT,
|
||||
):
|
||||
@@ -695,13 +577,169 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
|
||||
return services
|
||||
|
||||
def populate_inventory_from_namespace(
|
||||
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
|
||||
) -> None:
|
||||
"""
|
||||
populate_inventory_from_namespace adds groups and hosts from a
|
||||
namespace to the inventory.
|
||||
"""
|
||||
vms = {
|
||||
vm.metadata.name: vm
|
||||
for vm in self.get_vms_for_namespace(client, namespace, opts)
|
||||
}
|
||||
vmis = {
|
||||
vmi.metadata.name: vmi
|
||||
for vmi in self.get_vmis_for_namespace(client, namespace, opts)
|
||||
}
|
||||
|
||||
if not vms and not vmis:
|
||||
# Return early if no VMs and VMIs were found to avoid adding empty groups.
|
||||
return
|
||||
|
||||
services = self.get_ssh_services_for_namespace(client, namespace)
|
||||
|
||||
name = self._sanitize_group_name(name)
|
||||
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
||||
|
||||
self.inventory.add_group(name)
|
||||
self.inventory.add_group(namespace_group)
|
||||
self.inventory.add_child(name, namespace_group)
|
||||
|
||||
# Add found VMs and optionally enhance with VMI data
|
||||
for name, vm in vms.items():
|
||||
hostname = self.add_host(vm, opts.host_format, namespace_group)
|
||||
self.set_vars_from_vm(hostname, vm, opts)
|
||||
if name in vmis:
|
||||
self.set_vars_from_vmi(hostname, vmis[name], services, opts)
|
||||
self.set_composable_vars(hostname)
|
||||
|
||||
# Add remaining VMIs without VM
|
||||
for name, vmi in vmis.items():
|
||||
if name in vms:
|
||||
continue
|
||||
hostname = self.add_host(vmi, opts.host_format, namespace_group)
|
||||
self.set_vars_from_vmi(hostname, vmi, services, opts)
|
||||
self.set_composable_vars(hostname)
|
||||
|
||||
def add_host(
|
||||
self, obj: ResourceField, host_format: str, namespace_group: str
|
||||
) -> str:
|
||||
"""
|
||||
add_hosts adds a host to the inventory.
|
||||
"""
|
||||
hostname = host_format.format(
|
||||
namespace=obj.metadata.namespace,
|
||||
name=obj.metadata.name,
|
||||
uid=obj.metadata.uid,
|
||||
)
|
||||
self.inventory.add_host(hostname)
|
||||
self.inventory.add_child(namespace_group, hostname)
|
||||
|
||||
return hostname
|
||||
|
||||
def set_vars_from_vm(
|
||||
self, hostname: str, vm: ResourceField, opts: InventoryOptions
|
||||
) -> None:
|
||||
"""
|
||||
set_vars_from_vm sets inventory variables from a VM prefixed with vm_.
|
||||
"""
|
||||
self.set_common_vars(hostname, "vm", vm, opts)
|
||||
|
||||
def set_vars_from_vmi(
|
||||
self, hostname: str, vmi: ResourceField, services: Dict, opts: InventoryOptions
|
||||
) -> None:
|
||||
"""
|
||||
set_vars_from_vmi sets inventory variables from a VMI prefixed with vmi_ and
|
||||
looks up the interface to set ansible_host and ansible_port.
|
||||
"""
|
||||
self.set_common_vars(hostname, "vmi", vmi, opts)
|
||||
|
||||
if opts.network_name is None:
|
||||
# Use first interface
|
||||
interface = vmi.status.interfaces[0] if vmi.status.interfaces else None
|
||||
else:
|
||||
# Find interface by its name
|
||||
interface = next(
|
||||
(i for i in vmi.status.interfaces if i.name == opts.network_name),
|
||||
None,
|
||||
)
|
||||
|
||||
# If interface is not found or IP address is not reported skip this VMI
|
||||
if interface is None or interface.ipAddress is None:
|
||||
return
|
||||
|
||||
# Set up the connection
|
||||
service = None
|
||||
if self.is_windows(
|
||||
{} if not vmi.status.guestOSInfo else vmi.status.guestOSInfo.to_dict(),
|
||||
{} if not vmi.metadata.annotations else vmi.metadata.annotations.to_dict(),
|
||||
):
|
||||
self.inventory.set_variable(hostname, "ansible_connection", "winrm")
|
||||
else:
|
||||
service = services.get(vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN))
|
||||
self.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
interface.ipAddress,
|
||||
service,
|
||||
opts,
|
||||
)
|
||||
|
||||
def set_common_vars(
|
||||
self, hostname: str, prefix: str, obj: ResourceField, opts: InventoryOptions
|
||||
):
|
||||
"""
|
||||
set_common_vars sets common inventory variables from VMs or VMIs.
|
||||
"""
|
||||
# Add hostvars from metadata
|
||||
if metadata := obj.metadata:
|
||||
if metadata.annotations:
|
||||
self.inventory.set_variable(
|
||||
hostname, f"{prefix}_annotations", metadata.annotations.to_dict()
|
||||
)
|
||||
if metadata.labels:
|
||||
self.inventory.set_variable(
|
||||
hostname, f"{prefix}_labels", metadata.labels.to_dict()
|
||||
)
|
||||
# Create label groups and add vm to it if enabled
|
||||
if opts.create_groups:
|
||||
self.set_groups_from_labels(hostname, metadata.labels)
|
||||
if metadata.resourceVersion:
|
||||
self.inventory.set_variable(
|
||||
hostname, f"{prefix}_resource_version", metadata.resourceVersion
|
||||
)
|
||||
if metadata.uid:
|
||||
self.inventory.set_variable(hostname, f"{prefix}_uid", metadata.uid)
|
||||
|
||||
# Add hostvars from status
|
||||
if obj.status:
|
||||
for key, value in obj.status.to_dict().items():
|
||||
name = self.format_var_name(key)
|
||||
self.inventory.set_variable(hostname, f"{prefix}_{name}", value)
|
||||
|
||||
def set_groups_from_labels(self, hostname: str, labels: ResourceField) -> None:
|
||||
"""
|
||||
set_groups_from_labels adds groups for each label of a VM or VMI and
|
||||
adds the host to each group.
|
||||
"""
|
||||
groups = []
|
||||
for key, value in labels.to_dict().items():
|
||||
group_name = self._sanitize_group_name(f"label_{key}_{value}")
|
||||
if group_name not in groups:
|
||||
groups.append(group_name)
|
||||
# Add host to each label_value group
|
||||
for group in groups:
|
||||
self.inventory.add_group(group)
|
||||
self.inventory.add_child(group, hostname)
|
||||
|
||||
def set_ansible_host_and_port(
|
||||
self,
|
||||
vmi: Dict,
|
||||
vmi_name: str,
|
||||
vmi: ResourceField,
|
||||
hostname: str,
|
||||
ip_address: str,
|
||||
service: Optional[Dict],
|
||||
opts: GetVmiOptions,
|
||||
opts: InventoryOptions,
|
||||
) -> None:
|
||||
"""
|
||||
set_ansible_host_and_port sets the ansible_host and possibly the ansible_port var.
|
||||
@@ -709,22 +747,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
"""
|
||||
ansible_host = None
|
||||
ansible_port = None
|
||||
if opts.kube_secondary_dns and opts.network_name is not None:
|
||||
if opts.kube_secondary_dns and opts.network_name:
|
||||
# Set ansible_host to the kubesecondarydns derived host name if enabled
|
||||
# See https://github.com/kubevirt/kubesecondarydns#parameters
|
||||
ansible_host = (
|
||||
f"{opts.network_name}.{vmi.metadata.name}.{vmi.metadata.namespace}.vm"
|
||||
)
|
||||
if opts.base_domain is not None:
|
||||
if opts.base_domain:
|
||||
ansible_host += f".{opts.base_domain}"
|
||||
elif opts.use_service and service is not None and opts.network_name is None:
|
||||
elif opts.use_service and service and not opts.network_name:
|
||||
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
|
||||
# or NodePort service exposing SSH
|
||||
node_name = (
|
||||
f"{vmi.status.nodeName}.{opts.base_domain}"
|
||||
if opts.append_base_domain
|
||||
else vmi.status.nodeName
|
||||
)
|
||||
node_name = vmi.status.nodeName
|
||||
if node_name and opts.append_base_domain and opts.base_domain:
|
||||
node_name += f".{opts.base_domain}"
|
||||
host = self.get_host_from_service(service, node_name)
|
||||
port = self.get_port_from_service(service)
|
||||
if host is not None and port is not None:
|
||||
@@ -735,5 +771,22 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
if ansible_host is None:
|
||||
ansible_host = ip_address
|
||||
|
||||
self.inventory.set_variable(vmi_name, "ansible_host", ansible_host)
|
||||
self.inventory.set_variable(vmi_name, "ansible_port", ansible_port)
|
||||
self.inventory.set_variable(hostname, "ansible_host", ansible_host)
|
||||
self.inventory.set_variable(hostname, "ansible_port", ansible_port)
|
||||
|
||||
def set_composable_vars(self, hostname: str) -> None:
|
||||
"""
|
||||
set_composable_vars sets vars per
|
||||
https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html
|
||||
"""
|
||||
hostvars = self.inventory.get_host(hostname).get_vars()
|
||||
strict = self.get_option("strict")
|
||||
self._set_composite_vars(
|
||||
self.get_option("compose"), hostvars, hostname, strict=True
|
||||
)
|
||||
self._add_host_to_composed_groups(
|
||||
self.get_option("groups"), hostvars, hostname, strict=strict
|
||||
)
|
||||
self._add_host_to_keyed_groups(
|
||||
self.get_option("keyed_groups"), hostvars, hostname, strict=strict
|
||||
)
|
||||
|
||||
@@ -17,3 +17,10 @@
|
||||
name: testvm2
|
||||
namespace: default
|
||||
wait: true
|
||||
|
||||
- name: Delete the stopped VM
|
||||
kubevirt.core.kubevirt_vm:
|
||||
state: absent
|
||||
name: testvm-stopped
|
||||
namespace: default
|
||||
wait: true
|
||||
|
||||
@@ -52,3 +52,13 @@
|
||||
name: containerdisk
|
||||
wait: true
|
||||
wait_timeout: 600
|
||||
|
||||
- name: Create a stopped VM
|
||||
kubevirt.core.kubevirt_vm:
|
||||
state: present
|
||||
name: testvm-stopped
|
||||
namespace: default
|
||||
running: false
|
||||
spec:
|
||||
domain:
|
||||
devices: {}
|
||||
|
||||
@@ -16,11 +16,12 @@
|
||||
ansible.builtin.include_vars:
|
||||
file: all.yml
|
||||
name: inv_all
|
||||
- name: Assert two instances with different labels
|
||||
- name: Assert all expected hosts were discovered
|
||||
ansible.builtin.assert:
|
||||
that:
|
||||
- inv_all['all']['children']['label_app_test']['hosts'] | length == 1
|
||||
- inv_all['all']['children']['label_foo_bar']['hosts'] | length == 1
|
||||
- "'default-testvm-stopped' in inv_all['all']['children']['test']['children']['namespace_default']['hosts']"
|
||||
- name: Read filtered inventory
|
||||
ansible.builtin.include_vars:
|
||||
file: label.yml
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
merge_dicts,
|
||||
)
|
||||
|
||||
BASE_VMI = {
|
||||
"metadata": {
|
||||
"name": "testvmi",
|
||||
"namespace": "default",
|
||||
},
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
},
|
||||
}
|
||||
WINDOWS_VMI_1 = merge_dicts(
|
||||
BASE_VMI,
|
||||
{
|
||||
"status": {
|
||||
"guestOSInfo": {"id": "mswindows"},
|
||||
}
|
||||
},
|
||||
)
|
||||
WINDOWS_VMI_2 = merge_dicts(
|
||||
BASE_VMI,
|
||||
{
|
||||
"metadata": {
|
||||
"annotations": {"kubevirt.io/cluster-preference-name": "windows.2k22"}
|
||||
},
|
||||
},
|
||||
)
|
||||
WINDOWS_VMI_3 = merge_dicts(
|
||||
BASE_VMI,
|
||||
{
|
||||
"metadata": {"annotations": {"kubevirt.io/preference-name": "windows.2k22"}},
|
||||
},
|
||||
)
|
||||
WINDOWS_VMI_4 = merge_dicts(
|
||||
BASE_VMI,
|
||||
{
|
||||
"metadata": {"annotations": {"vm.kubevirt.io/os": "windows2k22"}},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client,vmi,expected",
|
||||
[
|
||||
({"vmis": [BASE_VMI]}, BASE_VMI, False),
|
||||
({"vmis": [WINDOWS_VMI_1]}, WINDOWS_VMI_1, True),
|
||||
({"vmis": [WINDOWS_VMI_2]}, WINDOWS_VMI_2, True),
|
||||
({"vmis": [WINDOWS_VMI_3]}, WINDOWS_VMI_3, True),
|
||||
({"vmis": [WINDOWS_VMI_4]}, WINDOWS_VMI_4, True),
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_ansible_connection_winrm(inventory, hosts, client, vmi, expected):
|
||||
inventory.populate_inventory_from_namespace(
|
||||
client, "", DEFAULT_NAMESPACE, InventoryOptions()
|
||||
)
|
||||
|
||||
host = f"{DEFAULT_NAMESPACE}-{vmi['metadata']['name']}"
|
||||
if expected:
|
||||
assert hosts[host]["ansible_connection"] == "winrm"
|
||||
else:
|
||||
assert "ansible_connection" not in hosts[host]
|
||||
@@ -0,0 +1,63 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
|
||||
VMI = {
|
||||
"metadata": {
|
||||
"name": "testvmi",
|
||||
"namespace": "default",
|
||||
},
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
"migrationMethod": "BlockMigration",
|
||||
"nodeName": "test-node",
|
||||
"guestOSInfo": {
|
||||
"id": "fedora",
|
||||
"versionId": "40",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[{"vmis": [VMI]}],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_set_composable_vars(
|
||||
inventory,
|
||||
groups,
|
||||
hosts,
|
||||
client,
|
||||
):
|
||||
inventory._options = {
|
||||
"compose": {"set_from_another_var": "vmi_node_name"},
|
||||
"groups": {"block_migratable_vmis": "vmi_migration_method == 'BlockMigration'"},
|
||||
"keyed_groups": [{"prefix": "fedora", "key": "vmi_guest_os_info.versionId"}],
|
||||
"strict": True,
|
||||
}
|
||||
inventory.populate_inventory_from_namespace(
|
||||
client, "", DEFAULT_NAMESPACE, InventoryOptions()
|
||||
)
|
||||
|
||||
host = f"{DEFAULT_NAMESPACE}-testvmi"
|
||||
assert hosts[host]["set_from_another_var"] == "test-node"
|
||||
assert "block_migratable_vmis" in groups
|
||||
assert host in groups["block_migratable_vmis"]["children"]
|
||||
assert "fedora_40" in groups
|
||||
assert host in groups["fedora_40"]["children"]
|
||||
@@ -0,0 +1,70 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
VM1 = {
|
||||
"metadata": {
|
||||
"name": "testvm1",
|
||||
"namespace": "default",
|
||||
"uid": "940003aa-0160-4b7e-9e55-8ec3df72047f",
|
||||
},
|
||||
"spec": {"running": True},
|
||||
}
|
||||
|
||||
VM2 = {
|
||||
"metadata": {
|
||||
"name": "testvm2",
|
||||
"namespace": "default",
|
||||
"uid": "c2c68de5-b9d7-4c25-872f-462e7245b3e6",
|
||||
},
|
||||
"spec": {"running": False},
|
||||
}
|
||||
|
||||
VMI1 = {
|
||||
"metadata": {
|
||||
"name": "testvm1",
|
||||
"namespace": "default",
|
||||
"uid": "a84319a9-db31-4a36-9b66-3e387578f871",
|
||||
},
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
({"vms": [VM1, VM2], "vmis": [VMI1]}),
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_stopped_vm(inventory, hosts, client):
|
||||
inventory.populate_inventory_from_namespace(
|
||||
client, "", DEFAULT_NAMESPACE, InventoryOptions()
|
||||
)
|
||||
|
||||
# The running VM should be present with ansible_host or ansible_port
|
||||
assert "default-testvm1" in hosts
|
||||
assert "ansible_host" in hosts["default-testvm1"]
|
||||
assert "ansible_port" in hosts["default-testvm1"]
|
||||
|
||||
# The stopped VM should be present without ansible_host or ansible_port
|
||||
assert "default-testvm2" in hosts
|
||||
assert "ansible_host" not in hosts["default-testvm2"]
|
||||
assert "ansible_port" not in hosts["default-testvm2"]
|
||||
151
tests/unit/plugins/inventory/conftest.py
Normal file
151
tests/unit/plugins/inventory/conftest.py
Normal file
@@ -0,0 +1,151 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from addict import Dict
|
||||
|
||||
from ansible.template import Templar
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryModule,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_BASE_DOMAIN,
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def inventory(mocker):
|
||||
inventory = InventoryModule()
|
||||
inventory.inventory = mocker.Mock()
|
||||
inventory.templar = Templar(loader=None)
|
||||
inventory._options = {
|
||||
"compose": {},
|
||||
"groups": {},
|
||||
"keyed_groups": [],
|
||||
"strict": True,
|
||||
}
|
||||
return inventory
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def inventory_data(mocker, inventory):
|
||||
groups = {}
|
||||
hosts = {}
|
||||
|
||||
def add_group(group):
|
||||
if group not in groups:
|
||||
groups[group] = {"children": [], "vars": {}}
|
||||
return group
|
||||
|
||||
def add_child(group, name):
|
||||
if name not in groups[group]["children"]:
|
||||
groups[group]["children"].append(name)
|
||||
|
||||
def add_host(host, group=None):
|
||||
if host not in hosts:
|
||||
hosts[host] = {}
|
||||
if group is not None:
|
||||
add_child(group, host)
|
||||
|
||||
def get_host(hostname):
|
||||
host = mocker.Mock()
|
||||
host.get_vars = mocker.Mock(return_value=hosts[hostname])
|
||||
return host
|
||||
|
||||
def set_variable(name, key, value):
|
||||
if name in groups:
|
||||
groups[name]["vars"][key] = value
|
||||
else:
|
||||
hosts[name][key] = value
|
||||
|
||||
mocker.patch.object(inventory.inventory, "add_group", add_group)
|
||||
mocker.patch.object(inventory.inventory, "add_child", add_child)
|
||||
mocker.patch.object(inventory.inventory, "add_host", add_host)
|
||||
mocker.patch.object(inventory.inventory, "get_host", get_host)
|
||||
mocker.patch.object(inventory.inventory, "set_variable", set_variable)
|
||||
return groups, hosts
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def groups(inventory_data):
|
||||
return inventory_data[0]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def hosts(inventory_data):
|
||||
return inventory_data[1]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def client(mocker, request):
|
||||
param = {}
|
||||
if hasattr(request, "param"):
|
||||
param = request.param
|
||||
|
||||
namespaces = mocker.Mock()
|
||||
if "namespaces" in param:
|
||||
items = param["namespaces"]
|
||||
else:
|
||||
items = [{"metadata": {"name": DEFAULT_NAMESPACE}}]
|
||||
namespaces.items = [Dict(item) for item in items]
|
||||
|
||||
vms = mocker.Mock()
|
||||
vms.items = [Dict(item) for item in param.get("vms", [])]
|
||||
vmis = mocker.Mock()
|
||||
vmis.items = [Dict(item) for item in param.get("vmis", [])]
|
||||
services = mocker.Mock()
|
||||
services.items = [Dict(item) for item in param.get("services", [])]
|
||||
|
||||
dns = mocker.Mock()
|
||||
if "base_domain" in param:
|
||||
base_domain = param["base_domain"]
|
||||
else:
|
||||
base_domain = DEFAULT_BASE_DOMAIN
|
||||
dns_obj = Dict({"spec": {"baseDomain": base_domain}})
|
||||
dns.items = [dns_obj]
|
||||
|
||||
namespace_client = mocker.Mock()
|
||||
namespace_client.get = mocker.Mock(return_value=namespaces)
|
||||
vm_client = mocker.Mock()
|
||||
vm_client.get = mocker.Mock(return_value=vms)
|
||||
vmi_client = mocker.Mock()
|
||||
vmi_client.get = mocker.Mock(return_value=vmis)
|
||||
service_client = mocker.Mock()
|
||||
service_client.get = mocker.Mock(return_value=services)
|
||||
|
||||
def dns_client_get(**kwargs):
|
||||
if "name" in kwargs:
|
||||
return dns_obj
|
||||
return dns
|
||||
|
||||
dns_client = mocker.Mock()
|
||||
dns_client.get = dns_client_get
|
||||
|
||||
def resources_get(api_version="", kind=""):
|
||||
if api_version.lower() == "v1":
|
||||
if kind.lower() == "namespace":
|
||||
return namespace_client
|
||||
if kind.lower() == "service":
|
||||
return service_client
|
||||
elif api_version.lower() == "config.openshift.io/v1" and kind.lower() == "dns":
|
||||
return dns_client
|
||||
elif "kubevirt.io/" in api_version.lower():
|
||||
if kind.lower() == "virtualmachine":
|
||||
return vm_client
|
||||
if kind.lower() == "virtualmachineinstance":
|
||||
return vmi_client
|
||||
|
||||
return None
|
||||
|
||||
client = mocker.Mock()
|
||||
client.resources.get = resources_get
|
||||
return client
|
||||
@@ -8,6 +8,9 @@ __metaclass__ = type
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
DEFAULT_NAMESPACE = "default"
|
||||
DEFAULT_BASE_DOMAIN = "example.com"
|
||||
|
||||
|
||||
def merge_dicts(dict1, dict2):
|
||||
merged = deepcopy(dict1)
|
||||
File diff suppressed because it is too large
Load Diff
82
tests/unit/plugins/inventory/test_kubevirt_add_host.py
Normal file
82
tests/unit/plugins/inventory/test_kubevirt_add_host.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from addict import Dict
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"host_format,expected",
|
||||
[
|
||||
("static", "static"),
|
||||
("{name}", "testvm"),
|
||||
("{name}-static", "testvm-static"),
|
||||
("{namespace}", "default"),
|
||||
("{uid}", "f8abae7c-d792-4b9b-af95-62d322ae5bc1"),
|
||||
("{name}-{namespace}", "testvm-default"),
|
||||
("{name}-{namespace}-static", "testvm-default-static"),
|
||||
("{name}-{uid}", "testvm-f8abae7c-d792-4b9b-af95-62d322ae5bc1"),
|
||||
("{namespace}-{name}", "default-testvm"),
|
||||
("{namespace}-{uid}", "default-f8abae7c-d792-4b9b-af95-62d322ae5bc1"),
|
||||
("{uid}-{name}", "f8abae7c-d792-4b9b-af95-62d322ae5bc1-testvm"),
|
||||
("{uid}-{namespace}", "f8abae7c-d792-4b9b-af95-62d322ae5bc1-default"),
|
||||
(
|
||||
"{name}-{namespace}-{uid}",
|
||||
"testvm-default-f8abae7c-d792-4b9b-af95-62d322ae5bc1",
|
||||
),
|
||||
(
|
||||
"{name}-{namespace}-{uid}-static",
|
||||
"testvm-default-f8abae7c-d792-4b9b-af95-62d322ae5bc1-static",
|
||||
),
|
||||
(
|
||||
"{name}-{uid}-{namespace}",
|
||||
"testvm-f8abae7c-d792-4b9b-af95-62d322ae5bc1-default",
|
||||
),
|
||||
(
|
||||
"{namespace}-{name}-{uid}",
|
||||
"default-testvm-f8abae7c-d792-4b9b-af95-62d322ae5bc1",
|
||||
),
|
||||
(
|
||||
"{namespace}-{uid}-{name}",
|
||||
"default-f8abae7c-d792-4b9b-af95-62d322ae5bc1-testvm",
|
||||
),
|
||||
(
|
||||
"{uid}-{namespace}-{name}",
|
||||
"f8abae7c-d792-4b9b-af95-62d322ae5bc1-default-testvm",
|
||||
),
|
||||
(
|
||||
"{uid}-{name}-{namespace}",
|
||||
"f8abae7c-d792-4b9b-af95-62d322ae5bc1-testvm-default",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_add_host(inventory, groups, hosts, host_format, expected):
|
||||
namespace_group = "namespace_default"
|
||||
inventory.inventory.add_group(namespace_group)
|
||||
|
||||
inventory.add_host(
|
||||
Dict(
|
||||
{
|
||||
"metadata": {
|
||||
"name": "testvm",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"uid": "f8abae7c-d792-4b9b-af95-62d322ae5bc1",
|
||||
}
|
||||
}
|
||||
),
|
||||
host_format,
|
||||
namespace_group,
|
||||
)
|
||||
|
||||
assert expected in hosts
|
||||
assert expected in groups[namespace_group]["children"]
|
||||
@@ -1,119 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryModule,
|
||||
)
|
||||
|
||||
from ansible.template import Templar
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def inventory_composable_vars(mocker):
|
||||
inventory = InventoryModule()
|
||||
|
||||
inventory.templar = Templar(loader=None)
|
||||
|
||||
inventory._options = {
|
||||
"compose": {"block_migratable_vmis": "vmi_migration_method"},
|
||||
"strict": True,
|
||||
"groups": {"vmi_node_groups": "cluster_name"},
|
||||
"keyed_groups": [{"prefix": "fedora", "key": "vmi_guest_os_info.version"}],
|
||||
}
|
||||
|
||||
inventory.inventory = mocker.Mock()
|
||||
|
||||
return inventory
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def host_vars(monkeypatch, inventory_composable_vars):
|
||||
host_vars = {}
|
||||
|
||||
def set_variable(host, key, value):
|
||||
if host not in host_vars:
|
||||
host_vars[host] = {}
|
||||
host_vars[host][key] = value
|
||||
|
||||
monkeypatch.setattr(
|
||||
inventory_composable_vars.inventory, "set_variable", set_variable
|
||||
)
|
||||
return host_vars
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def add_group(monkeypatch, inventory_composable_vars):
|
||||
groups = []
|
||||
|
||||
def add_group(name):
|
||||
if name not in groups:
|
||||
groups.append(name)
|
||||
return name
|
||||
|
||||
monkeypatch.setattr(inventory_composable_vars.inventory, "add_group", add_group)
|
||||
return groups
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def add_host(monkeypatch, inventory_composable_vars):
|
||||
hosts = []
|
||||
|
||||
def add_host(name, group=None):
|
||||
if name not in hosts:
|
||||
hosts.append(name)
|
||||
if group is not None and group not in hosts:
|
||||
hosts.append(group)
|
||||
|
||||
monkeypatch.setattr(inventory_composable_vars.inventory, "add_host", add_host)
|
||||
return hosts
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def add_child(monkeypatch, inventory_composable_vars):
|
||||
children = {}
|
||||
|
||||
def add_child(group, name):
|
||||
if group not in children:
|
||||
children[group] = []
|
||||
if name not in children[group]:
|
||||
children[group].append(name)
|
||||
|
||||
monkeypatch.setattr(inventory_composable_vars.inventory, "add_child", add_child)
|
||||
return children
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def vmi_host_vars():
|
||||
return {
|
||||
"vmi_migration_method": "BlockMigration",
|
||||
"vmi_guest_os_info": {"id": "fedora", "version": "39"},
|
||||
"cluster_name": {"test-cluster"},
|
||||
}
|
||||
|
||||
|
||||
def test_set_composable_vars(
|
||||
inventory_composable_vars,
|
||||
mocker,
|
||||
host_vars,
|
||||
add_group,
|
||||
add_child,
|
||||
add_host,
|
||||
vmi_host_vars,
|
||||
):
|
||||
get_vars = mocker.patch.object(
|
||||
inventory_composable_vars.inventory.get_host(), "get_vars"
|
||||
)
|
||||
get_vars.return_value = vmi_host_vars
|
||||
inventory_composable_vars.set_composable_vars("testvmi")
|
||||
|
||||
assert {"testvmi": {"block_migratable_vmis": "BlockMigration"}} == host_vars
|
||||
assert ["vmi_node_groups", "fedora_39"] == add_group
|
||||
assert {"vmi_node_groups": ["testvmi"]} == add_child
|
||||
assert ["testvmi", "fedora_39"] == add_host
|
||||
193
tests/unit/plugins/inventory/test_kubevirt_fetch_objects.py
Normal file
193
tests/unit/plugins/inventory/test_kubevirt_fetch_objects.py
Normal file
@@ -0,0 +1,193 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
KubeVirtInventoryException,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory import kubevirt
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"connections,expected",
|
||||
[
|
||||
(
|
||||
None,
|
||||
[
|
||||
{
|
||||
"name": "default-hostname",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"opts": InventoryOptions(),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"opts": InventoryOptions(),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{"name": "test", "namespaces": ["test"]},
|
||||
],
|
||||
[
|
||||
{"name": "test", "namespace": "test", "opts": InventoryOptions()},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespaces": ["test"],
|
||||
"use_service": True,
|
||||
"create_groups": True,
|
||||
"append_base_domain": True,
|
||||
"base_domain": "test-domain",
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": "test",
|
||||
"opts": InventoryOptions(
|
||||
use_service=True,
|
||||
create_groups=True,
|
||||
append_base_domain=True,
|
||||
base_domain="test-domain",
|
||||
),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespaces": ["test"],
|
||||
"use_service": True,
|
||||
"create_groups": True,
|
||||
"append_base_domain": True,
|
||||
"base_domain": "test-domain",
|
||||
"network_name": "test-network",
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": "test",
|
||||
"opts": InventoryOptions(
|
||||
use_service=True,
|
||||
create_groups=True,
|
||||
append_base_domain=True,
|
||||
base_domain="test-domain",
|
||||
network_name="test-network",
|
||||
),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespaces": ["test"],
|
||||
"use_service": True,
|
||||
"create_groups": True,
|
||||
"append_base_domain": True,
|
||||
"base_domain": "test-domain",
|
||||
"interface_name": "test-interface",
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": "test",
|
||||
"opts": InventoryOptions(
|
||||
use_service=True,
|
||||
create_groups=True,
|
||||
append_base_domain=True,
|
||||
base_domain="test-domain",
|
||||
network_name="test-interface",
|
||||
),
|
||||
},
|
||||
],
|
||||
),
|
||||
(
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
},
|
||||
{"name": "test", "namespaces": ["test"]},
|
||||
],
|
||||
[
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"opts": InventoryOptions(),
|
||||
},
|
||||
{"name": "test", "namespace": "test", "opts": InventoryOptions()},
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_fetch_objects(mocker, inventory, connections, expected):
|
||||
mocker.patch.object(kubevirt, "get_api_client")
|
||||
mocker.patch.object(
|
||||
inventory, "get_default_host_name", return_value="default-hostname"
|
||||
)
|
||||
|
||||
cluster_domain = "test.com"
|
||||
mocker.patch.object(inventory, "get_cluster_domain", return_value=cluster_domain)
|
||||
for e in expected:
|
||||
e["opts"].base_domain = e["opts"].base_domain or cluster_domain
|
||||
|
||||
get_available_namespaces = mocker.patch.object(
|
||||
inventory, "get_available_namespaces", return_value=[DEFAULT_NAMESPACE]
|
||||
)
|
||||
populate_inventory_from_namespace = mocker.patch.object(
|
||||
inventory, "populate_inventory_from_namespace"
|
||||
)
|
||||
|
||||
inventory.fetch_objects(connections)
|
||||
|
||||
get_available_namespaces.assert_has_calls(
|
||||
[mocker.call(mocker.ANY) for c in connections or [{}] if "namespaces" not in c]
|
||||
)
|
||||
populate_inventory_from_namespace.assert_has_calls(
|
||||
[
|
||||
mocker.call(mocker.ANY, e["name"], e["namespace"], e["opts"])
|
||||
for e in expected
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"connections,expected",
|
||||
[
|
||||
("test", "Expecting connections to be a list."),
|
||||
(["test", "test"], "Expecting connection to be a dictionary."),
|
||||
],
|
||||
)
|
||||
def test_fetch_objects_exceptions(inventory, connections, expected):
|
||||
with pytest.raises(KubeVirtInventoryException, match=expected):
|
||||
inventory.fetch_objects(connections)
|
||||
@@ -0,0 +1,56 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from json import dumps
|
||||
|
||||
import pytest
|
||||
|
||||
from kubernetes.dynamic.exceptions import DynamicApiError
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryModule,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def body_error(mocker):
|
||||
error = DynamicApiError(e=mocker.Mock())
|
||||
error.headers = None
|
||||
error.body = "This is a test error"
|
||||
return error
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def message_error(mocker):
|
||||
error = DynamicApiError(e=mocker.Mock())
|
||||
error.headers = {"Content-Type": "application/json"}
|
||||
error.body = dumps({"message": "This is a test error"}).encode("utf-8")
|
||||
return error
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def status_reason_error(mocker):
|
||||
error = DynamicApiError(e=mocker.Mock())
|
||||
error.body = None
|
||||
error.status = 404
|
||||
error.reason = "This is a test error"
|
||||
return error
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exc,expected",
|
||||
[
|
||||
("body_error", "This is a test error"),
|
||||
("message_error", "This is a test error"),
|
||||
("status_reason_error", "404 Reason: This is a test error"),
|
||||
],
|
||||
)
|
||||
def test_format_dynamic_api_exc(request, exc, expected):
|
||||
assert (
|
||||
InventoryModule.format_dynamic_api_exc(request.getfixturevalue(exc)) == expected
|
||||
)
|
||||
106
tests/unit/plugins/inventory/test_kubevirt_get_resources.py
Normal file
106
tests/unit/plugins/inventory/test_kubevirt_get_resources.py
Normal file
@@ -0,0 +1,106 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
DEFAULT_BASE_DOMAIN,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
{
|
||||
"vms": [{"metadata": {"name": "testvm"}}],
|
||||
"vmis": [{"metadata": {"name": "testvmi"}}],
|
||||
"services": [{"metadata": {"name": "testsvc"}}],
|
||||
},
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_get_resources(inventory, client):
|
||||
assert inventory.get_resources(client, "v1", "Namespace") == [
|
||||
{"metadata": {"name": DEFAULT_NAMESPACE}}
|
||||
]
|
||||
assert inventory.get_resources(client, "v1", "Service") == [
|
||||
{"metadata": {"name": "testsvc"}}
|
||||
]
|
||||
assert inventory.get_resources(client, "config.openshift.io/v1", "DNS") == [
|
||||
{"spec": {"baseDomain": DEFAULT_BASE_DOMAIN}}
|
||||
]
|
||||
assert inventory.get_resources(client, "kubevirt.io/v1", "VirtualMachine") == [
|
||||
{"metadata": {"name": "testvm"}}
|
||||
]
|
||||
assert inventory.get_resources(
|
||||
client, "kubevirt.io/v1", "VirtualMachineInstance"
|
||||
) == [{"metadata": {"name": "testvmi"}}]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client,expected",
|
||||
[
|
||||
(
|
||||
{},
|
||||
[DEFAULT_NAMESPACE],
|
||||
),
|
||||
(
|
||||
{
|
||||
"namespaces": [
|
||||
{"metadata": {"name": DEFAULT_NAMESPACE}},
|
||||
{"metadata": {"name": "test"}},
|
||||
]
|
||||
},
|
||||
[DEFAULT_NAMESPACE, "test"],
|
||||
),
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_get_available_namespaces(inventory, client, expected):
|
||||
assert inventory.get_available_namespaces(client) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
{
|
||||
"vms": [
|
||||
{"metadata": {"name": "testvm1"}},
|
||||
{"metadata": {"name": "testvm2"}},
|
||||
],
|
||||
},
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_get_vms_for_namespace(inventory, client):
|
||||
assert inventory.get_vms_for_namespace(
|
||||
client, DEFAULT_NAMESPACE, InventoryOptions()
|
||||
) == [{"metadata": {"name": "testvm1"}}, {"metadata": {"name": "testvm2"}}]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
{
|
||||
"vmis": [
|
||||
{"metadata": {"name": "testvmi1"}},
|
||||
{"metadata": {"name": "testvmi2"}},
|
||||
],
|
||||
},
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_get_vmis_for_namespace(inventory, client):
|
||||
assert inventory.get_vmis_for_namespace(
|
||||
client, DEFAULT_NAMESPACE, InventoryOptions()
|
||||
) == [{"metadata": {"name": "testvmi1"}}, {"metadata": {"name": "testvmi2"}}]
|
||||
@@ -0,0 +1,168 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
SVC_LB_SSH = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-lb-ssh"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 22,
|
||||
"targetPort": 22,
|
||||
},
|
||||
],
|
||||
"type": "LoadBalancer",
|
||||
"selector": {"kubevirt.io/domain": "test-lb-ssh"},
|
||||
},
|
||||
}
|
||||
|
||||
SVC_NP_SSH = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-np-ssh"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 22,
|
||||
"targetPort": 22,
|
||||
},
|
||||
],
|
||||
"type": "NodePort",
|
||||
"selector": {"kubevirt.io/domain": "test-np-ssh"},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
{
|
||||
"services": [SVC_LB_SSH, SVC_NP_SSH],
|
||||
},
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_get_ssh_services_for_namespace(inventory, client):
|
||||
assert inventory.get_ssh_services_for_namespace(client, DEFAULT_NAMESPACE) == {
|
||||
"test-lb-ssh": SVC_LB_SSH,
|
||||
"test-np-ssh": SVC_NP_SSH,
|
||||
}
|
||||
|
||||
|
||||
SVC_CLUSTERIP = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-clusterip"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 22,
|
||||
"targetPort": 22,
|
||||
},
|
||||
],
|
||||
"type": "ClusterIP",
|
||||
"selector": {"kubevirt.io/domain": "test-clusterip"},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
SVC_HTTP = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-http"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 80,
|
||||
"targetPort": 80,
|
||||
},
|
||||
],
|
||||
"type": "LoadBalancer",
|
||||
"selector": {"kubevirt.io/domain": "test-http"},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
SVC_NO_SPEC = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-no-spec"},
|
||||
}
|
||||
|
||||
SVC_NO_TYPE = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-no-type"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 22,
|
||||
"targetPort": 22,
|
||||
},
|
||||
],
|
||||
"selector": {"kubevirt.io/domain": "test-no-type"},
|
||||
},
|
||||
}
|
||||
|
||||
SVC_NO_PORTS = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-no-ports"},
|
||||
"spec": {
|
||||
"type": "LoadBalancer",
|
||||
"selector": {"kubevirt.io/domain": "test-no-ports"},
|
||||
},
|
||||
}
|
||||
|
||||
SVC_NO_SELECTOR = {
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {"name": "test-no-selector"},
|
||||
"spec": {
|
||||
"ports": [
|
||||
{
|
||||
"protocol": "TCP",
|
||||
"port": 22,
|
||||
"targetPort": 22,
|
||||
},
|
||||
],
|
||||
"type": "LoadBalancer",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"client",
|
||||
[
|
||||
{
|
||||
"services": [
|
||||
SVC_HTTP,
|
||||
SVC_CLUSTERIP,
|
||||
SVC_NO_SPEC,
|
||||
SVC_NO_TYPE,
|
||||
SVC_NO_PORTS,
|
||||
SVC_NO_SELECTOR,
|
||||
],
|
||||
},
|
||||
],
|
||||
indirect=["client"],
|
||||
)
|
||||
def test_ignore_unwanted_services(inventory, client):
|
||||
assert not inventory.get_ssh_services_for_namespace(client, DEFAULT_NAMESPACE)
|
||||
@@ -0,0 +1,57 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
|
||||
def test_inventory_options_defaults():
|
||||
opts = InventoryOptions()
|
||||
assert opts.api_version == "kubevirt.io/v1"
|
||||
assert opts.label_selector is None
|
||||
assert opts.network_name is None
|
||||
assert opts.kube_secondary_dns is False
|
||||
assert opts.use_service is True
|
||||
assert opts.create_groups is False
|
||||
assert opts.base_domain is None
|
||||
assert opts.append_base_domain is False
|
||||
assert opts.host_format == "{namespace}-{name}"
|
||||
|
||||
|
||||
def test_inventory_options_override_defaults():
|
||||
api_version = "test/v1"
|
||||
label_selector = "test-selector"
|
||||
network_name = "test-network"
|
||||
kube_secondary_dns = True
|
||||
use_service = False
|
||||
create_groups = True
|
||||
base_domain = "test-domain.com"
|
||||
append_base_domain = True
|
||||
host_format = "{name}-testhost"
|
||||
|
||||
opts = InventoryOptions(
|
||||
api_version=api_version,
|
||||
label_selector=label_selector,
|
||||
network_name=network_name,
|
||||
kube_secondary_dns=kube_secondary_dns,
|
||||
use_service=use_service,
|
||||
create_groups=create_groups,
|
||||
base_domain=base_domain,
|
||||
append_base_domain=append_base_domain,
|
||||
host_format=host_format,
|
||||
)
|
||||
assert opts.api_version == api_version
|
||||
assert opts.label_selector == label_selector
|
||||
assert opts.network_name == network_name
|
||||
assert opts.kube_secondary_dns == kube_secondary_dns
|
||||
assert opts.use_service == use_service
|
||||
assert opts.create_groups == create_groups
|
||||
assert opts.base_domain == base_domain
|
||||
assert opts.append_base_domain == append_base_domain
|
||||
assert opts.host_format == host_format
|
||||
@@ -0,0 +1,163 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from addict import Dict
|
||||
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
|
||||
DEFAULT_NAMESPACE,
|
||||
)
|
||||
|
||||
VM1 = {
|
||||
"metadata": {
|
||||
"name": "testvm1",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"uid": "940003aa-0160-4b7e-9e55-8ec3df72047f",
|
||||
},
|
||||
}
|
||||
|
||||
VM2 = {
|
||||
"metadata": {
|
||||
"name": "testvm2",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"uid": "c2c68de5-b9d7-4c25-872f-462e7245b3e6",
|
||||
},
|
||||
}
|
||||
|
||||
VMI1 = {
|
||||
"metadata": {
|
||||
"name": "testvm1",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"uid": "a84319a9-db31-4a36-9b66-3e387578f871",
|
||||
},
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
},
|
||||
}
|
||||
|
||||
VMI2 = {
|
||||
"metadata": {
|
||||
"name": "testvm2",
|
||||
"namespace": DEFAULT_NAMESPACE,
|
||||
"uid": "fd35700a-9cbe-488b-8f32-7adbe57eadc2",
|
||||
},
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"vms,vmis,expected",
|
||||
[
|
||||
([], [], 0),
|
||||
([VM1], [], 1),
|
||||
([VM1, VM2], [], 2),
|
||||
([VM1], [VMI1], 1),
|
||||
([VM2], [VMI2], 1),
|
||||
([VM1], [VMI2], 2),
|
||||
([VM2], [VMI1], 2),
|
||||
([VM1], [VMI1, VMI2], 2),
|
||||
([VM2], [VMI1, VMI2], 2),
|
||||
([VM1, VM2], [VMI1, VMI2], 2),
|
||||
([], [VMI1], 1),
|
||||
([], [VMI1, VMI2], 2),
|
||||
],
|
||||
)
|
||||
def test_populate_inventory_from_namespace(
|
||||
mocker, inventory, groups, vms, vmis, expected
|
||||
):
|
||||
_vms = {vm["metadata"]["name"]: Dict(vm) for vm in vms}
|
||||
_vmis = {vmi["metadata"]["name"]: Dict(vmi) for vmi in vmis}
|
||||
opts = InventoryOptions()
|
||||
|
||||
def format_hostname(obj):
|
||||
return opts.host_format.format(
|
||||
namespace=obj.metadata.namespace,
|
||||
name=obj.metadata.name,
|
||||
uid=obj.metadata.uid,
|
||||
)
|
||||
|
||||
def add_host_call(obj):
|
||||
return mocker.call(
|
||||
obj,
|
||||
opts.host_format,
|
||||
f"namespace_{DEFAULT_NAMESPACE}",
|
||||
)
|
||||
|
||||
add_host_side_effects = []
|
||||
add_host_calls = []
|
||||
set_vars_from_vm_calls = []
|
||||
set_vars_from_vmi_calls = []
|
||||
set_composable_vars_calls = []
|
||||
|
||||
# For each VM add the expected calls
|
||||
# Also add expected calls for VMIs for which a VM exists
|
||||
for name, vm in _vms.items():
|
||||
hostname = format_hostname(vm)
|
||||
add_host_side_effects.append(hostname)
|
||||
add_host_calls.append(add_host_call(vm))
|
||||
set_vars_from_vm_calls.append(mocker.call(hostname, vm, opts))
|
||||
if name in _vmis.keys():
|
||||
set_vars_from_vmi_calls.append(mocker.call(hostname, _vmis[name], [], opts))
|
||||
set_composable_vars_calls.append(mocker.call(hostname))
|
||||
|
||||
# For each VMI add the expected calls
|
||||
# Do not add for VMIs for which a VM exists
|
||||
for name, vmi in _vmis.items():
|
||||
if name not in _vms.keys():
|
||||
hostname = format_hostname(vmi)
|
||||
add_host_side_effects.append(hostname)
|
||||
add_host_calls.append(add_host_call(vmi))
|
||||
set_vars_from_vmi_calls.append(mocker.call(hostname, vmi, [], opts))
|
||||
set_composable_vars_calls.append(mocker.call(hostname))
|
||||
|
||||
get_vms_for_namespace = mocker.patch.object(
|
||||
inventory, "get_vms_for_namespace", return_value=_vms.values()
|
||||
)
|
||||
get_vmis_for_namespace = mocker.patch.object(
|
||||
inventory, "get_vmis_for_namespace", return_value=_vmis.values()
|
||||
)
|
||||
get_ssh_services_for_namespace = mocker.patch.object(
|
||||
inventory, "get_ssh_services_for_namespace", return_value=[]
|
||||
)
|
||||
add_host = mocker.patch.object(
|
||||
inventory, "add_host", side_effect=add_host_side_effects
|
||||
)
|
||||
set_vars_from_vm = mocker.patch.object(inventory, "set_vars_from_vm")
|
||||
set_vars_from_vmi = mocker.patch.object(inventory, "set_vars_from_vmi")
|
||||
set_composable_vars = mocker.patch.object(inventory, "set_composable_vars")
|
||||
|
||||
inventory.populate_inventory_from_namespace(None, "test", DEFAULT_NAMESPACE, opts)
|
||||
|
||||
# These should always get called once
|
||||
get_vms_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE, opts)
|
||||
get_vmis_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE, opts)
|
||||
|
||||
# Assert it tries to add the expected vars for all provided VMs/VMIs
|
||||
set_vars_from_vm.assert_has_calls(set_vars_from_vm_calls)
|
||||
set_vars_from_vmi.assert_has_calls(set_vars_from_vmi_calls)
|
||||
set_composable_vars.assert_has_calls(set_composable_vars_calls)
|
||||
|
||||
# If no VMs or VMIs were provided the function should not add any groups
|
||||
if vms or vmis:
|
||||
get_ssh_services_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE)
|
||||
assert list(groups.keys()) == ["test", f"namespace_{DEFAULT_NAMESPACE}"]
|
||||
else:
|
||||
get_ssh_services_for_namespace.assert_not_called()
|
||||
assert not list(groups.keys())
|
||||
|
||||
# Assert the expected amount of hosts was added
|
||||
add_host.assert_has_calls(add_host_calls)
|
||||
assert len(add_host_calls) == expected
|
||||
@@ -0,0 +1,286 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from addict import Dict
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"opts",
|
||||
[
|
||||
InventoryOptions(),
|
||||
InventoryOptions(kube_secondary_dns=True), # needs network_name
|
||||
InventoryOptions(use_service=True), # needs service
|
||||
],
|
||||
)
|
||||
def test_use_ip_address_by_default(mocker, inventory, opts):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
ip_address = "1.1.1.1"
|
||||
|
||||
inventory.set_ansible_host_and_port(Dict(), hostname, ip_address, None, opts)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", ip_address),
|
||||
mocker.call(hostname, "ansible_port", None),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"base_domain",
|
||||
[
|
||||
True,
|
||||
False,
|
||||
],
|
||||
)
|
||||
def test_kube_secondary_dns(mocker, inventory, base_domain):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"metadata": {"name": "testvm", "namespace": "default"},
|
||||
"status": {"interfaces": [{"name": "awesome"}]},
|
||||
}
|
||||
)
|
||||
|
||||
inventory.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
"1.1.1.1",
|
||||
None,
|
||||
InventoryOptions(
|
||||
kube_secondary_dns=True,
|
||||
network_name="awesome",
|
||||
base_domain="example.com" if base_domain else None,
|
||||
),
|
||||
)
|
||||
|
||||
ansible_host = "awesome.testvm.default.vm"
|
||||
if base_domain:
|
||||
ansible_host += ".example.com"
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", ansible_host),
|
||||
mocker.call(hostname, "ansible_port", None),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_kube_secondary_dns_precedence_over_service(mocker, inventory):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"metadata": {"name": "testvm", "namespace": "default"},
|
||||
"status": {"interfaces": [{"name": "awesome"}]},
|
||||
}
|
||||
)
|
||||
|
||||
inventory.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
"1.1.1.1",
|
||||
{"metadata": {"name": "testsvc"}},
|
||||
InventoryOptions(
|
||||
kube_secondary_dns=True, network_name="awesome", use_service=True
|
||||
),
|
||||
)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", "awesome.testvm.default.vm"),
|
||||
mocker.call(hostname, "ansible_port", None),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"service,expected_host,expected_port",
|
||||
[
|
||||
(
|
||||
{
|
||||
"spec": {
|
||||
"type": "LoadBalancer",
|
||||
"ports": [{"port": 22}],
|
||||
},
|
||||
"status": {
|
||||
"loadBalancer": {"ingress": [{"hostname": "testhost.example.com"}]}
|
||||
},
|
||||
},
|
||||
"testhost.example.com",
|
||||
22,
|
||||
),
|
||||
(
|
||||
{
|
||||
"spec": {
|
||||
"type": "LoadBalancer",
|
||||
"ports": [{"port": 23}],
|
||||
},
|
||||
"status": {
|
||||
"loadBalancer": {
|
||||
"ingress": [
|
||||
{"hostname": "testhost.example.com", "ip": "2.2.2.2"}
|
||||
]
|
||||
}
|
||||
},
|
||||
},
|
||||
"testhost.example.com",
|
||||
23,
|
||||
),
|
||||
(
|
||||
{
|
||||
"spec": {
|
||||
"type": "LoadBalancer",
|
||||
"ports": [{"port": 24}],
|
||||
},
|
||||
"status": {"loadBalancer": {"ingress": [{"ip": "2.2.2.2"}]}},
|
||||
},
|
||||
"2.2.2.2",
|
||||
24,
|
||||
),
|
||||
(
|
||||
{
|
||||
"spec": {
|
||||
"type": "NodePort",
|
||||
"ports": [{"nodePort": 25}],
|
||||
},
|
||||
},
|
||||
"testnode.example.com",
|
||||
25,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_service(mocker, inventory, service, expected_host, expected_port):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"status": {
|
||||
"nodeName": "testnode.example.com",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
inventory.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
"1.1.1.1",
|
||||
service,
|
||||
InventoryOptions(use_service=True),
|
||||
)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", expected_host),
|
||||
mocker.call(hostname, "ansible_port", expected_port),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_service_append_base_domain(mocker, inventory):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"status": {
|
||||
"nodeName": "testnode",
|
||||
},
|
||||
}
|
||||
)
|
||||
service = {
|
||||
"spec": {
|
||||
"type": "NodePort",
|
||||
"ports": [{"nodePort": 25}],
|
||||
},
|
||||
}
|
||||
inventory.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
"1.1.1.1",
|
||||
service,
|
||||
InventoryOptions(
|
||||
use_service=True, append_base_domain=True, base_domain="awesome.com"
|
||||
),
|
||||
)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", "testnode.awesome.com"),
|
||||
mocker.call(hostname, "ansible_port", 25),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"host,port",
|
||||
[
|
||||
("testhost.com", None),
|
||||
(None, 22),
|
||||
(None, None),
|
||||
],
|
||||
)
|
||||
def test_service_fallback(mocker, inventory, host, port):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
mocker.patch.object(inventory, "get_host_from_service", return_value=host)
|
||||
mocker.patch.object(inventory, "get_port_from_service", return_value=port)
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"status": {
|
||||
"nodeName": "testnode",
|
||||
},
|
||||
}
|
||||
)
|
||||
inventory.set_ansible_host_and_port(
|
||||
vmi,
|
||||
hostname,
|
||||
"1.1.1.1",
|
||||
{"something": "something"},
|
||||
InventoryOptions(use_service=True),
|
||||
)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", "1.1.1.1"),
|
||||
mocker.call(hostname, "ansible_port", None),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_no_service_if_network_name(mocker, inventory):
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
inventory.set_ansible_host_and_port(
|
||||
Dict(),
|
||||
hostname,
|
||||
"1.2.3.4",
|
||||
{"something": "something"},
|
||||
InventoryOptions(use_service=True, network_name="awesome"),
|
||||
)
|
||||
|
||||
set_variable.assert_has_calls(
|
||||
[
|
||||
mocker.call(hostname, "ansible_host", "1.2.3.4"),
|
||||
mocker.call(hostname, "ansible_port", None),
|
||||
]
|
||||
)
|
||||
122
tests/unit/plugins/inventory/test_kubevirt_set_common_vars.py
Normal file
122
tests/unit/plugins/inventory/test_kubevirt_set_common_vars.py
Normal file
@@ -0,0 +1,122 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
|
||||
import pytest
|
||||
|
||||
from addict import Dict
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"obj,expected",
|
||||
[
|
||||
(
|
||||
{
|
||||
"metadata": {
|
||||
"something": "idontcare",
|
||||
},
|
||||
},
|
||||
{},
|
||||
),
|
||||
(
|
||||
{
|
||||
"metadata": {
|
||||
"annotations": {"testanno": "testval"},
|
||||
},
|
||||
},
|
||||
{"annotations": {"testanno": "testval"}},
|
||||
),
|
||||
(
|
||||
{
|
||||
"metadata": {
|
||||
"labels": {"testlabel": "testval"},
|
||||
},
|
||||
},
|
||||
{"labels": {"testlabel": "testval"}},
|
||||
),
|
||||
(
|
||||
{
|
||||
"metadata": {
|
||||
"resourceVersion": "123",
|
||||
},
|
||||
},
|
||||
{"resource_version": "123"},
|
||||
),
|
||||
(
|
||||
{
|
||||
"metadata": {
|
||||
"uid": "48e6ed2c-d8a2-4172-844d-0fe7056aa180",
|
||||
},
|
||||
},
|
||||
{"uid": "48e6ed2c-d8a2-4172-844d-0fe7056aa180"},
|
||||
),
|
||||
(
|
||||
{
|
||||
"status": {
|
||||
"interfaces": [{"ipAddress": "10.10.10.10"}],
|
||||
},
|
||||
},
|
||||
{"interfaces": [{"ipAddress": "10.10.10.10"}]},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_set_common_vars(inventory, hosts, obj, expected):
|
||||
hostname = "default-testvm"
|
||||
prefix = "".join(choice(ascii_lowercase) for i in range(5))
|
||||
inventory.inventory.add_host(hostname)
|
||||
inventory.set_common_vars(hostname, prefix, Dict(obj), InventoryOptions())
|
||||
|
||||
for key, value in expected.items():
|
||||
prefixed_key = f"{prefix}_{key}"
|
||||
assert prefixed_key in hosts[hostname]
|
||||
assert hosts[hostname][prefixed_key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"create_groups",
|
||||
[
|
||||
True,
|
||||
False,
|
||||
],
|
||||
)
|
||||
def test_set_common_vars_create_groups(mocker, inventory, create_groups):
|
||||
mocker.patch.object(inventory.inventory, "set_variable")
|
||||
set_groups_from_labels = mocker.patch.object(inventory, "set_groups_from_labels")
|
||||
|
||||
hostname = "default-testvm"
|
||||
labels = {"testkey": "testval"}
|
||||
opts = InventoryOptions(create_groups=create_groups)
|
||||
|
||||
inventory.set_common_vars(
|
||||
hostname, "prefix", Dict({"metadata": {"labels": labels}}), opts
|
||||
)
|
||||
|
||||
if create_groups:
|
||||
set_groups_from_labels.assert_called_once_with(hostname, labels)
|
||||
else:
|
||||
set_groups_from_labels.assert_not_called()
|
||||
|
||||
|
||||
def test_called_by_set_vars_from(mocker, inventory):
|
||||
hostname = "default-testvm"
|
||||
opts = InventoryOptions()
|
||||
|
||||
set_common_vars = mocker.patch.object(inventory, "set_common_vars")
|
||||
|
||||
inventory.set_vars_from_vm(hostname, Dict(), opts)
|
||||
inventory.set_vars_from_vmi(hostname, Dict(), {}, opts)
|
||||
|
||||
set_common_vars.assert_has_calls(
|
||||
[mocker.call(hostname, "vm", {}, opts), mocker.call(hostname, "vmi", {}, opts)]
|
||||
)
|
||||
120
tests/unit/plugins/inventory/test_kubevirt_set_vars_from_vmi.py
Normal file
120
tests/unit/plugins/inventory/test_kubevirt_set_vars_from_vmi.py
Normal file
@@ -0,0 +1,120 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from addict import Dict
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
InventoryOptions,
|
||||
LABEL_KUBEVIRT_IO_DOMAIN,
|
||||
)
|
||||
|
||||
|
||||
def test_ignore_vmi_without_interface(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
set_ansible_host_and_port = mocker.patch.object(
|
||||
inventory, "set_ansible_host_and_port"
|
||||
)
|
||||
|
||||
vmi = Dict({"status": {}})
|
||||
inventory.set_vars_from_vmi("default-testvm", vmi, {}, InventoryOptions())
|
||||
|
||||
set_ansible_host_and_port.assert_not_called()
|
||||
|
||||
|
||||
def test_use_first_interface_by_default(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
set_ansible_host_and_port = mocker.patch.object(
|
||||
inventory, "set_ansible_host_and_port"
|
||||
)
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{"status": {"interfaces": [{"ipAddress": "1.1.1.1"}, {"ipAddress": "2.2.2.2"}]}}
|
||||
)
|
||||
opts = InventoryOptions()
|
||||
inventory.set_vars_from_vmi(hostname, vmi, {}, opts)
|
||||
|
||||
set_ansible_host_and_port.assert_called_once_with(
|
||||
vmi, hostname, "1.1.1.1", None, opts
|
||||
)
|
||||
|
||||
|
||||
def test_use_named_interface(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
set_ansible_host_and_port = mocker.patch.object(
|
||||
inventory, "set_ansible_host_and_port"
|
||||
)
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"status": {
|
||||
"interfaces": [
|
||||
{"name": "first", "ipAddress": "1.1.1.1"},
|
||||
{"name": "second", "ipAddress": "2.2.2.2"},
|
||||
]
|
||||
}
|
||||
}
|
||||
)
|
||||
opts = InventoryOptions(network_name="second")
|
||||
inventory.set_vars_from_vmi(hostname, vmi, {}, opts)
|
||||
|
||||
set_ansible_host_and_port.assert_called_once_with(
|
||||
vmi, hostname, "2.2.2.2", None, opts
|
||||
)
|
||||
|
||||
|
||||
def test_ignore_vmi_without_named_interface(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
set_ansible_host_and_port = mocker.patch.object(
|
||||
inventory, "set_ansible_host_and_port"
|
||||
)
|
||||
|
||||
vmi = Dict(
|
||||
{"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]}}
|
||||
)
|
||||
inventory.set_vars_from_vmi(
|
||||
"default-testvm", vmi, {}, InventoryOptions(network_name="awesome")
|
||||
)
|
||||
|
||||
set_ansible_host_and_port.assert_not_called()
|
||||
|
||||
|
||||
def test_set_winrm_if_windows(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
mocker.patch.object(inventory, "is_windows", return_value=True)
|
||||
mocker.patch.object(inventory, "set_ansible_host_and_port")
|
||||
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict({"status": {"interfaces": [{"ipAddress": "1.1.1.1"}]}})
|
||||
inventory.set_vars_from_vmi(hostname, vmi, {}, InventoryOptions())
|
||||
|
||||
set_variable.assert_called_once_with(hostname, "ansible_connection", "winrm")
|
||||
|
||||
|
||||
def test_service_lookup(mocker, inventory):
|
||||
mocker.patch.object(inventory, "set_common_vars")
|
||||
set_ansible_host_and_port = mocker.patch.object(
|
||||
inventory, "set_ansible_host_and_port"
|
||||
)
|
||||
|
||||
hostname = "default-testvm"
|
||||
vmi = Dict(
|
||||
{
|
||||
"metadata": {"labels": {LABEL_KUBEVIRT_IO_DOMAIN: "testdomain"}},
|
||||
"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]},
|
||||
}
|
||||
)
|
||||
opts = InventoryOptions()
|
||||
service = {"metadata": {"name": "testsvc"}}
|
||||
inventory.set_vars_from_vmi(hostname, vmi, {"testdomain": service}, opts)
|
||||
|
||||
set_ansible_host_and_port.assert_called_once_with(
|
||||
vmi, hostname, "1.1.1.1", service, opts
|
||||
)
|
||||
63
tests/unit/plugins/inventory/test_kubevirt_setup.py
Normal file
63
tests/unit/plugins/inventory/test_kubevirt_setup.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory import (
|
||||
kubevirt,
|
||||
)
|
||||
|
||||
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
|
||||
KubeVirtInventoryException,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"cache",
|
||||
[
|
||||
True,
|
||||
False,
|
||||
],
|
||||
)
|
||||
def test_cache_is_used(mocker, inventory, cache):
|
||||
connections = [{"test-conn": {}}]
|
||||
config_data = {"connections": connections}
|
||||
cache_key = "test-prefix"
|
||||
|
||||
mocker.patch.dict(inventory._cache, {cache_key: {"test-key": "test-value"}})
|
||||
fetch_objects = mocker.patch.object(inventory, "fetch_objects")
|
||||
|
||||
inventory.setup(config_data, cache, cache_key)
|
||||
|
||||
if cache:
|
||||
fetch_objects.assert_not_called()
|
||||
else:
|
||||
fetch_objects.assert_called_once_with(connections)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"present",
|
||||
[
|
||||
True,
|
||||
False,
|
||||
],
|
||||
)
|
||||
def test_k8s_client_missing(mocker, inventory, present):
|
||||
mocker.patch.object(kubevirt, "HAS_K8S_MODULE_HELPER", present)
|
||||
fetch_objects = mocker.patch.object(inventory, "fetch_objects")
|
||||
|
||||
if present:
|
||||
inventory.setup({}, False, "test")
|
||||
fetch_objects.assert_called_once()
|
||||
else:
|
||||
with pytest.raises(
|
||||
KubeVirtInventoryException,
|
||||
match="This module requires the Kubernetes Python client. Try `pip install kubernetes`. Detail: None",
|
||||
):
|
||||
inventory.setup({}, False, "test")
|
||||
fetch_objects.assert_not_called()
|
||||
45
tests/unit/plugins/inventory/test_kubevirt_verify_file.py
Normal file
45
tests/unit/plugins/inventory/test_kubevirt_verify_file.py
Normal file
@@ -0,0 +1,45 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
# Apache License 2.0 (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"file_name,expected",
|
||||
[
|
||||
("inventory.kubevirt.yml", True),
|
||||
("inventory.kubevirt.yaml", True),
|
||||
("something.kubevirt.yml", True),
|
||||
("something.kubevirt.yaml", True),
|
||||
("inventory.somethingelse.yml", False),
|
||||
("inventory.somethingelse.yaml", False),
|
||||
("something.somethingelse.yml", False),
|
||||
("something.somethingelse.yaml", False),
|
||||
],
|
||||
)
|
||||
def test_verify_file(tmp_path, inventory, file_name, expected):
|
||||
file = tmp_path / file_name
|
||||
file.touch()
|
||||
assert inventory.verify_file(str(file)) is expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"file_name",
|
||||
[
|
||||
"inventory.kubevirt.yml",
|
||||
"inventory.kubevirt.yaml",
|
||||
"something.kubevirt.yml",
|
||||
"something.kubevirt.yaml",
|
||||
"inventory.somethingelse.yml",
|
||||
"inventory.somethingelse.yaml",
|
||||
"something.somethingelse.yml",
|
||||
"something.somethingelse.yaml",
|
||||
],
|
||||
)
|
||||
def test_verify_file_bad_config(inventory, file_name):
|
||||
assert inventory.verify_file(file_name) is False
|
||||
Reference in New Issue
Block a user