mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-11-24 21:31:04 +00:00
f4a515c179
Differential Revision: https://phabricator.services.mozilla.com/D21131 --HG-- extra : rebase_source : 514f36238d908de221a0116f8e8a5d0cf18a168c extra : histedit_source : 85743d2586a7307738866ce145b93dae2a664cf3
415 lines
12 KiB
C++
415 lines
12 KiB
C++
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "NonBlockingAsyncInputStream.h"
|
|
#include "mozilla/ipc/InputStreamUtils.h"
|
|
#include "nsISeekableStream.h"
|
|
#include "nsStreamUtils.h"
|
|
|
|
namespace mozilla {
|
|
|
|
using namespace ipc;
|
|
|
|
class NonBlockingAsyncInputStream::AsyncWaitRunnable final
|
|
: public CancelableRunnable {
|
|
RefPtr<NonBlockingAsyncInputStream> mStream;
|
|
nsCOMPtr<nsIInputStreamCallback> mCallback;
|
|
|
|
public:
|
|
AsyncWaitRunnable(NonBlockingAsyncInputStream* aStream,
|
|
nsIInputStreamCallback* aCallback)
|
|
: CancelableRunnable("AsyncWaitRunnable"),
|
|
mStream(aStream),
|
|
mCallback(aCallback) {}
|
|
|
|
NS_IMETHOD
|
|
Run() override {
|
|
mStream->RunAsyncWaitCallback(this, mCallback.forget());
|
|
return NS_OK;
|
|
}
|
|
|
|
nsresult Cancel() override {
|
|
mStream = nullptr;
|
|
return NS_OK;
|
|
}
|
|
};
|
|
|
|
// XPCOM reference counting: AddRef()/Release() are generated by these macros.
NS_IMPL_ADDREF(NonBlockingAsyncInputStream);
NS_IMPL_RELEASE(NonBlockingAsyncInputStream);
|
|
|
|
// Stores the runnable and the (possibly null) event target of a pending
// WAIT_CLOSURE_ONLY request until the stream gets closed.
NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(
    AsyncWaitRunnable* aRunnable, nsIEventTarget* aEventTarget)
    : mRunnable(aRunnable),
      mEventTarget(aEventTarget) {}
|
|
|
|
// QueryInterface table. nsIInputStream and nsIAsyncInputStream are always
// exposed; every other interface is exposed only when the matching weak
// pointer was set by the constructor, i.e. when the wrapped stream implements
// that interface itself.
NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
                                     mWeakCloneableInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
                                     mWeakIPCSerializableInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
                                     mWeakSeekableInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream,
                                     mWeakTellableInputStream)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
NS_INTERFACE_MAP_END
|
|
|
|
/* static */
|
|
nsresult NonBlockingAsyncInputStream::Create(
|
|
already_AddRefed<nsIInputStream> aInputStream,
|
|
nsIAsyncInputStream** aResult) {
|
|
MOZ_DIAGNOSTIC_ASSERT(aResult);
|
|
|
|
nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream);
|
|
|
|
bool nonBlocking = false;
|
|
nsresult rv = inputStream->IsNonBlocking(&nonBlocking);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
MOZ_DIAGNOSTIC_ASSERT(nonBlocking);
|
|
|
|
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
|
|
nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
|
|
do_QueryInterface(inputStream);
|
|
MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream);
|
|
#endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED
|
|
|
|
RefPtr<NonBlockingAsyncInputStream> stream =
|
|
new NonBlockingAsyncInputStream(inputStream.forget());
|
|
|
|
stream.forget(aResult);
|
|
return NS_OK;
|
|
}
|
|
|
|
// Takes ownership of aInputStream and probes which optional interfaces it
// implements. For each supported interface a weak (raw) pointer is cached; it
// is only set when the queried interface belongs to the same COM object as
// mInputStream, which keeps that object alive.
NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(
    already_AddRefed<nsIInputStream> aInputStream)
    : mInputStream(std::move(aInputStream)),
      mWeakCloneableInputStream(nullptr),
      mWeakIPCSerializableInputStream(nullptr),
      mWeakSeekableInputStream(nullptr),
      mWeakTellableInputStream(nullptr),
      mLock("NonBlockingAsyncInputStream::mLock"),
      mClosed(false) {
  MOZ_ASSERT(mInputStream);

  // nsICloneableInputStream
  nsCOMPtr<nsICloneableInputStream> cloneable =
      do_QueryInterface(mInputStream);
  if (cloneable && SameCOMIdentity(mInputStream, cloneable)) {
    mWeakCloneableInputStream = cloneable;
  }

  // nsIIPCSerializableInputStream
  nsCOMPtr<nsIIPCSerializableInputStream> serializable =
      do_QueryInterface(mInputStream);
  if (serializable && SameCOMIdentity(mInputStream, serializable)) {
    mWeakIPCSerializableInputStream = serializable;
  }

  // nsISeekableStream
  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mInputStream);
  if (seekable && SameCOMIdentity(mInputStream, seekable)) {
    mWeakSeekableInputStream = seekable;
  }

  // nsITellableStream
  nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mInputStream);
  if (tellable && SameCOMIdentity(mInputStream, tellable)) {
    mWeakTellableInputStream = tellable;
  }
}
|
|
|
|
// Nothing to release explicitly here.
NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default;
|
|
|
|
// Closes the wrapped stream and, if a WAIT_CLOSURE_ONLY request is pending,
// fires its notification.
//
// Closing an already closed stream is a no-op returning NS_OK. If closing the
// wrapped stream fails, the pending WAIT_CLOSURE_ONLY request is dropped and
// the error is returned. The notification is dispatched (or run synchronously
// when no event target was provided) after mLock has been released.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Close() {
  RefPtr<AsyncWaitRunnable> pendingRunnable;
  nsCOMPtr<nsIEventTarget> pendingTarget;

  {
    MutexAutoLock lock(mLock);

    if (mClosed) {
      // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid
      // warning messages, let's make everybody happy with a NS_OK.
      return NS_OK;
    }

    mClosed = true;

    NS_ENSURE_STATE(mInputStream);
    nsresult rv = mInputStream->Close();
    if (NS_WARN_IF(NS_FAILED(rv))) {
      mWaitClosureOnly.reset();
      return rv;
    }

    // If we have a WaitClosureOnly runnable, it's time to use it.
    if (mWaitClosureOnly.isSome()) {
      pendingRunnable = std::move(mWaitClosureOnly->mRunnable);
      pendingTarget = std::move(mWaitClosureOnly->mEventTarget);

      mWaitClosureOnly.reset();

      // Register the runnable as the active callback so that
      // RunAsyncWaitCallback() recognizes it.
      mAsyncWaitCallback = pendingRunnable;
    }
  }

  if (!pendingRunnable) {
    return NS_OK;
  }

  if (pendingTarget) {
    pendingTarget->Dispatch(pendingRunnable, NS_DISPATCH_NORMAL);
  } else {
    pendingRunnable->Run();
  }

  return NS_OK;
}
|
|
|
|
// nsIInputStream interface
|
|
|
|
// Reports through aLength how many bytes the wrapped stream has available.
//
// Errors from the wrapped stream are returned as-is (NS_BASE_STREAM_CLOSED
// without a warning, since it is a legal condition). When the wrapped stream
// reports zero available bytes, it is closed, this stream is marked as closed
// and NS_BASE_STREAM_CLOSED is returned.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Available(uint64_t* aLength) {
  nsresult rv = mInputStream->Available(aLength);
  // Don't issue warnings for legal condition NS_BASE_STREAM_CLOSED.
  if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // Nothing more to read. Let's close the stream now.
  if (*aLength == 0) {
    // Close() and AsyncWait() read and write mClosed while holding mLock, so
    // the state change here must be done under the same lock.
    MutexAutoLock lock(mLock);
    mInputStream->Close();
    mClosed = true;
    return NS_BASE_STREAM_CLOSED;
  }

  return NS_OK;
}
|
|
|
|
// Reads up to aCount bytes into aBuffer by delegating to the wrapped stream;
// its result code is returned unchanged.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Read(char* aBuffer, uint32_t aCount,
                                  uint32_t* aReadCount) {
  const nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount);
  return rv;
}
|
|
|
|
namespace {
|
|
|
|
class MOZ_RAII ReadSegmentsData {
|
|
public:
|
|
ReadSegmentsData(NonBlockingAsyncInputStream* aStream,
|
|
nsWriteSegmentFun aFunc, void* aClosure)
|
|
: mStream(aStream), mFunc(aFunc), mClosure(aClosure) {}
|
|
|
|
NonBlockingAsyncInputStream* mStream;
|
|
nsWriteSegmentFun mFunc;
|
|
void* mClosure;
|
|
};
|
|
|
|
nsresult ReadSegmentsWriter(nsIInputStream* aInStream, void* aClosure,
|
|
const char* aFromSegment, uint32_t aToOffset,
|
|
uint32_t aCount, uint32_t* aWriteCount) {
|
|
ReadSegmentsData* data = static_cast<ReadSegmentsData*>(aClosure);
|
|
return data->mFunc(data->mStream, data->mClosure, aFromSegment, aToOffset,
|
|
aCount, aWriteCount);
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// Delegates to the wrapped stream's ReadSegments(), interposing
// ReadSegmentsWriter so that aWriter is called with this stream as its stream
// argument.
NS_IMETHODIMP
NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter,
                                          void* aClosure, uint32_t aCount,
                                          uint32_t* aResult) {
  // Lives on the stack for the duration of the nested call only.
  ReadSegmentsData forwarding(this, aWriter, aClosure);
  return mInputStream->ReadSegments(ReadSegmentsWriter, &forwarding, aCount,
                                    aResult);
}
|
|
|
|
// This wrapper always reports itself as non-blocking.
NS_IMETHODIMP
NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking) {
  *aNonBlocking = true;
  return NS_OK;
}
|
|
|
|
// nsICloneableInputStream interface
|
|
|
|
// Forwards the "cloneable" query to the wrapped stream. Fails through
// NS_ENSURE_STATE when the wrapped stream is not an nsICloneableInputStream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable) {
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  const nsresult rv = mWeakCloneableInputStream->GetCloneable(aCloneable);
  return rv;
}
|
|
|
|
// Clones the wrapped stream and wraps the clone into a new
// NonBlockingAsyncInputStream, returned through aResult. Errors from the
// wrapped stream's Clone() or from Create() are propagated.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult) {
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  nsCOMPtr<nsIInputStream> innerClone;
  nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(innerClone));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIAsyncInputStream> wrappedClone;
  rv = Create(innerClone.forget(), getter_AddRefs(wrappedClone));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  wrappedClone.forget(aResult);
  return NS_OK;
}
|
|
|
|
// nsIAsyncInputStream interface
|
|
|
|
// Equivalent to Close(): aStatus is not used by this implementation.
NS_IMETHODIMP
NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus) {
  const nsresult rv = Close();
  return rv;
}
|
|
|
|
// Registers aCallback to be notified through OnInputStreamReady().
//
// - A null aCallback cancels any pending request and returns NS_OK.
// - A non-null aCallback while another request is pending fails with
//   NS_ERROR_FAILURE.
// - With WAIT_CLOSURE_ONLY and a stream that is still open, the request is
//   parked until Close() is called.
// - Otherwise the notification is sent right away: dispatched to
//   aEventTarget when provided, run synchronously when not.
//
// aRequestedCount is not used by this implementation.
NS_IMETHODIMP
NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                                       uint32_t aFlags,
                                       uint32_t aRequestedCount,
                                       nsIEventTarget* aEventTarget) {
  RefPtr<AsyncWaitRunnable> notifier;

  {
    MutexAutoLock lock(mLock);

    if (!aCallback) {
      // Canceling previous callbacks.
      mWaitClosureOnly.reset();
      mAsyncWaitCallback = nullptr;
      return NS_OK;
    }

    // Only one pending request at a time is supported.
    if (mWaitClosureOnly.isSome() || mAsyncWaitCallback) {
      return NS_ERROR_FAILURE;
    }

    // Maybe the stream is already closed: an exhausted stream is closed
    // eagerly so that it is treated as closed below.
    if (!mClosed) {
      uint64_t remaining = 0;
      const nsresult rv = mInputStream->Available(&remaining);
      if (NS_SUCCEEDED(rv) && remaining == 0) {
        mInputStream->Close();
        mClosed = true;
      }
    }

    notifier = new AsyncWaitRunnable(this, aCallback);

    const bool closureOnly =
        (aFlags & nsIAsyncInputStream::WAIT_CLOSURE_ONLY) != 0;
    if (closureOnly && !mClosed) {
      // Park the request; Close() will fire it.
      mWaitClosureOnly.emplace(notifier, aEventTarget);
      return NS_OK;
    }

    mAsyncWaitCallback = notifier;
  }

  MOZ_ASSERT(notifier);

  if (!aEventTarget) {
    return notifier->Run();
  }

  return aEventTarget->Dispatch(notifier.forget());
}
|
|
|
|
// nsIIPCSerializableInputStream
|
|
|
|
// nsIIPCSerializableInputStream::Serialize overload for a ContentChild
// manager; all the work is done by SerializeInternal().
void NonBlockingAsyncInputStream::Serialize(
    mozilla::ipc::InputStreamParams& aParams,
    FileDescriptorArray& aFileDescriptors, bool aDelayedStart,
    uint32_t aMaxSize, uint32_t* aSizeUsed,
    mozilla::dom::ContentChild* aManager) {
  SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
                    aSizeUsed, aManager);
}
|
|
|
|
// nsIIPCSerializableInputStream::Serialize overload for a PBackgroundChild
// manager; all the work is done by SerializeInternal().
void NonBlockingAsyncInputStream::Serialize(
    mozilla::ipc::InputStreamParams& aParams,
    FileDescriptorArray& aFileDescriptors, bool aDelayedStart,
    uint32_t aMaxSize, uint32_t* aSizeUsed,
    mozilla::ipc::PBackgroundChild* aManager) {
  SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
                    aSizeUsed, aManager);
}
|
|
|
|
// nsIIPCSerializableInputStream::Serialize overload for a ContentParent
// manager; all the work is done by SerializeInternal().
void NonBlockingAsyncInputStream::Serialize(
    mozilla::ipc::InputStreamParams& aParams,
    FileDescriptorArray& aFileDescriptors, bool aDelayedStart,
    uint32_t aMaxSize, uint32_t* aSizeUsed,
    mozilla::dom::ContentParent* aManager) {
  SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
                    aSizeUsed, aManager);
}
|
|
|
|
// nsIIPCSerializableInputStream::Serialize overload for a PBackgroundParent
// manager; all the work is done by SerializeInternal().
void NonBlockingAsyncInputStream::Serialize(
    mozilla::ipc::InputStreamParams& aParams,
    FileDescriptorArray& aFileDescriptors, bool aDelayedStart,
    uint32_t aMaxSize, uint32_t* aSizeUsed,
    mozilla::ipc::PBackgroundParent* aManager) {
  SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
                    aSizeUsed, aManager);
}
|
|
|
|
template <typename M>
|
|
void NonBlockingAsyncInputStream::SerializeInternal(
|
|
mozilla::ipc::InputStreamParams& aParams,
|
|
FileDescriptorArray& aFileDescriptors, bool aDelayedStart,
|
|
uint32_t aMaxSize, uint32_t* aSizeUsed, M* aManager) {
|
|
MOZ_ASSERT(mWeakIPCSerializableInputStream);
|
|
InputStreamHelper::SerializeInputStream(mInputStream, aParams,
|
|
aFileDescriptors, aDelayedStart,
|
|
aMaxSize, aSizeUsed, aManager);
|
|
}
|
|
|
|
bool NonBlockingAsyncInputStream::Deserialize(
|
|
const mozilla::ipc::InputStreamParams& aParams,
|
|
const FileDescriptorArray& aFileDescriptors) {
|
|
MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
|
|
return true;
|
|
}
|
|
|
|
// nsISeekableStream
|
|
|
|
// Forwards the seek request to the wrapped stream. Fails through
// NS_ENSURE_STATE when the wrapped stream is not an nsISeekableStream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Seek(int32_t aWhence, int64_t aOffset) {
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  const nsresult rv = mWeakSeekableInputStream->Seek(aWhence, aOffset);
  return rv;
}
|
|
|
|
// Not supported: even when the wrapped stream is seekable, the request is not
// forwarded and NS_ERROR_NOT_IMPLEMENTED is returned. Fails through
// NS_ENSURE_STATE when the wrapped stream is not an nsISeekableStream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::SetEOF() {
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  return NS_ERROR_NOT_IMPLEMENTED;
}
|
|
|
|
// nsITellableStream
|
|
|
|
// Forwards the position query to the wrapped stream. Fails through
// NS_ENSURE_STATE when the wrapped stream is not an nsITellableStream.
NS_IMETHODIMP
NonBlockingAsyncInputStream::Tell(int64_t* aResult) {
  NS_ENSURE_STATE(mWeakTellableInputStream);

  const nsresult rv = mWeakTellableInputStream->Tell(aResult);
  return rv;
}
|
|
|
|
// Called by AsyncWaitRunnable::Run(). Invokes aCallback only if aRunnable is
// still the registered pending request; otherwise the request was canceled or
// replaced and the callback is silently dropped. The callback is invoked
// after mLock has been released.
void NonBlockingAsyncInputStream::RunAsyncWaitCallback(
    NonBlockingAsyncInputStream::AsyncWaitRunnable* aRunnable,
    already_AddRefed<nsIInputStreamCallback> aCallback) {
  nsCOMPtr<nsIInputStreamCallback> readyCallback = std::move(aCallback);

  {
    MutexAutoLock lock(mLock);

    const bool stillPending = (mAsyncWaitCallback == aRunnable);
    if (!stillPending) {
      // The callback has been canceled in the meantime.
      return;
    }

    // Consume the registration: a new AsyncWait() is allowed from now on.
    mAsyncWaitCallback = nullptr;
  }

  readyCallback->OnInputStreamReady(this);
}
|
|
|
|
} // namespace mozilla
|