Bug 1383880: optimize in three phases; r=ahal

In preparation for much more thorough optimization of task-graphs, this
makes a few changes:

 * optimization is split into thre phases, with task removal in one phase
   (following dependency links) and task replacement in the next (in the
   reverse order).
 * optimization uses class instances instead of functions for optimizations;
   this allows different functions for different phases, and also leaves open
   the possibility of composing optimizations.
 * the replacement phase can also support removal; this is when utility tasks
   like symbol uploads can be optimized away iff their parent task is
   optimized.

MozReview-Commit-ID: C5QznNpwqXn

--HG--
extra : rebase_source : f8e65ee3ebb9fb584b00ee0db518b790a9c1b233
This commit is contained in:
Dustin J. Mitchell 2017-08-20 20:00:17 +00:00
parent 553b0d0e71
commit 7b7bcad347
5 changed files with 552 additions and 406 deletions

View File

@ -3,36 +3,28 @@ Optimization
The objective of optimization to remove as many tasks from the graph as
possible, as efficiently as possible, thereby delivering useful results as
quickly as possible. For example, ideally if only a test script is modified in
quickly as possible. For example, ideally if only a test script is modified in
a push, then the resulting graph contains only the corresponding test suite
task.
A task is said to be "optimized" when it is either replaced with an equivalent,
already-existing task, or dropped from the graph entirely.
Optimization Functions
----------------------
Optimization Strategies
-----------------------
During the optimization phase of task-graph generation, each task is optimized
in post-order, meaning that each task's dependencies will be optimized before
the task itself is optimized.
Each task has a single named optimization strategy, and can provide an argument
to that strategy. Each strategy is defined as an ``OptimizationStrategy``
instance in ``taskcluster/taskgraph/optimization.py``.
Each task has a ``task.optimizations`` property describing the optimization
methods that apply. Each is specified as a list of method and arguments. For
Each task has a ``task.optimization`` property describing the optimization
strategy that applies, specified as a dictionary mapping strategy to argument. For
example::
task.optimizations = [
['seta'],
['skip-unless-changed', ['js/**', 'tests/**']],
]
task.optimization = {'skip-unless-changed': ['js/**', 'tests/**']}
These methods are defined in ``taskcluster/taskgraph/optimize.py``. They are
applied in order, and the first to return a success value causes the task to
be optimized.
Each method can return either a taskId (indicating that the given task can be
replaced) or indicate that the task can be optimized away. If a task on which
others depend is optimized away, task-graph generation will fail.
Strategy implementations are shared across all tasks, so they may cache
commonly-used information as instance variables.
Optimizing Target Tasks
-----------------------
@ -40,5 +32,88 @@ Optimizing Target Tasks
In some cases, such as try pushes, tasks in the target task set have been
explicitly requested and are thus excluded from optimization. In other cases,
the target task set is almost the entire task graph, so targetted tasks are
considered for optimization. This behavior is controlled with the
considered for optimization. This behavior is controlled with the
``optimize_target_tasks`` parameter.
.. note:
Because it is a mix of "what the push author wanted" and "what should run
when necessary", try pushes with the old option syntax (``-b do -p all``,
etc.) *do* optimize target tasks. This can cause unexpected results when
requested jobs are optimized away. If those jobs were actually necessary,
then a try push with ``try_task_config.json`` is the solution.
Optimization Process
--------------------
Optimization proceeds in three phases: removing tasks, replacing tasks,
and finally generating a subgraph containing only the remaining tasks.
Assume the following task graph as context for these examples::
TC1 <--\ ,- UP1
, B1 <--- T1a
I1 <-| `- T1b
` B2 <--- T2a
TC2 <--/ |- T2b
`- UP2
Removing Tasks
::::::::::::::
This phase begins with tasks on which nothing depends and follows the
dependency graph backward from there -- right to left in the diagram above. If
a task is not removed, then nothing it depends on will be removed either.
Thus if T1a and T1b are both removed, B1 may be removed as well. But if T2b is
not removed, then B2 may not be removed either.
For each task with no remaining dependencies, the decision whether to remove is
made by calling the optimization strategy's ``should_remove_task`` method. If
this method returns True, the task is removed.
The optimization process takes a ``do_not_optimize`` argument containing a list
of tasks that cannot be removed under any circumstances. This is used to
"force" running specific tasks.
Replacing Tasks
:::::::::::::::
This phase begins with tasks having no dependencies and follows the reversed
dependency graph from there -- left to right in the diagram above. If a task is
not replaced, then anything depending on that task cannot be replaced.
Replacement is generally done on the basis of some hash of the inputs to the
task. In the diagram above, if both TC1 and I1 are replaced with existing
tasks, then B1 is a candidate for replacement. But if TC2 has no replacement,
then replacement of B2 will not be considered.
It is possible to replace a task with nothing. This is similar to optimzing
away, but is useful for utility tasks like UP1. If such a task is considered
for replacement, then all of its dependencies (here, B1) have already been
replaced and there is no utility in running the task and no need for a
replacement task. It is an error for a task on which others depend to be
replaced with nothing.
The ``do_not_optimize`` set applies to task replacement, as does an additional
``existing_tasks`` dictionary which allows the caller to supply as set of
known, pre-existing tasks. This is used for action tasks, for example, where it
contains the entire task-graph generated by the original decision task.
Subgraph Generation
:::::::::::::::::::
The first two phases annotate each task in the existing taskgraph with their
fate: removed, replaced, or retained. The tasks that are replaced also have a
replacement taskId.
The last phase constructs a subgraph containing the retained tasks, and
simultaneously rewrites all dependencies to refer to taskIds instead of labels.
To do so, it assigns a taskId to each retained task and uses the replacement
taskId for all replaced tasks.
The result is an optimized taskgraph with tasks named by taskId instead of
label. At this phase, the edges in the task graph diverge from the
``task.dependencies`` attributes, as the latter may contain dependencies
outside of the taskgraph (for replacement tasks).
As a side-effect, this phase also expands all ``{"task-reference": ".."}``
objects within the task definitions.

View File

@ -1,6 +1,15 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
The objective of optimization is to remove as many tasks from the graph as
possible, as efficiently as possible, thereby delivering useful results as
quickly as possible. For example, ideally if only a test script is modified in
a push, then the resulting graph contains only the corresponding test suite
task.
See ``taskcluster/docs/optimization.rst`` for more information.
"""
from __future__ import absolute_import, print_function, unicode_literals
@ -19,119 +28,154 @@ from slugid import nice as slugid
logger = logging.getLogger(__name__)
_optimizations = {}
def optimize_task_graph(target_task_graph, params, do_not_optimize, existing_tasks=None):
def optimize_task_graph(target_task_graph, params, do_not_optimize,
existing_tasks=None, strategies=None):
"""
Perform task optimization, without optimizing tasks named in
do_not_optimize.
Perform task optimization, returning a taskgraph and a map from label to
assigned taskId, including replacement tasks.
"""
named_links_dict = target_task_graph.graph.named_links_dict()
label_to_taskid = {}
if not existing_tasks:
existing_tasks = {}
# This proceeds in two phases. First, mark all optimized tasks (those
# which will be removed from the graph) as such, including a replacement
# taskId where applicable. Second, generate a new task graph containing
# only the non-optimized tasks, with all task labels resolved to taskIds
# and with task['dependencies'] populated.
annotate_task_graph(target_task_graph=target_task_graph,
params=params,
do_not_optimize=do_not_optimize,
named_links_dict=named_links_dict,
label_to_taskid=label_to_taskid,
existing_tasks=existing_tasks)
return get_subgraph(target_task_graph, named_links_dict, label_to_taskid), label_to_taskid
# instantiate the strategies for this optimization process
if not strategies:
strategies = _make_default_strategies()
optimizations = _get_optimizations(target_task_graph, strategies)
removed_tasks = remove_tasks(
target_task_graph=target_task_graph,
optimizations=optimizations,
params=params,
do_not_optimize=do_not_optimize)
replaced_tasks = replace_tasks(
target_task_graph=target_task_graph,
optimizations=optimizations,
params=params,
do_not_optimize=do_not_optimize,
label_to_taskid=label_to_taskid,
existing_tasks=existing_tasks,
removed_tasks=removed_tasks)
return get_subgraph(
target_task_graph, removed_tasks, replaced_tasks,
label_to_taskid), label_to_taskid
def optimize_task(task, params):
"""
Run the optimization for a given task
"""
if not task.optimization:
return False
opt_type, arg = task.optimization.items()[0]
opt_fn = _optimizations[opt_type]
return opt_fn(task, params, arg)
def _make_default_strategies():
return {
'never': OptimizationStrategy(), # "never" is the default behavior
'index-search': IndexSearch(),
'seta': SETA(),
'skip-unless-changed': SkipUnlessChanged(),
}
def annotate_task_graph(target_task_graph, params, do_not_optimize,
named_links_dict, label_to_taskid, existing_tasks):
"""
Annotate each task in the graph with .optimized (boolean) and .task_id
(possibly None), following the rules for optimization and calling the task
kinds' `optimize_task` method.
As a side effect, label_to_taskid is updated with labels for all optimized
tasks that are replaced with existing tasks.
"""
# set .optimized for all tasks, and .task_id for optimized tasks
# with replacements
opt_counts = defaultdict(lambda: {'away': 0, 'replaced': 0})
for label in target_task_graph.graph.visit_postorder():
def _get_optimizations(target_task_graph, strategies):
def optimizations(label):
task = target_task_graph.tasks[label]
named_task_dependencies = named_links_dict.get(label, {})
# check whether any dependencies have been optimized away
dependencies = [target_task_graph.tasks[l] for l in named_task_dependencies.itervalues()]
for t in dependencies:
if t.optimized and not t.task_id:
raise Exception(
"task {} was optimized away, but {} depends on it".format(
t.label, label))
# if this task is blacklisted, don't even consider optimizing
replacement_task_id = None
opt_by = None
if label in do_not_optimize:
optimized = False
# Let's check whether this task has been created before
elif existing_tasks is not None and label in existing_tasks:
optimized = True
replacement_task_id = existing_tasks[label]
opt_by = "existing_tasks"
# otherwise, examine the task itself (which may be an expensive operation)
if task.optimization:
opt_by, arg = task.optimization.items()[0]
return (opt_by, strategies[opt_by], arg)
else:
opt_result = optimize_task(task, params)
# use opt_result to determine values for optimized, replacement_task_id
optimized = bool(opt_result)
if optimized:
opt_by = task.optimization.keys()[0]
replacement_task_id = opt_result if opt_result is not True else None
task.optimized = optimized
task.task_id = replacement_task_id
if replacement_task_id:
label_to_taskid[label] = replacement_task_id
if optimized:
if replacement_task_id:
opt_counts[opt_by]['replaced'] += 1
logger.debug("optimizing `{}`, replacing with task `{}`"
.format(label, replacement_task_id))
else:
opt_counts[opt_by]['away'] += 1
logger.debug("optimizing `{}` away".format(label))
# note: any dependent tasks will fail when they see this
for opt_by in sorted(opt_counts):
counts = opt_counts[opt_by]
if counts['away'] and not counts['replaced']:
msg = "optimized away {} tasks for {}: ".format(counts['away'], opt_by)
elif counts['replaced'] and not counts['away']:
msg = "optimized {} tasks, replacing with other tasks, for {}: ".format(
counts['away'], opt_by)
else:
msg = "optimized {} tasks for {}, replacing {} and optimizing {} away".format(
sum(counts.values()), opt_by, counts['replaced'], counts['away'])
logger.info(msg)
return ('never', strategies['never'], None)
return optimizations
def get_subgraph(annotated_task_graph, named_links_dict, label_to_taskid):
def _log_optimization(verb, opt_counts):
if opt_counts:
logger.info(
'{} '.format(verb.title()) +
', '.join(
'{} tasks by {}'.format(c, b)
for b, c in sorted(opt_counts.iteritems())) +
' during optimization.')
else:
logger.info('No tasks {} during optimization'.format(verb))
def remove_tasks(target_task_graph, params, optimizations, do_not_optimize):
"""
Return the subgraph of annotated_task_graph consisting only of
Implement the "Removing Tasks" phase, returning a set of task labels of all removed tasks.
"""
opt_counts = defaultdict(int)
removed = set()
reverse_links_dict = target_task_graph.graph.reverse_links_dict()
for label in target_task_graph.graph.visit_preorder():
# if we're not allowed to optimize, that's easy..
if label in do_not_optimize:
continue
# if there are remaining tasks depending on this one, do not remove..
if any(l not in removed for l in reverse_links_dict[label]):
continue
# call the optimization strategy
task = target_task_graph.tasks[label]
opt_by, opt, arg = optimizations(label)
if opt.should_remove_task(task, params, arg):
removed.add(label)
opt_counts[opt_by] += 1
continue
_log_optimization('removed', opt_counts)
return removed
def replace_tasks(target_task_graph, params, optimizations, do_not_optimize,
label_to_taskid, removed_tasks, existing_tasks):
"""
Implement the "Replacing Tasks" phase, returning a set of task labels of
all replaced tasks. The replacement taskIds are added to label_to_taskid as
a side-effect.
"""
opt_counts = defaultdict(int)
replaced = set()
links_dict = target_task_graph.graph.links_dict()
for label in target_task_graph.graph.visit_postorder():
# if we're not allowed to optimize, that's easy..
if label in do_not_optimize:
continue
# if this task depends on un-replaced, un-removed tasks, do not replace
if any(l not in replaced and l not in removed_tasks for l in links_dict[label]):
continue
# if the task already exists, that's an easy replacement
repl = existing_tasks.get(label)
if repl:
label_to_taskid[label] = repl
replaced.add(label)
opt_counts['existing_tasks'] += 1
continue
# call the optimization strategy
task = target_task_graph.tasks[label]
opt_by, opt, arg = optimizations(label)
repl = opt.should_replace_task(task, params, arg)
if repl:
if repl is True:
# True means remove this task; get_subgraph will catch any
# problems with removed tasks being depended on
removed_tasks.add(label)
else:
label_to_taskid[label] = repl
replaced.add(label)
opt_counts[opt_by] += 1
continue
_log_optimization('replaced', opt_counts)
return replaced
def get_subgraph(target_task_graph, removed_tasks, replaced_tasks, label_to_taskid):
"""
Return the subgraph of target_task_graph consisting only of
non-optimized tasks and edges between them.
To avoid losing track of taskIds for tasks optimized away, this method
@ -140,94 +184,119 @@ def get_subgraph(annotated_task_graph, named_links_dict, label_to_taskid):
taskIds. Task references are resolved in the process.
"""
# check for any dependency edges from included to removed tasks
bad_edges = [(l, r, n) for l, r, n in target_task_graph.graph.edges
if l not in removed_tasks and r in removed_tasks]
if bad_edges:
probs = ', '.join('{} depends on {} as {} but it has been removed'.format(l, r, n)
for l, r, n in bad_edges)
raise Exception("Optimization error: " + probs)
# fill in label_to_taskid for anything not removed or replaced
assert replaced_tasks <= set(label_to_taskid)
for label in sorted(target_task_graph.graph.nodes - removed_tasks - set(label_to_taskid)):
label_to_taskid[label] = slugid()
# resolve labels to taskIds and populate task['dependencies']
tasks_by_taskid = {}
for label in annotated_task_graph.graph.visit_postorder():
task = annotated_task_graph.tasks[label]
if task.optimized:
named_links_dict = target_task_graph.graph.named_links_dict()
omit = removed_tasks | replaced_tasks
for label, task in target_task_graph.tasks.iteritems():
if label in omit:
continue
task.task_id = label_to_taskid[label] = slugid()
task.task_id = label_to_taskid[label]
named_task_dependencies = {
name: label_to_taskid[label]
for name, label in named_links_dict.get(label, {}).iteritems()}
name: label_to_taskid[label]
for name, label in named_links_dict.get(label, {}).iteritems()}
task.task = resolve_task_references(task.label, task.task, named_task_dependencies)
task.task.setdefault('dependencies', []).extend(named_task_dependencies.itervalues())
deps = task.task.setdefault('dependencies', [])
deps.extend(sorted(named_task_dependencies.itervalues()))
tasks_by_taskid[task.task_id] = task
# resolve edges to taskIds
edges_by_taskid = (
(label_to_taskid.get(left), label_to_taskid.get(right), name)
for (left, right, name) in annotated_task_graph.graph.edges
)
# ..and drop edges that are no longer in the task graph
for (left, right, name) in target_task_graph.graph.edges
)
# ..and drop edges that are no longer entirely in the task graph
# (note that this omits edges to replaced tasks, but they are still in task.dependnecies)
edges_by_taskid = set(
(left, right, name)
for (left, right, name) in edges_by_taskid
if left in tasks_by_taskid and right in tasks_by_taskid
)
)
return TaskGraph(
tasks_by_taskid,
Graph(set(tasks_by_taskid), edges_by_taskid))
def optimization(name):
def wrap(func):
if name in _optimizations:
raise Exception("multiple optimizations with name {}".format(name))
_optimizations[name] = func
return func
return wrap
class OptimizationStrategy(object):
def should_remove_task(self, task, params, arg):
"""Determine whether to optimize this task by removing it. Returns
True to remove."""
return False
@optimization('index-search')
def opt_index_search(task, params, index_paths):
for index_path in index_paths:
try:
task_id = find_task_id(
index_path,
use_proxy=bool(os.environ.get('TASK_ID')))
return task_id
except requests.exceptions.HTTPError:
# 404 will end up here and go on to the next index path
pass
return False
@optimization('seta')
def opt_seta(task, params, _):
bbb_task = False
# for bbb tasks we need to send in the buildbot buildername
if task.task.get('provisionerId', '') == 'buildbot-bridge':
label = task.task.get('payload').get('buildername')
bbb_task = True
else:
label = task.label
# we would like to return 'False, None' while it's high_value_task
# and we wouldn't optimize it. Otherwise, it will return 'True, None'
if is_low_value_task(label,
params.get('project'),
params.get('pushlog_id'),
params.get('pushdate'),
bbb_task):
# Always optimize away low-value tasks
return True
else:
def should_replace_task(self, task, params, arg):
"""Determine whether to optimize this task by replacing it. Returns a
taskId to replace this task, True to replace with nothing, or False to
keep the task."""
return False
@optimization('skip-unless-changed')
def opt_files_changed(task, params, file_patterns):
# pushlog_id == -1 - this is the case when run from a cron.yml job
if params.get('pushlog_id') == -1:
class IndexSearch(OptimizationStrategy):
def should_remove_task(self, task, params, index_paths):
"If this task has no dependencies, don't run it.."
return True
changed = files_changed.check(params, file_patterns)
if not changed:
logger.debug('no files found matching a pattern in `skip-unless-changed` for ' +
task.label)
return True
return False
def should_replace_task(self, task, params, index_paths):
"Look for a task with one of the given index paths"
for index_path in index_paths:
try:
task_id = find_task_id(
index_path,
use_proxy=bool(os.environ.get('TASK_ID')))
return task_id
except requests.exceptions.HTTPError:
# 404 will end up here and go on to the next index path
pass
return False
class SETA(OptimizationStrategy):
def should_remove_task(self, task, params, _):
bbb_task = False
# for bbb tasks we need to send in the buildbot buildername
if task.task.get('provisionerId', '') == 'buildbot-bridge':
label = task.task.get('payload').get('buildername')
bbb_task = True
else:
label = task.label
# we would like to return 'False, None' while it's high_value_task
# and we wouldn't optimize it. Otherwise, it will return 'True, None'
if is_low_value_task(label,
params.get('project'),
params.get('pushlog_id'),
params.get('pushdate'),
bbb_task):
# Always optimize away low-value tasks
return True
else:
return False
class SkipUnlessChanged(OptimizationStrategy):
def should_remove_task(self, task, params, file_patterns):
# pushlog_id == -1 - this is the case when run from a cron.yml job
if params.get('pushlog_id') == -1:
return False
changed = files_changed.check(params, file_patterns)
if not changed:
logger.debug('no files found matching a pattern in `skip-unless-changed` for ' +
task.label)
return True
return False

View File

@ -20,7 +20,6 @@ class Task(object):
And later, as the task-graph processing proceeds:
- task_id -- TaskCluster taskId under which this task will be created
- optimized -- true if this task need not be performed
This class is just a convenience wraper for the data type and managing
display, comparison, serialization, etc. It has no functionality of its own.
@ -33,7 +32,6 @@ class Task(object):
self.task = task
self.task_id = None
self.optimized = False
self.attributes['kind'] = kind

View File

@ -6,74 +6,42 @@ from __future__ import absolute_import, print_function, unicode_literals
import unittest
from taskgraph.optimize import optimize_task_graph, resolve_task_references, optimization
from taskgraph.optimize import annotate_task_graph, get_subgraph
from taskgraph import optimize
from taskgraph.taskgraph import TaskGraph
from taskgraph import graph
from taskgraph.task import Task
from mozunit import main
from slugid import nice as slugid
class TestResolveTaskReferences(unittest.TestCase):
class Remove(optimize.OptimizationStrategy):
def do(self, input, output):
taskid_for_edge_name = {'edge%d' % n: 'tid%d' % n for n in range(1, 4)}
self.assertEqual(resolve_task_references('subject', input, taskid_for_edge_name), output)
def should_remove_task(self, task, params, arg):
return True
def test_in_list(self):
"resolve_task_references resolves task references in a list"
self.do({'in-a-list': ['stuff', {'task-reference': '<edge1>'}]},
{'in-a-list': ['stuff', 'tid1']})
def test_in_dict(self):
"resolve_task_references resolves task references in a dict"
self.do({'in-a-dict': {'stuff': {'task-reference': '<edge2>'}}},
{'in-a-dict': {'stuff': 'tid2'}})
class Replace(optimize.OptimizationStrategy):
def test_multiple(self):
"resolve_task_references resolves multiple references in the same string"
self.do({'multiple': {'task-reference': 'stuff <edge1> stuff <edge2> after'}},
{'multiple': 'stuff tid1 stuff tid2 after'})
def test_embedded(self):
"resolve_task_references resolves ebmedded references"
self.do({'embedded': {'task-reference': 'stuff before <edge3> stuff after'}},
{'embedded': 'stuff before tid3 stuff after'})
def test_escaping(self):
"resolve_task_references resolves escapes in task references"
self.do({'escape': {'task-reference': '<<><edge3>>'}},
{'escape': '<tid3>'})
def test_invalid(self):
"resolve_task_references raises a KeyError on reference to an invalid task"
self.assertRaisesRegexp(
KeyError,
"task 'subject' has no dependency named 'no-such'",
lambda: resolve_task_references('subject', {'task-reference': '<no-such>'}, {})
)
def should_replace_task(self, task, params, taskid):
return taskid
class TestOptimize(unittest.TestCase):
kind = None
strategies = {
'never': optimize.OptimizationStrategy(),
'remove': Remove(),
'replace': Replace(),
}
@classmethod
def setUpClass(cls):
# set up some simple optimization functions
optimization('no-optimize')(lambda self, params, arg: False)
optimization('optimize-away')(lambda self, params, arg: True)
optimization('optimize-to-task')(lambda self, params, task: task)
def make_task(self, label, optimization=None, task_def=None, optimized=None, task_id=None):
def make_task(self, label, optimization=None, task_def=None, optimized=None,
task_id=None, dependencies=None):
task_def = task_def or {'sample': 'task-def'}
task = Task(kind='test', label=label, attributes={}, task=task_def)
task.optimized = optimized
if optimization:
task.optimization = optimization
else:
task.optimization = None
task.optimization = optimization
task.task_id = task_id
if dependencies is not None:
task.task['dependencies'] = sorted(dependencies)
return task
def make_graph(self, *tasks_and_edges):
@ -81,168 +49,182 @@ class TestOptimize(unittest.TestCase):
edges = {e for e in tasks_and_edges if not isinstance(e, Task)}
return TaskGraph(tasks, graph.Graph(set(tasks), edges))
def assert_annotations(self, graph, **annotations):
def repl(task_id):
return 'SLUGID' if task_id and len(task_id) == 22 else task_id
got_annotations = {
t.label: repl(t.task_id) or t.optimized for t in graph.tasks.itervalues()
}
self.assertEqual(got_annotations, annotations)
def make_opt_graph(self, *tasks_and_edges):
tasks = {t.task_id: t for t in tasks_and_edges if isinstance(t, Task)}
edges = {e for e in tasks_and_edges if not isinstance(e, Task)}
return TaskGraph(tasks, graph.Graph(set(tasks), edges))
def test_annotate_task_graph_no_optimize(self):
"annotating marks everything as un-optimized if the kind returns that"
graph = self.make_graph(
self.make_task('task1', {'no-optimize': []}),
self.make_task('task2', {'no-optimize': []}),
self.make_task('task3', {'no-optimize': []}),
('task2', 'task1', 'build'),
('task2', 'task3', 'image'),
)
annotate_task_graph(graph, {}, set(), graph.graph.named_links_dict(), {}, None)
self.assert_annotations(
def make_triangle(self, **opts):
"""
Make a "triangle" graph like this:
t1 <-------- t3
`---- t2 --'
"""
return self.make_graph(
self.make_task('t1', opts.get('t1')),
self.make_task('t2', opts.get('t2')),
self.make_task('t3', opts.get('t3')),
('t3', 't2', 'dep'),
('t3', 't1', 'dep2'),
('t2', 't1', 'dep'))
def assert_remove_tasks(self, graph, exp_removed, do_not_optimize=set()):
got_removed = optimize.remove_tasks(
target_task_graph=graph,
optimizations=optimize._get_optimizations(graph, self.strategies),
params={},
do_not_optimize=do_not_optimize)
self.assertEqual(got_removed, exp_removed)
def test_remove_tasks_never(self):
"A graph full of optimization=never has nothing removed"
graph = self.make_triangle()
self.assert_remove_tasks(graph, set())
def test_remove_tasks_all(self):
"A graph full of optimization=remove has removes everything"
graph = self.make_triangle(
t1={'remove': None},
t2={'remove': None},
t3={'remove': None})
self.assert_remove_tasks(graph, {'t1', 't2', 't3'})
def test_remove_tasks_blocked(self):
"Removable tasks that are depended on by non-removable tasks are not removed"
graph = self.make_triangle(
t1={'remove': None},
t3={'remove': None})
self.assert_remove_tasks(graph, {'t3'})
def test_remove_tasks_do_not_optimize(self):
"Removable tasks that are marked do_not_optimize are not removed"
graph = self.make_triangle(
t1={'remove': None},
t2={'remove': None}, # but do_not_optimize
t3={'remove': None})
self.assert_remove_tasks(graph, {'t3'}, do_not_optimize={'t2'})
def assert_replace_tasks(self, graph, exp_replaced, exp_removed=set(), exp_label_to_taskid={},
do_not_optimize=None, label_to_taskid=None, removed_tasks=None,
existing_tasks=None):
do_not_optimize = do_not_optimize or set()
label_to_taskid = label_to_taskid or {}
removed_tasks = removed_tasks or set()
existing_tasks = existing_tasks or {}
got_replaced = optimize.replace_tasks(
target_task_graph=graph,
optimizations=optimize._get_optimizations(graph, self.strategies),
params={},
do_not_optimize=do_not_optimize,
label_to_taskid=label_to_taskid,
removed_tasks=removed_tasks,
existing_tasks=existing_tasks)
self.assertEqual(got_replaced, exp_replaced)
self.assertEqual(removed_tasks, exp_removed)
self.assertEqual(label_to_taskid, exp_label_to_taskid)
def test_replace_tasks_never(self):
"No tasks are replaced when strategy is 'never'"
graph = self.make_triangle()
self.assert_replace_tasks(graph, set())
def test_replace_tasks_all(self):
"All replacable tasks are replaced when strategy is 'replace'"
graph = self.make_triangle(
t1={'replace': 'e1'},
t2={'replace': 'e2'},
t3={'replace': 'e3'})
self.assert_replace_tasks(
graph,
task1=False,
task2=False,
task3=False
)
exp_replaced={'t1', 't2', 't3'},
exp_label_to_taskid={'t1': 'e1', 't2': 'e2', 't3': 'e3'})
def test_annotate_task_graph_optimize_away_dependency(self):
"raises exception if kind optimizes away a task on which another depends"
graph = self.make_graph(
self.make_task('task1', {'optimize-away': []}),
self.make_task('task2', {'no-optimize': []}),
('task2', 'task1', 'build'),
)
self.assertRaises(
Exception,
lambda: annotate_task_graph(graph, {}, set(), graph.graph.named_links_dict(), {}, None)
)
def test_annotate_task_graph_do_not_optimize(self):
"annotating marks everything as un-optimized if in do_not_optimize"
graph = self.make_graph(
self.make_task('task1', {'optimize-away': True}),
self.make_task('task2', {'optimize-away': True}),
('task2', 'task1', 'build'),
)
label_to_taskid = {}
annotate_task_graph(graph, {}, {'task1', 'task2'},
graph.graph.named_links_dict(), label_to_taskid, None)
self.assert_annotations(
def test_replace_tasks_blocked(self):
"A task cannot be replaced if it depends on one that was not replaced"
graph = self.make_triangle(
t1={'replace': 'e1'},
t3={'replace': 'e3'})
self.assert_replace_tasks(
graph,
task1=False,
task2=False
)
self.assertEqual
exp_replaced={'t1'},
exp_label_to_taskid={'t1': 'e1'})
def test_annotate_task_graph_nos_do_not_propagate(self):
"a task with a non-optimized dependency can be optimized"
graph = self.make_graph(
self.make_task('task1', {'no-optimize': []}),
self.make_task('task2', {'optimize-to-task': 'taskid'}),
self.make_task('task3', {'optimize-to-task': 'taskid'}),
('task2', 'task1', 'build'),
('task2', 'task3', 'image'),
)
annotate_task_graph(graph, {}, set(),
graph.graph.named_links_dict(), {}, None)
self.assert_annotations(
def test_replace_tasks_do_not_optimize(self):
"A task cannot be replaced if it depends on one that was not replaced"
graph = self.make_triangle(
t1={'replace': 'e1'},
t2={'replace': 'xxx'}, # but do_not_optimize
t3={'replace': 'e3'})
self.assert_replace_tasks(
graph,
task1=False,
task2='taskid',
task3='taskid'
)
exp_replaced={'t1'},
exp_label_to_taskid={'t1': 'e1'},
do_not_optimize={'t2'})
def test_get_subgraph_single_dep(self):
"when a single dependency is optimized, it is omitted from the graph"
graph = self.make_graph(
self.make_task('task1', optimized=True, task_id='dep1'),
self.make_task('task2', optimized=False),
self.make_task('task3', optimized=False),
('task2', 'task1', 'build'),
('task2', 'task3', 'image'),
)
label_to_taskid = {'task1': 'dep1'}
sub = get_subgraph(graph, graph.graph.named_links_dict(), label_to_taskid)
task2 = label_to_taskid['task2']
task3 = label_to_taskid['task3']
self.assertEqual(sub.graph.nodes, {task2, task3})
self.assertEqual(sub.graph.edges, {(task2, task3, 'image')})
self.assertEqual(sub.tasks[task2].task_id, task2)
self.assertEqual(sorted(sub.tasks[task2].task['dependencies']),
sorted([task3, 'dep1']))
self.assertEqual(sub.tasks[task3].task_id, task3)
self.assertEqual(sorted(sub.tasks[task3].task['dependencies']), [])
def test_replace_tasks_removed(self):
"A task can be replaced with nothing"
graph = self.make_triangle(
t1={'replace': 'e1'},
t2={'replace': True},
t3={'replace': True})
self.assert_replace_tasks(
graph,
exp_replaced={'t1'},
exp_removed={'t2', 't3'},
exp_label_to_taskid={'t1': 'e1'})
def test_get_subgraph_dep_chain(self):
"when a dependency chain is optimized, it is omitted from the graph"
graph = self.make_graph(
self.make_task('task1', optimized=True, task_id='dep1'),
self.make_task('task2', optimized=True, task_id='dep2'),
self.make_task('task3', optimized=False),
('task2', 'task1', 'build'),
('task3', 'task2', 'image'),
)
label_to_taskid = {'task1': 'dep1', 'task2': 'dep2'}
sub = get_subgraph(graph, graph.graph.named_links_dict(), label_to_taskid)
task3 = label_to_taskid['task3']
self.assertEqual(sub.graph.nodes, {task3})
self.assertEqual(sub.graph.edges, set())
self.assertEqual(sub.tasks[task3].task_id, task3)
self.assertEqual(sorted(sub.tasks[task3].task['dependencies']), ['dep2'])
def assert_subgraph(self, graph, removed_tasks, replaced_tasks,
label_to_taskid, exp_subgraph, exp_label_to_taskid):
self.maxDiff = None
optimize.slugid = ('tid{}'.format(i) for i in xrange(1, 10)).next
try:
got_subgraph = optimize.get_subgraph(graph, removed_tasks,
replaced_tasks, label_to_taskid)
finally:
optimize.slugid = slugid
self.assertEqual(got_subgraph.graph, exp_subgraph.graph)
self.assertEqual(got_subgraph.tasks, exp_subgraph.tasks)
self.assertEqual(label_to_taskid, exp_label_to_taskid)
def test_get_subgraph_opt_away(self):
"when a leaf task is optimized away, it is omitted from the graph"
graph = self.make_graph(
self.make_task('task1', optimized=False),
self.make_task('task2', optimized=True),
('task2', 'task1', 'build'),
)
label_to_taskid = {'task2': 'dep2'}
sub = get_subgraph(graph, graph.graph.named_links_dict(), label_to_taskid)
task1 = label_to_taskid['task1']
self.assertEqual(sub.graph.nodes, {task1})
self.assertEqual(sub.graph.edges, set())
self.assertEqual(sub.tasks[task1].task_id, task1)
self.assertEqual(sorted(sub.tasks[task1].task['dependencies']), [])
def test_get_subgraph_no_change(self):
"get_subgraph returns a similarly-shaped subgraph when nothing is removed"
graph = self.make_triangle()
self.assert_subgraph(
graph, set(), set(), {},
self.make_opt_graph(
self.make_task('t1', task_id='tid1', dependencies={}),
self.make_task('t2', task_id='tid2', dependencies={'tid1'}),
self.make_task('t3', task_id='tid3', dependencies={'tid1', 'tid2'}),
('tid3', 'tid2', 'dep'),
('tid3', 'tid1', 'dep2'),
('tid2', 'tid1', 'dep')),
{'t1': 'tid1', 't2': 'tid2', 't3': 'tid3'})
def test_get_subgraph_refs_resolved(self):
"get_subgraph resolves task references"
graph = self.make_graph(
self.make_task('task1', optimized=True, task_id='dep1'),
self.make_task(
'task2',
optimized=False,
task_def={'payload': {'task-reference': 'http://<build>/<test>'}}
),
('task2', 'task1', 'build'),
('task2', 'task3', 'test'),
self.make_task('task3', optimized=False),
)
label_to_taskid = {'task1': 'dep1'}
sub = get_subgraph(graph, graph.graph.named_links_dict(), label_to_taskid)
task2 = label_to_taskid['task2']
task3 = label_to_taskid['task3']
self.assertEqual(sub.graph.nodes, {task2, task3})
self.assertEqual(sub.graph.edges, {(task2, task3, 'test')})
self.assertEqual(sub.tasks[task2].task_id, task2)
self.assertEqual(sorted(sub.tasks[task2].task['dependencies']), sorted([task3, 'dep1']))
self.assertEqual(sub.tasks[task2].task['payload'], 'http://dep1/' + task3)
self.assertEqual(sub.tasks[task3].task_id, task3)
def test_get_subgraph_removed(self):
"get_subgraph returns a smaller subgraph when tasks are removed"
graph = self.make_triangle()
self.assert_subgraph(
graph, {'t2', 't3'}, set(), {},
self.make_opt_graph(
self.make_task('t1', task_id='tid1', dependencies={})),
{'t1': 'tid1'})
def test_optimize(self):
"optimize_task_graph annotates and extracts the subgraph from a simple graph"
input = self.make_graph(
self.make_task('task1', {'optimize-to-task': 'dep1'}),
self.make_task('task2', {'no-optimize': []}),
self.make_task('task3', {'no-optimize': []}),
('task2', 'task1', 'build'),
('task2', 'task3', 'image'),
)
opt, label_to_taskid = optimize_task_graph(input, {}, set())
self.assertEqual(opt.graph, graph.Graph(
{label_to_taskid['task2'], label_to_taskid['task3']},
{(label_to_taskid['task2'], label_to_taskid['task3'], 'image')}))
def test_get_subgraph_replaced(self):
"get_subgraph returns a smaller subgraph when tasks are replaced"
graph = self.make_triangle()
self.assert_subgraph(
graph, set(), {'t1', 't2'}, {'t1': 'e1', 't2': 'e2'},
self.make_opt_graph(
self.make_task('t3', task_id='tid1', dependencies={'e1', 'e2'})),
{'t1': 'e1', 't2': 'e2', 't3': 'tid1'})
def test_get_subgraph_removed_dep(self):
"get_subgraph raises an Exception when a task depends on a removed task"
graph = self.make_triangle()
with self.assertRaises(Exception):
optimize.get_subgraph(graph, {'t2'}, set(), {})
if __name__ == '__main__':

View File

@ -40,30 +40,52 @@ class TestTimestamps(unittest.TestCase):
class TestTaskRefs(unittest.TestCase):
def do(self, input, output):
taskid_for_edge_name = {'edge%d' % n: 'tid%d' % n for n in range(1, 4)}
self.assertEqual(resolve_task_references('subject', input, taskid_for_edge_name), output)
def test_no_change(self):
input = {"key": "value", "numeric": 10, "list": ["a", True, False, None]}
self.assertEqual(resolve_task_references('lable', input, {}), input)
"resolve_task_references does nothing when there are no task references"
self.do({'in-a-list': ['stuff', {'property': '<edge1>'}]},
{'in-a-list': ['stuff', {'property': '<edge1>'}]})
def test_buried_replacement(self):
input = {"key": [{"key2": [{'task-reference': 'taskid=<toolchain>'}]}]}
self.assertEqual(resolve_task_references('lable', input, {'toolchain': 'abcd'}),
{u'key': [{u'key2': [u'taskid=abcd']}]})
def test_in_list(self):
"resolve_task_references resolves task references in a list"
self.do({'in-a-list': ['stuff', {'task-reference': '<edge1>'}]},
{'in-a-list': ['stuff', 'tid1']})
def test_appears_with_other_keys(self):
input = [{'task-reference': '<toolchain>', 'another-key': True}]
self.assertEqual(resolve_task_references('lable', input, {'toolchain': 'abcd'}),
[{'task-reference': '<toolchain>', 'another-key': True}])
def test_in_dict(self):
"resolve_task_references resolves task references in a dict"
self.do({'in-a-dict': {'stuff': {'task-reference': '<edge2>'}}},
{'in-a-dict': {'stuff': 'tid2'}})
def test_multiple_subs(self):
input = [{'task-reference': 'toolchain=<toolchain>, build=<build>'}]
self.assertEqual(
resolve_task_references('lable', input, {'toolchain': 'abcd', 'build': 'def'}),
['toolchain=abcd, build=def'])
def test_multiple(self):
"resolve_task_references resolves multiple references in the same string"
self.do({'multiple': {'task-reference': 'stuff <edge1> stuff <edge2> after'}},
{'multiple': 'stuff tid1 stuff tid2 after'})
def test_escaped(self):
input = [{'task-reference': '<<><toolchain>>'}]
self.assertEqual(resolve_task_references('lable', input, {'toolchain': 'abcd'}),
['<abcd>'])
def test_embedded(self):
"resolve_task_references resolves ebmedded references"
self.do({'embedded': {'task-reference': 'stuff before <edge3> stuff after'}},
{'embedded': 'stuff before tid3 stuff after'})
def test_escaping(self):
"resolve_task_references resolves escapes in task references"
self.do({'escape': {'task-reference': '<<><edge3>>'}},
{'escape': '<tid3>'})
def test_multikey(self):
"resolve_task_references is ignored when there is another key in the dict"
self.do({'escape': {'task-reference': '<edge3>', 'another-key': True}},
{'escape': {'task-reference': '<edge3>', 'another-key': True}})
def test_invalid(self):
"resolve_task_references raises a KeyError on reference to an invalid task"
self.assertRaisesRegexp(
KeyError,
"task 'subject' has no dependency named 'no-such'",
lambda: resolve_task_references('subject', {'task-reference': '<no-such>'}, {})
)
if __name__ == '__main__':