Bug 1128959 - Implement the WHATWG Streams spec - part 9 - FetchStreamReader, r=bkelly

This commit is contained in:
Andrea Marchesini 2017-08-10 18:04:55 -07:00
parent 3a14f6c21f
commit e44cd2d765
10 changed files with 567 additions and 59 deletions

View File

@ -900,6 +900,7 @@ FetchBody<Derived>::FetchBody(nsIGlobalObject* aOwner)
: mOwner(aOwner) : mOwner(aOwner)
, mWorkerPrivate(nullptr) , mWorkerPrivate(nullptr)
, mReadableStreamBody(nullptr) , mReadableStreamBody(nullptr)
, mReadableStreamReader(nullptr)
, mBodyUsed(false) , mBodyUsed(false)
{ {
MOZ_ASSERT(aOwner); MOZ_ASSERT(aOwner);
@ -926,6 +927,12 @@ FetchBody<Derived>::~FetchBody()
{ {
} }
template
FetchBody<Request>::~FetchBody();
template
FetchBody<Response>::~FetchBody();
template <class Derived> template <class Derived>
bool bool
FetchBody<Derived>::BodyUsed() const FetchBody<Derived>::BodyUsed() const
@ -974,13 +981,27 @@ FetchBody<Derived>::ConsumeBody(JSContext* aCx, FetchConsumeType aType,
SetBodyUsed(); SetBodyUsed();
// If we already created a ReadableStreamBody we have to lock it now because // If we already have a ReadableStreamBody and it has been created by DOM, we
// it can have been shared with other objects. // have to lock it now because it can have been shared with other objects.
if (mReadableStreamBody) { if (mReadableStreamBody) {
JS::Rooted<JSObject*> body(aCx, mReadableStreamBody); JS::Rooted<JSObject*> readableStreamObj(aCx, mReadableStreamBody);
LockStream(aCx, body, aRv); if (JS::ReadableStreamGetMode(readableStreamObj) ==
if (NS_WARN_IF(aRv.Failed())) { JS::ReadableStreamMode::ExternalSource) {
return nullptr; LockStream(aCx, readableStreamObj, aRv);
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
} else {
// If this is not a native ReadableStream, let's activate the
// FetchStreamReader.
MOZ_ASSERT(mFetchStreamReader);
JS::Rooted<JSObject*> reader(aCx);
mFetchStreamReader->StartConsuming(aCx, readableStreamObj, &reader, aRv);
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
mReadableStreamReader = reader;
} }
} }
@ -1057,6 +1078,11 @@ FetchBody<Derived>::GetBody(JSContext* aCx,
JS::MutableHandle<JSObject*> aBodyOut, JS::MutableHandle<JSObject*> aBodyOut,
ErrorResult& aRv) ErrorResult& aRv)
{ {
if (mReadableStreamBody) {
aBodyOut.set(mReadableStreamBody);
return;
}
nsCOMPtr<nsIInputStream> inputStream; nsCOMPtr<nsIInputStream> inputStream;
DerivedClass()->GetBody(getter_AddRefs(inputStream)); DerivedClass()->GetBody(getter_AddRefs(inputStream));
@ -1065,30 +1091,27 @@ FetchBody<Derived>::GetBody(JSContext* aCx,
return; return;
} }
if (!mReadableStreamBody) { JS::Rooted<JSObject*> body(aCx,
JS::Rooted<JSObject*> body(aCx, FetchStream::Create(aCx,
FetchStream::Create(aCx, this,
this, DerivedClass()->GetParentObject(),
DerivedClass()->GetParentObject(), inputStream,
inputStream, aRv));
aRv)); if (NS_WARN_IF(aRv.Failed())) {
return;
}
MOZ_ASSERT(body);
// If the body has been already consumed, we lock the stream.
if (BodyUsed()) {
LockStream(aCx, body, aRv);
if (NS_WARN_IF(aRv.Failed())) { if (NS_WARN_IF(aRv.Failed())) {
return; return;
} }
MOZ_ASSERT(body);
// If the body has been already consumed, we close the stream.
if (BodyUsed()) {
LockStream(aCx, body, aRv);
if (NS_WARN_IF(aRv.Failed())) {
return;
}
}
mReadableStreamBody = body;
} }
mReadableStreamBody = body;
aBodyOut.set(mReadableStreamBody); aBodyOut.set(mReadableStreamBody);
} }
@ -1110,7 +1133,15 @@ FetchBody<Derived>::LockStream(JSContext* aCx,
JS::HandleObject aStream, JS::HandleObject aStream,
ErrorResult& aRv) ErrorResult& aRv)
{ {
// TODO: next patch. JS::Rooted<JSObject*> reader(aCx,
JS::ReadableStreamGetReader(aCx, aStream,
JS::ReadableStreamReaderMode::Default));
if (!reader) {
aRv.StealExceptionFromJSContext(aCx);
return;
}
mReadableStreamReader = reader;
} }
template template
@ -1129,10 +1160,18 @@ template <class Derived>
void void
FetchBody<Derived>::MaybeTeeReadableStreamBody(JSContext* aCx, FetchBody<Derived>::MaybeTeeReadableStreamBody(JSContext* aCx,
JS::MutableHandle<JSObject*> aBodyOut, JS::MutableHandle<JSObject*> aBodyOut,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream,
ErrorResult& aRv) ErrorResult& aRv)
{ {
MOZ_DIAGNOSTIC_ASSERT(aStreamReader);
MOZ_DIAGNOSTIC_ASSERT(aInputStream);
MOZ_DIAGNOSTIC_ASSERT(!BodyUsed()); MOZ_DIAGNOSTIC_ASSERT(!BodyUsed());
aBodyOut.set(nullptr);
*aStreamReader = nullptr;
*aInputStream = nullptr;
if (!mReadableStreamBody) { if (!mReadableStreamBody) {
return; return;
} }
@ -1157,18 +1196,27 @@ FetchBody<Derived>::MaybeTeeReadableStreamBody(JSContext* aCx,
mReadableStreamBody = branch1; mReadableStreamBody = branch1;
aBodyOut.set(branch2); aBodyOut.set(branch2);
aRv = FetchStreamReader::Create(aCx, mOwner, aStreamReader, aInputStream);
if (NS_WARN_IF(aRv.Failed())) {
return;
}
} }
template template
void void
FetchBody<Request>::MaybeTeeReadableStreamBody(JSContext* aCx, FetchBody<Request>::MaybeTeeReadableStreamBody(JSContext* aCx,
JS::MutableHandle<JSObject*> aMessage, JS::MutableHandle<JSObject*> aMessage,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream,
ErrorResult& aRv); ErrorResult& aRv);
template template
void void
FetchBody<Response>::MaybeTeeReadableStreamBody(JSContext* aCx, FetchBody<Response>::MaybeTeeReadableStreamBody(JSContext* aCx,
JS::MutableHandle<JSObject*> aMessage, JS::MutableHandle<JSObject*> aMessage,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream,
ErrorResult& aRv); ErrorResult& aRv);
} // namespace dom } // namespace dom

View File

@ -18,6 +18,7 @@
#include "mozilla/DebugOnly.h" #include "mozilla/DebugOnly.h"
#include "mozilla/ErrorResult.h" #include "mozilla/ErrorResult.h"
#include "mozilla/dom/Promise.h" #include "mozilla/dom/Promise.h"
#include "mozilla/dom/FetchStreamReader.h"
#include "mozilla/dom/RequestBinding.h" #include "mozilla/dom/RequestBinding.h"
class nsIGlobalObject; class nsIGlobalObject;
@ -185,6 +186,8 @@ public:
void void
MaybeTeeReadableStreamBody(JSContext* aCx, MaybeTeeReadableStreamBody(JSContext* aCx,
JS::MutableHandle<JSObject*> aBodyOut, JS::MutableHandle<JSObject*> aBodyOut,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream,
ErrorResult& aRv); ErrorResult& aRv);
// Utility public methods accessed by various runnables. // Utility public methods accessed by various runnables.
@ -201,10 +204,13 @@ public:
return mMimeType; return mMimeType;
} }
// FetchStreamHolder
void void
NullifyStream() override NullifyStream() override
{ {
mReadableStreamBody = nullptr; mReadableStreamBody = nullptr;
mReadableStreamReader = nullptr;
mFetchStreamReader = nullptr;
} }
protected: protected:
@ -217,6 +223,10 @@ protected:
// FetchStream object. // FetchStream object.
JS::Heap<JSObject*> mReadableStreamBody; JS::Heap<JSObject*> mReadableStreamBody;
// This is the Reader used to retrieve data from the body.
JS::Heap<JSObject*> mReadableStreamReader;
RefPtr<FetchStreamReader> mFetchStreamReader;
explicit FetchBody(nsIGlobalObject* aOwner); explicit FetchBody(nsIGlobalObject* aOwner);
virtual ~FetchBody(); virtual ~FetchBody();

View File

@ -0,0 +1,308 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "FetchStreamReader.h"
#include "InternalResponse.h"
#include "mozilla/dom/PromiseBinding.h"
namespace mozilla {
namespace dom {
using namespace workers;
namespace {
class FetchStreamReaderWorkerHolder final : public WorkerHolder
{
public:
explicit FetchStreamReaderWorkerHolder(FetchStreamReader* aReader)
: WorkerHolder(WorkerHolder::Behavior::AllowIdleShutdownStart)
, mReader(aReader)
, mWasNotified(false)
{}
bool Notify(Status aStatus) override
{
if (!mWasNotified) {
mWasNotified = true;
mReader->CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
}
return true;
}
private:
RefPtr<FetchStreamReader> mReader;
bool mWasNotified;
};
} // anonymous
NS_IMPL_ISUPPORTS(FetchStreamReader, nsIOutputStreamCallback)
/* static */ nsresult
FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream)
{
MOZ_ASSERT(aCx);
MOZ_ASSERT(aGlobal);
MOZ_ASSERT(aStreamReader);
MOZ_ASSERT(aInputStream);
RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal);
nsCOMPtr<nsIAsyncInputStream> pipeIn;
nsresult rv = NS_NewPipe2(getter_AddRefs(pipeIn),
getter_AddRefs(streamReader->mPipeOut),
true, true, 0, 0);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (!NS_IsMainThread()) {
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
MOZ_ASSERT(workerPrivate);
// We need to know when the worker goes away.
UniquePtr<FetchStreamReaderWorkerHolder> holder(
new FetchStreamReaderWorkerHolder(streamReader));
if (NS_WARN_IF(!holder->HoldWorker(workerPrivate, Closing))) {
streamReader->mPipeOut->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR);
return NS_ERROR_DOM_INVALID_STATE_ERR;
}
// These 2 objects create a ref-cycle here that is broken when the stream is
// closed or the worker shutsdown.
streamReader->mWorkerHolder = Move(holder);
}
pipeIn.forget(aInputStream);
streamReader.forget(aStreamReader);
return NS_OK;
}
FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
: mGlobal(aGlobal)
, mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other))
, mBufferRemaining(0)
, mBufferOffset(0)
, mStreamClosed(false)
{
MOZ_ASSERT(aGlobal);
}
FetchStreamReader::~FetchStreamReader()
{
CloseAndRelease(NS_BASE_STREAM_CLOSED);
}
void
FetchStreamReader::CloseAndRelease(nsresult aStatus)
{
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
if (mStreamClosed) {
// Already closed.
return;
}
RefPtr<FetchStreamReader> kungFuDeathGrip = this;
mStreamClosed = true;
mGlobal = nullptr;
mPipeOut->CloseWithStatus(aStatus);
mPipeOut = nullptr;
mWorkerHolder = nullptr;
mReader = nullptr;
mBuffer = nullptr;
}
void
FetchStreamReader::StartConsuming(JSContext* aCx,
JS::HandleObject aStream,
JS::MutableHandle<JSObject*> aReader,
ErrorResult& aRv)
{
MOZ_DIAGNOSTIC_ASSERT(!mReader);
MOZ_DIAGNOSTIC_ASSERT(aStream);
JS::Rooted<JSObject*> reader(aCx,
JS::ReadableStreamGetReader(aCx, aStream,
JS::ReadableStreamReaderMode::Default));
if (!reader) {
aRv.StealExceptionFromJSContext(aCx);
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return;
}
mReader = reader;
aReader.set(reader);
aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(aRv.Failed())) {
return;
}
}
// nsIOutputStreamCallback interface
NS_IMETHODIMP
FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream)
{
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
MOZ_ASSERT(aStream == mPipeOut);
MOZ_ASSERT(mReader);
if (mStreamClosed) {
return NS_OK;
}
if (mBuffer) {
return WriteBuffer();
}
AutoJSAPI jsapi;
if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return NS_ERROR_FAILURE;
}
JSContext* cx = jsapi.cx();
JS::Rooted<JSObject*> reader(cx, mReader);
JS::Rooted<JSObject*> promise(cx,
JS::ReadableStreamDefaultReaderRead(cx,
reader));
if (NS_WARN_IF(!promise)) {
// Let's close the stream.
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return NS_ERROR_FAILURE;
}
RefPtr<Promise> domPromise = Promise::CreateFromExisting(mGlobal, promise);
if (NS_WARN_IF(!domPromise)) {
// Let's close the stream.
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return NS_ERROR_FAILURE;
}
// Let's wait.
domPromise->AppendNativeHandler(this);
return NS_OK;
}
void
FetchStreamReader::ResolvedCallback(JSContext* aCx,
JS::Handle<JS::Value> aValue)
{
if (mStreamClosed) {
return;
}
// This promise should be resolved with { done: boolean, value: something },
// "value" is interesting only if done is false.
// We don't want to play with JS api, let's WebIDL bindings doing it for us.
// FetchReadableStreamReadDataDone is a dictionary with just a boolean, if the
// parsing succeeded, we can proceed with the parsing of the "value", which it
// must be a Uint8Array.
FetchReadableStreamReadDataDone valueDone;
if (!valueDone.Init(aCx, aValue)) {
JS_ClearPendingException(aCx);
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return;
}
if (valueDone.mDone) {
// Stream is completed.
CloseAndRelease(NS_BASE_STREAM_CLOSED);
return;
}
UniquePtr<FetchReadableStreamReadDataArray> value(
new FetchReadableStreamReadDataArray);
if (!value->Init(aCx, aValue) || !value->mValue.WasPassed()) {
JS_ClearPendingException(aCx);
CloseAndRelease(NS_ERROR_DOM_INVALID_STATE_ERR);
return;
}
Uint8Array& array = value->mValue.Value();
array.ComputeLengthAndData();
uint32_t len = array.Length();
if (len == 0) {
// If there is nothing to read, let's do another reading.
OnOutputStreamReady(mPipeOut);
return;
}
MOZ_DIAGNOSTIC_ASSERT(!mBuffer);
mBuffer = Move(value);
mBufferOffset = 0;
mBufferRemaining = len;
WriteBuffer();
}
nsresult
FetchStreamReader::WriteBuffer()
{
MOZ_ASSERT(mBuffer);
MOZ_ASSERT(mBuffer->mValue.WasPassed());
Uint8Array& array = mBuffer->mValue.Value();
char* data = reinterpret_cast<char*>(array.Data());
while (1) {
uint32_t written = 0;
nsresult rv =
mPipeOut->Write(data + mBufferOffset, mBufferRemaining, &written);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
break;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(rv);
return rv;
}
MOZ_ASSERT(written <= mBufferRemaining);
mBufferRemaining -= written;
mBufferOffset += written;
if (mBufferRemaining == 0) {
mBuffer = nullptr;
break;
}
}
nsresult rv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(rv);
return rv;
}
return NS_OK;
}
void
FetchStreamReader::RejectedCallback(JSContext* aCx,
JS::Handle<JS::Value> aValue)
{
CloseAndRelease(NS_ERROR_FAILURE);
}
} // dom namespace
} // mozilla namespace

View File

@ -0,0 +1,77 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef mozilla_dom_FetchStreamReader_h
#define mozilla_dom_FetchStreamReader_h
#include "jsapi.h"
#include "mozilla/dom/FetchBinding.h"
#include "mozilla/dom/PromiseNativeHandler.h"
#include "nsIAsyncOutputStream.h"
namespace mozilla {
namespace dom {
namespace workers {
class WorkerHolder;
}
class FetchStreamReader final : public nsIOutputStreamCallback
, public PromiseNativeHandler
{
public:
NS_DECL_ISUPPORTS
NS_DECL_NSIOUTPUTSTREAMCALLBACK
// This creates a nsIInputStream able to retrieve data from the ReadableStream
// object. The reading starts when StartConsuming() is called.
static nsresult
Create(JSContext* aCx, nsIGlobalObject* aGlobal,
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream);
void
ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override;
void
RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override;
void
CloseAndRelease(nsresult aStatus);
void
StartConsuming(JSContext* aCx,
JS::HandleObject aStream,
JS::MutableHandle<JSObject*> aReader,
ErrorResult& aRv);
private:
explicit FetchStreamReader(nsIGlobalObject* aGlobal);
~FetchStreamReader();
nsresult
WriteBuffer();
nsCOMPtr<nsIGlobalObject> mGlobal;
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
UniquePtr<workers::WorkerHolder> mWorkerHolder;
JS::Heap<JSObject*> mReader;
UniquePtr<FetchReadableStreamReadDataArray> mBuffer;
uint32_t mBufferRemaining;
uint32_t mBufferOffset;
bool mStreamClosed;
};
} // dom namespace
} // mozilla namespace
#endif // mozilla_dom_FetchStreamReader_h

View File

@ -137,18 +137,18 @@ InternalResponse::ToIPC(IPCInternalResponse* aIPCResponse,
} }
already_AddRefed<InternalResponse> already_AddRefed<InternalResponse>
InternalResponse::Clone() InternalResponse::Clone(CloneType aCloneType)
{ {
RefPtr<InternalResponse> clone = CreateIncompleteCopy(); RefPtr<InternalResponse> clone = CreateIncompleteCopy();
clone->mHeaders = new InternalHeaders(*mHeaders); clone->mHeaders = new InternalHeaders(*mHeaders);
if (mWrappedResponse) { if (mWrappedResponse) {
clone->mWrappedResponse = mWrappedResponse->Clone(); clone->mWrappedResponse = mWrappedResponse->Clone(aCloneType);
MOZ_ASSERT(!mBody); MOZ_ASSERT(!mBody);
return clone.forget(); return clone.forget();
} }
if (!mBody) { if (!mBody || aCloneType == eDontCloneInputStream) {
return clone.forget(); return clone.forget();
} }

View File

@ -44,7 +44,13 @@ public:
M* aManager, M* aManager,
UniquePtr<mozilla::ipc::AutoIPCStream>& aAutoStream); UniquePtr<mozilla::ipc::AutoIPCStream>& aAutoStream);
already_AddRefed<InternalResponse> Clone(); enum CloneType
{
eCloneInputStream,
eDontCloneInputStream,
};
already_AddRefed<InternalResponse> Clone(CloneType eCloneType);
static already_AddRefed<InternalResponse> static already_AddRefed<InternalResponse>
NetworkError() NetworkError()

View File

@ -41,6 +41,8 @@ NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Request) NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Request)
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody) NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
MOZ_DIAGNOSTIC_ASSERT(!tmp->mReadableStreamReader);
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamReader)
NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
NS_IMPL_CYCLE_COLLECTION_TRACE_END NS_IMPL_CYCLE_COLLECTION_TRACE_END

View File

@ -22,6 +22,7 @@
#include "BodyExtractor.h" #include "BodyExtractor.h"
#include "FetchStream.h" #include "FetchStream.h"
#include "FetchStreamReader.h"
#include "InternalResponse.h" #include "InternalResponse.h"
#include "WorkerPrivate.h" #include "WorkerPrivate.h"
@ -38,6 +39,7 @@ NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(Response)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders) NS_IMPL_CYCLE_COLLECTION_UNLINK(mHeaders)
tmp->mReadableStreamBody = nullptr; tmp->mReadableStreamBody = nullptr;
tmp->mReadableStreamReader = nullptr;
NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
NS_IMPL_CYCLE_COLLECTION_UNLINK_END NS_IMPL_CYCLE_COLLECTION_UNLINK_END
@ -49,6 +51,7 @@ NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Response) NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(Response)
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody) NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamBody)
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mReadableStreamReader)
NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
NS_IMPL_CYCLE_COLLECTION_TRACE_END NS_IMPL_CYCLE_COLLECTION_TRACE_END
@ -225,7 +228,7 @@ Response::Constructor(const GlobalObject& aGlobal,
nsCString contentTypeWithCharset; nsCString contentTypeWithCharset;
nsCOMPtr<nsIInputStream> bodyStream; nsCOMPtr<nsIInputStream> bodyStream;
uint64_t bodySize = 0; int64_t bodySize = InternalResponse::UNKNOWN_BODY_SIZE;
if (aBody.Value().IsReadableStream()) { if (aBody.Value().IsReadableStream()) {
const ReadableStream& readableStream = const ReadableStream& readableStream =
@ -243,38 +246,50 @@ Response::Constructor(const GlobalObject& aGlobal,
r->SetReadableStreamBody(readableStreamObj); r->SetReadableStreamBody(readableStreamObj);
// TODO: see next patches if (JS::ReadableStreamGetMode(readableStreamObj) ==
MOZ_ASSERT(JS::ReadableStreamGetMode(readableStreamObj) != JS::ReadableStreamMode::ExternalSource) {
JS::ReadableStreamMode::ExternalSource); // If this is a DOM generated ReadableStream, we can extract the
// inputStream directly.
void* underlyingSource = nullptr;
if (!JS::ReadableStreamGetExternalUnderlyingSource(aGlobal.Context(),
readableStreamObj,
&underlyingSource)) {
aRv.StealExceptionFromJSContext(aGlobal.Context());
return nullptr;
}
void* underlyingSource = nullptr; MOZ_ASSERT(underlyingSource);
if (!JS::ReadableStreamGetExternalUnderlyingSource(aGlobal.Context(),
readableStreamObj,
&underlyingSource)) {
aRv.StealExceptionFromJSContext(aGlobal.Context());
return nullptr;
}
bodySize = InternalResponse::UNKNOWN_BODY_SIZE; aRv = FetchStream::RetrieveInputStream(underlyingSource,
getter_AddRefs(bodyStream));
MOZ_ASSERT(underlyingSource); // The releasing of the external source is needed in order to avoid an
aRv = FetchStream::RetrieveInputStream(underlyingSource, // extra stream lock.
getter_AddRefs(bodyStream)); JS::ReadableStreamReleaseExternalUnderlyingSource(readableStreamObj);
if (NS_WARN_IF(aRv.Failed())) {
// The releasing of the external source is needed in order to avoid an extra return nullptr;
// stream lock. }
JS::ReadableStreamReleaseExternalUnderlyingSource(readableStreamObj); } else {
if (NS_WARN_IF(aRv.Failed())) { // If this is a JS-created ReadableStream, let's create a
return nullptr; // FetchStreamReader.
aRv = FetchStreamReader::Create(aGlobal.Context(), global,
getter_AddRefs(r->mFetchStreamReader),
getter_AddRefs(bodyStream));
if (NS_WARN_IF(aRv.Failed())) {
return nullptr;
}
} }
} else { } else {
uint64_t size = 0;
aRv = ExtractByteStreamFromBody(aBody.Value(), aRv = ExtractByteStreamFromBody(aBody.Value(),
getter_AddRefs(bodyStream), getter_AddRefs(bodyStream),
contentTypeWithCharset, contentTypeWithCharset,
bodySize); size);
if (NS_WARN_IF(aRv.Failed())) { if (NS_WARN_IF(aRv.Failed())) {
return nullptr; return nullptr;
} }
bodySize = size;
} }
internalResponse->SetBody(bodyStream, bodySize); internalResponse->SetBody(bodyStream, bodySize);
@ -306,21 +321,35 @@ Response::Clone(JSContext* aCx, ErrorResult& aRv)
return nullptr; return nullptr;
} }
RefPtr<InternalResponse> ir = mInternalResponse->Clone(); RefPtr<FetchStreamReader> streamReader;
RefPtr<Response> response = new Response(mOwner, ir); nsCOMPtr<nsIInputStream> inputStream;
JS::Rooted<JSObject*> body(aCx); JS::Rooted<JSObject*> body(aCx);
MaybeTeeReadableStreamBody(aCx, &body, aRv); MaybeTeeReadableStreamBody(aCx, &body,
getter_AddRefs(streamReader),
getter_AddRefs(inputStream), aRv);
if (NS_WARN_IF(aRv.Failed())) { if (NS_WARN_IF(aRv.Failed())) {
return nullptr; return nullptr;
} }
MOZ_ASSERT_IF(body, streamReader);
MOZ_ASSERT_IF(body, inputStream);
RefPtr<InternalResponse> ir =
mInternalResponse->Clone(body
? InternalResponse::eDontCloneInputStream
: InternalResponse::eCloneInputStream);
RefPtr<Response> response = new Response(mOwner, ir);
if (body) { if (body) {
// Maybe we have a body, but we receive null from MaybeTeeReadableStreamBody // Maybe we have a body, but we receive null from MaybeTeeReadableStreamBody
// if this body is a native stream. In this case the InternalResponse will // if this body is a native stream. In this case the InternalResponse will
// have a clone of the native body and the ReadableStream will be created // have a clone of the native body and the ReadableStream will be created
// lazily if needed. // lazily if needed.
response->SetReadableStreamBody(body); response->SetReadableStreamBody(body);
response->mFetchStreamReader = streamReader;
ir->SetBody(inputStream, InternalResponse::UNKNOWN_BODY_SIZE);
} }
return response.forget(); return response.forget();
@ -334,22 +363,36 @@ Response::CloneUnfiltered(JSContext* aCx, ErrorResult& aRv)
return nullptr; return nullptr;
} }
RefPtr<InternalResponse> clone = mInternalResponse->Clone(); RefPtr<FetchStreamReader> streamReader;
RefPtr<InternalResponse> ir = clone->Unfiltered(); nsCOMPtr<nsIInputStream> inputStream;
RefPtr<Response> ref = new Response(mOwner, ir);
JS::Rooted<JSObject*> body(aCx); JS::Rooted<JSObject*> body(aCx);
MaybeTeeReadableStreamBody(aCx, &body, aRv); MaybeTeeReadableStreamBody(aCx, &body,
getter_AddRefs(streamReader),
getter_AddRefs(inputStream), aRv);
if (NS_WARN_IF(aRv.Failed())) { if (NS_WARN_IF(aRv.Failed())) {
return nullptr; return nullptr;
} }
MOZ_ASSERT_IF(body, streamReader);
MOZ_ASSERT_IF(body, inputStream);
RefPtr<InternalResponse> clone =
mInternalResponse->Clone(body
? InternalResponse::eDontCloneInputStream
: InternalResponse::eCloneInputStream);
RefPtr<InternalResponse> ir = clone->Unfiltered();
RefPtr<Response> ref = new Response(mOwner, ir);
if (body) { if (body) {
// Maybe we have a body, but we receive null from MaybeTeeReadableStreamBody // Maybe we have a body, but we receive null from MaybeTeeReadableStreamBody
// if this body is a native stream. In this case the InternalResponse will // if this body is a native stream. In this case the InternalResponse will
// have a clone of the native body and the ReadableStream will be created // have a clone of the native body and the ReadableStream will be created
// lazily if needed. // lazily if needed.
ref->SetReadableStreamBody(body); ref->SetReadableStreamBody(body);
ref->mFetchStreamReader = streamReader;
ir->SetBody(inputStream, InternalResponse::UNKNOWN_BODY_SIZE);
} }
return ref.forget(); return ref.forget();

View File

@ -16,6 +16,7 @@ EXPORTS.mozilla.dom += [
'FetchIPCTypes.h', 'FetchIPCTypes.h',
'FetchObserver.h', 'FetchObserver.h',
'FetchSignal.h', 'FetchSignal.h',
'FetchStreamReader.h',
'FetchUtil.h', 'FetchUtil.h',
'Headers.h', 'Headers.h',
'InternalHeaders.h', 'InternalHeaders.h',
@ -35,6 +36,7 @@ UNIFIED_SOURCES += [
'FetchObserver.cpp', 'FetchObserver.cpp',
'FetchSignal.cpp', 'FetchSignal.cpp',
'FetchStream.cpp', 'FetchStream.cpp',
'FetchStreamReader.cpp',
'FetchUtil.cpp', 'FetchUtil.cpp',
'Headers.cpp', 'Headers.cpp',
'InternalHeaders.cpp', 'InternalHeaders.cpp',

View File

@ -24,3 +24,15 @@ interface Body {
[Throws] [Throws]
Promise<USVString> text(); Promise<USVString> text();
}; };
// These are helper dictionaries for the parsing of a
// getReader().read().then(data) parsing.
// See more about how these 2 helpers are used in
// dom/fetch/FetchStreamReader.cpp
dictionary FetchReadableStreamReadDataDone {
boolean done = false;
};
dictionary FetchReadableStreamReadDataArray {
Uint8Array value;
};