Bug 1707078 - Change some raw pointers into WeakPtr in Http2Session r=necko-reviewers,valentin

Differential Revision: https://phabricator.services.mozilla.com/D113171
This commit is contained in:
Dragana Damjanovic 2021-04-30 11:39:18 +00:00
parent 2c1ee5e54c
commit 31b7cd7bb5
3 changed files with 70 additions and 43 deletions

View File

@ -194,7 +194,7 @@ void Http2Session::PrintDiagnostics(nsCString& log) {
log.AppendPrintf(" transactionHashCount = %d streamIDHashCount = %d\n",
mStreamTransactionHash.Count(), mStreamIDHash.Count());
log.AppendPrintf(" Queued Stream Size = %zu\n", mQueuedStreams.GetSize());
log.AppendPrintf(" Queued Stream Size = %zu\n", mQueuedStreams.Length());
PRIntervalTime now = PR_IntervalNow();
log.AppendPrintf(" Ping Threshold = %ums\n",

View File

@ -51,6 +51,37 @@ NS_INTERFACE_MAP_BEGIN(Http2Session)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection)
NS_INTERFACE_MAP_END
static void RemoveStreamFromQueue(Http2Stream* aStream,
nsTArray<WeakPtr<Http2Stream>>& queue) {
for (const auto& stream : Reversed(queue)) {
if (stream == aStream) {
queue.RemoveElement(stream);
}
}
}
static void AddStreamToQueue(Http2Stream* aStream,
nsTArray<WeakPtr<Http2Stream>>& queue) {
if (!queue.Contains(aStream)) {
queue.AppendElement(aStream);
}
}
static already_AddRefed<Http2Stream> GetNextStreamFromQueue(
nsTArray<WeakPtr<Http2Stream>>& queue) {
while (!queue.IsEmpty() && !queue[0]) {
MOZ_ASSERT(false);
queue.RemoveElementAt(0);
}
if (queue.IsEmpty()) {
return nullptr;
}
RefPtr<Http2Stream> stream = queue[0].get();
queue.RemoveElementAt(0);
return stream.forget();
}
// "magic" refers to the string that preceeds HTTP/2 on the wire
// to help find any intermediaries speaking an older version of HTTP
const uint8_t Http2Session::kMagicHello[] = {
@ -537,7 +568,7 @@ bool Http2Session::AddStream(nsAHttpTransaction* aHttpTransaction,
RefPtr<Http2Stream> stream = refStream;
mStreamTransactionHash.InsertOrUpdate(aHttpTransaction, std::move(refStream));
mReadyForWrite.Push(stream);
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
// Kick off the SYN transmit without waiting for the poll loop
@ -567,29 +598,28 @@ void Http2Session::QueueStream(Http2Stream* stream) {
LOG3(("Http2Session::QueueStream %p stream %p queued.", this, stream));
#ifdef DEBUG
int32_t qsize = mQueuedStreams.GetSize();
for (int32_t i = 0; i < qsize; i++) {
Http2Stream* qStream = mQueuedStreams.ObjectAt(i);
for (const auto& qStream : mQueuedStreams) {
MOZ_ASSERT(qStream != stream);
MOZ_ASSERT(qStream->Queued());
}
#endif
stream->SetQueued(true);
mQueuedStreams.Push(stream);
AddStreamToQueue(stream, mQueuedStreams);
}
void Http2Session::ProcessPending() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
Http2Stream* stream;
while (RoomForMoreConcurrent() && (stream = mQueuedStreams.PopFront())) {
RefPtr<Http2Stream> stream;
while (RoomForMoreConcurrent() &&
(stream = GetNextStreamFromQueue(mQueuedStreams))) {
LOG3(("Http2Session::ProcessPending %p stream %p woken from queue.", this,
stream));
stream.get()));
MOZ_ASSERT(!stream->CountAsActive());
MOZ_ASSERT(stream->Queued());
stream->SetQueued(false);
mReadyForWrite.Push(stream);
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
}
}
@ -695,7 +725,7 @@ enum SpdyVersion Http2Session::SpdyVersion() { return SpdyVersion::HTTP_2; }
uint32_t Http2Session::GetWriteQueueSize() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
return mReadyForWrite.GetSize();
return mReadyForWrite.Length();
}
void Http2Session::ChangeDownstreamState(enum internalStateType newState) {
@ -1249,15 +1279,6 @@ void Http2Session::CleanupStream(uint32_t aID, nsresult aResult,
CleanupStream(stream, aResult, aResetCode);
}
static void RemoveStreamFromQueue(Http2Stream* aStream,
nsDeque<Http2Stream>& queue) {
size_t size = queue.GetSize();
for (size_t count = 0; count < size; ++count) {
Http2Stream* stream = queue.PopFront();
if (stream != aStream) queue.Push(stream);
}
}
void Http2Session::RemoveStreamFromQueues(Http2Stream* aStream) {
RemoveStreamFromQueue(aStream, mReadyForWrite);
RemoveStreamFromQueue(aStream, mQueuedStreams);
@ -1715,6 +1736,7 @@ nsresult Http2Session::RecvSettings(Http2Session* self) {
}
nsresult Http2Session::RecvPushPromise(Http2Session* self) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_PUSH_PROMISE ||
self->mInputFrameType == FRAME_TYPE_CONTINUATION);
@ -2098,6 +2120,7 @@ nsresult Http2Session::RecvPing(Http2Session* self) {
}
nsresult Http2Session::RecvGoAway(Http2Session* self) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_GOAWAY);
if (self->mInputFrameDataSize < 8) {
@ -2154,9 +2177,7 @@ nsresult Http2Session::RecvGoAway(Http2Session* self) {
// Queued streams can also be deleted from this session and restarted
// in another one. (they were never sent on the network so they implicitly
// are not covered by the last-good id.
size = self->mQueuedStreams.GetSize();
for (size_t count = 0; count < size; ++count) {
Http2Stream* stream = self->mQueuedStreams.PopFront();
for (const auto& stream : self->mQueuedStreams) {
MOZ_ASSERT(stream->Queued());
stream->SetQueued(false);
if (self->mPeerGoAwayReason == HTTP_1_1_REQUIRED) {
@ -2165,6 +2186,7 @@ nsresult Http2Session::RecvGoAway(Http2Session* self) {
self->CloseStream(stream, NS_ERROR_NET_RESET);
self->mStreamTransactionHash.Remove(stream->Transaction());
}
self->mQueuedStreams.Clear();
LOG3(
("Http2Session::RecvGoAway %p GOAWAY Last-Good-ID 0x%X status 0x%X "
@ -2177,6 +2199,7 @@ nsresult Http2Session::RecvGoAway(Http2Session* self) {
}
nsresult Http2Session::RecvWindowUpdate(Http2Session* self) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_WINDOW_UPDATE);
if (self->mInputFrameDataSize != 4) {
@ -2268,7 +2291,7 @@ nsresult Http2Session::RecvWindowUpdate(Http2Session* self) {
continue;
}
self->mReadyForWrite.Push(stream);
AddStreamToQueue(stream, self->mReadyForWrite);
self->SetWriteCallbacks();
}
}
@ -2740,7 +2763,8 @@ nsresult Http2Session::ReadSegmentsAgain(nsAHttpSegmentReader* reader,
LOG3(("Http2Session::ReadSegments %p", this));
Http2Stream* stream = mReadyForWrite.PopFront();
RefPtr<Http2Stream> stream = GetNextStreamFromQueue(mReadyForWrite);
if (!stream) {
LOG3(("Http2Session %p could not identify a stream to write; suspending.",
this));
@ -2764,7 +2788,7 @@ nsresult Http2Session::ReadSegmentsAgain(nsAHttpSegmentReader* reader,
if (mAttemptingEarlyData) {
if (!stream->Do0RTT()) {
LOG3(("Http2Session %p will not get early data from Http2Stream %p 0x%X",
this, stream, stream->StreamID()));
this, stream.get(), stream->StreamID()));
FlushOutputQueue();
SetWriteCallbacks();
if (!mCannotDo0RTTStreams.Contains(stream)) {
@ -2787,7 +2811,7 @@ nsresult Http2Session::ReadSegmentsAgain(nsAHttpSegmentReader* reader,
LOG3(
("Http2Session %p will write from Http2Stream %p 0x%X "
"block-input=%d block-output=%d\n",
this, stream, stream->StreamID(), stream->RequestBlockedOnRead(),
this, stream.get(), stream->StreamID(), stream->RequestBlockedOnRead(),
stream->BlockedOnRwin()));
rv = stream->ReadSegments(this, count, countRead);
@ -2851,21 +2875,21 @@ nsresult Http2Session::ReadSegmentsAgain(nsAHttpSegmentReader* reader,
}
if (*countRead > 0) {
LOG3(("Http2Session::ReadSegments %p stream=%p countread=%d", this, stream,
*countRead));
mReadyForWrite.Push(stream);
LOG3(("Http2Session::ReadSegments %p stream=%p countread=%d", this,
stream.get(), *countRead));
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
return rv;
}
if (stream->BlockedOnRwin()) {
LOG3(("Http2Session %p will stream %p 0x%X suspended for flow control\n",
this, stream, stream->StreamID()));
this, stream.get(), stream->StreamID()));
return NS_BASE_STREAM_WOULD_BLOCK;
}
LOG3(("Http2Session::ReadSegments %p stream=%p stream send complete", this,
stream));
stream.get()));
// call readsegments again if there are other streams ready
// to go in this session
@ -2988,7 +3012,8 @@ nsresult Http2Session::WriteSegmentsAgain(nsAHttpSegmentWriter* writer,
// If there are http transactions attached to a push stream with filled
// buffers trigger that data pump here. This only reads from buffers (not the
// network) so mDownstreamState doesn't matter.
Http2Stream* pushConnectedStream = mPushesReadyForRead.PopFront();
RefPtr<Http2Stream> pushConnectedStream =
GetNextStreamFromQueue(mPushesReadyForRead);
if (pushConnectedStream) {
return ProcessConnectedPush(pushConnectedStream, writer, count,
countWritten);
@ -2996,7 +3021,8 @@ nsresult Http2Session::WriteSegmentsAgain(nsAHttpSegmentWriter* writer,
// feed gecko channels that previously stopped consuming data
// only take data from stored buffers
Http2Stream* slowConsumer = mSlowConsumersReadyForRead.PopFront();
RefPtr<Http2Stream> slowConsumer =
GetNextStreamFromQueue(mSlowConsumersReadyForRead);
if (slowConsumer) {
internalStateType savedState = mDownstreamState;
mDownstreamState = NOT_USING_NETWORK;
@ -3354,7 +3380,7 @@ nsresult Http2Session::WriteSegmentsAgain(nsAHttpSegmentWriter* writer,
}
}
if (enqueueSink) {
mPushesReadyForRead.Push(pushSink);
AddStreamToQueue(pushSink, mPushesReadyForRead);
// No use trying to clean up, it won't do anything, anyway
streamToCleanup = nullptr;
}
@ -3959,14 +3985,14 @@ void Http2Session::SetNeedsCleanup() {
}
void Http2Session::ConnectPushedStream(Http2Stream* stream) {
mPushesReadyForRead.Push(stream);
AddStreamToQueue(stream, mPushesReadyForRead);
Unused << ForceRecv();
}
void Http2Session::ConnectSlowConsumer(Http2Stream* stream) {
LOG3(("Http2Session::ConnectSlowConsumer %p 0x%X\n", this,
stream->StreamID()));
mSlowConsumersReadyForRead.Push(stream);
AddStreamToQueue(stream, mSlowConsumersReadyForRead);
Unused << ForceRecv();
}
@ -4229,7 +4255,7 @@ void Http2Session::TransactionHasDataToWrite(nsAHttpTransaction* caller) {
stream->StreamID()));
if (!mClosed) {
mReadyForWrite.Push(stream);
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
} else {
LOG3(
@ -4267,7 +4293,7 @@ void Http2Session::TransactionHasDataToWrite(Http2Stream* stream) {
LOG3(("Http2Session::TransactionHasDataToWrite %p stream=%p ID=0x%x", this,
stream, stream->StreamID()));
mReadyForWrite.Push(stream);
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
Unused << ForceSend();
}
@ -4337,6 +4363,7 @@ uint32_t Http2Session::Http1xTransactionCount() { return 0; }
nsresult Http2Session::TakeSubTransactions(
nsTArray<RefPtr<nsAHttpTransaction> >& outTransactions) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
// Generally this cannot be done with http/2 as transactions are
// started right away.

View File

@ -382,10 +382,10 @@ class Http2Session final : public ASpdySession,
nsRefPtrHashtable<nsPtrHashKey<nsAHttpTransaction>, Http2Stream>
mStreamTransactionHash;
nsDeque<Http2Stream> mReadyForWrite;
nsDeque<Http2Stream> mQueuedStreams;
nsDeque<Http2Stream> mPushesReadyForRead;
nsDeque<Http2Stream> mSlowConsumersReadyForRead;
nsTArray<WeakPtr<Http2Stream>> mReadyForWrite;
nsTArray<WeakPtr<Http2Stream>> mQueuedStreams;
nsTArray<WeakPtr<Http2Stream>> mPushesReadyForRead;
nsTArray<WeakPtr<Http2Stream>> mSlowConsumersReadyForRead;
nsTArray<Http2PushedStream*> mPushedStreams;
// Compression contexts for header transport.