Merge pull request #70 from PeterTh/master

Added threadpool
This commit is contained in:
Henrik Rydgård 2013-05-01 08:16:45 -07:00
commit da164e1abd
6 changed files with 145 additions and 0 deletions

View File

@ -45,6 +45,7 @@ LOCAL_SRC_FILES :=\
profiler/profiler.cpp \
thread/threadutil.cpp \
thread/prioritizedworkqueue.cpp \
thread/threadpool.cpp \
gfx_es2/glsl_program.cpp \
gfx_es2/gl_state.cpp \
gfx_es2/draw_buffer.cpp.arm \

View File

@ -245,6 +245,7 @@
<ClInclude Include="profiler\profiler.h" />
<ClInclude Include="thread\prioritizedworkqueue.h" />
<ClInclude Include="thread\thread.h" />
<ClInclude Include="thread\threadpool.h" />
<ClInclude Include="thread\threadutil.h" />
<ClInclude Include="ui\screen.h" />
<ClInclude Include="ui\ui.h" />
@ -358,6 +359,7 @@
<ClCompile Include="net\resolve.cpp" />
<ClCompile Include="profiler\profiler.cpp" />
<ClCompile Include="thread\prioritizedworkqueue.cpp" />
<ClCompile Include="thread\threadpool.cpp" />
<ClCompile Include="thread\threadutil.cpp" />
<ClCompile Include="ui\screen.cpp" />
<ClCompile Include="ui\ui.cpp" />

View File

@ -251,6 +251,9 @@
<ClInclude Include="file\ini_file.h">
<Filter>file</Filter>
</ClInclude>
<ClInclude Include="thread\threadpool.h">
<Filter>thread</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="gfx\gl_debug_log.cpp">
@ -444,6 +447,9 @@
<ClCompile Include="file\ini_file.cpp">
<Filter>file</Filter>
</ClCompile>
<ClCompile Include="thread\threadpool.cpp">
<Filter>thread</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="gfx">

View File

@ -29,6 +29,7 @@
// WIN32
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <Windows.h>
#if defined(_MSC_VER) && defined(_MT)

78
thread/threadpool.cpp Normal file
View File

@ -0,0 +1,78 @@
#include "threadpool.h"
///////////////////////////// WorkerThread
WorkerThread::WorkerThread() : active(true), started(false) {
thread = new std::thread(std::bind(&WorkerThread::WorkFunc, this));
doneMutex.lock();
while(!started) { };
}
WorkerThread::~WorkerThread() {
mutex.lock();
active = false;
signal.notify_one();
mutex.unlock();
thread->join();
delete thread;
}
void WorkerThread::Process(const std::function<void()>& work) {
mutex.lock();
work_ = work;
signal.notify_one();
mutex.unlock();
}
void WorkerThread::WaitForCompletion() {
done.wait(doneMutex);
}
void WorkerThread::WorkFunc() {
mutex.lock();
started = true;
while(active) {
signal.wait(mutex);
if(active) {
work_();
doneMutex.lock();
done.notify_one();
doneMutex.unlock();
}
}
}
///////////////////////////// ThreadPool
ThreadPool::ThreadPool(int numThreads) : numThreads(numThreads), workersStarted(false) {
}
void ThreadPool::StartWorkers() {
if(!workersStarted) {
for(int i=0; i<numThreads; ++i) {
workers.push_back(std::make_shared<WorkerThread>());
}
workersStarted = true;
}
}
void ThreadPool::ParallelLoop(std::function<void(int,int)> loop, int lower, int upper) {
mutex.lock();
StartWorkers();
int range = upper-lower;
if(range >= numThreads*2) { // don't parallelize tiny loops (this could be better, maybe add optional parameter that estimates work per iteration)
// could do slightly better load balancing for the generic case,
// but doesn't matter since all our loops are power of 2
int chunk = range/numThreads;
for(int s=lower, i=0; i<numThreads; s+=chunk, ++i) {
workers[i]->Process(std::bind(loop, s, std::min(s+chunk,upper)));
}
for(int i=0; i<numThreads; ++i) {
workers[i]->WaitForCompletion();
}
} else {
loop(lower, upper);
}
mutex.unlock();
}

57
thread/threadpool.h Normal file
View File

@ -0,0 +1,57 @@
#pragma once
#include "thread.h"
#include "base/mutex.h"
#include <functional>
#include <vector>
// This is the simplest possible worker implementation I can think of
// but entirely sufficient for the given purpose.
// Only handles a single item of work at a time.
class WorkerThread {
public:
WorkerThread();
~WorkerThread();
// submit a new work item
void Process(const std::function<void()>& work);
// wait for a submitted work item to be completed
void WaitForCompletion();
private:
std::thread *thread; // the worker thread
condition_variable signal; // used to signal new work
condition_variable done; // used to signal work completion
recursive_mutex mutex, doneMutex; // associated with each respective condition variable
volatile bool active, started;
std::function<void()> work_; // the work to be done by this thread
void WorkFunc();
WorkerThread(const WorkerThread& other); // prevent copies
void operator =(const WorkerThread &other);
};
// A thread pool manages a set of worker threads, and allows the execution of parallel loops on them
// individual parallel loops are fully sequentialized to simplify synchronization, which should not
// be a problem as they should each use the entire system
class ThreadPool {
public:
ThreadPool(int numThreads);
// don't need a destructor, "workers" is cleared on delete,
// leading to the stopping and joining of all worker threads (RAII and all that)
void ParallelLoop(std::function<void(int,int)> loop, int lower, int upper);
private:
const int numThreads;
std::vector<std::shared_ptr<WorkerThread>> workers;
recursive_mutex mutex; // used to sequentialize loop execution
bool workersStarted;
void StartWorkers();
ThreadPool(const ThreadPool& other); // prevent copies
void operator =(const WorkerThread &other);
};