add race detection to RPC channels. simplify message processing, be stricter about layering guarantees. add more comments and assertions to channel code.

This commit is contained in:
Chris Jones 2009-08-19 00:22:01 -05:00
parent e45c8d96cd
commit db2bde3f0d
5 changed files with 137 additions and 22 deletions

View File

@ -34,6 +34,9 @@ Message::Message(int32 routing_id, uint16 type, PriorityValue priority)
header()->flags = priority;
#if defined(OS_POSIX)
header()->num_fds = 0;
#endif
#if defined(CHROMIUM_MOZILLA_BUILD)
header()->rpc_remote_stack_depth_guess = static_cast<size_t>(-1);
#endif
InitLoggingVariables();
}

View File

@ -140,6 +140,17 @@ class Message : public Pickle {
header()->routing = new_id;
}
#if defined(CHROMIUM_MOZILLA_BUILD)
// Returns the depth guess stored in this message's header: for RPC
// messages, what the sending side believed the receiving side's RPC
// stack depth to be.
size_t rpc_remote_stack_depth() const {
    const size_t depthGuess = header()->rpc_remote_stack_depth_guess;
    return depthGuess;
}
// Records |depth| in this message's header as this side's guess of the
// other side's RPC stack depth.
void set_rpc_remote_stack_depth(size_t depth) {
// Debug-checks that the message being tagged is an RPC message.
DCHECK(is_rpc());
header()->rpc_remote_stack_depth_guess = depth;
}
#endif
template<class T>
static bool Dispatch(const Message* msg, T* obj, void (T::*func)()) {
(obj->*func)();
@ -249,6 +260,11 @@ class Message : public Pickle {
uint16 flags; // specifies control flags for the message
#if defined(OS_POSIX)
uint32 num_fds; // the number of descriptors included with this message
#endif
#if defined(CHROMIUM_MOZILLA_BUILD)
// For RPC messages, what *this* side of the channel thinks the
// *other* side's RPC stack depth is.
size_t rpc_remote_stack_depth_guess;
#endif
};
#pragma pack(pop)

View File

@ -67,7 +67,11 @@ RPCChannel::Call(Message* msg, Message* reply)
mChannelState = ChannelWaiting;
msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
mPending.push(*msg);
// bypass |SyncChannel::Send| b/c RPCChannel implements its own
// waiting semantics
AsyncChannel::Send(msg);
while (1) {
@ -81,25 +85,22 @@ RPCChannel::Call(Message* msg, Message* reply)
Message recvd = mPending.top();
mPending.pop();
if (!recvd.is_rpc()) {
SyncChannel::OnDispatchMessage(recvd);
// FIXME/cjones: error handling
}
NS_ABORT_IF_FALSE(recvd.is_rpc(),
"should have been delegated to SyncChannel");
// RPC reply message
else if (recvd.is_reply()) {
if (recvd.is_reply()) {
NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
const Message& pending = mPending.top();
#ifdef DEBUG
if (recvd.type() != (pending.type()+1) && !recvd.is_reply_error()) {
// FIXME/cjones: handle error
NS_ASSERTION(0, "somebody's misbehavin'");
}
#endif
// we received a reply to our most recent message. pop this
// frame and return the reply
// we received a reply to our most recent outstanding
// call. pop this frame and return the reply
mPending.pop();
bool isError = recvd.is_reply_error();
@ -116,10 +117,12 @@ RPCChannel::Call(Message* msg, Message* reply)
}
// RPC in-call
else {
// "snapshot" the current stack depth while we own the Mutex
size_t stackDepth = StackDepth();
mMutex.Unlock();
// someone called in to us from the other side. handle the call
OnDispatchMessage(recvd);
ProcessIncall(recvd, stackDepth);
// FIXME/cjones: error handling
mMutex.Lock();
@ -132,14 +135,34 @@ RPCChannel::Call(Message* msg, Message* reply)
}
void
RPCChannel::OnDispatchMessage(const Message& call)
RPCChannel::OnIncall(const Message& call)
{
if (!call.is_rpc()) {
return SyncChannel::OnDispatchMessage(call);
}
// We were called from the IO thread when StackDepth() == 0, and
// we were "idle". That's the "snapshot" of the state of
// the RPCChannel we use when processing this message.
ProcessIncall(call, 0);
}
void
RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
{
mMutex.AssertNotCurrentThreadOwns();
NS_ABORT_IF_FALSE(call.is_rpc(),
"should have been handled by SyncChannel");
// Race detection: see the long comment near mRemoteStackDepth
// in RPCChannel.h
NS_ASSERTION(stackDepth == call.rpc_remote_stack_depth(),
"RPC in-calls have raced!");
Message* reply = nsnull;
switch (static_cast<RPCListener*>(mListener)->OnCallReceived(call, reply)) {
++mRemoteStackDepth;
Result rv =
static_cast<RPCListener*>(mListener)->OnCallReceived(call, reply);
--mRemoteStackDepth;
switch (rv) {
case MsgProcessed:
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
@ -178,14 +201,23 @@ RPCChannel::OnDispatchMessage(const Message& call)
void
RPCChannel::OnMessageReceived(const Message& msg)
{
if (!msg.is_rpc()) {
return SyncChannel::OnMessageReceived(msg);
}
MutexAutoLock lock(mMutex);
if (0 == mPending.size()) {
if (0 == StackDepth()) {
// wake up the worker, there's work to do
// NB: the interaction between this and SyncChannel is rather
// subtle. We may be awaiting a synchronous reply when this
// code is executed. If that's the case, posting this in-call
// to the worker will defer processing of the in-call until
// after the synchronous reply is received.
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnDispatchMessage,
msg));
&RPCChannel::OnIncall, msg));
}
else {
// let the worker know something new has happened

View File

@ -64,7 +64,9 @@ public:
};
RPCChannel(RPCListener* aListener) :
SyncChannel(aListener)
SyncChannel(aListener),
mPending(),
mRemoteStackDepth(0)
{
}
@ -86,9 +88,67 @@ private:
return mPending.size() > 0 || SyncChannel::WaitingForReply();
}
void OnDispatchMessage(const Message& msg);
void OnIncall(const Message& msg);
void ProcessIncall(const Message& call, size_t stackDepth);
// Returns the number of messages currently on |mPending|.
// Asserts that the calling thread owns mMutex.
size_t StackDepth() {
mMutex.AssertCurrentThreadOwns();
// The depth is only meaningful when the stack is empty or its top is
// an RPC message that is not a reply; abort otherwise.
NS_ABORT_IF_FALSE(
mPending.empty()
|| (mPending.top().is_rpc() && !mPending.top().is_reply()),
"StackDepth() called from an inconsistent state");
return mPending.size();
}
//
// In quiescent states, |mPending| is a stack of all the RPC
// out-calls on which this RPCChannel is awaiting a response.
//
// The stack is also used by the IO thread to transfer received
// messages to the worker thread, but only while the worker thread
// is awaiting an RPC response. Until the worker pops it, the top
// of the stack may (legally) be either of
//
// - RPC in-call (msg.is_rpc() && !msg.is_reply())
// - RPC reply (msg.is_rpc() && msg.is_reply())
//
// In both cases, the worker will pop the message off the stack
// and process it ASAP, returning |mPending| to a quiescent state.
//
std::stack<Message> mPending;
//
// This is what we think the RPC stack depth is on the "other
// side" of this RPC channel. We maintain this variable so that
// we can detect racy RPC calls. With each RPC out-call sent, we
// send along what *we* think the stack depth of the remote side
// is *before* it will receive the RPC call.
//
// After sending the out-call, our stack depth is "incremented"
// by pushing that pending message onto mPending.
//
// Then when processing an in-call |c|, it must be true that
//
// mPending.size() == c.rpc_remote_stack_depth()
//
// i.e., my depth is actually the same as what the other side
// thought it was when it sent in-call |c|. If this fails to
// hold, we have detected racy RPC calls.
//
// We then increment mRemoteStackDepth *just before* processing
// the in-call, since we know the other side is waiting on it, and
// decrement it *just after* finishing processing that in-call,
// since our response will pop the top of the other side's
// |mPending|.
//
// One nice aspect of this race detection is that it is symmetric;
// if one side detects a race, then the other side must also
// detect the same race.
//
// TODO: and when we detect a race, what should we actually *do* ... ?
//
size_t mRemoteStackDepth;
};

View File

@ -94,7 +94,8 @@ SyncChannel::Send(Message* msg, Message* reply)
else {
// case (2)
NS_ASSERTION(!mRecvd.is_reply(), "can't process replies here");
// post a task to our own event loop
// post a task to our own event loop; this delays processing
// of mRecvd
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, mRecvd));
@ -143,7 +144,7 @@ SyncChannel::OnDispatchMessage(const Message& msg)
void
SyncChannel::OnMessageReceived(const Message& msg)
{
MutexAutoLock lock(mMutex);
mMutex.Lock();
if (ChannelIdle == mChannelState) {
// wake up the worker, there's work to do
@ -153,6 +154,7 @@ SyncChannel::OnMessageReceived(const Message& msg)
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
}
else {
mMutex.Unlock();
return AsyncChannel::OnMessageReceived(msg);
}
}
@ -165,6 +167,8 @@ SyncChannel::OnMessageReceived(const Message& msg)
// FIXME/cjones: could reach here in error conditions. impl me
NOTREACHED();
}
mMutex.Unlock();
}
void