mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-10-13 13:25:37 +00:00
Bug 1499534: Add Pause, Resume, and IsPaused methods to ThrottledEventQueue. r=froydnj
Depends on D8913 Differential Revision: https://phabricator.services.mozilla.com/D8914 --HG-- extra : moz-landing-system : lando
This commit is contained in:
parent
f1a7d75388
commit
a78e7a516e
@ -407,3 +407,203 @@ TEST(ThrottledEventQueue, AwaitIdleMixed)
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown()));
|
||||
}
|
||||
|
||||
TEST(ThrottledEventQueue, SimplePauseResume)
|
||||
{
|
||||
string log;
|
||||
|
||||
auto base = MakeRefPtr<RunnableQueue>();
|
||||
RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
|
||||
|
||||
ASSERT_FALSE(throttled->IsPaused());
|
||||
|
||||
Enqueue(throttled, [&]() { log += 'a'; });
|
||||
|
||||
ASSERT_EQ(log, "");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_EQ(log, "a");
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
ASSERT_TRUE(throttled->IsPaused());
|
||||
|
||||
Enqueue(throttled, [&]() { log += 'b'; });
|
||||
|
||||
ASSERT_EQ(log, "a");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_EQ(log, "a");
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
ASSERT_FALSE(throttled->IsPaused());
|
||||
|
||||
ASSERT_EQ(log, "a");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_EQ(log, "ab");
|
||||
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
ASSERT_TRUE(throttled->IsEmpty());
|
||||
}
|
||||
|
||||
TEST(ThrottledEventQueue, MixedPauseResume)
|
||||
{
|
||||
string log;
|
||||
|
||||
auto base = MakeRefPtr<RunnableQueue>();
|
||||
RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
|
||||
|
||||
ASSERT_FALSE(throttled->IsPaused());
|
||||
|
||||
Enqueue(base, [&]() { log += 'A'; });
|
||||
Enqueue(throttled, [&]() {
|
||||
log += 'b';
|
||||
MOZ_ALWAYS_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
});
|
||||
Enqueue(throttled, [&]() { log += 'c'; });
|
||||
Enqueue(base, [&]() { log += 'D'; });
|
||||
|
||||
ASSERT_EQ(log, "");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
// Since the 'b' event paused the throttled queue, 'c' should not have run.
|
||||
// but 'D' was enqueued directly on the base, and should have run.
|
||||
ASSERT_EQ(log, "AbD");
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
ASSERT_FALSE(throttled->IsEmpty());
|
||||
ASSERT_TRUE(throttled->IsPaused());
|
||||
|
||||
Enqueue(base, [&]() { log += 'E'; });
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
Enqueue(base, [&]() { log += 'F'; });
|
||||
ASSERT_FALSE(throttled->IsPaused());
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
// Since we've unpaused, 'c' should be able to run now. The executor should have
|
||||
// been enqueued between 'E' and 'F'.
|
||||
ASSERT_EQ(log, "AbDEcF");
|
||||
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
ASSERT_TRUE(throttled->IsEmpty());
|
||||
}
|
||||
|
||||
TEST(ThrottledEventQueue, AwaitIdlePaused)
|
||||
{
|
||||
Mutex mutex("AwaitIdlePaused");
|
||||
CondVar cond(mutex, "AwaitIdlePaused");
|
||||
|
||||
string dequeue_await; // mutex
|
||||
bool threadFinished = false; // mutex & cond
|
||||
bool runnableFinished = false; // main thread only
|
||||
|
||||
auto base = MakeRefPtr<RunnableQueue>();
|
||||
RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
|
||||
// Put an event in the queue so the AwaitIdle might block. Since throttled is
|
||||
// paused, this should not enqueue an executor in the base target.
|
||||
Enqueue(throttled, [&]() { runnableFinished = true; });
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
|
||||
// Create a separate thread that waits for the queue to become idle, and
|
||||
// then takes observable action.
|
||||
nsCOMPtr<nsIRunnable> await =
|
||||
NS_NewRunnableFunction(
|
||||
"AwaitIdlePaused",
|
||||
[&]() {
|
||||
throttled->AwaitIdle();
|
||||
MutexAutoLock lock(mutex);
|
||||
dequeue_await += " await";
|
||||
threadFinished = true;
|
||||
cond.Notify();
|
||||
});
|
||||
|
||||
nsCOMPtr<nsIThread> thread;
|
||||
nsresult rv = NS_NewNamedThread("AwaitIdlePaused", getter_AddRefs(thread),
|
||||
await);
|
||||
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
||||
|
||||
// We can't guarantee that the thread has reached the AwaitIdle call, but we
|
||||
// can get pretty close. Either way, it shouldn't affect the behavior of the
|
||||
// test.
|
||||
PR_Sleep(PR_MillisecondsToInterval(100));
|
||||
|
||||
// The AwaitIdle call should be blocked, even though there is no executor,
|
||||
// because throttled is paused.
|
||||
{
|
||||
MutexAutoLock lock(mutex);
|
||||
ASSERT_EQ(dequeue_await, "");
|
||||
dequeue_await += "dequeue";
|
||||
ASSERT_FALSE(threadFinished);
|
||||
}
|
||||
|
||||
// A paused TEQ contributes no events to its base target. (This is covered by
|
||||
// other tests...)
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
ASSERT_FALSE(throttled->IsEmpty());
|
||||
|
||||
// Resume and drain the queue.
|
||||
ASSERT_FALSE(runnableFinished);
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_TRUE(base->IsEmpty());
|
||||
ASSERT_TRUE(throttled->IsEmpty());
|
||||
ASSERT_TRUE(runnableFinished);
|
||||
|
||||
// Wait for the thread to finish.
|
||||
{
|
||||
MutexAutoLock lock(mutex);
|
||||
while (!threadFinished)
|
||||
cond.Wait();
|
||||
ASSERT_EQ(dequeue_await, "dequeue await");
|
||||
}
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown()));
|
||||
}
|
||||
|
||||
TEST(ThrottledEventQueue, ExecutorTransitions)
|
||||
{
|
||||
string log;
|
||||
|
||||
auto base = MakeRefPtr<RunnableQueue>();
|
||||
RefPtr<ThrottledEventQueue> throttled = ThrottledEventQueue::Create(base);
|
||||
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
|
||||
// Since we're paused, queueing an event on throttled shouldn't queue the
|
||||
// executor on the base target.
|
||||
Enqueue(throttled, [&]() { log += 'a'; });
|
||||
ASSERT_EQ(throttled->Length(), 1U);
|
||||
ASSERT_EQ(base->Length(), 0U);
|
||||
|
||||
// Resuming throttled should create the executor, since throttled is not empty.
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
ASSERT_EQ(throttled->Length(), 1U);
|
||||
ASSERT_EQ(base->Length(), 1U);
|
||||
|
||||
// Pausing can't remove the executor from the base target since we've already
|
||||
// queued it there, but it can ensure that it doesn't do anything.
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
|
||||
ASSERT_EQ(log, "");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_EQ(log, "");
|
||||
ASSERT_EQ(throttled->Length(), 1U);
|
||||
ASSERT_EQ(base->Length(), 0U);
|
||||
|
||||
// As before, resuming must create the executor, since throttled is not empty.
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
ASSERT_EQ(throttled->Length(), 1U);
|
||||
ASSERT_EQ(base->Length(), 1U);
|
||||
|
||||
ASSERT_EQ(log, "");
|
||||
ASSERT_TRUE(NS_SUCCEEDED(base->Run()));
|
||||
ASSERT_EQ(log, "a");
|
||||
ASSERT_EQ(throttled->Length(), 0U);
|
||||
ASSERT_EQ(base->Length(), 0U);
|
||||
|
||||
// Since throttled is empty, pausing and resuming now should not enqueue an
|
||||
// executor.
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true)));
|
||||
ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false)));
|
||||
ASSERT_EQ(throttled->Length(), 0U);
|
||||
ASSERT_EQ(base->Length(), 0U);
|
||||
}
|
||||
|
@ -46,6 +46,18 @@ namespace {
|
||||
// live for as long as the executor exists - that is, until the Inner's queue is
|
||||
// empty.
|
||||
//
|
||||
// A Paused ThrottledEventQueue does not enqueue an executor when new events are
|
||||
// added. Any executor previously queued on the base event target draws no
|
||||
// events from a Paused ThrottledEventQueue, and returns without re-enqueueing
|
||||
// itself. Since there is no executor keeping the Inner object alive until its
|
||||
// queue is empty, dropping a Paused ThrottledEventQueue may drop the Inner
|
||||
// while it still owns events. This is the correct behavior: if there are no
|
||||
// references to it, it will never be Resumed, and thus it will never dispatch
|
||||
// events again.
|
||||
//
|
||||
// Resuming a ThrottledEventQueue must dispatch an executor, so calls to Resume
|
||||
// are fallible for the same reasons as calls to Dispatch.
|
||||
//
|
||||
// The xpcom shutdown process drains the main thread's event queue several
|
||||
// times, so if a ThrottledEventQueue is being driven by the main thread, it
|
||||
// should get emptied out by the time we reach the "eventq shutdown" phase.
|
||||
@ -86,22 +98,31 @@ class ThrottledEventQueue::Inner final : public nsISupports
|
||||
mutable CondVar mIdleCondVar;
|
||||
|
||||
// As-of-yet unexecuted runnables queued on this ThrottledEventQueue.
|
||||
// (Used from any thread, protected by mMutex.)
|
||||
//
|
||||
// Used from any thread; protected by mMutex. Signals mIdleCondVar when
|
||||
// emptied.
|
||||
EventQueue mEventQueue;
|
||||
|
||||
// The event target we dispatch our events (actually, just our Executor) to.
|
||||
// (Written during construction on main thread; read by any thread.)
|
||||
//
|
||||
// Written only during construction. Readable by any thread without locking.
|
||||
nsCOMPtr<nsISerialEventTarget> mBaseTarget;
|
||||
|
||||
// The Executor that we dispatch to mBaseTarget to draw runnables from our
|
||||
// queue. mExecutor->mInner points to this Inner, forming a reference loop.
|
||||
// (Used from any thread, protected by mMutex.)
|
||||
//
|
||||
// Used from any thread; protected by mMutex.
|
||||
nsCOMPtr<nsIRunnable> mExecutor;
|
||||
|
||||
// True if this queue is currently paused.
|
||||
// Used from any thread; protected by mMutex.
|
||||
bool mIsPaused;
|
||||
|
||||
explicit Inner(nsISerialEventTarget* aBaseTarget)
|
||||
: mMutex("ThrottledEventQueue")
|
||||
, mIdleCondVar(mMutex, "ThrottledEventQueue:Idle")
|
||||
, mBaseTarget(aBaseTarget)
|
||||
, mIsPaused(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -109,11 +130,40 @@ class ThrottledEventQueue::Inner final : public nsISupports
|
||||
{
|
||||
#ifdef DEBUG
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
// As long as an executor exists, it had better keep us alive, since it's
|
||||
// going to call ExecuteRunnable on us.
|
||||
MOZ_ASSERT(!mExecutor);
|
||||
MOZ_ASSERT(mEventQueue.IsEmpty(lock));
|
||||
|
||||
// If we have any events in our queue, there should be an executor queued
|
||||
// for them, and that should have kept us alive. The exception is that, if
|
||||
// we're paused, we don't enqueue an executor.
|
||||
MOZ_ASSERT(mEventQueue.IsEmpty(lock) || IsPaused(lock));
|
||||
|
||||
// Some runnables are only safe to drop on the main thread, so if our queue
|
||||
// isn't empty, we'd better be on the main thread.
|
||||
MOZ_ASSERT_IF(!mEventQueue.IsEmpty(lock), NS_IsMainThread());
|
||||
#endif
|
||||
}
|
||||
|
||||
// Make sure an executor has been queued on our base target. If we already
|
||||
// have one, do nothing; otherwise, create and dispatch it.
|
||||
nsresult EnsureExecutor(MutexAutoLock& lock) {
|
||||
if (mExecutor)
|
||||
return NS_OK;
|
||||
|
||||
// Note, this creates a ref cycle keeping the inner alive
|
||||
// until the queue is drained.
|
||||
mExecutor = new Executor(this);
|
||||
nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL);
|
||||
if (NS_WARN_IF(NS_FAILED(rv))) {
|
||||
mExecutor = nullptr;
|
||||
return rv;
|
||||
}
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
nsresult
|
||||
CurrentName(nsACString& aName)
|
||||
{
|
||||
@ -158,6 +208,17 @@ class ThrottledEventQueue::Inner final : public nsISupports
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
// Normally, a paused queue doesn't dispatch any executor, but we might
|
||||
// have been paused after the executor was already in flight. There's no
|
||||
// way to yank the executor out of the base event target, so we just check
|
||||
// for a paused queue here and return without running anything. We'll
|
||||
// create a new executor when we're resumed.
|
||||
if (IsPaused(lock)) {
|
||||
// Note, this breaks a ref cycle.
|
||||
mExecutor = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
// We only dispatch an executor runnable when we know there is something
|
||||
// in the queue, so this should never fail.
|
||||
event = mEventQueue.GetEvent(nullptr, lock);
|
||||
@ -228,11 +289,43 @@ public:
|
||||
#endif
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
while (mExecutor) {
|
||||
while (mExecutor || IsPaused(lock)) {
|
||||
mIdleCondVar.Wait();
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
IsPaused() const
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
return IsPaused(lock);
|
||||
}
|
||||
|
||||
bool
|
||||
IsPaused(const MutexAutoLock& aProofOfLock) const
|
||||
{
|
||||
return mIsPaused;
|
||||
}
|
||||
|
||||
nsresult
|
||||
SetIsPaused(bool aIsPaused)
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
// If we will be unpaused, and we have events in our queue, make sure we
|
||||
// have an executor queued on the base event target to run them. Do this
|
||||
// before we actually change mIsPaused, since this is fallible.
|
||||
if (!aIsPaused && !mEventQueue.IsEmpty(lock)) {
|
||||
nsresult rv = EnsureExecutor(lock);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
mIsPaused = aIsPaused;
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
nsresult
|
||||
DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
|
||||
{
|
||||
@ -250,19 +343,13 @@ public:
|
||||
// Any thread
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
// We are not currently processing events, so we must start
|
||||
// operating on our base target. This is fallible, so do
|
||||
// it first. Our lock will prevent the executor from accessing
|
||||
// the event queue before we add the event below.
|
||||
if (!mExecutor) {
|
||||
// Note, this creates a ref cycle keeping the inner alive
|
||||
// until the queue is drained.
|
||||
mExecutor = new Executor(this);
|
||||
nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL);
|
||||
if (NS_WARN_IF(NS_FAILED(rv))) {
|
||||
mExecutor = nullptr;
|
||||
if (!IsPaused(lock)) {
|
||||
// Make sure we have an executor in flight to process events. This is
|
||||
// fallible, so do it first. Our lock will prevent the executor from
|
||||
// accessing the event queue before we add the event below.
|
||||
nsresult rv = EnsureExecutor(lock);
|
||||
if (NS_FAILED(rv))
|
||||
return rv;
|
||||
}
|
||||
}
|
||||
|
||||
// Only add the event to the underlying queue if are able to
|
||||
@ -332,6 +419,18 @@ ThrottledEventQueue::AwaitIdle() const
|
||||
return mInner->AwaitIdle();
|
||||
}
|
||||
|
||||
nsresult
|
||||
ThrottledEventQueue::SetIsPaused(bool aIsPaused)
|
||||
{
|
||||
return mInner->SetIsPaused(aIsPaused);
|
||||
}
|
||||
|
||||
bool
|
||||
ThrottledEventQueue::IsPaused() const
|
||||
{
|
||||
return mInner->IsPaused();
|
||||
}
|
||||
|
||||
NS_IMETHODIMP
|
||||
ThrottledEventQueue::DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags)
|
||||
{
|
||||
|
@ -22,7 +22,7 @@ namespace mozilla {
|
||||
// own queue of events and only dispatches one at a time to the wrapped
|
||||
// target. This can be used to avoid flooding the base target.
|
||||
//
|
||||
// Flooding is avoided via a very simply principal. Runnables dispatched
|
||||
// Flooding is avoided via a very simple principle. Runnables dispatched
|
||||
// to the ThrottledEventQueue are only dispatched to the base target
|
||||
// one at a time. Only once that runnable has executed will we dispatch
|
||||
// the next runnable to the base target. This in effect makes all
|
||||
@ -74,10 +74,32 @@ public:
|
||||
// Determine how many events are pending in the queue.
|
||||
uint32_t Length() const;
|
||||
|
||||
// Block the current thread until the queue is empty. This may not
|
||||
// be called on the main thread or the base target.
|
||||
// Block the current thread until the queue is empty. This may not be called
|
||||
// on the main thread or the base target. The ThrottledEventQueue must not be
|
||||
// paused.
|
||||
void AwaitIdle() const;
|
||||
|
||||
// If |aIsPaused| is true, pause execution of events from this queue. No
|
||||
// events from this queue will be run until this is called with |aIsPaused|
|
||||
// false.
|
||||
//
|
||||
// To un-pause a ThrottledEventQueue, we need to dispatch a runnable to the
|
||||
// underlying event target. That operation may fail, so this method is
|
||||
// fallible as well.
|
||||
//
|
||||
// Note that, although ThrottledEventQueue's behavior is descibed as queueing
|
||||
// events on the base target, an event queued on a TEQ is never actually moved
|
||||
// to any other queue. What is actually dispatched to the base is an
|
||||
// "executor" event which, when run, removes an event from the TEQ and runs it
|
||||
// immediately. This means that you can pause a TEQ even after the executor
|
||||
// has been queued on the base target, and even so, no events from the TEQ
|
||||
// will run. When the base target gets around to running the executor, the
|
||||
// executor will see that the TEQ is paused, and do nothing.
|
||||
MOZ_MUST_USE nsresult SetIsPaused(bool aIsPaused);
|
||||
|
||||
// Return true if this ThrottledEventQueue is paused.
|
||||
bool IsPaused() const;
|
||||
|
||||
NS_DECL_THREADSAFE_ISUPPORTS
|
||||
NS_DECL_NSIEVENTTARGET_FULL
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user