Reworking v2 play iterator and fixing some other bugs

Still not working quite right:
* dynamic includes are not adding the included tasks yet
* running roles with tags not quite working right
This commit is contained in:
James Cammarata
2015-02-26 09:51:12 -06:00
parent fbc525cfb6
commit 4af2d0a907
28 changed files with 430 additions and 418 deletions

View File

@@ -1,66 +0,0 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from multiprocessing.managers import SyncManager, BaseProxy
from ansible.playbook.handler import Handler
from ansible.playbook.task import Task
from ansible.playbook.play import Play
from ansible.errors import AnsibleError
__all__ = ['AnsibleManager']
class VariableManagerWrapper:
'''
This class simply acts as a wrapper around the VariableManager class,
since manager proxies expect a new object to be returned rather than
any existing one. Using this wrapper, a shared proxy can be created
and an existing VariableManager class assigned to it, which can then
be accessed through the exposed proxy methods.
'''
def __init__(self):
self._vm = None
def get_vars(self, loader, play=None, host=None, task=None):
return self._vm.get_vars(loader=loader, play=play, host=host, task=task)
def set_variable_manager(self, vm):
self._vm = vm
def set_host_variable(self, host, varname, value):
self._vm.set_host_variable(host, varname, value)
def set_host_facts(self, host, facts):
self._vm.set_host_facts(host, facts)
class AnsibleManager(SyncManager):
'''
This is our custom manager class, which exists only so we may register
the new proxy below
'''
pass
AnsibleManager.register(
typeid='VariableManagerWrapper',
callable=VariableManagerWrapper,
)

View File

@@ -26,278 +26,202 @@ from ansible.utils.boolean import boolean
__all__ = ['PlayIterator']
class HostState:
def __init__(self, blocks):
self._blocks = blocks[:]
# the primary running states for the play iteration
ITERATING_SETUP = 0
ITERATING_TASKS = 1
ITERATING_RESCUE = 2
ITERATING_ALWAYS = 3
ITERATING_COMPLETE = 4
self.cur_block = 0
self.cur_regular_task = 0
self.cur_rescue_task = 0
self.cur_always_task = 0
self.cur_role = None
self.run_state = PlayIterator.ITERATING_SETUP
self.fail_state = PlayIterator.FAILED_NONE
self.pending_setup = False
# the failure states for the play iteration
FAILED_NONE = 0
FAILED_SETUP = 1
FAILED_TASKS = 2
FAILED_RESCUE = 3
FAILED_ALWAYS = 4
def get_current_block(self):
return self._blocks[self.cur_block]
class PlayState:
'''
A helper class, which keeps track of the task iteration
state for a given playbook. This is used in the PlaybookIterator
class on a per-host basis.
'''
# FIXME: this class is the representation of a finite state machine,
# so we really should have a well defined state representation
# documented somewhere...
def __init__(self, parent_iterator, host):
'''
Create the initial state, which tracks the running state as well
as the failure state, which are used when executing block branches
(rescue/always)
'''
self._parent_iterator = parent_iterator
self._run_state = ITERATING_SETUP
self._failed_state = FAILED_NONE
self._gather_facts = parent_iterator._play.gather_facts
#self._task_list = parent_iterator._play.compile()
self._task_list = parent_iterator._task_list[:]
self._host = host
self._cur_block = None
self._cur_role = None
self._cur_task_pos = 0
self._cur_rescue_pos = 0
self._cur_always_pos = 0
self._cur_handler_pos = 0
def next(self, peek=False):
'''
Determines and returns the next available task from the playbook,
advancing through the list of plays as it goes. If peek is set to True,
the internal state is not stored.
'''
task = None
# save this locally so that we can peek at the next task
# without updating the internal state of the iterator
run_state = self._run_state
failed_state = self._failed_state
cur_block = self._cur_block
cur_role = self._cur_role
cur_task_pos = self._cur_task_pos
cur_rescue_pos = self._cur_rescue_pos
cur_always_pos = self._cur_always_pos
cur_handler_pos = self._cur_handler_pos
while True:
if run_state == ITERATING_SETUP:
if failed_state == FAILED_SETUP:
run_state = ITERATING_COMPLETE
else:
run_state = ITERATING_TASKS
if self._gather_facts == 'smart' and not self._host.gathered_facts or boolean(self._gather_facts):
self._host.set_gathered_facts(True)
task = Task()
# FIXME: this is not the best way to get this...
task.set_loader(self._parent_iterator._play._loader)
task.action = 'setup'
break
elif run_state == ITERATING_TASKS:
# if there is any failure state besides FAILED_NONE, we should
# change to some other running state
if failed_state != FAILED_NONE or cur_task_pos > len(self._task_list) - 1:
# if there is a block (and there always should be), start running
# the rescue portion if it exists (and if we haven't failed that
# already), or the always portion (if it exists and we didn't fail
# there too). Otherwise, we're done iterating.
if cur_block:
if failed_state != FAILED_RESCUE and cur_block.rescue:
run_state = ITERATING_RESCUE
cur_rescue_pos = 0
elif failed_state != FAILED_ALWAYS and cur_block.always:
run_state = ITERATING_ALWAYS
cur_always_pos = 0
else:
run_state = ITERATING_COMPLETE
else:
run_state = ITERATING_COMPLETE
else:
task = self._task_list[cur_task_pos]
if cur_block is not None and cur_block != task._block:
run_state = ITERATING_ALWAYS
continue
else:
cur_block = task._block
cur_task_pos += 1
# Break out of the while loop now that we have our task
break
elif run_state == ITERATING_RESCUE:
# If we're iterating through the rescue tasks, make sure we haven't
# failed yet. If so, move on to the always block or if not get the
# next rescue task (if one exists)
if failed_state == FAILED_RESCUE or cur_block.rescue is None or cur_rescue_pos > len(cur_block.rescue) - 1:
run_state = ITERATING_ALWAYS
else:
task = cur_block.rescue[cur_rescue_pos]
cur_rescue_pos += 1
break
elif run_state == ITERATING_ALWAYS:
# If we're iterating through the always tasks, make sure we haven't
# failed yet. If so, we're done iterating otherwise get the next always
# task (if one exists)
if failed_state == FAILED_ALWAYS or cur_block.always is None or cur_always_pos > len(cur_block.always) - 1:
cur_block = None
if failed_state == FAILED_ALWAYS or cur_task_pos > len(self._task_list) - 1:
run_state = ITERATING_COMPLETE
else:
run_state = ITERATING_TASKS
else:
task = cur_block.always[cur_always_pos]
cur_always_pos += 1
break
elif run_state == ITERATING_COMPLETE:
# done iterating, return None to signify that
return None
if task._role:
# if we had a current role, mark that role as completed
if cur_role and task._role != cur_role and not peek:
cur_role._completed = True
cur_role = task._role
# if the current role has not had its task run flag set, mark
# clear the completed flag so we can correctly determine if the
# role was run
if not cur_role._had_task_run and not peek:
cur_role._completed = False
# If we're not just peeking at the next task, save the internal state
if not peek:
self._run_state = run_state
self._failed_state = failed_state
self._cur_block = cur_block
self._cur_role = cur_role
self._cur_task_pos = cur_task_pos
self._cur_rescue_pos = cur_rescue_pos
self._cur_always_pos = cur_always_pos
self._cur_handler_pos = cur_handler_pos
return task
def mark_failed(self):
'''
Escalates the failed state relative to the running state.
'''
if self._run_state == ITERATING_SETUP:
self._failed_state = FAILED_SETUP
elif self._run_state == ITERATING_TASKS:
self._failed_state = FAILED_TASKS
elif self._run_state == ITERATING_RESCUE:
self._failed_state = FAILED_RESCUE
elif self._run_state == ITERATING_ALWAYS:
self._failed_state = FAILED_ALWAYS
def add_tasks(self, task_list):
if self._run_state == ITERATING_TASKS:
before = self._task_list[:self._cur_task_pos]
after = self._task_list[self._cur_task_pos:]
self._task_list = before + task_list + after
elif self._run_state == ITERATING_RESCUE:
before = self._cur_block.rescue[:self._cur_rescue_pos]
after = self._cur_block.rescue[self._cur_rescue_pos:]
self._cur_block.rescue = before + task_list + after
elif self._run_state == ITERATING_ALWAYS:
before = self._cur_block.always[:self._cur_always_pos]
after = self._cur_block.always[self._cur_always_pos:]
self._cur_block.always = before + task_list + after
def copy(self):
new_state = HostState(self._blocks)
new_state.cur_block = self.cur_block
new_state.cur_regular_task = self.cur_regular_task
new_state.cur_rescue_task = self.cur_rescue_task
new_state.cur_always_task = self.cur_always_task
new_state.cur_role = self.cur_role
new_state.run_state = self.run_state
new_state.fail_state = self.fail_state
new_state.pending_setup = self.pending_setup
return new_state
class PlayIterator:
# the primary running states for the play iteration
ITERATING_SETUP = 0
ITERATING_TASKS = 1
ITERATING_RESCUE = 2
ITERATING_ALWAYS = 3
ITERATING_COMPLETE = 4
'''
The main iterator class, which keeps the state of the playbook
on a per-host basis using the above PlaybookState class.
'''
# the failure states for the play iteration, which are powers
# of 2 as they may be or'ed together in certain circumstances
FAILED_NONE = 0
FAILED_SETUP = 1
FAILED_TASKS = 2
FAILED_RESCUE = 4
FAILED_ALWAYS = 8
def __init__(self, inventory, play):
self._play = play
self._inventory = inventory
self._host_entries = dict()
self._first_host = None
# FIXME: should we save the post_validated play from below here instead?
self._play = play
# Build the per-host dictionary of playbook states, using a copy
# of the play object so we can post_validate it to ensure any templated
# fields are filled in without modifying the original object, since
# post_validate() saves the templated values.
# FIXME: this is a hacky way of doing this, the iterator should
# instead get the loader and variable manager directly
# as args to __init__
# post validate the play, as we need some fields to be finalized now
# so that we can use them to setup the iterator properly
all_vars = inventory._variable_manager.get_vars(loader=inventory._loader, play=play)
new_play = play.copy()
new_play.post_validate(all_vars, fail_on_undefined=False)
self._task_list = new_play.compile()
self._blocks = new_play.compile()
self._host_states = {}
for host in inventory.get_hosts(new_play.hosts):
if self._first_host is None:
self._first_host = host
self._host_entries[host.get_name()] = PlayState(parent_iterator=self, host=host)
self._host_states[host.name] = HostState(blocks=self._blocks)
# FIXME: remove, probably not required anymore
#def get_next_task(self, peek=False):
# ''' returns the next task for host[0] '''
#
# first_entry = self._host_entries[self._first_host.get_name()]
# if not peek:
# for entry in self._host_entries:
# if entry != self._first_host.get_name():
# target_entry = self._host_entries[entry]
# if target_entry._cur_task_pos == first_entry._cur_task_pos:
# target_entry.next()
# return first_entry.next(peek=peek)
def get_next_task_for_host(self, host, peek=False):
''' fetch the next task for the given host '''
if host.get_name() not in self._host_entries:
def get_host_state(self, host):
try:
return self._host_states[host.name].copy()
except KeyError:
raise AnsibleError("invalid host (%s) specified for playbook iteration" % host)
return self._host_entries[host.get_name()].next(peek=peek)
def get_next_task_for_host(self, host, peek=False, lock_step=True):
s = self.get_host_state(host)
task = None
if s.run_state == self.ITERATING_COMPLETE:
return None
else:
while True:
try:
cur_block = s._blocks[s.cur_block]
except IndexError:
s.run_state = self.ITERATING_COMPLETE
break
if s.run_state == self.ITERATING_SETUP:
s.run_state = self.ITERATING_TASKS
if self._play._gather_facts == 'smart' and not host.gathered_facts or boolean(self._play._gather_facts):
# mark the host as having gathered facts
host.set_gathered_facts(True)
task = Task()
task.action = 'setup'
task.set_loader(self._play._loader)
elif s.run_state == self.ITERATING_TASKS:
# clear the pending setup flag, since we're past that and it didn't fail
if s.pending_setup:
s.pending_setup = False
if s.fail_state & self.FAILED_TASKS == self.FAILED_TASKS:
s.run_state = self.ITERATING_RESCUE
elif s.cur_regular_task >= len(cur_block.block):
s.run_state = self.ITERATING_ALWAYS
else:
task = cur_block.block[s.cur_regular_task]
s.cur_regular_task += 1
break
elif s.run_state == self.ITERATING_RESCUE:
if s.fail_state & self.FAILED_RESCUE == self.FAILED_RESCUE:
s.run_state = self.ITERATING_ALWAYS
elif s.cur_rescue_task >= len(cur_block.rescue):
if len(cur_block.rescue) > 0:
s.fail_state = self.FAILED_NONE
s.run_state = self.ITERATING_ALWAYS
else:
task = cur_block.rescue[s.cur_rescue_task]
s.cur_rescue_task += 1
break
elif s.run_state == self.ITERATING_ALWAYS:
if s.cur_always_task >= len(cur_block.always):
if s.fail_state != self.FAILED_NONE:
s.run_state = self.ITERATING_COMPLETE
break
else:
s.cur_block += 1
s.cur_regular_task = 0
s.cur_rescue_task = 0
s.cur_always_task = 0
s.run_state = self.ITERATING_TASKS
else:
task= cur_block.always[s.cur_always_task]
s.cur_always_task += 1
break
if task and task._role:
# if we had a current role, mark that role as completed
if s.cur_role and task._role != s.cur_role and s.cur_role._had_task_run and not peek:
s.cur_role._completed = True
s.cur_role = task._role
if not peek:
self._host_states[host.name] = s
return (s, task)
def mark_host_failed(self, host):
''' mark the given host as failed '''
if host.get_name() not in self._host_entries:
raise AnsibleError("invalid host (%s) specified for playbook iteration" % host)
s = self.get_host_state(host)
if s.pending_setup:
s.fail_state |= self.FAILED_SETUP
s.run_state = self.ITERATING_COMPLETE
elif s.run_state == self.ITERATING_TASKS:
s.fail_state |= self.FAILED_TASKS
s.run_state = self.ITERATING_RESCUE
elif s.run_state == self.ITERATING_RESCUE:
s.fail_state |= self.FAILED_RESCUE
s.run_state = self.ITERATING_ALWAYS
elif s.run_state == self.ITERATING_ALWAYS:
s.fail_state |= self.FAILED_ALWAYS
s.run_state = self.ITERATING_COMPLETE
self._host_states[host.name] = s
self._host_entries[host.get_name()].mark_failed()
def get_failed_hosts(self):
return dict((host, True) for (host, state) in self._host_states.iteritems() if state.run_state == self.ITERATING_COMPLETE and state.failed_state != self.FAILED_NONE)
def get_original_task(self, task):
def get_original_task(self, host, task):
'''
Finds the task in the task list which matches the UUID of the given task.
The executor engine serializes/deserializes objects as they are passed through
the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine.
'''
for t in self._task_list:
if t._uuid == task._uuid:
return t
for block in self._blocks:
if block.block:
for t in block.block:
if t._uuid == task._uuid:
return t
if block.rescue:
for t in block.rescue:
if t._uuid == task._uuid:
return t
if block.always:
for t in block.always:
if t._uuid == task._uuid:
return t
return None
def add_tasks(self, host, task_list):
if host.name not in self._host_entries:
raise AnsibleError("invalid host (%s) specified for playbook iteration (expanding task list)" % host)
def add_tasks(self, task_list):
if self._run_state == self.ITERATING_TASKS:
before = self._task_list[:self._cur_task_pos + self._tasks_added]
after = self._task_list[self._cur_task_pos + self._tasks_added:]
self._task_list = before + task_list + after
elif self._run_state == self.ITERATING_RESCUE:
before = self._cur_block.rescue[:self._cur_rescue_pos + self._tasks_added]
after = self._cur_block.rescue[self._cur_rescue_pos + self._tasks_added:]
self._cur_block.rescue = before + task_list + after
elif self._run_state == self.ITERATING_ALWAYS:
before = self._cur_block.always[:self._cur_always_pos + self._tasks_added]
after = self._cur_block.always[self._cur_always_pos + self._tasks_added:]
self._cur_block.always = before + task_list + after
# set this internal flag now so we know if
self._tasks_added += len(task_list)
self._host_entries[host.name].add_tasks(task_list)

View File

@@ -137,12 +137,13 @@ class ResultProcess(multiprocessing.Process):
result_items = [ result._result ]
for result_item in result_items:
if 'include' in result_item:
include_variables = result_item.get('include_variables', dict())
if 'item' in result_item:
include_variables['item'] = result_item['item']
self._send_result(('include', result._host, result._task, result_item['include'], include_variables))
elif 'add_host' in result_item:
#if 'include' in result_item:
# include_variables = result_item.get('include_variables', dict())
# if 'item' in result_item:
# include_variables['item'] = result_item['item']
# self._send_result(('include', result._host, result._task, result_item['include'], include_variables))
#elif 'add_host' in result_item:
if 'add_host' in result_item:
# this task added a new host (add_host module)
self._send_result(('add_host', result_item))
elif 'add_group' in result_item:

View File

@@ -132,13 +132,14 @@ class TaskExecutor:
res = self._execute(variables=task_vars)
(self._task, tmp_task) = (tmp_task, self._task)
# FIXME: we should be sending back a callback result for each item in the loop here
# now update the result with the item info, and append the result
# to the list of results
res['item'] = item
results.append(res)
# FIXME: we should be sending back a callback result for each item in the loop here
print(res)
return results
def _squash_items(self, items, variables):

View File

@@ -26,7 +26,6 @@ import sys
from ansible.errors import AnsibleError
from ansible.executor.connection_info import ConnectionInformation
#from ansible.executor.manager import AnsibleManager
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.process.result import ResultProcess
@@ -36,7 +35,6 @@ from ansible.utils.debug import debug
__all__ = ['TaskQueueManager']
class TaskQueueManager:
'''
@@ -59,10 +57,6 @@ class TaskQueueManager:
# a special flag to help us exit cleanly
self._terminated = False
# create and start the multiprocessing manager
#self._manager = AnsibleManager()
#self._manager.start()
# this dictionary is used to keep track of notified handlers
self._notified_handlers = dict()