Bug 1239292 - Remove the multi-threaded job scheduler. r=jrmuizel

Differential Revision: https://phabricator.services.mozilla.com/D76027
This commit is contained in:
Nicolas Silva 2020-05-20 20:04:05 +00:00
parent 9f814b62de
commit 77f659bffc
12 changed files with 1 additions and 1517 deletions

View File

@ -1,99 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "DrawingJob.h"
#include "JobScheduler.h"
#include "mozilla/gfx/2D.h"
namespace mozilla {
namespace gfx {
DrawingJobBuilder::DrawingJobBuilder() = default;
DrawingJobBuilder::~DrawingJobBuilder() { MOZ_ASSERT(!mDrawTarget); }
void DrawingJob::Clear() {
mCommandBuffer = nullptr;
mCursor = 0;
}
void DrawingJobBuilder::BeginDrawingJob(DrawTarget* aTarget, IntPoint aOffset,
SyncObject* aStart) {
MOZ_ASSERT(mCommandOffsets.empty());
MOZ_ASSERT(aTarget);
mDrawTarget = aTarget;
mOffset = aOffset;
mStart = aStart;
}
DrawingJob* DrawingJobBuilder::EndDrawingJob(CommandBuffer* aCmdBuffer,
SyncObject* aCompletion,
WorkerThread* aPinToWorker) {
MOZ_ASSERT(mDrawTarget);
DrawingJob* task =
new DrawingJob(mDrawTarget, mOffset, mStart, aCompletion, aPinToWorker);
task->mCommandBuffer = aCmdBuffer;
task->mCommandOffsets = std::move(mCommandOffsets);
mDrawTarget = nullptr;
mOffset = IntPoint();
mStart = nullptr;
return task;
}
DrawingJob::DrawingJob(DrawTarget* aTarget, IntPoint aOffset,
SyncObject* aStart, SyncObject* aCompletion,
WorkerThread* aPinToWorker)
: Job(aStart, aCompletion, aPinToWorker),
mCommandBuffer(nullptr),
mCursor(0),
mDrawTarget(aTarget),
mOffset(aOffset) {
mCommandOffsets.reserve(64);
}
JobStatus DrawingJob::Run() {
while (mCursor < mCommandOffsets.size()) {
const DrawingCommand* cmd =
mCommandBuffer->GetDrawingCommand(mCommandOffsets[mCursor]);
if (!cmd) {
return JobStatus::Error;
}
cmd->ExecuteOnDT(mDrawTarget);
++mCursor;
}
return JobStatus::Complete;
}
DrawingJob::~DrawingJob() { Clear(); }
const DrawingCommand* CommandBuffer::GetDrawingCommand(ptrdiff_t aId) {
return static_cast<DrawingCommand*>(mStorage.GetStorage(aId));
}
CommandBuffer::~CommandBuffer() {
mStorage.ForEach([](void* item) {
static_cast<DrawingCommand*>(item)->~DrawingCommand();
});
mStorage.Clear();
}
void CommandBufferBuilder::BeginCommandBuffer(size_t aBufferSize) {
MOZ_ASSERT(!mCommands);
mCommands = new CommandBuffer(aBufferSize);
}
already_AddRefed<CommandBuffer> CommandBufferBuilder::EndCommandBuffer() {
return mCommands.forget();
}
} // namespace gfx
} // namespace mozilla

View File

@ -1,149 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef MOZILLA_GFX_COMMANDBUFFER_H_
#define MOZILLA_GFX_COMMANDBUFFER_H_
#include <stdint.h>
#include "mozilla/RefPtr.h"
#include "mozilla/Assertions.h"
#include "mozilla/gfx/Matrix.h"
#include "mozilla/gfx/JobScheduler.h"
#include "mozilla/gfx/IterableArena.h"
#include "mozilla/RefCounted.h"
#include "DrawCommand.h"
namespace mozilla {
namespace gfx {
class DrawingCommand;
class PrintCommand;
class SignalCommand;
class DrawingJob;
class WaitCommand;
class SyncObject;
class MultiThreadedJobQueue;
class DrawTarget;
class DrawingJobBuilder;
class CommandBufferBuilder;
/// Contains a sequence of immutable drawing commands that are typically used by
/// several DrawingJobs.
///
/// CommandBuffer objects are built using CommandBufferBuilder.
class CommandBuffer : public external::AtomicRefCounted<CommandBuffer> {
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(CommandBuffer)
~CommandBuffer();
const DrawingCommand* GetDrawingCommand(ptrdiff_t aId);
protected:
explicit CommandBuffer(size_t aSize = 256)
: mStorage(IterableArena::GROWABLE, aSize) {}
IterableArena mStorage;
friend class CommandBufferBuilder;
};
/// Generates CommandBuffer objects.
///
/// The builder is a separate object to ensure that commands are not added to a
/// submitted CommandBuffer.
class CommandBufferBuilder {
public:
void BeginCommandBuffer(size_t aBufferSize = 256);
already_AddRefed<CommandBuffer> EndCommandBuffer();
/// Build the CommandBuffer, command after command.
/// This must be used between BeginCommandBuffer and EndCommandBuffer.
template <typename T, typename... Args>
ptrdiff_t AddCommand(Args&&... aArgs) {
static_assert(std::is_base_of<DrawingCommand, T>::value,
"T must derive from DrawingCommand");
return mCommands->mStorage.Alloc<T>(std::forward<Args>(aArgs)...);
}
bool HasCommands() const { return !!mCommands; }
protected:
RefPtr<CommandBuffer> mCommands;
};
/// Stores multiple commands to be executed sequencially.
class DrawingJob : public Job {
public:
virtual ~DrawingJob();
JobStatus Run() override;
protected:
DrawingJob(DrawTarget* aTarget, IntPoint aOffset, SyncObject* aStart,
SyncObject* aCompletion, WorkerThread* aPinToWorker = nullptr);
/// Runs the tasks's destructors and resets the buffer.
void Clear();
std::vector<ptrdiff_t> mCommandOffsets;
RefPtr<CommandBuffer> mCommandBuffer;
uint32_t mCursor;
RefPtr<DrawTarget> mDrawTarget;
IntPoint mOffset;
friend class DrawingJobBuilder;
};
/// Generates DrawingJob objects.
///
/// The builder is a separate object to ensure that commands are not added to a
/// submitted DrawingJob.
class DrawingJobBuilder final {
public:
DrawingJobBuilder();
~DrawingJobBuilder();
/// Allocates a DrawingJob.
///
/// call this method before starting to add commands.
void BeginDrawingJob(DrawTarget* aTarget, IntPoint aOffset,
SyncObject* aStart = nullptr);
/// Build the DrawingJob, command after command.
/// This must be used between BeginDrawingJob and EndDrawingJob.
void AddCommand(ptrdiff_t offset) { mCommandOffsets.push_back(offset); }
/// Finalizes and returns the drawing task.
///
/// If aCompletion is not null, the sync object will be signaled after the
/// task buffer is destroyed (and after the destructor of the tasks have run).
/// In most cases this means after the completion of all tasks in the task
/// buffer, but also when the task buffer is destroyed due to an error.
DrawingJob* EndDrawingJob(CommandBuffer* aCmdBuffer,
SyncObject* aCompletion = nullptr,
WorkerThread* aPinToWorker = nullptr);
/// Returns true between BeginDrawingJob and EndDrawingJob, false otherwise.
bool HasDrawingJob() const { return !!mDrawTarget; }
protected:
std::vector<ptrdiff_t> mCommandOffsets;
RefPtr<DrawTarget> mDrawTarget;
IntPoint mOffset;
RefPtr<SyncObject> mStart;
};
} // namespace gfx
} // namespace mozilla
#endif

View File

@ -1,250 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
#include "Logging.h"
namespace mozilla {
namespace gfx {
JobScheduler* JobScheduler::sSingleton = nullptr;
bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues) {
MOZ_ASSERT(!sSingleton);
MOZ_ASSERT(aNumThreads >= aNumQueues);
sSingleton = new JobScheduler();
sSingleton->mNextQueue = 0;
for (uint32_t i = 0; i < aNumQueues; ++i) {
sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
}
for (uint32_t i = 0; i < aNumThreads; ++i) {
sSingleton->mWorkerThreads.push_back(
WorkerThread::Create(sSingleton->mDrawingQueues[i % aNumQueues]));
}
return true;
}
void JobScheduler::ShutDown() {
MOZ_ASSERT(IsEnabled());
if (!IsEnabled()) {
return;
}
for (auto queue : sSingleton->mDrawingQueues) {
queue->ShutDown();
delete queue;
}
for (WorkerThread* thread : sSingleton->mWorkerThreads) {
// this will block until the thread is joined.
delete thread;
}
sSingleton->mWorkerThreads.clear();
delete sSingleton;
sSingleton = nullptr;
}
JobStatus JobScheduler::ProcessJob(Job* aJob) {
MOZ_ASSERT(aJob);
auto status = aJob->Run();
if (status == JobStatus::Error || status == JobStatus::Complete) {
delete aJob;
}
return status;
}
void JobScheduler::SubmitJob(Job* aJob) {
MOZ_ASSERT(aJob);
RefPtr<SyncObject> start = aJob->GetStartSync();
if (start && start->Register(aJob)) {
// The Job buffer starts with a non-signaled sync object, it
// is now registered in the list of task buffers waiting on the
// sync object, so we should not place it in the queue.
return;
}
GetQueueForJob(aJob)->SubmitJob(aJob);
}
void JobScheduler::Join(SyncObject* aCompletion) {
RefPtr<EventObject> waitForCompletion = new EventObject();
JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
waitForCompletion->Wait();
}
MultiThreadedJobQueue* JobScheduler::GetQueueForJob(Job* aJob) {
return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
: GetDrawingQueue();
}
Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
: mNextWaitingJob(nullptr),
mStartSync(aStart),
mCompletionSync(aCompletion),
mPinToThread(aThread) {
if (mStartSync) {
mStartSync->AddSubsequent(this);
}
if (mCompletionSync) {
mCompletionSync->AddPrerequisite(this);
}
}
Job::~Job() {
if (mCompletionSync) {
// printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
mCompletionSync->Signal();
mCompletionSync = nullptr;
}
}
JobStatus SetEventJob::Run() {
mEvent->Set();
return JobStatus::Complete;
}
SetEventJob::SetEventJob(EventObject* aEvent, SyncObject* aStart,
SyncObject* aCompletion, WorkerThread* aWorker)
: Job(aStart, aCompletion, aWorker), mEvent(aEvent) {}
SetEventJob::~SetEventJob() = default;
SyncObject::SyncObject(uint32_t aNumPrerequisites)
: mSignals(aNumPrerequisites),
mFirstWaitingJob(nullptr)
#ifdef DEBUG
,
mNumPrerequisites(aNumPrerequisites),
mAddedPrerequisites(0)
#endif
{
}
SyncObject::~SyncObject() { MOZ_ASSERT(mFirstWaitingJob == nullptr); }
bool SyncObject::Register(Job* aJob) {
MOZ_ASSERT(aJob);
// For now, ensure that when we schedule the first subsequent, we have already
// created all of the prerequisites. This is an arbitrary restriction because
// we specify the number of prerequisites in the constructor, but in the
// typical scenario, if the assertion FreezePrerequisite blows up here it
// probably means we got the initial nmber of prerequisites wrong. We can
// decide to remove this restriction if needed.
FreezePrerequisites();
int32_t signals = mSignals;
if (signals > 0) {
AddWaitingJob(aJob);
// Since Register and Signal can be called concurrently, it can happen that
// reading mSignals in Register happens before decrementing mSignals in
// Signal, but SubmitWaitingJobs happens before AddWaitingJob. This ordering
// means the SyncObject ends up in the signaled state with a task sitting in
// the waiting list. To prevent that we check mSignals a second time and
// submit again if signals reached zero in the mean time. We do this instead
// of holding a mutex around mSignals+mJobs to reduce lock contention.
int32_t signals2 = mSignals;
if (signals2 == 0) {
SubmitWaitingJobs();
}
return true;
}
return false;
}
void SyncObject::Signal() {
int32_t signals = --mSignals;
MOZ_ASSERT(signals >= 0);
if (signals == 0) {
SubmitWaitingJobs();
}
}
void SyncObject::AddWaitingJob(Job* aJob) {
// Push (using atomics) the task into the list of waiting tasks.
for (;;) {
Job* first = mFirstWaitingJob;
aJob->mNextWaitingJob = first;
if (mFirstWaitingJob.compareExchange(first, aJob)) {
break;
}
}
}
void SyncObject::SubmitWaitingJobs() {
// Scheduling the tasks can cause code that modifies <this>'s reference
// count to run concurrently, and cause the caller of this function to
// be owned by another thread. We need to make sure the reference count
// does not reach 0 on another thread before the end of this method, so
// hold a strong ref to prevent that!
RefPtr<SyncObject> kungFuDeathGrip(this);
// First atomically swap mFirstWaitingJob and waitingJobs...
Job* waitingJobs = nullptr;
for (;;) {
waitingJobs = mFirstWaitingJob;
if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
break;
}
}
// ... and submit all of the waiting tasks in waitingJob now that they belong
// to this thread.
while (waitingJobs) {
Job* next = waitingJobs->mNextWaitingJob;
waitingJobs->mNextWaitingJob = nullptr;
JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
waitingJobs = next;
}
}
bool SyncObject::IsSignaled() { return mSignals == 0; }
void SyncObject::FreezePrerequisites() {
MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
}
void SyncObject::AddPrerequisite(Job* aJob) {
MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
}
void SyncObject::AddSubsequent(Job* aJob) {}
WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue) {
aJobQueue->RegisterThread();
}
void WorkerThread::Run() {
SetName("gfx worker");
for (;;) {
Job* commands = nullptr;
if (!mQueue->WaitForJob(commands)) {
mQueue->UnregisterThread();
return;
}
JobStatus status = JobScheduler::ProcessJob(commands);
if (status == JobStatus::Error) {
// Don't try to handle errors for now, but that's open to discussions.
// I expect errors to be mostly OOM issues.
gfxDevCrash(LogReason::JobStatusError)
<< "Invalid job status " << (int)status;
}
}
}
} // namespace gfx
} // namespace mozilla

View File

@ -1,257 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef MOZILLA_GFX_TASKSCHEDULER_H_
#define MOZILLA_GFX_TASKSCHEDULER_H_
#include "mozilla/RefPtr.h"
#include "mozilla/gfx/Types.h"
#include "mozilla/RefCounted.h"
#ifdef WIN32
# include "mozilla/gfx/JobScheduler_win32.h"
#else
# include "mozilla/gfx/JobScheduler_posix.h"
#endif
#include <vector>
namespace mozilla {
namespace gfx {
class MultiThreadedJobQueue;
class SyncObject;
class WorkerThread;
class JobScheduler {
public:
/// Return one of the queues that the drawing worker threads pull from, chosen
/// pseudo-randomly.
static MultiThreadedJobQueue* GetDrawingQueue() {
return sSingleton->mDrawingQueues[sSingleton->mNextQueue++ %
sSingleton->mDrawingQueues.size()];
}
/// Return one of the queues that the drawing worker threads pull from with a
/// hash to choose the queue.
///
/// Calling this function several times with the same hash will yield the same
/// queue.
static MultiThreadedJobQueue* GetDrawingQueue(uint32_t aHash) {
return sSingleton
->mDrawingQueues[aHash % sSingleton->mDrawingQueues.size()];
}
/// Return the task queue associated to the worker the task is pinned to if
/// the task is pinned to a worker, or a random queue.
static MultiThreadedJobQueue* GetQueueForJob(Job* aJob);
/// Initialize the task scheduler with aNumThreads worker threads for drawing
/// and aNumQueues task queues.
///
/// The number of threads must be superior or equal to the number of queues
/// (since for now a worker thread only pulls from one queue).
static bool Init(uint32_t aNumThreads, uint32_t aNumQueues);
/// Shut the scheduler down.
///
/// This will block until worker threads are joined and deleted.
static void ShutDown();
/// Returns true if there is a successfully initialized JobScheduler
/// singleton.
static bool IsEnabled() { return !!sSingleton; }
/// Submit a task buffer to its associated queue.
///
/// The caller looses ownership of the task buffer.
static void SubmitJob(Job* aJobs);
/// Convenience function to block the current thread until a given SyncObject
/// is in the signaled state.
///
/// The current thread will first try to steal jobs before blocking.
static void Join(SyncObject* aCompletionSync);
/// Process commands until the command buffer needs to block on a sync object,
/// completes, yields, or encounters an error.
///
/// Can be used on any thread. Worker threads basically loop over this, but
/// the main thread can also dequeue pending task buffers and process them
/// alongside the worker threads if it is about to block until completion
/// anyway.
///
/// The caller looses ownership of the task buffer.
static JobStatus ProcessJob(Job* aJobs);
protected:
static JobScheduler* sSingleton;
// queues of Job that are ready to be processed
std::vector<MultiThreadedJobQueue*> mDrawingQueues;
std::vector<WorkerThread*> mWorkerThreads;
Atomic<uint32_t> mNextQueue;
};
/// Jobs are not reference-counted because they don't have shared ownership.
/// The ownership of tasks can change when they are passed to certain methods
/// of JobScheduler and SyncObject. See the docuumentaion of these classes.
class Job {
public:
Job(SyncObject* aStart, SyncObject* aCompletion,
WorkerThread* aThread = nullptr);
virtual ~Job();
virtual JobStatus Run() = 0;
/// For use in JobScheduler::SubmitJob. Don't use it anywhere else.
// already_AddRefed<SyncObject> GetAndResetStartSync();
SyncObject* GetStartSync() { return mStartSync; }
bool IsPinnedToAThread() const { return !!mPinToThread; }
WorkerThread* GetWorkerThread() { return mPinToThread; }
protected:
// An intrusive linked list of tasks waiting for a sync object to enter the
// signaled state. When the task is not waiting for a sync object,
// mNextWaitingJob should be null. This is only accessed from the thread that
// owns the task.
Job* mNextWaitingJob;
RefPtr<SyncObject> mStartSync;
RefPtr<SyncObject> mCompletionSync;
WorkerThread* mPinToThread;
friend class SyncObject;
};
class EventObject;
/// This task will set an EventObject.
///
/// Typically used as the final task, so that the main thread can block on the
/// corresponfing EventObject until all of the tasks are processed.
class SetEventJob : public Job {
public:
explicit SetEventJob(EventObject* aEvent, SyncObject* aStart,
SyncObject* aCompletion = nullptr,
WorkerThread* aPinToWorker = nullptr);
virtual ~SetEventJob();
JobStatus Run() override;
EventObject* GetEvent() { return mEvent; }
protected:
RefPtr<EventObject> mEvent;
};
/// A synchronization object that can be used to express dependencies and
/// ordering between tasks.
///
/// Jobs can register to SyncObjects in order to asynchronously wait for a
/// signal. In practice, Job objects usually start with a sync object (startSyc)
/// and end with another one (completionSync). a Job never gets processed before
/// its startSync is in the signaled state, and signals its completionSync as
/// soon as it finishes. This is how dependencies between tasks is expressed.
class SyncObject final : public external::AtomicRefCounted<SyncObject> {
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(SyncObject)
/// Create a synchronization object.
///
/// aNumPrerequisites represents the number of times the object must be
/// signaled before actually entering the signaled state (in other words, it
/// means the number of dependencies of this sync object).
///
/// Explicitly specifying the number of prerequisites when creating sync
/// objects makes it easy to start scheduling some of the prerequisite tasks
/// while creating the others, which is how we typically use the task
/// scheduler. Automatically determining the number of prerequisites using
/// Job's constructor brings the risk that the sync object enters the signaled
/// state while we are still adding prerequisites which is hard to fix without
/// using muteces.
explicit SyncObject(uint32_t aNumPrerequisites = 1);
virtual ~SyncObject();
/// Attempt to register a task.
///
/// If the sync object is already in the signaled state, the buffer is *not*
/// registered and the sync object does not take ownership of the task.
/// If the object is not yet in the signaled state, it takes ownership of
/// the task and places it in a list of pending tasks.
/// Pending tasks will not be processed by the worker thread.
/// When the SyncObject reaches the signaled state, it places the pending
/// tasks back in the available buffer queue, so that they can be
/// scheduled again.
///
/// Returns true if the SyncOject is not already in the signaled state.
/// This means that if this method returns true, the SyncObject has taken
/// ownership of the Job.
bool Register(Job* aJob);
/// Signal the SyncObject.
///
/// This decrements an internal counter. The sync object reaches the signaled
/// state when the counter gets to zero.
void Signal();
/// Returns true if mSignals is equal to zero. In other words, returns true
/// if all prerequisite tasks have already signaled the sync object.
bool IsSignaled();
/// Asserts that the number of added prerequisites is equal to the number
/// specified in the constructor (does nothin in release builds).
void FreezePrerequisites();
private:
// Called by Job's constructor
void AddSubsequent(Job* aJob);
void AddPrerequisite(Job* aJob);
void AddWaitingJob(Job* aJob);
void SubmitWaitingJobs();
Atomic<int32_t> mSignals;
Atomic<Job*> mFirstWaitingJob;
#ifdef DEBUG
uint32_t mNumPrerequisites;
Atomic<uint32_t> mAddedPrerequisites;
#endif
friend class Job;
friend class JobScheduler;
};
/// Base class for worker threads.
class WorkerThread {
public:
static WorkerThread* Create(MultiThreadedJobQueue* aJobQueue);
virtual ~WorkerThread() = default;
void Run();
MultiThreadedJobQueue* GetJobQueue() { return mQueue; }
protected:
explicit WorkerThread(MultiThreadedJobQueue* aJobQueue);
virtual void SetName(const char* aName) {}
MultiThreadedJobQueue* mQueue;
};
} // namespace gfx
} // namespace mozilla
#endif

View File

@ -1,155 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
#include "mozilla/gfx/Logging.h"
namespace mozilla::gfx {
void* ThreadCallback(void* threadData);
class WorkerThreadPosix : public WorkerThread {
public:
explicit WorkerThreadPosix(MultiThreadedJobQueue* aJobQueue)
: WorkerThread(aJobQueue) {
pthread_create(&mThread, nullptr, ThreadCallback,
static_cast<WorkerThread*>(this));
}
virtual ~WorkerThreadPosix() { pthread_join(mThread, nullptr); }
void SetName(const char*) override {
// XXX - temporarily disabled, see bug 1209039
//
// // Call this from the thread itself because of Mac.
/*
#ifdef XP_MACOSX
pthread_setname_np(aName);
#elif defined(__DragonFly__) || defined(__FreeBSD__) || \
defined(__OpenBSD__)
pthread_set_name_np(mThread, aName);
#elif defined(__NetBSD__)
pthread_setname_np(mThread, "%s", (void*)aName);
#else
pthread_setname_np(mThread, aName);
#endif
*/
}
protected:
pthread_t mThread;
};
void* ThreadCallback(void* threadData) {
WorkerThread* thread = static_cast<WorkerThread*>(threadData);
thread->Run();
return nullptr;
}
WorkerThread* WorkerThread::Create(MultiThreadedJobQueue* aJobQueue) {
return new WorkerThreadPosix(aJobQueue);
}
MultiThreadedJobQueue::MultiThreadedJobQueue()
: mThreadsCount(0), mShuttingDown(false) {}
MultiThreadedJobQueue::~MultiThreadedJobQueue() { MOZ_ASSERT(mJobs.empty()); }
bool MultiThreadedJobQueue::WaitForJob(Job*& aOutJob) {
return PopJob(aOutJob, BLOCKING);
}
bool MultiThreadedJobQueue::PopJob(Job*& aOutJobs, AccessType aAccess) {
for (;;) {
CriticalSectionAutoEnter lock(&mMutex);
while (aAccess == BLOCKING && !mShuttingDown && mJobs.empty()) {
mAvailableCondvar.Wait(&mMutex);
}
if (mShuttingDown) {
return false;
}
if (mJobs.empty()) {
if (aAccess == NON_BLOCKING) {
return false;
}
continue;
}
Job* task = mJobs.front();
MOZ_ASSERT(task);
mJobs.pop_front();
aOutJobs = task;
return true;
}
}
void MultiThreadedJobQueue::SubmitJob(Job* aJobs) {
MOZ_ASSERT(aJobs);
CriticalSectionAutoEnter lock(&mMutex);
mJobs.push_back(aJobs);
mAvailableCondvar.Broadcast();
}
size_t MultiThreadedJobQueue::NumJobs() {
CriticalSectionAutoEnter lock(&mMutex);
return mJobs.size();
}
bool MultiThreadedJobQueue::IsEmpty() {
CriticalSectionAutoEnter lock(&mMutex);
return mJobs.empty();
}
void MultiThreadedJobQueue::ShutDown() {
CriticalSectionAutoEnter lock(&mMutex);
mShuttingDown = true;
while (mThreadsCount) {
mAvailableCondvar.Broadcast();
mShutdownCondvar.Wait(&mMutex);
}
}
void MultiThreadedJobQueue::RegisterThread() { mThreadsCount += 1; }
void MultiThreadedJobQueue::UnregisterThread() {
CriticalSectionAutoEnter lock(&mMutex);
mThreadsCount -= 1;
if (mThreadsCount == 0) {
mShutdownCondvar.Broadcast();
}
}
EventObject::EventObject() : mIsSet(false) {}
EventObject::~EventObject() = default;
bool EventObject::Peak() {
CriticalSectionAutoEnter lock(&mMutex);
return mIsSet;
}
void EventObject::Set() {
CriticalSectionAutoEnter lock(&mMutex);
if (!mIsSet) {
mIsSet = true;
mCond.Broadcast();
}
}
void EventObject::Wait() {
CriticalSectionAutoEnter lock(&mMutex);
if (mIsSet) {
return;
}
mCond.Wait(&mMutex);
}
} // namespace mozilla::gfx

View File

@ -1,137 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef WIN32
# ifndef MOZILLA_GFX_TASKSCHEDULER_POSIX_H_
# define MOZILLA_GFX_TASKSCHEDULER_POSIX_H_
# include <string>
# include <vector>
# include <list>
# include <pthread.h>
# include <stdint.h>
# include <stdio.h>
# include "mozilla/RefPtr.h"
# include "mozilla/DebugOnly.h"
# include "mozilla/gfx/CriticalSection.h"
# include "mozilla/RefCounted.h"
namespace mozilla {
namespace gfx {
class Job;
class PosixCondVar;
class WorkerThread;
// posix platforms only!
class PosixCondVar {
public:
PosixCondVar() {
DebugOnly<int> err = pthread_cond_init(&mCond, nullptr);
MOZ_ASSERT(!err);
}
~PosixCondVar() {
DebugOnly<int> err = pthread_cond_destroy(&mCond);
MOZ_ASSERT(!err);
}
void Wait(CriticalSection* aMutex) {
DebugOnly<int> err = pthread_cond_wait(&mCond, &aMutex->mMutex);
MOZ_ASSERT(!err);
}
void Broadcast() {
DebugOnly<int> err = pthread_cond_broadcast(&mCond);
MOZ_ASSERT(!err);
}
protected:
pthread_cond_t mCond;
};
/// A simple and naive multithreaded task queue
///
/// The public interface of this class must remain identical to its equivalent
/// in JobScheduler_win32.h
class MultiThreadedJobQueue {
public:
enum AccessType { BLOCKING, NON_BLOCKING };
// Producer thread
MultiThreadedJobQueue();
// Producer thread
~MultiThreadedJobQueue();
// Worker threads
bool WaitForJob(Job*& aOutJob);
// Any thread
bool PopJob(Job*& aOutJob, AccessType aAccess);
// Any threads
void SubmitJob(Job* aJob);
// Producer thread
void ShutDown();
// Any thread
size_t NumJobs();
// Any thread
bool IsEmpty();
// Producer thread
void RegisterThread();
// Worker threads
void UnregisterThread();
protected:
std::list<Job*> mJobs;
CriticalSection mMutex;
PosixCondVar mAvailableCondvar;
PosixCondVar mShutdownCondvar;
int32_t mThreadsCount;
bool mShuttingDown;
friend class WorkerThread;
};
/// An object that a thread can synchronously wait on.
/// Usually set by a SetEventJob.
class EventObject : public external::AtomicRefCounted<EventObject> {
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(EventObject)
EventObject();
~EventObject();
/// Synchronously wait until the event is set.
void Wait();
/// Return true if the event is set, without blocking.
bool Peak();
/// Set the event.
void Set();
protected:
CriticalSection mMutex;
PosixCondVar mCond;
bool mIsSet;
};
} // namespace gfx
} // namespace mozilla
# include "JobScheduler.h"
# endif
#endif

View File

@ -1,127 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
#include "mozilla/gfx/Logging.h"
namespace mozilla {
namespace gfx {
DWORD __stdcall ThreadCallback(void* threadData);
class WorkerThreadWin32 : public WorkerThread {
public:
explicit WorkerThreadWin32(MultiThreadedJobQueue* aJobQueue)
: WorkerThread(aJobQueue) {
mThread = ::CreateThread(nullptr, 0, ThreadCallback,
static_cast<WorkerThread*>(this), 0, nullptr);
}
virtual ~WorkerThreadWin32() {
::WaitForSingleObject(mThread, INFINITE);
::CloseHandle(mThread);
}
protected:
HANDLE mThread;
};
DWORD __stdcall ThreadCallback(void* threadData) {
WorkerThread* thread = static_cast<WorkerThread*>(threadData);
thread->Run();
return 0;
}
WorkerThread* WorkerThread::Create(MultiThreadedJobQueue* aJobQueue) {
return new WorkerThreadWin32(aJobQueue);
}
bool MultiThreadedJobQueue::PopJob(Job*& aOutJob, AccessType aAccess) {
for (;;) {
while (aAccess == BLOCKING && mJobs.empty()) {
{
CriticalSectionAutoEnter lock(&mSection);
if (mShuttingDown) {
return false;
}
}
HANDLE handles[] = {mAvailableEvent, mShutdownEvent};
::WaitForMultipleObjects(2, handles, FALSE, INFINITE);
}
CriticalSectionAutoEnter lock(&mSection);
if (mShuttingDown) {
return false;
}
if (mJobs.empty()) {
if (aAccess == NON_BLOCKING) {
return false;
}
continue;
}
Job* task = mJobs.front();
MOZ_ASSERT(task);
mJobs.pop_front();
if (mJobs.empty()) {
::ResetEvent(mAvailableEvent);
}
aOutJob = task;
return true;
}
}
void MultiThreadedJobQueue::SubmitJob(Job* aJob) {
MOZ_ASSERT(aJob);
CriticalSectionAutoEnter lock(&mSection);
mJobs.push_back(aJob);
::SetEvent(mAvailableEvent);
}
void MultiThreadedJobQueue::ShutDown() {
{
CriticalSectionAutoEnter lock(&mSection);
mShuttingDown = true;
}
while (mThreadsCount) {
::SetEvent(mAvailableEvent);
::WaitForSingleObject(mShutdownEvent, INFINITE);
}
}
size_t MultiThreadedJobQueue::NumJobs() {
CriticalSectionAutoEnter lock(&mSection);
return mJobs.size();
}
bool MultiThreadedJobQueue::IsEmpty() {
CriticalSectionAutoEnter lock(&mSection);
return mJobs.empty();
}
void MultiThreadedJobQueue::RegisterThread() { mThreadsCount += 1; }
void MultiThreadedJobQueue::UnregisterThread() {
mSection.Enter();
mThreadsCount -= 1;
bool finishShutdown = mThreadsCount == 0;
mSection.Leave();
if (finishShutdown) {
// Can't touch mSection or any other member from now on because this object
// may get deleted on the main thread after mShutdownEvent is set.
::SetEvent(mShutdownEvent);
}
}
} // namespace gfx
} // namespace mozilla

View File

@ -1,92 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifdef WIN32
# ifndef MOZILLA_GFX_TASKSCHEDULER_WIN32_H_
# define MOZILLA_GFX_TASKSCHEDULER_WIN32_H_
# include <windows.h>
# include <list>
# include "mozilla/RefPtr.h"
# include "mozilla/gfx/CriticalSection.h"
# include "mozilla/RefCounted.h"
namespace mozilla {
namespace gfx {
class WorkerThread;
class Job;
// The public interface of this class must remain identical to its equivalent
// in JobScheduler_posix.h
class MultiThreadedJobQueue {
public:
enum AccessType { BLOCKING, NON_BLOCKING };
MultiThreadedJobQueue() : mThreadsCount(0), mShuttingDown(false) {
mAvailableEvent = ::CreateEventW(nullptr, TRUE, FALSE, nullptr);
mShutdownEvent = ::CreateEventW(nullptr, TRUE, FALSE, nullptr);
}
~MultiThreadedJobQueue() {
::CloseHandle(mAvailableEvent);
::CloseHandle(mShutdownEvent);
}
bool WaitForJob(Job*& aOutJob) { return PopJob(aOutJob, BLOCKING); }
bool PopJob(Job*& aOutJob, AccessType aAccess);
void SubmitJob(Job* aJob);
void ShutDown();
size_t NumJobs();
bool IsEmpty();
void RegisterThread();
void UnregisterThread();
protected:
std::list<Job*> mJobs;
CriticalSection mSection;
HANDLE mAvailableEvent;
HANDLE mShutdownEvent;
int32_t mThreadsCount;
bool mShuttingDown;
friend class WorkerThread;
};
// The public interface of this class must remain identical to its equivalent
// in JobScheduler_posix.h
class EventObject : public external::AtomicRefCounted<EventObject> {
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(EventObject)
EventObject() { mEvent = ::CreateEventW(nullptr, TRUE, FALSE, nullptr); }
~EventObject() { ::CloseHandle(mEvent); }
void Wait() { ::WaitForSingleObject(mEvent, INFINITE); }
bool Peak() { return ::WaitForSingleObject(mEvent, 0) == WAIT_OBJECT_0; }
void Set() { ::SetEvent(mEvent); }
protected:
// TODO: it's expensive to create events so we should try to reuse them
HANDLE mEvent;
};
} // namespace gfx
} // namespace mozilla
# endif
#endif

View File

@ -34,9 +34,6 @@ EXPORTS.mozilla.gfx += [
'HelpersCairo.h',
'InlineTranslator.h',
'IterableArena.h',
'JobScheduler.h',
'JobScheduler_posix.h',
'JobScheduler_win32.h',
'Logging.h',
'LoggingConstants.h',
'Matrix.h',
@ -86,7 +83,6 @@ elif CONFIG['MOZ_WIDGET_TOOLKIT'] == 'windows':
'DrawTargetD2D1.cpp',
'ExtendInputEffectD2D1.cpp',
'FilterNodeD2D1.cpp',
'JobScheduler_win32.cpp',
'NativeFontResourceDWrite.cpp',
'NativeFontResourceGDI.cpp',
'PathD2D.cpp',
@ -97,11 +93,6 @@ elif CONFIG['MOZ_WIDGET_TOOLKIT'] == 'windows':
]
DEFINES['WIN32'] = True
if CONFIG['MOZ_WIDGET_TOOLKIT'] != 'windows':
SOURCES += [
'JobScheduler_posix.cpp',
]
if CONFIG['MOZ_WIDGET_TOOLKIT'] in ('android', 'gtk'):
EXPORTS.mozilla.gfx += [
'UnscaledFontFreeType.h',
@ -173,7 +164,6 @@ UNIFIED_SOURCES += [
'DataSourceSurface.cpp',
'DataSurfaceHelpers.cpp',
'DrawEventRecorder.cpp',
'DrawingJob.cpp',
'DrawTarget.cpp',
'DrawTargetCairo.cpp',
'DrawTargetCapture.cpp',
@ -187,7 +177,6 @@ UNIFIED_SOURCES += [
'FilterProcessing.cpp',
'FilterProcessingScalar.cpp',
'ImageScaling.cpp',
'JobScheduler.cpp',
'Matrix.cpp',
'NativeFontResource.cpp',
'Path.cpp',

View File

@ -1,239 +0,0 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/* Any copyright is dedicated to the Public Domain.
* http://creativecommons.org/publicdomain/zero/1.0/
*/
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "mozilla/gfx/JobScheduler.h"
#ifndef WIN32
# include <pthread.h>
# include <sched.h>
#endif
#include <stdlib.h>
#include <time.h>
namespace test_scheduler {
using namespace mozilla::gfx;
using namespace mozilla;
using mozilla::gfx::SyncObject;
// Artificially cause threads to yield randomly in an attempt to make racy
// things more apparent (if any).
static void MaybeYieldThread() {
#ifndef WIN32
if (rand() % 5 == 0) {
sched_yield();
}
#endif
}
/// Used by the TestCommand to check that tasks are processed in the right
/// order.
struct SanityChecker {
std::vector<uint64_t> mAdvancements;
mozilla::gfx::CriticalSection mSection;
explicit SanityChecker(uint64_t aNumCmdBuffers) {
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
mAdvancements.push_back(0);
}
}
virtual void Check(uint64_t aJobId, uint64_t aCmdId) {
MaybeYieldThread();
CriticalSectionAutoEnter lock(&mSection);
MOZ_RELEASE_ASSERT(mAdvancements[aJobId] == aCmdId - 1);
mAdvancements[aJobId] = aCmdId;
}
};
/// Run checks that are specific to TestSchulerJoin.
struct JoinTestSanityCheck : public SanityChecker {
bool mSpecialJobHasRun;
explicit JoinTestSanityCheck(uint64_t aNumCmdBuffers)
: SanityChecker(aNumCmdBuffers), mSpecialJobHasRun(false) {}
virtual void Check(uint64_t aJobId, uint64_t aCmdId) override {
// Job 0 is the special task executed when everything is joined after task 1
if (aCmdId == 0) {
MOZ_RELEASE_ASSERT(!mSpecialJobHasRun,
"GFX: A special task has been executed.");
mSpecialJobHasRun = true;
for (auto advancement : mAdvancements) {
// Because of the synchronization point (beforeFilter), all
// task buffers should have run task 1 when task 0 is run.
MOZ_RELEASE_ASSERT(advancement == 1,
"GFX: task buffer has not run task 1.");
}
} else {
// This check does not apply to task 0.
SanityChecker::Check(aJobId, aCmdId);
}
if (aCmdId == 2) {
MOZ_RELEASE_ASSERT(mSpecialJobHasRun, "GFX: Special job has not run.");
}
}
};
class TestJob : public Job {
public:
TestJob(uint64_t aCmdId, uint64_t aJobId, SanityChecker* aChecker,
SyncObject* aStart, SyncObject* aCompletion)
: Job(aStart, aCompletion, nullptr),
mCmdId(aCmdId),
mCmdBufferId(aJobId),
mSanityChecker(aChecker) {}
JobStatus Run() {
MaybeYieldThread();
mSanityChecker->Check(mCmdBufferId, mCmdId);
MaybeYieldThread();
return JobStatus::Complete;
}
uint64_t mCmdId;
uint64_t mCmdBufferId;
SanityChecker* mSanityChecker;
};
/// This test creates aNumCmdBuffers task buffers with sync objects set up
/// so that all tasks will join after command 5 before a task buffer runs
/// a special task (task 0) after which all task buffers fork again.
/// This simulates the kind of scenario where all tiles must join at
/// a certain point to execute, say, a filter, and fork again after the filter
/// has been processed.
/// The main thread is only blocked when waiting for the completion of the
/// entire task stream (it doesn't have to wait at the filter's sync points to
/// orchestrate it).
static void TestSchedulerJoin(uint32_t aNumThreads, uint32_t aNumCmdBuffers) {
JoinTestSanityCheck check(aNumCmdBuffers);
RefPtr<SyncObject> beforeFilter = new SyncObject(aNumCmdBuffers);
RefPtr<SyncObject> afterFilter = new SyncObject();
RefPtr<SyncObject> completion = new SyncObject(aNumCmdBuffers);
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
Job* t1 = new TestJob(1, i, &check, nullptr, beforeFilter);
JobScheduler::SubmitJob(t1);
MaybeYieldThread();
}
beforeFilter->FreezePrerequisites();
// This task buffer is executed when all other tasks have joined after task 1
JobScheduler::SubmitJob(new TestJob(0, 0, &check, beforeFilter, afterFilter));
afterFilter->FreezePrerequisites();
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
Job* t2 = new TestJob(2, i, &check, afterFilter, completion);
JobScheduler::SubmitJob(t2);
MaybeYieldThread();
}
completion->FreezePrerequisites();
JobScheduler::Join(completion);
MaybeYieldThread();
for (auto advancement : check.mAdvancements) {
EXPECT_TRUE(advancement == 2);
}
}
/// This test creates several chains of 10 task, tasks of a given chain are
/// executed sequentially, and chains are exectuted in parallel. This simulates
/// the typical scenario where we want to process sequences of drawing commands
/// for several tiles in parallel.
static void TestSchedulerChain(uint32_t aNumThreads, uint32_t aNumCmdBuffers) {
SanityChecker check(aNumCmdBuffers);
RefPtr<SyncObject> completion = new SyncObject(aNumCmdBuffers);
uint32_t numJobs = 10;
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
std::vector<RefPtr<SyncObject>> syncs;
std::vector<Job*> tasks;
syncs.reserve(numJobs);
tasks.reserve(numJobs);
for (uint32_t t = 0; t < numJobs - 1; ++t) {
syncs.push_back(new SyncObject());
tasks.push_back(new TestJob(
t + 1, i, &check, t == 0 ? nullptr : syncs[t - 1].get(), syncs[t]));
syncs.back()->FreezePrerequisites();
}
tasks.push_back(new TestJob(numJobs, i, &check, syncs.back(), completion));
if (i % 2 == 0) {
// submit half of the tasks in order
for (Job* task : tasks) {
JobScheduler::SubmitJob(task);
MaybeYieldThread();
}
} else {
// ... and submit the other half in reverse order
for (int32_t reverse = numJobs - 1; reverse >= 0; --reverse) {
JobScheduler::SubmitJob(tasks[reverse]);
MaybeYieldThread();
}
}
}
completion->FreezePrerequisites();
JobScheduler::Join(completion);
for (auto advancement : check.mAdvancements) {
EXPECT_TRUE(advancement == numJobs);
}
}
} // namespace test_scheduler
#if !defined(MOZ_CODE_COVERAGE) || !defined(XP_WIN)
TEST(Moz2D, JobScheduler_Shutdown)
{
srand(time(nullptr));
for (uint32_t threads = 1; threads < 16; ++threads) {
for (uint32_t i = 1; i < 1000; ++i) {
mozilla::gfx::JobScheduler::Init(threads, threads);
mozilla::gfx::JobScheduler::ShutDown();
}
}
}
#endif
TEST(Moz2D, JobScheduler_Join)
{
srand(time(nullptr));
for (uint32_t threads = 1; threads < 8; ++threads) {
for (uint32_t queues = 1; queues < threads; ++queues) {
for (uint32_t buffers = 1; buffers < 100; buffers += 3) {
mozilla::gfx::JobScheduler::Init(threads, queues);
test_scheduler::TestSchedulerJoin(threads, buffers);
mozilla::gfx::JobScheduler::ShutDown();
}
}
}
}
TEST(Moz2D, JobScheduler_Chain)
{
srand(time(nullptr));
for (uint32_t threads = 1; threads < 8; ++threads) {
for (uint32_t queues = 1; queues < threads; ++queues) {
for (uint32_t buffers = 1; buffers < 100; buffers += 3) {
mozilla::gfx::JobScheduler::Init(threads, queues);
test_scheduler::TestSchedulerChain(threads, buffers);
mozilla::gfx::JobScheduler::ShutDown();
}
}
}
}

View File

@ -7,6 +7,7 @@
#include "gtest/gtest.h"
#include "gfxTypes.h"
#include "nsRect.h"
#include "gfxRect.h"
#include "mozilla/gfx/Rect.h"

View File

@ -15,7 +15,6 @@ UNIFIED_SOURCES += [
'TestColorNames.cpp',
'TestConfigManager.cpp',
'TestGfxWidgets.cpp',
'TestJobScheduler.cpp',
'TestLayers.cpp',
'TestMatrix.cpp',
'TestMoz2D.cpp',