gecko-dev/ipc/glue/DataPipe.cpp
Nika Layzell 3b40268cc1 Bug 1818305 - Part 2: Add a streamStatus method to nsIInputStream, r=xpcom-reviewers,necko-reviewers,geckoview-reviewers,valentin,jesup,m_kato,mccr8
This is semantically similar to the existing available() method, however will
not block, and doesn't need to do the work to actually determine the number of
available bytes.

As part of this patch, I also fixed one available() implementation which was
incorrectly throwing NS_BASE_STREAM_WOULD_BLOCK.

Differential Revision: https://phabricator.services.mozilla.com/D170697
2023-03-15 19:52:34 +00:00

742 lines
26 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "DataPipe.h"
#include "mozilla/AlreadyAddRefed.h"
#include "mozilla/Assertions.h"
#include "mozilla/CheckedInt.h"
#include "mozilla/ErrorNames.h"
#include "mozilla/Logging.h"
#include "mozilla/MoveOnlyFunction.h"
#include "mozilla/ipc/InputStreamParams.h"
#include "nsIAsyncInputStream.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
namespace mozilla {
namespace ipc {
// Log module named "DataPipe"; it is the module passed to every MOZ_LOG and
// MOZ_LOG_TEST call in this file.
LazyLogModule gDataPipeLog("DataPipe");
namespace data_pipe_detail {
// Scoped lock over a DataPipe mutex which can also queue deferred work.
//
// The mutex is acquired on construction. On destruction the mutex is released
// first, and only afterwards are the queued actions invoked, in the order in
// which they were added.
class MOZ_SCOPED_CAPABILITY DataPipeAutoLock {
 public:
  explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex)
      : mMutex(aMutex) {
    mMutex.Lock();
  }

  ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() {
    mMutex.Unlock();
    // Run the queued actions only once the mutex has been released.
    for (auto& deferred : mActions) {
      deferred();
    }
  }

  // Not copyable.
  DataPipeAutoLock(const DataPipeAutoLock&) = delete;
  DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;

  // Queue `aAction` to be invoked by the destructor, after the unlock.
  template <typename Action>
  void AddUnlockAction(Action aAction) {
    mActions.AppendElement(std::move(aAction));
  }

 private:
  Mutex& mMutex;
  // Pending actions, stored in insertion order.
  AutoTArray<MoveOnlyFunction<void()>, 4> mActions;
};
// Arrange for `aCallback` to be dispatched once `aLock` has been released.
//
// The runnable is dispatched to `aTarget` when one is provided, and as a
// background task otherwise. When `aCallback` is null nothing is queued.
static void DoNotifyOnUnlock(DataPipeAutoLock& aLock,
                             already_AddRefed<nsIRunnable> aCallback,
                             already_AddRefed<nsIEventTarget> aTarget) {
  // Take ownership of both references up front, even if nothing ends up being
  // dispatched.
  nsCOMPtr<nsIRunnable> runnable{std::move(aCallback)};
  nsCOMPtr<nsIEventTarget> eventTarget{std::move(aTarget)};
  if (!runnable) {
    return;
  }

  aLock.AddUnlockAction([runnable = std::move(runnable),
                         eventTarget = std::move(eventTarget)]() mutable {
    if (!eventTarget) {
      NS_DispatchBackgroundTask(runnable.forget());
      return;
    }
    eventTarget->Dispatch(runnable.forget());
  });
}
// Connection between one end of a DataPipe and its peer.
//
// Holds the port used to exchange CLOSED / BYTES_CONSUMED messages, the shared
// memory buffer, the offset/available window into that buffer, and any pending
// wait callback. All mutable state is guarded by `*mMutex`, which is shared
// with the owning `DataPipeBase`. `Init()` registers this object as the
// observer of `mPort`.
class DataPipeLink : public NodeController::PortObserver {
public:
// Stores the given state; no port observer is registered until `Init()`.
DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex,
ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
: mMutex(std::move(aMutex)),
mPort(std::move(aPort)),
mShmem(aShmem),
mCapacity(aCapacity),
mReceiverSide(aReceiverSide),
mPeerStatus(aPeerStatus),
mOffset(aOffset),
mAvailable(aAvailable) {}
// Start observing the port. If the peer status is already a failure this does
// nothing. Otherwise the observer is registered while holding the mutex, and
// `OnPortStatusChanged()` is then called once the mutex has been released
// (that method takes the mutex itself).
void Init() MOZ_EXCLUDES(*mMutex) {
{
DataPipeAutoLock lock(*mMutex);
if (NS_FAILED(mPeerStatus)) {
return;
}
MOZ_ASSERT(mPort.IsValid());
mPort.Controller()->SetPortObserver(mPort.Port(), this);
}
OnPortStatusChanged();
}
// PortObserver callback; defined out-of-line below. Must be called without
// the mutex held.
void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex);
// Add a task to notify the callback after `aLock` is unlocked.
//
// This method is safe to call multiple times, as after the first time it is
// called, `mCallback` will be cleared.
void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) {
DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
}
// Queue a BYTES_CONSUMED message carrying `aBytes` to be sent to the peer
// once `aLock` is released. Does nothing (beyond logging) if the peer status
// is already a failure.
void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes)
MOZ_REQUIRES(*mMutex) {
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get()));
if (NS_FAILED(mPeerStatus)) {
return;
}
// `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
// but before we send the message. The strong controller and port references
// will allow us to try to send the message anyway, and it will be safely
// dropped if the port has already been closed. CONSUMED messages are safe
// to deliver out-of-order, so we don't need to worry about ordering here.
aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
port = mPort.Port(), aBytes]() mutable {
auto message = MakeUnique<IPC::Message>(
MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
IPC::MessageWriter writer(*message);
WriteParam(&writer, aBytes);
controller->SendUserMessage(port, std::move(message));
});
}
// Record that the peer has closed or errored. `mPeerStatus` must currently be
// a success code; it becomes `aStatus`, or NS_BASE_STREAM_CLOSED when
// `aStatus` is itself a success code. `mPort` is moved into an unlock action,
// which first sends a CLOSED message carrying `aStatus` when `aSendClosed` is
// set. Any pending callback is queued for notification.
void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
bool aSendClosed = false) MOZ_REQUIRES(*mMutex) {
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
aSendClosed ? ", send" : "", Describe(aLock).get()));
// The pipe was closed or errored. Clear the observer reference back
// to this type from the port layer, and ensure we notify waiters.
MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
if (aSendClosed) {
auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
DATA_PIPE_CLOSED_MESSAGE_TYPE);
IPC::MessageWriter writer(*message);
WriteParam(&writer, aStatus);
port.Controller()->SendUserMessage(port.Port(), std::move(message));
}
// The `ScopedPort` being destroyed with this action will close it,
// clearing the observer reference from the ports layer.
});
NotifyOnUnlock(aLock);
}
// Build a human-readable summary of this link for logging: side, address,
// capacity (c), peer status (e), offset (o), available (a), and whether a
// callback is pending ("clo" when it is closure-only).
nsCString Describe(DataPipeAutoLock& aLock) const MOZ_REQUIRES(*mMutex) {
return nsPrintfCString(
"[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
mReceiverSide ? "Receiver" : "Sender", this, mCapacity,
GetStaticErrorName(mPeerStatus), mOffset, mAvailable,
mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no");
}
// This mutex is shared with the `DataPipeBase` which owns this
// `DataPipeLink`.
std::shared_ptr<Mutex> mMutex;
// Port used to talk to the peer; moved away by `SetPeerError`.
ScopedPort mPort MOZ_GUARDED_BY(*mMutex);
// Shared memory buffer the pipe's data is processed in.
const RefPtr<SharedMemory> mShmem;
// Upper bound for `mOffset` and `mAvailable`.
const uint32_t mCapacity;
const bool mReceiverSide;
// True while `ProcessSegmentsInternal` is working on a region of `mShmem`.
bool mProcessingSegment MOZ_GUARDED_BY(*mMutex) = false;
// A failure code once the peer has closed or errored.
nsresult mPeerStatus MOZ_GUARDED_BY(*mMutex) = NS_OK;
// Offset into `mShmem` at which the next segment starts.
uint32_t mOffset MOZ_GUARDED_BY(*mMutex) = 0;
// Number of bytes which may currently be processed by this side.
uint32_t mAvailable MOZ_GUARDED_BY(*mMutex) = 0;
// When set, BYTES_CONSUMED messages do not trigger `mCallback`.
bool mCallbackClosureOnly MOZ_GUARDED_BY(*mMutex) = false;
// Pending wait callback and the target it should be dispatched to.
nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(*mMutex);
nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(*mMutex);
};
// Drain and handle all messages currently queued on the port.
//
// Runs with the mutex held for the whole loop. The loop stops when there are
// no more messages, when the port reports failure, or as soon as the peer
// status becomes a failure (every error path calls `SetPeerError` and
// returns). Notifications and port teardown are queued on `lock` and so run
// after this function releases the mutex.
void DataPipeLink::OnPortStatusChanged() {
DataPipeAutoLock lock(*mMutex);
while (NS_SUCCEEDED(mPeerStatus)) {
UniquePtr<IPC::Message> message;
// A failed GetMessage is treated as the peer having closed the pipe.
if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) {
SetPeerError(lock, NS_BASE_STREAM_CLOSED);
return;
}
if (!message) {
return; // no more messages
}
IPC::MessageReader reader(*message);
switch (message->type()) {
// The peer closed its end, optionally with an error status.
case DATA_PIPE_CLOSED_MESSAGE_TYPE: {
nsresult status = NS_OK;
if (!ReadParam(&reader, &status)) {
NS_WARNING("Unable to parse nsresult error from peer");
status = NS_ERROR_UNEXPECTED;
}
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("Got CLOSED(%s) %s", GetStaticErrorName(status),
Describe(lock).get()));
SetPeerError(lock, status);
return;
}
// The peer processed some bytes, which become available to this side.
case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: {
uint32_t consumed = 0;
if (!ReadParam(&reader, &consumed)) {
NS_WARNING("Unable to parse bytes consumed from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("Got CONSUMED(%u) %s", consumed, Describe(lock).get()));
// Reject counts which overflow or would exceed the buffer capacity.
auto newAvailable = CheckedUint32{mAvailable} + consumed;
if (!newAvailable.isValid() || newAvailable.value() > mCapacity) {
NS_WARNING("Illegal bytes consumed message received from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
mAvailable = newAvailable.value();
// Closure-only waiters are not woken up by newly available bytes.
if (!mCallbackClosureOnly) {
NotifyOnUnlock(lock);
}
break;
}
default: {
NS_WARNING("Illegal message type received from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
}
}
}
// Construct a pipe end which starts out closed; no `DataPipeLink` is created
// by this constructor. A success code passed as `aError` is mapped to
// NS_BASE_STREAM_CLOSED so that `mStatus` always holds a failure code.
DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError)
    : mMutex(std::make_shared<Mutex>(!aReceiverSide ? "DataPipeSender"
                                                    : "DataPipeReceiver")),
      mStatus(NS_FAILED(aError) ? aError : NS_BASE_STREAM_CLOSED) {}
// Construct an open pipe end (`mStatus` is NS_OK) backed by a new
// `DataPipeLink` which shares this object's mutex. The link is initialized
// from the constructor body, i.e. without the mutex being held by us.
DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort,
                           SharedMemory* aShmem, uint32_t aCapacity,
                           nsresult aPeerStatus, uint32_t aOffset,
                           uint32_t aAvailable)
    : mMutex(std::make_shared<Mutex>(!aReceiverSide ? "DataPipeSender"
                                                    : "DataPipeReceiver")),
      mStatus(NS_OK),
      mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort), aShmem,
                             aCapacity, aPeerStatus, aOffset, aAvailable)) {
  mLink->Init();
}
// Closes the pipe with NS_BASE_STREAM_CLOSED if it has not been closed
// already (`CloseInternal` is a no-op for an already-failed status).
DataPipeBase::~DataPipeBase() {
  DataPipeAutoLock guard(*mMutex);
  CloseInternal(guard, NS_BASE_STREAM_CLOSED);
}
// Close this end of the pipe with `aStatus`.
//
// Does nothing if the pipe already has a failure status. Otherwise `mStatus`
// becomes `aStatus` (or NS_BASE_STREAM_CLOSED when `aStatus` is a success
// code), `mLink` is detached, any pending callback is queued for notification,
// and, if the peer has not yet closed or errored, the peer is told about the
// close via `SetPeerError(..., /* aSendClosed */ true)`.
void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) {
  if (NS_FAILED(mStatus)) {
    return;
  }

  // Log before touching any state so that `Describe` still sees `mLink`.
  MOZ_LOG(
      gDataPipeLog, LogLevel::Debug,
      ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get()));

  // Our own status must end up as a failure code.
  mStatus = NS_FAILED(aStatus) ? aStatus : NS_BASE_STREAM_CLOSED;

  RefPtr<DataPipeLink> detached = mLink.forget();
  AssertSameMutex(detached->mMutex);
  detached->NotifyOnUnlock(aLock);

  // Nothing more to do when the peer has already gone away.
  if (NS_FAILED(detached->mPeerStatus)) {
    return;
  }
  detached->SetPeerError(aLock, mStatus, /* aSendClosed */ true);
}
// Process up to `aCount` bytes of the shared memory buffer by repeatedly
// handing contiguous regions of it to `aProcessSegment`.
//
// `*aProcessedCount` receives the number of bytes actually processed.
//
// Returns:
//  - NS_OK when at least one byte was processed, when the pipe status is
//    NS_BASE_STREAM_CLOSED, or when `aProcessSegment` fails or reports zero
//    bytes processed;
//  - the pipe's failure status when nothing was processed and that status is
//    not NS_BASE_STREAM_CLOSED;
//  - NS_BASE_STREAM_WOULD_BLOCK when nothing was processed and no bytes are
//    currently available.
//
// The mutex is held while bookkeeping is read and updated, and released while
// `aProcessSegment` runs.
nsresult DataPipeBase::ProcessSegmentsInternal(
uint32_t aCount, ProcessSegmentFun aProcessSegment,
uint32_t* aProcessedCount) {
*aProcessedCount = 0;
// Each iteration handles one contiguous region and takes the lock afresh.
while (*aProcessedCount < aCount) {
DataPipeAutoLock lock(*mMutex);
mMutex->AssertCurrentThreadOwns();
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount,
Describe(lock).get()));
nsresult status = CheckStatus(lock);
if (NS_FAILED(status)) {
// A partial result is reported as success.
if (*aProcessedCount > 0) {
return NS_OK;
}
return status == NS_BASE_STREAM_CLOSED ? NS_OK : status;
}
RefPtr<DataPipeLink> link = mLink;
AssertSameMutex(link->mMutex);
if (!link->mAvailable) {
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus),
"CheckStatus will have returned an error");
return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK;
}
MOZ_RELEASE_ASSERT(!link->mProcessingSegment,
"Only one thread may be processing a segment at a time");
// Extract an iterator over the next contiguous region of the shared memory
// buffer which will be processed. The region is bounded by the bytes still
// requested, the bytes available, and the end of the buffer.
char* start = static_cast<char*>(link->mShmem->memory()) + link->mOffset;
char* iter = start;
char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable,
link->mCapacity - link->mOffset});
// Record the consumed region from our segment when exiting this scope,
// telling our peer how many bytes were consumed. Hold on to `mLink` to keep
// the shmem mapped and make sure we can clean up even if we're closed while
// processing the shmem region.
link->mProcessingSegment = true;
// `scopeExit` is declared before the `MutexAutoUnlock` scope below and after
// `lock`, so it runs with the mutex held again and before `lock` releases
// it, including on the early `return` inside the unlocked scope.
auto scopeExit = MakeScopeExit([&] {
mMutex->AssertCurrentThreadOwns(); // should still be held
AssertSameMutex(link->mMutex);
MOZ_RELEASE_ASSERT(link->mProcessingSegment);
link->mProcessingSegment = false;
uint32_t totalProcessed = iter - start;
if (totalProcessed > 0) {
link->mOffset += totalProcessed;
MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity);
// Wrap around to the start of the buffer once its end is reached.
if (link->mOffset == link->mCapacity) {
link->mOffset = 0;
}
link->mAvailable -= totalProcessed;
link->SendBytesConsumedOnUnlock(lock, totalProcessed);
}
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("Processed Segment(%u of %zu) %s", totalProcessed, end - start,
Describe(lock).get()));
});
{
// Run the caller's callback without holding the mutex.
MutexAutoUnlock unlock(*mMutex);
while (iter < end) {
uint32_t processed = 0;
Span segment{iter, end};
nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed);
// A failing or zero-progress callback ends processing; the failure code
// is not propagated to our caller.
if (NS_FAILED(rv) || processed == 0) {
return NS_OK;
}
MOZ_RELEASE_ASSERT(processed <= segment.Length());
iter += processed;
*aProcessedCount += processed;
}
}
}
MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount == aCount,
"Must have processed exactly aCount");
return NS_OK;
}
// Register `aCallback` to be dispatched (to `aTarget`, or as a background task
// when `aTarget` is null) when the pipe becomes ready or closes.
//
// If the pipe is already closed or errored the callback is queued for dispatch
// right away. Otherwise it replaces any callback previously stored on the
// link, and is queued right away only when `aClosureOnly` is false and bytes
// are already available.
void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
                                     already_AddRefed<nsIEventTarget> aTarget,
                                     bool aClosureOnly) {
  // Declared before the lock so these references outlive it.
  RefPtr<nsIRunnable> runnable = std::move(aCallback);
  RefPtr<nsIEventTarget> eventTarget = std::move(aTarget);

  DataPipeAutoLock lock(*mMutex);
  MOZ_LOG(gDataPipeLog, LogLevel::Debug,
          ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)",
           runnable.get(), Describe(lock).get()));

  const nsresult status = CheckStatus(lock);
  if (NS_FAILED(status)) {
#ifdef DEBUG
    if (mLink) {
      AssertSameMutex(mLink->mMutex);
      MOZ_ASSERT(!mLink->mCallback);
    }
#endif
    DoNotifyOnUnlock(lock, runnable.forget(), eventTarget.forget());
    return;
  }

  AssertSameMutex(mLink->mMutex);
  // NOTE: `mLink` may already hold a callback which is now being cancelled.
  // Always overwrite the stored callback state, even when the new callback is
  // about to be invoked immediately.
  mLink->mCallback = runnable.forget();
  mLink->mCallbackTarget = eventTarget.forget();
  mLink->mCallbackClosureOnly = aClosureOnly;

  const bool readyNow = !aClosureOnly && mLink->mAvailable;
  if (readyNow) {
    mLink->NotifyOnUnlock(lock);
  }
}
// Return the current status of this end of the pipe, first closing it if the
// peer's state requires that.
//
// When the peer has closed or errored, a sender closes immediately with the
// peer's status, whereas a receiver only closes once its available bytes have
// been exhausted.
//
// NOTE: 2-stage reads/writes may still be in flight at this point; they keep
// going because `ProcessSegmentsInternal` holds its own reference to the link.
nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
  if (NS_SUCCEEDED(mStatus)) {
    AssertSameMutex(mLink->mMutex);
    const bool peerGone = NS_FAILED(mLink->mPeerStatus);
    const bool dataPending = mLink->mReceiverSide && mLink->mAvailable;
    if (peerGone && !dataPending) {
      CloseInternal(aLock, mLink->mPeerStatus);
    }
  }
  return mStatus;
}
// Produce a short description of this pipe end for logging: the link's own
// description while a link exists, and just the status otherwise.
nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) {
  if (!mLink) {
    return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus));
  }
  AssertSameMutex(mLink->mMutex);
  return mLink->Describe(aLock);
}
// Serialize the pipe end `aParam` into `aWriter`, transferring it away from
// this object.
//
// The status is always written first. For an already-failed pipe nothing else
// is written. Otherwise the port, shmem handle, capacity, peer status, offset
// and available count are written in that order (the order `DataPipeRead`
// reads them back in), and the local end is then closed with
// NS_ERROR_NOT_INITIALIZED.
template <typename T>
void DataPipeWrite(IPC::MessageWriter* aWriter, T* aParam) {
DataPipeAutoLock lock(*aParam->mMutex);
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("IPC Write: %s", aParam->Describe(lock).get()));
WriteParam(aWriter, aParam->mStatus);
if (NS_FAILED(aParam->mStatus)) {
return;
}
aParam->AssertSameMutex(aParam->mLink->mMutex);
MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment,
"cannot transfer while processing a segment");
// Serialize relevant parameters to our peer.
WriteParam(aWriter, std::move(aParam->mLink->mPort));
// Failing to write the shmem handle is fatal: the process crashes.
if (!aParam->mLink->mShmem->WriteHandle(aWriter)) {
aWriter->FatalError("failed to write DataPipe shmem handle");
MOZ_CRASH("failed to write DataPipe shmem handle");
}
WriteParam(aWriter, aParam->mLink->mCapacity);
WriteParam(aWriter, aParam->mLink->mPeerStatus);
WriteParam(aWriter, aParam->mLink->mOffset);
WriteParam(aWriter, aParam->mLink->mAvailable);
// Mark our peer as closed so we don't try to send to it when closing.
// (`CloseInternal` only sends a CLOSED message while the peer status is a
// success code.)
aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED;
aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED);
}
// Deserialize a pipe end of type `T` from `aReader` into `*aResult`.
//
// The status is read first; a failure status yields a closed `T` and nothing
// else is read. Otherwise the port, shmem handle, capacity, peer status,
// offset and available count are read in that order, validated, and the shared
// memory is mapped. Returns false, after reporting a fatal error on the
// reader, if anything is missing, inconsistent, or cannot be mapped.
template <typename T>
bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) {
  nsresult status = NS_OK;
  if (!ReadParam(aReader, &status)) {
    aReader->FatalError("failed to read DataPipe status");
    return false;
  }

  // A pipe which was already closed when sent carries no further fields.
  if (NS_FAILED(status)) {
    *aResult = new T(status);
    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
            ("IPC Read: [status=%s]", GetStaticErrorName(status)));
    return true;
  }

  ScopedPort port;
  if (!ReadParam(aReader, &port)) {
    aReader->FatalError("failed to read DataPipe port");
    return false;
  }

  auto shmem = MakeRefPtr<SharedMemoryBasic>();
  if (!shmem->ReadHandle(aReader)) {
    aReader->FatalError("failed to read DataPipe shmem");
    return false;
  }

  uint32_t capacity = 0;
  nsresult peerStatus = NS_OK;
  uint32_t offset = 0;
  uint32_t available = 0;
  const bool fieldsRead =
      ReadParam(aReader, &capacity) && ReadParam(aReader, &peerStatus) &&
      ReadParam(aReader, &offset) && ReadParam(aReader, &available);
  if (!fieldsRead) {
    aReader->FatalError("failed to read DataPipe fields");
    return false;
  }

  // The values come from another process, so sanity-check them before they
  // are used to index into the shared memory region.
  const bool consistent =
      capacity != 0 && offset < capacity && available <= capacity;
  if (!consistent) {
    aReader->FatalError("received DataPipe state values are inconsistent");
    return false;
  }

  if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) {
    aReader->FatalError("failed to map DataPipe shared memory region");
    return false;
  }

  *aResult =
      new T(std::move(port), shmem, capacity, peerStatus, offset, available);

  // Only take the lock to describe the new pipe when the log is enabled.
  if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) {
    DataPipeAutoLock lock(*(*aResult)->mMutex);
    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
            ("IPC Read: %s", (*aResult)->Describe(lock).get()));
  }
  return true;
}
} // namespace data_pipe_detail
//-----------------------------------------------------------------------------
// DataPipeSender
//-----------------------------------------------------------------------------
// nsISupports implementation for DataPipeSender, listing nsIOutputStream,
// nsIAsyncOutputStream and the concrete DataPipeSender type itself.
// NOTE(review): listing the concrete class presumably lets callers recover a
// DataPipeSender via QueryInterface; confirm against DataPipe.h.
NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream,
DataPipeSender)
// nsIOutputStream
// Close the sending end of the pipe with the generic "closed" status.
NS_IMETHODIMP DataPipeSender::Close() {
  const nsresult reason = NS_BASE_STREAM_CLOSED;
  return CloseWithStatus(reason);
}
// Always returns NS_OK without doing any work.
NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; }
// Report the current status of the pipe without transferring any data. Takes
// the pipe mutex for the duration of the check.
NS_IMETHODIMP DataPipeSender::StreamStatus() {
  data_pipe_detail::DataPipeAutoLock guard(*mMutex);
  const nsresult status = CheckStatus(guard);
  return status;
}
// Write up to `aCount` bytes from `aBuf` into the pipe, storing the number of
// bytes written in `*aWriteCount`. Implemented on top of `WriteSegments`.
NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount,
                                    uint32_t* aWriteCount) {
  // The segment reader takes its closure as a mutable `void*`.
  void* closure = const_cast<char*>(aBuf);
  return WriteSegments(NS_CopyBufferToSegment, closure, aCount, aWriteCount);
}
// Write up to `aCount` bytes taken from `aFromStream` into the pipe, storing
// the number of bytes written in `*aWriteCount`. Implemented on top of
// `WriteSegments`.
NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream,
                                        uint32_t aCount,
                                        uint32_t* aWriteCount) {
  // The source stream is handed to the segment reader as its closure.
  void* closure = aFromStream;
  return WriteSegments(NS_CopyStreamToSegment, closure, aCount, aWriteCount);
}
// Fill up to `aCount` bytes of the pipe's buffer by calling `aReader` on each
// writable segment. `*aWriteCount` receives the number of bytes written.
NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader,
                                            void* aClosure, uint32_t aCount,
                                            uint32_t* aWriteCount) {
  // Adapt the XPCOM segment-reader signature to the span-based callback which
  // `ProcessSegmentsInternal` expects.
  auto fillSegment = [&](Span<char> aSegment, uint32_t aOffset,
                         uint32_t* aFilled) -> nsresult {
    char* const buffer = aSegment.data();
    return aReader(this, aClosure, buffer, aOffset, aSegment.Length(), aFilled);
  };
  return ProcessSegmentsInternal(aCount, fillSegment, aWriteCount);
}
// Always reports the stream as non-blocking (`*_retval` is set to true).
NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) {
*_retval = true;
return NS_OK;
}
// nsIAsyncOutputStream
// Close the sending end of the pipe with `reason`. Always returns NS_OK, even
// when the pipe had already been closed.
NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) {
  {
    data_pipe_detail::DataPipeAutoLock guard(*mMutex);
    CloseInternal(guard, reason);
  }
  return NS_OK;
}
// Ask to be notified through `aCallback->OnOutputStreamReady` once the pipe
// can accept data or has been closed.
//
// aCallback:       callback to notify. Passing null registers no new callback;
//                  `AsyncWaitInternal` still replaces whatever callback was
//                  stored before.
// aFlags:          only WAIT_CLOSURE_ONLY is examined; when set, the callback
//                  is not triggered merely because space became available.
// aRequestedCount: not used by this implementation.
// aTarget:         event target to dispatch the callback to; when null the
//                  callback is dispatched as a background task.
//
// Always returns NS_OK.
NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback,
                                        uint32_t aFlags,
                                        uint32_t aRequestedCount,
                                        nsIEventTarget* aTarget) {
  AsyncWaitInternal(
      aCallback ? NS_NewCancelableRunnableFunction(
                      // Name the runnable after this class. It was previously
                      // labelled "DataPipeReceiver::AsyncWait", which made
                      // sender wakeups look like receiver wakeups in
                      // diagnostics.
                      "DataPipeSender::AsyncWait",
                      [self = RefPtr{this}, callback = RefPtr{aCallback}] {
                        MOZ_LOG(gDataPipeLog, LogLevel::Debug,
                                ("Calling OnOutputStreamReady(%p, %p)",
                                 callback.get(), self.get()));
                        callback->OnOutputStreamReady(self);
                      })
                : nullptr,
      do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
  return NS_OK;
}
//-----------------------------------------------------------------------------
// DataPipeReceiver
//-----------------------------------------------------------------------------
// nsISupports implementation for DataPipeReceiver, listing nsIInputStream,
// nsIAsyncInputStream, nsIIPCSerializableInputStream and the concrete
// DataPipeReceiver type itself.
// NOTE(review): listing the concrete class presumably lets callers recover a
// DataPipeReceiver via QueryInterface; confirm against DataPipe.h.
NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream,
nsIIPCSerializableInputStream, DataPipeReceiver)
// nsIInputStream
// Close the receiving end of the pipe with the generic "closed" status.
NS_IMETHODIMP DataPipeReceiver::Close() {
  const nsresult reason = NS_BASE_STREAM_CLOSED;
  return CloseWithStatus(reason);
}
// Report how many bytes can currently be read from the pipe.
//
// On success `*_retval` is set to the link's available byte count and NS_OK
// is returned. If the pipe is closed or errored, that status is returned and
// `*_retval` is left untouched.
NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) {
  data_pipe_detail::DataPipeAutoLock guard(*mMutex);
  const nsresult status = CheckStatus(guard);
  if (NS_SUCCEEDED(status)) {
    AssertSameMutex(mLink->mMutex);
    *_retval = mLink->mAvailable;
    return NS_OK;
  }
  return status;
}
// Report the current status of the pipe without computing how many bytes are
// available. Takes the pipe mutex for the duration of the check.
NS_IMETHODIMP DataPipeReceiver::StreamStatus() {
  data_pipe_detail::DataPipeAutoLock guard(*mMutex);
  const nsresult status = CheckStatus(guard);
  return status;
}
// Read up to `aCount` bytes from the pipe into `aBuf`, storing the number of
// bytes read in `*aReadCount`. Implemented on top of `ReadSegments`.
NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount,
                                     uint32_t* aReadCount) {
  // The destination buffer is handed to the segment writer as its closure.
  void* closure = aBuf;
  return ReadSegments(NS_CopySegmentToBuffer, closure, aCount, aReadCount);
}
// Consume up to `aCount` bytes of the pipe's buffer by calling `aWriter` on
// each readable segment. `*aReadCount` receives the number of bytes read.
NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter,
                                             void* aClosure, uint32_t aCount,
                                             uint32_t* aReadCount) {
  // Adapt the XPCOM segment-writer signature to the span-based callback which
  // `ProcessSegmentsInternal` expects.
  auto drainSegment = [&](Span<char> aSegment, uint32_t aOffset,
                          uint32_t* aDrained) -> nsresult {
    char* const buffer = aSegment.data();
    return aWriter(this, aClosure, buffer, aOffset, aSegment.Length(),
                   aDrained);
  };
  return ProcessSegmentsInternal(aCount, drainSegment, aReadCount);
}
// Always reports the stream as non-blocking (`*_retval` is set to true).
NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) {
*_retval = true;
return NS_OK;
}
// nsIAsyncInputStream
// Close the receiving end of the pipe with `aStatus`. Always returns NS_OK,
// even when the pipe had already been closed.
NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) {
  {
    data_pipe_detail::DataPipeAutoLock guard(*mMutex);
    CloseInternal(guard, aStatus);
  }
  return NS_OK;
}
// Ask to be notified through `aCallback->OnInputStreamReady` once the pipe has
// data to read or has been closed.
//
// aCallback:       callback to notify. Passing null registers no new callback;
//                  `AsyncWaitInternal` still replaces whatever callback was
//                  stored before.
// aFlags:          only WAIT_CLOSURE_ONLY is examined; when set, the callback
//                  is not triggered merely because data became available.
// aRequestedCount: not used by this implementation.
// aTarget:         event target to dispatch the callback to; when null the
//                  callback is dispatched as a background task.
//
// Always returns NS_OK.
NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback,
                                          uint32_t aFlags,
                                          uint32_t aRequestedCount,
                                          nsIEventTarget* aTarget) {
  // Wrap the callback, if any, in a runnable which keeps both the callback
  // and this stream alive until it has run.
  RefPtr<nsIRunnable> notifier;
  if (aCallback) {
    notifier = NS_NewCancelableRunnableFunction(
        "DataPipeReceiver::AsyncWait",
        [self = RefPtr{this}, callback = RefPtr{aCallback}] {
          MOZ_LOG(gDataPipeLog, LogLevel::Debug,
                  ("Calling OnInputStreamReady(%p, %p)", callback.get(),
                   self.get()));
          callback->OnInputStreamReady(self);
        });
  }

  AsyncWaitInternal(notifier.forget(), do_AddRef(aTarget),
                    aFlags & WAIT_CLOSURE_ONLY);
  return NS_OK;
}
// nsIIPCSerializableInputStream
// Report the cost of serializing this stream. Only `*aTransferables` is
// written; `aMaxSize` is not consulted and `*aSizeUsed` / `*aPipes` are left
// untouched.
void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize,
uint32_t* aSizeUsed,
uint32_t* aPipes,
uint32_t* aTransferables) {
// We report DataPipeReceiver as taking one transferable to serialize, rather
// than one pipe, as we aren't starting a new pipe for this purpose, and are
// instead transferring an existing pipe.
*aTransferables = 1;
}
// Serialize this stream into `aParams` by wrapping `this` in a
// `DataPipeReceiverStreamParams`. `*aSizeUsed` is set to 0; `aMaxSize` is not
// consulted.
void DataPipeReceiver::Serialize(InputStreamParams& aParams, uint32_t aMaxSize,
                                 uint32_t* aSizeUsed) {
  aParams = DataPipeReceiverStreamParams(this);
  *aSizeUsed = 0;
}
// Crashes unconditionally. Per the crash message, deserialization of this
// stream type is handled directly in `DeserializeInputStream` instead.
bool DataPipeReceiver::Deserialize(const InputStreamParams& aParams) {
MOZ_CRASH("Handled directly in `DeserializeInputStream`");
}
//-----------------------------------------------------------------------------
// NewDataPipe
//-----------------------------------------------------------------------------
// Create a connected sender/receiver pair sharing one memory buffer.
//
// aCapacity: size of the pipe in bytes; 0 selects kDefaultDataPipeCapacity.
// aSender:   receives the sending end on success.
// aReceiver: receives the receiving end on success.
//
// Returns NS_ERROR_ILLEGAL_DURING_SHUTDOWN when no NodeController is
// available, NS_ERROR_OUT_OF_MEMORY when the shared memory cannot be created
// or mapped, and NS_OK otherwise. The out-params are only written on success.
nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender,
                     DataPipeReceiver** aReceiver) {
  const uint32_t capacity = aCapacity ? aCapacity : kDefaultDataPipeCapacity;

  RefPtr<NodeController> controller = NodeController::GetSingleton();
  if (!controller) {
    return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
  }

  auto [senderPort, receiverPort] = controller->CreatePortPair();

  // The mapping is rounded up to a whole number of pages; the pipe itself
  // still only uses `capacity` bytes of it.
  auto shmem = MakeRefPtr<SharedMemoryBasic>();
  const size_t mappedSize = SharedMemory::PageAlignedSize(capacity);
  if (!shmem->Create(mappedSize)) {
    return NS_ERROR_OUT_OF_MEMORY;
  }
  if (!shmem->Map(mappedSize)) {
    return NS_ERROR_OUT_OF_MEMORY;
  }

  // The sender starts with the whole buffer available to write into, while
  // the receiver starts with nothing available to read.
  RefPtr sender = new DataPipeSender(std::move(senderPort), shmem, capacity,
                                     NS_OK, 0, capacity);
  RefPtr receiver = new DataPipeReceiver(std::move(receiverPort), shmem,
                                         capacity, NS_OK, 0, 0);
  sender.forget(aSender);
  receiver.forget(aReceiver);
  return NS_OK;
}
} // namespace ipc
} // namespace mozilla
// IPC serialization of a DataPipeSender; forwards to the shared
// `DataPipeWrite` helper.
void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write(
    MessageWriter* aWriter, mozilla::ipc::DataPipeSender* aParam) {
  namespace pipe_detail = mozilla::ipc::data_pipe_detail;
  pipe_detail::DataPipeWrite(aWriter, aParam);
}
// IPC deserialization of a DataPipeSender; forwards to the shared
// `DataPipeRead` helper and returns its result.
bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read(
    MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeSender>* aResult) {
  namespace pipe_detail = mozilla::ipc::data_pipe_detail;
  return pipe_detail::DataPipeRead(aReader, aResult);
}
// IPC serialization of a DataPipeReceiver; forwards to the shared
// `DataPipeWrite` helper.
void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write(
    MessageWriter* aWriter, mozilla::ipc::DataPipeReceiver* aParam) {
  namespace pipe_detail = mozilla::ipc::data_pipe_detail;
  pipe_detail::DataPipeWrite(aWriter, aParam);
}
// IPC deserialization of a DataPipeReceiver; forwards to the shared
// `DataPipeRead` helper and returns its result.
bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read(
    MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) {
  namespace pipe_detail = mozilla::ipc::data_pipe_detail;
  return pipe_detail::DataPipeRead(aReader, aResult);
}