Bug 1556795: Allocate stream ids as soon as client/server is negotiated, and try to negotiate id limit increases after. r=ng

Differential Revision: https://phabricator.services.mozilla.com/D35047

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Byron Campen [:bwc] 2019-06-19 20:46:19 +00:00
parent 2347ae8973
commit 806b266db5
2 changed files with 217 additions and 175 deletions

View File

@ -567,11 +567,6 @@ bool DataChannelConnection::Init(const uint16_t aLocalPort,
}
}
// Update number of streams
mStreams.AppendElements(aNumStreams);
for (uint32_t i = 0; i < aNumStreams; ++i) {
mStreams[i] = nullptr;
}
memset(&initmsg, 0, sizeof(initmsg));
len = sizeof(initmsg);
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
@ -667,20 +662,27 @@ bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
mLocalPort = localport;
mRemotePort = remoteport;
mState = CONNECTING;
mAllocateEven = Some(aClient);
RUN_ON_THREAD(
mSTS,
WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::SetSignals, aTransportId, aClient),
NS_DISPATCH_NORMAL);
// Could be faster. Probably doesn't matter.
while (auto channel = mChannels.Get(INVALID_STREAM)) {
mChannels.Remove(channel);
channel->mStream = FindFreeStream();
if (channel->mStream != INVALID_STREAM) {
mChannels.Insert(channel);
}
}
RUN_ON_THREAD(mSTS,
WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::SetSignals, aTransportId),
NS_DISPATCH_NORMAL);
return true;
}
void DataChannelConnection::SetSignals(const std::string& aTransportId,
bool aClient) {
void DataChannelConnection::SetSignals(const std::string& aTransportId) {
ASSERT_WEBRTC(IsSTSThread());
mTransportId = aTransportId;
mAllocateEven = aClient;
mTransportHandler->SignalPacketReceived.connect(
this, &DataChannelConnection::SctpDtlsInput);
// SignalStateChange() doesn't call you with the initial state
@ -1025,48 +1027,47 @@ bool DataChannelConnection::Connect(const char* addr, unsigned short port) {
#endif
DataChannel* DataChannelConnection::FindChannelByStream(uint16_t stream) {
return mStreams.SafeElementAt(stream);
return mChannels.Get(stream).get();
}
uint16_t DataChannelConnection::FindFreeStream() {
uint32_t i, j, limit;
ASSERT_WEBRTC(NS_IsMainThread());
uint16_t i, limit;
limit = mStreams.Length();
if (limit > MAX_NUM_STREAMS) limit = MAX_NUM_STREAMS;
limit = MAX_NUM_STREAMS;
for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
if (!mStreams[i]) {
// Verify it's not still in the process of closing
for (j = 0; j < mStreamsResetting.Length(); ++j) {
if (mStreamsResetting[j] == i) {
break;
}
MOZ_ASSERT(mAllocateEven.isSome());
for (i = (*mAllocateEven ? 0 : 1); i < limit; i += 2) {
if (mChannels.Get(i)) {
continue;
}
// Verify it's not still in the process of closing
size_t j;
for (j = 0; j < mStreamsResetting.Length(); ++j) {
if (mStreamsResetting[j] == i) {
break;
}
if (j == mStreamsResetting.Length()) break;
}
if (j == mStreamsResetting.Length()) {
return i;
}
}
if (i >= limit) {
return INVALID_STREAM;
}
return i;
return INVALID_STREAM;
}
uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
if (mCurrentStream == mStreams.Length() - 1) {
RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream);
if (!channel) {
mCurrentStream = 0;
} else {
++mCurrentStream;
mCurrentStream = channel->mStream;
}
return mCurrentStream;
}
uint32_t DataChannelConnection::GetCurrentStreamIndex() {
// Fix current stream index (in case #streams decreased)
if (mCurrentStream >= mStreams.Length()) {
mCurrentStream = 0;
}
return mCurrentStream;
}
@ -1076,8 +1077,8 @@ bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
uint32_t outStreamsNeeded;
socklen_t len;
if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
aNeeded = MAX_NUM_STREAMS - mStreams.Length();
if (aNeeded + mNegotiatedIdLimit > MAX_NUM_STREAMS) {
aNeeded = MAX_NUM_STREAMS - mNegotiatedIdLimit;
}
if (aNeeded <= 0) {
return false;
@ -1108,8 +1109,8 @@ bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
return false;
}
LOG(("Requested %u more streams", outStreamsNeeded));
// We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
// values are larger than mStreams.Length()
// We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the
// values are larger than mNegotiatedIdLimit
return true;
}
@ -1242,7 +1243,7 @@ bool DataChannelConnection::SendDeferredMessages() {
uint32_t i = GetCurrentStreamIndex();
uint32_t end = i;
do {
channel = mStreams[i];
channel = mChannels.Get(i);
// Should already be cleared if closing/closed
if (!channel || channel->mBufferedData.IsEmpty()) {
i = UpdateCurrentStreamIndex();
@ -1375,9 +1376,9 @@ void DataChannelConnection::HandleOpenRequestMessage(
}
return;
}
if (stream >= mStreams.Length()) {
if (stream >= mNegotiatedIdLimit) {
LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
mStreams.Length()));
mNegotiatedIdLimit));
return;
}
@ -1389,7 +1390,7 @@ void DataChannelConnection::HandleOpenRequestMessage(
channel =
new DataChannel(this, stream, DataChannel::OPEN, label, protocol,
prPolicy, prValue, ordered, false, nullptr, nullptr);
mStreams[stream] = channel;
mChannels.Insert(channel);
LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), stream));
@ -1989,7 +1990,9 @@ void DataChannelConnection::ClearResets() {
if (channel) {
LOG(("Forgetting channel %u (%p) with pending reset", channel->mStream,
channel.get()));
mStreams[channel->mStream] = nullptr;
// TODO: Do we _really_ want to remove this? Are we allowed to reuse the
// id?
mChannels.Remove(channel);
}
}
mStreamsResetting.Clear();
@ -2071,11 +2074,10 @@ void DataChannelConnection::HandleStreamResetEvent(
// yet.
LOG(("Incoming: Channel %u closed", channel->mStream));
if (mStreams[channel->mStream]) {
if (mChannels.Remove(channel)) {
// Mark the stream for reset (the reset is sent below)
ResetOutgoingStream(channel->mStream);
}
mStreams[channel->mStream] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p",
(void*)channel.get(), (void*)channel->mConnection.get()));
@ -2096,48 +2098,42 @@ void DataChannelConnection::HandleStreamResetEvent(
void DataChannelConnection::HandleStreamChangeEvent(
const struct sctp_stream_change_event* strchg) {
uint16_t stream;
RefPtr<DataChannel> channel;
if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
mStreams.Length(), strchg->strchange_instrms,
mNegotiatedIdLimit, strchg->strchange_instrms,
strchg->strchange_outstrms));
// XXX FIX! notify pending opens of failure
return;
}
if (strchg->strchange_instrms > mStreams.Length()) {
LOG(("Other side increased streams from %zu to %u", mStreams.Length(),
if (strchg->strchange_instrms > mNegotiatedIdLimit) {
LOG(("Other side increased streams from %zu to %u", mNegotiatedIdLimit,
strchg->strchange_instrms));
}
if (strchg->strchange_outstrms > mStreams.Length() ||
strchg->strchange_instrms > mStreams.Length()) {
uint16_t old_len = mStreams.Length();
uint16_t new_len =
std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
uint16_t old_limit = mNegotiatedIdLimit;
uint16_t new_limit =
std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
if (new_limit > mNegotiatedIdLimit) {
LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
old_len, new_len, new_len - old_len, strchg->strchange_instrms));
old_limit, new_limit, new_limit - old_limit,
strchg->strchange_instrms));
// make sure both are the same length
mStreams.AppendElements(new_len - old_len);
LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
for (size_t i = old_len; i < mStreams.Length(); ++i) {
mStreams[i] = nullptr;
}
mNegotiatedIdLimit = new_limit;
LOG(("New length = %zu (was %d)", mNegotiatedIdLimit, old_limit));
// Re-process any channels waiting for streams.
// Linear search, but we don't increase channels often and
// the array would only get long in case of an app error normally
// Make sure we request enough streams if there's a big jump in streams
// Could make a more complex API for OpenXxxFinish() and avoid this loop
size_t num_needed = mPending.GetSize();
LOG(("%zu of %d new streams already needed", num_needed,
new_len - old_len));
num_needed -= (new_len - old_len); // number we added
if (num_needed > 0) {
if (num_needed < 16) num_needed = 16;
LOG(("Not enough new streams, asking for %zu more", num_needed));
auto channels = mChannels.GetAll();
size_t num_needed =
channels.Length() ? (channels.LastElement()->mStream + 1) : 0;
MOZ_ASSERT(num_needed != INVALID_STREAM);
if (num_needed > new_limit) {
int32_t more_needed = num_needed - ((int32_t)mNegotiatedIdLimit) + 16;
LOG(("Not enough new streams, asking for %d more", more_needed));
// TODO: parameter is an int32_t but we pass size_t
RequestMoreStreams(num_needed);
RequestMoreStreams(more_needed);
} else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
LOG(("Requesting %d output streams to match partner",
strchg->strchange_instrms - strchg->strchange_outstrms));
@ -2149,41 +2145,14 @@ void DataChannelConnection::HandleStreamChangeEvent(
}
// else probably not a change in # of streams
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
channel = mStreams[i];
if (!channel) continue;
if (channel->mStream == INVALID_STREAM) {
if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
(strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
(strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
// Other side denied our request. Need to AnnounceClosed some stuff.
for (auto& channel : mChannels.GetAll()) {
if (channel->mStream >= mNegotiatedIdLimit) {
/* XXX: Signal to the other end. */
channel->AnnounceClosed();
// maybe fire onError (bug 843625)
} else {
stream = FindFreeStream();
if (stream != INVALID_STREAM) {
channel->mStream = stream;
mStreams[stream] = channel;
// Send open request
int error = SendOpenRequestMessage(
channel->mLabel, channel->mProtocol, channel->mStream,
!!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
channel->mPrPolicy, channel->mPrValue);
if (error) {
LOG(("SendOpenRequest failed, error = %d", error));
// Close the channel, inform the user
mStreams[channel->mStream] = nullptr;
channel->AnnounceClosed();
// Don't need to reset; we didn't open it
} else {
channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
channel->AnnounceOpen();
}
} else {
/* We will not find more ... */
break;
}
}
}
}
@ -2280,8 +2249,16 @@ already_AddRefed<DataChannel> DataChannelConnection::Open(
bool inOrder, uint32_t prValue, DataChannelListener* aListener,
nsISupports* aContext, bool aExternalNegotiated, uint16_t aStream) {
if (!aExternalNegotiated) {
// aStream == INVALID_STREAM to have the protocol allocate
aStream = INVALID_STREAM;
if (mAllocateEven.isSome()) {
aStream = FindFreeStream();
if (aStream == INVALID_STREAM) {
return nullptr;
}
} else {
// We do not yet know whether we are client or server, and an id has not
// been chosen for us. We will need to choose later.
aStream = INVALID_STREAM;
}
}
uint16_t prPolicy = SCTP_PR_SCTP_NONE;
@ -2310,9 +2287,7 @@ already_AddRefed<DataChannel> DataChannelConnection::Open(
return nullptr;
}
// Don't look past currently-negotiated streams
if (aStream != INVALID_STREAM && aStream < mStreams.Length() &&
mStreams[aStream]) {
if (aStream != INVALID_STREAM && mChannels.Get(aStream)) {
LOG(("ERROR: external negotiation of already-open channel %u", aStream));
// XXX How do we indicate this up to the application? Probably the
// caller's job, but we may need to return an error code.
@ -2322,6 +2297,7 @@ already_AddRefed<DataChannel> DataChannelConnection::Open(
RefPtr<DataChannel> channel(new DataChannel(
this, aStream, DataChannel::CONNECTING, label, protocol, prPolicy,
prValue, inOrder, aExternalNegotiated, aListener, aContext));
mChannels.Insert(channel);
MutexAutoLock lock(mLock); // OpenFinish assumes this
return OpenFinish(channel.forget());
@ -2333,7 +2309,7 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
// Normally 1 reference if called from ::Open(), or 2 if called from
// ProcessQueuedOpens() unless the DOMDataChannel was gc'd
uint16_t stream = channel->mStream;
const uint16_t stream = channel->mStream;
bool queue = false;
mLock.AssertCurrentThreadOwns();
@ -2363,16 +2339,12 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
// either change the initial ask or possibly renegotiate after open.
if (mState == OPEN) {
if (stream == INVALID_STREAM) {
stream = FindFreeStream(); // may be INVALID_STREAM if we need more
}
if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
MOZ_ASSERT(stream != INVALID_STREAM);
if (stream >= mNegotiatedIdLimit) {
// RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra
// streams to avoid going back immediately for more if the ask to N, N+1,
// etc
int32_t more_needed = (stream == INVALID_STREAM)
? 16
: (stream - ((int32_t)mStreams.Length())) + 16;
int32_t more_needed = stream - ((int32_t)mNegotiatedIdLimit) + 16;
if (!RequestMoreStreams(more_needed)) {
// Something bad happened... we're done
goto request_error_cleanup;
@ -2381,12 +2353,13 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
}
} else {
// not OPEN
if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
if (stream != INVALID_STREAM && stream >= mNegotiatedIdLimit &&
mState == CLOSED) {
// Update number of streams for init message
struct sctp_initmsg initmsg;
socklen_t len = sizeof(initmsg);
int32_t total_needed = stream + 16;
uint16_t total_needed =
(stream < UINT16_MAX - 16) ? stream + 16 : UINT16_MAX;
memset(&initmsg, 0, sizeof(initmsg));
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
@ -2403,12 +2376,6 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
goto request_error_cleanup;
}
int32_t old_len = mStreams.Length();
mStreams.AppendElements(total_needed - old_len);
for (int32_t i = old_len; i < total_needed; ++i) {
mStreams[i] = nullptr;
}
}
// else if state is CONNECTING, we'll just re-negotiate when OpenFinish
// is called, if needed
@ -2426,9 +2393,7 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
}
MOZ_ASSERT(stream != INVALID_STREAM);
// just allocated (& OPEN), or externally negotiated
mStreams[stream] = channel; // holds a reference
channel->mStream = stream;
MOZ_ASSERT(stream < mNegotiatedIdLimit);
#ifdef TEST_QUEUED_DATA
// It's painful to write a test for this...
@ -2457,8 +2422,7 @@ already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
}
// If we haven't returned the channel yet, it will get destroyed when we
// exit this function.
mStreams[stream] = nullptr;
channel->mStream = INVALID_STREAM;
mChannels.Remove(channel);
// we'll be destroying the channel
return nullptr;
/* NOTREACHED */
@ -2743,7 +2707,7 @@ class ReadBlobRunnable : public Runnable {
// Returns a POSIX error code.
int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) {
DataChannel* channel = mStreams[stream];
RefPtr<DataChannel> channel = mChannels.Get(stream);
if (NS_WARN_IF(!channel)) {
return EINVAL; // TODO: Find a better error code
}
@ -2829,15 +2793,6 @@ void DataChannelConnection::ReadBlob(
Dispatch(runnable.forget());
}
void DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) {
ASSERT_WEBRTC(NS_IsMainThread());
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
aStreamList->push_back(mStreams[i]->mStream);
}
}
}
// Returns a POSIX error code.
int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
const nsACString& aMsg,
@ -2845,7 +2800,7 @@ int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
ASSERT_WEBRTC(NS_IsMainThread());
// We really could allow this from other threads, so long as we deal with
// asynchronosity issues with channels closing, in particular access to
// mStreams, and issues with the association closing (access to mSocket).
// mChannels, and issues with the association closing (access to mSocket).
const uint8_t* data = (const uint8_t*)aMsg.BeginReading();
uint32_t len = aMsg.Length();
@ -2854,12 +2809,11 @@ int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
return EMSGSIZE;
}
#endif
DataChannel* channelPtr;
LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream,
len));
// XXX if we want more efficiency, translate flags once at open time
channelPtr = mStreams[stream];
RefPtr<DataChannel> channelPtr = mChannels.Get(stream);
if (NS_WARN_IF(!channelPtr)) {
return EINVAL; // TODO: Find a better error code
}
@ -2894,24 +2848,24 @@ void DataChannelConnection::CloseInt(DataChannel* aChannel) {
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p/Channel %p: Closing stream %u",
channel->mConnection.get(), channel.get(), channel->mStream));
aChannel->mBufferedData.Clear();
if (mState == CLOSED) {
// If we're CLOSING, we might leave this in place until we can send a
// reset.
mChannels.Remove(channel);
}
// re-test since it may have closed before the lock was grabbed
if (aChannel->mReadyState == CLOSED || aChannel->mReadyState == CLOSING) {
LOG(("Channel already closing/closed (%u)", aChannel->mReadyState));
if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
// called from CloseAll()
// we're not going to hang around waiting any more
mStreams[channel->mStream] = nullptr;
}
return;
}
aChannel->mBufferedData.Clear();
if (channel->mStream != INVALID_STREAM) {
ResetOutgoingStream(channel->mStream);
if (mState == CLOSED) { // called from CloseAll()
// Let resets accumulate then send all at once in CloseAll()
// we're not going to hang around waiting
mStreams[channel->mStream] = nullptr;
} else {
if (mState != CLOSED) {
// Individual channel is being closed, send reset now.
SendOutgoingStreamReset();
}
}
@ -2937,12 +2891,8 @@ void DataChannelConnection::CloseAll() {
// Close current channels
// If there are runnables, they hold a strong ref and keep the channel
// and/or connection alive (even if in a CLOSED state)
bool closed_some = false;
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
mStreams[i]->Close();
closed_some = true;
}
for (auto& channel : mChannels.GetAll()) {
channel->Close();
}
// Clean up any pending opens for channels
@ -2952,14 +2902,78 @@ void DataChannelConnection::CloseAll() {
LOG(("closing pending channel %p, stream %u", channel.get(),
channel->mStream));
channel->Close(); // also releases the ref on each iteration
closed_some = true;
}
// It's more efficient to let the Resets queue in shutdown and then
// SendOutgoingStreamReset() here.
if (closed_some) {
MutexAutoLock lock(mLock);
SendOutgoingStreamReset();
MutexAutoLock lock(mLock);
SendOutgoingStreamReset();
}
bool DataChannelConnection::Channels::IdComparator::Equals(
const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
return aChannel->mStream == aId;
}
bool DataChannelConnection::Channels::IdComparator::LessThan(
const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
return aChannel->mStream < aId;
}
bool DataChannelConnection::Channels::IdComparator::Equals(
const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
return Equals(a1, a2->mStream);
}
bool DataChannelConnection::Channels::IdComparator::LessThan(
const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
return LessThan(a1, a2->mStream);
}
void DataChannelConnection::Channels::Insert(
const RefPtr<DataChannel>& aChannel) {
LOG(("Inserting channel %u : %p", aChannel->mStream, aChannel.get()));
MutexAutoLock lock(mMutex);
if (aChannel->mStream != INVALID_STREAM) {
MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator()));
}
MOZ_ASSERT(!mChannels.Contains(aChannel));
mChannels.InsertElementSorted(aChannel, IdComparator());
}
bool DataChannelConnection::Channels::Remove(
const RefPtr<DataChannel>& aChannel) {
LOG(("Removing channel %u : %p", aChannel->mStream, aChannel.get()));
MutexAutoLock lock(mMutex);
if (aChannel->mStream == INVALID_STREAM) {
return mChannels.RemoveElement(aChannel);
}
return mChannels.RemoveElementSorted(aChannel, IdComparator());
}
RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const {
MutexAutoLock lock(mMutex);
auto index = mChannels.BinaryIndexOf(aId, IdComparator());
if (index == ChannelArray::NoIndex) {
return nullptr;
}
return mChannels[index];
}
RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel(
uint16_t aCurrentId) const {
MutexAutoLock lock(mMutex);
if (mChannels.IsEmpty()) {
return nullptr;
}
auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator());
if (index == mChannels.Length()) {
index = 0;
}
return mChannels[index];
}
DataChannel::~DataChannel() {
@ -2986,8 +3000,6 @@ void DataChannel::StreamClosedLocked() {
LOG(("Destroying Data channel %u", mStream));
MOZ_ASSERT_IF(mStream != INVALID_STREAM,
!mConnection->FindChannelByStream(mStream));
// Spec doesn't say to mess with the stream id...
mStream = INVALID_STREAM;
AnnounceClosed();
// We leave mConnection live until the DOM releases us, to avoid races
}

View File

@ -164,7 +164,7 @@ class DataChannelConnection final : public net::NeckoTargetHolder
void TransportStateChange(const std::string& aTransportId,
TransportLayer::State aState);
void CompleteConnect();
void SetSignals(const std::string& aTransportId, bool aClient);
void SetSignals(const std::string& aTransportId);
#endif
typedef enum {
@ -216,8 +216,6 @@ class DataChannelConnection final : public net::NeckoTargetHolder
void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
nsIInputStream* aBlob);
void GetStreamIds(std::vector<uint16_t>* aStreamList);
bool SendDeferredMessages();
protected:
@ -310,16 +308,47 @@ class DataChannelConnection final : public net::NeckoTargetHolder
}
#endif
class Channels {
public:
Channels() : mMutex("DataChannelConnection::Channels::mMutex") {}
void Insert(const RefPtr<DataChannel>& aChannel);
bool Remove(const RefPtr<DataChannel>& aChannel);
RefPtr<DataChannel> Get(uint16_t aId) const;
typedef AutoTArray<RefPtr<DataChannel>, 16> ChannelArray;
ChannelArray GetAll() const {
MutexAutoLock lock(mMutex);
return mChannels;
}
RefPtr<DataChannel> GetNextChannel(uint16_t aCurrentId) const;
private:
struct IdComparator {
bool Equals(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
bool LessThan(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
bool Equals(const RefPtr<DataChannel>& a1,
const RefPtr<DataChannel>& a2) const;
bool LessThan(const RefPtr<DataChannel>& a1,
const RefPtr<DataChannel>& a2) const;
};
mutable Mutex mMutex;
ChannelArray mChannels;
};
bool mSendInterleaved = false;
bool mMaxMessageSizeSet = false;
uint64_t mMaxMessageSize = 0;
bool mAllocateEven = false;
// Main thread only
Maybe<bool> mAllocateEven;
// Data:
// NOTE: while this array will auto-expand, increases in the number of
// NOTE: while this container will auto-expand, increases in the number of
// channels available from the stack must be negotiated!
AutoTArray<RefPtr<DataChannel>, 16> mStreams;
// Accessed from both main and sts, API is threadsafe
Channels mChannels;
// STS only
uint32_t mCurrentStream = 0;
nsDeque mPending; // Holds addref'ed DataChannel's -- careful!
// STS and main
size_t mNegotiatedIdLimit = 0; // GUARDED_BY(mConnection->mLock)
uint8_t mPendingType = PENDING_NONE;
// holds data that's come in before a channel is open
nsTArray<nsAutoPtr<QueuedDataMessage>> mQueuedData;
@ -327,8 +356,8 @@ class DataChannelConnection final : public net::NeckoTargetHolder
nsTArray<nsAutoPtr<BufferedOutgoingMsg>>
mBufferedControl; // GUARDED_BY(mConnection->mLock)
// Streams pending reset
AutoTArray<uint16_t, 4> mStreamsResetting;
// Streams pending reset. Accessed from main and STS.
AutoTArray<uint16_t, 4> mStreamsResetting; // GUARDED_BY(mConnection->mLock)
// accessed from STS thread
struct socket* mMasterSocket = nullptr;
// cloned from mMasterSocket on successful Connect on STS thread
@ -493,6 +522,7 @@ class DataChannel {
uint16_t mStream;
uint16_t mPrPolicy;
uint32_t mPrValue;
// Accessed on main and STS
const bool mNegotiated;
const bool mOrdered;
uint32_t mFlags;