From a78e7a516e5fc7fa1fa704438e8a1828631177df Mon Sep 17 00:00:00 2001 From: Jim Blandy Date: Tue, 23 Oct 2018 06:21:10 +0000 Subject: [PATCH] Bug 1499534: Add Pause, Resume, and IsPaused methods to ThrottledEventQueue. r=froydnj Depends on D8913 Differential Revision: https://phabricator.services.mozilla.com/D8914 --HG-- extra : moz-landing-system : lando --- xpcom/tests/gtest/TestThrottledEventQueue.cpp | 200 ++++++++++++++++++ xpcom/threads/ThrottledEventQueue.cpp | 133 ++++++++++-- xpcom/threads/ThrottledEventQueue.h | 28 ++- 3 files changed, 341 insertions(+), 20 deletions(-) diff --git a/xpcom/tests/gtest/TestThrottledEventQueue.cpp b/xpcom/tests/gtest/TestThrottledEventQueue.cpp index 05206b3e7dcf..610de9acdfbb 100644 --- a/xpcom/tests/gtest/TestThrottledEventQueue.cpp +++ b/xpcom/tests/gtest/TestThrottledEventQueue.cpp @@ -407,3 +407,203 @@ TEST(ThrottledEventQueue, AwaitIdleMixed) ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown())); } + +TEST(ThrottledEventQueue, SimplePauseResume) +{ + string log; + + auto base = MakeRefPtr(); + RefPtr throttled = ThrottledEventQueue::Create(base); + + ASSERT_FALSE(throttled->IsPaused()); + + Enqueue(throttled, [&]() { log += 'a'; }); + + ASSERT_EQ(log, ""); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_EQ(log, "a"); + + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + ASSERT_TRUE(throttled->IsPaused()); + + Enqueue(throttled, [&]() { log += 'b'; }); + + ASSERT_EQ(log, "a"); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_EQ(log, "a"); + + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + ASSERT_FALSE(throttled->IsPaused()); + + ASSERT_EQ(log, "a"); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_EQ(log, "ab"); + + ASSERT_TRUE(base->IsEmpty()); + ASSERT_TRUE(throttled->IsEmpty()); +} + +TEST(ThrottledEventQueue, MixedPauseResume) +{ + string log; + + auto base = MakeRefPtr(); + RefPtr throttled = ThrottledEventQueue::Create(base); + + ASSERT_FALSE(throttled->IsPaused()); + + Enqueue(base, [&]() { log += 'A'; }); + Enqueue(throttled, [&]() { + log += 'b'; + MOZ_ALWAYS_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + }); + Enqueue(throttled, [&]() { log += 'c'; }); + Enqueue(base, [&]() { log += 'D'; }); + + ASSERT_EQ(log, ""); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + // Since the 'b' event paused the throttled queue, 'c' should not have run. + // but 'D' was enqueued directly on the base, and should have run. + ASSERT_EQ(log, "AbD"); + ASSERT_TRUE(base->IsEmpty()); + ASSERT_FALSE(throttled->IsEmpty()); + ASSERT_TRUE(throttled->IsPaused()); + + Enqueue(base, [&]() { log += 'E'; }); + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + Enqueue(base, [&]() { log += 'F'; }); + ASSERT_FALSE(throttled->IsPaused()); + + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + // Since we've unpaused, 'c' should be able to run now. The executor should have + // been enqueued between 'E' and 'F'. + ASSERT_EQ(log, "AbDEcF"); + + ASSERT_TRUE(base->IsEmpty()); + ASSERT_TRUE(throttled->IsEmpty()); +} + +TEST(ThrottledEventQueue, AwaitIdlePaused) +{ + Mutex mutex("AwaitIdlePaused"); + CondVar cond(mutex, "AwaitIdlePaused"); + + string dequeue_await; // mutex + bool threadFinished = false; // mutex & cond + bool runnableFinished = false; // main thread only + + auto base = MakeRefPtr(); + RefPtr throttled = ThrottledEventQueue::Create(base); + + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + + // Put an event in the queue so the AwaitIdle might block. Since throttled is + // paused, this should not enqueue an executor in the base target. + Enqueue(throttled, [&]() { runnableFinished = true; }); + ASSERT_TRUE(base->IsEmpty()); + + // Create a separate thread that waits for the queue to become idle, and + // then takes observable action. + nsCOMPtr await = + NS_NewRunnableFunction( + "AwaitIdlePaused", + [&]() { + throttled->AwaitIdle(); + MutexAutoLock lock(mutex); + dequeue_await += " await"; + threadFinished = true; + cond.Notify(); + }); + + nsCOMPtr thread; + nsresult rv = NS_NewNamedThread("AwaitIdlePaused", getter_AddRefs(thread), + await); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + // We can't guarantee that the thread has reached the AwaitIdle call, but we + // can get pretty close. Either way, it shouldn't affect the behavior of the + // test. + PR_Sleep(PR_MillisecondsToInterval(100)); + + // The AwaitIdle call should be blocked, even though there is no executor, + // because throttled is paused. + { + MutexAutoLock lock(mutex); + ASSERT_EQ(dequeue_await, ""); + dequeue_await += "dequeue"; + ASSERT_FALSE(threadFinished); + } + + // A paused TEQ contributes no events to its base target. (This is covered by + // other tests...) + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_TRUE(base->IsEmpty()); + ASSERT_FALSE(throttled->IsEmpty()); + + // Resume and drain the queue. + ASSERT_FALSE(runnableFinished); + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_TRUE(base->IsEmpty()); + ASSERT_TRUE(throttled->IsEmpty()); + ASSERT_TRUE(runnableFinished); + + // Wait for the thread to finish. + { + MutexAutoLock lock(mutex); + while (!threadFinished) + cond.Wait(); + ASSERT_EQ(dequeue_await, "dequeue await"); + } + + ASSERT_TRUE(NS_SUCCEEDED(thread->Shutdown())); +} + +TEST(ThrottledEventQueue, ExecutorTransitions) +{ + string log; + + auto base = MakeRefPtr(); + RefPtr throttled = ThrottledEventQueue::Create(base); + + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + + // Since we're paused, queueing an event on throttled shouldn't queue the + // executor on the base target. + Enqueue(throttled, [&]() { log += 'a'; }); + ASSERT_EQ(throttled->Length(), 1U); + ASSERT_EQ(base->Length(), 0U); + + // Resuming throttled should create the executor, since throttled is not empty. + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + ASSERT_EQ(throttled->Length(), 1U); + ASSERT_EQ(base->Length(), 1U); + + // Pausing can't remove the executor from the base target since we've already + // queued it there, but it can ensure that it doesn't do anything. + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + + ASSERT_EQ(log, ""); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_EQ(log, ""); + ASSERT_EQ(throttled->Length(), 1U); + ASSERT_EQ(base->Length(), 0U); + + // As before, resuming must create the executor, since throttled is not empty. + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + ASSERT_EQ(throttled->Length(), 1U); + ASSERT_EQ(base->Length(), 1U); + + ASSERT_EQ(log, ""); + ASSERT_TRUE(NS_SUCCEEDED(base->Run())); + ASSERT_EQ(log, "a"); + ASSERT_EQ(throttled->Length(), 0U); + ASSERT_EQ(base->Length(), 0U); + + // Since throttled is empty, pausing and resuming now should not enqueue an + // executor. + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(true))); + ASSERT_TRUE(NS_SUCCEEDED(throttled->SetIsPaused(false))); + ASSERT_EQ(throttled->Length(), 0U); + ASSERT_EQ(base->Length(), 0U); +} diff --git a/xpcom/threads/ThrottledEventQueue.cpp b/xpcom/threads/ThrottledEventQueue.cpp index 0a961659b34f..03541b5b041e 100644 --- a/xpcom/threads/ThrottledEventQueue.cpp +++ b/xpcom/threads/ThrottledEventQueue.cpp @@ -46,6 +46,18 @@ namespace { // live for as long as the executor exists - that is, until the Inner's queue is // empty. // +// A Paused ThrottledEventQueue does not enqueue an executor when new events are +// added. Any executor previously queued on the base event target draws no +// events from a Paused ThrottledEventQueue, and returns without re-enqueueing +// itself. Since there is no executor keeping the Inner object alive until its +// queue is empty, dropping a Paused ThrottledEventQueue may drop the Inner +// while it still owns events. This is the correct behavior: if there are no +// references to it, it will never be Resumed, and thus it will never dispatch +// events again. +// +// Resuming a ThrottledEventQueue must dispatch an executor, so calls to Resume +// are fallible for the same reasons as calls to Dispatch. +// // The xpcom shutdown process drains the main thread's event queue several // times, so if a ThrottledEventQueue is being driven by the main thread, it // should get emptied out by the time we reach the "eventq shutdown" phase. @@ -86,22 +98,31 @@ class ThrottledEventQueue::Inner final : public nsISupports mutable CondVar mIdleCondVar; // As-of-yet unexecuted runnables queued on this ThrottledEventQueue. - // (Used from any thread, protected by mMutex.) + // + // Used from any thread; protected by mMutex. Signals mIdleCondVar when + // emptied. EventQueue mEventQueue; // The event target we dispatch our events (actually, just our Executor) to. - // (Written during construction on main thread; read by any thread.) + // + // Written only during construction. Readable by any thread without locking. nsCOMPtr mBaseTarget; // The Executor that we dispatch to mBaseTarget to draw runnables from our // queue. mExecutor->mInner points to this Inner, forming a reference loop. - // (Used from any thread, protected by mMutex.) + // + // Used from any thread; protected by mMutex. nsCOMPtr mExecutor; + // True if this queue is currently paused. + // Used from any thread; protected by mMutex. + bool mIsPaused; + explicit Inner(nsISerialEventTarget* aBaseTarget) : mMutex("ThrottledEventQueue") , mIdleCondVar(mMutex, "ThrottledEventQueue:Idle") , mBaseTarget(aBaseTarget) + , mIsPaused(false) { } @@ -109,11 +130,40 @@ class ThrottledEventQueue::Inner final : public nsISupports { #ifdef DEBUG MutexAutoLock lock(mMutex); + + // As long as an executor exists, it had better keep us alive, since it's + // going to call ExecuteRunnable on us. MOZ_ASSERT(!mExecutor); - MOZ_ASSERT(mEventQueue.IsEmpty(lock)); + + // If we have any events in our queue, there should be an executor queued + // for them, and that should have kept us alive. The exception is that, if + // we're paused, we don't enqueue an executor. + MOZ_ASSERT(mEventQueue.IsEmpty(lock) || IsPaused(lock)); + + // Some runnables are only safe to drop on the main thread, so if our queue + // isn't empty, we'd better be on the main thread. + MOZ_ASSERT_IF(!mEventQueue.IsEmpty(lock), NS_IsMainThread()); #endif } + // Make sure an executor has been queued on our base target. If we already + // have one, do nothing; otherwise, create and dispatch it. + nsresult EnsureExecutor(MutexAutoLock& lock) { + if (mExecutor) + return NS_OK; + + // Note, this creates a ref cycle keeping the inner alive + // until the queue is drained. + mExecutor = new Executor(this); + nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL); + if (NS_WARN_IF(NS_FAILED(rv))) { + mExecutor = nullptr; + return rv; + } + + return NS_OK; + } + nsresult CurrentName(nsACString& aName) { @@ -158,6 +208,17 @@ class ThrottledEventQueue::Inner final : public nsISupports { MutexAutoLock lock(mMutex); + // Normally, a paused queue doesn't dispatch any executor, but we might + // have been paused after the executor was already in flight. There's no + // way to yank the executor out of the base event target, so we just check + // for a paused queue here and return without running anything. We'll + // create a new executor when we're resumed. + if (IsPaused(lock)) { + // Note, this breaks a ref cycle. + mExecutor = nullptr; + return; + } + // We only dispatch an executor runnable when we know there is something // in the queue, so this should never fail. event = mEventQueue.GetEvent(nullptr, lock); @@ -228,11 +289,43 @@ public: #endif MutexAutoLock lock(mMutex); - while (mExecutor) { + while (mExecutor || IsPaused(lock)) { mIdleCondVar.Wait(); } } + bool + IsPaused() const + { + MutexAutoLock lock(mMutex); + return IsPaused(lock); + } + + bool + IsPaused(const MutexAutoLock& aProofOfLock) const + { + return mIsPaused; + } + + nsresult + SetIsPaused(bool aIsPaused) + { + MutexAutoLock lock(mMutex); + + // If we will be unpaused, and we have events in our queue, make sure we + // have an executor queued on the base event target to run them. Do this + // before we actually change mIsPaused, since this is fallible. + if (!aIsPaused && !mEventQueue.IsEmpty(lock)) { + nsresult rv = EnsureExecutor(lock); + if (NS_FAILED(rv)) { + return rv; + } + } + + mIsPaused = aIsPaused; + return NS_OK; + } + nsresult DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags) { @@ -250,19 +343,13 @@ public: // Any thread MutexAutoLock lock(mMutex); - // We are not currently processing events, so we must start - // operating on our base target. This is fallible, so do - // it first. Our lock will prevent the executor from accessing - // the event queue before we add the event below. - if (!mExecutor) { - // Note, this creates a ref cycle keeping the inner alive - // until the queue is drained. - mExecutor = new Executor(this); - nsresult rv = mBaseTarget->Dispatch(mExecutor, NS_DISPATCH_NORMAL); - if (NS_WARN_IF(NS_FAILED(rv))) { - mExecutor = nullptr; + if (!IsPaused(lock)) { + // Make sure we have an executor in flight to process events. This is + // fallible, so do it first. Our lock will prevent the executor from + // accessing the event queue before we add the event below. + nsresult rv = EnsureExecutor(lock); + if (NS_FAILED(rv)) return rv; - } } // Only add the event to the underlying queue if are able to @@ -332,6 +419,18 @@ ThrottledEventQueue::AwaitIdle() const return mInner->AwaitIdle(); } +nsresult +ThrottledEventQueue::SetIsPaused(bool aIsPaused) +{ + return mInner->SetIsPaused(aIsPaused); +} + +bool +ThrottledEventQueue::IsPaused() const +{ + return mInner->IsPaused(); +} + NS_IMETHODIMP ThrottledEventQueue::DispatchFromScript(nsIRunnable* aEvent, uint32_t aFlags) { diff --git a/xpcom/threads/ThrottledEventQueue.h b/xpcom/threads/ThrottledEventQueue.h index c86b2d3bf4ce..d24365c1b914 100644 --- a/xpcom/threads/ThrottledEventQueue.h +++ b/xpcom/threads/ThrottledEventQueue.h @@ -22,7 +22,7 @@ namespace mozilla { // own queue of events and only dispatches one at a time to the wrapped // target. This can be used to avoid flooding the base target. // -// Flooding is avoided via a very simply principal. Runnables dispatched +// Flooding is avoided via a very simple principle. Runnables dispatched // to the ThrottledEventQueue are only dispatched to the base target // one at a time. Only once that runnable has executed will we dispatch // the next runnable to the base target. This in effect makes all @@ -74,10 +74,32 @@ public: // Determine how many events are pending in the queue. uint32_t Length() const; - // Block the current thread until the queue is empty. This may not - // be called on the main thread or the base target. + // Block the current thread until the queue is empty. This may not be called + // on the main thread or the base target. The ThrottledEventQueue must not be + // paused. void AwaitIdle() const; + // If |aIsPaused| is true, pause execution of events from this queue. No + // events from this queue will be run until this is called with |aIsPaused| + // false. + // + // To un-pause a ThrottledEventQueue, we need to dispatch a runnable to the + // underlying event target. That operation may fail, so this method is + // fallible as well. + // + // Note that, although ThrottledEventQueue's behavior is descibed as queueing + // events on the base target, an event queued on a TEQ is never actually moved + // to any other queue. What is actually dispatched to the base is an + // "executor" event which, when run, removes an event from the TEQ and runs it + // immediately. This means that you can pause a TEQ even after the executor + // has been queued on the base target, and even so, no events from the TEQ + // will run. When the base target gets around to running the executor, the + // executor will see that the TEQ is paused, and do nothing. + MOZ_MUST_USE nsresult SetIsPaused(bool aIsPaused); + + // Return true if this ThrottledEventQueue is paused. + bool IsPaused() const; + NS_DECL_THREADSAFE_ISUPPORTS NS_DECL_NSIEVENTTARGET_FULL