mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-11-27 23:02:20 +00:00
Bug 1735664 - Implement the WritableStream APIs. r=mgaudet,smaug
This is a complete implementation of WritableStreams including AbortSignal that weren't supported by the SpiderMonkey implementation. There are still some XXX comments, but an early review is probably better for this. Differential Revision: https://phabricator.services.mozilla.com/D128419
This commit is contained in:
parent
0f5c5fe340
commit
3b37b2b050
@ -1577,6 +1577,15 @@ DOMInterfaces = {
|
||||
'implicitJSContext': [ 'addModule' ],
|
||||
},
|
||||
|
||||
# Bug 1734174: We should validate ReadableStream usage of implicitJSContext.
|
||||
'WritableStream': {
|
||||
'implicitJSContext': ['close'],
|
||||
},
|
||||
|
||||
'WritableStreamDefaultWriter': {
|
||||
'implicitJSContext': ['close', 'releaseLock'],
|
||||
},
|
||||
|
||||
'XMLSerializer': {
|
||||
'nativeType': 'nsDOMSerializer',
|
||||
'wrapperCache': False
|
||||
|
@ -91,6 +91,35 @@ inline void DequeueValue(QueueContainingClass aContainer,
|
||||
aResultValue.set(valueWithSize->mValue);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#peek-queue-value
|
||||
template <class QueueContainingClass>
|
||||
inline void PeekQueueValue(QueueContainingClass aContainer,
|
||||
JS::MutableHandle<JS::Value> aResultValue) {
|
||||
// Step 1. Assert: container has [[queue]] and [[queueTotalSize]] internal
|
||||
// slots.
|
||||
// Step 2. Assert: container.[[queue]] is not empty.
|
||||
MOZ_ASSERT(!aContainer->Queue().isEmpty());
|
||||
|
||||
// Step 3. Let valueWithSize be container.[[queue]][0].
|
||||
ValueWithSize* valueWithSize = aContainer->Queue().getFirst();
|
||||
|
||||
// Step 4. Return valueWithSize’s value.
|
||||
aResultValue.set(valueWithSize->mValue);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#reset-queue
|
||||
template <class QueueContainingClass>
|
||||
inline void ResetQueue(QueueContainingClass aContainer) {
|
||||
// Step 1. Assert: container has [[queue]] and [[queueTotalSize]] internal
|
||||
// slots. (implicit)
|
||||
|
||||
// Step 2. Set container.[[queue]] to a new empty list.
|
||||
aContainer->Queue().clear();
|
||||
|
||||
// Step 3. Set container.[[queueTotalSize]] to 0.
|
||||
aContainer->SetQueueTotalSize(0.0);
|
||||
}
|
||||
|
||||
} // namespace mozilla::dom
|
||||
|
||||
#endif
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "mozilla/dom/ReadableStreamDefaultReader.h"
|
||||
#include "mozilla/dom/ReadableStreamTee.h"
|
||||
#include "mozilla/dom/ScriptSettings.h"
|
||||
#include "mozilla/dom/StreamUtils.h"
|
||||
#include "mozilla/dom/TeeState.h"
|
||||
#include "mozilla/dom/UnderlyingSourceBinding.h"
|
||||
#include "nsCOMPtr.h"
|
||||
@ -81,29 +82,6 @@ void ReadableStream::SetReader(ReadableStreamDefaultReader* aReader) {
|
||||
mReader = aReader;
|
||||
}
|
||||
|
||||
// FIXME: This needs to go into a helper file.
|
||||
// Streams Spec: 7.4
|
||||
// https://streams.spec.whatwg.org/#validate-and-normalize-high-water-mark
|
||||
static double ExtractHighWaterMark(const QueuingStrategy& aStrategy,
|
||||
double aDefaultHWM, ErrorResult& aRv) {
|
||||
// Step 1.
|
||||
if (!aStrategy.mHighWaterMark.WasPassed()) {
|
||||
return aDefaultHWM;
|
||||
}
|
||||
|
||||
// Step 2.
|
||||
double highWaterMark = aStrategy.mHighWaterMark.Value();
|
||||
|
||||
// Step 3.
|
||||
if (mozilla::IsNaN(highWaterMark) || highWaterMark < 0) {
|
||||
aRv.ThrowRangeError("Invalid highWaterMark");
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
// Step 4.
|
||||
return highWaterMark;
|
||||
}
|
||||
|
||||
// Streams Spec: 4.2.4: https://streams.spec.whatwg.org/#rs-prototype
|
||||
/* static */
|
||||
already_AddRefed<ReadableStream> ReadableStream::Constructor(
|
||||
|
@ -30,7 +30,7 @@ NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(ReadableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mCancelAlgorithm,
|
||||
mStrategySizeAlgorithm, mPullAlgorithm,
|
||||
mStream)
|
||||
|
||||
tmp->mQueue.clear();
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(ReadableStreamDefaultController)
|
||||
@ -285,19 +285,6 @@ void ReadableStreamDefaultController::Error(JSContext* aCx,
|
||||
ReadableStreamDefaultControllerError(aCx, this, aError, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#reset-queue
|
||||
void ResetQueue(ReadableStreamDefaultController* aController) {
|
||||
// Step 1. Implicit.
|
||||
|
||||
// Step 2.
|
||||
//
|
||||
// This is described in the spec 'set queue to new empty list'
|
||||
aController->Queue().clear();
|
||||
|
||||
// Step 3.
|
||||
aController->SetQueueTotalSize(0.0);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull
|
||||
static bool ReadableStreamDefaultControllerShouldCallPull(
|
||||
ReadableStreamDefaultController* aController) {
|
||||
|
@ -26,9 +26,11 @@ namespace mozilla {
|
||||
namespace dom {
|
||||
|
||||
class ReadableStream;
|
||||
class ReadableStreamDefaultReader;
|
||||
struct UnderlyingSource;
|
||||
class UnderlyingSourceCancelCallbackHelper;
|
||||
class UnderlyingSourcePullCallbackHelper;
|
||||
class UnderlyingSourceStartCallbackHelper;
|
||||
|
||||
class ReadableStreamDefaultController final : public nsISupports,
|
||||
public nsWrapperCache {
|
||||
|
33
dom/streams/StreamUtils.cpp
Normal file
33
dom/streams/StreamUtils.cpp
Normal file
@ -0,0 +1,33 @@
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "mozilla/dom/StreamUtils.h"
|
||||
#include "mozilla/FloatingPoint.h"
|
||||
#include "mozilla/dom/QueuingStrategyBinding.h"
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
// Streams Spec: 7.4
|
||||
// https://streams.spec.whatwg.org/#validate-and-normalize-high-water-mark
|
||||
double ExtractHighWaterMark(const QueuingStrategy& aStrategy,
|
||||
double aDefaultHWM, mozilla::ErrorResult& aRv) {
|
||||
// Step 1.
|
||||
if (!aStrategy.mHighWaterMark.WasPassed()) {
|
||||
return aDefaultHWM;
|
||||
}
|
||||
|
||||
// Step 2.
|
||||
double highWaterMark = aStrategy.mHighWaterMark.Value();
|
||||
|
||||
// Step 3.
|
||||
if (mozilla::IsNaN(highWaterMark) || highWaterMark < 0) {
|
||||
aRv.ThrowRangeError("Invalid highWaterMark");
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
// Step 4.
|
||||
return highWaterMark;
|
||||
}
|
||||
|
||||
} // namespace mozilla::dom
|
19
dom/streams/StreamUtils.h
Normal file
19
dom/streams/StreamUtils.h
Normal file
@ -0,0 +1,19 @@
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#ifndef mozilla_dom_StreamUtils_h
|
||||
#define mozilla_dom_StreamUtils_h
|
||||
|
||||
#include "mozilla/ErrorResult.h"
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
struct QueuingStrategy;
|
||||
|
||||
extern double ExtractHighWaterMark(const QueuingStrategy& aStrategy,
|
||||
double aDefaultHWM, ErrorResult& aRv);
|
||||
|
||||
} // namespace mozilla::dom
|
||||
|
||||
#endif // mozilla_dom_StreamUtils_h
|
146
dom/streams/UnderlyingSinkCallbackHelpers.cpp
Normal file
146
dom/streams/UnderlyingSinkCallbackHelpers.cpp
Normal file
@ -0,0 +1,146 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "mozilla/dom/UnderlyingSinkCallbackHelpers.h"
|
||||
|
||||
using namespace mozilla::dom;
|
||||
|
||||
// UnderlyingSinkStartCallbackHelper
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(UnderlyingSinkStartCallbackHelper)
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(UnderlyingSinkStartCallbackHelper)
|
||||
tmp->mUnderlyingSink = nullptr;
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(UnderlyingSinkStartCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(UnderlyingSinkStartCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mUnderlyingSink)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkStartCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkStartCallbackHelper)
|
||||
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkStartCallbackHelper)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
void UnderlyingSinkStartCallbackHelper::StartCallback(
|
||||
JSContext* aCx, WritableStreamDefaultController& aController,
|
||||
JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) {
|
||||
JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
|
||||
RefPtr<UnderlyingSinkStartCallback> callback(mCallback);
|
||||
return callback->Call(thisObj, aController, aRetVal, aRv,
|
||||
"UnderlyingSink.start",
|
||||
CallbackFunction::eRethrowExceptions);
|
||||
}
|
||||
|
||||
// UnderlyingSinkWriteCallbackHelper
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(UnderlyingSinkWriteCallbackHelper)
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(UnderlyingSinkWriteCallbackHelper)
|
||||
tmp->mUnderlyingSink = nullptr;
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(UnderlyingSinkWriteCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(UnderlyingSinkWriteCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mUnderlyingSink)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkWriteCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkWriteCallbackHelper)
|
||||
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkWriteCallbackHelper)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
already_AddRefed<Promise> UnderlyingSinkWriteCallbackHelper::WriteCallback(
|
||||
JSContext* aCx, JS::Handle<JS::Value> aChunk,
|
||||
WritableStreamDefaultController& aController, ErrorResult& aRv) {
|
||||
JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
|
||||
RefPtr<UnderlyingSinkWriteCallback> callback(mCallback);
|
||||
RefPtr<Promise> promise =
|
||||
callback->Call(thisObj, aChunk, aController, aRv, "UnderlyingSink.write",
|
||||
CallbackFunction::eRethrowExceptions);
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// UnderlyingSinkCloseCallbackHelper
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(UnderlyingSinkCloseCallbackHelper)
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(UnderlyingSinkCloseCallbackHelper)
|
||||
tmp->mUnderlyingSink = nullptr;
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(UnderlyingSinkCloseCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(UnderlyingSinkCloseCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mUnderlyingSink)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkCloseCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkCloseCallbackHelper)
|
||||
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkCloseCallbackHelper)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
already_AddRefed<Promise> UnderlyingSinkCloseCallbackHelper::CloseCallback(
|
||||
JSContext* aCx, ErrorResult& aRv) {
|
||||
JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
|
||||
RefPtr<UnderlyingSinkCloseCallback> callback(mCallback);
|
||||
RefPtr<Promise> promise =
|
||||
callback->Call(thisObj, aRv, "UnderlyingSink.close",
|
||||
CallbackFunction::eRethrowExceptions);
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// UnderlyingSinkAbortCallbackHelper
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(UnderlyingSinkAbortCallbackHelper)
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(UnderlyingSinkAbortCallbackHelper)
|
||||
tmp->mUnderlyingSink = nullptr;
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(UnderlyingSinkAbortCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mCallback)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(UnderlyingSinkAbortCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mUnderlyingSink)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSinkAbortCallbackHelper)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSinkAbortCallbackHelper)
|
||||
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSinkAbortCallbackHelper)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
already_AddRefed<Promise> UnderlyingSinkAbortCallbackHelper::AbortCallback(
|
||||
JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
|
||||
ErrorResult& aRv) {
|
||||
JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSink);
|
||||
|
||||
// Strong Ref
|
||||
RefPtr<UnderlyingSinkAbortCallback> callback(mCallback);
|
||||
RefPtr<Promise> promise =
|
||||
callback->Call(thisObj, aReason, aRv, "UnderlyingSink.abort",
|
||||
CallbackFunction::eRethrowExceptions);
|
||||
|
||||
return promise.forget();
|
||||
}
|
134
dom/streams/UnderlyingSinkCallbackHelpers.h
Normal file
134
dom/streams/UnderlyingSinkCallbackHelpers.h
Normal file
@ -0,0 +1,134 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#ifndef mozilla_dom_UnderlyingSinkCallbackHelpers_h
|
||||
#define mozilla_dom_UnderlyingSinkCallbackHelpers_h
|
||||
|
||||
#include "mozilla/HoldDropJSObjects.h"
|
||||
#include "mozilla/dom/ModuleMapKey.h"
|
||||
#include "mozilla/dom/Promise.h"
|
||||
#include "mozilla/dom/UnderlyingSinkBinding.h"
|
||||
#include "nsISupports.h"
|
||||
#include "nsISupportsImpl.h"
|
||||
|
||||
namespace mozilla::dom {
|
||||
class WritableStreamDefaultController;
|
||||
}
|
||||
|
||||
/*
|
||||
* See the comment in UnderlyingSourceCallbackHelpers.h!
|
||||
*
|
||||
* A native implementation of these callbacks is however currently not required.
|
||||
*/
|
||||
namespace mozilla::dom {
|
||||
class UnderlyingSinkStartCallbackHelper : public nsISupports {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
|
||||
UnderlyingSinkStartCallbackHelper)
|
||||
|
||||
UnderlyingSinkStartCallbackHelper(UnderlyingSinkStartCallback* aCallback,
|
||||
JS::Handle<JSObject*> aUnderlyingSink)
|
||||
: mUnderlyingSink(aUnderlyingSink), mCallback(aCallback) {
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
void StartCallback(JSContext* aCx,
|
||||
WritableStreamDefaultController& aController,
|
||||
JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv);
|
||||
|
||||
protected:
|
||||
virtual ~UnderlyingSinkStartCallbackHelper() {
|
||||
mozilla::DropJSObjects(this);
|
||||
};
|
||||
|
||||
private:
|
||||
JS::Heap<JSObject*> mUnderlyingSink;
|
||||
RefPtr<UnderlyingSinkStartCallback> mCallback;
|
||||
};
|
||||
|
||||
class UnderlyingSinkWriteCallbackHelper : public nsISupports {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
|
||||
UnderlyingSinkWriteCallbackHelper)
|
||||
|
||||
explicit UnderlyingSinkWriteCallbackHelper(
|
||||
UnderlyingSinkWriteCallback* aCallback,
|
||||
JS::Handle<JSObject*> aUnderlyingSink)
|
||||
: mUnderlyingSink(aUnderlyingSink), mCallback(aCallback) {
|
||||
MOZ_ASSERT(mCallback);
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
already_AddRefed<Promise> WriteCallback(
|
||||
JSContext* aCx, JS::Handle<JS::Value> aChunk,
|
||||
WritableStreamDefaultController& aController, ErrorResult& aRv);
|
||||
|
||||
protected:
|
||||
virtual ~UnderlyingSinkWriteCallbackHelper() { mozilla::DropJSObjects(this); }
|
||||
|
||||
private:
|
||||
JS::Heap<JSObject*> mUnderlyingSink;
|
||||
RefPtr<UnderlyingSinkWriteCallback> mCallback;
|
||||
};
|
||||
|
||||
class UnderlyingSinkCloseCallbackHelper : public nsISupports {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
|
||||
UnderlyingSinkCloseCallbackHelper)
|
||||
|
||||
UnderlyingSinkCloseCallbackHelper(UnderlyingSinkCloseCallback* aCallback,
|
||||
JS::Handle<JSObject*> aUnderlyingSink)
|
||||
: mUnderlyingSink(aUnderlyingSink), mCallback(aCallback) {
|
||||
MOZ_ASSERT(mCallback);
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
already_AddRefed<Promise> CloseCallback(JSContext* aCx, ErrorResult& aRv);
|
||||
|
||||
protected:
|
||||
virtual ~UnderlyingSinkCloseCallbackHelper() { mozilla::DropJSObjects(this); }
|
||||
|
||||
private:
|
||||
JS::Heap<JSObject*> mUnderlyingSink;
|
||||
RefPtr<UnderlyingSinkCloseCallback> mCallback;
|
||||
};
|
||||
|
||||
// Abstract over the implementation details for the UnderlyinSinkAbortCallback
|
||||
class UnderlyingSinkAbortCallbackHelper : public nsISupports {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
|
||||
UnderlyingSinkAbortCallbackHelper)
|
||||
|
||||
UnderlyingSinkAbortCallbackHelper(UnderlyingSinkAbortCallback* aCallback,
|
||||
JS::Handle<JSObject*> aUnderlyingSink)
|
||||
: mUnderlyingSink(aUnderlyingSink), mCallback(aCallback) {
|
||||
MOZ_ASSERT(mCallback);
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
already_AddRefed<Promise> AbortCallback(
|
||||
JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
protected:
|
||||
virtual ~UnderlyingSinkAbortCallbackHelper() { mozilla::DropJSObjects(this); }
|
||||
|
||||
private:
|
||||
JS::Heap<JSObject*> mUnderlyingSink;
|
||||
RefPtr<UnderlyingSinkAbortCallback> mCallback;
|
||||
};
|
||||
|
||||
} // namespace mozilla::dom
|
||||
|
||||
#endif
|
776
dom/streams/WritableStream.cpp
Normal file
776
dom/streams/WritableStream.cpp
Normal file
@ -0,0 +1,776 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "mozilla/dom/WritableStream.h"
|
||||
#include "js/Array.h"
|
||||
#include "js/PropertyAndElement.h"
|
||||
#include "js/TypeDecls.h"
|
||||
#include "js/Value.h"
|
||||
#include "mozilla/AlreadyAddRefed.h"
|
||||
#include "mozilla/Assertions.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/CycleCollectedJSContext.h"
|
||||
#include "mozilla/FloatingPoint.h"
|
||||
#include "mozilla/HoldDropJSObjects.h"
|
||||
#include "mozilla/dom/AbortSignal.h"
|
||||
#include "mozilla/dom/BindingCallContext.h"
|
||||
#include "mozilla/dom/ModuleMapKey.h"
|
||||
#include "mozilla/dom/QueueWithSizes.h"
|
||||
#include "mozilla/dom/QueuingStrategyBinding.h"
|
||||
#include "mozilla/dom/ReadRequest.h"
|
||||
#include "mozilla/dom/StreamUtils.h"
|
||||
#include "mozilla/dom/UnderlyingSinkBinding.h"
|
||||
#include "mozilla/dom/WritableStreamBinding.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultController.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultWriter.h"
|
||||
#include "nsCOMPtr.h"
|
||||
|
||||
#include "mozilla/dom/Promise-inl.h"
|
||||
#include "nsIGlobalObject.h"
|
||||
#include "nsISupports.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(WritableStream)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(WritableStream)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mCloseRequest, mController,
|
||||
mInFlightWriteRequest, mInFlightCloseRequest,
|
||||
mPendingAbortRequestPromise, mWriter,
|
||||
mWriteRequests)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
|
||||
tmp->mPendingAbortRequestReason.setNull();
|
||||
tmp->mStoredError.setNull();
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(WritableStream)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(
|
||||
mGlobal, mCloseRequest, mController, mInFlightWriteRequest,
|
||||
mInFlightCloseRequest, mWriter, mWriteRequests)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(WritableStream)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mPendingAbortRequestReason)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_JS_MEMBER_CALLBACK(mStoredError)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(WritableStream)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(WritableStream)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WritableStream)
|
||||
NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
WritableStream::WritableStream(nsIGlobalObject* aGlobal) : mGlobal(aGlobal) {
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
WritableStream::WritableStream(const GlobalObject& aGlobal)
|
||||
: mGlobal(do_QueryInterface(aGlobal.GetAsSupports())) {
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
WritableStream::~WritableStream() { mozilla::DropJSObjects(this); }
|
||||
|
||||
JSObject* WritableStream::WrapObject(JSContext* aCx,
|
||||
JS::Handle<JSObject*> aGivenProto) {
|
||||
return WritableStream_Binding::Wrap(aCx, this, aGivenProto);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection
|
||||
void WritableStream::DealWithRejection(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let state be stream.[[state]].
|
||||
// Step 2. If state is "writable",
|
||||
if (mState == WriterState::Writable) {
|
||||
// Step 2.1. Perform ! WritableStreamStartErroring(stream, error).
|
||||
StartErroring(aCx, aError, aRv);
|
||||
|
||||
// Step 2.2. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3. Assert: state is "erroring".
|
||||
MOZ_ASSERT(mState == WriterState::Erroring);
|
||||
|
||||
// Step 4. Perform ! WritableStreamFinishErroring(stream).
|
||||
FinishErroring(aCx, aRv);
|
||||
}
|
||||
|
||||
class AbortStepsNativePromiseHandler final : public PromiseNativeHandler {
|
||||
~AbortStepsNativePromiseHandler() = default;
|
||||
|
||||
RefPtr<WritableStream> mStream;
|
||||
RefPtr<Promise> mAbortRequestPromise;
|
||||
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_CLASS(AbortStepsNativePromiseHandler)
|
||||
|
||||
explicit AbortStepsNativePromiseHandler(WritableStream* aStream,
|
||||
Promise* aAbortRequestPromise)
|
||||
: PromiseNativeHandler(),
|
||||
mStream(aStream),
|
||||
mAbortRequestPromise(aAbortRequestPromise) {}
|
||||
|
||||
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-erroring
|
||||
|
||||
// Step 13. Upon fulfillment of promise,
|
||||
// Step 13.1. Resolve abortRequest’s promise with undefined.
|
||||
mAbortRequestPromise->MaybeResolveWithUndefined();
|
||||
|
||||
// Step 13.2. Perform !
|
||||
// WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
|
||||
mStream->RejectCloseAndClosedPromiseIfNeeded();
|
||||
}
|
||||
|
||||
void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-erroring
|
||||
|
||||
// Step 14. Upon rejection of promise with reason reason,
|
||||
// Step 14.1. Reject abortRequest’s promise with reason.
|
||||
mAbortRequestPromise->MaybeReject(aValue);
|
||||
|
||||
// Step 14.2. Perform !
|
||||
// WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
|
||||
mStream->RejectCloseAndClosedPromiseIfNeeded();
|
||||
}
|
||||
};
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION(AbortStepsNativePromiseHandler, mStream,
|
||||
mAbortRequestPromise)
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(AbortStepsNativePromiseHandler)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(AbortStepsNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(AbortStepsNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-erroring
|
||||
void WritableStream::FinishErroring(JSContext* aCx, ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream.[[state]] is "erroring".
|
||||
MOZ_ASSERT(mState == WriterState::Erroring);
|
||||
|
||||
// Step 2. Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is
|
||||
// false.
|
||||
MOZ_ASSERT(!HasOperationMarkedInFlight());
|
||||
|
||||
// Step 3. Set stream.[[state]] to "errored".
|
||||
mState = WriterState::Errored;
|
||||
|
||||
// Step 4. Perform ! stream.[[controller]].[[ErrorSteps]]().
|
||||
Controller()->ErrorSteps();
|
||||
|
||||
// Step 5. Let storedError be stream.[[storedError]].
|
||||
JS::Rooted<JS::Value> storedError(aCx, mStoredError);
|
||||
|
||||
// Step 6. For each writeRequest of stream.[[writeRequests]]:
|
||||
for (const RefPtr<Promise>& writeRequest : mWriteRequests) {
|
||||
// Step 6.1. Reject writeRequest with storedError.
|
||||
writeRequest->MaybeReject(storedError);
|
||||
}
|
||||
|
||||
// Step 7. Set stream.[[writeRequests]] to an empty list.
|
||||
mWriteRequests.Clear();
|
||||
|
||||
// Step 8. If stream.[[pendingAbortRequest]] is undefined,
|
||||
if (!mPendingAbortRequestPromise) {
|
||||
// Step 8.1. Perform !
|
||||
// WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
|
||||
RejectCloseAndClosedPromiseIfNeeded();
|
||||
|
||||
// Step 8.2. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 9. Let abortRequest be stream.[[pendingAbortRequest]].
|
||||
RefPtr<Promise> abortPromise = mPendingAbortRequestPromise;
|
||||
JS::Rooted<JS::Value> abortReason(aCx, mPendingAbortRequestReason);
|
||||
bool abortWasAlreadyErroring = mPendingAbortRequestWasAlreadyErroring;
|
||||
|
||||
// Step 10. Set stream.[[pendingAbortRequest]] to undefined.
|
||||
SetPendingAbortRequest(nullptr, JS::UndefinedHandleValue, false);
|
||||
|
||||
// Step 11. If abortRequest’s was already erroring is true,
|
||||
if (abortWasAlreadyErroring) {
|
||||
// Step 11.1. Reject abortRequest’s promise with storedError.
|
||||
abortPromise->MaybeReject(storedError);
|
||||
|
||||
// Step 11.2. Perform !
|
||||
// WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
|
||||
RejectCloseAndClosedPromiseIfNeeded();
|
||||
|
||||
// Step 11.3. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 12. Let promise be !
|
||||
// stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
|
||||
RefPtr<WritableStreamDefaultController> controller = mController;
|
||||
RefPtr<Promise> promise = controller->AbortSteps(aCx, abortReason, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 13 + 14.
|
||||
promise->AppendNativeHandler(
|
||||
new AbortStepsNativePromiseHandler(this, abortPromise));
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close
|
||||
void WritableStream::FinishInFlightClose() {
|
||||
// Step 1. Assert: stream.[[inFlightCloseRequest]] is not undefined.
|
||||
MOZ_ASSERT(mInFlightCloseRequest);
|
||||
|
||||
// Step 2. Resolve stream.[[inFlightCloseRequest]] with undefined.
|
||||
mInFlightCloseRequest->MaybeResolveWithUndefined();
|
||||
|
||||
// Step 3. Set stream.[[inFlightCloseRequest]] to undefined.
|
||||
mInFlightCloseRequest = nullptr;
|
||||
|
||||
// Step 4. Let state be stream.[[state]].
|
||||
// Step 5. Assert: stream.[[state]] is "writable" or "erroring".
|
||||
MOZ_ASSERT(mState == WriterState::Writable ||
|
||||
mState == WriterState::Erroring);
|
||||
|
||||
// Step 6. If state is "erroring",
|
||||
if (mState == WriterState::Erroring) {
|
||||
// Step 6.1. Set stream.[[storedError]] to undefined.
|
||||
mStoredError.setUndefined();
|
||||
|
||||
// Step 6.2. If stream.[[pendingAbortRequest]] is not undefined,
|
||||
if (mPendingAbortRequestPromise) {
|
||||
// Step 6.2.1. Resolve stream.[[pendingAbortRequest]]'s promise with
|
||||
// undefined.
|
||||
mPendingAbortRequestPromise->MaybeResolveWithUndefined();
|
||||
|
||||
// Step 6.2.2. Set stream.[[pendingAbortRequest]] to undefined.
|
||||
SetPendingAbortRequest(nullptr, JS::UndefinedHandleValue, false);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 7. Set stream.[[state]] to "closed".
|
||||
mState = WriterState::Closed;
|
||||
|
||||
// Step 8. Let writer be stream.[[writer]].
|
||||
// Step 9. If writer is not undefined, resolve writer.[[closedPromise]] with
|
||||
// undefined.
|
||||
if (mWriter) {
|
||||
mWriter->ClosedPromise()->MaybeResolveWithUndefined();
|
||||
}
|
||||
|
||||
// Step 10. Assert: stream.[[pendingAbortRequest]] is undefined.
|
||||
MOZ_ASSERT(!mPendingAbortRequestPromise);
|
||||
// Assert: stream.[[storedError]] is undefined.
|
||||
MOZ_ASSERT(mStoredError.isUndefined());
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error
|
||||
void WritableStream::FinishInFlightCloseWithError(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream.[[inFlightCloseRequest]] is not undefined.
|
||||
MOZ_ASSERT(mInFlightCloseRequest);
|
||||
|
||||
// Step 2. Reject stream.[[inFlightCloseRequest]] with error.
|
||||
mInFlightCloseRequest->MaybeReject(aError);
|
||||
|
||||
// Step 3. Set stream.[[inFlightCloseRequest]] to undefined.
|
||||
mInFlightCloseRequest = nullptr;
|
||||
|
||||
// Step 4. Assert: stream.[[state]] is "writable" or "erroring".
|
||||
MOZ_ASSERT(mState == WriterState::Writable ||
|
||||
mState == WriterState::Erroring);
|
||||
|
||||
// Step 5. If stream.[[pendingAbortRequest]] is not undefined,
|
||||
if (mPendingAbortRequestPromise) {
|
||||
// Step 5.1. Reject stream.[[pendingAbortRequest]]'s promise with error.
|
||||
mPendingAbortRequestPromise->MaybeReject(aError);
|
||||
|
||||
// Step 5.2. Set stream.[[pendingAbortRequest]] to undefined.
|
||||
SetPendingAbortRequest(nullptr, JS::UndefinedHandleValue, false);
|
||||
}
|
||||
|
||||
// Step 6. Perform ! WritableStreamDealWithRejection(stream, error).
|
||||
DealWithRejection(aCx, aError, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write
|
||||
void WritableStream::FinishInFlightWrite() {
|
||||
// Step 1. Assert: stream.[[inFlightWriteRequest]] is not undefined.
|
||||
MOZ_ASSERT(mInFlightWriteRequest);
|
||||
|
||||
// Step 2. Resolve stream.[[inFlightWriteRequest]] with undefined.
|
||||
mInFlightWriteRequest->MaybeResolveWithUndefined();
|
||||
|
||||
// Step 3. Set stream.[[inFlightWriteRequest]] to undefined.
|
||||
mInFlightWriteRequest = nullptr;
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error
|
||||
void WritableStream::FinishInFlightWriteWithError(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream.[[inFlightWriteRequest]] is not undefined.
|
||||
MOZ_ASSERT(mInFlightWriteRequest);
|
||||
|
||||
// Step 2. Reject stream.[[inFlightWriteRequest]] with error.
|
||||
mInFlightWriteRequest->MaybeReject(aError);
|
||||
|
||||
// Step 3. Set stream.[[inFlightWriteRequest]] to undefined.
|
||||
mInFlightWriteRequest = nullptr;
|
||||
|
||||
// Step 4. Assert: stream.[[state]] is "writable" or "erroring".
|
||||
MOZ_ASSERT(mState == WriterState::Writable ||
|
||||
mState == WriterState::Erroring);
|
||||
|
||||
// Step 5. Perform ! WritableStreamDealWithRejection(stream, error).
|
||||
DealWithRejection(aCx, aError, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight
|
||||
void WritableStream::MarkCloseRequestInFlight() {
|
||||
// Step 1. Assert: stream.[[inFlightCloseRequest]] is undefined.
|
||||
MOZ_ASSERT(!mInFlightCloseRequest);
|
||||
|
||||
// Step 2. Assert: stream.[[closeRequest]] is not undefined.
|
||||
MOZ_ASSERT(mCloseRequest);
|
||||
|
||||
// Step 3. Set stream.[[inFlightCloseRequest]] to stream.[[closeRequest]].
|
||||
mInFlightCloseRequest = mCloseRequest;
|
||||
|
||||
// Step 4. Set stream.[[closeRequest]] to undefined.
|
||||
mCloseRequest = nullptr;
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight
|
||||
void WritableStream::MarkFirstWriteRequestInFlight() {
|
||||
// Step 1. Assert: stream.[[inFlightWriteRequest]] is undefined.
|
||||
MOZ_ASSERT(!mInFlightWriteRequest);
|
||||
|
||||
// Step 2. Assert: stream.[[writeRequests]] is not empty.
|
||||
MOZ_ASSERT(!mWriteRequests.IsEmpty());
|
||||
|
||||
// Step 3. Let writeRequest be stream.[[writeRequests]][0].
|
||||
RefPtr<Promise> writeRequest = mWriteRequests.ElementAt(0);
|
||||
|
||||
// Step 4. Remove writeRequest from stream.[[writeRequests]].
|
||||
mWriteRequests.RemoveElementAt(0);
|
||||
|
||||
// Step 5. Set stream.[[inFlightWriteRequest]] to writeRequest.
|
||||
mInFlightWriteRequest = writeRequest;
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed
|
||||
void WritableStream::RejectCloseAndClosedPromiseIfNeeded() {
|
||||
// Step 1. Assert: stream.[[state]] is "errored".
|
||||
MOZ_ASSERT(mState == WriterState::Errored);
|
||||
|
||||
JS::Rooted<JS::Value> storedError(RootingCx(), mStoredError);
|
||||
// Step 2. If stream.[[closeRequest]] is not undefined,
|
||||
if (mCloseRequest) {
|
||||
// Step 2.1. Assert: stream.[[inFlightCloseRequest]] is undefined.
|
||||
MOZ_ASSERT(!mInFlightCloseRequest);
|
||||
|
||||
// Step 2.2. Reject stream.[[closeRequest]] with stream.[[storedError]].
|
||||
mCloseRequest->MaybeReject(storedError);
|
||||
|
||||
// Step 2.3. Set stream.[[closeRequest]] to undefined.
|
||||
mCloseRequest = nullptr;
|
||||
}
|
||||
|
||||
// Step 3. Let writer be stream.[[writer]].
|
||||
RefPtr<WritableStreamDefaultWriter> writer = mWriter;
|
||||
|
||||
// Step 4. If writer is not undefined,
|
||||
if (writer) {
|
||||
// Step 4.1. Reject writer.[[closedPromise]] with stream.[[storedError]].
|
||||
RefPtr<Promise> closedPromise = writer->ClosedPromise();
|
||||
closedPromise->MaybeReject(storedError);
|
||||
|
||||
// Step 4.2. Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
|
||||
closedPromise->SetSettledPromiseIsHandled();
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-start-erroring
|
||||
void WritableStream::StartErroring(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream.[[storedError]] is undefined.
|
||||
MOZ_ASSERT(mStoredError.isUndefined());
|
||||
|
||||
// Step 2. Assert: stream.[[state]] is "writable".
|
||||
MOZ_ASSERT(mState == WriterState::Writable);
|
||||
|
||||
// Step 3. Let controller be stream.[[controller]].
|
||||
RefPtr<WritableStreamDefaultController> controller = mController;
|
||||
// Step 4. Assert: controller is not undefined.
|
||||
MOZ_ASSERT(controller);
|
||||
|
||||
// Step 5. Set stream.[[state]] to "erroring".
|
||||
mState = WriterState::Erroring;
|
||||
|
||||
// Step 6. Set stream.[[storedError]] to reason.
|
||||
mStoredError = aReason;
|
||||
|
||||
// Step 7. Let writer be stream.[[writer]].
|
||||
RefPtr<WritableStreamDefaultWriter> writer = mWriter;
|
||||
// Step 8. If writer is not undefined, perform !
|
||||
// WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason).
|
||||
if (writer) {
|
||||
WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, aReason, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 9. If ! WritableStreamHasOperationMarkedInFlight(stream) is false
|
||||
// and controller.[[started]] is true,
|
||||
// perform !WritableStreamFinishErroring(stream).
|
||||
if (!HasOperationMarkedInFlight() && controller->Started()) {
|
||||
FinishErroring(aCx, aRv);
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-update-backpressure
|
||||
void WritableStream::UpdateBackpressure(bool aBackpressure, ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream.[[state]] is "writable".
|
||||
MOZ_ASSERT(mState == WriterState::Writable);
|
||||
// Step 2. Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
|
||||
MOZ_ASSERT(!CloseQueuedOrInFlight());
|
||||
|
||||
// Step 3. Let writer be stream.[[writer]].
|
||||
RefPtr<WritableStreamDefaultWriter> writer = mWriter;
|
||||
|
||||
// Step 4. If writer is not undefined and backpressure is not
|
||||
// stream.[[backpressure]],
|
||||
if (writer && aBackpressure != mBackpressure) {
|
||||
// Step 4.1. If backpressure is true, set writer.[[readyPromise]] to a new
|
||||
// promise.
|
||||
if (aBackpressure) {
|
||||
RefPtr<Promise> promise = Promise::Create(writer->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
writer->SetReadyPromise(promise);
|
||||
} else {
|
||||
// Step 4.2. Otherwise,
|
||||
// Step 4.2.1. Assert: backpressure is false.
|
||||
// Step 4.2.2. Resolve writer.[[readyPromise]] with undefined.
|
||||
writer->ReadyPromise()->MaybeResolveWithUndefined();
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5. Set stream.[[backpressure]] to backpressure.
|
||||
mBackpressure = aBackpressure;
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-constructor
|
||||
already_AddRefed<WritableStream> WritableStream::Constructor(
|
||||
const GlobalObject& aGlobal,
|
||||
const Optional<JS::Handle<JSObject*>>& aUnderlyingSink,
|
||||
const QueuingStrategy& aStrategy, ErrorResult& aRv) {
|
||||
// Step 1. If underlyingSink is missing, set it to null.
|
||||
JS::Rooted<JSObject*> underlyingSinkObj(
|
||||
aGlobal.Context(),
|
||||
aUnderlyingSink.WasPassed() ? aUnderlyingSink.Value() : nullptr);
|
||||
|
||||
// Step 2. Let underlyingSinkDict be underlyingSink, converted to
|
||||
// an IDL value of type UnderlyingSink.
|
||||
UnderlyingSink underlyingSinkDict;
|
||||
if (underlyingSinkObj) {
|
||||
JS::Rooted<JS::Value> objValue(aGlobal.Context(),
|
||||
JS::ObjectValue(*underlyingSinkObj));
|
||||
dom::BindingCallContext callCx(aGlobal.Context(),
|
||||
"WritableStream.constructor");
|
||||
aRv.MightThrowJSException();
|
||||
if (!underlyingSinkDict.Init(callCx, objValue)) {
|
||||
aRv.StealExceptionFromJSContext(aGlobal.Context());
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3. If underlyingSinkDict["type"] exists, throw a RangeError exception.
|
||||
if (!underlyingSinkDict.mType.isUndefined()) {
|
||||
aRv.ThrowRangeError("Implementation preserved member 'type'");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 4. Perform ! InitializeWritableStream(this).
|
||||
RefPtr<WritableStream> writableStream = new WritableStream(aGlobal);
|
||||
|
||||
// Step 5. Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
|
||||
//
|
||||
// Implementation Note: The specification demands that if the size doesn't
|
||||
// exist, we instead would provide an algorithm that returns 1. Instead, we
|
||||
// will teach callers that a missing callback should simply return 1, rather
|
||||
// than gin up a fake callback here.
|
||||
//
|
||||
// This decision may need to be revisited if the default action ever diverges
|
||||
// within the specification.
|
||||
RefPtr<QueuingStrategySize> sizeAlgorithm =
|
||||
aStrategy.mSize.WasPassed() ? &aStrategy.mSize.Value() : nullptr;
|
||||
|
||||
// Step 6. Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
|
||||
double highWaterMark = ExtractHighWaterMark(aStrategy, 1, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 7. Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(
|
||||
// this, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm).
|
||||
SetUpWritableStreamDefaultControllerFromUnderlyingSink(
|
||||
aGlobal.Context(), writableStream, underlyingSinkObj, underlyingSinkDict,
|
||||
highWaterMark, sizeAlgorithm, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return writableStream.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-abort
|
||||
already_AddRefed<Promise> WritableStreamAbort(JSContext* aCx,
|
||||
WritableStream* aStream,
|
||||
JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. If stream.[[state]] is "closed" or "errored", return a promise
|
||||
// resolved with undefined.
|
||||
if (aStream->State() == WritableStream::WriterState::Closed ||
|
||||
aStream->State() == WritableStream::WriterState::Errored) {
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeResolveWithUndefined();
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 2. Signal abort on stream.[[controller]].[[signal]] with reason.
|
||||
RefPtr<WritableStreamDefaultController> controller = aStream->Controller();
|
||||
controller->Signal()->SignalAbort(aReason);
|
||||
|
||||
// Step 3. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = aStream->State();
|
||||
|
||||
// Step 4. If state is "closed" or "errored", return a promise resolved with
|
||||
// undefined. Note: We re-check the state because signaling abort runs author
|
||||
// code and that might have changed the state.
|
||||
if (aStream->State() == WritableStream::WriterState::Closed ||
|
||||
aStream->State() == WritableStream::WriterState::Errored) {
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeResolveWithUndefined();
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 5. If stream.[[pendingAbortRequest]] is not undefined, return
|
||||
// stream.[[pendingAbortRequest]]'s promise.
|
||||
if (aStream->GetPendingAbortRequestPromise()) {
|
||||
RefPtr<Promise> promise = aStream->GetPendingAbortRequestPromise();
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 6. Assert: state is "writable" or "erroring".
|
||||
MOZ_ASSERT(state == WritableStream::WriterState::Writable ||
|
||||
state == WritableStream::WriterState::Erroring);
|
||||
|
||||
// Step 7. Let wasAlreadyErroring be false.
|
||||
bool wasAlreadyErroring = false;
|
||||
|
||||
// Step 8. If state is "erroring",
|
||||
JS::Rooted<JS::Value> reason(aCx, aReason);
|
||||
if (state == WritableStream::WriterState::Erroring) {
|
||||
// Step 8.1. Set wasAlreadyErroring to true.
|
||||
wasAlreadyErroring = true;
|
||||
// Step 8.2. Set reason to undefined.
|
||||
reason.setUndefined();
|
||||
}
|
||||
|
||||
// Step 9. Let promise be a new promise.
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 10. Set stream.[[pendingAbortRequest]] to a new pending abort request
|
||||
// whose promise is promise, reason is reason, and was already erroring is
|
||||
// wasAlreadyErroring.
|
||||
aStream->SetPendingAbortRequest(promise, reason, wasAlreadyErroring);
|
||||
|
||||
// Step 11. If wasAlreadyErroring is false, perform !
|
||||
// WritableStreamStartErroring(stream, reason).
|
||||
if (!wasAlreadyErroring) {
|
||||
aStream->StartErroring(aCx, reason, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 12. Return promise.
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-abort
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
already_AddRefed<Promise> WritableStream::Abort(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. If ! IsWritableStreamLocked(this) is true, return a promise
|
||||
// rejected with a TypeError exception.
|
||||
if (Locked()) {
|
||||
RefPtr<Promise> promise = Promise::Create(GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeRejectWithTypeError("Canceled Locked Stream");
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 2. Return ! WritableStreamAbort(this, reason).
|
||||
RefPtr<WritableStream> thisRefPtr = this;
|
||||
return WritableStreamAbort(aCx, thisRefPtr, aReason, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-close
|
||||
already_AddRefed<Promise> WritableStreamClose(JSContext* aCx,
|
||||
WritableStream* aStream,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = aStream->State();
|
||||
|
||||
// Step 2. If state is "closed" or "errored", return a promise rejected with a
|
||||
// TypeError exception.
|
||||
if (state == WritableStream::WriterState::Closed ||
|
||||
state == WritableStream::WriterState::Errored) {
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeRejectWithTypeError(
|
||||
"Can not close stream after closing or error");
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 3. Assert: state is "writable" or "erroring".
|
||||
MOZ_ASSERT(state == WritableStream::WriterState::Writable ||
|
||||
state == WritableStream::WriterState::Erroring);
|
||||
|
||||
// Step 4. Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
|
||||
MOZ_ASSERT(!aStream->CloseQueuedOrInFlight());
|
||||
|
||||
// Step 5. Let promise be a new promise.
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 6. Set stream.[[closeRequest]] to promise.
|
||||
aStream->SetCloseRequest(promise);
|
||||
|
||||
// Step 7. Let writer be stream.[[writer]].
|
||||
RefPtr<WritableStreamDefaultWriter> writer = aStream->GetWriter();
|
||||
|
||||
// Step 8. If writer is not undefined, and stream.[[backpressure]] is true,
|
||||
// and state is "writable", resolve writer.[[readyPromise]] with undefined.
|
||||
if (writer && aStream->Backpressure() &&
|
||||
state == WritableStream::WriterState::Writable) {
|
||||
writer->ReadyPromise()->MaybeResolveWithUndefined();
|
||||
}
|
||||
|
||||
// Step 9.
|
||||
// Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
|
||||
RefPtr<WritableStreamDefaultController> controller = aStream->Controller();
|
||||
WritableStreamDefaultControllerClose(aCx, controller, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 10. Return promise.
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-close
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
already_AddRefed<Promise> WritableStream::Close(JSContext* aCx,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. If ! IsWritableStreamLocked(this) is true, return a promise
|
||||
// rejected with a TypeError exception.
|
||||
if (Locked()) {
|
||||
RefPtr<Promise> promise = Promise::Create(GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeRejectWithTypeError("Can not close locked stream");
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 2. If ! WritableStreamCloseQueuedOrInFlight(this) is true, return a
|
||||
// promise rejected with a TypeError exception.
|
||||
if (CloseQueuedOrInFlight()) {
|
||||
RefPtr<Promise> promise = Promise::Create(GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeRejectWithTypeError("Stream is already closing");
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 3. Return ! WritableStreamClose(this).
|
||||
RefPtr<WritableStream> thisRefPtr = this;
|
||||
return WritableStreamClose(aCx, thisRefPtr, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer
|
||||
already_AddRefed<WritableStreamDefaultWriter>
|
||||
AcquireWritableStreamDefaultWriter(WritableStream* aStream, ErrorResult& aRv) {
|
||||
// Step 1. Let writer be a new WritableStreamDefaultWriter.
|
||||
RefPtr<WritableStreamDefaultWriter> writer =
|
||||
new WritableStreamDefaultWriter(aStream->GetParentObject());
|
||||
|
||||
// Step 2. Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
|
||||
SetUpWritableStreamDefaultWriter(writer, aStream, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 3. Return writer.
|
||||
return writer.forget();
|
||||
}
|
||||
|
||||
already_AddRefed<WritableStreamDefaultWriter> WritableStream::GetWriter(
|
||||
ErrorResult& aRv) {
|
||||
return AcquireWritableStreamDefaultWriter(this, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-add-write-request
|
||||
already_AddRefed<Promise> WritableStreamAddWriteRequest(WritableStream* aStream,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Assert: ! IsWritableStreamLocked(stream) is true.
|
||||
MOZ_ASSERT(IsWritableStreamLocked(aStream));
|
||||
|
||||
// Step 2. Assert: stream.[[state]] is "writable".
|
||||
MOZ_ASSERT(aStream->State() == WritableStream::WriterState::Writable);
|
||||
|
||||
// Step 3. Let promise be a new promise.
|
||||
RefPtr<Promise> promise = Promise::Create(aStream->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 4. Append promise to stream.[[writeRequests]].
|
||||
aStream->AppendWriteRequest(promise);
|
||||
|
||||
// Step 5. Return promise.
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
} // namespace mozilla::dom
|
198
dom/streams/WritableStream.h
Normal file
198
dom/streams/WritableStream.h
Normal file
@ -0,0 +1,198 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#ifndef mozilla_dom_WritableStream_h
|
||||
#define mozilla_dom_WritableStream_h
|
||||
|
||||
#include "js/TypeDecls.h"
|
||||
#include "js/Value.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/ErrorResult.h"
|
||||
#include "mozilla/dom/BindingDeclarations.h"
|
||||
#include "mozilla/dom/QueuingStrategyBinding.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultController.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultWriter.h"
|
||||
|
||||
#include "nsCycleCollectionParticipant.h"
|
||||
#include "nsWrapperCache.h"
|
||||
|
||||
#ifndef MOZ_DOM_STREAMS
|
||||
# error "Shouldn't be compiling with this header without MOZ_DOM_STREAMS set"
|
||||
#endif
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
class Promise;
|
||||
class WritableStreamDefaultController;
|
||||
class WritableStreamDefaultWriter;
|
||||
|
||||
class WritableStream final : public nsISupports, public nsWrapperCache {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WritableStream)
|
||||
|
||||
protected:
|
||||
~WritableStream();
|
||||
|
||||
public:
|
||||
explicit WritableStream(const GlobalObject& aGlobal);
|
||||
explicit WritableStream(nsIGlobalObject* aGlobal);
|
||||
|
||||
enum class WriterState { Writable, Closed, Erroring, Errored };
|
||||
|
||||
// Slot Getter/Setters:
|
||||
public:
|
||||
bool Backpressure() const { return mBackpressure; }
|
||||
void SetBackpressure(bool aBackpressure) { mBackpressure = aBackpressure; }
|
||||
|
||||
Promise* GetCloseRequest() { return mCloseRequest; }
|
||||
void SetCloseRequest(Promise* aRequest) { mCloseRequest = aRequest; }
|
||||
|
||||
WritableStreamDefaultController* Controller() { return mController; }
|
||||
void SetController(WritableStreamDefaultController* aController) {
|
||||
MOZ_ASSERT(aController);
|
||||
mController = aController;
|
||||
}
|
||||
|
||||
Promise* GetInFlightWriteRequest() const { return mInFlightWriteRequest; }
|
||||
|
||||
Promise* GetPendingAbortRequestPromise() const {
|
||||
return mPendingAbortRequestPromise;
|
||||
}
|
||||
|
||||
void SetPendingAbortRequest(Promise* aPromise, JS::Handle<JS::Value> aReason,
|
||||
bool aWasAlreadyErroring) {
|
||||
mPendingAbortRequestPromise = aPromise;
|
||||
mPendingAbortRequestReason = aReason;
|
||||
mPendingAbortRequestWasAlreadyErroring = aWasAlreadyErroring;
|
||||
}
|
||||
|
||||
WritableStreamDefaultWriter* GetWriter() const { return mWriter; }
|
||||
void SetWriter(WritableStreamDefaultWriter* aWriter) { mWriter = aWriter; }
|
||||
|
||||
WriterState State() const { return mState; }
|
||||
void SetState(const WriterState& aState) { mState = aState; }
|
||||
|
||||
JS::Value StoredError() const { return mStoredError; }
|
||||
void SetStoredError(JS::HandleValue aStoredError) {
|
||||
mStoredError = aStoredError;
|
||||
}
|
||||
|
||||
void AppendWriteRequest(RefPtr<Promise>& aRequest) {
|
||||
mWriteRequests.AppendElement(aRequest);
|
||||
}
|
||||
|
||||
// WritableStreamCloseQueuedOrInFlight
|
||||
bool CloseQueuedOrInFlight() const {
|
||||
return mCloseRequest || mInFlightCloseRequest;
|
||||
}
|
||||
|
||||
// WritableStreamDealWithRejection
|
||||
void DealWithRejection(JSContext* aCx, JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv);
|
||||
|
||||
// WritableStreamFinishErroring
|
||||
void FinishErroring(JSContext* aCx, ErrorResult& aRv);
|
||||
|
||||
// WritableStreamFinishInFlightClose
|
||||
void FinishInFlightClose();
|
||||
|
||||
// WritableStreamFinishInFlightCloseWithError
|
||||
void FinishInFlightCloseWithError(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv);
|
||||
|
||||
// WritableStreamFinishInFlightWrite
|
||||
void FinishInFlightWrite();
|
||||
|
||||
// WritableStreamFinishInFlightWriteWithError
|
||||
void FinishInFlightWriteWithError(JSContext* aCX,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aR);
|
||||
|
||||
// WritableStreamHasOperationMarkedInFlight
|
||||
bool HasOperationMarkedInFlight() const {
|
||||
return mInFlightWriteRequest || mInFlightCloseRequest;
|
||||
}
|
||||
|
||||
// WritableStreamMarkCloseRequestInFlight
|
||||
void MarkCloseRequestInFlight();
|
||||
|
||||
// WritableStreamMarkFirstWriteRequestInFlight
|
||||
void MarkFirstWriteRequestInFlight();
|
||||
|
||||
// WritableStreamRejectCloseAndClosedPromiseIfNeeded
|
||||
void RejectCloseAndClosedPromiseIfNeeded();
|
||||
|
||||
// WritableStreamStartErroring
|
||||
void StartErroring(JSContext* aCx, JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
// WritableStreamUpdateBackpressure
|
||||
void UpdateBackpressure(bool aBackpressure, ErrorResult& aRv);
|
||||
|
||||
public:
|
||||
nsIGlobalObject* GetParentObject() const { return mGlobal; }
|
||||
|
||||
JSObject* WrapObject(JSContext* aCx,
|
||||
JS::Handle<JSObject*> aGivenProto) override;
|
||||
|
||||
// IDL Methods
|
||||
static already_AddRefed<WritableStream> Constructor(
|
||||
const GlobalObject& aGlobal,
|
||||
const Optional<JS::Handle<JSObject*>>& aUnderlyingSink,
|
||||
const QueuingStrategy& aStrategy, ErrorResult& aRv);
|
||||
|
||||
bool Locked() const { return !!mWriter; }
|
||||
|
||||
already_AddRefed<Promise> Abort(JSContext* cx, JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<Promise> Close(JSContext* aCx, ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<WritableStreamDefaultWriter> GetWriter(ErrorResult& aRv);
|
||||
|
||||
// Internal Slots:
|
||||
private:
|
||||
bool mBackpressure = false;
|
||||
RefPtr<Promise> mCloseRequest;
|
||||
RefPtr<WritableStreamDefaultController> mController;
|
||||
RefPtr<Promise> mInFlightWriteRequest;
|
||||
RefPtr<Promise> mInFlightCloseRequest;
|
||||
|
||||
// We inline all members of [[pendingAbortRequest]] in this class.
|
||||
// The absence (i.e. undefined) of the [[pendingAbortRequest]]
|
||||
// is indicated by mPendingAbortRequestPromise = nullptr.
|
||||
RefPtr<Promise> mPendingAbortRequestPromise;
|
||||
JS::Heap<JS::Value> mPendingAbortRequestReason;
|
||||
bool mPendingAbortRequestWasAlreadyErroring = false;
|
||||
|
||||
WriterState mState = WriterState::Writable;
|
||||
JS::Heap<JS::Value> mStoredError;
|
||||
RefPtr<WritableStreamDefaultWriter> mWriter;
|
||||
nsTArray<RefPtr<Promise>> mWriteRequests;
|
||||
|
||||
nsCOMPtr<nsIGlobalObject> mGlobal;
|
||||
};
|
||||
|
||||
inline bool IsWritableStreamLocked(WritableStream* aStream) {
|
||||
return aStream->Locked();
|
||||
}
|
||||
|
||||
extern already_AddRefed<Promise> WritableStreamAbort(
|
||||
JSContext* aCx, WritableStream* aStream, JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
extern already_AddRefed<Promise> WritableStreamClose(JSContext* aCx,
|
||||
WritableStream* aStream,
|
||||
ErrorResult& aRv);
|
||||
|
||||
extern already_AddRefed<Promise> WritableStreamAddWriteRequest(
|
||||
WritableStream* aStream, ErrorResult& aRv);
|
||||
|
||||
} // namespace mozilla::dom
|
||||
|
||||
#endif // mozilla_dom_WritableStream_h
|
699
dom/streams/WritableStreamDefaultController.cpp
Normal file
699
dom/streams/WritableStreamDefaultController.cpp
Normal file
@ -0,0 +1,699 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "js/Exception.h"
|
||||
#include "js/TypeDecls.h"
|
||||
#include "js/Value.h"
|
||||
#include "mozilla/AlreadyAddRefed.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/dom/AbortSignal.h"
|
||||
#include "mozilla/dom/Promise.h"
|
||||
#include "mozilla/dom/PromiseNativeHandler.h"
|
||||
#include "mozilla/dom/WritableStream.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultController.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultControllerBinding.h"
|
||||
// #include "mozilla/dom/ReadableStreamDefaultReaderBinding.h"
|
||||
#include "mozilla/dom/UnderlyingSinkBinding.h"
|
||||
#include "nsCycleCollectionParticipant.h"
|
||||
#include "nsDebug.h"
|
||||
#include "nsISupports.h"
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
// Note: Using the individual macros vs NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE
|
||||
// because I need to specificy a manual implementation of
|
||||
// NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN.
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(WritableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(WritableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mSignal, mStrategySizeAlgorithm,
|
||||
mWriteAlgorithm, mCloseAlgorithm,
|
||||
mAbortAlgorithm, mStream)
|
||||
tmp->mQueue.clear();
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(WritableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal, mSignal, mStrategySizeAlgorithm,
|
||||
mWriteAlgorithm, mCloseAlgorithm,
|
||||
mAbortAlgorithm, mStream)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(WritableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
|
||||
// Trace the associated queue.
|
||||
for (const auto& queueEntry : tmp->mQueue) {
|
||||
aCallbacks.Trace(&queueEntry->mValue, "mQueue.mValue", aClosure);
|
||||
}
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(WritableStreamDefaultController)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(WritableStreamDefaultController)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WritableStreamDefaultController)
|
||||
NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
WritableStreamDefaultController::WritableStreamDefaultController(
|
||||
nsISupports* aGlobal, WritableStream& aStream)
|
||||
: mGlobal(do_QueryInterface(aGlobal)), mStream(&aStream) {}
|
||||
|
||||
WritableStreamDefaultController::~WritableStreamDefaultController() {
|
||||
// MG:XXX: LinkedLists are required to be empty at destruction, but it seems
|
||||
// it is possible to have a controller be destructed while still
|
||||
// having entries in its queue.
|
||||
//
|
||||
// This needs to be verified as not indicating some other issue.
|
||||
mQueue.clear();
|
||||
}
|
||||
|
||||
JSObject* WritableStreamDefaultController::WrapObject(
|
||||
JSContext* aCx, JS::Handle<JSObject*> aGivenProto) {
|
||||
return WritableStreamDefaultController_Binding::Wrap(aCx, this, aGivenProto);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-default-controller-error
|
||||
void WritableStreamDefaultController::Error(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let state be this.[[stream]].[[state]].
|
||||
// Step 2. If state is not "writable", return.
|
||||
if (mStream->State() != WritableStream::WriterState::Writable) {
|
||||
return;
|
||||
}
|
||||
// Step 3. Perform ! WritableStreamDefaultControllerError(this, e).
|
||||
RefPtr<WritableStreamDefaultController> thisRefPtr = this;
|
||||
WritableStreamDefaultControllerError(aCx, thisRefPtr, aError, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-default-controller-private-abort
|
||||
already_AddRefed<Promise> WritableStreamDefaultController::AbortSteps(
|
||||
JSContext* aCx, JS::Handle<JS::Value> aReason, ErrorResult& aRv) {
|
||||
// Step 1. Let result be the result of performing this.[[abortAlgorithm]],
|
||||
// passing reason.
|
||||
RefPtr<UnderlyingSinkAbortCallbackHelper> abortAlgorithm(mAbortAlgorithm);
|
||||
Optional<JS::Handle<JS::Value>> optionalReason(aCx, aReason);
|
||||
RefPtr<Promise> abortPromise =
|
||||
abortAlgorithm
|
||||
? abortAlgorithm->AbortCallback(aCx, optionalReason, aRv)
|
||||
: Promise::CreateResolvedWithUndefined(GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 2. Perform ! WritableStreamDefaultControllerClearAlgorithms(this).
|
||||
ClearAlgorithms();
|
||||
|
||||
// Step 3. Return result.
|
||||
return abortPromise.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ws-default-controller-private-error
|
||||
void WritableStreamDefaultController::ErrorSteps() {
|
||||
// Step 1. Perform ! ResetQueue(this).
|
||||
ResetQueue(this);
|
||||
}
|
||||
|
||||
void WritableStreamDefaultController::SetSignal(AbortSignal* aSignal) {
|
||||
MOZ_ASSERT(aSignal);
|
||||
mSignal = aSignal;
|
||||
}
|
||||
|
||||
void WritableStreamDefaultControllerAdvanceQueueIfNeeded(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
ErrorResult& aRv);
|
||||
|
||||
class WritableStartPromiseNativeHandler final : public PromiseNativeHandler {
|
||||
~WritableStartPromiseNativeHandler() = default;
|
||||
|
||||
RefPtr<WritableStreamDefaultController> mController;
|
||||
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_CLASS(WritableStartPromiseNativeHandler)
|
||||
|
||||
explicit WritableStartPromiseNativeHandler(
|
||||
WritableStreamDefaultController* aController)
|
||||
: PromiseNativeHandler(), mController(aController) {}
|
||||
|
||||
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
// Step 17. Upon fulfillment of startPromise,
|
||||
// Step 17.1. Assert: stream.[[state]] is "writable" or "erroring".
|
||||
MOZ_ASSERT(stream->State() == WritableStream::WriterState::Writable ||
|
||||
stream->State() == WritableStream::WriterState::Erroring);
|
||||
// Step 17.2. Set controller.[[started]] to true.
|
||||
mController->SetStarted(true);
|
||||
// Step 17.3 Perform
|
||||
// ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
|
||||
IgnoredErrorResult rv;
|
||||
WritableStreamDefaultControllerAdvanceQueueIfNeeded(aCx, mController, rv);
|
||||
if (rv.MaybeSetPendingException(aCx)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
// Step 18. Upon rejection of startPromise with reason r,
|
||||
// Step 18.1. Assert: stream.[[state]] is "writable" or "erroring".
|
||||
MOZ_ASSERT(stream->State() == WritableStream::WriterState::Writable ||
|
||||
stream->State() == WritableStream::WriterState::Erroring);
|
||||
// Step 18.2. Set controller.[[started]] to true.
|
||||
mController->SetStarted(true);
|
||||
// Step 18.3. Perform ! WritableStreamDealWithRejection(stream, r).
|
||||
IgnoredErrorResult rv;
|
||||
stream->DealWithRejection(aCx, aValue, rv);
|
||||
if (rv.MaybeSetPendingException(aCx)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION(WritableStartPromiseNativeHandler, mController)
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(WritableStartPromiseNativeHandler)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(WritableStartPromiseNativeHandler)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WritableStartPromiseNativeHandler)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
void SetUpWritableStreamDefaultController(
|
||||
JSContext* aCx, WritableStream* aStream,
|
||||
WritableStreamDefaultController* aController,
|
||||
UnderlyingSinkStartCallbackHelper* aStartAlgorithm,
|
||||
UnderlyingSinkWriteCallbackHelper* aWriteAlgorithm,
|
||||
UnderlyingSinkCloseCallbackHelper* aCloseAlgorithm,
|
||||
UnderlyingSinkAbortCallbackHelper* aAbortAlgorithm, double aHighWaterMark,
|
||||
QueuingStrategySize* aSizeAlgorithm, ErrorResult& aRv) {
|
||||
// Step 1. Assert: stream implements WritableStream.
|
||||
// Step 2. Assert: stream.[[controller]] is undefined.
|
||||
MOZ_ASSERT(!aStream->Controller());
|
||||
|
||||
// Step 3. Set controller.[[stream]] to stream.
|
||||
// Note: Already set in
|
||||
// SetUpWritableStreamDefaultControllerFromUnderlyingSink.
|
||||
MOZ_ASSERT(aController->Stream() == aStream);
|
||||
|
||||
// Step 4. Set stream.[[controller]] to controller.
|
||||
aStream->SetController(aController);
|
||||
|
||||
// Step 5. Perform ! ResetQueue(controller).
|
||||
ResetQueue(aController);
|
||||
|
||||
// Step 6. Set controller.[[signal]] to a new AbortSignal.
|
||||
RefPtr<AbortSignal> signal = new AbortSignal(aController->GetParentObject(),
|
||||
false, JS::UndefinedHandleValue);
|
||||
aController->SetSignal(signal);
|
||||
|
||||
// Step 7. Set controller.[[started]] to false.
|
||||
aController->SetStarted(false);
|
||||
|
||||
// Step 8. Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
|
||||
aController->SetStrategySizeAlgorithm(aSizeAlgorithm);
|
||||
|
||||
// Step 9. Set controller.[[strategyHWM]] to highWaterMark.
|
||||
aController->SetStrategyHWM(aHighWaterMark);
|
||||
|
||||
// Step 10. Set controller.[[writeAlgorithm]] to writeAlgorithm.
|
||||
aController->SetWriteAlgorithm(aWriteAlgorithm);
|
||||
|
||||
// Step 11. Set controller.[[closeAlgorithm]] to closeAlgorithm.
|
||||
aController->SetCloseAlgorithm(aCloseAlgorithm);
|
||||
|
||||
// Step 12. Set controller.[[abortAlgorithm]] to abortAlgorithm.
|
||||
aController->SetAbortAlgorithm(aAbortAlgorithm);
|
||||
|
||||
// Step 13. Let backpressure be !
|
||||
// WritableStreamDefaultControllerGetBackpressure(controller).
|
||||
bool backpressure = aController->GetBackpressure();
|
||||
|
||||
// Step 14. Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
|
||||
aStream->UpdateBackpressure(backpressure, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 15. Let startResult be the result of performing startAlgorithm. (This
|
||||
// may throw an exception.)
|
||||
JS::Rooted<JS::Value> startResult(aCx, JS::UndefinedValue());
|
||||
if (aStartAlgorithm) {
|
||||
// Strong Refs:
|
||||
RefPtr<UnderlyingSinkStartCallbackHelper> startAlgorithm(aStartAlgorithm);
|
||||
RefPtr<WritableStreamDefaultController> controller(aController);
|
||||
|
||||
startAlgorithm->StartCallback(aCx, *controller, &startResult, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 16. Let startPromise be a promise resolved with startResult.
|
||||
RefPtr<Promise> startPromise = Promise::Create(GetIncumbentGlobal(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
startPromise->MaybeResolve(startResult);
|
||||
|
||||
// Step 17/18.
|
||||
RefPtr<WritableStartPromiseNativeHandler> startPromiseHandler =
|
||||
new WritableStartPromiseNativeHandler(aController);
|
||||
startPromise->AppendNativeHandler(startPromiseHandler);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink
|
||||
MOZ_CAN_RUN_SCRIPT
|
||||
void SetUpWritableStreamDefaultControllerFromUnderlyingSink(
|
||||
JSContext* aCx, WritableStream* aStream, JS::HandleObject aUnderlyingSink,
|
||||
UnderlyingSink& aUnderlyingSinkDict, double aHighWaterMark,
|
||||
QueuingStrategySize* aSizeAlgorithm, ErrorResult& aRv) {
|
||||
// Step 1.
|
||||
RefPtr<WritableStreamDefaultController> controller =
|
||||
new WritableStreamDefaultController(aStream->GetParentObject(), *aStream);
|
||||
|
||||
// Step 6. (implicit Step 2.)
|
||||
RefPtr<UnderlyingSinkStartCallbackHelper> startAlgorithm =
|
||||
aUnderlyingSinkDict.mStart.WasPassed()
|
||||
? new UnderlyingSinkStartCallbackHelper(
|
||||
aUnderlyingSinkDict.mStart.Value(), aUnderlyingSink)
|
||||
: nullptr;
|
||||
|
||||
// Step 7. (implicit Step 3.)
|
||||
RefPtr<UnderlyingSinkWriteCallbackHelper> writeAlgorithm =
|
||||
aUnderlyingSinkDict.mWrite.WasPassed()
|
||||
? new UnderlyingSinkWriteCallbackHelper(
|
||||
aUnderlyingSinkDict.mWrite.Value(), aUnderlyingSink)
|
||||
: nullptr;
|
||||
|
||||
// Step 8. (implicit Step 4.)
|
||||
RefPtr<UnderlyingSinkCloseCallbackHelper> closeAlgorithm =
|
||||
aUnderlyingSinkDict.mClose.WasPassed()
|
||||
? new UnderlyingSinkCloseCallbackHelper(
|
||||
aUnderlyingSinkDict.mClose.Value(), aUnderlyingSink)
|
||||
: nullptr;
|
||||
|
||||
// Step 9. (implicit Step 5.)
|
||||
RefPtr<UnderlyingSinkAbortCallbackHelper> abortAlgorithm =
|
||||
aUnderlyingSinkDict.mAbort.WasPassed()
|
||||
? new UnderlyingSinkAbortCallbackHelper(
|
||||
aUnderlyingSinkDict.mAbort.Value(), aUnderlyingSink)
|
||||
: nullptr;
|
||||
|
||||
// Step 10.
|
||||
SetUpWritableStreamDefaultController(
|
||||
aCx, aStream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm,
|
||||
abortAlgorithm, aHighWaterMark, aSizeAlgorithm, aRv);
|
||||
}
|
||||
|
||||
// MG:XXX: Probably can find base class between this and
|
||||
// StartPromiseNativeHandler
|
||||
class SinkCloseNativePromiseHandler final : public PromiseNativeHandler {
|
||||
~SinkCloseNativePromiseHandler() = default;
|
||||
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_CLASS(SinkCloseNativePromiseHandler)
|
||||
|
||||
explicit SinkCloseNativePromiseHandler(
|
||||
WritableStreamDefaultController* aController)
|
||||
: PromiseNativeHandler(), mController(aController) {}
|
||||
|
||||
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
// Step 7. Upon fulfillment of sinkClosePromise,
|
||||
// Step 7.1. Perform ! WritableStreamFinishInFlightClose(stream).
|
||||
stream->FinishInFlightClose();
|
||||
}
|
||||
|
||||
void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
// Step 8. Upon rejection of sinkClosePromise with reason reason,
|
||||
// Step 8.1. Perform ! WritableStreamFinishInFlightCloseWithError(stream,
|
||||
// reason).
|
||||
IgnoredErrorResult rv;
|
||||
stream->FinishInFlightCloseWithError(aCx, aValue, rv);
|
||||
NS_WARNING_ASSERTION(!rv.Failed(), "FinishInFlightCloseWithError failed");
|
||||
}
|
||||
|
||||
private:
|
||||
RefPtr<WritableStreamDefaultController> mController;
|
||||
};
|
||||
|
||||
// Cycle collection methods for promise handler.
|
||||
NS_IMPL_CYCLE_COLLECTION(SinkCloseNativePromiseHandler, mController)
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(SinkCloseNativePromiseHandler)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(SinkCloseNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(SinkCloseNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close
|
||||
void WritableStreamDefaultControllerProcessClose(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let stream be controller.[[stream]].
|
||||
RefPtr<WritableStream> stream = aController->Stream();
|
||||
|
||||
// Step 2. Perform ! WritableStreamMarkCloseRequestInFlight(stream).
|
||||
stream->MarkCloseRequestInFlight();
|
||||
|
||||
// Step 3. Perform ! DequeueValue(controller).
|
||||
JS::Rooted<JS::Value> value(aCx);
|
||||
DequeueValue(aController, &value);
|
||||
|
||||
// Step 4. Assert: controller.[[queue]] is empty.
|
||||
MOZ_ASSERT(aController->Queue().isEmpty());
|
||||
|
||||
// Step 5. Let sinkClosePromise be the result of performing
|
||||
// controller.[[closeAlgorithm]].
|
||||
RefPtr<UnderlyingSinkCloseCallbackHelper> closeAlgorithm(
|
||||
aController->GetCloseAlgorithm());
|
||||
|
||||
RefPtr<Promise> sinkClosePromise =
|
||||
closeAlgorithm ? closeAlgorithm->CloseCallback(aCx, aRv)
|
||||
: Promise::CreateResolvedWithUndefined(
|
||||
aController->GetParentObject(), aRv);
|
||||
|
||||
// Step 6. Perform !
|
||||
// WritableStreamDefaultControllerClearAlgorithms(controller).
|
||||
aController->ClearAlgorithms();
|
||||
|
||||
// Step 7 + 8.
|
||||
sinkClosePromise->AppendNativeHandler(
|
||||
new SinkCloseNativePromiseHandler(aController));
|
||||
}
|
||||
|
||||
// MG:XXX: Probably can find base class between this and
|
||||
// StartPromiseNativeHandler
|
||||
class SinkWriteNativePromiseHandler final : public PromiseNativeHandler {
|
||||
~SinkWriteNativePromiseHandler() = default;
|
||||
|
||||
RefPtr<WritableStreamDefaultController> mController;
|
||||
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_CLASS(SinkWriteNativePromiseHandler)
|
||||
|
||||
explicit SinkWriteNativePromiseHandler(
|
||||
WritableStreamDefaultController* aController)
|
||||
: PromiseNativeHandler(), mController(aController) {}
|
||||
|
||||
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
|
||||
// Step 4.1. Perform ! WritableStreamFinishInFlightWrite(stream).
|
||||
stream->FinishInFlightWrite();
|
||||
|
||||
// Step 4.2. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = stream->State();
|
||||
|
||||
// Step 4.3. Assert: state is "writable" or "erroring".
|
||||
MOZ_ASSERT(state == WritableStream::WriterState::Writable ||
|
||||
state == WritableStream::WriterState::Erroring);
|
||||
|
||||
// Step 4.4. Perform ! DequeueValue(controller).
|
||||
JS::Rooted<JS::Value> value(aCx);
|
||||
DequeueValue(mController, &value);
|
||||
|
||||
// Step 4.5. If ! WritableStreamCloseQueuedOrInFlight(stream) is false and
|
||||
// state is "writable",
|
||||
if (!stream->CloseQueuedOrInFlight() &&
|
||||
state == WritableStream::WriterState::Writable) {
|
||||
// Step 4.5.1. Let backpressure be !
|
||||
// WritableStreamDefaultControllerGetBackpressure(controller).
|
||||
bool backpressure = mController->GetBackpressure();
|
||||
// Step 4.5.2. Perform ! WritableStreamUpdateBackpressure(stream,
|
||||
// backpressure).
|
||||
IgnoredErrorResult rv;
|
||||
stream->UpdateBackpressure(backpressure, rv);
|
||||
// XXX Not Sure How To Handle Errors Inside Native Callbacks,
|
||||
NS_WARNING_ASSERTION(!rv.Failed(), "UpdateBackpressure failed");
|
||||
}
|
||||
|
||||
// Step 4.6. Perform !
|
||||
// WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
|
||||
IgnoredErrorResult rv;
|
||||
WritableStreamDefaultControllerAdvanceQueueIfNeeded(aCx, mController, rv);
|
||||
// XXX Not Sure How To Handle Errors Inside Native Callbacks,
|
||||
NS_WARNING_ASSERTION(
|
||||
!rv.Failed(),
|
||||
"WritableStreamDefaultControllerAdvanceQueueIfNeeded failed");
|
||||
}
|
||||
|
||||
void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write
|
||||
RefPtr<WritableStream> stream = mController->Stream();
|
||||
|
||||
// Step 5.1. If stream.[[state]] is "writable", perform !
|
||||
// WritableStreamDefaultControllerClearAlgorithms(controller).
|
||||
if (stream->State() == WritableStream::WriterState::Writable) {
|
||||
mController->ClearAlgorithms();
|
||||
}
|
||||
|
||||
// Step 5.2. Perform ! WritableStreamFinishInFlightWriteWithError(stream,
|
||||
// reason)
|
||||
IgnoredErrorResult rv;
|
||||
stream->FinishInFlightWriteWithError(aCx, aValue, rv);
|
||||
// XXX Not Sure How To Handle Errors Inside Native Callbacks,
|
||||
NS_WARNING_ASSERTION(!rv.Failed(), "FinishInFlightWriteWithError failed");
|
||||
}
|
||||
};
|
||||
|
||||
// Cycle collection methods for promise handler.
|
||||
NS_IMPL_CYCLE_COLLECTION(SinkWriteNativePromiseHandler, mController)
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(SinkWriteNativePromiseHandler)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(SinkWriteNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(SinkWriteNativePromiseHandler)
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write
|
||||
void WritableStreamDefaultControllerProcessWrite(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aChunk, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be controller.[[stream]].
|
||||
RefPtr<WritableStream> stream = aController->Stream();
|
||||
|
||||
// Step 2. Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
|
||||
stream->MarkFirstWriteRequestInFlight();
|
||||
|
||||
// Step 3. Let sinkWritePromise be the result of performing
|
||||
// controller.[[writeAlgorithm]], passing in chunk.
|
||||
RefPtr<UnderlyingSinkWriteCallbackHelper> writeAlgorithm(
|
||||
aController->GetWriteAlgorithm());
|
||||
|
||||
RefPtr<Promise> sinkWritePromise =
|
||||
writeAlgorithm
|
||||
? writeAlgorithm->WriteCallback(aCx, aChunk, *aController, aRv)
|
||||
: Promise::CreateResolvedWithUndefined(aController->GetParentObject(),
|
||||
aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 4 + 5:
|
||||
sinkWritePromise->AppendNativeHandler(
|
||||
new SinkWriteNativePromiseHandler(aController));
|
||||
}
|
||||
|
||||
// We use a JS::MagicValue to represent the close sentinel required by the spec.
|
||||
// Normal JavaScript code can not generate magic values, so we can use this
|
||||
// as a special value. However care has to be taken to not leak the magic value
|
||||
// to other code.
|
||||
constexpr JSWhyMagic CLOSE_SENTINEL = JS_GENERIC_MAGIC;
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed
|
||||
void WritableStreamDefaultControllerAdvanceQueueIfNeeded(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let stream be controller.[[stream]].
|
||||
RefPtr<WritableStream> stream = aController->Stream();
|
||||
|
||||
// Step 2. If controller.[[started]] is false, return.
|
||||
if (!aController->Started()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3. If stream.[[inFlightWriteRequest]] is not undefined, return.
|
||||
if (stream->GetInFlightWriteRequest()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 4. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = stream->State();
|
||||
|
||||
// Step 5. Assert: state is not "closed" or "errored".
|
||||
MOZ_ASSERT(state != WritableStream::WriterState::Closed &&
|
||||
state != WritableStream::WriterState::Errored);
|
||||
|
||||
// Step 6. If state is "erroring",
|
||||
if (state == WritableStream::WriterState::Erroring) {
|
||||
// Step 6.1. Perform ! WritableStreamFinishErroring(stream).
|
||||
stream->FinishErroring(aCx, aRv);
|
||||
|
||||
// Step 6.2. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 7. If controller.[[queue]] is empty, return.
|
||||
if (aController->Queue().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 8. Let value be ! PeekQueueValue(controller).
|
||||
JS::Rooted<JS::Value> value(aCx);
|
||||
PeekQueueValue(aController, &value);
|
||||
|
||||
// Step 9. If value is the close sentinel, perform !
|
||||
// WritableStreamDefaultControllerProcessClose(controller).
|
||||
if (value.isMagic(CLOSE_SENTINEL)) {
|
||||
WritableStreamDefaultControllerProcessClose(aCx, aController, aRv);
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 10. Otherwise, perform !
|
||||
// WritableStreamDefaultControllerProcessWrite(controller, value).
|
||||
WritableStreamDefaultControllerProcessWrite(aCx, aController, value, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-close
|
||||
void WritableStreamDefaultControllerClose(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
|
||||
JS::Rooted<JS::Value> aCloseSentinel(aCx, JS::MagicValue(CLOSE_SENTINEL));
|
||||
EnqueueValueWithSize(aController, aCloseSentinel, 0, aRv);
|
||||
MOZ_ASSERT(!aRv.Failed());
|
||||
|
||||
// Step 2. Perform !
|
||||
// WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
|
||||
WritableStreamDefaultControllerAdvanceQueueIfNeeded(aCx, aController, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-write
|
||||
void WritableStreamDefaultControllerWrite(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aChunk, double chunkSize, ErrorResult& aRv) {
|
||||
// Step 1. Let enqueueResult be EnqueueValueWithSize(controller, chunk,
|
||||
// chunkSize).
|
||||
IgnoredErrorResult rv;
|
||||
EnqueueValueWithSize(aController, aChunk, chunkSize, rv);
|
||||
|
||||
// Step 2. If enqueueResult is an abrupt completion,
|
||||
if (rv.MaybeSetPendingException(aCx,
|
||||
"WritableStreamDefaultController.write")) {
|
||||
JS::Rooted<JS::Value> error(aCx);
|
||||
JS_GetPendingException(aCx, &error);
|
||||
JS_ClearPendingException(aCx);
|
||||
|
||||
// Step 2.1. Perform !
|
||||
// WritableStreamDefaultControllerErrorIfNeeded(controller,
|
||||
// enqueueResult.[[Value]]).
|
||||
WritableStreamDefaultControllerErrorIfNeeded(aCx, aController, error, aRv);
|
||||
|
||||
// Step 2.2. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3. Let stream be controller.[[stream]].
|
||||
RefPtr<WritableStream> stream = aController->Stream();
|
||||
|
||||
// Step 4. If ! WritableStreamCloseQueuedOrInFlight(stream) is false and
|
||||
// stream.[[state]] is "writable",
|
||||
if (!stream->CloseQueuedOrInFlight() &&
|
||||
stream->State() == WritableStream::WriterState::Writable) {
|
||||
// Step 4.1. Let backpressure be
|
||||
// !WritableStreamDefaultControllerGetBackpressure(controller).
|
||||
bool backpressure = aController->GetBackpressure();
|
||||
|
||||
// Step 4.2. Perform ! WritableStreamUpdateBackpressure(stream,
|
||||
// backpressure).
|
||||
stream->UpdateBackpressure(backpressure, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5. Perform
|
||||
// ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
|
||||
WritableStreamDefaultControllerAdvanceQueueIfNeeded(aCx, aController, aRv);
|
||||
}
|
||||
|
||||
void WritableStreamDefaultControllerError(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aError, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be controller.[[stream]].
|
||||
RefPtr<WritableStream> stream = aController->Stream();
|
||||
|
||||
// Step 2. Assert: stream.[[state]] is "writable".
|
||||
MOZ_ASSERT(stream->State() == WritableStream::WriterState::Writable);
|
||||
|
||||
// Step 3. Perform
|
||||
// ! WritableStreamDefaultControllerClearAlgorithms(controller).
|
||||
aController->ClearAlgorithms();
|
||||
|
||||
// Step 4.Perform ! WritableStreamStartErroring(stream, error).
|
||||
stream->StartErroring(aCx, aError, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed
|
||||
void WritableStreamDefaultControllerErrorIfNeeded(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aError, ErrorResult& aRv) {
|
||||
// Step 1. If controller.[[stream]].[[state]] is "writable", perform
|
||||
// !WritableStreamDefaultControllerError(controller, error).
|
||||
if (aController->Stream()->State() == WritableStream::WriterState::Writable) {
|
||||
WritableStreamDefaultControllerError(aCx, aController, aError, aRv);
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size
|
||||
double WritableStreamDefaultControllerGetChunkSize(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aChunk, ErrorResult& aRv) {
|
||||
// Step 1. Let returnValue be the result of performing
|
||||
// controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting
|
||||
// the result as a completion record.
|
||||
RefPtr<QueuingStrategySize> sizeAlgorithm(
|
||||
aController->StrategySizeAlgorithm());
|
||||
|
||||
// If !sizeAlgorithm, we return 1, which is inlined from
|
||||
// https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function
|
||||
Optional<JS::Handle<JS::Value>> optionalChunk(aCx, aChunk);
|
||||
|
||||
double chunkSize =
|
||||
sizeAlgorithm
|
||||
? sizeAlgorithm->Call(
|
||||
optionalChunk, aRv,
|
||||
"WritableStreamDefaultController.[[strategySizeAlgorithm]]",
|
||||
CallbackObject::eRethrowExceptions)
|
||||
: 1.0;
|
||||
|
||||
// Step 2. If returnValue is an abrupt completion,
|
||||
if (aRv.MaybeSetPendingException(
|
||||
aCx, "WritableStreamDefaultController.[[strategySizeAlgorithm]]")) {
|
||||
JS::Rooted<JS::Value> error(aCx);
|
||||
JS_GetPendingException(aCx, &error);
|
||||
JS_ClearPendingException(aCx);
|
||||
|
||||
// Step 2.1. Perform !
|
||||
// WritableStreamDefaultControllerErrorIfNeeded(controller,
|
||||
// returnValue.[[Value]]).
|
||||
WritableStreamDefaultControllerErrorIfNeeded(aCx, aController, error, aRv);
|
||||
|
||||
// Step 2.2. Return 1.
|
||||
return 1.0;
|
||||
}
|
||||
|
||||
// Step 3. Return returnValue.[[Value]].
|
||||
return chunkSize;
|
||||
}
|
||||
|
||||
} // namespace mozilla::dom
|
195
dom/streams/WritableStreamDefaultController.h
Normal file
195
dom/streams/WritableStreamDefaultController.h
Normal file
@ -0,0 +1,195 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#ifndef mozilla_dom_WritableStreamDefaultController_h
|
||||
#define mozilla_dom_WritableStreamDefaultController_h
|
||||
|
||||
#include "js/TypeDecls.h"
|
||||
#include "mozilla/AlreadyAddRefed.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/ErrorResult.h"
|
||||
#include "mozilla/dom/BindingDeclarations.h"
|
||||
#include "mozilla/dom/QueuingStrategyBinding.h"
|
||||
#include "mozilla/dom/QueueWithSizes.h"
|
||||
#include "mozilla/dom/ReadRequest.h"
|
||||
#include "mozilla/dom/UnderlyingSinkCallbackHelpers.h"
|
||||
#include "nsCycleCollectionParticipant.h"
|
||||
#include "nsWrapperCache.h"
|
||||
#include "mozilla/dom/Nullable.h"
|
||||
#include "nsTArray.h"
|
||||
#include "nsISupportsBase.h"
|
||||
|
||||
namespace mozilla {
|
||||
namespace dom {
|
||||
|
||||
class AbortSignal;
|
||||
class WritableStream;
|
||||
struct UnderlyingSink;
|
||||
|
||||
class WritableStreamDefaultController final : public nsISupports,
|
||||
public nsWrapperCache {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WritableStreamDefaultController)
|
||||
|
||||
explicit WritableStreamDefaultController(nsISupports* aGlobal,
|
||||
WritableStream& aStream);
|
||||
|
||||
protected:
|
||||
~WritableStreamDefaultController();
|
||||
|
||||
public:
|
||||
nsIGlobalObject* GetParentObject() const { return mGlobal; }
|
||||
|
||||
JSObject* WrapObject(JSContext* aCx,
|
||||
JS::Handle<JSObject*> aGivenProto) override;
|
||||
|
||||
// WebIDL methods/properties
|
||||
|
||||
AbortSignal* Signal() { return mSignal; }
|
||||
|
||||
void Error(JSContext* aCx, JS::Handle<JS::Value> aError, ErrorResult& aRv);
|
||||
|
||||
// [[AbortSteps]]
|
||||
virtual already_AddRefed<Promise> AbortSteps(JSContext* aCx,
|
||||
JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
// [[ErrorSteps]]
|
||||
virtual void ErrorSteps();
|
||||
|
||||
// Internal Slot Accessors
|
||||
|
||||
QueueWithSizes& Queue() { return mQueue; }
|
||||
|
||||
double QueueTotalSize() const { return mQueueTotalSize; }
|
||||
void SetQueueTotalSize(double aQueueTotalSize) {
|
||||
mQueueTotalSize = aQueueTotalSize;
|
||||
}
|
||||
|
||||
void SetSignal(AbortSignal* aSignal);
|
||||
|
||||
bool Started() const { return mStarted; }
|
||||
void SetStarted(bool aStarted) { mStarted = aStarted; }
|
||||
|
||||
double StrategyHWM() const { return mStrategyHWM; }
|
||||
void SetStrategyHWM(double aStrategyHWM) { mStrategyHWM = aStrategyHWM; }
|
||||
|
||||
QueuingStrategySize* StrategySizeAlgorithm() const {
|
||||
return mStrategySizeAlgorithm;
|
||||
}
|
||||
void SetStrategySizeAlgorithm(QueuingStrategySize* aStrategySizeAlgorithm) {
|
||||
mStrategySizeAlgorithm = aStrategySizeAlgorithm;
|
||||
}
|
||||
|
||||
UnderlyingSinkWriteCallbackHelper* GetWriteAlgorithm() {
|
||||
return mWriteAlgorithm;
|
||||
}
|
||||
void SetWriteAlgorithm(UnderlyingSinkWriteCallbackHelper* aWriteAlgorithm) {
|
||||
mWriteAlgorithm = aWriteAlgorithm;
|
||||
}
|
||||
|
||||
UnderlyingSinkCloseCallbackHelper* GetCloseAlgorithm() {
|
||||
return mCloseAlgorithm;
|
||||
}
|
||||
void SetCloseAlgorithm(UnderlyingSinkCloseCallbackHelper* aCloseAlgorithm) {
|
||||
mCloseAlgorithm = aCloseAlgorithm;
|
||||
}
|
||||
|
||||
UnderlyingSinkAbortCallbackHelper* GetAbortAlgorithm() {
|
||||
return mAbortAlgorithm;
|
||||
}
|
||||
void SetAbortAlgorithm(UnderlyingSinkAbortCallbackHelper* aAbortAlgorithm) {
|
||||
mAbortAlgorithm = aAbortAlgorithm;
|
||||
}
|
||||
|
||||
WritableStream* Stream() { return mStream; }
|
||||
|
||||
// WritableStreamDefaultControllerGetBackpressure
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure
|
||||
bool GetBackpressure() const {
|
||||
// Step 1. Let desiredSize be !
|
||||
// WritableStreamDefaultControllerGetDesiredSize(controller).
|
||||
double desiredSize = GetDesiredSize();
|
||||
// Step 2. Return true if desiredSize ≤ 0, or false otherwise.
|
||||
return desiredSize <= 0;
|
||||
}
|
||||
|
||||
// WritableStreamDefaultControllerGetDesiredSize
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size
|
||||
double GetDesiredSize() const { return mStrategyHWM - mQueueTotalSize; }
|
||||
|
||||
// WritableStreamDefaultControllerClearAlgorithms
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms
|
||||
void ClearAlgorithms() {
|
||||
// Step 1. Set controller.[[writeAlgorithm]] to undefined.
|
||||
mWriteAlgorithm = nullptr;
|
||||
|
||||
// Step 2. Set controller.[[closeAlgorithm]] to undefined.
|
||||
mCloseAlgorithm = nullptr;
|
||||
|
||||
// Step 3. Set controller.[[abortAlgorithm]] to undefined.
|
||||
mAbortAlgorithm = nullptr;
|
||||
|
||||
// Step 4. Set controller.[[strategySizeAlgorithm]] to undefined.
|
||||
mStrategySizeAlgorithm = nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
nsCOMPtr<nsIGlobalObject> mGlobal;
|
||||
|
||||
// Internal Slots
|
||||
QueueWithSizes mQueue = {};
|
||||
double mQueueTotalSize = 0.0;
|
||||
RefPtr<AbortSignal> mSignal;
|
||||
bool mStarted = false;
|
||||
double mStrategyHWM = 0.0;
|
||||
|
||||
RefPtr<QueuingStrategySize> mStrategySizeAlgorithm;
|
||||
RefPtr<UnderlyingSinkWriteCallbackHelper> mWriteAlgorithm;
|
||||
RefPtr<UnderlyingSinkCloseCallbackHelper> mCloseAlgorithm;
|
||||
RefPtr<UnderlyingSinkAbortCallbackHelper> mAbortAlgorithm;
|
||||
RefPtr<WritableStream> mStream;
|
||||
};
|
||||
|
||||
extern void SetUpWritableStreamDefaultController(
|
||||
JSContext* aCx, WritableStream* aStream,
|
||||
WritableStreamDefaultController* aController,
|
||||
UnderlyingSinkStartCallbackHelper* aStartAlgorithm,
|
||||
UnderlyingSinkWriteCallbackHelper* aWriteAlgorithm,
|
||||
UnderlyingSinkCloseCallbackHelper* aCloseAlgorithm,
|
||||
UnderlyingSinkAbortCallbackHelper* aAbortAlgorithm, double aHighWaterMark,
|
||||
QueuingStrategySize* aSizeAlgorithm, ErrorResult& aRv);
|
||||
|
||||
extern void SetUpWritableStreamDefaultControllerFromUnderlyingSink(
|
||||
JSContext* aCx, WritableStream* aStream, JS::HandleObject aUnderlyingSink,
|
||||
UnderlyingSink& aUnderlyingSinkDict, double aHighWaterMark,
|
||||
QueuingStrategySize* aSizeAlgorithm, ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultControllerClose(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultControllerWrite(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aChunk, double chunkSize, ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultControllerError(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aError, ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultControllerErrorIfNeeded(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aError, ErrorResult& aRv);
|
||||
|
||||
extern double WritableStreamDefaultControllerGetChunkSize(
|
||||
JSContext* aCx, WritableStreamDefaultController* aController,
|
||||
JS::Handle<JS::Value> aChunk, ErrorResult& aRv);
|
||||
|
||||
} // namespace dom
|
||||
} // namespace mozilla
|
||||
|
||||
#endif // mozilla_dom_WritableStreamDefaultController_h
|
549
dom/streams/WritableStreamDefaultWriter.cpp
Normal file
549
dom/streams/WritableStreamDefaultWriter.cpp
Normal file
@ -0,0 +1,549 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "mozilla/dom/WritableStreamDefaultWriter.h"
|
||||
#include "js/Array.h"
|
||||
#include "js/TypeDecls.h"
|
||||
#include "js/Value.h"
|
||||
#include "mozilla/AlreadyAddRefed.h"
|
||||
#include "mozilla/Assertions.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/CycleCollectedJSContext.h"
|
||||
#include "mozilla/FloatingPoint.h"
|
||||
#include "mozilla/HoldDropJSObjects.h"
|
||||
#include "mozilla/dom/WritableStreamDefaultWriterBinding.h"
|
||||
#include "nsCOMPtr.h"
|
||||
|
||||
#include "mozilla/dom/Promise-inl.h"
|
||||
#include "nsIGlobalObject.h"
|
||||
#include "nsISupports.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_CLASS(WritableStreamDefaultWriter)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(WritableStreamDefaultWriter)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal, mStream, mReadyPromise,
|
||||
mClosedPromise)
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_PRESERVED_WRAPPER
|
||||
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(WritableStreamDefaultWriter)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal, mStream, mReadyPromise,
|
||||
mClosedPromise)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_BEGIN(WritableStreamDefaultWriter)
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_PRESERVED_WRAPPER
|
||||
NS_IMPL_CYCLE_COLLECTION_TRACE_END
|
||||
|
||||
NS_IMPL_CYCLE_COLLECTING_ADDREF(WritableStreamDefaultWriter)
|
||||
NS_IMPL_CYCLE_COLLECTING_RELEASE(WritableStreamDefaultWriter)
|
||||
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WritableStreamDefaultWriter)
|
||||
NS_WRAPPERCACHE_INTERFACE_MAP_ENTRY
|
||||
NS_INTERFACE_MAP_ENTRY(nsISupports)
|
||||
NS_INTERFACE_MAP_END
|
||||
|
||||
WritableStreamDefaultWriter::WritableStreamDefaultWriter(
|
||||
nsIGlobalObject* aGlobal)
|
||||
: mGlobal(aGlobal) {
|
||||
mozilla::HoldJSObjects(this);
|
||||
}
|
||||
|
||||
WritableStreamDefaultWriter::~WritableStreamDefaultWriter() {
|
||||
mozilla::DropJSObjects(this);
|
||||
}
|
||||
|
||||
void WritableStreamDefaultWriter::SetReadyPromise(Promise* aPromise) {
|
||||
MOZ_ASSERT(aPromise);
|
||||
mReadyPromise = aPromise;
|
||||
}
|
||||
|
||||
void WritableStreamDefaultWriter::SetClosedPromise(Promise* aPromise) {
|
||||
MOZ_ASSERT(aPromise);
|
||||
mClosedPromise = aPromise;
|
||||
}
|
||||
|
||||
JSObject* WritableStreamDefaultWriter::WrapObject(
|
||||
JSContext* aCx, JS::Handle<JSObject*> aGivenProto) {
|
||||
return WritableStreamDefaultWriter_Binding::Wrap(aCx, this, aGivenProto);
|
||||
}
|
||||
|
||||
/* static */
|
||||
already_AddRefed<WritableStreamDefaultWriter>
|
||||
WritableStreamDefaultWriter::Constructor(const GlobalObject& aGlobal,
|
||||
WritableStream& aStream,
|
||||
ErrorResult& aRv) {
|
||||
nsCOMPtr<nsIGlobalObject> global = do_QueryInterface(aGlobal.GetAsSupports());
|
||||
RefPtr<WritableStreamDefaultWriter> writer =
|
||||
new WritableStreamDefaultWriter(global);
|
||||
SetUpWritableStreamDefaultWriter(writer, &aStream, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
return writer.forget();
|
||||
}
|
||||
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriter::Closed() {
|
||||
RefPtr<Promise> closedPromise = mClosedPromise;
|
||||
return closedPromise.forget();
|
||||
}
|
||||
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriter::Ready() {
|
||||
RefPtr<Promise> readyPromise = mReadyPromise;
|
||||
return readyPromise.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size
|
||||
static Nullable<double> WritableStreamDefaultWriterGetDesiredSize(
|
||||
WritableStreamDefaultWriter* aWriter, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be writer.[[stream]].
|
||||
RefPtr<WritableStream> stream = aWriter->GetStream();
|
||||
|
||||
// Step 2. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = stream->State();
|
||||
|
||||
// Step 3. If state is "errored" or "erroring", return null.
|
||||
if (state == WritableStream::WriterState::Errored ||
|
||||
state == WritableStream::WriterState::Erroring) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 4. If state is "closed", return 0.
|
||||
if (state == WritableStream::WriterState::Closed) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
// Step 5. Return
|
||||
// ! WritableStreamDefaultControllerGetDesiredSize(stream.[[controller]]).
|
||||
return stream->Controller()->GetDesiredSize();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#default-writer-desired-size
|
||||
Nullable<double> WritableStreamDefaultWriter::GetDesiredSize(ErrorResult& aRv) {
|
||||
// Step 1. If this.[[stream]] is undefined, throw a TypeError exception.
|
||||
if (!mStream) {
|
||||
aRv.ThrowTypeError("Missing stream");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 2. Return ! WritableStreamDefaultWriterGetDesiredSize(this).
|
||||
RefPtr<WritableStreamDefaultWriter> thisRefPtr = this;
|
||||
return WritableStreamDefaultWriterGetDesiredSize(thisRefPtr, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-abort
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriterAbort(
|
||||
JSContext* aCx, WritableStreamDefaultWriter* aWriter,
|
||||
JS::Handle<JS::Value> aReason, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be writer.[[stream]].
|
||||
RefPtr<WritableStream> stream = aWriter->GetStream();
|
||||
|
||||
// Step 2. Assert: stream is not undefined.
|
||||
MOZ_ASSERT(stream);
|
||||
|
||||
// Step 3. Return ! WritableStreamAbort(stream, reason).
|
||||
return WritableStreamAbort(aCx, stream, aReason, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#default-writer-abort
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriter::Abort(
|
||||
JSContext* aCx, JS::Handle<JS::Value> aReason, ErrorResult& aRv) {
|
||||
// Step 1. If this.[[stream]] is undefined, return a promise rejected with a
|
||||
// TypeError exception.
|
||||
if (!mStream) {
|
||||
aRv.ThrowTypeError("Missing stream");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 2. Return ! WritableStreamDefaultWriterAbort(this, reason).
|
||||
RefPtr<WritableStreamDefaultWriter> thisRefPtr = this;
|
||||
return WritableStreamDefaultWriterAbort(aCx, thisRefPtr, aReason, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-close
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriterClose(
|
||||
JSContext* aCx, WritableStreamDefaultWriter* aWriter, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be writer.[[stream]].
|
||||
RefPtr<WritableStream> stream = aWriter->GetStream();
|
||||
|
||||
// Step 2. Assert: stream is not undefined.
|
||||
MOZ_ASSERT(stream);
|
||||
|
||||
// Step 3. Return ! WritableStreamClose(stream).
|
||||
return WritableStreamClose(aCx, stream, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#default-writer-close
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriter::Close(JSContext* aCx,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let stream be this.[[stream]].
|
||||
RefPtr<WritableStream> stream = mStream;
|
||||
|
||||
// Step 2. If stream is undefined, return a promise rejected with a TypeError
|
||||
// exception.
|
||||
if (!stream) {
|
||||
aRv.ThrowTypeError("Missing stream");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 3. If ! WritableStreamCloseQueuedOrInFlight(stream) is true,
|
||||
// return a promise rejected with a TypeError exception.
|
||||
if (stream->CloseQueuedOrInFlight()) {
|
||||
aRv.ThrowTypeError("Stream is closing");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 3. Return ! WritableStreamDefaultWriterClose(this).
|
||||
RefPtr<WritableStreamDefaultWriter> thisRefPtr = this;
|
||||
return WritableStreamDefaultWriterClose(aCx, thisRefPtr, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-release
|
||||
void WritableStreamDefaultWriterRelease(JSContext* aCx,
|
||||
WritableStreamDefaultWriter* aWriter,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let stream be writer.[[stream]].
|
||||
RefPtr<WritableStream> stream = aWriter->GetStream();
|
||||
|
||||
// Step 2. Assert: stream is not undefined.
|
||||
MOZ_ASSERT(stream);
|
||||
|
||||
// Step 3. Assert: stream.[[writer]] is writer.
|
||||
MOZ_ASSERT(stream->GetWriter() == aWriter);
|
||||
|
||||
// Step 4. Let releasedError be a new TypeError.
|
||||
JS::Rooted<JS::Value> releasedError(RootingCx(), JS::UndefinedValue());
|
||||
{
|
||||
ErrorResult rv;
|
||||
rv.ThrowTypeError("Releasing lock");
|
||||
bool ok = ToJSValue(aCx, std::move(rv), &releasedError);
|
||||
MOZ_RELEASE_ASSERT(ok, "must be ok");
|
||||
}
|
||||
|
||||
// Step 5. Perform !
|
||||
// WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer,
|
||||
// releasedError).
|
||||
WritableStreamDefaultWriterEnsureReadyPromiseRejected(aWriter, releasedError,
|
||||
aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 6. Perform !
|
||||
// WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer,
|
||||
// releasedError).
|
||||
WritableStreamDefaultWriterEnsureClosedPromiseRejected(aWriter, releasedError,
|
||||
aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 7. Set stream.[[writer]] to undefined.
|
||||
stream->SetWriter(nullptr);
|
||||
|
||||
// Step 8. Set writer.[[stream]] to undefined.
|
||||
aWriter->SetStream(nullptr);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#default-writer-release-lock
|
||||
void WritableStreamDefaultWriter::ReleaseLock(JSContext* aCx,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. Let stream be this.[[stream]].
|
||||
RefPtr<WritableStream> stream = mStream;
|
||||
|
||||
// Step 2. If stream is undefined, return.
|
||||
if (!stream) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3. Assert: stream.[[writer]] is not undefined.
|
||||
MOZ_ASSERT(stream->GetWriter());
|
||||
|
||||
// Step 4. Perform ! WritableStreamDefaultWriterRelease(this).
|
||||
RefPtr<WritableStreamDefaultWriter> thisRefPtr = this;
|
||||
return WritableStreamDefaultWriterRelease(aCx, thisRefPtr, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-write
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriterWrite(
|
||||
JSContext* aCx, WritableStreamDefaultWriter* aWriter,
|
||||
JS::Handle<JS::Value> aChunk, ErrorResult& aRv) {
|
||||
// Step 1. Let stream be writer.[[stream]].
|
||||
RefPtr<WritableStream> stream = aWriter->GetStream();
|
||||
|
||||
// Step 2. Assert: stream is not undefined.
|
||||
MOZ_ASSERT(stream);
|
||||
|
||||
// Step 3. Let controller be stream.[[controller]].
|
||||
RefPtr<WritableStreamDefaultController> controller = stream->Controller();
|
||||
|
||||
// Step 4. Let chunkSize be !
|
||||
// WritableStreamDefaultControllerGetChunkSize(controller, chunk).
|
||||
double chunkSize =
|
||||
WritableStreamDefaultControllerGetChunkSize(aCx, controller, aChunk, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 5. If stream is not equal to writer.[[stream]], return a promise
|
||||
// rejected with a TypeError exception.
|
||||
if (stream != aWriter->GetStream()) {
|
||||
aRv.ThrowTypeError(
|
||||
"Can not write on WritableStream owned by another writer.");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 6. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = stream->State();
|
||||
|
||||
// Step 7. If state is "errored", return a promise rejected with
|
||||
// stream.[[storedError]].
|
||||
if (state == WritableStream::WriterState::Errored) {
|
||||
RefPtr<Promise> promise = Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
JS::Rooted<JS::Value> error(aCx, stream->StoredError());
|
||||
promise->MaybeReject(error);
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 8. If ! WritableStreamCloseQueuedOrInFlight(stream) is true or state
|
||||
// is "closed", return a promise rejected with a TypeError exception
|
||||
// indicating that the stream is closing or closed.
|
||||
if (stream->CloseQueuedOrInFlight() ||
|
||||
state == WritableStream::WriterState::Closed) {
|
||||
RefPtr<Promise> promise = Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
promise->MaybeRejectWithTypeError("Stream is closed or closing");
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 9. If state is "erroring", return a promise rejected with
|
||||
// stream.[[storedError]].
|
||||
if (state == WritableStream::WriterState::Erroring) {
|
||||
RefPtr<Promise> promise = Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
JS::Rooted<JS::Value> error(aCx, stream->StoredError());
|
||||
promise->MaybeReject(error);
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// Step 10. Assert: state is "writable".
|
||||
MOZ_ASSERT(state == WritableStream::WriterState::Writable);
|
||||
|
||||
// Step 11. Let promise be ! WritableStreamAddWriteRequest(stream).
|
||||
RefPtr<Promise> promise = WritableStreamAddWriteRequest(stream, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 12. Perform ! WritableStreamDefaultControllerWrite(controller, chunk,
|
||||
// chunkSize).
|
||||
WritableStreamDefaultControllerWrite(aCx, controller, aChunk, chunkSize, aRv);
|
||||
if (aRv.Failed()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 13. Return promise.
|
||||
return promise.forget();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#default-writer-write
|
||||
already_AddRefed<Promise> WritableStreamDefaultWriter::Write(
|
||||
JSContext* aCx, JS::Handle<JS::Value> aChunk, ErrorResult& aRv) {
|
||||
// Step 1. If this.[[stream]] is undefined, return a promise rejected with a
|
||||
// TypeError exception.
|
||||
if (!mStream) {
|
||||
aRv.ThrowTypeError("Missing stream");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Step 2. Return ! WritableStreamDefaultWriterWrite(this, chunk).
|
||||
return WritableStreamDefaultWriterWrite(aCx, this, aChunk, aRv);
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer
|
||||
void SetUpWritableStreamDefaultWriter(WritableStreamDefaultWriter* aWriter,
|
||||
WritableStream* aStream,
|
||||
ErrorResult& aRv) {
|
||||
// Step 1. If ! IsWritableStreamLocked(stream) is true, throw a TypeError
|
||||
// exception.
|
||||
if (IsWritableStreamLocked(aStream)) {
|
||||
aRv.ThrowTypeError("WritableStream is already locked!");
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 2. Set writer.[[stream]] to stream.
|
||||
aWriter->SetStream(aStream);
|
||||
|
||||
// Step 3. Set stream.[[writer]] to writer.
|
||||
aStream->SetWriter(aWriter);
|
||||
|
||||
// Step 4. Let state be stream.[[state]].
|
||||
WritableStream::WriterState state = aStream->State();
|
||||
|
||||
// Step 5. If state is "writable",
|
||||
if (state == WritableStream::WriterState::Writable) {
|
||||
RefPtr<Promise> readyPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 5.1 If ! WritableStreamCloseQueuedOrInFlight(stream) is false and
|
||||
// stream.[[backpressure]] is true, set writer.[[readyPromise]] to a new
|
||||
// promise.
|
||||
if (!aStream->CloseQueuedOrInFlight() && aStream->Backpressure()) {
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
} else {
|
||||
// Step 5.2. Otherwise, set writer.[[readyPromise]] to a promise resolved
|
||||
// with undefined.
|
||||
readyPromise->MaybeResolveWithUndefined();
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
}
|
||||
|
||||
// Step 5.3. Set writer.[[closedPromise]] to a new promise.
|
||||
RefPtr<Promise> closedPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
aWriter->SetClosedPromise(closedPromise);
|
||||
} else if (state == WritableStream::WriterState::Erroring) {
|
||||
// Step 6. Otherwise, if state is "erroring",
|
||||
|
||||
// Step 6.1. Set writer.[[readyPromise]] to a promise rejected with
|
||||
// stream.[[storedError]].
|
||||
JS::Rooted<JS::Value> storedError(RootingCx(), aStream->StoredError());
|
||||
RefPtr<Promise> readyPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
readyPromise->MaybeReject(storedError);
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
|
||||
// Step 6.2. Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
|
||||
readyPromise->SetSettledPromiseIsHandled();
|
||||
|
||||
// Step 6.3. Set writer.[[closedPromise]] to a new promise.
|
||||
RefPtr<Promise> closedPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
aWriter->SetClosedPromise(closedPromise);
|
||||
} else if (state == WritableStream::WriterState::Closed) {
|
||||
// Step 7. Otherwise, if state is "closed",
|
||||
// Step 7.1. Set writer.[[readyPromise]] to a promise resolved with
|
||||
// undefined.
|
||||
RefPtr<Promise> readyPromise =
|
||||
Promise::CreateResolvedWithUndefined(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
|
||||
// Step 7.2. Set writer.[[closedPromise]] to a promise resolved with
|
||||
// undefined.
|
||||
RefPtr<Promise> closedPromise =
|
||||
Promise::CreateResolvedWithUndefined(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
aWriter->SetClosedPromise(closedPromise);
|
||||
} else {
|
||||
// Step 8. Otherwise,
|
||||
// Step 8.1 Assert: state is "errored".
|
||||
MOZ_ASSERT(state == WritableStream::WriterState::Errored);
|
||||
|
||||
// Step 8.2. Step Let storedError be stream.[[storedError]].
|
||||
JS::Rooted<JS::Value> storedError(RootingCx(), aStream->StoredError());
|
||||
|
||||
// Step 8.3. Set writer.[[readyPromise]] to a promise rejected with
|
||||
// storedError.
|
||||
RefPtr<Promise> readyPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
readyPromise->MaybeReject(storedError);
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
|
||||
// Step 8.4. Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
|
||||
readyPromise->SetSettledPromiseIsHandled();
|
||||
|
||||
// Step 8.5. Set writer.[[closedPromise]] to a promise rejected with
|
||||
// storedError.
|
||||
RefPtr<Promise> closedPromise =
|
||||
Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
closedPromise->MaybeReject(storedError);
|
||||
aWriter->SetClosedPromise(closedPromise);
|
||||
|
||||
// Step 8.6 Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
|
||||
closedPromise->SetSettledPromiseIsHandled();
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected
|
||||
void WritableStreamDefaultWriterEnsureClosedPromiseRejected(
|
||||
WritableStreamDefaultWriter* aWriter, JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
RefPtr<Promise> closedPromise = aWriter->ClosedPromise();
|
||||
// Step 1. If writer.[[closedPromise]].[[PromiseState]] is "pending", reject
|
||||
// writer.[[closedPromise]] with error.
|
||||
if (closedPromise->State() == Promise::PromiseState::Pending) {
|
||||
closedPromise->MaybeReject(aError);
|
||||
} else {
|
||||
// Step 2. Otherwise, set writer.[[closedPromise]] to a promise rejected
|
||||
// with error.
|
||||
closedPromise = Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
closedPromise->MaybeReject(aError);
|
||||
aWriter->SetClosedPromise(closedPromise);
|
||||
}
|
||||
|
||||
// Step 3. Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
|
||||
closedPromise->SetSettledPromiseIsHandled();
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected
|
||||
void WritableStreamDefaultWriterEnsureReadyPromiseRejected(
|
||||
WritableStreamDefaultWriter* aWriter, JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv) {
|
||||
RefPtr<Promise> readyPromise = aWriter->ReadyPromise();
|
||||
// Step 1. If writer.[[readyPromise]].[[PromiseState]] is "pending", reject
|
||||
// writer.[[readyPromise]] with error.
|
||||
if (readyPromise->State() == Promise::PromiseState::Pending) {
|
||||
readyPromise->MaybeReject(aError);
|
||||
} else {
|
||||
// Step 2. Otherwise, set writer.[[readyPromise]] to a promise rejected with
|
||||
// error.
|
||||
readyPromise = Promise::Create(aWriter->GetParentObject(), aRv);
|
||||
if (aRv.Failed()) {
|
||||
return;
|
||||
}
|
||||
readyPromise->MaybeReject(aError);
|
||||
aWriter->SetReadyPromise(readyPromise);
|
||||
}
|
||||
|
||||
// Step 3. Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
|
||||
readyPromise->SetSettledPromiseIsHandled();
|
||||
}
|
||||
|
||||
} // namespace mozilla::dom
|
99
dom/streams/WritableStreamDefaultWriter.h
Normal file
99
dom/streams/WritableStreamDefaultWriter.h
Normal file
@ -0,0 +1,99 @@
|
||||
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et cindent: */
|
||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#ifndef mozilla_dom_WritableStreamDefaultWriter_h
|
||||
#define mozilla_dom_WritableStreamDefaultWriter_h
|
||||
|
||||
#include "js/TypeDecls.h"
|
||||
#include "js/Value.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/ErrorResult.h"
|
||||
#include "mozilla/dom/BindingDeclarations.h"
|
||||
#include "mozilla/dom/QueuingStrategyBinding.h"
|
||||
#include "mozilla/dom/WritableStream.h"
|
||||
|
||||
#include "nsCycleCollectionParticipant.h"
|
||||
#include "nsWrapperCache.h"
|
||||
|
||||
#ifndef MOZ_DOM_STREAMS
|
||||
# error "Shouldn't be compiling with this header without MOZ_DOM_STREAMS set"
|
||||
#endif
|
||||
|
||||
namespace mozilla::dom {
|
||||
|
||||
class Promise;
|
||||
|
||||
class WritableStreamDefaultWriter final : public nsISupports,
|
||||
public nsWrapperCache {
|
||||
public:
|
||||
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
|
||||
NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WritableStreamDefaultWriter)
|
||||
|
||||
protected:
|
||||
~WritableStreamDefaultWriter();
|
||||
|
||||
public:
|
||||
explicit WritableStreamDefaultWriter(nsIGlobalObject* aGlobal);
|
||||
|
||||
// Slot Getter/Setters:
|
||||
public:
|
||||
WritableStream* GetStream() const { return mStream; }
|
||||
void SetStream(WritableStream* aStream) { mStream = aStream; }
|
||||
|
||||
Promise* ReadyPromise() const { return mReadyPromise; }
|
||||
void SetReadyPromise(Promise* aPromise);
|
||||
|
||||
Promise* ClosedPromise() const { return mClosedPromise; }
|
||||
void SetClosedPromise(Promise* aPromise);
|
||||
|
||||
public:
|
||||
nsIGlobalObject* GetParentObject() const { return mGlobal; }
|
||||
|
||||
JSObject* WrapObject(JSContext* aCx,
|
||||
JS::Handle<JSObject*> aGivenProto) override;
|
||||
|
||||
// IDL Methods
|
||||
static already_AddRefed<WritableStreamDefaultWriter> Constructor(
|
||||
const GlobalObject& aGlobal, WritableStream& aStream, ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<Promise> Closed();
|
||||
already_AddRefed<Promise> Ready();
|
||||
|
||||
Nullable<double> GetDesiredSize(ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<Promise> Abort(JSContext* aCx, JS::Handle<JS::Value> aReason,
|
||||
ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<Promise> Close(JSContext* aCx, ErrorResult& aRv);
|
||||
|
||||
void ReleaseLock(JSContext* aCx, ErrorResult& aRv);
|
||||
|
||||
already_AddRefed<Promise> Write(JSContext* aCx, JS::Handle<JS::Value> aChunk,
|
||||
ErrorResult& aRv);
|
||||
|
||||
// Internal Slots:
|
||||
private:
|
||||
nsCOMPtr<nsIGlobalObject> mGlobal;
|
||||
RefPtr<WritableStream> mStream;
|
||||
RefPtr<Promise> mReadyPromise;
|
||||
RefPtr<Promise> mClosedPromise;
|
||||
};
|
||||
|
||||
extern void SetUpWritableStreamDefaultWriter(
|
||||
WritableStreamDefaultWriter* aWriter, WritableStream* aStream,
|
||||
ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultWriterEnsureClosedPromiseRejected(
|
||||
WritableStreamDefaultWriter* aWriter, JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv);
|
||||
|
||||
extern void WritableStreamDefaultWriterEnsureReadyPromiseRejected(
|
||||
WritableStreamDefaultWriter* aWriter, JS::Handle<JS::Value> aError,
|
||||
ErrorResult& aRv);
|
||||
|
||||
} // namespace mozilla::dom
|
||||
|
||||
#endif // mozilla_dom_WritableStreamDefaultWriter_h
|
@ -16,8 +16,13 @@ EXPORTS.mozilla.dom += [
|
||||
"ReadableStreamGenericReader.h",
|
||||
"ReadableStreamTee.h",
|
||||
"ReadRequest.h",
|
||||
"StreamUtils.h",
|
||||
"TeeState.h",
|
||||
"UnderlyingSinkCallbackHelpers.h",
|
||||
"UnderlyingSourceCallbackHelpers.h",
|
||||
"WritableStream.h",
|
||||
"WritableStreamDefaultController.h",
|
||||
"WritableStreamDefaultWriter.h",
|
||||
]
|
||||
|
||||
UNIFIED_SOURCES += [
|
||||
@ -26,8 +31,13 @@ UNIFIED_SOURCES += [
|
||||
"ReadableStreamDefaultController.cpp",
|
||||
"ReadableStreamDefaultReader.cpp",
|
||||
"ReadableStreamTee.cpp",
|
||||
"StreamUtils.cpp",
|
||||
"TeeState.cpp",
|
||||
"UnderlyingSinkCallbackHelpers.cpp",
|
||||
"UnderlyingSourceCallbackHelpers.cpp",
|
||||
"WritableStream.cpp",
|
||||
"WritableStreamDefaultController.cpp",
|
||||
"WritableStreamDefaultWriter.cpp",
|
||||
]
|
||||
|
||||
FINAL_LIBRARY = "xul"
|
||||
|
13
dom/webidl/UnderlyingSink.webidl
Normal file
13
dom/webidl/UnderlyingSink.webidl
Normal file
@ -0,0 +1,13 @@
|
||||
[GenerateInit]
|
||||
dictionary UnderlyingSink {
|
||||
UnderlyingSinkStartCallback start;
|
||||
UnderlyingSinkWriteCallback write;
|
||||
UnderlyingSinkCloseCallback close;
|
||||
UnderlyingSinkAbortCallback abort;
|
||||
any type;
|
||||
};
|
||||
|
||||
callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
|
||||
callback UnderlyingSinkWriteCallback = Promise<void> (any chunk, WritableStreamDefaultController controller);
|
||||
callback UnderlyingSinkCloseCallback = Promise<void> ();
|
||||
callback UnderlyingSinkAbortCallback = Promise<void> (optional any reason);
|
18
dom/webidl/WritableStream.webidl
Normal file
18
dom/webidl/WritableStream.webidl
Normal file
@ -0,0 +1,18 @@
|
||||
[Exposed=(Window,Worker,Worklet),
|
||||
//Transferable See Bug 1734240
|
||||
]
|
||||
interface WritableStream {
|
||||
[Throws]
|
||||
constructor(optional object underlyingSink, optional QueuingStrategy strategy = {});
|
||||
|
||||
readonly attribute boolean locked;
|
||||
|
||||
[Throws]
|
||||
Promise<void> abort(optional any reason);
|
||||
|
||||
[Throws]
|
||||
Promise<void> close();
|
||||
|
||||
[Throws]
|
||||
WritableStreamDefaultWriter getWriter();
|
||||
};
|
12
dom/webidl/WritableStreamDefaultController.webidl
Normal file
12
dom/webidl/WritableStreamDefaultController.webidl
Normal file
@ -0,0 +1,12 @@
|
||||
[Exposed=(Window,Worker,Worklet)]
|
||||
interface WritableStreamDefaultController {
|
||||
[Throws]
|
||||
void error(optional any e);
|
||||
};
|
||||
|
||||
// TODO: AbortSignal is not exposed on Worklet
|
||||
[Exposed=(Window,Worker)]
|
||||
partial interface WritableStreamDefaultController {
|
||||
readonly attribute AbortSignal signal;
|
||||
};
|
||||
|
21
dom/webidl/WritableStreamDefaultWriter.webidl
Normal file
21
dom/webidl/WritableStreamDefaultWriter.webidl
Normal file
@ -0,0 +1,21 @@
|
||||
[Exposed=(Window,Worker,Worklet)]
|
||||
interface WritableStreamDefaultWriter {
|
||||
[Throws]
|
||||
constructor(WritableStream stream);
|
||||
|
||||
readonly attribute Promise<void> closed;
|
||||
[Throws] readonly attribute unrestricted double? desiredSize;
|
||||
readonly attribute Promise<void> ready;
|
||||
|
||||
[Throws]
|
||||
Promise<void> abort(optional any reason);
|
||||
|
||||
[Throws]
|
||||
Promise<void> close();
|
||||
|
||||
[Throws]
|
||||
void releaseLock();
|
||||
|
||||
[Throws]
|
||||
Promise<void> write(optional any chunk);
|
||||
};
|
@ -1008,7 +1008,11 @@ if CONFIG["MOZ_DOM_STREAMS"]:
|
||||
"ReadableStream.webidl",
|
||||
"ReadableStreamDefaultController.webidl",
|
||||
"ReadableStreamDefaultReader.webidl",
|
||||
"UnderlyingSink.webidl",
|
||||
"UnderlyingSource.webidl",
|
||||
"WritableStream.webidl",
|
||||
"WritableStreamDefaultController.webidl",
|
||||
"WritableStreamDefaultWriter.webidl",
|
||||
]
|
||||
|
||||
if CONFIG["MOZ_WEBRTC"]:
|
||||
|
Loading…
Reference in New Issue
Block a user