Bug 1363428 - Switch wptrunner to use a deque for test groups, r=ato,jdm

Initially wptrunner had a single test queue that was shared between
all processes. Then for --run-by-dir it changed to a queue of
queues. This change makes it a queue of deques, which is simpler,
since the test queues themselves are no longer shared between
processes. It also changes the implementation when we aren't using
--run-by-dir but are using multiple processes to pre-group the tests
into N queues rather than sharing a single queue between all
processes. This is necessary to use the deque of course, but
importantly anticipates a change in which we will pre-compute
per-queue metadata; that doesn't work well with one shared
queue. The downside of this change is that there is no work stealing,
so it may be less efficient if we randomly assign many slow jobs to
one particular process.

MozReview-Commit-ID: 7e0Odk7yDwr
This commit is contained in:
James Graham 2017-05-28 21:14:28 +01:00
parent 41cffd4ec6
commit e9ca85913c
3 changed files with 141 additions and 181 deletions

View File

@ -564,74 +564,72 @@ class TestLoader(object):
class TestSource(object):
__metaclass__ = ABCMeta
@abstractmethod
def queue_tests(self, test_queue):
pass
def __init__(self, test_queue):
self.test_queue = test_queue
self.current_group = None
@abstractmethod
def requeue_test(self, test):
#@classmethod (doesn't compose with @abstractmethod)
def make_queue(cls, tests, **kwargs):
pass
def __enter__(self):
return self
def group(self):
if not self.current_group or len(self.current_group) == 0:
try:
self.current_group = self.test_queue.get(block=False)
except Empty:
return None
return self.current_group
def __exit__(self, *args, **kwargs):
pass
class GroupedSource(TestSource):
    """Test source whose queue holds groups (deques) of consecutive tests.

    Subclasses decide where one group ends and the next begins by
    overriding ``new_group``.
    """

    @classmethod
    def new_group(cls, state, test, **kwargs):
        """Return a true value if ``test`` should start a new group.

        :param state: Dict created once per ``make_queue`` call and passed to
                      every ``new_group`` call, so an implementation can use
                      it to remember earlier tests.
        :param test: The test that is about to be added to a group.
        :param kwargs: Keyword arguments forwarded from ``make_queue``.
        :raises NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    @classmethod
    def make_queue(cls, tests, **kwargs):
        """Split ``tests`` into consecutive groups and queue each group.

        :param tests: Iterable of tests in the order they should be grouped.
        :param kwargs: Passed through unchanged to ``new_group``.
        :returns: A new ``Queue`` holding one ``deque`` of tests per group,
                  in the order the groups were opened.
        """
        test_queue = Queue()
        groups = []
        state = {}

        for test in tests:
            # Always call new_group, even when its answer is not needed, so
            # that it gets the chance to update ``state`` for every test.
            starts_group = cls.new_group(state, test, **kwargs)
            # The very first test has to open a group regardless of what
            # new_group reports; otherwise ``groups[-1]`` below would raise
            # IndexError on the empty list.
            if starts_group or not groups:
                groups.append(deque())
            groups[-1].append(test)

        for item in groups:
            test_queue.put(item)
        return test_queue
class SingleTestSource(TestSource):
def __init__(self, test_queue):
self.test_queue = test_queue
@classmethod
def queue_tests(cls, test_queue, test_type, tests):
for test in tests[test_type]:
test_queue.put(test)
def get_queue(self):
if self.test_queue.empty():
return None
return self.test_queue
def requeue_test(self, test):
self.test_queue.put(test)
class PathGroupedSource(TestSource):
def __init__(self, test_queue):
self.test_queue = test_queue
self.current_queue = None
@classmethod
def queue_tests(cls, test_queue, test_type, tests, depth=None):
if depth is True:
depth = None
prev_path = None
group = None
for test in tests[test_type]:
path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
if path != prev_path:
group = []
test_queue.put(group)
prev_path = path
def make_queue(cls, tests, **kwargs):
test_queue = Queue()
processes = kwargs["processes"]
queues = [deque([]) for _ in xrange(processes)]
for test in tests:
idx = hash(test.id) % processes
group = queues[idx]
group.append(test)
def get_queue(self):
if not self.current_queue or self.current_queue.empty():
try:
data = self.test_queue.get(block=True, timeout=1)
self.current_queue = Queue()
for item in data:
self.current_queue.put(item)
except Empty:
return None
return self.current_queue
for item in queues:
test_queue.put(item)
def requeue_test(self, test):
self.current_queue.put(test)
return test_queue
def __exit__(self, *args, **kwargs):
if self.current_queue:
self.current_queue.close()
class PathGroupedSource(GroupedSource):
    """Grouped source that starts a new group whenever the directory part of
    the test URL changes."""

    @classmethod
    def new_group(cls, state, test, **kwargs):
        """Report whether ``test`` is in a different directory than the
        previous test.

        The directory is the path of ``test.url`` split on "/", with the
        first and last components dropped and the remainder truncated to
        ``depth`` components. A ``depth`` keyword of ``True``, ``None`` or
        no ``depth`` at all means no truncation.

        The computed directory is stored in ``state["prev_path"]`` so the
        next call can compare against it.

        :returns: True if the directory differs from ``state["prev_path"]``
                  (which is always the case when no previous path is stored).
        """
        max_depth = kwargs.get("depth")
        if max_depth is True:
            # True is treated the same as "no limit".
            max_depth = None

        url_path = urlparse.urlsplit(test.url).path
        directory = url_path.split("/")[1:-1][:max_depth]

        changed = directory != state.get("prev_path")
        state["prev_path"] = directory
        return changed

View File

@ -147,8 +147,6 @@ def next_manager_number():
class BrowserManager(object):
init_lock = threading.Lock()
def __init__(self, logger, browser, command_queue, no_timeout=False):
self.logger = logger
self.browser = browser
@ -180,27 +178,24 @@ class BrowserManager(object):
self.logger.debug("Init called, starting browser and runner")
with self.init_lock:
# Guard against problems initialising the browser or the browser
# remote control method
if not self.no_timeout:
self.init_timer = threading.Timer(self.browser.init_timeout,
self.init_timeout)
try:
if self.init_timer is not None:
self.init_timer.start()
self.logger.debug("Starting browser with settings %r" % self.browser_settings)
self.browser.start(**self.browser_settings)
self.browser_pid = self.browser.pid()
except:
self.logger.warning("Failure during init %s" % traceback.format_exc())
if self.init_timer is not None:
self.init_timer.cancel()
self.logger.error(traceback.format_exc())
succeeded = False
else:
succeeded = True
self.started = True
if not self.no_timeout:
self.init_timer = threading.Timer(self.browser.init_timeout,
self.init_timeout)
try:
if self.init_timer is not None:
self.init_timer.start()
self.logger.debug("Starting browser with settings %r" % self.browser_settings)
self.browser.start(**self.browser_settings)
self.browser_pid = self.browser.pid()
except:
self.logger.warning("Failure during init %s" % traceback.format_exc())
if self.init_timer is not None:
self.init_timer.cancel()
self.logger.error(traceback.format_exc())
succeeded = False
else:
succeeded = True
self.started = True
return succeeded
@ -237,10 +232,10 @@ class BrowserManager(object):
class _RunnerManagerState(object):
before_init = namedtuple("before_init", [])
initalizing = namedtuple("initalizing_browser",
["test", "test_queue", "failure_count"])
running = namedtuple("running", ["test", "test_queue"])
restarting = namedtuple("restarting", ["test", "test_queue"])
initializing = namedtuple("initializing_browser",
["test", "test_group", "failure_count"])
running = namedtuple("running", ["test", "test_group"])
restarting = namedtuple("restarting", ["test", "test_group"])
error = namedtuple("error", [])
stop = namedtuple("stop", [])
@ -249,9 +244,7 @@ RunnerManagerState = _RunnerManagerState()
class TestRunnerManager(threading.Thread):
init_lock = threading.Lock()
def __init__(self, suite_name, tests, test_source_cls, browser_cls, browser_kwargs,
def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
executor_cls, executor_kwargs, stop_flag, pause_after_test=False,
pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
"""Thread that owns a single TestRunner process and any processes required
@ -271,9 +264,7 @@ class TestRunnerManager(threading.Thread):
"""
self.suite_name = suite_name
self.tests = tests
self.test_source_cls = test_source_cls
self.test_queue = None
self.test_source = test_source_cls(test_queue)
self.browser_cls = browser_cls
self.browser_kwargs = browser_kwargs
@ -281,8 +272,6 @@ class TestRunnerManager(threading.Thread):
self.executor_cls = executor_cls
self.executor_kwargs = executor_kwargs
self.test_source = None
# Flags used to shut down this thread if we get a sigint
self.parent_stop_flag = stop_flag
self.child_stop_flag = multiprocessing.Event()
@ -321,15 +310,14 @@ class TestRunnerManager(threading.Thread):
that the manager should shut down the next time the event loop
spins."""
self.logger = structuredlog.StructuredLogger(self.suite_name)
with self.browser_cls(self.logger, **self.browser_kwargs) as browser, self.test_source_cls(self.tests) as test_source:
with self.browser_cls(self.logger, **self.browser_kwargs) as browser:
self.browser = BrowserManager(self.logger,
browser,
self.command_queue,
no_timeout=self.debug_info is not None)
self.test_source = test_source
dispatch = {
RunnerManagerState.before_init: self.start_init,
RunnerManagerState.initalizing: self.init,
RunnerManagerState.initializing: self.init,
RunnerManagerState.running: self.run_test,
RunnerManagerState.restarting: self.restart_runner
}
@ -374,7 +362,7 @@ class TestRunnerManager(threading.Thread):
def wait_event(self):
dispatch = {
RunnerManagerState.before_init: {},
RunnerManagerState.initalizing:
RunnerManagerState.initializing:
{
"init_succeeded": self.init_succeeded,
"init_failed": self.init_failed,
@ -432,19 +420,18 @@ class TestRunnerManager(threading.Thread):
return
return f(*data)
def should_stop(self):
    """Return True if either the child or the parent stop flag is set.

    The child flag is checked first; the parent flag is only consulted
    when the child flag is not set.
    """
    if self.child_stop_flag.is_set():
        return True
    return self.parent_stop_flag.is_set()
def start_init(self):
test, test_queue = self.get_next_test()
test, test_group = self.get_next_test()
if test is None:
return RunnerManagerState.stop()
else:
return RunnerManagerState.initalizing(test, test_queue, 0)
return RunnerManagerState.initializing(test, test_group, 0)
def init(self):
assert isinstance(self.state, RunnerManagerState.initalizing)
assert isinstance(self.state, RunnerManagerState.initializing)
if self.state.failure_count > self.max_restarts:
self.logger.error("Max restarts exceeded")
return RunnerManagerState.error()
@ -455,9 +442,9 @@ class TestRunnerManager(threading.Thread):
if result is Stop:
return RunnerManagerState.error()
elif not result:
return RunnerManagerState.initalizing(self.state.test,
self.state.test_queue,
self.state.failure_count + 1)
return RunnerManagerState.initializing(self.state.test,
self.state.test_group,
self.state.failure_count + 1)
else:
self.start_test_runner()
@ -465,7 +452,7 @@ class TestRunnerManager(threading.Thread):
# Note that we need to be careful to start the browser before the
# test runner to ensure that any state set when the browser is started
# can be passed in to the test runner.
assert isinstance(self.state, RunnerManagerState.initalizing)
assert isinstance(self.state, RunnerManagerState.initializing)
assert self.command_queue is not None
assert self.remote_queue is not None
self.logger.info("Starting runner")
@ -486,34 +473,29 @@ class TestRunnerManager(threading.Thread):
# Now we wait for either an init_succeeded event or an init_failed event
def init_succeeded(self):
assert isinstance(self.state, RunnerManagerState.initalizing)
assert isinstance(self.state, RunnerManagerState.initializing)
self.browser.after_init()
return RunnerManagerState.running(self.state.test,
self.state.test_queue)
self.state.test_group)
def init_failed(self):
assert isinstance(self.state, RunnerManagerState.initalizing)
assert isinstance(self.state, RunnerManagerState.initializing)
self.browser.after_init()
self.stop_runner(force=True)
return RunnerManagerState.initalizing(self.state.test,
self.state.test_queue,
self.state.failure_count + 1)
return RunnerManagerState.initializing(self.state.test,
self.state.test_group,
self.state.failure_count + 1)
def get_next_test(self, test_queue=None):
def get_next_test(self, test_group=None):
test = None
while test is None:
if test_queue is None:
test_queue = self.test_source.get_queue()
if test_queue is None:
while test_group is None or len(test_group) == 0:
test_group = self.test_source.group()
if test_group is None:
self.logger.info("No more tests")
return None, None
try:
# Need to block here just to allow for contention with other processes
test = test_queue.get(block=True, timeout=2)
except Empty:
if test_queue.empty():
test_queue = None
return test, test_queue
test = test_group.popleft()
return test, test_group
def run_test(self):
assert isinstance(self.state, RunnerManagerState.running)
@ -522,7 +504,7 @@ class TestRunnerManager(threading.Thread):
if self.browser.update_settings(self.state.test):
self.logger.info("Restarting browser for new test environment")
return RunnerManagerState.restarting(self.state.test,
self.state.test_queue)
self.state.test_group)
self.logger.test_start(self.state.test.id)
self.send_message("run_test", self.state.test)
@ -595,22 +577,22 @@ class TestRunnerManager(threading.Thread):
def after_test_end(self, restart):
assert isinstance(self.state, RunnerManagerState.running)
test, test_queue = self.get_next_test()
test, test_group = self.get_next_test()
if test is None:
return RunnerManagerState.stop()
if test_queue != self.state.test_queue:
if test_group != self.state.test_group:
# We are starting a new group of tests, so force a restart
restart = True
if restart:
return RunnerManagerState.restarting(test, test_queue)
return RunnerManagerState.restarting(test, test_group)
else:
return RunnerManagerState.running(test, test_queue)
return RunnerManagerState.running(test, test_group)
def restart_runner(self):
"""Stop and restart the TestRunner"""
assert isinstance(self.state, RunnerManagerState.restarting)
self.stop_runner()
return RunnerManagerState.initalizing(self.state.test, self.state.test_queue, 0)
return RunnerManagerState.initializing(self.state.test, self.state.test_group, 0)
def log(self, action, kwargs):
getattr(self.logger, action)(**kwargs)
@ -673,34 +655,16 @@ class TestRunnerManager(threading.Thread):
except Empty:
break
class TestQueue(object):
def __init__(self, test_source_cls, test_type, tests, **kwargs):
self.queue = None
self.test_source_cls = test_source_cls
self.test_type = test_type
self.tests = tests
self.kwargs = kwargs
def __enter__(self):
if not self.tests[self.test_type]:
return None
def make_test_queue(tests, test_source_cls, **test_source_kwargs):
queue = test_source_cls.make_queue(tests, **test_source_kwargs)
self.queue = Queue()
has_tests = self.test_source_cls.queue_tests(self.queue,
self.test_type,
self.tests,
**self.kwargs)
# There is a race condition that means sometimes we continue
# before the tests have been written to the underlying pipe.
# Polling the pipe for data here avoids that
self.queue._reader.poll(10)
assert not self.queue.empty()
return self.queue
def __exit__(self, *args, **kwargs):
if self.queue is not None:
self.queue.close()
self.queue = None
# There is a race condition that means sometimes we continue
# before the tests have been written to the underlying pipe.
# Polling the pipe for data here avoids that
queue._reader.poll(10)
assert not queue.empty()
return queue
class ManagerGroup(object):
@ -730,7 +694,6 @@ class ManagerGroup(object):
# of sigint
self.stop_flag = threading.Event()
self.logger = structuredlog.StructuredLogger(suite_name)
self.test_queue = None
def __enter__(self):
return self
@ -742,30 +705,29 @@ class ManagerGroup(object):
"""Start all managers in the group"""
self.logger.debug("Using %i processes" % self.size)
self.test_queue = TestQueue(self.test_source_cls,
test_type,
tests,
**self.test_source_kwargs)
with self.test_queue as test_queue:
if test_queue is None:
self.logger.info("No %s tests to run" % test_type)
return
for _ in range(self.size):
manager = TestRunnerManager(self.suite_name,
test_queue,
self.test_source_cls,
self.browser_cls,
self.browser_kwargs,
self.executor_cls,
self.executor_kwargs,
self.stop_flag,
self.pause_after_test,
self.pause_on_unexpected,
self.restart_on_unexpected,
self.debug_info)
manager.start()
self.pool.add(manager)
self.wait()
type_tests = tests[test_type]
if type_tests is None:
self.logger.info("No %s tests to run" % test_type)
return
test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs)
for _ in range(self.size):
manager = TestRunnerManager(self.suite_name,
test_queue,
self.test_source_cls,
self.browser_cls,
self.browser_kwargs,
self.executor_cls,
self.executor_kwargs,
self.stop_flag,
self.pause_after_test,
self.pause_on_unexpected,
self.restart_on_unexpected,
self.debug_info)
manager.start()
self.pool.add(manager)
self.wait()
def is_alive(self):
"""Boolean indicating whether any manager in the group is still alive"""

View File

@ -152,13 +152,13 @@ def run_tests(config, test_paths, product, **kwargs):
run_info_extras=run_info_extras(**kwargs),
**kwargs)
test_source_kwargs = {"processes": kwargs["processes"]}
if kwargs["run_by_dir"] is False:
test_source_cls = testloader.SingleTestSource
test_source_kwargs = {}
else:
# A value of None indicates infinite depth
test_source_cls = testloader.PathGroupedSource
test_source_kwargs = {"depth": kwargs["run_by_dir"]}
test_source_kwargs["depth"] = kwargs["run_by_dir"]
logger.info("Using %i client processes" % kwargs["processes"])