Bug 1506697 [wpt PR 14024] - [wptserve] Eliminate race condition, a=testonly

Automatic update from web-platform-tests
[wptserve] Eliminate race condition (#14024)

This race condition surfaced during testing sessions where the first
test to use the Stash feature did so with multiple requests issued in
parallel.
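
As an illustration, here is a minimal double-checked-locking sketch in
the spirit of the patch below. It is not the wptserve source: the
LazyShared class, the time.sleep stand-in for the slow interprocess
setup, and the check helper are all invented for the example.

import threading
import time

class LazyShared(object):
    _proxy = None
    lock = None
    _initializing = threading.Lock()

    def __init__(self):
        if LazyShared._proxy is None:
            # Re-check under a class-level lock so that only one thread
            # performs the slow setup and late arrivals wait for it to
            # finish rather than returning early.
            with LazyShared._initializing:
                if LazyShared.lock:
                    return
                time.sleep(0.05)  # invented stand-in for slow IPC setup
                LazyShared._proxy = {}
                LazyShared.lock = threading.Lock()

def check(results):
    results.append(LazyShared().lock is None)

results = []
threads = [threading.Thread(target=check, args=(results,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert results == [False, False]

Without the `with` block, a second thread could pass the outer
`_proxy is None` check mid-initialization and return while `lock` was
still None, which is exactly the state the new tests detect.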

--

wpt-commits: cbb25e2c99696956ed2a36e7bcdbdee1dca71705
wpt-pr: 14024
jugglinmike 2018-11-19 18:44:42 +00:00 committed by moz-wptsync-bot
parent 7f1ab0b64c
commit 97a3928637
2 changed files with 141 additions and 1 deletion


@@ -0,0 +1,130 @@
import threading
import multiprocessing
from multiprocessing.managers import BaseManager

import pytest

Stash = pytest.importorskip("wptserve.stash").Stash


@pytest.fixture()
def add_cleanup():
    fns = []

    def add(fn):
        fns.append(fn)

    yield add

    for fn in fns:
        fn()
def run(process_queue, request_lock, response_lock):
    """Create two Stash instances in parallel threads. Use the provided locks
    to ensure the first thread is actively establishing an interprocess
    communication channel at the moment the second thread executes."""
    def target(thread_queue):
        stash = Stash("/", ("localhost", 4543), b"some key")

        # The `lock` property of the Stash instance should always be set
        # immediately following initialization. These values are asserted in
        # the active test.
        thread_queue.put(stash.lock is None)

    thread_queue = multiprocessing.Queue()
    first = threading.Thread(target=target, args=(thread_queue,))
    second = threading.Thread(target=target, args=(thread_queue,))

    request_lock.acquire()
    response_lock.acquire()
    first.start()

    request_lock.acquire()

    # At this moment, the `first` thread is waiting for a proxied object.
    # Start the second thread in order to inspect the behavior of the Stash
    # constructor at this moment.
    second.start()

    # Allow the `first` thread to proceed
    response_lock.release()

    # Wait for both threads to complete and report their state to the test
    process_queue.put(thread_queue.get())
    process_queue.put(thread_queue.get())
def test_delayed_lock(add_cleanup):
    """Ensure that delays in proxied Lock retrieval do not interfere with
    initialization in parallel threads."""
    class SlowLock(BaseManager):
        pass

    request_lock = multiprocessing.Lock()
    response_lock = multiprocessing.Lock()

    queue = multiprocessing.Queue()

    def mutex_lock_request():
        """This request handler allows the caller to delay execution of a
        thread which has requested a proxied representation of the `lock`
        property, simulating a "slow" interprocess communication channel."""
        request_lock.release()
        response_lock.acquire()
        return threading.Lock()

    SlowLock.register("get_dict", callable=lambda: {})
    SlowLock.register("Lock", callable=mutex_lock_request)

    slowlock = SlowLock(("localhost", 4543), b"some key")
    slowlock.start()
    add_cleanup(lambda: slowlock.shutdown())

    parallel = multiprocessing.Process(target=run,
                                       args=(queue, request_lock, response_lock))
    parallel.start()
    add_cleanup(lambda: parallel.terminate())

    assert [queue.get(), queue.get()] == [False, False], (
        "both instances had valid locks")
def test_delayed_dict(add_cleanup):
    """Ensure that delays in proxied `dict` retrieval do not interfere with
    initialization in parallel threads."""
    class SlowDict(BaseManager):
        pass

    request_lock = multiprocessing.Lock()
    response_lock = multiprocessing.Lock()

    queue = multiprocessing.Queue()

    def mutex_dict_request():
        """This request handler allows the caller to delay execution of a
        thread which has requested a proxied representation of the `get_dict`
        property, simulating a "slow" interprocess communication channel."""
        request_lock.release()
        response_lock.acquire()
        return {}

    SlowDict.register("get_dict", callable=mutex_dict_request)
    SlowDict.register("Lock", callable=lambda: threading.Lock())

    slowdict = SlowDict(("localhost", 4543), b"some key")
    slowdict.start()
    add_cleanup(lambda: slowdict.shutdown())

    parallel = multiprocessing.Process(target=run,
                                       args=(queue, request_lock, response_lock))
    parallel.start()
    add_cleanup(lambda: parallel.terminate())

    assert [queue.get(), queue.get()] == [False, False], (
        "both instances had valid locks")


@@ -104,6 +104,7 @@ class Stash(object):
    _proxy = None
    lock = None
    _initializing = threading.Lock()

    def __init__(self, default_path, address=None, authkey=None):
        self.default_path = default_path
@@ -115,7 +116,16 @@ class Stash(object):
            Stash._proxy = {}
            Stash.lock = threading.Lock()

        if Stash._proxy is None:
            # Initializing the proxy involves connecting to the remote process and
            # retrieving two proxied objects. This process is not inherently
            # atomic, so a lock must be used to make it so. Atomicity ensures that
            # only one thread attempts to initialize the connection and that any
            # threads running in parallel correctly wait for initialization to be
            # fully complete.
            with Stash._initializing:
                if Stash.lock:
                    return
                manager = ClientDictManager(address, authkey)
                manager.connect()
                Stash._proxy = manager.get_dict()