Merge pull request #119 from 0xFelix/fix-cacheable

fix(inventory): Fix inventory source caching
This commit is contained in:
kubevirt-bot
2024-07-12 11:09:05 +02:00
committed by GitHub
20 changed files with 639 additions and 396 deletions

3
.gitignore vendored
View File

@@ -145,7 +145,10 @@ kubevirt.core
/tests/output/
/tests/integration/inventory
/tests/integration/targets/inventory_kubevirt/all.yml
/tests/integration/targets/inventory_kubevirt/cache_after.yml
/tests/integration/targets/inventory_kubevirt/cache_before.yml
/tests/integration/targets/inventory_kubevirt/empty.yml
/tests/integration/targets/inventory_kubevirt/label.yml
/tests/integration/targets/inventory_kubevirt/net.yml
/tests/integration/targets/kubevirt_vm/files
kubevirt-cache

View File

@@ -0,0 +1,5 @@
---
plugin: kubevirt.core.kubevirt
cache: true
cache_plugin: ansible.builtin.jsonfile
cache_connection: kubevirt-cache

View File

@@ -136,7 +136,6 @@ from typing import (
# potentially print a warning to the user if the client is missing.
try:
from kubernetes.dynamic.exceptions import DynamicApiError
from kubernetes.dynamic.resource import ResourceField
HAS_K8S_MODULE_HELPER = True
K8S_IMPORT_EXCEPTION = None
@@ -147,17 +146,11 @@ except ImportError as e:
Dummy class, mainly used for ansible-test sanity.
"""
class ResourceField:
"""
Dummy class, mainly used for ansible-test sanity.
"""
HAS_K8S_MODULE_HELPER = False
K8S_IMPORT_EXCEPTION = e
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
get_api_client,
K8SClient,
@@ -193,6 +186,8 @@ class InventoryOptions:
base_domain: Optional[str] = None
append_base_domain: Optional[bool] = None
host_format: Optional[str] = None
namespaces: Optional[List[str]] = None
name: Optional[str] = None
config_data: InitVar[Optional[Dict]] = None
def __post_init__(self, config_data: Optional[Dict]) -> None:
@@ -245,6 +240,12 @@ class InventoryOptions:
if self.host_format is not None
else config_data.get("host_format", "{namespace}-{name}")
)
self.namespaces = (
self.namespaces
if self.namespaces is not None
else config_data.get("namespaces")
)
self.name = self.name if self.name is not None else config_data.get("name")
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
@@ -258,7 +259,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
snake_case_pattern = re_compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
@staticmethod
def get_default_host_name(host: str) -> str:
def get_default_hostname(host: str) -> str:
"""
get_default_hostname strips URL schemes from the host name and
replaces invalid characters.
@@ -293,6 +294,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
"""
return InventoryModule.snake_case_pattern.sub("_", name).lower()
@staticmethod
def obj_is_valid(obj: Dict) -> bool:
    """
    Check that an object carries the keys the inventory relies on.

    Returns True only when "spec", "status" and "metadata" are all present
    and the metadata holds a non-empty name, namespace and uid.
    """
    if any(key not in obj for key in ("spec", "status", "metadata")):
        return False
    metadata = obj["metadata"]
    return all(bool(metadata.get(field)) for field in ("name", "namespace", "uid"))
@staticmethod
def get_host_from_service(service: Dict, node_name: Optional[str]) -> Optional[str]:
"""
@@ -360,9 +375,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return False
def __init__(self) -> None:
super().__init__()
def verify_file(self, path: str) -> None:
"""
verify_file ensures the inventory file is compatible with this plugin.
@@ -375,61 +387,49 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
"""
parse is the main entry point of the inventory.
It checks for availability of the Kubernetes Python client,
gets the configuration and runs fetch_objects or
if there is a cache it is used instead.
gets the configuration, retrieves the cache or runs fetch_objects and
populates the inventory.
"""
super().parse(inventory, loader, path)
cache_key = self._get_cache_prefix(path)
config_data = self._read_config_data(path)
if not HAS_K8S_MODULE_HELPER:
raise KubeVirtInventoryException(
"This module requires the Kubernetes Python client. "
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
)
source_data = None
if cache and cache_key in self._cache:
try:
source_data = self._cache[cache_key]
except KeyError:
pass
super().parse(inventory, loader, path)
if not source_data:
self.fetch_objects(config_data)
def fetch_objects(self, config_data: Dict) -> None:
"""
fetch_objects populates the inventory with the specified parameters.
"""
if not config_data or not isinstance(config_data, dict):
config_data = {}
config_data = self._read_config_data(path)
cache_key = self.get_cache_key(path)
user_cache_setting = self.get_option("cache")
attempt_to_read_cache = user_cache_setting and cache
cache_needs_update = user_cache_setting and not cache
self.connections_compatibility(config_data)
client = get_api_client(**config_data)
name = config_data.get(
"name", self.get_default_host_name(client.configuration.host)
)
namespaces = (
config_data["namespaces"]
if config_data.get("namespaces")
else self.get_available_namespaces(client)
)
opts = InventoryOptions(config_data=config_data)
if opts.base_domain is None:
opts.base_domain = self.get_cluster_domain(client)
for namespace in namespaces:
self.populate_inventory_from_namespace(client, name, namespace, opts)
results = {}
if attempt_to_read_cache:
try:
results = self._cache[cache_key]
except KeyError:
cache_needs_update = True
if not attempt_to_read_cache or cache_needs_update:
results = self.fetch_objects(get_api_client(**config_data), opts)
if cache_needs_update:
self._cache[cache_key] = results
self.populate_inventory(results, opts)
def connections_compatibility(self, config_data: Dict) -> None:
collection_name = "kubevirt.core"
version_removed_in = "3.0.0"
if (connections := config_data.get("connections")) is None:
return
self.display.deprecated(
msg="The 'connections' parameter is deprecated and now supports only a single list entry.",
version="2.0.0",
version=version_removed_in,
collection_name=collection_name,
)
@@ -446,7 +446,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
config_data[k] = v
self.display.deprecated(
msg="Move all of your connection parameters to the configuration top level.",
version="3.0.0",
version=version_removed_in,
collection_name=collection_name,
)
elif len(connections) > 1:
@@ -457,6 +457,35 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
removed=True,
)
def fetch_objects(self, client: Any, opts: InventoryOptions) -> Dict:
    """
    Fetch all relevant objects from the K8S API.

    Collects VirtualMachines, VirtualMachineInstances and SSH services for
    each configured (or discovered) namespace and returns them in a plain,
    cacheable dict alongside the cluster domain and the default host name
    derived from the client configuration.
    """
    # Fall back to discovering namespaces when none were configured.
    targets = opts.namespaces or self.get_available_namespaces(client)
    namespaces = {}
    for namespace in targets:
        vms = self.get_vms_for_namespace(client, namespace, opts)
        vmis = self.get_vmis_for_namespace(client, namespace, opts)
        # Skip namespaces without VMs and VMIs to avoid adding empty groups.
        if vms or vmis:
            namespaces[namespace] = {
                "vms": vms,
                "vmis": vmis,
                "services": self.get_ssh_services_for_namespace(client, namespace),
            }
    return {
        "default_hostname": self.get_default_hostname(client.configuration.host),
        "cluster_domain": self.get_cluster_domain(client),
        "namespaces": namespaces,
    }
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
"""
get_cluster_domain tries to get the base domain of an OpenShift cluster.
@@ -479,7 +508,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def get_resources(
self, client: K8SClient, api_version: str, kind: str, **kwargs
) -> List[ResourceField]:
) -> List[Dict]:
"""
get_resources uses a dynamic K8SClient to fetch resources from the K8S API.
"""
@@ -492,7 +521,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
f"Error fetching {kind} list: {self.format_dynamic_api_exc(exc)}"
) from exc
return result.items
return [item.to_dict() for item in result.items]
def get_available_namespaces(self, client: K8SClient) -> List[str]:
"""
@@ -500,13 +529,14 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
configured credentials and returns them.
"""
return [
namespace.metadata.name
namespace["metadata"]["name"]
for namespace in self.get_resources(client, "v1", "Namespace")
if "metadata" in namespace and "name" in namespace["metadata"]
]
def get_vms_for_namespace(
self, client: K8SClient, namespace: str, opts: InventoryOptions
) -> List[ResourceField]:
) -> List[Dict]:
"""
get_vms_for_namespace returns a list of all VirtualMachines in a namespace.
"""
@@ -520,7 +550,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def get_vmis_for_namespace(
self, client: K8SClient, namespace: str, opts: InventoryOptions
) -> List[ResourceField]:
) -> List[Dict]:
"""
get_vmis_for_namespace returns a list of all VirtualMachineInstances in a namespace.
"""
@@ -537,7 +567,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
get_ssh_services_for_namespace retrieves all services of a namespace exposing port 22/ssh.
The services are mapped to the name of the corresponding domain.
"""
service_list = self.get_resources(
items = self.get_resources(
client,
"v1",
"Service",
@@ -545,12 +575,12 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
)
services = {}
for service in service_list:
for service in items:
# Continue if service is not of type LoadBalancer or NodePort
if service.get("spec") is None:
if not (spec := service.get("spec")):
continue
if service["spec"].get("type") not in (
if spec.get("type") not in (
TYPE_LOADBALANCER,
TYPE_NODEPORT,
):
@@ -558,40 +588,58 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
# Continue if ports are not defined, there is more than one port mapping
# or the target port is not port 22/ssh
ports = service["spec"].get("ports")
if ports is None or len(ports) != 1 or ports[0].get("targetPort") != 22:
if (
(ports := spec.get("ports")) is None
or len(ports) != 1
or ports[0].get("targetPort") != 22
):
continue
# Only add the service to the dict if the domain selector is present
domain = service["spec"].get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
if domain is not None:
if domain := spec.get("selector", {}).get(LABEL_KUBEVIRT_IO_DOMAIN):
services[domain] = service
return services
def populate_inventory(self, results: Dict, opts: InventoryOptions) -> None:
    """
    Populate the inventory from previously fetched (or cached) results.

    Backfills the base domain and inventory name from the results when they
    were not configured explicitly, then delegates every namespace to
    populate_inventory_from_namespace.
    """
    # Only fill in values the user did not configure themselves.
    for attr, key in (("base_domain", "cluster_domain"), ("name", "default_hostname")):
        if getattr(opts, attr) is None:
            setattr(opts, attr, results[key])
    for namespace, data in results["namespaces"].items():
        self.populate_inventory_from_namespace(namespace, data, opts)
def populate_inventory_from_namespace(
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
self, namespace: str, data: Dict, opts: InventoryOptions
) -> None:
"""
populate_inventory_from_namespace adds groups and hosts from a
namespace to the inventory.
"""
vms = {
vm.metadata.name: vm
for vm in self.get_vms_for_namespace(client, namespace, opts)
vm["metadata"]["name"]: vm for vm in data["vms"] if self.obj_is_valid(vm)
}
vmis = {
vmi.metadata.name: vmi
for vmi in self.get_vmis_for_namespace(client, namespace, opts)
vmi["metadata"]["name"]: vmi
for vmi in data["vmis"]
if self.obj_is_valid(vmi)
}
if not vms and not vmis:
# Return early if no VMs and VMIs were found to avoid adding empty groups.
return
services = self.get_ssh_services_for_namespace(client, namespace)
services = {
domain: service
for domain, service in data["services"].items()
if self.obj_is_valid(service)
}
name = self._sanitize_group_name(name)
name = self._sanitize_group_name(opts.name)
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
self.inventory.add_group(name)
@@ -600,7 +648,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
# Add found VMs and optionally enhance with VMI data
for name, vm in vms.items():
hostname = self.add_host(vm, opts.host_format, namespace_group)
hostname = self.add_host(vm["metadata"], opts.host_format, namespace_group)
self.set_vars_from_vm(hostname, vm, opts)
if name in vmis:
self.set_vars_from_vmi(hostname, vmis[name], services, opts)
@@ -610,36 +658,32 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
for name, vmi in vmis.items():
if name in vms:
continue
hostname = self.add_host(vmi, opts.host_format, namespace_group)
hostname = self.add_host(vmi["metadata"], opts.host_format, namespace_group)
self.set_vars_from_vmi(hostname, vmi, services, opts)
self.set_composable_vars(hostname)
def add_host(
self, obj: ResourceField, host_format: str, namespace_group: str
) -> str:
def add_host(self, metadata: Dict, host_format: str, namespace_group: str) -> str:
"""
add_host adds a host to the inventory.
"""
hostname = host_format.format(
namespace=obj.metadata.namespace,
name=obj.metadata.name,
uid=obj.metadata.uid,
namespace=metadata["namespace"],
name=metadata["name"],
uid=metadata["uid"],
)
self.inventory.add_host(hostname)
self.inventory.add_child(namespace_group, hostname)
return hostname
def set_vars_from_vm(
self, hostname: str, vm: ResourceField, opts: InventoryOptions
) -> None:
def set_vars_from_vm(self, hostname: str, vm: Dict, opts: InventoryOptions) -> None:
"""
set_vars_from_vm sets inventory variables from a VM prefixed with vm_.
"""
self.set_common_vars(hostname, "vm", vm, opts)
def set_vars_from_vmi(
self, hostname: str, vmi: ResourceField, services: Dict, opts: InventoryOptions
self, hostname: str, vmi: Dict, services: Dict, opts: InventoryOptions
) -> None:
"""
set_vars_from_vmi sets inventory variables from a VMI prefixed with vmi_ and
@@ -647,76 +691,76 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
"""
self.set_common_vars(hostname, "vmi", vmi, opts)
if not (interfaces := vmi["status"].get("interfaces")):
return
if opts.network_name is None:
# Use first interface
interface = vmi.status.interfaces[0] if vmi.status.interfaces else None
interface = interfaces[0]
else:
# Find interface by its name
interface = next(
(i for i in vmi.status.interfaces if i.name == opts.network_name),
(i for i in interfaces if i.get("name") == opts.network_name),
None,
)
# If interface is not found or IP address is not reported skip this VMI
if interface is None or interface.ipAddress is None:
if not interface or not interface.get("ipAddress"):
return
# Set up the connection
service = None
if self.is_windows(
{} if not vmi.status.guestOSInfo else vmi.status.guestOSInfo.to_dict(),
{} if not vmi.metadata.annotations else vmi.metadata.annotations.to_dict(),
vmi["status"].get("guestOSInfo", {}),
vmi["metadata"].get("annotations", {}),
):
self.inventory.set_variable(hostname, "ansible_connection", "winrm")
else:
service = services.get(vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN))
service = services.get(
vmi["metadata"].get("labels", {}).get(LABEL_KUBEVIRT_IO_DOMAIN)
)
self.set_ansible_host_and_port(
vmi,
hostname,
interface.ipAddress,
interface["ipAddress"],
service,
opts,
)
def set_common_vars(
self, hostname: str, prefix: str, obj: ResourceField, opts: InventoryOptions
self, hostname: str, prefix: str, obj: Dict, opts: InventoryOptions
):
"""
set_common_vars sets common inventory variables from VMs or VMIs.
"""
# Add hostvars from metadata
if metadata := obj.metadata:
if metadata.annotations:
self.inventory.set_variable(
hostname, f"{prefix}_annotations", metadata.annotations.to_dict()
)
if metadata.labels:
self.inventory.set_variable(
hostname, f"{prefix}_labels", metadata.labels.to_dict()
)
# Create label groups and add vm to it if enabled
if opts.create_groups:
self.set_groups_from_labels(hostname, metadata.labels)
if metadata.resourceVersion:
self.inventory.set_variable(
hostname, f"{prefix}_resource_version", metadata.resourceVersion
)
if metadata.uid:
self.inventory.set_variable(hostname, f"{prefix}_uid", metadata.uid)
if annotations := obj["metadata"].get("annotations"):
self.inventory.set_variable(hostname, f"{prefix}_annotations", annotations)
if labels := obj["metadata"].get("labels"):
self.inventory.set_variable(hostname, f"{prefix}_labels", labels)
# Create label groups and add vm to it if enabled
if opts.create_groups:
self.set_groups_from_labels(hostname, labels)
if resource_version := obj["metadata"].get("resourceVersion"):
self.inventory.set_variable(
hostname, f"{prefix}_resource_version", resource_version
)
if uid := obj["metadata"].get("uid"):
self.inventory.set_variable(hostname, f"{prefix}_uid", uid)
# Add hostvars from status
if obj.status:
for key, value in obj.status.to_dict().items():
name = self.format_var_name(key)
self.inventory.set_variable(hostname, f"{prefix}_{name}", value)
for key, value in obj["status"].items():
self.inventory.set_variable(
hostname, f"{prefix}_{self.format_var_name(key)}", value
)
def set_groups_from_labels(self, hostname: str, labels: ResourceField) -> None:
def set_groups_from_labels(self, hostname: str, labels: Dict) -> None:
"""
set_groups_from_labels adds groups for each label of a VM or VMI and
adds the host to each group.
"""
groups = []
for key, value in labels.to_dict().items():
for key, value in labels.items():
group_name = self._sanitize_group_name(f"label_{key}_{value}")
if group_name not in groups:
groups.append(group_name)
@@ -727,7 +771,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def set_ansible_host_and_port(
self,
vmi: ResourceField,
vmi: Dict,
hostname: str,
ip_address: str,
service: Optional[Dict],
@@ -742,15 +786,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if opts.kube_secondary_dns and opts.network_name:
# Set ansible_host to the kubesecondarydns derived host name if enabled
# See https://github.com/kubevirt/kubesecondarydns#parameters
ansible_host = (
f"{opts.network_name}.{vmi.metadata.name}.{vmi.metadata.namespace}.vm"
)
ansible_host = f"{opts.network_name}.{vmi['metadata']['name']}.{vmi['metadata']['namespace']}.vm"
if opts.base_domain:
ansible_host += f".{opts.base_domain}"
elif opts.use_service and service and not opts.network_name:
# Set ansible_host and ansible_port to the host and port from the LoadBalancer
# or NodePort service exposing SSH
node_name = vmi.status.nodeName
node_name = vmi["status"].get("nodeName")
if node_name and opts.append_base_domain and opts.base_domain:
node_name += f".{opts.base_domain}"
host = self.get_host_from_service(service, node_name)

View File

@@ -1,4 +1,3 @@
addict
pytest
pytest-ansible
pytest-mock

View File

@@ -10,9 +10,12 @@ ansible-inventory -i test.kubevirt.yml -y --list --output empty.yml "$@"
ansible-playbook playbook.yml "$@"
ansible-inventory -i test.kubevirt.yml -y --list --output all.yml "$@"
ansible-inventory -i test.cache.kubevirt.yml -y --list --output cache_before.yml "$@"
ansible-inventory -i test.label.kubevirt.yml -y --list --output label.yml "$@"
ansible-inventory -i test.net.kubevirt.yml -y --list --output net.yml "$@"
ansible-playbook cleanup.yml "$@"
ansible-inventory -i test.cache.kubevirt.yml -y --list --output cache_after.yml "$@"
ansible-playbook verify.yml "$@"

View File

@@ -0,0 +1,9 @@
---
plugin: kubevirt.core.kubevirt
name: test
namespaces:
- default
create_groups: true
cache: true
cache_plugin: ansible.builtin.jsonfile
cache_connection: kubevirt-cache

View File

@@ -38,3 +38,16 @@
ansible.builtin.assert:
that:
- inv_net['all']['children']['label_app_test']['hosts'] | length == 1
- name: Read cached inventory after VM creation
ansible.builtin.include_vars:
file: cache_before.yml
name: inv_cache_before
- name: Read cached inventory after VM deletion
ansible.builtin.include_vars:
file: cache_after.yml
name: inv_cache_after
- name: Assert cached inventories are populated
ansible.builtin.assert:
that:
- inv_cache_before == inv_all
- inv_cache_before == inv_cache_after

View File

@@ -8,7 +8,6 @@ __metaclass__ = type
import pytest
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
@@ -22,7 +21,9 @@ BASE_VMI = {
"metadata": {
"name": "testvmi",
"namespace": "default",
"uid": "e86c603c-fb13-4933-bf67-de100bdba0c3",
},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
@@ -58,19 +59,25 @@ WINDOWS_VMI_4 = merge_dicts(
@pytest.mark.parametrize(
"client,vmi,expected",
"vmi,expected",
[
({"vmis": [BASE_VMI]}, BASE_VMI, False),
({"vmis": [WINDOWS_VMI_1]}, WINDOWS_VMI_1, True),
({"vmis": [WINDOWS_VMI_2]}, WINDOWS_VMI_2, True),
({"vmis": [WINDOWS_VMI_3]}, WINDOWS_VMI_3, True),
({"vmis": [WINDOWS_VMI_4]}, WINDOWS_VMI_4, True),
(BASE_VMI, False),
(WINDOWS_VMI_1, True),
(WINDOWS_VMI_2, True),
(WINDOWS_VMI_3, True),
(WINDOWS_VMI_4, True),
],
indirect=["client"],
)
def test_ansible_connection_winrm(inventory, hosts, client, vmi, expected):
inventory.populate_inventory_from_namespace(
client, "", DEFAULT_NAMESPACE, InventoryOptions()
def test_ansible_connection_winrm(inventory, hosts, vmi, expected):
inventory.populate_inventory(
{
"default_hostname": "test",
"cluster_domain": "test.com",
"namespaces": {
"default": {"vms": [], "vmis": [vmi], "services": {}},
},
},
InventoryOptions(),
)
host = f"{DEFAULT_NAMESPACE}-{vmi['metadata']['name']}"

View File

@@ -6,8 +6,6 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
@@ -21,7 +19,9 @@ VMI = {
"metadata": {
"name": "testvmi",
"namespace": "default",
"uid": "6ffdef43-6c39-4441-a088-82d319ea5c13",
},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
"migrationMethod": "BlockMigration",
@@ -34,16 +34,10 @@ VMI = {
}
@pytest.mark.parametrize(
"client",
[{"vmis": [VMI]}],
indirect=["client"],
)
def test_set_composable_vars(
inventory,
groups,
hosts,
client,
):
inventory._options = {
"compose": {"set_from_another_var": "vmi_node_name"},
@@ -51,8 +45,15 @@ def test_set_composable_vars(
"keyed_groups": [{"prefix": "fedora", "key": "vmi_guest_os_info.versionId"}],
"strict": True,
}
inventory.populate_inventory_from_namespace(
client, "", DEFAULT_NAMESPACE, InventoryOptions()
inventory.populate_inventory(
{
"default_hostname": "test",
"cluster_domain": "test.com",
"namespaces": {
"default": {"vms": [], "vmis": [VMI], "services": {}},
},
},
InventoryOptions(),
)
host = f"{DEFAULT_NAMESPACE}-testvmi"

View File

@@ -6,17 +6,10 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
DEFAULT_NAMESPACE,
)
VM1 = {
"metadata": {
"name": "testvm1",
@@ -24,6 +17,7 @@ VM1 = {
"uid": "940003aa-0160-4b7e-9e55-8ec3df72047f",
},
"spec": {"running": True},
"status": {},
}
VM2 = {
@@ -33,6 +27,7 @@ VM2 = {
"uid": "c2c68de5-b9d7-4c25-872f-462e7245b3e6",
},
"spec": {"running": False},
"status": {},
}
VMI1 = {
@@ -41,22 +36,23 @@ VMI1 = {
"namespace": "default",
"uid": "a84319a9-db31-4a36-9b66-3e387578f871",
},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
}
@pytest.mark.parametrize(
"client",
[
({"vms": [VM1, VM2], "vmis": [VMI1]}),
],
indirect=["client"],
)
def test_stopped_vm(inventory, hosts, client):
inventory.populate_inventory_from_namespace(
client, "", DEFAULT_NAMESPACE, InventoryOptions()
def test_stopped_vm(inventory, hosts):
inventory.populate_inventory(
{
"default_hostname": "test",
"cluster_domain": "test.com",
"namespaces": {
"default": {"vms": [VM1, VM2], "vmis": [VMI1], "services": {}},
},
},
InventoryOptions(),
)
# The running VM should be present with ansible_host or ansible_port

View File

@@ -8,7 +8,7 @@ __metaclass__ = type
import pytest
from addict import Dict
from kubernetes.dynamic.resource import ResourceField
from ansible.template import Templar
@@ -96,21 +96,21 @@ def client(mocker, request):
items = param["namespaces"]
else:
items = [{"metadata": {"name": DEFAULT_NAMESPACE}}]
namespaces.items = [Dict(item) for item in items]
namespaces.items = [ResourceField(item) for item in items]
vms = mocker.Mock()
vms.items = [Dict(item) for item in param.get("vms", [])]
vms.items = [ResourceField(item) for item in param.get("vms", [])]
vmis = mocker.Mock()
vmis.items = [Dict(item) for item in param.get("vmis", [])]
vmis.items = [ResourceField(item) for item in param.get("vmis", [])]
services = mocker.Mock()
services.items = [Dict(item) for item in param.get("services", [])]
services.items = [ResourceField(item) for item in param.get("services", [])]
dns = mocker.Mock()
if "base_domain" in param:
base_domain = param["base_domain"]
else:
base_domain = DEFAULT_BASE_DOMAIN
dns_obj = Dict({"spec": {"baseDomain": base_domain}})
dns_obj = ResourceField({"spec": {"baseDomain": base_domain}})
dns.items = [dns_obj]
namespace_client = mocker.Mock()

View File

@@ -8,10 +8,9 @@ __metaclass__ = type
import pytest
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryModule,
InventoryOptions,
)
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
@@ -29,8 +28,8 @@ from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants im
("https://example.com:8080", "example-com_8080"),
],
)
def test_get_default_host_name(host, expected):
assert InventoryModule.get_default_host_name(host) == expected
def test_get_default_hostname(host, expected):
assert InventoryModule.get_default_hostname(host) == expected
@pytest.mark.parametrize(
@@ -48,6 +47,55 @@ def test_format_var_name(name, expected):
assert InventoryModule.format_var_name(name) == expected
@pytest.mark.parametrize(
    "obj,expected",
    [
        # Missing or incomplete top-level keys are rejected.
        ({}, False),
        ({"spec": {}}, False),
        ({"status": {}}, False),
        ({"metadata": {}}, False),
        ({"spec": {}, "status": {}}, False),
        ({"spec": {}, "metadata": {}}, False),
        ({"status": {}, "metadata": {}}, False),
        # All top-level keys present, but metadata incomplete.
        ({"spec": {}, "status": {}, "metadata": {}}, False),
        ({"spec": {}, "status": {}, "metadata": {}, "something": {}}, False),
        ({"spec": {}, "status": {}, "metadata": {"name": "test"}}, False),
        ({"spec": {}, "status": {}, "metadata": {"namespace": "test"}}, False),
        ({"spec": {}, "status": {}, "metadata": {"uid": "test"}}, False),
        (
            {
                "spec": {},
                "status": {},
                "metadata": {"name": "test", "namespace": "test"},
            },
            False,
        ),
        (
            {
                "spec": {},
                "status": {},
                "metadata": {"name": "test", "namespace": "test", "something": "test"},
            },
            False,
        ),
        (
            {"spec": {}, "status": {}, "metadata": {"name": "test", "uid": "test"}},
            False,
        ),
        # Fully populated metadata is the only accepted shape.
        (
            {
                "spec": {},
                "status": {},
                "metadata": {"name": "test", "namespace": "test", "uid": "test"},
            },
            True,
        ),
    ],
)
def test_obj_is_valid(obj, expected):
    assert InventoryModule.obj_is_valid(obj) == expected
@pytest.mark.parametrize(
"service,node_name,expected",
[
@@ -210,6 +258,56 @@ def test_get_cluster_domain(inventory, client):
assert inventory.get_cluster_domain(client) == DEFAULT_BASE_DOMAIN
def _results_for(namespaces):
    # Build a fetch_objects-shaped result dict for the given namespaces.
    return {
        "cluster_domain": "example.com",
        "default_hostname": "test",
        "namespaces": {
            namespace: {"vms": [], "vmis": [], "services": {}}
            for namespace in namespaces
        },
    }


@pytest.mark.parametrize(
    "results,expected",
    [
        (_results_for([]), 0),
        (_results_for(["test"]), 1),
        (_results_for(["test", "test2"]), 2),
    ],
)
def test_populate_inventory(mocker, inventory, results, expected):
    mocked_populate = mocker.patch.object(
        inventory, "populate_inventory_from_namespace"
    )
    inventory.populate_inventory(results, InventoryOptions())
    expected_opts = InventoryOptions(
        base_domain=results["cluster_domain"], name=results["default_hostname"]
    )
    expected_calls = [
        mocker.call(namespace, data, expected_opts)
        for namespace, data in results["namespaces"].items()
    ]
    mocked_populate.assert_has_calls(expected_calls)
    assert len(expected_calls) == expected
@pytest.mark.parametrize(
"labels,expected",
[
@@ -223,7 +321,7 @@ def test_get_cluster_domain(inventory, client):
)
def test_set_groups_from_labels(inventory, groups, labels, expected):
hostname = "default-testvm"
inventory.set_groups_from_labels(hostname, Dict(labels))
inventory.set_groups_from_labels(hostname, labels)
for group in expected:
assert group in groups
assert hostname in groups[group]["children"]

View File

@@ -8,8 +8,6 @@ __metaclass__ = type
import pytest
from addict import Dict
from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants import (
DEFAULT_NAMESPACE,
)
@@ -65,15 +63,11 @@ def test_add_host(inventory, groups, hosts, host_format, expected):
inventory.inventory.add_group(namespace_group)
inventory.add_host(
Dict(
{
"metadata": {
"name": "testvm",
"namespace": DEFAULT_NAMESPACE,
"uid": "f8abae7c-d792-4b9b-af95-62d322ae5bc1",
}
}
),
{
"name": "testvm",
"namespace": DEFAULT_NAMESPACE,
"uid": "f8abae7c-d792-4b9b-af95-62d322ae5bc1",
},
host_format,
namespace_group,
)

View File

@@ -17,127 +17,93 @@ from ansible_collections.kubevirt.core.tests.unit.plugins.inventory.constants im
DEFAULT_NAMESPACE,
)
from ansible_collections.kubevirt.core.plugins.inventory import kubevirt
@pytest.mark.parametrize(
"config_data,expected",
"opts,namespaces",
[
(
None,
{
"name": "default-hostname",
"namespaces": [DEFAULT_NAMESPACE],
"opts": InventoryOptions(),
},
InventoryOptions(),
[DEFAULT_NAMESPACE],
),
(
{
"name": "test",
},
{
"name": "test",
"namespaces": [DEFAULT_NAMESPACE],
"opts": InventoryOptions(),
},
InventoryOptions(namespaces=["test"]),
["test"],
),
(
{"name": "test", "namespaces": ["test"]},
{"name": "test", "namespaces": ["test"], "opts": InventoryOptions()},
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
},
{
"name": "test",
"namespaces": ["test"],
"opts": InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
),
},
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
"network_name": "test-network",
},
{
"name": "test",
"namespaces": ["test"],
"opts": InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
network_name="test-network",
),
},
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
"interface_name": "test-interface",
},
{
"name": "test",
"namespaces": ["test"],
"opts": InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
network_name="test-interface",
),
},
InventoryOptions(namespaces=["test1", "test2"]),
["test1", "test2"],
),
],
)
def test_fetch_objects(mocker, inventory, config_data, expected):
mocker.patch.object(kubevirt, "get_api_client")
mocker.patch.object(
inventory, "get_default_host_name", return_value="default-hostname"
)
cluster_domain = "test.com"
mocker.patch.object(inventory, "get_cluster_domain", return_value=cluster_domain)
expected["opts"].base_domain = expected["opts"].base_domain or cluster_domain
def test_fetch_objects(mocker, inventory, opts, namespaces):
get_available_namespaces = mocker.patch.object(
inventory, "get_available_namespaces", return_value=[DEFAULT_NAMESPACE]
)
populate_inventory_from_namespace = mocker.patch.object(
inventory, "populate_inventory_from_namespace"
get_vms_for_namespace = mocker.patch.object(
inventory, "get_vms_for_namespace", return_value=[{}]
)
get_vmis_for_namespace = mocker.patch.object(
inventory, "get_vmis_for_namespace", return_value=[{}]
)
get_ssh_services_for_namespace = mocker.patch.object(
inventory, "get_ssh_services_for_namespace", return_value=[]
)
get_default_hostname = mocker.patch.object(
inventory, "get_default_hostname", return_value="default-hostname"
)
get_cluster_domain = mocker.patch.object(
inventory, "get_cluster_domain", return_value="test.com"
)
inventory.fetch_objects(config_data)
inventory.fetch_objects(mocker.Mock(), opts)
if config_data and "namespaces" in config_data:
if opts.namespaces:
get_available_namespaces.assert_not_called()
else:
get_available_namespaces.assert_called()
populate_inventory_from_namespace.assert_has_calls(
[
mocker.call(mocker.ANY, expected["name"], namespace, expected["opts"])
for namespace in expected["namespaces"]
]
get_vms_for_namespace.assert_has_calls(
[mocker.call(mocker.ANY, namespace, opts) for namespace in namespaces]
)
get_vmis_for_namespace.assert_has_calls(
[mocker.call(mocker.ANY, namespace, opts) for namespace in namespaces]
)
get_ssh_services_for_namespace.assert_has_calls(
[mocker.call(mocker.ANY, namespace) for namespace in namespaces]
)
get_default_hostname.assert_called_once()
get_cluster_domain.assert_called_once()
def test_fetch_objects_early_return(mocker, inventory):
get_available_namespaces = mocker.patch.object(
inventory, "get_available_namespaces", return_value=[DEFAULT_NAMESPACE]
)
get_vms_for_namespace = mocker.patch.object(
inventory, "get_vms_for_namespace", return_value=[]
)
get_vmis_for_namespace = mocker.patch.object(
inventory, "get_vmis_for_namespace", return_value=[]
)
get_ssh_services_for_namespace = mocker.patch.object(
inventory, "get_ssh_services_for_namespace"
)
get_default_hostname = mocker.patch.object(
inventory, "get_default_hostname", return_value="default-hostname"
)
get_cluster_domain = mocker.patch.object(
inventory, "get_cluster_domain", return_value="test.com"
)
inventory.fetch_objects(mocker.Mock(), InventoryOptions())
get_available_namespaces.assert_called_once()
get_vms_for_namespace.assert_called_once_with(
mocker.ANY, DEFAULT_NAMESPACE, InventoryOptions()
)
get_vmis_for_namespace.assert_called_once_with(
mocker.ANY, DEFAULT_NAMESPACE, InventoryOptions()
)
get_ssh_services_for_namespace.assert_not_called()
get_default_hostname.assert_called_once()
get_cluster_domain.assert_called_once()

View File

@@ -13,39 +13,151 @@ from ansible_collections.kubevirt.core.plugins.inventory import (
)
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
KubeVirtInventoryException,
)
@pytest.mark.parametrize(
"cache",
"config_data,expected",
[
True,
False,
(
{},
InventoryOptions(),
),
(
{
"name": "test",
},
InventoryOptions(name="test"),
),
(
{"name": "test", "namespaces": ["test"]},
InventoryOptions(name="test", namespaces=["test"]),
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
},
InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
name="test",
namespaces=["test"],
),
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
"network_name": "test-network",
},
InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
network_name="test-network",
name="test",
namespaces=["test"],
),
),
(
{
"name": "test",
"namespaces": ["test"],
"use_service": True,
"create_groups": True,
"append_base_domain": True,
"base_domain": "test-domain",
"interface_name": "test-interface",
},
InventoryOptions(
use_service=True,
create_groups=True,
append_base_domain=True,
base_domain="test-domain",
network_name="test-interface",
name="test",
namespaces=["test"],
),
),
],
)
def test_parse(mocker, inventory, cache):
path = "/testpath"
cache_prefix = "test-prefix"
config_data = {"host_format": "test-format"}
mocker.patch.dict(inventory._cache, {cache_prefix: {"test-key": "test-value"}})
get_cache_prefix = mocker.patch.object(
inventory, "_get_cache_prefix", return_value=cache_prefix
)
def test_config_data_to_opts(mocker, inventory, config_data, expected):
read_config_data = mocker.patch.object(
inventory, "_read_config_data", return_value=config_data
)
mocker.patch.object(inventory, "get_cache_key")
mocker.patch.object(inventory, "get_option")
mocker.patch.object(kubevirt, "get_api_client")
mocker.patch.object(inventory, "fetch_objects")
populate_inventory = mocker.patch.object(inventory, "populate_inventory")
inventory.parse(None, None, "", False)
populate_inventory.assert_called_once_with(mocker.ANY, expected)
@pytest.mark.parametrize(
"cache_parse,cache_option,cache_data,expected",
[
(True, True, {"test-key": {"something": "something"}}, True),
(None, True, {"test-key": {"something": "something"}}, True),
(False, True, {"test-key": {"something": "something"}}, False),
(True, False, {"test-key": {"something": "something"}}, False),
(None, False, {"test-key": {"something": "something"}}, False),
(False, False, {"test-key": {"something": "something"}}, False),
(True, True, {"test-key2": {"something": "something"}}, False),
(None, True, {"test-key2": {"something": "something"}}, False),
],
)
def test_use_of_cache(
mocker, inventory, cache_parse, cache_option, cache_data, expected
):
path = "/testpath"
config_data = {"host_format": "test-format"}
mocker.patch.dict(inventory._cache, cache_data)
read_config_data = mocker.patch.object(
inventory, "_read_config_data", return_value=config_data
)
get_cache_key = mocker.patch.object(
inventory, "get_cache_key", return_value="test-key"
)
get_option = mocker.patch.object(inventory, "get_option", return_value=cache_option)
get_api_client = mocker.patch.object(kubevirt, "get_api_client")
fetch_objects = mocker.patch.object(inventory, "fetch_objects")
populate_inventory = mocker.patch.object(inventory, "populate_inventory")
inventory.parse(None, None, path, cache)
if cache_parse is None:
inventory.parse(None, None, path)
else:
inventory.parse(None, None, path, cache_parse)
get_cache_prefix.assert_called_once_with(path)
opts = InventoryOptions(config_data=config_data)
get_cache_key.assert_called_once_with(path)
get_option.assert_called_once_with("cache")
read_config_data.assert_called_once_with(path)
if cache:
if expected:
get_api_client.assert_not_called()
fetch_objects.assert_not_called()
else:
fetch_objects.assert_called_once_with(config_data)
get_api_client.assert_called_once_with(**config_data)
fetch_objects.assert_called_once_with(mocker.ANY, opts)
populate_inventory.assert_called_once_with(mocker.ANY, opts)
@pytest.mark.parametrize(
@@ -57,8 +169,10 @@ def test_parse(mocker, inventory, cache):
)
def test_k8s_client_missing(mocker, inventory, present):
mocker.patch.object(kubevirt, "HAS_K8S_MODULE_HELPER", present)
mocker.patch.object(inventory, "_get_cache_prefix")
mocker.patch.object(inventory, "_read_config_data")
mocker.patch.object(kubevirt, "get_api_client")
mocker.patch.object(inventory, "_read_config_data", return_value={})
mocker.patch.object(inventory, "get_cache_key")
mocker.patch.object(inventory, "get_option")
fetch_objects = mocker.patch.object(inventory, "fetch_objects")
if present:

View File

@@ -8,9 +8,6 @@ __metaclass__ = type
import pytest
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
@@ -25,6 +22,8 @@ VM1 = {
"namespace": DEFAULT_NAMESPACE,
"uid": "940003aa-0160-4b7e-9e55-8ec3df72047f",
},
"spec": {},
"status": {},
}
VM2 = {
@@ -33,6 +32,8 @@ VM2 = {
"namespace": DEFAULT_NAMESPACE,
"uid": "c2c68de5-b9d7-4c25-872f-462e7245b3e6",
},
"spec": {},
"status": {},
}
VMI1 = {
@@ -41,6 +42,7 @@ VMI1 = {
"namespace": DEFAULT_NAMESPACE,
"uid": "a84319a9-db31-4a36-9b66-3e387578f871",
},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
@@ -52,6 +54,7 @@ VMI2 = {
"namespace": DEFAULT_NAMESPACE,
"uid": "fd35700a-9cbe-488b-8f32-7adbe57eadc2",
},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
@@ -78,24 +81,25 @@ VMI2 = {
def test_populate_inventory_from_namespace(
mocker, inventory, groups, vms, vmis, expected
):
_vms = {vm["metadata"]["name"]: Dict(vm) for vm in vms}
_vmis = {vmi["metadata"]["name"]: Dict(vmi) for vmi in vmis}
opts = InventoryOptions()
_vms = {vm["metadata"]["name"]: vm for vm in vms}
_vmis = {vmi["metadata"]["name"]: vmi for vmi in vmis}
opts = InventoryOptions(name="test")
def format_hostname(obj):
return opts.host_format.format(
namespace=obj.metadata.namespace,
name=obj.metadata.name,
uid=obj.metadata.uid,
namespace=obj["metadata"]["namespace"],
name=obj["metadata"]["name"],
uid=obj["metadata"]["uid"],
)
def add_host_call(obj):
return mocker.call(
obj,
obj["metadata"],
opts.host_format,
f"namespace_{DEFAULT_NAMESPACE}",
)
obj_is_valid_calls = []
add_host_side_effects = []
add_host_calls = []
set_vars_from_vm_calls = []
@@ -105,33 +109,27 @@ def test_populate_inventory_from_namespace(
# For each VM add the expected calls
# Also add expected calls for VMIs for which a VM exists
for name, vm in _vms.items():
obj_is_valid_calls.append(mocker.call(vm))
hostname = format_hostname(vm)
add_host_side_effects.append(hostname)
add_host_calls.append(add_host_call(vm))
set_vars_from_vm_calls.append(mocker.call(hostname, vm, opts))
if name in _vmis.keys():
set_vars_from_vmi_calls.append(mocker.call(hostname, _vmis[name], [], opts))
set_vars_from_vmi_calls.append(mocker.call(hostname, _vmis[name], {}, opts))
set_composable_vars_calls.append(mocker.call(hostname))
# For each VMI add the expected calls
# Do not add for VMIs for which a VM exists
for name, vmi in _vmis.items():
obj_is_valid_calls.append(mocker.call(vmi))
if name not in _vms.keys():
hostname = format_hostname(vmi)
add_host_side_effects.append(hostname)
add_host_calls.append(add_host_call(vmi))
set_vars_from_vmi_calls.append(mocker.call(hostname, vmi, [], opts))
set_vars_from_vmi_calls.append(mocker.call(hostname, vmi, {}, opts))
set_composable_vars_calls.append(mocker.call(hostname))
get_vms_for_namespace = mocker.patch.object(
inventory, "get_vms_for_namespace", return_value=_vms.values()
)
get_vmis_for_namespace = mocker.patch.object(
inventory, "get_vmis_for_namespace", return_value=_vmis.values()
)
get_ssh_services_for_namespace = mocker.patch.object(
inventory, "get_ssh_services_for_namespace", return_value=[]
)
obj_is_valid = mocker.patch.object(inventory, "obj_is_valid", return_value=True)
add_host = mocker.patch.object(
inventory, "add_host", side_effect=add_host_side_effects
)
@@ -139,23 +137,20 @@ def test_populate_inventory_from_namespace(
set_vars_from_vmi = mocker.patch.object(inventory, "set_vars_from_vmi")
set_composable_vars = mocker.patch.object(inventory, "set_composable_vars")
inventory.populate_inventory_from_namespace(None, "test", DEFAULT_NAMESPACE, opts)
# These should always get called once
get_vms_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE, opts)
get_vmis_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE, opts)
inventory.populate_inventory_from_namespace(
DEFAULT_NAMESPACE, {"vms": vms, "vmis": vmis, "services": {}}, opts
)
# Assert it tries to add the expected vars for all provided VMs/VMIs
obj_is_valid.assert_has_calls(obj_is_valid_calls)
set_vars_from_vm.assert_has_calls(set_vars_from_vm_calls)
set_vars_from_vmi.assert_has_calls(set_vars_from_vmi_calls)
set_composable_vars.assert_has_calls(set_composable_vars_calls)
# If no VMs or VMIs were provided the function should not add any groups
if vms or vmis:
get_ssh_services_for_namespace.assert_called_once_with(None, DEFAULT_NAMESPACE)
assert list(groups.keys()) == ["test", f"namespace_{DEFAULT_NAMESPACE}"]
else:
get_ssh_services_for_namespace.assert_not_called()
assert not list(groups.keys())
# Assert the expected amount of hosts was added

View File

@@ -8,8 +8,6 @@ __metaclass__ = type
import pytest
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
@@ -29,7 +27,7 @@ def test_use_ip_address_by_default(mocker, inventory, opts):
hostname = "default-testvm"
ip_address = "1.1.1.1"
inventory.set_ansible_host_and_port(Dict(), hostname, ip_address, None, opts)
inventory.set_ansible_host_and_port({}, hostname, ip_address, None, opts)
set_variable.assert_has_calls(
[
@@ -50,12 +48,10 @@ def test_kube_secondary_dns(mocker, inventory, base_domain):
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
hostname = "default-testvm"
vmi = Dict(
{
"metadata": {"name": "testvm", "namespace": "default"},
"status": {"interfaces": [{"name": "awesome"}]},
}
)
vmi = {
"metadata": {"name": "testvm", "namespace": "default"},
"status": {"interfaces": [{"name": "awesome"}]},
}
inventory.set_ansible_host_and_port(
vmi,
@@ -85,12 +81,10 @@ def test_kube_secondary_dns_precedence_over_service(mocker, inventory):
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
hostname = "default-testvm"
vmi = Dict(
{
"metadata": {"name": "testvm", "namespace": "default"},
"status": {"interfaces": [{"name": "awesome"}]},
}
)
vmi = {
"metadata": {"name": "testvm", "namespace": "default"},
"status": {"interfaces": [{"name": "awesome"}]},
}
inventory.set_ansible_host_and_port(
vmi,
@@ -170,13 +164,11 @@ def test_service(mocker, inventory, service, expected_host, expected_port):
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
hostname = "default-testvm"
vmi = Dict(
{
"status": {
"nodeName": "testnode.example.com",
},
}
)
vmi = {
"status": {
"nodeName": "testnode.example.com",
},
}
inventory.set_ansible_host_and_port(
vmi,
@@ -198,13 +190,11 @@ def test_service_append_base_domain(mocker, inventory):
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
hostname = "default-testvm"
vmi = Dict(
{
"status": {
"nodeName": "testnode",
},
}
)
vmi = {
"status": {
"nodeName": "testnode",
},
}
service = {
"spec": {
"type": "NodePort",
@@ -243,13 +233,11 @@ def test_service_fallback(mocker, inventory, host, port):
mocker.patch.object(inventory, "get_port_from_service", return_value=port)
hostname = "default-testvm"
vmi = Dict(
{
"status": {
"nodeName": "testnode",
},
}
)
vmi = {
"status": {
"nodeName": "testnode",
},
}
inventory.set_ansible_host_and_port(
vmi,
hostname,
@@ -271,7 +259,7 @@ def test_no_service_if_network_name(mocker, inventory):
hostname = "default-testvm"
inventory.set_ansible_host_and_port(
Dict(),
{},
hostname,
"1.2.3.4",
{"something": "something"},

View File

@@ -11,8 +11,6 @@ from string import ascii_lowercase
import pytest
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
)
@@ -26,6 +24,8 @@ from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
"metadata": {
"something": "idontcare",
},
"spec": {},
"status": {},
},
{},
),
@@ -34,6 +34,8 @@ from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
"metadata": {
"annotations": {"testanno": "testval"},
},
"spec": {},
"status": {},
},
{"annotations": {"testanno": "testval"}},
),
@@ -42,6 +44,8 @@ from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
"metadata": {
"labels": {"testlabel": "testval"},
},
"spec": {},
"status": {},
},
{"labels": {"testlabel": "testval"}},
),
@@ -50,6 +54,8 @@ from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
"metadata": {
"resourceVersion": "123",
},
"spec": {},
"status": {},
},
{"resource_version": "123"},
),
@@ -58,11 +64,15 @@ from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
"metadata": {
"uid": "48e6ed2c-d8a2-4172-844d-0fe7056aa180",
},
"spec": {},
"status": {},
},
{"uid": "48e6ed2c-d8a2-4172-844d-0fe7056aa180"},
),
(
{
"metadata": {},
"spec": {},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
@@ -75,7 +85,7 @@ def test_set_common_vars(inventory, hosts, obj, expected):
hostname = "default-testvm"
prefix = "".join(choice(ascii_lowercase) for i in range(5))
inventory.inventory.add_host(hostname)
inventory.set_common_vars(hostname, prefix, Dict(obj), InventoryOptions())
inventory.set_common_vars(hostname, prefix, obj, InventoryOptions())
for key, value in expected.items():
prefixed_key = f"{prefix}_{key}"
@@ -99,7 +109,7 @@ def test_set_common_vars_create_groups(mocker, inventory, create_groups):
opts = InventoryOptions(create_groups=create_groups)
inventory.set_common_vars(
hostname, "prefix", Dict({"metadata": {"labels": labels}}), opts
hostname, "prefix", {"metadata": {"labels": labels}, "status": {}}, opts
)
if create_groups:
@@ -111,12 +121,16 @@ def test_set_common_vars_create_groups(mocker, inventory, create_groups):
def test_called_by_set_vars_from(mocker, inventory):
hostname = "default-testvm"
opts = InventoryOptions()
obj = {"status": {}}
set_common_vars = mocker.patch.object(inventory, "set_common_vars")
inventory.set_vars_from_vm(hostname, Dict(), opts)
inventory.set_vars_from_vmi(hostname, Dict(), {}, opts)
inventory.set_vars_from_vm(hostname, obj, opts)
inventory.set_vars_from_vmi(hostname, obj, {}, opts)
set_common_vars.assert_has_calls(
[mocker.call(hostname, "vm", {}, opts), mocker.call(hostname, "vmi", {}, opts)]
[
mocker.call(hostname, "vm", obj, opts),
mocker.call(hostname, "vmi", obj, opts),
]
)

View File

@@ -6,8 +6,6 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
InventoryOptions,
LABEL_KUBEVIRT_IO_DOMAIN,
@@ -20,7 +18,7 @@ def test_ignore_vmi_without_interface(mocker, inventory):
inventory, "set_ansible_host_and_port"
)
vmi = Dict({"status": {}})
vmi = {"status": {}}
inventory.set_vars_from_vmi("default-testvm", vmi, {}, InventoryOptions())
set_ansible_host_and_port.assert_not_called()
@@ -33,9 +31,10 @@ def test_use_first_interface_by_default(mocker, inventory):
)
hostname = "default-testvm"
vmi = Dict(
{"status": {"interfaces": [{"ipAddress": "1.1.1.1"}, {"ipAddress": "2.2.2.2"}]}}
)
vmi = {
"metadata": {},
"status": {"interfaces": [{"ipAddress": "1.1.1.1"}, {"ipAddress": "2.2.2.2"}]},
}
opts = InventoryOptions()
inventory.set_vars_from_vmi(hostname, vmi, {}, opts)
@@ -51,16 +50,15 @@ def test_use_named_interface(mocker, inventory):
)
hostname = "default-testvm"
vmi = Dict(
{
"status": {
"interfaces": [
{"name": "first", "ipAddress": "1.1.1.1"},
{"name": "second", "ipAddress": "2.2.2.2"},
]
}
}
)
vmi = {
"metadata": {},
"status": {
"interfaces": [
{"name": "first", "ipAddress": "1.1.1.1"},
{"name": "second", "ipAddress": "2.2.2.2"},
]
},
}
opts = InventoryOptions(network_name="second")
inventory.set_vars_from_vmi(hostname, vmi, {}, opts)
@@ -75,9 +73,10 @@ def test_ignore_vmi_without_named_interface(mocker, inventory):
inventory, "set_ansible_host_and_port"
)
vmi = Dict(
{"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]}}
)
vmi = {
"metadata": {},
"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]},
}
inventory.set_vars_from_vmi(
"default-testvm", vmi, {}, InventoryOptions(network_name="awesome")
)
@@ -92,7 +91,7 @@ def test_set_winrm_if_windows(mocker, inventory):
set_variable = mocker.patch.object(inventory.inventory, "set_variable")
hostname = "default-testvm"
vmi = Dict({"status": {"interfaces": [{"ipAddress": "1.1.1.1"}]}})
vmi = {"metadata": {}, "status": {"interfaces": [{"ipAddress": "1.1.1.1"}]}}
inventory.set_vars_from_vmi(hostname, vmi, {}, InventoryOptions())
set_variable.assert_called_once_with(hostname, "ansible_connection", "winrm")
@@ -105,12 +104,10 @@ def test_service_lookup(mocker, inventory):
)
hostname = "default-testvm"
vmi = Dict(
{
"metadata": {"labels": {LABEL_KUBEVIRT_IO_DOMAIN: "testdomain"}},
"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]},
}
)
vmi = {
"metadata": {"labels": {LABEL_KUBEVIRT_IO_DOMAIN: "testdomain"}},
"status": {"interfaces": [{"name": "somename", "ipAddress": "1.1.1.1"}]},
}
opts = InventoryOptions()
service = {"metadata": {"name": "testsvc"}}
inventory.set_vars_from_vmi(hostname, vmi, {"testdomain": service}, opts)

View File

@@ -2,7 +2,6 @@
jsonpatch
kubernetes>=28.1.0
PyYAML>=3.11
addict
pytest
pytest-mock
pytest-xdist