From 9a65eb5ce274aa49febf47ef66c1afbbe425f1cc Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Wed, 21 Nov 2012 13:31:57 -0500 Subject: [PATCH] Bug 801087 - Implement parallel workers threadpool in preparation for Rivertrail. r=dmandelin --- js/src/Makefile.in | 3 + js/src/jsapi.cpp | 4 + js/src/jsarray.cpp | 2 + js/src/jscntxt.h | 3 + js/src/vm/forkjoin.cpp | 496 ++++++++++++++++++++++++++++++++++++ js/src/vm/forkjoin.h | 208 +++++++++++++++ js/src/vm/forkjoininlines.h | 12 + js/src/vm/monitor.cpp | 30 +++ js/src/vm/monitor.h | 82 ++++++ js/src/vm/threadpool.cpp | 267 +++++++++++++++++++ js/src/vm/threadpool.h | 107 ++++++++ 11 files changed, 1214 insertions(+) create mode 100644 js/src/vm/forkjoin.cpp create mode 100644 js/src/vm/forkjoin.h create mode 100644 js/src/vm/forkjoininlines.h create mode 100644 js/src/vm/monitor.cpp create mode 100644 js/src/vm/monitor.h create mode 100644 js/src/vm/threadpool.cpp create mode 100644 js/src/vm/threadpool.h diff --git a/js/src/Makefile.in b/js/src/Makefile.in index 4011c242e42e..ae1fa49b6975 100644 --- a/js/src/Makefile.in +++ b/js/src/Makefile.in @@ -109,6 +109,9 @@ CPPSRCS = \ jswatchpoint.cpp \ jsweakmap.cpp \ jsworkers.cpp \ + threadpool.cpp \ + monitor.cpp \ + forkjoin.cpp \ jswrapper.cpp \ jsxml.cpp \ prmjtime.cpp \ diff --git a/js/src/jsapi.cpp b/js/src/jsapi.cpp index 69f521b9f45c..d8411817074b 100644 --- a/js/src/jsapi.cpp +++ b/js/src/jsapi.cpp @@ -872,6 +872,7 @@ JSRuntime::JSRuntime(JSUseHelperThreads useHelperThreads) ionStackLimit(0), ionActivation(NULL), ionPcScriptCache(NULL), + threadPool(this), ionReturnOverride_(MagicValue(JS_ARG_POISON)), useHelperThreads_(useHelperThreads) { @@ -937,6 +938,9 @@ JSRuntime::init(uint32_t maxbytes) if (!scriptFilenameTable.init()) return false; + if (!threadPool.init()) + return false; + #ifdef JS_THREADSAFE if (!sourceCompressorThread.init()) return false; diff --git a/js/src/jsarray.cpp b/js/src/jsarray.cpp index cd909d915eb5..60a6d13c1deb 100644 --- a/js/src/jsarray.cpp +++ b/js/src/jsarray.cpp @@ -89,6 +89,8 @@ #include "jsobj.h" #include "jsscope.h" #include "jswrapper.h" +#include "vm/threadpool.h" +#include "vm/forkjoin.h" #include "methodjit/MethodJIT.h" #include "methodjit/StubCalls.h" #include "methodjit/StubCalls-inl.h" diff --git a/js/src/jscntxt.h b/js/src/jscntxt.h index 22bb366713fa..6df350927e2f 100644 --- a/js/src/jscntxt.h +++ b/js/src/jscntxt.h @@ -26,6 +26,7 @@ #include "jsprototypes.h" #include "jsutil.h" #include "prmjtime.h" +#include "vm/threadpool.h" #include "ds/LifoAlloc.h" #include "gc/Statistics.h" @@ -1025,6 +1026,8 @@ struct JSRuntime : js::RuntimeFriendFields // Cache for ion::GetPcScript(). js::ion::PcScriptCache *ionPcScriptCache; + js::ThreadPool threadPool; + private: // In certain cases, we want to optimize certain opcodes to typed instructions, // to avoid carrying an extra register to feed into an unbox. Unfortunately, diff --git a/js/src/vm/forkjoin.cpp b/js/src/vm/forkjoin.cpp new file mode 100644 index 000000000000..4075f218e7d2 --- /dev/null +++ b/js/src/vm/forkjoin.cpp @@ -0,0 +1,496 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: set ts=8 sw=4 et tw=78: + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "forkjoin.h" +#include "monitor.h" +#include "jscntxt.h" +#include "jscompartment.h" +#include "prthread.h" +#include "forkjoininlines.h" + +namespace js { + +class ForkJoinShared + : public TaskExecutor, + public Monitor +{ + //////////////////////////////////////////////////////////////////////// + // Constant fields + + JSContext *const cx_; // Current context + ThreadPool *const threadPool_; // The thread pool. + ForkJoinOp &op_; // User-defined operations to be perf. in par. + const size_t numThreads_; // Total number of threads. + PRCondVar *rendezvousEnd_; // Cond. var used to signal end of rendezvous. + + //////////////////////////////////////////////////////////////////////// + // Per-thread arenas + // + // Each worker thread gets an arena to use when allocating. + + Vector arenaListss_; + + //////////////////////////////////////////////////////////////////////// + // Locked Fields + // + // Only to be accessed while holding the lock. + + size_t uncompleted_; // Number of uncompleted worker threads. + size_t blocked_; // Number of threads that have joined the rendezvous. + size_t rendezvousIndex_; // Number of rendezvous attempts + + //////////////////////////////////////////////////////////////////////// + // Asynchronous Flags + // + // These can be read without the lock (hence the |volatile| declaration). + + // A thread has bailed and others should follow suit. Set and + // read asynchronously. After setting abort, workers will acquire + // the lock, decrement uncompleted, and then notify if uncompleted + // has reached blocked. + volatile bool abort_; + + // Set to true when a worker bails for a fatal reason. + volatile bool fatal_; + + // A thread has request a rendezvous. Only *written* with the + // lock (in |initiateRendezvous()| and |endRendezvous()|) but may + // be *read* without the lock. + volatile bool rendezvous_; + + // Invoked only from the main thread: + void executeFromMainThread(uintptr_t stackLimit); + + // Executes slice #threadId of the work, either from a worker or + // the main thread. + void executePortion(PerThreadData *perThread, size_t threadId, uintptr_t stackLimit); + + // Rendezvous protocol: + // + // Use AutoRendezvous rather than invoking initiateRendezvous() + // and endRendezvous() directly. + + friend class AutoRendezvous; + + // Requests that the other threads stop. Must be invoked from the + // main thread. + void initiateRendezvous(ForkJoinSlice &threadCx); + + // If a rendezvous has been requested, blocks until the main + // thread says we may continue. + void joinRendezvous(ForkJoinSlice &threadCx); + + // Permits other threads to resume execution. Must be invoked + // from the main thread after a call to initiateRendezvous(). + void endRendezvous(ForkJoinSlice &threadCx); + +public: + ForkJoinShared(JSContext *cx, + ThreadPool *threadPool, + ForkJoinOp &op, + size_t numThreads, + size_t uncompleted); + ~ForkJoinShared(); + + bool init(); + + ParallelResult execute(); + + // Invoked from parallel worker threads: + virtual void executeFromWorker(size_t threadId, uintptr_t stackLimit); + + // Moves all the per-thread arenas into the main compartment. + // This can only safely be invoked on the main thread, either + // during a rendezvous or after the workers have completed. + void transferArenasToCompartment(); + + // Invoked during processing by worker threads to "check in" + bool check(ForkJoinSlice &threadCx); + + // See comment on |ForkJoinSlice::setFatal()| in forkjoin.h + bool setFatal(); + + JSRuntime *runtime() { return cx_->runtime; } +}; + +class AutoRendezvous { +private: + ForkJoinSlice &threadCx; + +public: + AutoRendezvous(ForkJoinSlice &threadCx) + : threadCx(threadCx) + { + threadCx.shared->initiateRendezvous(threadCx); + } + + ~AutoRendezvous() + { + threadCx.shared->endRendezvous(threadCx); + } +}; + +PRUintn ForkJoinSlice::ThreadPrivateIndex; + +class AutoSetForkJoinSlice +{ +public: + AutoSetForkJoinSlice(ForkJoinSlice *threadCx) + { + PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, threadCx); + } + + ~AutoSetForkJoinSlice() + { + PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, NULL); + } +}; + +bool +ForkJoinSlice::Initialize() +{ + PRStatus status = PR_NewThreadPrivateIndex(&ThreadPrivateIndex, NULL); + return status == PR_SUCCESS; +} + +ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op) +{ +# ifndef JS_THREADSAFE_ION + return TP_RETRY_SEQUENTIALLY; +# else + JS_ASSERT(!InParallelSection()); // Recursive use of the ThreadPool is not supported. + + ThreadPool *threadPool = &cx->runtime->threadPool; + size_t numThreads = threadPool->numWorkers() + 1; // parallel workers plus this main thread + + ForkJoinShared shared(cx, threadPool, op, numThreads, numThreads - 1); + if (!shared.init()) + return TP_RETRY_SEQUENTIALLY; + + return shared.execute(); +# endif +} + +/**************************************************************************** + * ForkJoinShared + */ + +ForkJoinShared::ForkJoinShared(JSContext *cx, + ThreadPool *threadPool, + ForkJoinOp &op, + size_t numThreads, + size_t uncompleted) + : cx_(cx), + threadPool_(threadPool), + op_(op), + numThreads_(numThreads), + arenaListss_(cx), + uncompleted_(uncompleted), + blocked_(0), + rendezvousIndex_(0), + abort_(false), + fatal_(false), + rendezvous_(false) +{} + +bool +ForkJoinShared::init() +{ + // Create temporary arenas to hold the data allocated during the + // parallel code. + // + // Note: you might think (as I did, initially) that we could use + // compartment ArenaLists for the main thread. This is not true, + // because when executing parallel code we sometimes check what + // arena list an object is in to decide if it is writable. If we + // used the compartment ArenaLists for the main thread, then the + // main thread would be permitted to write to any object it wants. + + if (!Monitor::init()) + return false; + + rendezvousEnd_ = PR_NewCondVar(lock_); + if (!rendezvousEnd_) + return false; + + for (unsigned i = 0; i < numThreads_; i++) { + gc::ArenaLists *arenaLists = cx_->new_(); + if (!arenaLists) + return false; + + if (!arenaListss_.append(arenaLists)) { + delete arenaLists; + return false; + } + } + + return true; +} + +ForkJoinShared::~ForkJoinShared() +{ + PR_DestroyCondVar(rendezvousEnd_); + + while (arenaListss_.length() > 0) { + delete arenaListss_.popCopy(); + } +} + +ParallelResult +ForkJoinShared::execute() +{ + AutoLockMonitor lock(*this); + + // give the task set a chance to prepare for parallel workload + if (!op_.pre(numThreads_)) + return TP_RETRY_SEQUENTIALLY; + + // notify workers to start and execute one portion on this thread + { + AutoUnlockMonitor unlock(*this); + threadPool_->submitAll(this); + executeFromMainThread(cx_->runtime->ionStackLimit); + } + + // wait for workers to complete + while (uncompleted_ > 0) + lock.wait(); + + // check if any of the workers failed + if (abort_) { + if (fatal_) + return TP_FATAL; + else + return TP_RETRY_SEQUENTIALLY; + } + + transferArenasToCompartment(); + + // give task set a chance to cleanup after parallel execution + if (!op_.post(numThreads_)) + return TP_RETRY_SEQUENTIALLY; + + return TP_SUCCESS; // everything went swimmingly. give yourself a pat on the back. +} + +void +ForkJoinShared::transferArenasToCompartment() +{ +#if 0 + // This code will become relevant once other + // bugs are merged down. + + JSRuntime *rt = cx_->runtime; + JSCompartment *comp = cx_->compartment; + for (unsigned i = 0; i < numThreads_; i++) { + comp->arenas.adoptArenas(rt, arenaListss_[i]); + } +#endif +} + +void +ForkJoinShared::executeFromWorker(size_t workerId, uintptr_t stackLimit) +{ + JS_ASSERT(workerId < numThreads_ - 1); + + PerThreadData thisThread(cx_->runtime); + TlsPerThreadData.set(&thisThread); + executePortion(&thisThread, workerId, stackLimit); + TlsPerThreadData.set(NULL); + + AutoLockMonitor lock(*this); + uncompleted_ -= 1; + if (blocked_ == uncompleted_) { + // Signal the main thread that we have terminated. It will be + // either working, arranging a rendezvous, or waiting for + // workers to complete. + lock.notify(); + } +} + +void +ForkJoinShared::executeFromMainThread(uintptr_t stackLimit) +{ + executePortion(&cx_->runtime->mainThread, numThreads_ - 1, stackLimit); +} + +void +ForkJoinShared::executePortion(PerThreadData *perThread, + size_t threadId, + uintptr_t stackLimit) +{ + gc::ArenaLists *arenaLists = arenaListss_[threadId]; + ForkJoinSlice slice(perThread, threadId, numThreads_, + stackLimit, arenaLists, this); + AutoSetForkJoinSlice autoContext(&slice); + + if (!op_.parallel(slice)) + abort_ = true; +} + +bool +ForkJoinShared::setFatal() +{ + // Might as well set the abort flag to true, it will make + // propagation faster: + abort_ = true; + fatal_ = true; + return false; +} + +bool +ForkJoinShared::check(ForkJoinSlice &slice) +{ + if (abort_) + return false; + + if (slice.isMainThread()) { + if (cx_->runtime->interrupt) { + // If interrupt is requested, bring worker threads to a + // halt, service the interrupt, then let them start back + // up again. + AutoRendezvous autoRendezvous(slice); + if (!js_HandleExecutionInterrupt(cx_)) { + return setFatal(); + } + } + } else if (rendezvous_) { + joinRendezvous(slice); + } + + return true; +} + +void +ForkJoinShared::initiateRendezvous(ForkJoinSlice &slice) { + /* + The rendezvous protocol is always initiated by the main thread. + The main thread sets the rendezvous flag to true. Seeing this + flag, other threads will invoke |joinRendezvous()|, which causes + them to (1) read |rendezvousIndex| and (2) increment the + |blocked| counter. Once the |blocked| counter is equal to + |uncompleted|, all parallel threads have joined the rendezvous, + and so the main thread is signaled. That will cause this + function to return. + + Some subtle points: + + - Worker threads may potentially terminate their work before + they see the rendezvous flag. In this case, they would + decrement |uncompleted| rather than incrementing |blocked|. + Either way, if the two variables become equal, the main thread + will be notified + + - The |rendezvousIndex| counter is used to detect the case where + the main thread signals the end of the rendezvous and then + starts another rendezvous before the workers have a chance to + exit. We circumvent this by having the workers read the + |rendezvousIndex| counter as they enter the rendezvous, and + then they only block until that counter is incremented. + Another alternative would be for the main thread to block in + |endRendezvous()| until all workers have exited, but that + would be slower and involve unnecessary synchronization. + + Note that the main thread cannot ever get more than one + rendezvous ahead of the workers, because it must wait for all + of them to enter the rendezvous before it can end it, so the + solution of using a counter is perfectly general and we need + not fear rollover. + */ + + + JS_ASSERT(slice.isMainThread()); + JS_ASSERT(!rendezvous_ && blocked_ == 0); + + AutoLockMonitor lock(*this); + + // signal other threads we want to start a rendezvous + rendezvous_ = true; + + // wait until all the other threads blocked themselves + while (blocked_ != uncompleted_) { + lock.wait(); + } +} + +void +ForkJoinShared::joinRendezvous(ForkJoinSlice &slice) { + JS_ASSERT(!slice.isMainThread()); + JS_ASSERT(rendezvous_); + + AutoLockMonitor lock(*this); + const size_t index = rendezvousIndex_; + blocked_ += 1; + + // If we're the last to arrive, let the main thread know about it. + if (blocked_ == uncompleted_) { + lock.notify(); + } + + // Wait until the main thread terminates the rendezvous. We use a + // separate condition variable here to distinguish between workers + // notifying the main thread that they have completed and the main + // thread notifying the workers to resume. + while (rendezvousIndex_ == index) { + PR_WaitCondVar(rendezvousEnd_, PR_INTERVAL_NO_TIMEOUT); + } +} + +void +ForkJoinShared::endRendezvous(ForkJoinSlice &slice) { + JS_ASSERT(slice.isMainThread()); + + AutoLockMonitor lock(*this); + rendezvous_ = false; + blocked_ = 0; + rendezvousIndex_ += 1; + + // signal other threads that rendezvous is over + PR_NotifyAllCondVar(rendezvousEnd_); +} + +/**************************************************************************** + * ForkJoinSlice + */ + +ForkJoinSlice::ForkJoinSlice(PerThreadData *perThreadData, + size_t sliceId, size_t numSlices, + uintptr_t stackLimit, gc::ArenaLists *arenaLists, + ForkJoinShared *shared) + : perThreadData(perThreadData), + sliceId(sliceId), + numSlices(numSlices), + ionStackLimit(stackLimit), + arenaLists(arenaLists), + shared(shared) +{} + +bool +ForkJoinSlice::isMainThread() +{ + return perThreadData == &shared->runtime()->mainThread; +} + +JSRuntime * +ForkJoinSlice::runtime() +{ + return shared->runtime(); +} + +bool +ForkJoinSlice::check() +{ + return shared->check(*this); +} + +bool +ForkJoinSlice::setFatal() +{ + return shared->setFatal(); +} + +} diff --git a/js/src/vm/forkjoin.h b/js/src/vm/forkjoin.h new file mode 100644 index 000000000000..2a38b64c461d --- /dev/null +++ b/js/src/vm/forkjoin.h @@ -0,0 +1,208 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: set ts=8 sw=4 et tw=78: + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef jstaskset_h___ +#define jstaskset_h___ + +#include "threadpool.h" + +/* + * ForkJoin + * + * This is the building block for executing multi-threaded JavaScript + * with shared memory (as distinct from Web Workers). The idea is + * that you have some (typically data-parallel) operation which you + * wish to execute in parallel across as many threads as you have + * available. An example might be applying |map()| to a vector in + * parallel. To implement such a thing, you would define a subclass of + * |ForkJoinOp| to implement the operation and then invoke + * |ExecuteForkJoinOp()|, as follows: + * + * > class MyForkJoinOp { + * > ... define callbacks as appropriate for your operation ... + * > }; + * > MyForkJoinOp op; + * > ExecuteForkJoinOp(cx, op); + * + * |ExecuteForkJoinOp()| will fire up the workers in the runtime's + * thread pool, have them execute the callbacks defined in the + * |ForkJoinOp| class, and then return once all the workers have + * completed. + * + * There are three callbacks defined in |ForkJoinOp|. The first, + * |pre()|, is invoked before the parallel section begins. It informs + * you how many slices your problem will be divided into (effectively, + * how many worker threads there will be). This is often useful for + * allocating an array for the workers to store their result or + * something like that. + * + * Next, you will receive |N| calls to the |parallel()| callback, + * where |N| is the number of slices that were specified in |pre()|. + * Each callback will be supplied with a |ForkJoinSlice| instance + * providing some context. + * + * Typically there will be one call to |parallel()| from each worker + * thread, but that is not something you should rely upon---if we + * implement work-stealing, for example, then it could be that a + * single worker thread winds up handling multiple slices. + * + * Finally, after the operation is complete the |post()| callback is + * invoked, giving you a chance to collect the various results. + * + * Operation callback: + * + * During parallel execution, you should periodically invoke + * |slice.check()|, which will handle the operation callback. If the + * operation callback is necessary, |slice.check()| will arrange a + * rendezvous---that is, as each active worker invokes |check()|, it + * will come to a halt until everyone is blocked (Stop The World). At + * this point, we perform the callback on the main thread, and then + * resume execution. If a worker thread terminates before calling + * |check()|, that's fine too. We assume that you do not do unbounded + * work without invoking |check()|. + * + * Sequential Fallback: + * + * It is assumed that anyone using this API must be prepared for a + * sequential fallback. Therefore, the |ExecuteForkJoinOp()| returns + * a status code indicating whether a fatal error occurred (in which + * case you should just stop) or whether you should retry the + * operation, but executing sequentially. An example of where the + * fallback would be useful is if the parallel code encountered an + * unexpected path that cannot safely be executed in parallel (writes + * to shared state, say). + * + * Current Limitations: + * + * - The API does not support recursive or nested use. That is, the + * |parallel()| callback of a |ForkJoinOp| may not itself invoke + * |ExecuteForkJoinOp()|. We may lift this limitation in the + * future. + * + * - No load balancing is performed between worker threads. That + * means that the fork-join system is best suited for problems that + * can be slice into uniform bits. + */ + +namespace js { + +// Parallel operations in general can have one of three states. They +// may succeed, fail, or "bail", where bail indicates that the code +// encountered an unexpected condition and should be re-run +// sequentially. +enum ParallelResult { TP_SUCCESS, TP_RETRY_SEQUENTIALLY, TP_FATAL }; + +struct ForkJoinOp; + +// Executes the given |TaskSet| in parallel using the runtime's +// |ThreadPool|, returning upon completion. In general, if there are +// |N| workers in the threadpool, the problem will be divided into +// |N+1| slices, as the main thread will also execute one slice. +ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op); + +class ForkJoinShared; +class AutoRendezvous; +class AutoSetForkJoinSlice; +namespace gc { struct ArenaLists; } + +struct ForkJoinSlice +{ +public: + // PerThreadData corresponding to the current worker thread. + PerThreadData *perThreadData; + + // Which slice should you process? Ranges from 0 to |numSlices|. + const size_t sliceId; + + // How many slices are there in total? + const size_t numSlices; + + // Top of the stack. This should move into |perThreadData|. + uintptr_t ionStackLimit; + + // Arenas to use when allocating on this thread. See + // |ion::ParFunctions::ParNewGCThing()|. This should move + // into |perThreadData|. + gc::ArenaLists *const arenaLists; + + ForkJoinSlice(PerThreadData *perThreadData, size_t sliceId, size_t numSlices, + uintptr_t stackLimit, gc::ArenaLists *arenaLists, + ForkJoinShared *shared); + + // True if this is the main thread, false if it is one of the parallel workers + bool isMainThread(); + + // Generally speaking, if a thread returns false, that is + // interpreted as a "bailout"---meaning, a recoverable error. If + // however you call this function before returning false, then the + // error will be interpreted as *fatal*. This doesn't strike me + // as the most elegant solution here but I don't know what'd be better. + // + // For convenience, *always* returns false. + bool setFatal(); + + // During the parallel phase, this method should be invoked + // periodically, for example on every backedge, similar to the + // interrupt check. If it returns false, then the parallel phase + // has been aborted and so you should bailout. The function may + // also rendesvous to perform GC or do other similar things. + bool check(); + + // Be wary, the runtime is shared between all threads! + JSRuntime *runtime(); + + static inline ForkJoinSlice *current(); + static bool Initialize(); + +private: + friend class AutoRendezvous; + friend class AutoSetForkJoinSlice; + + static PRUintn ThreadPrivateIndex; // initialized by Initialize() + + ForkJoinShared *const shared; +}; + +// Generic interface for specifying divisible operations that can be +// executed in a fork-join fashion. +struct ForkJoinOp +{ +public: + // Invoked before parallel phase begins; informs the task set how + // many slices there will be and gives it a chance to initialize + // per-slice data structures. + // + // Returns true on success, false to halt parallel execution. + virtual bool pre(size_t numSlices) = 0; + + // Invoked from each parallel thread to process one slice. The + // |ForkJoinSlice| which is supplied will also be available using + // TLS. + // + // Returns true on success, false to halt parallel execution. + virtual bool parallel(ForkJoinSlice &slice) = 0; + + // Invoked after parallel phase ends if execution was successful + // (not aborted) + // + // Returns true on success, false to halt parallel execution. + virtual bool post(size_t numSlices) = 0; +}; + +/* True if this thread is currently executing a ParallelArray + operation across multiple threads. */ +static inline bool InParallelSection() { +# ifdef JS_THREADSAFE_ION + return ForkJoinSlice::current() != NULL; +# else + return false; +# endif +} + +#endif + +} diff --git a/js/src/vm/forkjoininlines.h b/js/src/vm/forkjoininlines.h new file mode 100644 index 000000000000..8076bf976c08 --- /dev/null +++ b/js/src/vm/forkjoininlines.h @@ -0,0 +1,12 @@ +namespace js { + +ForkJoinSlice * +ForkJoinSlice::current() { +#ifdef JS_THREADSAFE_ION + return (ForkJoinSlice*) PR_GetThreadPrivate(ThreadPrivateIndex); +#else + return NULL; +#endif +} + +} diff --git a/js/src/vm/monitor.cpp b/js/src/vm/monitor.cpp new file mode 100644 index 000000000000..6943a8cc43e5 --- /dev/null +++ b/js/src/vm/monitor.cpp @@ -0,0 +1,30 @@ +#include "vm/monitor.h" + +namespace js { + +Monitor::Monitor() + : lock_(NULL), condVar_(NULL) +{ +} + +Monitor::~Monitor() +{ + PR_DestroyLock(lock_); + PR_DestroyCondVar(condVar_); +} + +bool +Monitor::init() +{ + lock_ = PR_NewLock(); + if (!lock_) + return false; + + condVar_ = PR_NewCondVar(lock_); + if (!condVar_) + return false; + + return true; +} + +} diff --git a/js/src/vm/monitor.h b/js/src/vm/monitor.h new file mode 100644 index 000000000000..adad8a9b5114 --- /dev/null +++ b/js/src/vm/monitor.h @@ -0,0 +1,82 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: set ts=8 sw=4 et tw=78: + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef jsmonitor_h___ +#define jsmonitor_h___ + +#include +#include "mozilla/Util.h" +#include "js/Utility.h" +#include "prlock.h" +#include "prcvar.h" + +namespace js { + +/* + * A base class used for types intended to be used in a parallel + * fashion, such as the workers in the |ThreadPool| class. Combines a + * lock and a condition variable. You can acquire the lock or signal + * the condition variable using the |AutoLockMonitor| type. + */ +class Monitor +{ +protected: + friend class AutoLockMonitor; + friend class AutoUnlockMonitor; + + PRLock *lock_; + PRCondVar *condVar_; + +public: + Monitor(); + ~Monitor(); + + bool init(); +}; + +class AutoLockMonitor +{ +private: + Monitor &monitor; + +public: + AutoLockMonitor(Monitor &monitor) : monitor(monitor) { + PR_Lock(monitor.lock_); + } + + ~AutoLockMonitor() { + PR_Unlock(monitor.lock_); + } + + void wait() { + mozilla::DebugOnly status = + PR_WaitCondVar(monitor.condVar_, PR_INTERVAL_NO_TIMEOUT); + JS_ASSERT(status == PR_SUCCESS); + } + + void notify() { + PR_NotifyCondVar(monitor.condVar_); + } + + void notifyAll() { + PR_NotifyAllCondVar(monitor.condVar_); + } +}; + +class AutoUnlockMonitor +{ + private: + Monitor &monitor; + + public: + AutoUnlockMonitor(Monitor &monitor) : monitor(monitor) { PR_Unlock(monitor.lock_); } + ~AutoUnlockMonitor() { PR_Lock(monitor.lock_); } +}; + +} + +#endif /* ndef jsmonitor_h___ */ diff --git a/js/src/vm/threadpool.cpp b/js/src/vm/threadpool.cpp new file mode 100644 index 000000000000..c5aa2d734961 --- /dev/null +++ b/js/src/vm/threadpool.cpp @@ -0,0 +1,267 @@ +/* -*- Mode: C++; c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil -*- */ +/* vim: set ts=4 sw=4 et tw=99: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "jscntxt.h" +#include "jslock.h" +#include "vm/threadpool.h" +#include "prthread.h" +#include "monitor.h" + +namespace js { + +/**************************************************************************** + * ThreadPoolWorker + * + * Each |ThreadPoolWorker| just hangs around waiting for items to be added + * to its |worklist_|. Whenever something is added, it gets executed. + * Once the worker's state is set to |TERMINATING|, the worker will + * exit as soon as its queue is empty. + */ + +#define WORKER_THREAD_STACK_SIZE (1*1024*1024) + +enum WorkerState { + CREATED, ACTIVE, TERMINATING, TERMINATED +}; + +class ThreadPoolWorker : public Monitor +{ + const size_t workerId_; + ThreadPool *const threadPool_; + + /* Currrent point in the worker's lifecycle. + * + * Modified only while holding the ThreadPoolWorker's lock */ + WorkerState state_; + + /* Worklist for this thread. + * + * Modified only while holding the ThreadPoolWorker's lock */ + js::Vector worklist_; + + /* The thread's main function */ + static void ThreadMain(void *arg); + void run(); + +public: + ThreadPoolWorker(size_t workerId, ThreadPool *tp); + ~ThreadPoolWorker(); + + bool init(); + + /* Invoked from main thread; signals worker to start */ + bool start(); + + /* Submit work to be executed. If this returns true, you are + guaranteed that the task will execute before the thread-pool + terminates (barring an infinite loop in some prior task) */ + bool submit(TaskExecutor *task); + + /* Invoked from main thread; signals worker to terminate + * and blocks until termination completes */ + void terminate(); +}; + +ThreadPoolWorker::ThreadPoolWorker(size_t workerId, ThreadPool *tp) + : workerId_(workerId), threadPool_(tp), state_(CREATED), worklist_() +{} + +ThreadPoolWorker::~ThreadPoolWorker() +{} + +bool +ThreadPoolWorker::init() +{ + return Monitor::init(); +} + +bool +ThreadPoolWorker::start() +{ + JS_ASSERT(state_ == CREATED); + + // Set state to active now, *before* the thread starts: + state_ = ACTIVE; + + if (!PR_CreateThread(PR_USER_THREAD, + ThreadMain, this, + PR_PRIORITY_NORMAL, PR_LOCAL_THREAD, + PR_UNJOINABLE_THREAD, + WORKER_THREAD_STACK_SIZE)) + { + // If the thread failed to start, call it TERMINATED. + state_ = TERMINATED; + return false; + } + + return true; +} + +void +ThreadPoolWorker::ThreadMain(void *arg) +{ + ThreadPoolWorker *thread = (ThreadPoolWorker*) arg; + thread->run(); +} + +void +ThreadPoolWorker::run() +{ + // This is hokey in the extreme. To compute the stack limit, + // subtract the size of the stack from the address of a local + // variable and give a 2k buffer. Is there a better way? + uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 2*1024; + uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) + + stackLimitOffset * JS_STACK_GROWTH_DIRECTION); + + AutoLockMonitor lock(*this); + + for (;;) { + while (!worklist_.empty()) { + TaskExecutor *task = worklist_.popCopy(); + { + // Unlock so that new things can be added to the + // worklist while we are processing the current item: + AutoUnlockMonitor unlock(*this); + task->executeFromWorker(workerId_, stackLimit); + } + } + + if (state_ == TERMINATING) + break; + + JS_ASSERT(state_ == ACTIVE); + + lock.wait(); + } + + JS_ASSERT(worklist_.empty() && state_ == TERMINATING); + state_ = TERMINATED; + lock.notify(); +} + +bool +ThreadPoolWorker::submit(TaskExecutor *task) +{ + AutoLockMonitor lock(*this); + JS_ASSERT(state_ == ACTIVE); + if (!worklist_.append(task)) + return false; + lock.notify(); + return true; +} + +void +ThreadPoolWorker::terminate() +{ + AutoLockMonitor lock(*this); + + if (state_ == CREATED) { + state_ = TERMINATED; + return; + } else if (state_ == ACTIVE) { + state_ = TERMINATING; + lock.notify(); + while (state_ != TERMINATED) { + lock.wait(); + } + } else { + JS_ASSERT(state_ == TERMINATED); + } +} + +/**************************************************************************** + * ThreadPool + * + * The |ThreadPool| starts up workers, submits work to them, and shuts + * them down when requested. + */ + +ThreadPool::ThreadPool(JSRuntime *rt) + : runtime_(rt), + nextId_(0) +{ +} + +ThreadPool::~ThreadPool() { + terminateWorkers(); + while (workers_.length() > 0) { + ThreadPoolWorker *worker = workers_.popCopy(); + js_delete(worker); + } +} + +bool +ThreadPool::init() +{ +#ifdef JS_THREADSAFE_ION + // Compute desired number of workers based on env var or # of CPUs. + size_t numWorkers = 0; + char *pathreads = getenv("PATHREADS"); + if (pathreads != NULL) { + numWorkers = strtol(pathreads, NULL, 10); + } else { + numWorkers = GetCPUCount() - 1; + } + + // Allocate workers array and then start the worker threads. + // Ensure that the field numWorkers_ always tracks the number of + // *successfully initialized* workers. + for (size_t workerId = 0; workerId < numWorkers; workerId++) { + ThreadPoolWorker *worker = js_new(workerId, this); + if (!worker->init()) { + js_delete(worker); + return false; + } + if (!workers_.append(worker)) { + js_delete(worker); + return false; + } + if (!worker->start()) { + return false; + } + } +#endif + + return true; +} + +void +ThreadPool::terminateWorkers() +{ + for (size_t i = 0; i < workers_.length(); i++) { + workers_[i]->terminate(); + } +} + +bool +ThreadPool::submitOne(TaskExecutor *executor) { + runtime_->assertValidThread(); + + if (numWorkers() == 0) + return false; + + // Find next worker in round-robin fashion. + size_t id = JS_ATOMIC_INCREMENT(&nextId_) % workers_.length(); + return workers_[id]->submit(executor); +} + +bool +ThreadPool::submitAll(TaskExecutor *executor) { + for (size_t id = 0; id < workers_.length(); id++) { + if (!workers_[id]->submit(executor)) + return false; + } + return true; +} + +bool +ThreadPool::terminate() { + terminateWorkers(); + return true; +} + +} diff --git a/js/src/vm/threadpool.h b/js/src/vm/threadpool.h new file mode 100644 index 000000000000..c5c7fa810812 --- /dev/null +++ b/js/src/vm/threadpool.h @@ -0,0 +1,107 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- + * vim: set ts=8 sw=4 et tw=78: + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef jsthreadpool_h___ +#define jsthreadpool_h___ + +#if defined(JS_THREADSAFE) && defined(JS_ION) +# define JS_THREADSAFE_ION +#endif + +#include +#include "mozilla/StandardInteger.h" +#include "prtypes.h" +#include "js/Vector.h" +#include "jsalloc.h" +#include "prlock.h" +#include "prcvar.h" + +struct JSContext; +struct JSRuntime; +struct JSCompartment; +struct JSScript; + +namespace js { + +class ThreadPoolWorker; + +typedef void (*TaskFun)(void *userdata, size_t workerId, uintptr_t stackLimit); + +class TaskExecutor +{ +public: + virtual void executeFromWorker(size_t workerId, uintptr_t stackLimit) = 0; +}; + +/* + * ThreadPool used for parallel JavaScript execution as well as + * parallel compilation. Unless you are building a new kind of + * parallel service, it is very likely that you do not wish to + * interact with the threadpool directly. In particular, if you wish + * to execute JavaScript in parallel, you probably want to look at + * |js::ForkJoin| in |forkjoin.cpp|. + * + * The ThreadPool always maintains a fixed pool of worker threads. + * You can query the number of worker threads via the method + * |numWorkers()|. Note that this number may be zero (generally if + * threads are disabled, or when manually specified for benchmarking + * purposes). + * + * You can either submit jobs in one of two ways. The first is + * |submitOne()|, which submits a job to be executed by one worker + * thread (this will fail if there are no worker threads). The job + * will be enqueued and executed by some worker (the current scheduler + * uses round-robin load balancing; something more sophisticated, + * e.g. a central queue or work stealing, might be better). + * + * The second way to submit a job is using |submitAll()|---in this + * case, the job will be executed by all worker threads. This does + * not fail if there are no worker threads, it simply does nothing. + * Of course, each thread may have any number of previously submitted + * things that they are already working on, and so they will finish + * those before they get to this job. Therefore it is possible to + * have some worker threads pick up (and even finish) their piece of + * the job before others have even started. + */ +class ThreadPool +{ +private: + friend class ThreadPoolWorker; + + // Initialized at startup only: + JSRuntime *const runtime_; + js::Vector workers_; + + // Next worker for |submitOne()|. Atomically modified. + size_t nextId_; + + void terminateWorkers(); + +public: + ThreadPool(JSRuntime *rt); + ~ThreadPool(); + + bool init(); + + // Return number of worker threads in the pool. + size_t numWorkers() { return workers_.length(); } + + // See comment on class: + bool submitOne(TaskExecutor *executor); + bool submitAll(TaskExecutor *executor); + + // Wait until all worker threads have finished their current set + // of jobs and then return. You must not submit new jobs after + // invoking |terminate()|. + bool terminate(); +}; + +} + + + +#endif