Bug 1713148 - Part 5: Remove ThreadLink, r=handyman

This removes the last form of unique link between two MessageChannels so that
all MessageChannels communicate using PortLink, as it is fairly straightforward
to use PortLink to communicate between two threads in-process.

Differential Revision: https://phabricator.services.mozilla.com/D116672
This commit is contained in:
Nika Layzell 2021-06-22 18:17:25 +00:00
parent 7dba3a39f8
commit 5cc51fa13f
4 changed files with 45 additions and 208 deletions

View File

@ -36,6 +36,7 @@
#include "nsIMemoryReporter.h"
#include "nsISupportsImpl.h"
#include "nsPrintfCString.h"
#include "nsThreadUtils.h"
#ifdef OS_WIN
# include "mozilla/gfx/Logging.h"
@ -799,116 +800,57 @@ bool MessageChannel::Open(ScopedPort aPort, Side aSide,
return true;
}
static Side GetOppSide(Side aSide) {
switch (aSide) {
case ChildSide:
return ParentSide;
case ParentSide:
return ChildSide;
default:
return UnknownSide;
}
}
bool MessageChannel::Open(MessageChannel* aTargetChan,
nsISerialEventTarget* aEventTarget, Side aSide) {
// Opens a connection to another thread in the same process.
// This handshake proceeds as follows:
// - Let A be the thread initiating the process (either child or parent)
// and B be the other thread.
// - A spawns thread for B, obtaining B's message loop
// - A creates ProtocolChild and ProtocolParent instances.
// Let PA be the one appropriate to A and PB the side for B.
// - A invokes PA->Open(PB, ...):
// - set state to mChannelOpening
// - this will place a work item in B's worker loop (see next bullet)
// and then spins until PB->mChannelState becomes mChannelConnected
// - meanwhile, on PB's worker loop, the work item is removed and:
// - invokes PB->OpenAsOtherThread(PA, ...):
// - sets its state and that of PA to Connected
MOZ_ASSERT(aTargetChan, "Need a target channel");
MOZ_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
CommonThreadOpenInit(aTargetChan, GetCurrentSerialEventTarget(), aSide);
std::pair<ScopedPort, ScopedPort> ports =
NodeController::GetSingleton()->CreatePortPair();
Side oppSide = UnknownSide;
switch (aSide) {
case ChildSide:
oppSide = ParentSide;
break;
case ParentSide:
oppSide = ChildSide;
break;
case UnknownSide:
break;
}
// NOTE: This dispatch must be sync as it captures locals by non-owning
// reference, however we can't use `NS_DISPATCH_SYNC` as that will spin a
// nested event loop, and doesn't work with certain types of calling event
// targets.
base::WaitableEvent event(/* manual_reset */ true,
/* initially_signaled */ false);
MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(NS_NewCancelableRunnableFunction(
"ipc::MessageChannel::OpenAsOtherThread", [&]() {
aTargetChan->Open(std::move(ports.second), GetOppSide(aSide),
aEventTarget);
event.Signal();
})));
bool ok = event.Wait();
MOZ_RELEASE_ASSERT(ok);
mMonitor = new RefCountedMonitor();
MonitorAutoLock lock(*mMonitor);
mChannelState = ChannelOpening;
MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(
NewNonOwningRunnableMethod<MessageChannel*, nsISerialEventTarget*, Side>(
"ipc::MessageChannel::OpenAsOtherThread", aTargetChan,
&MessageChannel::OpenAsOtherThread, this, aEventTarget, oppSide)));
while (ChannelOpening == mChannelState) mMonitor->Wait();
MOZ_RELEASE_ASSERT(ChannelConnected == mChannelState,
"not connected when awoken");
return (ChannelConnected == mChannelState);
}
void MessageChannel::OpenAsOtherThread(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread,
Side aSide) {
// Invoked when the other side has begun the open.
MOZ_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
MOZ_ASSERT(ChannelOpening == aTargetChan->mChannelState,
"Target channel not in the process of opening");
CommonThreadOpenInit(aTargetChan, aThread, aSide);
mMonitor = aTargetChan->mMonitor;
MonitorAutoLock lock(*mMonitor);
MOZ_RELEASE_ASSERT(ChannelOpening == aTargetChan->mChannelState,
"Target channel not in the process of opening");
mChannelState = ChannelConnected;
aTargetChan->mChannelState = ChannelConnected;
aTargetChan->mMonitor->Notify();
}
void MessageChannel::CommonThreadOpenInit(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread,
Side aSide) {
MOZ_ASSERT(aThread);
mWorkerThread = aThread;
mListener->OnIPCChannelOpened();
mLink = MakeUnique<ThreadLink>(this, aTargetChan);
mSide = aSide;
// Now that the other side has connected, open the port on our side.
return Open(std::move(ports.first), aSide);
}
bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan,
mozilla::ipc::Side aSide) {
nsCOMPtr<nsISerialEventTarget> currentThread = GetCurrentSerialEventTarget();
CommonThreadOpenInit(aTargetChan, currentThread, aSide);
Side oppSide = UnknownSide;
switch (aSide) {
case ChildSide:
oppSide = ParentSide;
break;
case ParentSide:
oppSide = ChildSide;
break;
case UnknownSide:
break;
}
mIsSameThreadChannel = true;
// XXX(nika): Avoid setting up a monitor for same thread channels? We
// shouldn't need it.
mMonitor = new RefCountedMonitor();
mChannelState = ChannelOpening;
aTargetChan->CommonThreadOpenInit(this, currentThread, oppSide);
auto [porta, portb] = NodeController::GetSingleton()->CreatePortPair();
aTargetChan->mIsSameThreadChannel = true;
aTargetChan->mMonitor = mMonitor;
mIsSameThreadChannel = true;
mChannelState = ChannelConnected;
aTargetChan->mChannelState = ChannelConnected;
return true;
auto* currentThread = GetCurrentSerialEventTarget();
return aTargetChan->Open(std::move(portb), GetOppSide(aSide),
currentThread) &&
Open(std::move(porta), aSide, currentThread);
}
bool MessageChannel::Send(UniquePtr<Message> aMsg) {

View File

@ -382,11 +382,6 @@ class MessageChannel : HasResultCodes {
#endif // defined(OS_WIN)
private:
void CommonThreadOpenInit(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread, Side aSide);
void OpenAsOtherThread(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread, Side aSide);
void PostErrorNotifyTask();
void OnNotifyMaybeChannelError();
void ReportConnectionError(const char* aChannelName,

View File

@ -49,95 +49,6 @@ MessageLink::~MessageLink() {
#endif
}
ThreadLink::ThreadLink(MessageChannel* aChan, MessageChannel* aTargetChan)
: MessageLink(aChan), mTargetChan(aTargetChan) {}
void ThreadLink::PrepareToDestroy() {
MOZ_ASSERT(mChan);
MOZ_ASSERT(mChan->mMonitor);
MonitorAutoLock lock(*mChan->mMonitor);
// Bug 848949: We need to prevent the other side
// from sending us any more messages to avoid Use-After-Free.
// The setup here is as shown:
//
// (Us) (Them)
// MessageChannel MessageChannel
// | ^ \ / ^ |
// | | X | |
// v | / \ | v
// ThreadLink ThreadLink
//
// We want to null out the diagonal link from their ThreadLink
// to our MessageChannel. Note that we must hold the monitor so
// that we do this atomically with respect to them trying to send
// us a message. Since the channels share the same monitor this
// also protects against the two PrepareToDestroy() calls racing.
//
//
// Why splitting is done in a method separate from ~ThreadLink:
//
// ThreadLinks are destroyed in MessageChannel::Clear(), when
// nullptr is assigned to the UniquePtr<> MessageChannel::mLink.
// This single line of code gets executed in three separate steps:
// 1. Load the value of mLink into a temporary.
// 2. Store nullptr in the mLink field.
// 3. Call the destructor on the temporary from step 1.
// This is all done without holding the monitor.
// The splitting operation, among other things, loads the mLink field
// of the other thread's MessageChannel while holding the monitor.
// If splitting was done in the destructor, and the two sides were
// both running MessageChannel::Clear(), then there would be a race
// between the store to mLink in Clear() and the load of mLink
// during splitting.
// Instead, we call PrepareToDestroy() prior to step 1. One thread or
// the other will run the entire method before the other thread,
// because this method acquires the monitor. Once that is done, the
// mTargetChan of both ThreadLink will be null, so they will no
// longer be able to access the other and so there won't be any races.
//
// An alternate approach would be to hold the monitor in Clear() or
// make mLink atomic, but MessageLink does not have to worry about
// Clear() racing with Clear(), so it would be inefficient.
if (mTargetChan) {
MOZ_ASSERT(mTargetChan->mLink);
static_cast<ThreadLink*>(mTargetChan->mLink.get())->mTargetChan = nullptr;
}
mTargetChan = nullptr;
}
void ThreadLink::SendMessage(UniquePtr<Message> msg) {
if (!mChan->mIsPostponingSends) {
mChan->AssertWorkerThread();
}
mChan->mMonitor->AssertCurrentThreadOwns();
if (mTargetChan) mTargetChan->OnMessageReceivedFromLink(std::move(*msg));
}
void ThreadLink::SendClose() {
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mChannelState = ChannelClosed;
// In a ProcessLink, we would close our half the channel. This
// would show up on the other side as an error on the I/O thread.
// The I/O thread would then invoke OnChannelErrorFromLink().
// As usual, we skip that process and just invoke the
// OnChannelErrorFromLink() method directly.
if (mTargetChan) mTargetChan->OnChannelErrorFromLink();
}
bool ThreadLink::Unsound_IsClosed() const {
MonitorAutoLock lock(*mChan->mMonitor);
return mChan->mChannelState == ChannelClosed;
}
uint32_t ThreadLink::Unsound_NumQueuedMessages() const {
// ThreadLinks don't have a message queue.
return 0;
}
class PortLink::PortObserverThunk : public NodeController::PortObserver {
public:
PortObserverThunk(RefCountedMonitor* aMonitor, PortLink* aLink)
@ -169,10 +80,16 @@ PortLink::PortLink(MessageChannel* aChan, ScopedPort aPort)
// Dispatch an event to the IO loop to trigger an initial
// `OnPortStatusChanged` to deliver any pending messages. This needs to be run
// asynchronously from a different thread for now due to assertions in
// asynchronously from a different thread (or in the case of a same-thread
// channel, from the current thread), for now due to assertions in
// `MessageChannel`.
XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod(
"PortLink::Open", mObserver, &PortObserverThunk::OnPortStatusChanged));
nsCOMPtr<nsIRunnable> openRunnable = NewRunnableMethod(
"PortLink::Open", mObserver, &PortObserverThunk::OnPortStatusChanged);
if (aChan->mIsSameThreadChannel) {
aChan->mWorkerThread->Dispatch(openRunnable.forget());
} else {
XRE_GetIOMessageLoop()->PostTask(openRunnable.forget());
}
}
PortLink::~PortLink() {

View File

@ -65,23 +65,6 @@ class MessageLink {
MessageChannel* mChan;
};
class ThreadLink : public MessageLink {
public:
ThreadLink(MessageChannel* aChan, MessageChannel* aTargetChan);
virtual ~ThreadLink() = default;
virtual void PrepareToDestroy() override;
virtual void SendMessage(mozilla::UniquePtr<Message> msg) override;
virtual void SendClose() override;
virtual bool Unsound_IsClosed() const override;
virtual uint32_t Unsound_NumQueuedMessages() const override;
protected:
MessageChannel* mTargetChan;
};
class PortLink final : public MessageLink {
using PortRef = mojo::core::ports::PortRef;
using PortStatus = mojo::core::ports::PortStatus;