Bug 1434137 - Implement websockets over http/2 - RFC 8441 r=michal,dragana

https://tools.ietf.org/html/rfc8441

This uses our existing http/2 CONNECT infrastructure (modified) to
enable the new extended CONNECT form defined by 8441, and pretend for
the websocket's sake that an http/2 stream is actually a socket. From
the websocket's point of view, this is relatively non-invasive - a few
things have changed (http response code, absence of some headers) versus
http/1.1 websockets, but for the most part, the websocket code doesn't
care.

Differential Revision: https://phabricator.services.mozilla.com/D8016

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Nicholas Hurley 2018-11-06 19:29:57 +00:00
parent da798ba098
commit 27190f4a59
20 changed files with 935 additions and 165 deletions

View File

@ -1817,6 +1817,7 @@ pref("network.http.spdy.push-allowance", 131072); // 128KB
pref("network.http.spdy.pull-allowance", 12582912); // 12MB
pref("network.http.spdy.default-concurrent", 100);
pref("network.http.spdy.default-hpack-buffer", 65536); // 64k
pref("network.http.spdy.websockets", true);
// alt-svc allows separation of transport routing from
// the origin host without using a proxy.

View File

@ -22,7 +22,7 @@ public:
virtual ~ASpdySession() = default;
virtual MOZ_MUST_USE bool
AddStream(nsAHttpTransaction *, int32_t, bool, nsIInterfaceRequestor *) = 0;
AddStream(nsAHttpTransaction *, int32_t, bool, bool, nsIInterfaceRequestor *) = 0;
virtual bool CanReuse() = 0;
virtual bool RoomForMoreStreams() = 0;
virtual PRIntervalTime IdleTime() = 0;
@ -88,6 +88,7 @@ public:
}
virtual void SetCleanShutdown(bool) = 0;
virtual bool CanAcceptWebsocket() = 0;
};
typedef bool (*ALPNCallback) (nsISupports *); // nsISSLSocketControl is typical

View File

@ -1111,13 +1111,16 @@ nsresult
Http2Compressor::EncodeHeaderBlock(const nsCString &nvInput,
const nsACString &method, const nsACString &path,
const nsACString &host, const nsACString &scheme,
bool connectForm, nsACString &output)
const nsACString &protocol, bool simpleConnectForm,
nsACString &output)
{
mSetInitialMaxBufferSizeAllowed = false;
mOutput = &output;
output.Truncate();
mParsedContentLength = -1;
bool isWebsocket = (!simpleConnectForm && !protocol.IsEmpty());
// first thing's first - context size updates (if necessary)
if (mBufferSizeChangeWaiting) {
if (mLowestBufferSizeWaiting < mMaxBufferSetting) {
@ -1128,11 +1131,14 @@ Http2Compressor::EncodeHeaderBlock(const nsCString &nvInput,
}
// colon headers first
if (!connectForm) {
if (!simpleConnectForm) {
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":method"), method), false, false);
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":path"), path), true, false);
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":authority"), host), false, false);
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":scheme"), scheme), false, false);
if (isWebsocket) {
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":protocol"), protocol), false, false);
}
} else {
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":method"), method), false, false);
ProcessHeader(nvPair(NS_LITERAL_CSTRING(":authority"), host), false, false);
@ -1169,7 +1175,8 @@ Http2Compressor::EncodeHeaderBlock(const nsCString &nvInput,
name.EqualsLiteral("proxy-connection") ||
name.EqualsLiteral("te") ||
name.EqualsLiteral("transfer-encoding") ||
name.EqualsLiteral("upgrade")) {
name.EqualsLiteral("upgrade") ||
name.EqualsLiteral("sec-websocket-key")) {
continue;
}
@ -1235,7 +1242,7 @@ Http2Compressor::EncodeHeaderBlock(const nsCString &nvInput,
// transaction) would require totally reworking how/when the transaction
// creates its request stream, which is not worth the effort and risk of
// breakage just to add one header only to h2 connections.
if (!connectForm) {
if (!simpleConnectForm && !isWebsocket) {
// Add in TE: trailers for regular requests
nsAutoCString te("te");
nsAutoCString trailers("trailers");

View File

@ -179,7 +179,8 @@ public:
const nsACString &path,
const nsACString &host,
const nsACString &scheme,
bool connectForm,
const nsACString &protocol,
bool simpleConnectForm,
nsACString &output);
int64_t GetParsedContentLength() { return mParsedContentLength; } // -1 on not found

View File

@ -117,6 +117,9 @@ Http2Session::Http2Session(nsISocketTransport *aSocketTransport, enum SpdyVersio
, mLastRequestBytesSentTime(0)
, mPeerFailedHandshake(false)
, mTrrStreams(0)
, mEnableWebsockets(false)
, mPeerAllowsWebsockets(false)
, mProcessedWaitingWebsockets(false)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
@ -141,6 +144,8 @@ Http2Session::Http2Session(nsISocketTransport *aSocketTransport, enum SpdyVersio
mPreviousPingThreshold = mPingThreshold;
mCurrentForegroundTabOuterContentWindowId =
gHttpHandler->ConnMgr()->CurrentTopLevelOuterContentWindowId();
mEnableWebsockets = gHttpHandler->IsH2WebsocketsEnabled();
}
void
@ -434,6 +439,7 @@ bool
Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction,
int32_t aPriority,
bool aUseTunnel,
bool aIsWebsocket,
nsIInterfaceRequestor *aCallbacks)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
@ -473,6 +479,58 @@ Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction,
aHttpTransaction->SetConnection(this);
aHttpTransaction->OnActivated();
if (aIsWebsocket) {
MOZ_ASSERT(!aUseTunnel, "Websocket on tunnel?!");
nsHttpTransaction *trans = aHttpTransaction->QueryHttpTransaction();
MOZ_ASSERT(trans, "Websocket without transaction?!");
if (!trans) {
LOG3(("Http2Session::AddStream %p websocket without transaction. WAT?!", this));
return true;
}
if (!mEnableWebsockets) {
LOG3(("Http2Session::AddStream %p Re-queuing websocket as h1 due to "
"mEnableWebsockets=false", this));
aHttpTransaction->SetConnection(nullptr);
aHttpTransaction->DisableSpdy();
nsresult rv = gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(("Http2Session::AddStream %p failed to reinitiate websocket "
"transaction (0x%08x).", this, static_cast<uint32_t>(rv)));
}
return true;
}
if (!mPeerAllowsWebsockets) {
LOG3(("Http2Session::AddStream %p mPeerAllowsWebsockets=false", this));
if (!mProcessedWaitingWebsockets) {
LOG3(("Http2Session::AddStream %p waiting for SETTINGS to determine "
"fate of websocket", this));
mWaitingWebsockets.AppendElement(aHttpTransaction);
mWaitingWebsocketCallbacks.AppendElement(aCallbacks);
} else {
LOG3(("Http2Session::AddStream %p Re-queuing websocket as h1 due to "
"mPeerAllowsWebsockets=false", this));
aHttpTransaction->SetConnection(nullptr);
aHttpTransaction->DisableSpdy();
if (trans) {
nsresult rv = gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(("Http2Session::AddStream %p failed to reinitiate websocket "
"transaction (%08x).\n", this, static_cast<uint32_t>(rv)));
}
}
}
return true;
}
LOG3(("Http2Session::AddStream session=%p trans=%p websocket",
this, aHttpTransaction));
CreateWebsocketStream(aHttpTransaction, aCallbacks);
return true;
}
if (aUseTunnel) {
LOG3(("Http2Session::AddStream session=%p trans=%p OnTunnel",
this, aHttpTransaction));
@ -1676,7 +1734,23 @@ Http2Session::RecvSettings(Http2Session *self)
}
break;
case SETTINGS_TYPE_ENABLE_CONNECT_PROTOCOL:
{
if (value == 1) {
LOG3(("Enabling extended CONNECT"));
self->mPeerAllowsWebsockets = true;
} else if (value > 1) {
LOG3(("Peer sent invalid value for ENABLE_CONNECT_PROTOCOL %d", value));
return self->SessionError(PROTOCOL_ERROR);
} else if (self->mPeerAllowsWebsockets) {
LOG3(("Peer tried to re-disable extended CONNECT"));
return self->SessionError(PROTOCOL_ERROR);
}
}
break;
default:
LOG3(("Received an unknown SETTING id %d. Ignoring.", id));
break;
}
}
@ -1689,6 +1763,10 @@ Http2Session::RecvSettings(Http2Session *self)
self->mGoAwayOnPush = true;
}
if (!self->mProcessedWaitingWebsockets) {
self->ProcessWaitingWebsockets();
}
return NS_OK;
}
@ -4087,8 +4165,8 @@ Http2Session::CreateTunnel(nsHttpTransaction *trans,
// to the correct security callbacks
RefPtr<SpdyConnectTransaction> connectTrans =
new SpdyConnectTransaction(ci, aCallbacks, trans->Caps(), trans, this);
DebugOnly<bool> rv = AddStream(connectTrans, nsISupportsPriority::PRIORITY_NORMAL, false, nullptr);
new SpdyConnectTransaction(ci, aCallbacks, trans->Caps(), trans, this, false);
DebugOnly<bool> rv = AddStream(connectTrans, nsISupportsPriority::PRIORITY_NORMAL, false, false, nullptr);
MOZ_ASSERT(rv);
Http2Stream *tunnel = mStreamTransactionHash.Get(connectTrans);
MOZ_ASSERT(tunnel);
@ -4660,5 +4738,80 @@ Http2Session::SetCleanShutdown(bool aCleanShutdown)
mCleanShutdown = aCleanShutdown;
}
void
Http2Session::CreateWebsocketStream(nsAHttpTransaction *aOriginalTransaction,
nsIInterfaceRequestor *aCallbacks)
{
LOG(("Http2Session::CreateWebsocketStream %p %p\n", this, aOriginalTransaction));
nsHttpTransaction *trans = aOriginalTransaction->QueryHttpTransaction();
MOZ_ASSERT(trans);
nsHttpConnectionInfo *ci = aOriginalTransaction->ConnectionInfo();
MOZ_ASSERT(ci);
RefPtr<SpdyConnectTransaction> connectTrans =
new SpdyConnectTransaction(ci, aCallbacks, trans->Caps(), trans, this, true);
DebugOnly<bool> rv = AddStream(connectTrans, nsISupportsPriority::PRIORITY_NORMAL, false, false, nullptr);
MOZ_ASSERT(rv);
}
void
Http2Session::ProcessWaitingWebsockets()
{
MOZ_ASSERT(!mProcessedWaitingWebsockets);
MOZ_ASSERT(mWaitingWebsockets.Length() == mWaitingWebsocketCallbacks.Length());
mProcessedWaitingWebsockets = true;
if (!mWaitingWebsockets.Length()) {
// Nothing to do here
LOG3(("Http2Session::ProcessWaitingWebsockets %p nothing to do", this));
return;
}
for (size_t i = 0; i < mWaitingWebsockets.Length(); ++i) {
RefPtr<nsAHttpTransaction> httpTransaction = mWaitingWebsockets[i];
nsCOMPtr<nsIInterfaceRequestor> callbacks = mWaitingWebsocketCallbacks[i];
if (mPeerAllowsWebsockets) {
LOG3(("Http2Session::ProcessWaitingWebsockets session=%p trans=%p websocket",
this, httpTransaction.get()));
CreateWebsocketStream(httpTransaction, callbacks);
} else {
LOG3(("Http2Session::ProcessWaitingWebsockets %p Re-queuing websocket as "
"h1 due to mPeerAllowsWebsockets=false", this));
httpTransaction->SetConnection(nullptr);
httpTransaction->DisableSpdy();
nsHttpTransaction *trans = httpTransaction->QueryHttpTransaction();
if (trans) {
nsresult rv = gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(("Http2Session::ProcessWaitingWebsockets %p failed to reinitiate "
"websocket transaction (%08x).\n", this, static_cast<uint32_t>(rv)));
}
} else {
LOG3(("Http2Session::ProcessWaitingWebsockets %p missing transaction?!", this));
}
}
}
mWaitingWebsockets.Clear();
mWaitingWebsocketCallbacks.Clear();
}
bool
Http2Session::CanAcceptWebsocket()
{
LOG3(("Http2Session::CanAcceptWebsocket %p enable=%d allow=%d processed=%d",
this, mEnableWebsockets, mPeerAllowsWebsockets, mProcessedWaitingWebsockets));
if (mEnableWebsockets &&
(mPeerAllowsWebsockets || !mProcessedWaitingWebsockets)) {
return true;
}
return false;
}
} // namespace net
} // namespace mozilla

View File

@ -49,7 +49,7 @@ public:
Http2Session(nsISocketTransport *, enum SpdyVersion version, bool attemptingEarlyData);
MOZ_MUST_USE bool AddStream(nsAHttpTransaction *, int32_t,
bool, nsIInterfaceRequestor *) override;
bool, bool, nsIInterfaceRequestor *) override;
bool CanReuse() override { return !mShouldGoAway && !mClosed; }
bool RoomForMoreStreams() override;
enum SpdyVersion SpdyVersion() override;
@ -133,7 +133,10 @@ public:
SETTINGS_TYPE_ENABLE_PUSH = 2, // can be used to disable push
SETTINGS_TYPE_MAX_CONCURRENT = 3, // streams recvr allowed to initiate
SETTINGS_TYPE_INITIAL_WINDOW = 4, // bytes for flow control default
SETTINGS_TYPE_MAX_FRAME_SIZE = 5 // max frame size settings sender allows receipt of
SETTINGS_TYPE_MAX_FRAME_SIZE = 5, // max frame size settings sender allows receipt of
// 6 is SETTINGS_TYPE_MAX_HEADER_LIST - advisory, we ignore it
// 7 is unassigned
SETTINGS_TYPE_ENABLE_CONNECT_PROTOCOL = 8 // if sender implements extended CONNECT for websockets
};
// This should be big enough to hold all of your control packets,
@ -263,6 +266,8 @@ public:
void SendPriorityFrame(uint32_t streamID, uint32_t dependsOn, uint8_t weight);
void IncrementTrrCounter() { mTrrStreams++; }
bool CanAcceptWebsocket() override;
private:
// These internal states do not correspond to the states of the HTTP/2 specification
@ -581,6 +586,15 @@ private:
uint32_t FindTunnelCount(nsHttpConnectionInfo *);
nsDataHashtable<nsCStringHashKey, uint32_t> mTunnelHash;
uint32_t mTrrStreams;
// websockets
void CreateWebsocketStream(nsAHttpTransaction *, nsIInterfaceRequestor *);
void ProcessWaitingWebsockets();
bool mEnableWebsockets; // Whether we allow websockets, based on a pref
bool mPeerAllowsWebsockets; // Whether our peer allows websockets, based on SETTINGS
bool mProcessedWaitingWebsockets; // True once we've received at least one SETTINGS
nsTArray<RefPtr<nsAHttpTransaction>> mWaitingWebsockets; // Websocket transactions that may be waiting for the opening SETTINGS
nsCOMArray<nsIInterfaceRequestor> mWaitingWebsocketCallbacks;
};
} // namespace net

View File

@ -77,6 +77,7 @@ Http2Stream::Http2Stream(nsAHttpTransaction *httpTransaction,
, mAttempting0RTT(false)
, mIsTunnel(false)
, mPlainTextTunnel(false)
, mIsWebsocket(false)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
@ -566,32 +567,46 @@ Http2Stream::GenerateOpen()
nsDependentCString scheme(head->IsHTTPS() ? "https" : "http");
if (head->IsConnect()) {
MOZ_ASSERT(mTransaction->QuerySpdyConnectTransaction());
mIsTunnel = true;
SpdyConnectTransaction *scTrans = mTransaction->QuerySpdyConnectTransaction();
MOZ_ASSERT(scTrans);
if (scTrans->IsWebsocket()) {
mIsWebsocket = true;
} else {
mIsTunnel = true;
}
mRequestBodyLenRemaining = 0x0fffffffffffffffULL;
// Our normal authority has an implicit port, best to use an
// explicit one with a tunnel
nsHttpConnectionInfo *ci = mTransaction->ConnectionInfo();
if (!ci) {
return NS_ERROR_UNEXPECTED;
}
if (mIsTunnel) {
// Our normal authority has an implicit port, best to use an
// explicit one with a tunnel
nsHttpConnectionInfo *ci = mTransaction->ConnectionInfo();
if (!ci) {
return NS_ERROR_UNEXPECTED;
}
authorityHeader = ci->GetOrigin();
authorityHeader.Append(':');
authorityHeader.AppendInt(ci->OriginPort());
authorityHeader = ci->GetOrigin();
authorityHeader.Append(':');
authorityHeader.AppendInt(ci->OriginPort());
}
}
nsAutoCString method;
nsAutoCString path;
head->Method(method);
head->Path(path);
bool useSimpleConnect = head->IsConnect();
nsAutoCString protocol;
if (mIsWebsocket) {
useSimpleConnect = false;
protocol.AppendLiteral("websocket");
}
rv = mSession->Compressor()->EncodeHeaderBlock(mFlatHttpRequestHeaders,
method,
path,
authorityHeader,
scheme,
head->IsConnect(),
protocol,
useSimpleConnect,
compressedData);
NS_ENSURE_SUCCESS(rv, rv);
@ -1072,6 +1087,11 @@ Http2Stream::ConvertResponseHeaders(Http2Decompressor *decompressor,
}
MapStreamToHttpConnection();
ClearTransactionsBlockedOnTunnel();
} else if (mIsWebsocket) {
LOG3(("Http2Stream %p websocket response code %d", this, httpResponseCode));
if (httpResponseCode == 200) {
MapStreamToHttpConnection();
}
}
if (httpResponseCode == 101) {

View File

@ -373,6 +373,12 @@ private:
bool mIsTunnel;
bool mPlainTextTunnel;
/// websockets
public:
bool IsWebsocket() { return mIsWebsocket; }
private:
bool mIsWebsocket;
};
} // namespace net

View File

@ -26,6 +26,7 @@
#include "nsComponentManagerUtils.h"
#include "nsSocketTransport2.h"
#include "nsSocketTransportService2.h"
#include "mozilla/Mutex.h"
namespace mozilla {
namespace net {
@ -919,14 +920,72 @@ public:
NS_DECL_NSITRANSPORT
NS_DECL_NSISOCKETTRANSPORT
explicit SocketTransportShim(nsISocketTransport *aWrapped)
explicit SocketTransportShim(nsISocketTransport *aWrapped, bool aIsWebsocket)
: mWrapped(aWrapped)
, mIsWebsocket(aIsWebsocket)
{};
private:
virtual ~SocketTransportShim() = default;;
virtual ~SocketTransportShim() = default;
nsCOMPtr<nsISocketTransport> mWrapped;
bool mIsWebsocket;
nsCOMPtr<nsIInterfaceRequestor> mSecurityCallbacks;
};
class WeakTransProxy final : public nsISupports
{
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit WeakTransProxy(SpdyConnectTransaction *aTrans)
{
MOZ_ASSERT(OnSocketThread());
mWeakTrans = do_GetWeakReference(aTrans);
}
already_AddRefed<NullHttpTransaction> QueryTransaction()
{
MOZ_ASSERT(OnSocketThread());
RefPtr<NullHttpTransaction> trans = do_QueryReferent(mWeakTrans);
return trans.forget();
}
private:
~WeakTransProxy()
{
MOZ_ASSERT(OnSocketThread());
}
nsWeakPtr mWeakTrans;
};
NS_IMPL_ISUPPORTS(WeakTransProxy, nsISupports)
class WeakTransFreeProxy final : public Runnable
{
public:
explicit WeakTransFreeProxy(WeakTransProxy *proxy)
: Runnable("WeakTransFreeProxy")
, mProxy(proxy)
{ }
NS_IMETHOD Run() override
{
MOZ_ASSERT(OnSocketThread());
mProxy = nullptr;
return NS_OK;
}
void Dispatch()
{
MOZ_ASSERT(!OnSocketThread());
nsCOMPtr<nsIEventTarget> sts = do_GetService("@mozilla.org/network/socket-transport-service;1");
Unused << sts->Dispatch(this, nsIEventTarget::DISPATCH_NORMAL);
}
private:
RefPtr<WeakTransProxy> mProxy;
};
class OutputStreamShim : public nsIAsyncOutputStream
@ -937,20 +996,39 @@ public:
NS_DECL_NSIASYNCOUTPUTSTREAM
friend class SpdyConnectTransaction;
friend class WebsocketHasDataToWrite;
friend class OutputCloseTransaction;
explicit OutputStreamShim(SpdyConnectTransaction *aTrans)
OutputStreamShim(SpdyConnectTransaction *aTrans, bool aIsWebsocket)
: mCallback(nullptr)
, mStatus(NS_OK)
, mMutex("OutputStreamShim")
, mIsWebsocket(aIsWebsocket)
{
mWeakTrans = do_GetWeakReference(aTrans);
mWeakTrans = new WeakTransProxy(aTrans);
}
private:
virtual ~OutputStreamShim() = default;;
nsIOutputStreamCallback *GetCallback();
nsWeakPtr mWeakTrans; // SpdyConnectTransaction *
private:
virtual ~OutputStreamShim()
{
if (!OnSocketThread()) {
RefPtr<WeakTransFreeProxy> p = new WeakTransFreeProxy(mWeakTrans);
mWeakTrans = nullptr;
p->Dispatch();
}
}
RefPtr<WeakTransProxy> mWeakTrans; // SpdyConnectTransaction *
nsIOutputStreamCallback *mCallback;
nsresult mStatus;
mozilla::Mutex mMutex;
// Websockets
bool mIsWebsocket;
nsresult CallTransactionHasDataToWrite();
nsresult CloseTransaction(nsresult reason);
};
class InputStreamShim : public nsIAsyncInputStream
@ -961,27 +1039,45 @@ public:
NS_DECL_NSIASYNCINPUTSTREAM
friend class SpdyConnectTransaction;
friend class InputCloseTransaction;
explicit InputStreamShim(SpdyConnectTransaction *aTrans)
InputStreamShim(SpdyConnectTransaction *aTrans, bool aIsWebsocket)
: mCallback(nullptr)
, mStatus(NS_OK)
, mMutex("InputStreamShim")
, mIsWebsocket(aIsWebsocket)
{
mWeakTrans = do_GetWeakReference(aTrans);
mWeakTrans = new WeakTransProxy(aTrans);
}
private:
virtual ~InputStreamShim() = default;;
nsIInputStreamCallback *GetCallback();
nsWeakPtr mWeakTrans; // SpdyConnectTransaction *
private:
virtual ~InputStreamShim()
{
if (!OnSocketThread()) {
RefPtr<WeakTransFreeProxy> p = new WeakTransFreeProxy(mWeakTrans);
mWeakTrans = nullptr;
p->Dispatch();
}
}
RefPtr<WeakTransProxy> mWeakTrans; // SpdyConnectTransaction *
nsIInputStreamCallback *mCallback;
nsresult mStatus;
mozilla::Mutex mMutex;
// Websockets
bool mIsWebsocket;
nsresult CloseTransaction(nsresult reason);
};
SpdyConnectTransaction::SpdyConnectTransaction(nsHttpConnectionInfo *ci,
nsIInterfaceRequestor *callbacks,
uint32_t caps,
nsHttpTransaction *trans,
nsAHttpConnection *session)
nsAHttpConnection *session,
bool isWebsocket)
: NullHttpTransaction(ci, callbacks, caps | NS_HTTP_ALLOW_KEEPALIVE)
, mConnectStringOffset(0)
, mSession(session)
@ -993,13 +1089,23 @@ SpdyConnectTransaction::SpdyConnectTransaction(nsHttpConnectionInfo *ci,
, mOutputDataUsed(0)
, mOutputDataOffset(0)
, mForcePlainText(false)
, mIsWebsocket(isWebsocket)
, mConnRefTaken(false)
{
LOG(("SpdyConnectTransaction ctor %p\n", this));
mTimestampSyn = TimeStamp::Now();
mRequestHead = new nsHttpRequestHead();
if (mIsWebsocket) {
// Ensure our request head has all the websocket headers duplicated from the
// original transaction before calling the boilerplate stuff to create the
// rest of the CONNECT headers.
trans->RequestHead()->Enter();
mRequestHead->SetHeaders(trans->RequestHead()->Headers());
trans->RequestHead()->Exit();
}
DebugOnly<nsresult> rv =
nsHttpConnection::MakeConnectString(trans, mRequestHead, mConnectString);
nsHttpConnection::MakeConnectString(trans, mRequestHead, mConnectString, mIsWebsocket);
MOZ_ASSERT(NS_SUCCEEDED(rv));
mDrivingTransaction = trans;
}
@ -1010,6 +1116,7 @@ SpdyConnectTransaction::~SpdyConnectTransaction()
if (mDrivingTransaction) {
// requeue it I guess. This should be gone.
mDrivingTransaction->SetH2WSTransaction(nullptr);
Unused << gHttpHandler->InitiateTransaction(mDrivingTransaction,
mDrivingTransaction->Priority());
}
@ -1022,6 +1129,7 @@ SpdyConnectTransaction::ForcePlainText()
MOZ_ASSERT(!mInputDataUsed && !mInputDataSize && !mInputDataOffset);
MOZ_ASSERT(!mForcePlainText);
MOZ_ASSERT(!mTunnelTransport, "call before mapstreamtohttpconnection");
MOZ_ASSERT(!mIsWebsocket);
mForcePlainText = true;
}
@ -1032,9 +1140,9 @@ SpdyConnectTransaction::MapStreamToHttpConnection(nsISocketTransport *aTransport
{
mConnInfo = aConnInfo;
mTunnelTransport = new SocketTransportShim(aTransport);
mTunnelStreamIn = new InputStreamShim(this);
mTunnelStreamOut = new OutputStreamShim(this);
mTunnelTransport = new SocketTransportShim(aTransport, mIsWebsocket);
mTunnelStreamIn = new InputStreamShim(this, mIsWebsocket);
mTunnelStreamOut = new OutputStreamShim(this, mIsWebsocket);
mTunneledConn = new nsHttpConnection();
// this new http connection has a specific hashkey (i.e. to a particular
@ -1045,7 +1153,7 @@ SpdyConnectTransaction::MapStreamToHttpConnection(nsISocketTransport *aTransport
nsCOMPtr<nsIInterfaceRequestor> callbacks;
GetSecurityCallbacks(getter_AddRefs(callbacks));
mTunneledConn->SetTransactionCaps(Caps());
MOZ_ASSERT(aConnInfo->UsingHttpsProxy());
MOZ_ASSERT(aConnInfo->UsingHttpsProxy() || mIsWebsocket);
TimeDuration rtt = TimeStamp::Now() - mTimestampSyn;
DebugOnly<nsresult> rv =
mTunneledConn->Init(aConnInfo,
@ -1057,6 +1165,11 @@ SpdyConnectTransaction::MapStreamToHttpConnection(nsISocketTransport *aTransport
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (mForcePlainText) {
mTunneledConn->ForcePlainText();
} else if (mIsWebsocket) {
LOG(("SpdyConnectTransaction::MapStreamToHttpConnection %p websocket", this));
// Let the root transaction know about us, so it can pass our own conn
// to the websocket.
mDrivingTransaction->SetH2WSTransaction(this);
} else {
mTunneledConn->SetupSecondaryTLS(this);
mTunneledConn->SetInSpdyTunnel(true);
@ -1068,10 +1181,12 @@ SpdyConnectTransaction::MapStreamToHttpConnection(nsISocketTransport *aTransport
mDrivingTransaction->SetConnection(wrappedConn);
mDrivingTransaction->MakeSticky();
// jump the priority and start the dispatcher
Unused << gHttpHandler->InitiateTransaction(
mDrivingTransaction, nsISupportsPriority::PRIORITY_HIGHEST - 60);
mDrivingTransaction = nullptr;
if (!mIsWebsocket) {
// jump the priority and start the dispatcher
Unused << gHttpHandler->InitiateTransaction(
mDrivingTransaction, nsISupportsPriority::PRIORITY_HIGHEST - 60);
mDrivingTransaction = nullptr;
}
}
nsresult
@ -1164,21 +1279,26 @@ SpdyConnectTransaction::ReadSegments(nsAHttpSegmentReader *reader,
}
*countRead = 0;
Unused << Flush(count, countRead);
if (!mTunnelStreamOut->mCallback) {
nsresult rv = Flush(count, countRead);
nsIOutputStreamCallback *cb = mTunnelStreamOut->GetCallback();
if (!cb && !(*countRead)) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
nsresult rv =
mTunnelStreamOut->mCallback->OnOutputStreamReady(mTunnelStreamOut);
if (NS_FAILED(rv)) {
return rv;
if (cb) {
// See if there is any more data available
rv = cb->OnOutputStreamReady(mTunnelStreamOut);
if (NS_FAILED(rv)) {
return rv;
}
// Write out anything that may have come out of the stream just above
uint32_t subtotal;
count -= *countRead;
rv = Flush(count, &subtotal);
*countRead += subtotal;
}
uint32_t subtotal;
count -= *countRead;
rv = Flush(count, &subtotal);
*countRead += subtotal;
return rv;
}
@ -1196,26 +1316,26 @@ SpdyConnectTransaction::CreateShimError(nsresult code)
mTunnelStreamIn->mStatus = code;
}
if (mTunnelStreamIn && mTunnelStreamIn->mCallback) {
mTunnelStreamIn->mCallback->OnInputStreamReady(mTunnelStreamIn);
if (mTunnelStreamIn) {
nsIInputStreamCallback *cb = mTunnelStreamIn->GetCallback();
if (cb) {
cb->OnInputStreamReady(mTunnelStreamIn);
}
}
if (mTunnelStreamOut && mTunnelStreamOut->mCallback) {
mTunnelStreamOut->mCallback->OnOutputStreamReady(mTunnelStreamOut);
if (mTunnelStreamOut) {
nsIOutputStreamCallback *cb = mTunnelStreamOut->GetCallback();
if (cb) {
cb->OnOutputStreamReady(mTunnelStreamOut);
}
}
}
nsresult
SpdyConnectTransaction::WriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
SpdyConnectTransaction::WriteDataToBuffer(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("SpdyConnectTransaction::WriteSegments %p max=%d cb=%p\n",
this, count, mTunneledConn ? mTunnelStreamIn->mCallback : nullptr));
// first call into the tunnel stream to get the demux'd data out of the
// spdy session.
EnsureBuffer(mInputData, mInputDataUsed + count, mInputDataUsed, mInputDataSize);
nsresult rv = writer->OnWriteSegment(&mInputData[mInputDataUsed],
count, countWritten);
@ -1231,11 +1351,38 @@ SpdyConnectTransaction::WriteSegments(nsAHttpSegmentWriter *writer,
LOG(("SpdyConnectTransaction %p %d new bytes [%d total] of ciphered data buffered\n",
this, *countWritten, mInputDataUsed - mInputDataOffset));
if (!mTunneledConn || !mTunnelStreamIn->mCallback) {
return rv;
}
nsresult
SpdyConnectTransaction::WriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
nsIInputStreamCallback *cb = mTunneledConn ? mTunnelStreamIn->GetCallback() : nullptr;
LOG(("SpdyConnectTransaction::WriteSegments %p max=%d cb=%p\n",
this, count, cb));
// For websockets, we need to forward the initial response through to the base
// transaction so the normal websocket plumbing can do all the things it needs
// to do.
if (mIsWebsocket) {
return WebsocketWriteSegments(writer, count, countWritten);
}
// first call into the tunnel stream to get the demux'd data out of the
// spdy session.
nsresult rv = WriteDataToBuffer(writer, count, countWritten);
if (NS_FAILED(rv)) {
return rv;
}
if (!mTunneledConn || cb) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
rv = mTunnelStreamIn->mCallback->OnInputStreamReady(mTunnelStreamIn);
rv = cb->OnInputStreamReady(mTunnelStreamIn);
LOG(("SpdyConnectTransaction::WriteSegments %p "
"after InputStreamReady callback %d total of ciphered data buffered rv=%"
PRIx32 "\n",
@ -1244,15 +1391,51 @@ SpdyConnectTransaction::WriteSegments(nsAHttpSegmentWriter *writer,
"goodput %p out %" PRId64 "\n", this, mTunneledConn.get(),
mTunneledConn->ContentBytesWritten()));
if (NS_SUCCEEDED(rv) && !mTunneledConn->ContentBytesWritten()) {
mTunnelStreamOut->AsyncWait(mTunnelStreamOut->mCallback, 0, 0, nullptr);
nsIOutputStreamCallback *ocb = mTunnelStreamOut->GetCallback();
mTunnelStreamOut->AsyncWait(ocb, 0, 0, nullptr);
}
return rv;
}
nsresult
SpdyConnectTransaction::WebsocketWriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
{
MOZ_ASSERT(mIsWebsocket);
if (mDrivingTransaction && !mDrivingTransaction->IsDone()) {
// Transaction hasn't received end of headers yet, so keep passing data to
// it until it has. Then we can take over.
nsresult rv = mDrivingTransaction->WriteSegments(writer, count, countWritten);
if (NS_SUCCEEDED(rv) && mDrivingTransaction->IsDone() && !mConnRefTaken) {
mDrivingTransaction->Close(NS_OK);
}
}
if (!mConnRefTaken) {
// Force driving transaction to finish so the websocket channel can get its
// notifications correctly and start driving.
MOZ_ASSERT(mDrivingTransaction);
mDrivingTransaction->Close(NS_OK);
}
nsresult rv = WriteDataToBuffer(writer, count, countWritten);
if (NS_SUCCEEDED(rv)) {
nsIInputStreamCallback *cb = mTunnelStreamIn->GetCallback();
if (!cb) {
rv = NS_BASE_STREAM_WOULD_BLOCK;
} else {
rv = cb->OnInputStreamReady(mTunnelStreamIn);
}
}
return rv;
}
bool
SpdyConnectTransaction::ConnectedReadyForInput()
{
return mTunneledConn && mTunnelStreamIn->mCallback;
return mTunneledConn && mTunnelStreamIn->GetCallback();
}
nsHttpRequestHead *
@ -1266,6 +1449,15 @@ SpdyConnectTransaction::Close(nsresult code)
{
LOG(("SpdyConnectTransaction close %p %" PRIx32 "\n", this, static_cast<uint32_t>(code)));
if (mIsWebsocket && mDrivingTransaction) {
mDrivingTransaction->SetH2WSTransaction(nullptr);
if (!mConnRefTaken) {
// This indicates that the websocket failed to set up, so just close down
// the transaction as usual.
mDrivingTransaction->Close(code);
mDrivingTransaction = nullptr;
}
}
NullHttpTransaction::Close(code);
if (NS_FAILED(code) && (code != NS_BASE_STREAM_WOULD_BLOCK)) {
CreateShimError(code);
@ -1274,40 +1466,123 @@ SpdyConnectTransaction::Close(nsresult code)
}
}
void
SpdyConnectTransaction::SetConnRefTaken()
{
mConnRefTaken = true;
mDrivingTransaction = nullptr; // Just in case
}
nsIOutputStreamCallback *
OutputStreamShim::GetCallback()
{
mozilla::MutexAutoLock lock(mMutex);
return mCallback;
}
class WebsocketHasDataToWrite final : public Runnable
{
public:
explicit WebsocketHasDataToWrite(OutputStreamShim *shim)
: Runnable("WebsocketHasDataToWrite")
, mShim(shim)
{ }
~WebsocketHasDataToWrite() = default;
NS_IMETHOD Run() override
{
return mShim->CallTransactionHasDataToWrite();
}
MOZ_MUST_USE nsresult Dispatch()
{
if (OnSocketThread()) {
return Run();
}
nsCOMPtr<nsIEventTarget> sts = do_GetService("@mozilla.org/network/socket-transport-service;1");
return sts->Dispatch(this, nsIEventTarget::DISPATCH_NORMAL);
}
private:
RefPtr<OutputStreamShim> mShim;
};
NS_IMETHODIMP
OutputStreamShim::AsyncWait(nsIOutputStreamCallback *callback,
unsigned int, unsigned int, nsIEventTarget *target)
unsigned int flags, unsigned int requestedCount,
nsIEventTarget *target)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
bool currentThread;
if (mIsWebsocket) {
// With websockets, AsyncWait may be called from the main thread, but the
// target is on the socket thread. That's all we really care about.
nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID);
MOZ_ASSERT((!target && !callback) || (target == sts));
if (target && (target != sts)) {
return NS_ERROR_FAILURE;
}
} else {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
bool currentThread;
if (target &&
(NS_FAILED(target->IsOnCurrentThread(&currentThread)) || !currentThread)) {
return NS_ERROR_FAILURE;
if (target &&
(NS_FAILED(target->IsOnCurrentThread(&currentThread)) || !currentThread)) {
return NS_ERROR_FAILURE;
}
}
LOG(("OutputStreamShim::AsyncWait %p callback %p\n", this, callback));
mCallback = callback;
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
SpdyConnectTransaction *trans = baseTrans->QuerySpdyConnectTransaction();
MOZ_ASSERT(trans);
if (!trans) {
return NS_ERROR_UNEXPECTED;
{
mozilla::MutexAutoLock lock(mMutex);
mCallback = callback;
}
trans->mSession->TransactionHasDataToWrite(trans);
RefPtr<WebsocketHasDataToWrite> wsdw = new WebsocketHasDataToWrite(this);
Unused << wsdw->Dispatch();
return NS_OK;
}
class OutputCloseTransaction final : public Runnable
{
public:
OutputCloseTransaction(OutputStreamShim *shim, nsresult reason)
: Runnable("OutputCloseTransaction")
, mShim(shim)
, mReason(reason)
{ }
~OutputCloseTransaction() = default;
NS_IMETHOD Run() override
{
return mShim->CloseTransaction(mReason);
}
private:
RefPtr<OutputStreamShim> mShim;
nsresult mReason;
};
NS_IMETHODIMP
OutputStreamShim::CloseWithStatus(nsresult reason)
{
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
if (!OnSocketThread()) {
RefPtr<OutputCloseTransaction> oct = new OutputCloseTransaction(this, reason);
nsCOMPtr<nsIEventTarget> sts = do_GetService("@mozilla.org/network/socket-transport-service;1");
return sts->Dispatch(oct, nsIEventTarget::DISPATCH_NORMAL);
}
return CloseTransaction(reason);
}
nsresult
OutputStreamShim::CloseTransaction(nsresult reason)
{
MOZ_ASSERT(OnSocketThread());
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1330,7 +1605,8 @@ OutputStreamShim::Close()
NS_IMETHODIMP
OutputStreamShim::Flush()
{
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
MOZ_ASSERT(OnSocketThread());
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1352,6 +1628,23 @@ OutputStreamShim::Flush()
return rv;
}
nsresult
OutputStreamShim::CallTransactionHasDataToWrite()
{
MOZ_ASSERT(OnSocketThread());
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
SpdyConnectTransaction *trans = baseTrans->QuerySpdyConnectTransaction();
MOZ_ASSERT(trans);
if (!trans) {
return NS_ERROR_UNEXPECTED;
}
trans->mSession->TransactionHasDataToWrite(trans);
return NS_OK;
}
NS_IMETHODIMP
OutputStreamShim::Write(const char * aBuf, uint32_t aCount, uint32_t *_retval)
{
@ -1361,7 +1654,7 @@ OutputStreamShim::Write(const char * aBuf, uint32_t aCount, uint32_t *_retval)
return mStatus;
}
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1392,12 +1685,18 @@ OutputStreamShim::Write(const char * aBuf, uint32_t aCount, uint32_t *_retval)
NS_IMETHODIMP
OutputStreamShim::WriteFrom(nsIInputStream *aFromStream, uint32_t aCount, uint32_t *_retval)
{
if (mIsWebsocket) {
LOG3(("WARNING: OutputStreamShim::WriteFrom %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
OutputStreamShim::WriteSegments(nsReadSegmentFun aReader, void *aClosure, uint32_t aCount, uint32_t *_retval)
{
if (mIsWebsocket) {
LOG3(("WARNING: OutputStreamShim::WriteSegments %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
@ -1408,27 +1707,84 @@ OutputStreamShim::IsNonBlocking(bool *_retval)
return NS_OK;
}
nsIInputStreamCallback *
InputStreamShim::GetCallback()
{
mozilla::MutexAutoLock lock(mMutex);
return mCallback;
}
NS_IMETHODIMP
InputStreamShim::AsyncWait(nsIInputStreamCallback *callback,
unsigned int, unsigned int, nsIEventTarget *target)
unsigned int flags, unsigned int requestedCount,
nsIEventTarget *target)
{
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
bool currentThread;
if (mIsWebsocket) {
// With websockets, AsyncWait may be called from the main thread, but the
// target is on the socket thread. That's all we really care about.
nsCOMPtr<nsIEventTarget> sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID);
MOZ_ASSERT((!target && !callback) || (target == sts));
if (target && (target != sts)) {
return NS_ERROR_FAILURE;
}
} else {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
bool currentThread;
if (target &&
(NS_FAILED(target->IsOnCurrentThread(&currentThread)) || !currentThread)) {
return NS_ERROR_FAILURE;
if (target &&
(NS_FAILED(target->IsOnCurrentThread(&currentThread)) || !currentThread)) {
return NS_ERROR_FAILURE;
}
}
LOG(("InputStreamShim::AsyncWait %p callback %p\n", this, callback));
mCallback = callback;
{
mozilla::MutexAutoLock lock(mMutex);
mCallback = callback;
}
return NS_OK;
}
class InputCloseTransaction final : public Runnable
{
public:
InputCloseTransaction(InputStreamShim *shim, nsresult reason)
: Runnable("InputCloseTransaction")
, mShim(shim)
, mReason(reason)
{ }
~InputCloseTransaction() = default;
NS_IMETHOD Run() override
{
return mShim->CloseTransaction(mReason);
}
private:
RefPtr<InputStreamShim> mShim;
nsresult mReason;
};
NS_IMETHODIMP
InputStreamShim::CloseWithStatus(nsresult reason)
{
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
if (!OnSocketThread()) {
RefPtr<InputCloseTransaction> ict = new InputCloseTransaction(this, reason);
nsCOMPtr<nsIEventTarget> sts = do_GetService("@mozilla.org/network/socket-transport-service;1");
return sts->Dispatch(ict, nsIEventTarget::DISPATCH_NORMAL);
}
return CloseTransaction(reason);
}
nsresult
InputStreamShim::CloseTransaction(nsresult reason)
{
MOZ_ASSERT(OnSocketThread());
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1451,7 +1807,7 @@ InputStreamShim::Close()
NS_IMETHODIMP
InputStreamShim::Available(uint64_t *_retval)
{
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1474,7 +1830,7 @@ InputStreamShim::Read(char *aBuf, uint32_t aCount, uint32_t *_retval)
return mStatus;
}
RefPtr<NullHttpTransaction> baseTrans(do_QueryReferent(mWeakTrans));
RefPtr<NullHttpTransaction> baseTrans = mWeakTrans->QueryTransaction();
if (!baseTrans) {
return NS_ERROR_FAILURE;
}
@ -1500,6 +1856,9 @@ NS_IMETHODIMP
InputStreamShim::ReadSegments(nsWriteSegmentFun aWriter, void *aClosure,
uint32_t aCount, uint32_t *_retval)
{
if (mIsWebsocket) {
LOG3(("WARNING: InputStreamShim::ReadSegments %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
@ -1513,18 +1872,40 @@ InputStreamShim::IsNonBlocking(bool *_retval)
NS_IMETHODIMP
SocketTransportShim::SetKeepaliveEnabled(bool aKeepaliveEnabled)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::SetKeepaliveEnabled %p called", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::SetKeepaliveVals(int32_t keepaliveIdleTime, int32_t keepaliveRetryInterval)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::SetKeepaliveVals %p called", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::GetSecurityCallbacks(nsIInterfaceRequestor **aSecurityCallbacks)
{
if (mIsWebsocket) {
nsCOMPtr<nsIInterfaceRequestor> out(mSecurityCallbacks);
*aSecurityCallbacks = out.forget().take();
return NS_OK;
}
return mWrapped->GetSecurityCallbacks(aSecurityCallbacks);
}
NS_IMETHODIMP
SocketTransportShim::SetSecurityCallbacks(nsIInterfaceRequestor *aSecurityCallbacks)
{
if (mIsWebsocket) {
mSecurityCallbacks = aSecurityCallbacks;
return NS_OK;
}
return NS_ERROR_NOT_IMPLEMENTED;
}
@ -1532,6 +1913,9 @@ NS_IMETHODIMP
SocketTransportShim::OpenInputStream(uint32_t aFlags, uint32_t aSegmentSize,
uint32_t aSegmentCount, nsIInputStream * *_retval)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::OpenInputStream %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
@ -1539,36 +1923,55 @@ NS_IMETHODIMP
SocketTransportShim::OpenOutputStream(uint32_t aFlags, uint32_t aSegmentSize,
uint32_t aSegmentCount, nsIOutputStream * *_retval)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::OpenOutputStream %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::Close(nsresult aReason)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::Close %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::SetEventSink(nsITransportEventSink *aSink, nsIEventTarget *aEventTarget)
{
if (mIsWebsocket) {
// Need to pretend, since websockets expect this to work
return NS_OK;
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::Bind(NetAddr *aLocalAddr)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::Bind %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::GetFirstRetryError(nsresult *aFirstRetryError)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::GetFirstRetryError %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP
SocketTransportShim::GetEsniUsed(bool *aEsniUsed)
{
if (mIsWebsocket) {
LOG3(("WARNING: SocketTransportShim::GetEsniUsed %p", this));
}
return NS_ERROR_NOT_IMPLEMENTED;
}
@ -1591,7 +1994,6 @@ FWD_TS_PTR(GetSelfAddr, mozilla::net::NetAddr);
FWD_TS_ADDREF(GetScriptablePeerAddr, nsINetAddr);
FWD_TS_ADDREF(GetScriptableSelfAddr, nsINetAddr);
FWD_TS_ADDREF(GetSecurityInfo, nsISupports);
FWD_TS_ADDREF(GetSecurityCallbacks, nsIInterfaceRequestor);
FWD_TS_PTR(IsAlive, bool);
FWD_TS_PTR(GetConnectionFlags, uint32_t);
FWD_TS(SetConnectionFlags, uint32_t);

View File

@ -189,7 +189,8 @@ public:
nsIInterfaceRequestor *callbacks,
uint32_t caps,
nsHttpTransaction *trans,
nsAHttpConnection *session);
nsAHttpConnection *session,
bool isWebsocket);
~SpdyConnectTransaction();
SpdyConnectTransaction *QuerySpdyConnectTransaction() override { return this; }
@ -213,6 +214,9 @@ public:
// an nsHttpConnection that can properly deal with flow control, etc..
bool ConnectedReadyForInput();
bool IsWebsocket() { return mIsWebsocket; }
void SetConnRefTaken();
private:
friend class InputStreamShim;
friend class OutputStreamShim;
@ -249,6 +253,18 @@ private:
RefPtr<InputStreamShim> mTunnelStreamIn;
RefPtr<OutputStreamShim> mTunnelStreamOut;
RefPtr<nsHttpTransaction> mDrivingTransaction;
// This is all for websocket support
bool mIsWebsocket;
bool mConnRefTaken;
nsCOMPtr<nsIAsyncOutputStream> mInputShimPipe;
nsCOMPtr<nsIAsyncInputStream> mOutputShimPipe;
nsresult WriteDataToBuffer(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten);
MOZ_MUST_USE nsresult WebsocketWriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten);
};
} // namespace net

View File

@ -111,6 +111,11 @@ namespace net {
// for use with TRR implementations themselves
#define NS_HTTP_DISABLE_TRR (1<<14)
// Allow re-using a spdy/http2 connection with NS_HTTP_ALLOW_KEEPALIVE not set.
// This is primarily used to allow connection sharing for websockets over http/2
// without accidentally allowing it for websockets not over http/2
#define NS_HTTP_ALLOW_SPDY_WITHOUT_KEEPALIVE (1<<15)
//-----------------------------------------------------------------------------
// some default values
//-----------------------------------------------------------------------------

View File

@ -593,7 +593,17 @@ nsHttpChannel::OnBeforeConnect()
return NS_ERROR_UNKNOWN_HOST;
if (mUpgradeProtocolCallback) {
mCaps |= NS_HTTP_DISALLOW_SPDY;
// Websockets can run over HTTP/2, but other upgrades can't.
if (mUpgradeProtocol.EqualsLiteral("websocket") &&
gHttpHandler->IsH2WebsocketsEnabled()) {
// Need to tell the conn manager that we're ok with http/2 even with
// the allow keepalive bit not set. That bit needs to stay off,
// though, in case we end up having to fallback to http/1.1 (where
// we absolutely do want to disable keepalive).
mCaps |= NS_HTTP_ALLOW_SPDY_WITHOUT_KEEPALIVE;
} else {
mCaps |= NS_HTTP_DISALLOW_SPDY;
}
}
if (mTRR) {
@ -7767,7 +7777,9 @@ nsHttpChannel::OnStopRequest(nsIRequest *request, nsISupports *ctxt, nsresult st
}
if (mUpgradeProtocolCallback && stickyConn &&
mResponseHead && mResponseHead->Status() == 101) {
mResponseHead &&
((mResponseHead->Status() == 101 && mResponseHead->Version() == HttpVersion::v1_1) ||
(mResponseHead->Status() == 200 && mResponseHead->Version() == HttpVersion::v2_0))) {
nsresult rv =
gHttpHandler->ConnMgr()->CompleteUpgrade(stickyConn,
mUpgradeProtocolCallback);

View File

@ -902,11 +902,18 @@ nsHttpConnection::AddTransaction(nsAHttpTransaction *httpTransaction,
needTunnel = needTunnel && transCI->UsingConnect();
needTunnel = needTunnel && httpTransaction->QueryHttpTransaction();
bool isWebsocket = false;
nsHttpTransaction *trans = httpTransaction->QueryHttpTransaction();
if (trans) {
isWebsocket = trans->IsWebsocketUpgrade();
MOZ_ASSERT(!isWebsocket || !needTunnel, "Websocket and tunnel?!");
}
LOG(("nsHttpConnection::AddTransaction for SPDY%s",
needTunnel ? " over tunnel" : ""));
needTunnel ? " over tunnel" : (isWebsocket ? " websocket" : "")));
if (!mSpdySession->AddStream(httpTransaction, priority,
needTunnel, mCallbacks)) {
needTunnel, isWebsocket, mCallbacks)) {
MOZ_ASSERT(false); // this cannot happen!
httpTransaction->Close(NS_ERROR_ABORT);
return NS_ERROR_FAILURE;
@ -1310,8 +1317,11 @@ nsHttpConnection::OnHeadersAvailable(nsAHttpTransaction *trans,
upgradeReq));
// Don't use persistent connection for Upgrade unless there's an auth failure:
// some proxies expect to see auth response on persistent connection.
if (hasUpgradeReq && responseStatus != 401 && responseStatus != 407) {
LOG(("HTTP Upgrade in play - disable keepalive\n"));
// Also allow persistent conn for h2, as we don't want to waste connections
// for multiplexed upgrades.
if (hasUpgradeReq && responseStatus != 401 && responseStatus != 407 &&
!mSpdySession) {
LOG(("HTTP Upgrade in play - disable keepalive for http/1.x\n"));
DontReuse();
}
@ -2138,7 +2148,8 @@ nsHttpConnection::SetInSpdyTunnel(bool arg)
nsresult
nsHttpConnection::MakeConnectString(nsAHttpTransaction *trans,
nsHttpRequestHead *request,
nsACString &result)
nsACString &result,
bool h2ws)
{
result.Truncate();
if (!trans->ConnectionInfo()) {
@ -2155,7 +2166,16 @@ nsHttpConnection::MakeConnectString(nsAHttpTransaction *trans,
// CONNECT host:port HTTP/1.1
request->SetMethod(NS_LITERAL_CSTRING("CONNECT"));
request->SetVersion(gHttpHandler->HttpVersion());
request->SetRequestURI(result);
if (h2ws) {
// HTTP/2 websocket CONNECT forms need the full request URI
nsAutoCString requestURI;
trans->RequestHead()->RequestURI(requestURI);
request->SetRequestURI(requestURI);
request->SetHTTPS(trans->RequestHead()->IsHTTPS());
} else {
request->SetRequestURI(result);
}
rv = request->SetHeader(nsHttp::User_Agent, gHttpHandler->UserAgent());
MOZ_ASSERT(NS_SUCCEEDED(rv));
@ -2196,7 +2216,7 @@ nsHttpConnection::SetupProxyConnect()
nsAutoCString buf;
nsHttpRequestHead request;
nsresult rv = MakeConnectString(mTransaction, &request, buf);
nsresult rv = MakeConnectString(mTransaction, &request, buf, false);
if (NS_FAILED(rv)) {
return rv;
}
@ -2652,5 +2672,15 @@ nsHttpConnection::NoClientCertAuth() const
return !ssc->GetClientCertSent();
}
bool
nsHttpConnection::CanAcceptWebsocket()
{
if (!UsingSpdy()) {
return true;
}
return mSpdySession->CanAcceptWebsocket();
}
} // namespace net
} // namespace mozilla

View File

@ -53,6 +53,7 @@ class nsHttpConnection final : public nsAHttpSegmentReader
, public ARefBase
, public nsSupportsWeakReference
{
private:
virtual ~nsHttpConnection();
public:
@ -214,7 +215,8 @@ public:
static MOZ_MUST_USE nsresult MakeConnectString(nsAHttpTransaction *trans,
nsHttpRequestHead *request,
nsACString &result);
nsACString &result,
bool h2ws);
void SetupSecondaryTLS(nsAHttpTransaction *aSpdyConnectTransaction = nullptr);
void SetInSpdyTunnel(bool arg);
@ -248,6 +250,9 @@ public:
// has finished this returns false.
bool NoClientCertAuth() const;
// HTTP/2 websocket support
bool CanAcceptWebsocket();
private:
// Value (set in mTCPKeepaliveConfig) indicates which set of prefs to use.
enum TCPKeepaliveConfig {

View File

@ -1546,7 +1546,11 @@ nsHttpConnectionMgr::TryDispatchTransaction(nsConnectionEntry *ent,
if (!(caps & NS_HTTP_DISALLOW_SPDY) && gHttpHandler->IsSpdyEnabled()) {
RefPtr<nsHttpConnection> conn = GetSpdyActiveConn(ent);
if (conn) {
if ((caps & NS_HTTP_ALLOW_KEEPALIVE) || !conn->IsExperienced()) {
bool websocketCheckOK = trans->IsWebsocketUpgrade() ? conn->CanAcceptWebsocket() : true;
if (websocketCheckOK &&
((caps & NS_HTTP_ALLOW_KEEPALIVE) ||
(caps & NS_HTTP_ALLOW_SPDY_WITHOUT_KEEPALIVE) ||
!conn->IsExperienced())) {
LOG((" dispatch to spdy: [conn=%p]\n", conn.get()));
trans->RemoveDispatchedAsBlocking(); /* just in case */
nsresult rv = DispatchTransaction(ent, trans, conn);
@ -1910,7 +1914,7 @@ nsHttpConnectionMgr::ProcessNewTransaction(nsHttpTransaction *trans)
LOG((" ProcessNewTransaction %p tied to h2 session push %p\n",
trans, pushedStream->Session()));
return pushedStream->Session()->
AddStream(trans, trans->Priority(), false, nullptr) ?
AddStream(trans, trans->Priority(), false, false, nullptr) ?
NS_OK : NS_ERROR_UNEXPECTED;
}

View File

@ -260,6 +260,7 @@ nsHttpHandler::nsHttpHandler()
, mEnableAltSvc(false)
, mEnableAltSvcOE(false)
, mEnableOriginExtension(false)
, mEnableH2Websockets(true)
, mSpdySendingChunkSize(ASpdySession::kSendingChunkSize)
, mSpdySendBufferSize(ASpdySession::kTCPSendBufferSize)
, mSpdyPushAllowance(131072) // match default pref
@ -1582,6 +1583,14 @@ nsHttpHandler::PrefsChanged(const char *pref)
mEnableOriginExtension = cVar;
}
if (PREF_CHANGED(HTTP_PREF("spdy.websockets"))) {
rv = Preferences::GetBool(HTTP_PREF("spdy.websockets"),
&cVar);
if (NS_SUCCEEDED(rv)) {
mEnableH2Websockets = cVar;
}
}
if (PREF_CHANGED(HTTP_PREF("spdy.push-allowance"))) {
rv = Preferences::GetInt(HTTP_PREF("spdy.push-allowance"), &val);
if (NS_SUCCEEDED(rv)) {

View File

@ -131,6 +131,7 @@ public:
uint32_t ParallelSpeculativeConnectLimit() { return mParallelSpeculativeConnectLimit; }
bool CriticalRequestPrioritization() { return mCriticalRequestPrioritization; }
bool UseH2Deps() { return mUseH2Deps; }
bool IsH2WebsocketsEnabled() { return mEnableH2Websockets; }
uint32_t MaxConnectionsPerOrigin() { return mMaxPersistentConnectionsPerServer; }
bool UseRequestTokenBucket() { return mRequestTokenBucketEnabled; }
@ -591,6 +592,7 @@ private:
uint32_t mEnableAltSvc : 1;
uint32_t mEnableAltSvcOE : 1;
uint32_t mEnableOriginExtension : 1;
uint32_t mEnableH2Websockets : 1;
// Try to use SPDY features instead of HTTP/1.1 over SSL
SpdyInformation mSpdyInfo;

View File

@ -45,6 +45,7 @@
#include "nsIRequestContext.h"
#include "nsIHttpAuthenticator.h"
#include "NSSErrorsService.h"
#include "TunnelUtils.h"
#include "sslerr.h"
#include <algorithm>
@ -214,6 +215,30 @@ void nsHttpTransaction::SetClassOfService(uint32_t cos)
}
}
class ReleaseH2WSTrans final : public Runnable
{
public:
explicit ReleaseH2WSTrans(SpdyConnectTransaction *trans)
: Runnable("ReleaseH2WSTrans")
, mTrans(trans)
{ }
NS_IMETHOD Run() override
{
mTrans = nullptr;
return NS_OK;
}
void Dispatch()
{
nsCOMPtr<nsIEventTarget> sts = do_GetService("@mozilla.org/network/socket-transport-service;1");
Unused << sts->Dispatch(this, nsIEventTarget::DISPATCH_NORMAL);
}
private:
RefPtr<SpdyConnectTransaction> mTrans;
};
nsHttpTransaction::~nsHttpTransaction()
{
LOG(("Destroying nsHttpTransaction @%p\n", this));
@ -237,6 +262,11 @@ nsHttpTransaction::~nsHttpTransaction()
delete mResponseHead;
delete mChunkedDecoder;
ReleaseBlockingTransaction();
if (mH2WSTransaction) {
RefPtr<ReleaseH2WSTrans> r = new ReleaseH2WSTrans(mH2WSTransaction);
r->Dispatch();
}
}
nsresult
@ -456,6 +486,13 @@ nsHttpTransaction::Connection()
already_AddRefed<nsAHttpConnection>
nsHttpTransaction::GetConnectionReference()
{
if (mH2WSTransaction) {
// Need to let the websocket transaction/connection know we've reached
// this point so it can stop forwarding information through us and
// instead communicate directly with the websocket channel.
mH2WSTransaction->SetConnRefTaken();
mH2WSTransaction = nullptr;
}
MutexAutoLock lock(mLock);
RefPtr<nsAHttpConnection> connection(mConnection);
return connection.forget();
@ -1681,6 +1718,12 @@ nsHttpTransaction::HandleContentStart()
break;
}
if (mResponseHead->Status() == 200 &&
mH2WSTransaction) {
// http/2 websockets do not have response bodies
mNoContent = true;
}
if (mResponseHead->Status() == 200 &&
mConnection->IsProxyConnectInProgress()) {
// successful CONNECTs do not have response bodies
@ -2488,5 +2531,24 @@ nsHttpTransaction::SetHttpTrailers(nsCString &aTrailers)
}
}
bool
nsHttpTransaction::IsWebsocketUpgrade()
{
if (mRequestHead) {
nsAutoCString upgradeHeader;
if (NS_SUCCEEDED(mRequestHead->GetHeader(nsHttp::Upgrade, upgradeHeader)) &&
upgradeHeader.LowerCaseEqualsLiteral("websocket")) {
return true;
}
}
return false;
}
void
nsHttpTransaction::SetH2WSTransaction(SpdyConnectTransaction *aH2WSTransaction)
{
mH2WSTransaction = aH2WSTransaction;
}
} // namespace net
} // namespace mozilla

View File

@ -33,6 +33,8 @@ class nsHttpChunkedDecoder;
class nsHttpHeaderArray;
class nsHttpRequestHead;
class nsHttpResponseHead;
class NullHttpTransaction;
class SpdyConnectTransaction;
//-----------------------------------------------------------------------------
// nsHttpTransaction represents a single HTTP transaction. It is thread-safe,
@ -195,6 +197,9 @@ public:
void SetFastOpenStatus(uint8_t aStatus) override;
void SetHttpTrailers(nsCString &aTrailers);
bool IsWebsocketUpgrade();
void SetH2WSTransaction(SpdyConnectTransaction *);
private:
friend class DeleteHttpTransaction;
virtual ~nsHttpTransaction();
@ -467,6 +472,9 @@ private:
} mEarlyDataDisposition;
uint8_t mFastOpenStatus;
// H2 websocket support
RefPtr<SpdyConnectTransaction> mH2WSTransaction;
};
} // namespace net

View File

@ -2359,7 +2359,9 @@ WebSocketChannel::CleanupConnection()
}
if (mSocketIn) {
mSocketIn->AsyncWait(nullptr, 0, 0, nullptr);
if (mDataStarted) {
mSocketIn->AsyncWait(nullptr, 0, 0, nullptr);
}
mSocketIn = nullptr;
}
@ -2413,6 +2415,7 @@ WebSocketChannel::DoStopSession(nsresult reason)
// from OnStartRequest before the socket thread machine has gotten underway
MOZ_ASSERT(mStopped);
MOZ_ASSERT(OnSocketThread() || mTCPClosed || !mDataStarted);
if (!mOpenedHttpChannel) {
// The HTTP channel information will never be used in this case
@ -2446,7 +2449,7 @@ WebSocketChannel::DoStopSession(nsresult reason)
mPingTimer = nullptr;
}
if (mSocketIn && !mTCPClosed) {
if (mSocketIn && !mTCPClosed && mDataStarted) {
// Drain, within reason, this socket. if we leave any data
// unconsumed (including the tcp fin) a RST will be generated
// The right thing to do here is shutdown(SHUT_WR) and then wait
@ -3893,72 +3896,81 @@ WebSocketChannel::OnStartRequest(nsIRequest *aRequest,
}
LOG(("WebSocketChannel::OnStartRequest: HTTP status %d\n", status));
if (status != 101) {
nsCOMPtr<nsIHttpChannelInternal> internalChannel = do_QueryInterface(mHttpChannel);
uint32_t versionMajor, versionMinor;
rv = internalChannel->GetResponseVersion(&versionMajor, &versionMinor);
if (NS_FAILED(rv) ||
(versionMajor != 1 && versionMajor != 2) ||
(versionMajor == 1 && status != 101) ||
(versionMajor == 2 && status != 200)) {
AbortSession(NS_ERROR_CONNECTION_REFUSED);
return NS_ERROR_CONNECTION_REFUSED;
}
nsAutoCString respUpgrade;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Upgrade"), respUpgrade);
if (versionMajor == 1) {
// These are only present on http/1.x websocket upgrades
nsAutoCString respUpgrade;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Upgrade"), respUpgrade);
if (NS_SUCCEEDED(rv)) {
rv = NS_ERROR_ILLEGAL_VALUE;
if (!respUpgrade.IsEmpty()) {
val = respUpgrade.BeginWriting();
while ((token = nsCRT::strtok(val, ", \t", &val))) {
if (PL_strcasecmp(token, "Websocket") == 0) {
rv = NS_OK;
break;
if (NS_SUCCEEDED(rv)) {
rv = NS_ERROR_ILLEGAL_VALUE;
if (!respUpgrade.IsEmpty()) {
val = respUpgrade.BeginWriting();
while ((token = nsCRT::strtok(val, ", \t", &val))) {
if (PL_strcasecmp(token, "Websocket") == 0) {
rv = NS_OK;
break;
}
}
}
}
}
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header Upgrade: websocket not found\n"));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return rv;
}
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header Upgrade: websocket not found\n"));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return rv;
}
nsAutoCString respConnection;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Connection"), respConnection);
nsAutoCString respConnection;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Connection"), respConnection);
if (NS_SUCCEEDED(rv)) {
rv = NS_ERROR_ILLEGAL_VALUE;
if (!respConnection.IsEmpty()) {
val = respConnection.BeginWriting();
while ((token = nsCRT::strtok(val, ", \t", &val))) {
if (PL_strcasecmp(token, "Upgrade") == 0) {
rv = NS_OK;
break;
if (NS_SUCCEEDED(rv)) {
rv = NS_ERROR_ILLEGAL_VALUE;
if (!respConnection.IsEmpty()) {
val = respConnection.BeginWriting();
while ((token = nsCRT::strtok(val, ", \t", &val))) {
if (PL_strcasecmp(token, "Upgrade") == 0) {
rv = NS_OK;
break;
}
}
}
}
}
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header 'Connection: Upgrade' not found\n"));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return rv;
}
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header 'Connection: Upgrade' not found\n"));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return rv;
}
nsAutoCString respAccept;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Sec-WebSocket-Accept"),
respAccept);
nsAutoCString respAccept;
rv = mHttpChannel->GetResponseHeader(
NS_LITERAL_CSTRING("Sec-WebSocket-Accept"),
respAccept);
if (NS_FAILED(rv) ||
respAccept.IsEmpty() || !respAccept.Equals(mHashedSecret)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header Sec-WebSocket-Accept check failed\n"));
LOG(("WebSocketChannel::OnStartRequest: Expected %s received %s\n",
mHashedSecret.get(), respAccept.get()));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return NS_ERROR_ILLEGAL_VALUE;
if (NS_FAILED(rv) ||
respAccept.IsEmpty() || !respAccept.Equals(mHashedSecret)) {
LOG(("WebSocketChannel::OnStartRequest: "
"HTTP response header Sec-WebSocket-Accept check failed\n"));
LOG(("WebSocketChannel::OnStartRequest: Expected %s received %s\n",
mHashedSecret.get(), respAccept.get()));
AbortSession(NS_ERROR_ILLEGAL_VALUE);
return NS_ERROR_ILLEGAL_VALUE;
}
}
// If we sent a sub protocol header, verify the response matches.