gecko-dev/netwerk/sctp/datachannel/DataChannel.cpp
Lennart Grahl c2ebc3e45e Bug 979417 - Implement EOR when receiving and explicit EOR when sending on data channels (including DCEP). r=drno,jesup
This allows sending and receiving arbitrarily (we limit to 1 GiB atm) sized
messages while not relying on the deprecated PPID fragmentation/reassembly
mode. The code already supports the ndata extension, but it is not activated
yet. Without the SCTP ndata extension, a large data channel message will
monopolise the SCTP association. While this is a problem, it is a temporary
solution until the extension is activated. Keep in mind that every
application that uses data channels currently does fragmentation/reassembly at
the application level, and it's unlikely that this will change until the popular
implementations (libwebrtc) implement EOR as well. Moreover, until the WebRTC
API specifies an API that hands over partial messages, doing application-level
fragmentation/reassembly is still useful for very large messages (sadly).
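
To illustrate the explicit EOR idea described above, here is a minimal sketch of how a large message could be cut into pieces where only the last piece carries the end-of-record flag. It does not call usrsctp; the `Chunk` struct and `SplitWithExplicitEor` are hypothetical names introduced for this example only, and the chunk size is an arbitrary parameter rather than a value taken from the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical description of one send call; not a usrsctp type.
struct Chunk {
  std::size_t offset;  // start of this chunk within the message
  std::size_t length;  // number of bytes in this chunk
  bool eor;            // true only for the chunk that completes the message
};

// Split a message of `messageLength` bytes into chunks of at most `maxChunk`
// bytes. Only the final chunk is marked as end-of-record. An empty message
// yields a single zero-length chunk with the flag set.
std::vector<Chunk> SplitWithExplicitEor(std::size_t messageLength,
                                        std::size_t maxChunk)
{
  assert(maxChunk > 0);
  std::vector<Chunk> chunks;
  std::size_t offset = 0;
  do {
    const std::size_t length = std::min(maxChunk, messageLength - offset);
    const bool last = (offset + length == messageLength);
    chunks.push_back({offset, length, last});
    offset += length;
  } while (offset < messageLength);
  return chunks;
}
```

A receiver that honours the flag keeps appending chunks until it sees `eor == true`, which is what removes the need for PPID-based reassembly.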

We fall back to PPID-based fragmentation/reassembly mode IFF a=max-message-size
is not set in the SDP and the negotiated number of SCTP inbound streams is
exactly 256. Other implementations should avoid using this combination (to be
precise, other implementations should send a=max-message-size).
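
The fallback rule above can be sketched as a single predicate. This is an illustration of the stated condition only; `ShouldUsePpidFragmentation` and `kLegacyInboundStreams` are hypothetical names, and the about:config override mentioned further down is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>

// Stream count that, together with a missing a=max-message-size, is taken as
// the signal for the legacy mode (value from the paragraph above).
constexpr uint16_t kLegacyInboundStreams = 256;

// Returns true if PPID-based fragmentation/reassembly should be used:
// a=max-message-size absent AND exactly 256 negotiated inbound streams.
constexpr bool ShouldUsePpidFragmentation(bool maxMessageSizeSet,
                                          uint16_t negotiatedInboundStreams)
{
  return !maxMessageSizeSet &&
         negotiatedInboundStreams == kLegacyInboundStreams;
}
```

Both conditions must hold; a peer that sends a=max-message-size never triggers the fallback, whatever its stream count.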

It also changes behaviour of RTCDataChannel.send which now raises TypeError in
case the message is too large for the other peer to receive. This is a
necessity to ensure that implementations that do not look at the EOR flag when
receiving are always able to receive our messages. Even if these
implementations do not set a=max-message-size, we use a safe default value (64
KiB, dictated by the spec) that every implementation should be able to receive,
with or without EOR support.
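
As a rough sketch of the size check described above: the 64 KiB default and the 1 GiB cap come from this message, and treating an announced value of 0 like an over-large value follows the clamping in SetMaxMessageSize. The function and constant names are hypothetical, and where exactly the default is applied in the real code path is an assumption of this example.

```cpp
#include <cassert>
#include <cstdint>

// 64 KiB: safe default when the peer does not announce a=max-message-size.
constexpr uint64_t kDefaultRemoteMaxMessageSize = 64ULL * 1024;
// 1 GiB: upper bound currently applied to any announced value.
constexpr uint64_t kRemoteMaxMessageSizeCap = 1024ULL * 1024 * 1024;

// Compute the limit that outgoing messages are checked against.
// Assumption: an unannounced value falls back to the 64 KiB default, while
// an announced 0 or a value above the cap is clamped to the cap.
constexpr uint64_t EffectiveRemoteMaxMessageSize(bool announced,
                                                 uint64_t announcedValue)
{
  return !announced
             ? kDefaultRemoteMaxMessageSize
             : (announcedValue == 0 || announcedValue > kRemoteMaxMessageSizeCap)
                   ? kRemoteMaxMessageSizeCap
                   : announcedValue;
}

// True if a message of `messageLength` bytes may be sent; a caller would
// raise TypeError from RTCDataChannel.send when this returns false.
constexpr bool FitsRemoteLimit(uint64_t messageLength, bool announced,
                               uint64_t announcedValue)
{
  return messageLength <=
         EffectiveRemoteMaxMessageSize(announced, announcedValue);
}
```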

* Due to the use of explicit EOR, this required some major refactoring of all
  send-related and deferred sending functions (which are now a lot less
  complex). There's now only one place where `usrsctp_sendv` is being used.
* All data channel messages and DCEP messages will be sent without copying them
  first. Only in case this fails (e.g. usrsctp's buffer is full), the message
  will be copied and added to a buffer queue.
* Queued data channel messages will now be re-sent fairly (round-robin).
* Maximum message size and the PPID-based fragmentation are configurable using
  about:config (media.peerconnection.sctp.force_ppid_fragmentation and
  media.peerconnection.sctp.force_maximum_message_size).
* Enable interleaving of incoming messages for different streams (preparation
  for SCTP ndata, has no effect until it is enabled).
* Enable interleaving of outgoing messages (disabled if SCTP ndata has not been
  negotiated).
* Add pending messages flag to reduce performance impact from frequent calls to
  SendDeferredMessages.
* Handle partial delivery events (for cases where a partially delivered message
  is being aborted).
* Close a data channel/the connection in case the message is too large to be
  handled (this is only applied in cases where the remote peer ignores our
  announced local maximum message size).
* Various size_t to uint32_t conversions (message length) and back should be
  safe now.
* Remove aUsingDtls/mUsingDtls from DataChannelConnection.
* Set maximum message size in SDP and in the data channel stack.
* Replace implicit NS_ENSURE_*'s with explicit NS_WARN_IF's.
* Add SetMaxMessageSize method for late-applying those signalling parameters
  when a data channel has been created before the remote SDP was available.
* Limit remote maximum message size and add a GetMaxMessageSize method for a
  future implementation of RTCSctpTransport.maxMessageSize.
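
One way to picture the fair (round-robin) re-sending and the pending-messages flag from the list above is the following sketch. It is not the code from this patch: `RoundRobinSender` is a hypothetical stand-in that sends one queued message per stream per turn, and the `trySend` callback plays the role of a send attempt that fails while the SCTP buffer is full.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a connection's per-stream message queues.
struct RoundRobinSender {
  std::vector<std::deque<std::string>> queues;  // one queue per stream
  std::size_t current = 0;                      // stream to serve next
  bool pending = false;                         // cheap "anything queued?" flag

  void Enqueue(std::size_t stream, std::string msg) {
    assert(stream < queues.size());
    queues[stream].push_back(std::move(msg));
    pending = true;
  }

  // Send one message per stream in turn. `trySend` returns false when the
  // transport cannot take more data; we then stop, keep `pending` set, and
  // resume at the same stream on the next call.
  void SendDeferred(
      const std::function<bool(std::size_t, const std::string&)>& trySend) {
    if (!pending || queues.empty()) {
      return;  // nothing to do; the flag avoids scanning every stream
    }
    std::size_t idle = 0;  // consecutive empty streams seen
    while (idle < queues.size()) {
      auto& queue = queues[current];
      if (queue.empty()) {
        ++idle;
      } else {
        if (!trySend(current, queue.front())) {
          return;
        }
        queue.pop_front();
        idle = 0;
      }
      current = (current + 1) % queues.size();
    }
    pending = false;  // a full pass found every queue empty
  }
};
```

With two messages queued on stream 0 and one on stream 2, this ordering sends stream 0, then stream 2, then stream 0 again, so a busy stream cannot starve the others.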

MozReview-Commit-ID: FlmZrpC5zVI

--HG--
extra : rebase_source : 54e1b838c788a3abbded4fb32fe7c2788f8a9bc0
2017-07-26 13:18:54 +02:00


/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#if !defined(__Userspace_os_Windows)
#include <arpa/inet.h>
#endif
// usrsctp.h expects to have errno definitions prior to its inclusion.
#include <errno.h>
#define SCTP_DEBUG 1
#define SCTP_STDINT_INCLUDE <stdint.h>
#ifdef _MSC_VER
// Disable "warning C4200: nonstandard extension used : zero-sized array in
// struct/union"
// ...which the third-party file usrsctp.h runs afoul of.
#pragma warning(push)
#pragma warning(disable:4200)
#endif
#include "usrsctp.h"
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#include "DataChannelLog.h"
#include "nsServiceManagerUtils.h"
#include "nsIObserverService.h"
#include "nsIObserver.h"
#include "nsIPrefBranch.h"
#include "nsIPrefService.h"
#include "mozilla/Services.h"
#include "mozilla/Sprintf.h"
#include "nsProxyRelease.h"
#include "nsThread.h"
#include "nsThreadUtils.h"
#include "nsAutoPtr.h"
#include "nsNetUtil.h"
#include "nsNetCID.h"
#include "mozilla/StaticPtr.h"
#include "mozilla/Unused.h"
#ifdef MOZ_PEERCONNECTION
#include "mtransport/runnable_utils.h"
#endif
#define DATACHANNEL_LOG(args) LOG(args)
#include "DataChannel.h"
#include "DataChannelProtocol.h"
// Let us turn on and off important assertions in non-debug builds
#ifdef DEBUG
#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
#endif
static bool sctp_initialized;
namespace mozilla {
LazyLogModule gDataChannelLog("DataChannel");
static LazyLogModule gSCTPLog("SCTP");
#define SCTP_LOG(args) MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
class DataChannelShutdown : public nsIObserver
{
public:
// This needs to be tied to some form of object that is guaranteed to be
// around (singleton likely) unless we want to shutdown sctp whenever
// we're not using it (and in which case we'd keep a refcnt'd object
// ref'd by each DataChannelConnection to release the SCTP usrlib via
// sctp_finish). Right now, the single instance of this class is
// owned by the observer service.
NS_DECL_ISUPPORTS
DataChannelShutdown() {}
void Init()
{
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
if (!observerService)
return;
nsresult rv = observerService->AddObserver(this,
"xpcom-will-shutdown",
false);
MOZ_ASSERT(rv == NS_OK);
(void) rv;
}
private:
// The only instance of DataChannelShutdown is owned by the observer
// service, so there is no need to call RemoveObserver here.
virtual ~DataChannelShutdown() = default;
public:
NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
const char16_t* aData) override {
if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
LOG(("Shutting down SCTP"));
if (sctp_initialized) {
usrsctp_finish();
sctp_initialized = false;
}
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
if (!observerService)
return NS_ERROR_FAILURE;
nsresult rv = observerService->RemoveObserver(this,
"xpcom-will-shutdown");
MOZ_ASSERT(rv == NS_OK);
(void) rv;
}
return NS_OK;
}
};
NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
size_t length)
: mLength(length)
, mData(data)
{
mInfo = &info;
mPos = 0;
}
void OutgoingMsg::Advance(size_t offset)
{
mPos += offset;
if (mPos > mLength) {
mPos = mLength;
}
}
BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg)
{
size_t length = msg.GetLeft();
auto *tmp = new uint8_t[length]; // infallible malloc!
memcpy(tmp, msg.GetData(), length);
mLength = length;
mData = tmp;
mInfo = new sctp_sendv_spa;
*mInfo = msg.GetInfo();
mPos = 0;
}
BufferedOutgoingMsg::~BufferedOutgoingMsg()
{
delete mInfo;
delete[] mData; // allocated with new uint8_t[] in the constructor
}
static int
receive_cb(struct socket* sock, union sctp_sockstore addr,
void *data, size_t datalen,
struct sctp_rcvinfo rcv, int flags, void *ulp_info)
{
DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
}
static
DataChannelConnection *
GetConnectionFromSocket(struct socket* sock)
{
struct sockaddr *addrs = nullptr;
int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
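if (naddrs <= 0) {
return nullptr;
}
if (addrs[0].sa_family != AF_CONN) {
// Free the address list on this early-return path as well.
usrsctp_freeladdrs(addrs);
return nullptr;
}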
// usrsctp_getladdrs() returns the addresses bound to this socket, which
// contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
// then free the list of addresses once we have the pointer. We only open
// AF_CONN sockets, and they should all have the sconn_addr set to the
// pointer that created them, so [0] is as good as any other.
struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
DataChannelConnection *connection =
reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
usrsctp_freeladdrs(addrs);
return connection;
}
// called when the buffer empties to the threshold value
static int
threshold_event(struct socket* sock, uint32_t sb_free)
{
DataChannelConnection *connection = GetConnectionFromSocket(sock);
if (connection) {
connection->SendDeferredMessages();
} else {
LOG(("Can't find connection for socket %p", sock));
}
return 0;
}
static void
debug_printf(const char *format, ...)
{
va_list ap;
char buffer[1024];
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
va_start(ap, format);
#ifdef _WIN32
if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
#else
if (VsprintfLiteral(buffer, format, ap) > 0) {
#endif
SCTP_LOG(("%s", buffer));
}
va_end(ap);
}
}
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
nsIEventTarget *aTarget)
: NeckoTargetHolder(aTarget)
, mLock("netwerk::sctp::DataChannelConnection")
{
mCurrentStream = 0;
mState = CLOSED;
mSocket = nullptr;
mMasterSocket = nullptr;
mListener = listener;
mLocalPort = 0;
mRemotePort = 0;
mPendingType = PENDING_NONE;
LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
mInternalIOThread = nullptr;
}
DataChannelConnection::~DataChannelConnection()
{
LOG(("Deleting DataChannelConnection %p", (void *) this));
// This may die on the MainThread, or on the STS thread
ASSERT_WEBRTC(mState == CLOSED);
MOZ_ASSERT(!mMasterSocket);
MOZ_ASSERT(mPending.GetSize() == 0);
// Already disconnected from sigslot/mTransportFlow
// TransportFlows must be released from the STS thread
if (!IsSTSThread()) {
ASSERT_WEBRTC(NS_IsMainThread());
if (mTransportFlow) {
ASSERT_WEBRTC(mSTS);
NS_ProxyRelease(
"DataChannelConnection::mTransportFlow", mSTS, mTransportFlow.forget());
}
if (mInternalIOThread) {
// Avoid spinning the event thread from here (which if we're mainthread
// is in the event loop already)
nsCOMPtr<nsIRunnable> r = WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
&nsIThread::Shutdown);
Dispatch(r.forget());
}
} else {
// on STS, safe to call shutdown
if (mInternalIOThread) {
mInternalIOThread->Shutdown();
}
}
}
void
DataChannelConnection::Destroy()
{
// Though it's probably ok to do this and close the sockets;
// if we really want it to do true clean shutdowns it can
// create a dependent Internal object that would remain around
// until the network shut down the association or timed out.
LOG(("Destroying DataChannelConnection %p", (void *) this));
ASSERT_WEBRTC(NS_IsMainThread());
CloseAll();
MutexAutoLock lock(mLock);
// If we had a pending reset, we aren't waiting for it - clear the list so
// we can deregister this DataChannelConnection without leaking.
ClearResets();
MOZ_ASSERT(mSTS);
ASSERT_WEBRTC(NS_IsMainThread());
// Must do this in Destroy() since we may then delete this object.
// Do this before dispatching to create a consistent ordering of calls to
// the SCTP stack.
usrsctp_deregister_address(static_cast<void *>(this));
LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
// Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
// the usrsctp_close() calls can move back here (and just proxy the
// disconnect_all())
RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::DestroyOnSTS,
mMasterSocket, mSocket), // order matches DestroyOnSTS(aMasterSocket, aSocket)
NS_DISPATCH_NORMAL);
// These will be released on STS
mSocket = nullptr;
mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
// We can't get any more new callbacks from the SCTP library
// All existing callbacks have refs to DataChannelConnection
// nsDOMDataChannel objects have refs to DataChannels that have refs to us
}
void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
struct socket *aSocket)
{
if (aSocket && aSocket != aMasterSocket)
usrsctp_close(aSocket);
if (aMasterSocket)
usrsctp_close(aMasterSocket);
disconnect_all();
}
bool
DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
uint64_t aMaxMessageSize)
{
struct sctp_initmsg initmsg;
struct sctp_assoc_value av;
struct sctp_event event;
socklen_t len;
uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR,
SCTP_SHUTDOWN_EVENT,
SCTP_ADAPTATION_INDICATION,
SCTP_PARTIAL_DELIVERY_EVENT,
SCTP_SEND_FAILED_EVENT,
SCTP_STREAM_RESET_EVENT,
SCTP_STREAM_CHANGE_EVENT};
{
ASSERT_WEBRTC(NS_IsMainThread());
// MutexAutoLock lock(mLock); Not needed since we're on mainthread always
mSendInterleaved = false;
mPpidFragmentation = false;
SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
if (!sctp_initialized) {
LOG(("sctp_init"));
#ifdef MOZ_PEERCONNECTION
usrsctp_init(0,
DataChannelConnection::SctpDtlsOutput,
debug_printf
);
#else
MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
#endif
// Set logging to SCTP:LogLevel::Debug to get SCTP debugs
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
}
// Do not send ABORTs in response to INITs (1).
// Do not send ABORTs for received Out of the Blue packets (2).
usrsctp_sysctl_set_sctp_blackhole(2);
// Disable the Explicit Congestion Notification extension (currently not supported by the
// Firefox code)
usrsctp_sysctl_set_sctp_ecn_enable(0);
// Enable interleaving messages for different streams (incoming)
// See: https://tools.ietf.org/html/rfc6458#section-8.1.20
usrsctp_sysctl_set_sctp_default_frag_interleave(2);
sctp_initialized = true;
RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
shutdown->Init();
}
}
// XXX FIX! make this a global we get once
// Find the STS thread
nsresult rv;
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
MOZ_ASSERT(NS_SUCCEEDED(rv));
// Open sctp with a callback
if ((mMasterSocket = usrsctp_socket(
AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
return false;
}
// Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
// in associations for normal IO
if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
LOG(("Couldn't set non_blocking on SCTP socket"));
// We can't handle connect() safely if it will block, not that this will
// even happen.
goto error_cleanup;
}
// Make sure that closing the socket doesn't make it call us back again!
// That would cause it to try to use an invalid DataChannelConnection pointer
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
(const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
LOG(("Couldn't set SO_LINGER on SCTP socket"));
// unsafe to allow it to continue if this fails
goto error_cleanup;
}
// XXX Consider disabling this when we add proper SDP negotiation.
// We may want to leave enabled for supporting 'cloning' of SDP offers, which
// implies re-use of the same pseudo-port number, or forcing a renegotiation.
{
const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
(const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
}
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
(const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
}
}
// Set explicit EOR
{
const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
(const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
LOG(("*** failed enable explicit EOR mode %d", errno));
goto error_cleanup;
}
}
// Enable ndata
// TODO: Bug 1381145, enable this once ndata has been deployed
#if 0
av.assoc_id = SCTP_FUTURE_ASSOC;
av.assoc_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
LOG(("*** failed enable ndata errno %d", errno));
goto error_cleanup;
}
#endif
av.assoc_id = SCTP_ALL_ASSOC;
av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
LOG(("*** failed enable stream reset errno %d", errno));
goto error_cleanup;
}
/* Enable the events of interest. */
memset(&event, 0, sizeof(event));
event.se_assoc_id = SCTP_ALL_ASSOC;
event.se_on = 1;
for (unsigned short event_type : event_types) {
event.se_type = event_type;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
goto error_cleanup;
}
}
// Update number of streams
mStreams.AppendElements(aNumStreams);
for (uint32_t i = 0; i < aNumStreams; ++i) {
mStreams[i] = nullptr;
}
memset(&initmsg, 0, sizeof(initmsg));
len = sizeof(initmsg);
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
LOG(("*** failed getsockopt SCTP_INITMSG"));
goto error_cleanup;
}
LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
initmsg.sinit_num_ostreams = aNumStreams;
initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
(socklen_t)sizeof(initmsg)) < 0) {
LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
goto error_cleanup;
}
mSocket = nullptr;
usrsctp_register_address(static_cast<void *>(this));
LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
return true;
error_cleanup:
usrsctp_close(mMasterSocket);
mMasterSocket = nullptr;
return false;
}
void
DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize)
{
MutexAutoLock lock(mLock); // TODO: Needed?
mMaxMessageSizeSet = aMaxMessageSizeSet;
mMaxMessageSize = aMaxMessageSize;
bool ppidFragmentationEnforced = false;
nsresult rv;
nsCOMPtr<nsIPrefService> prefs = do_GetService("@mozilla.org/preferences-service;1", &rv);
if (!NS_WARN_IF(NS_FAILED(rv))) {
nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
if (branch) {
if (!NS_FAILED(branch->GetBoolPref(
"media.peerconnection.sctp.force_ppid_fragmentation", &mPpidFragmentation))) {
// Ensure that forced on/off PPID fragmentation does not get overridden when Firefox has
// been detected.
mMaxMessageSizeSet = true;
ppidFragmentationEnforced = true;
}
int32_t temp;
if (!NS_FAILED(branch->GetIntPref(
"media.peerconnection.sctp.force_maximum_message_size", &temp))) {
if (temp >= 0) {
mMaxMessageSize = (uint64_t)temp;
}
}
}
}
// Fix remote MMS. This code exists, so future implementations of RTCSctpTransport.maxMessageSize
// can simply provide that value from GetMaxMessageSize.
// TODO: Bug 1382779, once resolved, can be increased to min(Uint8ArrayMaxSize, UINT32_MAX)
// TODO: Bug 1381146, once resolved, can be increased to whatever we support then (hopefully
// SIZE_MAX)
if (mMaxMessageSize == 0 || mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
}
LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
mPpidFragmentation ? "yes" : "no", ppidFragmentationEnforced ? "yes" : "no"));
LOG(("Maximum message size (outgoing data): %" PRIu64 " (set=%s, enforced=%s)",
mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
}
uint64_t
DataChannelConnection::GetMaxMessageSize()
{
return mMaxMessageSize;
}
#ifdef MOZ_PEERCONNECTION
void
DataChannelConnection::SetEvenOdd()
{
ASSERT_WEBRTC(IsSTSThread());
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
mTransportFlow->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls); // DTLS is mandatory
mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
}
bool
DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
{
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
if (NS_WARN_IF(!aFlow)) {
return false;
}
mTransportFlow = aFlow;
mLocalPort = localport;
mRemotePort = remoteport;
mState = CONNECTING;
RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::SetSignals),
NS_DISPATCH_NORMAL);
return true;
}
void
DataChannelConnection::SetSignals()
{
ASSERT_WEBRTC(IsSTSThread());
ASSERT_WEBRTC(mTransportFlow);
LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
// SignalStateChange() doesn't call you with the initial state
mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
CompleteConnect(mTransportFlow, mTransportFlow->state());
}
void
DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
{
LOG(("Data transport state: %d", state));
MutexAutoLock lock(mLock);
ASSERT_WEBRTC(IsSTSThread());
// We should abort connection on TS_ERROR.
// Note however that the association will also fail (perhaps with a delay) and
// notify us in that way
if (state != TransportLayer::TS_OPEN || !mMasterSocket)
return;
struct sockaddr_conn addr;
memset(&addr, 0, sizeof(addr));
addr.sconn_family = AF_CONN;
#if defined(__Userspace_os_Darwin)
addr.sconn_len = sizeof(addr);
#endif
addr.sconn_port = htons(mLocalPort);
addr.sconn_addr = static_cast<void *>(this);
LOG(("Calling usrsctp_bind"));
int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r < 0) {
LOG(("usrsctp_bind failed: %d", r));
} else {
// This is the remote addr
addr.sconn_port = htons(mRemotePort);
LOG(("Calling usrsctp_connect"));
r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
sizeof(addr));
if (r >= 0 || errno == EINPROGRESS) {
struct sctp_paddrparams paddrparams;
socklen_t opt_len;
memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
&paddrparams, &opt_len);
if (r < 0) {
LOG(("usrsctp_getsockopt failed: %d", r));
} else {
// draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
paddrparams.spp_pathmtu = 1200; // safe for either
paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
&paddrparams, opt_len);
if (r < 0) {
LOG(("usrsctp_setsockopt failed: %d", r));
} else {
LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
}
}
}
if (r < 0) {
if (errno == EINPROGRESS) {
// non-blocking
return;
}
LOG(("usrsctp_connect failed: %d", errno));
mState = CLOSED;
} else {
// We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
// This also avoids issues with calling TransportFlow stuff on Mainthread
return;
}
}
// Note: currently this doesn't actually notify the application
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this)));
}
// Process any pending Opens
void
DataChannelConnection::ProcessQueuedOpens()
{
// The nsDeque holds channels with an AddRef applied. Another reference
// (may) be held by the DOMDataChannel, unless it's been GC'd. No other
// references should exist.
// Can't copy nsDeque's. Move into temp array since any that fail will
// go back to mPending
nsDeque temp;
DataChannel *temp_channel; // really already_AddRefed<>
while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
temp.Push(static_cast<void *>(temp_channel));
}
RefPtr<DataChannel> channel;
// All these entries have an AddRef(); make that explicit now via the dont_AddRef()
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
// OpenFinish returns a reference itself, so we need to take it and Release it
channel = OpenFinish(channel.forget()); // may reset the flag and re-push
} else {
NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
}
}
}
void
DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
const unsigned char *data, size_t len)
{
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
char *buf;
if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
SCTP_LOG(("%s", buf));
usrsctp_freedumpbuffer(buf);
}
}
// Pass the data to SCTP
usrsctp_conninput(static_cast<void *>(this), data, len, 0);
}
int
DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
{
//LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
if (release)
delete [] data;
return res;
}
/* static */
int
DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
uint8_t tos, uint8_t set_df)
{
DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
int res;
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
char *buf;
if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
SCTP_LOG(("%s", buf));
usrsctp_freedumpbuffer(buf);
}
}
// We're async proxying even if on the STSThread because this is called
// with internal SCTP locks held in some cases (such as in usrsctp_connect()).
// SCTP has an option for Apple, on IP connections only, to release at least
// one of the locks before calling a packet output routine; with changes to
// the underlying SCTP stack this might remove the need to use an async proxy.
if ((false /*peer->IsSTSThread()*/)) {
res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
} else {
auto *data = new unsigned char[length];
memcpy(data, buffer, length);
// Commented out since we have to Dispatch SendPacket to avoid deadlock
// res = -1;
// XXX It might be worthwhile to add an assertion against the thread
// somehow getting into the DataChannel/SCTP code again, as
// DISPATCH_SYNC is not fully blocking. This may be tricky, as it
// needs to be a per-thread check, not a global.
peer->mSTS->Dispatch(WrapRunnable(
RefPtr<DataChannelConnection>(peer),
&DataChannelConnection::SendPacket, data, length, true),
NS_DISPATCH_NORMAL);
res = 0; // cheat! Packets can always be dropped later anyways
}
return res;
}
#endif
#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
// listen for incoming associations
// Blocks! - Don't call this from main thread!
#error This code will not work as-is since SetEvenOdd() runs on Mainthread
bool
DataChannelConnection::Listen(unsigned short port)
{
struct sockaddr_in addr;
socklen_t addr_len;
NS_WARNING_ASSERTION(!NS_IsMainThread(),
"Blocks, do not call from main thread!!!");
/* Acting as the 'server' */
memset((void *)&addr, 0, sizeof(addr));
#ifdef HAVE_SIN_LEN
addr.sin_len = sizeof(struct sockaddr_in);
#endif
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
mState = CONNECTING;
if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
LOG(("***Failed userspace_bind"));
return false;
}
if (usrsctp_listen(mMasterSocket, 1) < 0) {
LOG(("***Failed userspace_listen"));
return false;
}
LOG(("Accepting connection"));
addr_len = 0;
if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
LOG(("***Failed accept"));
return false;
}
mState = OPEN;
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
(const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
LOG(("Couldn't set SO_LINGER on SCTP socket"));
}
SetEvenOdd();
// Notify Connection open
// XXX We need to make sure connection sticks around until the message is delivered
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this, (DataChannel *) nullptr)));
return true;
}
// Blocks! - Don't call this from main thread!
bool
DataChannelConnection::Connect(const char *addr, unsigned short port)
{
struct sockaddr_in addr4;
struct sockaddr_in6 addr6;
NS_WARNING_ASSERTION(!NS_IsMainThread(),
"Blocks, do not call from main thread!!!");
/* Acting as the connector */
LOG(("Connecting to %s, port %u", addr, port));
memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
#ifdef HAVE_SIN_LEN
addr4.sin_len = sizeof(struct sockaddr_in);
#endif
#ifdef HAVE_SIN6_LEN
addr6.sin6_len = sizeof(struct sockaddr_in6);
#endif
addr4.sin_family = AF_INET;
addr6.sin6_family = AF_INET6;
addr4.sin_port = htons(port);
addr6.sin6_port = htons(port);
mState = CONNECTING;
#if !defined(__Userspace_os_Windows)
if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
LOG(("*** Failed userspace_connect"));
return false;
}
} else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
LOG(("*** Failed userspace_connect"));
return false;
}
} else {
LOG(("*** Illegal destination address."));
return false;
}
#else
{
struct sockaddr_storage ss;
int sslen = sizeof(ss);
if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
LOG(("*** Failed userspace_connect"));
return false;
}
} else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
LOG(("*** Failed userspace_connect"));
return false;
}
} else {
LOG(("*** Illegal destination address."));
return false;
}
}
#endif
mSocket = mMasterSocket;
LOG(("connect() succeeded! Entering connected mode"));
mState = OPEN;
SetEvenOdd();
// Notify Connection open
// XXX We need to make sure connection sticks around until the message is delivered
LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this, (DataChannel *) nullptr)));
return true;
}
#endif
DataChannel *
DataChannelConnection::FindChannelByStream(uint16_t stream)
{
return mStreams.SafeElementAt(stream);
}
uint16_t
DataChannelConnection::FindFreeStream()
{
uint32_t i, j, limit;
limit = mStreams.Length();
if (limit > MAX_NUM_STREAMS)
limit = MAX_NUM_STREAMS;
for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
if (!mStreams[i]) {
// Verify it's not still in the process of closing
for (j = 0; j < mStreamsResetting.Length(); ++j) {
if (mStreamsResetting[j] == i) {
break;
}
}
if (j == mStreamsResetting.Length())
break;
}
}
if (i >= limit) {
return INVALID_STREAM;
}
return i;
}
uint32_t
DataChannelConnection::UpdateCurrentStreamIndex()
{
if (mCurrentStream == mStreams.Length() - 1) {
mCurrentStream = 0;
} else {
++mCurrentStream;
}
return mCurrentStream;
}
uint32_t
DataChannelConnection::GetCurrentStreamIndex()
{
// Fix current stream index (in case #streams decreased)
if (mCurrentStream >= mStreams.Length()) {
mCurrentStream = 0;
}
return mCurrentStream;
}
bool
DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
{
struct sctp_status status;
struct sctp_add_streams sas;
uint32_t outStreamsNeeded;
socklen_t len;
if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
aNeeded = MAX_NUM_STREAMS - mStreams.Length();
}
if (aNeeded <= 0) {
return false;
}
len = (socklen_t)sizeof(struct sctp_status);
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
LOG(("***failed: getsockopt SCTP_STATUS"));
return false;
}
outStreamsNeeded = aNeeded; // number to add
// Note: if multiple channel opens happen when we don't have enough space,
// we'll call RequestMoreStreams() multiple times
memset(&sas, 0, sizeof(sas));
sas.sas_instrms = 0;
sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
// Doesn't block, we get an event when it succeeds or fails
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
(socklen_t) sizeof(struct sctp_add_streams)) < 0) {
if (errno == EALREADY) {
LOG(("Already have %u output streams", outStreamsNeeded));
return true;
}
LOG(("***failed: setsockopt ADD errno=%d", errno));
return false;
}
LOG(("Requested %u more streams", outStreamsNeeded));
// We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
// values are larger than mStreams.Length()
return true;
}
// Returns a POSIX error code.
int
DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream)
{
struct sctp_sendv_spa info = {0};
// General flags
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
// Set stream identifier, protocol identifier and flags
info.sendv_sndinfo.snd_sid = stream;
info.sendv_sndinfo.snd_flags = SCTP_EOR;
info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
// Create message instance and send
// Note: Main-thread IO, but doesn't block
#if (UINT32_MAX > SIZE_MAX)
if (len > SIZE_MAX) {
return EMSGSIZE;
}
#endif
OutgoingMsg msg(info, data, (size_t)len);
bool buffered;
int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
// Set pending type (if buffered)
if (!error && buffered && !mPendingType) {
mPendingType = PENDING_DCEP;
}
return error;
}
// Returns a POSIX error code.
int
DataChannelConnection::SendOpenAckMessage(uint16_t stream)
{
struct rtcweb_datachannel_ack ack;
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
}
// Returns a POSIX error code.
int
DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
const nsACString& protocol,
uint16_t stream, bool unordered,
uint16_t prPolicy, uint32_t prValue)
{
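// DCEP carries both lengths in 16-bit fields; reject anything that does not fit
if (label.Length() > UINT16_MAX || protocol.Length() > UINT16_MAX) {
return EMSGSIZE;
}
const int label_len = label.Length(); // not including nul
const int proto_len = protocol.Length(); // not including nul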
// careful - request struct includes one char for the label
const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
label_len + proto_len;
struct rtcweb_datachannel_open_request *req =
(struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
memset(req, 0, req_size);
req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
switch (prPolicy) {
case SCTP_PR_SCTP_NONE:
req->channel_type = DATA_CHANNEL_RELIABLE;
break;
case SCTP_PR_SCTP_TTL:
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
break;
case SCTP_PR_SCTP_RTX:
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
break;
default:
free(req);
return EINVAL;
}
if (unordered) {
// Per the current types, all differ by 0x80 between ordered and unordered
req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
}
req->reliability_param = htonl(prValue);
req->priority = htons(0); /* XXX: add support */
req->label_length = htons(label_len);
req->protocol_length = htons(proto_len);
memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
// TODO: req_size is an int... that looks hairy
int error = SendControlMessage((const uint8_t *)req, req_size, stream);
free(req);
return error;
}
// XXX This should use a separate thread (outbound queue) which should
// select() to know when to *try* to send data to the socket again.
// Alternatively, it can use a timeout, but that's guaranteed to be wrong
// (just not sure in what direction). We could re-implement NSPR's
// PR_POLL_WRITE/etc handling... with a lot of work.
// Better yet, use the SCTP stack's notifications on buffer state to avoid
// filling the SCTP's buffers.
// Returns true if we're still blocked.
bool
DataChannelConnection::SendDeferredMessages()
{
RefPtr<DataChannel> channel; // we may null out the refs to this
// This may block while something is modifying channels, but should not block for IO
MutexAutoLock lock(mLock);
LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
if (!mPendingType) {
return false;
}
// Send pending control messages
// Note: If ndata is not active, check if DCEP messages are currently outstanding. These need to
// be sent first before other streams can be used for sending.
if (!mBufferedControl.IsEmpty() && (mSendInterleaved || mPendingType == PENDING_DCEP)) {
if (SendBufferedMessages(mBufferedControl)) {
return true;
}
// Note: There may or may not be pending data messages
mPendingType = PENDING_DATA;
}
bool blocked = false;
uint32_t i = GetCurrentStreamIndex();
uint32_t end = i;
do {
channel = mStreams[i];
if (!channel || channel->mBufferedData.IsEmpty()) {
i = UpdateCurrentStreamIndex();
continue;
}
// Clear if closing/closed
if (channel->mState == CLOSED || channel->mState == CLOSING) {
channel->mBufferedData.Clear();
i = UpdateCurrentStreamIndex();
continue;
}
size_t bufferedAmount = channel->GetBufferedAmountLocked();
size_t threshold = channel->mBufferedThreshold;
bool wasOverThreshold = bufferedAmount >= threshold;
// Send buffered data messages
// Warning: This will fail in case ndata is inactive and a previously deallocated data channel
// has not been closed properly. If you ever see that no messages can be sent on any
// channel, this is likely the cause (an explicit EOR message partially sent whose
// remaining chunks are still being waited for).
blocked = SendBufferedMessages(channel->mBufferedData);
bufferedAmount = channel->GetBufferedAmountLocked();
// can never fire with default threshold of 0
if (wasOverThreshold && bufferedAmount < threshold) {
LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
this, channel)));
}
if (bufferedAmount == 0) {
// buffered-to-not-buffered transition; tell the DOM code in case this makes it
// available for GC
LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
this, channel)));
}
// Update current stream index
// Note: If ndata is not active, the outstanding data messages on this stream need to be sent
// first before other streams can be used for sending.
if (mSendInterleaved || !blocked) {
i = UpdateCurrentStreamIndex();
}
} while (!blocked && i != end);
if (!blocked) {
mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
}
return blocked;
}
// Called with mLock locked!
// buffer MUST have at least one item!
// Returns true if we're still blocked.
bool
DataChannelConnection::SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer)
{
do {
// Re-send message
int error = SendMsgInternal(*buffer[0]);
switch (error) {
case 0:
buffer.RemoveElementAt(0);
break;
case EAGAIN:
#if (EAGAIN != EWOULDBLOCK)
case EWOULDBLOCK:
#endif
return true;
default:
buffer.RemoveElementAt(0);
LOG(("error on sending: %d", error));
break;
}
} while (!buffer.IsEmpty());
return false;
}
// Caller must ensure that length <= SIZE_MAX
void
DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
uint32_t length, uint16_t stream)
{
RefPtr<DataChannel> channel;
uint32_t prValue;
uint16_t prPolicy;
uint32_t flags;
mLock.AssertCurrentThreadOwns();
const size_t requiredLength =
(sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length);
if (((size_t)length) != requiredLength) {
LOG(("%s: Inconsistent length: %u, should be %zu",
__FUNCTION__, length, requiredLength));
if (((size_t)length) < requiredLength)
return;
}
LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
switch (req->channel_type) {
case DATA_CHANNEL_RELIABLE:
case DATA_CHANNEL_RELIABLE_UNORDERED:
prPolicy = SCTP_PR_SCTP_NONE;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
prPolicy = SCTP_PR_SCTP_RTX;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
prPolicy = SCTP_PR_SCTP_TTL;
break;
default:
LOG(("Unknown channel type %d", req->channel_type));
/* XXX error handling */
return;
}
prValue = ntohl(req->reliability_param);
flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
if ((channel = FindChannelByStream(stream))) {
if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
stream, channel->mState));
/* XXX: some error handling */
} else {
LOG(("Open for externally negotiated channel %u", stream));
// XXX should also check protocol, maybe label
if (prPolicy != channel->mPrPolicy ||
prValue != channel->mPrValue ||
flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
{
LOG(("WARNING: external negotiation mismatch with OpenRequest:"
"channel %u, policy %u/%u, value %u/%u, flags %x/%x",
stream, prPolicy, channel->mPrPolicy,
prValue, channel->mPrValue, flags, channel->mFlags));
}
}
return;
}
if (stream >= mStreams.Length()) {
LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream, mStreams.Length()));
return;
}
nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
ntohs(req->protocol_length)));
channel = new DataChannel(this,
stream,
DataChannel::CONNECTING,
label,
protocol,
prPolicy, prValue,
flags,
nullptr, nullptr);
mStreams[stream] = channel;
channel->mState = DataChannel::WAITING_TO_OPEN;
LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
this, channel)));
LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
int error = SendOpenAckMessage(stream);
if (error) {
LOG(("SendOpenAckMessage failed, error = %d", error));
// Close the channel, inform the user
CloseInt(channel);
// XXX send error via DataChannelOnMessageAvailable (bug 843625)
return;
}
// Now process any queued data messages for the channel (which will
// themselves likely get queued until we leave WAITING_TO_OPEN, plus any
// more that come in before that happens)
DeliverQueuedData(stream);
}
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
void
DataChannelConnection::DeliverQueuedData(uint16_t stream)
{
mLock.AssertCurrentThreadOwns();
uint32_t i = 0;
while (i < mQueuedData.Length()) {
// Careful! we may modify the array length from within the loop!
if (mQueuedData[i]->mStream == stream) {
LOG(("Delivering queued data for stream %u, length %u",
stream, mQueuedData[i]->mLength));
// Deliver the queued data
HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
mQueuedData[i]->mFlags);
mQueuedData.RemoveElementAt(i);
continue; // don't bump index since we removed the element
}
i++;
}
}
// Caller must ensure that length <= SIZE_MAX
void
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
uint32_t length, uint16_t stream)
{
DataChannel *channel;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
if (NS_WARN_IF(!channel)) {
return;
}
LOG(("OpenAck received for stream %u, waiting=%d", stream,
(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
}
// Caller must ensure that length <= SIZE_MAX
void
DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
{
/* XXX: Send an error message? */
LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid, length, stream));
// XXX Log to JS error console if possible
}
uint8_t
DataChannelConnection::BufferMessage(nsACString& recvBuffer, const void *data,
uint32_t length, uint32_t ppid, int flags)
{
const char *buffer = (const char *) data;
uint8_t bufferFlags = 0;
if ((flags & MSG_EOR) &&
ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
// Return directly if nothing has been buffered
if (recvBuffer.IsEmpty()) {
return bufferFlags;
}
}
// Ensure it doesn't blow up our buffer
// TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the new buffer is capable
// of holding.
if (((uint64_t) recvBuffer.Length()) + ((uint64_t) length) > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
return bufferFlags;
}
// Copy & add to receive buffer
recvBuffer.Append(buffer, length);
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
return bufferFlags;
}
void
DataChannelConnection::HandleDataMessage(const void *data, size_t length, uint32_t ppid,
uint16_t stream, int flags)
{
DataChannel *channel;
const char *buffer = (const char *) data;
mLock.AssertCurrentThreadOwns();
channel = FindChannelByStream(stream);
// Note: Until we support SIZE_MAX sized messages, we need this check
#if (SIZE_MAX > UINT32_MAX)
if (length > UINT32_MAX) {
LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
length, UINT32_MAX));
// The channel lookup above may have returned nullptr
if (channel) {
CloseInt(channel);
}
return;
}
#endif
uint32_t data_length = (uint32_t)length;
// XXX A closed channel may trip this... check
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
// That would make this code moot. Keep it for now for backwards compatibility.
if (!channel) {
// In the updated 0-RTT open case, the sender can send data immediately
// after Open, and doesn't set the in-order bit (since we don't have a
// response or ack). Also, with external negotiation, data can come in
// before we're told about the external negotiation. We need to buffer
// data until either a) Open comes in, if the ordering gets messed up,
// or b) the app tells us this channel was externally negotiated. When
// these occur, we deliver the data.
// Since this is rare and not performance-critical, keep a single list of queued
// data messages to deliver once the channel opens.
LOG(("Queuing data for stream %u, length %u", stream, data_length));
// Copies data
mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, data_length));
return;
}
// Ignore incoming data in case the channel is closed
if (channel->mState == CLOSED) {
return;
}
bool is_binary = true;
uint8_t bufferFlags;
int32_t type;
const char* info = "";
if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
ppid == DATA_CHANNEL_PPID_DOMSTRING) {
is_binary = false;
}
if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
NS_WARNING("DataChannel message aborted by fragment type change!");
// TODO: Maybe closing would be better as this is a hard to detect protocol violation?
channel->mRecvBuffer.Truncate(0);
}
channel->mIsRecvBinary = is_binary;
// Remaining chunks of previously truncated message (due to the buffer being full)?
if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing",
data_length));
// Only unblock if unordered
if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
}
}
// Buffer message until complete
bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
channel->mRecvBuffer.Truncate(0);
channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
CloseInt(channel);
return;
}
if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
is_binary ? "binary" : "string", data_length, channel->mRecvBuffer.Length(),
channel->mStream));
return; // Not ready to notify application
}
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
data_length = channel->mRecvBuffer.Length();
}
// Complain about large messages (only complain - we can handle it)
if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
}
switch (ppid) {
case DATA_CHANNEL_PPID_DOMSTRING:
LOG(("DataChannel: Received string message of length %u on channel %u",
data_length, channel->mStream));
type = DataChannelOnMessageAvailable::ON_DATA_STRING;
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
info = " (string fragmented)";
}
// else send using recvData normally
// WebSockets checks IsUTF8() here; we can try to deliver it
break;
case DATA_CHANNEL_PPID_BINARY:
LOG(("DataChannel: Received binary message of length %u on channel id %u",
data_length, channel->mStream));
type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
info = " (binary fragmented)";
}
// else send using recvData normally
break;
default:
NS_ERROR("Unknown data PPID");
return;
}
// Notify onmessage
LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
(type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
info, channel));
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
channel->SendOrQueue(new DataChannelOnMessageAvailable(
type, this, channel, channel->mRecvBuffer));
channel->mRecvBuffer.Truncate(0);
} else {
nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
channel->SendOrQueue(new DataChannelOnMessageAvailable(
type, this, channel, recvData));
}
}
void
DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid,
uint16_t stream, int flags)
{
const struct rtcweb_datachannel_open_request *req;
const struct rtcweb_datachannel_ack *ack;
// Note: Until we support SIZE_MAX sized messages, we need this check
#if (SIZE_MAX > UINT32_MAX)
if (length > UINT32_MAX) {
LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length, UINT32_MAX));
Stop();
return;
}
#endif
uint32_t data_length = (uint32_t)length;
mLock.AssertCurrentThreadOwns();
// Buffer message until complete
const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
LOG(("DataChannel: Buffered message would become too large to handle, closing connection"));
mRecvBuffer.Truncate(0);
Stop();
return;
}
if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
LOG(("Buffered partial DCEP message of length %u", data_length));
return;
}
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
data_length = mRecvBuffer.Length();
}
req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
LOG(("Handling DCEP message of length %u", data_length));
// Ensure minimum message size (ack is the smallest DCEP message)
if ((size_t)data_length < sizeof(*ack)) {
LOG(("Ignored invalid DCEP message (too short)"));
// Drop any buffered fragments so they cannot leak into the next message
mRecvBuffer.Truncate(0);
return;
}
switch (req->msg_type) {
case DATA_CHANNEL_OPEN_REQUEST:
// structure includes a possibly-unused char label[1] (in a packed structure)
if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
// Drop any buffered fragments so they cannot leak into the next message
mRecvBuffer.Truncate(0);
return;
}
HandleOpenRequestMessage(req, data_length, stream);
break;
case DATA_CHANNEL_ACK:
// >= sizeof(*ack) checked above
ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
HandleOpenAckMessage(ack, data_length, stream);
break;
default:
HandleUnknownMessage(ppid, data_length, stream);
break;
}
// Reset buffer
mRecvBuffer.Truncate(0);
}
// Called with mLock locked!
void
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid,
uint16_t stream, int flags)
{
mLock.AssertCurrentThreadOwns();
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
HandleDCEPMessage(buffer, length, ppid, stream, flags);
break;
case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
case DATA_CHANNEL_PPID_DOMSTRING:
case DATA_CHANNEL_PPID_BINARY_PARTIAL:
case DATA_CHANNEL_PPID_BINARY:
HandleDataMessage(buffer, length, ppid, stream, flags);
break;
default:
LOG(("Message of length %zu PPID %u on stream %u received (%s).",
length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
break;
}
}
void
DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
{
uint32_t i, n;
// Check before the first dereference of sac below
if (NS_WARN_IF(!sac)) {
return;
}
switch (sac->sac_state) {
case SCTP_COMM_UP:
LOG(("Association change: SCTP_COMM_UP"));
if (mState == CONNECTING) {
mSocket = mMasterSocket;
mState = OPEN;
// Check for older Firefox by looking at the amount of incoming streams
LOG(("Negotiated number of incoming streams: %" PRIu16, sac->sac_inbound_streams));
if (!mMaxMessageSizeSet
&& sac->sac_inbound_streams == WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
LOG(("Older Firefox detected, using PPID-based fragmentation"));
mPpidFragmentation = true;
}
SetEvenOdd();
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this)));
LOG(("DTLS connect() succeeded! Entering connected mode"));
// Open any streams pending...
ProcessQueuedOpens();
} else if (mState == OPEN) {
LOG(("DataConnection Already OPEN"));
} else {
LOG(("Unexpected state: %d", mState));
}
break;
case SCTP_COMM_LOST:
LOG(("Association change: SCTP_COMM_LOST"));
// This association is toast, so also close all the channels -- from mainthread!
Stop();
break;
case SCTP_RESTART:
LOG(("Association change: SCTP_RESTART"));
break;
case SCTP_SHUTDOWN_COMP:
LOG(("Association change: SCTP_SHUTDOWN_COMP"));
Stop();
break;
case SCTP_CANT_STR_ASSOC:
LOG(("Association change: SCTP_CANT_STR_ASSOC"));
break;
default:
LOG(("Association change: UNKNOWN"));
break;
}
LOG(("Association change: streams (in/out) = (%u/%u)",
sac->sac_inbound_streams, sac->sac_outbound_streams));
if (NS_WARN_IF(!sac)) {
return;
}
n = sac->sac_length - sizeof(*sac);
if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
if (n > 0) {
for (i = 0; i < n; ++i) {
switch (sac->sac_info[i]) {
case SCTP_ASSOC_SUPPORTS_PR:
LOG(("Supports: PR"));
break;
case SCTP_ASSOC_SUPPORTS_AUTH:
LOG(("Supports: AUTH"));
break;
case SCTP_ASSOC_SUPPORTS_ASCONF:
LOG(("Supports: ASCONF"));
break;
case SCTP_ASSOC_SUPPORTS_MULTIBUF:
LOG(("Supports: MULTIBUF"));
break;
case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
LOG(("Supports: RE-CONFIG"));
break;
#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
LOG(("Supports: NDATA"));
// TODO: This should probably be set earlier above in 'case SCTP_COMM_UP' but we also
// need this for 'SCTP_RESTART'.
mSendInterleaved = true;
break;
#endif
default:
LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
break;
}
}
}
} else if (((sac->sac_state == SCTP_COMM_LOST) ||
(sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
LOG(("Association: ABORT ="));
for (i = 0; i < n; ++i) {
LOG((" 0x%02x", sac->sac_info[i]));
}
}
if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
(sac->sac_state == SCTP_SHUTDOWN_COMP) ||
(sac->sac_state == SCTP_COMM_LOST)) {
return;
}
}
void
DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
{
const char *addr = "";
#if !defined(__Userspace_os_Windows)
char addr_buf[INET6_ADDRSTRLEN];
struct sockaddr_in *sin;
struct sockaddr_in6 *sin6;
#endif
switch (spc->spc_aaddr.ss_family) {
case AF_INET:
#if !defined(__Userspace_os_Windows)
sin = (struct sockaddr_in *)&spc->spc_aaddr;
addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
#endif
break;
case AF_INET6:
#if !defined(__Userspace_os_Windows)
sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
#endif
break;
case AF_CONN:
addr = "DTLS connection";
break;
default:
break;
}
LOG(("Peer address %s is now ", addr));
switch (spc->spc_state) {
case SCTP_ADDR_AVAILABLE:
LOG(("SCTP_ADDR_AVAILABLE"));
break;
case SCTP_ADDR_UNREACHABLE:
LOG(("SCTP_ADDR_UNREACHABLE"));
break;
case SCTP_ADDR_REMOVED:
LOG(("SCTP_ADDR_REMOVED"));
break;
case SCTP_ADDR_ADDED:
LOG(("SCTP_ADDR_ADDED"));
break;
case SCTP_ADDR_MADE_PRIM:
LOG(("SCTP_ADDR_MADE_PRIM"));
break;
case SCTP_ADDR_CONFIRMED:
LOG(("SCTP_ADDR_CONFIRMED"));
break;
default:
LOG(("UNKNOWN"));
break;
}
LOG((" (error = 0x%08x).\n", spc->spc_error));
}
void
DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
{
size_t i, n;
n = sre->sre_length - sizeof(struct sctp_remote_error);
LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
for (i = 0; i < n; ++i) {
LOG((" 0x%02x", sre->sre_data[i]));
}
}
void
DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
{
LOG(("Shutdown event."));
/* XXX: notify all channels. */
// Attempts to actually send anything will fail
}
void
DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
{
LOG(("Adaptation indication: %x.", sai->sai_adaptation_ind));
}
void
DataChannelConnection::HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde)
{
// Note: Be aware that stream and sequence number being u32 instead of u16 is
// a bug in the SCTP API. This may change in the future.
LOG(("Partial delivery event: "));
switch (spde->pdapi_indication) {
case SCTP_PARTIAL_DELIVERY_ABORTED:
LOG(("delivery aborted "));
break;
default:
LOG(("??? "));
break;
}
LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags, spde->pdapi_stream,
spde->pdapi_seq));
// Validate stream ID
if (spde->pdapi_stream >= UINT16_MAX) {
LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n", spde->pdapi_stream));
return;
}
// Find channel and reset buffer
DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
if (channel) {
LOG(("Abort partially delivered message of %u bytes\n", channel->mRecvBuffer.Length()));
channel->mRecvBuffer.Truncate(0);
}
}
void
DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
{
size_t i, n;
if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
LOG(("Unsent "));
}
if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
LOG(("Sent "));
}
if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
LOG(("(flags = %x) ", ssfe->ssfe_flags));
}
LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
for (i = 0; i < n; ++i) {
LOG((" 0x%02x", ssfe->ssfe_data[i]));
}
}
void
DataChannelConnection::ClearResets()
{
// Clear all pending resets
if (!mStreamsResetting.IsEmpty()) {
LOG(("Clearing resets for %zu streams", mStreamsResetting.Length()));
}
for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
RefPtr<DataChannel> channel;
channel = FindChannelByStream(mStreamsResetting[i]);
if (channel) {
LOG(("Forgetting channel %u (%p) with pending reset", channel->mStream, channel.get()));
mStreams[channel->mStream] = nullptr;
}
}
mStreamsResetting.Clear();
}
void
DataChannelConnection::ResetOutgoingStream(uint16_t stream)
{
uint32_t i;
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p: Resetting outgoing stream %u",
(void *) this, stream));
// Rarely has more than a couple items and only for a short time
for (i = 0; i < mStreamsResetting.Length(); ++i) {
if (mStreamsResetting[i] == stream) {
return;
}
}
mStreamsResetting.AppendElement(stream);
}
void
DataChannelConnection::SendOutgoingStreamReset()
{
struct sctp_reset_streams *srs;
uint32_t i;
size_t len;
LOG(("Connection %p: Sending outgoing stream reset for %zu streams",
(void *) this, mStreamsResetting.Length()));
mLock.AssertCurrentThreadOwns();
if (mStreamsResetting.IsEmpty()) {
LOG(("No streams to reset"));
return;
}
len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
memset(srs, 0, len);
srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
srs->srs_number_streams = mStreamsResetting.Length();
for (i = 0; i < mStreamsResetting.Length(); ++i) {
srs->srs_stream_list[i] = mStreamsResetting[i];
}
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
LOG(("***failed: setsockopt RESET, errno %d", errno));
// if errno == EALREADY, this is normal - we can't send another reset
// with one pending.
// When we get an incoming reset (which may be a response to our
// outstanding one), see if we have any pending outgoing resets and
// send them
} else {
mStreamsResetting.Clear();
}
free(srs);
}
void
DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
{
uint32_t n, i;
RefPtr<DataChannel> channel; // since we may null out the ref to the channel
if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
!(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
for (i = 0; i < n; ++i) {
if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
channel = FindChannelByStream(strrst->strreset_stream_list[i]);
if (channel) {
// The other side closed the channel
// We could be in three states:
// 1. Normal state (input and output streams, OPEN)
// Notify application, send a RESET in response on our
// outbound channel. Go to CLOSED
// 2. We sent our own reset (CLOSING); either they crossed on the
// wire, or this is a response to our Reset.
// Go to CLOSED
// 3. We've sent an open but haven't gotten a response yet (CONNECTING)
// I believe this is impossible, as we don't have an input stream yet.
LOG(("Incoming: Channel %u closed, state %d",
channel->mStream, channel->mState));
ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
channel->mState == DataChannel::CLOSING ||
channel->mState == DataChannel::CONNECTING ||
channel->mState == DataChannel::WAITING_TO_OPEN);
if (channel->mState == DataChannel::OPEN ||
channel->mState == DataChannel::WAITING_TO_OPEN) {
// Mark the stream for reset (the reset is sent below)
ResetOutgoingStream(channel->mStream);
}
mStreams[channel->mStream] = nullptr;
LOG(("Disconnected DataChannel %p from connection %p",
(void *) channel.get(), (void *) channel->mConnection.get()));
// This sends ON_CHANNEL_CLOSED to mainthread
channel->StreamClosedLocked();
} else {
LOG(("Can't find incoming channel for stream %u (index %u)",
strrst->strreset_stream_list[i], i));
}
}
}
}
// Process any pending resets now:
if (!mStreamsResetting.IsEmpty()) {
LOG(("Sending %zu pending resets", mStreamsResetting.Length()));
SendOutgoingStreamReset();
}
}
void
DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
{
uint16_t stream;
RefPtr<DataChannel> channel;
if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
mStreams.Length(),
strchg->strchange_instrms,
strchg->strchange_outstrms));
// XXX FIX! notify pending opens of failure
return;
}
if (strchg->strchange_instrms > mStreams.Length()) {
LOG(("Other side increased streams from %zu to %u",
mStreams.Length(), strchg->strchange_instrms));
}
if (strchg->strchange_outstrms > mStreams.Length() ||
strchg->strchange_instrms > mStreams.Length()) {
uint16_t old_len = mStreams.Length();
uint16_t new_len = std::max(strchg->strchange_outstrms,
strchg->strchange_instrms);
LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
old_len, new_len, new_len - old_len,
strchg->strchange_instrms));
// make sure both are the same length
mStreams.AppendElements(new_len - old_len);
LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
for (size_t i = old_len; i < mStreams.Length(); ++i) {
mStreams[i] = nullptr;
}
// Re-process any channels waiting for streams.
// Linear search, but we don't increase channels often and
// the array would only get long in case of an app error normally
// Make sure we request enough streams if there's a big jump in streams
// Could make a more complex API for OpenXxxFinish() and avoid this loop
// Use a signed type: the subtraction below goes negative when we were
// granted more streams than there are pending opens, and an unsigned
// value would wrap around to a huge request instead.
int32_t num_needed = static_cast<int32_t>(mPending.GetSize());
LOG(("%d of %d new streams already needed", num_needed,
new_len - old_len));
num_needed -= (new_len - old_len); // number we added
if (num_needed > 0) {
if (num_needed < 16)
num_needed = 16;
LOG(("Not enough new streams, asking for %d more", num_needed));
RequestMoreStreams(num_needed);
} else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
LOG(("Requesting %d output streams to match partner",
strchg->strchange_instrms - strchg->strchange_outstrms));
RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
}
ProcessQueuedOpens();
}
// else probably not a change in # of streams
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
channel = mStreams[i];
if (!channel)
continue;
if ((channel->mState == CONNECTING) &&
(channel->mStream == INVALID_STREAM)) {
if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
(strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
/* XXX: Signal to the other end. */
channel->mState = CLOSED;
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
// maybe fire onError (bug 843625)
} else {
stream = FindFreeStream();
if (stream != INVALID_STREAM) {
channel->mStream = stream;
mStreams[stream] = channel;
// Send open request
int error = SendOpenRequestMessage(
channel->mLabel, channel->mProtocol, channel->mStream,
!!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
channel->mPrValue);
if (error) {
LOG(("SendOpenRequest failed, error = %d", error));
// Close the channel, inform the user
mStreams[channel->mStream] = nullptr;
channel->mState = CLOSED;
// Don't need to reset; we didn't open it
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
} else {
channel->mState = OPEN;
channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel)));
}
} else {
/* We will not find more ... */
break;
}
}
}
}
}
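/*
Illustrative sketch only; it is not compiled as part of this file. The
shortfall computation in HandleStreamChangeEvent() is easiest to reason about
in signed arithmetic, because "pending opens minus streams just added" is
negative whenever the peer granted more streams than we currently need. The
helper name ExtraStreamsToRequest and its parameters are hypothetical; the
16-stream floor from the code above is passed in as minBatch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Returns how many additional streams to request, or 0 if the streams that
// were just added already cover every pending open.
int32_t ExtraStreamsToRequest(int32_t pendingOpens, int32_t streamsAdded,
                              int32_t minBatch) {
  assert(pendingOpens >= 0 && streamsAdded >= 0 && minBatch > 0);
  // Signed subtraction: a negative result means we have streams to spare.
  int32_t shortfall = pendingOpens - streamsAdded;
  if (shortfall <= 0) {
    return 0;
  }
  // Ask for at least minBatch streams so that we do not renegotiate for
  // every single additional channel.
  return std::max(shortfall, minBatch);
}
```

With a floor of 16, 20 pending opens against 16 new streams yields a request
for 16 more, while 3 pending opens against 16 new streams yields no request.
*/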
// Called with mLock locked!
void
DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
{
mLock.AssertCurrentThreadOwns();
if (notif->sn_header.sn_length != (uint32_t)n) {
return;
}
switch (notif->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
HandleAssociationChangeEvent(&(notif->sn_assoc_change));
break;
case SCTP_PEER_ADDR_CHANGE:
HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
break;
case SCTP_REMOTE_ERROR:
HandleRemoteErrorEvent(&(notif->sn_remote_error));
break;
case SCTP_SHUTDOWN_EVENT:
HandleShutdownEvent(&(notif->sn_shutdown_event));
break;
case SCTP_ADAPTATION_INDICATION:
HandleAdaptationIndication(&(notif->sn_adaptation_event));
break;
case SCTP_AUTHENTICATION_EVENT:
LOG(("SCTP_AUTHENTICATION_EVENT"));
break;
case SCTP_SENDER_DRY_EVENT:
//LOG(("SCTP_SENDER_DRY_EVENT"));
break;
case SCTP_NOTIFICATIONS_STOPPED_EVENT:
LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
break;
case SCTP_PARTIAL_DELIVERY_EVENT:
HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
break;
case SCTP_SEND_FAILED_EVENT:
HandleSendFailedEvent(&(notif->sn_send_failed_event));
break;
case SCTP_STREAM_RESET_EVENT:
HandleStreamResetEvent(&(notif->sn_strreset_event));
break;
case SCTP_ASSOC_RESET_EVENT:
LOG(("SCTP_ASSOC_RESET_EVENT"));
break;
case SCTP_STREAM_CHANGE_EVENT:
HandleStreamChangeEvent(&(notif->sn_strchange_event));
break;
default:
LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
break;
}
}
int
DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
struct sctp_rcvinfo rcv, int flags)
{
ASSERT_WEBRTC(!NS_IsMainThread());
if (!data) {
usrsctp_close(sock); // SCTP has finished shutting down
} else {
MutexAutoLock lock(mLock);
if (flags & MSG_NOTIFICATION) {
HandleNotification(static_cast<union sctp_notification *>(data), datalen);
} else {
HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
}
}
// sctp allocates 'data' with malloc(), and expects the receiver to free
// it (presumably with free).
// XXX future optimization: try to deliver messages without an internal
// alloc/copy, and if so delay the free until later.
free(data);
// usrsctp defines the callback as returning an int, but doesn't use it
return 1;
}
already_AddRefed<DataChannel>
DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
Type type, bool inOrder,
uint32_t prValue, DataChannelListener *aListener,
nsISupports *aContext, bool aExternalNegotiated,
uint16_t aStream)
{
// aStream == INVALID_STREAM to have the protocol allocate
uint16_t prPolicy = SCTP_PR_SCTP_NONE;
uint32_t flags;
LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
type, inOrder, prValue, aListener, aContext,
aExternalNegotiated ? "true" : "false", aStream));
switch (type) {
case DATA_CHANNEL_RELIABLE:
prPolicy = SCTP_PR_SCTP_NONE;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
prPolicy = SCTP_PR_SCTP_RTX;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
prPolicy = SCTP_PR_SCTP_TTL;
break;
default:
LOG(("ERROR: unsupported channel type: %u", type));
MOZ_ASSERT(false);
return nullptr;
}
if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
return nullptr;
}
// Don't look past currently-negotiated streams
if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
LOG(("ERROR: external negotiation of already-open channel %u", aStream));
// XXX How do we indicate this up to the application? Probably the
// caller's job, but we may need to return an error code.
return nullptr;
}
flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
RefPtr<DataChannel> channel(new DataChannel(this,
aStream,
DataChannel::CONNECTING,
label, protocol,
prPolicy, prValue,
flags,
aListener, aContext));
if (aExternalNegotiated) {
channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
}
MutexAutoLock lock(mLock); // OpenFinish assumes this
return OpenFinish(channel.forget());
}
// Separate routine so we can also call it to finish up from pending opens
already_AddRefed<DataChannel>
DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
{
RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
// Normally 1 reference if called from ::Open(), or 2 if called from
// ProcessQueuedOpens() unless the DOMDataChannel was gc'd
uint16_t stream = channel->mStream;
bool queue = false;
mLock.AssertCurrentThreadOwns();
// Cases we care about:
// Pre-negotiated:
// Not Open:
// Doesn't fit:
// -> change initial ask or renegotiate after open
// -> queue open
// Open:
// Doesn't fit:
// -> RequestMoreStreams && queue
// Does fit:
// -> open
// Not negotiated:
// Not Open:
// -> queue open
// Open:
// -> Try to get a stream
// Doesn't fit:
// -> RequestMoreStreams && queue
// Does fit:
// -> open
// So the Open cases are basically the same
// Not Open cases are simply queue for non-negotiated, and
// either change the initial ask or possibly renegotiate after open.
if (mState == OPEN) {
if (stream == INVALID_STREAM) {
stream = FindFreeStream(); // may be INVALID_STREAM if we need more
}
if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
// RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
// to avoid going back immediately for more if the requests go to N, N+1, etc.
int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
(stream-((int32_t)mStreams.Length())) + 16;
if (!RequestMoreStreams(more_needed)) {
// Something bad happened... we're done
goto request_error_cleanup;
}
queue = true;
}
} else {
// not OPEN
if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
mState == CLOSED) {
// Update number of streams for init message
struct sctp_initmsg initmsg;
socklen_t len = sizeof(initmsg);
int32_t total_needed = stream+16;
memset(&initmsg, 0, sizeof(initmsg));
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
LOG(("*** failed getsockopt SCTP_INITMSG"));
goto request_error_cleanup;
}
LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
initmsg.sinit_num_ostreams = total_needed;
initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
(socklen_t)sizeof(initmsg)) < 0) {
LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
goto request_error_cleanup;
}
int32_t old_len = mStreams.Length();
mStreams.AppendElements(total_needed - old_len);
for (int32_t i = old_len; i < total_needed; ++i) {
mStreams[i] = nullptr;
}
}
// else if state is CONNECTING, we'll just re-negotiate when OpenFinish
// is called, if needed
queue = true;
}
if (queue) {
LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
// Also serves to mark we told the app
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
// we need one ref for the nsDeque and one to return
DataChannel* rawChannel = channel;
rawChannel->AddRef();
mPending.Push(rawChannel);
return channel.forget();
}
MOZ_ASSERT(stream != INVALID_STREAM);
// just allocated (& OPEN), or externally negotiated
mStreams[stream] = channel; // holds a reference
channel->mStream = stream;
#ifdef TEST_QUEUED_DATA
// It's painful to write a test for this...
channel->mState = OPEN;
channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
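// NOTE: SendDataMsgInternalOrBuffer() takes mLock itself, but mLock is
// already held here, so this test path needs rework before it is enabled.
SendDataMsgInternalOrBuffer(*channel, reinterpret_cast<const uint8_t *>("Help me!"), 8,
DATA_CHANNEL_PPID_DOMSTRING);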
#endif
if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
// Don't send unordered until this gets cleared
channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
}
if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
int error = SendOpenRequestMessage(
channel->mLabel, channel->mProtocol, stream,
!!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
channel->mPrValue);
if (error) {
LOG(("SendOpenRequest failed, error = %d", error));
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
NS_ERROR("Failed to send open request");
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
}
// If we haven't returned the channel yet, it will get destroyed when we exit
// this function.
mStreams[stream] = nullptr;
channel->mStream = INVALID_STREAM;
// we'll be destroying the channel
channel->mState = CLOSED;
return nullptr;
/* NOTREACHED */
}
}
// Either externally negotiated or we sent Open
channel->mState = OPEN;
channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
// FIX? Move into DOMDataChannel? I don't think we can send it yet here
LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
channel)));
return channel.forget();
request_error_cleanup:
channel->mState = CLOSED;
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
NS_ERROR("Failed to request more streams");
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel)));
return channel.forget();
}
// we'll be destroying the channel, but it never really got set up
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
// Dispatch it to ourselves
return nullptr;
}
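/*
Illustrative sketch only; it is not compiled as part of this file. The case
analysis in the comment at the top of OpenFinish() reduces to three outcomes,
which can be written as a small pure function. The names OpenAction and
DecideOpenAction are hypothetical. The sketch deliberately leaves out the
side effect of the not-open case (adjusting the initial stream ask for a
pre-negotiated stream id that does not fit).

```cpp
#include <cstddef>

// Hypothetical labels for the outcomes described in OpenFinish().
enum class OpenAction {
  OpenNow,                 // stream fits: assign it and open the channel
  RequestStreamsAndQueue,  // association is open but needs more streams
  Queue                    // association not open yet: finish the open later
};

// haveStream is false when no stream id was pre-negotiated and no free one
// could be found; otherwise stream is the chosen id.
OpenAction DecideOpenAction(bool associationOpen, bool haveStream,
                            size_t stream, size_t numStreams) {
  if (!associationOpen) {
    return OpenAction::Queue;
  }
  if (!haveStream || stream >= numStreams) {
    return OpenAction::RequestStreamsAndQueue;
  }
  return OpenAction::OpenNow;
}
```

As the original comment notes, the two "Open" branches behave the same for
pre-negotiated and protocol-allocated streams, which is why one function
covers both.
*/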
// Requires mLock to be locked!
// Returns a POSIX error code directly instead of setting errno.
int
DataChannelConnection::SendMsgInternal(OutgoingMsg &msg)
{
auto &info = msg.GetInfo().sendv_sndinfo;
int error;
// EOR set?
bool eor_set = (info.snd_flags & SCTP_EOR) != 0;
// Send until buffer is empty
size_t left = msg.GetLeft();
do {
size_t length;
// Carefully chunk the buffer
if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
// Unset EOR flag
info.snd_flags &= ~SCTP_EOR;
} else {
length = left;
// Set EOR flag
if (eor_set) {
info.snd_flags |= SCTP_EOR;
}
}
// Send (or try at least)
// SCTP will return EMSGSIZE if the message is bigger than the buffer
// size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
// by carefully crafting small enough message chunks.
ssize_t written = usrsctp_sendv(
mSocket, msg.GetData(), length, nullptr, 0,
(void *)&msg.GetInfo(), (socklen_t)sizeof(struct sctp_sendv_spa),
SCTP_SENDV_SPA, 0);
if (written < 0) {
error = errno;
goto out;
}
LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)",
(size_t)written, length, left - (size_t)written));
// TODO: Remove once resolved (https://github.com/sctplab/usrsctp/issues/132)
if (written == 0) {
LOG(("@tuexen: usrsctp_sendv returned 0"));
error = EAGAIN;
goto out;
}
// If not all bytes have been written, this obviously means that usrsctp's buffer is full
// and we need to try again later.
if ((size_t)written < length) {
msg.Advance((size_t)written);
error = EAGAIN;
goto out;
}
// Update buffer position
msg.Advance((size_t)written);
// Get amount of bytes left in the buffer
left = msg.GetLeft();
} while (left > 0);
// Done
error = 0;
out:
// Reset EOR flag
if (eor_set) {
info.snd_flags |= SCTP_EOR;
}
return error;
}
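/*
Illustrative sketch only; it is not compiled as part of this file. It shows
the fragmentation rule that SendMsgInternal() applies when the caller asked
for EOR: every fragment except the last is sent with EOR cleared, and only
the final fragment carries it. Fragment and PlanEorFragments are hypothetical
names, maxFragment stands in for DATA_CHANNEL_MAX_BINARY_FRAGMENT, and the
sketch ignores partial writes (EAGAIN), which the real function handles by
advancing the message and returning.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical record of one send call.
struct Fragment {
  size_t offset;  // start of the fragment within the message
  size_t length;  // number of bytes in the fragment
  bool eor;       // true if the fragment carries the end-of-record flag
};

// Splits a message of total bytes into fragments of at most maxFragment
// bytes. Only the last fragment is marked with EOR. An empty message yields
// an empty plan in this sketch.
std::vector<Fragment> PlanEorFragments(size_t total, size_t maxFragment) {
  assert(maxFragment > 0);
  std::vector<Fragment> plan;
  size_t offset = 0;
  while (offset < total) {
    size_t length = std::min(total - offset, maxFragment);
    bool last = (offset + length == total);
    plan.push_back({offset, length, last});
    offset += length;
  }
  return plan;
}
```

For example, a 150000-byte message with a 65536-byte limit is planned as
three fragments, and only the third has eor set.
*/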
// Requires mLock to be locked!
// Returns a POSIX error code directly instead of setting errno.
// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
int
DataChannelConnection::SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer,
OutgoingMsg &msg, bool &buffered)
{
NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
int error = 0;
bool need_buffering = false;
// Note: Main-thread IO, but doesn't block!
// XXX FIX! to deal with heavy overruns of JS trying to pass data in
// (more than the buffersize) queue data onto another thread to do the
// actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
// Avoid a race between buffer-full-failure (where we have to add the
// packet to the buffered-data queue) and the buffer-now-only-half-full
// callback, which happens on a different thread. Otherwise we might
// fail here, then before we add it to the queue get the half-full
// callback, find nothing to do, then on this thread add it to the
// queue - which would sit there. Also, if we later send more data, it
// would arrive ahead of the buffered message, but if the buffer ever
// got to 1/2 full, the message would get sent - but at a semi-random
// time, after other data it was supposed to be in front of.
// Must lock before empty check for similar reasons!
mLock.AssertCurrentThreadOwns();
if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
error = SendMsgInternal(msg);
switch (error) {
case 0:
break;
case EAGAIN:
#if (EAGAIN != EWOULDBLOCK)
case EWOULDBLOCK:
#endif
need_buffering = true;
break;
default:
LOG(("error %d on sending", error));
break;
}
} else {
need_buffering = true;
}
if (need_buffering) {
// Queue the data for a later send attempt, and queue any further data for
// the stream behind it until the buffered data has been sent.
auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
LOG(("Queued %zu buffers (left=%zu, total=%zu)",
buffer.Length(), msg.GetLeft(), msg.GetLength()));
buffered = true;
return 0;
}
buffered = false;
return error;
}
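/*
Illustrative sketch only; it is not compiled as part of this file. It models
the ordering rule in SendMsgInternalOrBuffer(): a message is sent directly
only if nothing is queued ahead of it, and a would-block result turns into
buffering rather than an error. SendQueue and trySend are hypothetical, and
the sketch ignores the mSendInterleaved / mPendingType condition as well as
partially sent messages.

```cpp
#include <cerrno>
#include <deque>
#include <functional>
#include <string>

struct SendQueue {
  std::deque<std::string> buffered;  // messages waiting for buffer space

  // Returns 0 on success or a POSIX error code. wasBuffered reports whether
  // the message was queued instead of being sent.
  int SendOrBuffer(const std::string& msg,
                   const std::function<int(const std::string&)>& trySend,
                   bool& wasBuffered) {
    int error = 0;
    bool needBuffering = true;
    if (buffered.empty()) {
      // Nothing queued ahead of us, so a direct send keeps the ordering.
      error = trySend(msg);
      needBuffering = (error == EAGAIN || error == EWOULDBLOCK);
    }
    if (needBuffering) {
      buffered.push_back(msg);
      wasBuffered = true;
      return 0;
    }
    wasBuffered = false;
    return error;
  }
};
```

Once one message has been buffered, later messages are queued behind it even
if the transport could accept them, which preserves the send order.
*/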
// Caller must ensure that length <= UINT32_MAX
// Returns a POSIX error code.
int
DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data,
size_t len, uint32_t ppid)
{
if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
return EINVAL; // TODO: Find a better error code
}
struct sctp_sendv_spa info = {0};
// General flags
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
// Set stream identifier, protocol identifier and flags
info.sendv_sndinfo.snd_sid = channel.mStream;
info.sendv_sndinfo.snd_flags = SCTP_EOR;
info.sendv_sndinfo.snd_ppid = htonl(ppid);
// Unordered?
// To avoid problems where an in-order OPEN is lost and an
// out-of-order data message "beats" it, require data to be in-order
// until we get an ACK.
if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
!(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
}
// Partial reliability policy
if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
info.sendv_prinfo.pr_policy = channel.mPrPolicy;
info.sendv_prinfo.pr_value = channel.mPrValue;
info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
}
// Create message instance and send
OutgoingMsg msg(info, data, len);
MutexAutoLock lock(mLock);
bool buffered;
int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
// Set pending type and stream index (if buffered)
if (!error && buffered && !mPendingType) {
mPendingType = PENDING_DATA;
mCurrentStream = channel.mStream;
}
return error;
}
// Caller must ensure that length <= UINT32_MAX
// Returns a POSIX error code.
int
DataChannelConnection::SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len,
uint32_t ppidPartial, uint32_t ppidFinal)
{
// We *really* don't want to do this from main thread! - and
// SendDataMsgInternalOrBuffer avoids blocking.
if (mPpidFragmentation) {
// TODO: Bug 1381136, remove this block and all other code that uses PPIDs for fragmentation
// and reassembly once older Firefoxes without EOR are no longer supported as target
// clients.
// Use the deprecated PPID-level fragmentation if enabled. Should be enabled
// in case we can be certain that the other peer is an older Firefox browser
// that does support PPID-level fragmentation/reassembly.
// PPID-level fragmentation can only be applied on reliable data channels.
if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
channel.mPrPolicy == SCTP_PR_SCTP_NONE &&
!(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
LOG(("Sending data message (total=%zu) using deprecated PPID-based chunks", len));
size_t left = len;
while (left > 0) {
// Note: For correctness, chunkLen should also be capped by mMaxMessageSize,
// but as this block is going to be removed soon, I see no need for it.
size_t chunkLen = std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
left -= chunkLen;
uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
// Send the chunk
// Note that these might end up being deferred and queued.
LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid=%u)",
chunkLen, left, len, ppid));
int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
if (error) {
LOG(("*** send chunk fail %d", error));
return error;
}
// Update data position
data += chunkLen;
}
// Sending chunks complete
LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
(size_t)(len+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT));
return 0;
}
// Cannot do PPID-based fragmentation on unreliable channels
NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
"Sending too-large data on unreliable channel!");
} else {
if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len, mMaxMessageSize));
return EMSGSIZE;
}
}
// This will use EOR-based fragmentation if the message is too large (> 64 KiB)
return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
}
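/*
Illustrative sketch only; it is not compiled as part of this file. It mirrors
the deprecated PPID-based fragmentation loop in SendDataMsg(): every chunk
except the last is tagged with the "partial" PPID, and the last chunk carries
the final PPID so the receiver knows the message is complete. PpidChunk and
PlanPpidChunks are hypothetical names, and the PPID values used in the usage
note are placeholders rather than the real DATA_CHANNEL_PPID_* constants.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical record of one chunk handed to the send path.
struct PpidChunk {
  size_t length;  // bytes in this chunk
  uint32_t ppid;  // payload protocol identifier used for this chunk
};

// Splits total bytes into chunks of at most maxFragment bytes and picks the
// PPID for each chunk the same way the loop above does.
std::vector<PpidChunk> PlanPpidChunks(size_t total, size_t maxFragment,
                                      uint32_t ppidPartial,
                                      uint32_t ppidFinal) {
  assert(maxFragment > 0);
  std::vector<PpidChunk> chunks;
  size_t left = total;
  while (left > 0) {
    size_t chunkLen = std::min(left, maxFragment);
    left -= chunkLen;
    // Bytes still remaining means this is not the last chunk.
    chunks.push_back({chunkLen, left > 0 ? ppidPartial : ppidFinal});
  }
  return chunks;
}
```

Planning 10 bytes with a 4-byte limit and placeholder PPIDs 1 (partial) and
2 (final) gives chunks of 4, 4 and 2 bytes tagged 1, 1 and 2. The chunk count
equals (total + maxFragment - 1) / maxFragment, as logged above.
*/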
class ReadBlobRunnable : public Runnable {
public:
ReadBlobRunnable(DataChannelConnection* aConnection,
uint16_t aStream,
nsIInputStream* aBlob)
: Runnable("ReadBlobRunnable")
, mConnection(aConnection)
, mStream(aStream)
, mBlob(aBlob)
{}
NS_IMETHOD Run() override {
// ReadBlob() is responsible for releasing the reference
DataChannelConnection *self = mConnection;
self->ReadBlob(mConnection.forget(), mStream, mBlob);
return NS_OK;
}
private:
// Make sure the Connection doesn't die while there are jobs outstanding.
// Let it die (if released by PeerConnectionImpl while we're running)
// when we send our runnable back to MainThread. Then ~DataChannelConnection
// can send the IOThread to MainThread to die in a runnable, avoiding
// unsafe event loop recursion. Evil.
RefPtr<DataChannelConnection> mConnection;
uint16_t mStream;
// Hold a strong reference so the stream is not deleted when SendBlob returns.
RefPtr<nsIInputStream> mBlob;
};
// Returns a POSIX error code.
int
DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
{
DataChannel *channel = mStreams[stream];
if (NS_WARN_IF(!channel)) {
return EINVAL; // TODO: Find a better error code
}
// Spawn a thread to send the data
if (!mInternalIOThread) {
nsresult rv = NS_NewNamedThread("DataChannel IO",
getter_AddRefs(mInternalIOThread));
if (NS_FAILED(rv)) {
return EINVAL; // TODO: Find a better error code
}
}
mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
return 0;
}
class DataChannelBlobSendRunnable : public Runnable
{
public:
DataChannelBlobSendRunnable(
already_AddRefed<DataChannelConnection>& aConnection,
uint16_t aStream)
: Runnable("DataChannelBlobSendRunnable")
, mConnection(aConnection)
, mStream(aStream)
{}
~DataChannelBlobSendRunnable() override
{
if (!NS_IsMainThread() && mConnection) {
MOZ_ASSERT(false);
// explicitly leak the connection if destroyed off mainthread
Unused << mConnection.forget().take();
}
}
NS_IMETHOD Run() override
{
ASSERT_WEBRTC(NS_IsMainThread());
mConnection->SendBinaryMsg(mStream, mData);
mConnection = nullptr;
return NS_OK;
}
// explicitly public so we can avoid allocating twice and copying
nsCString mData;
private:
// Note: we can be destroyed off the target thread, so be careful not to let this
// get Release()d on the temp thread!
RefPtr<DataChannelConnection> mConnection;
uint16_t mStream;
};
void
DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
uint16_t aStream, nsIInputStream* aBlob)
{
// NOTE: 'aThis' has been forgotten by the caller to avoid releasing
// it off mainthread; if PeerConnectionImpl has released then we want
// ~DataChannelConnection() to run on MainThread
// XXX to do this safely, we must enqueue these atomically onto the
// output socket. We need a sender thread(s?) to enqueue data into the
// socket and to avoid main-thread IO that might block. Even on a
// background thread, we may not want to block on one stream's data.
// I.e. run non-blocking and service multiple channels.
// For now as a hack, send as a single blast of queued packets which may
// be deferred until buffer space is available.
uint64_t len;
// Must not let Dispatching it cause the DataChannelConnection to get
// released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
// will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
// lets us avoid copying the blob data an extra time.
RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
aStream);
// avoid copying the blob data by passing the mData from the runnable
if (NS_FAILED(aBlob->Available(&len)) ||
NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
// Bug 966602: Doesn't return an error to the caller via onerror.
// We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
// aThis is now owned by the runnable; release it there
NS_ReleaseOnMainThreadSystemGroup(
"DataChannelBlobSendRunnable", runnable.forget());
return;
}
aBlob->Close();
Dispatch(runnable.forget());
}
void
DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
{
ASSERT_WEBRTC(NS_IsMainThread());
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
aStreamList->push_back(mStreams[i]->mStream);
}
}
}
// Returns a POSIX error code.
int
DataChannelConnection::SendDataMsgCommon(uint16_t stream, const nsACString &aMsg,
bool isBinary)
{
ASSERT_WEBRTC(NS_IsMainThread());
// We really could allow this from other threads, so long as we deal with
// asynchronicity issues with channels closing, in particular access to
// mStreams, and issues with the association closing (access to mSocket).
const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
uint32_t len = aMsg.Length();
#if (UINT32_MAX > SIZE_MAX)
if (len > SIZE_MAX) {
return EMSGSIZE;
}
#endif
DataChannel *channelPtr;
LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
// XXX if we want more efficiency, translate flags once at open time
channelPtr = mStreams[stream];
if (NS_WARN_IF(!channelPtr)) {
return EINVAL; // TODO: Find a better error code
}
auto &channel = *channelPtr;
if (isBinary) {
return SendDataMsg(channel, data, len,
DATA_CHANNEL_PPID_BINARY_PARTIAL, DATA_CHANNEL_PPID_BINARY);
} else {
return SendDataMsg(channel, data, len,
DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, DATA_CHANNEL_PPID_DOMSTRING);
}
}
void
DataChannelConnection::Stop()
{
// Note: This will call 'CloseAll' from the main thread
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_DISCONNECTED,
this)));
}
void
DataChannelConnection::Close(DataChannel *aChannel)
{
MutexAutoLock lock(mLock);
CloseInt(aChannel);
}
// So we can call Close() with the lock already held
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
void
DataChannelConnection::CloseInt(DataChannel *aChannel)
{
MOZ_ASSERT(aChannel);
RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
mLock.AssertCurrentThreadOwns();
LOG(("Connection %p/Channel %p: Closing stream %u",
channel->mConnection.get(), channel.get(), channel->mStream));
// re-test since it may have closed before the lock was grabbed
if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
LOG(("Channel already closing/closed (%u)", aChannel->mState));
if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
// called from CloseAll()
// we're not going to hang around waiting any more
mStreams[channel->mStream] = nullptr;
}
return;
}
aChannel->mBufferedData.Clear();
if (channel->mStream != INVALID_STREAM) {
ResetOutgoingStream(channel->mStream);
if (mState == CLOSED) { // called from CloseAll()
// Let resets accumulate then send all at once in CloseAll()
// we're not going to hang around waiting
mStreams[channel->mStream] = nullptr;
} else {
SendOutgoingStreamReset();
}
}
aChannel->mState = CLOSING;
if (mState == CLOSED) {
// we're not going to hang around waiting
channel->StreamClosedLocked();
}
// At this point when we leave here, the object is a zombie held alive only by the DOM object
}
void DataChannelConnection::CloseAll()
{
LOG(("Closing all channels (connection %p)", (void*) this));
// Don't need to lock here
// Make sure no more channels will be opened
{
MutexAutoLock lock(mLock);
mState = CLOSED;
}
// Close current channels
// If there are runnables, they hold a strong ref and keep the channel
// and/or connection alive (even if in a CLOSED state)
bool closed_some = false;
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
if (mStreams[i]) {
mStreams[i]->Close();
closed_some = true;
}
}
// Clean up any pending opens for channels
RefPtr<DataChannel> channel;
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
channel->Close(); // also releases the ref on each iteration
closed_some = true;
}
// It's more efficient to let the Resets queue in shutdown and then
// SendOutgoingStreamReset() here.
if (closed_some) {
MutexAutoLock lock(mLock);
SendOutgoingStreamReset();
}
}
DataChannel::~DataChannel()
{
// NS_ASSERTION since this is more "I think I caught all the cases that
// can cause this" than a true kill-the-program assertion. If this is
// wrong, nothing bad happens. At worst it's a leak.
NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
}
void
DataChannel::Close()
{
if (mConnection) {
// ensure we don't get deleted
RefPtr<DataChannelConnection> connection(mConnection);
connection->Close(this);
}
}
// Used when disconnecting from the DataChannelConnection
void
DataChannel::StreamClosedLocked()
{
// Check mConnection before dereferencing it for the lock assertion
ENSURE_DATACONNECTION;
mConnection->mLock.AssertCurrentThreadOwns();
LOG(("Destroying Data channel %u", mStream));
MOZ_ASSERT_IF(mStream != INVALID_STREAM,
!mConnection->FindChannelByStream(mStream));
mStream = INVALID_STREAM;
mState = CLOSED;
mMainThreadEventTarget->Dispatch(
do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
mConnection, this)));
// We leave mConnection live until the DOM releases us, to avoid races
}
void
DataChannel::ReleaseConnection()
{
ASSERT_WEBRTC(NS_IsMainThread());
mConnection = nullptr;
}
void
DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
{
MutexAutoLock lock(mListenerLock);
mContext = aContext;
mListener = aListener;
}
void
DataChannel::SendErrnoToErrorResult(int error, ErrorResult& aRv)
{
switch (error) {
case 0:
break;
case EMSGSIZE:
aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
break;
default:
aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
break;
}
}
void
DataChannel::SendMsg(const nsACString &aMsg, ErrorResult& aRv)
{
if (!EnsureValidStream(aRv)) {
return;
}
SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
}
void
DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv)
{
if (!EnsureValidStream(aRv)) {
return;
}
SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
}
void
DataChannel::SendBinaryStream(nsIInputStream *aBlob, ErrorResult& aRv)
{
if (!EnsureValidStream(aRv)) {
return;
}
SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
}
// May be called from another (i.e. Main) thread!
void
DataChannel::AppReady()
{
ENSURE_DATACONNECTION;
MutexAutoLock lock(mConnection->mLock);
mFlags |= DATA_CHANNEL_FLAGS_READY;
if (mState == WAITING_TO_OPEN) {
mState = OPEN;
mMainThreadEventTarget->Dispatch(
do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
this)));
for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
MOZ_ASSERT(runnable);
mMainThreadEventTarget->Dispatch(runnable.forget());
}
} else {
NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
}
mQueuedMessages.Clear();
mQueuedMessages.Compact();
// The queue is never used again after this point. We could even allocate
// the array lazily, only in the odd cases where we need it.
}
size_t
DataChannel::GetBufferedAmountLocked() const
{
size_t buffered = 0;
for (auto &msg : mBufferedData) {
buffered += msg->GetLeft();
}
// XXX Note: per Michael Tuexen, there is currently no way to get the buffered
// amount for a single stream from the SCTP stack. It is on their to-do
// list, and once we import a stack that supports it, we'll need to add that
// amount to what we buffer here. We'll also need to ask for notification of
// a per-stream buffer-low event and merge that into the handling of
// buffer-low (the equivalent of TCP_NOTSENT_LOWAT on TCP sockets).
return buffered;
}
uint32_t
DataChannel::GetBufferedAmountLowThreshold()
{
return mBufferedThreshold;
}
// Never fire the buffered-amount-low event immediately when the threshold is
// set: it is defined to fire on transitions, not on state.
void
DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
{
mBufferedThreshold = aThreshold;
}
// Called with mLock locked!
void
DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
{
if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
(mState == CONNECTING || mState == WAITING_TO_OPEN)) {
mQueuedMessages.AppendElement(aMessage);
} else {
nsCOMPtr<nsIRunnable> runnable = aMessage;
mMainThreadEventTarget->Dispatch(runnable.forget());
}
}
bool
DataChannel::EnsureValidStream(ErrorResult& aRv)
{
MOZ_ASSERT(mConnection);
if (mConnection && mStream != INVALID_STREAM) {
return true;
}
aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
return false;
}
} // namespace mozilla