diff --git a/ipc/glue/AsyncChannel.cpp b/ipc/glue/AsyncChannel.cpp index 4a14d0b134f0..cde92aa83652 100644 --- a/ipc/glue/AsyncChannel.cpp +++ b/ipc/glue/AsyncChannel.cpp @@ -190,6 +190,9 @@ AsyncChannel::ProcessLink::Open(mozilla::ipc::Transport* aTransport, void AsyncChannel::ProcessLink::EchoMessage(Message *msg) { + mChan->AssertWorkerThread(); + mChan->mMonitor->AssertCurrentThreadOwns(); + // NB: Go through this OnMessageReceived indirection so that // echoing this message does the right thing for SyncChannel // and RPCChannel too @@ -203,6 +206,8 @@ void AsyncChannel::ProcessLink::SendMessage(Message *msg) { mChan->AssertWorkerThread(); + mChan->mMonitor->AssertCurrentThreadOwns(); + mIOLoop->PostTask( FROM_HERE, NewRunnableMethod(mTransport, &Transport::Send, msg)); @@ -218,6 +223,54 @@ AsyncChannel::ProcessLink::SendClose() FROM_HERE, NewRunnableMethod(this, &ProcessLink::OnCloseChannel)); } +AsyncChannel::ThreadLink::ThreadLink(AsyncChannel *aChan, + AsyncChannel *aTargetChan) + : Link(aChan), + mTargetChan(aTargetChan) +{ +} + +AsyncChannel::ThreadLink::~ThreadLink() +{ + mTargetChan = 0; +} + +void +AsyncChannel::ThreadLink::EchoMessage(Message *msg) +{ + mChan->AssertWorkerThread(); + mChan->mMonitor->AssertCurrentThreadOwns(); + + mChan->OnMessageReceivedFromLink(*msg); + delete msg; +} + +void +AsyncChannel::ThreadLink::SendMessage(Message *msg) +{ + mChan->AssertWorkerThread(); + mChan->mMonitor->AssertCurrentThreadOwns(); + + mTargetChan->OnMessageReceivedFromLink(*msg); + delete msg; +} + +void +AsyncChannel::ThreadLink::SendClose() +{ + mChan->AssertWorkerThread(); + mChan->mMonitor->AssertCurrentThreadOwns(); + + mChan->mChannelState = ChannelClosed; + + // In a ProcessLink, we would close our half the channel. This + // would show up on the other side as an error on the I/O thread. + // The I/O thread would then invoke OnChannelErrorFromLink(). + // As usual, we skip that process and just invoke the + // OnChannelErrorFromLink() method directly. + mTargetChan->OnChannelErrorFromLink(); +} + AsyncChannel::AsyncChannel(AsyncListener* aListener) : mListener(aListener), mChannelState(ChannelClosed), @@ -249,6 +302,82 @@ AsyncChannel::Open(Transport* aTransport, return true; } +/* Opens a connection to another thread in the same process. + + This handshake proceeds as follows: + - Let A be the thread initiating the process (either child or parent) + and B be the other thread. + - A spawns thread for B, obtaining B's message loop + - A creates ProtocolChild and ProtocolParent instances. + Let PA be the one appropriate to A and PB the side for B. + - A invokes PA->Open(PB, ...): + - set state to mChannelOpening + - this will place a work item in B's worker loop (see next bullet) + and then spins until PB->mChannelState becomes mChannelConnected + - meanwhile, on PB's worker loop, the work item is removed and: + - invokes PB->SlaveOpen(PA, ...): + - sets its state and that of PA to Connected + */ +bool +AsyncChannel::Open(AsyncChannel *aTargetChan, + MessageLoop *aTargetLoop, + AsyncChannel::Side aSide) +{ + NS_PRECONDITION(aTargetChan, "Need a target channel"); + NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed"); + + CommonThreadOpenInit(aTargetChan, aSide); + + Side oppSide = Unknown; + switch(aSide) { + case Child: oppSide = Parent; break; + case Parent: oppSide = Child; break; + case Unknown: break; + } + + mMonitor = new RefCountedMonitor(); + + aTargetLoop->PostTask( + FROM_HERE, + NewRunnableMethod(aTargetChan, &AsyncChannel::OnOpenAsSlave, + this, oppSide)); + + MonitorAutoLock lock(*mMonitor); + mChannelState = ChannelOpening; + while (ChannelOpening == mChannelState) + mMonitor->Wait(); + NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken"); + return (ChannelConnected == mChannelState); +} + +void +AsyncChannel::CommonThreadOpenInit(AsyncChannel *aTargetChan, Side aSide) +{ + mWorkerLoop = MessageLoop::current(); + mLink = new ThreadLink(this, aTargetChan); + mChild = (aSide == Child); +} + +// Invoked when the other side has begun the open. +void +AsyncChannel::OnOpenAsSlave(AsyncChannel *aTargetChan, Side aSide) +{ + NS_PRECONDITION(ChannelClosed == mChannelState, + "Not currently closed"); + NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + + CommonThreadOpenInit(aTargetChan, aSide); + mMonitor = aTargetChan->mMonitor; + + MonitorAutoLock lock(*mMonitor); + NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState, + "Target channel not in the process of opening"); + mChannelState = ChannelConnected; + aTargetChan->mChannelState = ChannelConnected; + aTargetChan->mMonitor->Notify(); +} + void AsyncChannel::Close() { diff --git a/ipc/glue/AsyncChannel.h b/ipc/glue/AsyncChannel.h index a10405fa56e8..b075b9e00374 100644 --- a/ipc/glue/AsyncChannel.h +++ b/ipc/glue/AsyncChannel.h @@ -122,6 +122,16 @@ public: // i.e., mChannelState == ChannelConnected. bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=Unknown); + // "Open" a connection to another thread in the same process. + // + // Returns true iff the transport layer was successfully connected, + // i.e., mChannelState == ChannelConnected. + // + // For more details on the process of opening a channel between + // threads, see the extended comment on this function + // in AsyncChannel.cpp. + bool Open(AsyncChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide); + // Close the underlying transport channel. void Close(); @@ -189,9 +199,22 @@ public: NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid); NS_OVERRIDE virtual void OnChannelError(); - virtual void EchoMessage(Message *msg); - virtual void SendMessage(Message *msg); - virtual void SendClose(); + NS_OVERRIDE virtual void EchoMessage(Message *msg); + NS_OVERRIDE virtual void SendMessage(Message *msg); + NS_OVERRIDE virtual void SendClose(); + }; + + class ThreadLink : public Link { + protected: + AsyncChannel* mTargetChan; + + public: + ThreadLink(AsyncChannel *aChan, AsyncChannel *aTargetChan); + virtual ~ThreadLink(); + + NS_OVERRIDE virtual void EchoMessage(Message *msg); + NS_OVERRIDE virtual void SendMessage(Message *msg); + NS_OVERRIDE virtual void SendClose(); }; protected: @@ -251,6 +274,8 @@ protected: } void NotifyChannelClosed(); void NotifyMaybeChannelError(); + void OnOpenAsSlave(AsyncChannel *aTargetChan, Side aSide); + void CommonThreadOpenInit(AsyncChannel *aTargetChan, Side aSide); virtual void Clear(); diff --git a/ipc/ipdl/ipdl/lower.py b/ipc/ipdl/ipdl/lower.py index d482be8c6363..b9c6ed2a6c0d 100644 --- a/ipc/ipdl/ipdl/lower.py +++ b/ipc/ipdl/ipdl/lower.py @@ -2696,7 +2696,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor): self.cls.addstmts([ dtor, Whitespace.NL ]) if ptype.isToplevel(): - # Open() + # Open(Transport*, ProcessHandle, MessageLoop*, Side) aTransportVar = ExprVar('aTransport') aThreadVar = ExprVar('aThread') processvar = ExprVar('aOtherProcess') @@ -2724,6 +2724,31 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor): openmeth, Whitespace.NL ]) + # Open(AsyncChannel *, MessageLoop *, Side) + aChannel = ExprVar('aChannel') + aMessageLoop = ExprVar('aMessageLoop') + sidevar = ExprVar('aSide') + openmeth = MethodDefn( + MethodDecl( + 'Open', + params=[ Decl(Type('AsyncChannel', ptr=True), + aChannel.name), + Param(Type('MessageLoop', ptr=True), + aMessageLoop.name), + Param(Type('AsyncChannel::Side'), + sidevar.name, + default=ExprVar('Channel::Unknown')) ], + ret=Type.BOOL)) + + openmeth.addstmts([ + StmtExpr(ExprAssn(p.otherProcessVar(), ExprLiteral.ZERO)), + StmtReturn(ExprCall(ExprSelect(p.channelVar(), '.', 'Open'), + [ aChannel, aMessageLoop, sidevar ])) + ]) + self.cls.addstmts([ + openmeth, + Whitespace.NL ]) + # Close() closemeth = MethodDefn(MethodDecl('Close')) closemeth.addstmt(StmtExpr(