Files
kubernetes.core/tests/unit/module_utils/test_waiter.py
Steve Ovens 7cdf0d03f5 waiter.py Add ClusterOperator Test (#879)
SUMMARY
Fixes #869
During an OpenShift installation, one of the checks to see that the cluster is ready to proceed with configuration is to check to ensure that the Cluster Operators are in an Available: True Degraded: False Progressing: False state. While you can currently use the k8s_info module to get a json response, the resulting json needs to be iterated over several times to get the appropriate status.
This PR adds functionality into waiter.py which loops over all resource instances of the cluster operators. If any of them is not ready, waiter returns False and the task false. If the task returns, you can assume that all the cluster operators are healthy.


ISSUE TYPE


Feature Pull Request

COMPONENT NAME

waiter.py
ADDITIONAL INFORMATION



A simple playbook will trigger the waiter.py to watch the ClusterOperator object

---
- name: get operators
  hosts: localhost
  gather_facts: false
  tasks:
    - name: Get cluster operators
      kubernetes.core.k8s_info:
        api_version: v1
        kind: ClusterOperator
        kubeconfig: "/home/ocp/one/auth/kubeconfig"
        wait: true
        wait_timeout: 30
      register: cluster_operators


This will produce the simple response if everything is functioning properly:
PLAY [get operators] *************************************************************************************************

TASK [Get cluster operators] *****************************************************************************************
ok: [localhost]

PLAY RECAP ***********************************************************************************************************
localhost                  : ok=1    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0   

If the timeout is reached:
PLAY [get operators] *************************************************************************************************

TASK [Get cluster operators] *****************************************************************************************
An exception occurred during task execution. To see the full traceback, use -vvv. The error was: ansible_collections.kubernetes.core.plugins.module_utils.k8s.exceptions.CoreException: Failed to gather information about ClusterOperator(s) even after waiting for 30 seconds
fatal: [localhost]: FAILED! => {"changed": false, "msg": "Failed to gather information about ClusterOperator(s) even after waiting for 30 seconds"}

PLAY RECAP ***********************************************************************************************************
localhost                  : ok=0    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0   

UNSOLVED: How to know which Operators are failing

Reviewed-by: Mandar Kulkarni <mandar242@gmail.com>
Reviewed-by: Bikouo Aubin
2025-02-26 17:53:12 +00:00

131 lines
4.0 KiB
Python

import os
import time
from pathlib import Path
from unittest.mock import Mock
import pytest
import yaml
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.waiter import (
DummyWaiter,
Waiter,
clock,
cluster_operator_ready,
custom_condition,
deployment_ready,
exists,
get_waiter,
pod_ready,
resource_absent,
)
from kubernetes.dynamic.exceptions import NotFoundError
from kubernetes.dynamic.resource import ResourceInstance
def resources(filepath):
current = Path(os.path.dirname(os.path.abspath(__file__)))
with open(current / filepath) as fp:
return [ResourceInstance(None, d) for d in yaml.safe_load_all(fp)]
RESOURCES = resources("fixtures/definitions.yml")
PODS = resources("fixtures/pods.yml")
DEPLOYMENTS = resources("fixtures/deployments.yml")
CLUSTER_OPERATOR = resources("fixtures/clusteroperator.yml")
def test_clock_times_out():
start = time.monotonic()
for x in clock(5, 1):
pass
elapsed = int(time.monotonic() - start)
assert x == 5
assert 5 <= elapsed <= 6
@pytest.mark.parametrize(
"resource,expected",
zip(RESOURCES + [None, {}], [True, True, True, False, False, False]),
)
def test_exists_and_absent_checks_for_existence(resource, expected):
assert exists(resource) is expected
assert resource_absent(resource) is not expected
@pytest.mark.parametrize("pod,expected", zip(PODS, [True, False, True, True]))
def test_pod_ready_checks_readiness(pod, expected):
assert pod_ready(pod) is expected
@pytest.mark.parametrize("pod,expected", zip(PODS, [True, False, False, False]))
def test_custom_condition_checks_readiness(pod, expected):
condition = {"type": "www.example.com/gate", "status": "True"}
assert custom_condition(condition, pod) is expected
@pytest.mark.parametrize("deployment,expected", zip(DEPLOYMENTS, [True, False]))
def test_deployment_ready_checks_readiness(deployment, expected):
assert deployment_ready(deployment) is expected
def test_dummywaiter_returns_resource_immediately():
resource = {
"kind": "Pod",
"apiVersion": "v1",
"metadata": {"name": "foopod", "namespace": "foobar"},
}
result, instance, elapsed = DummyWaiter().wait(resource, 10, 100)
assert result is True
assert instance == resource
assert elapsed == 0
def test_waiter_waits_for_missing_resource():
spec = {"get.side_effect": NotFoundError(Mock())}
client = Mock(**spec)
resource = Mock()
result, instance, elapsed = Waiter(client, resource, exists).wait(
timeout=3,
sleep=1,
name=RESOURCES[0]["metadata"].get("name"),
namespace=RESOURCES[0]["metadata"].get("namespace"),
)
assert result is False
assert instance == {}
assert abs(elapsed - 3) <= 1
@pytest.mark.parametrize("resource,expected", zip(RESOURCES, [True, True, True, False]))
def test_waiter_waits_for_resource_to_exist(resource, expected):
result = resource.to_dict()
spec = {"get.side_effect": [NotFoundError(Mock()), resource, resource, resource]}
client = Mock(**spec)
success, instance, elapsed = Waiter(client, Mock(), exists).wait(
timeout=3,
sleep=1,
name=result["metadata"].get("name"),
namespace=result["metadata"].get("namespace"),
)
assert success is expected
assert instance == result
assert abs(elapsed - 2) <= 1
def test_get_waiter_returns_correct_waiter():
assert get_waiter(Mock(), PODS[0]).predicate == pod_ready
waiter = get_waiter(Mock(), PODS[0], check_mode=True)
assert isinstance(waiter, DummyWaiter)
assert get_waiter(Mock(), PODS[0], state="absent").predicate == resource_absent
assert (
get_waiter(
Mock(), PODS[0], condition={"type": "Ready", "status": "True"}
).predicate.func
== custom_condition
)
@pytest.mark.parametrize(
"clusteroperator,expected", zip(CLUSTER_OPERATOR, [True, False, False, False])
)
def test_cluster_operator(clusteroperator, expected):
assert cluster_operator_ready(clusteroperator) is expected