Bug 1625200 - [taskgraph] Pull the 10th push backstop out of SETA, r=tomprince

We'll want some kind of backstop no matter what optimization algorithm we use.
We don't want to go too long without running any given task, so that we can
find regressions quickly and always have a good merge candidate.

This pulls the logic that handles this out of the SETA strategy and into its
own strategy.

This will also make the SETA shadow scheduler more representative of what the
algorithm is doing.

Note that in the future we may find ways to make this backstop more efficient
(e.g. only run the tasks that didn't run in any of the last 9 pushes).
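
For illustration, the rule the new strategy implements boils down to the
following sketch (a hypothetical standalone function, not part of this patch;
the defaults mirror the strategy's args=(10, 60) registration below):

    def backstop_says_run_everything(push_id, minutes_since_previous_push,
                                     push_interval=10, time_interval=60):
        # Run all tasks on every Nth push...
        if push_id % push_interval == 0:
            return True
        # ...or whenever at least N minutes have elapsed since the previous push.
        return minutes_since_previous_push >= time_interval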

Depends on D68621

Differential Revision: https://phabricator.services.mozilla.com/D68622

--HG--
extra : moz-landing-system : lando
Andrew Halberstadt 2020-04-15 19:45:34 +00:00
parent 4da4148bd0
commit 3362e14674
4 changed files with 169 additions and 105 deletions

View File

@@ -369,7 +369,7 @@ import_sibling_modules()
 # Register composite strategies.
-register_strategy('test', args=('skip-unless-schedules', 'seta'))(Any)
+register_strategy('test', args=(Any('skip-unless-schedules', 'seta'), 'backstop'))(All)
 register_strategy('test-inclusive', args=('skip-unless-schedules',))(Alias)
 register_strategy('test-try', args=('skip-unless-schedules',))(Alias)
 register_strategy('fuzzing-builds', args=('skip-unless-schedules', 'seta'))(Any)
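
Assuming the composite strategies behave as their names suggest (Any removes a
task when any substrategy would, All only when every substrategy would), the new
registration means SETA can only optimize a task away when the backstop also
agrees. A sketch of the combined decision, where the *_removes helpers are
hypothetical stand-ins for each substrategy's should_remove_task result:

    def test_strategy_removes(task, params):
        # Any('skip-unless-schedules', 'seta'): remove if either says remove.
        seta_side = schedules_removes(task, params) or seta_removes(task, params)
        # All(..., 'backstop'): the backstop must also agree to remove.
        return seta_side and backstop_removes(task, params)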

View File

@@ -0,0 +1,128 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import logging
from collections import defaultdict

import requests
from redo import retry

from taskgraph.optimize import OptimizationStrategy, register_strategy

logger = logging.getLogger(__name__)

PUSH_ENDPOINT = "{head_repository}/json-pushes/?startID={push_id_start}&endID={push_id_end}"


@register_strategy('backstop', args=(10, 60))
class Backstop(OptimizationStrategy):
    """Ensures that no task gets left behind.

    Will schedule all tasks on every Nth push, or when more than M minutes
    have elapsed since the previous push.

    Args:
        push_interval (int): Run all tasks on every Nth push.
        time_interval (int): Run all tasks when more than this many minutes
            have elapsed since the previous push.
    """
    def __init__(self, push_interval, time_interval):
        self.push_interval = push_interval
        self.time_interval = time_interval

        # cached push dates by project
        self.push_dates = defaultdict(dict)
        # cached push_ids that we failed to retrieve a datetime for
        self.failed_json_push_calls = []

    def should_remove_task(self, task, params, _):
        project = params['project']
        pushid = int(params['pushlog_id'])
        pushdate = int(params['pushdate'])

        # Only enable the backstop on autoland since we always want the *real*
        # optimized tasks on try and release branches.
        if project != 'autoland':
            return True

        # On every Nth push, we want to run all tasks.
        if pushid % self.push_interval == 0:
            return False

        # We also want to ensure we run all tasks at least once per N minutes.
        if self.minutes_between_pushes(
                params["head_repository"],
                project,
                pushid,
                pushdate) >= self.time_interval:
            return False
        return True

    def minutes_between_pushes(self, repository, project, cur_push_id, cur_push_date):
        # Figure out how many minutes elapsed between the current and previous push.
        # Default to the full time interval so the task runs if we can't get a value.
        min_between_pushes = self.time_interval
        prev_push_id = cur_push_id - 1

        # cache the push date of the current push so we can use it next time
        self.push_dates[project].update({cur_push_id: cur_push_date})

        # check if we already have the previous push id's datetime cached
        prev_push_date = self.push_dates[project].get(prev_push_id, 0)

        # we have datetimes for both pushes, so return the elapsed minutes and bail
        if cur_push_date > 0 and prev_push_date > 0:
            return (cur_push_date - prev_push_date) / 60

        # The datetime for the previous push id isn't cached, so retrieve it.
        # If a previous json-pushes request for this push id already failed,
        # don't try again.
        if prev_push_id in self.failed_json_push_calls:
            return min_between_pushes

        url = PUSH_ENDPOINT.format(
            head_repository=repository,
            push_id_start=prev_push_id - 1,
            push_id_end=prev_push_id,
        )

        try:
            response = retry(requests.get, attempts=2, sleeptime=10,
                             args=(url, ),
                             kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
            prev_push_date = response.json().get(str(prev_push_id), {}).get('date', 0)

            # cache it for next time
            self.push_dates[project].update({prev_push_id: prev_push_date})

            # we now have datetimes for both the current and previous push
            if cur_push_date > 0 and prev_push_date > 0:
                min_between_pushes = (cur_push_date - prev_push_date) / 60

        # If the request times out, requests raises a Timeout exception.
        except requests.exceptions.Timeout:
            logger.warning("json-pushes timeout, enabling backstop")
            self.failed_json_push_calls.append(prev_push_id)

        # In the event of a network problem (e.g. DNS failure, refused
        # connection), requests raises a ConnectionError.
        except requests.exceptions.ConnectionError:
            logger.warning("json-pushes connection error, enabling backstop")
            self.failed_json_push_calls.append(prev_push_id)

        # In the event of an invalid HTTP response (e.g. 404, 401), requests
        # raises an HTTPError.
        except requests.exceptions.HTTPError:
            logger.warning("Bad Http response, enabling backstop")
            self.failed_json_push_calls.append(prev_push_id)

        # Invalid JSON (e.g. a 500 error page) results in a ValueError (bug 1313426).
        except ValueError as error:
            logger.warning("Invalid JSON, possible server error: {}".format(error))
            self.failed_json_push_calls.append(prev_push_id)

        # Log any other request failure we didn't explicitly catch above.
        except requests.exceptions.RequestException as error:
            logger.warning(error)
            self.failed_json_push_calls.append(prev_push_id)

        return min_between_pushes
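
As a usage sketch (the parameter values here are hypothetical; compare the
test_backstop test added at the bottom of this commit):

    # For pushes that aren't a multiple of push_interval, the strategy queries
    # json-pushes, whose response maps push ids to metadata that includes a
    # 'date' epoch timestamp.
    from taskgraph.optimize.backstop import Backstop

    backstop = Backstop(push_interval=10, time_interval=60)
    params = {
        'project': 'autoland',
        'head_repository': 'https://hg.mozilla.org/integration/autoland',
        'pushlog_id': 10,        # a multiple of push_interval
        'pushdate': 1586980000,  # push timestamp in epoch seconds (hypothetical)
    }
    # The strategy ignores the task argument itself; False means "keep the task".
    assert backstop.should_remove_task(None, params, None) is False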

View File

@@ -6,10 +6,9 @@ from __future__ import absolute_import, print_function, unicode_literals
 import json
 import logging
-import requests
-from collections import defaultdict
 
 import attr
+import requests
 from redo import retry
 from requests import exceptions
@@ -24,7 +23,6 @@ SETA_LOW_PRIORITY = 5
 SETA_ENDPOINT = "https://treeherder.mozilla.org/api/project/%s/seta/" \
                 "job-priorities/?build_system_type=%s&priority=%s"
-PUSH_ENDPOINT = "https://hg.mozilla.org/integration/%s/json-pushes/?startID=%d&endID=%d"
 
 @attr.s(frozen=True)
@@ -37,10 +35,6 @@ class SETA(object):
     # cached low value tasks, by project
     low_value_tasks = attr.ib(factory=dict, init=False)
     low_value_bb_tasks = attr.ib(factory=dict, init=False)
-    # cached push dates by project
-    push_dates = attr.ib(factory=lambda: defaultdict(dict), init=False)
-    # cached push_ids that failed to retrieve datetime for
-    failed_json_push_calls = attr.ib(factory=list, init=False)
 
     def _get_task_string(self, task_tuple):
         # convert task tuple to single task string, so the task label sent in can match
@@ -162,96 +156,13 @@ class SETA(object):
         return low_value_tasks
 
-    def minutes_between_pushes(self, project, cur_push_id, cur_push_date, time_interval):
-        # figure out the minutes that have elapsed between the current push and previous one
-        # defaulting to max min so if we can't get value, defaults to run the task
-        min_between_pushes = time_interval
-        prev_push_id = cur_push_id - 1
-
-        # cache the pushdate for the current push so we can use it next time
-        self.push_dates[project].update({cur_push_id: cur_push_date})
-
-        # check if we already have the previous push id's datetime cached
-        prev_push_date = self.push_dates[project].get(prev_push_id, 0)
-
-        # we have datetime of current and previous push, so return elapsed minutes and bail
-        if cur_push_date > 0 and prev_push_date > 0:
-            return (cur_push_date - prev_push_date) / 60
-
-        # datetime for previous pushid not cached, so must retrieve it
-        # if we already tried to retrieve the datetime for this pushid
-        # before and the json-push request failed, don't try it again
-        if prev_push_id in self.failed_json_push_calls:
-            return min_between_pushes
-
-        url = PUSH_ENDPOINT % (project, cur_push_id - 2, prev_push_id)
-
-        try:
-            response = retry(requests.get, attempts=2, sleeptime=10,
-                             args=(url, ),
-                             kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
-            prev_push_date = json.loads(response.content).get(str(prev_push_id), {}).get('date', 0)
-
-            # cache it for next time
-            self.push_dates[project].update({prev_push_id: prev_push_date})
-
-            # now have datetime of current and previous push
-            if cur_push_date > 0 and prev_push_date > 0:
-                min_between_pushes = (cur_push_date - prev_push_date) / 60
-
-        # In the event of request times out, requests will raise a TimeoutError.
-        except exceptions.Timeout:
-            logger.warning("json-pushes timeout, treating task as high value")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # In the event of a network problem (e.g. DNS failure, refused connection, etc),
-        # requests will raise a ConnectionError.
-        except exceptions.ConnectionError:
-            logger.warning("json-pushes connection error, treating task as high value")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # In the event of the rare invalid HTTP response(e.g 404, 401),
-        # requests will raise an HTTPError exception
-        except exceptions.HTTPError:
-            logger.warning("Bad Http response, treating task as high value")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426)
-        except ValueError as error:
-            logger.warning("Invalid JSON, possible server error: {}".format(error))
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # We just print the error out as a debug message if we failed to catch the exception above
-        except exceptions.RequestException as error:
-            logger.warning(error)
-            self.failed_json_push_calls.append(prev_push_id)
-
-        return min_between_pushes
-
-    def is_low_value_task(self, label, project, pushlog_id, push_date,
-                          push_interval, time_interval):
+    def is_low_value_task(self, label, project):
         # marking a task as low_value means it will be optimized out by tc
         if project not in SETA_PROJECTS:
             return False
 
-        # Disable the "run all tasks" feature if we're on try (e.g pushed via `mach try auto`)
-        if project != 'try':
-            # on every Nth push, want to run all tasks
-            if int(pushlog_id) % push_interval == 0:
-                return False
-
-            # Nth push, so time to call seta based on number of pushes; however
-            # we also want to ensure we run all tasks at least once per N minutes
-            if self.minutes_between_pushes(
-                    project,
-                    int(pushlog_id),
-                    int(push_date),
-                    time_interval) >= time_interval:
-                return False
-        else:
-            # The SETA service has a superficial check preventing try, so spoof autoland
-            project = 'autoland'
+        # The SETA service has a superficial check preventing try, so spoof autoland
+        project = 'autoland'
 
         # cache the low value tasks per project to avoid repeated SETA server queries
         if project not in self.low_value_tasks:
@@ -264,17 +175,9 @@ class SETA(object):
 is_low_value_task = SETA().is_low_value_task
 
-@register_strategy('seta', args=(10, 60))
+@register_strategy('seta')
 class SkipLowValue(OptimizationStrategy):
-    def __init__(self, push_interval, time_interval):
-        self.push_interval = push_interval
-        self.time_interval = time_interval
 
     def should_remove_task(self, task, params, _):
         # Return True to optimize a low value task.
-        return is_low_value_task(task.label, params.get('project'),
-                                 params.get('pushlog_id'),
-                                 params.get('pushdate'),
-                                 self.push_interval,
-                                 self.time_interval)
+        return is_low_value_task(task.label, params['project'])

View File

@@ -6,8 +6,11 @@ from __future__ import absolute_import
 import time
 
 import pytest
+from datetime import datetime
 from mozunit import main
+from time import mktime
 
+from taskgraph.optimize.backstop import Backstop
 from taskgraph.optimize.bugbug import BugBugPushSchedules, BugbugTimeoutException, platform
 from taskgraph.task import Task
@@ -15,9 +18,12 @@ from taskgraph.task import Task
 @pytest.fixture(scope='module')
 def params():
     return {
-        'branch': 'integration/autoland',
         'head_repository': 'https://hg.mozilla.org/integration/autoland',
         'head_rev': 'abcdef',
+        'branch': 'integration/autoland',
+        'project': 'autoland',
+        'pushlog_id': 1,
+        'pushdate': mktime(datetime.now().timetuple()),
     }
@@ -151,5 +157,32 @@ def test_bugbug_timeout(monkeypatch, responses, params, tasks):
         opt.should_remove_task(tasks[0], params, None)
 
 
+def test_backstop(params, tasks):
+    all_labels = {t.label for t in tasks}
+    opt = Backstop(10, 60)  # every 10th push or 1 hour
+
+    # If there's no previous push date, run all tasks.
+    params['pushlog_id'] = 8
+    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    assert scheduled == all_labels
+
+    # Only multiples of 10 schedule tasks. The push date from push 8 was cached above.
+    params['pushlog_id'] = 9
+    params['pushdate'] += 3599
+    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    assert scheduled == set()
+
+    params['pushlog_id'] = 10
+    params['pushdate'] += 1
+    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    assert scheduled == all_labels
+
+    # Tasks are also scheduled if an hour has passed.
+    params['pushlog_id'] = 11
+    params['pushdate'] += 3600
+    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    assert scheduled == all_labels
+
+
 if __name__ == '__main__':
     main()