Introduce new RPC messaging semantics (bug 910493 part 1, r=cjones).

This commit is contained in:
David Anderson 2013-10-01 09:15:03 -07:00
parent 0ef15f261a
commit 47ae6f0cc8
17 changed files with 711 additions and 152 deletions

View File

@ -96,6 +96,11 @@ class Message : public Pickle {
return (header()->flags & URGENT_BIT) != 0;
}
// True if this is an RPC message.
bool is_rpc() const {
return (header()->flags & RPC_BIT) != 0;
}
// True if compression is enabled for this message.
bool compress() const {
return (header()->flags & COMPRESS_BIT) != 0;
@ -153,6 +158,14 @@ class Message : public Pickle {
header()->routing = new_id;
}
int32_t transaction_id() const {
return header()->txid;
}
void set_transaction_id(int32_t txid) {
header()->txid = txid;
}
uint32_t interrupt_remote_stack_depth_guess() const {
return header()->interrupt_remote_stack_depth_guess;
}
@ -279,6 +292,10 @@ class Message : public Pickle {
header()->flags |= URGENT_BIT;
}
void set_rpc() {
header()->flags |= RPC_BIT;
}
#if !defined(OS_MACOSX)
protected:
#endif
@ -294,7 +311,8 @@ class Message : public Pickle {
HAS_SENT_TIME_BIT = 0x0080,
INTERRUPT_BIT = 0x0100,
COMPRESS_BIT = 0x0200,
URGENT_BIT = 0x0400
URGENT_BIT = 0x0400,
RPC_BIT = 0x0800
};
struct Header : Pickle::Header {
@ -307,8 +325,13 @@ class Message : public Pickle {
uint32_t cookie; // cookie to ACK that the descriptors have been read.
# endif
#endif
// For RPC messages, a guess at what the *other* side's stack depth is.
uint32_t interrupt_remote_stack_depth_guess;
union {
// For Interrupt messages, a guess at what the *other* side's stack depth is.
uint32_t interrupt_remote_stack_depth_guess;
// For RPC and Urgent messages, a transaction ID for message ordering.
int32_t txid;
};
// The actual local stack depth.
uint32_t interrupt_local_stack_depth;
// Sequence number

View File

@ -51,6 +51,8 @@ MessageChannel::MessageChannel(MessageListener *aListener)
mNextSeqno(0),
mPendingSyncReplies(0),
mPendingUrgentReplies(0),
mPendingRPCReplies(0),
mCurrentRPCTransaction(0),
mDispatchingSyncMessage(false),
mRemoteStackDepthGuess(false),
mSawInterruptOutMsg(false)
@ -274,7 +276,8 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
// Regardless of the Interrupt stack, if we're awaiting a sync or urgent reply,
// we know that it needs to be immediately handled to unblock us.
if ((AwaitingSyncReply() && aMsg.is_sync()) ||
(AwaitingUrgentReply() && aMsg.is_urgent()))
(AwaitingUrgentReply() && aMsg.is_urgent()) ||
(AwaitingRPCReply() && aMsg.is_rpc()))
{
mRecvd = new Message(aMsg);
NotifyWorkerThread();
@ -295,23 +298,24 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
mPending.pop_back();
}
if (aMsg.is_urgent()) {
MOZ_ASSERT(!mPendingUrgentRequest);
mPendingUrgentRequest = new Message(aMsg);
} else {
mPending.push_back(aMsg);
}
bool shouldWakeUp = AwaitingInterruptReply() ||
// Allow incoming RPCs to be processed inside an urgent message.
(AwaitingUrgentReply() && aMsg.is_rpc()) ||
// Always process urgent messages while blocked.
((AwaitingSyncReply() || AwaitingRPCReply()) && aMsg.is_urgent());
// There are four cases we're concerned about, relating to the state of the
// main thread:
//
// (1) We are waiting on a sync reply - main thread is blocked on the IPC monitor.
// (1) We are waiting on a sync|rpc reply - main thread is blocked on the
// IPC monitor.
// - If the message is high priority, we wake up the main thread to
// deliver the message. Otherwise, we leave it in the mPending queue,
// posting a task to the main event loop, where it will be processed
// once the synchronous reply has been received.
//
// (2) We are waiting on an Interrupt reply - main thread is blocked on the IPC monitor.
// (2) We are waiting on an Interrupt reply - main thread is blocked on the
// IPC monitor.
// - Always notify and wake up the main thread.
//
// (3) We are not waiting on a reply.
@ -320,10 +324,48 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
// Note that, we may notify the main thread even though the monitor is not
// blocked. This is okay, since we always check for pending events before
// blocking again.
//
if (AwaitingInterruptReply() || (AwaitingSyncReply() && aMsg.is_urgent())) {
// Always wake up our Interrupt waiter, and wake up sync waiters for urgent
// messages.
if (shouldWakeUp && (AwaitingUrgentReply() && aMsg.is_rpc())) {
// If we're receiving an RPC message while blocked on an urgent message,
// we must defer any messages that were not sent as part of the child
// answering the urgent message.
//
// We must also be sure that we will not accidentally defer any RPC
// message that was sent while answering an urgent message. Otherwise,
// we will deadlock.
//
// On the parent side, the current transaction can only transition from 0
// to an ID, either by us issuing an urgent request while not blocked, or
// by receiving an RPC request while not blocked. When we unblock, the
// current transaction is reset to 0.
//
// When the child side receives an urgent message, any RPC messages sent
// before issuing the urgent reply will carry the urgent message's
// transaction ID.
//
// Since AwaitingUrgentReply() implies we are blocked, it also implies
// that we are within a transaction that will not change until we are
// completely unblocked (i.e, the transaction has completed).
if (aMsg.transaction_id() != mCurrentRPCTransaction)
shouldWakeUp = false;
}
if (aMsg.is_urgent()) {
MOZ_ASSERT(!mPendingUrgentRequest);
mPendingUrgentRequest = new Message(aMsg);
} else if (aMsg.is_rpc() && shouldWakeUp) {
// Only use this slot if we need to wake up for an RPC call. Otherwise
// we treat it like a normal async or sync message.
MOZ_ASSERT(!mPendingRPCCall);
mPendingRPCCall = new Message(aMsg);
} else {
mPending.push_back(aMsg);
}
if (shouldWakeUp) {
// Always wake up Interrupt waiters, sync waiters for urgent messages,
// RPC waiters for urgent messages, and urgent waiters for RPCs in the
// same transaction.
NotifyWorkerThread();
} else {
// Worker thread is either not blocked on a reply, or this is an
@ -355,14 +397,7 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
IPC_ASSERT(!DispatchingSyncMessage(), "violation of sync handler invariant");
if (AwaitingSyncReply()) {
// This is a temporary hack in place, for e10s CPOWs, until bug 901789
// and the new followup Interrupt protocol land. Eventually this will become
// an assert again. See bug 900062 for details.
NS_ERROR("Nested sync messages are not supported");
return false;
}
IPC_ASSERT(!AwaitingSyncReply(), "nested sync messages are not supported");
AutoEnterPendingReply replies(mPendingSyncReplies);
if (!SendAndWait(aMsg, aReply))
@ -388,12 +423,12 @@ MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
MonitorAutoLock lock(*mMonitor);
// At the moment, we don't allow urgent outcalls to nest, though this will
// change soon.
IPC_ASSERT(!AwaitingUrgentReply(), "urgent calls cannot nest");
IPC_ASSERT(!AwaitingInterruptReply(), "urgent calls cannot be issued within Interrupt calls");
IPC_ASSERT(!AwaitingSyncReply(), "urgent calls cannot be issued within sync sends");
AutoEnterRPCTransaction transact(this);
aMsg->set_transaction_id(mCurrentRPCTransaction);
AutoEnterPendingReply replies(mPendingUrgentReplies);
if (!SendAndWait(aMsg, aReply))
return false;
@ -402,6 +437,36 @@ MessageChannel::UrgentCall(Message* aMsg, Message* aReply)
return true;
}
bool
MessageChannel::RPCCall(Message* aMsg, Message* aReply)
{
AssertWorkerThread();
mMonitor->AssertNotCurrentThreadOwns();
IPC_ASSERT(mSide == ChildSide, "cannot send rpc messages from parent");
#ifdef OS_WIN
SyncStackFrame frame(this, false);
#endif
Message copy = *aMsg;
CxxStackFrame f(*this, OUT_MESSAGE, &copy);
MonitorAutoLock lock(*mMonitor);
// RPC calls must be the only thing on the stack.
IPC_ASSERT(!AwaitingInterruptReply(), "rpc calls cannot be issued within interrupts");
AutoEnterRPCTransaction transact(this);
aMsg->set_transaction_id(mCurrentRPCTransaction);
AutoEnterPendingReply replies(mPendingRPCReplies);
if (!SendAndWait(aMsg, aReply))
return false;
NS_ABORT_IF_FALSE(aReply->is_rpc(), "expected rpc reply");
return true;
}
bool
MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
{
@ -424,7 +489,7 @@ MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
while (true) {
// Wait for an event to occur.
while (true) {
if (mRecvd || mPendingUrgentRequest)
if (mRecvd || mPendingUrgentRequest || mPendingRPCCall)
break;
bool maybeTimedOut = !WaitForSyncNotify();
@ -438,28 +503,11 @@ MessageChannel::SendAndWait(Message* aMsg, Message* aReply)
return false;
}
if (mPendingUrgentRequest) {
// Note that it is possible we could have sent a sync message at
// the same time the parent process sent an urgent message, and
// therefore mPendingUrgentRequest is set *and* mRecvd is set as
// well. In this case we always process the urgent request first.
// However, if mRecvd is not set, we assert that it does not
// become set by DispatchMessage(), since the parent should be
// blocked.
bool hadSyncReply = !!mRecvd;
if (mPendingUrgentRequest && !ProcessPendingUrgentRequest())
return false;
nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
{
MonitorAutoUnlock unlock(*mMonitor);
DispatchMessage(*recvd);
}
if (!Connected()) {
ReportConnectionError("MessageChannel::DispatchMessage");
return false;
}
IPC_ASSERT(!hadSyncReply || !mRecvd, "incoherent mRecvd state");
}
if (mPendingRPCCall && !ProcessPendingRPCCall())
return false;
if (mRecvd) {
NS_ABORT_IF_FALSE(mRecvd->is_reply(), "expected reply");
@ -486,6 +534,8 @@ MessageChannel::Call(Message* aMsg, Message* aReply)
{
if (aMsg->is_urgent())
return UrgentCall(aMsg, aReply);
if (aMsg->is_rpc())
return RPCCall(aMsg, aReply);
return InterruptCall(aMsg, aReply);
}
@ -563,6 +613,9 @@ MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
if (mPendingUrgentRequest) {
recvd = *mPendingUrgentRequest;
mPendingUrgentRequest = nullptr;
} else if (mPendingRPCCall) {
recvd = *mPendingRPCCall;
mPendingRPCCall = nullptr;
} else if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
!= mOutOfTurnReplies.end())
{
@ -586,6 +639,7 @@ MessageChannel::InterruptCall(Message* aMsg, Message* aReply)
IPC_ASSERT(!recvd.is_sync() || mPending.empty(), "other side should be blocked");
{
AutoEnterRPCTransaction transaction(this, &recvd);
MonitorAutoUnlock unlock(*mMonitor);
CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
DispatchMessage(recvd);
@ -667,11 +721,126 @@ MessageChannel::InterruptEventOccurred()
return (!Connected() ||
!mPending.empty() ||
mPendingUrgentRequest ||
mPendingRPCCall ||
(!mOutOfTurnReplies.empty() &&
mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
mOutOfTurnReplies.end()));
}
bool
MessageChannel::ProcessPendingUrgentRequest()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
// Note that it is possible we could have sent a sync message at
// the same time the parent process sent an urgent message, and
// therefore mPendingUrgentRequest is set *and* mRecvd is set as
// well, because the link thread received both before the worker
// thread woke up.
//
// In this case, we process the urgent message first, but we need
// to save the reply.
nsAutoPtr<Message> savedReply(mRecvd.forget());
// We're the child process. We should not be receiving RPC calls.
IPC_ASSERT(!mPendingRPCCall, "unexpected RPC call");
nsAutoPtr<Message> recvd(mPendingUrgentRequest.forget());
{
// In order to send the parent RPC messages and guarantee it will
// wake up, we must re-use its transaction.
AutoEnterRPCTransaction transaction(this, recvd);
MonitorAutoUnlock unlock(*mMonitor);
DispatchUrgentMessage(*recvd);
}
if (!Connected()) {
ReportConnectionError("MessageChannel::DispatchUrgentMessage");
return false;
}
// In between having dispatched our reply to the parent process, and
// re-acquiring the monitor, the parent process could have already
// processed that reply and sent the reply to our sync message. If so,
// our saved reply should be empty.
IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
if (!mRecvd)
mRecvd = savedReply.forget();
return true;
}
bool
MessageChannel::ProcessPendingRPCCall()
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
// See comment above re: mRecvd replies and incoming calls.
nsAutoPtr<Message> savedReply(mRecvd.forget());
IPC_ASSERT(!mPendingUrgentRequest, "unexpected urgent message");
nsAutoPtr<Message> recvd(mPendingRPCCall.forget());
{
// If we are not currently in a transaction, this will begin one,
// and the link thread will not wake us up for any RPC messages not
// apart of this transaction. If we are already in a transaction,
// then this will assert that we're still in the same transaction.
AutoEnterRPCTransaction transaction(this, recvd);
MonitorAutoUnlock unlock(*mMonitor);
DispatchRPCMessage(*recvd);
}
if (!Connected()) {
ReportConnectionError("MessageChannel::DispatchRPCMessage");
return false;
}
// In between having dispatched our reply to the parent process, and
// re-acquiring the monitor, the parent process could have already
// processed that reply and sent the reply to our sync message. If so,
// our saved reply should be empty.
IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
if (!mRecvd)
mRecvd = savedReply.forget();
return true;
}
bool
MessageChannel::DequeueOne(Message *recvd)
{
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
if (!Connected()) {
ReportConnectionError("OnMaybeDequeueOne");
return false;
}
if (mPendingUrgentRequest) {
*recvd = *mPendingUrgentRequest;
mPendingUrgentRequest = nullptr;
return true;
}
if (mPendingRPCCall) {
*recvd = *mPendingRPCCall;
mPendingRPCCall = nullptr;
return true;
}
if (!mDeferred.empty())
MaybeUndeferIncall();
if (mPending.empty())
return false;
*recvd = mPending.front();
mPending.pop_front();
return true;
}
bool
MessageChannel::OnMaybeDequeueOne()
{
@ -679,29 +848,10 @@ MessageChannel::OnMaybeDequeueOne()
mMonitor->AssertNotCurrentThreadOwns();
Message recvd;
do {
MonitorAutoLock lock(*mMonitor);
if (!Connected()) {
ReportConnectionError("OnMaybeDequeueOne");
return false;
}
if (mPendingUrgentRequest) {
recvd = *mPendingUrgentRequest;
mPendingUrgentRequest = nullptr;
break;
}
if (!mDeferred.empty())
MaybeUndeferIncall();
if (mPending.empty())
return false;
recvd = mPending.front();
mPending.pop_front();
} while (0);
MonitorAutoLock lock(*mMonitor);
if (!DequeueOne(&recvd))
return false;
if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
// We probably just received a reply in a nested loop for an
@ -710,8 +860,16 @@ MessageChannel::OnMaybeDequeueOne()
return false;
}
CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
DispatchMessage(recvd);
{
// We should not be in a transaction yet if we're not blocked.
MOZ_ASSERT(mCurrentRPCTransaction == 0);
AutoEnterRPCTransaction transaction(this, &recvd);
MonitorAutoUnlock unlock(*mMonitor);
CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
DispatchMessage(recvd);
}
return true;
}
@ -724,6 +882,8 @@ MessageChannel::DispatchMessage(const Message &aMsg)
DispatchUrgentMessage(aMsg);
else if (aMsg.is_interrupt())
DispatchInterruptMessage(aMsg, 0);
else if (aMsg.is_rpc())
DispatchRPCMessage(aMsg);
else
DispatchAsyncMessage(aMsg);
}
@ -775,6 +935,28 @@ MessageChannel::DispatchUrgentMessage(const Message& aMsg)
mLink->SendMessage(reply);
}
void
MessageChannel::DispatchRPCMessage(const Message& aMsg)
{
AssertWorkerThread();
MOZ_ASSERT(aMsg.is_rpc());
Message *reply = nullptr;
if (!MaybeHandleError(mListener->OnCallReceived(aMsg, reply), "DispatchRPCMessage")) {
delete reply;
reply = new Message();
reply->set_rpc();
reply->set_reply();
reply->set_reply_error();
}
reply->set_seqno(aMsg.seqno());
MonitorAutoLock lock(*mMonitor);
if (ChannelConnected == mChannelState)
mLink->SendMessage(reply);
}
void
MessageChannel::DispatchAsyncMessage(const Message& aMsg)
{
@ -1158,7 +1340,7 @@ MessageChannel::OnChannelErrorFromLink()
if (InterruptStackDepth() > 0)
NotifyWorkerThread();
if (AwaitingSyncReply())
if (AwaitingSyncReply() || AwaitingRPCReply() || AwaitingUrgentReply())
NotifyWorkerThread();
if (ChannelClosing != mChannelState) {

View File

@ -42,6 +42,7 @@ class MessageChannel : HasResultCodes
{
friend class ProcessLink;
friend class ThreadLink;
friend class AutoEnterRPCTransaction;
typedef mozilla::Monitor Monitor;
@ -189,16 +190,21 @@ class MessageChannel : HasResultCodes
// up to process urgent calls from the parent.
bool SendAndWait(Message* aMsg, Message* aReply);
bool RPCCall(Message* aMsg, Message* aReply);
bool InterruptCall(Message* aMsg, Message* aReply);
bool UrgentCall(Message* aMsg, Message* aReply);
bool InterruptEventOccurred();
bool ProcessPendingUrgentRequest();
bool ProcessPendingRPCCall();
void MaybeUndeferIncall();
void EnqueuePendingMessages();
// Executed on the worker thread. Dequeues one pending message.
bool OnMaybeDequeueOne();
bool DequeueOne(Message *recvd);
// Dispatches an incoming message to its appropriate handler.
void DispatchMessage(const Message &aMsg);
@ -208,6 +214,7 @@ class MessageChannel : HasResultCodes
void DispatchSyncMessage(const Message &aMsg);
void DispatchUrgentMessage(const Message &aMsg);
void DispatchAsyncMessage(const Message &aMsg);
void DispatchRPCMessage(const Message &aMsg);
void DispatchInterruptMessage(const Message &aMsg, size_t aStackDepth);
// Return true if the wait ended because a notification was received.
@ -362,6 +369,10 @@ class MessageChannel : HasResultCodes
mMonitor->AssertCurrentThreadOwns();
return mPendingUrgentReplies > 0;
}
bool AwaitingRPCReply() const {
mMonitor->AssertCurrentThreadOwns();
return mPendingRPCReplies > 0;
}
bool AwaitingInterruptReply() const {
mMonitor->AssertCurrentThreadOwns();
return !mInterruptStack.empty();
@ -496,9 +507,66 @@ class MessageChannel : HasResultCodes
// out-message. This will never be greater than 1.
size_t mPendingSyncReplies;
// Worker-thread only; type we're expecting for the reply to an
// urgent out-message. This will never be greater than 1.
// Worker-thread only; Number of urgent and rpc replies we're waiting on.
// These are mutually exclusive since one channel cannot have outcalls of
// both kinds.
size_t mPendingUrgentReplies;
size_t mPendingRPCReplies;
// When we send an urgent request from the parent process, we could race
// with an RPC message that was issued by the child beforehand. In this
// case, if the parent were to wake up while waiting for the urgent reply,
// and process the RPC, it could send an additional urgent message. The
// child would wake up to process the urgent message (as it always will),
// then send a reply, which could be received by the parent out-of-order
// with respect to the first urgent reply.
//
// To address this problem, urgent or RPC requests are associated with a
// "transaction". Whenever one side of the channel wishes to start a
// chain of RPC/urgent messages, it allocates a new transaction ID. Any
// messages the parent receives, not apart of this transaction, are
// deferred. When issuing RPC/urgent requests on top of a started
// transaction, the initiating transaction ID is used.
//
// To ensure IDs are unique, we use sequence numbers for transaction IDs,
// which grow in opposite directions from child to parent.
// The current transaction ID.
int32_t mCurrentRPCTransaction;
class AutoEnterRPCTransaction
{
public:
AutoEnterRPCTransaction(MessageChannel *aChan)
: mChan(aChan),
mOldTransaction(mChan->mCurrentRPCTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (mChan->mCurrentRPCTransaction == 0)
mChan->mCurrentRPCTransaction = mChan->NextSeqno();
}
AutoEnterRPCTransaction(MessageChannel *aChan, Message *message)
: mChan(aChan),
mOldTransaction(mChan->mCurrentRPCTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (!message->is_rpc() && !message->is_urgent())
return;
MOZ_ASSERT_IF(mChan->mSide == ParentSide,
!mOldTransaction || mOldTransaction == message->transaction_id());
mChan->mCurrentRPCTransaction = message->transaction_id();
}
~AutoEnterRPCTransaction() {
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mCurrentRPCTransaction = mOldTransaction;
}
private:
MessageChannel *mChan;
int32_t mOldTransaction;
};
// If waiting for the reply to a sync out-message, it will be saved here
// on the I/O thread and then read and cleared by the worker thread.
@ -544,7 +612,18 @@ class MessageChannel : HasResultCodes
// another blocking message, because it's blocked on a reply from us.
//
MessageQueue mPending;
// Note that these two pointers are mutually exclusive. One channel cannot
// send both urgent requests (parent -> child) and RPC calls (child->parent).
// Also note that since initiating either requires blocking, they cannot
// queue up on the other side. One message slot is enough.
//
// Normally, all other message types are deferred into into mPending, and
// only these two types have special treatment (since they wake up blocked
// requests). However, when an RPC in-call races with an urgent out-call,
// the RPC message will be put into mPending instead of its slot below.
nsAutoPtr<Message> mPendingUrgentRequest;
nsAutoPtr<Message> mPendingRPCCall;
// Stack of all the out-calls on which this channel is awaiting responses.
// Each stack refers to a different protocol and the stacks are mutually

View File

@ -191,51 +191,31 @@ class UsingStmt(Node):
self.type = cxxTypeSpec
# "singletons"
class ASYNC:
class PrettyPrinted:
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class ASYNC(PrettyPrinted):
pretty = 'async'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class INTR:
class INTR(PrettyPrinted):
pretty = 'intr'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class SYNC:
class SYNC(PrettyPrinted):
pretty = 'sync'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class URGENT:
class URGENT(PrettyPrinted):
pretty = 'urgent'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class RPC(PrettyPrinted):
pretty = 'rpc'
class INOUT:
class INOUT(PrettyPrinted):
pretty = 'inout'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
class IN:
class IN(PrettyPrinted):
pretty = 'in'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
@staticmethod
def prettySS(cls, ss): return _prettyTable['in'][ss.pretty]
class OUT:
class OUT(PrettyPrinted):
pretty = 'out'
@classmethod
def __hash__(cls): return hash(cls.pretty)
@classmethod
def __str__(cls): return cls.pretty
@staticmethod
def prettySS(ss): return _prettyTable['out'][ss.pretty]
@ -243,10 +223,12 @@ _prettyTable = {
IN : { 'async': 'AsyncRecv',
'sync': 'SyncRecv',
'intr': 'IntrAnswer',
'rpc': 'RPCAnswer',
'urgent': 'UrgentAnswer' },
OUT : { 'async': 'AsyncSend',
'sync': 'SyncSend',
'intr': 'IntrCall',
'rpc': 'RPCCall',
'urgent': 'UrgentCall' }
# inout doesn't make sense here
}

View File

@ -277,13 +277,13 @@ def _putInNamespaces(cxxthing, namespaces):
def _sendPrefix(msgtype):
"""Prefix of the name of the C++ method that sends |msgtype|."""
if msgtype.isInterrupt() or msgtype.isUrgent():
if msgtype.isInterrupt() or msgtype.isUrgent() or msgtype.isRpc():
return 'Call'
return 'Send'
def _recvPrefix(msgtype):
"""Prefix of the name of the C++ method that handles |msgtype|."""
if msgtype.isInterrupt() or msgtype.isUrgent():
if msgtype.isInterrupt() or msgtype.isUrgent() or msgtype.isRpc():
return 'Answer'
return 'Recv'
@ -2955,7 +2955,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.asyncSwitch = StmtSwitch(msgtype)
if toplevel.talksSync():
self.syncSwitch = StmtSwitch(msgtype)
if toplevel.talksInterrupt():
if toplevel.talksRpc():
self.interruptSwitch = StmtSwitch(msgtype)
# implement Send*() methods and add dispatcher cases to
@ -2974,7 +2974,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.asyncSwitch.addcase(DefaultLabel(), default)
if toplevel.talksSync():
self.syncSwitch.addcase(DefaultLabel(), default)
if toplevel.talksInterrupt():
if toplevel.talksRpc():
self.interruptSwitch.addcase(DefaultLabel(), default)
# FIXME/bug 535053: only manager protocols and non-manager
@ -3051,7 +3051,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
hasReply=0, dispatches=dispatches),
Whitespace.NL
])
if not toplevel.talksInterrupt():
if not toplevel.talksRpc():
self.interruptSwitch = None
if not toplevel.talksSync():
self.syncSwitch = None
@ -4640,7 +4640,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.asyncSwitch.addcase(lbl, case)
elif sems is ipdl.ast.SYNC:
self.syncSwitch.addcase(lbl, case)
elif sems is ipdl.ast.INTR or sems is ipdl.ast.URGENT:
elif sems is ipdl.ast.INTR or sems is ipdl.ast.URGENT or sems is ipdl.ast.RPC:
self.interruptSwitch.addcase(lbl, case)
else: assert 0
@ -5041,6 +5041,9 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
elif md.decl.type.isInterrupt():
stmts.append(StmtExpr(ExprCall(
ExprSelect(var, '->', 'set_interrupt'))))
elif md.decl.type.isRpc():
stmts.append(StmtExpr(ExprCall(
ExprSelect(var, '->', 'set_rpc'))))
if reply:
stmts.append(StmtExpr(ExprCall(

View File

@ -137,6 +137,7 @@ reserved = set((
'protocol',
'recv',
'returns',
'rpc',
'send',
'spawns',
'start',
@ -607,6 +608,7 @@ def p_OptionalSendSemanticsQual(p):
def p_SendSemanticsQual(p):
"""SendSemanticsQual : ASYNC
| INTR
| RPC
| URGENT
| SYNC"""
s = p[1]
@ -614,6 +616,7 @@ def p_SendSemanticsQual(p):
elif 'intr' == s: p[0] = INTR
elif 'sync' == s: p[0] = SYNC
elif 'urgent' == s: p[0] = URGENT
elif 'rpc' == s: p[0] = RPC
else:
assert 0

View File

@ -7,7 +7,7 @@ import os, sys
from ipdl.ast import CxxInclude, Decl, Loc, QualifiedId, State, StructDecl, TransitionStmt
from ipdl.ast import TypeSpec, UnionDecl, UsingStmt, Visitor, ASYNC, SYNC, INTR
from ipdl.ast import IN, OUT, INOUT, ANSWER, CALL, RECV, SEND, URGENT
from ipdl.ast import IN, OUT, INOUT, ANSWER, CALL, RECV, SEND, URGENT, RPC
import ipdl.builtin as builtin
_DELETE_MSG = '__delete__'
@ -208,16 +208,22 @@ class IPDLType(Type):
def isSync(self): return self.sendSemantics is SYNC
def isInterrupt(self): return self.sendSemantics is INTR
def isUrgent(self): return self.sendSemantics is URGENT
def isRpc(self): return self.sendSemantics is RPC
def talksAsync(self): return True
def talksSync(self): return self.isSync() or self.isInterrupt()
def talksSync(self): return self.isSync() or self.isRpc() or self.isInterrupt()
def talksRpc(self): return self.isRpc() or self.isInterrupt()
def talksInterrupt(self): return self.isInterrupt()
def hasReply(self): return self.isSync() or self.isInterrupt() or self.isUrgent()
def hasReply(self): return (self.isSync()
or self.isInterrupt()
or self.isUrgent()
or self.isRpc())
def needsMoreJuiceThan(self, o):
return (o.isAsync() and not self.isAsync()
or o.isSync() and self.isInterrupt())
or o.isSync() and (self.isUrgent() or self.isRpc())
or (o.isUrgent() or o.isRpc()) and self.isInterrupt())
class StateType(IPDLType):
def __init__(self, protocol, name, start=False):
@ -1467,6 +1473,12 @@ class CheckTypes(TcheckVisitor):
"urgent child-to-parent messages are verboten (here, message `%s' in protocol `%s')",
mname, pname)
if mtype.isRpc() and (mtype.isOut() or mtype.isInout()):
self.error(
loc,
"rpc parent-to-child messages are verboten (here, message' `%s' in protocol `%s')",
mname, pname)
if mtype.needsMoreJuiceThan(ptype):
self.error(
loc,

View File

@ -21,6 +21,7 @@ IPDLTESTS = \
TestInterruptErrorCleanup \
TestInterruptRaces \
TestInterruptShutdownRace \
TestRPC \
TestRaceDeferral \
TestRacyReentry \
TestRacyInterruptReplies \

View File

@ -0,0 +1,24 @@
namespace mozilla {
namespace _ipdltest {
rpc protocol PTestRPC
{
parent:
rpc Test1_Start() returns (uint32_t result);
rpc Test1_InnerEvent() returns (uint32_t result);
async Test2_Start();
rpc Test2_OutOfOrder();
sync Test3_Start() returns (uint32_t result);
rpc Test3_InnerEvent() returns (uint32_t result);
child:
async Start();
urgent Test1_InnerQuery() returns (uint32_t result);
urgent Test1_NoReenter() returns (uint32_t result);
urgent Test2_FirstUrgent();
urgent Test2_SecondUrgent();
urgent Test3_WakeUp() returns (uint32_t result);
};
} // namespace _ipdltest
} // namespace mozilla

View File

@ -7,15 +7,12 @@ parent:
sync Test1() returns (uint32_t result);
async Test2();
sync Test3() returns (uint32_t result);
sync Test4_Begin();
sync Test4_NestedSync();
sync FinalTest_Begin();
child:
async Start();
urgent Reply1() returns (uint32_t result);
urgent Reply2() returns (uint32_t result);
urgent Test4_Reenter();
urgent FinalTest_Hang();
};

View File

@ -0,0 +1,182 @@
#include "TestRPC.h"
#include "IPDLUnitTests.h" // fail etc.
#include <unistd.h>
#if !defined(OS_POSIX)
#include <windows.h>
#endif
namespace mozilla {
namespace _ipdltest {
//-----------------------------------------------------------------------------
// parent
TestRPCParent::TestRPCParent()
: reentered_(false),
resolved_first_cpow_(false)
{
MOZ_COUNT_CTOR(TestRPCParent);
}
TestRPCParent::~TestRPCParent()
{
MOZ_COUNT_DTOR(TestRPCParent);
}
void
TestRPCParent::Main()
{
if (!SendStart())
fail("sending Start");
}
bool
TestRPCParent::AnswerTest1_Start(uint32_t* aResult)
{
uint32_t result;
if (!CallTest1_InnerQuery(&result))
fail("CallTest1_InnerQuery");
if (result != 300)
fail("Wrong result (expected 300)");
*aResult = 100;
return true;
}
bool
TestRPCParent::AnswerTest1_InnerEvent(uint32_t* aResult)
{
uint32_t result;
if (!CallTest1_NoReenter(&result))
fail("CallTest1_NoReenter");
if (result != 400)
fail("Wrong result (expected 400)");
*aResult = 200;
return true;
}
bool
TestRPCParent::RecvTest2_Start()
{
// Send a CPOW. During this time, we must NOT process the RPC message, as
// we could start receiving CPOW replies out-of-order.
if (!CallTest2_FirstUrgent())
fail("CallTest2_FirstUrgent");
MOZ_ASSERT(!reentered_);
resolved_first_cpow_ = true;
return true;
}
bool
TestRPCParent::AnswerTest2_OutOfOrder()
{
// Send a CPOW. If this RPC call was initiated while waiting for the first
// CPOW to resolve, replies will be processed out of order, and we'll crash.
if (!CallTest2_SecondUrgent())
fail("CallTest2_SecondUrgent");
reentered_ = true;
return true;
}
bool
TestRPCParent::RecvTest3_Start(uint32_t* aResult)
{
if (!CallTest3_WakeUp(aResult))
fail("CallTest3_WakeUp");
return true;
}
bool
TestRPCParent::AnswerTest3_InnerEvent(uint32_t* aResult)
{
*aResult = 200;
return true;
}
//-----------------------------------------------------------------------------
// child
TestRPCChild::TestRPCChild()
{
MOZ_COUNT_CTOR(TestRPCChild);
}
TestRPCChild::~TestRPCChild()
{
MOZ_COUNT_DTOR(TestRPCChild);
}
bool
TestRPCChild::RecvStart()
{
uint32_t result;
if (!CallTest1_Start(&result))
fail("CallTest1_Start");
if (result != 100)
fail("Wrong result (expected 100)");
if (!SendTest2_Start())
fail("SendTest2_Start");
if (!CallTest2_OutOfOrder())
fail("CallTest2_OutOfOrder");
result = 0;
if (!SendTest3_Start(&result))
fail("SendTest3_Start");
if (result != 200)
fail("Wrong result (expected 200)");
Close();
return true;
}
bool
TestRPCChild::AnswerTest1_InnerQuery(uint32_t* aResult)
{
uint32_t result;
if (!CallTest1_InnerEvent(&result))
fail("CallTest1_InnerEvent");
if (result != 200)
fail("Wrong result (expected 200)");
*aResult = 300;
return true;
}
bool
TestRPCChild::AnswerTest1_NoReenter(uint32_t* aResult)
{
*aResult = 400;
return true;
}
bool
TestRPCChild::AnswerTest2_FirstUrgent()
{
return true;
}
bool
TestRPCChild::AnswerTest2_SecondUrgent()
{
return true;
}
bool
TestRPCChild::AnswerTest3_WakeUp(uint32_t* aResult)
{
if (!CallTest3_InnerEvent(aResult))
fail("CallTest3_InnerEvent");
return true;
}
} // namespace _ipdltest
} // namespace mozilla

View File

@ -0,0 +1,77 @@
#ifndef mozilla__ipdltest_TestRPC_h
#define mozilla__ipdltest_TestRPC_h 1
#include "mozilla/_ipdltest/IPDLUnitTests.h"
#include "mozilla/_ipdltest/PTestRPCParent.h"
#include "mozilla/_ipdltest/PTestRPCChild.h"
namespace mozilla {
namespace _ipdltest {
class TestRPCParent :
public PTestRPCParent
{
public:
TestRPCParent();
virtual ~TestRPCParent();
static bool RunTestInProcesses() { return true; }
static bool RunTestInThreads() { return true; }
void Main();
bool AnswerTest1_Start(uint32_t* aResult) MOZ_OVERRIDE;
bool AnswerTest1_InnerEvent(uint32_t* aResult) MOZ_OVERRIDE;
bool RecvTest2_Start() MOZ_OVERRIDE;
bool AnswerTest2_OutOfOrder() MOZ_OVERRIDE;
bool RecvTest3_Start(uint32_t* aResult) MOZ_OVERRIDE;
bool AnswerTest3_InnerEvent(uint32_t* aResult) MOZ_OVERRIDE;
virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
{
if (NormalShutdown != why)
fail("unexpected destruction!");
if (!reentered_)
fail("never processed raced RPC call!");
if (!resolved_first_cpow_)
fail("never resolved first CPOW!");
passed("ok");
QuitParent();
}
private:
bool reentered_;
bool resolved_first_cpow_;
};
class TestRPCChild :
public PTestRPCChild
{
public:
TestRPCChild();
virtual ~TestRPCChild();
bool RecvStart() MOZ_OVERRIDE;
bool AnswerTest1_InnerQuery(uint32_t* aResult) MOZ_OVERRIDE;
bool AnswerTest1_NoReenter(uint32_t* aResult) MOZ_OVERRIDE;
bool AnswerTest2_FirstUrgent() MOZ_OVERRIDE;
bool AnswerTest2_SecondUrgent() MOZ_OVERRIDE;
bool AnswerTest3_WakeUp(uint32_t* aResult) MOZ_OVERRIDE;
virtual void ActorDestroy(ActorDestroyReason why) MOZ_OVERRIDE
{
if (NormalShutdown != why)
fail("unexpected destruction!");
QuitChild();
}
};
} // namespace _ipdltest
} // namespace mozilla
#endif // ifndef mozilla__ipdltest_TestRPC_h

View File

@ -2,8 +2,7 @@
#include "IPDLUnitTests.h" // fail etc.
#include <unistd.h>
#if defined(OS_POSIX)
#else
#if !defined(OS_POSIX)
#include <windows.h>
#endif
@ -77,21 +76,6 @@ TestUrgencyParent::RecvTest3(uint32_t *value)
return true;
}
bool
TestUrgencyParent::RecvTest4_Begin()
{
if (!CallTest4_Reenter())
fail("call Test4_Reenter");
return true;
}
bool
TestUrgencyParent::RecvTest4_NestedSync()
{
fail("nested sync not supported");
return false;
}
bool
TestUrgencyParent::RecvFinalTest_Begin()
{
@ -145,9 +129,6 @@ TestUrgencyChild::RecvStart()
if (result != 1000)
fail("wrong value from test3");
if (!SendTest4_Begin())
fail("calling SendTest4_Begin");
// This must be the last test, since the child process may die.
if (SendFinalTest_Begin())
fail("Final test should not have succeeded");
@ -182,14 +163,6 @@ TestUrgencyChild::AnswerReply2(uint32_t *reply)
return true;
}
bool
TestUrgencyChild::AnswerTest4_Reenter()
{
if (SendTest4_NestedSync())
fail("sending nested sync messages not supported");
return true;
}
bool
TestUrgencyChild::AnswerFinalTest_Hang()
{

View File

@ -62,6 +62,7 @@ IPDL_SOURCES += [
'PTestNestedLoops.ipdl',
'PTestOpens.ipdl',
'PTestOpensOpened.ipdl',
'PTestRPC.ipdl',
'PTestRaceDeferral.ipdl',
'PTestRacyInterruptReplies.ipdl',
'PTestRacyReentry.ipdl',

View File

@ -0,0 +1,6 @@
intr protocol rpcParentToChild {
// can't declare rpc parent-to-child messages
child: rpc Msg();
};

View File

@ -0,0 +1,8 @@
sync protocol tooWeakRpcSync {
// it's an error to declare a sync protocol with an rpc or urgent message
parent:
rpc Msg();
child:
urgent Msg2();
};

View File

@ -0,0 +1,6 @@
intr protocol urgentChildToParent {
// can't declare urgent child-to-parent messages
parent: ugrent Msg();
};