Bug 1366316 - Separate thread for IPCBlobInputStream actors - part 1 - actor migration, r=smaug

This commit is contained in:
Andrea Marchesini 2017-05-31 07:41:10 +02:00
parent 4943943130
commit dd8eddb26f
12 changed files with 451 additions and 29 deletions

View File

@ -5,6 +5,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "IPCBlobInputStreamChild.h"
#include "IPCBlobInputStreamThread.h"
#include "mozilla/ipc/IPCStreamUtils.h"
#include "WorkerPrivate.h"
@ -49,7 +50,9 @@ public:
NS_IMETHOD
Run() override
{
if (mActor->IsAlive()) {
MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
if (mActor->State() == IPCBlobInputStreamChild::eActive) {
mActor->SendStreamNeeded();
}
return NS_OK;
@ -112,7 +115,7 @@ IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
: mMutex("IPCBlobInputStreamChild::mMutex")
, mID(aID)
, mSize(aSize)
, mActorAlive(true)
, mState(eActive)
, mOwningThread(NS_GetCurrentThread())
{
// If we are running in a worker, we need to send a Close() to the parent side
@ -142,42 +145,76 @@ IPCBlobInputStreamChild::Shutdown()
mWorkerHolder = nullptr;
mPendingOperations.Clear();
if (mActorAlive) {
if (mState == eActive) {
SendClose();
mActorAlive = false;
mState = eInactive;
}
}
void
IPCBlobInputStreamChild::ActorDestroy(IProtocol::ActorDestroyReason aReason)
{
bool migrating = false;
{
MutexAutoLock lock(mMutex);
mActorAlive = false;
migrating = mState == eActiveMigrating;
mState = migrating ? eInactiveMigrating : eInactive;
}
if (migrating) {
// We were waiting for this! Now we can migrate the actor in the correct
// thread.
RefPtr<IPCBlobInputStreamThread> thread =
IPCBlobInputStreamThread::GetOrCreate();
ResetManager();
thread->MigrateActor(this);
return;
}
// Let's cleanup the workerHolder and the pending operation queue.
Shutdown();
}
bool
IPCBlobInputStreamChild::IsAlive()
IPCBlobInputStreamChild::ActorState
IPCBlobInputStreamChild::State()
{
MutexAutoLock lock(mMutex);
return mActorAlive;
return mState;
}
already_AddRefed<nsIInputStream>
IPCBlobInputStreamChild::CreateStream()
{
MutexAutoLock lock(mMutex);
if (!mActorAlive) {
return nullptr;
}
bool shouldMigrate = false;
RefPtr<IPCBlobInputStream> stream = new IPCBlobInputStream(this);
mStreams.AppendElement(stream);
{
MutexAutoLock lock(mMutex);
if (mState == eInactive) {
return nullptr;
}
// The stream is active but maybe it is not running in the DOM-File thread.
// We should migrate it there.
if (mState == eActive &&
!IPCBlobInputStreamThread::IsOnFileThread(mOwningThread)) {
MOZ_ASSERT(mStreams.IsEmpty());
shouldMigrate = true;
mState = eActiveMigrating;
}
mStreams.AppendElement(stream);
}
// Send__delete__ will call ActorDestroy(). mMutex cannot be locked at this
// time.
if (shouldMigrate) {
Send__delete__(this);
}
return stream.forget();
}
@ -192,7 +229,7 @@ IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream)
MutexAutoLock lock(mMutex);
mStreams.RemoveElement(aStream);
if (!mStreams.IsEmpty() || !mActorAlive) {
if (!mStreams.IsEmpty() || mState != eActive) {
return;
}
}
@ -212,7 +249,7 @@ IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
{
MutexAutoLock lock(mMutex);
if (!mActorAlive) {
if (mState == eInactive) {
return;
}
@ -222,6 +259,13 @@ IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
opt->mStream = aStream;
opt->mEventTarget = aEventTarget ? aEventTarget : NS_GetCurrentThread();
if (mState == eActiveMigrating || mState == eInactiveMigrating) {
// This operation will be continued when the migration is completed.
return;
}
MOZ_ASSERT(mState == eActive);
if (mOwningThread == NS_GetCurrentThread()) {
SendStreamNeeded();
return;
@ -242,7 +286,7 @@ IPCBlobInputStreamChild::RecvStreamReady(const OptionalIPCStream& aStream)
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(!mPendingOperations.IsEmpty());
MOZ_ASSERT(mActorAlive);
MOZ_ASSERT(mState == eActive);
pendingStream = mPendingOperations[0].mStream;
eventTarget = mPendingOperations[0].mEventTarget;
@ -257,5 +301,30 @@ IPCBlobInputStreamChild::RecvStreamReady(const OptionalIPCStream& aStream)
return IPC_OK();
}
void
IPCBlobInputStreamChild::Migrated()
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mState == eInactiveMigrating);
mOwningThread = NS_GetCurrentThread();
MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileThread(mOwningThread));
// Maybe we have no reasons to keep this actor alive.
if (mStreams.IsEmpty()) {
mState = eInactive;
SendClose();
return;
}
mState = eActive;
// Let's processing the pending operations. We need a stream for each pending
// operation.
for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
SendStreamNeeded();
}
}
} // namespace dom
} // namespace mozilla

View File

@ -26,6 +26,23 @@ class IPCBlobInputStreamChild final
: public mozilla::ipc::PIPCBlobInputStreamChild
{
public:
enum ActorState
{
// The actor is connected via IPDL to the parent.
eActive,
// The actor is disconnected.
eInactive,
// The actor is waiting to be disconnected. Once it has been disconnected,
// it will be reactivated on the DOM-File thread.
eActiveMigrating,
// The actor has been disconnected and it's waiting to be connected on the
// DOM-File thread.
eInactiveMigrating,
};
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(IPCBlobInputStreamChild)
IPCBlobInputStreamChild(const nsID& aID, uint64_t aSize);
@ -33,8 +50,8 @@ public:
void
ActorDestroy(IProtocol::ActorDestroyReason aReason) override;
bool
IsAlive();
ActorState
State();
already_AddRefed<nsIInputStream>
CreateStream();
@ -64,6 +81,9 @@ public:
void
Shutdown();
void
Migrated();
private:
~IPCBlobInputStreamChild();
@ -78,8 +98,7 @@ private:
const nsID mID;
const uint64_t mSize;
// false when ActorDestroy() is called.
bool mActorAlive;
ActorState mState;
// This struct and the array are used for creating streams when needed.
struct PendingOperation

View File

@ -31,6 +31,13 @@ IPCBlobInputStreamParent::Create(nsIInputStream* aInputStream, uint64_t aSize,
return new IPCBlobInputStreamParent(id, aSize, aManager);
}
/* static */ IPCBlobInputStreamParent*
IPCBlobInputStreamParent::Create(const nsID& aID, uint64_t aSize,
PBackgroundParent* aManager)
{
return new IPCBlobInputStreamParent(aID, aSize, aManager);
}
IPCBlobInputStreamParent::IPCBlobInputStreamParent(const nsID& aID,
uint64_t aSize,
nsIContentParent* aManager)
@ -38,6 +45,7 @@ IPCBlobInputStreamParent::IPCBlobInputStreamParent(const nsID& aID,
, mSize(aSize)
, mContentManager(aManager)
, mPBackgroundManager(nullptr)
, mMigrating(false)
{}
IPCBlobInputStreamParent::IPCBlobInputStreamParent(const nsID& aID,
@ -47,6 +55,7 @@ IPCBlobInputStreamParent::IPCBlobInputStreamParent(const nsID& aID,
, mSize(aSize)
, mContentManager(nullptr)
, mPBackgroundManager(aManager)
, mMigrating(false)
{}
void
@ -57,13 +66,16 @@ IPCBlobInputStreamParent::ActorDestroy(IProtocol::ActorDestroyReason aReason)
mContentManager = nullptr;
mPBackgroundManager = nullptr;
IPCBlobInputStreamStorage::Get()->ForgetStream(mID);
if (!mMigrating) {
IPCBlobInputStreamStorage::Get()->ForgetStream(mID);
RefPtr<IPCBlobInputStreamParentCallback> callback;
mCallback.swap(callback);
// TODO, this calllback must be migrated as well!
RefPtr<IPCBlobInputStreamParentCallback> callback;
mCallback.swap(callback);
if (callback) {
callback->ActorDestroyed(mID);
if (callback) {
callback->ActorDestroyed(mID);
}
}
}
@ -123,5 +135,21 @@ IPCBlobInputStreamParent::RecvClose()
return IPC_OK();
}
mozilla::ipc::IPCResult
IPCBlobInputStreamParent::Recv__delete__()
{
MOZ_ASSERT(mContentManager || mPBackgroundManager);
mMigrating = true;
return IPC_OK();
}
bool
IPCBlobInputStreamParent::HasValidStream() const
{
nsCOMPtr<nsIInputStream> stream;
IPCBlobInputStreamStorage::Get()->GetStream(mID, getter_AddRefs(stream));
return !!stream;
}
} // namespace dom
} // namespace mozilla

View File

@ -39,6 +39,10 @@ public:
Create(nsIInputStream* aInputStream, uint64_t aSize, nsresult* aRv,
M* aManager);
static IPCBlobInputStreamParent*
Create(const nsID& aID, uint64_t aSize,
mozilla::ipc::PBackgroundParent* aManager);
void
ActorDestroy(IProtocol::ActorDestroyReason aReason) override;
@ -63,6 +67,12 @@ public:
mozilla::ipc::IPCResult
RecvClose() override;
mozilla::ipc::IPCResult
Recv__delete__() override;
bool
HasValidStream() const;
private:
IPCBlobInputStreamParent(const nsID& aID, uint64_t aSize,
nsIContentParent* aManager);
@ -79,6 +89,8 @@ private:
mozilla::ipc::PBackgroundParent* mPBackgroundManager;
RefPtr<IPCBlobInputStreamParentCallback> mCallback;
bool mMigrating;
};
} // namespace dom

View File

@ -0,0 +1,203 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "IPCBlobInputStreamThread.h"
#include "mozilla/StaticMutex.h"
#include "mozilla/ipc/BackgroundChild.h"
#include "mozilla/ipc/PBackgroundChild.h"
#include "nsIIPCBackgroundChildCreateCallback.h"
#include "nsXPCOMPrivate.h"
namespace mozilla {
using namespace ipc;
namespace dom {
namespace {
StaticMutex gIPCBlobThreadMutex;
StaticRefPtr<IPCBlobInputStreamThread> gIPCBlobThread;
bool gShutdownHasStarted = false;
class ThreadInitializeRunnable final : public Runnable
{
public:
NS_IMETHOD
Run() override
{
mozilla::StaticMutexAutoLock lock(gIPCBlobThreadMutex);
MOZ_ASSERT(gIPCBlobThread);
gIPCBlobThread->Initialize();
return NS_OK;
}
};
class MigrateActorRunnable final : public Runnable
, public nsIIPCBackgroundChildCreateCallback
{
public:
NS_DECL_ISUPPORTS_INHERITED
explicit MigrateActorRunnable(IPCBlobInputStreamChild* aActor)
: mActor(aActor)
{
MOZ_ASSERT(mActor);
}
NS_IMETHOD
Run() override
{
BackgroundChild::GetOrCreateForCurrentThread(this);
return NS_OK;
}
void
ActorFailed() override
{
// We cannot continue. We are probably shutting down.
}
void
ActorCreated(mozilla::ipc::PBackgroundChild* aActor) override
{
MOZ_ASSERT(mActor->State() == IPCBlobInputStreamChild::eInactiveMigrating);
if (aActor->SendPIPCBlobInputStreamConstructor(mActor, mActor->ID(),
mActor->Size())) {
// We need manually to increase the reference for this actor because the
// IPC allocator method is not triggered. The Release() is called by IPDL
// when the actor is deleted.
mActor.get()->AddRef();
mActor->Migrated();
}
}
private:
~MigrateActorRunnable() = default;
RefPtr<IPCBlobInputStreamChild> mActor;
};
NS_IMPL_ISUPPORTS_INHERITED(MigrateActorRunnable, Runnable,
nsIIPCBackgroundChildCreateCallback)
} // anonymous
NS_IMPL_ISUPPORTS(IPCBlobInputStreamThread, nsIObserver)
/* static */ bool
IPCBlobInputStreamThread::IsOnFileThread(nsIThread* aThread)
{
MOZ_ASSERT(aThread);
mozilla::StaticMutexAutoLock lock(gIPCBlobThreadMutex);
return gIPCBlobThread && aThread == gIPCBlobThread->mThread;
}
/* static */ IPCBlobInputStreamThread*
IPCBlobInputStreamThread::GetOrCreate()
{
mozilla::StaticMutexAutoLock lock(gIPCBlobThreadMutex);
if (gShutdownHasStarted) {
return nullptr;
}
if (!gIPCBlobThread) {
gIPCBlobThread = new IPCBlobInputStreamThread();
gIPCBlobThread->Initialize();
}
return gIPCBlobThread;
}
void
IPCBlobInputStreamThread::Initialize()
{
if (!NS_IsMainThread()) {
NS_DispatchToMainThread(new ThreadInitializeRunnable());
return;
}
nsCOMPtr<nsIObserverService> obs = services::GetObserverService();
if (NS_WARN_IF(!obs)) {
return;
}
nsresult rv =
obs->AddObserver(this, NS_XPCOM_SHUTDOWN_THREADS_OBSERVER_ID, false);
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
nsCOMPtr<nsIThread> thread;
rv = NS_NewNamedThread("DOM File", getter_AddRefs(thread));
if (NS_WARN_IF(NS_FAILED(rv))) {
return;
}
mThread = thread;
if (!mPendingActors.IsEmpty()) {
for (uint32_t i = 0; i < mPendingActors.Length(); ++i) {
MigrateActorInternal(mPendingActors[i]);
}
mPendingActors.Clear();
}
}
NS_IMETHODIMP
IPCBlobInputStreamThread::Observe(nsISupports* aSubject,
const char* aTopic,
const char16_t* aData)
{
MOZ_ASSERT(!strcmp(aTopic, NS_XPCOM_SHUTDOWN_THREADS_OBSERVER_ID));
mozilla::StaticMutexAutoLock lock(gIPCBlobThreadMutex);
if (mThread) {
mThread->Shutdown();
mThread = nullptr;
}
gShutdownHasStarted = true;
gIPCBlobThread = nullptr;
return NS_OK;
}
void
IPCBlobInputStreamThread::MigrateActor(IPCBlobInputStreamChild* aActor)
{
MOZ_ASSERT(aActor->State() == IPCBlobInputStreamChild::eInactiveMigrating);
mozilla::StaticMutexAutoLock lock(gIPCBlobThreadMutex);
if (gShutdownHasStarted) {
return;
}
if (!mThread) {
// The thread is not initialized yet.
mPendingActors.AppendElement(aActor);
return;
}
MigrateActorInternal(aActor);
}
void
IPCBlobInputStreamThread::MigrateActorInternal(IPCBlobInputStreamChild* aActor)
{
RefPtr<Runnable> runnable = new MigrateActorRunnable(aActor);
mThread->Dispatch(runnable, NS_DISPATCH_NORMAL);
}
} // dom namespace
} // mozilla namespace

View File

@ -0,0 +1,53 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef mozilla_dom_IPCBlobInputStreamThread_h
#define mozilla_dom_IPCBlobInputStreamThread_h
#include "nsIObserverService.h"
class nsIThread;
namespace mozilla {
namespace dom {
class IPCBlobInputStreamChild;
class IPCBlobInputStreamThread final : public nsIObserver
{
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIOBSERVER
static bool
IsOnFileThread(nsIThread* aThread);
static IPCBlobInputStreamThread*
GetOrCreate();
void
MigrateActor(IPCBlobInputStreamChild* aActor);
void
Initialize();
private:
~IPCBlobInputStreamThread() = default;
void
MigrateActorInternal(IPCBlobInputStreamChild* aActor);
nsCOMPtr<nsIThread> mThread;
// This is populated if MigrateActor() is called before the initialization of
// the thread.
nsTArray<RefPtr<IPCBlobInputStreamChild>> mPendingActors;
};
} // dom namespace
} // mozilla namespace
#endif // mozilla_dom_IPCBlobInputStreamThread_h

View File

@ -20,10 +20,23 @@ protocol PIPCBlobInputStream
parent:
async StreamNeeded();
// When this is called, the parent releases the inputStream and sends a
// __delete__.
async Close();
child:
async StreamReady(OptionalIPCStream aStream);
both:
// __delete__ can be called by parent and by child for 2 reasons:
// - parent->child: This happens after a Close(). The child wants to inform
// the parent that no other messages will be dispatched and
// that the channel can be interrupted.
// - child->parent: before any operation, the child could start a migration
// from the current thread to a dedicated DOM-File one. The
// reason why a __delete__ is sent from child to parent is
// because it doesn't require any additional runnables.
async __delete__();
};

View File

@ -22,6 +22,7 @@ UNIFIED_SOURCES += [
'IPCBlobInputStreamChild.cpp',
'IPCBlobInputStreamParent.cpp',
'IPCBlobInputStreamStorage.cpp',
'IPCBlobInputStreamThread.cpp',
'IPCBlobUtils.cpp',
'PendingIPCBlobChild.cpp',
'PendingIPCBlobParent.cpp',
@ -38,6 +39,7 @@ LOCAL_INCLUDES += [
'/dom/file',
'/dom/ipc',
'/dom/workers',
'/xpcom/build',
]
include('/ipc/chromium/chromium-config.mozbuild')

View File

@ -262,7 +262,22 @@ PIPCBlobInputStreamParent*
BackgroundParentImpl::AllocPIPCBlobInputStreamParent(const nsID& aID,
const uint64_t& aSize)
{
MOZ_CRASH("PIPCBlobInputStreamParent actors should be manually constructed!");
AssertIsInMainProcess();
AssertIsOnBackgroundThread();
return mozilla::dom::IPCBlobInputStreamParent::Create(aID, aSize, this);
}
mozilla::ipc::IPCResult
BackgroundParentImpl::RecvPIPCBlobInputStreamConstructor(PIPCBlobInputStreamParent* aActor,
const nsID& aID,
const uint64_t& aSize)
{
if (!static_cast<mozilla::dom::IPCBlobInputStreamParent*>(aActor)->HasValidStream()) {
return IPC_FAIL_NO_REASON(this);
}
return IPC_OK();
}
bool

View File

@ -73,6 +73,11 @@ protected:
AllocPIPCBlobInputStreamParent(const nsID& aID,
const uint64_t& aSize) override;
virtual mozilla::ipc::IPCResult
RecvPIPCBlobInputStreamConstructor(PIPCBlobInputStreamParent* aActor,
const nsID& aID,
const uint64_t& aSize) override;
virtual bool
DeallocPIPCBlobInputStreamParent(PIPCBlobInputStreamParent* aActor) override;

View File

@ -123,11 +123,13 @@ child:
async PParentToChildStream();
async PIPCBlobInputStream(nsID aID, uint64_t aSize);
async PPendingIPCBlob(IPCBlob blob);
both:
// PIPCBlobInputStream is created on the parent side only if the child starts
// a migration.
async PIPCBlobInputStream(nsID aID, uint64_t aSize);
async PFileDescriptorSet(FileDescriptor fd);
};

View File

@ -204,6 +204,7 @@ protected:
friend class IToplevelProtocol;
void SetId(int32_t aId) { mId = aId; }
void ResetManager() { mManager = nullptr; }
void SetManager(IProtocol* aManager);
void SetIPCChannel(MessageChannel* aChannel) { mChannel = aChannel; }