K8sService class (#307)

K8sService class

SUMMARY

This refactors the perform_action() logic from common.py into a separate K8sService class.
TODO:

 Unit tests.

ISSUE TYPE

New Module Pull Request

COMPONENT NAME

service.py

Reviewed-by: Abhijeet Kasurde <None>
Reviewed-by: Mike Graves <mgraves@redhat.com>
Reviewed-by: Alina Buzachis <None>
Reviewed-by: None <None>
This commit is contained in:
Alina Buzachis
2022-01-13 16:08:47 +01:00
committed by Mike Graves
parent f168a3f67f
commit e2f54d3431
6 changed files with 1029 additions and 8 deletions

View File

@@ -128,17 +128,27 @@ class Waiter:
def wait(
self,
definition: Dict,
timeout: int,
sleep: int,
name: Optional[str] = None,
namespace: Optional[str] = None,
label_selectors: Optional[List[str]] = None,
field_selectors: Optional[List[str]] = None,
) -> Tuple[bool, Optional[Dict], int]:
params = {
"name": definition["metadata"].get("name"),
"namespace": definition["metadata"].get("namespace"),
}
params = {}
if name:
params["name"] = name
if namespace:
params["namespace"] = namespace
if label_selectors:
params["label_selector"] = ",".join(label_selectors)
if field_selectors:
params["field_selector"] = ",".join(field_selectors)
instance: Optional[Dict] = None
response = None
elapsed = 0