From f73a1ce5907281c17dec03dd07a4ee027697962c Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 28 Apr 2025 21:45:42 +0200 Subject: [PATCH] Drop compatibility with older versions. (#872) --- changelogs/fragments/872-action-module.yml | 2 + plugins/plugin_utils/action_module.py | 725 ++------------------- 2 files changed, 52 insertions(+), 675 deletions(-) create mode 100644 changelogs/fragments/872-action-module.yml diff --git a/changelogs/fragments/872-action-module.yml b/changelogs/fragments/872-action-module.yml new file mode 100644 index 00000000..bab82e4b --- /dev/null +++ b/changelogs/fragments/872-action-module.yml @@ -0,0 +1,2 @@ +minor_changes: + - "action_module plugin utils - remove compatibility with older ansible-core/ansible-base/Ansible versions (https://github.com/ansible-collections/community.crypto/pull/872)." diff --git a/plugins/plugin_utils/action_module.py b/plugins/plugin_utils/action_module.py index 12fed39a..0fd1cfc3 100644 --- a/plugins/plugin_utils/action_module.py +++ b/plugins/plugin_utils/action_module.py @@ -22,68 +22,19 @@ import abc import copy import traceback -from ansible import constants as C from ansible.errors import AnsibleError from ansible.module_utils import six -from ansible.module_utils.basic import ( - SEQUENCETYPE, - AnsibleFallbackNotFound, - remove_values, -) +from ansible.module_utils.basic import SEQUENCETYPE, remove_values from ansible.module_utils.common._collections_compat import Mapping -from ansible.module_utils.common.parameters import PASS_BOOLS, PASS_VARS -from ansible.module_utils.common.text.converters import to_native, to_text -from ansible.module_utils.common.text.formatters import lenient_lowercase +from ansible.module_utils.common.arg_spec import ArgumentSpecValidator from ansible.module_utils.common.validation import ( - check_mutually_exclusive, - check_required_arguments, - check_required_by, - check_required_if, - check_required_one_of, - check_required_together, - check_type_bits, - check_type_bool, - check_type_bytes, - check_type_dict, - check_type_float, - check_type_int, - check_type_jsonarg, - check_type_list, - check_type_path, - check_type_raw, - check_type_str, - count_terms, safe_eval, ) -from ansible.module_utils.parsing.convert_bool import BOOLEANS_FALSE, BOOLEANS_TRUE -from ansible.module_utils.six import binary_type, string_types, text_type +from ansible.module_utils.errors import UnsupportedError +from ansible.module_utils.six import string_types from ansible.plugins.action import ActionBase -try: - # For ansible-core 2.11, we can use the ArgumentSpecValidator. We also import - # ModuleArgumentSpecValidator since that indicates that the 'classical' approach - # will no longer work. - from ansible.module_utils.common.arg_spec import ( - ArgumentSpecValidator, - ) - from ansible.module_utils.common.arg_spec import ( # noqa: F401, pylint: disable=unused-import; ModuleArgumentSpecValidator is not used - ModuleArgumentSpecValidator as dummy, - ) - from ansible.module_utils.errors import UnsupportedError - - HAS_ARGSPEC_VALIDATOR = True -except ImportError: - # For ansible-base 2.10 and Ansible 2.9, we need to use the 'classical' approach - from ansible.module_utils.common.parameters import ( - handle_aliases, - list_deprecations, - list_no_log_values, - ) - - HAS_ARGSPEC_VALIDATOR = False - - class _ModuleExitException(Exception): def __init__(self, result): super(_ModuleExitException, self).__init__() @@ -130,642 +81,66 @@ class AnsibleActionModule(object): self.params = copy.deepcopy(self.__action_plugin._task.args) self.no_log_values = set() - if HAS_ARGSPEC_VALIDATOR: - self._validator = ArgumentSpecValidator( - self.argument_spec, - self.mutually_exclusive, - self.required_together, - self.required_one_of, - self.required_if, - self.required_by, - ) - self._validation_result = self._validator.validate(self.params) - self.params.update(self._validation_result.validated_parameters) - self.no_log_values.update(self._validation_result._no_log_values) - - try: - error = self._validation_result.errors[0] - except IndexError: - error = None - - # We cannot use ModuleArgumentSpecValidator directly since it uses mechanisms for reporting - # warnings and deprecations that do not work in plugins. This is a copy of that code adjusted - # for our use-case: - for d in self._validation_result._deprecations: - # Before ansible-core 2.14.2, deprecations were always for aliases: - if "name" in d: - self.deprecate( - "Alias '{name}' is deprecated. See the module docs for more information".format( - name=d["name"] - ), - version=d.get("version"), - date=d.get("date"), - collection_name=d.get("collection_name"), - ) - # Since ansible-core 2.14.2, a message is present that can be directly printed: - if "msg" in d: - self.deprecate( - d["msg"], - version=d.get("version"), - date=d.get("date"), - collection_name=d.get("collection_name"), - ) - - for w in self._validation_result._warnings: - self.warn( - "Both option {option} and its alias {alias} are set.".format( - option=w["option"], alias=w["alias"] - ) - ) - - # Fail for validation errors, even in check mode - if error: - msg = self._validation_result.errors.msg - if isinstance(error, UnsupportedError): - msg = "Unsupported parameters for ({name}) {kind}: {msg}".format( - name=self._name, kind="module", msg=msg - ) - - self.fail_json(msg=msg) - else: - self._set_fallbacks() - - # append to legal_inputs and then possibly check against them - try: - self.aliases = self._handle_aliases() - except (ValueError, TypeError) as e: - # Use exceptions here because it is not safe to call fail_json until no_log is processed - raise _ModuleExitException( - dict(failed=True, msg="Module alias error: %s" % to_native(e)) - ) - - # Save parameter values that should never be logged - self._handle_no_log_values() - - self._check_arguments() - - # check exclusive early - if not bypass_checks: - self._check_mutually_exclusive(mutually_exclusive) - - self._set_defaults(pre=True) - - self._CHECK_ARGUMENT_TYPES_DISPATCHER = { - "str": self._check_type_str, - "list": check_type_list, - "dict": check_type_dict, - "bool": check_type_bool, - "int": check_type_int, - "float": check_type_float, - "path": check_type_path, - "raw": check_type_raw, - "jsonarg": check_type_jsonarg, - "json": check_type_jsonarg, - "bytes": check_type_bytes, - "bits": check_type_bits, - } - if not bypass_checks: - self._check_required_arguments() - self._check_argument_types() - self._check_argument_values() - self._check_required_together(required_together) - self._check_required_one_of(required_one_of) - self._check_required_if(required_if) - self._check_required_by(required_by) - - self._set_defaults(pre=False) - - # deal with options sub-spec - self._handle_options() - - def _handle_aliases(self, spec=None, param=None, option_prefix=""): - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - - # this uses exceptions as it happens before we can safely call fail_json - alias_warnings = [] - alias_results, self._legal_inputs = ( - handle_aliases( # pylint: disable=used-before-assignment - spec, param, alias_warnings=alias_warnings - ) + self._validator = ArgumentSpecValidator( + self.argument_spec, + self.mutually_exclusive, + self.required_together, + self.required_one_of, + self.required_if, + self.required_by, ) - for option, alias in alias_warnings: - self.warn( - "Both option %s and its alias %s are set." - % (option_prefix + option, option_prefix + alias) - ) + self._validation_result = self._validator.validate(self.params) + self.params.update(self._validation_result.validated_parameters) + self.no_log_values.update(self._validation_result._no_log_values) - deprecated_aliases = [] - for i in spec.keys(): - if "deprecated_aliases" in spec[i].keys(): - for alias in spec[i]["deprecated_aliases"]: - deprecated_aliases.append(alias) + try: + error = self._validation_result.errors[0] + except IndexError: + error = None - for deprecation in deprecated_aliases: - if deprecation["name"] in param.keys(): + # We cannot use ModuleArgumentSpecValidator directly since it uses mechanisms for reporting + # warnings and deprecations that do not work in plugins. This is a copy of that code adjusted + # for our use-case: + for d in self._validation_result._deprecations: + # Before ansible-core 2.14.2, deprecations were always for aliases: + if "name" in d: self.deprecate( - "Alias '%s' is deprecated. See the module docs for more information" - % deprecation["name"], - version=deprecation.get("version"), - date=deprecation.get("date"), - collection_name=deprecation.get("collection_name"), + "Alias '{name}' is deprecated. See the module docs for more information".format( + name=d["name"] + ), + version=d.get("version"), + date=d.get("date"), + collection_name=d.get("collection_name"), + ) + # Since ansible-core 2.14.2, a message is present that can be directly printed: + if "msg" in d: + self.deprecate( + d["msg"], + version=d.get("version"), + date=d.get("date"), + collection_name=d.get("collection_name"), ) - return alias_results - def _handle_no_log_values(self, spec=None, param=None): - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - - try: - self.no_log_values.update( - list_no_log_values( # pylint: disable=used-before-assignment - spec, param + for w in self._validation_result._warnings: + self.warn( + "Both option {option} and its alias {alias} are set.".format( + option=w["option"], alias=w["alias"] ) ) - except TypeError as te: - self.fail_json( - msg="Failure when processing no_log parameters. Module invocation will be hidden. " - "%s" % to_native(te), - invocation={"module_args": "HIDDEN DUE TO FAILURE"}, - ) - for message in list_deprecations( # pylint: disable=used-before-assignment - spec, param - ): - self.deprecate( - message["msg"], - version=message.get("version"), - date=message.get("date"), - collection_name=message.get("collection_name"), - ) - - def _check_arguments(self, spec=None, param=None, legal_inputs=None): - self._syslog_facility = "LOG_USER" - unsupported_parameters = set() - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - if legal_inputs is None: - legal_inputs = self._legal_inputs - - for k in list(param.keys()): - - if k not in legal_inputs: - unsupported_parameters.add(k) - - for k in PASS_VARS: - # handle setting internal properties from internal ansible vars - param_key = "_ansible_%s" % k - if param_key in param: - if k in PASS_BOOLS: - setattr(self, PASS_VARS[k][0], self.boolean(param[param_key])) - else: - setattr(self, PASS_VARS[k][0], param[param_key]) - - # clean up internal top level params: - if param_key in self.params: - del self.params[param_key] - else: - # use defaults if not already set - if not hasattr(self, PASS_VARS[k][0]): - setattr(self, PASS_VARS[k][0], PASS_VARS[k][1]) - - if unsupported_parameters: - msg = "Unsupported parameters for (%s) module: %s" % ( - self._name, - ", ".join(sorted(list(unsupported_parameters))), - ) - if self._options_context: - msg += " found in %s." % " -> ".join(self._options_context) - supported_parameters = list() - for key in sorted(spec.keys()): - if "aliases" in spec[key] and spec[key]["aliases"]: - supported_parameters.append( - "%s (%s)" % (key, ", ".join(sorted(spec[key]["aliases"]))) - ) - else: - supported_parameters.append(key) - msg += " Supported parameters include: %s" % ( - ", ".join(supported_parameters) - ) - self.fail_json(msg=msg) - if self.check_mode and not self.supports_check_mode: - self.exit_json( - skipped=True, - msg="action module (%s) does not support check mode" % self._name, - ) - - def _count_terms(self, check, param=None): - if param is None: - param = self.params - return count_terms(check, param) - - def _check_mutually_exclusive(self, spec, param=None): - if param is None: - param = self.params - - try: - check_mutually_exclusive(spec, param) - except TypeError as e: - msg = to_native(e) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) - - def _check_required_one_of(self, spec, param=None): - if spec is None: - return - - if param is None: - param = self.params - - try: - check_required_one_of(spec, param) - except TypeError as e: - msg = to_native(e) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) - - def _check_required_together(self, spec, param=None): - if spec is None: - return - if param is None: - param = self.params - - try: - check_required_together(spec, param) - except TypeError as e: - msg = to_native(e) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) - - def _check_required_by(self, spec, param=None): - if spec is None: - return - if param is None: - param = self.params - - try: - check_required_by(spec, param) - except TypeError as e: - self.fail_json(msg=to_native(e)) - - def _check_required_arguments(self, spec=None, param=None): - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - - try: - check_required_arguments(spec, param) - except TypeError as e: - msg = to_native(e) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) - - def _check_required_if(self, spec, param=None): - """ensure that parameters which conditionally required are present""" - if spec is None: - return - if param is None: - param = self.params - - try: - check_required_if(spec, param) - except TypeError as e: - msg = to_native(e) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) - - def _check_argument_values(self, spec=None, param=None): - """ensure all arguments have the requested values, and there are no stray arguments""" - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - for k, v in spec.items(): - choices = v.get("choices", None) - if choices is None: - continue - if isinstance(choices, SEQUENCETYPE) and not isinstance( - choices, (binary_type, text_type) - ): - if k in param: - # Allow one or more when type='list' param with choices - if isinstance(param[k], list): - diff_list = ", ".join( - [item for item in param[k] if item not in choices] - ) - if diff_list: - choices_str = ", ".join([to_native(c) for c in choices]) - msg = ( - "value of %s must be one or more of: %s. Got no match for: %s" - % (k, choices_str, diff_list) - ) - if self._options_context: - msg += " found in %s" % " -> ".join( - self._options_context - ) - self.fail_json(msg=msg) - elif param[k] not in choices: - # PyYaml converts certain strings to bools. If we can unambiguously convert back, do so before checking - # the value. If we cannot figure this out, module author is responsible. - lowered_choices = None - if param[k] == "False": - lowered_choices = lenient_lowercase(choices) - overlap = BOOLEANS_FALSE.intersection(choices) - if len(overlap) == 1: - # Extract from a set - (param[k],) = overlap - - if param[k] == "True": - if lowered_choices is None: - lowered_choices = lenient_lowercase(choices) - overlap = BOOLEANS_TRUE.intersection(choices) - if len(overlap) == 1: - (param[k],) = overlap - - if param[k] not in choices: - choices_str = ", ".join([to_native(c) for c in choices]) - msg = "value of %s must be one of: %s, got: %s" % ( - k, - choices_str, - param[k], - ) - if self._options_context: - msg += " found in %s" % " -> ".join( - self._options_context - ) - self.fail_json(msg=msg) - else: - msg = "internal error: choices for argument %s are not iterable: %s" % ( - k, - choices, + # Fail for validation errors, even in check mode + if error: + msg = self._validation_result.errors.msg + if isinstance(error, UnsupportedError): + msg = "Unsupported parameters for ({name}) {kind}: {msg}".format( + name=self._name, kind="module", msg=msg ) - if self._options_context: - msg += " found in %s" % " -> ".join(self._options_context) - self.fail_json(msg=msg) + + self.fail_json(msg=msg) def safe_eval(self, value, locals=None, include_exceptions=False): return safe_eval(value, locals, include_exceptions) - def _check_type_str(self, value, param=None, prefix=""): - opts = {"error": False, "warn": False, "ignore": True} - - # Ignore, warn, or error when converting to a string. - allow_conversion = opts.get(C.STRING_CONVERSION_ACTION, True) - try: - return check_type_str(value, allow_conversion) - except TypeError: - common_msg = "quote the entire value to ensure it does not change." - from_msg = "{0!r}".format(value) - to_msg = "{0!r}".format(to_text(value)) - - if param is not None: - if prefix: - param = "{0}{1}".format(prefix, param) - - from_msg = "{0}: {1!r}".format(param, value) - to_msg = "{0}: {1!r}".format(param, to_text(value)) - - if C.STRING_CONVERSION_ACTION == "error": - msg = common_msg.capitalize() - raise TypeError(to_native(msg)) - elif C.STRING_CONVERSION_ACTION == "warn": - msg = ( - 'The value "{0}" (type {1.__class__.__name__}) was converted to "{2}" (type string). ' - "If this does not look like what you expect, {3}" - ).format(from_msg, value, to_msg, common_msg) - self.warn(to_native(msg)) - return to_native(value, errors="surrogate_or_strict") - - def _handle_options(self, argument_spec=None, params=None, prefix=""): - """deal with options to create sub spec""" - if argument_spec is None: - argument_spec = self.argument_spec - if params is None: - params = self.params - - for k, v in argument_spec.items(): - wanted = v.get("type", None) - if wanted == "dict" or ( - wanted == "list" and v.get("elements", "") == "dict" - ): - spec = v.get("options", None) - if v.get("apply_defaults", False): - if spec is not None: - if params.get(k) is None: - params[k] = {} - else: - continue - elif spec is None or k not in params or params[k] is None: - continue - - self._options_context.append(k) - - if isinstance(params[k], dict): - elements = [params[k]] - else: - elements = params[k] - - for idx, param in enumerate(elements): - if not isinstance(param, dict): - self.fail_json( - msg="value of %s must be of type dict or list of dict" % k - ) - - new_prefix = prefix + k - if wanted == "list": - new_prefix += "[%d]" % idx - new_prefix += "." - - self._set_fallbacks(spec, param) - options_aliases = self._handle_aliases( - spec, param, option_prefix=new_prefix - ) - - options_legal_inputs = list(spec.keys()) + list( - options_aliases.keys() - ) - - self._check_arguments(spec, param, options_legal_inputs) - - # check exclusive early - if not self.bypass_checks: - self._check_mutually_exclusive( - v.get("mutually_exclusive", None), param - ) - - self._set_defaults(pre=True, spec=spec, param=param) - - if not self.bypass_checks: - self._check_required_arguments(spec, param) - self._check_argument_types(spec, param, new_prefix) - self._check_argument_values(spec, param) - - self._check_required_together( - v.get("required_together", None), param - ) - self._check_required_one_of( - v.get("required_one_of", None), param - ) - self._check_required_if(v.get("required_if", None), param) - self._check_required_by(v.get("required_by", None), param) - - self._set_defaults(pre=False, spec=spec, param=param) - - # handle multi level options (sub argspec) - self._handle_options(spec, param, new_prefix) - self._options_context.pop() - - def _get_wanted_type(self, wanted, k): - if not callable(wanted): - if wanted is None: - # Mostly we want to default to str. - # For values set to None explicitly, return None instead as - # that allows a user to unset a parameter - wanted = "str" - try: - type_checker = self._CHECK_ARGUMENT_TYPES_DISPATCHER[wanted] - except KeyError: - self.fail_json( - msg="implementation error: unknown type %s requested for %s" - % (wanted, k) - ) - else: - # set the type_checker to the callable, and reset wanted to the callable's name (or type if it does not have one, ala MagicMock) - type_checker = wanted - wanted = getattr(wanted, "__name__", to_native(type(wanted))) - - return type_checker, wanted - - def _handle_elements(self, wanted, param, values): - type_checker, wanted_name = self._get_wanted_type(wanted, param) - validated_params = [] - # Get param name for strings so we can later display this value in a useful error message if needed - # Only pass 'kwargs' to our checkers and ignore custom callable checkers - kwargs = {} - if wanted_name == "str" and isinstance(wanted, string_types): - if isinstance(param, string_types): - kwargs["param"] = param - elif isinstance(param, dict): - kwargs["param"] = list(param.keys())[0] - for value in values: - try: - validated_params.append(type_checker(value, **kwargs)) - except (TypeError, ValueError) as e: - msg = "Elements value for option %s" % param - if self._options_context: - msg += " found in '%s'" % " -> ".join(self._options_context) - msg += " is of type %s and we were unable to convert to %s: %s" % ( - type(value), - wanted_name, - to_native(e), - ) - self.fail_json(msg=msg) - return validated_params - - def _check_argument_types(self, spec=None, param=None, prefix=""): - """ensure all arguments have the requested type""" - - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - - for k, v in spec.items(): - wanted = v.get("type", None) - if k not in param: - continue - - value = param[k] - if value is None: - continue - - type_checker, wanted_name = self._get_wanted_type(wanted, k) - # Get param name for strings so we can later display this value in a useful error message if needed - # Only pass 'kwargs' to our checkers and ignore custom callable checkers - kwargs = {} - if wanted_name == "str" and isinstance(type_checker, string_types): - kwargs["param"] = list(param.keys())[0] - - # Get the name of the parent key if this is a nested option - if prefix: - kwargs["prefix"] = prefix - - try: - param[k] = type_checker(value, **kwargs) - wanted_elements = v.get("elements", None) - if wanted_elements: - if wanted != "list" or not isinstance(param[k], list): - msg = "Invalid type %s for option '%s'" % (wanted_name, param) - if self._options_context: - msg += " found in '%s'." % " -> ".join( - self._options_context - ) - msg += ( - ", elements value check is supported only with 'list' type" - ) - self.fail_json(msg=msg) - param[k] = self._handle_elements(wanted_elements, k, param[k]) - - except (TypeError, ValueError) as e: - msg = "argument %s is of type %s" % (k, type(value)) - if self._options_context: - msg += " found in '%s'." % " -> ".join(self._options_context) - msg += " and we were unable to convert to %s: %s" % ( - wanted_name, - to_native(e), - ) - self.fail_json(msg=msg) - - def _set_defaults(self, pre=True, spec=None, param=None): - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - for k, v in spec.items(): - default = v.get("default", None) - if pre is True: - # this prevents setting defaults on required items - if default is not None and k not in param: - param[k] = default - else: - # make sure things without a default still get set None - if k not in param: - param[k] = default - - def _set_fallbacks(self, spec=None, param=None): - if spec is None: - spec = self.argument_spec - if param is None: - param = self.params - - for k, v in spec.items(): - fallback = v.get("fallback", (None,)) - fallback_strategy = fallback[0] - fallback_args = [] - fallback_kwargs = {} - if k not in param and fallback_strategy is not None: - for item in fallback[1:]: - if isinstance(item, dict): - fallback_kwargs = item - else: - fallback_args = item - try: - param[k] = fallback_strategy(*fallback_args, **fallback_kwargs) - except AnsibleFallbackNotFound: - continue - def warn(self, warning): # Copied from ansible.module_utils.common.warnings: if isinstance(warning, string_types):