Bug 806375: cleanup DataChannel, esp. channel close and connection shutdown r=mcmanus

This commit is contained in:
Randell Jesup 2012-12-13 23:30:11 -05:00
parent 8d7e7dd8d7
commit 5e38742f08
3 changed files with 223 additions and 113 deletions

View File

@ -480,10 +480,15 @@ PeerConnectionImpl::ConnectDataConnection(uint16_t aLocalport,
return NS_ERROR_FAILURE;
}
// XXX Fix! Get the correct flow for DataChannel. Also error handling.
nsRefPtr<TransportFlow> flow = mMedia->GetTransportFlow(1,false).get();
CSFLogDebugS(logTag, "Transportflow[1] = " << flow.get());
if (!mDataConnection->ConnectDTLS(flow, aLocalport, aRemoteport)) {
return NS_ERROR_FAILURE;
for (int i = 2; i >= 0; i--) {
nsRefPtr<TransportFlow> flow = mMedia->GetTransportFlow(i,false).get();
CSFLogDebugS(logTag, "Transportflow[" << i << "] = " << flow.get());
if (flow) {
if (!mDataConnection->ConnectDTLS(flow, aLocalport, aRemoteport)) {
return NS_ERROR_FAILURE;
}
break;
}
}
return NS_OK;
#else
@ -920,6 +925,7 @@ PeerConnectionImpl::CheckApiState(bool assert_ice_ready) const
NS_IMETHODIMP
PeerConnectionImpl::Close(bool aIsSynchronous)
{
CSFLogDebugS(logTag, __FUNCTION__);
PC_AUTO_ENTER_API_CALL(false);
return CloseInt(aIsSynchronous);
@ -934,8 +940,10 @@ PeerConnectionImpl::CloseInt(bool aIsSynchronous)
if (mCall != nullptr)
mCall->endCall();
#ifdef MOZILLA_INTERNAL_API
if (mDataConnection)
mDataConnection->CloseAll();
if (mDataConnection) {
mDataConnection->Destroy();
mDataConnection = nullptr; // it may not go away until the runnables are dead
}
#endif
ShutdownMedia(aIsSynchronous);

View File

@ -46,7 +46,7 @@ static bool sctp_initialized;
namespace mozilla {
class DataChannelShutdown;
nsCOMPtr<DataChannelShutdown> gDataChannelShutdown;
nsRefPtr<DataChannelShutdown> gDataChannelShutdown;
class DataChannelShutdown : public nsIObserver
{
@ -61,7 +61,7 @@ public:
DataChannelShutdown() {}
void Init()
void Init()
{
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
@ -91,6 +91,18 @@ public:
usrsctp_finish();
sctp_initialized = false;
}
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
if (!observerService)
return NS_ERROR_FAILURE;
nsresult rv = observerService->RemoveObserver(this,
"profile-change-net-teardown");
MOZ_ASSERT(rv == NS_OK);
(void) rv;
nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this);
gDataChannelShutdown = nullptr;
}
return NS_OK;
}
@ -140,17 +152,31 @@ DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
DataChannelConnection::~DataChannelConnection()
{
// XXX Move CloseAll() to a Destroy() call
// This may die on the MainThread, or on the STS thread
MOZ_ASSERT(mState == CLOSED);
MOZ_ASSERT(!mMasterSocket);
}
void
DataChannelConnection::Destroy()
{
// Though it's probably ok to do this and close the sockets;
// if we really want it to do true clean shutdowns it can
// create a dependant Internal object that would remain around
// until the network shut down the association or timed out.
LOG(("Destroying DataChannelConnection"));
LOG(("Destroying DataChannelConnection %p", (void *) this));
CloseAll();
if (mSocket && mSocket != mMasterSocket)
usrsctp_close(mSocket);
if (mMasterSocket)
usrsctp_close(mMasterSocket);
mSocket = nullptr;
mMasterSocket = nullptr;
// We can't get any more new callbacks from the SCTP library
// All existing callbacks have refs to DataChannelConnection
}
NS_IMPL_THREADSAFE_ISUPPORTS1(DataChannelConnection,
@ -190,7 +216,10 @@ DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUs
usrsctp_init(aPort, nullptr);
}
usrsctp_sysctl_set_sctp_debug_on(0 /* SCTP_DEBUG_ALL */);
// Set logging to datachannel:6 to get SCTP debugs
#ifdef PR_LOGGING
usrsctp_sysctl_set_sctp_debug_on(GetDataChannelLog()->level > 5 ? SCTP_DEBUG_ALL : 0);
#endif
usrsctp_sysctl_set_sctp_blackhole(2);
sctp_initialized = true;
@ -198,14 +227,14 @@ DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUs
gDataChannelShutdown->Init();
}
}
// XXX FIX! make this a global we get once
// Find the STS thread
nsresult rv;
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
MOZ_ASSERT(NS_SUCCEEDED(rv));
nsresult res;
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &res);
MOZ_ASSERT(NS_SUCCEEDED(res));
// Open sctp association across tunnel
// Open sctp with a callback
if ((mMasterSocket = usrsctp_socket(
aUsingDtls ? AF_CONN : AF_INET,
SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
@ -222,6 +251,16 @@ DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUs
LOG(("Couldn't set SO_LINGER on SCTP socket"));
}
// XXX Consider disabling this when we add proper SDP negotiation.
// We may want to leave enabled for supporting 'cloning' of SDP offers, which
// implies re-use of the same pseudo-port number, or forcing a renegotiation.
uint32_t on = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
(const void *)&on, (socklen_t)sizeof(on)) < 0) {
LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
}
if (!aUsingDtls) {
memset(&encaps, 0, sizeof(encaps));
encaps.sue_address.ss_family = AF_INET;
@ -294,7 +333,7 @@ DataChannelConnection::StartDefer()
if (!NS_IsMainThread()) {
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::START_DEFER,
this, nullptr));
this, (DataChannel *) nullptr));
return;
}
@ -342,6 +381,60 @@ DataChannelConnection::Notify(nsITimer *timer)
}
#ifdef MOZ_PEERCONNECTION
class DataChannelConnectRunnable : public nsRunnable
{
public:
DataChannelConnectRunnable(DataChannelConnection *aConnection)
: mConnection(aConnection) {}
NS_IMETHOD Run()
{
struct sockaddr_conn addr;
memset(&addr, 0, sizeof(addr));
addr.sconn_family = AF_CONN;
#if !defined(__Userspace_os_Linux) && !defined(__Userspace_os_Windows)
addr.sconn_len = sizeof(addr);
#endif
addr.sconn_port = htons(mConnection->mLocalPort);
int r = usrsctp_bind(mConnection->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r < 0) {
LOG(("usrsctp_bind failed: %d", r));
} else {
// This is the remote addr
addr.sconn_port = htons(mConnection->mRemotePort);
addr.sconn_addr = static_cast<void *>(mConnection.get());
r = usrsctp_connect(mConnection->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r < 0) {
LOG(("usrsctp_connect failed: %d", r));
} else {
// Notify Connection open
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, mConnection.get()));
mConnection->mSocket = mConnection->mMasterSocket;
mConnection->mState = DataChannelConnection::OPEN;
LOG(("DTLS connect() succeeded! Entering connected mode"));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
mConnection, true));
return NS_OK;
}
}
// on errors, we simply don't notify there was a connection, but we
// want to kill the thread (can we kill ourselves here? That would be better)
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
mConnection, false));
return NS_OK;
}
private:
nsRefPtr<DataChannelConnection> mConnection;
};
bool
DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
@ -355,57 +448,10 @@ DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uin
mLocalPort = localport;
mRemotePort = remoteport;
PR_CreateThread(
PR_SYSTEM_THREAD,
DataChannelConnection::DTLSConnectThread, this,
PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD,
PR_JOINABLE_THREAD, 0
);
nsCOMPtr<nsIRunnable> connect_event = new DataChannelConnectRunnable(this);
nsresult rv = NS_NewThread(getter_AddRefs(mConnectThread), connect_event);
return true; // not finished yet
}
/* static */
void
DataChannelConnection::DTLSConnectThread(void *data)
{
DataChannelConnection *_this = static_cast<DataChannelConnection*>(data);
struct sockaddr_conn addr;
memset(&addr, 0, sizeof(addr));
addr.sconn_family = AF_CONN;
#if defined(__Userspace_os_Darwin)
addr.sconn_len = sizeof(addr);
#endif
addr.sconn_port = htons(_this->mLocalPort);
int r = usrsctp_bind(_this->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r < 0) {
LOG(("usrsctp_bind failed: %d", r));
return;
}
// This is the remote addr
addr.sconn_port = htons(_this->mRemotePort);
addr.sconn_addr = static_cast<void *>(_this);
r = usrsctp_connect(_this->mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r < 0) {
LOG(("usrsctp_connect failed: %d", r));
return;
}
// Notify Connection open
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, _this));
_this->mSocket = _this->mMasterSocket;
_this->mState = OPEN;
LOG(("DTLS connect() succeeded! Entering connected mode"));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
_this, nullptr));
return NS_SUCCEEDED(rv);
}
void
@ -452,8 +498,9 @@ DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
// DISPATCH_SYNC is not fully blocking. This may be tricky, as it
// needs to be a per-thread check, not a global.
peer->mSTS->Dispatch(WrapRunnable(
peer, &DataChannelConnection::SendPacket, data, length, true
), NS_DISPATCH_NORMAL);
nsRefPtr<DataChannelConnection>(peer),
&DataChannelConnection::SendPacket, data, length, true),
NS_DISPATCH_NORMAL);
res = 0; // cheat! Packets can always be dropped later anyways
}
return res;
@ -510,7 +557,7 @@ DataChannelConnection::Listen(unsigned short port)
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this, nullptr));
this, (DataChannel *) nullptr));
return true;
}
@ -586,7 +633,7 @@ DataChannelConnection::Connect(const char *addr, unsigned short port)
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this, nullptr));
this, (DataChannel *) nullptr));
return true;
}
@ -976,7 +1023,7 @@ DataChannelConnection::OpenResponseFinish(already_AddRefed<DataChannel> aChannel
if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) {
/* Notify ondatachannel */
// XXX We need to make sure connection sticks around until the message is delivered
LOG(("%s: sending ON_CHANNEL_CREATED for %s: %d/%d", __FUNCTION__,
LOG(("%s: sending ON_CHANNEL_CREATED for %s: %d/%d", __FUNCTION__,
channel->mLabel.get(), streamOut, channel->mStreamIn));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
@ -1393,7 +1440,8 @@ DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
uint32_t i;
mLock.AssertCurrentThreadOwns();
LOG(("Resetting outgoing stream %d",streamOut));
LOG(("Connection %p: Resetting outgoing stream %d",
(void *) this, streamOut));
// Rarely has more than a couple items and only for a short time
for (i = 0; i < mStreamsResetting.Length(); ++i) {
if (mStreamsResetting[i] == streamOut) {
@ -1410,8 +1458,11 @@ DataChannelConnection::SendOutgoingStreamReset()
uint32_t i;
size_t len;
LOG(("Connection %p: Sending outgoing stream reset for %d streams",
(void *) this, mStreamsResetting.Length()));
mLock.AssertCurrentThreadOwns();
if (mStreamsResetting.IsEmpty() == 0) {
if (mStreamsResetting.IsEmpty()) {
LOG(("No streams to reset"));
return;
}
len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
@ -1443,40 +1494,57 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]);
if (channel) {
LOG(("Channel %d outgoing/%d incoming closed",
channel->mStreamOut,channel->mStreamIn));
mStreamsIn[channel->mStreamIn] = nullptr;
channel->mStreamIn = INVALID_STREAM;
if (channel->mStreamOut == INVALID_STREAM) {
channel->mPrPolicy = SCTP_PR_SCTP_NONE;
channel->mPrValue = 0;
channel->mFlags = 0;
channel->mState = CLOSED;
// The other side closed the channel
// We could be in three states:
// 1. Normal state (input and output streams (OPEN)
// Notify application, send a RESET in response on our
// outbound channel. Go to CLOSED
// 2. We sent our own reset (CLOSING); either they crossed on the
// wire, or this is a response to our Reset.
// Go to CLOSED
// 3. We've sent a open but haven't gotten a response yet (OPENING)
// I believe this is impossible, as we don't have an input stream yet.
LOG(("Incoming: Channel %d outgoing/%d incoming closed, state %d",
channel->mStreamOut, channel->mStreamIn, channel->mState));
MOZ_ASSERT(channel->mState == OPEN || channel->mState == CLOSING);
if (channel->mState == OPEN) {
ResetOutgoingStream(channel->mStreamOut);
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
} else {
ResetOutgoingStream(channel->mStreamOut);
channel->mState = CLOSING;
mStreamsOut[channel->mStreamOut] = nullptr;
}
mStreamsIn[channel->mStreamIn] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p",
(void *) channel.get(), (void *) channel->mConnection.get()));
channel->Destroy();
// At this point when we leave here, the object is a zombie held alive only by the DOM object
} else {
LOG(("Can't find incoming channel %d",i));
}
}
if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]);
if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) {
LOG(("Channel %d outgoing/%d incoming closed",
channel->mStreamOut,channel->mStreamIn));
mStreamsOut[channel->mStreamOut] = nullptr;
channel->mStreamOut = INVALID_STREAM;
if (channel->mStreamIn == INVALID_STREAM) {
channel->mPrPolicy = SCTP_PR_SCTP_NONE;
channel->mPrValue = 0;
channel->mFlags = 0;
channel->mState = CLOSED;
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
if (channel) {
LOG(("Outgoing: Connection %p channel %p streams: %d outgoing/%d incoming closed",
(void *) this, (void *) channel.get(), channel->mStreamOut, channel->mStreamIn));
MOZ_ASSERT(channel->mState == CLOSING);
if (channel->mState == CLOSING) {
mStreamsOut[channel->mStreamOut] = nullptr;
if (channel->mStreamIn != INVALID_STREAM)
mStreamsIn[channel->mStreamIn] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p (refcnt will be %u)",
(void *) channel.get(), (void *) channel->mConnection.get(),
(uint32_t) channel->mConnection->mRefCnt-1));
channel->Destroy();
// At this point when we leave here, the object is a zombie held alive only by the DOM object
}
} else {
LOG(("Can't find outgoing channel %d",i));
}
}
}
@ -1691,7 +1759,7 @@ DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
}
flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0;
nsRefPtr<DataChannel> channel(new DataChannel(this,
nsRefPtr<DataChannel> channel(new DataChannel(this,
INVALID_STREAM, INVALID_STREAM,
DataChannel::CONNECTING,
label, type, prValue,
@ -1938,14 +2006,17 @@ DataChannelConnection::Close(uint16_t streamOut)
nsRefPtr<DataChannel> channel; // make sure it doesn't go away on us
MutexAutoLock lock(mLock);
LOG(("Closing stream %d",streamOut));
channel = FindChannelByStreamOut(streamOut);
if (channel) {
LOG(("Connection %p/Channel %p: Closing stream %d",
(void *) channel->mConnection.get(), (void *) channel.get(), streamOut));
channel->mBufferedData.Clear();
if (channel->mStreamOut != INVALID_STREAM)
ResetOutgoingStream(channel->mStreamOut);
SendOutgoingStreamReset();
channel->mState = CLOSING;
} else {
LOG(("!!!? no channel when closing stream %d?",streamOut));
}
}
@ -1958,8 +2029,8 @@ void DataChannelConnection::CloseAll()
mState = CLOSED;
// Close current channels
// FIX! if there are runnables, they must use weakrefs or hold a strong
// ref and keep the channel and/or connection alive
// If there are runnables, they hold a strong ref and keep the channel
// and/or connection alive (even if in a CLOSED state)
for (uint32_t i = 0; i < mStreamsOut.Length(); ++i) {
if (mStreamsOut[i]) {
mStreamsOut[i]->Close();
@ -1973,22 +2044,35 @@ void DataChannelConnection::CloseAll()
}
DataChannel::~DataChannel()
{
if (mConnection)
Close();
}
// Used when disconnecting from the DataChannelConnection
void
DataChannel::Destroy()
{
LOG(("Destroying Data channel %d/%d", mStreamOut, mStreamIn));
Close();
MOZ_ASSERT_IF(mStreamOut != INVALID_STREAM,
!mConnection->FindChannelByStreamOut(mStreamOut));
MOZ_ASSERT_IF(mStreamIn != INVALID_STREAM,
!mConnection->FindChannelByStreamIn(mStreamIn));
mStreamIn = INVALID_STREAM;
mStreamOut = INVALID_STREAM;
mState = CLOSED;
mConnection = nullptr;
}
void
DataChannel::Close()
{
{
if (mState == CLOSING || mState == CLOSED ||
mStreamOut == INVALID_STREAM) {
return;
}
mState = CLOSING;
mConnection->Close(mStreamOut);
mStreamOut = INVALID_STREAM;
mStreamIn = INVALID_STREAM;
}
void
@ -2036,4 +2120,3 @@ DataChannel::GetBufferedAmount()
}
} // namespace mozilla

View File

@ -109,6 +109,7 @@ public:
virtual ~DataChannelConnection();
bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
void Destroy(); // So we can spawn refs tied to runnables in shutdown
// These block; they require something to decide on listener/connector
// (though you can do simultaneous Connect()). Do not call these from
@ -168,6 +169,8 @@ protected:
DataConnectionListener *mListener;
private:
friend class DataChannelConnectRunnable;
#ifdef SCTP_DTLS_SUPPORTED
static void DTLSConnectThread(void *data);
int SendPacket(const unsigned char* data, size_t len, bool release);
@ -235,21 +238,24 @@ private:
// Streams pending reset
nsAutoTArray<uint16_t,4> mStreamsResetting;
struct socket *mMasterSocket;
struct socket *mSocket;
uint16_t mState;
struct socket *mMasterSocket; // accessed from connect thread
struct socket *mSocket; // cloned from mMasterSocket on successful Connect on connect thread
uint16_t mState; // modified on connect thread (to OPEN)
#ifdef SCTP_DTLS_SUPPORTED
nsRefPtr<TransportFlow> mTransportFlow;
nsCOMPtr<nsIEventTarget> mSTS;
#endif
uint16_t mLocalPort;
uint16_t mLocalPort; // Accessed from connect thread
uint16_t mRemotePort;
// Timer to control when we try to resend blocked messages
nsCOMPtr<nsITimer> mDeferredTimer;
uint32_t mDeferTimeout; // in ms
bool mTimerRunning;
// Thread used for connections
nsCOMPtr<nsIThread> mConnectThread;
};
class DataChannel {
@ -286,6 +292,7 @@ public:
}
~DataChannel();
void Destroy(); // when we disconnect from the connection after stream RESET
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel)
@ -413,6 +420,14 @@ public:
mChannel(aChannel),
mConnection(aConnection) {}
// for ON_CONNECTION
DataChannelOnMessageAvailable(int32_t aType,
DataChannelConnection *aConnection,
bool aResult)
: mType(aType),
mConnection(aConnection),
mResult(aResult) {}
NS_IMETHOD Run()
{
switch (mType) {
@ -449,7 +464,10 @@ public:
mConnection->mListener->NotifyDataChannel(mChannel);
break;
case ON_CONNECTION:
mConnection->mListener->NotifyConnection();
if (mResult) {
mConnection->mListener->NotifyConnection();
}
mConnection->mConnectThread = nullptr; // kill the connection thread
break;
case ON_DISCONNECTED:
mConnection->mListener->NotifyClosedConnection();
@ -470,6 +488,7 @@ private:
nsRefPtr<DataChannelConnection> mConnection;
nsCString mData;
int32_t mLen;
bool mResult;
};
}