mirror of
https://github.com/mozilla/gecko-dev.git
synced 2025-01-11 22:41:02 +00:00
Bug 1399393 Refactor create_tasks to avoid bottlenecks r=dustin
MozReview-Commit-ID: cJW5X3HSCx --HG-- extra : rebase_source : 721dc051bf246385adc2e786815ebd70bc2cc7cc
This commit is contained in:
parent
1e8cda55bd
commit
d4015fac9b
@ -47,52 +47,61 @@ def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
|
||||
task_group_id = decision_task_id or slugid()
|
||||
scheduler_id = 'gecko-level-{}'.format(params['level'])
|
||||
|
||||
# Add the taskGroupId, schedulerId and optionally the decision task
|
||||
# dependency
|
||||
for task_id in taskgraph.graph.nodes:
|
||||
task_def = taskgraph.tasks[task_id].task
|
||||
|
||||
# if this task has no dependencies *within* this taskgraph, make it
|
||||
# depend on this decision task. If it has another dependency within
|
||||
# the taskgraph, then it already implicitly depends on the decision
|
||||
# task. The result is that tasks do not start immediately. if this
|
||||
# loop fails halfway through, none of the already-created tasks run.
|
||||
if decision_task_id:
|
||||
if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
|
||||
task_def.setdefault('dependencies', []).append(decision_task_id)
|
||||
|
||||
task_def['taskGroupId'] = task_group_id
|
||||
task_def['schedulerId'] = scheduler_id
|
||||
|
||||
with futures.ThreadPoolExecutor(CONCURRENCY) as e:
|
||||
fs = {}
|
||||
|
||||
# We can't submit a task until its dependencies have been submitted.
|
||||
# So our strategy is to walk the graph and submit tasks once all
|
||||
# their dependencies have been submitted.
|
||||
#
|
||||
# Using visit_postorder() here isn't the most efficient: we'll
|
||||
# block waiting for dependencies of task N to submit even though
|
||||
# dependencies for task N+1 may be finished. If we need to optimize
|
||||
# this further, we can build a graph of task dependencies and walk
|
||||
# that.
|
||||
for task_id in taskgraph.graph.visit_postorder():
|
||||
task_def = taskgraph.tasks[task_id].task
|
||||
attributes = taskgraph.tasks[task_id].attributes
|
||||
tasklist = set(taskgraph.graph.visit_postorder())
|
||||
alltasks = tasklist.copy()
|
||||
|
||||
# if this task has no dependencies *within* this taskgraph, make it
|
||||
# depend on this decision task. If it has another dependency within
|
||||
# the taskgraph, then it already implicitly depends on the decision
|
||||
# task. The result is that tasks do not start immediately. if this
|
||||
# loop fails halfway through, none of the already-created tasks run.
|
||||
if decision_task_id:
|
||||
if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
|
||||
task_def.setdefault('dependencies', []).append(decision_task_id)
|
||||
def schedule_tasks(f=None):
|
||||
to_remove = set()
|
||||
for task_id in tasklist:
|
||||
task_def = taskgraph.tasks[task_id].task
|
||||
# If we haven't finished submitting all our dependencies yet,
|
||||
# come back to this later.
|
||||
# Some dependencies aren't in our graph, so make sure to filter
|
||||
# those out
|
||||
deps = set(task_def.get('dependencies', [])) & alltasks
|
||||
if any((d not in fs or not fs[d].done()) for d in deps):
|
||||
continue
|
||||
|
||||
task_def['taskGroupId'] = task_group_id
|
||||
task_def['schedulerId'] = scheduler_id
|
||||
|
||||
# Wait for dependencies before submitting this.
|
||||
deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
|
||||
if dep in fs]
|
||||
for f in futures.as_completed(deps_fs):
|
||||
f.result()
|
||||
|
||||
fs[task_id] = e.submit(create_task, session, task_id,
|
||||
taskid_to_label[task_id], task_def)
|
||||
|
||||
# Schedule tasks as many times as task_duplicates indicates
|
||||
for i in range(1, attributes.get('task_duplicates', 1)):
|
||||
# We use slugid() since we want a distinct task id
|
||||
fs[task_id] = e.submit(create_task, session, slugid(),
|
||||
fs[task_id] = e.submit(create_task, session, task_id,
|
||||
taskid_to_label[task_id], task_def)
|
||||
to_remove.add(task_id)
|
||||
|
||||
# Wait for all futures to complete.
|
||||
for f in futures.as_completed(fs.values()):
|
||||
f.result()
|
||||
# Schedule tasks as many times as task_duplicates indicates
|
||||
attributes = taskgraph.tasks[task_id].attributes
|
||||
for i in range(1, attributes.get('task_duplicates', 1)):
|
||||
# We use slugid() since we want a distinct task id
|
||||
fs[task_id] = e.submit(create_task, session, slugid(),
|
||||
taskid_to_label[task_id], task_def)
|
||||
tasklist.difference_update(to_remove)
|
||||
|
||||
schedule_tasks()
|
||||
while tasklist:
|
||||
for f in futures.as_completed(fs.values()):
|
||||
f.result()
|
||||
schedule_tasks()
|
||||
|
||||
|
||||
def create_task(session, task_id, label, task_def):
|
||||
|
Loading…
x
Reference in New Issue
Block a user