Bug 1178091: Implement RTCDataChannel BufferedAmountLowThreshold and bufferedamountlow event r=smaug,drno

This commit is contained in:
Randell Jesup 2015-09-28 19:02:23 -04:00
parent 25b3357762
commit 08e9208d4a
7 changed files with 96 additions and 13 deletions

View File

@ -213,6 +213,12 @@ nsDOMDataChannel::BufferedAmount() const
return mDataChannel->GetBufferedAmount();
}
uint32_t
nsDOMDataChannel::BufferedAmountLowThreshold() const
{
return mDataChannel->GetBufferedAmountLowThreshold();
}
NS_IMETHODIMP
nsDOMDataChannel::GetBufferedAmount(uint32_t* aBufferedAmount)
{
@ -220,6 +226,12 @@ nsDOMDataChannel::GetBufferedAmount(uint32_t* aBufferedAmount)
return NS_OK;
}
void
nsDOMDataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
{
mDataChannel->SetBufferedAmountLowThreshold(aThreshold);
}
NS_IMETHODIMP nsDOMDataChannel::GetBinaryType(nsAString & aBinaryType)
{
switch (mBinaryType) {
@ -468,6 +480,14 @@ nsDOMDataChannel::OnChannelClosed(nsISupports* aContext)
return OnSimpleEvent(aContext, NS_LITERAL_STRING("close"));
}
nsresult
nsDOMDataChannel::OnBufferLow(nsISupports* aContext)
{
LOG(("%p(%p): %s - Dispatching\n",this,(void*)mDataChannel,__FUNCTION__));
return OnSimpleEvent(aContext, NS_LITERAL_STRING("bufferedamountlow"));
}
void
nsDOMDataChannel::AppReady()
{

View File

@ -54,11 +54,14 @@ public:
bool Reliable() const;
mozilla::dom::RTCDataChannelState ReadyState() const;
uint32_t BufferedAmount() const;
uint32_t BufferedAmountLowThreshold() const;
void SetBufferedAmountLowThreshold(uint32_t aThreshold);
IMPL_EVENT_HANDLER(open)
IMPL_EVENT_HANDLER(error)
IMPL_EVENT_HANDLER(close)
// Uses XPIDL Close.
IMPL_EVENT_HANDLER(message)
IMPL_EVENT_HANDLER(bufferedamountlow)
mozilla::dom::RTCDataChannelType BinaryType() const
{
return static_cast<mozilla::dom::RTCDataChannelType>(
@ -97,6 +100,9 @@ public:
virtual nsresult
OnChannelClosed(nsISupports* aContext) override;
virtual nsresult
OnBufferLow(nsISupports* aContext) override;
virtual void
AppReady();

View File

@ -699,6 +699,7 @@ GK_ATOM(onblocked, "onblocked")
GK_ATOM(onblur, "onblur")
GK_ATOM(onbroadcast, "onbroadcast")
GK_ATOM(onbusy, "onbusy")
GK_ATOM(onbufferedamountlow, "onbufferedamountlow")
GK_ATOM(oncached, "oncached")
GK_ATOM(oncallschanged, "oncallschanged")
GK_ATOM(oncancel, "oncancel")

View File

@ -21,11 +21,13 @@ interface DataChannel : EventTarget
readonly attribute boolean reliable;
readonly attribute RTCDataChannelState readyState;
readonly attribute unsigned long bufferedAmount;
attribute unsigned long bufferedAmountLowThreshold;
attribute EventHandler onopen;
attribute EventHandler onerror;
attribute EventHandler onclose;
void close();
attribute EventHandler onmessage;
attribute EventHandler onbufferedamountlow;
attribute RTCDataChannelType binaryType;
[Throws]
void send(DOMString data);

View File

@ -149,7 +149,7 @@ NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
uint32_t length) : mLength(length)
size_t length) : mLength(length)
{
mSpa = new sctp_sendv_spa;
*mSpa = spa;
@ -1123,11 +1123,16 @@ DataChannelConnection::SendDeferredMessages()
if (channel->mState == CLOSED || channel->mState == CLOSING) {
channel->mBufferedData.Clear();
}
uint32_t buffered_amount = channel->GetBufferedAmount();
uint32_t threshold = channel->GetBufferedAmountLowThreshold();
bool was_over_threshold = buffered_amount >= threshold;
while (!channel->mBufferedData.IsEmpty() &&
!failed_send) {
struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
const char *data = channel->mBufferedData[0]->mData;
uint32_t len = channel->mBufferedData[0]->mLength;
size_t len = channel->mBufferedData[0]->mLength;
// SCTP will return EMSGSIZE if the message is bigger than the buffer
// size (or EAGAIN if there isn't space)
@ -1147,7 +1152,19 @@ DataChannelConnection::SendDeferredMessages()
} else {
LOG(("Resent buffer of %d bytes (%d)", len, result));
sent = true;
// In theory this could underflow if >4GB was buffered and re
// truncated in GetBufferedAmount(), but this won't cause any problems.
buffered_amount -= channel->mBufferedData[0]->mLength;
channel->mBufferedData.RemoveElementAt(0);
// can never fire with default threshold of 0
if (was_over_threshold && buffered_amount < threshold) {
LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
this, channel)));
was_over_threshold = false;
}
}
}
if (channel->mBufferedData.IsEmpty())
@ -1288,7 +1305,7 @@ DataChannelConnection::DeliverQueuedData(uint16_t stream)
// Careful! we may modify the array length from within the loop!
if (mQueuedData[i]->mStream == stream) {
LOG(("Delivering queued data for stream %u, length %u",
stream, mQueuedData[i]->mLength));
stream, (unsigned int) mQueuedData[i]->mLength));
// Deliver the queued data
HandleDataMessage(mQueuedData[i]->mPpid,
mQueuedData[i]->mData, mQueuedData[i]->mLength,
@ -2199,7 +2216,7 @@ request_error_cleanup:
int32_t
DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
uint32_t length, uint32_t ppid)
size_t length, uint32_t ppid)
{
uint16_t flags;
struct sctp_sendv_spa spa;
@ -2267,7 +2284,7 @@ DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
// Handles fragmenting binary messages
int32_t
DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
uint32_t len,
size_t len,
uint32_t ppid_partial, uint32_t ppid_final)
{
// Since there's a limit on network buffer size and no limits on message
@ -2292,7 +2309,7 @@ DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
LOG(("Sending binary message length %u in chunks", len));
// XXX check flags for out-of-order, or force in-order for large binary messages
while (len > 0) {
uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
size_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
uint32_t ppid;
len -= sendlen;
ppid = len > 0 ? ppid_partial : ppid_final;
@ -2622,13 +2639,36 @@ DataChannel::AppReady()
uint32_t
DataChannel::GetBufferedAmount()
{
uint32_t buffered = 0;
for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
buffered += mBufferedData[i]->mLength;
size_t buffered = 0;
for (auto& buffer : mBufferedData) {
buffered += buffer->mLength;
}
// XXX Note: per Michael Tuexen, there's no way to currently get the buffered
// amount from the SCTP stack for a single stream. It is on their to-do
// list, and once we import a stack with support for that, we'll need to
// add it to what we buffer. Also we'll need to ask for notification of a per-
// stream buffer-low event and merge that into the handling of buffer-low
// (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
buffered = UINT32_MAX;
}
return buffered;
}
uint32_t
DataChannel::GetBufferedAmountLowThreshold()
{
return mBufferedThreshold;
}
// Never fire immediately, as it's defined to fire on transitions, not state
void
DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
{
mBufferedThreshold = aThreshold;
}
// Called with mLock locked!
void
DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)

View File

@ -57,12 +57,12 @@ class BufferedMsg
{
public:
BufferedMsg(struct sctp_sendv_spa &spa,const char *data,
uint32_t length);
size_t length);
~BufferedMsg();
struct sctp_sendv_spa *mSpa;
const char *mData;
uint32_t mLength;
size_t mLength;
};
// for queuing incoming data messages before the Open or
@ -213,9 +213,9 @@ private:
bool unordered, uint16_t prPolicy, uint32_t prValue);
int32_t SendOpenAckMessage(uint16_t stream);
int32_t SendMsgInternal(DataChannel *channel, const char *data,
uint32_t length, uint32_t ppid);
size_t length, uint32_t ppid);
int32_t SendBinary(DataChannel *channel, const char *data,
uint32_t len, uint32_t ppid_partial, uint32_t ppid_final);
size_t len, uint32_t ppid_partial, uint32_t ppid_final);
int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary);
void DeliverQueuedData(uint16_t stream);
@ -328,6 +328,7 @@ public:
, mPrValue(value)
, mFlags(flags)
, mIsRecvBinary(false)
, mBufferedThreshold(0) // default from spec
{
NS_ASSERTION(mConnection,"NULL connection");
}
@ -387,6 +388,10 @@ public:
// Amount of data buffered to send
uint32_t GetBufferedAmount();
// Trigger amount for generating BufferedAmountLow events
uint32_t GetBufferedAmountLowThreshold();
void SetBufferedAmountLowThreshold(uint32_t aThreshold);
// Find out state
uint16_t GetReadyState()
{
@ -429,6 +434,7 @@ private:
uint32_t mFlags;
uint32_t mId;
bool mIsRecvBinary;
size_t mBufferedThreshold;
nsCString mRecvBuffer;
nsTArray<nsAutoPtr<BufferedMsg> > mBufferedData;
nsTArray<nsCOMPtr<nsIRunnable> > mQueuedMessages;
@ -448,6 +454,7 @@ public:
ON_CHANNEL_CLOSED,
ON_DATA,
START_DEFER,
BUFFER_LOW_THRESHOLD,
}; /* types */
DataChannelOnMessageAvailable(int32_t aType,
@ -489,6 +496,7 @@ public:
case ON_DATA:
case ON_CHANNEL_OPEN:
case ON_CHANNEL_CLOSED:
case BUFFER_LOW_THRESHOLD:
{
MutexAutoLock lock(mChannel->mListenerLock);
if (!mChannel->mListener) {
@ -510,6 +518,9 @@ public:
case ON_CHANNEL_CLOSED:
mChannel->mListener->OnChannelClosed(mChannel->mContext);
break;
case BUFFER_LOW_THRESHOLD:
mChannel->mListener->OnBufferLow(mChannel->mContext);
break;
}
break;
}

View File

@ -32,6 +32,9 @@ public:
// Called when the channel is closed
virtual nsresult OnChannelClosed(nsISupports *aContext) = 0;
// Called when the BufferedAmount drops below the BufferedAmountLowThreshold
virtual nsresult OnBufferLow(nsISupports *aContext) = 0;
};
}