Bug 1538754 - Consistently watch the nsIAsyncOutputStream for closure, r=asuth,saschanaz

Before these changes, there were a number of cases where it would be
possible for the pipe to be closed or otherwise encounter an error, and
for there to be a very large or potentially indefinite delay before that
error is reported to the ReadableStream.

This patch avoids changing too much of the structure of the existing
FetchStreamReader's design, while avoiding the potential pitfalls, and
ensuring that the stream is being consistently watched.

A more polished patch in the future may be able to perform a more
efficient and direct adaptation from a `ReadableStream` or
`WritableStream` to the `nsIAsyncInputStream` and `nsIAsyncOutputStream`
types, but that was not tackled here.

Differential Revision: https://phabricator.services.mozilla.com/D168481
This commit is contained in:
Nika Layzell 2023-03-30 15:14:00 +00:00
parent c69d4c6ee0
commit 0e64b40e0c
2 changed files with 92 additions and 49 deletions

View File

@ -8,6 +8,7 @@
#include "InternalResponse.h"
#include "mozilla/ConsoleReportCollector.h"
#include "mozilla/ErrorResult.h"
#include "mozilla/StaticAnalysisFunctions.h"
#include "mozilla/dom/AutoEntryScript.h"
#include "mozilla/dom/Promise.h"
#include "mozilla/dom/PromiseBinding.h"
@ -98,10 +99,7 @@ nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
: mGlobal(aGlobal),
mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other)),
mBufferRemaining(0),
mBufferOffset(0),
mStreamClosed(false) {
mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other)) {
MOZ_ASSERT(aGlobal);
mozilla::HoldJSObjects(this);
@ -190,11 +188,12 @@ void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
mReader = reader;
reader.forget(aReader);
mAsyncWaitWorkerRef = mWorkerRef;
aRv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(aRv.Failed())) {
return;
mAsyncWaitWorkerRef = nullptr;
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
}
mAsyncWaitWorkerRef = mWorkerRef;
}
struct FetchReadRequest : public ReadRequest {
@ -205,15 +204,18 @@ struct FetchReadRequest : public ReadRequest {
explicit FetchReadRequest(FetchStreamReader* aReader)
: mFetchStreamReader(aReader) {}
MOZ_CAN_RUN_SCRIPT_BOUNDARY
void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& aRv) override {
mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv);
}
MOZ_CAN_RUN_SCRIPT_BOUNDARY
void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
mFetchStreamReader->CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
mFetchStreamReader->CloseSteps(aCx, aRv);
}
MOZ_CAN_RUN_SCRIPT_BOUNDARY
void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
ErrorResult& aRv) override {
mFetchStreamReader->ErrorSteps(aCx, aError, aRv);
@ -222,7 +224,7 @@ struct FetchReadRequest : public ReadRequest {
protected:
virtual ~FetchReadRequest() = default;
RefPtr<FetchStreamReader> mFetchStreamReader;
MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader;
};
NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest,
@ -237,41 +239,71 @@ MOZ_CAN_RUN_SCRIPT_BOUNDARY
NS_IMETHODIMP
FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
mAsyncWaitWorkerRef = nullptr;
if (mStreamClosed) {
mAsyncWaitWorkerRef = nullptr;
return NS_OK;
}
// Only assert if we know the stream is not closed yet.
MOZ_ASSERT(aStream == mPipeOut);
AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
if (!Process(aes.cx())) {
// We're done processing data, and haven't queued up a new AsyncWait - we
// can clear our mAsyncWaitWorkerRef.
mAsyncWaitWorkerRef = nullptr;
}
return NS_OK;
}
bool FetchStreamReader::Process(JSContext* aCx) {
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
MOZ_ASSERT(mReader);
if (!mBuffer.IsEmpty()) {
return WriteBuffer();
nsresult rv = WriteBuffer();
if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
return false;
}
return true;
}
// Here we can retrieve data from the reader using any global we want because
// it is not observable. We want to use the reader's global, which is also the
// Response's one.
AutoEntryScript aes(mGlobal, "ReadableStreamReader.read", !mWorkerRef);
IgnoredErrorResult rv;
// https://fetch.spec.whatwg.org/#incrementally-read-loop
// The below very loosely tries to implement the incrementally-read-loop from
// the fetch spec.
// Step 2: Read a chunk from reader given readRequest.
RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
RefPtr<ReadableStreamDefaultReader> reader = mReader;
reader->ReadChunk(aes.cx(), *readRequest, rv);
if (NS_WARN_IF(rv.Failed())) {
// Let's close the stream.
CloseAndRelease(aes.cx(), NS_ERROR_DOM_INVALID_STATE_ERR);
return NS_ERROR_FAILURE;
// Check if the output stream has already been closed. This lets us propagate
// errors eagerly, and detect output stream closures even when we have no data
// to write.
if (NS_WARN_IF(NS_FAILED(mPipeOut->StreamStatus()))) {
CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
return false;
}
return NS_OK;
// We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
// notice if the reader closes.
nsresult rv = mPipeOut->AsyncWait(
this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
return false;
}
// If we already have an outstanding read request, don't start another one
// concurrently.
if (!mHasOutstandingReadRequest) {
// https://fetch.spec.whatwg.org/#incrementally-read-loop
// The below very loosely tries to implement the incrementally-read-loop
// from the fetch spec.
// Step 2: Read a chunk from reader given readRequest.
RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
RefPtr<ReadableStreamDefaultReader> reader = mReader;
mHasOutstandingReadRequest = true;
IgnoredErrorResult err;
reader->ReadChunk(aCx, *readRequest, err);
if (NS_WARN_IF(err.Failed())) {
// Let's close the stream.
mHasOutstandingReadRequest = false;
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
// Don't return false, as we've already called `AsyncWait`.
}
}
return true;
}
void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
@ -279,6 +311,8 @@ void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
// This roughly implements the chunk steps from
// https://fetch.spec.whatwg.org/#incrementally-read-loop.
mHasOutstandingReadRequest = false;
// Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
// this step: run processBodyError given a TypeError.
RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx);
@ -288,33 +322,31 @@ void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
}
chunk.ComputeState();
uint32_t len = chunk.Length();
if (len == 0) {
// If there is nothing to read, let's do another reading.
OnOutputStreamReady(mPipeOut);
return;
}
MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());
// Let's take a copy of the data.
if (!mBuffer.AppendElements(chunk.Data(), len, fallible)) {
// FIXME: We could sometimes avoid this copy by trying to write `chunk`
// directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
// enough space in the pipe's buffer.
if (!mBuffer.AppendElements(chunk.Data(), chunk.Length(), fallible)) {
CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY);
return;
}
mBufferOffset = 0;
mBufferRemaining = len;
mBufferRemaining = chunk.Length();
nsresult rv = WriteBuffer();
if (NS_FAILED(rv)) {
// Normalize to a generic DOM exception.
CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
}
Process(aCx);
}
void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
mHasOutstandingReadRequest = false;
CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
}
void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
ErrorResult& aRv) {
mHasOutstandingReadRequest = false;
ReportErrorToConsole(aCx, aError);
CloseAndRelease(aCx, NS_ERROR_FAILURE);
}

View File

@ -34,8 +34,12 @@ class FetchStreamReader final : public nsIOutputStreamCallback {
FetchStreamReader** aStreamReader,
nsIInputStream** aInputStream);
MOZ_CAN_RUN_SCRIPT
void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& aRv);
MOZ_CAN_RUN_SCRIPT
void CloseSteps(JSContext* aCx, ErrorResult& aRv);
MOZ_CAN_RUN_SCRIPT
void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
ErrorResult& aRv);
@ -59,6 +63,12 @@ class FetchStreamReader final : public nsIOutputStreamCallback {
nsresult WriteBuffer();
// Attempt to copy data from mBuffer into mPipeOut. Returns `true` if data was
// written, and AsyncWait callbacks or FetchReadRequest calls have been set up
// to write more data in the future, and `false` otherwise.
MOZ_CAN_RUN_SCRIPT
bool Process(JSContext* aCx);
void ReportErrorToConsole(JSContext* aCx, JS::Handle<JS::Value> aValue);
nsCOMPtr<nsIGlobalObject> mGlobal;
@ -72,10 +82,11 @@ class FetchStreamReader final : public nsIOutputStreamCallback {
RefPtr<ReadableStreamDefaultReader> mReader;
nsTArray<uint8_t> mBuffer;
uint32_t mBufferRemaining;
uint32_t mBufferOffset;
uint32_t mBufferRemaining = 0;
uint32_t mBufferOffset = 0;
bool mStreamClosed;
bool mHasOutstandingReadRequest = false;
bool mStreamClosed = false;
};
} // namespace mozilla::dom