Bug 930778: Support DataChannel ACK for unordered channels r=tuexen

This commit is contained in:
Randell Jesup 2013-10-25 16:08:18 -04:00
parent 8d9d566333
commit 756f0284a3
3 changed files with 104 additions and 25 deletions

View File

@ -848,9 +848,9 @@ DataChannelConnection::Connect(const char *addr, unsigned short port)
#endif
DataChannel *
DataChannelConnection::FindChannelByStream(uint16_t streamOut)
DataChannelConnection::FindChannelByStream(uint16_t stream)
{
return mStreams.SafeElementAt(streamOut);
return mStreams.SafeElementAt(stream);
}
uint16_t
@ -934,6 +934,17 @@ DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stre
return (1);
}
int32_t
DataChannelConnection::SendOpenAckMessage(uint16_t stream)
{
struct rtcweb_datachannel_ack ack;
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
return SendControlMessage(&ack, sizeof(ack), stream);
}
int32_t
DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
const nsACString& protocol,
@ -1035,6 +1046,23 @@ DataChannelConnection::SendDeferredMessages()
if (still_blocked)
break;
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
if (SendOpenAckMessage(channel->mStream)) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
sent = true;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
still_blocked = true;
} else {
// Close the channel, inform the user
CloseInt(channel);
// XXX send error via DataChannelOnMessageAvailable (bug 843625)
}
}
}
if (still_blocked)
break;
if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
bool failed_send = false;
int32_t result;
@ -1105,12 +1133,14 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
mLock.AssertCurrentThreadOwns();
if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
LOG(("Inconsistent length: %u, should be %u", length,
LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
(sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
return;
}
LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
switch (req->channel_type) {
case DATA_CHANNEL_RELIABLE:
case DATA_CHANNEL_RELIABLE_UNORDERED:
@ -1176,12 +1206,20 @@ DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_
LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
if (!SendOpenAckMessage(stream)) {
// XXX Only on EAGAIN!? And if not, then close the channel??
channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
StartDefer();
}
// Now process any queued data messages for the channel (which will
// themselves likely get queued until we leave WAITING_TO_OPEN, plus any
// more that come in before that happens)
DeliverQueuedData(stream);
}
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
void
DataChannelConnection::DeliverQueuedData(uint16_t stream)
{
@ -1204,6 +1242,23 @@ DataChannelConnection::DeliverQueuedData(uint16_t stream)
}
}
void
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
size_t length, uint16_t stream)
{
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
NS_ENSURE_TRUE_VOID(channel);
LOG(("OpenAck received for stream %u, waiting=%d", stream,
(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
}
void
DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
{
@ -1225,6 +1280,8 @@ DataChannelConnection::HandleDataMessage(uint32_t ppid,
channel = FindChannelByStream(stream);
// XXX A closed channel may trip this... check
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
if (!channel) {
// In the updated 0-RTT open case, the sender can send data immediately
// after Open, and doesn't set the in-order bit (since we don't have a
@ -1318,22 +1375,28 @@ void
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
{
const struct rtcweb_datachannel_open_request *req;
const struct rtcweb_datachannel_ack *ack;
mLock.AssertCurrentThreadOwns();
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
// structure includes a possibly-unused char label[1] (in a packed structure)
NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
switch (req->msg_type) {
case DATA_CHANNEL_OPEN_REQUEST:
LOG(("length %u, sizeof(*req) = %u", length, sizeof(*req)));
NS_ENSURE_TRUE_VOID(length >= sizeof(*req));
// structure includes a possibly-unused char label[1] (in a packed structure)
NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
HandleOpenRequestMessage(req, length, stream);
break;
case DATA_CHANNEL_ACK:
// >= sizeof(*ack) checked above
ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
HandleOpenAckMessage(ack, length, stream);
break;
default:
HandleUnknownMessage(ppid, length, stream);
break;
@ -1581,20 +1644,20 @@ DataChannelConnection::ClearResets()
}
void
DataChannelConnection::ResetOutgoingStream(uint16_t streamOut)
DataChannelConnection::ResetOutgoingStream(uint16_t stream)
{
uint32_t i;
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p: Resetting outgoing stream %u",
(void *) this, streamOut));
(void *) this, stream));
// Rarely has more than a couple items and only for a short time
for (i = 0; i < mStreamsResetting.Length(); ++i) {
if (mStreamsResetting[i] == streamOut) {
if (mStreamsResetting[i] == stream) {
return;
}
}
mStreamsResetting.AppendElement(streamOut);
mStreamsResetting.AppendElement(stream);
}
void
@ -2010,6 +2073,11 @@ DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
#endif
if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
// Don't send unordered until this gets cleared
channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
}
if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
stream,
@ -2078,14 +2146,16 @@ DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
flags = (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) ? SCTP_UNORDERED : 0;
// To avoid problems where an in-order OPEN_RESPONSE is lost and an
// To avoid problems where an in-order OPEN is lost and an
// out-of-order data message "beats" it, require data to be in-order
// until we get an ACK.
if (channel->mState == CONNECTING) {
flags &= ~SCTP_UNORDERED;
if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
!(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
flags = SCTP_UNORDERED;
} else {
flags = 0;
}
spa.sendv_sndinfo.snd_ppid = htonl(ppid);
spa.sendv_sndinfo.snd_sid = channel->mStream;
spa.sendv_sndinfo.snd_flags = flags;

View File

@ -207,10 +207,11 @@ private:
DataChannel* FindChannelByStream(uint16_t stream);
uint16_t FindFreeStream();
bool RequestMoreStreams(int32_t aNeeded = 16);
int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut);
int32_t SendControlMessage(void *msg, uint32_t len, uint16_t stream);
int32_t SendOpenRequestMessage(const nsACString& label, const nsACString& protocol,
uint16_t streamOut,
uint16_t stream,
bool unordered, uint16_t prPolicy, uint32_t prValue);
int32_t SendOpenAckMessage(uint16_t stream);
int32_t SendMsgInternal(DataChannel *channel, const char *data,
uint32_t length, uint32_t ppid);
int32_t SendBinary(DataChannel *channel, const char *data,
@ -226,13 +227,15 @@ private:
void ProcessQueuedOpens();
void ClearResets();
void SendOutgoingStreamReset();
void ResetOutgoingStream(uint16_t streamOut);
void ResetOutgoingStream(uint16_t stream);
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
size_t length,
uint16_t streamIn);
void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn);
void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t streamIn);
void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn);
uint16_t stream);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
size_t length, uint16_t stream);
void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream);
void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t stream);
void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream);
void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac);
void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc);
void HandleRemoteErrorEvent(const struct sctp_remote_error *sre);

View File

@ -35,6 +35,7 @@
#define DATA_CHANNEL_FLAGS_FINISH_OPEN 0x00000020
#define DATA_CHANNEL_FLAGS_FINISH_RSP 0x00000040
#define DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED 0x00000080
#define DATA_CHANNEL_FLAGS_WAITING_ACK 0x00000100
#define INVALID_STREAM (0xFFFF)
// max is 0xFFFF: Streams 0 to 0xFFFE = 0xFFFF streams
@ -50,8 +51,13 @@ struct rtcweb_datachannel_open_request {
char label[1]; // (and protocol) keep VC++ happy...
} SCTP_PACKED;
struct rtcweb_datachannel_ack {
uint8_t msg_type; // DATA_CHANNEL_ACK
} SCTP_PACKED;
/* msg_type values: */
/* 0-2 were used in an early version of the protocol with 3-way handshakes */
/* 0-1 were used in an early version of the protocol with 3-way handshakes */
#define DATA_CHANNEL_ACK 2
#define DATA_CHANNEL_OPEN_REQUEST 3
/* channel_type values: */