Implement new thread manager, port stuff to it.

This commit is contained in:
Henrik Rydgård 2020-11-28 00:12:06 +01:00
parent 3d5c387b3b
commit 73871b9b7e
34 changed files with 760 additions and 319 deletions

View File

@ -608,12 +608,16 @@ add_library(Common STATIC
Common/Render/Text/draw_text_uwp.h
Common/System/Display.cpp
Common/System/Display.h
Common/Thread/Channel.h
Common/Thread/ParallelLoop.cpp
Common/Thread/ParallelLoop.h
Common/Thread/PrioritizedWorkQueue.cpp
Common/Thread/PrioritizedWorkQueue.h
Common/Thread/Promise.h
Common/Thread/ThreadUtil.cpp
Common/Thread/ThreadUtil.h
Common/Thread/ThreadPool.cpp
Common/Thread/ThreadPool.h
Common/Thread/ThreadManager.cpp
Common/Thread/ThreadManager.h
Common/UI/Root.cpp
Common/UI/Root.h
Common/UI/Screen.cpp

View File

@ -527,8 +527,11 @@
<ClInclude Include="System\Display.h" />
<ClInclude Include="System\NativeApp.h" />
<ClInclude Include="System\System.h" />
<ClInclude Include="Thread\Channel.h" />
<ClInclude Include="Thread\ParallelLoop.h" />
<ClInclude Include="Thread\PrioritizedWorkQueue.h" />
<ClInclude Include="Thread\ThreadPool.h" />
<ClInclude Include="Thread\Promise.h" />
<ClInclude Include="Thread\ThreadManager.h" />
<ClInclude Include="Thread\ThreadUtil.h" />
<ClInclude Include="Thunk.h" />
<ClInclude Include="TimeUtil.h" />
@ -964,8 +967,9 @@
<ClCompile Include="OSVersion.cpp" />
<ClCompile Include="StringUtils.cpp" />
<ClCompile Include="System\Display.cpp" />
<ClCompile Include="Thread\ParallelLoop.cpp" />
<ClCompile Include="Thread\PrioritizedWorkQueue.cpp" />
<ClCompile Include="Thread\ThreadPool.cpp" />
<ClCompile Include="Thread\ThreadManager.cpp" />
<ClCompile Include="Thread\ThreadUtil.cpp" />
<ClCompile Include="Thunk.cpp" />
<ClCompile Include="TimeUtil.cpp" />

View File

@ -93,9 +93,6 @@
<ClInclude Include="Thread\PrioritizedWorkQueue.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\ThreadPool.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\ThreadUtil.h">
<Filter>Thread</Filter>
</ClInclude>
@ -398,6 +395,18 @@
<ClInclude Include="File\AndroidStorage.h">
<Filter>File</Filter>
</ClInclude>
<ClInclude Include="Thread\ThreadManager.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\Channel.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\Promise.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\ParallelLoop.h">
<Filter>Thread</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="ABI.cpp" />
@ -496,9 +505,6 @@
<ClCompile Include="Thread\PrioritizedWorkQueue.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="Thread\ThreadPool.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="Thread\ThreadUtil.cpp">
<Filter>Thread</Filter>
</ClCompile>
@ -768,6 +774,12 @@
<ClCompile Include="File\AndroidStorage.cpp">
<Filter>File</Filter>
</ClCompile>
<ClCompile Include="Thread\ThreadManager.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="Thread\ParallelLoop.cpp">
<Filter>Thread</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="Crypto">

120
Common/Thread/Channel.h Normal file
View File

@ -0,0 +1,120 @@
#pragma once
#include <mutex>
#include <condition_variable>
// Single item mailbox.
template<class T>
struct Mailbox {
Mailbox() : refcount_(1) {}
std::mutex mutex_;
std::condition_variable condvar_;
T *data_ = nullptr;
T *Wait() {
T *data;
{
std::unique_lock<std::mutex> lock(mutex_);
while (!data_) {
condvar_.wait(lock);
}
data = data_;
}
return data;
}
bool Poll(T **data) {
bool retval = false;
{
std::unique_lock<std::mutex> lock(mutex_);
if (data_) {
*data = data_;
retval = true;
}
}
return retval;
}
bool Send(T *data) {
bool success = false;
{
std::unique_lock<std::mutex> lock(mutex_);
if (!data_) {
data_ = data;
success = true;
}
condvar_.notify_one();
}
return success;
}
void AddRef() {
refcount_.fetch_add(1);
}
void Release() {
int count = refcount_.fetch_sub(1);
if (count == 1) { // was definitely decreased to 0
delete this;
}
}
private:
std::atomic<int> refcount_;
};
// POD so can be moved around freely.
// TODO: I can't quite get this to work with all the moves, the refcount blows up. I'll just use mailboxes directly.
/*
template<class T>
class Rx {
public:
Rx() : mbx_(nullptr) {}
Rx(Mailbox<T> *mbx) : mbx_(mbx) { mbx_->AddRef(); }
Rx(Rx<T> &&rx) { mbx_ = rx.mbx_; }
Rx<T>& operator=(Rx<T> &&rx) { mbx_ = rx.mbx_; return *this; }
~Rx() {
mbx_->Release();
}
bool Poll(T **data) {
return mbx_->Poll(data);
}
T *Wait() {
return mbx_->Wait();
}
private:
Mailbox<T> *mbx_;
};
// POD so can be moved around freely.
template<class T>
class Tx {
public:
Tx() : mbx_(nullptr) {}
Tx(Mailbox<T> *mbx) : mbx_(mbx) { mbx_->AddRef(); }
Tx(Tx<T> &&rx) { mbx_ = rx.mbx_; }
Tx<T>& operator=(Tx<T> &&rx) { mbx_ = rx.mbx_; return *this; }
~Tx() {
mbx_->Release();
}
bool Send(T *t) {
return mbx_->Send(t);
}
private:
Mailbox<T> *mbx_;
};
template<class T>
std::pair<Rx<T>, Tx<T>> CreateChannel() {
Mailbox<T> *mbx = new Mailbox<T>();
auto retval = std::make_pair(Rx<T>(mbx), Tx<T>(mbx));
mbx->Release();
return retval;
}
*/

View File

@ -0,0 +1,105 @@
#include <cstring>
#include "ParallelLoop.h"
class LoopRangeTask : public Task {
public:
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper)
: counter_(counter), loop_(loop), lower_(lower), upper_(upper) {}
void run() override {
loop_(lower_, upper_);
counter_->Count();
}
std::function<void(int, int)> loop_;
WaitableCounter *counter_;
int lower_;
int upper_;
};
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
if (minSize == -1) {
minSize = 1;
}
// TODO: Optimize using minSize.
int numTasks = threadMan->GetNumLooperThreads();
int range = upper - lower;
if (range <= 0) {
// Bad range. A finished counter allocated.
return new WaitableCounter(0);
}
if (range <= numTasks) {
// Just assign one task per thread, as many as we have.
WaitableCounter *counter = new WaitableCounter(range);
for (int i = 0; i < range; i++) {
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, i, i + 1));
}
return counter;
} else {
WaitableCounter *counter = new WaitableCounter(numTasks);
// Split the range between threads.
double dx = (double)range / (double)numTasks;
double d = 0.0;
int lastEnd = 0;
for (int i = 0; i < numTasks; i++) {
int start = lastEnd;
d += dx;
int end = i == numTasks - 1 ? range : (int)d;
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(counter, loop, start, end));
lastEnd = end;
}
return counter;
}
}
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
if (minSize == -1) {
minSize = 4;
}
WaitableCounter *counter = ParallelRangeLoopWaitable(threadMan, loop, lower, upper, minSize);
// TODO: Optimize using minSize. We'll just compute whether there's a remainer, remove it from the call to ParallelRangeLoopWaitable,
// and process the remainder right here. If there's no remainer, we'll steal a whole chunk.
if (counter) {
counter->WaitAndRelease();
}
}
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memcpy(dst, src, bytes);
return;
}
// 128 is the largest cacheline size on common CPUs.
// Still I suspect that the optimal minSize is a lot higher.
char *d = (char *)dst;
char *s = (char *)src;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memmove(d + l, s + l, h - l);
}, 0, 128);
}
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memset(dst, 0, bytes);
return;
}
// 128 is the largest cacheline size on common CPUs.
// Still I suspect that the optimal minSize is a lot higher.
char *d = (char *)dst;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memset(d + l, value, h - l);
}, 0, 128);
}

View File

@ -0,0 +1,51 @@
#pragma once
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include "Common/Thread/ThreadManager.h"
// Kind of like a semaphore I guess.
struct WaitableCounter {
public:
WaitableCounter(int maxValue) : maxValue_(maxValue) {}
void Count() {
if (count_.fetch_add(1) == maxValue_ - 1) {
// We were the last one to increment
cond_.notify_one();
}
}
void Wait() {
std::unique_lock<std::mutex> lock(mutex_);
while (count_.load() != maxValue_) {
cond_.wait(lock);
}
}
void WaitAndRelease() {
Wait();
delete this;
}
int maxValue_;
std::atomic<int> count_;
std::mutex mutex_;
std::condition_variable cond_;
};
// Note that upper bounds are non-inclusive.
// This one never executes the remainer on the calling thread.
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize);
// This one optimizes by running the remainder on the calling thread.
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize = -1);
// Common utilities for large (!) memory copies.
// Will only fall back to threads if it seems to make sense.
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes);
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes);

93
Common/Thread/Promise.h Normal file
View File

@ -0,0 +1,93 @@
#pragma once
#include <functional>
#include "Common/Thread/Channel.h"
#include "Common/Thread/ThreadManager.h"
template<class T>
class PromiseTask : public Task {
public:
PromiseTask() {}
~PromiseTask() {
tx_->Release();
}
void run() override {
T *value = fun_();
tx_->Send(value);
tx_->Release();
}
std::function<T *()> fun_;
Mailbox<T> *tx_;
};
// Represents pending or actual data.
// Has ownership over the data.
// Single use.
// TODO: Split Mailbox (rx_ and tx_) up into separate proxy objects.
template<class T>
class Promise {
public:
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T *()> fun) {
// std::pair<Rx<T>, Tx<T>> channel = CreateChannel<T>();
Mailbox<T> *mailbox = new Mailbox<T>();
PromiseTask<T> *task = new PromiseTask<T>();
task->fun_ = fun;
task->tx_ = mailbox;
threadman->EnqueueTask(task);
Promise<T> *promise = new Promise<T>();
promise->rx_ = mailbox;
mailbox->AddRef();
return promise;
}
/*
Promise(Promise &&promise) noexcept {
data_ = promise.data_;
ready_ = promise.ready_;
rx_ = promise.rx_;
}*/
~Promise() {
if (rx_) {
rx_->Release();
}
delete data_;
}
// Returns *T if the data is ready, nullptr if it's not.
T *Poll() {
if (ready_) {
return data_;
} else {
if (rx_->Poll(&data_)) {
ready_ = true;
return data_;
} else {
return nullptr;
}
}
}
T *BlockUntilReady() {
if (ready_) {
return data_;
} else {
data_ = rx_->Wait();
rx_->Release();
rx_ = nullptr;
ready_ = true;
return data_;
}
}
private:
Promise() {}
T *data_ = nullptr;
bool ready_ = false;
Mailbox<T> *rx_;
};

View File

@ -0,0 +1,128 @@
#include <cstdio>
#include <thread>
#include <deque>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <atomic>
#include "Common/Log.h"
#include "Common/Thread/ThreadUtil.h"
#include "Common/Thread/ThreadManager.h"
struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> queue;
std::vector<ThreadContext *> threads_;
};
struct ThreadContext {
std::thread thread; // the worker thread
std::condition_variable cond; // used to signal new work
std::mutex mutex; // protects the local queue.
std::atomic<int> queueSize;
int index;
std::atomic<bool> cancelled;
std::deque<Task *> private_queue;
};
ThreadManager::ThreadManager() : global_(new GlobalThreadContext()) {
}
ThreadManager::~ThreadManager() {
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->cancelled = true;
global_->threads_[i]->cond.notify_one();
}
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->thread.join();
}
global_->threads_.clear();
delete global_;
}
static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) {
char threadName[16];
snprintf(threadName, sizeof(threadName), "PoolWorker %d", thread->index);
SetCurrentThreadName(threadName);
while (!thread->cancelled) {
Task *task = nullptr;
// Check the thread-private queue first, then check the global queue.
{
std::unique_lock<std::mutex> lock(thread->mutex);
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
thread->queueSize.store((int)thread->private_queue.size());
} else {
thread->cond.wait(lock);
}
}
if (!task) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
if (!global->queue.empty()) {
task = global->queue.front();
global->queue.pop_front();
}
} // Don't try to else here!
// The task itself takes care of notifying anyone waiting on it. Not the
// responsibility of the ThreadManager (although it could be!).
if (task) {
task->run();
delete task;
}
}
}
void ThreadManager::Init(int numThreads) {
for (int i = 0; i < numThreads; i++) {
ThreadContext *thread = new ThreadContext();
thread->cancelled.store(false);
thread->thread = std::thread(&WorkerThreadFunc, global_, thread);
thread->index = i;
global_->threads_.push_back(thread);
}
}
void ThreadManager::EnqueueTask(Task *task, TaskType taskType) {
// Find a thread with no outstanding work.
for (int i = 0; i < global_->threads_.size(); i++) {
ThreadContext *thread = global_->threads_[i];
if (thread->queueSize.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->queueSize.store((int)thread->private_queue.size());
thread->cond.notify_one();
// Found it - done.
return;
}
}
// Still not scheduled? Put it on the global queue and notify a random thread.
{
std::unique_lock<std::mutex> lock(global_->mutex);
global_->queue.push_back(task);
global_->threads_[0]->cond.notify_one();
}
}
void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType) {
_assert_(threadNum >= 0 && threadNum < (int)global_->threads_.size())
ThreadContext *thread = global_->threads_[threadNum];
{
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->cond.notify_one();
}
}
int ThreadManager::GetNumLooperThreads() const {
return (int)(global_->threads_.size() / 2);
}
void ThreadManager::TryCancelTask(Task *task) {
// Do nothing
}

View File

@ -0,0 +1,45 @@
#pragma once
// The new threadpool.
// To help future smart scheduling.
enum class TaskType {
CPU_COMPUTE,
IO_BLOCKING,
};
class Task {
public:
virtual ~Task() {}
virtual void run() = 0;
virtual bool cancellable() { return false; }
virtual void cancel() {}
};
struct ThreadContext;
struct GlobalThreadContext;
class ThreadManager {
public:
ThreadManager();
~ThreadManager();
void Init(int numWorkerThreads);
void EnqueueTask(Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
// Currently does nothing. It will always be best-effort - maybe it cancels,
// maybe it doesn't.
void TryCancelTask(Task *task);
// Parallel loops get to use half the threads,
// so we still have some worker threads for other tasks.
int GetNumLooperThreads() const;
private:
GlobalThreadContext *global_;
friend struct ThreadContext;
};
extern ThreadManager g_threadManager;

View File

@ -1,142 +0,0 @@
#include <algorithm>
#include <cstring>
#include "Common/Thread/ThreadPool.h"
#include "Common/Thread/ThreadUtil.h"
#include "Common/Log.h"
#include "Common/MakeUnique.h"
///////////////////////////// WorkerThread
WorkerThread::~WorkerThread() {
{
std::lock_guard<std::mutex> guard(mutex);
active = false;
signal.notify_one();
}
if (thread.joinable()) {
thread.join();
}
}
void WorkerThread::StartUp() {
thread = std::thread(std::bind(&WorkerThread::WorkFunc, this));
}
void WorkerThread::Process(std::function<void()> work) {
std::lock_guard<std::mutex> guard(mutex);
work_ = std::move(work);
jobsTarget = jobsDone + 1;
signal.notify_one();
}
void WorkerThread::WaitForCompletion() {
std::unique_lock<std::mutex> guard(doneMutex);
while (jobsDone < jobsTarget) {
done.wait(guard);
}
}
void WorkerThread::WorkFunc() {
SetCurrentThreadName("Worker");
std::unique_lock<std::mutex> guard(mutex);
while (active) {
// 'active == false' is one of the conditions for signaling,
// do not "optimize" it
while (active && jobsTarget <= jobsDone) {
signal.wait(guard);
}
if (active) {
work_();
std::lock_guard<std::mutex> doneGuard(doneMutex);
jobsDone++;
done.notify_one();
}
}
}
void LoopWorkerThread::ProcessLoop(std::function<void(int, int)> work, int start, int end) {
std::lock_guard<std::mutex> guard(mutex);
loopWork_ = std::move(work);
work_ = [this]() {
loopWork_(start_, end_);
};
start_ = start;
end_ = end;
jobsTarget = jobsDone + 1;
signal.notify_one();
}
///////////////////////////// ThreadPool
ThreadPool::ThreadPool(int numThreads) {
if (numThreads <= 0) {
numThreads_ = 1;
INFO_LOG(JIT, "ThreadPool: Bad number of threads %d", numThreads);
} else if (numThreads > 16) {
INFO_LOG(JIT, "ThreadPool: Capping number of threads to 16 (was %d)", numThreads);
numThreads_ = 16;
} else {
numThreads_ = numThreads;
}
}
void ThreadPool::StartWorkers() {
if (!workersStarted) {
workers.reserve(numThreads_ - 1);
for(int i = 0; i < numThreads_ - 1; ++i) { // create one less worker thread as the thread calling ParallelLoop will also do work
auto workerPtr = make_unique<LoopWorkerThread>();
workerPtr->StartUp();
workers.push_back(std::move(workerPtr));
}
workersStarted = true;
}
}
void ThreadPool::ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper, int minSize) {
// Don't parallelize tiny loops.
if (minSize == -1)
minSize = 4;
int range = upper - lower;
if (range >= minSize) {
std::lock_guard<std::mutex> guard(mutex);
StartWorkers();
// could do slightly better load balancing for the generic case,
// but doesn't matter since all our loops are power of 2
int chunk = std::max(minSize, range / numThreads_);
int s = lower;
for (auto &worker : workers) {
// We'll do the last chunk on the current thread.
if (s + chunk >= upper) {
break;
}
worker->ProcessLoop(loop, s, s + chunk);
s += chunk;
}
// This is the final chunk.
if (s < upper)
loop(s, upper);
for (auto &worker : workers) {
worker->WaitForCompletion();
}
} else {
loop(lower, upper);
}
}
void ThreadPool::ParallelMemcpy(void *dest, const void *src, int size) {
static const int MIN_SIZE = 128 * 1024;
ParallelLoop([&](int l, int h) {
memmove((uint8_t *)dest + l, (const uint8_t *)src + l, h - l);
}, 0, size, MIN_SIZE);
}
void ThreadPool::ParallelMemset(void *dest, uint8_t val, int size) {
static const int MIN_SIZE = 128 * 1024;
ParallelLoop([&](int l, int h) {
memset((uint8_t *)dest + l, val, h - l);
}, 0, size, MIN_SIZE);
}

View File

@ -1,78 +0,0 @@
#pragma once
#include <functional>
#include <memory>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
// This is the simplest possible worker implementation I can think of
// but entirely sufficient for the given purpose.
// Only handles a single item of work at a time.
class WorkerThread {
public:
WorkerThread() = default;
virtual ~WorkerThread();
void StartUp();
// submit a new work item
void Process(std::function<void()> work);
// wait for a submitted work item to be completed
void WaitForCompletion();
protected:
virtual void WorkFunc();
std::thread thread; // the worker thread
std::condition_variable signal; // used to signal new work
std::condition_variable done; // used to signal work completion
std::mutex mutex, doneMutex; // associated with each respective condition variable
bool active = true;
int jobsDone = 0;
int jobsTarget = 0;
std::function<void()> work_; // the work to be done by this thread
private:
WorkerThread(const WorkerThread& other) = delete; // prevent copies
void operator =(const WorkerThread &other) = delete;
};
class LoopWorkerThread final : public WorkerThread {
public:
LoopWorkerThread() = default;
void ProcessLoop(std::function<void(int, int)> work, int start, int end);
private:
int start_;
int end_;
std::function<void(int, int)> loopWork_; // the work to be done by this thread
};
// A thread pool manages a set of worker threads, and allows the execution of parallel loops on them
// individual parallel loops are fully sequentialized to simplify synchronization, which should not
// be a problem as they should each use the entire system
class ThreadPool {
public:
ThreadPool(int numThreads);
// don't need a destructor, "workers" is cleared on delete,
// leading to the stopping and joining of all worker threads (RAII and all that)
void ParallelLoop(const std::function<void(int,int)> &loop, int lower, int upper, int minSize);
void ParallelMemcpy(void *dest, const void *src, int sz);
void ParallelMemset(void *dest, uint8_t val, int sz);
private:
int numThreads_;
std::vector<std::unique_ptr<LoopWorkerThread>> workers;
std::mutex mutex; // used to sequentialize loop execution
bool workersStarted = false;
void StartWorkers();
ThreadPool(const ThreadPool& other) = delete; // prevent copies
void operator =(const ThreadPool &other) = delete;
};

View File

@ -1,5 +1,34 @@
#pragma once
#include <mutex>
template<class T>
class RustMutexProxy {
public:
RustMutexProxy(T *data, std::mutex &mutex) : data_(data), mutex_(mutex) {}
~RustMutexProxy() { mutex_.unlock(); }
T &operator*() { return *data_; }
T *operator->() { return data_; }
private:
T *data_;
std::mutex &mutex_;
};
template<class T>
class RustMutex {
public:
RustMutex(T && t) : t_(t) {}
RustMutexProxy<T> lock() {
mutex_.lock();
return RustMutexProxy<T>(&t_, mutex_);
}
private:
T t_;
std::mutex mutex_;
};
// Note that name must be a global string that lives until the end of the process,
// for AssertCurrentThreadName to work.
void SetCurrentThreadName(const char *threadName);

View File

@ -16,6 +16,8 @@
// https://github.com/hrydgard/ppsspp and http://www.ppsspp.org/.
#include "Common/StringUtils.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/MemMap.h"
#include "Core/Reporting.h"
#include "Core/ThreadPools.h"
@ -60,9 +62,8 @@ bool ElfReader::LoadRelocations(const Elf32_Rel *rels, int numRelocs) {
relocOps.resize(numRelocs);
DEBUG_LOG(LOADER, "Loading %i relocations...", numRelocs);
int numErrors = 0;
GlobalThreadPool::Loop([&](int l, int h) {
std::atomic<int> numErrors;
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
for (int r = l; r < h; r++) {
u32 info = rels[r].r_info;
u32 addr = rels[r].r_offset;
@ -96,7 +97,7 @@ bool ElfReader::LoadRelocations(const Elf32_Rel *rels, int numRelocs) {
}
}, 0, numRelocs, 128);
GlobalThreadPool::Loop([&](int l, int h) {
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
for (int r = l; r < h; r++) {
VERBOSE_LOG(LOADER, "Loading reloc %i (%p)...", r, rels + r);
u32 info = rels[r].r_info;
@ -210,7 +211,7 @@ bool ElfReader::LoadRelocations(const Elf32_Rel *rels, int numRelocs) {
}, 0, numRelocs, 128);
if (numErrors) {
WARN_LOG(LOADER, "%i bad relocations found!!!", numErrors);
WARN_LOG(LOADER, "%i bad relocations found!!!", numErrors.load());
}
return numErrors == 0;
}

View File

@ -20,6 +20,7 @@
#include <vector>
#include <map>
#include "Common/Thread/ParallelLoop.h"
#include "Core/CoreTiming.h"
#include "Core/Debugger/MemBlockInfo.h"
#include "Core/HLE/HLE.h"
@ -431,7 +432,7 @@ void __KernelMemoryInit()
MemBlockInfoInit();
kernelMemory.Init(PSP_GetKernelMemoryBase(), PSP_GetKernelMemoryEnd() - PSP_GetKernelMemoryBase(), false);
userMemory.Init(PSP_GetUserMemoryBase(), PSP_GetUserMemoryEnd() - PSP_GetUserMemoryBase(), false);
GlobalThreadPool::Memset(Memory::GetPointer(PSP_GetKernelMemoryBase()), 0, PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase());
ParallelMemset(&g_threadManager, Memory::GetPointer(PSP_GetKernelMemoryBase()), 0, PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase());
NotifyMemInfo(MemBlockFlags::WRITE, PSP_GetKernelMemoryBase(), PSP_GetUserMemoryEnd() - PSP_GetKernelMemoryBase(), "MemInit");
INFO_LOG(SCEKERNEL, "Kernel and user memory pools initialized");

View File

@ -43,7 +43,7 @@
#include "Core/MIPS/MIPS.h"
#include "Core/MIPS/JitCommon/JitBlockCache.h"
#include "Core/MIPS/JitCommon/JitCommon.h"
#include "Core/ThreadPools.h"
#include "Common/Thread/ParallelLoop.h"
#include "UI/OnScreenDisplay.h"
namespace Memory {
@ -318,16 +318,16 @@ static void DoMemoryVoid(PointerWrap &p, uint32_t start, uint32_t size) {
switch (p.mode) {
case PointerWrap::MODE_READ:
GlobalThreadPool::Memcpy(d, storage, size);
ParallelMemcpy(&g_threadManager, d, storage, size);
break;
case PointerWrap::MODE_WRITE:
GlobalThreadPool::Memcpy(storage, d, size);
ParallelMemcpy(&g_threadManager, storage, d, size);
break;
case PointerWrap::MODE_MEASURE:
// Nothing to do here.
break;
case PointerWrap::MODE_VERIFY:
GlobalThreadPool::Loop([&](int l, int h) {
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
for (int i = l; i < h; i++)
_dbg_assert_msg_(d[i] == storage[i], "Savestate verification failure: %d (0x%X) (at %p) != %d (0x%X) (at %p).\n", d[i], d[i], &d[i], storage[i], storage[i], &storage[i]);
}, 0, size);

View File

@ -30,6 +30,7 @@
#include "Common/Data/Text/Parsers.h"
#include "Common/File/FileUtil.h"
#include "Common/StringUtils.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/Config.h"
#include "Core/Host.h"
#include "Core/System.h"
@ -748,8 +749,8 @@ void ReplacedTexture::Load(int level, void *out, int rowPitch) {
int w, h, f;
uint8_t *image;
if (LoadZIMPtr(&zim[0], zimSize, &w, &h, &f, &image)) {
GlobalThreadPool::Loop([&](int low, int high) {
for (int y = low; y < high; ++y) {
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
for (int y = l; y < h; ++y) {
memcpy((uint8_t *)out + rowPitch * y, image + w * 4 * y, w * 4);
}
}, 0, h);

View File

@ -1,26 +1,3 @@
#include "ThreadPools.h"
#include "../Core/Config.h"
#include "Common/MakeUnique.h"
std::unique_ptr<ThreadPool> GlobalThreadPool::pool;
std::once_flag GlobalThreadPool::init_flag;
void GlobalThreadPool::Loop(const std::function<void(int,int)>& loop, int lower, int upper, int minSize) {
std::call_once(init_flag, Inititialize);
pool->ParallelLoop(loop, lower, upper, minSize);
}
void GlobalThreadPool::Memcpy(void *dest, const void *src, int size) {
std::call_once(init_flag, Inititialize);
pool->ParallelMemcpy(dest, src, size);
}
void GlobalThreadPool::Memset(void *dest, uint8_t val, int size) {
std::call_once(init_flag, Inititialize);
pool->ParallelMemset(dest, val, size);
}
void GlobalThreadPool::Inititialize() {
pool = make_unique<ThreadPool>(g_Config.iNumWorkerThreads);
}
ThreadManager g_threadManager;

View File

@ -1,17 +1,5 @@
#pragma once
#include "Common/Thread/ThreadPool.h"
#include "Common/Thread/ThreadManager.h"
class GlobalThreadPool {
public:
// will execute slices of "loop" from "lower" to "upper"
// in parallel on the global thread pool
static void Loop(const std::function<void(int,int)>& loop, int lower, int upper, int minSize = -1);
static void Memcpy(void *dest, const void *src, int size);
static void Memset(void *dest, uint8_t val, int size);
private:
static std::unique_ptr<ThreadPool> pool;
static std::once_flag init_flag;
static void Inititialize();
};
extern ThreadManager g_threadManager;

View File

@ -26,6 +26,7 @@
#include "Common/Common.h"
#include "Common/Log.h"
#include "Common/CommonFuncs.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/ThreadPools.h"
#include "Common/CPUDetect.h"
#include "ext/xbrz/xbrz.h"
@ -607,22 +608,22 @@ bool TextureScalerCommon::Scale(u32* &data, u32 &dstFmt, int &width, int &height
void TextureScalerCommon::ScaleXBRZ(int factor, u32* source, u32* dest, int width, int height) {
xbrz::ScalerCfg cfg;
GlobalThreadPool::Loop(std::bind(&xbrz::scale, factor, source, dest, width, height, xbrz::ColorFormat::ARGB, cfg, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&xbrz::scale, factor, source, dest, width, height, xbrz::ColorFormat::ARGB, cfg, std::placeholders::_1, std::placeholders::_2), 0, height);
}
void TextureScalerCommon::ScaleBilinear(int factor, u32* source, u32* dest, int width, int height) {
bufTmp1.resize(width*height*factor);
bufTmp1.resize(width * height * factor);
u32 *tmpBuf = bufTmp1.data();
GlobalThreadPool::Loop(std::bind(&bilinearH, factor, source, tmpBuf, width, std::placeholders::_1, std::placeholders::_2), 0, height);
GlobalThreadPool::Loop(std::bind(&bilinearV, factor, tmpBuf, dest, width, 0, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&bilinearH, factor, source, tmpBuf, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&bilinearV, factor, tmpBuf, dest, width, 0, height, std::placeholders::_1, std::placeholders::_2), 0, height);
}
void TextureScalerCommon::ScaleBicubicBSpline(int factor, u32* source, u32* dest, int width, int height) {
GlobalThreadPool::Loop(std::bind(&scaleBicubicBSpline, factor, source, dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&scaleBicubicBSpline, factor, source, dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
}
void TextureScalerCommon::ScaleBicubicMitchell(int factor, u32* source, u32* dest, int width, int height) {
GlobalThreadPool::Loop(std::bind(&scaleBicubicMitchell, factor, source, dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&scaleBicubicMitchell, factor, source, dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
}
void TextureScalerCommon::ScaleHybrid(int factor, u32* source, u32* dest, int width, int height, bool bicubic) {
@ -638,8 +639,9 @@ void TextureScalerCommon::ScaleHybrid(int factor, u32* source, u32* dest, int wi
bufTmp1.resize(width*height);
bufTmp2.resize(width*height*factor*factor);
bufTmp3.resize(width*height*factor*factor);
GlobalThreadPool::Loop(std::bind(&generateDistanceMask, source, bufTmp1.data(), width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
GlobalThreadPool::Loop(std::bind(&convolve3x3, bufTmp1.data(), bufTmp2.data(), KERNEL_SPLAT, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&generateDistanceMask, source, bufTmp1.data(), width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&convolve3x3, bufTmp1.data(), bufTmp2.data(), KERNEL_SPLAT, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ScaleBilinear(factor, bufTmp2.data(), bufTmp3.data(), width, height);
// mask C is now in bufTmp3
@ -652,13 +654,13 @@ void TextureScalerCommon::ScaleHybrid(int factor, u32* source, u32* dest, int wi
// Now we can mix it all together
// The factor 8192 was found through practical testing on a variety of textures
GlobalThreadPool::Loop(std::bind(&mix, dest, bufTmp2.data(), bufTmp3.data(), 8192, width*factor, std::placeholders::_1, std::placeholders::_2), 0, height*factor);
ParallelRangeLoop(&g_threadManager,std::bind(&mix, dest, bufTmp2.data(), bufTmp3.data(), 8192, width*factor, std::placeholders::_1, std::placeholders::_2), 0, height*factor);
}
void TextureScalerCommon::DePosterize(u32* source, u32* dest, int width, int height) {
bufTmp3.resize(width*height);
GlobalThreadPool::Loop(std::bind(&deposterizeH, source, bufTmp3.data(), width, std::placeholders::_1, std::placeholders::_2), 0, height);
GlobalThreadPool::Loop(std::bind(&deposterizeV, bufTmp3.data(), dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
GlobalThreadPool::Loop(std::bind(&deposterizeH, dest, bufTmp3.data(), width, std::placeholders::_1, std::placeholders::_2), 0, height);
GlobalThreadPool::Loop(std::bind(&deposterizeV, bufTmp3.data(), dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&deposterizeH, source, bufTmp3.data(), width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&deposterizeV, bufTmp3.data(), dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&deposterizeH, dest, bufTmp3.data(), width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager,std::bind(&deposterizeV, bufTmp3.data(), dest, width, height, std::placeholders::_1, std::placeholders::_2), 0, height);
}

View File

@ -20,6 +20,7 @@
#include <d3d11.h>
#include "Common/Data/Convert/ColorConv.h"
#include "Core/ThreadPools.h"
#include "Common/Thread/ParallelLoop.h"
#include "GPU/Common/TextureScalerCommon.h"
#include "GPU/D3D11/TextureScalerD3D11.h"
#include "GPU/D3D11/GPU_D3D11.h"
@ -39,15 +40,15 @@ void TextureScalerD3D11::ConvertTo8888(u32 format, u32* source, u32* &dest, int
break;
case DXGI_FORMAT_B4G4R4A4_UNORM:
GlobalThreadPool::Loop(std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case DXGI_FORMAT_B5G6R5_UNORM:
GlobalThreadPool::Loop(std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case DXGI_FORMAT_B5G5R5A1_UNORM:
GlobalThreadPool::Loop(std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
default:

View File

@ -21,10 +21,12 @@
#include <functional>
#include <set>
#include <vector>
#include <mutex>
#include <zstd.h>
#include "Common/Common.h"
#include "Common/File/FileUtil.h"
#include "Common/Thread/ParallelLoop.h"
#include "Common/Log.h"
#include "Common/StringUtils.h"
@ -179,7 +181,7 @@ static const u8 *mymemmem(const u8 *haystack, size_t off, size_t hlen, const u8
std::mutex resultLock;
int range = (int)(last_possible - first_possible);
GlobalThreadPool::Loop([&](int l, int h) {
ParallelRangeLoop(&g_threadManager, [&](int l, int h) {
const u8 *p = haystack + off + l;
const u8 *pend = haystack + off + h;

View File

@ -19,6 +19,7 @@
#include "Common/Common.h"
#include "Common/Data/Convert/ColorConv.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/ThreadPools.h"
#include "GPU/Common/TextureScalerCommon.h"
#include "GPU/Directx9/TextureScalerDX9.h"
@ -41,15 +42,15 @@ void TextureScalerDX9::ConvertTo8888(u32 format, u32* source, u32* &dest, int wi
break;
case D3DFMT_A4R4G4B4:
GlobalThreadPool::Loop(std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case D3DFMT_R5G6B5:
GlobalThreadPool::Loop(std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case D3DFMT_A1R5G5B5:
GlobalThreadPool::Loop(std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
default:

View File

@ -22,6 +22,7 @@
#include "GPU/GLES/TextureScalerGLES.h"
#include "Common/Data/Convert/ColorConv.h"
#include "Common/Log.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/ThreadPools.h"
#include "Common/GPU/DataFormat.h"
@ -41,15 +42,15 @@ void TextureScalerGLES::ConvertTo8888(u32 format, u32* source, u32* &dest, int w
break;
case Draw::DataFormat::R4G4B4A4_UNORM_PACK16:
GlobalThreadPool::Loop(std::bind(&convert4444_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert4444_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case Draw::DataFormat::R5G6B5_UNORM_PACK16:
GlobalThreadPool::Loop(std::bind(&convert565_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert565_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case Draw::DataFormat::R5G5B5A1_UNORM_PACK16:
GlobalThreadPool::Loop(std::bind(&convert5551_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert5551_gl, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
default:

View File

@ -21,11 +21,12 @@
#include "Common/Data/Convert/ColorConv.h"
#include "Common/Profiler/Profiler.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/ThreadPools.h"
#include "Core/Config.h"
#include "Core/MemMap.h"
#include "Core/Reporting.h"
#include "Core/ThreadPools.h"
#include "GPU/GPUState.h"
#include "GPU/Common/TextureDecoder.h"
@ -1326,24 +1327,24 @@ void DrawTriangle(const VertexData& v0, const VertexData& v1, const VertexData&
auto bound = [&](int a, int b) -> void {
DrawTriangleSlice<true>(v0, v1, v2, minX, minY, maxX, maxY, false, a, b);
};
GlobalThreadPool::Loop(bound, 0, rangeX);
ParallelRangeLoop(&g_threadManager, bound, 0, rangeX);
} else {
auto bound = [&](int a, int b) -> void {
DrawTriangleSlice<false>(v0, v1, v2, minX, minY, maxX, maxY, false, a, b);
};
GlobalThreadPool::Loop(bound, 0, rangeX);
ParallelRangeLoop(&g_threadManager, bound, 0, rangeX);
}
} else if (rangeY >= 12 && rangeX >= 12) {
if (gstate.isModeClear()) {
auto bound = [&](int a, int b) -> void {
DrawTriangleSlice<true>(v0, v1, v2, minX, minY, maxX, maxY, true, a, b);
};
GlobalThreadPool::Loop(bound, 0, rangeY);
ParallelRangeLoop(&g_threadManager, bound, 0, rangeY);
} else {
auto bound = [&](int a, int b) -> void {
DrawTriangleSlice<false>(v0, v1, v2, minX, minY, maxX, maxY, true, a, b);
};
GlobalThreadPool::Loop(bound, 0, rangeY);
ParallelRangeLoop(&g_threadManager, bound, 0, rangeY);
}
} else {
if (gstate.isModeClear()) {

View File

@ -21,6 +21,7 @@
#include "Common/Data/Convert/ColorConv.h"
#include "Common/GPU/Vulkan/VulkanContext.h"
#include "Common/Log.h"
#include "Common/Thread/ParallelLoop.h"
#include "Core/ThreadPools.h"
#include "GPU/Common/TextureScalerCommon.h"
#include "GPU/Vulkan/TextureScalerVulkan.h"
@ -47,15 +48,15 @@ void TextureScalerVulkan::ConvertTo8888(u32 format, u32* source, u32* &dest, int
break;
case VULKAN_4444_FORMAT:
GlobalThreadPool::Loop(std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert4444_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case VULKAN_565_FORMAT:
GlobalThreadPool::Loop(std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert565_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
case VULKAN_1555_FORMAT:
GlobalThreadPool::Loop(std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
ParallelRangeLoop(&g_threadManager, std::bind(&convert5551_dx9, (u16*)source, dest, width, std::placeholders::_1, std::placeholders::_2), 0, height);
break;
default:

View File

@ -96,6 +96,7 @@
#include "Core/Util/GameManager.h"
#include "Core/Util/AudioFormat.h"
#include "Core/WebServer.h"
#include "Core/ThreadPools.h"
#include "GPU/GPUInterface.h"
#include "UI/BackgroundAudio.h"
@ -336,6 +337,8 @@ static void PostLoadConfig() {
i18nrepo.LoadIni(g_Config.sLanguageIni);
else
i18nrepo.LoadIni(g_Config.sLanguageIni, langOverridePath);
g_threadManager.Init(g_Config.iNumWorkerThreads);
}
static bool CreateDirectoriesAndroid() {

View File

@ -487,8 +487,11 @@
<ClInclude Include="..\..\Common\System\NativeApp.h" />
<ClInclude Include="..\..\Common\System\System.h" />
<ClInclude Include="..\..\Common\Thread\PrioritizedWorkQueue.h" />
<ClInclude Include="..\..\Common\Thread\ThreadPool.h" />
<ClInclude Include="..\..\Common\Thread\Channel.h" />
<ClInclude Include="..\..\Common\Thread\Promise.h" />
<ClInclude Include="..\..\Common\Thread\ThreadUtil.h" />
<ClInclude Include="..\..\Common\Thread\ThreadManager.h" />
<ClInclude Include="..\..\Common\Thread\ParallelLoop.h" />
<ClInclude Include="..\..\Common\Thunk.h" />
<ClInclude Include="..\..\Common\TimeUtil.h" />
<ClInclude Include="..\..\Common\UI\Context.h" />
@ -597,8 +600,9 @@
<ClCompile Include="..\..\Common\StringUtils.cpp" />
<ClCompile Include="..\..\Common\System\Display.cpp" />
<ClCompile Include="..\..\Common\Thread\PrioritizedWorkQueue.cpp" />
<ClCompile Include="..\..\Common\Thread\ThreadPool.cpp" />
<ClCompile Include="..\..\Common\Thread\ThreadUtil.cpp" />
<ClCompile Include="..\..\Common\Thread\ThreadManager.cpp" />
<ClCompile Include="..\..\Common\Thread\ParallelLoop.cpp" />
<ClCompile Include="..\..\Common\Thunk.cpp" />
<ClCompile Include="..\..\Common\TimeUtil.cpp" />
<ClCompile Include="..\..\Common\UI\Context.cpp" />

View File

@ -183,7 +183,10 @@
<ClCompile Include="..\..\Common\Thread\PrioritizedWorkQueue.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\Thread\ThreadPool.cpp">
<ClCompile Include="..\..\Common\Thread\ThreadManager.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\Thread\ParallelLoop.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\Thread\ThreadUtil.cpp">
@ -460,7 +463,16 @@
<ClInclude Include="..\..\Common\Thread\PrioritizedWorkQueue.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\ThreadPool.h">
<ClInclude Include="..\..\Common\Thread\Channel.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\ParallelLoop.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\Promise.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\ThreadManager.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\ThreadUtil.h">

View File

@ -286,8 +286,9 @@ EXEC_AND_LIB_FILES := \
$(SRC)/Common/Profiler/Profiler.cpp \
$(SRC)/Common/System/Display.cpp \
$(SRC)/Common/Thread/PrioritizedWorkQueue.cpp \
$(SRC)/Common/Thread/ThreadPool.cpp \
$(SRC)/Common/Thread/ThreadUtil.cpp \
$(SRC)/Common/Thread/ThreadManager.cpp \
$(SRC)/Common/Thread/ParallelLoop.cpp \
$(SRC)/Common/UI/Root.cpp \
$(SRC)/Common/UI/Screen.cpp \
$(SRC)/Common/UI/UI.cpp \

View File

@ -278,8 +278,9 @@ SOURCES_CXX += \
$(COMMONDIR)/Render/TextureAtlas.cpp \
$(COMMONDIR)/Serialize/Serializer.cpp \
$(COMMONDIR)/Thread/ThreadUtil.cpp \
$(COMMONDIR)/Thread/ThreadPool.cpp \
$(COMMONDIR)/Thread/PrioritizedWorkQueue.cpp \
$(COMMONDIR)/Thread/ParallelLoop.cpp \
$(COMMONDIR)/Thread/ThreadManager.cpp \
$(COMMONDIR)/UI/Root.cpp \
$(COMMONDIR)/UI/Screen.cpp \
$(COMMONDIR)/UI/UI.cpp \
@ -583,7 +584,7 @@ SOURCES_CXX += \
$(COREDIR)/SaveState.cpp \
$(COREDIR)/Screenshot.cpp \
$(COREDIR)/System.cpp \
$(COREDIR)/ThreadPools.cpp \
$(COREDIR)/ThreadPools.cpp \
$(COREDIR)/Util/BlockAllocator.cpp \
$(COREDIR)/Util/PPGeDraw.cpp \
$(COREDIR)/Util/AudioFormat.cpp \

View File

@ -0,0 +1,68 @@
#pragma once
#include "Common/Log.h"
#include "Common/TimeUtil.h"
#include "Common/Thread/ThreadManager.h"
#include "Common/Thread/Channel.h"
#include "Common/Thread/Promise.h"
#include "Common/Thread/ParallelLoop.h"
struct ResultObject {
bool ok;
};
ResultObject *ResultProducer() {
sleep_ms(250);
return new ResultObject{ true };
}
bool TestMailbox() {
Mailbox<ResultObject> *mailbox = new Mailbox<ResultObject>();
mailbox->Send(new ResultObject{ true });
ResultObject *data;
data = mailbox->Wait();
_assert_(data && data->ok);
delete data;
mailbox->Release();
return true;
}
void rangeFunc(int lower, int upper) {
printf("%d-%d\n", lower, upper);
}
bool TestParallelLoop(ThreadManager *threadMan) {
WaitableCounter *waitable = ParallelRangeLoopWaitable(threadMan, rangeFunc, 0, 7, 1);
// Can do stuff here if we like.
waitable->WaitAndRelease();
// Now it's done.
return true;
}
bool TestThreadManager() {
if (!TestMailbox()) {
return false;
}
ThreadManager manager;
manager.Init(8);
Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer));
if (!TestParallelLoop(&manager)) {
return false;
}
sleep_ms(1000);
ResultObject *result = object->BlockUntilReady();
if (result) {
// Note that the data is owned by the promise so we don't
// delete it here.
printf("Got result back!");
}
delete object;
return true;
}

View File

@ -656,6 +656,7 @@ bool TestArmEmitter();
bool TestArm64Emitter();
bool TestX64Emitter();
bool TestShaderGenerators();
bool TestThreadManager();
TestItem availableTests[] = {
#if PPSSPP_ARCH(ARM64) || PPSSPP_ARCH(AMD64) || PPSSPP_ARCH(X86)
@ -682,6 +683,7 @@ TestItem availableTests[] = {
TEST_ITEM(ShaderGenerators),
TEST_ITEM(Path),
TEST_ITEM(AndroidContentURI),
TEST_ITEM(ThreadManager),
};
int main(int argc, const char *argv[]) {

View File

@ -383,6 +383,7 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|ARM'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="TestShaderGenerators.cpp" />
<ClCompile Include="TestThreadManager.cpp" />
<ClCompile Include="TestVertexJit.cpp" />
<ClCompile Include="UnitTest.cpp" />
<ClCompile Include="TestArmEmitter.cpp">

View File

@ -12,6 +12,7 @@
<Filter>Windows</Filter>
</ClCompile>
<ClCompile Include="TestShaderGenerators.cpp" />
<ClCompile Include="TestThreadManager.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="JitHarness.h" />