Bug 801087 - Implement parallel workers threadpool in preparation for Rivertrail. r=dmandelin

Niko Matsakis 2012-11-21 13:31:57 -05:00
parent 67a6adffd0
commit 9a65eb5ce2
11 changed files with 1214 additions and 0 deletions


@ -109,6 +109,9 @@ CPPSRCS = \
jswatchpoint.cpp \
jsweakmap.cpp \
jsworkers.cpp \
threadpool.cpp \
monitor.cpp \
forkjoin.cpp \
jswrapper.cpp \
jsxml.cpp \
prmjtime.cpp \


@ -872,6 +872,7 @@ JSRuntime::JSRuntime(JSUseHelperThreads useHelperThreads)
ionStackLimit(0),
ionActivation(NULL),
ionPcScriptCache(NULL),
threadPool(this),
ionReturnOverride_(MagicValue(JS_ARG_POISON)),
useHelperThreads_(useHelperThreads)
{
@ -937,6 +938,9 @@ JSRuntime::init(uint32_t maxbytes)
if (!scriptFilenameTable.init())
return false;
if (!threadPool.init())
return false;
#ifdef JS_THREADSAFE
if (!sourceCompressorThread.init())
return false;


@ -89,6 +89,8 @@
#include "jsobj.h"
#include "jsscope.h"
#include "jswrapper.h"
#include "vm/threadpool.h"
#include "vm/forkjoin.h"
#include "methodjit/MethodJIT.h"
#include "methodjit/StubCalls.h"
#include "methodjit/StubCalls-inl.h"


@ -26,6 +26,7 @@
#include "jsprototypes.h"
#include "jsutil.h"
#include "prmjtime.h"
#include "vm/threadpool.h"
#include "ds/LifoAlloc.h"
#include "gc/Statistics.h"
@ -1025,6 +1026,8 @@ struct JSRuntime : js::RuntimeFriendFields
// Cache for ion::GetPcScript().
js::ion::PcScriptCache *ionPcScriptCache;
js::ThreadPool threadPool;
private:
// In certain cases, we want to optimize certain opcodes to typed instructions,
// to avoid carrying an extra register to feed into an unbox. Unfortunately,

js/src/vm/forkjoin.cpp (new file, 496 lines)

@ -0,0 +1,496 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=78:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "forkjoin.h"
#include "monitor.h"
#include "jscntxt.h"
#include "jscompartment.h"
#include "prthread.h"
#include "forkjoininlines.h"
namespace js {
class ForkJoinShared
: public TaskExecutor,
public Monitor
{
////////////////////////////////////////////////////////////////////////
// Constant fields
JSContext *const cx_; // Current context
ThreadPool *const threadPool_; // The thread pool.
ForkJoinOp &op_; // User-defined operation to be performed in parallel.
const size_t numThreads_; // Total number of threads.
PRCondVar *rendezvousEnd_; // Cond. var used to signal end of rendezvous.
////////////////////////////////////////////////////////////////////////
// Per-thread arenas
//
// Each worker thread gets an arena to use when allocating.
Vector<gc::ArenaLists *, 16> arenaListss_;
////////////////////////////////////////////////////////////////////////
// Locked Fields
//
// Only to be accessed while holding the lock.
size_t uncompleted_; // Number of uncompleted worker threads.
size_t blocked_; // Number of threads that have joined the rendezvous.
size_t rendezvousIndex_; // Number of rendezvous attempts
////////////////////////////////////////////////////////////////////////
// Asynchronous Flags
//
// These can be read without the lock (hence the |volatile| declaration).
// A thread has bailed and others should follow suit. Set and
// read asynchronously. After setting abort, workers will acquire
// the lock, decrement uncompleted, and then notify if uncompleted
// has reached blocked.
volatile bool abort_;
// Set to true when a worker bails for a fatal reason.
volatile bool fatal_;
// A thread has requested a rendezvous. Only *written* with the
// lock (in |initiateRendezvous()| and |endRendezvous()|) but may
// be *read* without the lock.
volatile bool rendezvous_;
// Invoked only from the main thread:
void executeFromMainThread(uintptr_t stackLimit);
// Executes slice #threadId of the work, either from a worker or
// the main thread.
void executePortion(PerThreadData *perThread, size_t threadId, uintptr_t stackLimit);
// Rendezvous protocol:
//
// Use AutoRendezvous rather than invoking initiateRendezvous()
// and endRendezvous() directly.
friend class AutoRendezvous;
// Requests that the other threads stop. Must be invoked from the
// main thread.
void initiateRendezvous(ForkJoinSlice &threadCx);
// If a rendezvous has been requested, blocks until the main
// thread says we may continue.
void joinRendezvous(ForkJoinSlice &threadCx);
// Permits other threads to resume execution. Must be invoked
// from the main thread after a call to initiateRendezvous().
void endRendezvous(ForkJoinSlice &threadCx);
public:
ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
ForkJoinOp &op,
size_t numThreads,
size_t uncompleted);
~ForkJoinShared();
bool init();
ParallelResult execute();
// Invoked from parallel worker threads:
virtual void executeFromWorker(size_t threadId, uintptr_t stackLimit);
// Moves all the per-thread arenas into the main compartment.
// This can only safely be invoked on the main thread, either
// during a rendezvous or after the workers have completed.
void transferArenasToCompartment();
// Invoked during processing by worker threads to "check in"
bool check(ForkJoinSlice &threadCx);
// See comment on |ForkJoinSlice::setFatal()| in forkjoin.h
bool setFatal();
JSRuntime *runtime() { return cx_->runtime; }
};
class AutoRendezvous {
private:
ForkJoinSlice &threadCx;
public:
AutoRendezvous(ForkJoinSlice &threadCx)
: threadCx(threadCx)
{
threadCx.shared->initiateRendezvous(threadCx);
}
~AutoRendezvous()
{
threadCx.shared->endRendezvous(threadCx);
}
};
PRUintn ForkJoinSlice::ThreadPrivateIndex;
class AutoSetForkJoinSlice
{
public:
AutoSetForkJoinSlice(ForkJoinSlice *threadCx)
{
PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, threadCx);
}
~AutoSetForkJoinSlice()
{
PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, NULL);
}
};
bool
ForkJoinSlice::Initialize()
{
PRStatus status = PR_NewThreadPrivateIndex(&ThreadPrivateIndex, NULL);
return status == PR_SUCCESS;
}
ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op)
{
# ifndef JS_THREADSAFE_ION
return TP_RETRY_SEQUENTIALLY;
# else
JS_ASSERT(!InParallelSection()); // Recursive use of the ThreadPool is not supported.
ThreadPool *threadPool = &cx->runtime->threadPool;
size_t numThreads = threadPool->numWorkers() + 1; // parallel workers plus this main thread
ForkJoinShared shared(cx, threadPool, op, numThreads, numThreads - 1);
if (!shared.init())
return TP_RETRY_SEQUENTIALLY;
return shared.execute();
# endif
}
/****************************************************************************
* ForkJoinShared
*/
ForkJoinShared::ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
ForkJoinOp &op,
size_t numThreads,
size_t uncompleted)
: cx_(cx),
threadPool_(threadPool),
op_(op),
numThreads_(numThreads),
arenaListss_(cx),
uncompleted_(uncompleted),
blocked_(0),
rendezvousIndex_(0),
abort_(false),
fatal_(false),
rendezvous_(false)
{}
bool
ForkJoinShared::init()
{
// Create temporary arenas to hold the data allocated during the
// parallel code.
//
// Note: you might think (as I did, initially) that we could use
// compartment ArenaLists for the main thread. This is not true,
// because when executing parallel code we sometimes check what
// arena list an object is in to decide if it is writable. If we
// used the compartment ArenaLists for the main thread, then the
// main thread would be permitted to write to any object it wants.
if (!Monitor::init())
return false;
rendezvousEnd_ = PR_NewCondVar(lock_);
if (!rendezvousEnd_)
return false;
for (unsigned i = 0; i < numThreads_; i++) {
gc::ArenaLists *arenaLists = cx_->new_<gc::ArenaLists>();
if (!arenaLists)
return false;
if (!arenaListss_.append(arenaLists)) {
delete arenaLists;
return false;
}
}
return true;
}
ForkJoinShared::~ForkJoinShared()
{
PR_DestroyCondVar(rendezvousEnd_);
while (arenaListss_.length() > 0) {
delete arenaListss_.popCopy();
}
}
ParallelResult
ForkJoinShared::execute()
{
AutoLockMonitor lock(*this);
// give the task set a chance to prepare for parallel workload
if (!op_.pre(numThreads_))
return TP_RETRY_SEQUENTIALLY;
// notify workers to start and execute one portion on this thread
{
AutoUnlockMonitor unlock(*this);
threadPool_->submitAll(this);
executeFromMainThread(cx_->runtime->ionStackLimit);
}
// wait for workers to complete
while (uncompleted_ > 0)
lock.wait();
// check if any of the workers failed
if (abort_) {
if (fatal_)
return TP_FATAL;
else
return TP_RETRY_SEQUENTIALLY;
}
transferArenasToCompartment();
// give task set a chance to cleanup after parallel execution
if (!op_.post(numThreads_))
return TP_RETRY_SEQUENTIALLY;
return TP_SUCCESS; // everything went swimmingly. give yourself a pat on the back.
}
void
ForkJoinShared::transferArenasToCompartment()
{
#if 0
// This code will become relevant once other
// bugs are merged down.
JSRuntime *rt = cx_->runtime;
JSCompartment *comp = cx_->compartment;
for (unsigned i = 0; i < numThreads_; i++) {
comp->arenas.adoptArenas(rt, arenaListss_[i]);
}
#endif
}
void
ForkJoinShared::executeFromWorker(size_t workerId, uintptr_t stackLimit)
{
JS_ASSERT(workerId < numThreads_ - 1);
PerThreadData thisThread(cx_->runtime);
TlsPerThreadData.set(&thisThread);
executePortion(&thisThread, workerId, stackLimit);
TlsPerThreadData.set(NULL);
AutoLockMonitor lock(*this);
uncompleted_ -= 1;
if (blocked_ == uncompleted_) {
// Signal the main thread that we have terminated. It will be
// either working, arranging a rendezvous, or waiting for
// workers to complete.
lock.notify();
}
}
void
ForkJoinShared::executeFromMainThread(uintptr_t stackLimit)
{
executePortion(&cx_->runtime->mainThread, numThreads_ - 1, stackLimit);
}
void
ForkJoinShared::executePortion(PerThreadData *perThread,
size_t threadId,
uintptr_t stackLimit)
{
gc::ArenaLists *arenaLists = arenaListss_[threadId];
ForkJoinSlice slice(perThread, threadId, numThreads_,
stackLimit, arenaLists, this);
AutoSetForkJoinSlice autoContext(&slice);
if (!op_.parallel(slice))
abort_ = true;
}
bool
ForkJoinShared::setFatal()
{
// Might as well set the abort flag to true; it will make
// propagation faster:
abort_ = true;
fatal_ = true;
return false;
}
bool
ForkJoinShared::check(ForkJoinSlice &slice)
{
if (abort_)
return false;
if (slice.isMainThread()) {
if (cx_->runtime->interrupt) {
// If interrupt is requested, bring worker threads to a
// halt, service the interrupt, then let them start back
// up again.
AutoRendezvous autoRendezvous(slice);
if (!js_HandleExecutionInterrupt(cx_)) {
return setFatal();
}
}
} else if (rendezvous_) {
joinRendezvous(slice);
}
return true;
}
void
ForkJoinShared::initiateRendezvous(ForkJoinSlice &slice) {
/*
The rendezvous protocol is always initiated by the main thread.
The main thread sets the rendezvous flag to true. Seeing this
flag, other threads will invoke |joinRendezvous()|, which causes
them to (1) read |rendezvousIndex| and (2) increment the
|blocked| counter. Once the |blocked| counter is equal to
|uncompleted|, all parallel threads have joined the rendezvous,
and so the main thread is signaled. That will cause this
function to return.
Some subtle points:
- Worker threads may potentially terminate their work before
they see the rendezvous flag. In this case, they would
decrement |uncompleted| rather than incrementing |blocked|.
Either way, if the two variables become equal, the main thread
will be notified.
- The |rendezvousIndex| counter is used to detect the case where
the main thread signals the end of the rendezvous and then
starts another rendezvous before the workers have a chance to
exit. We circumvent this by having the workers read the
|rendezvousIndex| counter as they enter the rendezvous, and
then they only block until that counter is incremented.
Another alternative would be for the main thread to block in
|endRendezvous()| until all workers have exited, but that
would be slower and involve unnecessary synchronization.
Note that the main thread cannot ever get more than one
rendezvous ahead of the workers, because it must wait for all
of them to enter the rendezvous before it can end it, so the
solution of using a counter is perfectly general and we need
not fear rollover.
*/
JS_ASSERT(slice.isMainThread());
JS_ASSERT(!rendezvous_ && blocked_ == 0);
AutoLockMonitor lock(*this);
// signal other threads we want to start a rendezvous
rendezvous_ = true;
// wait until all the other threads blocked themselves
while (blocked_ != uncompleted_) {
lock.wait();
}
}
void
ForkJoinShared::joinRendezvous(ForkJoinSlice &slice) {
JS_ASSERT(!slice.isMainThread());
JS_ASSERT(rendezvous_);
AutoLockMonitor lock(*this);
const size_t index = rendezvousIndex_;
blocked_ += 1;
// If we're the last to arrive, let the main thread know about it.
if (blocked_ == uncompleted_) {
lock.notify();
}
// Wait until the main thread terminates the rendezvous. We use a
// separate condition variable here to distinguish between workers
// notifying the main thread that they have completed and the main
// thread notifying the workers to resume.
while (rendezvousIndex_ == index) {
PR_WaitCondVar(rendezvousEnd_, PR_INTERVAL_NO_TIMEOUT);
}
}
void
ForkJoinShared::endRendezvous(ForkJoinSlice &slice) {
JS_ASSERT(slice.isMainThread());
AutoLockMonitor lock(*this);
rendezvous_ = false;
blocked_ = 0;
rendezvousIndex_ += 1;
// signal other threads that rendezvous is over
PR_NotifyAllCondVar(rendezvousEnd_);
}
/****************************************************************************
* ForkJoinSlice
*/
ForkJoinSlice::ForkJoinSlice(PerThreadData *perThreadData,
size_t sliceId, size_t numSlices,
uintptr_t stackLimit, gc::ArenaLists *arenaLists,
ForkJoinShared *shared)
: perThreadData(perThreadData),
sliceId(sliceId),
numSlices(numSlices),
ionStackLimit(stackLimit),
arenaLists(arenaLists),
shared(shared)
{}
bool
ForkJoinSlice::isMainThread()
{
return perThreadData == &shared->runtime()->mainThread;
}
JSRuntime *
ForkJoinSlice::runtime()
{
return shared->runtime();
}
bool
ForkJoinSlice::check()
{
return shared->check(*this);
}
bool
ForkJoinSlice::setFatal()
{
return shared->setFatal();
}
}

js/src/vm/forkjoin.h (new file, 208 lines)

@ -0,0 +1,208 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=78:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jstaskset_h___
#define jstaskset_h___
#include "threadpool.h"
/*
* ForkJoin
*
* This is the building block for executing multi-threaded JavaScript
* with shared memory (as distinct from Web Workers). The idea is
* that you have some (typically data-parallel) operation which you
* wish to execute in parallel across as many threads as you have
* available. An example might be applying |map()| to a vector in
* parallel. To implement such a thing, you would define a subclass of
* |ForkJoinOp| to implement the operation and then invoke
* |ExecuteForkJoinOp()|, as follows:
*
* > class MyForkJoinOp : public ForkJoinOp {
* > ... define callbacks as appropriate for your operation ...
* > };
* > MyForkJoinOp op;
* > ExecuteForkJoinOp(cx, op);
*
* |ExecuteForkJoinOp()| will fire up the workers in the runtime's
* thread pool, have them execute the callbacks defined in the
* |ForkJoinOp| class, and then return once all the workers have
* completed.
*
* There are three callbacks defined in |ForkJoinOp|. The first,
* |pre()|, is invoked before the parallel section begins. It informs
* you how many slices your problem will be divided into (effectively,
* how many worker threads there will be). This is often useful for
* allocating an array for the workers to store their result or
* something like that.
*
* Next, you will receive |N| calls to the |parallel()| callback,
* where |N| is the number of slices that were specified in |pre()|.
* Each callback will be supplied with a |ForkJoinSlice| instance
* providing some context.
*
* Typically there will be one call to |parallel()| from each worker
* thread, but that is not something you should rely upon---if we
* implement work-stealing, for example, then it could be that a
* single worker thread winds up handling multiple slices.
*
* Finally, after the operation is complete the |post()| callback is
* invoked, giving you a chance to collect the various results.
*
* Operation callback:
*
* During parallel execution, you should periodically invoke
* |slice.check()|, which will handle the operation callback. If the
* operation callback is necessary, |slice.check()| will arrange a
* rendezvous---that is, as each active worker invokes |check()|, it
* will come to a halt until everyone is blocked (Stop The World). At
* this point, we perform the callback on the main thread, and then
* resume execution. If a worker thread terminates before calling
* |check()|, that's fine too. We assume that you do not do unbounded
* work without invoking |check()|.
*
* Sequential Fallback:
*
* It is assumed that anyone using this API must be prepared for a
* sequential fallback. Therefore, |ExecuteForkJoinOp()| returns
* a status code indicating whether a fatal error occurred (in which
* case you should just stop) or whether you should retry the
* operation, this time executing sequentially. An example of where the
* fallback would be useful is if the parallel code encountered an
* unexpected path that cannot safely be executed in parallel (writes
* to shared state, say).
*
* Current Limitations:
*
* - The API does not support recursive or nested use. That is, the
* |parallel()| callback of a |ForkJoinOp| may not itself invoke
* |ExecuteForkJoinOp()|. We may lift this limitation in the
* future.
*
* - No load balancing is performed between worker threads. That
* means that the fork-join system is best suited for problems that
* can be sliced into uniform bits.
*/
namespace js {
// Parallel operations in general can have one of three states. They
// may succeed, fail, or "bail", where bail indicates that the code
// encountered an unexpected condition and should be re-run
// sequentially.
enum ParallelResult { TP_SUCCESS, TP_RETRY_SEQUENTIALLY, TP_FATAL };
struct ForkJoinOp;
// Executes the given |ForkJoinOp| in parallel using the runtime's
// |ThreadPool|, returning upon completion. In general, if there are
// |N| workers in the threadpool, the problem will be divided into
// |N+1| slices, as the main thread will also execute one slice.
ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op);
class ForkJoinShared;
class AutoRendezvous;
class AutoSetForkJoinSlice;
namespace gc { struct ArenaLists; }
struct ForkJoinSlice
{
public:
// PerThreadData corresponding to the current worker thread.
PerThreadData *perThreadData;
// Which slice should you process? Ranges from 0 to |numSlices|-1.
const size_t sliceId;
// How many slices are there in total?
const size_t numSlices;
// Top of the stack. This should move into |perThreadData|.
uintptr_t ionStackLimit;
// Arenas to use when allocating on this thread. See
// |ion::ParFunctions::ParNewGCThing()|. This should move
// into |perThreadData|.
gc::ArenaLists *const arenaLists;
ForkJoinSlice(PerThreadData *perThreadData, size_t sliceId, size_t numSlices,
uintptr_t stackLimit, gc::ArenaLists *arenaLists,
ForkJoinShared *shared);
// True if this is the main thread, false if it is one of the parallel workers
bool isMainThread();
// Generally speaking, if a thread returns false, that is
// interpreted as a "bailout"---meaning, a recoverable error. If
// however you call this function before returning false, then the
// error will be interpreted as *fatal*. This doesn't strike me
// as the most elegant solution here but I don't know what'd be better.
//
// For convenience, *always* returns false.
bool setFatal();
// During the parallel phase, this method should be invoked
// periodically, for example on every backedge, similar to the
// interrupt check. If it returns false, then the parallel phase
// has been aborted and so you should bailout. The function may
// also rendezvous to perform GC or do other similar things.
bool check();
// Be wary, the runtime is shared between all threads!
JSRuntime *runtime();
static inline ForkJoinSlice *current();
static bool Initialize();
private:
friend class AutoRendezvous;
friend class AutoSetForkJoinSlice;
static PRUintn ThreadPrivateIndex; // initialized by Initialize()
ForkJoinShared *const shared;
};
// Generic interface for specifying divisible operations that can be
// executed in a fork-join fashion.
struct ForkJoinOp
{
public:
// Invoked before parallel phase begins; informs the task set how
// many slices there will be and gives it a chance to initialize
// per-slice data structures.
//
// Returns true on success, false to halt parallel execution.
virtual bool pre(size_t numSlices) = 0;
// Invoked from each parallel thread to process one slice. The
// |ForkJoinSlice| which is supplied will also be available using
// TLS.
//
// Returns true on success, false to halt parallel execution.
virtual bool parallel(ForkJoinSlice &slice) = 0;
// Invoked after parallel phase ends if execution was successful
// (not aborted)
//
// Returns true on success, false to halt parallel execution.
virtual bool post(size_t numSlices) = 0;
};
/* True if this thread is currently executing a ParallelArray
operation across multiple threads. */
static inline bool InParallelSection() {
# ifdef JS_THREADSAFE_ION
return ForkJoinSlice::current() != NULL;
# else
return false;
# endif
}
}
#endif
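To make the |ForkJoinOp| contract described above concrete, here is a minimal usage sketch. It is not part of the patch: |SquareOp| and |squareAll| are invented names, and the code assumes only the interfaces declared in this header (|pre()|, |parallel()|, |post()|, |ExecuteForkJoinOp()|, |ForkJoinSlice::check()|). Each slice squares a disjoint chunk of a plain vector of doubles, so no GC-visible state is touched from worker threads, and the caller performs the sequential fallback when asked to retry.

#include "jscntxt.h"        // JSContext, JSRuntime
#include "jsalloc.h"        // js::SystemAllocPolicy
#include "js/Vector.h"      // js::Vector
#include "vm/forkjoin.h"    // js::ForkJoinOp, js::ForkJoinSlice, js::ExecuteForkJoinOp

typedef js::Vector<double, 0, js::SystemAllocPolicy> DoubleVector;

class SquareOp : public js::ForkJoinOp
{
    DoubleVector &data_;

  public:
    SquareOp(DoubleVector &data) : data_(data) {}

    virtual bool pre(size_t numSlices) {
        // Nothing to allocate per slice in this example.
        return true;
    }

    virtual bool parallel(js::ForkJoinSlice &slice) {
        // Each slice handles one contiguous, disjoint chunk of the vector.
        size_t chunk = (data_.length() + slice.numSlices - 1) / slice.numSlices;
        size_t begin = slice.sliceId * chunk;
        size_t end = begin + chunk;
        if (end > data_.length())
            end = data_.length();
        for (size_t i = begin; i < end; i++) {
            data_[i] *= data_[i];
            // Check in periodically so an interrupt can rendezvous.
            if (!slice.check())
                return false;   // bail out; the caller retries sequentially
        }
        return true;
    }

    virtual bool post(size_t numSlices) {
        return true;
    }
};

static bool
squareAll(JSContext *cx, DoubleVector &data)
{
    SquareOp op(data);
    switch (js::ExecuteForkJoinOp(cx, op)) {
      case js::TP_SUCCESS:
        return true;
      case js::TP_RETRY_SEQUENTIALLY:
        // The sequential fallback the header comment requires.
        for (size_t i = 0; i < data.length(); i++)
            data[i] *= data[i];
        return true;
      case js::TP_FATAL:
      default:
        return false;
    }
}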

js/src/vm/forkjoininlines.h (new file, 12 lines)

@ -0,0 +1,12 @@
namespace js {
ForkJoinSlice *
ForkJoinSlice::current() {
#ifdef JS_THREADSAFE_ION
return (ForkJoinSlice*) PR_GetThreadPrivate(ThreadPrivateIndex);
#else
return NULL;
#endif
}
}

js/src/vm/monitor.cpp (new file, 30 lines)

@ -0,0 +1,30 @@
#include "vm/monitor.h"
namespace js {
Monitor::Monitor()
: lock_(NULL), condVar_(NULL)
{
}
Monitor::~Monitor()
{
PR_DestroyLock(lock_);
PR_DestroyCondVar(condVar_);
}
bool
Monitor::init()
{
lock_ = PR_NewLock();
if (!lock_)
return false;
condVar_ = PR_NewCondVar(lock_);
if (!condVar_)
return false;
return true;
}
}

js/src/vm/monitor.h (new file, 82 lines)

@ -0,0 +1,82 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=78:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jsmonitor_h___
#define jsmonitor_h___
#include <stdlib.h>
#include "mozilla/Util.h"
#include "js/Utility.h"
#include "prlock.h"
#include "prcvar.h"
namespace js {
/*
* A base class used for types intended to be used in a parallel
* fashion, such as the workers in the |ThreadPool| class. Combines a
* lock and a condition variable. You can acquire the lock or signal
* the condition variable using the |AutoLockMonitor| type.
*/
class Monitor
{
protected:
friend class AutoLockMonitor;
friend class AutoUnlockMonitor;
PRLock *lock_;
PRCondVar *condVar_;
public:
Monitor();
~Monitor();
bool init();
};
class AutoLockMonitor
{
private:
Monitor &monitor;
public:
AutoLockMonitor(Monitor &monitor) : monitor(monitor) {
PR_Lock(monitor.lock_);
}
~AutoLockMonitor() {
PR_Unlock(monitor.lock_);
}
void wait() {
mozilla::DebugOnly<PRStatus> status =
PR_WaitCondVar(monitor.condVar_, PR_INTERVAL_NO_TIMEOUT);
JS_ASSERT(status == PR_SUCCESS);
}
void notify() {
PR_NotifyCondVar(monitor.condVar_);
}
void notifyAll() {
PR_NotifyAllCondVar(monitor.condVar_);
}
};
class AutoUnlockMonitor
{
private:
Monitor &monitor;
public:
AutoUnlockMonitor(Monitor &monitor) : monitor(monitor) { PR_Unlock(monitor.lock_); }
~AutoUnlockMonitor() { PR_Lock(monitor.lock_); }
};
}
#endif /* ndef jsmonitor_h___ */
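As an illustration of the intended |AutoLockMonitor| idiom (not part of the patch), the sketch below builds a hypothetical single-slot mailbox on top of |Monitor|: take the lock with the RAII guard, wait in a loop on the guarded condition, and notify after changing it. The |Mailbox| name and its |put()|/|take()| methods are invented for the example, and |init()| must have succeeded before the monitor is used.

#include "vm/monitor.h"     // js::Monitor, js::AutoLockMonitor

class Mailbox : public js::Monitor
{
    void *item_;    // guarded by the monitor's lock
    bool full_;     // guarded by the monitor's lock

  public:
    Mailbox() : item_(NULL), full_(false) {}

    // Producer side: block until the slot is empty, then fill it.
    void put(void *item) {
        js::AutoLockMonitor lock(*this);
        while (full_)
            lock.wait();
        item_ = item;
        full_ = true;
        lock.notifyAll();   // wake any thread blocked in take()
    }

    // Consumer side: block until the slot is full, then drain it.
    void *take() {
        js::AutoLockMonitor lock(*this);
        while (!full_)
            lock.wait();
        full_ = false;
        lock.notifyAll();   // wake any thread blocked in put()
        return item_;
    }
};

Because |Monitor| provides a single condition variable, distinct wake-up reasons must either share it, as here, with |notifyAll()| and re-checked predicates, or allocate an extra PRCondVar, as |ForkJoinShared| does with |rendezvousEnd_|.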

js/src/vm/threadpool.cpp (new file, 267 lines)

@ -0,0 +1,267 @@
/* -*- Mode: C++; c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil -*- */
/* vim: set ts=4 sw=4 et tw=99: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "jscntxt.h"
#include "jslock.h"
#include "vm/threadpool.h"
#include "prthread.h"
#include "monitor.h"
namespace js {
/****************************************************************************
* ThreadPoolWorker
*
* Each |ThreadPoolWorker| just hangs around waiting for items to be added
* to its |worklist_|. Whenever something is added, it gets executed.
* Once the worker's state is set to |TERMINATING|, the worker will
* exit as soon as its queue is empty.
*/
#define WORKER_THREAD_STACK_SIZE (1*1024*1024)
enum WorkerState {
CREATED, ACTIVE, TERMINATING, TERMINATED
};
class ThreadPoolWorker : public Monitor
{
const size_t workerId_;
ThreadPool *const threadPool_;
/* Current point in the worker's lifecycle.
*
* Modified only while holding the ThreadPoolWorker's lock */
WorkerState state_;
/* Worklist for this thread.
*
* Modified only while holding the ThreadPoolWorker's lock */
js::Vector<TaskExecutor*, 4, SystemAllocPolicy> worklist_;
/* The thread's main function */
static void ThreadMain(void *arg);
void run();
public:
ThreadPoolWorker(size_t workerId, ThreadPool *tp);
~ThreadPoolWorker();
bool init();
/* Invoked from main thread; signals worker to start */
bool start();
/* Submit work to be executed. If this returns true, you are
guaranteed that the task will execute before the thread-pool
terminates (barring an infinite loop in some prior task) */
bool submit(TaskExecutor *task);
/* Invoked from main thread; signals worker to terminate
* and blocks until termination completes */
void terminate();
};
ThreadPoolWorker::ThreadPoolWorker(size_t workerId, ThreadPool *tp)
: workerId_(workerId), threadPool_(tp), state_(CREATED), worklist_()
{}
ThreadPoolWorker::~ThreadPoolWorker()
{}
bool
ThreadPoolWorker::init()
{
return Monitor::init();
}
bool
ThreadPoolWorker::start()
{
JS_ASSERT(state_ == CREATED);
// Set state to active now, *before* the thread starts:
state_ = ACTIVE;
if (!PR_CreateThread(PR_USER_THREAD,
ThreadMain, this,
PR_PRIORITY_NORMAL, PR_LOCAL_THREAD,
PR_UNJOINABLE_THREAD,
WORKER_THREAD_STACK_SIZE))
{
// If the thread failed to start, call it TERMINATED.
state_ = TERMINATED;
return false;
}
return true;
}
void
ThreadPoolWorker::ThreadMain(void *arg)
{
ThreadPoolWorker *thread = (ThreadPoolWorker*) arg;
thread->run();
}
void
ThreadPoolWorker::run()
{
// This is hokey in the extreme. To compute the stack limit,
// subtract the size of the stack from the address of a local
// variable and give a 2k buffer. Is there a better way?
uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 2*1024;
uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) +
stackLimitOffset * JS_STACK_GROWTH_DIRECTION);
AutoLockMonitor lock(*this);
for (;;) {
while (!worklist_.empty()) {
TaskExecutor *task = worklist_.popCopy();
{
// Unlock so that new things can be added to the
// worklist while we are processing the current item:
AutoUnlockMonitor unlock(*this);
task->executeFromWorker(workerId_, stackLimit);
}
}
if (state_ == TERMINATING)
break;
JS_ASSERT(state_ == ACTIVE);
lock.wait();
}
JS_ASSERT(worklist_.empty() && state_ == TERMINATING);
state_ = TERMINATED;
lock.notify();
}
bool
ThreadPoolWorker::submit(TaskExecutor *task)
{
AutoLockMonitor lock(*this);
JS_ASSERT(state_ == ACTIVE);
if (!worklist_.append(task))
return false;
lock.notify();
return true;
}
void
ThreadPoolWorker::terminate()
{
AutoLockMonitor lock(*this);
if (state_ == CREATED) {
state_ = TERMINATED;
return;
} else if (state_ == ACTIVE) {
state_ = TERMINATING;
lock.notify();
while (state_ != TERMINATED) {
lock.wait();
}
} else {
JS_ASSERT(state_ == TERMINATED);
}
}
/****************************************************************************
* ThreadPool
*
* The |ThreadPool| starts up workers, submits work to them, and shuts
* them down when requested.
*/
ThreadPool::ThreadPool(JSRuntime *rt)
: runtime_(rt),
nextId_(0)
{
}
ThreadPool::~ThreadPool() {
terminateWorkers();
while (workers_.length() > 0) {
ThreadPoolWorker *worker = workers_.popCopy();
js_delete(worker);
}
}
bool
ThreadPool::init()
{
#ifdef JS_THREADSAFE_ION
// Compute desired number of workers based on env var or # of CPUs.
size_t numWorkers = 0;
char *pathreads = getenv("PATHREADS");
if (pathreads != NULL) {
numWorkers = strtol(pathreads, NULL, 10);
} else {
numWorkers = GetCPUCount() - 1;
}
// Allocate workers array and then start the worker threads.
// Ensure that |numWorkers()| always reflects the number of
// *successfully initialized* workers.
for (size_t workerId = 0; workerId < numWorkers; workerId++) {
ThreadPoolWorker *worker = js_new<ThreadPoolWorker>(workerId, this);
if (!worker || !worker->init()) {
js_delete(worker);
return false;
}
if (!workers_.append(worker)) {
js_delete(worker);
return false;
}
if (!worker->start()) {
return false;
}
}
#endif
return true;
}
void
ThreadPool::terminateWorkers()
{
for (size_t i = 0; i < workers_.length(); i++) {
workers_[i]->terminate();
}
}
bool
ThreadPool::submitOne(TaskExecutor *executor) {
runtime_->assertValidThread();
if (numWorkers() == 0)
return false;
// Find next worker in round-robin fashion.
size_t id = JS_ATOMIC_INCREMENT(&nextId_) % workers_.length();
return workers_[id]->submit(executor);
}
bool
ThreadPool::submitAll(TaskExecutor *executor) {
for (size_t id = 0; id < workers_.length(); id++) {
if (!workers_[id]->submit(executor))
return false;
}
return true;
}
bool
ThreadPool::terminate() {
terminateWorkers();
return true;
}
}

js/src/vm/threadpool.h (new file, 107 lines)

@ -0,0 +1,107 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=78:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jsthreadpool_h___
#define jsthreadpool_h___
#if defined(JS_THREADSAFE) && defined(JS_ION)
# define JS_THREADSAFE_ION
#endif
#include <stddef.h>
#include "mozilla/StandardInteger.h"
#include "prtypes.h"
#include "js/Vector.h"
#include "jsalloc.h"
#include "prlock.h"
#include "prcvar.h"
struct JSContext;
struct JSRuntime;
struct JSCompartment;
struct JSScript;
namespace js {
class ThreadPoolWorker;
typedef void (*TaskFun)(void *userdata, size_t workerId, uintptr_t stackLimit);
class TaskExecutor
{
public:
virtual void executeFromWorker(size_t workerId, uintptr_t stackLimit) = 0;
};
/*
* ThreadPool used for parallel JavaScript execution as well as
* parallel compilation. Unless you are building a new kind of
* parallel service, it is very likely that you do not wish to
* interact with the threadpool directly. In particular, if you wish
* to execute JavaScript in parallel, you probably want to look at
* |js::ExecuteForkJoinOp()| in |forkjoin.h|.
*
* The ThreadPool always maintains a fixed pool of worker threads.
* You can query the number of worker threads via the method
* |numWorkers()|. Note that this number may be zero (generally if
* threads are disabled, or when manually specified for benchmarking
* purposes).
*
* You can submit jobs in one of two ways. The first is
* |submitOne()|, which submits a job to be executed by one worker
* thread (this will fail if there are no worker threads). The job
* will be enqueued and executed by some worker (the current scheduler
* uses round-robin load balancing; something more sophisticated,
* e.g. a central queue or work stealing, might be better).
*
* The second way to submit a job is using |submitAll()|---in this
* case, the job will be executed by all worker threads. This does
* not fail if there are no worker threads; it simply does nothing.
* Of course, each thread may have any number of previously submitted
* tasks that it is already working on, and it will finish
* those before it gets to this job. Therefore it is possible for
* some worker threads to pick up (and even finish) their piece of
* the job before others have even started.
*/
class ThreadPool
{
private:
friend class ThreadPoolWorker;
// Initialized at startup only:
JSRuntime *const runtime_;
js::Vector<ThreadPoolWorker*, 8, SystemAllocPolicy> workers_;
// Next worker for |submitOne()|. Atomically modified.
size_t nextId_;
void terminateWorkers();
public:
ThreadPool(JSRuntime *rt);
~ThreadPool();
bool init();
// Return number of worker threads in the pool.
size_t numWorkers() { return workers_.length(); }
// See comment on class:
bool submitOne(TaskExecutor *executor);
bool submitAll(TaskExecutor *executor);
// Wait until all worker threads have finished their current set
// of jobs and then return. You must not submit new jobs after
// invoking |terminate()|.
bool terminate();
};
}
#endif
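Finally, a hedged sketch of driving the pool directly (ForkJoin is the intended client, but the interface is generic). |PingTask| and |pingAllWorkers| are invented names, not part of the patch. Note that |submitAll()| only enqueues work, so the task doubles as a |Monitor| and the submitting thread blocks until every worker has checked in, mirroring the |uncompleted_| counter in |ForkJoinShared|; only then may the task safely go out of scope.

#include "jscntxt.h"          // JSRuntime
#include "vm/monitor.h"       // js::Monitor, js::AutoLockMonitor
#include "vm/threadpool.h"    // js::ThreadPool, js::TaskExecutor

// Counts down as workers run it; the submitting thread waits on the
// monitor until the count reaches zero.
class PingTask : public js::TaskExecutor, public js::Monitor
{
    size_t pending_;    // guarded by the monitor's lock

  public:
    PingTask() : pending_(0) {}

    bool init(size_t numWorkers) {
        pending_ = numWorkers;
        return Monitor::init();
    }

    virtual void executeFromWorker(size_t workerId, uintptr_t stackLimit) {
        // Runs on a worker thread: no JSContext, no GC-visible state.
        js::AutoLockMonitor lock(*this);
        if (--pending_ == 0)
            lock.notify();          // last worker wakes the waiter
    }

    void waitUntilDone() {
        js::AutoLockMonitor lock(*this);
        while (pending_ > 0)
            lock.wait();
    }
};

static bool
pingAllWorkers(JSRuntime *rt)
{
    js::ThreadPool &pool = rt->threadPool;
    if (pool.numWorkers() == 0)
        return true;    // no workers; submitAll() would simply do nothing

    PingTask task;
    if (!task.init(pool.numWorkers()))
        return false;
    if (!pool.submitAll(&task))
        return false;   // (a real caller would still need to drain any
                        //  workers that did receive the task before
                        //  letting it go out of scope)

    // Block until every worker has executed the task; only then is it
    // safe to destroy it.
    task.waitUntilDone();
    return true;
}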