Bug 855623: Update DataChannel protocol to be declarative vs 3-way handshake (per IETF) r=tuexen,ehugg

This commit is contained in:
Randell Jesup 2013-03-31 21:09:25 -04:00
parent 8de31476da
commit a401fa74bc
4 changed files with 154 additions and 401 deletions

View File

@ -654,7 +654,8 @@ PeerConnectionImpl::InitializeDataChannel(uint16_t aLocalport,
// and we increase the number of streams dynamically as needed.
return NS_OK;
}
mDataConnection = new mozilla::DataChannelConnection(this);
// FIX! Temporary cheat to decide on even/odd
mDataConnection = new mozilla::DataChannelConnection(this, aLocalport > aRemoteport);
if (!mDataConnection->Init(aLocalport, aNumstreams, true)) {
CSFLogError(logTag,"%s DataConnection Init Failed",__FUNCTION__);
return NS_ERROR_FAILURE;

View File

@ -175,9 +175,11 @@ debug_printf(const char *format, ...)
}
#endif
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
bool aIsEven) :
mLock("netwerk::sctp::DataChannelConnection")
{
mAllocateEven = aIsEven;
mState = CLOSED;
mSocket = nullptr;
mMasterSocket = nullptr;
@ -186,7 +188,8 @@ DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
mRemotePort = 0;
mDeferTimeout = 10;
mTimerRunning = false;
LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
LOG(("Constructor DataChannelConnection=%p, listener=%p, %s", this, mListener.get(),
aIsEven ? "Even" : "Odd"));
}
DataChannelConnection::~DataChannelConnection()
@ -286,7 +289,7 @@ DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUs
NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
#endif
} else {
LOG(("sctp_init(%d)", aPort));
LOG(("sctp_init(%u)", aPort));
usrsctp_init(aPort,
nullptr,
#ifdef PR_LOGGING
@ -392,11 +395,9 @@ DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUs
}
// Update number of streams
mStreamsOut.AppendElements(aNumStreams);
mStreamsIn.AppendElements(aNumStreams); // make sure both are the same length
mStreams.AppendElements(aNumStreams);
for (uint32_t i = 0; i < aNumStreams; ++i) {
mStreamsOut[i] = nullptr;
mStreamsIn[i] = nullptr;
mStreams[i] = nullptr;
}
memset(&initmsg, 0, sizeof(initmsg));
len = sizeof(initmsg);
@ -489,7 +490,7 @@ DataChannelConnection::Notify(nsITimer *timer)
bool
DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
LOG(("Connect DTLS local %d, remote %d", localport, remoteport));
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectDTLS!");
NS_ENSURE_TRUE(aFlow, false);
@ -637,7 +638,7 @@ DataChannelConnection::Listen(unsigned short port)
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
LOG(("Waiting for connections on port %d", ntohs(addr.sin_port)));
LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
mState = CONNECTING;
if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
LOG(("***Failed userspace_bind"));
@ -750,36 +751,22 @@ DataChannelConnection::Connect(const char *addr, unsigned short port)
}
DataChannel *
DataChannelConnection::FindChannelByStreamIn(uint16_t streamIn)
DataChannelConnection::FindChannelByStream(uint16_t streamOut)
{
// Auto-extend mStreamsIn as needed
if (((uint32_t) streamIn) + 1 > mStreamsIn.Length()) {
uint32_t old_len = mStreamsIn.Length();
LOG(("Extending mStreamsIn[] to %d elements", ((int32_t) streamIn)+1));
mStreamsIn.AppendElements((streamIn+1) - mStreamsIn.Length());
for (uint32_t i = old_len; i < mStreamsIn.Length(); ++i)
mStreamsIn[i] = nullptr;
}
// Should always be safe in practice
return mStreamsIn.SafeElementAt(streamIn);
}
DataChannel *
DataChannelConnection::FindChannelByStreamOut(uint16_t streamOut)
{
return mStreamsOut.SafeElementAt(streamOut);
return mStreams.SafeElementAt(streamOut);
}
uint16_t
DataChannelConnection::FindFreeStreamOut()
DataChannelConnection::FindFreeStream()
{
uint32_t i, limit;
limit = mStreamsOut.Length();
limit = mStreams.Length();
if (limit > MAX_NUM_STREAMS)
limit = MAX_NUM_STREAMS;
for (i = 0; i < limit; ++i) {
if (!mStreamsOut[i]) {
for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
if (!mStreams[i]) {
// Verify it's not still in the process of closing
for (uint32_t j = 0; j < mStreamsResetting.Length(); ++j) {
if (mStreamsResetting[j] == i) {
@ -796,15 +783,15 @@ DataChannelConnection::FindFreeStreamOut()
}
bool
DataChannelConnection::RequestMoreStreamsOut(int32_t aNeeded)
DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
{
struct sctp_status status;
struct sctp_add_streams sas;
uint32_t outStreamsNeeded;
socklen_t len;
if (aNeeded + mStreamsOut.Length() > MAX_NUM_STREAMS)
aNeeded = MAX_NUM_STREAMS - mStreamsOut.Length();
if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS)
aNeeded = MAX_NUM_STREAMS - mStreams.Length();
if (aNeeded <= 0)
return false;
@ -832,13 +819,13 @@ DataChannelConnection::RequestMoreStreamsOut(int32_t aNeeded)
}
int32_t
DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t streamOut)
DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
{
struct sctp_sndinfo sndinfo;
// Note: Main-thread IO, but doesn't block
memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
sndinfo.snd_sid = streamOut;
sndinfo.snd_sid = stream;
sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
&sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
@ -849,33 +836,9 @@ DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stre
return (1);
}
int32_t
DataChannelConnection::SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn)
{
struct rtcweb_datachannel_open_response rsp;
memset(&rsp, 0, sizeof(struct rtcweb_datachannel_open_response));
rsp.msg_type = DATA_CHANNEL_OPEN_RESPONSE;
rsp.reverse_stream = htons(streamIn);
return SendControlMessage(&rsp, sizeof(rsp), streamOut);
}
int32_t
DataChannelConnection::SendOpenAckMessage(uint16_t streamOut)
{
struct rtcweb_datachannel_ack ack;
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
return SendControlMessage(&ack, sizeof(ack), streamOut);
}
int32_t
DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
uint16_t streamOut, bool unordered,
uint16_t stream, bool unordered,
uint16_t prPolicy, uint32_t prValue)
{
int len = label.Length(); // not including nul
@ -908,7 +871,7 @@ DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
req->priority = htons(0); /* XXX: add support */
strcpy(&req->label[0], PromiseFlatCString(label).get());
int32_t result = SendControlMessage(req, sizeof(*req)+len, streamOut);
int32_t result = SendControlMessage(req, sizeof(*req)+len, stream);
moz_free(req);
return result;
@ -937,14 +900,14 @@ DataChannelConnection::SendDeferredMessages()
// XXX For total fairness, on a still_blocked we'd start next time at the
// same index. Sorry, not going to bother for now.
for (i = 0; i < mStreamsOut.Length(); ++i) {
channel = mStreamsOut[i];
for (i = 0; i < mStreams.Length(); ++i) {
channel = mStreams[i];
if (!channel)
continue;
// Only one of these should be set....
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
if (SendOpenRequestMessage(channel->mLabel, channel->mStreamOut,
if (SendOpenRequestMessage(channel->mLabel, channel->mStream,
channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED,
channel->mPrPolicy, channel->mPrValue)) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
@ -954,7 +917,7 @@ DataChannelConnection::SendDeferredMessages()
still_blocked = true;
} else {
// Close the channel, inform the user
mStreamsOut[channel->mStreamOut] = nullptr;
mStreams[channel->mStream] = nullptr;
channel->mState = CLOSED;
// Don't need to reset; we didn't open it
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
@ -966,46 +929,6 @@ DataChannelConnection::SendDeferredMessages()
if (still_blocked)
break;
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_RSP) {
if (SendOpenResponseMessage(channel->mStreamOut, channel->mStreamIn)) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_RSP;
sent = true;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
still_blocked = true;
} else {
// Close the channel
// Don't need to reset; we didn't open it
// The other side may be left with a hanging Open. Our inability to
// send the open response means we can't easily tell them about it
// We haven't informed the user/DOM of the creation yet, so just
// delete the channel.
mStreamsIn[channel->mStreamIn] = nullptr;
mStreamsOut[channel->mStreamOut] = nullptr;
channel->mState = CLOSED;
}
}
}
if (still_blocked)
break;
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
if (SendOpenAckMessage(channel->mStreamOut)) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
sent = true;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
still_blocked = true;
} else {
// Close the channel, inform the user
CloseInt(channel);
// XXX send error via DataChannelOnMessageAvailable (bug 843625)
}
}
}
if (still_blocked)
break;
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
bool failed_send = false;
int32_t result;
@ -1066,7 +989,7 @@ DataChannelConnection::SendDeferredMessages()
void
DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
size_t length,
uint16_t streamIn)
uint16_t stream)
{
nsRefPtr<DataChannel> channel;
uint32_t prValue;
@ -1076,9 +999,9 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
mLock.AssertCurrentThreadOwns();
if ((channel = FindChannelByStreamIn(streamIn))) {
LOG(("ERROR: HandleOpenRequestMessage: channel for stream %d is in state %d instead of CLOSED.",
streamIn, channel->mState));
if ((channel = FindChannelByStream(stream))) {
LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
stream, channel->mState));
/* XXX: some error handling */
return;
}
@ -1099,171 +1022,44 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
prValue = ntohs(req->reliability_params);
flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED;
channel = new DataChannel(this,
INVALID_STREAM, streamIn,
stream,
DataChannel::CONNECTING,
label,
prPolicy, prValue,
flags,
nullptr, nullptr);
mStreamsIn[streamIn] = channel;
mStreams[stream] = channel;
OpenResponseFinish(channel.forget());
channel->mState = DataChannel::WAITING_TO_OPEN;
LOG(("%s: sending ON_CHANNEL_CREATED for %s: %u", __FUNCTION__,
channel->mLabel.get(), stream));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
this, channel));
LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
}
void
DataChannelConnection::OpenResponseFinish(already_AddRefed<DataChannel> aChannel)
{
nsRefPtr<DataChannel> channel(aChannel);
uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
mLock.AssertCurrentThreadOwns();
LOG(("Finished response: channel %p, streamOut = %u", channel.get(), streamOut));
if (streamOut == INVALID_STREAM) {
if (!RequestMoreStreamsOut()) {
channel->mState = CLOSED;
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) {
// We already returned the channel to the app.
NS_ERROR("Failed to request more streams");
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
}
// If we weren't deferred, we'll be destroying the channel, but it
// never really got set up
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
// Dispatch it to ourselves
mStreamsIn[channel->mStreamIn] = nullptr;
/* XXX: Signal error to the other end (and maybe fire onError: bug 843625) */
return;
}
LOG(("Queuing channel %d to finish response", channel->mStreamIn));
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP;
DataChannel *temp = channel.get(); // Can't cast away already_AddRefed<> from channel.forget()
channel.forget();
mPending.Push(temp);
// can't notify the user until we can send an OpenResponse
} else {
channel->mStreamOut = streamOut;
mStreamsOut[streamOut] = channel;
if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) {
/* Notify ondatachannel */
// XXX We need to make sure connection sticks around until the message is delivered
LOG(("%s: sending ON_CHANNEL_CREATED for %s: %d/%d", __FUNCTION__,
channel->mLabel.get(), streamOut, channel->mStreamIn));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
this, channel));
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP;
StartDefer();
} else {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) {
// We already returned the channel to the app.
NS_ERROR("Failed to send open response");
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
}
/* XXX: Signal error to the other end. */
mStreamsIn[channel->mStreamIn] = nullptr;
mStreamsOut[streamOut] = nullptr;
channel->mStreamOut = INVALID_STREAM;
// we'll be destroying the channel if it wasn't already returned
channel->mState = CLOSED;
return;
}
}
}
}
void
DataChannelConnection::HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp,
size_t length, uint16_t streamIn)
{
uint16_t streamOut;
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
streamOut = ntohs(rsp->reverse_stream);
channel = FindChannelByStreamOut(streamOut);
NS_ENSURE_TRUE_VOID(channel);
NS_ENSURE_TRUE_VOID(channel->mState == CONNECTING);
if (rsp->error) {
LOG(("%s: error in response to open of channel %d (%s)",
__FUNCTION__, streamOut, channel->mLabel.get()));
} else {
NS_ENSURE_TRUE_VOID(!FindChannelByStreamIn(streamIn));
channel->mStreamIn = streamIn;
channel->mState = OPEN;
channel->mReady = true;
mStreamsIn[streamIn] = channel;
if (SendOpenAckMessage(streamOut)) {
channel->mFlags = 0;
} else {
// XXX Only on EAGAIN!? And if not, then close the channel??
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
StartDefer();
}
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel));
}
}
void
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
size_t length, uint16_t streamIn)
{
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStreamIn(streamIn);
NS_ENSURE_TRUE_VOID(channel);
NS_ENSURE_TRUE_VOID(channel->mState == CONNECTING);
channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN;
if (channel->mState == OPEN) {
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel));
} else {
LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel));
}
}
void
DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn)
DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
{
/* XXX: Send an error message? */
LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, streamIn));
LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
// XXX Log to JS error console if possible
}
void
DataChannelConnection::HandleDataMessage(uint32_t ppid,
const void *data, size_t length,
uint16_t streamIn)
uint16_t stream)
{
DataChannel *channel;
const char *buffer = (const char *) data;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStreamIn(streamIn);
channel = FindChannelByStream(stream);
// XXX A closed channel may trip this... check
NS_ENSURE_TRUE_VOID(channel);
@ -1277,8 +1073,8 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
switch (ppid) {
case DATA_CHANNEL_PPID_DOMSTRING:
LOG(("DataChannel: String message received of length %lu on channel %d: %.*s",
length, channel->mStreamOut, (int)PR_MIN(length, 80), buffer));
LOG(("DataChannel: String message received of length %lu on channel %u: %.*s",
length, channel->mStream, (int)PR_MIN(length, 80), buffer));
length = -1; // Flag for DOMString
// WebSockets checks IsUTF8() here; we can try to deliver it
@ -1290,13 +1086,13 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
case DATA_CHANNEL_PPID_BINARY:
channel->mBinaryBuffer += recvData;
LOG(("DataChannel: Received binary message of length %lu (total %u) on channel id %d",
length, channel->mBinaryBuffer.Length(), channel->mStreamOut));
LOG(("DataChannel: Received binary message of length %lu (total %u) on channel id %u",
length, channel->mBinaryBuffer.Length(), channel->mStream));
return; // Not ready to notify application
case DATA_CHANNEL_PPID_BINARY_LAST:
LOG(("DataChannel: Received binary message of length %lu on channel id %d",
length, channel->mStreamOut));
LOG(("DataChannel: Received binary message of length %lu on channel id %u",
length, channel->mStream));
if (!channel->mBinaryBuffer.IsEmpty()) {
channel->mBinaryBuffer += recvData;
LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
@ -1324,52 +1120,37 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
// Called with mLock locked!
void
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn)
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
{
const struct rtcweb_datachannel_open_request *req;
const struct rtcweb_datachannel_open_response *rsp;
const struct rtcweb_datachannel_ack *ack, *msg;
mLock.AssertCurrentThreadOwns();
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // Ack is the smallest
NS_ENSURE_TRUE_VOID(length >= sizeof(*req));
msg = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
switch (msg->msg_type) {
req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
switch (req->msg_type) {
case DATA_CHANNEL_OPEN_REQUEST:
LOG(("length %u, sizeof(*req) = %u", length, sizeof(*req)));
NS_ENSURE_TRUE_VOID(length >= sizeof(*req));
req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
HandleOpenRequestMessage(req, length, streamIn);
break;
case DATA_CHANNEL_OPEN_RESPONSE:
NS_ENSURE_TRUE_VOID(length >= sizeof(*rsp));
rsp = static_cast<const struct rtcweb_datachannel_open_response *>(buffer);
HandleOpenResponseMessage(rsp, length, streamIn);
break;
case DATA_CHANNEL_ACK:
// >= sizeof(*ack) checked above
ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
HandleOpenAckMessage(ack, length, streamIn);
HandleOpenRequestMessage(req, length, stream);
break;
default:
HandleUnknownMessage(ppid, length, streamIn);
HandleUnknownMessage(ppid, length, stream);
break;
}
break;
case DATA_CHANNEL_PPID_DOMSTRING:
case DATA_CHANNEL_PPID_BINARY:
case DATA_CHANNEL_PPID_BINARY_LAST:
HandleDataMessage(ppid, buffer, length, streamIn);
HandleDataMessage(ppid, buffer, length, stream);
break;
default:
LOG(("Message of length %lu, PPID %u on stream %u received.",
length, ppid, streamIn));
length, ppid, stream));
break;
}
}
@ -1567,7 +1348,7 @@ DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event
if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
LOG(("(flags = %x) ", ssfe->ssfe_flags));
}
LOG(("message with PPID = %d, SID = %d, flags: 0x%04x due to error = 0x%08x",
LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
@ -1582,7 +1363,7 @@ DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
uint32_t i;
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p: Resetting outgoing stream %d",
LOG(("Connection %p: Resetting outgoing stream %u",
(void *) this, streamOut));
// Rarely has more than a couple items and only for a short time
for (i = 0; i < mStreamsResetting.Length(); ++i) {
@ -1634,7 +1415,7 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
for (i = 0; i < n; ++i) {
if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]);
channel = FindChannelByStream(strrst->strreset_stream_list[i]);
if (channel) {
// The other side closed the channel
// We could be in three states:
@ -1647,21 +1428,20 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
// 3. We've sent a open but haven't gotten a response yet (OPENING)
// I believe this is impossible, as we don't have an input stream yet.
LOG(("Incoming: Channel %d outgoing/%d incoming closed, state %d",
channel->mStreamOut, channel->mStreamIn, channel->mState));
LOG(("Incoming: Channel %u closed, state %d",
channel->mStream, channel->mState));
ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
channel->mState == DataChannel::CLOSING ||
channel->mState == DataChannel::WAITING_TO_OPEN);
if (channel->mState == DataChannel::OPEN ||
channel->mState == DataChannel::WAITING_TO_OPEN) {
ResetOutgoingStream(channel->mStreamOut);
ResetOutgoingStream(channel->mStream);
SendOutgoingStreamReset();
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
mStreamsOut[channel->mStreamOut] = nullptr;
}
mStreamsIn[channel->mStreamIn] = nullptr;
mStreams[channel->mStream] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p",
(void *) channel.get(), (void *) channel->mConnection.get()));
@ -1673,16 +1453,14 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
}
if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]);
channel = FindChannelByStream(strrst->strreset_stream_list[i]);
if (channel) {
LOG(("Outgoing: Connection %p channel %p streams: %d outgoing/%d incoming closed",
(void *) this, (void *) channel.get(), channel->mStreamOut, channel->mStreamIn));
LOG(("Outgoing: Connection %p channel %p stream: %u closed",
(void *) this, (void *) channel.get(), channel->mStream));
ASSERT_WEBRTC(channel->mState == CLOSING);
if (channel->mState == CLOSING) {
mStreamsOut[channel->mStreamOut] = nullptr;
if (channel->mStreamIn != INVALID_STREAM)
mStreamsIn[channel->mStreamIn] = nullptr;
mStreams[channel->mStream] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p (refcnt will be %u)",
(void *) channel.get(), (void *) channel->mConnection.get(),
(uint32_t) channel->mConnection->mRefCnt-1));
@ -1700,34 +1478,34 @@ DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_eve
void
DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
{
uint16_t streamOut;
uint16_t stream;
uint32_t i;
nsRefPtr<DataChannel> channel;
if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
LOG(("*** Failed increasing number of streams from %u (%u/%u)",
mStreamsOut.Length(),
mStreams.Length(),
strchg->strchange_instrms,
strchg->strchange_outstrms));
// XXX FIX! notify pending opens of failure
return;
} else {
if (strchg->strchange_instrms > mStreamsIn.Length()) {
if (strchg->strchange_instrms > mStreams.Length()) {
LOG(("Other side increased streamds from %u to %u",
mStreamsIn.Length(), strchg->strchange_instrms));
mStreams.Length(), strchg->strchange_instrms));
}
if (strchg->strchange_outstrms > mStreamsOut.Length()) {
uint16_t old_len = mStreamsOut.Length();
if (strchg->strchange_outstrms > mStreams.Length()) {
uint16_t old_len = mStreams.Length();
LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
old_len,
strchg->strchange_outstrms,
strchg->strchange_outstrms - old_len,
strchg->strchange_instrms));
// make sure both are the same length
mStreamsOut.AppendElements(strchg->strchange_outstrms - old_len);
LOG(("New length = %d (was %d)", mStreamsOut.Length(), old_len));
for (uint32_t i = old_len; i < mStreamsOut.Length(); ++i) {
mStreamsOut[i] = nullptr;
mStreams.AppendElements(strchg->strchange_outstrms - old_len);
LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
for (uint32_t i = old_len; i < mStreams.Length(); ++i) {
mStreams[i] = nullptr;
}
// Re-process any channels waiting for streams.
// Linear search, but we don't increase channels often and
@ -1743,7 +1521,7 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
if (num_needed < 16)
num_needed = 16;
LOG(("Not enough new streams, asking for %d more", num_needed));
RequestMoreStreamsOut(num_needed);
RequestMoreStreams(num_needed);
}
// Can't copy nsDeque's. Move into temp array since any that fail will
@ -1756,10 +1534,7 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
// Now assign our new streams
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP;
OpenResponseFinish(channel.forget()); // may reset the flag and re-push
} else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
OpenFinish(channel.forget()); // may reset the flag and re-push
}
@ -1768,34 +1543,28 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
// else probably not a change in # of streams
}
for (i = 0; i < mStreamsOut.Length(); ++i) {
channel = mStreamsOut[i];
for (i = 0; i < mStreams.Length(); ++i) {
channel = mStreams[i];
if (!channel)
continue;
if ((channel->mState == CONNECTING) &&
(channel->mStreamOut == INVALID_STREAM)) {
(channel->mStream == INVALID_STREAM)) {
if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
(strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
/* XXX: Signal to the other end. */
if (channel->mStreamIn != INVALID_STREAM) {
mStreamsIn[channel->mStreamIn] = nullptr;
}
channel->mState = CLOSED;
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
// maybe fire onError (bug 843625)
} else {
streamOut = FindFreeStreamOut();
if (streamOut != INVALID_STREAM) {
channel->mStreamOut = streamOut;
mStreamsOut[streamOut] = channel;
if (channel->mStreamIn == INVALID_STREAM) {
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
} else {
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP;
}
stream = FindFreeStream();
if (stream != INVALID_STREAM) {
channel->mStream = stream;
mStreams[stream] = channel;
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
/// XXX fix
StartDefer();
} else {
/* We will not find more ... */
@ -1908,7 +1677,7 @@ DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0;
nsRefPtr<DataChannel> channel(new DataChannel(this,
INVALID_STREAM, INVALID_STREAM,
INVALID_STREAM,
DataChannel::CONNECTING,
label, type, prValue,
flags,
@ -1922,15 +1691,15 @@ DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder,
already_AddRefed<DataChannel>
DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
{
uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM!
uint16_t stream = FindFreeStream(); // may be INVALID_STREAM!
nsRefPtr<DataChannel> channel(aChannel);
mLock.AssertCurrentThreadOwns();
LOG(("Finishing open: channel %p, streamOut = %u", channel.get(), streamOut));
LOG(("Finishing open: channel %p, stream = %u", channel.get(), stream));
if (streamOut == INVALID_STREAM) {
if (!RequestMoreStreamsOut()) {
if (stream == INVALID_STREAM) {
if (!RequestMoreStreams()) {
channel->mState = CLOSED;
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
@ -1952,10 +1721,10 @@ DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
mPending.Push(channel);
return channel.forget();
}
mStreamsOut[streamOut] = channel;
channel->mStreamOut = streamOut;
mStreams[stream] = channel;
channel->mStream = stream;
if (!SendOpenRequestMessage(channel->mLabel, streamOut,
if (!SendOpenRequestMessage(channel->mLabel, stream,
!!(channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED),
channel->mPrPolicy, channel->mPrValue)) {
LOG(("SendOpenRequest failed, errno = %d", errno));
@ -1972,12 +1741,20 @@ DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
}
// If we haven't returned the channel yet, it will get destroyed when we exit
// this function.
mStreamsOut[streamOut] = nullptr;
channel->mStreamOut = INVALID_STREAM;
mStreams[stream] = nullptr;
channel->mStream = INVALID_STREAM;
// we'll be destroying the channel
channel->mState = CLOSED;
return nullptr;
}
} else {
channel->mState = OPEN;
channel->mReady = true;
// FIX? Move into DOMDataChannel? I don't think we can send it yet here
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel));
}
return channel.forget();
}
@ -2002,7 +1779,7 @@ DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
flags &= ~SCTP_UNORDERED;
}
spa.sendv_sndinfo.snd_ppid = htonl(ppid);
spa.sendv_sndinfo.snd_sid = channel->mStreamOut;
spa.sendv_sndinfo.snd_sid = channel->mStream;
spa.sendv_sndinfo.snd_flags = flags;
spa.sendv_sndinfo.snd_context = 0;
spa.sendv_sndinfo.snd_assoc_id = 0;
@ -2065,8 +1842,10 @@ DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
// We *really* don't want to do this from main thread! - and SendMsgInternal
// avoids blocking.
// This MUST be reliable and in-order for the reassembly to work
if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
channel->mPrPolicy == DATA_CHANNEL_RELIABLE) {
channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
!(channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED)) {
int32_t sent=0;
uint32_t origlen = len;
LOG(("Sending binary message length %u in chunks", len));
@ -2076,7 +1855,7 @@ DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
uint32_t ppid;
len -= sendlen;
ppid = len > 0 ? DATA_CHANNEL_PPID_BINARY : DATA_CHANNEL_PPID_BINARY_LAST;
LOG(("Send chunk of %d bytes, ppid %d", sendlen, ppid));
LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
// Note that these might end up being deferred and queued.
sent += SendMsgInternal(channel, data, sendlen, ppid);
data += sendlen;
@ -2097,7 +1876,7 @@ DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
int32_t
DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
{
DataChannel *channel = mStreamsOut[stream];
DataChannel *channel = mStreams[stream];
NS_ENSURE_TRUE(channel, 0);
// Spawn a thread to send the data
@ -2142,7 +1921,7 @@ DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
ASSERT_WEBRTC(NS_IsMainThread());
// We really could allow this from other threads, so long as we deal with
// asynchronosity issues with channels closing, in particular access to
// mStreamsOut, and issues with the association closing (access to mSocket).
// mStreams, and issues with the association closing (access to mSocket).
const char *data = aMsg.BeginReading();
uint32_t len = aMsg.Length();
@ -2150,7 +1929,7 @@ DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
// XXX if we want more efficiency, translate flags once at open time
channel = mStreamsOut[stream];
channel = mStreams[stream];
NS_ENSURE_TRUE(channel, 0);
if (isBinary)
@ -2174,20 +1953,20 @@ DataChannelConnection::CloseInt(DataChannel *aChannel)
nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p/Channel %p: Closing stream %d",
aChannel->mConnection.get(), aChannel, aChannel->mStreamOut));
LOG(("Connection %p/Channel %p: Closing stream %u",
channel->mConnection.get(), channel.get(), channel->mStream));
// re-test since it may have closed before the lock was grabbed
if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
LOG(("Channel already closing/closed (%d)", aChannel->mState));
LOG(("Channel already closing/closed (%u)", aChannel->mState));
return;
}
aChannel->mBufferedData.Clear();
if (aChannel->mStreamOut != INVALID_STREAM) {
ResetOutgoingStream(aChannel->mStreamOut);
if (channel->mStream != INVALID_STREAM) {
ResetOutgoingStream(channel->mStream);
if (mState == CLOSED) { // called from CloseAll()
// Let resets accumulate then send all at once in CloseAll()
// we're not going to hang around waiting
mStreamsOut[aChannel->mStreamOut] = nullptr;
mStreams[channel->mStream] = nullptr;
} else {
SendOutgoingStreamReset();
}
@ -2195,9 +1974,6 @@ DataChannelConnection::CloseInt(DataChannel *aChannel)
aChannel->mState = CLOSING;
if (mState == CLOSED) {
// we're not going to hang around waiting
if (channel->mStreamOut != INVALID_STREAM) {
mStreamsIn[channel->mStreamIn] = nullptr;
}
channel->Destroy();
}
// At this point when we leave here, the object is a zombie held alive only by the DOM object
@ -2215,9 +1991,9 @@ void DataChannelConnection::CloseAll()
// If there are runnables, they hold a strong ref and keep the channel
// and/or connection alive (even if in a CLOSED state)
bool closed_some = false;
for (uint32_t i = 0; i < mStreamsOut.Length(); ++i) {
if (mStreamsOut[i]) {
mStreamsOut[i]->Close();
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
mStreams[i]->Close();
closed_some = true;
}
}
@ -2225,7 +2001,7 @@ void DataChannelConnection::CloseAll()
// Clean up any pending opens for channels
nsRefPtr<DataChannel> channel;
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
LOG(("closing pending channel %p, stream %d", channel.get(), channel->mStreamOut));
LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
channel->Close(); // also releases the ref on each iteration
closed_some = true;
}
@ -2258,13 +2034,10 @@ DataChannel::Destroy()
{
ENSURE_DATACONNECTION;
LOG(("Destroying Data channel %d/%d", mStreamOut, mStreamIn));
MOZ_ASSERT_IF(mStreamOut != INVALID_STREAM,
!mConnection->FindChannelByStreamOut(mStreamOut));
MOZ_ASSERT_IF(mStreamIn != INVALID_STREAM,
!mConnection->FindChannelByStreamIn(mStreamIn));
mStreamIn = INVALID_STREAM;
mStreamOut = INVALID_STREAM;
LOG(("Destroying Data channel %u", mStream));
MOZ_ASSERT_IF(mStream != INVALID_STREAM,
!mConnection->FindChannelByStream(mStream));
mStream = INVALID_STREAM;
mState = CLOSED;
mConnection = nullptr;
}

View File

@ -111,7 +111,8 @@ public:
virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;
};
DataChannelConnection(DataConnectionListener *listener);
DataChannelConnection(DataConnectionListener *listener,
bool aIsEven);
virtual ~DataChannelConnection();
bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
@ -187,15 +188,12 @@ private:
void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
#endif
DataChannel* FindChannelByStreamIn(uint16_t streamIn);
DataChannel* FindChannelByStreamOut(uint16_t streamOut);
uint16_t FindFreeStreamOut();
bool RequestMoreStreamsOut(int32_t aNeeded = 16);
DataChannel* FindChannelByStream(uint16_t stream);
uint16_t FindFreeStream();
bool RequestMoreStreams(int32_t aNeeded = 16);
int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut);
int32_t SendOpenRequestMessage(const nsACString& label,uint16_t streamOut,
bool unordered, uint16_t prPolicy, uint32_t prValue);
int32_t SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn);
int32_t SendOpenAckMessage(uint16_t streamOut);
int32_t SendMsgInternal(DataChannel *channel, const char *data,
uint32_t length, uint32_t ppid);
int32_t SendBinary(DataChannel *channel, const char *data,
@ -211,11 +209,6 @@ private:
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
size_t length,
uint16_t streamIn);
void OpenResponseFinish(already_AddRefed<DataChannel> channel);
void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp,
size_t length, uint16_t streamIn);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
size_t length, uint16_t streamIn);
void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn);
void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t streamIn);
void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn);
@ -243,10 +236,10 @@ private:
static void ReleaseTransportFlow(nsRefPtr<TransportFlow> aFlow) {}
// Data:
// NOTE: while these arrays will auto-expand, increases in the number of
// NOTE: while this array will auto-expand, increases in the number of
// channels available from the stack must be negotiated!
nsAutoTArray<nsRefPtr<DataChannel>,16> mStreamsOut;
nsAutoTArray<nsRefPtr<DataChannel>,16> mStreamsIn;
bool mAllocateEven;
nsAutoTArray<nsRefPtr<DataChannel>,16> mStreams;
nsDeque mPending; // Holds already_AddRefed<DataChannel>s -- careful!
// Streams pending reset
@ -287,7 +280,7 @@ public:
};
DataChannel(DataChannelConnection *connection,
uint16_t streamOut, uint16_t streamIn,
uint16_t stream,
uint16_t state,
const nsACString& label,
uint16_t policy, uint32_t value,
@ -301,8 +294,7 @@ public:
, mLabel(label)
, mState(state)
, mReady(false)
, mStreamOut(streamOut)
, mStreamIn(streamIn)
, mStream(stream)
, mPrPolicy(policy)
, mPrValue(value)
, mFlags(0)
@ -327,8 +319,8 @@ public:
{
ENSURE_DATACONNECTION_RET(false);
if (mStreamOut != INVALID_STREAM)
return (mConnection->SendMsg(mStreamOut, aMsg) > 0);
if (mStream != INVALID_STREAM)
return (mConnection->SendMsg(mStream, aMsg) > 0);
else
return false;
}
@ -338,8 +330,8 @@ public:
{
ENSURE_DATACONNECTION_RET(false);
if (mStreamOut != INVALID_STREAM)
return (mConnection->SendBinaryMsg(mStreamOut, aMsg) > 0);
if (mStream != INVALID_STREAM)
return (mConnection->SendBinaryMsg(mStream, aMsg) > 0);
else
return false;
}
@ -349,8 +341,8 @@ public:
{
ENSURE_DATACONNECTION_RET(false);
if (mStreamOut != INVALID_STREAM)
return (mConnection->SendBlob(mStreamOut, aBlob) > 0);
if (mStream != INVALID_STREAM)
return (mConnection->SendBlob(mStream, aBlob) > 0);
else
return false;
}
@ -393,8 +385,7 @@ private:
nsCString mLabel;
uint16_t mState;
bool mReady;
uint16_t mStreamOut;
uint16_t mStreamIn;
uint16_t mStream;
uint16_t mPrPolicy;
uint32_t mPrValue;
uint32_t mFlags;

View File

@ -44,21 +44,9 @@ struct rtcweb_datachannel_open_request {
char label[1]; // keep VC++ happy... UTF8 null-terminated string
} SCTP_PACKED;
struct rtcweb_datachannel_open_response {
uint8_t msg_type; // DATA_CHANNEL_OPEN_RESPONSE
uint8_t error; // 0 == no error
uint16_t flags;
uint16_t reverse_stream;
} SCTP_PACKED;
struct rtcweb_datachannel_ack {
uint8_t msg_type; // DATA_CHANNEL_ACK
} SCTP_PACKED;
/* msg_type values: */
#define DATA_CHANNEL_OPEN_REQUEST 0
#define DATA_CHANNEL_OPEN_RESPONSE 1
#define DATA_CHANNEL_ACK 2
/* 0-2 were used in an early version of the protocol with 3-way handshakes */
#define DATA_CHANNEL_OPEN_REQUEST 3
/* channel_type values: */
#define DATA_CHANNEL_RELIABLE 0