V2 fixing bugs

This commit is contained in:
James Cammarata
2015-03-25 13:51:40 -05:00
parent 79cf7e7292
commit 785c0c0c8c
34 changed files with 505 additions and 426 deletions

View File

@@ -1,43 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
class HostLog:
def __init__(self, host):
self.host = host
def add_task_result(self, task_result):
pass
def has_failures(self):
assert False
def has_changes(self):
assert False
def get_tasks(self, are_executed=None, are_changed=None, are_successful=None):
assert False
def get_current_running_task(self)
# atomic decorator likely required?
assert False

View File

@@ -1,29 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
class HostLogManager:
def __init__(self):
pass
def get_log_for_host(self, host):
assert False

View File

@@ -20,6 +20,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import *
from ansible.playbook.block import Block
from ansible.playbook.task import Task
from ansible.utils.boolean import boolean
@@ -38,9 +39,10 @@ class HostState:
self.run_state = PlayIterator.ITERATING_SETUP
self.fail_state = PlayIterator.FAILED_NONE
self.pending_setup = False
self.child_state = None
def __repr__(self):
return "HOST STATE: block=%d, task=%d, rescue=%d, always=%d, role=%s, run_state=%d, fail_state=%d, pending_setup=%s" % (
return "HOST STATE: block=%d, task=%d, rescue=%d, always=%d, role=%s, run_state=%d, fail_state=%d, pending_setup=%s, child state? %s" % (
self.cur_block,
self.cur_regular_task,
self.cur_rescue_task,
@@ -49,6 +51,7 @@ class HostState:
self.run_state,
self.fail_state,
self.pending_setup,
self.child_state,
)
def get_current_block(self):
@@ -64,6 +67,7 @@ class HostState:
new_state.run_state = self.run_state
new_state.fail_state = self.fail_state
new_state.pending_setup = self.pending_setup
new_state.child_state = self.child_state
return new_state
class PlayIterator:
@@ -104,75 +108,35 @@ class PlayIterator:
except KeyError:
raise AnsibleError("invalid host (%s) specified for playbook iteration" % host)
def get_next_task_for_host(self, host, peek=False, lock_step=True):
def get_next_task_for_host(self, host, peek=False):
s = self.get_host_state(host)
task = None
if s.run_state == self.ITERATING_COMPLETE:
return None
else:
while True:
try:
cur_block = s._blocks[s.cur_block]
except IndexError:
s.run_state = self.ITERATING_COMPLETE
break
elif s.run_state == self.ITERATING_SETUP:
s.run_state = self.ITERATING_TASKS
s.pending_setup = True
if self._play.gather_facts == 'smart' and not host._gathered_facts or boolean(self._play.gather_facts):
if not peek:
# mark the host as having gathered facts
host.set_gathered_facts(True)
if s.run_state == self.ITERATING_SETUP:
s.run_state = self.ITERATING_TASKS
if self._play._gather_facts == 'smart' and not host.gathered_facts or boolean(self._play._gather_facts):
# mark the host as having gathered facts
host.set_gathered_facts(True)
task = Task()
task.action = 'setup'
task.args = {}
task.set_loader(self._play._loader)
else:
s.pending_setup = False
task = Task()
task.action = 'setup'
task.set_loader(self._play._loader)
elif s.run_state == self.ITERATING_TASKS:
# clear the pending setup flag, since we're past that and it didn't fail
if s.pending_setup:
s.pending_setup = False
if s.fail_state & self.FAILED_TASKS == self.FAILED_TASKS:
s.run_state = self.ITERATING_RESCUE
elif s.cur_regular_task >= len(cur_block.block):
s.run_state = self.ITERATING_ALWAYS
else:
task = cur_block.block[s.cur_regular_task]
s.cur_regular_task += 1
break
elif s.run_state == self.ITERATING_RESCUE:
if s.fail_state & self.FAILED_RESCUE == self.FAILED_RESCUE:
s.run_state = self.ITERATING_ALWAYS
elif s.cur_rescue_task >= len(cur_block.rescue):
if len(cur_block.rescue) > 0:
s.fail_state = self.FAILED_NONE
s.run_state = self.ITERATING_ALWAYS
else:
task = cur_block.rescue[s.cur_rescue_task]
s.cur_rescue_task += 1
break
elif s.run_state == self.ITERATING_ALWAYS:
if s.cur_always_task >= len(cur_block.always):
if s.fail_state != self.FAILED_NONE:
s.run_state = self.ITERATING_COMPLETE
break
else:
s.cur_block += 1
s.cur_regular_task = 0
s.cur_rescue_task = 0
s.cur_always_task = 0
s.run_state = self.ITERATING_TASKS
else:
task= cur_block.always[s.cur_always_task]
s.cur_always_task += 1
break
if not task:
(s, task) = self._get_next_task_from_state(s, peek=peek)
if task and task._role:
# if we had a current role, mark that role as completed
if s.cur_role and task._role != s.cur_role and s.cur_role._had_task_run and not peek:
s.cur_role._completed = True
s.cur_role = task._role
if not peek:
@@ -180,6 +144,86 @@ class PlayIterator:
return (s, task)
def _get_next_task_from_state(self, state, peek):
task = None
# if we previously encountered a child block and we have a
# saved child state, try and get the next task from there
if state.child_state:
(state.child_state, task) = self._get_next_task_from_state(state.child_state, peek=peek)
if task:
return (state.child_state, task)
else:
state.child_state = None
# try and find the next task, given the current state.
while True:
# try to get the current block from the list of blocks, and
# if we run past the end of the list we know we're done with
# this block
try:
block = state._blocks[state.cur_block]
except IndexError:
state.run_state = self.ITERATING_COMPLETE
return (state, None)
if state.run_state == self.ITERATING_TASKS:
# clear the pending setup flag, since we're past that and it didn't fail
if state.pending_setup:
state.pending_setup = False
if state.fail_state & self.FAILED_TASKS == self.FAILED_TASKS:
state.run_state = self.ITERATING_RESCUE
elif state.cur_regular_task >= len(block.block):
state.run_state = self.ITERATING_ALWAYS
else:
task = block.block[state.cur_regular_task]
state.cur_regular_task += 1
elif state.run_state == self.ITERATING_RESCUE:
if state.fail_state & self.FAILED_RESCUE == self.FAILED_RESCUE:
state.run_state = self.ITERATING_ALWAYS
elif state.cur_rescue_task >= len(block.rescue):
if len(block.rescue) > 0:
state.fail_state = self.FAILED_NONE
state.run_state = self.ITERATING_ALWAYS
else:
task = block.rescue[state.cur_rescue_task]
state.cur_rescue_task += 1
elif state.run_state == self.ITERATING_ALWAYS:
if state.cur_always_task >= len(block.always):
if state.fail_state != self.FAILED_NONE:
state.run_state = self.ITERATING_COMPLETE
else:
state.cur_block += 1
state.cur_regular_task = 0
state.cur_rescue_task = 0
state.cur_always_task = 0
state.run_state = self.ITERATING_TASKS
state.child_state = None
else:
task = block.always[state.cur_always_task]
state.cur_always_task += 1
elif state.run_state == self.ITERATING_COMPLETE:
return (state, None)
# if the current task is actually a child block, we dive into it
if isinstance(task, Block):
state.child_state = HostState(blocks=[task])
state.child_state.run_state = self.ITERATING_TASKS
state.child_state.cur_role = state.cur_role
(state.child_state, task) = self._get_next_task_from_state(state.child_state, peek=peek)
# if something above set the task, break out of the loop now
if task:
break
return (state, task)
def mark_host_failed(self, host):
s = self.get_host_state(host)
if s.pending_setup:
@@ -206,25 +250,41 @@ class PlayIterator:
the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine.
'''
def _search_block(block, task):
for t in block.block:
if isinstance(t, Block):
res = _search_block(t, task)
if res:
return res
elif t._uuid == task._uuid:
return t
for t in block.rescue:
if isinstance(t, Block):
res = _search_block(t, task)
if res:
return res
elif t._uuid == task._uuid:
return t
for t in block.always:
if isinstance(t, Block):
res = _search_block(t, task)
if res:
return res
elif t._uuid == task._uuid:
return t
return None
s = self.get_host_state(host)
for block in s._blocks:
if block.block:
for t in block.block:
if t._uuid == task._uuid:
return t
if block.rescue:
for t in block.rescue:
if t._uuid == task._uuid:
return t
if block.always:
for t in block.always:
if t._uuid == task._uuid:
return t
res = _search_block(block, task)
if res:
return res
return None
def add_tasks(self, host, task_list):
s = self.get_host_state(host)
target_block = s._blocks[s.cur_block].copy()
target_block = s._blocks[s.cur_block].copy(exclude_parent=True)
if s.run_state == self.ITERATING_TASKS:
before = target_block.block[:s.cur_regular_task]

View File

@@ -26,6 +26,7 @@ from ansible.errors import *
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.playbook import Playbook
from ansible.utils.color import colorize, hostcolor
from ansible.utils.debug import debug
class PlaybookExecutor:
@@ -70,8 +71,8 @@ class PlaybookExecutor:
for batch in self._get_serialized_batches(new_play):
if len(batch) == 0:
self._tqm._callback.playbook_on_play_start(new_play.name)
self._tqm._callback.playbook_on_no_hosts_matched()
self._tqm.send_callback('v2_playbook_on_play_start', new_play)
self._tqm.send_callback('v2_playbook_on_no_hosts_matched')
result = 0
break
# restrict the inventory to the hosts in the serialized batch
@@ -90,6 +91,36 @@ class PlaybookExecutor:
raise
self._cleanup()
# FIXME: this stat summary stuff should be cleaned up and moved
# to a new method, if it even belongs here...
self._tqm._display.banner("PLAY RECAP")
hosts = sorted(self._tqm._stats.processed.keys())
for h in hosts:
t = self._tqm._stats.summarize(h)
self._tqm._display.display("%s : %s %s %s %s" % (
hostcolor(h, t),
colorize('ok', t['ok'], 'green'),
colorize('changed', t['changed'], 'yellow'),
colorize('unreachable', t['unreachable'], 'red'),
colorize('failed', t['failures'], 'red')),
screen_only=True
)
self._tqm._display.display("%s : %s %s %s %s" % (
hostcolor(h, t, False),
colorize('ok', t['ok'], None),
colorize('changed', t['changed'], None),
colorize('unreachable', t['unreachable'], None),
colorize('failed', t['failures'], None)),
log_only=True
)
self._tqm._display.display("", screen_only=True)
# END STATS STUFF
return result
def _cleanup(self, signum=None, framenum=None):

View File

@@ -0,0 +1,51 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
class AggregateStats:
''' holds stats about per-host activity during playbook runs '''
def __init__(self):
self.processed = {}
self.failures = {}
self.ok = {}
self.dark = {}
self.changed = {}
self.skipped = {}
def increment(self, what, host):
''' helper function to bump a statistic '''
self.processed[host] = 1
prev = (getattr(self, what)).get(host, 0)
getattr(self, what)[host] = prev+1
def summarize(self, host):
''' return information about a particular host '''
return dict(
ok = self.ok.get(host, 0),
failures = self.failures.get(host, 0),
unreachable = self.dark.get(host,0),
changed = self.changed.get(host, 0),
skipped = self.skipped.get(host, 0)
)

View File

@@ -237,10 +237,14 @@ class TaskExecutor:
if self._task.poll > 0:
result = self._poll_async_result(result=result)
# update the local copy of vars with the registered value, if specified
# update the local copy of vars with the registered value, if specified,
# or any facts which may have been generated by the module execution
if self._task.register:
vars_copy[self._task.register] = result
if 'ansible_facts' in result:
vars_copy.update(result['ansible_facts'])
# create a conditional object to evaluate task conditions
cond = Conditional(loader=self._loader)
@@ -266,6 +270,15 @@ class TaskExecutor:
if attempt < retries - 1:
time.sleep(delay)
# do the final update of the local variables here, for both registered
# values and any facts which may have been created
if self._task.register:
variables[self._task.register] = result
if 'ansible_facts' in result:
variables.update(result['ansible_facts'])
# and return
debug("attempt loop complete, returning result")
return result

View File

@@ -29,9 +29,11 @@ from ansible.executor.connection_info import ConnectionInformation
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.process.result import ResultProcess
from ansible.executor.stats import AggregateStats
from ansible.plugins import callback_loader, strategy_loader
from ansible.utils.debug import debug
from ansible.utils.display import Display
__all__ = ['TaskQueueManager']
@@ -53,6 +55,9 @@ class TaskQueueManager:
self._variable_manager = variable_manager
self._loader = loader
self._options = options
self._stats = AggregateStats()
self._display = Display()
# a special flag to help us exit cleanly
self._terminated = False
@@ -66,9 +71,14 @@ class TaskQueueManager:
self._final_q = multiprocessing.Queue()
# FIXME: hard-coded the default callback plugin here, which
# should be configurable.
self._callback = callback_loader.get(callback)
# load all available callback plugins
# FIXME: we need an option to white-list callback plugins
self._callback_plugins = []
for callback_plugin in callback_loader.all(class_only=True):
if hasattr(callback_plugin, 'CALLBACK_VERSION') and callback_plugin.CALLBACK_VERSION >= 2.0:
self._callback_plugins.append(callback_plugin(self._display))
else:
self._callback_plugins.append(callback_plugin())
# create the pool of worker threads, based on the number of forks specified
try:
@@ -131,16 +141,11 @@ class TaskQueueManager:
'''
connection_info = ConnectionInformation(play, self._options)
self._callback.set_connection_info(connection_info)
for callback_plugin in self._callback_plugins:
if hasattr(callback_plugin, 'set_connection_info'):
callback_plugin.set_connection_info(connection_info)
# run final validation on the play now, to make sure fields are templated
# FIXME: is this even required? Everything is validated and merged at the
# task level, so else in the play needs to be templated
#all_vars = self._vmw.get_vars(loader=self._dlw, play=play)
#all_vars = self._vmw.get_vars(loader=self._loader, play=play)
#play.post_validate(all_vars=all_vars)
self._callback.playbook_on_play_start(play.name)
self.send_callback('v2_playbook_on_play_start', play)
# initialize the shared dictionary containing the notified handlers
self._initialize_notified_handlers(play.handlers)
@@ -172,9 +177,6 @@ class TaskQueueManager:
def get_inventory(self):
return self._inventory
def get_callback(self):
return self._callback
def get_variable_manager(self):
return self._variable_manager
@@ -201,3 +203,18 @@ class TaskQueueManager:
def terminate(self):
self._terminated = True
def send_callback(self, method_name, *args, **kwargs):
for callback_plugin in self._callback_plugins:
# a plugin that set self.disabled to True will not be called
# see osx_say.py example for such a plugin
if getattr(callback_plugin, 'disabled', False):
continue
methods = [
getattr(callback_plugin, method_name, None),
getattr(callback_plugin, 'on_any', None)
]
for method in methods:
if method is not None:
method(*args, **kwargs)