mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-10-21 09:15:35 +00:00
Bug 1151656 - Make MediaPromises operate with TaskDispatchers. r=mattwoodrow
This commit is contained in:
parent
07fc0fdd03
commit
018e5fc24d
@ -10,6 +10,7 @@
|
||||
#include "prlog.h"
|
||||
|
||||
#include "AbstractThread.h"
|
||||
#include "TaskDispatcher.h"
|
||||
|
||||
#include "nsTArray.h"
|
||||
#include "nsThreadUtils.h"
|
||||
@ -74,18 +75,20 @@ public:
|
||||
class Private;
|
||||
|
||||
static nsRefPtr<MediaPromise>
|
||||
CreateAndResolve(ResolveValueType aResolveValue, const char* aResolveSite)
|
||||
CreateAndResolve(ResolveValueType aResolveValue, const char* aResolveSite,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
nsRefPtr<typename MediaPromise::Private> p = new MediaPromise::Private(aResolveSite);
|
||||
p->Resolve(aResolveValue, aResolveSite);
|
||||
p->Resolve(aResolveValue, aResolveSite, aDispatcher);
|
||||
return Move(p);
|
||||
}
|
||||
|
||||
static nsRefPtr<MediaPromise>
|
||||
CreateAndReject(RejectValueType aRejectValue, const char* aRejectSite)
|
||||
CreateAndReject(RejectValueType aRejectValue, const char* aRejectSite,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
nsRefPtr<typename MediaPromise::Private> p = new MediaPromise::Private(aRejectSite);
|
||||
p->Reject(aRejectValue, aRejectSite);
|
||||
p->Reject(aRejectValue, aRejectSite, aDispatcher);
|
||||
return Move(p);
|
||||
}
|
||||
|
||||
@ -171,7 +174,8 @@ protected:
|
||||
|
||||
explicit ThenValueBase(const char* aCallSite) : mCallSite(aCallSite) {}
|
||||
|
||||
virtual void Dispatch(MediaPromise *aPromise) = 0;
|
||||
virtual void Dispatch(MediaPromise *aPromise,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>()) = 0;
|
||||
|
||||
protected:
|
||||
virtual void DoResolve(ResolveValueType aResolveValue) = 0;
|
||||
@ -219,7 +223,7 @@ protected:
|
||||
, mResolveMethod(aResolveMethod)
|
||||
, mRejectMethod(aRejectMethod) {}
|
||||
|
||||
void Dispatch(MediaPromise *aPromise) override
|
||||
void Dispatch(MediaPromise *aPromise, TaskDispatcher& aDispatcher) override
|
||||
{
|
||||
aPromise->mMutex.AssertCurrentThreadOwns();
|
||||
MOZ_ASSERT(!aPromise->IsPending());
|
||||
@ -230,16 +234,12 @@ protected:
|
||||
PROMISE_LOG("%s Then() call made from %s [Runnable=%p, Promise=%p, ThenValue=%p]",
|
||||
resolved ? "Resolving" : "Rejecting", ThenValueBase::mCallSite,
|
||||
runnable.get(), aPromise, this);
|
||||
nsresult rv = mResponseTarget->Dispatch(runnable.forget());
|
||||
|
||||
// NB: mDisconnected is only supposed to be accessed on the dispatch
|
||||
// thread. However, we require the consumer to have disconnected any
|
||||
// oustanding promise requests _before_ initiating shutdown on the
|
||||
// thread or task queue. So the only non-buggy scenario for dispatch
|
||||
// failing involves the target thread being unable to manipulate the
|
||||
// ThenValue (since it's been disconnected), so it's safe to read here.
|
||||
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv) || Consumer::mDisconnected);
|
||||
unused << rv;
|
||||
// Promise consumers are allowed to disconnect the Consumer object and
|
||||
// then shut down the thread or task queue that the promise result would
|
||||
// be dispatched on. So we unfortunately can't assert that promise
|
||||
// dispatch succeeds. :-(
|
||||
aDispatcher.AddTask(mResponseTarget, runnable.forget(), /* aAssertDispatchSuccess = */ false);
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
@ -308,9 +308,20 @@ public:
|
||||
|
||||
template<typename ThisType, typename ResolveMethodType, typename RejectMethodType>
|
||||
already_AddRefed<Consumer> RefableThen(AbstractThread* aResponseThread, const char* aCallSite, ThisType* aThisVal,
|
||||
ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod)
|
||||
ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
// {Refable,}Then() rarely dispatch directly - they do so only in the case
|
||||
// where the promise has already been resolved by the time {Refable,}Then()
|
||||
// is invoked. This case is rare, but it _can_ happen, which makes it a ripe
|
||||
// target for race bugs. So we do an extra assertion here to make sure our
|
||||
// caller is using tail dispatch correctly no matter what, rather than
|
||||
// relying on the assertion in Dispatch(), which may be called extremely
|
||||
// infrequently.
|
||||
aDispatcher.AssertIsTailDispatcherIfRequired();
|
||||
|
||||
MOZ_DIAGNOSTIC_ASSERT(!IsExclusive || !mHaveConsumer);
|
||||
mHaveConsumer = true;
|
||||
nsRefPtr<ThenValueBase> thenValue = new ThenValue<ThisType, ResolveMethodType, RejectMethodType>(
|
||||
@ -318,7 +329,7 @@ public:
|
||||
PROMISE_LOG("%s invoking Then() [this=%p, thenValue=%p, aThisVal=%p, isPending=%d]",
|
||||
aCallSite, this, thenValue.get(), aThisVal, (int) IsPending());
|
||||
if (!IsPending()) {
|
||||
thenValue->Dispatch(this);
|
||||
thenValue->Dispatch(this, aDispatcher);
|
||||
} else {
|
||||
mThenValues.AppendElement(thenValue);
|
||||
}
|
||||
@ -328,14 +339,16 @@ public:
|
||||
|
||||
template<typename ThisType, typename ResolveMethodType, typename RejectMethodType>
|
||||
void Then(AbstractThread* aResponseThread, const char* aCallSite, ThisType* aThisVal,
|
||||
ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod)
|
||||
ResolveMethodType aResolveMethod, RejectMethodType aRejectMethod,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
nsRefPtr<Consumer> c =
|
||||
RefableThen(aResponseThread, aCallSite, aThisVal, aResolveMethod, aRejectMethod);
|
||||
RefableThen(aResponseThread, aCallSite, aThisVal, aResolveMethod, aRejectMethod, aDispatcher);
|
||||
return;
|
||||
}
|
||||
|
||||
void ChainTo(already_AddRefed<Private> aChainedPromise, const char* aCallSite)
|
||||
void ChainTo(already_AddRefed<Private> aChainedPromise, const char* aCallSite,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
MOZ_DIAGNOSTIC_ASSERT(!IsExclusive || !mHaveConsumer);
|
||||
@ -344,7 +357,7 @@ public:
|
||||
PROMISE_LOG("%s invoking Chain() [this=%p, chainedPromise=%p, isPending=%d]",
|
||||
aCallSite, this, chainedPromise.get(), (int) IsPending());
|
||||
if (!IsPending()) {
|
||||
ForwardTo(chainedPromise);
|
||||
ForwardTo(chainedPromise, aDispatcher);
|
||||
} else {
|
||||
mChainedPromises.AppendElement(chainedPromise);
|
||||
}
|
||||
@ -352,27 +365,27 @@ public:
|
||||
|
||||
protected:
|
||||
bool IsPending() { return mResolveValue.isNothing() && mRejectValue.isNothing(); }
|
||||
void DispatchAll()
|
||||
void DispatchAll(TaskDispatcher& aDispatcher)
|
||||
{
|
||||
mMutex.AssertCurrentThreadOwns();
|
||||
for (size_t i = 0; i < mThenValues.Length(); ++i) {
|
||||
mThenValues[i]->Dispatch(this);
|
||||
mThenValues[i]->Dispatch(this, aDispatcher);
|
||||
}
|
||||
mThenValues.Clear();
|
||||
|
||||
for (size_t i = 0; i < mChainedPromises.Length(); ++i) {
|
||||
ForwardTo(mChainedPromises[i]);
|
||||
ForwardTo(mChainedPromises[i], aDispatcher);
|
||||
}
|
||||
mChainedPromises.Clear();
|
||||
}
|
||||
|
||||
void ForwardTo(Private* aOther)
|
||||
void ForwardTo(Private* aOther, TaskDispatcher& aDispatcher)
|
||||
{
|
||||
MOZ_ASSERT(!IsPending());
|
||||
if (mResolveValue.isSome()) {
|
||||
aOther->Resolve(mResolveValue.ref(), "<chained promise>");
|
||||
aOther->Resolve(mResolveValue.ref(), "<chained promise>", aDispatcher);
|
||||
} else {
|
||||
aOther->Reject(mRejectValue.ref(), "<chained promise>");
|
||||
aOther->Reject(mRejectValue.ref(), "<chained promise>", aDispatcher);
|
||||
}
|
||||
}
|
||||
|
||||
@ -400,22 +413,24 @@ class MediaPromise<ResolveValueT, RejectValueT, IsExclusive>::Private
|
||||
public:
|
||||
explicit Private(const char* aCreationSite) : MediaPromise(aCreationSite) {}
|
||||
|
||||
void Resolve(ResolveValueT aResolveValue, const char* aResolveSite)
|
||||
void Resolve(ResolveValueT aResolveValue, const char* aResolveSite,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
MOZ_ASSERT(IsPending());
|
||||
PROMISE_LOG("%s resolving MediaPromise (%p created at %s)", aResolveSite, this, mCreationSite);
|
||||
mResolveValue.emplace(aResolveValue);
|
||||
DispatchAll();
|
||||
DispatchAll(aDispatcher);
|
||||
}
|
||||
|
||||
void Reject(RejectValueT aRejectValue, const char* aRejectSite)
|
||||
void Reject(RejectValueT aRejectValue, const char* aRejectSite,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
MutexAutoLock lock(mMutex);
|
||||
MOZ_ASSERT(IsPending());
|
||||
PROMISE_LOG("%s rejecting MediaPromise (%p created at %s)", aRejectSite, this, mCreationSite);
|
||||
mRejectValue.emplace(aRejectValue);
|
||||
DispatchAll();
|
||||
DispatchAll(aDispatcher);
|
||||
}
|
||||
};
|
||||
|
||||
@ -476,40 +491,46 @@ public:
|
||||
}
|
||||
|
||||
void Resolve(typename PromiseType::ResolveValueType aResolveValue,
|
||||
const char* aMethodName)
|
||||
const char* aMethodName,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
if (mMonitor) {
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
}
|
||||
MOZ_ASSERT(mPromise);
|
||||
mPromise->Resolve(aResolveValue, aMethodName);
|
||||
mPromise->Resolve(aResolveValue, aMethodName, aDispatcher);
|
||||
mPromise = nullptr;
|
||||
}
|
||||
|
||||
|
||||
void ResolveIfExists(typename PromiseType::ResolveValueType aResolveValue,
|
||||
const char* aMethodName)
|
||||
const char* aMethodName,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
if (!IsEmpty()) {
|
||||
Resolve(aResolveValue, aMethodName);
|
||||
Resolve(aResolveValue, aMethodName, aDispatcher);
|
||||
}
|
||||
}
|
||||
|
||||
void Reject(typename PromiseType::RejectValueType aRejectValue,
|
||||
const char* aMethodName)
|
||||
const char* aMethodName,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
if (mMonitor) {
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
}
|
||||
MOZ_ASSERT(mPromise);
|
||||
mPromise->Reject(aRejectValue, aMethodName);
|
||||
mPromise->Reject(aRejectValue, aMethodName, aDispatcher);
|
||||
mPromise = nullptr;
|
||||
}
|
||||
|
||||
|
||||
void RejectIfExists(typename PromiseType::RejectValueType aRejectValue,
|
||||
const char* aMethodName)
|
||||
const char* aMethodName,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
if (!IsEmpty()) {
|
||||
Reject(aRejectValue, aMethodName);
|
||||
Reject(aRejectValue, aMethodName, aDispatcher);
|
||||
}
|
||||
}
|
||||
|
||||
@ -645,13 +666,12 @@ private:
|
||||
|
||||
template<typename PromiseType>
|
||||
static nsRefPtr<PromiseType>
|
||||
ProxyInternal(AbstractThread* aTarget, MethodCallBase<PromiseType>* aMethodCall, const char* aCallerName)
|
||||
ProxyInternal(AbstractThread* aTarget, MethodCallBase<PromiseType>* aMethodCall, const char* aCallerName,
|
||||
TaskDispatcher& aDispatcher)
|
||||
{
|
||||
nsRefPtr<typename PromiseType::Private> p = new (typename PromiseType::Private)(aCallerName);
|
||||
nsRefPtr<ProxyRunnable<PromiseType>> r = new ProxyRunnable<PromiseType>(p, aMethodCall);
|
||||
nsresult rv = aTarget->Dispatch(r.forget());
|
||||
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
|
||||
unused << rv;
|
||||
aDispatcher.AddTask(aTarget, r.forget());
|
||||
return Move(p);
|
||||
}
|
||||
|
||||
@ -660,31 +680,34 @@ ProxyInternal(AbstractThread* aTarget, MethodCallBase<PromiseType>* aMethodCall,
|
||||
template<typename PromiseType, typename ThisType>
|
||||
static nsRefPtr<PromiseType>
|
||||
ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)())
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)(),
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
typedef detail::MethodCallWithNoArgs<PromiseType, ThisType> MethodCallType;
|
||||
MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
|
||||
}
|
||||
|
||||
template<typename PromiseType, typename ThisType, typename Arg1Type>
|
||||
static nsRefPtr<PromiseType>
|
||||
ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type), Arg1Type aArg1)
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type), Arg1Type aArg1,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
typedef detail::MethodCallWithOneArg<PromiseType, ThisType, Arg1Type> MethodCallType;
|
||||
MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod, aArg1);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
|
||||
}
|
||||
|
||||
template<typename PromiseType, typename ThisType, typename Arg1Type, typename Arg2Type>
|
||||
static nsRefPtr<PromiseType>
|
||||
ProxyMediaCall(AbstractThread* aTarget, ThisType* aThisVal, const char* aCallerName,
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type, Arg2Type), Arg1Type aArg1, Arg2Type aArg2)
|
||||
nsRefPtr<PromiseType>(ThisType::*aMethod)(Arg1Type, Arg2Type), Arg1Type aArg1, Arg2Type aArg2,
|
||||
TaskDispatcher& aDispatcher = PassByRef<AutoTaskDispatcher>())
|
||||
{
|
||||
typedef detail::MethodCallWithTwoArgs<PromiseType, ThisType, Arg1Type, Arg2Type> MethodCallType;
|
||||
MethodCallType* methodCall = new MethodCallType(aThisVal, aMethod, aArg1, aArg2);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName);
|
||||
return detail::ProxyInternal(aTarget, methodCall, aCallerName, aDispatcher);
|
||||
}
|
||||
|
||||
#undef PROMISE_LOG
|
||||
|
@ -297,4 +297,19 @@ MediaTaskQueue::Runner::Run()
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
void
|
||||
TaskDispatcher::AssertIsTailDispatcherIfRequired()
|
||||
{
|
||||
MediaTaskQueue* currentQueue = MediaTaskQueue::GetCurrentQueue();
|
||||
|
||||
// NB: Make sure not to use the TailDispatcher() accessor, since that
|
||||
// asserts IsCurrentThreadIn(), which acquires the queue monitor, which
|
||||
// triggers a deadlock during shutdown between the queue monitor and the
|
||||
// MediaPromise monitor.
|
||||
MOZ_ASSERT_IF(currentQueue && currentQueue->RequiresTailDispatch(),
|
||||
this == currentQueue->mTailDispatcher);
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace mozilla
|
||||
|
@ -169,6 +169,7 @@ protected:
|
||||
MediaTaskQueue* mQueue;
|
||||
};
|
||||
|
||||
friend class TaskDispatcher;
|
||||
TaskDispatcher* mTailDispatcher;
|
||||
|
||||
// True if we've dispatched an event to the pool to execute events from
|
||||
|
@ -44,6 +44,12 @@ public:
|
||||
virtual void AddTask(AbstractThread* aThread,
|
||||
already_AddRefed<nsIRunnable> aRunnable,
|
||||
bool aAssertDispatchSuccess = true) = 0;
|
||||
|
||||
#ifdef DEBUG
|
||||
void AssertIsTailDispatcherIfRequired();
|
||||
#else
|
||||
void AssertIsTailDispatcherIfRequired() {}
|
||||
#endif
|
||||
};
|
||||
|
||||
/*
|
||||
@ -142,6 +148,18 @@ private:
|
||||
nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;
|
||||
};
|
||||
|
||||
// Little utility class to allow declaring AutoTaskDispatcher as a default
|
||||
// parameter for methods that take a TaskDispatcher&.
|
||||
template<typename T>
|
||||
class PassByRef
|
||||
{
|
||||
public:
|
||||
PassByRef() {}
|
||||
operator T&() { return mVal; }
|
||||
private:
|
||||
T mVal;
|
||||
};
|
||||
|
||||
} // namespace mozilla
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user