ThreadManager: Protect against teardown hang/leak.

Make sure we lock during canceled checks/updates, and also try to free
tasks queued on threads that didn't end up running.
This commit is contained in:
Unknown W. Brackets 2021-11-27 08:43:51 -08:00
parent de9a4dcede
commit 12b790bb81
2 changed files with 54 additions and 8 deletions

View File

@ -53,15 +53,59 @@ ThreadManager::~ThreadManager() {
}
void ThreadManager::Teardown() {
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->cancelled = true;
global_->threads_[i]->cond.notify_one();
for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->cancelled = true;
std::unique_lock<std::mutex> lock(threadCtx->mutex);
threadCtx->cond.notify_one();
}
for (size_t i = 0; i < global_->threads_.size(); i++) {
global_->threads_[i]->thread.join();
delete global_->threads_[i];
// Purge any cancellable tasks while the threads shut down.
bool done = false;
while (!done) {
done = true;
std::unique_lock<std::mutex> lock(global_->mutex);
for (auto it = global_->queue.begin(); it != global_->queue.end(); ++it) {
if (TeardownTask(*it, false)) {
global_->queue.erase(it);
global_->queue_size--;
done = false;
break;
}
}
}
for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->thread.join();
// TODO: Is it better to just delete these?
TeardownTask(threadCtx->private_single, true);
for (Task *task : threadCtx->private_queue) {
TeardownTask(threadCtx->private_single, true);
}
delete threadCtx;
}
global_->threads_.clear();
if (global_->queue_size > 0) {
WARN_LOG(SYSTEM, "ThreadManager::Teardown() with tasks still enqueued");
}
}
bool ThreadManager::TeardownTask(Task *task, bool enqueue) {
if (!task)
return true;
if (task->Cancellable()) {
task->Cancel();
delete task;
return true;
}
if (enqueue) {
global_->queue.push_back(task);
global_->queue_size++;
}
return false;
}
static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) {
@ -72,7 +116,7 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
Task *task = thread->private_single.exchange(nullptr);
// Check the global queue first, then check the private queue and wait if there's nothing to do.
if (!task) {
if (!task && global->queue_size.load() > 0) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
if (!global->queue.empty()) {
@ -91,7 +135,7 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
} else if (thread->private_single.load() == nullptr && global->queue_size.load() == 0) {
} else if (!thread->private_single && !thread->cancelled && global->queue_size.load() == 0) {
thread->cond.wait(lock);
}
}

View File

@ -60,6 +60,8 @@ public:
int GetNumLooperThreads() const;
private:
bool TeardownTask(Task *task, bool enqueue);
// This is always pointing to a context, initialized in the constructor.
GlobalThreadContext *global_;