Bug 807929: Make DataChannel refcounted r=mcmanus

This commit is contained in:
Randell Jesup 2012-11-02 15:28:13 -04:00
parent 039dac9e06
commit 0ce97e7c2d
5 changed files with 102 additions and 88 deletions

View File

@ -49,7 +49,7 @@ class nsDOMDataChannel : public nsDOMEventTargetHelper,
public mozilla::DataChannelListener public mozilla::DataChannelListener
{ {
public: public:
nsDOMDataChannel(mozilla::DataChannel* aDataChannel) nsDOMDataChannel(already_AddRefed<mozilla::DataChannel> aDataChannel)
: mDataChannel(aDataChannel) : mDataChannel(aDataChannel)
, mBinaryType(DC_BINARY_TYPE_BLOB) , mBinaryType(DC_BINARY_TYPE_BLOB)
{} {}
@ -92,7 +92,7 @@ private:
JSContext *aCx); JSContext *aCx);
// Owning reference // Owning reference
nsAutoPtr<mozilla::DataChannel> mDataChannel; nsRefPtr<mozilla::DataChannel> mDataChannel;
nsString mOrigin; nsString mOrigin;
enum enum
{ {
@ -492,7 +492,7 @@ nsDOMDataChannel::AppReady()
/* static */ /* static */
nsresult nsresult
NS_NewDOMDataChannel(mozilla::DataChannel* aDataChannel, NS_NewDOMDataChannel(already_AddRefed<mozilla::DataChannel> aDataChannel,
nsPIDOMWindow* aWindow, nsPIDOMWindow* aWindow,
nsIDOMDataChannel** aDomDataChannel) nsIDOMDataChannel** aDomDataChannel)
{ {

View File

@ -10,6 +10,7 @@
// This defines only what's necessary to create nsDOMDataChannels, since this // This defines only what's necessary to create nsDOMDataChannels, since this
// gets used with MOZ_INTERNAL_API not set for media/webrtc/signaling/testing // gets used with MOZ_INTERNAL_API not set for media/webrtc/signaling/testing
#include "nsCOMPtr.h"
#include "nsIDOMDataChannel.h" #include "nsIDOMDataChannel.h"
namespace mozilla { namespace mozilla {
@ -19,7 +20,7 @@ namespace mozilla {
class nsPIDOMWindow; class nsPIDOMWindow;
nsresult nsresult
NS_NewDOMDataChannel(mozilla::DataChannel* dataChannel, NS_NewDOMDataChannel(already_AddRefed<mozilla::DataChannel> dataChannel,
nsPIDOMWindow* aWindow, nsPIDOMWindow* aWindow,
nsIDOMDataChannel** domDataChannel); nsIDOMDataChannel** domDataChannel);

View File

@ -644,7 +644,7 @@ PeerConnectionImpl::CreateDataChannel(const nsACString& aLabel,
MOZ_ASSERT(aRetval); MOZ_ASSERT(aRetval);
#ifdef MOZILLA_INTERNAL_API #ifdef MOZILLA_INTERNAL_API
mozilla::DataChannel* dataChannel; nsRefPtr<mozilla::DataChannel> dataChannel;
mozilla::DataChannelConnection::Type theType = mozilla::DataChannelConnection::Type theType =
static_cast<mozilla::DataChannelConnection::Type>(aType); static_cast<mozilla::DataChannelConnection::Type>(aType);
@ -661,7 +661,7 @@ PeerConnectionImpl::CreateDataChannel(const nsACString& aLabel,
CSFLogDebugS(logTag, __FUNCTION__ << ": making DOMDataChannel"); CSFLogDebugS(logTag, __FUNCTION__ << ": making DOMDataChannel");
return NS_NewDOMDataChannel(dataChannel, mWindow, aRetval); return NS_NewDOMDataChannel(dataChannel.forget(), mWindow, aRetval);
#else #else
return NS_OK; return NS_OK;
#endif #endif

View File

@ -621,6 +621,12 @@ DataChannelConnection::FindFreeStreamOut()
limit = MAX_NUM_STREAMS; limit = MAX_NUM_STREAMS;
for (i = 0; i < limit; ++i) { for (i = 0; i < limit; ++i) {
if (!mStreamsOut[i]) { if (!mStreamsOut[i]) {
// Verify it's not still in the process of closing
for (uint32_t j = 0; j < mStreamsResetting.Length(); ++j) {
if (mStreamsResetting[j] == i) {
continue;
}
}
break; break;
} }
} }
@ -760,7 +766,7 @@ bool
DataChannelConnection::SendDeferredMessages() DataChannelConnection::SendDeferredMessages()
{ {
uint32_t i; uint32_t i;
DataChannel *channel; nsRefPtr<DataChannel> channel; // we may null out the refs to this
bool still_blocked = false; bool still_blocked = false;
bool sent = false; bool sent = false;
@ -814,7 +820,6 @@ DataChannelConnection::SendDeferredMessages()
// delete the channel. // delete the channel.
mStreamsIn[channel->mStreamIn] = nullptr; mStreamsIn[channel->mStreamIn] = nullptr;
mStreamsOut[channel->mStreamOut] = nullptr; mStreamsOut[channel->mStreamOut] = nullptr;
delete channel;
} }
} }
} }
@ -899,7 +904,7 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
size_t length, size_t length,
uint16_t streamIn) uint16_t streamIn)
{ {
DataChannel *channel; nsRefPtr<DataChannel> channel;
uint32_t prValue; uint32_t prValue;
uint16_t prPolicy; uint16_t prPolicy;
uint32_t flags; uint32_t flags;
@ -929,7 +934,8 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
} }
prValue = ntohs(req->reliability_params); prValue = ntohs(req->reliability_params);
flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED; flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED;
channel = new DataChannel(this, INVALID_STREAM, streamIn, channel = new DataChannel(this,
INVALID_STREAM, streamIn,
DataChannel::CONNECTING, DataChannel::CONNECTING,
label, label,
prPolicy, prValue, prPolicy, prValue,
@ -937,17 +943,18 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
nullptr, nullptr); nullptr, nullptr);
mStreamsIn[streamIn] = channel; mStreamsIn[streamIn] = channel;
OpenResponseFinish(channel); OpenResponseFinish(channel.forget());
} }
void void
DataChannelConnection::OpenResponseFinish(DataChannel *channel) DataChannelConnection::OpenResponseFinish(already_AddRefed<DataChannel> aChannel)
{ {
nsRefPtr<DataChannel> channel(aChannel);
uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
mLock.AssertCurrentThreadOwns(); mLock.AssertCurrentThreadOwns();
LOG(("Finished response: channel %p, streamOut = %u", channel, streamOut)); LOG(("Finished response: channel %p, streamOut = %u", channel.get(), streamOut));
if (streamOut == INVALID_STREAM) { if (streamOut == INVALID_STREAM) {
if (!RequestMoreStreamsOut()) { if (!RequestMoreStreamsOut()) {
@ -955,23 +962,22 @@ DataChannelConnection::OpenResponseFinish(DataChannel *channel)
mStreamsIn[channel->mStreamIn] = nullptr; mStreamsIn[channel->mStreamIn] = nullptr;
// we can do this with the lock held because mStreamOut is INVALID_STREAM, // we can do this with the lock held because mStreamOut is INVALID_STREAM,
// so there's no outbound channel to reset // so there's no outbound channel to reset
delete channel;
return; return;
} }
LOG(("Queuing channel %p to finish response", channel)); LOG(("Queuing channel %d to finish response", channel->mStreamIn));
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP; channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP;
mPending.Push(channel); DataChannel *temp = channel.get(); // Can't cast away already_AddRefed<> from channel.forget()
channel.forget();
mPending.Push(temp);
// can't notify the user until we can send an OpenResponse // can't notify the user until we can send an OpenResponse
} else { } else {
channel->mStreamOut = streamOut; channel->mStreamOut = streamOut;
mStreamsOut[streamOut] = channel; mStreamsOut[streamOut] = channel;
if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) { if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) {
LOG(("successful incoming open of '%s' in: %u, out: %u\n",
channel->mLabel.get(), channel->mStreamIn, streamOut));
/* Notify ondatachannel */ /* Notify ondatachannel */
// XXX We need to make sure connection sticks around until the message is delivered // XXX We need to make sure connection sticks around until the message is delivered
LOG(("%s: sending ON_CHANNEL_CREATED for %p", __FUNCTION__, channel)); LOG(("%s: sending ON_CHANNEL_CREATED for %s: %d/%d", __FUNCTION__,
channel->mLabel.get(), streamOut, channel->mStreamIn));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable( NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
this, channel)); this, channel));
@ -986,7 +992,6 @@ DataChannelConnection::OpenResponseFinish(DataChannel *channel)
channel->mStreamOut = INVALID_STREAM; channel->mStreamOut = INVALID_STREAM;
// we can do this with the lock held because mStreamOut is INVALID_STREAM, // we can do this with the lock held because mStreamOut is INVALID_STREAM,
// so there's no outbound channel to reset (we failed to send on it) // so there's no outbound channel to reset (we failed to send on it)
delete channel;
return; // paranoia against future changes since we unlocked return; // paranoia against future changes since we unlocked
} }
} }
@ -1006,7 +1011,7 @@ DataChannelConnection::HandleOpenResponseMessage(const struct rtcweb_datachannel
streamOut = ntohs(rsp->reverse_stream); streamOut = ntohs(rsp->reverse_stream);
channel = FindChannelByStreamOut(streamOut); channel = FindChannelByStreamOut(streamOut);
NS_ENSURE_TRUE(channel != nullptr, /* */); NS_ENSURE_TRUE(channel, /* */);
NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */);
if (rsp->error) { if (rsp->error) {
@ -1044,7 +1049,7 @@ DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack
channel = FindChannelByStreamIn(streamIn); channel = FindChannelByStreamIn(streamIn);
NS_ENSURE_TRUE(channel != nullptr, /* */); NS_ENSURE_TRUE(channel, /* */);
NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */);
channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN; channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN;
@ -1079,7 +1084,7 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
channel = FindChannelByStreamIn(streamIn); channel = FindChannelByStreamIn(streamIn);
// XXX A closed channel may trip this... check // XXX A closed channel may trip this... check
NS_ENSURE_TRUE(channel != nullptr, /* */); NS_ENSURE_TRUE(channel, /* */);
NS_ENSURE_TRUE(channel->mState != CONNECTING, /* */); NS_ENSURE_TRUE(channel->mState != CONNECTING, /* */);
// XXX should this be a simple if, no warnings/debugbreaks? // XXX should this be a simple if, no warnings/debugbreaks?
@ -1113,11 +1118,10 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
if (!channel->mBinaryBuffer.IsEmpty()) { if (!channel->mBinaryBuffer.IsEmpty()) {
channel->mBinaryBuffer += recvData; channel->mBinaryBuffer += recvData;
LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
SendOrQueue(channel, channel->SendOrQueue(new DataChannelOnMessageAvailable(
new DataChannelOnMessageAvailable( DataChannelOnMessageAvailable::ON_DATA, this,
DataChannelOnMessageAvailable::ON_DATA, this, channel, channel->mBinaryBuffer,
channel, channel->mBinaryBuffer, channel->mBinaryBuffer.Length()));
channel->mBinaryBuffer.Length()));
channel->mBinaryBuffer.Truncate(0); channel->mBinaryBuffer.Truncate(0);
return; return;
} }
@ -1130,24 +1134,19 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
} }
/* Notify onmessage */ /* Notify onmessage */
LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
SendOrQueue(channel, channel->SendOrQueue(new DataChannelOnMessageAvailable(
new DataChannelOnMessageAvailable( DataChannelOnMessageAvailable::ON_DATA, this,
DataChannelOnMessageAvailable::ON_DATA, this, channel, recvData, length));
channel, recvData, length));
} }
} }
// Called with mLock locked! // Called with mLock locked!
void void
DataChannelConnection::SendOrQueue(DataChannel *aChannel, DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
DataChannelOnMessageAvailable *aMessage)
{ {
mLock.AssertCurrentThreadOwns(); if (!mReady &&
(mState == CONNECTING || mState == WAITING_TO_OPEN)) {
if (!aChannel->mReady && mQueuedMessages.AppendElement(aMessage);
(aChannel->mState == DataChannel::CONNECTING ||
aChannel->mState == DataChannel::WAITING_TO_OPEN)) {
aChannel->mQueuedMessages.AppendElement(aMessage);
} else { } else {
NS_DispatchToMainThread(aMessage); NS_DispatchToMainThread(aMessage);
} }
@ -1394,6 +1393,7 @@ DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
uint32_t i; uint32_t i;
mLock.AssertCurrentThreadOwns(); mLock.AssertCurrentThreadOwns();
LOG(("Resetting outgoing stream %d",streamOut));
// Rarely has more than a couple items and only for a short time // Rarely has more than a couple items and only for a short time
for (i = 0; i < mStreamsResetting.Length(); ++i) { for (i = 0; i < mStreamsResetting.Length(); ++i) {
if (mStreamsResetting[i] == streamOut) { if (mStreamsResetting[i] == streamOut) {
@ -1434,7 +1434,7 @@ void
DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
{ {
uint32_t n, i; uint32_t n, i;
DataChannel *channel; nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel
if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
!(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
@ -1442,7 +1442,9 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
for (i = 0; i < n; ++i) { for (i = 0; i < n; ++i) {
if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]); channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]);
if (channel != nullptr) { if (channel) {
LOG(("Channel %d outgoing/%d incoming closed",
channel->mStreamOut,channel->mStreamIn));
mStreamsIn[channel->mStreamIn] = nullptr; mStreamsIn[channel->mStreamIn] = nullptr;
channel->mStreamIn = INVALID_STREAM; channel->mStreamIn = INVALID_STREAM;
if (channel->mStreamOut == INVALID_STREAM) { if (channel->mStreamOut == INVALID_STREAM) {
@ -1462,6 +1464,8 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]); channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]);
if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) { if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) {
LOG(("Channel %d outgoing/%d incoming closed",
channel->mStreamOut,channel->mStreamIn));
mStreamsOut[channel->mStreamOut] = nullptr; mStreamsOut[channel->mStreamOut] = nullptr;
channel->mStreamOut = INVALID_STREAM; channel->mStreamOut = INVALID_STREAM;
if (channel->mStreamIn == INVALID_STREAM) { if (channel->mStreamIn == INVALID_STREAM) {
@ -1484,7 +1488,7 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
{ {
uint16_t streamOut; uint16_t streamOut;
uint32_t i; uint32_t i;
DataChannel *channel; nsRefPtr<DataChannel> channel;
if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
LOG(("*** Failed increasing number of streams from %u (%u/%u)", LOG(("*** Failed increasing number of streams from %u (%u/%u)",
@ -1531,18 +1535,19 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
// Can't copy nsDeque's. Move into temp array since any that fail will // Can't copy nsDeque's. Move into temp array since any that fail will
// go back to mPending // go back to mPending
nsDeque temp; nsDeque temp;
while (nullptr != (channel = static_cast<DataChannel *>(mPending.PopFront()))) { DataChannel *temp_channel; // really already_AddRefed<>
temp.Push(channel); while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
temp.Push(static_cast<void *>(temp_channel));
} }
// Now assign our new streams // Now assign our new streams
while (nullptr != (channel = static_cast<DataChannel *>(temp.PopFront()))) { while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) { if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP; channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP;
OpenResponseFinish(channel); // may reset the flag and re-push OpenResponseFinish(channel.forget()); // may reset the flag and re-push
} else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { } else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
OpenFinish(channel); // may reset the flag and re-push OpenFinish(channel.forget()); // may reset the flag and re-push
} }
} }
} }
@ -1660,12 +1665,11 @@ DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t d
return 1; return 1;
} }
DataChannel * already_AddRefed<DataChannel>
DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder, DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
uint32_t prValue, DataChannelListener *aListener, uint32_t prValue, DataChannelListener *aListener,
nsISupports *aContext) nsISupports *aContext)
{ {
DataChannel *channel;
uint16_t prPolicy = SCTP_PR_SCTP_NONE; uint16_t prPolicy = SCTP_PR_SCTP_NONE;
uint32_t flags; uint32_t flags;
@ -1687,25 +1691,27 @@ DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
} }
flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0; flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0;
channel = new DataChannel(this, INVALID_STREAM, INVALID_STREAM, nsRefPtr<DataChannel> channel(new DataChannel(this,
DataChannel::CONNECTING, INVALID_STREAM, INVALID_STREAM,
label, type, prValue, DataChannel::CONNECTING,
flags, label, type, prValue,
aListener, aContext); // infallible malloc flags,
aListener, aContext));
MutexAutoLock lock(mLock); // OpenFinish assumes this MutexAutoLock lock(mLock); // OpenFinish assumes this
return OpenFinish(channel); return OpenFinish(channel.forget());
} }
// Separate routine so we can also call it to finish up from pending opens // Separate routine so we can also call it to finish up from pending opens
DataChannel * already_AddRefed<DataChannel>
DataChannelConnection::OpenFinish(DataChannel *channel) DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
{ {
uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
nsRefPtr<DataChannel> channel(aChannel);
mLock.AssertCurrentThreadOwns(); mLock.AssertCurrentThreadOwns();
LOG(("Finishing open: channel %p, streamOut = %u", channel, streamOut)); LOG(("Finishing open: channel %p, streamOut = %u", channel.get(), streamOut));
if (streamOut == INVALID_STREAM) { if (streamOut == INVALID_STREAM) {
if (!RequestMoreStreamsOut()) { if (!RequestMoreStreamsOut()) {
@ -1713,18 +1719,18 @@ DataChannelConnection::OpenFinish(DataChannel *channel)
// We already returned the channel to the app. Mark it closed // We already returned the channel to the app. Mark it closed
channel->mState = CLOSED; channel->mState = CLOSED;
NS_ERROR("Failed to request more streams"); NS_ERROR("Failed to request more streams");
return channel; return channel.forget();
} }
// we can do this with the lock held because mStreamOut is INVALID_STREAM, // we can do this with the lock held because mStreamOut is INVALID_STREAM,
// so there's no outbound channel to reset // so there's no outbound channel to reset
delete channel;
return nullptr; return nullptr;
} }
LOG(("Queuing channel %p to finish open", channel)); LOG(("Queuing channel %p to finish open", channel.get()));
// Also serves to mark we told the app // Also serves to mark we told the app
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
channel->AddRef(); // we need a ref for the nsDeQue and one to return
mPending.Push(channel); mPending.Push(channel);
return channel; return channel.forget();
} }
mStreamsOut[streamOut] = channel; mStreamsOut[streamOut] = channel;
channel->mStreamOut = streamOut; channel->mStreamOut = streamOut;
@ -1743,11 +1749,10 @@ DataChannelConnection::OpenFinish(DataChannel *channel)
channel->mStreamOut = INVALID_STREAM; channel->mStreamOut = INVALID_STREAM;
// we can do this with the lock held because mStreamOut is INVALID_STREAM, // we can do this with the lock held because mStreamOut is INVALID_STREAM,
// so there's no outbound channel to reset (we didn't sent anything) // so there's no outbound channel to reset (we didn't sent anything)
delete channel;
return nullptr; return nullptr;
} }
} }
return channel; return channel.forget();
} }
int32_t int32_t
@ -1930,14 +1935,15 @@ DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
void void
DataChannelConnection::Close(uint16_t streamOut) DataChannelConnection::Close(uint16_t streamOut)
{ {
DataChannel *channel; nsRefPtr<DataChannel> channel; // make sure it doesn't go away on us
MutexAutoLock lock(mLock); MutexAutoLock lock(mLock);
LOG(("Closing stream %d",streamOut)); LOG(("Closing stream %d",streamOut));
channel = FindChannelByStreamOut(streamOut); channel = FindChannelByStreamOut(streamOut);
if (channel) { if (channel) {
channel->mBufferedData.Clear(); channel->mBufferedData.Clear();
ResetOutgoingStream(channel->mStreamOut); if (channel->mStreamOut != INVALID_STREAM)
ResetOutgoingStream(channel->mStreamOut);
SendOutgoingStreamReset(); SendOutgoingStreamReset();
channel->mState = CLOSING; channel->mState = CLOSING;
} }
@ -1961,9 +1967,15 @@ void DataChannelConnection::CloseAll()
} }
// Clean up any pending opens for channels // Clean up any pending opens for channels
DataChannel *channel; nsRefPtr<DataChannel> channel;
while (nullptr != (channel = static_cast<DataChannel *>(mPending.PopFront()))) while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront()))))
channel->Close(); channel->Close(); // also releases the ref on each iteration
}
DataChannel::~DataChannel()
{
LOG(("Destroying Data channel %d/%d", mStreamOut, mStreamIn));
Close();
} }
void void

View File

@ -67,11 +67,11 @@ public:
// Called when a DOMString message is received. // Called when a DOMString message is received.
virtual nsresult OnMessageAvailable(nsISupports *aContext, virtual nsresult OnMessageAvailable(nsISupports *aContext,
const nsACString& message) = 0; const nsACString& message) = 0;
// Called when a binary message is received. // Called when a binary message is received.
virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext, virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext,
const nsACString& message) = 0; const nsACString& message) = 0;
// Called when the channel is connected // Called when the channel is connected
virtual nsresult OnChannelConnected(nsISupports *aContext) = 0; virtual nsresult OnChannelConnected(nsISupports *aContext) = 0;
@ -127,10 +127,11 @@ public:
PARTIAL_RELIABLE_TIMED = 2 PARTIAL_RELIABLE_TIMED = 2
} Type; } Type;
DataChannel *Open(const nsACString& label, already_AddRefed<DataChannel> Open(const nsACString& label,
Type type, bool inOrder, Type type, bool inOrder,
uint32_t prValue, DataChannelListener *aListener, uint32_t prValue,
nsISupports *aContext); DataChannelListener *aListener,
nsISupports *aContext);
void Close(uint16_t stream); void Close(uint16_t stream);
void CloseAll(); void CloseAll();
@ -188,9 +189,8 @@ private:
uint32_t len); uint32_t len);
int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary); int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
DataChannel *OpenFinish(DataChannel *channel); already_AddRefed<DataChannel> OpenFinish(already_AddRefed<DataChannel> channel);
void SendOrQueue(DataChannel *aChannel, DataChannelOnMessageAvailable *aMessage);
void StartDefer(); void StartDefer();
bool SendDeferredMessages(); bool SendDeferredMessages();
void SendOutgoingStreamReset(); void SendOutgoingStreamReset();
@ -198,7 +198,7 @@ private:
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
size_t length, size_t length,
uint16_t streamIn); uint16_t streamIn);
void OpenResponseFinish(DataChannel *channel); void OpenResponseFinish(already_AddRefed<DataChannel> channel);
void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp, void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp,
size_t length, uint16_t streamIn); size_t length, uint16_t streamIn);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
@ -228,9 +228,9 @@ private:
// NOTE: while these arrays will auto-expand, increases in the number of // NOTE: while these arrays will auto-expand, increases in the number of
// channels available from the stack must be negotiated! // channels available from the stack must be negotiated!
nsAutoTArray<DataChannel*,16> mStreamsOut; nsAutoTArray<nsRefPtr<DataChannel>,16> mStreamsOut;
nsAutoTArray<DataChannel*,16> mStreamsIn; nsAutoTArray<nsRefPtr<DataChannel>,16> mStreamsIn;
nsDeque mPending; // Holds DataChannels nsDeque mPending; // Holds already_AddRefed<DataChannel>s -- careful!
// Streams pending reset // Streams pending reset
nsAutoTArray<uint16_t,4> mStreamsResetting; nsAutoTArray<uint16_t,4> mStreamsResetting;
@ -285,10 +285,9 @@ public:
NS_ASSERTION(mConnection,"NULL connection"); NS_ASSERTION(mConnection,"NULL connection");
} }
~DataChannel() ~DataChannel();
{
Close(); NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel);
}
// Close this DataChannel. Can be called multiple times. // Close this DataChannel. Can be called multiple times.
void Close(); void Close();
@ -345,6 +344,8 @@ public:
void AppReady(); void AppReady();
void SendOrQueue(DataChannelOnMessageAvailable *aMessage);
protected: protected:
DataChannelListener *mListener; DataChannelListener *mListener;
@ -465,7 +466,7 @@ private:
int32_t mType; int32_t mType;
// XXX should use union // XXX should use union
DataChannel *mChannel; // XXX careful of ownership! nsRefPtr<DataChannel> mChannel;
nsRefPtr<DataChannelConnection> mConnection; nsRefPtr<DataChannelConnection> mConnection;
nsCString mData; nsCString mData;
int32_t mLen; int32_t mLen;