From 97a3928637755b7ea1b3d5a62a752ab6ad5d5302 Mon Sep 17 00:00:00 2001 From: jugglinmike Date: Mon, 19 Nov 2018 18:44:42 +0000 Subject: [PATCH] Bug 1506697 [wpt PR 14024] - [wptserve] Eliminate race condition, a=testonly Automatic update from web-platform-tests[wptserve] Eliminate race condition (#14024) This race condition was expressed during testing sessions where the first test to use the Stash feature issued did so with multiple requests made in parallel. -- wpt-commits: cbb25e2c99696956ed2a36e7bcdbdee1dca71705 wpt-pr: 14024 --- .../tests/tools/wptserve/tests/test_stash.py | 130 ++++++++++++++++++ .../tests/tools/wptserve/wptserve/stash.py | 12 +- 2 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 testing/web-platform/tests/tools/wptserve/tests/test_stash.py diff --git a/testing/web-platform/tests/tools/wptserve/tests/test_stash.py b/testing/web-platform/tests/tools/wptserve/tests/test_stash.py new file mode 100644 index 000000000000..4813504a7e1f --- /dev/null +++ b/testing/web-platform/tests/tools/wptserve/tests/test_stash.py @@ -0,0 +1,130 @@ +import threading +import multiprocessing +from multiprocessing.managers import BaseManager + +import pytest + +Stash = pytest.importorskip("wptserve.stash").Stash + +@pytest.fixture() +def add_cleanup(): + fns = [] + + def add(fn): + fns.append(fn) + + yield add + + for fn in fns: + fn() + +def run(process_queue, request_lock, response_lock): + """Create two Stash instances in parallel threads. Use the provided locks + to ensure the first thread is actively establishing an interprocess + communication channel at the moment the second thread executes.""" + + def target(thread_queue): + stash = Stash("/", ("localhost", 4543), b"some key") + + # The `lock` property of the Stash instance should always be set + # immediately following initialization. These values are asserted in + # the active test. + thread_queue.put(stash.lock is None) + + thread_queue = multiprocessing.Queue() + first = threading.Thread(target=target, args=(thread_queue,)) + second = threading.Thread(target=target, args=(thread_queue,)) + + request_lock.acquire() + response_lock.acquire() + first.start() + + request_lock.acquire() + + # At this moment, the `first` thread is waiting for a proxied object. + # Create a second thread in order to inspect the behavior of the Stash + # constructor at this moment. + + second.start() + + # Allow the `first` thread to proceed + + response_lock.release() + + # Wait for both threads to complete and report their stateto the test + process_queue.put(thread_queue.get()) + process_queue.put(thread_queue.get()) + + +def test_delayed_lock(add_cleanup): + """Ensure that delays in proxied Lock retrieval do not interfere with + initialization in parallel threads.""" + + class SlowLock(BaseManager): + pass + + request_lock = multiprocessing.Lock() + response_lock = multiprocessing.Lock() + + queue = multiprocessing.Queue() + + def mutex_lock_request(): + """This request handler allows the caller to delay execution of a + thread which has requested a proxied representation of the `lock` + property, simulating a "slow" interprocess communication channel.""" + + request_lock.release() + response_lock.acquire() + return threading.Lock() + + SlowLock.register("get_dict", callable=lambda: {}) + SlowLock.register("Lock", callable=mutex_lock_request) + + slowlock = SlowLock(("localhost", 4543), b"some key") + slowlock.start() + add_cleanup(lambda: slowlock.shutdown()) + + parallel = multiprocessing.Process(target=run, + args=(queue, request_lock, response_lock)) + parallel.start() + add_cleanup(lambda: parallel.terminate()) + + assert [queue.get(), queue.get()] == [False, False], ( + "both instances had valid locks") + +def test_delayed_dict(add_cleanup): + """Ensure that delays in proxied `dict` retrieval do not interfere with + initialization in parallel threads.""" + + class SlowDict(BaseManager): + pass + + request_lock = multiprocessing.Lock() + response_lock = multiprocessing.Lock() + + queue = multiprocessing.Queue() + + # This request handler allows the caller to delay execution of a thread + # which has requested a proxied representation of the "get_dict" property. + def mutex_dict_request(): + """This request handler allows the caller to delay execution of a + thread which has requested a proxied representation of the `get_dict` + property, simulating a "slow" interprocess communication channel.""" + request_lock.release() + response_lock.acquire() + return {} + + SlowDict.register("get_dict", callable=mutex_dict_request) + SlowDict.register("Lock", callable=lambda: threading.Lock()) + + slowdict = SlowDict(("localhost", 4543), b"some key") + slowdict.start() + add_cleanup(lambda: slowdict.shutdown()) + + parallel = multiprocessing.Process(target=run, + args=(queue, request_lock, response_lock)) + parallel.start() + add_cleanup(lambda: parallel.terminate()) + + assert [queue.get(), queue.get()] == [False, False], ( + "both instances had valid locks") diff --git a/testing/web-platform/tests/tools/wptserve/wptserve/stash.py b/testing/web-platform/tests/tools/wptserve/wptserve/stash.py index ae48a8611233..d13703c16ebb 100644 --- a/testing/web-platform/tests/tools/wptserve/wptserve/stash.py +++ b/testing/web-platform/tests/tools/wptserve/wptserve/stash.py @@ -104,6 +104,7 @@ class Stash(object): _proxy = None lock = None + _initializing = threading.Lock() def __init__(self, default_path, address=None, authkey=None): self.default_path = default_path @@ -115,7 +116,16 @@ class Stash(object): Stash._proxy = {} Stash.lock = threading.Lock() - if Stash._proxy is None: + # Initializing the proxy involves connecting to the remote process and + # retrieving two proxied objects. This process is not inherently + # atomic, so a lock must be used to make it so. Atomicity ensures that + # only one thread attempts to initialize the connection and that any + # threads running in parallel correctly wait for initialization to be + # fully complete. + with Stash._initializing: + if Stash.lock: + return + manager = ClientDictManager(address, authkey) manager.connect() Stash._proxy = manager.get_dict()