gecko-dev/ipc/glue/IPCStreamDestination.cpp

401 lines
11 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "IPCStreamDestination.h"
#include "mozilla/InputStreamLengthWrapper.h"
#include "mozilla/Mutex.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIBufferedStreams.h"
#include "nsICloneableInputStream.h"
#include "nsIPipe.h"
#include "nsThreadUtils.h"
#include "mozilla/webrender/WebRenderTypes.h"
namespace mozilla {
namespace ipc {
// ----------------------------------------------------------------------------
// IPCStreamDestination::DelayedStartInputStream
//
// When AutoIPCStream is used with delayedStart, we need to ask for data at the
// first real use of the nsIInputStream. In order to do so, we wrap the
// nsIInputStream, created by the nsIPipe, with this wrapper.
// Input stream wrapper handed out when delayed start is enabled. Most calls
// first give the owning IPCStreamDestination a chance to start reading
// (MaybeStartReading) and are then forwarded to the wrapped stream; Close()
// and CloseWithStatus() ask the destination to close instead.
class IPCStreamDestination::DelayedStartInputStream final
    : public nsIAsyncInputStream,
      public nsIInputStreamCallback,
      public nsISearchableInputStream,
      public nsICloneableInputStream,
      public nsIBufferedInputStream {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  // aDestination is kept as a raw pointer; it is reset to null by
  // DestinationShutdown() or once a start/close request has been delivered on
  // the destination's owning thread. aStream is the stream every call is
  // forwarded to. Both must be non-null.
  DelayedStartInputStream(IPCStreamDestination* aDestination,
                          nsCOMPtr<nsIAsyncInputStream>&& aStream)
      : mDestination(aDestination),
        mStream(std::move(aStream)),
        mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex") {
    MOZ_ASSERT(mDestination);
    MOZ_ASSERT(mStream);
  }

  // Drops the back-pointer to the destination so that later calls no longer
  // try to reach it.
  void DestinationShutdown() {
    MutexAutoLock lock(mMutex);
    mDestination = nullptr;
  }

  // nsIInputStream interface

  NS_IMETHOD
  Close() override {
    MaybeCloseDestination();
    return mStream->Close();
  }

  NS_IMETHOD
  Available(uint64_t* aLength) override {
    MaybeStartReading();
    return mStream->Available(aLength);
  }

  NS_IMETHOD
  Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override {
    MaybeStartReading();
    return mStream->Read(aBuffer, aCount, aReadCount);
  }

  NS_IMETHOD
  ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
               uint32_t* aResult) override {
    MaybeStartReading();
    return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
  }

  NS_IMETHOD
  IsNonBlocking(bool* aNonBlocking) override {
    MaybeStartReading();
    return mStream->IsNonBlocking(aNonBlocking);
  }

  // nsIAsyncInputStream interface

  NS_IMETHOD
  CloseWithStatus(nsresult aReason) override {
    MaybeCloseDestination();
    return mStream->CloseWithStatus(aReason);
  }

  // Only one callback may be pending at a time: a non-null aCallback is
  // rejected with NS_ERROR_FAILURE while another one is stored. A null
  // aCallback clears the stored callback and is forwarded as null.
  NS_IMETHOD
  AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
            uint32_t aRequestedCount, nsIEventTarget* aTarget) override {
    {
      MutexAutoLock lock(mMutex);

      if (mAsyncWaitCallback && aCallback) {
        return NS_ERROR_FAILURE;
      }

      mAsyncWaitCallback = aCallback;

      MaybeStartReading(lock);
    }

    // The wrapped stream notifies this object, which relays the notification
    // to the stored callback in OnInputStreamReady().
    nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
    return mStream->AsyncWait(callback, aFlags, aRequestedCount, aTarget);
  }

  // nsISearchableInputStream interface

  NS_IMETHOD
  Search(const char* aForString, bool aIgnoreCase, bool* aFound,
         uint32_t* aOffsetSearchedTo) override {
    MaybeStartReading();
    nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
    MOZ_ASSERT(searchable);
    // The wrapped stream is not guaranteed to expose this interface (SetLength
    // may have replaced the pipe reader with a wrapper); fail instead of
    // dereferencing null when assertions are compiled out.
    if (!searchable) {
      return NS_ERROR_NOT_IMPLEMENTED;
    }
    return searchable->Search(aForString, aIgnoreCase, aFound,
                              aOffsetSearchedTo);
  }

  // nsICloneableInputStream interface

  NS_IMETHOD
  GetCloneable(bool* aCloneable) override {
    MaybeStartReading();
    nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
    MOZ_ASSERT(cloneable);
    // Without the interface on the wrapped stream there is nothing to clone.
    if (!cloneable) {
      *aCloneable = false;
      return NS_OK;
    }
    return cloneable->GetCloneable(aCloneable);
  }

  NS_IMETHOD
  Clone(nsIInputStream** aResult) override {
    MaybeStartReading();
    nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
    MOZ_ASSERT(cloneable);
    if (!cloneable) {
      return NS_ERROR_NOT_IMPLEMENTED;
    }
    return cloneable->Clone(aResult);
  }

  // nsIBufferedInputStream

  NS_IMETHOD
  Init(nsIInputStream* aStream, uint32_t aBufferSize) override {
    MaybeStartReading();
    nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
    MOZ_ASSERT(stream);
    if (!stream) {
      return NS_ERROR_NOT_IMPLEMENTED;
    }
    return stream->Init(aStream, aBufferSize);
  }

  NS_IMETHOD
  GetData(nsIInputStream** aResult) override {
    return NS_ERROR_NOT_IMPLEMENTED;
  }

  // nsIInputStreamCallback

  // Relays the notification from the wrapped stream to the callback stored by
  // AsyncWait(), passing this wrapper as the ready stream. The stored callback
  // is taken (and thereby cleared) under the lock but invoked outside of it.
  NS_IMETHOD
  OnInputStreamReady(nsIAsyncInputStream* aStream) override {
    nsCOMPtr<nsIInputStreamCallback> callback;

    {
      MutexAutoLock lock(mMutex);

      // We have been canceled in the meanwhile.
      if (!mAsyncWaitCallback) {
        return NS_OK;
      }

      callback.swap(mAsyncWaitCallback);
    }

    callback->OnInputStreamReady(this);
    return NS_OK;
  }

  // Asks the destination to start reading if it is still attached. The
  // overload taking a lock expects mMutex to be held by the caller already.
  void MaybeStartReading();
  void MaybeStartReading(const MutexAutoLock& aProofOfLook);

  // Asks the destination to close if it is still attached.
  void MaybeCloseDestination();

 private:
  ~DelayedStartInputStream() = default;

  IPCStreamDestination* mDestination;
  nsCOMPtr<nsIAsyncInputStream> mStream;
  nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;

  // This protects mDestination and mAsyncWaitCallback: any method can be
  // called by any thread.
  Mutex mMutex;

  class HelperRunnable;
};
// Runnable that re-invokes one DelayedStartInputStream operation when it is
// run. It holds a strong reference to the stream until it is destroyed.
class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
    : public Runnable {
 public:
  // Selects which stream method Run() calls.
  enum Op {
    eStartReading,
    eCloseDestination,
  };

  HelperRunnable(
      IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
      Op aOp)
      : Runnable(
            "ipc::IPCStreamDestination::DelayedStartInputStream::"
            "HelperRunnable"),
        mDelayedStartInputStream(aDelayedStartInputStream),
        mOp(aOp) {
    MOZ_ASSERT(aDelayedStartInputStream);
  }

  // Performs the requested operation; always reports NS_OK.
  NS_IMETHOD
  Run() override {
    if (mOp == eStartReading) {
      mDelayedStartInputStream->MaybeStartReading();
    } else if (mOp == eCloseDestination) {
      mDelayedStartInputStream->MaybeCloseDestination();
    }
    return NS_OK;
  }

 private:
  RefPtr<IPCStreamDestination::DelayedStartInputStream>
      mDelayedStartInputStream;
  Op mOp;
};
void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading() {
MutexAutoLock lock(mMutex);
MaybeStartReading(lock);
}
void IPCStreamDestination::DelayedStartInputStream::MaybeStartReading(
const MutexAutoLock& aProofOfLook) {
if (!mDestination) {
return;
}
if (mDestination->IsOnOwningThread()) {
mDestination->StartReading();
mDestination = nullptr;
return;
}
RefPtr<Runnable> runnable =
new HelperRunnable(this, HelperRunnable::eStartReading);
mDestination->DispatchRunnable(runnable.forget());
}
void IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination() {
MutexAutoLock lock(mMutex);
if (!mDestination) {
return;
}
if (mDestination->IsOnOwningThread()) {
mDestination->RequestClose(NS_ERROR_ABORT);
mDestination = nullptr;
return;
}
RefPtr<Runnable> runnable =
new HelperRunnable(this, HelperRunnable::eCloseDestination);
mDestination->DispatchRunnable(runnable.forget());
}
// Reference counting and QueryInterface table for DelayedStartInputStream.
// The two AMBIGUOUS entries name nsIAsyncInputStream as the base through which
// nsIInputStream and nsISupports are resolved.
NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream);
NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream);
NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
NS_INTERFACE_MAP_END
// ----------------------------------------------------------------------------
// IPCStreamDestination
// Records the constructing thread as the owning thread (the one
// IsOnOwningThread() compares against and DispatchRunnable() dispatches to)
// and starts with delayed start disabled.
//
// NOTE(review): mLengthSet is initialized under MOZ_DEBUG here, while
// SetLength() writes it under DEBUG — confirm both guards agree with the
// member declaration in the header.
IPCStreamDestination::IPCStreamDestination()
: mOwningThread(NS_GetCurrentThread()),
mDelayedStart(false)
#ifdef MOZ_DEBUG
,
mLengthSet(false)
#endif
{
}
// Nothing to release explicitly; the members clean up after themselves.
IPCStreamDestination::~IPCStreamDestination() = default;
// Creates the pipe whose two ends become mReader and mWriter. Must be called
// while both are still unset.
//
// Returns the failure code from NS_NewPipe2 (after emitting a warning), or
// NS_OK when the pipe was created.
nsresult IPCStreamDestination::Initialize() {
  MOZ_ASSERT(!mReader);
  MOZ_ASSERT(!mWriter);

  // use async versions for both reader and writer even though we are
  // opening the writer as an infinite stream. We want to be able to
  // use CloseWithStatus() to communicate errors through the pipe.
  const bool nonBlockingInput = true;
  const bool nonBlockingOutput = true;

  // Use an "infinite" pipe because we cannot apply back-pressure through
  // the async IPC layer at the moment. Blocking the IPC worker thread
  // is not desirable, either.
  const uint32_t segmentSize = 0;
  const uint32_t segmentCount = UINT32_MAX;

  nsresult rv = NS_NewPipe2(getter_AddRefs(mReader), getter_AddRefs(mWriter),
                            nonBlockingInput, nonBlockingOutput, segmentSize,
                            segmentCount);
  return NS_WARN_IF(NS_FAILED(rv)) ? rv : NS_OK;
}
// Selects whether TakeReader() hands out the reader wrapped in a
// DelayedStartInputStream (true) or the reader itself (false).
void IPCStreamDestination::SetDelayedStart(bool aDelayedStart) {
mDelayedStart = aDelayedStart;
}
void IPCStreamDestination::SetLength(int64_t aLength) {
MOZ_ASSERT(mReader);
MOZ_ASSERT(!mLengthSet);
#ifdef DEBUG
mLengthSet = true;
#endif
if (aLength != -1) {
nsCOMPtr<nsIInputStream> finalStream;
finalStream = new InputStreamLengthWrapper(mReader.forget(), aLength);
mReader = do_QueryInterface(finalStream);
MOZ_ASSERT(mReader);
}
}
// Hands the reading end over to the caller; mReader is empty afterwards.
//
// Without delayed start the reader itself is returned. With delayed start the
// reader is moved into a new DelayedStartInputStream, which is remembered in
// mDelayedStartInputStream and returned instead.
already_AddRefed<nsIInputStream> IPCStreamDestination::TakeReader() {
  MOZ_ASSERT(mReader);
  MOZ_ASSERT(!mDelayedStartInputStream);

  if (!mDelayedStart) {
    return mReader.forget();
  }

  mDelayedStartInputStream =
      new DelayedStartInputStream(this, std::move(mReader));
  RefPtr<nsIAsyncInputStream> wrapper = mDelayedStartInputStream;
  return wrapper.forget();
}
// Returns true when the calling thread is the one recorded in mOwningThread
// by the constructor.
bool IPCStreamDestination::IsOnOwningThread() const {
return mOwningThread == NS_GetCurrentThread();
}
// Queues aRunnable on the owning thread with NS_DISPATCH_NORMAL, taking
// ownership of the reference.
//
// A failed dispatch is reported with a warning instead of being dropped
// silently; the function has no way to return the error to its caller.
void IPCStreamDestination::DispatchRunnable(
    already_AddRefed<nsIRunnable>&& aRunnable) {
  nsCOMPtr<nsIRunnable> runnable = aRunnable;
  nsresult rv = mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    // NOTE(review): presumably this only happens while the owning thread is
    // shutting down; the queued operation is lost in that case — confirm.
    return;
  }
}
// Tears down the destination: aborts the writer and detaches the
// delayed-start wrapper, if one was handed out.
void IPCStreamDestination::ActorDestroyed() {
  MOZ_ASSERT(mWriter);

  // If we were gracefully closed we should have gotten RecvClose().  In
  // that case, the writer will already be closed and this will have no
  // effect.  This just aborts the writer in the case where the child process
  // crashes.
  mWriter->CloseWithStatus(NS_ERROR_ABORT);

  if (!mDelayedStartInputStream) {
    return;
  }

  // Clear the wrapper's back-pointer to this object before letting go of it.
  mDelayedStartInputStream->DestinationShutdown();
  mDelayedStartInputStream = nullptr;
}
// Appends a chunk of received data to the pipe writer.
//
// A failed write, or a write that accepted fewer bytes than the buffer holds,
// ends the transfer through RequestClose() so that bytes are never dropped
// silently from the middle of the stream.
void IPCStreamDestination::BufferReceived(const wr::ByteBuffer& aBuffer) {
  MOZ_ASSERT(mWriter);

  uint32_t numWritten = 0;

  // This should only fail if we hit an OOM condition.
  nsresult rv = mWriter->Write(reinterpret_cast<char*>(aBuffer.mData),
                               aBuffer.mLength, &numWritten);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    RequestClose(rv);
    return;
  }

  // The writer is opened non-blocking, so a successful call may still have
  // accepted only part of the buffer. There is no retry path here, so treat a
  // short write as a failure rather than losing the remaining bytes.
  if (NS_WARN_IF(numWritten != aBuffer.mLength)) {
    RequestClose(NS_ERROR_FAILURE);
  }
}
// Handles a close request carrying status aRv: closes the writer with that
// status, then calls TerminateDestination() (defined outside this file).
void IPCStreamDestination::CloseReceived(nsresult aRv) {
MOZ_ASSERT(mWriter);
mWriter->CloseWithStatus(aRv);
TerminateDestination();
}
} // namespace ipc
} // namespace mozilla