Bug 1248203 - streamline h2 stream flow control buffer r=hurley

This commit is contained in:
Patrick McManus 2016-02-13 12:10:02 -05:00
parent 05c95317a9
commit 0acb222f9a
6 changed files with 172 additions and 44 deletions

View File

@ -0,0 +1,95 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "SimpleBuffer.h"
#include <algorithm>
namespace mozilla {
namespace net {
SimpleBuffer::SimpleBuffer()
: mStatus(NS_OK)
, mAvailable(0)
{
mOwningThread = PR_GetCurrentThread();
}
nsresult SimpleBuffer::Write(char *src, size_t len)
{
MOZ_ASSERT(PR_GetCurrentThread() == mOwningThread);
if (NS_FAILED(mStatus)) {
return mStatus;
}
while (len > 0) {
SimpleBufferPage *p = mBufferList.getLast();
if (p && (p->mWriteOffset == SimpleBufferPage::kSimpleBufferPageSize)) {
// no room.. make a new page
p = nullptr;
}
if (!p) {
p = new (fallible) SimpleBufferPage();
if (!p) {
mStatus = NS_ERROR_OUT_OF_MEMORY;
return mStatus;
}
mBufferList.insertBack(p);
}
size_t roomOnPage = SimpleBufferPage::kSimpleBufferPageSize - p->mWriteOffset;
size_t toWrite = std::min(roomOnPage, len);
memcpy(p->mBuffer + p->mWriteOffset, src, toWrite);
src += toWrite;
len -= toWrite;
p->mWriteOffset += toWrite;
mAvailable += toWrite;
}
return NS_OK;
}
size_t SimpleBuffer::Read(char *dest, size_t maxLen)
{
MOZ_ASSERT(PR_GetCurrentThread() == mOwningThread);
if (NS_FAILED(mStatus)) {
return 0;
}
size_t rv = 0;
for (SimpleBufferPage *p = mBufferList.getFirst();
p && (rv < maxLen); p = mBufferList.getFirst()) {
size_t avail = p->mWriteOffset - p->mReadOffset;
size_t toRead = std::min(avail, (maxLen - rv));
memcpy(dest + rv, p->mBuffer + p->mReadOffset, toRead);
rv += toRead;
p->mReadOffset += toRead;
if (p->mReadOffset == p->mWriteOffset) {
p->remove();
delete p;
}
}
MOZ_ASSERT(mAvailable >= rv);
mAvailable -= rv;
return rv;
}
size_t SimpleBuffer::Available()
{
MOZ_ASSERT(PR_GetCurrentThread() == mOwningThread);
return NS_SUCCEEDED(mStatus) ? mAvailable : 0;
}
void SimpleBuffer::Clear()
{
MOZ_ASSERT(PR_GetCurrentThread() == mOwningThread);
SimpleBufferPage *p;
while ((p = mBufferList.popFirst())) {
delete p;
}
mAvailable = 0;
}
} // namespace net
} // namespace mozilla

View File

@ -0,0 +1,57 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef SimpleBuffer_h__
#define SimpleBuffer_h__
/*
This class is similar to a nsPipe except it does not have any locking, stores
an unbounded amount of data, can only be used on one thread, and has much
simpler result code semantics to deal with.
*/
#include "prtypes.h"
#include "mozilla/LinkedList.h"
#include "nsIThreadInternal.h"
namespace mozilla {
namespace net {
class SimpleBufferPage : public LinkedListElement<SimpleBufferPage>
{
public:
SimpleBufferPage() : mReadOffset(0), mWriteOffset(0) {}
static const size_t kSimpleBufferPageSize = 32000;
private:
friend class SimpleBuffer;
char mBuffer[kSimpleBufferPageSize];
size_t mReadOffset;
size_t mWriteOffset;
};
class SimpleBuffer
{
public:
SimpleBuffer();
~SimpleBuffer() {}
nsresult Write(char *stc, size_t len); // return OK or OUT_OF_MEMORY
size_t Read(char *dest, size_t maxLen); // return bytes read
size_t Available();
void Clear();
private:
PRThread *mOwningThread;
nsresult mStatus;
AutoCleanLinkedList<SimpleBufferPage> mBufferList;
size_t mAvailable;
};
} // namespace net
} // namespace mozilla
#endif

View File

@ -254,6 +254,7 @@ UNIFIED_SOURCES += [
'ProxyAutoConfig.cpp',
'RedirectChannelRegistrar.cpp',
'SchedulingContextService.cpp',
'SimpleBuffer.cpp',
'StreamingProtocolService.cpp',
'Tickler.cpp',
'TLSServerSocket.cpp',

View File

@ -68,7 +68,7 @@ public:
// this could go either way, but because there are network instances of
// it being a hard error we should consider it hard.
if (code == NS_ERROR_FAILURE) {
if (code == NS_ERROR_FAILURE || code == NS_ERROR_OUT_OF_MEMORY) {
return false;
}

View File

@ -235,28 +235,12 @@ Http2Stream::ReadSegments(nsAHttpSegmentReader *reader,
return rv;
}
static bool
IsDataAvailable(nsIInputStream *stream)
{
if (!stream) {
return false;
}
uint64_t avail;
if (NS_FAILED(stream->Available(&avail))) {
return false;
}
return (avail > 0);
}
uint64_t
Http2Stream::LocalUnAcked()
{
// reduce unacked by the amount of undelivered data
// to help assert flow control
uint64_t undelivered = 0;
if (mInputBufferIn) {
mInputBufferIn->Available(&undelivered);
}
uint64_t undelivered = mSimpleBuffer.Available();
if (undelivered > mLocalUnacked) {
return 0;
@ -267,25 +251,20 @@ Http2Stream::LocalUnAcked()
nsresult
Http2Stream::BufferInput(uint32_t count, uint32_t *countWritten)
{
static const uint32_t segmentSize = 32768;
char buf[segmentSize];
count = std::min(segmentSize, count);
if (!mInputBufferOut) {
NS_NewPipe(getter_AddRefs(mInputBufferIn), getter_AddRefs(mInputBufferOut),
segmentSize, UINT32_MAX);
if (!mInputBufferOut) {
return NS_ERROR_OUT_OF_MEMORY;
}
char buf[SimpleBufferPage::kSimpleBufferPageSize];
if (SimpleBufferPage::kSimpleBufferPageSize < count) {
count = SimpleBufferPage::kSimpleBufferPageSize;
}
mBypassInputBuffer = 1;
nsresult rv = mSegmentWriter->OnWriteSegment(buf, count, countWritten);
mBypassInputBuffer = 0;
if (NS_SUCCEEDED(rv)) {
uint32_t buffered;
rv = mInputBufferOut->Write(buf, *countWritten, &buffered);
if (NS_SUCCEEDED(rv) && (buffered != *countWritten)) {
rv = NS_ERROR_OUT_OF_MEMORY;
rv = mSimpleBuffer.Write(buf, *countWritten);
if (NS_FAILED(rv)) {
MOZ_ASSERT(rv == NS_ERROR_OUT_OF_MEMORY);
return NS_ERROR_OUT_OF_MEMORY;
}
}
return rv;
@ -295,7 +274,7 @@ bool
Http2Stream::DeferCleanup(nsresult status)
{
// do not cleanup a stream that has data buffered for the transaction
return (NS_SUCCEEDED(status) && IsDataAvailable(mInputBufferIn));
return (NS_SUCCEEDED(status) && mSimpleBuffer.Available());
}
// WriteSegments() is used to read data off the socket. Generally this is
@ -1432,16 +1411,12 @@ Http2Stream::OnWriteSegment(char *buf,
// so that other streams can proceed when the gecko caller is not processing
// data events fast enough and flow control hasn't caught up yet. This
// gets the stored data out of that pipe
if (!mBypassInputBuffer && IsDataAvailable(mInputBufferIn)) {
nsresult rv = mInputBufferIn->Read(buf, count, countWritten);
if (!mBypassInputBuffer && mSimpleBuffer.Available()) {
*countWritten = mSimpleBuffer.Read(buf, count);
MOZ_ASSERT(*countWritten);
LOG3(("Http2Stream::OnWriteSegment read from flow control buffer %p %x %d\n",
this, mStreamID, *countWritten));
if (!IsDataAvailable(mInputBufferIn)) {
// drop the pipe if we don't need it anymore
mInputBufferIn = nullptr;
mInputBufferOut = nullptr;
}
return rv;
return NS_OK;
}
// read from the network

View File

@ -13,6 +13,7 @@
#include "mozilla/UniquePtr.h"
#include "nsAHttpTransaction.h"
#include "nsISupportsPriority.h"
#include "SimpleBuffer.h"
class nsStandardURL;
class nsIInputStream;
@ -325,10 +326,9 @@ private:
// For Http2Push
Http2PushedStream *mPushSource;
// A pipe used to store stream data when the transaction cannot keep up
// Used to store stream data when the transaction channel cannot keep up
// and flow control has not yet kicked in.
nsCOMPtr<nsIInputStream> mInputBufferIn;
nsCOMPtr<nsIOutputStream> mInputBufferOut;
SimpleBuffer mSimpleBuffer;
/// connect tunnels
public: