Performance improvements

James Cammarata
2016-07-31 03:23:28 -05:00
parent 80385a47bd
commit d2b3b2c03e
16 changed files with 330 additions and 480 deletions

lib/ansible/executor/play_iterator.py

@@ -508,6 +508,12 @@ class PlayIterator:
         the different processes, and not all data structures are preserved. This method
         allows us to find the original task passed into the executor engine.
         '''
+        if isinstance(task, Task):
+            the_uuid = task._uuid
+        else:
+            the_uuid = task
+
         def _search_block(block):
             '''
             helper method to check a block's task lists (block/rescue/always)
@@ -521,7 +527,7 @@ class PlayIterator:
                         res = _search_block(t)
                         if res:
                             return res
-                    elif t._uuid == task._uuid:
+                    elif t._uuid == the_uuid:
                         return t
             return None
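
The added lines normalize the lookup key once: callers can now pass either a full Task object or a bare UUID string, which matters because the workers (below) start sending back only the UUID. A minimal sketch of the pattern, using an illustrative stand-in class rather than Ansible's real Task:

class FakeTask(object):
    def __init__(self, uuid):
        self._uuid = uuid

def find_task(tasks, task_or_uuid):
    # accept either a Task-like object or a bare UUID string,
    # normalizing once instead of inside every comparison
    the_uuid = getattr(task_or_uuid, '_uuid', task_or_uuid)
    for t in tasks:
        if t._uuid == the_uuid:
            return t
    return None

tasks = [FakeTask('aaaa'), FakeTask('bbbb')]
assert find_task(tasks, 'bbbb') is tasks[1]    # lookup by raw UUID
assert find_task(tasks, tasks[0]) is tasks[0]  # lookup by object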

lib/ansible/executor/process/result.py (deleted)

@@ -1,196 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.compat.six.moves import queue
from ansible.compat.six import iteritems, text_type
from ansible.vars import strip_internal_keys
import multiprocessing
import time
import traceback
# TODO: not needed if we use the cryptography library with its default RNG
# engine
HAS_ATFORK=True
try:
from Crypto.Random import atfork
except ImportError:
HAS_ATFORK=False
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
__all__ = ['ResultProcess']
class ResultProcess(multiprocessing.Process):
'''
The result worker thread, which reads results from the results
queue and fires off callbacks/etc. as necessary.
'''
def __init__(self, final_q, workers):
# takes a task queue manager as the sole param:
self._final_q = final_q
self._workers = workers
self._cur_worker = 0
self._terminated = False
super(ResultProcess, self).__init__()
def _send_result(self, result):
display.debug(u"sending result: %s" % ([text_type(x) for x in result],))
self._final_q.put(result)
display.debug("done sending result")
def _read_worker_result(self):
result = None
starting_point = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
try:
if not rslt_q.empty():
display.debug("worker %d has data to read" % self._cur_worker)
result = rslt_q.get()
display.debug("got a result from worker %d: %s" % (self._cur_worker, result))
break
except queue.Empty:
pass
if self._cur_worker == starting_point:
break
return result
def terminate(self):
self._terminated = True
super(ResultProcess, self).terminate()
def run(self):
'''
The main thread execution, which reads from the results queue
indefinitely and sends callbacks/etc. when results are received.
'''
if HAS_ATFORK:
atfork()
while True:
try:
result = self._read_worker_result()
if result is None:
time.sleep(0.005)
continue
# send callbacks for 'non final' results
if '_ansible_retry' in result._result:
self._send_result(('v2_runner_retry', result))
continue
elif '_ansible_item_result' in result._result:
if result.is_failed() or result.is_unreachable():
self._send_result(('v2_runner_item_on_failed', result))
elif result.is_skipped():
self._send_result(('v2_runner_item_on_skipped', result))
else:
self._send_result(('v2_runner_item_on_ok', result))
if 'diff' in result._result:
self._send_result(('v2_on_file_diff', result))
continue
clean_copy = strip_internal_keys(result._result)
if 'invocation' in clean_copy:
del clean_copy['invocation']
# if this task is registering a result, do it now
if result._task.register:
self._send_result(('register_host_var', result._host, result._task, clean_copy))
# send callbacks, execute other options based on the result status
# TODO: this should all be cleaned up and probably moved to a sub-function.
# the fact that this sometimes sends a TaskResult and other times
# sends a raw dictionary back may be confusing, but the result vs.
# results implementation for tasks with loops should be cleaned up
# better than this
if result.is_unreachable():
self._send_result(('host_unreachable', result))
elif result.is_failed():
self._send_result(('host_task_failed', result))
elif result.is_skipped():
self._send_result(('host_task_skipped', result))
else:
if result._task.loop:
# this task had a loop, and has more than one result, so
# loop over all of them instead of a single result
result_items = result._result.get('results', [])
else:
result_items = [ result._result ]
for result_item in result_items:
# if this task is notifying a handler, do it now
if '_ansible_notify' in result_item:
if result.is_changed():
# The shared dictionary for notified handlers is a proxy, which
# does not detect when sub-objects within the proxy are modified.
# So, per the docs, we reassign the list so the proxy picks up and
# notifies all other threads
for notify in result_item['_ansible_notify']:
self._send_result(('notify_handler', result, notify))
if 'add_host' in result_item:
# this task added a new host (add_host module)
self._send_result(('add_host', result_item))
elif 'add_group' in result_item:
# this task added a new group (group_by module)
self._send_result(('add_group', result._host, result_item))
elif 'ansible_facts' in result_item:
# if this task is registering facts, do that now
loop_var = 'item'
if result._task.loop_control:
loop_var = result._task.loop_control.loop_var or 'item'
item = result_item.get(loop_var, None)
if result._task.action == 'include_vars':
for (key, value) in iteritems(result_item['ansible_facts']):
self._send_result(('set_host_var', result._host, result._task, item, key, value))
else:
self._send_result(('set_host_facts', result._host, result._task, item, result_item['ansible_facts']))
# finally, send the ok for this task
self._send_result(('host_task_ok', result))
except queue.Empty:
pass
except (KeyboardInterrupt, SystemExit, IOError, EOFError):
break
except:
# TODO: we should probably send a proper callback here instead of
# simply dumping a stack trace on the screen
traceback.print_exc()
break
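
This entire process goes away. ResultProcess sat between the workers and the callbacks, scanning every worker's queue round-robin and sleeping 5ms whenever all of them were empty, and every result crossed an extra process boundary (one more pickle/unpickle) on the way to the final queue. With it removed, the strategy reads results itself from a shared queue, where the consumer can block instead of polling. A minimal sketch of that shape (illustrative only, not the strategy code):

import multiprocessing

def worker(q, n):
    q.put(('host%d' % n, 'ok'))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q, n)) for n in range(4)]
    for p in procs:
        p.start()
    # one blocking reader replaces the round-robin empty()/get() scan:
    # no 5ms sleep, no per-worker queue to poll
    for _ in procs:
        print(q.get())
    for p in procs:
        p.join()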

lib/ansible/executor/process/worker.py

@@ -100,6 +100,10 @@ class WorkerProcess(multiprocessing.Process):
         signify that they are ready for their next task.
         '''

+        #import cProfile, pstats, StringIO
+        #pr = cProfile.Profile()
+        #pr.enable()
+
         if HAS_ATFORK:
             atfork()
@@ -120,7 +124,7 @@ class WorkerProcess(multiprocessing.Process):
             display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
             self._host.vars = dict()
             self._host.groups = []
-            task_result = TaskResult(self._host, self._task, executor_result)
+            task_result = TaskResult(self._host.name, self._task._uuid, executor_result)

             # put the result on the result queue
             display.debug("sending task result")
@@ -130,7 +134,7 @@ class WorkerProcess(multiprocessing.Process):
         except AnsibleConnectionFailure:
             self._host.vars = dict()
             self._host.groups = []
-            task_result = TaskResult(self._host, self._task, dict(unreachable=True))
+            task_result = TaskResult(self._host.name, self._task._uuid, dict(unreachable=True))
             self._rslt_q.put(task_result, block=False)

         except Exception as e:
@@ -138,7 +142,7 @@ class WorkerProcess(multiprocessing.Process):
             try:
                 self._host.vars = dict()
                 self._host.groups = []
-                task_result = TaskResult(self._host, self._task, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout=''))
+                task_result = TaskResult(self._host.name, self._task._uuid, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout=''))
                 self._rslt_q.put(task_result, block=False)
             except:
                 display.debug(u"WORKER EXCEPTION: %s" % to_unicode(e))
@@ -146,3 +150,11 @@ class WorkerProcess(multiprocessing.Process):
         display.debug("WORKER PROCESS EXITING")
+
+        #pr.disable()
+        #s = StringIO.StringIO()
+        #sortby = 'time'
+        #ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
+        #ps.print_stats()
+        #with open('worker_%06d.stats' % os.getpid(), 'w') as f:
+        #    f.write(s.getvalue())
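
The substantive change here: TaskResult is now built from self._host.name and self._task._uuid rather than the full Host and Task objects, so each result crossing the worker-to-strategy queue pickles two short strings instead of deep object graphs (vars, groups, parent blocks); the receiving side looks the real objects back up by UUID via get_original_task() above. A rough sketch of why that matters, with a stand-in object rather than Ansible's Host/Task:

import pickle

class FakeHost(object):
    # stand-in for an object dragging along a large vars dict
    def __init__(self):
        self.name = 'web01'
        self.vars = dict(('key%d' % i, 'v' * 50) for i in range(200))

host = FakeHost()
print(len(pickle.dumps(host)))       # kilobytes pickled per result
print(len(pickle.dumps(host.name)))  # a few dozen bytes per result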

lib/ansible/executor/task_executor.py

@@ -246,7 +246,7 @@ class TaskExecutor:
task_vars[loop_var] = item
try:
tmp_task = self._task.copy()
tmp_task = self._task.copy(exclude_tasks=True)
tmp_play_context = self._play_context.copy()
except AnsibleParserError as e:
results.append(dict(failed=True, msg=to_unicode(e)))
@@ -265,7 +265,7 @@ class TaskExecutor:
             res[loop_var] = item
             res['_ansible_item_result'] = True

-            self._rslt_q.put(TaskResult(self._host, self._task, res), block=False)
+            self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, res), block=False)
             results.append(res)

             del task_vars[loop_var]
@@ -516,7 +516,7 @@ class TaskExecutor:
                     result['_ansible_retry'] = True
                     result['retries'] = retries
                     display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
-                    self._rslt_q.put(TaskResult(self._host, self._task, result), block=False)
+                    self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result), block=False)
                     time.sleep(delay)
             else:
                 if retries > 1:
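
Two distinct optimizations in this file: the queued TaskResults again carry only host name and task UUID, and each loop item now copies the task with exclude_tasks=True, which (judging by the name) skips deep-copying the task lists reachable through the task's parent block, structures the per-item copy never needs. The idea in miniature, as a hedged sketch with illustrative stand-in classes, not Ansible's actual Task.copy signature:

import copy

class FakeBlock(object):
    def __init__(self, tasks):
        self.tasks = tasks  # possibly thousands of sibling task objects

class FakeTask(object):
    def __init__(self, name, parent):
        self.name = name
        self._parent = parent

    def copy(self, exclude_tasks=False):
        new = FakeTask(self.name, None)
        if exclude_tasks:
            # share the parent structure by reference; the per-item copy
            # only reads it, so skipping the deep copy is safe here
            new._parent = self._parent
        else:
            new._parent = copy.deepcopy(self._parent)
        return new

block = FakeBlock(tasks=[object() for _ in range(10000)])
task = FakeTask('ping', block)
cheap = task.copy(exclude_tasks=True)  # no 10000-object deep copy per item
assert cheap._parent is block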

lib/ansible/executor/task_queue_manager.py

@@ -26,7 +26,6 @@ import tempfile
 from ansible import constants as C
 from ansible.errors import AnsibleError
 from ansible.executor.play_iterator import PlayIterator
-from ansible.executor.process.result import ResultProcess
 from ansible.executor.stats import AggregateStats
 from ansible.playbook.block import Block
 from ansible.playbook.play_context import PlayContext
@@ -81,7 +80,6 @@ class TaskQueueManager:
         self._callbacks_loaded = False
         self._callback_plugins = []
         self._start_at_done = False
-        self._result_prc = None

         # make sure the module path (if specified) is parsed and
         # added to the module_loader object
@@ -113,9 +111,6 @@ class TaskQueueManager:
             rslt_q = multiprocessing.Queue()
             self._workers.append([None, rslt_q])

-        self._result_prc = ResultProcess(self._final_q, self._workers)
-        self._result_prc.start()
-
     def _initialize_notified_handlers(self, play):
         '''
         Clears and initializes the shared notified handlers dict with entries
@@ -299,9 +294,7 @@ class TaskQueueManager:
             self._cleanup_processes()

     def _cleanup_processes(self):
-        if self._result_prc:
-            self._result_prc.terminate()
-
         if hasattr(self, '_workers'):
             for (worker_prc, rslt_q) in self._workers:
                 rslt_q.close()
                 if worker_prc and worker_prc.is_alive():

lib/ansible/executor/task_result.py

@@ -41,7 +41,7 @@ class TaskResult:
     def is_skipped(self):
         # loop results
-        if 'results' in self._result and self._task.loop:
+        if 'results' in self._result:
             results = self._result['results']
             # Loop tasks are only considered skipped if all items were skipped.
             # some squashed results (eg, yum) are not dicts and can't be skipped individually
@@ -62,7 +62,7 @@ class TaskResult:
         return self._check_key('unreachable')

     def _check_key(self, key):
-        if self._result.get('results', []) and self._task.loop:
+        if self._result.get('results', []):
             flag = False
             for res in self._result.get('results', []):
                 if isinstance(res, dict):
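
Because a TaskResult now holds only the task's UUID, it can no longer consult self._task.loop; the presence of a 'results' list in the result dict becomes the loop signal instead. The aggregation rules stay the same: _check_key reports a status if any loop item set it, while is_skipped requires every item to be skipped. A self-contained sketch of that logic, with plain dicts standing in for TaskResult:

def check_key(result, key):
    # loop detection by shape: a 'results' list means per-item results
    items = result.get('results', [])
    if items:
        # any item carrying the flag marks the whole task (failed, changed, ...)
        return any(isinstance(res, dict) and res.get(key, False) for res in items)
    return bool(result.get(key, False))

def is_skipped(result):
    items = result.get('results', [])
    if items:
        # skipped only if every item skipped; non-dict (squashed) items
        # such as yum's merged output count as not skipped
        return all(isinstance(res, dict) and res.get('skipped', False) for res in items)
    return bool(result.get('skipped', False))

loop_result = {'results': [{'changed': True}, {'failed': True}]}
assert check_key(loop_result, 'failed') is True
assert is_skipped({'results': [{'skipped': True}, {'skipped': True}]}) is True
assert is_skipped(loop_result) is False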