Bug 1415081 - part 0 - NS_ReadInputStreamToString must support nsIAsyncInputStream, r=smaug

This commit is contained in:
Andrea Marchesini 2017-11-09 11:18:08 +01:00
parent 1911626bf9
commit b71cdc9eb5
2 changed files with 368 additions and 32 deletions

View File

@ -13,6 +13,8 @@
#include "mozilla/LoadContext.h"
#include "mozilla/LoadInfo.h"
#include "mozilla/BasePrincipal.h"
#include "mozilla/Monitor.h"
#include "mozilla/TaskQueue.h"
#include "mozilla/Telemetry.h"
#include "nsCategoryCache.h"
#include "nsContentUtils.h"
@ -1411,45 +1413,359 @@ NS_NewPostDataStream(nsIInputStream **result,
return NS_OK;
}
nsresult
NS_ReadInputStreamToBuffer(nsIInputStream *aInputStream,
void **aDest,
uint32_t aCount)
namespace {
#define BUFFER_SIZE 4096
class BufferWriter final : public Runnable
, public nsIInputStreamCallback
{
nsresult rv;
public:
NS_DECL_ISUPPORTS_INHERITED
BufferWriter(nsIInputStream* aInputStream,
void* aBuffer, int64_t aCount)
: Runnable("BufferWriterRunnable")
, mMonitor("BufferWriter.mMonitor")
, mInputStream(aInputStream)
, mBuffer(aBuffer)
, mCount(aCount)
, mWrittenData(0)
, mBufferType(mBuffer ? eExternal : eInternal)
, mAsyncResult(NS_OK)
, mBufferSize(0)
{
MOZ_ASSERT(aInputStream);
MOZ_ASSERT(aCount == -1 || aCount > 0);
MOZ_ASSERT_IF(mBuffer, aCount > 0);
}
nsresult
Write()
{
// Let's make the inputStream buffered if it's not.
if (!NS_InputStreamIsBuffered(mInputStream)) {
nsCOMPtr<nsIInputStream> bufferedStream;
nsresult rv =
NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
mInputStream.forget(), BUFFER_SIZE);
NS_ENSURE_SUCCESS(rv, rv);
mInputStream = bufferedStream;
}
mAsyncInputStream = do_QueryInterface(mInputStream);
if (!mAsyncInputStream) {
return WriteSync();
}
// Let's use mAsyncInputStream only.
mInputStream = nullptr;
return WriteAsync();
}
uint64_t
WrittenData() const
{
return mWrittenData;
}
void*
StealBuffer()
{
MOZ_ASSERT(mBufferType == eInternal);
void* buffer = mBuffer;
mBuffer = nullptr;
return buffer;
}
private:
~BufferWriter()
{
if (mBuffer && mBufferType == eInternal) {
free(mBuffer);
}
if (mTaskQueue) {
mTaskQueue->BeginShutdown();
}
}
nsresult
WriteSync()
{
uint64_t length = (uint64_t)mCount;
if (mCount == -1) {
nsresult rv = mInputStream->Available(&length);
NS_ENSURE_SUCCESS(rv, rv);
if (length == 0) {
// nothing to read.
return NS_OK;
}
}
if (mBufferType == eInternal) {
mBuffer = malloc(length);
if (NS_WARN_IF(!mBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
uint32_t writtenData;
nsresult rv = mInputStream->ReadSegments(NS_CopySegmentToBuffer,
mBuffer, length,
&writtenData);
NS_ENSURE_SUCCESS(rv, rv);
mWrittenData = writtenData;
return NS_OK;
}
nsresult
WriteAsync()
{
if (mCount > 0 && mBufferType == eInternal) {
mBuffer = malloc(mCount);
if (NS_WARN_IF(!mBuffer)) {
return NS_ERROR_OUT_OF_MEMORY;
}
}
nsCOMPtr<nsIEventTarget> target =
do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID);
if (!target) {
return NS_ERROR_FAILURE;
}
mTaskQueue = new TaskQueue(target.forget());
MonitorAutoLock lock(mMonitor);
nsCOMPtr<nsIRunnable> runnable = this;
nsresult rv = mTaskQueue->Dispatch(runnable.forget(),
AbstractThread::DontAssertDispatchSuccess);
NS_ENSURE_SUCCESS(rv, rv);
lock.Wait();
return mAsyncResult;
}
// This method runs on the I/O Thread when the owning thread is blocked by
// the mMonitor. It is called multiple times until mCount is greater than 0
// or until there is something to read in the stream.
NS_IMETHOD
Run() override
{
MOZ_ASSERT(mAsyncInputStream);
MOZ_ASSERT(!mInputStream);
if (mCount == 0) {
OperationCompleted(NS_OK);
return NS_OK;
}
if (mCount == -1 && !MaybeExpandBufferSize()) {
OperationCompleted(NS_ERROR_OUT_OF_MEMORY);
return NS_OK;
}
uint64_t offset = mWrittenData;
uint64_t length = mCount == -1 ? BUFFER_SIZE : mCount;
// Let's try to read it directly.
uint32_t writtenData;
nsresult rv = mAsyncInputStream->ReadSegments(NS_CopySegmentToBuffer,
static_cast<char*>(mBuffer) + offset,
length, &writtenData);
// Operation completed.
if (NS_SUCCEEDED(rv) && writtenData == 0) {
OperationCompleted(NS_OK);
return NS_OK;
}
// If we succeeded, let's try to read again.
if (NS_SUCCEEDED(rv)) {
mWrittenData += writtenData;
if (mCount != -1) {
MOZ_ASSERT(mCount >= writtenData);
mCount -= writtenData;
}
nsCOMPtr<nsIRunnable> runnable = this;
rv = mTaskQueue->Dispatch(runnable.forget(),
AbstractThread::DontAssertDispatchSuccess);
if (NS_WARN_IF(NS_FAILED(rv))) {
OperationCompleted(rv);
}
return NS_OK;
}
// Async wait...
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
rv = mAsyncInputStream->AsyncWait(this, 0, length, mTaskQueue);
if (NS_WARN_IF(NS_FAILED(rv))) {
OperationCompleted(rv);
}
return NS_OK;
}
// Error.
OperationCompleted(rv);
return NS_OK;
}
NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream* aStream) override
{
MOZ_ASSERT(aStream == mAsyncInputStream);
// The stream is ready, let's read it again.
return Run();
}
// This function is called from the I/O thread and it will unblock the
// owning thread.
void
OperationCompleted(nsresult aRv)
{
MonitorAutoLock lock(mMonitor);
mAsyncResult = aRv;
// This will unlock the owning thread.
lock.Notify();
}
bool
MaybeExpandBufferSize()
{
MOZ_ASSERT(mCount == -1);
if (mBufferSize >= mWrittenData + BUFFER_SIZE) {
// The buffer is big enough.
return true;
}
CheckedUint32 bufferSize =
std::max<uint32_t>(static_cast<uint32_t>(mWrittenData),
BUFFER_SIZE);
while (bufferSize.isValid() &&
bufferSize.value() < mWrittenData + BUFFER_SIZE) {
bufferSize *= 2;
}
if (!bufferSize.isValid()) {
return false;
}
void* buffer = realloc(mBuffer, bufferSize.value());
if (!buffer) {
return false;
}
mBuffer = buffer;
mBufferSize = bufferSize.value();
return true;
}
Monitor mMonitor;
nsCOMPtr<nsIInputStream> mInputStream;
nsCOMPtr<nsIAsyncInputStream> mAsyncInputStream;
RefPtr<TaskQueue> mTaskQueue;
void* mBuffer;
int64_t mCount;
uint64_t mWrittenData;
enum {
// The buffer is allocated internally and this object must release it
// in the DTOR if not stolen. The buffer can be reallocated.
eInternal,
// The buffer is not owned by this object and it cannot be reallocated.
eExternal,
} mBufferType;
// The following set if needed for the async read.
nsresult mAsyncResult;
uint64_t mBufferSize;
};
NS_IMPL_ISUPPORTS_INHERITED(BufferWriter, Runnable, nsIInputStreamCallback)
} // anonymous namespace
nsresult
NS_ReadInputStreamToBuffer(nsIInputStream* aInputStream,
void** aDest,
int64_t aCount,
uint64_t* aWritten)
{
MOZ_ASSERT(aInputStream);
MOZ_ASSERT(aCount >= -1);
uint64_t dummyWritten;
if (!aWritten) {
aWritten = &dummyWritten;
}
if (aCount == 0) {
*aWritten = 0;
return NS_OK;
}
// This will take care of allocating and reallocating aDest.
RefPtr<BufferWriter> writer =
new BufferWriter(aInputStream, *aDest, aCount);
nsresult rv = writer->Write();
NS_ENSURE_SUCCESS(rv, rv);
*aWritten = writer->WrittenData();
if (!*aDest) {
*aDest = malloc(aCount);
if (!*aDest)
return NS_ERROR_OUT_OF_MEMORY;
*aDest = writer->StealBuffer();
}
char * p = reinterpret_cast<char*>(*aDest);
uint32_t bytesRead;
uint32_t totalRead = 0;
while (1) {
rv = aInputStream->Read(p + totalRead, aCount - totalRead, &bytesRead);
if (!NS_SUCCEEDED(rv))
return rv;
totalRead += bytesRead;
if (totalRead == aCount)
break;
// if Read reads 0 bytes, we've hit EOF
if (bytesRead == 0)
return NS_ERROR_UNEXPECTED;
}
return rv;
return NS_OK;
}
nsresult
NS_ReadInputStreamToString(nsIInputStream *aInputStream,
nsACString &aDest,
uint32_t aCount)
NS_ReadInputStreamToString(nsIInputStream* aInputStream,
nsACString& aDest,
int64_t aCount,
uint64_t* aWritten)
{
if (!aDest.SetLength(aCount, mozilla::fallible))
return NS_ERROR_OUT_OF_MEMORY;
void* dest = aDest.BeginWriting();
return NS_ReadInputStreamToBuffer(aInputStream, &dest, aCount);
uint64_t dummyWritten;
if (!aWritten) {
aWritten = &dummyWritten;
}
void* dest = nullptr;
nsresult rv = NS_ReadInputStreamToBuffer(aInputStream, &dest, aCount,
aWritten);
MOZ_ASSERT_IF(NS_FAILED(rv), dest == nullptr);
NS_ENSURE_SUCCESS(rv, rv);
if (!dest) {
MOZ_ASSERT(*aWritten == 0);
aDest.Truncate();
return NS_OK;
}
aDest.Adopt(reinterpret_cast<char*>(dest), *aWritten);
return NS_OK;
}
nsresult

View File

@ -526,13 +526,33 @@ nsresult NS_NewPostDataStream(nsIInputStream **result,
bool isFile,
const nsACString &data);
/**
* This function reads an inputStream and stores its content into a buffer. In
* general, you should avoid using this function because, it blocks the current
* thread until the operation is done.
* If the inputStream is async, the reading happens on an I/O thread.
*
* @param aInputStream the inputStream.
* @param aDest the destination buffer. if *aDest is null, it will be allocated
* with the size of the written data. if aDest is not null, aCount
* must greater than 0.
* @param aCount the amount of data to read. Use -1 if you want that all the
* stream is read.
* @param aWritten this pointer will be used to store the number of data
* written in the buffer. If you don't need, pass nullptr.
*/
nsresult NS_ReadInputStreamToBuffer(nsIInputStream *aInputStream,
void **aDest,
uint32_t aCount);
int64_t aCount,
uint64_t* aWritten = nullptr);
/**
* See the comment for NS_ReadInputStreamToBuffer
*/
nsresult NS_ReadInputStreamToString(nsIInputStream *aInputStream,
nsACString &aDest,
uint32_t aCount);
int64_t aCount,
uint64_t* aWritten = nullptr);
nsresult
NS_LoadPersistentPropertiesFromURISpec(nsIPersistentProperties **outResult,