From 0ce97e7c2d8fd9a85d384a1aa524799ad81d20f6 Mon Sep 17 00:00:00 2001 From: Randell Jesup Date: Fri, 2 Nov 2012 15:28:13 -0400 Subject: [PATCH] Bug 807929: Make DataChannel refcounted r=mcmanus --- content/base/src/nsDOMDataChannel.cpp | 6 +- content/base/src/nsDOMDataChannel.h | 3 +- .../src/peerconnection/PeerConnectionImpl.cpp | 4 +- netwerk/sctp/datachannel/DataChannel.cpp | 142 ++++++++++-------- netwerk/sctp/datachannel/DataChannel.h | 35 ++--- 5 files changed, 102 insertions(+), 88 deletions(-) diff --git a/content/base/src/nsDOMDataChannel.cpp b/content/base/src/nsDOMDataChannel.cpp index 69892c0bdc67..83cdec3566da 100644 --- a/content/base/src/nsDOMDataChannel.cpp +++ b/content/base/src/nsDOMDataChannel.cpp @@ -49,7 +49,7 @@ class nsDOMDataChannel : public nsDOMEventTargetHelper, public mozilla::DataChannelListener { public: - nsDOMDataChannel(mozilla::DataChannel* aDataChannel) + nsDOMDataChannel(already_AddRefed aDataChannel) : mDataChannel(aDataChannel) , mBinaryType(DC_BINARY_TYPE_BLOB) {} @@ -92,7 +92,7 @@ private: JSContext *aCx); // Owning reference - nsAutoPtr mDataChannel; + nsRefPtr mDataChannel; nsString mOrigin; enum { @@ -492,7 +492,7 @@ nsDOMDataChannel::AppReady() /* static */ nsresult -NS_NewDOMDataChannel(mozilla::DataChannel* aDataChannel, +NS_NewDOMDataChannel(already_AddRefed aDataChannel, nsPIDOMWindow* aWindow, nsIDOMDataChannel** aDomDataChannel) { diff --git a/content/base/src/nsDOMDataChannel.h b/content/base/src/nsDOMDataChannel.h index d4e4abce8b43..c30dad0fd098 100644 --- a/content/base/src/nsDOMDataChannel.h +++ b/content/base/src/nsDOMDataChannel.h @@ -10,6 +10,7 @@ // This defines only what's necessary to create nsDOMDataChannels, since this // gets used with MOZ_INTERNAL_API not set for media/webrtc/signaling/testing +#include "nsCOMPtr.h" #include "nsIDOMDataChannel.h" namespace mozilla { @@ -19,7 +20,7 @@ namespace mozilla { class nsPIDOMWindow; nsresult -NS_NewDOMDataChannel(mozilla::DataChannel* dataChannel, +NS_NewDOMDataChannel(already_AddRefed dataChannel, nsPIDOMWindow* aWindow, nsIDOMDataChannel** domDataChannel); diff --git a/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp b/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp index 85601a8f480b..c0e148999278 100644 --- a/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp +++ b/media/webrtc/signaling/src/peerconnection/PeerConnectionImpl.cpp @@ -644,7 +644,7 @@ PeerConnectionImpl::CreateDataChannel(const nsACString& aLabel, MOZ_ASSERT(aRetval); #ifdef MOZILLA_INTERNAL_API - mozilla::DataChannel* dataChannel; + nsRefPtr dataChannel; mozilla::DataChannelConnection::Type theType = static_cast(aType); @@ -661,7 +661,7 @@ PeerConnectionImpl::CreateDataChannel(const nsACString& aLabel, CSFLogDebugS(logTag, __FUNCTION__ << ": making DOMDataChannel"); - return NS_NewDOMDataChannel(dataChannel, mWindow, aRetval); + return NS_NewDOMDataChannel(dataChannel.forget(), mWindow, aRetval); #else return NS_OK; #endif diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp index 4d211a4b12c6..018ed0ba1986 100644 --- a/netwerk/sctp/datachannel/DataChannel.cpp +++ b/netwerk/sctp/datachannel/DataChannel.cpp @@ -621,6 +621,12 @@ DataChannelConnection::FindFreeStreamOut() limit = MAX_NUM_STREAMS; for (i = 0; i < limit; ++i) { if (!mStreamsOut[i]) { + // Verify it's not still in the process of closing + for (uint32_t j = 0; j < mStreamsResetting.Length(); ++j) { + if (mStreamsResetting[j] == i) { + continue; + } + } break; } } @@ -760,7 +766,7 @@ bool DataChannelConnection::SendDeferredMessages() { uint32_t i; - DataChannel *channel; + nsRefPtr channel; // we may null out the refs to this bool still_blocked = false; bool sent = false; @@ -814,7 +820,6 @@ DataChannelConnection::SendDeferredMessages() // delete the channel. mStreamsIn[channel->mStreamIn] = nullptr; mStreamsOut[channel->mStreamOut] = nullptr; - delete channel; } } } @@ -899,7 +904,7 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_ size_t length, uint16_t streamIn) { - DataChannel *channel; + nsRefPtr channel; uint32_t prValue; uint16_t prPolicy; uint32_t flags; @@ -929,7 +934,8 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_ } prValue = ntohs(req->reliability_params); flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED; - channel = new DataChannel(this, INVALID_STREAM, streamIn, + channel = new DataChannel(this, + INVALID_STREAM, streamIn, DataChannel::CONNECTING, label, prPolicy, prValue, @@ -937,17 +943,18 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_ nullptr, nullptr); mStreamsIn[streamIn] = channel; - OpenResponseFinish(channel); + OpenResponseFinish(channel.forget()); } void -DataChannelConnection::OpenResponseFinish(DataChannel *channel) +DataChannelConnection::OpenResponseFinish(already_AddRefed aChannel) { + nsRefPtr channel(aChannel); uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! mLock.AssertCurrentThreadOwns(); - LOG(("Finished response: channel %p, streamOut = %u", channel, streamOut)); + LOG(("Finished response: channel %p, streamOut = %u", channel.get(), streamOut)); if (streamOut == INVALID_STREAM) { if (!RequestMoreStreamsOut()) { @@ -955,23 +962,22 @@ DataChannelConnection::OpenResponseFinish(DataChannel *channel) mStreamsIn[channel->mStreamIn] = nullptr; // we can do this with the lock held because mStreamOut is INVALID_STREAM, // so there's no outbound channel to reset - delete channel; return; } - LOG(("Queuing channel %p to finish response", channel)); + LOG(("Queuing channel %d to finish response", channel->mStreamIn)); channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP; - mPending.Push(channel); + DataChannel *temp = channel.get(); // Can't cast away already_AddRefed<> from channel.forget() + channel.forget(); + mPending.Push(temp); // can't notify the user until we can send an OpenResponse } else { channel->mStreamOut = streamOut; mStreamsOut[streamOut] = channel; if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) { - LOG(("successful incoming open of '%s' in: %u, out: %u\n", - channel->mLabel.get(), channel->mStreamIn, streamOut)); - /* Notify ondatachannel */ // XXX We need to make sure connection sticks around until the message is delivered - LOG(("%s: sending ON_CHANNEL_CREATED for %p", __FUNCTION__, channel)); + LOG(("%s: sending ON_CHANNEL_CREATED for %s: %d/%d", __FUNCTION__, + channel->mLabel.get(), streamOut, channel->mStreamIn)); NS_DispatchToMainThread(new DataChannelOnMessageAvailable( DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, this, channel)); @@ -986,7 +992,6 @@ DataChannelConnection::OpenResponseFinish(DataChannel *channel) channel->mStreamOut = INVALID_STREAM; // we can do this with the lock held because mStreamOut is INVALID_STREAM, // so there's no outbound channel to reset (we failed to send on it) - delete channel; return; // paranoia against future changes since we unlocked } } @@ -1006,7 +1011,7 @@ DataChannelConnection::HandleOpenResponseMessage(const struct rtcweb_datachannel streamOut = ntohs(rsp->reverse_stream); channel = FindChannelByStreamOut(streamOut); - NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel, /* */); NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); if (rsp->error) { @@ -1044,7 +1049,7 @@ DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack channel = FindChannelByStreamIn(streamIn); - NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel, /* */); NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN; @@ -1079,7 +1084,7 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid, channel = FindChannelByStreamIn(streamIn); // XXX A closed channel may trip this... check - NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel, /* */); NS_ENSURE_TRUE(channel->mState != CONNECTING, /* */); // XXX should this be a simple if, no warnings/debugbreaks? @@ -1113,11 +1118,10 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid, if (!channel->mBinaryBuffer.IsEmpty()) { channel->mBinaryBuffer += recvData; LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); - SendOrQueue(channel, - new DataChannelOnMessageAvailable( - DataChannelOnMessageAvailable::ON_DATA, this, - channel, channel->mBinaryBuffer, - channel->mBinaryBuffer.Length())); + channel->SendOrQueue(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, channel->mBinaryBuffer, + channel->mBinaryBuffer.Length())); channel->mBinaryBuffer.Truncate(0); return; } @@ -1130,24 +1134,19 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid, } /* Notify onmessage */ LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); - SendOrQueue(channel, - new DataChannelOnMessageAvailable( - DataChannelOnMessageAvailable::ON_DATA, this, - channel, recvData, length)); + channel->SendOrQueue(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, recvData, length)); } } // Called with mLock locked! void -DataChannelConnection::SendOrQueue(DataChannel *aChannel, - DataChannelOnMessageAvailable *aMessage) +DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) { - mLock.AssertCurrentThreadOwns(); - - if (!aChannel->mReady && - (aChannel->mState == DataChannel::CONNECTING || - aChannel->mState == DataChannel::WAITING_TO_OPEN)) { - aChannel->mQueuedMessages.AppendElement(aMessage); + if (!mReady && + (mState == CONNECTING || mState == WAITING_TO_OPEN)) { + mQueuedMessages.AppendElement(aMessage); } else { NS_DispatchToMainThread(aMessage); } @@ -1394,6 +1393,7 @@ DataChannelConnection::ResetOutgoingStream(uint16_t streamOut) uint32_t i; mLock.AssertCurrentThreadOwns(); + LOG(("Resetting outgoing stream %d",streamOut)); // Rarely has more than a couple items and only for a short time for (i = 0; i < mStreamsResetting.Length(); ++i) { if (mStreamsResetting[i] == streamOut) { @@ -1434,7 +1434,7 @@ void DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) { uint32_t n, i; - DataChannel *channel; + nsRefPtr channel; // since we may null out the ref to the channel if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { @@ -1442,7 +1442,9 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve for (i = 0; i < n; ++i) { if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]); - if (channel != nullptr) { + if (channel) { + LOG(("Channel %d outgoing/%d incoming closed", + channel->mStreamOut,channel->mStreamIn)); mStreamsIn[channel->mStreamIn] = nullptr; channel->mStreamIn = INVALID_STREAM; if (channel->mStreamOut == INVALID_STREAM) { @@ -1462,6 +1464,8 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]); if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) { + LOG(("Channel %d outgoing/%d incoming closed", + channel->mStreamOut,channel->mStreamIn)); mStreamsOut[channel->mStreamOut] = nullptr; channel->mStreamOut = INVALID_STREAM; if (channel->mStreamIn == INVALID_STREAM) { @@ -1484,7 +1488,7 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e { uint16_t streamOut; uint32_t i; - DataChannel *channel; + nsRefPtr channel; if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { LOG(("*** Failed increasing number of streams from %u (%u/%u)", @@ -1531,18 +1535,19 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e // Can't copy nsDeque's. Move into temp array since any that fail will // go back to mPending nsDeque temp; - while (nullptr != (channel = static_cast(mPending.PopFront()))) { - temp.Push(channel); + DataChannel *temp_channel; // really already_AddRefed<> + while (nullptr != (temp_channel = static_cast(mPending.PopFront()))) { + temp.Push(static_cast(temp_channel)); } // Now assign our new streams - while (nullptr != (channel = static_cast(temp.PopFront()))) { + while (nullptr != (channel = dont_AddRef(static_cast(temp.PopFront())))) { if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) { channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP; - OpenResponseFinish(channel); // may reset the flag and re-push + OpenResponseFinish(channel.forget()); // may reset the flag and re-push } else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; - OpenFinish(channel); // may reset the flag and re-push + OpenFinish(channel.forget()); // may reset the flag and re-push } } } @@ -1660,12 +1665,11 @@ DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t d return 1; } -DataChannel * +already_AddRefed DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder, uint32_t prValue, DataChannelListener *aListener, nsISupports *aContext) { - DataChannel *channel; uint16_t prPolicy = SCTP_PR_SCTP_NONE; uint32_t flags; @@ -1687,25 +1691,27 @@ DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder, } flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0; - channel = new DataChannel(this, INVALID_STREAM, INVALID_STREAM, - DataChannel::CONNECTING, - label, type, prValue, - flags, - aListener, aContext); // infallible malloc + nsRefPtr channel(new DataChannel(this, + INVALID_STREAM, INVALID_STREAM, + DataChannel::CONNECTING, + label, type, prValue, + flags, + aListener, aContext)); MutexAutoLock lock(mLock); // OpenFinish assumes this - return OpenFinish(channel); + return OpenFinish(channel.forget()); } // Separate routine so we can also call it to finish up from pending opens -DataChannel * -DataChannelConnection::OpenFinish(DataChannel *channel) +already_AddRefed +DataChannelConnection::OpenFinish(already_AddRefed aChannel) { uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! + nsRefPtr channel(aChannel); mLock.AssertCurrentThreadOwns(); - LOG(("Finishing open: channel %p, streamOut = %u", channel, streamOut)); + LOG(("Finishing open: channel %p, streamOut = %u", channel.get(), streamOut)); if (streamOut == INVALID_STREAM) { if (!RequestMoreStreamsOut()) { @@ -1713,18 +1719,18 @@ DataChannelConnection::OpenFinish(DataChannel *channel) // We already returned the channel to the app. Mark it closed channel->mState = CLOSED; NS_ERROR("Failed to request more streams"); - return channel; + return channel.forget(); } // we can do this with the lock held because mStreamOut is INVALID_STREAM, // so there's no outbound channel to reset - delete channel; return nullptr; } - LOG(("Queuing channel %p to finish open", channel)); + LOG(("Queuing channel %p to finish open", channel.get())); // Also serves to mark we told the app channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; + channel->AddRef(); // we need a ref for the nsDeQue and one to return mPending.Push(channel); - return channel; + return channel.forget(); } mStreamsOut[streamOut] = channel; channel->mStreamOut = streamOut; @@ -1743,11 +1749,10 @@ DataChannelConnection::OpenFinish(DataChannel *channel) channel->mStreamOut = INVALID_STREAM; // we can do this with the lock held because mStreamOut is INVALID_STREAM, // so there's no outbound channel to reset (we didn't sent anything) - delete channel; return nullptr; } } - return channel; + return channel.forget(); } int32_t @@ -1930,14 +1935,15 @@ DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, void DataChannelConnection::Close(uint16_t streamOut) { - DataChannel *channel; + nsRefPtr channel; // make sure it doesn't go away on us MutexAutoLock lock(mLock); LOG(("Closing stream %d",streamOut)); channel = FindChannelByStreamOut(streamOut); if (channel) { channel->mBufferedData.Clear(); - ResetOutgoingStream(channel->mStreamOut); + if (channel->mStreamOut != INVALID_STREAM) + ResetOutgoingStream(channel->mStreamOut); SendOutgoingStreamReset(); channel->mState = CLOSING; } @@ -1961,9 +1967,15 @@ void DataChannelConnection::CloseAll() } // Clean up any pending opens for channels - DataChannel *channel; - while (nullptr != (channel = static_cast(mPending.PopFront()))) - channel->Close(); + nsRefPtr channel; + while (nullptr != (channel = dont_AddRef(static_cast(mPending.PopFront())))) + channel->Close(); // also releases the ref on each iteration +} + +DataChannel::~DataChannel() +{ + LOG(("Destroying Data channel %d/%d", mStreamOut, mStreamIn)); + Close(); } void diff --git a/netwerk/sctp/datachannel/DataChannel.h b/netwerk/sctp/datachannel/DataChannel.h index d0b950962476..c5305fee0c20 100644 --- a/netwerk/sctp/datachannel/DataChannel.h +++ b/netwerk/sctp/datachannel/DataChannel.h @@ -67,11 +67,11 @@ public: // Called when a DOMString message is received. virtual nsresult OnMessageAvailable(nsISupports *aContext, - const nsACString& message) = 0; + const nsACString& message) = 0; // Called when a binary message is received. virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext, - const nsACString& message) = 0; + const nsACString& message) = 0; // Called when the channel is connected virtual nsresult OnChannelConnected(nsISupports *aContext) = 0; @@ -127,10 +127,11 @@ public: PARTIAL_RELIABLE_TIMED = 2 } Type; - DataChannel *Open(const nsACString& label, - Type type, bool inOrder, - uint32_t prValue, DataChannelListener *aListener, - nsISupports *aContext); + already_AddRefed Open(const nsACString& label, + Type type, bool inOrder, + uint32_t prValue, + DataChannelListener *aListener, + nsISupports *aContext); void Close(uint16_t stream); void CloseAll(); @@ -188,9 +189,8 @@ private: uint32_t len); int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary); - DataChannel *OpenFinish(DataChannel *channel); + already_AddRefed OpenFinish(already_AddRefed channel); - void SendOrQueue(DataChannel *aChannel, DataChannelOnMessageAvailable *aMessage); void StartDefer(); bool SendDeferredMessages(); void SendOutgoingStreamReset(); @@ -198,7 +198,7 @@ private: void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, size_t length, uint16_t streamIn); - void OpenResponseFinish(DataChannel *channel); + void OpenResponseFinish(already_AddRefed channel); void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp, size_t length, uint16_t streamIn); void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, @@ -228,9 +228,9 @@ private: // NOTE: while these arrays will auto-expand, increases in the number of // channels available from the stack must be negotiated! - nsAutoTArray mStreamsOut; - nsAutoTArray mStreamsIn; - nsDeque mPending; // Holds DataChannels + nsAutoTArray,16> mStreamsOut; + nsAutoTArray,16> mStreamsIn; + nsDeque mPending; // Holds already_AddRefeds -- careful! // Streams pending reset nsAutoTArray mStreamsResetting; @@ -285,10 +285,9 @@ public: NS_ASSERTION(mConnection,"NULL connection"); } - ~DataChannel() - { - Close(); - } + ~DataChannel(); + + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel); // Close this DataChannel. Can be called multiple times. void Close(); @@ -345,6 +344,8 @@ public: void AppReady(); + void SendOrQueue(DataChannelOnMessageAvailable *aMessage); + protected: DataChannelListener *mListener; @@ -465,7 +466,7 @@ private: int32_t mType; // XXX should use union - DataChannel *mChannel; // XXX careful of ownership! + nsRefPtr mChannel; nsRefPtr mConnection; nsCString mData; int32_t mLen;