Bug 1434553 - Implement nsIInputStreamLength and nsIAsyncInputStreamLength - part 7 - nsIMultiplexInputStream exposes nsIInputStreamLength, r=froydnj, r=mayhemer

This commit is contained in:
Andrea Marchesini 2018-05-23 07:12:35 +02:00
parent f0fb8aa183
commit 767ed01fe0
2 changed files with 497 additions and 0 deletions

View File

@ -26,6 +26,7 @@
#include "nsIIPCSerializableInputStream.h"
#include "mozilla/ipc/InputStreamUtils.h"
#include "nsIAsyncInputStream.h"
#include "nsIInputStreamLength.h"
using namespace mozilla;
using namespace mozilla::ipc;
@ -42,6 +43,8 @@ class nsMultiplexInputStream final
, public nsICloneableInputStream
, public nsIAsyncInputStream
, public nsIInputStreamCallback
, public nsIInputStreamLength
, public nsIAsyncInputStreamLength
{
public:
nsMultiplexInputStream();
@ -54,9 +57,16 @@ public:
NS_DECL_NSICLONEABLEINPUTSTREAM
NS_DECL_NSIASYNCINPUTSTREAM
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIINPUTSTREAMLENGTH
NS_DECL_NSIASYNCINPUTSTREAMLENGTH
// This is used for nsIAsyncInputStream::AsyncWait
void AsyncWaitCompleted();
// This is used for nsIAsyncInputStreamLength::AsyncLengthWait
void AsyncWaitCompleted(int64_t aLength,
const MutexAutoLock& aProofOfLock);
struct StreamData
{
void Initialize(nsIInputStream* aStream)
@ -74,6 +84,11 @@ public:
nsCOMPtr<nsISeekableStream> mSeekableStream;
};
Mutex& GetLock()
{
return mLock;
}
private:
~nsMultiplexInputStream()
{
@ -103,6 +118,8 @@ private:
bool IsIPCSerializable() const;
bool IsCloneable() const;
bool IsAsyncInputStream() const;
bool IsInputStreamLength() const;
bool IsAsyncInputStreamLength() const;
Mutex mLock; // Protects access to all data members.
@ -115,11 +132,17 @@ private:
uint32_t mAsyncWaitFlags;
uint32_t mAsyncWaitRequestedCount;
nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget;
nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback;
class AsyncWaitLengthHelper;
RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper;
uint32_t mSeekableStreams;
uint32_t mIPCSerializableStreams;
uint32_t mCloneableStreams;
uint32_t mAsyncInputStreams;
uint32_t mInputStreamLengths;
uint32_t mAsyncInputStreamLengths;
};
NS_IMPL_ADDREF(nsMultiplexInputStream)
@ -140,6 +163,10 @@ NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
IsAsyncInputStream())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
IsAsyncInputStream())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
IsInputStreamLength())
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
IsAsyncInputStreamLength())
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
NS_INTERFACE_MAP_END
@ -198,6 +225,8 @@ nsMultiplexInputStream::nsMultiplexInputStream()
, mIPCSerializableStreams(0)
, mCloneableStreams(0)
, mAsyncInputStreams(0)
, mInputStreamLengths(0)
, mAsyncInputStreamLengths(0)
{}
NS_IMETHODIMP
@ -1117,6 +1146,315 @@ nsMultiplexInputStream::Clone(nsIInputStream** aClone)
return NS_OK;
}
NS_IMETHODIMP
nsMultiplexInputStream::Length(int64_t* aLength)
{
MutexAutoLock lock(mLock);
if (mCurrentStream > 0 || mStartedReadingCurrent) {
return NS_ERROR_NOT_AVAILABLE;
}
CheckedInt64 length = 0;
nsresult retval = NS_OK;
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIInputStreamLength> substream =
do_QueryInterface(mStreams[i].mStream);
if (!substream) {
// Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv;
return mStatus;
}
length += streamAvail;
if (!length.isValid()) {
return NS_ERROR_OUT_OF_MEMORY;
}
continue;
}
int64_t size = 0;
nsresult rv = substream->Length(&size);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (rv == NS_ERROR_NOT_AVAILABLE) {
return rv;
}
// If one stream blocks, we all block.
if (rv != NS_BASE_STREAM_WOULD_BLOCK &&
NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
// We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
// to see if there are other streams with length = -1.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
retval = NS_BASE_STREAM_WOULD_BLOCK;
continue;
}
// If one of the stream doesn't know the size, we all don't know the size.
if (size == -1) {
*aLength = -1;
return NS_OK;
}
length += size;
if (!length.isValid()) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
*aLength = length.value();
return retval;
}
class nsMultiplexInputStream::AsyncWaitLengthHelper final : public nsIInputStreamLengthCallback
{
public:
NS_DECL_ISUPPORTS
AsyncWaitLengthHelper()
: mStreamNotified(false)
, mLength(0)
, mNegativeSize(false)
{}
void
AddStream(nsIAsyncInputStreamLength* aStream)
{
mPendingStreams.AppendElement(aStream);
}
bool
AddSize(int64_t aSize)
{
MOZ_ASSERT(!mNegativeSize);
mLength += aSize;
return mLength.isValid();
}
void
NegativeSize()
{
MOZ_ASSERT(!mNegativeSize);
mNegativeSize = true;
}
nsresult
Proceed(nsMultiplexInputStream* aParentStream,
nsIEventTarget* aEventTarget,
const MutexAutoLock& aProofOfLock)
{
MOZ_ASSERT(!mStream);
// If we don't need to wait, let's inform the callback immediately.
if (mPendingStreams.IsEmpty() || mNegativeSize) {
RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
int64_t length = -1;
if (!mNegativeSize && mLength.isValid()) {
length = mLength.value();
}
nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
"AsyncWaitLengthHelper",
[parentStream, length]() {
MutexAutoLock lock(parentStream->GetLock());
parentStream->AsyncWaitCompleted(length, lock);
});
return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
}
// Let's store the callback and the parent stream until we have
// notifications from the async length streams.
mStream = aParentStream;
// Let's activate all the pending streams.
for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
}
return NS_OK;
}
NS_IMETHOD
OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
int64_t aLength) override
{
MutexAutoLock lock(mStream->GetLock());
MOZ_ASSERT(mPendingStreams.Contains(aStream));
mPendingStreams.RemoveElement(aStream);
// Already notified.
if (mStreamNotified) {
return NS_OK;
}
if (aLength == -1) {
mNegativeSize = true;
} else {
mLength += aLength;
if (!mLength.isValid()) {
mNegativeSize = true;
}
}
// We need to wait.
if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
return NS_OK;
}
// Let's notify the parent stream.
mStreamNotified = true;
mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
return NS_OK;
}
private:
~AsyncWaitLengthHelper() = default;
RefPtr<nsMultiplexInputStream> mStream;
bool mStreamNotified;
CheckedInt64 mLength;
bool mNegativeSize;
nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
};
NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
nsIInputStreamLengthCallback)
NS_IMETHODIMP
nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
nsIEventTarget* aEventTarget)
{
if (NS_WARN_IF(!aEventTarget)) {
return NS_ERROR_NULL_POINTER;
}
MutexAutoLock lock(mLock);
if (mCurrentStream > 0 || mStartedReadingCurrent) {
return NS_ERROR_NOT_AVAILABLE;
}
if (!aCallback) {
mAsyncWaitLengthCallback = nullptr;
return NS_OK;
}
// We have a pending operation! Let's use this instead of creating a new one.
if (mAsyncWaitLengthHelper) {
mAsyncWaitLengthCallback = aCallback;
return NS_OK;
}
RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
do_QueryInterface(mStreams[i].mStream);
if (asyncStream) {
helper->AddStream(asyncStream);
continue;
}
nsCOMPtr<nsIInputStreamLength> stream =
do_QueryInterface(mStreams[i].mStream);
if (!stream) {
// Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv;
return mStatus;
}
if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
return NS_ERROR_OUT_OF_MEMORY;
}
continue;
}
int64_t size = 0;
nsresult rv = stream->Length(&size);
if (rv == NS_BASE_STREAM_CLOSED) {
continue;
}
MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
"A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but it doesn't implement nsIAsyncInputStreamLength.");
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (size == -1) {
helper->NegativeSize();
break;
}
if (NS_WARN_IF(!helper->AddSize(size))) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
nsresult rv = helper->Proceed(this, aEventTarget, lock);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
mAsyncWaitLengthHelper = helper;
mAsyncWaitLengthCallback = aCallback;
return NS_OK;
}
void
nsMultiplexInputStream::AsyncWaitCompleted(int64_t aLength,
const MutexAutoLock& aProofOfLock)
{
nsCOMPtr<nsIInputStreamLengthCallback> callback;
callback.swap(mAsyncWaitLengthCallback);
mAsyncWaitLengthHelper = nullptr;
// Already canceled.
if (!callback) {
return;
}
MutexAutoUnlock unlock(mLock);
callback->OnInputStreamLengthReady(this, aLength);
}
#define MAYBE_UPDATE_VALUE_REAL(x, y) \
if (y) { \
if (aCount == 1) { \
@ -1143,6 +1481,8 @@ nsMultiplexInputStream::UpdateQIMap(StreamData& aStream, int32_t aCount)
MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
MAYBE_UPDATE_VALUE_REAL(mAsyncInputStreams, aStream.mAsyncStream)
MAYBE_UPDATE_VALUE(mInputStreamLengths, nsIInputStreamLength)
MAYBE_UPDATE_VALUE(mAsyncInputStreamLengths, nsIAsyncInputStreamLength)
}
#undef MAYBE_UPDATE_VALUE
@ -1172,3 +1512,15 @@ nsMultiplexInputStream::IsAsyncInputStream() const
// substream implements that interface.
return !!mAsyncInputStreams;
}
bool
nsMultiplexInputStream::IsInputStreamLength() const
{
return !!mInputStreamLengths;
}
bool
nsMultiplexInputStream::IsAsyncInputStreamLength() const
{
return !!mAsyncInputStreamLengths;
}

View File

@ -386,3 +386,148 @@ TEST(TestMultiplexInputStream, Available) {
ASSERT_EQ(NS_OK, rv);
ASSERT_EQ(buffer.Length(), length);
}
TEST(TestMultiplexInputStream, QILengthInputStream) {
nsCString buf;
buf.AssignLiteral("Hello world");
nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
// nsMultiplexInputStream doesn't expose nsIInputStreamLength if there are
// no nsIInputStreamLength sub streams.
{
nsCOMPtr<nsIInputStream> inputStream;
nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = multiplexStream->AppendStream(inputStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!fsis);
nsCOMPtr<nsIAsyncInputStreamLength> afsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!afsis);
}
// nsMultiplexInputStream exposes nsIInputStreamLength if there is one or
// more nsIInputStreamLength sub streams.
{
RefPtr<testing::LengthInputStream> inputStream =
new testing::LengthInputStream(buf, true, false);
nsresult rv = multiplexStream->AppendStream(inputStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!fsis);
nsCOMPtr<nsIAsyncInputStreamLength> afsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!afsis);
}
// nsMultiplexInputStream exposes nsIAsyncInputStreamLength if there is one
// or more nsIAsyncInputStreamLength sub streams.
{
RefPtr<testing::LengthInputStream> inputStream =
new testing::LengthInputStream(buf, true, true);
nsresult rv = multiplexStream->AppendStream(inputStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!fsis);
nsCOMPtr<nsIAsyncInputStreamLength> afsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!afsis);
}
}
TEST(TestMultiplexInputStream, LengthInputStream) {
nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
// First stream is a a simple one.
nsCString buf;
buf.AssignLiteral("Hello world");
nsCOMPtr<nsIInputStream> inputStream;
nsresult rv = NS_NewCStringInputStream(getter_AddRefs(inputStream), buf);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = multiplexStream->AppendStream(inputStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
// A LengthInputStream, non-async.
RefPtr<testing::LengthInputStream> lengthStream =
new testing::LengthInputStream(buf, true, false);
rv = multiplexStream->AppendStream(lengthStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStreamLength> fsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!fsis);
// Size is the sum of the 2 streams.
int64_t length;
rv = fsis->Length(&length);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_EQ(buf.Length() * 2, length);
// An async LengthInputStream.
RefPtr<testing::LengthInputStream> asyncLengthStream =
new testing::LengthInputStream(buf, true, true, NS_BASE_STREAM_WOULD_BLOCK);
rv = multiplexStream->AppendStream(asyncLengthStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIAsyncInputStreamLength> afsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!afsis);
// Now it would block.
rv = fsis->Length(&length);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
// Let's read the size async.
RefPtr<testing::LengthCallback> callback = new testing::LengthCallback();
rv = afsis->AsyncLengthWait(callback, GetCurrentThreadSerialEventTarget());
ASSERT_EQ(NS_OK, rv);
MOZ_ALWAYS_TRUE(SpinEventLoopUntil([&]() { return callback->Called(); }));
ASSERT_EQ(buf.Length() * 3, callback->Size());
// Now a negative stream
lengthStream =
new testing::LengthInputStream(buf, true, false, NS_OK, true);
rv = multiplexStream->AppendStream(lengthStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = fsis->Length(&length);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_EQ(-1, length);
// Another async LengthInputStream.
asyncLengthStream =
new testing::LengthInputStream(buf, true, true, NS_BASE_STREAM_WOULD_BLOCK);
rv = multiplexStream->AppendStream(asyncLengthStream);
ASSERT_TRUE(NS_SUCCEEDED(rv));
afsis = do_QueryInterface(multiplexStream);
ASSERT_TRUE(!!afsis);
// Let's read the size async.
RefPtr<testing::LengthCallback> callback1 = new testing::LengthCallback();
rv = afsis->AsyncLengthWait(callback1, GetCurrentThreadSerialEventTarget());
ASSERT_EQ(NS_OK, rv);
RefPtr<testing::LengthCallback> callback2 = new testing::LengthCallback();
rv = afsis->AsyncLengthWait(callback2, GetCurrentThreadSerialEventTarget());
ASSERT_EQ(NS_OK, rv);
MOZ_ALWAYS_TRUE(SpinEventLoopUntil([&]() { return callback2->Called(); }));
ASSERT_FALSE(callback1->Called());
ASSERT_TRUE(callback2->Called());
}