Moving more action plugins over and fixing some bugs with role loading

This commit is contained in:
James Cammarata
2015-01-02 07:51:15 -06:00
parent 7f7e9914aa
commit 065733ad93
30 changed files with 1332 additions and 59 deletions

View File

@@ -51,7 +51,7 @@ class ConnectionInformation:
self.sudo_user = ''
self.sudo_pass = ''
self.verbosity = 0
self.only_tags = set()
self.only_tags = set(['all'])
self.skip_tags = set()
if play:
@@ -93,15 +93,19 @@ class ConnectionInformation:
self.connection = options.connection
# get the tag info from options, converting a comma-separated list
# of values into a proper list if need be
if isinstance(options.tags, list):
self.only_tags.update(options.tags)
elif isinstance(options.tags, basestring):
self.only_tags.update(options.tags.split(','))
if isinstance(options.skip_tags, list):
self.skip_tags.update(options.skip_tags)
elif isinstance(options.skip_tags, basestring):
self.skip_tags.update(options.skip_tags.split(','))
# of values into a proper list if need be. We check to see if the
# options have the attribute, as it is not always added via the CLI
if hasattr(options, 'tags'):
if isinstance(options.tags, list):
self.only_tags.update(options.tags)
elif isinstance(options.tags, basestring):
self.only_tags.update(options.tags.split(','))
if hasattr(options, 'skip_tags'):
if isinstance(options.skip_tags, list):
self.skip_tags.update(options.skip_tags)
elif isinstance(options.skip_tags, basestring):
self.skip_tags.update(options.skip_tags.split(','))
def copy(self, ci):
'''

View File

@@ -110,16 +110,12 @@ class ResultProcess(multiprocessing.Process):
# send callbacks, execute other options based on the result status
if result.is_failed():
#self._callback.runner_on_failed(result._task, result)
self._send_result(('host_task_failed', result))
elif result.is_unreachable():
#self._callback.runner_on_unreachable(result._task, result)
self._send_result(('host_unreachable', result))
elif result.is_skipped():
#self._callback.runner_on_skipped(result._task, result)
self._send_result(('host_task_skipped', result))
else:
#self._callback.runner_on_ok(result._task, result)
self._send_result(('host_task_ok', result))
# if this task is notifying a handler, do it now
@@ -131,8 +127,14 @@ class ResultProcess(multiprocessing.Process):
for notify in result._task.notify:
self._send_result(('notify_handler', notify, result._host))
# if this task is registering facts, do that now
if 'ansible_facts' in result._result:
if 'add_host' in result._result:
# this task added a new host (add_host module)
self._send_result(('add_host', result))
elif 'add_group' in result._result:
# this task added a new group (group_by module)
self._send_result(('add_group', result))
elif 'ansible_facts' in result._result:
# if this task is registering facts, do that now
if result._task.action in ('set_fact', 'include_vars'):
for (key, value) in result._result['ansible_facts'].iteritems():
self._send_result(('set_host_var', result._host, key, value))

View File

@@ -74,6 +74,9 @@ class WorkerProcess(multiprocessing.Process):
# using the one that was passed in
pass
if self._new_stdin:
sys.stdin = self._new_stdin
super(WorkerProcess, self).__init__()
def run(self):
@@ -130,7 +133,7 @@ class WorkerProcess(multiprocessing.Process):
debug("WORKER EXCEPTION: %s" % traceback.format_exc())
try:
if task:
task_result = TaskResult(host, task, dict(failed=True, exception=True, stdout=traceback.format_exc()))
task_result = TaskResult(host, task, dict(failed=True, exception=traceback.format_exc(), stdout=''))
self._rslt_q.put(task_result, block=False)
except:
# FIXME: most likely an abort, catch those kinds of errors specifically

View File

@@ -22,6 +22,7 @@ __metaclass__ = type
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.connection_info import ConnectionInformation
from ansible.playbook.task import Task
from ansible.plugins import lookup_loader, connection_loader, action_loader
from ansible.utils.debug import debug
@@ -110,8 +111,8 @@ class TaskExecutor:
the retry/until and block rescue/always execution
'''
connection = self._get_connection()
handler = self._get_action_handler(connection=connection)
self._connection = self._get_connection()
self._handler = self._get_action_handler(connection=self._connection)
# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
@@ -147,8 +148,12 @@ class TaskExecutor:
result['attempts'] = attempt + 1
debug("running the handler")
result = handler.run(task_vars=self._job_vars)
result = self._handler.run(task_vars=self._job_vars)
debug("handler run complete")
if self._task.async > 0 and self._task.poll > 0:
result = self._poll_async_result(result=result)
if self._task.until:
# TODO: implement until logic (pseudo logic follows...)
# if VariableManager.check_conditional(cond, extra_vars=(dict(result=result))):
@@ -164,6 +169,54 @@ class TaskExecutor:
debug("attempt loop complete, returning result")
return result
def _poll_async_result(self, result):
'''
Polls for the specified JID to be complete
'''
# the async_wrapper module returns dumped JSON via its stdout
# response, so we parse it here
try:
async_data = json.loads(result.get('stdout'))
except ValueError, e:
return dict(failed=True, msg="The async task did not return valid JSON: %s" % str(e))
async_jid = async_data.get('ansible_job_id')
if async_jid is None:
return dict(failed=True, msg="No job id was returned by the async task")
# Create a new psuedo-task to run the async_status module, and run
# that (with a sleep for "poll" seconds between each retry) until the
# async time limit is exceeded.
async_task = Task().load(dict(action='async_status jid=%s' % async_jid))
# Because this is an async task, the action handler is async. However,
# we need the 'normal' action handler for the status check, so get it
# now via the action_loader
normal_handler = action_loader.get(
'normal',
task=async_task,
connection=self._connection,
connection_info=self._connection_info,
loader=self._loader
)
time_left = self._task.async
while time_left > 0:
time.sleep(self._task.poll)
async_result = normal_handler.run()
if int(async_result.get('finished', 0)) == 1 or 'failed' in async_result or 'skipped' in async_result:
break
time_left -= self._task.poll
if int(async_result.get('finished', 0)) != 1:
return dict(failed=True, msg="async task did not complete within the requested time")
else:
return async_result
def _get_connection(self):
'''
Reads the connection property for the host, and returns the