mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-12-01 00:32:11 +00:00
680bd63580
Differential Revision: https://phabricator.services.mozilla.com/D209747
412 lines
10 KiB
C++
412 lines
10 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "ThrottleQueue.h"
|
|
#include "mozilla/Components.h"
|
|
#include "mozilla/net/InputChannelThrottleQueueParent.h"
|
|
#include "nsISeekableStream.h"
|
|
#include "nsIAsyncInputStream.h"
|
|
#include "nsIOService.h"
|
|
#include "nsSocketTransportService2.h"
|
|
#include "nsStreamUtils.h"
|
|
#include "nsNetUtil.h"
|
|
|
|
namespace mozilla {
|
|
namespace net {
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
class ThrottleInputStream final : public nsIAsyncInputStream,
|
|
public nsISeekableStream {
|
|
public:
|
|
ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
|
|
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
NS_DECL_NSIINPUTSTREAM
|
|
NS_DECL_NSISEEKABLESTREAM
|
|
NS_DECL_NSITELLABLESTREAM
|
|
NS_DECL_NSIASYNCINPUTSTREAM
|
|
|
|
void AllowInput();
|
|
|
|
private:
|
|
~ThrottleInputStream();
|
|
|
|
nsCOMPtr<nsIInputStream> mStream;
|
|
RefPtr<ThrottleQueue> mQueue;
|
|
nsresult mClosedStatus;
|
|
|
|
nsCOMPtr<nsIInputStreamCallback> mCallback;
|
|
nsCOMPtr<nsIEventTarget> mEventTarget;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
|
|
nsITellableStream, nsISeekableStream)
|
|
|
|
ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
|
|
ThrottleQueue* aQueue)
|
|
: mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
|
|
MOZ_ASSERT(aQueue != nullptr);
|
|
}
|
|
|
|
ThrottleInputStream::~ThrottleInputStream() { Close(); }
|
|
|
|
// Closes the stream: removes it from the throttle queue, drops the queue
// reference, records NS_BASE_STREAM_CLOSED and closes the wrapped stream.
// Returns the stored close status when the stream is already closed,
// otherwise the result of closing the wrapped stream.
NS_IMETHODIMP
ThrottleInputStream::Close() {
  const bool alreadyClosed = NS_FAILED(mClosedStatus);
  if (alreadyClosed) {
    return mClosedStatus;
  }

  if (mQueue) {
    // Make sure the queue no longer tries to notify this stream, then let go
    // of the queue before marking the stream closed.
    mQueue->DequeueStream(this);
    mQueue = nullptr;
    mClosedStatus = NS_BASE_STREAM_CLOSED;
  }

  return mStream->Close();
}
|
|
|
|
// Reports how many bytes the wrapped stream has available. The throttle
// budget is not consulted here. Returns the stored close status once the
// stream has been closed.
NS_IMETHODIMP
ThrottleInputStream::Available(uint64_t* aResult) {
  return NS_FAILED(mClosedStatus) ? mClosedStatus
                                  : mStream->Available(aResult);
}
|
|
|
|
// Returns the stored close status if this wrapper has been closed, otherwise
// whatever status the wrapped stream reports.
NS_IMETHODIMP
ThrottleInputStream::StreamStatus() {
  return NS_FAILED(mClosedStatus) ? mClosedStatus : mStream->StreamStatus();
}
|
|
|
|
// Reads at most as many bytes as the throttle queue currently allows.
//
// Returns the stored close status if the stream is closed,
// NS_BASE_STREAM_WOULD_BLOCK when the queue grants no bytes right now, and
// otherwise the result of reading from the wrapped stream. Successful
// non-empty reads are reported back to the queue.
NS_IMETHODIMP
ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  // Ask the queue how much of aCount may be consumed right now.
  uint32_t allowance;
  nsresult rv = mQueue->Available(aCount, &allowance);
  if (NS_FAILED(rv)) {
    return rv;
  }
  if (allowance == 0) {
    return NS_BASE_STREAM_WOULD_BLOCK;
  }

  rv = mStream->Read(aBuf, allowance, aResult);
  if (NS_FAILED(rv) || *aResult == 0) {
    // Nothing was consumed, so there is nothing to account for.
    return rv;
  }

  mQueue->RecordRead(*aResult);
  return rv;
}
|
|
|
|
// Segment-based counterpart of Read(): hands at most the number of bytes the
// throttle queue currently allows to aWriter.
//
// Returns the stored close status if the stream is closed,
// NS_BASE_STREAM_WOULD_BLOCK when the queue grants no bytes right now, and
// otherwise the result of the wrapped stream's ReadSegments(). Successful
// non-empty reads are reported back to the queue.
NS_IMETHODIMP
ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                  uint32_t aCount, uint32_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  // Ask the queue how much of aCount may be consumed right now.
  uint32_t realCount;
  nsresult rv = mQueue->Available(aCount, &realCount);
  if (NS_FAILED(rv)) {
    return rv;
  }
  MOZ_ASSERT(realCount <= aCount);

  if (!realCount) {
    return NS_BASE_STREAM_WOULD_BLOCK;
  }

  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
  if (NS_FAILED(rv) || *aResult == 0) {
    // Nothing was consumed, so there is nothing to account for.
    return rv;
  }

  mQueue->RecordRead(*aResult);
  return rv;
}
|
|
|
|
// Always reports the stream as non-blocking: Read()/ReadSegments() return
// NS_BASE_STREAM_WOULD_BLOCK instead of waiting when no bytes are allowed.
NS_IMETHODIMP
ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
  *aNonBlocking = true;
  return NS_OK;
}
|
|
|
|
// Forwards the seek to the wrapped stream. Returns the stored close status
// if this stream is closed, and NS_ERROR_FAILURE if the wrapped stream does
// not implement nsISeekableStream.
NS_IMETHODIMP
ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
  return seekable ? seekable->Seek(aWhence, aOffset) : NS_ERROR_FAILURE;
}
|
|
|
|
// Forwards the position query to the wrapped stream. Returns the stored
// close status if this stream is closed, and NS_ERROR_FAILURE if the wrapped
// stream does not implement nsITellableStream.
NS_IMETHODIMP
ThrottleInputStream::Tell(int64_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mStream);
  return tellable ? tellable->Tell(aResult) : NS_ERROR_FAILURE;
}
|
|
|
|
// Forwards SetEOF() to the wrapped stream. Returns the stored close status
// if this stream is closed, and NS_ERROR_FAILURE if the wrapped stream does
// not implement nsISeekableStream.
NS_IMETHODIMP
ThrottleInputStream::SetEOF() {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
  return seekable ? seekable->SetEOF() : NS_ERROR_FAILURE;
}
|
|
|
|
// Closes the stream and remembers aStatus as the reason.
//
// A success code in aStatus is replaced by NS_BASE_STREAM_CLOSED. If closing
// fails, that failure is stored instead of aStatus. Calling this on an
// already closed stream does nothing. Always returns NS_OK.
NS_IMETHODIMP
ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
  if (NS_FAILED(mClosedStatus)) {
    // Already closed, ignore.
    return NS_OK;
  }

  const nsresult reason =
      NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;

  // Close() writes mClosedStatus itself; overwrite it afterwards with either
  // the caller's reason or the error Close() reported.
  const nsresult closeRv = Close();
  mClosedStatus = NS_SUCCEEDED(closeRv) ? reason : closeRv;
  return NS_OK;
}
|
|
|
|
// Registers a one-shot notification that is delivered when the throttle queue
// next lets its waiting streams proceed. Passing a null aCallback cancels a
// pending wait.
//
// aFlags must be 0 (otherwise NS_ERROR_ILLEGAL_VALUE is returned) and
// aRequestedCount is ignored. aEventTarget is stored and later handed to
// NS_NewInputStreamReadyEvent() by AllowInput().
//
// Returns the stored close status if the stream has already been closed.
NS_IMETHODIMP
ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                               uint32_t aFlags, uint32_t aRequestedCount,
                               nsIEventTarget* aEventTarget) {
  if (aFlags != 0) {
    return NS_ERROR_ILLEGAL_VALUE;
  }

  // Close() resets mQueue to null, so the queue must not be touched once the
  // stream is closed. Report the close status like the other methods do
  // instead of dereferencing a null queue.
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  mCallback = aCallback;
  mEventTarget = aEventTarget;
  if (mCallback) {
    mQueue->QueueStream(this);
  } else {
    mQueue->DequeueStream(this);
  }
  return NS_OK;
}
|
|
|
|
void ThrottleInputStream::AllowInput() {
|
|
MOZ_ASSERT(mCallback);
|
|
nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
|
|
"ThrottleInputStream::AllowInput", mCallback, mEventTarget);
|
|
mCallback = nullptr;
|
|
mEventTarget = nullptr;
|
|
callbackEvent->OnInputStreamReady(this);
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
// static
|
|
// Factory for throttle queues; must be called in the parent process.
// Returns an InputChannelThrottleQueueParent when the socket process is in
// use, and a plain ThrottleQueue otherwise.
already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() {
  MOZ_ASSERT(XRE_IsParentProcess());

  if (nsIOService::UseSocketProcess()) {
    nsCOMPtr<nsIInputChannelThrottleQueue> parentQueue =
        new InputChannelThrottleQueueParent();
    return parentQueue.forget();
  }

  nsCOMPtr<nsIInputChannelThrottleQueue> localQueue = new ThrottleQueue();
  return localQueue.forget();
}
|
|
|
|
// Interface map for QueryInterface.
NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
                  nsINamed)

// Creates the timer used to wake up queued streams, targeted at the socket
// transport service. If either service lookup fails, mTimer is left unset.
ThrottleQueue::ThrottleQueue() {
  nsresult rv;
  nsCOMPtr<nsIEventTarget> socketTarget;

  // NOTE(review): the IO service handle is not used afterwards; presumably
  // it is fetched so the IO service exists before the socket transport
  // service is requested - confirm.
  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
  if (NS_FAILED(rv)) {
    return;
  }

  socketTarget = mozilla::components::SocketTransport::Service(&rv);
  if (NS_FAILED(rv)) {
    return;
  }

  mTimer = NS_NewTimer(socketTarget);
}
|
|
|
|
// Cancels the wake-up timer if it is still armed and releases it.
ThrottleQueue::~ThrottleQueue() {
  const bool timerPending = mTimer && mTimerArmed;
  if (timerPending) {
    mTimer->Cancel();
  }
  mTimer = nullptr;
}
|
|
|
|
// Records that aBytesRead bytes were just consumed: appends a timestamped
// entry to the read history used by Available() and adds the bytes to the
// running total returned by BytesProcessed(). Socket thread only.
NS_IMETHODIMP
ThrottleQueue::RecordRead(uint32_t aBytesRead) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  ThrottleEntry readEvent;
  readEvent.mTime = TimeStamp::Now();
  readEvent.mBytesRead = aBytesRead;
  mReadEvents.AppendElement(readEvent);

  mBytesProcessed += aBytesRead;
  return NS_OK;
}
|
|
|
|
// Computes how many of aRemaining bytes a stream may read right now.
//
// Read events older than one second are discarded, the bytes read within the
// last second are summed, and a randomised per-call budget is drawn from
// [mean - spread, mean + spread], where spread = max - mean. If the bytes
// already read reach the budget, *aAvailable is 0; otherwise it is the
// smaller of the budget and aRemaining. Always returns NS_OK. Socket thread
// only.
NS_IMETHODIMP
ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  TimeStamp now = TimeStamp::Now();
  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
  size_t i;

  // Remove all stale events. Entries are appended in time order, so the
  // stale ones form a prefix of the array.
  for (i = 0; i < mReadEvents.Length(); ++i) {
    if (mReadEvents[i].mTime >= oneSecondAgo) {
      break;
    }
  }
  mReadEvents.RemoveElementsAt(0, i);

  // Sum in 64 bits so that many large reads within one second cannot wrap
  // the total back to a small number.
  uint64_t totalBytes = 0;
  for (i = 0; i < mReadEvents.Length(); ++i) {
    totalBytes += mReadEvents[i].mBytesRead;
  }

  const uint32_t meanBps = mMeanBytesPerSecond;
  const uint32_t maxBps = mMaxBytesPerSecond;
  const uint32_t spread = maxBps > meanBps ? maxBps - meanBps : 0;
  const double prob = static_cast<double>(rand()) / RAND_MAX;

  // Work in floating point: when spread exceeds the mean (max > 2 * mean) the
  // lower end of the range is negative, and unsigned arithmetic would wrap it
  // to a huge budget that effectively disables throttling. Clamp to 0
  // instead. The upper end is mean + spread == max, which fits in uint32_t.
  double slice = static_cast<double>(meanBps) - static_cast<double>(spread) +
                 2.0 * static_cast<double>(spread) * prob;
  if (slice < 0.0) {
    slice = 0.0;
  }
  const uint32_t thisSliceBytes = static_cast<uint32_t>(slice);

  // NOTE(review): the grant is capped by the whole budget, not by the budget
  // minus totalBytes, so a single grant can overshoot; kept as in the
  // original - confirm this is intended.
  if (totalBytes >= thisSliceBytes) {
    *aAvailable = 0;
  } else {
    *aAvailable = std::min(thisSliceBytes, aRemaining);
  }
  return NS_OK;
}
|
|
|
|
// Sets the mean and maximum throughput, in bytes per second.
// Returns NS_ERROR_ILLEGAL_VALUE if either rate is 0 or the maximum is below
// the mean; in that case the stored rates are left untouched.
NS_IMETHODIMP
ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
  // Can be called on any thread.
  const bool hasZeroRate = aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0;
  const bool maxBelowMean = aMaxBytesPerSecond < aMeanBytesPerSecond;
  if (hasZeroRate || maxBelowMean) {
    return NS_ERROR_ILLEGAL_VALUE;
  }

  mMeanBytesPerSecond = aMeanBytesPerSecond;
  mMaxBytesPerSecond = aMaxBytesPerSecond;
  return NS_OK;
}
|
|
|
|
// Returns, through aResult, the running total of bytes reported via
// RecordRead(). A null aResult is rejected in the same way as in
// GetMeanBytesPerSecond() and GetMaxBytesPerSecond().
NS_IMETHODIMP
ThrottleQueue::BytesProcessed(uint64_t* aResult) {
  NS_ENSURE_ARG(aResult);

  *aResult = mBytesProcessed;
  return NS_OK;
}
|
|
|
|
// Wraps aInputStream in a ThrottleInputStream governed by this queue and
// returns the wrapper through aResult.
//
// Both arguments must be non-null. The wrapper calls Close() on the wrapped
// stream unconditionally (including from its destructor), so a null input
// stream is rejected here rather than crashing later.
NS_IMETHODIMP
ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
                          nsIAsyncInputStream** aResult) {
  NS_ENSURE_ARG(aInputStream);
  NS_ENSURE_ARG(aResult);

  nsCOMPtr<nsIAsyncInputStream> result =
      new ThrottleInputStream(aInputStream, this);
  result.forget(aResult);
  return NS_OK;
}
|
|
|
|
// Timer callback: wakes every stream that is waiting in the queue.
// Socket thread only. Always returns NS_OK.
NS_IMETHODIMP
ThrottleQueue::Notify(nsITimer* aTimer) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  // The one-shot timer has fired, so it is no longer armed. Clear the flag
  // before notifying anyone: QueueStream() only arms the timer when the flag
  // is clear, so a reader that re-queues itself while being notified would
  // otherwise be queued with no timer pending to wake it.
  mTimerArmed = false;

  // A notified reader may need to push itself back on the queue.
  // Swap out the list of readers so that this works properly.
  nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents);

  // Optimistically notify all the waiting readers, and then let them
  // requeue if there isn't enough bandwidth.
  for (size_t i = 0; i < events.Length(); ++i) {
    events[i]->AllowInput();
  }

  return NS_OK;
}
|
|
|
|
// nsINamed: reports the fixed name of this object.
NS_IMETHODIMP
ThrottleQueue::GetName(nsACString& aName) {
  aName.AssignLiteral("net::ThrottleQueue");
  return NS_OK;
}
|
|
|
|
// Adds aStream to the list of streams waiting to be notified and, if no
// wake-up is pending, arms the one-shot timer. Streams already in the list
// are left alone. Socket thread only.
void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  if (mAsyncEvents.IndexOf(aStream) !=
      nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) {
    // Already waiting; nothing to do.
    return;
  }
  mAsyncEvents.AppendElement(aStream);

  if (mTimerArmed) {
    return;
  }

  // The constructor leaves mTimer unset when it cannot obtain the socket
  // transport service; do not dereference a null timer in that case.
  if (!mTimer) {
    return;
  }

  // Default to a full second. If there is read history, wake up when the
  // oldest recorded read becomes one second old, or almost immediately if
  // that moment has already passed.
  uint32_t ms = 1000;
  if (mReadEvents.Length() > 0) {
    TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
    TimeStamp now = TimeStamp::Now();

    if (t > now) {
      ms = static_cast<uint32_t>((t - now).ToMilliseconds());
    } else {
      ms = 1;
    }
  }

  if (NS_SUCCEEDED(
          mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
    mTimerArmed = true;
  }
}
|
|
|
|
// Removes aStream from the list of streams waiting to be notified.
// Socket thread only.
void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  mAsyncEvents.RemoveElement(aStream);
}
|
|
|
|
// Returns, through aMeanBytesPerSecond, the stored mean rate.
// A null out-pointer is rejected by NS_ENSURE_ARG.
NS_IMETHODIMP
ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) {
  NS_ENSURE_ARG(aMeanBytesPerSecond);

  *aMeanBytesPerSecond = mMeanBytesPerSecond;
  return NS_OK;
}
|
|
|
|
// Returns, through aMaxBytesPerSecond, the stored maximum rate.
// A null out-pointer is rejected by NS_ENSURE_ARG.
NS_IMETHODIMP
ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) {
  NS_ENSURE_ARG(aMaxBytesPerSecond);

  *aMaxBytesPerSecond = mMaxBytesPerSecond;
  return NS_OK;
}
|
|
|
|
} // namespace net
|
|
} // namespace mozilla
|