More work on getting integration tests running for v2

This commit is contained in:
James Cammarata
2015-01-15 01:13:45 -06:00
parent 6326daa34e
commit 02bc014bcd
27 changed files with 414 additions and 187 deletions

View File

@@ -109,15 +109,18 @@ class ResultProcess(multiprocessing.Process):
host_name = result._host.get_name()
# send callbacks, execute other options based on the result status
if result.is_failed():
self._send_result(('host_task_failed', result))
elif result.is_unreachable():
# FIXME: this should all be cleaned up and probably moved to a sub-function.
# the fact that this sometimes sends a TaskResult and other times
# sends a raw dictionary back may be confusing, but the result vs.
# results implementation for tasks with loops should be cleaned up
# better than this
if result.is_unreachable():
self._send_result(('host_unreachable', result))
elif result.is_failed():
self._send_result(('host_task_failed', result))
elif result.is_skipped():
self._send_result(('host_task_skipped', result))
else:
self._send_result(('host_task_ok', result))
# if this task is notifying a handler, do it now
if result._task.notify:
# The shared dictionary for notified handlers is a proxy, which
@@ -125,21 +128,32 @@ class ResultProcess(multiprocessing.Process):
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for notify in result._task.notify:
self._send_result(('notify_handler', notify, result._host))
self._send_result(('notify_handler', result._host, notify))
if 'add_host' in result._result:
# this task added a new host (add_host module)
self._send_result(('add_host', result))
elif 'add_group' in result._result:
# this task added a new group (group_by module)
self._send_result(('add_group', result))
elif 'ansible_facts' in result._result:
# if this task is registering facts, do that now
if result._task.action in ('set_fact', 'include_vars'):
for (key, value) in result._result['ansible_facts'].iteritems():
self._send_result(('set_host_var', result._host, key, value))
else:
self._send_result(('set_host_facts', result._host, result._result['ansible_facts']))
if 'results' in result._result:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = result._result['results']
else:
result_items = [ result._result ]
for result_item in result_items:
if 'add_host' in result_item:
# this task added a new host (add_host module)
self._send_result(('add_host', result_item))
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._send_result(('add_group', result._host, result_item))
elif 'ansible_facts' in result_item:
# if this task is registering facts, do that now
if result._task.action in ('set_fact', 'include_vars'):
for (key, value) in result_item['ansible_facts'].iteritems():
self._send_result(('set_host_var', result._host, key, value))
else:
self._send_result(('set_host_facts', result._host, result_item['ansible_facts']))
# finally, send the ok for this task
self._send_result(('host_task_ok', result))
# if this task is registering a result, do it now
if result._task.register:

View File

@@ -97,7 +97,7 @@ class WorkerProcess(multiprocessing.Process):
try:
if not self._main_q.empty():
debug("there's work to be done!")
(host, task, basedir, job_vars, connection_info) = self._main_q.get(block=False)
(host, task, basedir, job_vars, connection_info, module_loader) = self._main_q.get(block=False)
debug("got a task/handler to work on: %s" % task)
# because the task queue manager starts workers (forks) before the
@@ -118,7 +118,7 @@ class WorkerProcess(multiprocessing.Process):
# execute the task and build a TaskResult from the result
debug("running TaskExecutor() for %s/%s" % (host, task))
executor_result = TaskExecutor(host, task, job_vars, new_connection_info, self._loader).run()
executor_result = TaskExecutor(host, task, job_vars, new_connection_info, self._loader, module_loader).run()
debug("done running TaskExecutor() for %s/%s" % (host, task))
task_result = TaskResult(host, task, executor_result)

View File

@@ -22,8 +22,10 @@ __metaclass__ = type
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.executor.connection_info import ConnectionInformation
from ansible.playbook.conditional import Conditional
from ansible.playbook.task import Task
from ansible.plugins import lookup_loader, connection_loader, action_loader
from ansible.utils.listify import listify_lookup_plugin_terms
from ansible.utils.debug import debug
@@ -41,12 +43,13 @@ class TaskExecutor:
class.
'''
def __init__(self, host, task, job_vars, connection_info, loader):
def __init__(self, host, task, job_vars, connection_info, loader, module_loader):
self._host = host
self._task = task
self._job_vars = job_vars
self._connection_info = connection_info
self._loader = loader
self._module_loader = module_loader
def run(self):
'''
@@ -57,6 +60,13 @@ class TaskExecutor:
debug("in run()")
try:
# lookup plugins need to know if this task is executing from
# a role, so that it can properly find files/templates/etc.
roledir = None
if self._task._role:
roledir = self._task._role._role_path
self._job_vars['roledir'] = roledir
items = self._get_loop_items()
if items is not None:
if len(items) > 0:
@@ -84,7 +94,8 @@ class TaskExecutor:
items = None
if self._task.loop and self._task.loop in lookup_loader:
items = lookup_loader.get(self._task.loop, loader=self._loader).run(terms=self._task.loop_args, variables=self._job_vars)
loop_terms = listify_lookup_plugin_terms(terms=self._task.loop_args, variables=self._job_vars, loader=self._loader)
items = lookup_loader.get(self._task.loop, loader=self._loader).run(terms=loop_terms, variables=self._job_vars)
return items
@@ -184,11 +195,10 @@ class TaskExecutor:
# now update them with the registered value, if it is set
if self._task.register:
vars_copy[self._task.register] = result
# now create a pseudo task, and assign the value of the until parameter
# to the when param, so we can use evaluate_conditional()
pseudo_task = Task()
pseudo_task.when = self._task.until
if pseudo_task.evaluate_conditional(vars_copy):
# create a conditional object to evaluate the until condition
cond = Conditional(loader=self._loader)
cond.when = self._task.until
if cond.evaluate_conditional(vars_copy):
break
elif 'failed' not in result and result.get('rc', 0) == 0:
# if the result is not failed, stop trying
@@ -223,7 +233,8 @@ class TaskExecutor:
task=async_task,
connection=self._connection,
connection_info=self._connection_info,
loader=self._loader
loader=self._loader,
module_loader=self._module_loader,
)
time_left = self._task.async
@@ -283,7 +294,8 @@ class TaskExecutor:
task=self._task,
connection=connection,
connection_info=self._connection_info,
loader=self._loader
loader=self._loader,
module_loader=self._module_loader,
)
if not handler:
raise AnsibleError("the handler '%s' was not found" % handler_name)