mirror of
https://github.com/ansible-collections/kubernetes.core.git
synced 2026-05-06 13:02:37 +00:00
action/k8s_info: refactoring to simplify the code base (#331)
Break down the long `run()` method to simplify the readability of the plugin.
This commit is contained in:
@@ -8,6 +8,8 @@ __metaclass__ = type
|
||||
|
||||
import copy
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
from ansible.config.manager import ensure_type
|
||||
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleAction, AnsibleActionFail
|
||||
@@ -34,6 +36,122 @@ class ActionModule(ActionBase):
|
||||
|
||||
return result
|
||||
|
||||
@contextmanager
|
||||
def get_template_data(self, template_path):
|
||||
try:
|
||||
source = self._find_needle('templates', template_path)
|
||||
except AnsibleError as e:
|
||||
raise AnsibleActionFail(to_text(e))
|
||||
|
||||
# Get vault decrypted tmp file
|
||||
try:
|
||||
tmp_source = self._loader.get_real_file(source)
|
||||
except AnsibleFileNotFound as e:
|
||||
raise AnsibleActionFail("could not find template=%s, %s" % (source, to_text(e)))
|
||||
b_tmp_source = to_bytes(tmp_source, errors='surrogate_or_strict')
|
||||
|
||||
try:
|
||||
with open(b_tmp_source, 'rb') as f:
|
||||
try:
|
||||
template_data = to_text(f.read(), errors='surrogate_or_strict')
|
||||
except UnicodeError:
|
||||
raise AnsibleActionFail("Template source files must be utf-8 encoded")
|
||||
yield template_data
|
||||
except AnsibleAction:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise AnsibleActionFail("%s: %s" % (type(e).__name__, to_text(e)))
|
||||
finally:
|
||||
self._loader.cleanup_tmp_file(b_tmp_source)
|
||||
|
||||
def load_template(self, template, new_module_args, task_vars):
|
||||
# template is only supported by k8s module.
|
||||
if self._task.action not in ('k8s', 'community.kubernetes.k8s', 'community.okd.k8s'):
|
||||
raise AnsibleActionFail("'template' is only supported parameter for 'k8s' module.")
|
||||
if isinstance(template, string_types):
|
||||
# treat this as raw_params
|
||||
template_path = template
|
||||
newline_sequence = self.DEFAULT_NEWLINE_SEQUENCE
|
||||
variable_start_string = None
|
||||
variable_end_string = None
|
||||
block_start_string = None
|
||||
block_end_string = None
|
||||
trim_blocks = True
|
||||
lstrip_blocks = False
|
||||
elif isinstance(template, dict):
|
||||
template_args = template
|
||||
template_path = template_args.get('path', None)
|
||||
if not template:
|
||||
raise AnsibleActionFail("Please specify path for template.")
|
||||
|
||||
# Options type validation strings
|
||||
for s_type in ('newline_sequence', 'variable_start_string', 'variable_end_string', 'block_start_string',
|
||||
'block_end_string'):
|
||||
if s_type in template_args:
|
||||
value = ensure_type(template_args[s_type], 'string')
|
||||
if value is not None and not isinstance(value, string_types):
|
||||
raise AnsibleActionFail("%s is expected to be a string, but got %s instead" % (s_type, type(value)))
|
||||
try:
|
||||
trim_blocks = boolean(template_args.get('trim_blocks', True), strict=False)
|
||||
lstrip_blocks = boolean(template_args.get('lstrip_blocks', False), strict=False)
|
||||
except TypeError as e:
|
||||
raise AnsibleActionFail(to_native(e))
|
||||
|
||||
newline_sequence = template_args.get('newline_sequence', self.DEFAULT_NEWLINE_SEQUENCE)
|
||||
variable_start_string = template_args.get('variable_start_string', None)
|
||||
variable_end_string = template_args.get('variable_end_string', None)
|
||||
block_start_string = template_args.get('block_start_string', None)
|
||||
block_end_string = template_args.get('block_end_string', None)
|
||||
else:
|
||||
raise AnsibleActionFail("Error while reading template file - "
|
||||
"a string or dict for template expected, but got %s instead" % type(template))
|
||||
|
||||
# Option `lstrip_blocks' was added in Jinja2 version 2.7.
|
||||
if lstrip_blocks:
|
||||
try:
|
||||
import jinja2.defaults
|
||||
except ImportError:
|
||||
raise AnsibleError('Unable to import Jinja2 defaults for determining Jinja2 features.')
|
||||
|
||||
try:
|
||||
jinja2.defaults.LSTRIP_BLOCKS
|
||||
except AttributeError:
|
||||
raise AnsibleError("Option `lstrip_blocks' is only available in Jinja2 versions >=2.7")
|
||||
|
||||
wrong_sequences = ["\\n", "\\r", "\\r\\n"]
|
||||
allowed_sequences = ["\n", "\r", "\r\n"]
|
||||
|
||||
# We need to convert unescaped sequences to proper escaped sequences for Jinja2
|
||||
if newline_sequence in wrong_sequences:
|
||||
newline_sequence = allowed_sequences[wrong_sequences.index(newline_sequence)]
|
||||
elif newline_sequence not in allowed_sequences:
|
||||
raise AnsibleActionFail("newline_sequence needs to be one of: \n, \r or \r\n")
|
||||
|
||||
# template the source data locally & get ready to transfer
|
||||
with self.get_template_data(template_path) as template_data:
|
||||
# add ansible 'template' vars
|
||||
temp_vars = task_vars.copy()
|
||||
old_vars = self._templar.available_variables
|
||||
|
||||
self._templar.environment.newline_sequence = newline_sequence
|
||||
if block_start_string is not None:
|
||||
self._templar.environment.block_start_string = block_start_string
|
||||
if block_end_string is not None:
|
||||
self._templar.environment.block_end_string = block_end_string
|
||||
if variable_start_string is not None:
|
||||
self._templar.environment.variable_start_string = variable_start_string
|
||||
if variable_end_string is not None:
|
||||
self._templar.environment.variable_end_string = variable_end_string
|
||||
self._templar.environment.trim_blocks = trim_blocks
|
||||
self._templar.environment.lstrip_blocks = lstrip_blocks
|
||||
self._templar.available_variables = temp_vars
|
||||
resultant = self._templar.do_template(template_data, preserve_trailing_newlines=True, escape_backslashes=False)
|
||||
self._templar.available_variables = old_vars
|
||||
resource_definition = self._task.args.get('definition', None)
|
||||
if not resource_definition:
|
||||
new_module_args.pop('template')
|
||||
new_module_args['definition'] = resultant
|
||||
|
||||
def run(self, tmp=None, task_vars=None):
|
||||
''' handler for k8s options '''
|
||||
if task_vars is None:
|
||||
@@ -91,116 +209,7 @@ class ActionModule(ActionBase):
|
||||
|
||||
template = self._task.args.get('template', None)
|
||||
if template:
|
||||
# template is only supported by k8s module.
|
||||
if self._task.action not in ('k8s', 'community.kubernetes.k8s', 'community.okd.k8s'):
|
||||
raise AnsibleActionFail("'template' is only supported parameter for 'k8s' module.")
|
||||
if isinstance(template, string_types):
|
||||
# treat this as raw_params
|
||||
template_path = template
|
||||
newline_sequence = self.DEFAULT_NEWLINE_SEQUENCE
|
||||
variable_start_string = None
|
||||
variable_end_string = None
|
||||
block_start_string = None
|
||||
block_end_string = None
|
||||
trim_blocks = True
|
||||
lstrip_blocks = False
|
||||
elif isinstance(template, dict):
|
||||
template_args = template
|
||||
template_path = template_args.get('path', None)
|
||||
if not template:
|
||||
raise AnsibleActionFail("Please specify path for template.")
|
||||
|
||||
# Options type validation strings
|
||||
for s_type in ('newline_sequence', 'variable_start_string', 'variable_end_string', 'block_start_string',
|
||||
'block_end_string'):
|
||||
if s_type in template_args:
|
||||
value = ensure_type(template_args[s_type], 'string')
|
||||
if value is not None and not isinstance(value, string_types):
|
||||
raise AnsibleActionFail("%s is expected to be a string, but got %s instead" % (s_type, type(value)))
|
||||
try:
|
||||
trim_blocks = boolean(template_args.get('trim_blocks', True), strict=False)
|
||||
lstrip_blocks = boolean(template_args.get('lstrip_blocks', False), strict=False)
|
||||
except TypeError as e:
|
||||
raise AnsibleActionFail(to_native(e))
|
||||
|
||||
newline_sequence = template_args.get('newline_sequence', self.DEFAULT_NEWLINE_SEQUENCE)
|
||||
variable_start_string = template_args.get('variable_start_string', None)
|
||||
variable_end_string = template_args.get('variable_end_string', None)
|
||||
block_start_string = template_args.get('block_start_string', None)
|
||||
block_end_string = template_args.get('block_end_string', None)
|
||||
else:
|
||||
raise AnsibleActionFail("Error while reading template file - "
|
||||
"a string or dict for template expected, but got %s instead" % type(template))
|
||||
|
||||
# Option `lstrip_blocks' was added in Jinja2 version 2.7.
|
||||
if lstrip_blocks:
|
||||
try:
|
||||
import jinja2.defaults
|
||||
except ImportError:
|
||||
raise AnsibleError('Unable to import Jinja2 defaults for determining Jinja2 features.')
|
||||
|
||||
try:
|
||||
jinja2.defaults.LSTRIP_BLOCKS
|
||||
except AttributeError:
|
||||
raise AnsibleError("Option `lstrip_blocks' is only available in Jinja2 versions >=2.7")
|
||||
|
||||
wrong_sequences = ["\\n", "\\r", "\\r\\n"]
|
||||
allowed_sequences = ["\n", "\r", "\r\n"]
|
||||
|
||||
# We need to convert unescaped sequences to proper escaped sequences for Jinja2
|
||||
if newline_sequence in wrong_sequences:
|
||||
newline_sequence = allowed_sequences[wrong_sequences.index(newline_sequence)]
|
||||
elif newline_sequence not in allowed_sequences:
|
||||
raise AnsibleActionFail("newline_sequence needs to be one of: \n, \r or \r\n")
|
||||
|
||||
try:
|
||||
source = self._find_needle('templates', template_path)
|
||||
except AnsibleError as e:
|
||||
raise AnsibleActionFail(to_text(e))
|
||||
|
||||
# Get vault decrypted tmp file
|
||||
try:
|
||||
tmp_source = self._loader.get_real_file(source)
|
||||
except AnsibleFileNotFound as e:
|
||||
raise AnsibleActionFail("could not find template=%s, %s" % (source, to_text(e)))
|
||||
b_tmp_source = to_bytes(tmp_source, errors='surrogate_or_strict')
|
||||
|
||||
# template the source data locally & get ready to transfer
|
||||
try:
|
||||
with open(b_tmp_source, 'rb') as f:
|
||||
try:
|
||||
template_data = to_text(f.read(), errors='surrogate_or_strict')
|
||||
except UnicodeError:
|
||||
raise AnsibleActionFail("Template source files must be utf-8 encoded")
|
||||
|
||||
# add ansible 'template' vars
|
||||
temp_vars = task_vars.copy()
|
||||
old_vars = self._templar.available_variables
|
||||
|
||||
self._templar.environment.newline_sequence = newline_sequence
|
||||
if block_start_string is not None:
|
||||
self._templar.environment.block_start_string = block_start_string
|
||||
if block_end_string is not None:
|
||||
self._templar.environment.block_end_string = block_end_string
|
||||
if variable_start_string is not None:
|
||||
self._templar.environment.variable_start_string = variable_start_string
|
||||
if variable_end_string is not None:
|
||||
self._templar.environment.variable_end_string = variable_end_string
|
||||
self._templar.environment.trim_blocks = trim_blocks
|
||||
self._templar.environment.lstrip_blocks = lstrip_blocks
|
||||
self._templar.available_variables = temp_vars
|
||||
resultant = self._templar.do_template(template_data, preserve_trailing_newlines=True, escape_backslashes=False)
|
||||
self._templar.available_variables = old_vars
|
||||
resource_definition = self._task.args.get('definition', None)
|
||||
if not resource_definition:
|
||||
new_module_args.pop('template')
|
||||
new_module_args['definition'] = resultant
|
||||
except AnsibleAction:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise AnsibleActionFail("%s: %s" % (type(e).__name__, to_text(e)))
|
||||
finally:
|
||||
self._loader.cleanup_tmp_file(b_tmp_source)
|
||||
self.load_template(template, new_module_args, task_vars)
|
||||
|
||||
# Execute the k8s_* module.
|
||||
module_return = self._execute_module(module_name=self._task.action, module_args=new_module_args, task_vars=task_vars)
|
||||
|
||||
Reference in New Issue
Block a user