Bug 1373555 - Move the Fetch consume body login in a separate class - part 3 - Move the consuming body logic from FetchBody to FetchBodyConsumer, r=bkelly

This commit is contained in:
Andrea Marchesini 2017-06-20 17:53:21 +02:00
parent 50e54e6b44
commit 4b75cfc9bc
6 changed files with 643 additions and 625 deletions

View File

@ -835,274 +835,11 @@ ExtractByteStreamFromBody(const fetch::BodyInit& aBodyInit,
return NS_ERROR_FAILURE;
}
namespace {
/*
* Called on successfully reading the complete stream.
*/
template <class Derived>
class ContinueConsumeBodyRunnable final : public MainThreadWorkerRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
nsresult mStatus;
uint32_t mLength;
uint8_t* mResult;
public:
ContinueConsumeBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
nsresult aStatus, uint32_t aLength,
uint8_t* aResult)
: MainThreadWorkerRunnable(aFetchBodyConsumer->Body()->mWorkerPrivate)
, mFetchBodyConsumer(aFetchBodyConsumer)
, mStatus(aStatus)
, mLength(aLength)
, mResult(aResult)
{
MOZ_ASSERT(NS_IsMainThread());
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mFetchBodyConsumer->Body()->ContinueConsumeBody(mFetchBodyConsumer, mStatus,
mLength, mResult);
return true;
}
};
/*
* Called on successfully reading the complete stream for Blob.
*/
template <class Derived>
class ContinueConsumeBlobBodyRunnable final : public MainThreadWorkerRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
RefPtr<BlobImpl> mBlobImpl;
public:
ContinueConsumeBlobBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
BlobImpl* aBlobImpl)
: MainThreadWorkerRunnable(aFetchBodyConsumer->Body()->mWorkerPrivate)
, mFetchBodyConsumer(aFetchBodyConsumer)
, mBlobImpl(aBlobImpl)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mBlobImpl);
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mFetchBodyConsumer->Body()->ContinueConsumeBlobBody(mFetchBodyConsumer,
mBlobImpl);
return true;
}
};
template <class Derived>
class FailConsumeBodyWorkerRunnable : public MainThreadWorkerControlRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
public:
explicit FailConsumeBodyWorkerRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
: MainThreadWorkerControlRunnable(aBodyConsumer->Body()->mWorkerPrivate)
, mBodyConsumer(aBodyConsumer)
{
AssertIsOnMainThread();
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mBodyConsumer->Body()->ContinueConsumeBody(mBodyConsumer, NS_ERROR_FAILURE,
0, nullptr);
return true;
}
};
/*
* In case of failure to create a stream pump or dispatch stream completion to
* worker, ensure we cleanup properly. Thread agnostic.
*/
template <class Derived>
class MOZ_STACK_CLASS AutoFailConsumeBody final
{
RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
public:
explicit AutoFailConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer)
: mBodyConsumer(aBodyConsumer)
{}
~AutoFailConsumeBody()
{
AssertIsOnMainThread();
if (mBodyConsumer) {
if (mBodyConsumer->Body()->mWorkerPrivate) {
RefPtr<FailConsumeBodyWorkerRunnable<Derived>> r =
new FailConsumeBodyWorkerRunnable<Derived>(mBodyConsumer);
if (!r->Dispatch()) {
MOZ_CRASH("We are going to leak");
}
} else {
mBodyConsumer->Body()->ContinueConsumeBody(mBodyConsumer,
NS_ERROR_FAILURE, 0,
nullptr);
}
}
}
void
DontFail()
{
mBodyConsumer = nullptr;
}
};
template <class Derived>
class ConsumeBodyDoneObserver : public nsIStreamLoaderObserver
, public MutableBlobStorageCallback
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit ConsumeBodyDoneObserver(FetchBodyConsumer<Derived>* aFetchBodyConsumer)
: mFetchBodyConsumer(aFetchBodyConsumer)
{ }
NS_IMETHOD
OnStreamComplete(nsIStreamLoader* aLoader,
nsISupports* aCtxt,
nsresult aStatus,
uint32_t aResultLength,
const uint8_t* aResult) override
{
MOZ_ASSERT(NS_IsMainThread());
// If the binding requested cancel, we don't need to call
// ContinueConsumeBody, since that is the originator.
if (aStatus == NS_BINDING_ABORTED) {
return NS_OK;
}
uint8_t* nonconstResult = const_cast<uint8_t*>(aResult);
if (mFetchBodyConsumer->Body()->mWorkerPrivate) {
RefPtr<ContinueConsumeBodyRunnable<Derived>> r =
new ContinueConsumeBodyRunnable<Derived>(mFetchBodyConsumer,
aStatus,
aResultLength,
nonconstResult);
if (!r->Dispatch()) {
// XXXcatalinb: The worker is shutting down, the pump will be canceled
// by FetchBodyWorkerHolder::Notify.
NS_WARNING("Could not dispatch ConsumeBodyRunnable");
// Return failure so that aResult is freed.
return NS_ERROR_FAILURE;
}
} else {
mFetchBodyConsumer->Body()->ContinueConsumeBody(mFetchBodyConsumer,
aStatus, aResultLength,
nonconstResult);
}
// FetchBody is responsible for data.
return NS_SUCCESS_ADOPTED_DATA;
}
virtual void BlobStoreCompleted(MutableBlobStorage* aBlobStorage,
Blob* aBlob,
nsresult aRv) override
{
// On error.
if (NS_FAILED(aRv)) {
OnStreamComplete(nullptr, nullptr, aRv, 0, nullptr);
return;
}
MOZ_ASSERT(aBlob);
if (mFetchBodyConsumer->Body()->mWorkerPrivate) {
RefPtr<ContinueConsumeBlobBodyRunnable<Derived>> r =
new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBodyConsumer,
aBlob->Impl());
if (!r->Dispatch()) {
NS_WARNING("Could not dispatch ConsumeBlobBodyRunnable");
return;
}
} else {
mFetchBodyConsumer->Body()->ContinueConsumeBlobBody(mFetchBodyConsumer,
aBlob->Impl());
}
}
private:
virtual ~ConsumeBodyDoneObserver()
{ }
};
template <class Derived>
NS_IMPL_ADDREF(ConsumeBodyDoneObserver<Derived>)
template <class Derived>
NS_IMPL_RELEASE(ConsumeBodyDoneObserver<Derived>)
template <class Derived>
NS_INTERFACE_MAP_BEGIN(ConsumeBodyDoneObserver<Derived>)
NS_INTERFACE_MAP_ENTRY(nsIStreamLoaderObserver)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamLoaderObserver)
NS_INTERFACE_MAP_END
template <class Derived>
class BeginConsumeBodyRunnable final : public Runnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
public:
explicit BeginConsumeBodyRunnable(FetchBodyConsumer<Derived>* aConsumer)
: mFetchBodyConsumer(aConsumer)
{ }
NS_IMETHOD
Run() override
{
mFetchBodyConsumer->Body()->BeginConsumeBodyMainThread(mFetchBodyConsumer);
return NS_OK;
}
};
template <class Derived>
class CancelPumpRunnable final : public WorkerMainThreadRunnable
{
// This is a sync runnable. What dispatches this runnable must keep the body
// alive.
FetchBody<Derived>* mBody;
public:
explicit CancelPumpRunnable(FetchBody<Derived>* aBody)
: WorkerMainThreadRunnable(aBody->mWorkerPrivate,
NS_LITERAL_CSTRING("Fetch :: Cancel Pump"))
, mBody(aBody)
{}
bool
MainThreadRun() override
{
mBody->CancelPump();
return true;
}
};
} // namespace
template <class Derived>
FetchBody<Derived>::FetchBody(nsIGlobalObject* aOwner)
: mOwner(aOwner)
, mWorkerPrivate(nullptr)
, mBodyUsed(false)
#ifdef DEBUG
, mReadDone(false)
#endif
{
MOZ_ASSERT(aOwner);
@ -1111,7 +848,6 @@ FetchBody<Derived>::FetchBody(nsIGlobalObject* aOwner)
MOZ_ASSERT(mWorkerPrivate);
mMainThreadEventTarget = mWorkerPrivate->MainThreadEventTarget();
} else {
mWorkerPrivate = nullptr;
mMainThreadEventTarget = aOwner->EventTargetFor(TaskCategory::Other);
}
@ -1129,335 +865,35 @@ FetchBody<Derived>::~FetchBody()
{
}
template <class Derived>
void
FetchBody<Derived>::CancelPump()
{
AssertIsOnMainThread();
MOZ_ASSERT(mConsumeBodyPump);
mConsumeBodyPump->Cancel(NS_BINDING_ABORTED);
}
// Return value is used by ConsumeBody to bubble the error code up to WebIDL so
// mConsumePromise doesn't have to be rejected on early exit.
template <class Derived>
nsresult
FetchBody<Derived>::BeginConsumeBody()
{
AssertIsOnTargetThread();
MOZ_ASSERT(mConsumePromise);
// The FetchBody is not thread-safe refcounted. We wrap it with a thread-safe
// object able to keep the current worker alive (if we are running in a
// worker).
RefPtr<FetchBodyConsumer<Derived>> consumer =
FetchBodyConsumer<Derived>::Create(this);
if (!consumer) {
return NS_ERROR_FAILURE;
}
nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(consumer);
nsresult rv = mMainThreadEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
return NS_OK;
}
/*
* BeginConsumeBodyMainThread() will automatically reject the consume promise
* and clean up on any failures, so there is no need for callers to do so,
* reflected in a lack of error return code.
*/
template <class Derived>
void
FetchBody<Derived>::BeginConsumeBodyMainThread(FetchBodyConsumer<Derived>* aConsumer)
{
AssertIsOnMainThread();
AutoFailConsumeBody<Derived> autoReject(aConsumer);
nsresult rv;
nsCOMPtr<nsIInputStream> stream;
DerivedClass()->GetBody(getter_AddRefs(stream));
if (!stream) {
rv = NS_NewCStringInputStream(getter_AddRefs(stream), EmptyCString());
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
}
nsCOMPtr<nsIInputStreamPump> pump;
rv = NS_NewInputStreamPump(getter_AddRefs(pump),
stream, -1, -1, 0, 0, false,
mMainThreadEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
RefPtr<ConsumeBodyDoneObserver<Derived>> p =
new ConsumeBodyDoneObserver<Derived>(aConsumer);
nsCOMPtr<nsIStreamListener> listener;
if (mConsumeType == CONSUME_BLOB) {
MutableBlobStorage::MutableBlobStorageType type =
MutableBlobStorage::eOnlyInMemory;
const mozilla::UniquePtr<mozilla::ipc::PrincipalInfo>& principalInfo =
DerivedClass()->GetPrincipalInfo();
// We support temporary file for blobs only if the principal is known and
// it's system or content not in private Browsing.
if (principalInfo &&
(principalInfo->type() == mozilla::ipc::PrincipalInfo::TSystemPrincipalInfo ||
(principalInfo->type() == mozilla::ipc::PrincipalInfo::TContentPrincipalInfo &&
principalInfo->get_ContentPrincipalInfo().attrs().mPrivateBrowsingId == 0))) {
type = MutableBlobStorage::eCouldBeInTemporaryFile;
}
listener = new MutableBlobStreamListener(type, nullptr, mMimeType, p,
mMainThreadEventTarget);
} else {
nsCOMPtr<nsIStreamLoader> loader;
rv = NS_NewStreamLoader(getter_AddRefs(loader), p);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
listener = loader;
}
rv = pump->AsyncRead(listener, nullptr);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
// Now that everything succeeded, we can assign the pump to a pointer that
// stays alive for the lifetime of the FetchBody.
mConsumeBodyPump =
new nsMainThreadPtrHolder<nsIInputStreamPump>(
"FetchBody::mConsumeBodyPump", pump, mMainThreadEventTarget);
// It is ok for retargeting to fail and reads to happen on the main thread.
autoReject.DontFail();
// Try to retarget, otherwise fall back to main thread.
nsCOMPtr<nsIThreadRetargetableRequest> rr = do_QueryInterface(pump);
if (rr) {
nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
rv = rr->RetargetDeliveryTo(sts);
if (NS_WARN_IF(NS_FAILED(rv))) {
NS_WARNING("Retargeting failed");
}
}
}
template <class Derived>
void
FetchBody<Derived>::ContinueConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer,
nsresult aStatus, uint32_t aResultLength,
uint8_t* aResult)
{
AssertIsOnTargetThread();
// Just a precaution to ensure ContinueConsumeBody is not called out of
// sync with a body read.
MOZ_ASSERT(mBodyUsed);
MOZ_ASSERT(!mReadDone);
#ifdef DEBUG
mReadDone = true;
#endif
auto autoFree = mozilla::MakeScopeExit([&] {
free(aResult);
});
MOZ_ASSERT(mConsumePromise);
RefPtr<Promise> localPromise = mConsumePromise.forget();
auto autoReleaseObject = mozilla::MakeScopeExit([&] {
aBodyConsumer->ReleaseObject();
});
if (NS_WARN_IF(NS_FAILED(aStatus))) {
localPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
// If binding aborted, cancel the pump. We can't assert mConsumeBodyPump.
// In the (admittedly rare) situation that BeginConsumeBodyMainThread()
// context switches out, and the worker thread gets canceled before the
// pump is setup, mConsumeBodyPump will be null.
// We've to use the !! form since non-main thread pointer access on
// a nsMainThreadPtrHandle is not permitted.
if (aStatus == NS_BINDING_ABORTED && !!mConsumeBodyPump) {
if (NS_IsMainThread()) {
CancelPump();
} else {
MOZ_ASSERT(mWorkerPrivate);
// In case of worker thread, we block the worker while the request is
// canceled on the main thread. This ensures that OnStreamComplete has
// a valid FetchBody around to call CancelPump and we don't release the
// FetchBody on the main thread.
RefPtr<CancelPumpRunnable<Derived>> r =
new CancelPumpRunnable<Derived>(aBodyConsumer->Body());
ErrorResult rv;
r->Dispatch(Terminating, rv);
if (rv.Failed()) {
NS_WARNING("Could not dispatch CancelPumpRunnable. Nothing we can do here");
// None of our callers are callled directly from JS, so there is no
// point in trying to propagate this failure out of here. And
// localPromise is already rejected. Just suppress the failure.
rv.SuppressException();
}
}
}
}
// Release the pump and then early exit if there was an error.
// Uses NS_ProxyRelease internally, so this is safe.
mConsumeBodyPump = nullptr;
// Don't warn here since we warned above.
if (NS_FAILED(aStatus)) {
return;
}
// Finish successfully consuming body according to type.
MOZ_ASSERT(aResult);
AutoJSAPI jsapi;
if (!jsapi.Init(aBodyConsumer->Body()->DerivedClass()->GetParentObject())) {
localPromise->MaybeReject(NS_ERROR_UNEXPECTED);
return;
}
JSContext* cx = jsapi.cx();
ErrorResult error;
switch (mConsumeType) {
case CONSUME_ARRAYBUFFER: {
JS::Rooted<JSObject*> arrayBuffer(cx);
BodyUtil::ConsumeArrayBuffer(cx, &arrayBuffer, aResultLength, aResult,
error);
if (!error.Failed()) {
JS::Rooted<JS::Value> val(cx);
val.setObjectOrNull(arrayBuffer);
localPromise->MaybeResolve(cx, val);
// ArrayBuffer takes over ownership.
autoFree.release();
}
break;
}
case CONSUME_BLOB: {
MOZ_CRASH("This should not happen.");
break;
}
case CONSUME_FORMDATA: {
nsCString data;
data.Adopt(reinterpret_cast<char*>(aResult), aResultLength);
autoFree.release();
RefPtr<dom::FormData> fd = BodyUtil::ConsumeFormData(
aBodyConsumer->Body()->DerivedClass()->GetParentObject(),
mMimeType, data, error);
if (!error.Failed()) {
localPromise->MaybeResolve(fd);
}
break;
}
case CONSUME_TEXT:
// fall through handles early exit.
case CONSUME_JSON: {
nsString decoded;
if (NS_SUCCEEDED(BodyUtil::ConsumeText(aResultLength, aResult, decoded))) {
if (mConsumeType == CONSUME_TEXT) {
localPromise->MaybeResolve(decoded);
} else {
JS::Rooted<JS::Value> json(cx);
BodyUtil::ConsumeJson(cx, &json, decoded, error);
if (!error.Failed()) {
localPromise->MaybeResolve(cx, json);
}
}
};
break;
}
default:
NS_NOTREACHED("Unexpected consume body type");
}
error.WouldReportJSException();
if (error.Failed()) {
localPromise->MaybeReject(error);
}
}
template <class Derived>
void
FetchBody<Derived>::ContinueConsumeBlobBody(FetchBodyConsumer<Derived>* aBodyConsumer,
BlobImpl* aBlobImpl)
{
AssertIsOnTargetThread();
// Just a precaution to ensure ContinueConsumeBody is not called out of
// sync with a body read.
MOZ_ASSERT(mBodyUsed);
MOZ_ASSERT(!mReadDone);
MOZ_ASSERT(mConsumeType == CONSUME_BLOB);
#ifdef DEBUG
mReadDone = true;
#endif
MOZ_ASSERT(mConsumePromise);
RefPtr<Promise> localPromise = mConsumePromise.forget();
auto autoReleaseObject = mozilla::MakeScopeExit([&] {
aBodyConsumer->ReleaseObject();
});
// Release the pump and then early exit if there was an error.
// Uses NS_ProxyRelease internally, so this is safe.
mConsumeBodyPump = nullptr;
RefPtr<dom::Blob> blob =
dom::Blob::Create(aBodyConsumer->Body()->DerivedClass()->GetParentObject(),
aBlobImpl);
MOZ_ASSERT(blob);
localPromise->MaybeResolve(blob);
}
template <class Derived>
already_AddRefed<Promise>
FetchBody<Derived>::ConsumeBody(ConsumeType aType, ErrorResult& aRv)
FetchBody<Derived>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv)
{
if (BodyUsed()) {
aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
return nullptr;
}
mConsumeType = aType;
SetBodyUsed();
mConsumePromise = Promise::Create(DerivedClass()->GetParentObject(), aRv);
if (aRv.Failed()) {
return nullptr;
}
aRv = BeginConsumeBody();
RefPtr<Promise> promise =
FetchBodyConsumer<Derived>::Create(DerivedClass()->GetParentObject(),
mMainThreadEventTarget, this, aType,
aRv);
if (NS_WARN_IF(aRv.Failed())) {
mConsumePromise = nullptr;
return nullptr;
}
RefPtr<Promise> promise = mConsumePromise;
return promise.forget();
}
template
already_AddRefed<Promise>
FetchBody<Request>::ConsumeBody(ConsumeType aType, ErrorResult& aRv);
FetchBody<Request>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
template
already_AddRefed<Promise>
FetchBody<Response>::ConsumeBody(ConsumeType aType, ErrorResult& aRv);
FetchBody<Response>::ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
template <class Derived>
void

View File

@ -8,7 +8,6 @@
#define mozilla_dom_Fetch_h
#include "nsAutoPtr.h"
#include "nsIInputStreamPump.h"
#include "nsIStreamLoader.h"
#include "nsCOMPtr.h"
@ -73,6 +72,15 @@ ExtractByteStreamFromBody(const fetch::BodyInit& aBodyInit,
template <class Derived> class FetchBodyConsumer;
enum FetchConsumeType
{
CONSUME_ARRAYBUFFER,
CONSUME_BLOB,
CONSUME_FORMDATA,
CONSUME_JSON,
CONSUME_TEXT,
};
/*
* FetchBody's body consumption uses nsIInputStreamPump to read from the
* underlying stream to a block of memory, which is then adopted by
@ -110,6 +118,8 @@ template <class Derived>
class FetchBody
{
public:
friend class FetchBodyConsumer<Derived>;
NS_INLINE_DECL_PURE_VIRTUAL_REFCOUNTING
bool
@ -146,19 +156,6 @@ public:
}
// Utility public methods accessed by various runnables.
void
BeginConsumeBodyMainThread(FetchBodyConsumer<Derived>* aConsumer);
void
ContinueConsumeBody(FetchBodyConsumer<Derived>* aConsumer, nsresult aStatus,
uint32_t aLength, uint8_t* aResult);
void
ContinueConsumeBlobBody(FetchBodyConsumer<Derived>* aConsumer,
BlobImpl* aBlobImpl);
void
CancelPump();
void
SetBodyUsed()
@ -166,39 +163,34 @@ public:
mBodyUsed = true;
}
// Always set whenever the FetchBody is created on the worker thread.
workers::WorkerPrivate* mWorkerPrivate;
const nsCString&
MimeType() const
{
return mMimeType;
}
protected:
nsCOMPtr<nsIGlobalObject> mOwner;
// Always set whenever the FetchBody is created on the worker thread.
workers::WorkerPrivate* mWorkerPrivate;
explicit FetchBody(nsIGlobalObject* aOwner);
virtual ~FetchBody();
void
SetMimeType();
private:
enum ConsumeType
{
CONSUME_ARRAYBUFFER,
CONSUME_BLOB,
CONSUME_FORMDATA,
CONSUME_JSON,
CONSUME_TEXT,
};
private:
Derived*
DerivedClass() const
{
return static_cast<Derived*>(const_cast<FetchBody*>(this));
}
nsresult
BeginConsumeBody();
already_AddRefed<Promise>
ConsumeBody(ConsumeType aType, ErrorResult& aRv);
ConsumeBody(FetchConsumeType aType, ErrorResult& aRv);
bool
IsOnTargetThread()
@ -216,15 +208,6 @@ private:
bool mBodyUsed;
nsCString mMimeType;
// Only touched on target thread.
ConsumeType mConsumeType;
RefPtr<Promise> mConsumePromise;
#ifdef DEBUG
bool mReadDone;
#endif
nsMainThreadPtrHandle<nsIInputStreamPump> mConsumeBodyPump;
// The main-thread event target for runnable dispatching.
nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
};

View File

@ -7,6 +7,7 @@
#include "Fetch.h"
#include "FetchConsumer.h"
#include "nsIInputStreamPump.h"
#include "nsProxyRelease.h"
#include "WorkerPrivate.h"
#include "WorkerRunnable.h"
@ -43,35 +44,305 @@ public:
mWasNotified = true;
// This will probably cause the releasing of the consumer.
// The WorkerHolder will be released as well.
mConsumer->Body()->ContinueConsumeBody(mConsumer, NS_BINDING_ABORTED, 0,
nullptr);
mConsumer->ContinueConsumeBody(NS_BINDING_ABORTED, 0, nullptr);
}
return true;
}
};
template <class Derived>
class BeginConsumeBodyRunnable final : public Runnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
public:
explicit BeginConsumeBodyRunnable(FetchBodyConsumer<Derived>* aConsumer)
: mFetchBodyConsumer(aConsumer)
{ }
NS_IMETHOD
Run() override
{
mFetchBodyConsumer->BeginConsumeBodyMainThread();
return NS_OK;
}
};
/*
* Called on successfully reading the complete stream.
*/
template <class Derived>
class ContinueConsumeBodyRunnable final : public MainThreadWorkerRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
nsresult mStatus;
uint32_t mLength;
uint8_t* mResult;
public:
ContinueConsumeBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
nsresult aStatus, uint32_t aLength,
uint8_t* aResult)
: MainThreadWorkerRunnable(aFetchBodyConsumer->GetWorkerPrivate())
, mFetchBodyConsumer(aFetchBodyConsumer)
, mStatus(aStatus)
, mLength(aLength)
, mResult(aResult)
{
MOZ_ASSERT(NS_IsMainThread());
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mFetchBodyConsumer->ContinueConsumeBody(mStatus, mLength, mResult);
return true;
}
};
template <class Derived>
class FailConsumeBodyWorkerRunnable : public MainThreadWorkerControlRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
public:
explicit FailConsumeBodyWorkerRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
: MainThreadWorkerControlRunnable(aBodyConsumer->GetWorkerPrivate())
, mBodyConsumer(aBodyConsumer)
{
AssertIsOnMainThread();
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mBodyConsumer->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
return true;
}
};
/*
* In case of failure to create a stream pump or dispatch stream completion to
* worker, ensure we cleanup properly. Thread agnostic.
*/
template <class Derived>
class MOZ_STACK_CLASS AutoFailConsumeBody final
{
RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
public:
explicit AutoFailConsumeBody(FetchBodyConsumer<Derived>* aBodyConsumer)
: mBodyConsumer(aBodyConsumer)
{}
~AutoFailConsumeBody()
{
AssertIsOnMainThread();
if (mBodyConsumer) {
if (mBodyConsumer->GetWorkerPrivate()) {
RefPtr<FailConsumeBodyWorkerRunnable<Derived>> r =
new FailConsumeBodyWorkerRunnable<Derived>(mBodyConsumer);
if (!r->Dispatch()) {
MOZ_CRASH("We are going to leak");
}
} else {
mBodyConsumer->ContinueConsumeBody(NS_ERROR_FAILURE, 0, nullptr);
}
}
}
void
DontFail()
{
mBodyConsumer = nullptr;
}
};
/*
* Called on successfully reading the complete stream for Blob.
*/
template <class Derived>
class ContinueConsumeBlobBodyRunnable final : public MainThreadWorkerRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
RefPtr<BlobImpl> mBlobImpl;
public:
ContinueConsumeBlobBodyRunnable(FetchBodyConsumer<Derived>* aFetchBodyConsumer,
BlobImpl* aBlobImpl)
: MainThreadWorkerRunnable(aFetchBodyConsumer->GetWorkerPrivate())
, mFetchBodyConsumer(aFetchBodyConsumer)
, mBlobImpl(aBlobImpl)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mBlobImpl);
}
bool
WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override
{
mFetchBodyConsumer->ContinueConsumeBlobBody(mBlobImpl);
return true;
}
};
template <class Derived>
class ConsumeBodyDoneObserver : public nsIStreamLoaderObserver
, public MutableBlobStorageCallback
{
RefPtr<FetchBodyConsumer<Derived>> mFetchBodyConsumer;
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit ConsumeBodyDoneObserver(FetchBodyConsumer<Derived>* aFetchBodyConsumer)
: mFetchBodyConsumer(aFetchBodyConsumer)
{ }
NS_IMETHOD
OnStreamComplete(nsIStreamLoader* aLoader,
nsISupports* aCtxt,
nsresult aStatus,
uint32_t aResultLength,
const uint8_t* aResult) override
{
MOZ_ASSERT(NS_IsMainThread());
// If the binding requested cancel, we don't need to call
// ContinueConsumeBody, since that is the originator.
if (aStatus == NS_BINDING_ABORTED) {
return NS_OK;
}
uint8_t* nonconstResult = const_cast<uint8_t*>(aResult);
if (mFetchBodyConsumer->GetWorkerPrivate()) {
RefPtr<ContinueConsumeBodyRunnable<Derived>> r =
new ContinueConsumeBodyRunnable<Derived>(mFetchBodyConsumer,
aStatus,
aResultLength,
nonconstResult);
if (!r->Dispatch()) {
// XXXcatalinb: The worker is shutting down, the pump will be canceled
// by FetchBodyWorkerHolder::Notify.
NS_WARNING("Could not dispatch ConsumeBodyRunnable");
// Return failure so that aResult is freed.
return NS_ERROR_FAILURE;
}
} else {
mFetchBodyConsumer->ContinueConsumeBody(aStatus, aResultLength,
nonconstResult);
}
// FetchBody is responsible for data.
return NS_SUCCESS_ADOPTED_DATA;
}
virtual void BlobStoreCompleted(MutableBlobStorage* aBlobStorage,
Blob* aBlob,
nsresult aRv) override
{
// On error.
if (NS_FAILED(aRv)) {
OnStreamComplete(nullptr, nullptr, aRv, 0, nullptr);
return;
}
MOZ_ASSERT(aBlob);
if (mFetchBodyConsumer->GetWorkerPrivate()) {
RefPtr<ContinueConsumeBlobBodyRunnable<Derived>> r =
new ContinueConsumeBlobBodyRunnable<Derived>(mFetchBodyConsumer,
aBlob->Impl());
if (!r->Dispatch()) {
NS_WARNING("Could not dispatch ConsumeBlobBodyRunnable");
return;
}
} else {
mFetchBodyConsumer->ContinueConsumeBlobBody(aBlob->Impl());
}
}
private:
virtual ~ConsumeBodyDoneObserver()
{ }
};
template <class Derived>
NS_IMPL_ADDREF(ConsumeBodyDoneObserver<Derived>)
template <class Derived>
NS_IMPL_RELEASE(ConsumeBodyDoneObserver<Derived>)
template <class Derived>
NS_INTERFACE_MAP_BEGIN(ConsumeBodyDoneObserver<Derived>)
NS_INTERFACE_MAP_ENTRY(nsIStreamLoaderObserver)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamLoaderObserver)
NS_INTERFACE_MAP_END
template <class Derived>
class CancelPumpRunnable final : public WorkerMainThreadRunnable
{
RefPtr<FetchBodyConsumer<Derived>> mBodyConsumer;
public:
explicit CancelPumpRunnable(FetchBodyConsumer<Derived>* aBodyConsumer)
: WorkerMainThreadRunnable(aBodyConsumer->GetWorkerPrivate(),
NS_LITERAL_CSTRING("Fetch :: Cancel Pump"))
, mBodyConsumer(aBodyConsumer)
{}
bool
MainThreadRun() override
{
mBodyConsumer->CancelPump();
return true;
}
};
} // anonymous
template <class Derived>
/* static */ already_AddRefed<FetchBodyConsumer<Derived>>
FetchBodyConsumer<Derived>::Create(FetchBody<Derived>* aBody)
/* static */ already_AddRefed<Promise>
FetchBodyConsumer<Derived>::Create(nsIGlobalObject* aGlobal,
nsIEventTarget* aMainThreadEventTarget,
FetchBody<Derived>* aBody,
FetchConsumeType aType,
ErrorResult& aRv)
{
MOZ_ASSERT(aBody);
MOZ_ASSERT(aMainThreadEventTarget);
RefPtr<Promise> promise = Promise::Create(aGlobal, aRv);
if (aRv.Failed()) {
return nullptr;
}
WorkerPrivate* workerPrivate = nullptr;
if (!NS_IsMainThread()) {
workerPrivate = GetCurrentThreadWorkerPrivate();
MOZ_ASSERT(workerPrivate);
}
RefPtr<FetchBodyConsumer<Derived>> consumer =
new FetchBodyConsumer<Derived>(aBody);
new FetchBodyConsumer<Derived>(aMainThreadEventTarget, workerPrivate,
aBody, promise, aType);
if (!NS_IsMainThread()) {
WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
MOZ_ASSERT(workerPrivate);
if (!consumer->RegisterWorkerHolder(workerPrivate)) {
if (NS_WARN_IF(!consumer->RegisterWorkerHolder(workerPrivate))) {
aRv.Throw(NS_ERROR_FAILURE);
return nullptr;
}
}
return consumer.forget();
nsCOMPtr<nsIRunnable> r = new BeginConsumeBodyRunnable<Derived>(consumer);
aRv = aMainThreadEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
return promise.forget();
}
template <class Derived>
@ -85,15 +356,31 @@ FetchBodyConsumer<Derived>::ReleaseObject()
}
template <class Derived>
FetchBodyConsumer<Derived>::FetchBodyConsumer(FetchBody<Derived>* aBody)
FetchBodyConsumer<Derived>::FetchBodyConsumer(nsIEventTarget* aMainThreadEventTarget,
WorkerPrivate* aWorkerPrivate,
FetchBody<Derived>* aBody,
Promise* aPromise,
FetchConsumeType aType)
: mTargetThread(NS_GetCurrentThread())
, mMainThreadEventTarget(aMainThreadEventTarget)
, mBody(aBody)
{}
, mWorkerPrivate(aWorkerPrivate)
, mConsumeType(aType)
, mConsumePromise(aPromise)
#ifdef DEBUG
, mReadDone(false)
#endif
{
MOZ_ASSERT(aMainThreadEventTarget);
MOZ_ASSERT(aBody);
MOZ_ASSERT(aPromise);
}
template <class Derived>
FetchBodyConsumer<Derived>::~FetchBodyConsumer()
{
NS_ProxyRelease(mTargetThread, mBody.forget());
NS_ProxyRelease("FetchBodyConsumer::mBody",
mTargetThread, mBody.forget());
}
template <class Derived>
@ -122,5 +409,271 @@ FetchBodyConsumer<Derived>::RegisterWorkerHolder(WorkerPrivate* aWorkerPrivate)
return true;
}
/*
* BeginConsumeBodyMainThread() will automatically reject the consume promise
* and clean up on any failures, so there is no need for callers to do so,
* reflected in a lack of error return code.
*/
template <class Derived>
void
FetchBodyConsumer<Derived>::BeginConsumeBodyMainThread()
{
AssertIsOnMainThread();
AutoFailConsumeBody<Derived> autoReject(this);
nsresult rv;
nsCOMPtr<nsIInputStream> stream;
mBody->DerivedClass()->GetBody(getter_AddRefs(stream));
if (!stream) {
rv = NS_NewCStringInputStream(getter_AddRefs(stream), EmptyCString());
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
}
nsCOMPtr<nsIInputStreamPump> pump;
rv = NS_NewInputStreamPump(getter_AddRefs(pump),
stream, -1, -1, 0, 0, false,
mMainThreadEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
RefPtr<ConsumeBodyDoneObserver<Derived>> p =
new ConsumeBodyDoneObserver<Derived>(this);
nsCOMPtr<nsIStreamListener> listener;
if (mConsumeType == CONSUME_BLOB) {
MutableBlobStorage::MutableBlobStorageType type =
MutableBlobStorage::eOnlyInMemory;
const mozilla::UniquePtr<mozilla::ipc::PrincipalInfo>& principalInfo =
mBody->DerivedClass()->GetPrincipalInfo();
// We support temporary file for blobs only if the principal is known and
// it's system or content not in private Browsing.
if (principalInfo &&
(principalInfo->type() == mozilla::ipc::PrincipalInfo::TSystemPrincipalInfo ||
(principalInfo->type() == mozilla::ipc::PrincipalInfo::TContentPrincipalInfo &&
principalInfo->get_ContentPrincipalInfo().attrs().mPrivateBrowsingId == 0))) {
type = MutableBlobStorage::eCouldBeInTemporaryFile;
}
listener = new MutableBlobStreamListener(type, nullptr, mBody->MimeType(),
p, mMainThreadEventTarget);
} else {
nsCOMPtr<nsIStreamLoader> loader;
rv = NS_NewStreamLoader(getter_AddRefs(loader), p);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
listener = loader;
}
rv = pump->AsyncRead(listener, nullptr);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
// Now that everything succeeded, we can assign the pump to a pointer that
// stays alive for the lifetime of the FetchBody.
mConsumeBodyPump =
new nsMainThreadPtrHolder<nsIInputStreamPump>("FetchBodyConsumer::mConsumeBodyPump",
pump, mMainThreadEventTarget);
// It is ok for retargeting to fail and reads to happen on the main thread.
autoReject.DontFail();
// Try to retarget, otherwise fall back to main thread.
nsCOMPtr<nsIThreadRetargetableRequest> rr = do_QueryInterface(pump);
if (rr) {
nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
rv = rr->RetargetDeliveryTo(sts);
if (NS_WARN_IF(NS_FAILED(rv))) {
NS_WARNING("Retargeting failed");
}
}
}
template <class Derived>
void
FetchBodyConsumer<Derived>::ContinueConsumeBody(nsresult aStatus,
uint32_t aResultLength,
uint8_t* aResult)
{
AssertIsOnTargetThread();
// Just a precaution to ensure ContinueConsumeBody is not called out of
// sync with a body read.
MOZ_ASSERT(mBody->BodyUsed());
MOZ_ASSERT(!mReadDone);
#ifdef DEBUG
mReadDone = true;
#endif
auto autoFree = mozilla::MakeScopeExit([&] {
free(aResult);
});
MOZ_ASSERT(mConsumePromise);
RefPtr<Promise> localPromise = mConsumePromise.forget();
RefPtr<FetchBodyConsumer<Derived>> self = this;
auto autoReleaseObject = mozilla::MakeScopeExit([&] {
self->ReleaseObject();
});
if (NS_WARN_IF(NS_FAILED(aStatus))) {
localPromise->MaybeReject(NS_ERROR_DOM_ABORT_ERR);
// If binding aborted, cancel the pump. We can't assert mConsumeBodyPump.
// In the (admittedly rare) situation that BeginConsumeBodyMainThread()
// context switches out, and the worker thread gets canceled before the
// pump is setup, mConsumeBodyPump will be null.
// We've to use the !! form since non-main thread pointer access on
// a nsMainThreadPtrHandle is not permitted.
if (aStatus == NS_BINDING_ABORTED && !!mConsumeBodyPump) {
if (NS_IsMainThread()) {
CancelPump();
} else {
MOZ_ASSERT(mWorkerPrivate);
// In case of worker thread, we block the worker while the request is
// canceled on the main thread. This ensures that OnStreamComplete has
// a valid FetchBody around to call CancelPump and we don't release the
// FetchBody on the main thread.
RefPtr<CancelPumpRunnable<Derived>> r =
new CancelPumpRunnable<Derived>(this);
ErrorResult rv;
r->Dispatch(Terminating, rv);
if (rv.Failed()) {
NS_WARNING("Could not dispatch CancelPumpRunnable. Nothing we can do here");
// None of our callers are callled directly from JS, so there is no
// point in trying to propagate this failure out of here. And
// localPromise is already rejected. Just suppress the failure.
rv.SuppressException();
}
}
}
}
// Release the pump and then early exit if there was an error.
// Uses NS_ProxyRelease internally, so this is safe.
mConsumeBodyPump = nullptr;
// Don't warn here since we warned above.
if (NS_FAILED(aStatus)) {
return;
}
// Finish successfully consuming body according to type.
MOZ_ASSERT(aResult);
AutoJSAPI jsapi;
if (!jsapi.Init(mBody->DerivedClass()->GetParentObject())) {
localPromise->MaybeReject(NS_ERROR_UNEXPECTED);
return;
}
JSContext* cx = jsapi.cx();
ErrorResult error;
switch (mConsumeType) {
case CONSUME_ARRAYBUFFER: {
JS::Rooted<JSObject*> arrayBuffer(cx);
BodyUtil::ConsumeArrayBuffer(cx, &arrayBuffer, aResultLength, aResult,
error);
if (!error.Failed()) {
JS::Rooted<JS::Value> val(cx);
val.setObjectOrNull(arrayBuffer);
localPromise->MaybeResolve(cx, val);
// ArrayBuffer takes over ownership.
aResult = nullptr;
}
break;
}
case CONSUME_BLOB: {
MOZ_CRASH("This should not happen.");
break;
}
case CONSUME_FORMDATA: {
nsCString data;
data.Adopt(reinterpret_cast<char*>(aResult), aResultLength);
aResult = nullptr;
RefPtr<dom::FormData> fd =
BodyUtil::ConsumeFormData(mBody->DerivedClass()->GetParentObject(),
mBody->MimeType(), data, error);
if (!error.Failed()) {
localPromise->MaybeResolve(fd);
}
break;
}
case CONSUME_TEXT:
// fall through handles early exit.
case CONSUME_JSON: {
nsString decoded;
if (NS_SUCCEEDED(BodyUtil::ConsumeText(aResultLength, aResult, decoded))) {
if (mConsumeType == CONSUME_TEXT) {
localPromise->MaybeResolve(decoded);
} else {
JS::Rooted<JS::Value> json(cx);
BodyUtil::ConsumeJson(cx, &json, decoded, error);
if (!error.Failed()) {
localPromise->MaybeResolve(cx, json);
}
}
};
break;
}
default:
NS_NOTREACHED("Unexpected consume body type");
}
error.WouldReportJSException();
if (error.Failed()) {
localPromise->MaybeReject(error);
}
}
template <class Derived>
void
FetchBodyConsumer<Derived>::ContinueConsumeBlobBody(BlobImpl* aBlobImpl)
{
AssertIsOnTargetThread();
// Just a precaution to ensure ContinueConsumeBody is not called out of
// sync with a body read.
MOZ_ASSERT(mBody->BodyUsed());
MOZ_ASSERT(!mReadDone);
MOZ_ASSERT(mConsumeType == CONSUME_BLOB);
#ifdef DEBUG
mReadDone = true;
#endif
MOZ_ASSERT(mConsumePromise);
RefPtr<Promise> localPromise = mConsumePromise.forget();
// Release the pump and then early exit if there was an error.
// Uses NS_ProxyRelease internally, so this is safe.
mConsumeBodyPump = nullptr;
RefPtr<dom::Blob> blob =
dom::Blob::Create(mBody->DerivedClass()->GetParentObject(), aBlobImpl);
MOZ_ASSERT(blob);
localPromise->MaybeResolve(blob);
ReleaseObject();
}
template <class Derived>
void
FetchBodyConsumer<Derived>::CancelPump()
{
AssertIsOnMainThread();
MOZ_ASSERT(mConsumeBodyPump);
mConsumeBodyPump->Cancel(NS_BINDING_ABORTED);
}
} // namespace dom
} // namespace mozilla

View File

@ -7,11 +7,15 @@
#ifndef mozilla_dom_FetchConsumer_h
#define mozilla_dom_FetchConsumer_h
#include "Fetch.h"
class nsIThread;
namespace mozilla {
namespace dom {
class Promise;
namespace workers {
class WorkerPrivate;
class WorkerHolder;
@ -28,8 +32,12 @@ class FetchBodyConsumer final
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(FetchBodyConsumer<Derived>)
static already_AddRefed<FetchBodyConsumer<Derived>>
Create(FetchBody<Derived>* aBody);
static already_AddRefed<Promise>
Create(nsIGlobalObject* aGlobal,
nsIEventTarget* aMainThreadEventTarget,
FetchBody<Derived>* aBody,
FetchConsumeType aType,
ErrorResult& aRv);
void
ReleaseObject();
@ -40,8 +48,30 @@ public:
return mBody;
}
void
BeginConsumeBodyMainThread();
void
ContinueConsumeBody(nsresult aStatus, uint32_t aLength, uint8_t* aResult);
void
ContinueConsumeBlobBody(BlobImpl* aBlobImpl);
void
CancelPump();
workers::WorkerPrivate*
GetWorkerPrivate() const
{
return mWorkerPrivate;
}
private:
explicit FetchBodyConsumer(FetchBody<Derived>* aBody);
FetchBodyConsumer(nsIEventTarget* aMainThreadEventTarget,
workers::WorkerPrivate* aWorkerPrivate,
FetchBody<Derived>* aBody,
Promise* aPromise,
FetchConsumeType aType);
~FetchBodyConsumer();
@ -52,12 +82,26 @@ private:
RegisterWorkerHolder(workers::WorkerPrivate* aWorkerPrivate);
nsCOMPtr<nsIThread> mTargetThread;
nsCOMPtr<nsIEventTarget> mMainThreadEventTarget;
RefPtr<FetchBody<Derived>> mBody;
// Set when consuming the body is attempted on a worker.
// Unset when consumption is done/aborted.
// This WorkerHolder keeps alive the consumer via a cycle.
UniquePtr<workers::WorkerHolder> mWorkerHolder;
// Always set whenever the FetchBodyConsumer is created on the worker thread.
workers::WorkerPrivate* mWorkerPrivate;
nsMainThreadPtrHandle<nsIInputStreamPump> mConsumeBodyPump;
// Only ever set once, always on target thread.
FetchConsumeType mConsumeType;
RefPtr<Promise> mConsumePromise;
#ifdef DEBUG
bool mReadDone;
#endif
};
} // namespace dom

View File

@ -19,6 +19,7 @@
#include "nsIStreamLoader.h"
#include "nsIThreadRetargetableRequest.h"
#include "nsIInputStreamPump.h"
#include "nsIPrincipal.h"
#include "nsIScriptError.h"
#include "nsIScriptSecurityManager.h"

View File

@ -16,6 +16,7 @@
#include "mozilla/dom/Response.h"
#include "mozilla/dom/ScriptSettings.h"
#include "mozilla/dom/ScriptLoader.h"
#include "nsIInputStreamPump.h"
#include "nsIThreadRetargetableRequest.h"
#include "nsNetUtil.h"
#include "xpcprivate.h"