gecko-dev/xpcom/io/nsPipe3.cpp

2030 lines
58 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <algorithm>
#include "mozilla/Attributes.h"
#include "mozilla/IntegerPrintfMacros.h"
#include "mozilla/ReentrantMonitor.h"
#include "nsIBufferedStreams.h"
#include "nsICloneableInputStream.h"
#include "nsIPipe.h"
#include "nsIEventTarget.h"
#include "nsISeekableStream.h"
#include "mozilla/RefPtr.h"
#include "nsSegmentedBuffer.h"
#include "nsStreamUtils.h"
#include "nsCOMPtr.h"
#include "nsCRT.h"
#include "mozilla/Logging.h"
#include "nsIClassInfoImpl.h"
#include "nsAlgorithm.h"
#include "nsMemory.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
using namespace mozilla;
#ifdef LOG
#undef LOG
#endif
//
// set MOZ_LOG=nsPipe:5
//
static LazyLogModule sPipeLog("nsPipe");
#define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
#define DEFAULT_SEGMENT_SIZE 4096
#define DEFAULT_SEGMENT_COUNT 16
class nsPipe;
class nsPipeEvents;
class nsPipeInputStream;
class nsPipeOutputStream;
class AutoReadSegment;
namespace {
enum MonitorAction
{
DoNotNotifyMonitor,
NotifyMonitor
};
enum SegmentChangeResult
{
SegmentNotChanged,
SegmentAdvanceBufferRead
};
} // namespace
//-----------------------------------------------------------------------------
// this class is used to delay notifications until the end of a particular
// scope. it helps avoid the complexity of issuing callbacks while inside
// a critical section.
class nsPipeEvents
{
public:
nsPipeEvents() { }
~nsPipeEvents();
inline void NotifyInputReady(nsIAsyncInputStream* aStream,
nsIInputStreamCallback* aCallback)
{
mInputList.AppendElement(InputEntry(aStream, aCallback));
}
inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
nsIOutputStreamCallback* aCallback)
{
MOZ_DIAGNOSTIC_ASSERT(!mOutputCallback);
mOutputStream = aStream;
mOutputCallback = aCallback;
}
private:
struct InputEntry
{
InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback)
: mStream(aStream)
, mCallback(aCallback)
{
MOZ_DIAGNOSTIC_ASSERT(mStream);
MOZ_DIAGNOSTIC_ASSERT(mCallback);
}
nsCOMPtr<nsIAsyncInputStream> mStream;
nsCOMPtr<nsIInputStreamCallback> mCallback;
};
nsTArray<InputEntry> mInputList;
nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
};
//-----------------------------------------------------------------------------
// This class is used to maintain input stream state. Its broken out from the
// nsPipeInputStream class because generally the nsPipe should be modifying
// this state and not the input stream itself.
struct nsPipeReadState
{
nsPipeReadState()
: mReadCursor(nullptr)
, mReadLimit(nullptr)
, mSegment(0)
, mAvailable(0)
, mActiveRead(false)
, mNeedDrain(false)
{ }
char* mReadCursor;
char* mReadLimit;
int32_t mSegment;
uint32_t mAvailable;
// This flag is managed using the AutoReadSegment RAII stack class.
bool mActiveRead;
// Set to indicate that the input stream has closed and should be drained,
// but that drain has been delayed due to an active read. When the read
// completes, this flag indicate the drain should then be performed.
bool mNeedDrain;
};
//-----------------------------------------------------------------------------
// an input end of a pipe (maintained as a list of refs within the pipe)
class nsPipeInputStream final
: public nsIAsyncInputStream
, public nsISeekableStream
, public nsISearchableInputStream
, public nsICloneableInputStream
, public nsIClassInfo
, public nsIBufferedInputStream
{
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIINPUTSTREAM
NS_DECL_NSIASYNCINPUTSTREAM
NS_DECL_NSISEEKABLESTREAM
NS_DECL_NSISEARCHABLEINPUTSTREAM
NS_DECL_NSICLONEABLEINPUTSTREAM
NS_DECL_NSICLASSINFO
NS_DECL_NSIBUFFEREDINPUTSTREAM
explicit nsPipeInputStream(nsPipe* aPipe)
: mPipe(aPipe)
, mLogicalOffset(0)
, mInputStatus(NS_OK)
, mBlocking(true)
, mBlocked(false)
, mCallbackFlags(0)
{ }
explicit nsPipeInputStream(const nsPipeInputStream& aOther)
: mPipe(aOther.mPipe)
, mLogicalOffset(aOther.mLogicalOffset)
, mInputStatus(aOther.mInputStatus)
, mBlocking(aOther.mBlocking)
, mBlocked(false)
, mCallbackFlags(0)
, mReadState(aOther.mReadState)
{ }
nsresult Fill();
void SetNonBlocking(bool aNonBlocking)
{
mBlocking = !aNonBlocking;
}
uint32_t Available();
// synchronously wait for the pipe to become readable.
nsresult Wait();
// These two don't acquire the monitor themselves. Instead they
// expect their caller to have done so and to pass the monitor as
// evidence.
MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
const ReentrantMonitorAutoEnter& ev);
MonitorAction OnInputException(nsresult, nsPipeEvents&,
const ReentrantMonitorAutoEnter& ev);
nsPipeReadState& ReadState()
{
return mReadState;
}
const nsPipeReadState& ReadState() const
{
return mReadState;
}
nsresult Status() const;
// A version of Status() that doesn't acquire the monitor.
nsresult Status(const ReentrantMonitorAutoEnter& ev) const;
private:
virtual ~nsPipeInputStream();
RefPtr<nsPipe> mPipe;
int64_t mLogicalOffset;
// Individual input streams can be closed without effecting the rest of the
// pipe. So track individual input stream status separately. |mInputStatus|
// is protected by |mPipe->mReentrantMonitor|.
nsresult mInputStatus;
bool mBlocking;
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
nsCOMPtr<nsIInputStreamCallback> mCallback;
uint32_t mCallbackFlags;
// requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
nsPipeReadState mReadState;
};
//-----------------------------------------------------------------------------
// the output end of a pipe (allocated as a member of the pipe).
class nsPipeOutputStream
: public nsIAsyncOutputStream
, public nsIClassInfo
{
public:
// since this class will be allocated as a member of the pipe, we do not
// need our own ref count. instead, we share the lifetime (the ref count)
// of the entire pipe. this macro is just convenience since it does not
// declare a mRefCount variable; however, don't let the name fool you...
// we are not inheriting from nsPipe ;-)
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIOUTPUTSTREAM
NS_DECL_NSIASYNCOUTPUTSTREAM
NS_DECL_NSICLASSINFO
explicit nsPipeOutputStream(nsPipe* aPipe)
: mPipe(aPipe)
, mWriterRefCnt(0)
, mLogicalOffset(0)
, mBlocking(true)
, mBlocked(false)
, mWritable(true)
, mCallbackFlags(0)
{ }
void SetNonBlocking(bool aNonBlocking)
{
mBlocking = !aNonBlocking;
}
void SetWritable(bool aWritable)
{
mWritable = aWritable;
}
// synchronously wait for the pipe to become writable.
nsresult Wait();
MonitorAction OnOutputWritable(nsPipeEvents&);
MonitorAction OnOutputException(nsresult, nsPipeEvents&);
private:
nsPipe* mPipe;
// separate refcnt so that we know when to close the producer
mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
int64_t mLogicalOffset;
bool mBlocking;
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
bool mWritable;
nsCOMPtr<nsIOutputStreamCallback> mCallback;
uint32_t mCallbackFlags;
};
//-----------------------------------------------------------------------------
class nsPipe final : public nsIPipe
{
public:
friend class nsPipeInputStream;
friend class nsPipeOutputStream;
friend class AutoReadSegment;
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIPIPE
// nsPipe methods:
nsPipe();
private:
~nsPipe();
//
// Methods below may only be called while inside the pipe's monitor. Some
// of these methods require passing a ReentrantMonitorAutoEnter to prove the
// monitor is held.
//
void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
char*& aCursor, char*& aLimit);
SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter &ev);
bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
uint32_t CountSegmentReferences(int32_t aSegment);
void SetAllNullReadCursors();
bool AllReadCursorsMatchWriteCursor();
void RollBackAllReadCursors(char* aWriteCursor);
void UpdateAllReadCursors(char* aWriteCursor);
void ValidateAllReadCursors();
uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter& ev) const;
bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
//
// methods below may be called while outside the pipe's monitor
//
void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
void AdvanceWriteCursor(uint32_t aCount);
void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
void OnPipeException(nsresult aReason, bool aOutputOnly = false);
nsresult CloneInputStream(nsPipeInputStream* aOriginal,
nsIInputStream** aCloneOut);
// methods below should only be called by AutoReadSegment
nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
uint32_t& aLength);
void ReleaseReadSegment(nsPipeReadState& aReadState,
nsPipeEvents& aEvents);
void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);
// We can't inherit from both nsIInputStream and nsIOutputStream
// because they collide on their Close method. Consequently we nest their
// implementations to avoid the extra object allocation.
nsPipeOutputStream mOutput;
// Since the input stream can be cloned, we may have more than one. Use
// a weak reference as the streams will clear their entry here in their
// destructor. Using a strong reference would create a reference cycle.
// Only usable while mReentrantMonitor is locked.
nsTArray<nsPipeInputStream*> mInputList;
// But hold a strong ref to our original input stream. For backward
// compatibility we need to be able to consistently return this same
// object from GetInputStream(). Note, mOriginalInput is also stored
// in mInputList as a weak ref.
RefPtr<nsPipeInputStream> mOriginalInput;
ReentrantMonitor mReentrantMonitor;
nsSegmentedBuffer mBuffer;
// The maximum number of segments to allow to be buffered in advance
// of the fastest reader. This is collection of segments is called
// the "advance buffer".
uint32_t mMaxAdvanceBufferSegmentCount;
int32_t mWriteSegment;
char* mWriteCursor;
char* mWriteLimit;
// |mStatus| is protected by |mReentrantMonitor|.
nsresult mStatus;
bool mInited;
};
//-----------------------------------------------------------------------------
// RAII class representing an active read segment. When it goes out of scope
// it automatically updates the read cursor and releases the read segment.
class MOZ_STACK_CLASS AutoReadSegment final
{
public:
AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
uint32_t aMaxLength)
: mPipe(aPipe)
, mReadState(aReadState)
, mStatus(NS_ERROR_FAILURE)
, mSegment(nullptr)
, mLength(0)
, mOffset(0)
{
MOZ_DIAGNOSTIC_ASSERT(mPipe);
MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
if (NS_SUCCEEDED(mStatus)) {
MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead);
MOZ_DIAGNOSTIC_ASSERT(mSegment);
mLength = std::min(mLength, aMaxLength);
MOZ_DIAGNOSTIC_ASSERT(mLength);
}
}
~AutoReadSegment()
{
if (NS_SUCCEEDED(mStatus)) {
if (mOffset) {
mPipe->AdvanceReadCursor(mReadState, mOffset);
} else {
nsPipeEvents events;
mPipe->ReleaseReadSegment(mReadState, events);
}
}
MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
}
nsresult Status() const
{
return mStatus;
}
const char* Data() const
{
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
MOZ_DIAGNOSTIC_ASSERT(mSegment);
return mSegment + mOffset;
}
uint32_t Length() const
{
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset);
return mLength - mOffset;
}
void
Advance(uint32_t aCount)
{
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset));
mOffset += aCount;
}
nsPipeReadState&
ReadState() const
{
return mReadState;
}
private:
// guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
nsPipe* mPipe;
nsPipeReadState& mReadState;
nsresult mStatus;
const char* mSegment;
uint32_t mLength;
uint32_t mOffset;
};
//
// NOTES on buffer architecture:
//
// +-----------------+ - - mBuffer.GetSegment(0)
// | |
// + - - - - - - - - + - - nsPipeReadState.mReadCursor
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// +-----------------+ - - nsPipeReadState.mReadLimit
// |
// +-----------------+
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// |/////////////////|
// +-----------------+
// |
// +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
// |/////////////////|
// |/////////////////|
// |/////////////////|
// + - - - - - - - - + - - mWriteCursor
// | |
// | |
// +-----------------+ - - mWriteLimit
//
// (shaded region contains data)
//
// NOTE: Each input stream produced by the nsPipe contains its own, separate
// nsPipeReadState. This means there are multiple mReadCursor and
// mReadLimit values in play. The pipe cannot discard old data until
// all mReadCursors have moved beyond that point in the stream.
//
// Likewise, each input stream reader will have it's own amount of
// buffered data. The pipe size threshold, however, is only applied
// to the input stream that is being read fastest. We call this
// the "advance buffer" in that its in advance of all readers. We
// allow slower input streams to buffer more data so that we don't
// stall processing of the faster input stream.
//
// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
// small allocations (e.g., 64 byte allocations). this means that buffers may
// be allocated back-to-back. in the diagram above, for example, mReadLimit
// would actually be pointing at the beginning of the next segment. when
// making changes to this file, please keep this fact in mind.
//
//-----------------------------------------------------------------------------
// nsPipe methods:
//-----------------------------------------------------------------------------
nsPipe::nsPipe()
: mOutput(this)
, mOriginalInput(new nsPipeInputStream(this))
, mReentrantMonitor("nsPipe.mReentrantMonitor")
, mMaxAdvanceBufferSegmentCount(0)
, mWriteSegment(-1)
, mWriteCursor(nullptr)
, mWriteLimit(nullptr)
, mStatus(NS_OK)
, mInited(false)
{
mInputList.AppendElement(mOriginalInput);
}
nsPipe::~nsPipe()
{
}
NS_IMPL_ADDREF(nsPipe)
NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe)
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipe::Release()
{
MOZ_DIAGNOSTIC_ASSERT(int32_t(mRefCnt) > 0, "dup release");
nsrefcnt count = --mRefCnt;
NS_LOG_RELEASE(this, count, "nsPipe");
if (count == 0) {
delete (this);
return 0;
}
// Avoid racing on |mOriginalInput| by only looking at it when
// the refcount is 1, that is, we are the only pointer (hence only
// thread) to access it.
if (count == 1 && mOriginalInput) {
mOriginalInput = nullptr;
return 1;
}
return count;
}
NS_IMETHODIMP
nsPipe::Init(bool aNonBlockingIn,
bool aNonBlockingOut,
uint32_t aSegmentSize,
uint32_t aSegmentCount)
{
mInited = true;
if (aSegmentSize == 0) {
aSegmentSize = DEFAULT_SEGMENT_SIZE;
}
if (aSegmentCount == 0) {
aSegmentCount = DEFAULT_SEGMENT_COUNT;
}
// protect against overflow
uint32_t maxCount = uint32_t(-1) / aSegmentSize;
if (aSegmentCount > maxCount) {
aSegmentCount = maxCount;
}
// The internal buffer is always "infinite" so that we can allow
// the size to expand when cloned streams are read at different
// rates. We enforce a limit on how much data can be buffered
// ahead of the fastest reader in GetWriteSegment().
nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
if (NS_FAILED(rv)) {
return rv;
}
mMaxAdvanceBufferSegmentCount = aSegmentCount;
mOutput.SetNonBlocking(aNonBlockingOut);
mOriginalInput->SetNonBlocking(aNonBlockingIn);
return NS_OK;
}
NS_IMETHODIMP
nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
{
if (NS_WARN_IF(!mInited)) {
return NS_ERROR_NOT_INITIALIZED;
}
RefPtr<nsPipeInputStream> ref = mOriginalInput;
ref.forget(aInputStream);
return NS_OK;
}
NS_IMETHODIMP
nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream)
{
if (NS_WARN_IF(!mInited)) {
return NS_ERROR_NOT_INITIALIZED;
}
NS_ADDREF(*aOutputStream = &mOutput);
return NS_OK;
}
void
nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
char*& aCursor, char*& aLimit)
{
if (aIndex == 0) {
MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount());
aCursor = aReadState.mReadCursor;
aLimit = aReadState.mReadLimit;
} else {
uint32_t absoluteIndex = aReadState.mSegment + aIndex;
uint32_t numSegments = mBuffer.GetSegmentCount();
if (absoluteIndex >= numSegments) {
aCursor = aLimit = nullptr;
} else {
aCursor = mBuffer.GetSegment(absoluteIndex);
if (mWriteSegment == (int32_t)absoluteIndex) {
aLimit = mWriteCursor;
} else {
aLimit = aCursor + mBuffer.GetSegmentSize();
}
}
}
}
nsresult
nsPipe::GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
uint32_t& aLength)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
if (aReadState.mReadCursor == aReadState.mReadLimit) {
return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
}
// The input stream locks the pipe while getting the buffer to read from,
// but then unlocks while actual data copying is taking place. In
// order to avoid deleting the buffer out from under this lockless read
// set a flag to indicate a read is active. This flag is only modified
// while the lock is held.
MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead);
aReadState.mActiveRead = true;
aSegment = aReadState.mReadCursor;
aLength = aReadState.mReadLimit - aReadState.mReadCursor;
MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable);
return NS_OK;
}
void
nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead);
aReadState.mActiveRead = false;
// When a read completes and releases the mActiveRead flag, we may have blocked
// a drain from completing. This occurs when the input stream is closed during
// the read. In these cases, we need to complete the drain as soon as the
// active read completes.
if (aReadState.mNeedDrain) {
aReadState.mNeedDrain = false;
DrainInputStream(aReadState, aEvents);
}
}
void
nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead)
{
MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0);
nsPipeEvents events;
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
LOG(("III advancing read cursor by %u\n", aBytesRead));
MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize());
aReadState.mReadCursor += aBytesRead;
MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit);
MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead);
aReadState.mAvailable -= aBytesRead;
// Check to see if we're at the end of the available read data. If we
// are, and this segment is not still being written, then we can possibly
// free up the segment.
if (aReadState.mReadCursor == aReadState.mReadLimit &&
!ReadSegmentBeingWritten(aReadState)) {
// Advance the segment position. If we have read any segments from the
// advance buffer then we can potentially notify blocked writers.
if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
mOutput.OnOutputWritable(events) == NotifyMonitor) {
mon.NotifyAll();
}
}
ReleaseReadSegment(aReadState, events);
}
}
SegmentChangeResult
nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter &ev)
{
// Calculate how many segments are buffered for this stream to start.
uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
int32_t currentSegment = aReadState.mSegment;
// Move to the next segment to read
aReadState.mSegment += 1;
// If this was the last reference to the first segment, then remove it.
if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
// shift write and read segment index (-1 indicates an empty buffer).
mWriteSegment -= 1;
// Directly modify the current read state. If the associated input
// stream is closed simultaneous with reading, then it may not be
// in the mInputList any more.
aReadState.mSegment -= 1;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
// Skip the current read state structure since we modify it manually
// before entering this loop.
if (&mInputList[i]->ReadState() == &aReadState) {
continue;
}
mInputList[i]->ReadState().mSegment -= 1;
}
// done with this segment
mBuffer.DeleteFirstSegment();
LOG(("III deleting first segment\n"));
}
if (mWriteSegment < aReadState.mSegment) {
// read cursor has hit the end of written data, so reset it
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
aReadState.mReadCursor = nullptr;
aReadState.mReadLimit = nullptr;
// also, the buffer is completely empty, so reset the write cursor
if (mWriteSegment == -1) {
mWriteCursor = nullptr;
mWriteLimit = nullptr;
}
} else {
// advance read cursor and limit to next buffer segment
aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
if (mWriteSegment == aReadState.mSegment) {
aReadState.mReadLimit = mWriteCursor;
} else {
aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
}
}
// Calculate how many segments are buffered for the stream after
// reading.
uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
// If the stream has read a segment out of the set of advanced buffer
// segments, then the writer may advance.
if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
endBufferSegments < mMaxAdvanceBufferSegmentCount) {
return SegmentAdvanceBufferRead;
}
// Otherwise there are no significant changes to the segment structure.
return SegmentNotChanged;
}
void
nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
// If a segment is actively being read in ReadSegments() for this input
// stream, then we cannot drain the stream. This can happen because
// ReadSegments() does not hold the lock while copying from the buffer.
// If we detect this condition, simply note that we need a drain once
// the read completes and return immediately.
if (aReadState.mActiveRead) {
MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain);
aReadState.mNeedDrain = true;
return;
}
while(mWriteSegment >= aReadState.mSegment) {
// If the last segment to free is still being written to, we're done
// draining. We can't free any more.
if (ReadSegmentBeingWritten(aReadState)) {
break;
}
// Don't bother checking if this results in an advance buffer segment
// read. Since we are draining the entire stream we will read an
// advance buffer segment no matter what.
AdvanceReadSegment(aReadState, mon);
}
// Force the stream into an empty state. Make sure mAvailable, mCursor, and
// mReadLimit are consistent with one another.
aReadState.mAvailable = 0;
aReadState.mReadCursor = nullptr;
aReadState.mReadLimit = nullptr;
// Remove the input stream from the pipe's list of streams. This will
// prevent the pipe from holding the stream alive or trying to update
// its read state any further.
DebugOnly<uint32_t> numRemoved = 0;
mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) {
bool result = &aReadState == &aEntry->ReadState();
numRemoved += result ? 1 : 0;
return result;
});
MOZ_ASSERT(numRemoved == 1);
// If we have read any segments from the advance buffer then we can
// potentially notify blocked writers.
if (!IsAdvanceBufferFull(mon) &&
mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
mon.NotifyAll();
}
}
bool
nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
{
mReentrantMonitor.AssertCurrentThreadIn();
bool beingWritten = mWriteSegment == aReadState.mSegment &&
mWriteLimit > mWriteCursor;
MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor);
return beingWritten;
}
nsresult
nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
if (NS_FAILED(mStatus)) {
return mStatus;
}
// write cursor and limit may both be null indicating an empty buffer.
if (mWriteCursor == mWriteLimit) {
// The pipe is full if we have hit our limit on advance data buffering.
// This means the fastest reader is still reading slower than data is
// being written into the pipe.
if (IsAdvanceBufferFull(mon)) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
// The nsSegmentedBuffer is configured to be "infinite", so this
// should never return nullptr here.
char* seg = mBuffer.AppendNewSegment();
if (!seg) {
return NS_ERROR_OUT_OF_MEMORY;
}
LOG(("OOO appended new segment\n"));
mWriteCursor = seg;
mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
++mWriteSegment;
}
// make sure read cursor is initialized
SetAllNullReadCursors();
// check to see if we can roll-back our read and write cursors to the
// beginning of the current/first segment. this is purely an optimization.
if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
char* head = mBuffer.GetSegment(0);
LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
static_cast<int64_t>(mWriteCursor - head)));
RollBackAllReadCursors(head);
mWriteCursor = head;
}
aSegment = mWriteCursor;
aSegmentLen = mWriteLimit - mWriteCursor;
return NS_OK;
}
void
nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten)
{
MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0);
nsPipeEvents events;
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
char* newWriteCursor = mWriteCursor + aBytesWritten;
MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit);
// update read limit if reading in the same segment
UpdateAllReadCursors(newWriteCursor);
mWriteCursor = newWriteCursor;
ValidateAllReadCursors();
// update the writable flag on the output stream
if (mWriteCursor == mWriteLimit) {
mOutput.SetWritable(!IsAdvanceBufferFull(mon));
}
// notify input stream that pipe now contains additional data
bool needNotify = false;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon)
== NotifyMonitor) {
needNotify = true;
}
}
if (needNotify) {
mon.NotifyAll();
}
}
}
void
nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason)
{
MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
nsPipeEvents events;
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
// Its possible to re-enter this method when we call OnPipeException() or
// OnInputExection() below. If there is a caller stuck in our synchronous
// Wait() method, then they will get woken up with a failure code which
// re-enters this method. Therefore, gracefully handle unknown streams
// here.
// If we only have one stream open and it is the given stream, then shut
// down the entire pipe.
if (mInputList.Length() == 1) {
if (mInputList[0] == aStream) {
OnPipeException(aReason);
}
return;
}
// Otherwise just close the particular stream that hit an exception.
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
if (mInputList[i] != aStream) {
continue;
}
MonitorAction action = mInputList[i]->OnInputException(aReason, events,
mon);
// Notify after element is removed in case we re-enter as a result.
if (action == NotifyMonitor) {
mon.NotifyAll();
}
return;
}
}
}
void
nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
{
LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
static_cast<uint32_t>(aReason), aOutputOnly));
nsPipeEvents events;
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
// if we've already hit an exception, then ignore this one.
if (NS_FAILED(mStatus)) {
return;
}
mStatus = aReason;
bool needNotify = false;
// OnInputException() can drain the stream and remove it from
// mInputList. So iterate over a temp list instead.
nsTArray<nsPipeInputStream*> list(mInputList);
for (uint32_t i = 0; i < list.Length(); ++i) {
// an output-only exception applies to the input end if the pipe has
// zero bytes available.
if (aOutputOnly && list[i]->Available()) {
continue;
}
if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) {
needNotify = true;
}
}
if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
needNotify = true;
}
// Notify after we have removed any input streams from mInputList
if (needNotify) {
mon.NotifyAll();
}
}
}
nsresult
nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
nsIInputStream** aCloneOut)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
mInputList.AppendElement(ref);
nsCOMPtr<nsIAsyncInputStream> downcast = ref.forget();
downcast.forget(aCloneOut);
return NS_OK;
}
uint32_t
nsPipe::CountSegmentReferences(int32_t aSegment)
{
mReentrantMonitor.AssertCurrentThreadIn();
uint32_t count = 0;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
if (aSegment >= mInputList[i]->ReadState().mSegment) {
count += 1;
}
}
return count;
}
void
nsPipe::SetAllNullReadCursors()
{
mReentrantMonitor.AssertCurrentThreadIn();
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
nsPipeReadState& readState = mInputList[i]->ReadState();
if (!readState.mReadCursor) {
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
readState.mReadCursor = readState.mReadLimit = mWriteCursor;
}
}
}
bool
nsPipe::AllReadCursorsMatchWriteCursor()
{
mReentrantMonitor.AssertCurrentThreadIn();
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
const nsPipeReadState& readState = mInputList[i]->ReadState();
if (readState.mSegment != mWriteSegment ||
readState.mReadCursor != mWriteCursor) {
return false;
}
}
return true;
}
void
nsPipe::RollBackAllReadCursors(char* aWriteCursor)
{
mReentrantMonitor.AssertCurrentThreadIn();
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
nsPipeReadState& readState = mInputList[i]->ReadState();
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor);
MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit);
readState.mReadCursor = aWriteCursor;
readState.mReadLimit = aWriteCursor;
}
}
void
nsPipe::UpdateAllReadCursors(char* aWriteCursor)
{
mReentrantMonitor.AssertCurrentThreadIn();
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
nsPipeReadState& readState = mInputList[i]->ReadState();
if (mWriteSegment == readState.mSegment &&
readState.mReadLimit == mWriteCursor) {
readState.mReadLimit = aWriteCursor;
}
}
}
void
nsPipe::ValidateAllReadCursors()
{
mReentrantMonitor.AssertCurrentThreadIn();
// The only way mReadCursor == mWriteCursor is if:
//
// - mReadCursor is at the start of a segment (which, based on how
// nsSegmentedBuffer works, means that this segment is the "first"
// segment)
// - mWriteCursor points at the location past the end of the current
// write segment (so the current write filled the current write
// segment, so we've incremented mWriteCursor to point past the end
// of it)
// - the segment to which data has just been written is located
// exactly one segment's worth of bytes before the first segment
// where mReadCursor is located
//
// Consequently, the byte immediately after the end of the current
// write segment is the first byte of the first segment, so
// mReadCursor == mWriteCursor. (Another way to think about this is
// to consider the buffer architecture diagram above, but consider it
// with an arena allocator which allocates from the *end* of the
// arena to the *beginning* of the arena.)
#ifdef DEBUG
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
const nsPipeReadState& state = mInputList[i]->ReadState();
MOZ_ASSERT(state.mReadCursor != mWriteCursor ||
(mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
mWriteCursor == mWriteLimit));
}
#endif
}
uint32_t
nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter& ev) const
{
// The write segment can be smaller than the current reader position
// in some cases. For example, when the first write segment has not
// been allocated yet mWriteSegment is negative. In these cases
// the stream is effectively using zero segments.
if (mWriteSegment < aReadState.mSegment) {
return 0;
}
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0);
MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0);
// Otherwise at least one segment is being used. We add one here
// since a single segment is being used when the write and read
// segment indices are the same.
return 1 + mWriteSegment - aReadState.mSegment;
}
bool
nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
{
// If we have fewer total segments than the limit we can immediately
// determine we are not full. Note, we must add one to mWriteSegment
// to convert from a index to a count.
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
uint32_t totalWriteSegments = mWriteSegment + 1;
if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
return false;
}
// Otherwise we must inspect all of our reader streams. We need
// to determine the buffer depth of the fastest reader.
uint32_t minBufferSegments = UINT32_MAX;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
// Only count buffer segments from input streams that are open.
if (NS_FAILED(mInputList[i]->Status(ev))) {
continue;
}
const nsPipeReadState& state = mInputList[i]->ReadState();
uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
minBufferSegments = std::min(minBufferSegments, bufferSegments);
// We only care if any reader has fewer segments buffered than
// our threshold. We can stop once we hit that threshold.
if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
return false;
}
}
// Note, its possible for minBufferSegments to exceed our
// mMaxAdvanceBufferSegmentCount here. This happens when a cloned
// reader gets far behind, but then the fastest reader stream is
// closed. This leaves us with a single stream that is buffered
// beyond our max. Naturally we continue to indicate the pipe
// is full at this point.
return true;
}
//-----------------------------------------------------------------------------
// nsPipeEvents methods:
//-----------------------------------------------------------------------------
nsPipeEvents::~nsPipeEvents()
{
// dispatch any pending events
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream);
}
mInputList.Clear();
if (mOutputCallback) {
mOutputCallback->OnOutputStreamReady(mOutputStream);
mOutputCallback = nullptr;
mOutputStream = nullptr;
}
}
//-----------------------------------------------------------------------------
// nsPipeInputStream methods:
//-----------------------------------------------------------------------------
NS_IMPL_ADDREF(nsPipeInputStream);
NS_IMPL_RELEASE(nsPipeInputStream);
NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
NS_INTERFACE_TABLE_BEGIN
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream)
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
nsIAsyncInputStream)
NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
nsIAsyncInputStream)
NS_INTERFACE_TABLE_END
NS_INTERFACE_TABLE_TAIL
NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
nsIInputStream,
nsIAsyncInputStream,
nsISeekableStream,
nsISearchableInputStream,
nsICloneableInputStream,
nsIBufferedInputStream)
NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
NS_IMETHODIMP
nsPipeInputStream::Init(nsIInputStream*, uint32_t)
{
MOZ_CRASH("nsPipeInputStream should never be initialized with "
"nsIBufferedInputStream::Init!\n");
}
NS_IMETHODIMP
nsPipeInputStream::GetData(nsIInputStream **aResult)
{
// as this was not created with init() we are not
// wrapping anything
return NS_ERROR_NOT_IMPLEMENTED;
}
uint32_t
nsPipeInputStream::Available()
{
mPipe->mReentrantMonitor.AssertCurrentThreadIn();
return mReadState.mAvailable;
}
nsresult
nsPipeInputStream::Wait()
{
MOZ_DIAGNOSTIC_ASSERT(mBlocking);
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) {
LOG(("III pipe input: waiting for data\n"));
mBlocked = true;
mon.Wait();
mBlocked = false;
LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
}
return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon);
}
MonitorAction
nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten,
nsPipeEvents& aEvents,
const ReentrantMonitorAutoEnter& ev)
{
MonitorAction result = DoNotNotifyMonitor;
mPipe->mReentrantMonitor.AssertCurrentThreadIn();
mReadState.mAvailable += aBytesWritten;
if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyInputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
}
return result;
}
MonitorAction
nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents,
const ReentrantMonitorAutoEnter& ev)
{
LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
this, static_cast<uint32_t>(aReason)));
MonitorAction result = DoNotNotifyMonitor;
MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
if (NS_SUCCEEDED(mInputStatus)) {
mInputStatus = aReason;
}
// force count of available bytes to zero.
mPipe->DrainInputStream(mReadState, aEvents);
if (mCallback) {
aEvents.NotifyInputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
}
return result;
}
NS_IMETHODIMP
nsPipeInputStream::CloseWithStatus(nsresult aReason)
{
LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
this, static_cast<uint32_t>(aReason)));
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
if (NS_FAILED(mInputStatus)) {
return NS_OK;
}
if (NS_SUCCEEDED(aReason)) {
aReason = NS_BASE_STREAM_CLOSED;
}
mPipe->OnInputStreamException(this, aReason);
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::Close()
{
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP
nsPipeInputStream::Available(uint64_t* aResult)
{
// nsPipeInputStream supports under 4GB stream only
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
// return error if closed
if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
return Status(mon);
}
*aResult = (uint64_t)mReadState.mAvailable;
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure,
uint32_t aCount,
uint32_t* aReadCount)
{
LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));
nsresult rv = NS_OK;
*aReadCount = 0;
while (aCount) {
AutoReadSegment segment(mPipe, mReadState, aCount);
rv = segment.Status();
if (NS_FAILED(rv)) {
// ignore this error if we've already read something.
if (*aReadCount > 0) {
rv = NS_OK;
break;
}
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// pipe is empty
if (!mBlocking) {
break;
}
// wait for some data to be written to the pipe
rv = Wait();
if (NS_SUCCEEDED(rv)) {
continue;
}
}
// ignore this error, just return.
if (rv == NS_BASE_STREAM_CLOSED) {
rv = NS_OK;
break;
}
mPipe->OnInputStreamException(this, rv);
break;
}
uint32_t writeCount;
while (segment.Length()) {
writeCount = 0;
rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
segment.Data(), *aReadCount, segment.Length(), &writeCount);
if (NS_FAILED(rv) || writeCount == 0) {
aCount = 0;
// any errors returned from the writer end here: do not
// propagate to the caller of ReadSegments.
rv = NS_OK;
break;
}
MOZ_DIAGNOSTIC_ASSERT(writeCount <= segment.Length());
segment.Advance(writeCount);
aCount -= writeCount;
*aReadCount += writeCount;
mLogicalOffset += writeCount;
}
}
return rv;
}
NS_IMETHODIMP
nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount)
{
return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
}
NS_IMETHODIMP
nsPipeInputStream::IsNonBlocking(bool* aNonBlocking)
{
*aNonBlocking = !mBlocking;
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aTarget)
{
LOG(("III AsyncWait [this=%p]\n", this));
nsPipeEvents pipeEvents;
{
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
// replace a pending callback
mCallback = nullptr;
mCallbackFlags = 0;
if (!aCallback) {
return NS_OK;
}
nsCOMPtr<nsIInputStreamCallback> proxy;
if (aTarget) {
proxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait",
aCallback, aTarget);
aCallback = proxy;
}
if (NS_FAILED(Status(mon)) ||
(mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
// stream is already closed or readable; post event.
pipeEvents.NotifyInputReady(this, aCallback);
} else {
// queue up callback object to be notified when data becomes available
mCallback = aCallback;
mCallbackFlags = aFlags;
}
}
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset)
{
NS_NOTREACHED("nsPipeInputStream::Seek");
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
nsPipeInputStream::Tell(int64_t* aOffset)
{
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
// return error if closed
if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
return Status(mon);
}
*aOffset = mLogicalOffset;
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::SetEOF()
{
NS_NOTREACHED("nsPipeInputStream::SetEOF");
return NS_ERROR_NOT_IMPLEMENTED;
}
static bool strings_equal(bool aIgnoreCase,
const char* aS1, const char* aS2, uint32_t aLen)
{
return aIgnoreCase
? !nsCRT::strncasecmp(aS1, aS2, aLen) : !nsCRT::strncmp(aS1, aS2, aLen);
}
NS_IMETHODIMP
nsPipeInputStream::Search(const char* aForString,
bool aIgnoreCase,
bool* aFound,
uint32_t* aOffsetSearchedTo)
{
LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
char* cursor1;
char* limit1;
uint32_t index = 0, offset = 0;
uint32_t strLen = strlen(aForString);
mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
if (cursor1 == limit1) {
*aFound = false;
*aOffsetSearchedTo = 0;
LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
return NS_OK;
}
while (true) {
uint32_t i, len1 = limit1 - cursor1;
// check if the string is in the buffer segment
for (i = 0; i < len1 - strLen + 1; i++) {
if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
*aFound = true;
*aOffsetSearchedTo = offset + i;
LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
return NS_OK;
}
}
// get the next segment
char* cursor2;
char* limit2;
uint32_t len2;
index++;
offset += len1;
mPipe->PeekSegment(mReadState, index, cursor2, limit2);
if (cursor2 == limit2) {
*aFound = false;
*aOffsetSearchedTo = offset - strLen + 1;
LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
return NS_OK;
}
len2 = limit2 - cursor2;
// check if the string is straddling the next buffer segment
uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
for (i = 0; i < lim; ++i) {
uint32_t strPart1Len = strLen - i - 1;
uint32_t strPart2Len = strLen - strPart1Len;
const char* strPart2 = &aForString[strLen - strPart2Len];
uint32_t bufSeg1Offset = len1 - strPart1Len;
if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString, strPart1Len) &&
strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
*aFound = true;
*aOffsetSearchedTo = offset - strPart1Len;
LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
return NS_OK;
}
}
// finally continue with the next buffer
cursor1 = cursor2;
limit1 = limit2;
}
NS_NOTREACHED("can't get here");
return NS_ERROR_UNEXPECTED; // keep compiler happy
}
NS_IMETHODIMP
nsPipeInputStream::GetCloneable(bool* aCloneableOut)
{
*aCloneableOut = true;
return NS_OK;
}
NS_IMETHODIMP
nsPipeInputStream::Clone(nsIInputStream** aCloneOut)
{
return mPipe->CloneInputStream(this, aCloneOut);
}
nsresult
nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const
{
if (NS_FAILED(mInputStatus)) {
return mInputStatus;
}
if (mReadState.mAvailable) {
// Still something to read and this input stream state is OK.
return NS_OK;
}
// Nothing to read, just fall through to the pipe's state that
// may reflect state of its output stream side (already closed).
return mPipe->mStatus;
}
nsresult
nsPipeInputStream::Status() const
{
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
return Status(mon);
}
nsPipeInputStream::~nsPipeInputStream()
{
Close();
}
//-----------------------------------------------------------------------------
// nsPipeOutputStream methods:
//-----------------------------------------------------------------------------
NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
nsIOutputStream,
nsIAsyncOutputStream,
nsIClassInfo)
NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
nsIOutputStream,
nsIAsyncOutputStream)
NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
nsresult
nsPipeOutputStream::Wait()
{
MOZ_DIAGNOSTIC_ASSERT(mBlocking);
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
LOG(("OOO pipe output: waiting for space\n"));
mBlocked = true;
mon.Wait();
mBlocked = false;
LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
static_cast<uint32_t>(mPipe->mStatus), mWritable));
}
return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
}
MonitorAction
nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
{
MonitorAction result = DoNotNotifyMonitor;
mWritable = true;
if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
aEvents.NotifyOutputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
}
return result;
}
MonitorAction
nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
{
LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
this, static_cast<uint32_t>(aReason)));
MonitorAction result = DoNotNotifyMonitor;
MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
mWritable = false;
if (mCallback) {
aEvents.NotifyOutputReady(this, mCallback);
mCallback = nullptr;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
}
return result;
}
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::AddRef()
{
++mWriterRefCnt;
return mPipe->AddRef();
}
NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::Release()
{
if (--mWriterRefCnt == 0) {
Close();
}
return mPipe->Release();
}
NS_IMETHODIMP
nsPipeOutputStream::CloseWithStatus(nsresult aReason)
{
LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
this, static_cast<uint32_t>(aReason)));
if (NS_SUCCEEDED(aReason)) {
aReason = NS_BASE_STREAM_CLOSED;
}
// input stream may remain open
mPipe->OnPipeException(aReason, true);
return NS_OK;
}
NS_IMETHODIMP
nsPipeOutputStream::Close()
{
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP
nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader,
void* aClosure,
uint32_t aCount,
uint32_t* aWriteCount)
{
LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));
nsresult rv = NS_OK;
char* segment;
uint32_t segmentLen;
*aWriteCount = 0;
while (aCount) {
rv = mPipe->GetWriteSegment(segment, segmentLen);
if (NS_FAILED(rv)) {
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// pipe is full
if (!mBlocking) {
// ignore this error if we've already written something
if (*aWriteCount > 0) {
rv = NS_OK;
}
break;
}
// wait for the pipe to have an empty segment.
rv = Wait();
if (NS_SUCCEEDED(rv)) {
continue;
}
}
mPipe->OnPipeException(rv);
break;
}
// write no more than aCount
if (segmentLen > aCount) {
segmentLen = aCount;
}
uint32_t readCount, originalLen = segmentLen;
while (segmentLen) {
readCount = 0;
rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen, &readCount);
if (NS_FAILED(rv) || readCount == 0) {
aCount = 0;
// any errors returned from the aReader end here: do not
// propagate to the caller of WriteSegments.
rv = NS_OK;
break;
}
MOZ_DIAGNOSTIC_ASSERT(readCount <= segmentLen);
segment += readCount;
segmentLen -= readCount;
aCount -= readCount;
*aWriteCount += readCount;
mLogicalOffset += readCount;
}
if (segmentLen < originalLen) {
mPipe->AdvanceWriteCursor(originalLen - segmentLen);
}
}
return rv;
}
static nsresult
nsReadFromRawBuffer(nsIOutputStream* aOutStr,
void* aClosure,
char* aToRawSegment,
uint32_t aOffset,
uint32_t aCount,
uint32_t* aReadCount)
{
const char* fromBuf = (const char*)aClosure;
memcpy(aToRawSegment, &fromBuf[aOffset], aCount);
*aReadCount = aCount;
return NS_OK;
}
NS_IMETHODIMP
nsPipeOutputStream::Write(const char* aFromBuf,
uint32_t aBufLen,
uint32_t* aWriteCount)
{
return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen, aWriteCount);
}
NS_IMETHODIMP
nsPipeOutputStream::Flush(void)
{
// nothing to do
return NS_OK;
}
static nsresult
nsReadFromInputStream(nsIOutputStream* aOutStr,
void* aClosure,
char* aToRawSegment,
uint32_t aOffset,
uint32_t aCount,
uint32_t* aReadCount)
{
nsIInputStream* fromStream = (nsIInputStream*)aClosure;
return fromStream->Read(aToRawSegment, aCount, aReadCount);
}
NS_IMETHODIMP
nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream,
uint32_t aCount,
uint32_t* aWriteCount)
{
return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount);
}
NS_IMETHODIMP
nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking)
{
*aNonBlocking = !mBlocking;
return NS_OK;
}
NS_IMETHODIMP
nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aTarget)
{
LOG(("OOO AsyncWait [this=%p]\n", this));
nsPipeEvents pipeEvents;
{
ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
// replace a pending callback
mCallback = nullptr;
mCallbackFlags = 0;
if (!aCallback) {
return NS_OK;
}
nsCOMPtr<nsIOutputStreamCallback> proxy;
if (aTarget) {
proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
aCallback = proxy;
}
if (NS_FAILED(mPipe->mStatus) ||
(mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
// stream is already closed or writable; post event.
pipeEvents.NotifyOutputReady(this, aCallback);
} else {
// queue up callback object to be notified when data becomes available
mCallback = aCallback;
mCallbackFlags = aFlags;
}
}
return NS_OK;
}
////////////////////////////////////////////////////////////////////////////////
nsresult
NS_NewPipe(nsIInputStream** aPipeIn,
nsIOutputStream** aPipeOut,
uint32_t aSegmentSize,
uint32_t aMaxSize,
bool aNonBlockingInput,
bool aNonBlockingOutput)
{
if (aSegmentSize == 0) {
aSegmentSize = DEFAULT_SEGMENT_SIZE;
}
// Handle aMaxSize of UINT32_MAX as a special case
uint32_t segmentCount;
if (aMaxSize == UINT32_MAX) {
segmentCount = UINT32_MAX;
} else {
segmentCount = aMaxSize / aSegmentSize;
}
nsIAsyncInputStream* in;
nsIAsyncOutputStream* out;
nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput,
aSegmentSize, segmentCount);
if (NS_FAILED(rv)) {
return rv;
}
*aPipeIn = in;
*aPipeOut = out;
return NS_OK;
}
nsresult
NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
nsIAsyncOutputStream** aPipeOut,
bool aNonBlockingInput,
bool aNonBlockingOutput,
uint32_t aSegmentSize,
uint32_t aSegmentCount)
{
nsPipe* pipe = new nsPipe();
nsresult rv = pipe->Init(aNonBlockingInput,
aNonBlockingOutput,
aSegmentSize,
aSegmentCount);
if (NS_FAILED(rv)) {
NS_ADDREF(pipe);
NS_RELEASE(pipe);
return rv;
}
// These always succeed because the pipe is initialized above.
MOZ_ALWAYS_SUCCEEDS(pipe->GetInputStream(aPipeIn));
MOZ_ALWAYS_SUCCEEDS(pipe->GetOutputStream(aPipeOut));
return NS_OK;
}
nsresult
nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult)
{
if (aOuter) {
return NS_ERROR_NO_AGGREGATION;
}
nsPipe* pipe = new nsPipe();
NS_ADDREF(pipe);
nsresult rv = pipe->QueryInterface(aIID, aResult);
NS_RELEASE(pipe);
return rv;
}
////////////////////////////////////////////////////////////////////////////////