Get rid of the PrioritizedWorkQueue. Instead just queue tasks on the ThreadManager.

This commit is contained in:
Henrik Rydgård 2020-11-30 16:46:52 +01:00
parent 34f8fc557f
commit 446c2c027e
17 changed files with 102 additions and 293 deletions

View File

@ -611,8 +611,6 @@ add_library(Common STATIC
Common/Thread/Channel.h
Common/Thread/ParallelLoop.cpp
Common/Thread/ParallelLoop.h
Common/Thread/PrioritizedWorkQueue.cpp
Common/Thread/PrioritizedWorkQueue.h
Common/Thread/Promise.h
Common/Thread/ThreadUtil.cpp
Common/Thread/ThreadUtil.h

View File

@ -528,8 +528,9 @@
<ClInclude Include="System\NativeApp.h" />
<ClInclude Include="System\System.h" />
<ClInclude Include="Thread\Channel.h" />
<ClInclude Include="Thread\Event.h" />
<ClInclude Include="Thread\Executor.h" />
<ClInclude Include="Thread\ParallelLoop.h" />
<ClInclude Include="Thread\PrioritizedWorkQueue.h" />
<ClInclude Include="Thread\Promise.h" />
<ClInclude Include="Thread\ThreadManager.h" />
<ClInclude Include="Thread\ThreadUtil.h" />
@ -968,7 +969,6 @@
<ClCompile Include="StringUtils.cpp" />
<ClCompile Include="System\Display.cpp" />
<ClCompile Include="Thread\ParallelLoop.cpp" />
<ClCompile Include="Thread\PrioritizedWorkQueue.cpp" />
<ClCompile Include="Thread\ThreadManager.cpp" />
<ClCompile Include="Thread\ThreadUtil.cpp" />
<ClCompile Include="Thunk.cpp" />
@ -999,4 +999,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -90,9 +90,6 @@
<ClInclude Include="..\ext\libpng17\pngstruct.h">
<Filter>ext\libpng17</Filter>
</ClInclude>
<ClInclude Include="Thread\PrioritizedWorkQueue.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\ThreadUtil.h">
<Filter>Thread</Filter>
</ClInclude>
@ -407,6 +404,9 @@
<ClInclude Include="Thread\ParallelLoop.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="Thread\Event.h">
<Filter>Thread</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="ABI.cpp" />
@ -502,9 +502,6 @@
<ClCompile Include="..\ext\libpng17\pngwutil.c">
<Filter>ext\libpng17</Filter>
</ClCompile>
<ClCompile Include="Thread\PrioritizedWorkQueue.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="Thread\ThreadUtil.cpp">
<Filter>Thread</Filter>
</ClCompile>

27
Common/Thread/Event.h Normal file
View File

@ -0,0 +1,27 @@
#pragma once
#include "Common/Thread/ThreadManager.h"
#include <condition_variable>
#include <mutex>
struct Event : public Waitable {
public:
void Wait() override {
if (triggered_) {
return;
}
std::unique_lock<std::mutex> lock;
cond_.wait(lock, [&] { return !triggered_; });
}
void Notify() {
std::unique_lock<std::mutex> lock;
triggered_ = true;
cond_.notify_one();
}
private:
std::condition_variable cond_;
std::mutex mutex_;
bool triggered_ = false;
};

View File

@ -7,7 +7,7 @@ public:
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper)
: counter_(counter), loop_(loop), lower_(lower), upper_(upper) {}
void run() override {
void Run() override {
loop_(lower_, upper_);
counter_->Count();
}

View File

@ -8,7 +8,7 @@
#include "Common/Thread/ThreadManager.h"
// Kind of like a semaphore I guess.
struct WaitableCounter {
struct WaitableCounter : public Waitable {
public:
WaitableCounter(int maxValue) : maxValue_(maxValue) {}
@ -19,18 +19,13 @@ public:
}
}
void Wait() {
void Wait() override {
std::unique_lock<std::mutex> lock(mutex_);
while (count_.load() != maxValue_) {
cond_.wait(lock);
}
}
void WaitAndRelease() {
Wait();
delete this;
}
int maxValue_;
std::atomic<int> count_;
std::mutex mutex_;

View File

@ -1,140 +0,0 @@
#include <functional>
#include <thread>
#include "Common/TimeUtil.h"
#include "Common/Thread/PrioritizedWorkQueue.h"
#include "Common/Log.h"
PrioritizedWorkQueue::~PrioritizedWorkQueue() {
if (!done_) {
ERROR_LOG(SYSTEM, "PrioritizedWorkQueue destroyed but not done!");
}
}
void PrioritizedWorkQueue::Add(PrioritizedWorkQueueItem *item) {
std::lock_guard<std::mutex> guard(mutex_);
queue_.push_back(item);
notEmpty_.notify_one();
}
void PrioritizedWorkQueue::Stop() {
std::lock_guard<std::mutex> guard(mutex_);
done_ = true;
notEmpty_.notify_one();
}
void PrioritizedWorkQueue::Flush() {
std::lock_guard<std::mutex> guard(mutex_);
int flush_count = 0;
for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
delete *iter;
flush_count++;
}
queue_.clear();
if (flush_count > 0) {
INFO_LOG(SYSTEM, "PrioritizedWorkQueue: Flushed %d un-executed tasks", flush_count);
}
}
bool PrioritizedWorkQueue::WaitUntilDone(bool all) {
// We'll lock drain this entire time, so make sure you follow that lock ordering.
std::unique_lock<std::mutex> guard(drainMutex_);
if (AllItemsDone()) {
return true;
}
while (!AllItemsDone()) {
drain_.wait(guard);
if (!all) {
// Return whether empty or not, something just drained.
return AllItemsDone();
}
}
return true;
}
void PrioritizedWorkQueue::NotifyDrain() {
std::lock_guard<std::mutex> guard(drainMutex_);
drain_.notify_one();
}
bool PrioritizedWorkQueue::AllItemsDone() {
std::lock_guard<std::mutex> guard(mutex_);
return queue_.empty() && !working_;
}
// The worker should simply call this in a loop. Will block when appropriate.
PrioritizedWorkQueueItem *PrioritizedWorkQueue::Pop() {
{
std::lock_guard<std::mutex> guard(mutex_);
working_ = false; // The thread only calls Pop if it's done.
}
// Important: make sure mutex_ is not locked while draining.
NotifyDrain();
std::unique_lock<std::mutex> guard(mutex_);
if (done_) {
return 0;
}
while (queue_.empty()) {
notEmpty_.wait(guard);
if (done_) {
return 0;
}
}
// Find the top priority item (lowest value).
float best_prio = std::numeric_limits<float>::infinity();
std::vector<PrioritizedWorkQueueItem *>::iterator best = queue_.end();
for (auto iter = queue_.begin(); iter != queue_.end(); ++iter) {
if ((*iter)->priority() < best_prio) {
best = iter;
best_prio = (*iter)->priority();
}
}
if (best != queue_.end()) {
PrioritizedWorkQueueItem *poppedItem = *best;
queue_.erase(best);
working_ = true; // This will be worked on.
return poppedItem;
} else {
// Not really sure how this can happen, but let's be safe.
return 0;
}
}
// TODO: This feels ugly. Revisit later.
static std::thread *workThread;
static void threadfunc(PrioritizedWorkQueue *wq) {
SetCurrentThreadName("PrioQueue");
while (true) {
PrioritizedWorkQueueItem *item = wq->Pop();
if (!item) {
if (wq->Done())
break;
} else {
item->run();
delete item;
}
}
}
void ProcessWorkQueueOnThreadWhile(PrioritizedWorkQueue *wq) {
workThread = new std::thread([=](){threadfunc(wq);});
}
void StopProcessingWorkQueue(PrioritizedWorkQueue *wq) {
wq->Stop();
if (workThread) {
workThread->join();
delete workThread;
}
workThread = nullptr;
}

View File

@ -1,65 +0,0 @@
#pragma once
#include <vector>
#include <limits>
#include <mutex>
#include <condition_variable>
#include "Common/Thread/ThreadUtil.h"
#include "Common/Common.h"
// Priorities can change dynamically.
// Try to make priority() fast, it will be called a lot.
class PrioritizedWorkQueueItem {
public:
PrioritizedWorkQueueItem() {}
virtual ~PrioritizedWorkQueueItem() {}
virtual void run() = 0;
virtual float priority() = 0; // Low priority value = high priority!
private:
DISALLOW_COPY_AND_ASSIGN(PrioritizedWorkQueueItem);
};
class PrioritizedWorkQueue {
public:
PrioritizedWorkQueue() : done_(false), working_(false) {}
~PrioritizedWorkQueue();
// Takes ownership.
void Add(PrioritizedWorkQueueItem *item);
// The worker should simply call this in a loop. Will block when appropriate.
PrioritizedWorkQueueItem *Pop();
void Flush();
bool Done() { return done_; }
void Stop();
bool WaitUntilDone(bool all = true);
bool IsWorking() {
return working_;
}
private:
void NotifyDrain();
bool AllItemsDone();
bool done_;
bool working_;
std::mutex mutex_;
std::mutex drainMutex_;
std::condition_variable notEmpty_;
std::condition_variable drain_;
std::vector<PrioritizedWorkQueueItem *> queue_;
DISALLOW_COPY_AND_ASSIGN(PrioritizedWorkQueue);
};
// Starts up a thread that keeps trying to run this workqueue.
// TODO: This feels ugly. Revisit later.
void ProcessWorkQueueOnThreadWhile(PrioritizedWorkQueue *wq);
void StopProcessingWorkQueue(PrioritizedWorkQueue *wq);

View File

@ -13,7 +13,7 @@ public:
tx_->Release();
}
void run() override {
void Run() override {
T *value = fun_();
tx_->Send(value);
tx_->Release();

View File

@ -72,7 +72,7 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
// The task itself takes care of notifying anyone waiting on it. Not the
// responsibility of the ThreadManager (although it could be!).
if (task) {
task->run();
task->Run();
delete task;
}
}
@ -124,6 +124,6 @@ int ThreadManager::GetNumLooperThreads() const {
return (int)(global_->threads_.size() / 2);
}
void ThreadManager::TryCancelTask(Task *task) {
void ThreadManager::TryCancelTask(uint64_t taskID) {
// Do nothing
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <cstdint>
// The new threadpool.
// To help future smart scheduling.
@ -8,12 +10,27 @@ enum class TaskType {
IO_BLOCKING,
};
// Implement this to make something that you can run on the thread manager.
class Task {
public:
virtual ~Task() {}
virtual void run() = 0;
virtual bool cancellable() { return false; }
virtual void cancel() {}
virtual void Run() = 0;
virtual bool Cancellable() { return false; }
virtual void Cancel() {}
virtual float Priority() { return 1.0f; }
virtual uint64_t id() { return 0; }
};
class Waitable {
public:
virtual ~Waitable() {}
virtual void Wait() = 0;
void WaitAndRelease() {
Wait();
delete this;
}
};
struct ThreadContext;
@ -29,8 +46,9 @@ public:
void EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType = TaskType::CPU_COMPUTE);
// Currently does nothing. It will always be best-effort - maybe it cancels,
// maybe it doesn't.
void TryCancelTask(Task *task);
// maybe it doesn't. Note that the id is the id() returned by the task. You need to make that
// something meaningful yourself.
void TryCancelTask(uint64_t id);
// Parallel loops get to use half the threads,
// so we still have some worker threads for other tasks.

View File

@ -23,7 +23,7 @@
#include <algorithm>
#include "Common/GPU/thin3d.h"
#include "Common/Thread/PrioritizedWorkQueue.h"
#include "Common/Thread/ThreadManager.h"
#include "Common/File/VFS/VFS.h"
#include "Common/File/FileUtil.h"
#include "Common/File/Path.h"
@ -214,8 +214,8 @@ bool GameInfo::LoadFromPath(const Path &gamePath) {
std::shared_ptr<FileLoader> GameInfo::GetFileLoader() {
if (filePath_.empty()) {
// Happens when workqueue tries to figure out priorities in PrioritizedWorkQueue::Pop(),
// because priority() calls GetFileLoader()... gnarly.
// Happens when workqueue tries to figure out priorities,
// because Priority() calls GetFileLoader()... gnarly.
return fileLoader;
}
if (!fileLoader) {
@ -331,7 +331,7 @@ static bool ReadVFSToString(const char *filename, std::string *contents, std::mu
}
class GameInfoWorkItem : public PrioritizedWorkQueueItem {
class GameInfoWorkItem : public Task {
public:
GameInfoWorkItem(const Path &gamePath, std::shared_ptr<GameInfo> &info)
: gamePath_(gamePath), info_(info) {
@ -341,15 +341,18 @@ public:
info_->DisposeFileLoader();
}
void run() override {
void Run() override {
// NOTE! DO NOT early-return from this, always use "goto done". It's essential
// that we trigger the event.
if (!info_->LoadFromPath(gamePath_)) {
info_->pending = false;
return;
goto done;
}
// In case of a remote file, check if it actually exists before locking.
if (!info_->GetFileLoader()->Exists()) {
info_->pending = false;
return;
goto done;
}
info_->working = true;
@ -371,10 +374,8 @@ public:
if (pbp.IsELF()) {
goto handleELF;
}
ERROR_LOG(LOADER, "invalid pbp '%s'\n", pbpLoader->GetPath().ToVisualString().c_str());
info_->pending = false;
info_->working = false;
return;
ERROR_LOG(LOADER, "invalid pbp '%s'\n", pbpLoader->GetPath().c_str());
goto done;
}
// First, PARAM.SFO.
@ -541,15 +542,11 @@ handleELF:
// few files.
auto fl = info_->GetFileLoader();
if (!fl) {
info_->pending = false;
info_->working = false;
return; // Happens with UWP currently, TODO...
goto done;
}
BlockDevice *bd = constructBlockDevice(info_->GetFileLoader().get());
if (!bd) {
info_->pending = false;
info_->working = false;
return; // nothing to do here..
goto done;
}
ISOFileSystem umd(&handles, bd);
@ -629,12 +626,14 @@ handleELF:
info_->installDataSize = info_->GetInstallDataSizeInBytes();
}
done:
info_->pending = false;
info_->working = false;
info_->readyEvent.Notify();
// INFO_LOG(SYSTEM, "Completed writing info for %s", info_->GetTitle().c_str());
}
float priority() override {
float Priority() override {
auto fl = info_->GetFileLoader();
if (fl && fl->IsRemote()) {
// Increase the value so remote info loads after non-remote.
@ -649,7 +648,7 @@ private:
DISALLOW_COPY_AND_ASSIGN(GameInfoWorkItem);
};
GameInfoCache::GameInfoCache() : gameInfoWQ_(nullptr) {
GameInfoCache::GameInfoCache() {
Init();
}
@ -658,28 +657,15 @@ GameInfoCache::~GameInfoCache() {
Shutdown();
}
void GameInfoCache::Init() {
gameInfoWQ_ = new PrioritizedWorkQueue();
ProcessWorkQueueOnThreadWhile(gameInfoWQ_);
}
void GameInfoCache::Init() {}
void GameInfoCache::Shutdown() {
CancelAll();
if (gameInfoWQ_) {
StopProcessingWorkQueue(gameInfoWQ_);
delete gameInfoWQ_;
gameInfoWQ_ = nullptr;
}
}
void GameInfoCache::Clear() {
CancelAll();
if (gameInfoWQ_) {
gameInfoWQ_->Flush();
gameInfoWQ_->WaitUntilDone();
}
info_.clear();
}
@ -706,31 +692,21 @@ void GameInfoCache::FlushBGs() {
}
void GameInfoCache::PurgeType(IdentifiedFileType fileType) {
if (gameInfoWQ_)
gameInfoWQ_->Flush();
restart:
for (auto iter = info_.begin(); iter != info_.end(); iter++) {
for (auto iter = info_.begin(); iter != info_.end();) {
auto &info = iter->second;
if (!info->working && info->fileType == fileType) {
info_.erase(iter);
goto restart;
info->readyEvent.Wait();
if (info->fileType == fileType) {
iter = info_.erase(iter);
} else {
iter++;
}
}
}
void GameInfoCache::WaitUntilDone(std::shared_ptr<GameInfo> &info) {
while (info->pending) {
if (gameInfoWQ_->WaitUntilDone(false)) {
// A true return means everything finished, so bail out.
// This way even if something gets stuck, we won't hang.
break;
}
// Otherwise, wait again if it's not done...
}
info->readyEvent.Wait();
}
// Runs on the main thread. Only call from render() and similar, not update()!
// Can also be called from the audio thread for menu background music.
std::shared_ptr<GameInfo> GameInfoCache::GetInfo(Draw::DrawContext *draw, const Path &gamePath, int wantFlags) {
@ -776,11 +752,11 @@ std::shared_ptr<GameInfo> GameInfoCache::GetInfo(Draw::DrawContext *draw, const
}
GameInfoWorkItem *item = new GameInfoWorkItem(gamePath, info);
gameInfoWQ_->Add(item);
g_threadManager.EnqueueTask(item);
// Don't re-insert if we already have it.
if (info_.find(pathStr) == info_.end())
info_[pathStr] = std::shared_ptr<GameInfo>(info);
info_[pathStr] = info;
return info;
}

View File

@ -23,6 +23,7 @@
#include <mutex>
#include <atomic>
#include "Common/Thread/Event.h"
#include "Core/ELF/ParamSFO.h"
#include "Common/File/Path.h"
#include "UI/TextureUtil.h"
@ -31,7 +32,6 @@ namespace Draw {
class DrawContext;
class Texture;
}
class PrioritizedWorkQueue;
// A GameInfo holds information about a game, and also lets you do things that the VSH
// does on the PSP, namely checking for and deleting savedata, and similar things.
@ -140,9 +140,12 @@ public:
u64 gameSize = 0;
u64 saveDataSize = 0;
u64 installDataSize = 0;
std::atomic<bool> pending{};
std::atomic<bool> working{};
Event readyEvent;
protected:
// Note: this can change while loading, use GetTitle().
std::string title;
@ -182,9 +185,6 @@ private:
// Maps ISO path to info. Need to use shared_ptr as we can return these pointers -
// and if they get destructed while being in use, that's bad.
std::map<std::string, std::shared_ptr<GameInfo> > info_;
// Work queue and management
PrioritizedWorkQueue *gameInfoWQ_;
};
// This one can be global, no good reason not to.

View File

@ -486,7 +486,7 @@
<ClInclude Include="..\..\Common\System\Display.h" />
<ClInclude Include="..\..\Common\System\NativeApp.h" />
<ClInclude Include="..\..\Common\System\System.h" />
<ClInclude Include="..\..\Common\Thread\PrioritizedWorkQueue.h" />
<ClInclude Include="..\..\Common\Thread\Executor.h" />
<ClInclude Include="..\..\Common\Thread\Channel.h" />
<ClInclude Include="..\..\Common\Thread\Promise.h" />
<ClInclude Include="..\..\Common\Thread\ThreadUtil.h" />
@ -599,7 +599,7 @@
<ClCompile Include="..\..\Common\OSVersion.cpp" />
<ClCompile Include="..\..\Common\StringUtils.cpp" />
<ClCompile Include="..\..\Common\System\Display.cpp" />
<ClCompile Include="..\..\Common\Thread\PrioritizedWorkQueue.cpp" />
<ClCompile Include="..\..\Common\Thread\Executor.cpp" />
<ClCompile Include="..\..\Common\Thread\ThreadUtil.cpp" />
<ClCompile Include="..\..\Common\Thread\ThreadManager.cpp" />
<ClCompile Include="..\..\Common\Thread\ParallelLoop.cpp" />

View File

@ -180,7 +180,7 @@
<ClCompile Include="..\..\Common\Input\InputState.cpp">
<Filter>Input</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\Thread\PrioritizedWorkQueue.cpp">
<ClCompile Include="..\..\Common\Thread\Executor.cpp">
<Filter>Thread</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\Thread\ThreadManager.cpp">
@ -460,7 +460,7 @@
<ClInclude Include="..\..\Common\Input\KeyCodes.h">
<Filter>Input</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\PrioritizedWorkQueue.h">
<ClInclude Include="..\..\Common\Thread\Executor.h">
<Filter>Thread</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\Thread\Channel.h">

View File

@ -285,7 +285,11 @@ EXEC_AND_LIB_FILES := \
$(SRC)/Common/Net/WebsocketServer.cpp \
$(SRC)/Common/Profiler/Profiler.cpp \
$(SRC)/Common/System/Display.cpp \
<<<<<<< HEAD
$(SRC)/Common/Thread/PrioritizedWorkQueue.cpp \
=======
$(SRC)/Common/Thread/Executor.cpp \
>>>>>>> 82282c7a3 (Get rid of the PrioritizedWorkQueue. Instead just queue tasks on the ThreadManager.)
$(SRC)/Common/Thread/ThreadUtil.cpp \
$(SRC)/Common/Thread/ThreadManager.cpp \
$(SRC)/Common/Thread/ParallelLoop.cpp \

View File

@ -278,7 +278,6 @@ SOURCES_CXX += \
$(COMMONDIR)/Render/TextureAtlas.cpp \
$(COMMONDIR)/Serialize/Serializer.cpp \
$(COMMONDIR)/Thread/ThreadUtil.cpp \
$(COMMONDIR)/Thread/PrioritizedWorkQueue.cpp \
$(COMMONDIR)/Thread/ParallelLoop.cpp \
$(COMMONDIR)/Thread/ThreadManager.cpp \
$(COMMONDIR)/UI/Root.cpp \