/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "SlicedInputStream.h" #include "mozilla/ipc/InputStreamUtils.h" #include "nsISeekableStream.h" #include "nsStreamUtils.h" namespace mozilla { using namespace ipc; NS_IMPL_ADDREF(SlicedInputStream); NS_IMPL_RELEASE(SlicedInputStream); NS_INTERFACE_MAP_BEGIN(SlicedInputStream) NS_INTERFACE_MAP_ENTRY(nsIInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, mWeakCloneableInputStream || !mInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream, mWeakIPCSerializableInputStream || !mInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, mWeakSeekableInputStream || !mInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, mWeakAsyncInputStream || !mInputStream) NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, mWeakAsyncInputStream || !mInputStream) NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream) NS_INTERFACE_MAP_END SlicedInputStream::SlicedInputStream(already_AddRefed aInputStream, uint64_t aStart, uint64_t aLength) : mWeakCloneableInputStream(nullptr) , mWeakIPCSerializableInputStream(nullptr) , mWeakSeekableInputStream(nullptr) , mWeakAsyncInputStream(nullptr) , mStart(aStart) , mLength(aLength) , mCurPos(0) , mClosed(false) , mAsyncWaitFlags(0) , mAsyncWaitRequestedCount(0) { nsCOMPtr inputStream = mozilla::Move(aInputStream); SetSourceStream(inputStream.forget()); } SlicedInputStream::SlicedInputStream() : mWeakCloneableInputStream(nullptr) , mWeakIPCSerializableInputStream(nullptr) , mWeakSeekableInputStream(nullptr) , mWeakAsyncInputStream(nullptr) , mStart(0) , mLength(0) , mCurPos(0) , mClosed(false) , mAsyncWaitFlags(0) , mAsyncWaitRequestedCount(0) {} SlicedInputStream::~SlicedInputStream() {} void SlicedInputStream::SetSourceStream(already_AddRefed aInputStream) { MOZ_ASSERT(!mInputStream); mInputStream = mozilla::Move(aInputStream); nsCOMPtr cloneableStream = do_QueryInterface(mInputStream); if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) { mWeakCloneableInputStream = cloneableStream; } nsCOMPtr serializableStream = do_QueryInterface(mInputStream); if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) { mWeakIPCSerializableInputStream = serializableStream; } nsCOMPtr seekableStream = do_QueryInterface(mInputStream); if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) { mWeakSeekableInputStream = seekableStream; } nsCOMPtr asyncInputStream = do_QueryInterface(mInputStream); if (asyncInputStream && SameCOMIdentity(mInputStream, asyncInputStream)) { mWeakAsyncInputStream = asyncInputStream; } } NS_IMETHODIMP SlicedInputStream::Close() { NS_ENSURE_STATE(mInputStream); mClosed = true; return mInputStream->Close(); } // nsIInputStream interface NS_IMETHODIMP SlicedInputStream::Available(uint64_t* aLength) { NS_ENSURE_STATE(mInputStream); if (mClosed) { return NS_BASE_STREAM_CLOSED; } nsresult rv = mInputStream->Available(aLength); if (rv == NS_BASE_STREAM_CLOSED) { mClosed = true; return rv; } if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } // Let's remove extra length from the end. if (*aLength + mCurPos > mStart + mLength) { *aLength -= XPCOM_MIN(*aLength, (*aLength + mCurPos) - (mStart + mLength)); } // Let's remove extra length from the begin. if (mCurPos < mStart) { *aLength -= XPCOM_MIN(*aLength, mStart - mCurPos); } return NS_OK; } NS_IMETHODIMP SlicedInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) { *aReadCount = 0; if (mClosed) { return NS_OK; } if (mCurPos < mStart) { nsCOMPtr seekableStream = do_QueryInterface(mInputStream); if (seekableStream) { nsresult rv = seekableStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } mCurPos = mStart; } else { char buf[4096]; while (mCurPos < mStart) { uint32_t bytesRead; uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf)); nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead); if (NS_SUCCEEDED(rv) && bytesRead == 0) { mClosed = true; return rv; } if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } mCurPos += bytesRead; } } } // Let's reduce aCount in case it's too big. if (mCurPos + aCount > mStart + mLength) { aCount = mStart + mLength - mCurPos; } // Nothing else to read. if (!aCount) { return NS_OK; } nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount); if (NS_SUCCEEDED(rv) && *aReadCount == 0) { mClosed = true; return rv; } if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } mCurPos += *aReadCount; return NS_OK; } NS_IMETHODIMP SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, uint32_t *aResult) { return NS_ERROR_NOT_IMPLEMENTED; } NS_IMETHODIMP SlicedInputStream::IsNonBlocking(bool* aNonBlocking) { NS_ENSURE_STATE(mInputStream); return mInputStream->IsNonBlocking(aNonBlocking); } // nsICloneableInputStream interface NS_IMETHODIMP SlicedInputStream::GetCloneable(bool* aCloneable) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakCloneableInputStream); *aCloneable = true; return NS_OK; } NS_IMETHODIMP SlicedInputStream::Clone(nsIInputStream** aResult) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakCloneableInputStream); nsCOMPtr clonedStream; nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream)); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } nsCOMPtr sis = new SlicedInputStream(clonedStream.forget(), mStart, mLength); sis.forget(aResult); return NS_OK; } // nsIAsyncInputStream interface NS_IMETHODIMP SlicedInputStream::CloseWithStatus(nsresult aStatus) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakAsyncInputStream); mClosed = true; return mWeakAsyncInputStream->CloseWithStatus(aStatus); } NS_IMETHODIMP SlicedInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, uint32_t aRequestedCount, nsIEventTarget* aEventTarget) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakAsyncInputStream); if (mAsyncWaitCallback && aCallback) { return NS_ERROR_FAILURE; } mAsyncWaitCallback = aCallback; if (!mAsyncWaitCallback) { return NS_OK; } // If we haven't started retrieving data, let's see if we can seek. // If we cannot seek, we will do consecutive reads. if (mCurPos < mStart && mWeakSeekableInputStream) { nsresult rv = mWeakSeekableInputStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } mCurPos = mStart; } mAsyncWaitFlags = aFlags; mAsyncWaitRequestedCount = aRequestedCount; mAsyncWaitEventTarget = aEventTarget; // If we are not at the right position, let's do an asyncWait just internal. if (mCurPos < mStart) { return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos, aEventTarget); } return mWeakAsyncInputStream->AsyncWait(this, aFlags, aRequestedCount, aEventTarget); } // nsIInputStreamCallback NS_IMETHODIMP SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { MOZ_ASSERT(mInputStream); MOZ_ASSERT(mWeakAsyncInputStream); MOZ_ASSERT(mWeakAsyncInputStream == aStream); // We have been canceled in the meanwhile. if (!mAsyncWaitCallback) { return NS_OK; } if (mCurPos < mStart) { char buf[4096]; while (mCurPos < mStart) { uint32_t bytesRead; uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf)); nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead); if (NS_SUCCEEDED(rv) && bytesRead == 0) { mClosed = true; return RunAsyncWaitCallback(); } if (rv == NS_BASE_STREAM_WOULD_BLOCK) { return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos, mAsyncWaitEventTarget); } if (NS_WARN_IF(NS_FAILED(rv))) { return RunAsyncWaitCallback(); } mCurPos += bytesRead; } // Now we are ready to do the 'real' asyncWait. return mWeakAsyncInputStream->AsyncWait(this, mAsyncWaitFlags, mAsyncWaitRequestedCount, mAsyncWaitEventTarget); } return RunAsyncWaitCallback(); } nsresult SlicedInputStream::RunAsyncWaitCallback() { nsCOMPtr callback = mAsyncWaitCallback; mAsyncWaitCallback = nullptr; mAsyncWaitEventTarget = nullptr; return callback->OnInputStreamReady(this); } // nsIIPCSerializableInputStream void SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors) { MOZ_ASSERT(mInputStream); MOZ_ASSERT(mWeakIPCSerializableInputStream); SlicedInputStreamParams params; InputStreamHelper::SerializeInputStream(mInputStream, params.stream(), aFileDescriptors); params.start() = mStart; params.length() = mLength; params.curPos() = mCurPos; params.closed() = mClosed; aParams = params; } bool SlicedInputStream::Deserialize(const mozilla::ipc::InputStreamParams& aParams, const FileDescriptorArray& aFileDescriptors) { MOZ_ASSERT(!mInputStream); MOZ_ASSERT(!mWeakIPCSerializableInputStream); if (aParams.type() != InputStreamParams::TSlicedInputStreamParams) { NS_ERROR("Received unknown parameters from the other process!"); return false; } const SlicedInputStreamParams& params = aParams.get_SlicedInputStreamParams(); nsCOMPtr stream = InputStreamHelper::DeserializeInputStream(params.stream(), aFileDescriptors); if (!stream) { NS_WARNING("Deserialize failed!"); return false; } SetSourceStream(stream.forget()); mStart = params.start(); mLength = params.length(); mCurPos = params.curPos(); mClosed = params.closed(); return true; } mozilla::Maybe SlicedInputStream::ExpectedSerializedLength() { if (!mInputStream || !mWeakIPCSerializableInputStream) { return mozilla::Nothing(); } return mWeakIPCSerializableInputStream->ExpectedSerializedLength(); } // nsISeekableStream NS_IMETHODIMP SlicedInputStream::Seek(int32_t aWhence, int64_t aOffset) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakSeekableInputStream); int64_t offset; nsresult rv; switch (aWhence) { case NS_SEEK_SET: offset = mStart + aOffset; break; case NS_SEEK_CUR: // mCurPos could be lower than mStart if the reading has not started yet. offset = XPCOM_MAX(mStart, mCurPos) + aOffset; break; case NS_SEEK_END: { uint64_t available; rv = mInputStream->Available(&available); if (rv == NS_BASE_STREAM_CLOSED) { mClosed = true; return rv; } if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } offset = XPCOM_MIN(mStart + mLength, available) + aOffset; break; } default: return NS_ERROR_ILLEGAL_VALUE; } if (offset < (int64_t)mStart || offset > (int64_t)(mStart + mLength)) { return NS_ERROR_INVALID_ARG; } rv = mWeakSeekableInputStream->Seek(NS_SEEK_SET, offset); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } mCurPos = offset; return NS_OK; } NS_IMETHODIMP SlicedInputStream::Tell(int64_t *aResult) { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakSeekableInputStream); int64_t tell = 0; nsresult rv = mWeakSeekableInputStream->Tell(&tell); if (NS_WARN_IF(NS_FAILED(rv))) { return rv; } if (tell < (int64_t)mStart) { *aResult = 0; return NS_OK; } *aResult = tell - mStart; if (*aResult > (int64_t)mLength) { *aResult = mLength; } return NS_OK; } NS_IMETHODIMP SlicedInputStream::SetEOF() { NS_ENSURE_STATE(mInputStream); NS_ENSURE_STATE(mWeakSeekableInputStream); mClosed = true; return mWeakSeekableInputStream->SetEOF(); } } // namespace mozilla