mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-12-11 16:32:59 +00:00
646 lines
19 KiB
C++
646 lines
19 KiB
C++
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
|
|
* vim: sw=4 ts=4 et :
|
|
*/
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "mozilla/ipc/RPCChannel.h"
|
|
#include "mozilla/ipc/ProtocolUtils.h"
|
|
|
|
#include "nsDebug.h"
|
|
#include "nsTraceRefcnt.h"
|
|
|
|
// Channel-internal assertion.  When |_cond| is false, DebugAbort() is called
// with the current file and line, the stringified condition, and any extra
// arguments supplied after the condition.  The do/while(0) wrapper makes the
// macro usable wherever a single statement is expected.
#define RPC_ASSERT(_cond, ...)                                          \
    do {                                                                \
        if (!(_cond)) {                                                 \
            DebugAbort(__FILE__, __LINE__, #_cond, ## __VA_ARGS__);     \
        }                                                               \
    } while (0)
|
|
|
|
using mozilla::MonitorAutoLock;
|
|
using mozilla::MonitorAutoUnlock;
|
|
|
|
template<>
|
|
struct RunnableMethodTraits<mozilla::ipc::RPCChannel>
|
|
{
|
|
static void RetainCallee(mozilla::ipc::RPCChannel* obj) { }
|
|
static void ReleaseCallee(mozilla::ipc::RPCChannel* obj) { }
|
|
};
|
|
|
|
|
|
namespace mozilla {
|
|
namespace ipc {
|
|
|
|
// Constructs the channel on top of SyncChannel with empty message
// queues/stacks, a remote stack depth guess of zero, and creates the
// (cancelable) task that runs OnMaybeDequeueOne() on this channel.
RPCChannel::RPCChannel(RPCListener* aListener)
  : SyncChannel(aListener)
  , mPending()
  , mStack()
  , mOutOfTurnReplies()
  , mDeferred()
  , mRemoteStackDepthGuess(0)
  , mSawRPCOutMsg(false)
{
    MOZ_COUNT_CTOR(RPCChannel);

    // Every DequeueTask posted to the worker loop shares this one task.
    mDequeueOneTask = new RefCountedTask(
        NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
}
|
|
|
|
// Tears the channel down.  No CxxStackFrame may still be alive at this
// point; the remaining cleanup is delegated to Clear().
RPCChannel::~RPCChannel()
{
    MOZ_COUNT_DTOR(RPCChannel);

    RPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");

    Clear();
}
|
|
|
|
void
|
|
RPCChannel::Clear()
|
|
{
|
|
mDequeueOneTask->Cancel();
|
|
|
|
AsyncChannel::Clear();
|
|
}
|
|
|
|
bool
|
|
RPCChannel::EventOccurred() const
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertCurrentThreadOwns();
|
|
RPC_ASSERT(StackDepth() > 0, "not in wait loop");
|
|
|
|
return (!Connected() ||
|
|
!mPending.empty() ||
|
|
!mUrgent.empty() ||
|
|
(!mOutOfTurnReplies.empty() &&
|
|
mOutOfTurnReplies.find(mStack.top().seqno())
|
|
!= mOutOfTurnReplies.end()));
|
|
}
|
|
|
|
bool
|
|
RPCChannel::Send(Message* msg)
|
|
{
|
|
Message copy = *msg;
|
|
CxxStackFrame f(*this, OUT_MESSAGE, ©);
|
|
return AsyncChannel::Send(msg);
|
|
}
|
|
|
|
bool
|
|
RPCChannel::Send(Message* msg, Message* reply)
|
|
{
|
|
Message copy = *msg;
|
|
CxxStackFrame f(*this, OUT_MESSAGE, ©);
|
|
return SyncChannel::Send(msg, reply);
|
|
}
|
|
|
|
bool
|
|
RPCChannel::Call(Message* _msg, Message* reply)
|
|
{
|
|
RPC_ASSERT(!mPendingReply, "should not be waiting for a reply");
|
|
|
|
nsAutoPtr<Message> msg(_msg);
|
|
AssertWorkerThread();
|
|
mMonitor->AssertNotCurrentThreadOwns();
|
|
RPC_ASSERT(!ProcessingSyncMessage() || msg->priority() == IPC::Message::PRIORITY_HIGH,
|
|
"violation of sync handler invariant");
|
|
RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
|
|
|
|
#ifdef OS_WIN
|
|
SyncStackFrame frame(this, true);
|
|
#endif
|
|
|
|
Message copy = *msg;
|
|
CxxStackFrame f(*this, OUT_MESSAGE, ©);
|
|
|
|
MonitorAutoLock lock(*mMonitor);
|
|
|
|
if (!Connected()) {
|
|
ReportConnectionError("RPCChannel");
|
|
return false;
|
|
}
|
|
|
|
bool urgent = (copy.priority() == IPC::Message::PRIORITY_HIGH);
|
|
|
|
msg->set_seqno(NextSeqno());
|
|
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
|
|
msg->set_rpc_local_stack_depth(1 + StackDepth());
|
|
mStack.push(*msg);
|
|
|
|
mLink->SendMessage(msg.forget());
|
|
|
|
while (1) {
|
|
// if a handler invoked by *Dispatch*() spun a nested event
|
|
// loop, and the connection was broken during that loop, we
|
|
// might have already processed the OnError event. if so,
|
|
// trying another loop iteration will be futile because
|
|
// channel state will have been cleared
|
|
if (!Connected()) {
|
|
ReportConnectionError("RPCChannel");
|
|
return false;
|
|
}
|
|
|
|
// now might be the time to process a message deferred because
|
|
// of race resolution
|
|
MaybeUndeferIncall();
|
|
|
|
// here we're waiting for something to happen. see long
|
|
// comment about the queue in RPCChannel.h
|
|
while (!EventOccurred()) {
|
|
bool maybeTimedOut = !RPCChannel::WaitForNotify();
|
|
|
|
if (EventOccurred() ||
|
|
// we might have received a "subtly deferred" message
|
|
// in a nested loop that it's now time to process
|
|
(!maybeTimedOut &&
|
|
(!mDeferred.empty() || !mOutOfTurnReplies.empty())))
|
|
break;
|
|
|
|
if (maybeTimedOut && !ShouldContinueFromTimeout())
|
|
return false;
|
|
}
|
|
|
|
if (!Connected()) {
|
|
ReportConnectionError("RPCChannel");
|
|
return false;
|
|
}
|
|
|
|
Message recvd;
|
|
MessageMap::iterator it;
|
|
if (!mOutOfTurnReplies.empty() &&
|
|
((it = mOutOfTurnReplies.find(mStack.top().seqno())) !=
|
|
mOutOfTurnReplies.end())) {
|
|
recvd = it->second;
|
|
mOutOfTurnReplies.erase(it);
|
|
}
|
|
else if (!mUrgent.empty()) {
|
|
recvd = mUrgent.front();
|
|
mUrgent.pop_front();
|
|
}
|
|
else if (!mPending.empty()) {
|
|
recvd = mPending.front();
|
|
mPending.pop_front();
|
|
}
|
|
else {
|
|
// because of subtleties with nested event loops, it's
|
|
// possible that we got here and nothing happened. or, we
|
|
// might have a deferred in-call that needs to be
|
|
// processed. either way, we won't break the inner while
|
|
// loop again until something new happens.
|
|
continue;
|
|
}
|
|
|
|
if (!recvd.is_rpc()) {
|
|
if (urgent && recvd.priority() != IPC::Message::PRIORITY_HIGH) {
|
|
// If we're waiting for an urgent reply, don't process any
|
|
// messages yet.
|
|
mNonUrgentDeferred.push_back(recvd);
|
|
} else if (recvd.is_sync()) {
|
|
RPC_ASSERT(mPending.empty(),
|
|
"other side should have been blocked");
|
|
MonitorAutoUnlock unlock(*mMonitor);
|
|
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
|
SyncChannel::OnDispatchMessage(recvd);
|
|
} else {
|
|
MonitorAutoUnlock unlock(*mMonitor);
|
|
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
|
AsyncChannel::OnDispatchMessage(recvd);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
RPC_ASSERT(recvd.is_rpc(), "wtf???");
|
|
|
|
if (recvd.is_reply()) {
|
|
RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
|
|
|
|
const Message& outcall = mStack.top();
|
|
|
|
// in the parent, seqno's increase from 0, and in the
|
|
// child, they decrease from 0
|
|
if ((!mChild && recvd.seqno() < outcall.seqno()) ||
|
|
(mChild && recvd.seqno() > outcall.seqno())) {
|
|
mOutOfTurnReplies[recvd.seqno()] = recvd;
|
|
continue;
|
|
}
|
|
|
|
// FIXME/cjones: handle error
|
|
RPC_ASSERT(
|
|
recvd.is_reply_error() ||
|
|
(recvd.type() == (outcall.type()+1) &&
|
|
recvd.seqno() == outcall.seqno()),
|
|
"somebody's misbehavin'", "rpc", true);
|
|
|
|
// we received a reply to our most recent outstanding
|
|
// call. pop this frame and return the reply
|
|
mStack.pop();
|
|
|
|
bool isError = recvd.is_reply_error();
|
|
if (!isError) {
|
|
*reply = recvd;
|
|
}
|
|
|
|
if (0 == StackDepth()) {
|
|
RPC_ASSERT(
|
|
mOutOfTurnReplies.empty(),
|
|
"still have pending replies with no pending out-calls",
|
|
"rpc", true);
|
|
}
|
|
|
|
// finished with this RPC stack frame
|
|
return !isError;
|
|
}
|
|
|
|
// in-call. process in a new stack frame.
|
|
|
|
// "snapshot" the current stack depth while we own the Monitor
|
|
size_t stackDepth = StackDepth();
|
|
{
|
|
MonitorAutoUnlock unlock(*mMonitor);
|
|
// someone called in to us from the other side. handle the call
|
|
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
|
Incall(recvd, stackDepth);
|
|
// FIXME/cjones: error handling
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
RPCChannel::MaybeUndeferIncall()
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertCurrentThreadOwns();
|
|
|
|
if (mDeferred.empty())
|
|
return;
|
|
|
|
size_t stackDepth = StackDepth();
|
|
|
|
// the other side can only *under*-estimate our actual stack depth
|
|
RPC_ASSERT(mDeferred.top().rpc_remote_stack_depth_guess() <= stackDepth,
|
|
"fatal logic error");
|
|
|
|
if (mDeferred.top().rpc_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
|
|
return;
|
|
|
|
// maybe time to process this message
|
|
Message call = mDeferred.top();
|
|
mDeferred.pop();
|
|
|
|
// fix up fudge factor we added to account for race
|
|
RPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
|
|
--mRemoteStackDepthGuess;
|
|
|
|
mPending.push_back(call);
|
|
}
|
|
|
|
void
|
|
RPCChannel::EnqueuePendingMessages()
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertCurrentThreadOwns();
|
|
|
|
MaybeUndeferIncall();
|
|
|
|
for (size_t i = 0; i < mDeferred.size(); ++i) {
|
|
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
|
|
}
|
|
|
|
// XXX performance tuning knob: could process all or k pending
|
|
// messages here, rather than enqueuing for later processing
|
|
|
|
size_t total = mPending.size() + mUrgent.size() + mNonUrgentDeferred.size();
|
|
for (size_t i = 0; i < total; ++i) {
|
|
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
|
|
}
|
|
}
|
|
|
|
void
|
|
RPCChannel::FlushPendingRPCQueue()
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertNotCurrentThreadOwns();
|
|
|
|
{
|
|
MonitorAutoLock lock(*mMonitor);
|
|
|
|
if (mDeferred.empty()) {
|
|
if (mPending.empty())
|
|
return;
|
|
|
|
const Message& last = mPending.back();
|
|
if (!last.is_rpc() || last.is_reply())
|
|
return;
|
|
}
|
|
}
|
|
|
|
while (OnMaybeDequeueOne());
|
|
}
|
|
|
|
bool
|
|
RPCChannel::OnMaybeDequeueOne()
|
|
{
|
|
// XXX performance tuning knob: could process all or k pending
|
|
// messages here
|
|
|
|
AssertWorkerThread();
|
|
mMonitor->AssertNotCurrentThreadOwns();
|
|
|
|
Message recvd;
|
|
{
|
|
MonitorAutoLock lock(*mMonitor);
|
|
|
|
if (!Connected()) {
|
|
ReportConnectionError("RPCChannel");
|
|
return false;
|
|
}
|
|
|
|
if (!mDeferred.empty())
|
|
MaybeUndeferIncall();
|
|
|
|
MessageQueue *queue = mUrgent.empty()
|
|
? mNonUrgentDeferred.empty()
|
|
? &mPending
|
|
: &mNonUrgentDeferred
|
|
: &mUrgent;
|
|
if (queue->empty())
|
|
return false;
|
|
|
|
recvd = queue->front();
|
|
queue->pop_front();
|
|
}
|
|
|
|
if (IsOnCxxStack() && recvd.is_rpc() && recvd.is_reply()) {
|
|
// We probably just received a reply in a nested loop for an
|
|
// RPC call sent before entering that loop.
|
|
mOutOfTurnReplies[recvd.seqno()] = recvd;
|
|
return false;
|
|
}
|
|
|
|
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
|
|
|
if (recvd.is_rpc())
|
|
Incall(recvd, 0);
|
|
else if (recvd.is_sync())
|
|
SyncChannel::OnDispatchMessage(recvd);
|
|
else
|
|
AsyncChannel::OnDispatchMessage(recvd);
|
|
|
|
return true;
|
|
}
|
|
|
|
// Returns |stackDepth| reduced by the number of replies currently held in
// mOutOfTurnReplies.  Worker thread only.
size_t
RPCChannel::RemoteViewOfStackDepth(size_t stackDepth) const
{
    AssertWorkerThread();

    const size_t stashedReplies = mOutOfTurnReplies.size();
    return stackDepth - stashedReplies;
}
|
|
|
|
void
|
|
RPCChannel::Incall(const Message& call, size_t stackDepth)
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertNotCurrentThreadOwns();
|
|
RPC_ASSERT(call.is_rpc() && !call.is_reply(), "wrong message type");
|
|
|
|
// Race detection: see the long comment near
|
|
// mRemoteStackDepthGuess in RPCChannel.h. "Remote" stack depth
|
|
// means our side, and "local" means other side.
|
|
if (call.rpc_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
|
|
// RPC in-calls have raced.
|
|
// the "winner", if there is one, gets to defer processing of
|
|
// the other side's in-call
|
|
bool defer;
|
|
const char* winner;
|
|
switch (Listener()->MediateRPCRace(mChild ? call : mStack.top(),
|
|
mChild ? mStack.top() : call)) {
|
|
case RRPChildWins:
|
|
winner = "child";
|
|
defer = mChild;
|
|
break;
|
|
case RRPParentWins:
|
|
winner = "parent";
|
|
defer = !mChild;
|
|
break;
|
|
case RRPError:
|
|
NS_RUNTIMEABORT("NYI: 'Error' RPC race policy");
|
|
return;
|
|
default:
|
|
NS_RUNTIMEABORT("not reached");
|
|
return;
|
|
}
|
|
|
|
if (LoggingEnabled()) {
|
|
printf_stderr(" (%s: %s won, so we're%sdeferring)\n",
|
|
mChild ? "child" : "parent", winner,
|
|
defer ? " " : " not ");
|
|
}
|
|
|
|
if (defer) {
|
|
// we now know the other side's stack has one more frame
|
|
// than we thought
|
|
++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
|
|
mDeferred.push(call);
|
|
return;
|
|
}
|
|
|
|
// we "lost" and need to process the other side's in-call.
|
|
// don't need to fix up the mRemoteStackDepthGuess here,
|
|
// because we're just about to increment it in DispatchCall(),
|
|
// which will make it correct again
|
|
}
|
|
|
|
#ifdef OS_WIN
|
|
SyncStackFrame frame(this, true);
|
|
#endif
|
|
|
|
DispatchIncall(call);
|
|
}
|
|
|
|
void
|
|
RPCChannel::DispatchIncall(const Message& call)
|
|
{
|
|
AssertWorkerThread();
|
|
mMonitor->AssertNotCurrentThreadOwns();
|
|
RPC_ASSERT(call.is_rpc() && !call.is_reply(),
|
|
"wrong message type");
|
|
|
|
Message* reply = nullptr;
|
|
|
|
++mRemoteStackDepthGuess;
|
|
Result rv = Listener()->OnCallReceived(call, reply);
|
|
--mRemoteStackDepthGuess;
|
|
|
|
if (!MaybeHandleError(rv, "RPCChannel")) {
|
|
delete reply;
|
|
reply = new Message();
|
|
reply->set_rpc();
|
|
reply->set_reply();
|
|
reply->set_reply_error();
|
|
}
|
|
|
|
reply->set_seqno(call.seqno());
|
|
|
|
{
|
|
MonitorAutoLock lock(*mMonitor);
|
|
if (ChannelConnected == mChannelState)
|
|
mLink->SendMessage(reply);
|
|
}
|
|
}
|
|
|
|
void
|
|
RPCChannel::ExitedCxxStack()
|
|
{
|
|
Listener()->OnExitedCxxStack();
|
|
if (mSawRPCOutMsg) {
|
|
MonitorAutoLock lock(*mMonitor);
|
|
// see long comment in OnMaybeDequeueOne()
|
|
EnqueuePendingMessages();
|
|
mSawRPCOutMsg = false;
|
|
}
|
|
}
|
|
|
|
void
|
|
RPCChannel::DebugAbort(const char* file, int line, const char* cond,
|
|
const char* why,
|
|
const char* type, bool reply) const
|
|
{
|
|
printf_stderr("###!!! [RPCChannel][%s][%s:%d] "
|
|
"Assertion (%s) failed. %s (triggered by %s%s)\n",
|
|
mChild ? "Child" : "Parent",
|
|
file, line, cond,
|
|
why,
|
|
type, reply ? "reply" : "");
|
|
// technically we need the mutex for this, but we're dying anyway
|
|
DumpRPCStack(" ");
|
|
printf_stderr(" remote RPC stack guess: %lu\n",
|
|
mRemoteStackDepthGuess);
|
|
printf_stderr(" deferred stack size: %lu\n",
|
|
mDeferred.size());
|
|
printf_stderr(" out-of-turn RPC replies stack size: %lu\n",
|
|
mOutOfTurnReplies.size());
|
|
printf_stderr(" Pending queue size: %lu, front to back:\n",
|
|
mPending.size());
|
|
|
|
MessageQueue pending = mPending;
|
|
while (!pending.empty()) {
|
|
printf_stderr(" [ %s%s ]\n",
|
|
pending.front().is_rpc() ? "rpc" :
|
|
(pending.front().is_sync() ? "sync" : "async"),
|
|
pending.front().is_reply() ? "reply" : "");
|
|
pending.pop_front();
|
|
}
|
|
|
|
NS_RUNTIMEABORT(why);
|
|
}
|
|
|
|
void
|
|
RPCChannel::DumpRPCStack(const char* const pfx) const
|
|
{
|
|
NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
|
|
"The worker thread had better be paused in a debugger!");
|
|
|
|
printf_stderr("%sRPCChannel 'backtrace':\n", pfx);
|
|
|
|
// print a python-style backtrace, first frame to last
|
|
for (uint32_t i = 0; i < mCxxStackFrames.size(); ++i) {
|
|
int32_t id;
|
|
const char* dir, *sems, *name;
|
|
mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
|
|
|
|
printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
|
|
i, dir, sems, name, id);
|
|
}
|
|
}
|
|
|
|
//
|
|
// The methods below run in the context of the link thread, and can proxy
|
|
// back to the methods above
|
|
//
|
|
|
|
void
|
|
RPCChannel::OnMessageReceivedFromLink(const Message& msg)
|
|
{
|
|
AssertLinkThread();
|
|
mMonitor->AssertCurrentThreadOwns();
|
|
|
|
if (MaybeInterceptSpecialIOMessage(msg))
|
|
return;
|
|
|
|
// regardless of the RPC stack, if we're awaiting a sync reply, we
|
|
// know that it needs to be immediately handled to unblock us.
|
|
if (AwaitingSyncReply() && msg.is_sync()) {
|
|
// wake up worker thread waiting at SyncChannel::Send
|
|
mRecvd = msg;
|
|
NotifyWorkerThread();
|
|
return;
|
|
}
|
|
|
|
MessageQueue *queue = (msg.priority() == IPC::Message::PRIORITY_HIGH)
|
|
? &mUrgent
|
|
: &mPending;
|
|
|
|
bool compressMessage = (msg.compress() && !queue->empty() &&
|
|
queue->back().type() == msg.type() &&
|
|
queue->back().routing_id() == msg.routing_id());
|
|
if (compressMessage) {
|
|
// This message type has compression enabled, and the back of
|
|
// the queue was the same message type and routed to the same
|
|
// destination. Replace it with the newer message.
|
|
MOZ_ASSERT(queue->back().compress());
|
|
queue->pop_back();
|
|
}
|
|
|
|
queue->push_back(msg);
|
|
|
|
// There are three cases we're concerned about, relating to the state of
|
|
// the main thread:
|
|
//
|
|
// (1) We are waiting on a sync reply - main thread is blocked on the IPC monitor.
|
|
// - If the message is high priority, we wake up the main thread to
|
|
// deliver the message. Otherwise, we leave it in the mPending queue,
|
|
// posting a task to the main event loop, where it will be processed
|
|
// once the synchronous reply has been received.
|
|
//
|
|
// (2) We are waiting on an RPC reply - main thread is blocked on the IPC monitor.
|
|
// - Always wake up the main thread to deliver the message.
|
|
//
|
|
// (3) We are not waiting on a reply.
|
|
// - We post a task to the main event loop.
|
|
//
|
|
bool waiting_rpc = (0 != StackDepth());
|
|
bool urgent = (msg.priority() == IPC::Message::PRIORITY_HIGH);
|
|
|
|
if (waiting_rpc || (AwaitingSyncReply() && urgent)) {
|
|
// Always wake up our RPC waiter, and wake up sync waiters for urgent
|
|
// messages.
|
|
NotifyWorkerThread();
|
|
} else {
|
|
// Worker thread is either not blocked on a reply, or this is an
|
|
// incoming RPC that raced with outgoing sync and needs to be deferred
|
|
// to a later event-loop iteration.
|
|
if (!compressMessage) {
|
|
// If we compressed away the previous message, we'll reuse
|
|
// its pending task.
|
|
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
RPCChannel::OnChannelErrorFromLink()
|
|
{
|
|
AssertLinkThread();
|
|
mMonitor->AssertCurrentThreadOwns();
|
|
|
|
if (0 < StackDepth())
|
|
NotifyWorkerThread();
|
|
|
|
SyncChannel::OnChannelErrorFromLink();
|
|
}
|
|
|
|
} // namespace ipc
|
|
} // namespace mozilla
|
|
|