Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks r=dustin

MozReview-Commit-ID: 7ZTbS5h0vPA

--HG--
extra : rebase_source : c1acea26ac526c672f6630504ae69bbe7dbd6677
Brian Stack 2017-09-20 12:52:29 -07:00
parent 518a10833d
commit 17f6f5f45c
9 changed files with 104 additions and 48 deletions

View File

@ -40,7 +40,7 @@ python callback. When the action is triggered in a user interface,
input matching the schema is collected and passed to a new task, which then calls
your Python callback, enabling it to do pretty much anything it wants to.
To create a new action you must create a file
To create a new callback action you must create a file
``taskcluster/taskgraph/actions/my-action.py``, that at minimum contains::
from registry import register_callback_action
@ -58,6 +58,20 @@ To create a new action you must create a file
# input, task_id, and task should all be None
print "Hello was triggered from taskGroupId: " + taskGroupId
Callback actions are configured in-tree to generate three artifacts when they run.
These artifacts are similar to those generated by decision tasks, since
callback actions are essentially small decision tasks. The artifacts are:
``task-graph.json``:
The graph of all tasks created by the action task. Includes tasks
created to satisfy requirements.
``to-run.json``:
The set of tasks that the action task requested to build. This does not
include the requirements.
``label-to-taskid.json``:
This is the mapping from label to ``taskid`` for all tasks involved in
the task-graph. This includes dependencies.
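These files come from the ``write_artifact`` calls this commit adds to
``create_tasks`` in ``taskcluster/taskgraph/actions/util.py`` (shown below),
condensed here into a sketch; the wrapper function is illustrative only::

    from taskgraph.decision import write_artifact

    def _write_action_artifacts(optimized_task_graph, label_to_taskid, to_run):
        # Mirrors the writes added to create_tasks in this commit; the files
        # land in the public artifact directory configured in the registry
        # change below.
        write_artifact('task-graph.json', optimized_task_graph.to_json())
        write_artifact('label-to-taskid.json', label_to_taskid)
        write_artifact('to-run.json', list(to_run))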
The callback example shown earlier defines an action that is available in the context-menu for
the entire task-group (result-set or push in Treeherder terminology). To create
an action that shows up in the context menu for a task we would specify the

View File

@ -8,9 +8,7 @@ from __future__ import absolute_import, print_function, unicode_literals
from .registry import register_callback_action
from .util import (create_tasks, find_decision_task)
from taskgraph.util.taskcluster import get_artifact
from taskgraph.taskgraph import TaskGraph
from .util import (create_tasks, fetch_graph_and_labels)
@register_callback_action(
@ -34,11 +32,7 @@ from taskgraph.taskgraph import TaskGraph
}
)
def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
to_run = []
for elem in input['tasks']:

View File

@ -9,9 +9,7 @@ from __future__ import absolute_import, print_function, unicode_literals
import logging
from .registry import register_callback_action
from .util import create_tasks, find_decision_task
from taskgraph.util.taskcluster import get_artifact
from taskgraph.taskgraph import TaskGraph
from .util import create_tasks, fetch_graph_and_labels
logger = logging.getLogger(__name__)
@ -39,11 +37,7 @@ logger = logging.getLogger(__name__)
},
)
def add_all_talos(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
times = input.get('times', 1)
for i in xrange(times):

View File

@ -11,11 +11,9 @@ import logging
from slugid import nice as slugid
from .util import (find_decision_task, create_task_from_def)
from .util import (create_task_from_def, fetch_graph_and_labels)
from .registry import register_callback_action
from taskgraph.util.taskcluster import get_artifact
from taskgraph.util.parameterization import resolve_task_references
from taskgraph.taskgraph import TaskGraph
TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
@ -82,11 +80,7 @@ logger = logging.getLogger(__name__)
}
)
def mochitest_retrigger_action(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
pre_task = full_task_graph.tasks[task['metadata']['name']]

View File

@ -190,7 +190,7 @@ def register_callback_action(name, title, symbol, description, order=10000,
return {
'created': {'$fromNow': ''},
'deadline': {'$fromNow': '12 hours'},
'expires': {'$fromNow': '14 days'},
'expires': {'$fromNow': '1 year'},
'metadata': {
'owner': 'mozilla-taskcluster-maintenance@mozilla.com',
'source': '{}raw-file/{}/{}'.format(
@ -215,6 +215,8 @@ def register_callback_action(name, title, symbol, description, order=10000,
parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
'tc-treeherder-stage.v2.{}.{}.{}'.format(
parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
'index.gecko.v2.{}.pushlog-id.{}.actions.${{ownTaskId}}'.format(
parameters['project'], parameters['pushlog_id'])
],
'payload': {
'env': {
@ -230,6 +232,13 @@ def register_callback_action(name, title, symbol, description, order=10000,
'ACTION_CALLBACK': cb.__name__,
'ACTION_PARAMETERS': {'$json': {'$eval': 'parameters'}},
},
'artifacts': {
'public': {
'type': 'directory',
'path': '/builds/worker/artifacts',
'expires': {'$fromNow': '1 year'},
},
},
'cache': {
'level-{}-checkouts'.format(parameters['level']):
'/builds/worker/checkouts',
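The new ``index.gecko.v2.{project}.pushlog-id.{pushlog_id}.actions.${ownTaskId}``
route is what makes an action task discoverable by later actions. A rough sketch
of the correspondence, with hypothetical parameter values:

    from taskgraph.util.taskcluster import list_tasks

    # Hypothetical values, for illustration only.
    project, pushlog_id = 'mozilla-central', '12345'

    # The route above indexes each action task (and its public/ artifacts)
    # under gecko.v2.<project>.pushlog-id.<pushlog_id>.actions.<ownTaskId>;
    # fetch_graph_and_labels, added in actions/util.py below, lists exactly
    # that namespace to find earlier action tasks on the same push.
    prior_action_task_ids = list_tasks(
        'gecko.v2.{}.pushlog-id.{}.actions'.format(project, pushlog_id))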

View File

@ -10,11 +10,9 @@ import logging
from .util import (
create_tasks,
find_decision_task
fetch_graph_and_labels
)
from .registry import register_callback_action
from taskgraph.util.taskcluster import get_artifact
from taskgraph.taskgraph import TaskGraph
logger = logging.getLogger(__name__)
@ -51,11 +49,7 @@ logger = logging.getLogger(__name__)
}
)
def retrigger_action(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
label = task['metadata']['name']
with_downstream = ' '

View File

@ -9,9 +9,8 @@ from __future__ import absolute_import, print_function, unicode_literals
import logging
from .registry import register_callback_action
from .util import create_tasks, find_decision_task
from .util import create_tasks, fetch_graph_and_labels
from taskgraph.util.taskcluster import get_artifact
from taskgraph.taskgraph import TaskGraph
logger = logging.getLogger(__name__)
@ -30,12 +29,8 @@ logger = logging.getLogger(__name__)
context=[], # Applies to decision task
)
def run_missing_tests(parameters, input, task_group_id, task_id, task):
decision_task_id = find_decision_task(parameters)
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
target_tasks = get_artifact(decision_task_id, "public/target-tasks.json")
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
# The idea here is to schedule all tasks of the `test` kind that were
# targeted but did not appear in the final task-graph -- those were the

View File

@ -6,10 +6,17 @@
from __future__ import absolute_import, print_function, unicode_literals
import logging
from requests.exceptions import HTTPError
from taskgraph import create
from taskgraph.decision import write_artifact
from taskgraph.taskgraph import TaskGraph
from taskgraph.optimize import optimize_task_graph
from taskgraph.util.taskcluster import get_session, find_task_id
from taskgraph.util.taskcluster import get_session, find_task_id, get_artifact, list_tasks
logger = logging.getLogger(__name__)
def find_decision_task(parameters):
@ -20,6 +27,30 @@ def find_decision_task(parameters):
parameters['pushlog_id']))
def fetch_graph_and_labels(parameters):
decision_task_id = find_decision_task(parameters)
# First grab the graph and labels generated during the initial decision task
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
# Now fetch any modifications made by action tasks and swap out new tasks
# for old ones
namespace = 'gecko.v2.{}.pushlog-id.{}.actions'.format(
parameters['project'],
parameters['pushlog_id'])
for action in list_tasks(namespace):
try:
run_label_to_id = get_artifact(action, "public/label-to-taskid.json")
label_to_taskid.update(run_label_to_id)
except HTTPError as e:
logger.info('Skipping {} due to missing artifact! Error: {}'.format(action, e))
continue
return (decision_task_id, full_task_graph, label_to_taskid)
def create_task_from_def(task_id, task_def, level):
"""Create a new task from a definition rather than from a label
that is already in the full-task-graph. The task definition will
@ -49,4 +80,7 @@ def create_tasks(to_run, full_task_graph, label_to_taskid, params, decision_task
params,
to_run,
label_to_taskid)
write_artifact('task-graph.json', optimized_task_graph.to_json())
write_artifact('label-to-taskid.json', label_to_taskid)
write_artifact('to-run.json', list(to_run))
create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
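With these helpers in place, the action callbacks touched by this commit all
follow the same pattern. A condensed sketch based on ``add_new_jobs_action``
above (the decorator and input validation are omitted and the loop body is
simplified, so it may differ from the real callback):

    from .util import create_tasks, fetch_graph_and_labels

    def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
        # Merged view: the decision task's graph plus label-to-taskid mappings
        # from any action tasks that already ran on this push.
        decision_task_id, full_task_graph, label_to_taskid = \
            fetch_graph_and_labels(parameters)

        to_run = []
        for elem in input['tasks']:
            # The real callback checks elem against the graph; simplified here.
            if elem in full_task_graph.tasks:
                to_run.append(elem)

        # create_tasks now also writes task-graph.json, label-to-taskid.json,
        # and to-run.json, so the next action task will pick these tasks up.
        create_tasks(to_run, full_task_graph, label_to_taskid,
                     parameters, decision_task_id)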

View File

@ -6,6 +6,7 @@
from __future__ import absolute_import, print_function, unicode_literals
import datetime
import functools
import yaml
import requests
@ -27,9 +28,12 @@ def get_session():
return session
def _do_request(url):
def _do_request(url, content=None):
session = get_session()
response = session.get(url, stream=True)
if content is None:
response = session.get(url, stream=True)
else:
response = session.post(url, json=content)
if response.status_code >= 400:
# Consume content before raise_for_status, so that the connection can be
# reused.
@ -74,12 +78,12 @@ def list_artifacts(task_id, use_proxy=False):
return response.json()['artifacts']
def get_index_url(index_path, use_proxy=False):
def get_index_url(index_path, use_proxy=False, multiple=False):
if use_proxy:
INDEX_URL = 'http://taskcluster/index/v1/task/{}'
INDEX_URL = 'http://taskcluster/index/v1/task{}/{}'
else:
INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
return INDEX_URL.format(index_path)
INDEX_URL = 'https://index.taskcluster.net/v1/task{}/{}'
return INDEX_URL.format('s' if multiple else '', index_path)
def find_task_id(index_path, use_proxy=False):
@ -98,6 +102,30 @@ def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
return _handle_artifact(full_path, response)
def list_tasks(index_path, use_proxy=False):
"""
Returns a list of task_ids where each task_id is indexed under a path
in the index. Results are sorted by expiration date from oldest to newest.
"""
results = []
data = {}
while True:
response = _do_request(get_index_url(index_path, use_proxy, multiple=True), data)
response = response.json()
results += response['tasks']
if response.get('continuationToken'):
data = {'continuationToken': response.get('continuationToken')}
else:
break
# Sorting on expires works because all of these action tasks are created with
# the same relative expiration ('1 year' from creation), so expiration order
# matches creation order, i.e. earliest to latest action. If stricter ordering
# is ever needed, fetch each task and sort on its created date instead.
results.sort(key=lambda t: datetime.datetime.strptime(t['expires'], '%Y-%m-%dT%H:%M:%S.%fZ'))
return [t['taskId'] for t in results]
def get_task_url(task_id, use_proxy=False):
if use_proxy:
TASK_URL = 'http://taskcluster/queue/v1/task/{}'