feat: Set ansible_connection to winrm for Windows hosts

This changes the inventory plugin so that it sets the ansible_connection
to winrm if it detected a Windows host. If it did not detect a Windows
host the ansible_connection is no longer set, so Ansible falls back to
its default value of ssh. The detection of SSH services for hosts using
winrm is disabled.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
This commit is contained in:
Felix Matouschek
2024-04-15 14:28:57 +02:00
parent 7c5de4adf9
commit a3abcbedd4
4 changed files with 251 additions and 15 deletions

View File

@@ -202,10 +202,13 @@ from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import
K8SClient,
)
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME = "kubevirt.io/cluster-preference-name"
ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME = "kubevirt.io/preference-name"
ANNOTATION_VM_KUBEVIRT_IO_OS = "vm.kubevirt.io/os"
LABEL_KUBEVIRT_IO_DOMAIN = "kubevirt.io/domain"
TYPE_LOADBALANCER = "LoadBalancer"
TYPE_NODEPORT = "NodePort"
ID_MSWINDOWS = "mswindows"
class KubeVirtInventoryException(Exception):
@@ -318,6 +321,26 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return None
@staticmethod
def is_windows(guest_os_info: Dict, annotations: Dict) -> bool:
if "id" in guest_os_info:
return guest_os_info["id"] == ID_MSWINDOWS
if ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME in annotations:
return annotations[
ANNOTATION_KUBEVIRT_IO_CLUSTER_PREFERENCE_NAME
].startswith("windows")
if ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME in annotations:
return annotations[ANNOTATION_KUBEVIRT_IO_PREFERENCE_NAME].startswith(
"windows"
)
if ANNOTATION_VM_KUBEVIRT_IO_OS in annotations:
return annotations[ANNOTATION_VM_KUBEVIRT_IO_OS].startswith("windows")
return False
def __init__(self) -> None:
super().__init__()
self.host_format = None
@@ -523,16 +546,6 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.inventory.add_group(group)
self.inventory.add_child(group, vmi_name)
# Set up the connection
self.inventory.set_variable(vmi_name, "ansible_connection", "ssh")
self.set_ansible_host_and_port(
vmi,
vmi_name,
interface.ipAddress,
services.get(vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN)),
opts,
)
# Add hostvars from metadata
self.inventory.set_variable(vmi_name, "object_type", "vmi")
self.inventory.set_variable(vmi_name, "labels", vmi_labels)
@@ -605,6 +618,23 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.inventory.set_variable(
vmi_name, "vmi_volume_status", vmi_volume_status
)
# Set up the connection
service = None
if self.is_windows(vmi_guest_os_info, vmi_annotations):
self.inventory.set_variable(vmi_name, "ansible_connection", "winrm")
else:
service = services.get(
vmi.metadata.labels.get(LABEL_KUBEVIRT_IO_DOMAIN)
)
self.set_ansible_host_and_port(
vmi,
vmi_name,
interface.ipAddress,
service,
opts,
)
self.set_composable_vars(vmi_name)
def set_composable_vars(self, vmi_name):

View File

@@ -1,3 +1,4 @@
addict
pytest
pytest-ansible
pytest-mock

View File

@@ -8,12 +8,131 @@ __metaclass__ = type
import pytest
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import InventoryModule
from addict import Dict
from ansible_collections.kubevirt.core.plugins.inventory.kubevirt import (
GetVmiOptions,
InventoryModule,
)
from ansible_collections.kubevirt.core.tests.unit.utils.merge_dicts import (
merge_dicts,
)
DEFAULT_NAMESPACE = "default"
DEFAULT_BASE_DOMAIN = "example.com"
BASE_VMI = {
"apiVersion": "kubevirt.io/v1",
"kind": "VirtualMachineInstance",
"metadata": {
"name": "testvmi",
"namespace": "default",
"uid": "f8abae7c-d792-4b9b-af95-62d322ae5bc1",
},
"spec": {
"domain": {"devices": {}},
},
"status": {
"interfaces": [{"ipAddress": "10.10.10.10"}],
},
}
WINDOWS_VMI_1 = merge_dicts(
BASE_VMI,
{
"status": {
"guestOSInfo": {"id": "mswindows"},
}
},
)
WINDOWS_VMI_2 = merge_dicts(
BASE_VMI,
{
"metadata": {
"annotations": {"kubevirt.io/cluster-preference-name": "windows.2k22"}
},
},
)
WINDOWS_VMI_3 = merge_dicts(
BASE_VMI,
{
"metadata": {"annotations": {"kubevirt.io/preference-name": "windows.2k22"}},
},
)
WINDOWS_VMI_4 = merge_dicts(
BASE_VMI,
{
"metadata": {"annotations": {"vm.kubevirt.io/os": "windows2k22"}},
},
)
@pytest.fixture
def inventory():
return InventoryModule()
@pytest.fixture(scope="function")
def inventory(mocker):
inventory = InventoryModule()
inventory.inventory = mocker.Mock()
mocker.patch.object(inventory, "set_composable_vars")
return inventory
@pytest.fixture(scope="function")
def host_vars(monkeypatch, inventory):
host_vars = {}
def set_variable(host, key, value):
if host not in host_vars:
host_vars[host] = {}
host_vars[host][key] = value
monkeypatch.setattr(inventory.inventory, "set_variable", set_variable)
return host_vars
@pytest.fixture(scope="function")
def client(mocker, request):
namespaces = mocker.Mock()
namespaces.items = [
Dict(item)
for item in request.param.get(
"namespaces", [{"metadata": {"name": DEFAULT_NAMESPACE}}]
)
]
vmis = mocker.Mock()
vmis.items = [Dict(item) for item in request.param.get("vmis", [])]
services = mocker.Mock()
services.items = [Dict(item) for item in request.param.get("services", [])]
dns = Dict(
{"spec": {"baseDomain": request.param.get("base_domain", DEFAULT_BASE_DOMAIN)}}
)
namespace_client = mocker.Mock()
namespace_client.get = mocker.Mock(return_value=namespaces)
vmi_client = mocker.Mock()
vmi_client.get = mocker.Mock(return_value=vmis)
service_client = mocker.Mock()
service_client.get = mocker.Mock(return_value=services)
dns_client = mocker.Mock()
dns_client.get = mocker.Mock(return_value=dns)
def resources_get(api_version="", kind=""):
if api_version.lower() == "v1":
if kind.lower() == "namespace":
return namespace_client
elif kind.lower() == "service":
return service_client
elif api_version.lower() == "config.openshift.io/v1" and kind.lower() == "dns":
return dns_client
elif (
"kubevirt.io/" in api_version.lower()
and kind.lower() == "virtualmachineinstance"
):
return vmi_client
return None
client = mocker.Mock()
client.resources.get = resources_get
return client
@pytest.mark.parametrize(
@@ -50,3 +169,69 @@ def test_verify_file(tmp_path, inventory, file_name, expected):
)
def test_verify_file_bad_config(inventory, file_name):
assert inventory.verify_file(file_name) is False
@pytest.mark.parametrize(
"guest_os_info,annotations,expected",
[
({"id": "mswindows"}, {}, True),
({}, {"kubevirt.io/preference-name": "windows.2k22"}, True),
({}, {"vm.kubevirt.io/os": "windows2k22"}, True),
({}, {}, False),
({"id": "fedora"}, {}, False),
(
{"id": "fedora"},
{"kubevirt.io/cluster-preference-name": "windows.2k22"},
False,
),
({"id": "fedora"}, {"kubevirt.io/preference-name": "windows.2k22"}, False),
({"id": "fedora"}, {"vm.kubevirt.io/os": "windows2k22"}, False),
(
{},
{
"kubevirt.io/cluster-preference-name": "fedora",
"kubevirt.io/preference-name": "windows.2k22",
},
False,
),
(
{},
{
"kubevirt.io/cluster-preference-name": "fedora",
"vm.kubevirt.io/os": "windows2k22",
},
False,
),
(
{},
{
"kubevirt.io/preference-name": "fedora",
"vm.kubevirt.io/os": "windows2k22",
},
False,
),
],
)
def test_is_windows(inventory, guest_os_info, annotations, expected):
assert inventory.is_windows(guest_os_info, annotations) is expected
@pytest.mark.parametrize(
"client,vmi,expected",
[
({"vmis": [BASE_VMI]}, BASE_VMI, False),
({"vmis": [WINDOWS_VMI_1]}, WINDOWS_VMI_1, True),
({"vmis": [WINDOWS_VMI_2]}, WINDOWS_VMI_2, True),
({"vmis": [WINDOWS_VMI_3]}, WINDOWS_VMI_3, True),
({"vmis": [WINDOWS_VMI_4]}, WINDOWS_VMI_4, True),
],
indirect=["client"],
)
def test_ansible_connection_winrm(inventory, host_vars, client, vmi, expected):
inventory.get_vmis_for_namespace(client, "", DEFAULT_NAMESPACE, GetVmiOptions())
host = f"{DEFAULT_NAMESPACE}-{vmi['metadata']['name']}"
if expected:
assert host_vars[host]["ansible_connection"] == "winrm"
else:
assert "ansible_connection" not in host_vars[host]

View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Red Hat, Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
def merge_dicts(dict1, dict2):
merged = deepcopy(dict1)
for key, value in dict2.items():
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = merge_dicts(merged[key], value)
else:
merged[key] = value
return merged