fix bugs in RPC race resolution

This commit is contained in:
Chris Jones 2009-10-09 01:21:39 -05:00
parent f71ad44292
commit a0cb461b15
6 changed files with 273 additions and 381 deletions

View File

@ -115,14 +115,20 @@ bool
AsyncChannel::Send(Message* msg)
{
    // Hands |msg| to the IO thread for transmission.
    //
    // Must be called on the worker thread, without mMutex held, and
    // |msg| must carry a routing id.  Returns false (nothing is posted)
    // when the channel is not connected; returns true once a single
    // OnSend task for |msg| has been posted to the IO loop.
    AssertWorkerThread();
    mMutex.AssertNotCurrentThreadOwns();
    NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");

    {
        // Connected() asserts that the current thread owns mMutex, so
        // the state check may only happen while the lock is held.
        MutexAutoLock lock(mMutex);

        if (!Connected())
            // trying to Send() to a closed or error'd channel
            return false;

        // Post exactly one OnSend task for this message; posting it a
        // second time would hand the same Message* to OnSend twice.
        mIOLoop->PostTask(
            FROM_HERE,
            NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
    }

    return true;
}
@ -163,20 +169,18 @@ AsyncChannel::OnMessageReceived(const Message& msg)
NS_ASSERTION(mChannelState != ChannelError, "Shouldn't get here!");
// wake up the worker, there's work to do
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&AsyncChannel::OnDispatchMessage,
msg));
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &AsyncChannel::OnDispatchMessage, msg));
}
// Records that the channel is now connected and notifies mCvar.
// Asserts that it is running on the IO thread.  |peer_pid| is not
// used in this body.
void
AsyncChannel::OnChannelConnected(int32 peer_pid)
{
AssertIOThread();
// take mMutex before publishing the new channel state
MutexAutoLock lock(mMutex);
mChannelState = ChannelConnected;
// wake a waiter on mCvar, if there is one
mCvar.Notify();
}
@ -184,7 +188,11 @@ void
AsyncChannel::OnChannelError()
{
AssertIOThread();
mChannelState = ChannelError;
{
MutexAutoLock lock(mMutex);
mChannelState = ChannelError;
}
if (XRE_GetProcessType() == GeckoProcessType_Default) {
// Parent process, one of our children died. Notify?

View File

@ -128,26 +128,25 @@ protected:
// Can be run on either thread
void AssertWorkerThread()
{
    // Single fatal check that the current MessageLoop is the worker
    // loop.  A separate soft NS_ERROR on the same condition would only
    // repeat the message before this abort fires.
    NS_ABORT_IF_FALSE(mWorkerLoop == MessageLoop::current(),
                      "not on worker thread!");
}
void AssertIOThread()
{
    // Single fatal check that the current MessageLoop is the IO loop.
    // A separate soft NS_ERROR on the same condition would only repeat
    // the message before this abort fires.
    NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
                      "not on IO thread!");
}
bool Connected() {
    // The channel state may only be read while the calling thread
    // owns mMutex; this is asserted rather than locked here.
    mMutex.AssertCurrentThreadOwns();

    const bool isConnected = (mChannelState == ChannelConnected);
    return isConnected;
}
// Additional methods that execute on the worker thread
// Run on the worker thread
void OnDispatchMessage(const Message& aMsg);
// Additional methods that execute on the IO thread
// Run on the IO thread
void OnChannelOpened();
void OnSend(Message* aMsg);

View File

@ -42,6 +42,12 @@
#include "nsDebug.h"
// RPC_ASSERT(cond, why[, type[, reply]])
//
// If |cond| evaluates to false, calls DebugAbort() with the current
// file, line, the stringified condition, and any remaining arguments.
// The call to DebugAbort() is unqualified, so the macro is only usable
// where that name resolves (e.g. inside RPCChannel member functions).
// The do { } while (0) wrapper makes the expansion a single statement,
// and ",## __VA_ARGS__" is the GNU form that drops the preceding comma
// when no extra arguments are given.
#define RPC_ASSERT(_cond, ...) \
do { \
if (!(_cond)) \
DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \
} while (0)
using mozilla::MutexAutoLock;
using mozilla::MutexAutoUnlock;
@ -59,30 +65,29 @@ bool
RPCChannel::Call(Message* msg, Message* reply)
{
AssertWorkerThread();
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
"violation of sync handler invariant");
NS_ABORT_IF_FALSE(msg->is_rpc(),
"can only Call() RPC messages here");
RPC_ASSERT(!ProcessingSyncMessage(),
"violation of sync handler invariant");
RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
MutexAutoLock lock(mMutex);
if (!Connected())
// trying to Send() to a closed or error'd channel
// trying to Call() on a closed or error'd channel
return false;
mStack.push(*msg);
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
msg->set_rpc_local_stack_depth(StackDepth());
// bypass |SyncChannel::Send| b/c RPCChannel implements its own
// waiting semantics
AsyncChannel::Send(msg);
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnSend, msg));
while (1) {
// here we're waiting for something to happen. see long
// comment about the queue in RPCChannel.h
while (Connected() && mPending.empty()) {
mCvar.Wait();
WaitForNotify();
}
if (!Connected())
@ -92,37 +97,31 @@ RPCChannel::Call(Message* msg, Message* reply)
Message recvd = mPending.front();
mPending.pop();
// async message. process it, go back to waiting
if (!recvd.is_sync() && !recvd.is_rpc()) {
MutexAutoUnlock unlock(mMutex);
AsyncChannel::OnDispatchMessage(recvd);
continue;
}
// something sync. Let the sync dispatcher take care of it
// (it may be an invalid message, but the sync handler will
// check that).
if (recvd.is_sync()) {
RPC_ASSERT(mPending.empty(), "other side is malfunctioning");
RPC_ASSERT(mPending.empty(),
"other side should have been blocked");
MutexAutoUnlock unlock(mMutex);
SyncChannel::OnDispatchMessage(recvd);
continue;
}
// from here on, we know that recvd.is_rpc()
NS_ABORT_IF_FALSE(recvd.is_rpc(), "wtf???");
// reply message
if (recvd.is_reply()) {
RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
const Message& outcall = mStack.top();
// FIXME/cjones: handle error
RPC_ASSERT(recvd.type() == (outcall.type()+1) || recvd.is_reply_error(),
"somebody's misbehavin'", "rpc", true);
RPC_ASSERT(
recvd.type() == (outcall.type()+1) || recvd.is_reply_error(),
"somebody's misbehavin'", "rpc", true);
// we received a reply to our most recent outstanding
// call. pop this frame and return the reply
@ -133,79 +132,31 @@ RPCChannel::Call(Message* msg, Message* reply)
*reply = recvd;
}
if (0 == StackDepth()) {
// this was the last outcall we were waiting on.
// flush the pending queue into the "regular" event
// queue, checking invariants along the way. see long
// comment in RPCChannel.h
bool seenBlocker = false;
// the stack depth just shrank, so now might be the time
// to process a message deferred because of race
// resolution
MaybeProcessDeferredIncall();
// A<* (S< | C<)
while (!mPending.empty()) {
Message m = mPending.front();
mPending.pop();
if (0 == StackDepth())
// we may have received new messages while waiting for
// our reply. because we were awaiting a reply,
// StackDepth > 0, and the IO thread didn't enqueue
// OnMaybeDequeueOne() events for us. so to avoid
// "losing" the new messages, we do that now.
EnqueuePendingMessages();
if (m.is_sync()) {
RPC_ASSERT(!seenBlocker,
"other side is malfunctioning",
"sync", m.is_reply());
seenBlocker = true;
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnDelegate, m));
}
else if (m.is_rpc()) {
RPC_ASSERT(!seenBlocker,
"other side is malfunctioning",
"rpc", m.is_reply());
seenBlocker = true;
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncall,
m));
}
else {
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnDelegate, m));
}
}
}
else {
// shouldn't have queued any more messages, since
// the other side is now supposed to be blocked on a
// reply from us!
RPC_ASSERT(mPending.empty(),
"other side should have been blocked");
}
// unlocks mMutex
// finished with this RPC stack frame
return !isError;
}
// in-call. process in a new stack frame. the other side
// should be blocked on us, hence an empty queue, except for
// the case where this side "won" an RPC race and the other
// side already replied back
RPC_ASSERT(mPending.empty()
|| (1 == mPending.size()
&& mPending.front().is_rpc()
&& mPending.front().is_reply()
&& 1 == StackDepth()),
"other side is malfunctioning", "rpc");
// in-call. process in a new stack frame.
// "snapshot" the current stack depth while we own the Mutex
size_t stackDepth = StackDepth();
{
MutexAutoUnlock unlock(mMutex);
// someone called in to us from the other side. handle the call
ProcessIncall(recvd, stackDepth);
Incall(recvd, stackDepth);
// FIXME/cjones: error handling
}
}
@ -214,64 +165,85 @@ RPCChannel::Call(Message* msg, Message* reply)
}
void
RPCChannel::OnDelegate(const Message& msg)
RPCChannel::MaybeProcessDeferredIncall()
{
AssertWorkerThread();
if (msg.is_sync())
return SyncChannel::OnDispatchMessage(msg);
else if (!msg.is_rpc())
return AsyncChannel::OnDispatchMessage(msg);
RPC_ASSERT(0, "fatal logic error");
mMutex.AssertCurrentThreadOwns();
if (mDeferred.empty())
return;
size_t stackDepth = StackDepth();
// the other side can only *under*-estimate our actual stack depth
RPC_ASSERT(mDeferred.top().rpc_remote_stack_depth_guess() <= stackDepth,
"fatal logic error");
if (mDeferred.top().rpc_remote_stack_depth_guess() < stackDepth)
return;
// time to process this message
Message call = mDeferred.top();
mDeferred.pop();
// fix up fudge factor we added to account for race
RPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
--mRemoteStackDepthGuess;
MutexAutoUnlock unlock(mMutex);
fprintf(stderr, " (processing deferred in-call)\n");
Incall(call, stackDepth);
}
void
RPCChannel::EnqueuePendingMessages()
{
// XXX performance tuning knob: could process all or k pending
// messages here, rather than enqueuing for later processing
AssertWorkerThread();
mMutex.AssertCurrentThreadOwns();
for (size_t i = 0; i < mPending.size(); ++i)
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
}
// Worker-thread task: removes at most one message from the front of
// mPending and dispatches it according to its kind.  Does nothing if
// the queue is already empty by the time the task runs.
void
RPCChannel::OnMaybeDequeueOne()
{
    // XXX performance tuning knob: could process all or k pending
    // messages here

    AssertWorkerThread();
    mMutex.AssertNotCurrentThreadOwns();

    Message recvd;
    {
        // only the queue manipulation happens under the lock; the
        // dispatch below runs with mMutex released
        MutexAutoLock lock(mMutex);

        if (mPending.empty())
            return;

        // No assumption is made about the queue length or the message
        // kind here: one of these tasks is enqueued per pending
        // message, and async messages are dispatched below as well.
        recvd = mPending.front();
        mPending.pop();
    }

    if (recvd.is_rpc())
        // dispatched from the event loop, so the stack-depth snapshot
        // passed along is 0
        return Incall(recvd, 0);
    else if (recvd.is_sync())
        return SyncChannel::OnDispatchMessage(recvd);
    else
        return AsyncChannel::OnDispatchMessage(recvd);
}
// Worker-thread entry point for an RPC in-call: forwards |call| to
// ProcessIncall() with a stack-depth snapshot of 0.
void
RPCChannel::OnIncall(const Message& call)
{
AssertWorkerThread();
// We only reach here from the "regular" event loop, when
// StackDepth() == 0. That's the "snapshot" of the state of the
// RPCChannel we use when processing this message.
ProcessIncall(call, 0);
}
// Worker-thread entry point for an in-call whose processing was
// deferred: forwards |call| to ProcessIncall() with a stack-depth
// snapshot of 0, then resets mRemoteStackDepthGuess to 0.
void
RPCChannel::OnDeferredIncall(const Message& call)
{
AssertWorkerThread();
ProcessIncall(call, 0);
mRemoteStackDepthGuess = 0; // see the race detector code below
}
void
RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
RPCChannel::Incall(const Message& call, size_t stackDepth)
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
NS_ABORT_IF_FALSE(call.is_rpc(),
"should have been handled by SyncChannel");
RPC_ASSERT(call.is_rpc() && !call.is_reply(), "wrong message type");
// Race detection: see the long comment near
// mRemoteStackDepthGuess in RPCChannel.h. "Remote" stack depth
@ -279,18 +251,12 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
if (call.rpc_remote_stack_depth_guess() != stackDepth) {
NS_WARNING("RPC in-calls have raced!");
// assumption: at this point, we have one call on our stack,
// as does the other side. But both we and the other side
// think that the opposite side *doesn't* have any calls on
// its stack. We need to verify this because race resolution
// depends on this fact.
if (!((1 == stackDepth && 0 == mRemoteStackDepthGuess)
&& (1 == call.rpc_local_stack_depth()
&& 0 == call.rpc_remote_stack_depth_guess())))
// TODO this /could/ be construed as evidence of a
// misbehaving process, so should probably go through
// regular error-handling channels
RPC_ASSERT(0, "fatal logic error");
RPC_ASSERT(call.rpc_remote_stack_depth_guess() < stackDepth,
"fatal logic error");
RPC_ASSERT(1 == (stackDepth - call.rpc_remote_stack_depth_guess()),
"got more than 1 RPC message out of sync???");
RPC_ASSERT(1 == (call.rpc_local_stack_depth() -mRemoteStackDepthGuess),
"RPC unexpected not symmetric");
// the "winner", if there is one, gets to defer processing of
// the other side's in-call
@ -313,21 +279,34 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
return;
}
printf(" (%s won, so we're%sdeferring)\n",
winner, defer ? " " : " not ");
fprintf(stderr, " (%s won, so we're%sdeferring)\n",
winner, defer ? " " : " not ");
if (defer) {
// we now know there's one frame on the other side's stack
mRemoteStackDepthGuess = 1;
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnDeferredIncall, call));
// we now know the other side's stack has one more frame
// than we thought
++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
mDeferred.push(call);
return;
}
// we "lost" and need to process the other side's in-call
// we "lost" and need to process the other side's in-call.
// don't need to fix up the mRemoteStackDepthGuess here,
// because we're just about to increment it in DispatchIncall(),
// which will make it correct again
}
DispatchIncall(call);
}
void
RPCChannel::DispatchIncall(const Message& call)
{
AssertWorkerThread();
mMutex.AssertNotCurrentThreadOwns();
RPC_ASSERT(call.is_rpc() && !call.is_reply(),
"wrong message type");
Message* reply = nsnull;
++mRemoteStackDepthGuess;
@ -344,6 +323,7 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
case MsgPayloadError:
case MsgRouteError:
case MsgValueError:
NS_WARNING("[RPCChannel] error processing message!");
delete reply;
reply = new Message();
reply->set_rpc();
@ -357,11 +337,42 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
return;
}
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnSendReply,
reply));
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnSend, reply));
}
// Dumps this channel's RPC state to stderr and then aborts through
// NS_RUNTIMEABORT(why).
//
//   file, line, cond  location and text of the failed assertion
//   why               explanation; printed and passed to NS_RUNTIMEABORT
//   type              label for the kind of message that triggered this
//   reply             when true, "reply" is appended after |type|
//
// The pending queue is emptied as it is printed.
void
RPCChannel::DebugAbort(const char* file, int line, const char* cond,
                       const char* why,
                       const char* type, bool reply)
{
    fprintf(stderr,
            "[RPCChannel][%s][%s:%d] "
            "Assertion (%s) failed. %s (triggered by %s%s)\n",
            mChild ? "Child" : "Parent",
            file, line, cond,
            why,
            type, reply ? "reply" : "");

    // technically we need the mutex for this, but we're dying anyway

    // The sizes below are size_t values.  %lu expects unsigned long,
    // which is not the same type on every platform, so each argument
    // is converted explicitly to match the format specifier.
    fprintf(stderr, " local RPC stack size: %lu\n",
            static_cast<unsigned long>(mStack.size()));
    fprintf(stderr, " remote RPC stack guess: %lu\n",
            static_cast<unsigned long>(mRemoteStackDepthGuess));
    fprintf(stderr, " deferred stack size: %lu\n",
            static_cast<unsigned long>(mDeferred.size()));
    fprintf(stderr, " Pending queue size: %lu, front to back:\n",
            static_cast<unsigned long>(mPending.size()));

    // print (and consume) the pending queue from front to back
    while (!mPending.empty()) {
        fprintf(stderr, " [ %s%s ]\n",
                mPending.front().is_rpc() ? "rpc" :
                (mPending.front().is_sync() ? "sync" : "async"),
                mPending.front().is_reply() ? "reply" : "");
        mPending.pop();
    }

    NS_RUNTIMEABORT(why);
}
//
@ -377,122 +388,22 @@ RPCChannel::OnMessageReceived(const Message& msg)
// regardless of the RPC stack, if we're awaiting a sync reply, we
// know that it needs to be immediately handled to unblock us.
// The SyncChannel will check that msg is a reply, and the right
// kind of reply, then do its thing.
if (AwaitingSyncReply()
&& msg.is_sync()) {
// wake up worker thread (at SyncChannel::Send) awaiting
// this reply
if (AwaitingSyncReply() && msg.is_sync()) {
// wake up worker thread waiting at SyncChannel::Send
mRecvd = msg;
mCvar.Notify();
NotifyWorkerThread();
return;
}
// otherwise, we handle sync/async/rpc messages differently depending
// on whether the RPC channel is idle
mPending.push(msg);
if (0 == StackDepth()) {
// we're idle with respect to the RPC layer, and this message could
// be async, sync, or rpc.
// async message: delegate, doesn't affect anything
if (!msg.is_sync() && !msg.is_rpc()) {
MutexAutoUnlock unlock(mMutex);
return AsyncChannel::OnMessageReceived(msg);
}
// NB: the interaction between this and SyncChannel is rather
// subtle; we have to handle a fairly nasty race condition.
// The other side may send us a sync message at any time. If
// we receive it here while the worker thread is processing an
// event that will eventually send an RPC message (or will
// send an RPC message while processing any event enqueued
// before the sync message was received), then if we tried to
// enqueue the sync message in the worker's event queue, it
// would get "lost": the worker would block on the RPC reply
// without seeing the sync request, and we'd deadlock.
//
// So to avoid this case, when the RPC channel is idle and
// receives a sync request, it puts the request in the special
// RPC message queue, and asks the worker thread to process a
// task that might end up dequeuing that RPC message. The
// task *might not* dequeue a sync request --- this might
// occur if the event the worker is currently processing sends
// an RPC message. If that happens, the worker will go into
// its "wait loop" for the RPC response, and immediately
// dequeue and process the sync request.
if (msg.is_sync()) {
mPending.push(msg);
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
return;
}
// OK: the RPC channel is idle, and we received an in-call.
// wake up the worker thread.
//
// Because of RPC race resolution, there's another subtlety
// here. We can't enqueue an OnInCall() event directly,
// because while this code is executing, the worker thread
// concurrently might be making (or preparing to make) an
// out-call. If so, and race resolution is ParentWins or
// ChildWins, and this side is the "losing" side, then this
// side needs to "unblock" and process the new in-call. If
// the in-call were to go into the main event queue, then it
// would be lost. So it needs to go into mPending queue along
// with a OnMaybeDequeueOne() event. The OnMaybeDequeueOne()
// event handles the non-racy case.
NS_ABORT_IF_FALSE(msg.is_rpc(), "should be RPC");
mPending.push(msg);
if (0 == StackDepth())
// the worker thread might be idle, make sure it wakes up
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
}
else {
// we're waiting on an RPC reply
// NB some logic here is duplicated with SyncChannel. this is
// to allow more local reasoning
// NBB see the second-to-last long comment in RPCChannel.h
// describing legal queue states
// if we're waiting on a sync reply, and this message is sync,
// dispatch it to the sync message handler.
//
// since we're waiting on an RPC answer in an older stack
// frame, we know we'll eventually pop back to the
// RPCChannel::Call frame where we're awaiting the RPC reply.
// so the queue won't be forgotten!
// waiting on a sync reply, but got an async message. that's OK,
// but we defer processing of it until the sync reply comes in.
if (AwaitingSyncReply()
&& !msg.is_sync() && !msg.is_rpc()) {
mPending.push(msg);
return;
}
// if this side and the other were functioning correctly, we'd
// never reach this case. RPCChannel::Call explicitly checks
// for and disallows this case. so if we reach here, the other
// side is malfunctioning (compromised?).
RPC_ASSERT(!AwaitingSyncReply(),
"the other side is malfunctioning",
"rpc", msg.is_reply());
// otherwise, we (legally) either got (i) async msg; (ii) sync
// in-msg; (iii) re-entrant rpc in-call; (iv) rpc reply we
// were awaiting. Dispatch to the worker, where invariants
// are checked and the message processed.
mPending.push(msg);
mCvar.Notify();
}
else
NotifyWorkerThread();
}
@ -506,9 +417,8 @@ RPCChannel::OnChannelError()
mChannelState = ChannelError;
if (AwaitingSyncReply()
|| 0 < StackDepth()) {
mCvar.Notify();
}
|| 0 < StackDepth())
NotifyWorkerThread();
}
// skip SyncChannel::OnError(); we subsume its duties
@ -519,3 +429,4 @@ RPCChannel::OnChannelError()
} // namespace ipc
} // namespace mozilla

View File

@ -74,6 +74,8 @@ public:
RPCChannel(RPCListener* aListener, RacyRPCPolicy aPolicy=RRPChildWins) :
SyncChannel(aListener),
mPending(),
mStack(),
mDeferred(),
mRemoteStackDepthGuess(0),
mRacePolicy(aPolicy)
{
@ -87,26 +89,20 @@ public:
// Make an RPC to the other side of the channel
bool Call(Message* msg, Message* reply);
// Override the SyncChannel handler so we can dispatch RPC messages
// Override the SyncChannel handler so we can dispatch RPC
// messages. Called on the IO thread only.
NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
NS_OVERRIDE virtual void OnChannelError();
protected:
// Only exists because we can't schedule SyncChannel::OnDispatchMessage
// or AsyncChannel::OnDispatchMessage from within Call() when we flush
// the pending queue
void OnDelegate(const Message& msg);
// There's a fairly subtle race condition that arises between
// processing an event on this side that ends up sending an RPC
// message, while receiving a sync message from the other side.
// See the long comment in RPCChannel.cpp, near line 300.
void OnMaybeDequeueOne();
private:
void OnIncall(const Message& msg);
void OnDeferredIncall(const Message& msg);
void ProcessIncall(const Message& call, size_t stackDepth);
// Called on worker thread only
void MaybeProcessDeferredIncall();
void EnqueuePendingMessages();
void OnMaybeDequeueOne();
void Incall(const Message& call, size_t stackDepth);
void DispatchIncall(const Message& call);
// Called from both threads
size_t StackDepth() {
@ -114,40 +110,51 @@ private:
return mStack.size();
}
// RPC_ASSERT(cond, why[, type[, reply]]): if |cond| is false, calls
// DebugAbort() with the current file, line, the stringified condition,
// and any remaining arguments.  Wrapped in do { } while (0) so the
// expansion behaves as a single statement.
#define RPC_ASSERT(_cond, ...) \
do { \
if (!(_cond)) \
DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \
} while (0)
void DebugAbort(const char* file, int line, const char* cond,
const char* why,
const char* type="rpc", bool reply=false)
{
fprintf(stderr,
"[RPCChannel][%s][%s:%d] "
"Assertion (%s) failed. %s (triggered by %s%s)\n",
mChild ? "Child" : "Parent",
file, line, cond,
why,
type, reply ? "reply" : "");
// technically we need the mutex for this, but we're dying anyway
fprintf(stderr, " local RPC stack size: %lu\n",
mStack.size());
fprintf(stderr, " remote RPC stack guess: %lu\n",
mRemoteStackDepthGuess);
fprintf(stderr, " Pending queue size: %lu, front to back:\n",
mPending.size());
while (!mPending.empty()) {
fprintf(stderr, " [ %s%s ]\n",
mPending.front().is_rpc() ? "rpc" :
(mPending.front().is_sync() ? "sync" : "async"),
mPending.front().is_reply() ? "reply" : "");
mPending.pop();
}
const char* type="rpc", bool reply=false);
NS_RUNTIMEABORT(why);
}
//
// Queue of all incoming messages, except for replies to sync
// messages, which are delivered directly to the SyncChannel
// through its mRecvd member.
//
// If both this side and the other side are functioning correctly,
// the queue can only be in certain configurations. Let
//
// |A<| be an async in-message,
// |S<| be a sync in-message,
// |C<| be an RPC in-call,
// |R<| be an RPC reply.
//
// The queue can only match this configuration
//
// A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
//
// The other side can send as many async messages |A<*| as it
// wants before sending us a blocking message.
//
// The first case is |S<|, a sync in-msg. The other side must be
// blocked, and thus can't send us any more messages until we
// process the sync in-msg.
//
// The second case is |C<|, an RPC in-call; the other side must be
// blocked. (There's a subtlety here: this in-call might have
// raced with an out-call, but we detect that with the mechanism
// below, |mRemoteStackDepthGuess|, and races don't matter to the
// queue.)
//
// Final case, the other side replied to our most recent out-call
// |R<|. If that was the *only* out-call on our stack,
// |?{mStack.size() == 1}|, then other side "finished with us,"
// and went back to its own business. That business might have
// included sending any number of async messages |A<*| until
// sending a blocking message |(S< | C<)|. If we had more than
// one RPC call on our stack, the other side *better* not have
// sent us another blocking message, because it's blocked on a
// reply from us.
//
std::queue<Message> mPending;
//
// Stack of all the RPC out-calls on which this RPCChannel is
@ -155,51 +162,11 @@ private:
//
std::stack<Message> mStack;
//
// After the worker thread is blocked on an RPC out-call
// (i.e. awaiting a reply), the IO thread uses this queue to
// transfer received messages to the worker thread for processing.
// If both this side and the other side are functioning correctly,
// the queue is only allowed to have certain configurations. Let
//
// |A<| be an async in-message,
// |S<| be a sync in-message,
// |C<| be an RPC in-call,
// |R<| be an RPC reply.
//
// After the worker thread wakes us up to process the queue,
// the queue can only match this configuration
//
// A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
//
// After we send an RPC message, the other side can send as many
// async messages |A<*| as it wants before sending back any other
// message type.
//
// The first "other message type" case is |S<|, a sync in-msg.
// The other side must be blocked, and thus can't send us any more
// messages until we process the sync in-msg.
//
// The second case is |C<|, an RPC in-call; the other side
// re-entered us while processing our out-call. It therefore must
// be blocked. (There's a subtlety here: this in-call might have
// raced with our out-call, but we detect that with the mechanism
// below, |mRemoteStackDepthGuess|, and races don't matter to the
// queue.)
//
// Final case, the other side replied to our most recent out-call
// |R<|. If that was the *only* out-call on our stack, |{
// mStack.size() == 1}|, then other side "finished with us," and
// went back to its own business. That business might have
// included sending any number of async messages |A<*| until
// sending a blocking message |(S< | C<)|. We just flush these to
// the event loop to process in order, it will do the Right Thing,
// since only the last message can be a blocking message.
// HOWEVER, if we had more than one RPC call on our stack, the
// other side *better* not have sent us another blocking message,
// because it's blocked on a reply from us.
//
std::queue<Message> mPending;
//
// Stack of RPC in-calls that were deferred because of race
// conditions.
//
std::stack<Message> mDeferred;
//
// This is what we think the RPC stack depth is on the "other
@ -213,7 +180,7 @@ private:
//
// Then when processing an in-call |c|, it must be true that
//
// mPending.size() == c.remoteDepth
// mStack.size() == c.remoteDepth
//
// i.e., my depth is actually the same as what the other side
// thought it was when it sent in-call |c|. If this fails to
@ -229,10 +196,7 @@ private:
// if one side detects a race, then the other side must also
// detect the same race.
//
// TODO: and when we detect a race, what should we actually *do* ... ?
//
size_t mRemoteStackDepthGuess;
RacyRPCPolicy mRacePolicy;
};

View File

@ -69,12 +69,12 @@ SyncChannel::Send(Message* msg, Message* reply)
return false;
mPendingReply = msg->type() + 1;
if (!AsyncChannel::Send(msg))
// FIXME more sophisticated error handling
return false;
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnSend, msg));
// wait for the next sync message to arrive
mCvar.Wait();
WaitForNotify();
if (!Connected())
// FIXME more sophisticated error handling
@ -133,10 +133,9 @@ SyncChannel::OnDispatchMessage(const Message& msg)
return;
}
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&SyncChannel::OnSendReply,
reply));
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnSend, reply));
}
//
@ -163,7 +162,7 @@ SyncChannel::OnMessageReceived(const Message& msg)
else {
// let the worker know a new sync message has arrived
mRecvd = msg;
mCvar.Notify();
NotifyWorkerThread();
}
}
@ -177,18 +176,27 @@ SyncChannel::OnChannelError()
mChannelState = ChannelError;
if (AwaitingSyncReply()) {
mCvar.Notify();
NotifyWorkerThread();
}
}
return AsyncChannel::OnChannelError();
}
//
// Synchronization between worker and IO threads
//
void
SyncChannel::OnSendReply(Message* aReply)
SyncChannel::WaitForNotify()
{
AssertIOThread();
mTransport->Send(aReply);
mCvar.Wait();
}
// Notifies mCvar, the condition variable that WaitForNotify() waits
// on.  NOTE(review): presumably the caller is expected to hold the
// associated mutex -- confirm against the call sites.
void
SyncChannel::NotifyWorkerThread()
{
mCvar.Notify();
}

View File

@ -94,9 +94,11 @@ protected:
}
void OnDispatchMessage(const Message& aMsg);
void WaitForNotify();
// Executed on the IO thread.
void OnSendReply(Message* msg);
void NotifyWorkerThread();
// Called on both threads
bool AwaitingSyncReply() {