Bug 1816925: Implement WebTransport SendOrder at the DOM level r=kershaw,necko-reviewers

Differential Revision: https://phabricator.services.mozilla.com/D178491
This commit is contained in:
Randell Jesup 2023-08-31 17:58:51 +00:00
parent 8f09f6a6ff
commit 0d5b60fd40
11 changed files with 75 additions and 13 deletions

View File

@ -175,18 +175,20 @@ class ReceiveStream final : public nsIWebTransportStreamCallback {
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>&&
aStreamCallback,
nsCOMPtr<nsISerialEventTarget>& aSocketThread)
Maybe<int64_t> aSendOrder, nsCOMPtr<nsISerialEventTarget>& aSocketThread)
: mUniResolver(aResolver),
mStreamCallback(std::move(aStreamCallback)),
mSendOrder(aSendOrder),
mSocketThread(aSocketThread) {}
ReceiveStream(
WebTransportParent::CreateBidirectionalStreamResolver&& aResolver,
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>&&
aStreamCallback,
nsCOMPtr<nsISerialEventTarget>& aSocketThread)
Maybe<int64_t> aSendOrder, nsCOMPtr<nsISerialEventTarget>& aSocketThread)
: mBiResolver(aResolver),
mStreamCallback(std::move(aStreamCallback)),
mSendOrder(aSendOrder),
mSocketThread(aSocketThread) {}
private:
@ -196,6 +198,7 @@ class ReceiveStream final : public nsIWebTransportStreamCallback {
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>
mStreamCallback;
Maybe<int64_t> mSendOrder;
nsCOMPtr<nsISerialEventTarget> mSocketThread;
};
@ -207,6 +210,9 @@ NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
LOG(("Bidirectional stream ready!"));
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
if (mSendOrder.isSome()) {
aStream->SetSendOrder(mSendOrder.value());
}
RefPtr<mozilla::ipc::DataPipeSender> inputsender;
RefPtr<mozilla::ipc::DataPipeReceiver> inputreceiver;
nsresult rv =
@ -283,6 +289,9 @@ ReceiveStream::OnUnidirectionalStreamReady(nsIWebTransportSendStream* aStream) {
// We should be on the Socket Thread
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
if (mSendOrder.isSome()) {
aStream->SetSendOrder(mSendOrder.value());
}
RefPtr<::mozilla::ipc::DataPipeSender> sender;
RefPtr<::mozilla::ipc::DataPipeReceiver> receiver;
nsresult rv = NewDataPipe(mozilla::ipc::kDefaultDataPipeCapacity,
@ -352,7 +361,7 @@ IPCResult WebTransportParent::RecvCreateUnidirectionalStream(
std::move(aCallback));
};
RefPtr<ReceiveStream> callback = new ReceiveStream(
std::move(aResolver), std::move(streamCb), mSocketThread);
std::move(aResolver), std::move(streamCb), aSendOrder, mSocketThread);
nsresult rv;
rv = mWebTransport->CreateOutgoingUnidirectionalStream(callback);
if (NS_FAILED(rv)) {
@ -375,7 +384,7 @@ IPCResult WebTransportParent::RecvCreateBidirectionalStream(
std::move(aCallback));
};
RefPtr<ReceiveStream> callback = new ReceiveStream(
std::move(aResolver), std::move(streamCb), mSocketThread);
std::move(aResolver), std::move(streamCb), aSendOrder, mSocketThread);
nsresult rv;
rv = mWebTransport->CreateOutgoingBidirectionalStream(callback);
if (NS_FAILED(rv)) {

View File

@ -479,7 +479,7 @@ nsresult Http3Session::ProcessEvents() {
break;
}
case Http3Event::Tag::DataWritable: {
MOZ_ASSERT(CanSandData());
MOZ_ASSERT(CanSendData());
LOG(("Http3Session::ProcessEvents - DataWritable"));
RefPtr<Http3StreamBase> stream =
@ -1061,7 +1061,7 @@ bool Http3Session::AddStream(nsAHttpTransaction* aHttpTransaction,
bool Http3Session::CanReuse() {
// TODO: we assume "pooling" is disabled here, so we don't allow this session
// to be reused. "pooling" will be implemented in bug 1815735.
return CanSandData() && !(mGoawayReceived || mShouldClose) &&
return CanSendData() && !(mGoawayReceived || mShouldClose) &&
!mHasWebTransportSession;
}
@ -1485,12 +1485,13 @@ nsresult Http3Session::SendData(nsIUDPSocket* socket) {
// 3) if we still have streams ready to write call ResumeSend()(we may
// still have such streams because on an stream error we return earlier
// to let the error be handled).
// 4)
nsresult rv = NS_OK;
RefPtr<Http3StreamBase> stream;
// Step 1)
while (CanSandData() && (stream = mReadyForWrite.PopFront())) {
while (CanSendData() && (stream = mReadyForWrite.PopFront())) {
LOG(("Http3Session::SendData call ReadSegments from stream=%p [this=%p]",
stream.get(), this));
@ -1515,7 +1516,7 @@ nsresult Http3Session::SendData(nsIUDPSocket* socket) {
if (NS_SUCCEEDED(rv)) {
// Step 2:
// Call actuall network write.
// Call actual network write.
rv = ProcessOutput(socket);
}
@ -1532,13 +1533,13 @@ nsresult Http3Session::SendData(nsIUDPSocket* socket) {
void Http3Session::StreamReadyToWrite(Http3StreamBase* aStream) {
MOZ_ASSERT(aStream);
mReadyForWrite.Push(aStream);
if (CanSandData() && mConnection) {
if (CanSendData() && mConnection) {
Unused << mConnection->ResumeSend();
}
}
void Http3Session::MaybeResumeSend() {
if ((mReadyForWrite.GetSize() > 0) && CanSandData() && mConnection) {
if ((mReadyForWrite.GetSize() > 0) && CanSendData() && mConnection) {
Unused << mConnection->ResumeSend();
}
}
@ -2023,7 +2024,7 @@ bool Http3Session::JoinConnection(const nsACString& hostname, int32_t port) {
bool Http3Session::RealJoinConnection(const nsACString& hostname, int32_t port,
bool justKidding) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!mConnection || !CanSandData() || mShouldClose || mGoawayReceived) {
if (!mConnection || !CanSendData() || mShouldClose || mGoawayReceived) {
return false;
}
@ -2356,7 +2357,7 @@ void Http3Session::Finish0Rtt(bool aRestart) {
}
void Http3Session::ReportHttp3Connection() {
if (CanSandData() && !mHttp3ConnectionReported) {
if (CanSendData() && !mHttp3ConnectionReported) {
mHttp3ConnectionReported = true;
gHttpHandler->ConnMgr()->ReportHttp3Connection(mUdpConn);
MaybeResumeSend();
@ -2481,4 +2482,12 @@ uint64_t Http3Session::MaxDatagramSize(uint64_t aSessionId) {
return size;
}
void Http3Session::SetSendOrder(Http3StreamBase* aStream, int64_t aSendOrder) {
if (!IsClosing()) {
nsresult rv = mHttp3Connection->WebTransportSetSendOrder(
aStream->StreamId(), aSendOrder);
MOZ_ASSERT(NS_SUCCEEDED(rv));
}
}
} // namespace mozilla::net

View File

@ -133,7 +133,7 @@ class Http3Session final : public nsAHttpTransaction, public nsAHttpConnection {
uint32_t controlFlags, nsIInterfaceRequestor* callbacks);
bool IsConnected() const { return mState == CONNECTED; }
bool CanSandData() const {
bool CanSendData() const {
return (mState == CONNECTED) || (mState == ZERORTT);
}
bool IsClosing() const { return (mState == CLOSING || mState == CLOSED); }
@ -215,6 +215,8 @@ class Http3Session final : public nsAHttpTransaction, public nsAHttpConnection {
uint64_t MaxDatagramSize(uint64_t aSessionId);
void SetSendOrder(Http3StreamBase* aStream, int64_t aSendOrder);
void CloseWebTransportConn();
private:
@ -278,6 +280,7 @@ class Http3Session final : public nsAHttpTransaction, public nsAHttpConnection {
mStreamTransactionHash;
nsRefPtrDeque<Http3StreamBase> mReadyForWrite;
nsTArray<RefPtr<Http3StreamBase>> mSlowConsumersReadyForRead;
nsRefPtrDeque<Http3StreamBase> mQueuedStreams;

View File

@ -56,6 +56,8 @@ class Http3StreamBase : public SupportsWeakPtr, public ARefBase {
~Http3StreamBase();
uint64_t mStreamId{UINT64_MAX};
int64_t mSendOrder{0};
bool mSendOrderIsSet{false};
RefPtr<nsAHttpTransaction> mTransaction;
RefPtr<Http3Session> mSession;
bool mQueued{false};

View File

@ -657,4 +657,8 @@ void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
mSession->StreamHasDataToWrite(this);
}
void Http3WebTransportStream::SetSendOrder(int64_t aSendOrder) {
mSession->SetSendOrder(this, aSendOrder);
}
} // namespace mozilla::net

View File

@ -48,6 +48,8 @@ class Http3WebTransportStream final : public Http3StreamBase,
}
Http3Stream* GetHttp3Stream() override { return nullptr; }
void SetSendOrder(int64_t aSendOrder);
[[nodiscard]] nsresult ReadSegments() override;
[[nodiscard]] nsresult WriteSegments() override;

View File

@ -223,6 +223,11 @@ NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) {
return NS_OK;
}
NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(int64_t aSendOrder) {
mWebTransportStream->SetSendOrder(aSendOrder);
return NS_OK;
}
//------------------------------------------------------------------------------
// WebTransportStreamProxy::AsyncInputStreamWrapper
//------------------------------------------------------------------------------
@ -285,6 +290,9 @@ NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments(
LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p",
this));
nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
if (*aResult > 0) {
LOG((" Read %u bytes", *aResult));
}
MaybeCloseStream();
return rv;
}
@ -330,6 +338,8 @@ WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
const char* aBuf, uint32_t aCount, uint32_t* aResult) {
LOG(("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes",
this, aCount));
return mStream->Write(aBuf, aCount, aResult);
}

View File

@ -38,6 +38,7 @@ class WebTransportStreamProxy final
NS_IMETHOD GetOutputStream(nsIAsyncOutputStream** aOut) override;
NS_IMETHOD GetStreamId(uint64_t* aId) override;
NS_IMETHOD SetSendOrder(int64_t aSendOrder) override;
private:
virtual ~WebTransportStreamProxy();

View File

@ -62,6 +62,7 @@ interface nsIWebTransportSendStream : nsISupports {
void getSendStreamStats(in nsIWebTransportStreamStatsCallback aCallback);
readonly attribute nsIAsyncOutputStream outputStream;
readonly attribute uint64_t streamId;
void setSendOrder(in int64_t aSendOrder);
};
[builtinclass, scriptable, uuid(f9ecb509-36db-4689-97d6-137639a08750)]
@ -81,4 +82,5 @@ interface nsIWebTransportBidirectionalStream : nsISupports {
readonly attribute nsIAsyncInputStream inputStream;
readonly attribute nsIAsyncOutputStream outputStream;
readonly attribute uint64_t streamId;
void setSendOrder(in int64_t aSendOrder);
};

View File

@ -149,6 +149,11 @@ class NeqoHttp3Conn final {
aResult);
}
nsresult WebTransportSetSendOrder(uint64_t aSessionId, int64_t aSendOrder) {
return neqo_http3conn_webtransport_set_sendorder(this, aSessionId,
aSendOrder);
}
private:
NeqoHttp3Conn() = delete;
~NeqoHttp3Conn() = delete;

View File

@ -1329,3 +1329,18 @@ pub extern "C" fn neqo_http3conn_webtransport_max_datagram_size(
Err(_) => NS_ERROR_UNEXPECTED,
}
}
#[no_mangle]
pub extern "C" fn neqo_http3conn_webtransport_set_sendorder(
conn: &mut NeqoHttp3Conn,
stream_id: u64,
sendorder: i64,
) -> nsresult {
match conn
.conn
.webtransport_set_sendorder(StreamId::from(stream_id), sendorder)
{
Ok(()) => NS_OK,
Err(_) => NS_ERROR_UNEXPECTED,
}
}