Bug 1754004 - Part 17: Keep pipe streams alive so long as there's a callback registered, r=asuth

If we don't do this, we can encounter issues where we'll spuriously close the
stream when the last reference to the input stream is dropped while an
AsyncWait is still pending.

Differential Revision: https://phabricator.services.mozilla.com/D145672
This commit is contained in:
Nika Layzell 2022-05-13 14:16:16 +00:00
parent dd3458ff58
commit b82512de01

View File

@ -27,6 +27,7 @@
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIInputStreamPriority.h"
#include "nsThreadUtils.h"
using namespace mozilla;
@ -58,6 +59,76 @@ enum SegmentChangeResult { SegmentNotChanged, SegmentAdvanceBufferRead };
//-----------------------------------------------------------------------------
class CallbackHolder {
public:
CallbackHolder() = default;
MOZ_IMPLICIT CallbackHolder(std::nullptr_t) {}
CallbackHolder(nsIAsyncInputStream* aStream,
nsIInputStreamCallback* aCallback, uint32_t aFlags,
nsIEventTarget* aEventTarget)
: mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
"nsPipeInputStream AsyncWait Callback",
[stream = nsCOMPtr{aStream},
callback = nsCOMPtr{aCallback}]() {
callback->OnInputStreamReady(stream);
})
: nullptr),
mEventTarget(aEventTarget),
mFlags(aFlags) {}
CallbackHolder(nsIAsyncOutputStream* aStream,
nsIOutputStreamCallback* aCallback, uint32_t aFlags,
nsIEventTarget* aEventTarget)
: mRunnable(aCallback ? NS_NewCancelableRunnableFunction(
"nsPipeOutputStream AsyncWait Callback",
[stream = nsCOMPtr{aStream},
callback = nsCOMPtr{aCallback}]() {
callback->OnOutputStreamReady(stream);
})
: nullptr),
mEventTarget(aEventTarget),
mFlags(aFlags) {}
CallbackHolder(const CallbackHolder&) = delete;
CallbackHolder(CallbackHolder&&) = default;
CallbackHolder& operator=(const CallbackHolder&) = delete;
CallbackHolder& operator=(CallbackHolder&&) = default;
CallbackHolder& operator=(std::nullptr_t) {
mRunnable = nullptr;
mEventTarget = nullptr;
mFlags = 0;
return *this;
}
MOZ_IMPLICIT operator bool() const { return mRunnable; }
uint32_t Flags() const {
MOZ_ASSERT(mRunnable, "Should only be called when a callback is present");
return mFlags;
}
void Notify() {
nsCOMPtr<nsIRunnable> runnable = mRunnable.forget();
nsCOMPtr<nsIEventTarget> eventTarget = mEventTarget.forget();
if (runnable) {
if (eventTarget) {
eventTarget->Dispatch(runnable.forget());
} else {
runnable->Run();
}
}
}
private:
nsCOMPtr<nsIRunnable> mRunnable;
nsCOMPtr<nsIEventTarget> mEventTarget;
uint32_t mFlags = 0;
};
//-----------------------------------------------------------------------------
// this class is used to delay notifications until the end of a particular
// scope. it helps avoid the complexity of issuing callbacks while inside
// a critical section.
@ -66,34 +137,12 @@ class nsPipeEvents {
nsPipeEvents() = default;
~nsPipeEvents();
inline void NotifyInputReady(nsIAsyncInputStream* aStream,
nsIInputStreamCallback* aCallback) {
mInputList.AppendElement(InputEntry(aStream, aCallback));
}
inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
nsIOutputStreamCallback* aCallback) {
MOZ_DIAGNOSTIC_ASSERT(!mOutputCallback);
mOutputStream = aStream;
mOutputCallback = aCallback;
inline void NotifyReady(CallbackHolder aCallback) {
mCallbacks.AppendElement(std::move(aCallback));
}
private:
struct InputEntry {
InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback)
: mStream(aStream), mCallback(aCallback) {
MOZ_DIAGNOSTIC_ASSERT(mStream);
MOZ_DIAGNOSTIC_ASSERT(mCallback);
}
nsCOMPtr<nsIAsyncInputStream> mStream;
nsCOMPtr<nsIInputStreamCallback> mCallback;
};
nsTArray<InputEntry> mInputList;
nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
nsTArray<CallbackHolder> mCallbacks;
};
//-----------------------------------------------------------------------------
@ -151,7 +200,6 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
mInputStatus(NS_OK),
mBlocking(true),
mBlocked(false),
mCallbackFlags(0),
mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
nsPipeInputStream(const nsPipeInputStream& aOther)
@ -160,7 +208,6 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
mInputStatus(aOther.mInputStatus),
mBlocking(aOther.mBlocking),
mBlocked(false),
mCallbackFlags(0),
mReadState(aOther.mReadState),
mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
@ -210,8 +257,7 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
nsCOMPtr<nsIInputStreamCallback> mCallback;
uint32_t mCallbackFlags;
CallbackHolder mCallback;
// requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
nsPipeReadState mReadState;
@ -240,8 +286,7 @@ class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
mLogicalOffset(0),
mBlocking(true),
mBlocked(false),
mWritable(true),
mCallbackFlags(0) {}
mWritable(true) {}
void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
void SetWritable(bool aWritable) { mWritable = aWritable; }
@ -263,8 +308,7 @@ class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
bool mWritable;
nsCOMPtr<nsIOutputStreamCallback> mCallback;
uint32_t mCallbackFlags;
CallbackHolder mCallback;
};
//-----------------------------------------------------------------------------
@ -1059,17 +1103,10 @@ bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const {
nsPipeEvents::~nsPipeEvents() {
// dispatch any pending events
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream);
}
mInputList.Clear();
if (mOutputCallback) {
mOutputCallback->OnOutputStreamReady(mOutputStream);
mOutputCallback = nullptr;
mOutputStream = nullptr;
for (auto& callback : mCallbacks) {
callback.Notify();
}
mCallbacks.Clear();
}
//-----------------------------------------------------------------------------
@ -1148,10 +1185,8 @@ MonitorAction nsPipeInputStream::OnInputReadable(
mPipe->mReentrantMonitor.AssertCurrentThreadIn();
mReadState.mAvailable += aBytesWritten;
if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyInputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyReady(std::move(mCallback));
} else if (mBlocked) {
result = NotifyMonitor;
}
@ -1177,9 +1212,7 @@ MonitorAction nsPipeInputStream::OnInputException(
mPipe->DrainInputStream(mReadState, aEvents);
if (mCallback) {
aEvents.NotifyInputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
aEvents.NotifyReady(std::move(mCallback));
} else if (mBlocked) {
result = NotifyMonitor;
}
@ -1321,27 +1354,20 @@ nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
// replace a pending callback
mCallback = nullptr;
mCallbackFlags = 0;
if (!aCallback) {
return NS_OK;
}
nsCOMPtr<nsIInputStreamCallback> proxy;
if (aTarget) {
proxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait",
aCallback, aTarget, mPriority);
aCallback = proxy;
}
CallbackHolder callback(this, aCallback, aFlags, aTarget);
if (NS_FAILED(Status(mon)) ||
(mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
// stream is already closed or readable; post event.
pipeEvents.NotifyInputReady(this, aCallback);
pipeEvents.NotifyReady(std::move(callback));
} else {
// queue up callback object to be notified when data becomes available
mCallback = aCallback;
mCallbackFlags = aFlags;
mCallback = std::move(callback);
}
}
return NS_OK;
@ -1509,10 +1535,8 @@ MonitorAction nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) {
mWritable = true;
if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyOutputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
if (mCallback && !(mCallback.Flags() & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyReady(std::move(mCallback));
} else if (mBlocked) {
result = NotifyMonitor;
}
@ -1531,9 +1555,7 @@ MonitorAction nsPipeOutputStream::OnOutputException(nsresult aReason,
mWritable = false;
if (mCallback) {
aEvents.NotifyOutputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
aEvents.NotifyReady(std::move(mCallback));
} else if (mBlocked) {
result = NotifyMonitor;
}
@ -1679,26 +1701,20 @@ nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
// replace a pending callback
mCallback = nullptr;
mCallbackFlags = 0;
if (!aCallback) {
return NS_OK;
}
nsCOMPtr<nsIOutputStreamCallback> proxy;
if (aTarget) {
proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
aCallback = proxy;
}
CallbackHolder callback(this, aCallback, aFlags, aTarget);
if (NS_FAILED(mPipe->mStatus) ||
(mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
// stream is already closed or writable; post event.
pipeEvents.NotifyOutputReady(this, aCallback);
pipeEvents.NotifyReady(std::move(callback));
} else {
// queue up callback object to be notified when data becomes available
mCallback = aCallback;
mCallbackFlags = aFlags;
mCallback = std::move(callback);
}
}
return NS_OK;