mirror of
https://github.com/kubevirt/kubevirt.core.git
synced 2026-05-07 05:42:38 +00:00
fix(inventory): Fix inventory source caching
Fix inventory source caching by separating the fetching of objects and populating the inventory. This way objects can be fetched from the K8S API or from a configured cached and the cache related parameters on the plugin now actually work. The inventory source cache was tested with the ansible.builtin.jsonfile cache plugin and 100k hosts (~2G JSON file). Though it took a noticeable amount of time for the inventory plugin to run it worked fine and no failures could be observed. Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
This commit is contained in:
@@ -186,6 +186,8 @@ class InventoryOptions:
|
||||
base_domain: Optional[str] = None
|
||||
append_base_domain: Optional[bool] = None
|
||||
host_format: Optional[str] = None
|
||||
namespaces: Optional[List[str]] = None
|
||||
name: Optional[str] = None
|
||||
config_data: InitVar[Optional[Dict]] = None
|
||||
|
||||
def __post_init__(self, config_data: Optional[Dict]) -> None:
|
||||
@@ -238,6 +240,12 @@ class InventoryOptions:
|
||||
if self.host_format is not None
|
||||
else config_data.get("host_format", "{namespace}-{name}")
|
||||
)
|
||||
self.namespaces = (
|
||||
self.namespaces
|
||||
if self.namespaces is not None
|
||||
else config_data.get("namespaces")
|
||||
)
|
||||
self.name = self.name if self.name is not None else config_data.get("name")
|
||||
|
||||
|
||||
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
@@ -379,51 +387,38 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
"""
|
||||
parse is the main entry point of the inventory.
|
||||
It checks for availability of the Kubernetes Python client,
|
||||
gets the configuration and runs fetch_objects or
|
||||
if there is a cache it is used instead.
|
||||
gets the configuration, retrieves the cache or runs fetch_objects and
|
||||
populates the inventory.
|
||||
"""
|
||||
super().parse(inventory, loader, path)
|
||||
cache_key = self._get_cache_prefix(path)
|
||||
config_data = self._read_config_data(path)
|
||||
|
||||
if not HAS_K8S_MODULE_HELPER:
|
||||
raise KubeVirtInventoryException(
|
||||
"This module requires the Kubernetes Python client. "
|
||||
+ f"Try `pip install kubernetes`. Detail: {K8S_IMPORT_EXCEPTION}"
|
||||
)
|
||||
|
||||
source_data = None
|
||||
if cache and cache_key in self._cache:
|
||||
try:
|
||||
source_data = self._cache[cache_key]
|
||||
except KeyError:
|
||||
pass
|
||||
super().parse(inventory, loader, path)
|
||||
|
||||
if not source_data:
|
||||
self.fetch_objects(config_data)
|
||||
|
||||
def fetch_objects(self, config_data: Dict) -> None:
|
||||
"""
|
||||
fetch_objects populates the inventory with the specified parameters.
|
||||
"""
|
||||
if not config_data or not isinstance(config_data, dict):
|
||||
config_data = {}
|
||||
config_data = self._read_config_data(path)
|
||||
cache_key = self.get_cache_key(path)
|
||||
user_cache_setting = self.get_option("cache")
|
||||
attempt_to_read_cache = user_cache_setting and cache
|
||||
cache_needs_update = user_cache_setting and not cache
|
||||
|
||||
self.connections_compatibility(config_data)
|
||||
client = get_api_client(**config_data)
|
||||
name = config_data.get(
|
||||
"name", self.get_default_host_name(client.configuration.host)
|
||||
)
|
||||
namespaces = (
|
||||
config_data["namespaces"]
|
||||
if config_data.get("namespaces")
|
||||
else self.get_available_namespaces(client)
|
||||
)
|
||||
opts = InventoryOptions(config_data=config_data)
|
||||
if opts.base_domain is None:
|
||||
opts.base_domain = self.get_cluster_domain(client)
|
||||
for namespace in namespaces:
|
||||
self.populate_inventory_from_namespace(client, name, namespace, opts)
|
||||
|
||||
results = {}
|
||||
if attempt_to_read_cache:
|
||||
try:
|
||||
results = self._cache[cache_key]
|
||||
except KeyError:
|
||||
cache_needs_update = True
|
||||
if not attempt_to_read_cache or cache_needs_update:
|
||||
results = self.fetch_objects(get_api_client(**config_data), opts)
|
||||
if cache_needs_update:
|
||||
self._cache[cache_key] = results
|
||||
|
||||
self.populate_inventory(results, opts)
|
||||
|
||||
def connections_compatibility(self, config_data: Dict) -> None:
|
||||
collection_name = "kubevirt.core"
|
||||
@@ -462,6 +457,35 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
removed=True,
|
||||
)
|
||||
|
||||
def fetch_objects(self, client: Any, opts: InventoryOptions) -> Dict:
|
||||
"""
|
||||
fetch_objects fetches all relevant objects from the K8S API.
|
||||
"""
|
||||
namespaces = {}
|
||||
for namespace in (
|
||||
opts.namespaces
|
||||
if opts.namespaces
|
||||
else self.get_available_namespaces(client)
|
||||
):
|
||||
vms = self.get_vms_for_namespace(client, namespace, opts)
|
||||
vmis = self.get_vmis_for_namespace(client, namespace, opts)
|
||||
|
||||
if not vms and not vmis:
|
||||
# Continue if no VMs and VMIs were found to avoid adding empty groups.
|
||||
continue
|
||||
|
||||
namespaces[namespace] = {
|
||||
"vms": vms,
|
||||
"vmis": vmis,
|
||||
"services": self.get_ssh_services_for_namespace(client, namespace),
|
||||
}
|
||||
|
||||
return {
|
||||
"default_hostname": self.get_default_hostname(client.configuration.host),
|
||||
"cluster_domain": self.get_cluster_domain(client),
|
||||
"namespaces": namespaces,
|
||||
}
|
||||
|
||||
def get_cluster_domain(self, client: K8SClient) -> Optional[str]:
|
||||
"""
|
||||
get_cluster_domain tries to get the base domain of an OpenShift cluster.
|
||||
@@ -577,22 +601,31 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
|
||||
return services
|
||||
|
||||
def populate_inventory(self, results: Dict, opts: InventoryOptions) -> None:
|
||||
"""
|
||||
populate_inventory populates the inventory by completing the InventoryOptions
|
||||
and invoking populate_inventory_from_namespace for every namespace in results.
|
||||
"""
|
||||
if opts.base_domain is None:
|
||||
opts.base_domain = results["cluster_domain"]
|
||||
if opts.name is None:
|
||||
opts.name = results["default_hostname"]
|
||||
for namespace, data in results["namespaces"].items():
|
||||
self.populate_inventory_from_namespace(namespace, data, opts)
|
||||
|
||||
def populate_inventory_from_namespace(
|
||||
self, client: K8SClient, name: str, namespace: str, opts: InventoryOptions
|
||||
self, namespace: str, data: Dict, opts: InventoryOptions
|
||||
) -> None:
|
||||
"""
|
||||
populate_inventory_from_namespace adds groups and hosts from a
|
||||
namespace to the inventory.
|
||||
"""
|
||||
vms = {
|
||||
vm["metadata"]["name"]: vm
|
||||
for vm in self.get_vms_for_namespace(client, namespace, opts)
|
||||
if self.obj_is_valid(vm)
|
||||
vm["metadata"]["name"]: vm for vm in data["vms"] if self.obj_is_valid(vm)
|
||||
}
|
||||
vmis = {
|
||||
vmi["metadata"]["name"]: vmi
|
||||
for vmi in self.get_vmis_for_namespace(client, namespace, opts)
|
||||
for vmi in data["vmis"]
|
||||
if self.obj_is_valid(vmi)
|
||||
}
|
||||
|
||||
@@ -600,9 +633,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
|
||||
# Return early if no VMs and VMIs were found to avoid adding empty groups.
|
||||
return
|
||||
|
||||
services = self.get_ssh_services_for_namespace(client, namespace)
|
||||
services = {
|
||||
domain: service
|
||||
for domain, service in data["services"].items()
|
||||
if self.obj_is_valid(service)
|
||||
}
|
||||
|
||||
name = self._sanitize_group_name(name)
|
||||
name = self._sanitize_group_name(opts.name)
|
||||
namespace_group = self._sanitize_group_name(f"namespace_{namespace}")
|
||||
|
||||
self.inventory.add_group(name)
|
||||
|
||||
Reference in New Issue
Block a user