Bug 1405974 - NonBlockingAsyncInputStream must take the ownership of the underlying stream, r=qdot

This commit is contained in:
Andrea Marchesini 2018-01-31 16:45:20 +01:00
parent 11e06fdb47
commit 69f4700dfd
9 changed files with 153 additions and 107 deletions

View File

@ -207,7 +207,7 @@ FetchStream::RequestDataCallback(JSContext* aCx,
nsCOMPtr<nsIAsyncInputStream> asyncStream;
nsresult rv =
NS_MakeAsyncNonBlockingInputStream(stream->mOriginalInputStream,
NS_MakeAsyncNonBlockingInputStream(stream->mOriginalInputStream.forget(),
getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
stream->ErrorPropagation(aCx, aStream, rv);

View File

@ -405,16 +405,18 @@ FileReader::ReadFileContent(Blob& aBlob,
mDataFormat = aDataFormat;
CopyUTF16toUTF8(aCharset, mCharset);
nsCOMPtr<nsIInputStream> stream;
mBlob->CreateInputStream(getter_AddRefs(stream), aRv);
if (NS_WARN_IF(aRv.Failed())) {
return;
}
{
nsCOMPtr<nsIInputStream> stream;
mBlob->CreateInputStream(getter_AddRefs(stream), aRv);
if (NS_WARN_IF(aRv.Failed())) {
return;
}
aRv = NS_MakeAsyncNonBlockingInputStream(stream,
getter_AddRefs(mAsyncStream));
if (NS_WARN_IF(aRv.Failed())) {
return;
aRv = NS_MakeAsyncNonBlockingInputStream(stream.forget(),
getter_AddRefs(mAsyncStream));
if (NS_WARN_IF(aRv.Failed())) {
return;
}
}
MOZ_ASSERT(mAsyncStream);

View File

@ -649,11 +649,13 @@ IPCBlobInputStream::EnsureAsyncRemoteStream()
// If non-blocking and non-async, let's use NonBlockingAsyncInputStream.
if (nonBlocking && !asyncStream) {
rv = NonBlockingAsyncInputStream::Create(mRemoteStream,
rv = NonBlockingAsyncInputStream::Create(mRemoteStream.forget(),
getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
MOZ_ASSERT(asyncStream);
}
if (!asyncStream) {

View File

@ -335,10 +335,11 @@ nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
if (nonBlocking) {
mAsyncStream = do_QueryInterface(mStream);
if (!mAsyncStream) {
rv = NonBlockingAsyncInputStream::Create(mStream,
rv = NonBlockingAsyncInputStream::Create(mStream.forget(),
getter_AddRefs(mAsyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) return rv;
}
MOZ_ASSERT(mAsyncStream);
}
if (!mAsyncStream) {

View File

@ -53,14 +53,15 @@ NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream)
NS_INTERFACE_MAP_END
/* static */ nsresult
NonBlockingAsyncInputStream::Create(nsIInputStream* aInputStream,
NonBlockingAsyncInputStream::Create(already_AddRefed<nsIInputStream> aInputStream,
nsIAsyncInputStream** aResult)
{
MOZ_DIAGNOSTIC_ASSERT(aInputStream);
MOZ_DIAGNOSTIC_ASSERT(aResult);
nsCOMPtr<nsIInputStream> inputStream = Move(aInputStream);
bool nonBlocking = false;
nsresult rv = aInputStream->IsNonBlocking(&nonBlocking);
nsresult rv = inputStream->IsNonBlocking(&nonBlocking);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
@ -68,18 +69,18 @@ NonBlockingAsyncInputStream::Create(nsIInputStream* aInputStream,
MOZ_DIAGNOSTIC_ASSERT(nonBlocking);
nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
do_QueryInterface(aInputStream);
do_QueryInterface(inputStream);
MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream);
RefPtr<NonBlockingAsyncInputStream> stream =
new NonBlockingAsyncInputStream(aInputStream);
new NonBlockingAsyncInputStream(inputStream.forget());
stream.forget(aResult);
return NS_OK;
}
NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(nsIInputStream* aInputStream)
: mInputStream(aInputStream)
NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(already_AddRefed<nsIInputStream> aInputStream)
: mInputStream(Move(aInputStream))
, mWeakCloneableInputStream(nullptr)
, mWeakIPCSerializableInputStream(nullptr)
, mWeakSeekableInputStream(nullptr)
@ -88,21 +89,21 @@ NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(nsIInputStream* aInputS
MOZ_ASSERT(mInputStream);
nsCOMPtr<nsICloneableInputStream> cloneableStream =
do_QueryInterface(aInputStream);
if (cloneableStream && SameCOMIdentity(aInputStream, cloneableStream)) {
do_QueryInterface(mInputStream);
if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) {
mWeakCloneableInputStream = cloneableStream;
}
nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
do_QueryInterface(aInputStream);
do_QueryInterface(mInputStream);
if (serializableStream &&
SameCOMIdentity(aInputStream, serializableStream)) {
SameCOMIdentity(mInputStream, serializableStream)) {
mWeakIPCSerializableInputStream = serializableStream;
}
nsCOMPtr<nsISeekableStream> seekableStream =
do_QueryInterface(aInputStream);
if (seekableStream && SameCOMIdentity(aInputStream, seekableStream)) {
do_QueryInterface(mInputStream);
if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) {
mWeakSeekableInputStream = seekableStream;
}
}
@ -232,7 +233,7 @@ NonBlockingAsyncInputStream::Clone(nsIInputStream** aResult)
}
nsCOMPtr<nsIAsyncInputStream> asyncStream;
rv = Create(clonedStream, getter_AddRefs(asyncStream));
rv = Create(clonedStream.forget(), getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}

View File

@ -37,11 +37,11 @@ public:
// |aInputStream| must be a non-blocking, non-async inputSteam.
static nsresult
Create(nsIInputStream* aInputStream,
Create(already_AddRefed<nsIInputStream> aInputStream,
nsIAsyncInputStream** aAsyncInputStream);
private:
explicit NonBlockingAsyncInputStream(nsIInputStream* aInputStream);
explicit NonBlockingAsyncInputStream(already_AddRefed<nsIInputStream> aInputStream);
~NonBlockingAsyncInputStream();
nsCOMPtr<nsIInputStream> mInputStream;

View File

@ -969,20 +969,21 @@ NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
}
nsresult
NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
NS_MakeAsyncNonBlockingInputStream(already_AddRefed<nsIInputStream> aSource,
nsIAsyncInputStream** aAsyncInputStream)
{
if (NS_WARN_IF(!aSource || !aAsyncInputStream)) {
nsCOMPtr<nsIInputStream> source = Move(aSource);
if (NS_WARN_IF(!aAsyncInputStream)) {
return NS_ERROR_FAILURE;
}
bool nonBlocking = false;
nsresult rv = aSource->IsNonBlocking(&nonBlocking);
nsresult rv = source->IsNonBlocking(&nonBlocking);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(aSource);
nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
if (nonBlocking && asyncStream) {
// This stream is perfect!
@ -992,7 +993,8 @@ NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
if (nonBlocking) {
// If the stream is non-blocking but not async, we wrap it.
return NonBlockingAsyncInputStream::Create(aSource, aAsyncInputStream);
return NonBlockingAsyncInputStream::Create(source.forget(),
aAsyncInputStream);
}
nsCOMPtr<nsIStreamTransportService> sts =
@ -1002,7 +1004,7 @@ NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
}
nsCOMPtr<nsITransport> transport;
rv = sts->CreateInputTransport(aSource,
rv = sts->CreateInputTransport(source,
/* aCloseWhenDone */ true,
getter_AddRefs(transport));
if (NS_WARN_IF(NS_FAILED(rv))) {

View File

@ -299,6 +299,8 @@ NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
* different approaches are used based on what |aSource| is and what it
* implements.
*
* Note that this component takes the owninship of aSource.
*
* If the |aSource| is already a non-blocking and async stream,
* |aAsyncInputStream| will be equal to |aSource|.
*
@ -310,7 +312,7 @@ NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
* a separate thread.
*/
extern nsresult
NS_MakeAsyncNonBlockingInputStream(nsIInputStream* aSource,
NS_MakeAsyncNonBlockingInputStream(already_AddRefed<nsIInputStream> aSource,
nsIAsyncInputStream** aAsyncInputStream);
#endif // !nsStreamUtils_h__

View File

@ -8,26 +8,30 @@
#include "Helpers.h"
TEST(TestNonBlockingAsyncInputStream, Simple) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// It should not be async.
nsCOMPtr<nsIAsyncInputStream> async = do_QueryInterface(stream);
ASSERT_EQ(nullptr, async);
// It must be non-blocking
bool nonBlocking = false;
ASSERT_EQ(NS_OK, stream->IsNonBlocking(&nonBlocking));
ASSERT_TRUE(nonBlocking);
nsCOMPtr<nsIAsyncInputStream> async;
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
async = do_QueryInterface(stream);
ASSERT_EQ(nullptr, async);
// It must be non-blocking
ASSERT_EQ(NS_OK, stream->IsNonBlocking(&nonBlocking));
ASSERT_TRUE(nonBlocking);
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
ASSERT_TRUE(!!async);
// Still non-blocking
@ -75,18 +79,20 @@ ReadSegmentsFunction(nsIInputStream* aInStr,
}
TEST(TestNonBlockingAsyncInputStream, ReadSegments) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
// Read works fine.
char buffer[1024];
@ -99,18 +105,20 @@ TEST(TestNonBlockingAsyncInputStream, ReadSegments) {
}
TEST(TestNonBlockingAsyncInputStream, AsyncWait_Simple) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
ASSERT_TRUE(!!async);
// Testing ::Available()
@ -143,18 +151,20 @@ TEST(TestNonBlockingAsyncInputStream, AsyncWait_Simple) {
}
TEST(TestNonBlockingAsyncInputStream, AsyncWait_ClosureOnly_withoutEventTarget) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
ASSERT_TRUE(!!async);
// Testing ::AsyncWait - no eventTarget
@ -169,18 +179,20 @@ TEST(TestNonBlockingAsyncInputStream, AsyncWait_ClosureOnly_withoutEventTarget)
}
TEST(TestNonBlockingAsyncInputStream, AsyncWait_ClosureOnly_withEventTarget) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
ASSERT_TRUE(!!async);
// Testing ::AsyncWait - with EventTarget
@ -199,30 +211,39 @@ TEST(TestNonBlockingAsyncInputStream, AsyncWait_ClosureOnly_withEventTarget) {
}
TEST(TestNonBlockingAsyncInputStream, Helper) {
nsCOMPtr<nsIInputStream> stream;
nsCString data;
data.Assign("Hello world!");
// Let's create a test string inputStream
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
// Let's create a test string inputStream
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
// Here the non-blocking stream.
ASSERT_EQ(NS_OK,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
ASSERT_TRUE(!!async);
// This should return the same object because async is already non-blocking
// and async.
nsCOMPtr<nsIAsyncInputStream> result;
nsCOMPtr<nsIAsyncInputStream> asyncTmp = async;
ASSERT_EQ(NS_OK,
NS_MakeAsyncNonBlockingInputStream(async, getter_AddRefs(result)));
NS_MakeAsyncNonBlockingInputStream(asyncTmp.forget(),
getter_AddRefs(result)));
ASSERT_EQ(async, result);
// This will use NonBlockingAsyncInputStream wrapper.
ASSERT_EQ(NS_OK,
NS_MakeAsyncNonBlockingInputStream(stream, getter_AddRefs(result)));
{
nsCOMPtr<nsIInputStream> stream;
ASSERT_EQ(NS_OK, NS_NewCStringInputStream(getter_AddRefs(stream), data));
ASSERT_EQ(NS_OK,
NS_MakeAsyncNonBlockingInputStream(stream.forget(),
getter_AddRefs(result)));
}
ASSERT_TRUE(async != result);
ASSERT_TRUE(async);
}
@ -291,11 +312,15 @@ NS_INTERFACE_MAP_END
TEST(TestNonBlockingAsyncInputStream, QI) {
// Let's test ::Create() returning error.
nsCOMPtr<nsIInputStream> stream = new QIInputStream(true, true, true, true);
nsCOMPtr<nsIAsyncInputStream> async;
ASSERT_EQ(NS_ERROR_FAILURE,
NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
{
nsCOMPtr<nsIInputStream> stream = new QIInputStream(true, true, true, true);
ASSERT_EQ(NS_ERROR_FAILURE,
NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
// Let's test the QIs
for (int i = 0; i < 8; ++i) {
@ -303,29 +328,40 @@ TEST(TestNonBlockingAsyncInputStream, QI) {
bool shouldBeSerializable = !!(i & 0x02);
bool shouldBeSeekable = !!(i & 0x04);
stream = new QIInputStream(false, shouldBeCloneable, shouldBeSerializable,
shouldBeSeekable);
ASSERT_EQ(NS_OK, NonBlockingAsyncInputStream::Create(stream, getter_AddRefs(async)));
nsCOMPtr<nsICloneableInputStream> cloneable;
nsCOMPtr<nsIIPCSerializableInputStream> ipcSerializable;
nsCOMPtr<nsISeekableStream> seekable;
{
nsCOMPtr<nsIInputStream> stream =
new QIInputStream(false, shouldBeCloneable, shouldBeSerializable, shouldBeSeekable);
cloneable = do_QueryInterface(stream);
ASSERT_EQ(shouldBeCloneable, !!cloneable);
ipcSerializable = do_QueryInterface(stream);
ASSERT_EQ(shouldBeSerializable, !!ipcSerializable);
seekable = do_QueryInterface(stream);
ASSERT_EQ(shouldBeSeekable, !!seekable);
ASSERT_EQ(NS_OK, NonBlockingAsyncInputStream::Create(stream.forget(),
getter_AddRefs(async)));
}
// The returned async stream should be cloneable only if the underlying
// stream is.
nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(async);
ASSERT_EQ(shouldBeCloneable, !!cloneable);
cloneable = do_QueryInterface(stream);
cloneable = do_QueryInterface(async);
ASSERT_EQ(shouldBeCloneable, !!cloneable);
// The returned async stream should be serializable only if the underlying
// stream is.
nsCOMPtr<nsIIPCSerializableInputStream> ipcSerializable = do_QueryInterface(async);
ASSERT_EQ(shouldBeSerializable, !!ipcSerializable);
ipcSerializable = do_QueryInterface(stream);
ipcSerializable = do_QueryInterface(async);
ASSERT_EQ(shouldBeSerializable, !!ipcSerializable);
// The returned async stream should be seekable only if the underlying
// stream is.
nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(async);
ASSERT_EQ(shouldBeSeekable, !!seekable);
seekable = do_QueryInterface(stream);
seekable = do_QueryInterface(async);
ASSERT_EQ(shouldBeSeekable, !!seekable);
}
}