Bug 699319 - Part 2: Create threaded version of the Link class. r=cjones

This commit is contained in:
Niko Matsakis 2011-11-30 08:26:16 -08:00
parent fce15e5c44
commit 40750730de
3 changed files with 183 additions and 4 deletions

View File

@ -190,6 +190,9 @@ AsyncChannel::ProcessLink::Open(mozilla::ipc::Transport* aTransport,
void
AsyncChannel::ProcessLink::EchoMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
// NB: Go through this OnMessageReceived indirection so that
// echoing this message does the right thing for SyncChannel
// and RPCChannel too
@ -203,6 +206,8 @@ void
AsyncChannel::ProcessLink::SendMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(mTransport, &Transport::Send, msg));
@ -218,6 +223,54 @@ AsyncChannel::ProcessLink::SendClose()
FROM_HERE, NewRunnableMethod(this, &ProcessLink::OnCloseChannel));
}
AsyncChannel::ThreadLink::ThreadLink(AsyncChannel *aChan,
AsyncChannel *aTargetChan)
: Link(aChan),
mTargetChan(aTargetChan)
{
}
AsyncChannel::ThreadLink::~ThreadLink()
{
mTargetChan = 0;
}
void
AsyncChannel::ThreadLink::EchoMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->OnMessageReceivedFromLink(*msg);
delete msg;
}
void
AsyncChannel::ThreadLink::SendMessage(Message *msg)
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mTargetChan->OnMessageReceivedFromLink(*msg);
delete msg;
}
void
AsyncChannel::ThreadLink::SendClose()
{
mChan->AssertWorkerThread();
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mChannelState = ChannelClosed;
// In a ProcessLink, we would close our half the channel. This
// would show up on the other side as an error on the I/O thread.
// The I/O thread would then invoke OnChannelErrorFromLink().
// As usual, we skip that process and just invoke the
// OnChannelErrorFromLink() method directly.
mTargetChan->OnChannelErrorFromLink();
}
AsyncChannel::AsyncChannel(AsyncListener* aListener)
: mListener(aListener),
mChannelState(ChannelClosed),
@ -249,6 +302,82 @@ AsyncChannel::Open(Transport* aTransport,
return true;
}
/* Opens a connection to another thread in the same process.
This handshake proceeds as follows:
- Let A be the thread initiating the process (either child or parent)
and B be the other thread.
- A spawns thread for B, obtaining B's message loop
- A creates ProtocolChild and ProtocolParent instances.
Let PA be the one appropriate to A and PB the side for B.
- A invokes PA->Open(PB, ...):
- set state to mChannelOpening
- this will place a work item in B's worker loop (see next bullet)
and then spins until PB->mChannelState becomes mChannelConnected
- meanwhile, on PB's worker loop, the work item is removed and:
- invokes PB->SlaveOpen(PA, ...):
- sets its state and that of PA to Connected
*/
bool
AsyncChannel::Open(AsyncChannel *aTargetChan,
MessageLoop *aTargetLoop,
AsyncChannel::Side aSide)
{
NS_PRECONDITION(aTargetChan, "Need a target channel");
NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
CommonThreadOpenInit(aTargetChan, aSide);
Side oppSide = Unknown;
switch(aSide) {
case Child: oppSide = Parent; break;
case Parent: oppSide = Child; break;
case Unknown: break;
}
mMonitor = new RefCountedMonitor();
aTargetLoop->PostTask(
FROM_HERE,
NewRunnableMethod(aTargetChan, &AsyncChannel::OnOpenAsSlave,
this, oppSide));
MonitorAutoLock lock(*mMonitor);
mChannelState = ChannelOpening;
while (ChannelOpening == mChannelState)
mMonitor->Wait();
NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
return (ChannelConnected == mChannelState);
}
void
AsyncChannel::CommonThreadOpenInit(AsyncChannel *aTargetChan, Side aSide)
{
mWorkerLoop = MessageLoop::current();
mLink = new ThreadLink(this, aTargetChan);
mChild = (aSide == Child);
}
// Invoked when the other side has begun the open.
void
AsyncChannel::OnOpenAsSlave(AsyncChannel *aTargetChan, Side aSide)
{
NS_PRECONDITION(ChannelClosed == mChannelState,
"Not currently closed");
NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
"Target channel not in the process of opening");
CommonThreadOpenInit(aTargetChan, aSide);
mMonitor = aTargetChan->mMonitor;
MonitorAutoLock lock(*mMonitor);
NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
"Target channel not in the process of opening");
mChannelState = ChannelConnected;
aTargetChan->mChannelState = ChannelConnected;
aTargetChan->mMonitor->Notify();
}
void
AsyncChannel::Close()
{

View File

@ -122,6 +122,16 @@ public:
// i.e., mChannelState == ChannelConnected.
bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=Unknown);
// "Open" a connection to another thread in the same process.
//
// Returns true iff the transport layer was successfully connected,
// i.e., mChannelState == ChannelConnected.
//
// For more details on the process of opening a channel between
// threads, see the extended comment on this function
// in AsyncChannel.cpp.
bool Open(AsyncChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide);
// Close the underlying transport channel.
void Close();
@ -189,9 +199,22 @@ public:
NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid);
NS_OVERRIDE virtual void OnChannelError();
virtual void EchoMessage(Message *msg);
virtual void SendMessage(Message *msg);
virtual void SendClose();
NS_OVERRIDE virtual void EchoMessage(Message *msg);
NS_OVERRIDE virtual void SendMessage(Message *msg);
NS_OVERRIDE virtual void SendClose();
};
class ThreadLink : public Link {
protected:
AsyncChannel* mTargetChan;
public:
ThreadLink(AsyncChannel *aChan, AsyncChannel *aTargetChan);
virtual ~ThreadLink();
NS_OVERRIDE virtual void EchoMessage(Message *msg);
NS_OVERRIDE virtual void SendMessage(Message *msg);
NS_OVERRIDE virtual void SendClose();
};
protected:
@ -251,6 +274,8 @@ protected:
}
void NotifyChannelClosed();
void NotifyMaybeChannelError();
void OnOpenAsSlave(AsyncChannel *aTargetChan, Side aSide);
void CommonThreadOpenInit(AsyncChannel *aTargetChan, Side aSide);
virtual void Clear();

View File

@ -2696,7 +2696,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.cls.addstmts([ dtor, Whitespace.NL ])
if ptype.isToplevel():
# Open()
# Open(Transport*, ProcessHandle, MessageLoop*, Side)
aTransportVar = ExprVar('aTransport')
aThreadVar = ExprVar('aThread')
processvar = ExprVar('aOtherProcess')
@ -2724,6 +2724,31 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
openmeth,
Whitespace.NL ])
# Open(AsyncChannel *, MessageLoop *, Side)
aChannel = ExprVar('aChannel')
aMessageLoop = ExprVar('aMessageLoop')
sidevar = ExprVar('aSide')
openmeth = MethodDefn(
MethodDecl(
'Open',
params=[ Decl(Type('AsyncChannel', ptr=True),
aChannel.name),
Param(Type('MessageLoop', ptr=True),
aMessageLoop.name),
Param(Type('AsyncChannel::Side'),
sidevar.name,
default=ExprVar('Channel::Unknown')) ],
ret=Type.BOOL))
openmeth.addstmts([
StmtExpr(ExprAssn(p.otherProcessVar(), ExprLiteral.ZERO)),
StmtReturn(ExprCall(ExprSelect(p.channelVar(), '.', 'Open'),
[ aChannel, aMessageLoop, sidevar ]))
])
self.cls.addstmts([
openmeth,
Whitespace.NL ])
# Close()
closemeth = MethodDefn(MethodDecl('Close'))
closemeth.addstmt(StmtExpr(