Bug 1812059 - Make WebTransportSessionProxy support retargetTo, r=necko-reviewers,valentin

Differential Revision: https://phabricator.services.mozilla.com/D167794
This commit is contained in:
Kershaw Chang 2023-02-02 22:28:02 +00:00
parent d4bda3b3fe
commit cf0103afb9
6 changed files with 110 additions and 69 deletions

View File

@ -137,13 +137,7 @@ mozilla::ipc::IPCResult WebTransportParent::RecvClose(
PromiseFlatCString(aReason).get()));
MOZ_ASSERT(!mClosed);
mClosed.Flip();
nsAutoCString reason(aReason);
NS_DispatchToMainThread(NS_NewRunnableFunction(
"WebTransport Close", [self = RefPtr{this}, aCode, reason] {
if (self->mWebTransport) {
self->mWebTransport->CloseSession(aCode, reason);
}
}));
mWebTransport->CloseSession(aCode, aReason);
Close();
return IPC_OK();
}
@ -168,6 +162,16 @@ WebTransportParent::OnSessionReady(uint64_t aSessionId) {
}
}));
nsresult rv;
nsCOMPtr<nsISerialEventTarget> sts =
do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_FAILED(rv)) {
return rv;
}
// Retarget to socket thread. After this, WebTransportParent and
// |mWebTransport| should be only accessed on the socket thread.
Unused << mWebTransport->RetargetTo(sts);
return NS_OK;
}

View File

@ -331,7 +331,7 @@ void Http3WebTransportSession::Close(nsresult aResult) {
}
void Http3WebTransportSession::OnSessionClosed(uint32_t aStatus,
nsACString& aReason) {
const nsACString& aReason) {
if (mTransaction) {
mTransaction->Close(NS_BASE_STREAM_CLOSED);
mTransaction = nullptr;
@ -344,7 +344,7 @@ void Http3WebTransportSession::OnSessionClosed(uint32_t aStatus,
}
void Http3WebTransportSession::CloseSession(uint32_t aStatus,
nsACString& aReason) {
const nsACString& aReason) {
if ((mRecvState != CLOSE_PENDING) && (mRecvState != RECV_DONE)) {
mStatus = aStatus;
mReason = aReason;

View File

@ -54,8 +54,8 @@ class Http3WebTransportSession final : public Http3StreamBase,
nsresult TryActivating();
void TransactionIsDone(nsresult aResult);
void CloseSession(uint32_t aStatus, nsACString& aReason);
void OnSessionClosed(uint32_t aStatus, nsACString& aReason);
void CloseSession(uint32_t aStatus, const nsACString& aReason);
void OnSessionClosed(uint32_t aStatus, const nsACString& aReason);
void CreateOutgoingBidirectionalStream(
std::function<void(Result<RefPtr<Http3WebTransportStream>, nsresult>&&)>&&

View File

@ -26,7 +26,8 @@ NS_IMPL_ISUPPORTS(WebTransportSessionProxy, WebTransportSessionEventListener,
nsIChannelEventSink, nsIInterfaceRequestor);
WebTransportSessionProxy::WebTransportSessionProxy()
: mMutex("WebTransportSessionProxy::mMutex") {
: mMutex("WebTransportSessionProxy::mMutex"),
mTarget(GetMainThreadSerialEventTarget()) {
LOG(("WebTransportSessionProxy constructor"));
}
@ -101,13 +102,28 @@ nsresult WebTransportSessionProxy::AsyncConnect(
return rv;
}
NS_IMETHODIMP
WebTransportSessionProxy::RetargetTo(nsIEventTarget* aTarget) {
{
MutexAutoLock lock(mMutex);
LOG(("WebTransportSessionProxy::RetargetTo mState=%d", mState));
// RetargetTo should be only called after the session is ready.
if (mState != WebTransportSessionProxyState::ACTIVE) {
return NS_ERROR_UNEXPECTED;
}
}
mTarget = aTarget;
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::GetStats() { return NS_ERROR_NOT_IMPLEMENTED; }
NS_IMETHODIMP
WebTransportSessionProxy::CloseSession(uint32_t status,
const nsACString& reason) {
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mTarget->IsOnCurrentThread());
MutexAutoLock lock(mMutex);
mCloseStatus = status;
mReason = reason;
@ -141,33 +157,40 @@ WebTransportSessionProxy::CloseSession(uint32_t status,
return NS_OK;
}
void WebTransportSessionProxy::CloseSessionInternalLocked() {
MutexAutoLock lock(mMutex);
CloseSessionInternal();
}
void WebTransportSessionProxy::CloseSessionInternal() {
if (!OnSocketThread()) {
mMutex.AssertCurrentThreadOwns();
RefPtr<WebTransportSessionProxy> self(this);
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::CallCloseWebTransportSession",
[self{std::move(self)}]() { self->CloseSessionInternal(); }));
[self{std::move(self)}]() { self->CloseSessionInternalLocked(); }));
return;
}
mMutex.AssertCurrentThreadOwns();
RefPtr<Http3WebTransportSession> wt;
uint32_t closeStatus = 0;
nsCString reason;
{
MutexAutoLock lock(mMutex);
if (mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING) {
MOZ_ASSERT(mWebTransportSession);
wt = mWebTransportSession;
mWebTransportSession = nullptr;
closeStatus = mCloseStatus;
reason = mReason;
ChangeState(WebTransportSessionProxyState::DONE);
} else {
MOZ_ASSERT(mState == WebTransportSessionProxyState::DONE);
}
if (mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING) {
MOZ_ASSERT(mWebTransportSession);
wt = mWebTransportSession;
mWebTransportSession = nullptr;
closeStatus = mCloseStatus;
reason = mReason;
ChangeState(WebTransportSessionProxyState::DONE);
} else {
MOZ_ASSERT(mState == WebTransportSessionProxyState::DONE);
}
if (wt) {
MutexAutoUnlock unlock(mMutex);
wt->CloseSession(closeStatus, reason);
}
}
@ -650,10 +673,10 @@ WebTransportSessionProxy::OnSessionReadyInternal(
NS_IMETHODIMP
WebTransportSessionProxy::OnIncomingStreamAvailableInternal(
Http3WebTransportStream* aStream) {
if (!NS_IsMainThread()) {
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
RefPtr<Http3WebTransportStream> stream = aStream;
Unused << NS_DispatchToMainThread(NS_NewRunnableFunction(
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnIncomingStreamAvailableInternal",
[self{std::move(self)}, stream{std::move(stream)}]() {
self->OnIncomingStreamAvailableInternal(stream);
@ -734,10 +757,7 @@ WebTransportSessionProxy::OnSessionClosed(uint32_t status,
mReason = reason;
mWebTransportSession = nullptr;
ChangeState(WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING);
RefPtr<WebTransportSessionProxy> self(this);
Unused << NS_DispatchToMainThread(NS_NewRunnableFunction(
"WebTransportSessionProxy::CallOnSessionClose",
[self{std::move(self)}]() { self->CallOnSessionClosed(); }));
CallOnSessionClosed();
} break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
ChangeState(WebTransportSessionProxyState::DONE);
@ -750,34 +770,49 @@ WebTransportSessionProxy::OnSessionClosed(uint32_t status,
return NS_OK;
}
void WebTransportSessionProxy::CallOnSessionClosedLocked() {
MutexAutoLock lock(mMutex);
CallOnSessionClosed();
}
void WebTransportSessionProxy::CallOnSessionClosed() {
MOZ_ASSERT(NS_IsMainThread(), "not on socket thread");
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::CallOnSessionClosed",
[self{std::move(self)}]() { self->CallOnSessionClosedLocked(); }));
return;
}
mMutex.AssertCurrentThreadOwns();
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
nsAutoCString reason;
uint32_t closeStatus = 0;
{
MutexAutoLock lock(mMutex);
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
MOZ_ASSERT(false,
"CallOnSessionClosed cannot be called in this state.");
break;
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
listener = mListener;
mListener = nullptr;
reason = mReason;
closeStatus = mCloseStatus;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::DONE:
break;
}
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
MOZ_ASSERT(false, "CallOnSessionClosed cannot be called in this state.");
break;
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
listener = mListener;
mListener = nullptr;
reason = mReason;
closeStatus = mCloseStatus;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::DONE:
break;
}
if (listener) {
// Don't invoke the callback under the lock.
MutexAutoUnlock unlock(mMutex);
listener->OnSessionClosed(closeStatus, reason);
}
}
@ -849,9 +884,7 @@ void WebTransportSessionProxy::ChangeState(
void WebTransportSessionProxy::NotifyDatagramReceived(
nsTArray<uint8_t>&& aData) {
// TODO: this should be on the target thread, but the target thread is main
// thread for now.
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
@ -869,8 +902,8 @@ NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceivedInternal(
nsTArray<uint8_t>&& aData) {
MOZ_ASSERT(OnSocketThread());
if (!NS_IsMainThread()) {
return NS_DispatchToMainThread(NS_NewRunnableFunction(
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnDatagramReceived",
[self = RefPtr{this}, data{std::move(aData)}]() mutable {
self->NotifyDatagramReceived(std::move(data));
@ -887,9 +920,7 @@ NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceived(
}
void WebTransportSessionProxy::OnMaxDatagramSizeInternal(uint64_t aSize) {
// TODO: this should be on the target thread, but the target thread is main
// thread for now.
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
@ -906,8 +937,8 @@ void WebTransportSessionProxy::OnMaxDatagramSizeInternal(uint64_t aSize) {
NS_IMETHODIMP WebTransportSessionProxy::OnMaxDatagramSize(uint64_t aSize) {
MOZ_ASSERT(OnSocketThread());
if (!NS_IsMainThread()) {
return NS_DispatchToMainThread(
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(
NS_NewRunnableFunction("WebTransportSessionProxy::OnMaxDatagramSize",
[self = RefPtr{this}, size(aSize)] {
self->OnMaxDatagramSizeInternal(size);
@ -920,9 +951,7 @@ NS_IMETHODIMP WebTransportSessionProxy::OnMaxDatagramSize(uint64_t aSize) {
void WebTransportSessionProxy::OnOutgoingDatagramOutComeInternal(
uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) {
// TODO: this should be on the target thread, but the target thread is main
// thread for now.
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
@ -941,8 +970,8 @@ WebTransportSessionProxy::OnOutgoingDatagramOutCome(
uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) {
MOZ_ASSERT(OnSocketThread());
if (!NS_IsMainThread()) {
return NS_DispatchToMainThread(NS_NewRunnableFunction(
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnOutgoingDatagramOutCome",
[self = RefPtr{this}, id(aId), outcome(aOutCome)] {
self->OnOutgoingDatagramOutComeInternal(id, outcome);

View File

@ -141,7 +141,9 @@ class WebTransportSessionProxy final : public nsIWebTransport,
~WebTransportSessionProxy();
void CloseSessionInternal();
void CloseSessionInternalLocked();
void CallOnSessionClosed();
void CallOnSessionClosedLocked();
enum WebTransportSessionProxyState {
INIT,
@ -176,6 +178,7 @@ class WebTransportSessionProxy final : public nsIWebTransport,
nsCString mReason MOZ_GUARDED_BY(mMutex);
// This is used to store events happened before OnSessionReady.
nsTArray<std::function<void()>> mPendingEvents MOZ_GUARDED_BY(mMutex);
nsCOMPtr<nsIEventTarget> mTarget;
};
} // namespace mozilla::net

View File

@ -50,6 +50,11 @@ interface nsIWebTransport : nsISupports {
void sendDatagram(in Array<uint8_t> aData, in uint64_t aTrackingId);
void getMaxDatagramSize();
// This can be only called after onSessionReady().
// After this point, we can retarget the underlying WebTransportSessionProxy
// object off main thread.
[noscript] void retargetTo(in nsIEventTarget aTarget);
};
// Events related to a WebTransport session.