From e9ca85913c5ff1c9c1911b7018ba908577f893e9 Mon Sep 17 00:00:00 2001 From: James Graham Date: Sun, 28 May 2017 21:14:28 +0100 Subject: [PATCH] Bug 1363428 - Switch wptrunner to use a deque for test groups, r=ato,jdm Initially wptrunner had a single test queue that was shared between all processes. Then for --run-by-dir it changed to a queue of queues. This change makes it a queue of deques, which is simpler, since the test queues themselves are no longer shared between processes. It also changes the implementation when we aren't using --run-by-dir but are using multiple processes to pre-group the tests into N queues rather than sharing a single queue between all processes. This is necessary to use the deque of course, but importantly anticipates a change in which we will pre-compute per-queue metadata for each queue; that doesn't work well with one shared queue. The downside of this change is that there is no work stealing, so it may be less efficient if we randomly assign many slow jobs to one particular process. 
MozReview-Commit-ID: 7e0Odk7yDwr --- .../tools/wptrunner/wptrunner/testloader.py | 114 +++++----- .../tools/wptrunner/wptrunner/testrunner.py | 204 +++++++----------- .../tools/wptrunner/wptrunner/wptrunner.py | 4 +- 3 files changed, 141 insertions(+), 181 deletions(-) diff --git a/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py b/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py index 2b2033f3d213..65bfcd72a289 100644 --- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py +++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py @@ -564,74 +564,72 @@ class TestLoader(object): class TestSource(object): __metaclass__ = ABCMeta - @abstractmethod - def queue_tests(self, test_queue): - pass + def __init__(self, test_queue): + self.test_queue = test_queue + self.current_group = None @abstractmethod - def requeue_test(self, test): + #@classmethod (doesn't compose with @abstractmethod) + def make_queue(cls, tests, **kwargs): pass - def __enter__(self): - return self + def group(self): + if not self.current_group or len(self.current_group) == 0: + try: + self.current_group = self.test_queue.get(block=False) + except Empty: + return None + return self.current_group - def __exit__(self, *args, **kwargs): - pass + +class GroupedSource(TestSource): + @classmethod + def new_group(cls, state, test, **kwargs): + raise NotImplementedError + + @classmethod + def make_queue(cls, tests, **kwargs): + test_queue = Queue() + groups = [] + + state = {} + + for test in tests: + if cls.new_group(state, test, **kwargs): + groups.append(deque([])) + + group = groups[-1] + group.append(test) + + for item in groups: + test_queue.put(item) + return test_queue class SingleTestSource(TestSource): - def __init__(self, test_queue): - self.test_queue = test_queue - @classmethod - def queue_tests(cls, test_queue, test_type, tests): - for test in tests[test_type]: - test_queue.put(test) - - def get_queue(self): - if 
self.test_queue.empty(): - return None - return self.test_queue - - def requeue_test(self, test): - self.test_queue.put(test) - -class PathGroupedSource(TestSource): - def __init__(self, test_queue): - self.test_queue = test_queue - self.current_queue = None - - @classmethod - def queue_tests(cls, test_queue, test_type, tests, depth=None): - if depth is True: - depth = None - - prev_path = None - group = None - - for test in tests[test_type]: - path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth] - if path != prev_path: - group = [] - test_queue.put(group) - prev_path = path - + def make_queue(cls, tests, **kwargs): + test_queue = Queue() + processes = kwargs["processes"] + queues = [deque([]) for _ in xrange(processes)] + for test in tests: + idx = hash(test.id) % processes + group = queues[idx] group.append(test) - def get_queue(self): - if not self.current_queue or self.current_queue.empty(): - try: - data = self.test_queue.get(block=True, timeout=1) - self.current_queue = Queue() - for item in data: - self.current_queue.put(item) - except Empty: - return None - return self.current_queue + for item in queues: + test_queue.put(item) - def requeue_test(self, test): - self.current_queue.put(test) + return test_queue - def __exit__(self, *args, **kwargs): - if self.current_queue: - self.current_queue.close() + +class PathGroupedSource(GroupedSource): + @classmethod + def new_group(cls, state, test, **kwargs): + depth = kwargs.get("depth") + if depth is True: + depth = None + path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth] + rv = path != state.get("prev_path") + state["prev_path"] = path + return rv diff --git a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py index ee118dc93572..b3c2a3ddc032 100644 --- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py +++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py @@ -147,8 
+147,6 @@ def next_manager_number(): class BrowserManager(object): - init_lock = threading.Lock() - def __init__(self, logger, browser, command_queue, no_timeout=False): self.logger = logger self.browser = browser @@ -180,27 +178,24 @@ class BrowserManager(object): self.logger.debug("Init called, starting browser and runner") - with self.init_lock: - # Guard against problems initialising the browser or the browser - # remote control method - if not self.no_timeout: - self.init_timer = threading.Timer(self.browser.init_timeout, - self.init_timeout) - try: - if self.init_timer is not None: - self.init_timer.start() - self.logger.debug("Starting browser with settings %r" % self.browser_settings) - self.browser.start(**self.browser_settings) - self.browser_pid = self.browser.pid() - except: - self.logger.warning("Failure during init %s" % traceback.format_exc()) - if self.init_timer is not None: - self.init_timer.cancel() - self.logger.error(traceback.format_exc()) - succeeded = False - else: - succeeded = True - self.started = True + if not self.no_timeout: + self.init_timer = threading.Timer(self.browser.init_timeout, + self.init_timeout) + try: + if self.init_timer is not None: + self.init_timer.start() + self.logger.debug("Starting browser with settings %r" % self.browser_settings) + self.browser.start(**self.browser_settings) + self.browser_pid = self.browser.pid() + except: + self.logger.warning("Failure during init %s" % traceback.format_exc()) + if self.init_timer is not None: + self.init_timer.cancel() + self.logger.error(traceback.format_exc()) + succeeded = False + else: + succeeded = True + self.started = True return succeeded @@ -237,10 +232,10 @@ class BrowserManager(object): class _RunnerManagerState(object): before_init = namedtuple("before_init", []) - initalizing = namedtuple("initalizing_browser", - ["test", "test_queue", "failure_count"]) - running = namedtuple("running", ["test", "test_queue"]) - restarting = namedtuple("restarting", ["test", 
"test_queue"]) + initializing = namedtuple("initializing_browser", + ["test", "test_group", "failure_count"]) + running = namedtuple("running", ["test", "test_group"]) + restarting = namedtuple("restarting", ["test", "test_group"]) error = namedtuple("error", []) stop = namedtuple("stop", []) @@ -249,9 +244,7 @@ RunnerManagerState = _RunnerManagerState() class TestRunnerManager(threading.Thread): - init_lock = threading.Lock() - - def __init__(self, suite_name, tests, test_source_cls, browser_cls, browser_kwargs, + def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs, executor_cls, executor_kwargs, stop_flag, pause_after_test=False, pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None): """Thread that owns a single TestRunner process and any processes required @@ -271,9 +264,7 @@ class TestRunnerManager(threading.Thread): """ self.suite_name = suite_name - self.tests = tests - self.test_source_cls = test_source_cls - self.test_queue = None + self.test_source = test_source_cls(test_queue) self.browser_cls = browser_cls self.browser_kwargs = browser_kwargs @@ -281,8 +272,6 @@ class TestRunnerManager(threading.Thread): self.executor_cls = executor_cls self.executor_kwargs = executor_kwargs - self.test_source = None - # Flags used to shut down this thread if we get a sigint self.parent_stop_flag = stop_flag self.child_stop_flag = multiprocessing.Event() @@ -321,15 +310,14 @@ class TestRunnerManager(threading.Thread): that the manager should shut down the next time the event loop spins.""" self.logger = structuredlog.StructuredLogger(self.suite_name) - with self.browser_cls(self.logger, **self.browser_kwargs) as browser, self.test_source_cls(self.tests) as test_source: + with self.browser_cls(self.logger, **self.browser_kwargs) as browser: self.browser = BrowserManager(self.logger, browser, self.command_queue, no_timeout=self.debug_info is not None) - self.test_source = test_source dispatch = { 
RunnerManagerState.before_init: self.start_init, - RunnerManagerState.initalizing: self.init, + RunnerManagerState.initializing: self.init, RunnerManagerState.running: self.run_test, RunnerManagerState.restarting: self.restart_runner } @@ -374,7 +362,7 @@ class TestRunnerManager(threading.Thread): def wait_event(self): dispatch = { RunnerManagerState.before_init: {}, - RunnerManagerState.initalizing: + RunnerManagerState.initializing: { "init_succeeded": self.init_succeeded, "init_failed": self.init_failed, @@ -432,19 +420,18 @@ class TestRunnerManager(threading.Thread): return return f(*data) - def should_stop(self): return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set() def start_init(self): - test, test_queue = self.get_next_test() + test, test_group = self.get_next_test() if test is None: return RunnerManagerState.stop() else: - return RunnerManagerState.initalizing(test, test_queue, 0) + return RunnerManagerState.initializing(test, test_group, 0) def init(self): - assert isinstance(self.state, RunnerManagerState.initalizing) + assert isinstance(self.state, RunnerManagerState.initializing) if self.state.failure_count > self.max_restarts: self.logger.error("Max restarts exceeded") return RunnerManagerState.error() @@ -455,9 +442,9 @@ class TestRunnerManager(threading.Thread): if result is Stop: return RunnerManagerState.error() elif not result: - return RunnerManagerState.initalizing(self.state.test, - self.state.test_queue, - self.state.failure_count + 1) + return RunnerManagerState.initializing(self.state.test, + self.state.test_group, + self.state.failure_count + 1) else: self.start_test_runner() @@ -465,7 +452,7 @@ class TestRunnerManager(threading.Thread): # Note that we need to be careful to start the browser before the # test runner to ensure that any state set when the browser is started # can be passed in to the test runner. 
- assert isinstance(self.state, RunnerManagerState.initalizing) + assert isinstance(self.state, RunnerManagerState.initializing) assert self.command_queue is not None assert self.remote_queue is not None self.logger.info("Starting runner") @@ -486,34 +473,29 @@ class TestRunnerManager(threading.Thread): # Now we wait for either an init_succeeded event or an init_failed event def init_succeeded(self): - assert isinstance(self.state, RunnerManagerState.initalizing) + assert isinstance(self.state, RunnerManagerState.initializing) self.browser.after_init() return RunnerManagerState.running(self.state.test, - self.state.test_queue) + self.state.test_group) def init_failed(self): - assert isinstance(self.state, RunnerManagerState.initalizing) + assert isinstance(self.state, RunnerManagerState.initializing) self.browser.after_init() self.stop_runner(force=True) - return RunnerManagerState.initalizing(self.state.test, - self.state.test_queue, - self.state.failure_count + 1) + return RunnerManagerState.initializing(self.state.test, + self.state.test_group, + self.state.failure_count + 1) - def get_next_test(self, test_queue=None): + def get_next_test(self, test_group=None): test = None while test is None: - if test_queue is None: - test_queue = self.test_source.get_queue() - if test_queue is None: + while test_group is None or len(test_group) == 0: + test_group = self.test_source.group() + if test_group is None: self.logger.info("No more tests") return None, None - try: - # Need to block here just to allow for contention with other processes - test = test_queue.get(block=True, timeout=2) - except Empty: - if test_queue.empty(): - test_queue = None - return test, test_queue + test = test_group.popleft() + return test, test_group def run_test(self): assert isinstance(self.state, RunnerManagerState.running) @@ -522,7 +504,7 @@ class TestRunnerManager(threading.Thread): if self.browser.update_settings(self.state.test): self.logger.info("Restarting browser for new test 
environment") return RunnerManagerState.restarting(self.state.test, - self.state.test_queue) + self.state.test_group) self.logger.test_start(self.state.test.id) self.send_message("run_test", self.state.test) @@ -595,22 +577,22 @@ class TestRunnerManager(threading.Thread): def after_test_end(self, restart): assert isinstance(self.state, RunnerManagerState.running) - test, test_queue = self.get_next_test() + test, test_group = self.get_next_test() if test is None: return RunnerManagerState.stop() - if test_queue != self.state.test_queue: + if test_group != self.state.test_group: # We are starting a new group of tests, so force a restart restart = True if restart: - return RunnerManagerState.restarting(test, test_queue) + return RunnerManagerState.restarting(test, test_group) else: - return RunnerManagerState.running(test, test_queue) + return RunnerManagerState.running(test, test_group) def restart_runner(self): """Stop and restart the TestRunner""" assert isinstance(self.state, RunnerManagerState.restarting) self.stop_runner() - return RunnerManagerState.initalizing(self.state.test, self.state.test_queue, 0) + return RunnerManagerState.initializing(self.state.test, self.state.test_group, 0) def log(self, action, kwargs): getattr(self.logger, action)(**kwargs) @@ -673,34 +655,16 @@ class TestRunnerManager(threading.Thread): except Empty: break -class TestQueue(object): - def __init__(self, test_source_cls, test_type, tests, **kwargs): - self.queue = None - self.test_source_cls = test_source_cls - self.test_type = test_type - self.tests = tests - self.kwargs = kwargs - def __enter__(self): - if not self.tests[self.test_type]: - return None +def make_test_queue(tests, test_source_cls, **test_source_kwargs): + queue = test_source_cls.make_queue(tests, **test_source_kwargs) - self.queue = Queue() - has_tests = self.test_source_cls.queue_tests(self.queue, - self.test_type, - self.tests, - **self.kwargs) - # There is a race condition that means sometimes we continue - # 
before the tests have been written to the underlying pipe. - # Polling the pipe for data here avoids that - self.queue._reader.poll(10) - assert not self.queue.empty() - return self.queue - - def __exit__(self, *args, **kwargs): - if self.queue is not None: - self.queue.close() - self.queue = None + # There is a race condition that means sometimes we continue + # before the tests have been written to the underlying pipe. + # Polling the pipe for data here avoids that + queue._reader.poll(10) + assert not queue.empty() + return queue class ManagerGroup(object): @@ -730,7 +694,6 @@ class ManagerGroup(object): # of sigint self.stop_flag = threading.Event() self.logger = structuredlog.StructuredLogger(suite_name) - self.test_queue = None def __enter__(self): return self @@ -742,30 +705,29 @@ class ManagerGroup(object): """Start all managers in the group""" self.logger.debug("Using %i processes" % self.size) - self.test_queue = TestQueue(self.test_source_cls, - test_type, - tests, - **self.test_source_kwargs) - with self.test_queue as test_queue: - if test_queue is None: - self.logger.info("No %s tests to run" % test_type) - return - for _ in range(self.size): - manager = TestRunnerManager(self.suite_name, - test_queue, - self.test_source_cls, - self.browser_cls, - self.browser_kwargs, - self.executor_cls, - self.executor_kwargs, - self.stop_flag, - self.pause_after_test, - self.pause_on_unexpected, - self.restart_on_unexpected, - self.debug_info) - manager.start() - self.pool.add(manager) - self.wait() + type_tests = tests[test_type] + if type_tests is None: + self.logger.info("No %s tests to run" % test_type) + return + + test_queue = make_test_queue(type_tests, self.test_source_cls, **self.test_source_kwargs) + + for _ in range(self.size): + manager = TestRunnerManager(self.suite_name, + test_queue, + self.test_source_cls, + self.browser_cls, + self.browser_kwargs, + self.executor_cls, + self.executor_kwargs, + self.stop_flag, + self.pause_after_test, + 
self.pause_on_unexpected, + self.restart_on_unexpected, + self.debug_info) + manager.start() + self.pool.add(manager) + self.wait() def is_alive(self): """Boolean indicating whether any manager in the group is still alive""" diff --git a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py index 5579f8794cc0..c8e3e545d954 100644 --- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py +++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py @@ -152,13 +152,13 @@ def run_tests(config, test_paths, product, **kwargs): run_info_extras=run_info_extras(**kwargs), **kwargs) + test_source_kwargs = {"processes": kwargs["processes"]} if kwargs["run_by_dir"] is False: test_source_cls = testloader.SingleTestSource - test_source_kwargs = {} else: # A value of None indicates infinite depth test_source_cls = testloader.PathGroupedSource - test_source_kwargs = {"depth": kwargs["run_by_dir"]} + test_source_kwargs["depth"] = kwargs["run_by_dir"] logger.info("Using %i client processes" % kwargs["processes"])