Revert "[clangd] The new threading implementation" (r324356)

And the follow-up changes r324361 and r324363.
These changes seem to break two buildbots:
  - http://lab.llvm.org:8011/builders/clang-atom-d525-fedora-rel/builds/14091
  - http://lab.llvm.org:8011/builders/clang-x86_64-linux-selfhost-modules-2/builds/16001

We will need to investigate what went wrong and resubmit the changes
afterwards.

llvm-svn: 324386
This commit is contained in:
Ilya Biryukov 2018-02-06 19:22:40 +00:00
parent 6ef990e9ce
commit 3693f5941a
11 changed files with 283 additions and 537 deletions

View File

@ -6,6 +6,7 @@ add_clang_library(clangDaemon
ClangdLSPServer.cpp
ClangdServer.cpp
ClangdUnit.cpp
ClangdUnitStore.cpp
CodeComplete.cpp
CodeCompletionStrings.cpp
CompileArgsCache.cpp

View File

@ -11,6 +11,7 @@
#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H
#include "ClangdUnit.h"
#include "ClangdUnitStore.h"
#include "CodeComplete.h"
#include "CompileArgsCache.h"
#include "DraftStore.h"

View File

@ -151,8 +151,6 @@ using ASTParsedCallback = std::function<void(PathRef Path, ParsedAST *)>;
/// Manages resources, required by clangd. Allows to rebuild file with new
/// contents, and provides AST and Preamble for it.
/// NOTE: Threading-related bits of CppFile are now deprecated and will be
/// removed soon.
class CppFile : public std::enable_shared_from_this<CppFile> {
public:
// We only allow to create CppFile as shared_ptr, because a future returned by
@ -180,7 +178,6 @@ public:
/// that will wait for any ongoing rebuilds to finish and actually set the AST
/// and Preamble to nulls. It can be run on a different thread. This function
/// is useful to cancel ongoing rebuilds, if any, before removing CppFile.
/// DEPRECATED. This function will be removed soon, please do not use it.
UniqueFunction<void()> deferCancelRebuild();
/// Rebuild AST and Preamble synchronously on the calling thread.
@ -203,7 +200,6 @@ public:
/// The future to finish rebuild returns a list of diagnostics built during
/// reparse, or None, if another deferRebuild was called before this
/// rebuild was finished.
/// DEPRECATED. This function will be removed soon, please do not use it.
UniqueFunction<llvm::Optional<std::vector<DiagWithFixIts>>()>
deferRebuild(ParseInputs &&Inputs);

View File

@ -0,0 +1,37 @@
//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
#include "ClangdUnitStore.h"
#include "llvm/Support/Path.h"
#include <algorithm>
using namespace clang::clangd;
using namespace clang;
std::shared_ptr<CppFile> CppFileCollection::removeIfPresent(PathRef File) {
std::lock_guard<std::mutex> Lock(Mutex);
auto It = OpenedFiles.find(File);
if (It == OpenedFiles.end())
return nullptr;
std::shared_ptr<CppFile> Result = It->second;
OpenedFiles.erase(It);
return Result;
}
std::vector<std::pair<Path, std::size_t>>
CppFileCollection::getUsedBytesPerFile() const {
std::lock_guard<std::mutex> Lock(Mutex);
std::vector<std::pair<Path, std::size_t>> Result;
Result.reserve(OpenedFiles.size());
for (auto &&PathAndFile : OpenedFiles)
Result.push_back(
{PathAndFile.first().str(), PathAndFile.second->getUsedBytes()});
return Result;
}

View File

@ -0,0 +1,73 @@
//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===---------------------------------------------------------------------===//
#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
#include "ClangdUnit.h"
#include "GlobalCompilationDatabase.h"
#include "Logger.h"
#include "Path.h"
#include "clang/Tooling/CompilationDatabase.h"
#include <mutex>
namespace clang {
namespace clangd {
class Logger;
/// Thread-safe mapping from FileNames to CppFile.
class CppFileCollection {
public:
/// \p ASTCallback is called when a file is parsed synchronously. This should
/// not be expensive since it blocks diagnostics.
explicit CppFileCollection(bool StorePreamblesInMemory,
std::shared_ptr<PCHContainerOperations> PCHs,
ASTParsedCallback ASTCallback)
: ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
StorePreamblesInMemory(StorePreamblesInMemory) {}
std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
std::lock_guard<std::mutex> Lock(Mutex);
auto It = OpenedFiles.find(File);
if (It == OpenedFiles.end()) {
It = OpenedFiles
.try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
PCHs, ASTCallback))
.first;
}
return It->second;
}
std::shared_ptr<CppFile> getFile(PathRef File) const {
std::lock_guard<std::mutex> Lock(Mutex);
auto It = OpenedFiles.find(File);
if (It == OpenedFiles.end())
return nullptr;
return It->second;
}
/// Removes a CppFile, stored for \p File, if it's inside collection and
/// returns it.
std::shared_ptr<CppFile> removeIfPresent(PathRef File);
/// Gets used memory for each of the stored files.
std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
private:
mutable std::mutex Mutex;
llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
ASTParsedCallback ASTCallback;
std::shared_ptr<PCHContainerOperations> PCHs;
bool StorePreamblesInMemory;
};
} // namespace clangd
} // namespace clang
#endif

View File

@ -1,303 +1,9 @@
//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
// For each file, managed by TUScheduler, we create a single ASTWorker that
// manages an AST for that file. All operations that modify or read the AST are
// run on a separate dedicated thread asynchronously in FIFO order.
//
// We start processing each update immediately after we receive it. If two or
// more updates come subsequently without reads in-between, we attempt to drop
// an older one to not waste time building the ASTs we don't need.
//
// The processing thread of the ASTWorker is also responsible for building the
// preamble. However, unlike AST, the same preamble can be read concurrently, so
// we run each of async preamble reads on its own thread.
//
// To limit the concurrent load that clangd produces we mantain a semaphore that
// keeps more than a fixed number of threads from running concurrently.
//
// Rationale for cancelling updates.
// LSP clients can send updates to clangd on each keystroke. Some files take
// significant time to parse (e.g. a few seconds) and clangd can get starved by
// the updates to those files. Therefore we try to process only the last update,
// if possible.
// Our current strategy to do that is the following:
// - For each update we immediately schedule rebuild of the AST.
// - Rebuild of the AST checks if it was cancelled before doing any actual work.
// If it was, it does not do an actual rebuild, only reports llvm::None to the
// callback
// - When adding an update, we cancel the last update in the queue if it didn't
// have any reads.
// There is probably a optimal ways to do that. One approach we might take is
// the following:
// - For each update we remember the pending inputs, but delay rebuild of the
// AST for some timeout.
// - If subsequent updates come before rebuild was started, we replace the
// pending inputs and reset the timer.
// - If any reads of the AST are scheduled, we start building the AST
// immediately.
#include "TUScheduler.h"
#include "clang/Frontend/PCHContainerOperations.h"
#include "llvm/Support/Errc.h"
#include <memory>
#include <queue>
namespace clang {
namespace clangd {
namespace {
class ASTWorkerHandle;
/// Owns one instance of the AST, schedules updates and reads of it.
/// Also responsible for building and providing access to the preamble.
/// Each ASTWorker processes the async requests sent to it on a separate
/// dedicated thread.
/// The ASTWorker that manages the AST is shared by both the processing thread
/// and the TUScheduler. The TUScheduler should discard an ASTWorker when
/// remove() is called, but its thread may be busy and we don't want to block.
/// So the workers are accessed via an ASTWorkerHandle. Destroying the handle
/// signals the worker to exit its run loop and gives up shared ownership of the
/// worker.
class ASTWorker {
friend class ASTWorkerHandle;
ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST, bool RunSync);
public:
/// Create a new ASTWorker and return a handle to it.
/// The processing thread is spawned using \p Tasks. However, when \p Tasks
/// is null, all requests will be processed on the calling thread
/// synchronously instead. \p Barrier is acquired when processing each
/// request, it is be used to limit the number of actively running threads.
static ASTWorkerHandle Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
std::shared_ptr<CppFile> AST);
~ASTWorker();
void update(ParseInputs Inputs,
UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
OnUpdated);
void runWithAST(UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action);
std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const;
std::size_t getUsedBytes() const;
private:
// Must be called exactly once on processing thread. Will return after
// stop() is called on a separate thread and all pending requests are
// processed.
void run();
/// Signal that run() should finish processing pending requests and exit.
void stop();
/// Adds a new task to the end of the request queue.
void startTask(UniqueFunction<void()> Task, bool isUpdate,
llvm::Optional<CancellationFlag> CF);
using RequestWithCtx = std::pair<UniqueFunction<void()>, Context>;
const bool RunSync;
Semaphore &Barrier;
// AST and FileInputs are only accessed on the processing thread from run().
const std::shared_ptr<CppFile> AST;
// Inputs, corresponding to the current state of AST.
ParseInputs FileInputs;
// Guards members used by both TUScheduler and the worker thread.
mutable std::mutex Mutex;
// Set to true to signal run() to finish processing.
bool Done; /* GUARDED_BY(Mutex) */
std::queue<RequestWithCtx> Requests; /* GUARDED_BY(Mutex) */
// Only set when last request is an update. This allows us to cancel an update
// that was never read, if a subsequent update comes in.
llvm::Optional<CancellationFlag> LastUpdateCF; /* GUARDED_BY(Mutex) */
std::condition_variable RequestsCV;
};
/// A smart-pointer-like class that points to an active ASTWorker.
/// In destructor, signals to the underlying ASTWorker that no new requests will
/// be sent and the processing loop may exit (after running all pending
/// requests).
class ASTWorkerHandle {
friend class ASTWorker;
ASTWorkerHandle(std::shared_ptr<ASTWorker> Worker)
: Worker(std::move(Worker)) {
assert(this->Worker);
}
public:
ASTWorkerHandle(const ASTWorkerHandle &) = delete;
ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete;
ASTWorkerHandle(ASTWorkerHandle &&) = default;
ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default;
~ASTWorkerHandle() {
if (Worker)
Worker->stop();
}
ASTWorker &operator*() {
assert(Worker && "Handle was moved from");
return *Worker;
}
ASTWorker *operator->() {
assert(Worker && "Handle was moved from");
return Worker.get();
}
/// Returns an owning reference to the underlying ASTWorker that can outlive
/// the ASTWorkerHandle. However, no new requests to an active ASTWorker can
/// be schedule via the returned reference, i.e. only reads of the preamble
/// are possible.
std::shared_ptr<const ASTWorker> lock() { return Worker; }
private:
std::shared_ptr<ASTWorker> Worker;
};
ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
std::shared_ptr<CppFile> AST) {
std::shared_ptr<ASTWorker> Worker(
new ASTWorker(Barrier, std::move(AST), /*RunSync=*/!Tasks));
if (Tasks)
Tasks->runAsync([Worker]() { Worker->run(); });
return ASTWorkerHandle(std::move(Worker));
}
ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST,
bool RunSync)
: RunSync(RunSync), Barrier(Barrier), AST(std::move(AST)), Done(false) {
if (RunSync)
return;
}
ASTWorker::~ASTWorker() {
#ifndef NDEBUG
std::lock_guard<std::mutex> Lock(Mutex);
assert(Done && "handle was not destroyed");
assert(Requests.empty() && "unprocessed requests when destroying ASTWorker");
#endif
}
void ASTWorker::update(
ParseInputs Inputs,
UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
OnUpdated) {
auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable {
if (CF.isCancelled()) {
OnUpdated(llvm::None);
return;
}
FileInputs = Inputs;
auto Diags = AST->rebuild(std::move(Inputs));
// We want to report the diagnostics even if this update was cancelled.
// It seems more useful than making the clients wait indefinitely if they
// spam us with updates.
OnUpdated(std::move(Diags));
};
CancellationFlag UpdateCF;
startTask(BindWithForward(Task, UpdateCF, std::move(OnUpdated)),
/*isUpdate=*/true, UpdateCF);
}
void ASTWorker::runWithAST(
UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
auto Task = [=](decltype(Action) Action) {
auto ASTWrapper = this->AST->getAST().get();
// FIXME: no need to lock here, cleanup the CppFile interface to get rid of
// them.
ASTWrapper->runUnderLock([&](ParsedAST *AST) {
if (!AST) {
Action(llvm::make_error<llvm::StringError>(
"invalid AST", llvm::errc::invalid_argument));
return;
}
Action(InputsAndAST{FileInputs, *AST});
});
};
startTask(BindWithForward(Task, std::move(Action)), /*isUpdate=*/false,
llvm::None);
}
std::shared_ptr<const PreambleData>
ASTWorker::getPossiblyStalePreamble() const {
return AST->getPossiblyStalePreamble();
}
std::size_t ASTWorker::getUsedBytes() const {
// FIXME(ibiryukov): we'll need to take locks here after we remove
// thread-safety from CppFile. For now, CppFile is thread-safe and we can
// safely call methods on it without acquiring a lock.
return AST->getUsedBytes();
}
void ASTWorker::stop() {
{
std::lock_guard<std::mutex> Lock(Mutex);
assert(!Done && "stop() called twice");
Done = true;
}
RequestsCV.notify_one();
}
void ASTWorker::startTask(UniqueFunction<void()> Task, bool isUpdate,
llvm::Optional<CancellationFlag> CF) {
assert(isUpdate == CF.hasValue() &&
"Only updates are expected to pass CancellationFlag");
if (RunSync) {
assert(!Done && "running a task after stop()");
Task();
return;
}
{
std::lock_guard<std::mutex> Lock(Mutex);
assert(!Done && "running a task after stop()");
if (isUpdate) {
if (!Requests.empty() && LastUpdateCF) {
// There were no reads for the last unprocessed update, let's cancel it
// to not waste time on it.
LastUpdateCF->cancel();
}
LastUpdateCF = std::move(*CF);
} else {
LastUpdateCF = llvm::None;
}
Requests.emplace(std::move(Task), Context::current().clone());
} // unlock Mutex.
RequestsCV.notify_one();
}
void ASTWorker::run() {
while (true) {
RequestWithCtx Req;
{
std::unique_lock<std::mutex> Lock(Mutex);
RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); });
if (Requests.empty()) {
assert(Done);
return;
}
// Even when Done is true, we finish processing all pending requests
// before exiting the processing loop.
Req = std::move(Requests.front());
Requests.pop();
} // unlock Mutex
std::lock_guard<Semaphore> BarrierLock(Barrier);
WithContext Guard(std::move(Req.second));
Req.first();
}
}
} // namespace
unsigned getDefaultAsyncThreadsCount() {
unsigned HardwareConcurrency = std::thread::hardware_concurrency();
// C++ standard says that hardware_concurrency()
@ -308,114 +14,110 @@ unsigned getDefaultAsyncThreadsCount() {
return HardwareConcurrency;
}
struct TUScheduler::FileData {
/// Latest inputs, passed to TUScheduler::update().
ParseInputs Inputs;
ASTWorkerHandle Worker;
};
TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
bool StorePreamblesInMemory,
ASTParsedCallback ASTCallback)
: StorePreamblesInMemory(StorePreamblesInMemory),
PCHOps(std::make_shared<PCHContainerOperations>()),
ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) {
if (0 < AsyncThreadsCount)
Tasks.emplace();
}
TUScheduler::~TUScheduler() {
// Notify all workers that they need to stop.
Files.clear();
// Wait for all in-flight tasks to finish.
if (Tasks)
Tasks->waitForAll();
}
: Files(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
std::move(ASTCallback)),
Threads(AsyncThreadsCount) {}
void TUScheduler::update(
PathRef File, ParseInputs Inputs,
UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
OnUpdated) {
std::unique_ptr<FileData> &FD = Files[File];
if (!FD) {
// Create a new worker to process the AST-related tasks.
ASTWorkerHandle Worker = ASTWorker::Create(
Tasks ? Tasks.getPointer() : nullptr, Barrier,
CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback));
FD = std::unique_ptr<FileData>(new FileData{Inputs, std::move(Worker)});
} else {
FD->Inputs = Inputs;
}
FD->Worker->update(std::move(Inputs), std::move(OnUpdated));
CachedInputs[File] = Inputs;
auto Resources = Files.getOrCreateFile(File);
auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
Threads.addToFront(
[](decltype(OnUpdated) OnUpdated,
decltype(DeferredRebuild) DeferredRebuild) {
auto Diags = DeferredRebuild();
OnUpdated(Diags);
},
std::move(OnUpdated), std::move(DeferredRebuild));
}
void TUScheduler::remove(PathRef File,
UniqueFunction<void(llvm::Error)> Action) {
auto It = Files.find(File);
if (It == Files.end()) {
CachedInputs.erase(File);
auto Resources = Files.removeIfPresent(File);
if (!Resources) {
Action(llvm::make_error<llvm::StringError>(
"trying to remove non-added document", llvm::errc::invalid_argument));
return;
}
Files.erase(It);
auto DeferredCancel = Resources->deferCancelRebuild();
Threads.addToFront(
[](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
DeferredCancel();
Action(llvm::Error::success());
},
std::move(Action), std::move(DeferredCancel));
}
void TUScheduler::runWithAST(
PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
auto It = Files.find(File);
if (It == Files.end()) {
auto Resources = Files.getFile(File);
if (!Resources) {
Action(llvm::make_error<llvm::StringError>(
"trying to get AST for non-added document",
llvm::errc::invalid_argument));
return;
}
It->second->Worker->runWithAST(std::move(Action));
const ParseInputs &Inputs = getInputs(File);
// We currently block the calling thread until AST is available and run the
// action on the calling thread to avoid inconsistent states coming from
// subsequent updates.
// FIXME(ibiryukov): this should be moved to the worker threads.
Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
if (AST)
Action(InputsAndAST{Inputs, *AST});
else
Action(llvm::make_error<llvm::StringError>(
"Could not build AST for the latest file update",
llvm::errc::invalid_argument));
});
}
void TUScheduler::runWithPreamble(
PathRef File,
UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
auto It = Files.find(File);
if (It == Files.end()) {
std::shared_ptr<CppFile> Resources = Files.getFile(File);
if (!Resources) {
Action(llvm::make_error<llvm::StringError>(
"trying to get preamble for non-added document",
llvm::errc::invalid_argument));
return;
}
if (!Tasks) {
std::shared_ptr<const PreambleData> Preamble =
It->second->Worker->getPossiblyStalePreamble();
Action(InputsAndPreamble{It->second->Inputs, Preamble.get()});
return;
}
const ParseInputs &Inputs = getInputs(File);
std::shared_ptr<const PreambleData> Preamble =
Resources->getPossiblyStalePreamble();
Threads.addToFront(
[Resources, Preamble, Inputs](decltype(Action) Action) mutable {
if (!Preamble)
Preamble = Resources->getPossiblyStalePreamble();
ParseInputs InputsCopy = It->second->Inputs;
std::shared_ptr<const ASTWorker> Worker = It->second->Worker.lock();
auto Task = [InputsCopy, Worker, this](Context Ctx,
decltype(Action) Action) mutable {
std::lock_guard<Semaphore> BarrierLock(Barrier);
WithContext Guard(std::move(Ctx));
std::shared_ptr<const PreambleData> Preamble =
Worker->getPossiblyStalePreamble();
Action(InputsAndPreamble{InputsCopy, Preamble.get()});
};
Action(InputsAndPreamble{Inputs, Preamble.get()});
},
std::move(Action));
}
Tasks->runAsync(
BindWithForward(Task, Context::current().clone(), std::move(Action)));
const ParseInputs &TUScheduler::getInputs(PathRef File) {
auto It = CachedInputs.find(File);
assert(It != CachedInputs.end());
return It->second;
}
std::vector<std::pair<Path, std::size_t>>
TUScheduler::getUsedBytesPerFile() const {
std::vector<std::pair<Path, std::size_t>> Result;
Result.reserve(Files.size());
for (auto &&PathAndFile : Files)
Result.push_back(
{PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()});
return Result;
return Files.getUsedBytesPerFile();
}
} // namespace clangd
} // namespace clang

View File

@ -11,9 +11,9 @@
#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H
#include "ClangdUnit.h"
#include "ClangdUnitStore.h"
#include "Function.h"
#include "Threading.h"
#include "llvm/ADT/StringMap.h"
namespace clang {
namespace clangd {
@ -42,7 +42,6 @@ class TUScheduler {
public:
TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
ASTParsedCallback ASTCallback);
~TUScheduler();
/// Returns estimated memory usage for each of the currently open files.
/// The order of results is unspecified.
@ -82,17 +81,11 @@ public:
UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
private:
/// This class stores per-file data in the Files map.
struct FileData;
const ParseInputs &getInputs(PathRef File);
const bool StorePreamblesInMemory;
const std::shared_ptr<PCHContainerOperations> PCHOps;
const ASTParsedCallback ASTCallback;
Semaphore Barrier;
llvm::StringMap<std::unique_ptr<FileData>> Files;
// None when running tasks synchronously and non-None when running tasks
// asynchronously.
llvm::Optional<AsyncTaskRunner> Tasks;
llvm::StringMap<ParseInputs> CachedInputs;
CppFileCollection Files;
ThreadPool Threads;
};
} // namespace clangd
} // namespace clang

View File

@ -1,63 +1,63 @@
#include "Threading.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/Threading.h"
#include <thread>
namespace clang {
namespace clangd {
CancellationFlag::CancellationFlag()
: WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}
Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {}
void Semaphore::lock() {
std::unique_lock<std::mutex> Lock(Mutex);
SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; });
--FreeSlots;
}
void Semaphore::unlock() {
std::unique_lock<std::mutex> Lock(Mutex);
++FreeSlots;
Lock.unlock();
SlotsChanged.notify_one();
}
AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); }
void AsyncTaskRunner::waitForAll() {
std::unique_lock<std::mutex> Lock(Mutex);
TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; });
}
void AsyncTaskRunner::runAsync(UniqueFunction<void()> Action) {
{
std::unique_lock<std::mutex> Lock(Mutex);
++InFlightTasks;
ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
: RunSynchronously(AsyncThreadsCount == 0) {
if (RunSynchronously) {
// Don't start the worker thread if we're running synchronously
return;
}
auto CleanupTask = llvm::make_scope_exit([this]() {
std::lock_guard<std::mutex> Lock(Mutex);
int NewTasksCnt = --InFlightTasks;
if (NewTasksCnt == 0) {
// Note: we can't unlock here because we don't want the object to be
// destroyed before we notify.
TasksReachedZero.notify_one();
}
});
Workers.reserve(AsyncThreadsCount);
for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
Workers.push_back(std::thread([this, I]() {
llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
while (true) {
UniqueFunction<void()> Request;
Context Ctx;
std::thread(
[](decltype(Action) Action, decltype(CleanupTask)) {
Action();
// Make sure function stored by Action is destroyed before CleanupTask
// is run.
Action = nullptr;
},
std::move(Action), std::move(CleanupTask))
.detach();
// Pick request from the queue
{
std::unique_lock<std::mutex> Lock(Mutex);
// Wait for more requests.
RequestCV.wait(Lock,
[this] { return !RequestQueue.empty() || Done; });
if (RequestQueue.empty()) {
assert(Done);
return;
}
// We process requests starting from the front of the queue. Users of
// ThreadPool have a way to prioritise their requests by putting
// them to the either side of the queue (using either addToEnd or
// addToFront).
std::tie(Request, Ctx) = std::move(RequestQueue.front());
RequestQueue.pop_front();
} // unlock Mutex
WithContext WithCtx(std::move(Ctx));
Request();
}
}));
}
}
ThreadPool::~ThreadPool() {
if (RunSynchronously)
return; // no worker thread is running in that case
{
std::lock_guard<std::mutex> Lock(Mutex);
// Wake up the worker thread
Done = true;
} // unlock Mutex
RequestCV.notify_all();
for (auto &Worker : Workers)
Worker.join();
}
} // namespace clangd
} // namespace clang

View File

@ -12,65 +12,74 @@
#include "Context.h"
#include "Function.h"
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>
namespace clang {
namespace clangd {
/// A shared boolean flag indicating if the computation was cancelled.
/// Once cancelled, cannot be returned to the previous state.
class CancellationFlag {
/// A simple fixed-size thread pool implementation.
class ThreadPool {
public:
CancellationFlag();
/// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
/// will be processed synchronously on the calling thread.
// Otherwise, \p AsyncThreadsCount threads will be created to schedule the
// requests.
ThreadPool(unsigned AsyncThreadsCount);
/// Destructor blocks until all requests are processed and worker threads are
/// terminated.
~ThreadPool();
void cancel() {
assert(WasCancelled && "the object was moved");
WasCancelled->store(true);
/// Add a new request to run function \p F with args \p As to the start of the
/// queue. The request will be run on a separate thread.
template <class Func, class... Args>
void addToFront(Func &&F, Args &&... As) {
if (RunSynchronously) {
std::forward<Func>(F)(std::forward<Args>(As)...);
return;
}
{
std::lock_guard<std::mutex> Lock(Mutex);
RequestQueue.emplace_front(
BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
Context::current().clone());
}
RequestCV.notify_one();
}
bool isCancelled() const {
assert(WasCancelled && "the object was moved");
return WasCancelled->load();
/// Add a new request to run function \p F with args \p As to the end of the
/// queue. The request will be run on a separate thread.
template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
if (RunSynchronously) {
std::forward<Func>(F)(std::forward<Args>(As)...);
return;
}
{
std::lock_guard<std::mutex> Lock(Mutex);
RequestQueue.emplace_back(
BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
Context::current().clone());
}
RequestCV.notify_one();
}
private:
std::shared_ptr<std::atomic<bool>> WasCancelled;
};
/// Limits the number of threads that can acquire the lock at the same time.
class Semaphore {
public:
Semaphore(std::size_t MaxLocks);
void lock();
void unlock();
private:
std::mutex Mutex;
std::condition_variable SlotsChanged;
std::size_t FreeSlots;
};
/// Runs tasks on separate (detached) threads and wait for all tasks to finish.
/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they
/// all complete on destruction.
class AsyncTaskRunner {
public:
/// Destructor waits for all pending tasks to finish.
~AsyncTaskRunner();
void waitForAll();
void runAsync(UniqueFunction<void()> Action);
private:
std::mutex Mutex;
std::condition_variable TasksReachedZero;
std::size_t InFlightTasks = 0;
bool RunSynchronously;
mutable std::mutex Mutex;
/// We run some tasks on separate threads(parsing, CppFile cleanup).
/// These threads looks into RequestQueue to find requests to handle and
/// terminate when Done is set to true.
std::vector<std::thread> Workers;
/// Setting Done to true will make the worker threads terminate.
bool Done = false;
/// A queue of requests.
std::deque<std::pair<UniqueFunction<void()>, Context>> RequestQueue;
/// Condition variable to wake up worker threads.
std::condition_variable RequestCV;
};
} // namespace clangd
} // namespace clang

View File

@ -21,7 +21,6 @@ add_extra_unittest(ClangdTests
JSONExprTests.cpp
URITests.cpp
TestFS.cpp
ThreadingTests.cpp
TraceTests.cpp
TUSchedulerTests.cpp
SourceCodeTests.cpp

View File

@ -1,65 +0,0 @@
//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
#include "Threading.h"
#include "gtest/gtest.h"
#include <mutex>
namespace clang {
namespace clangd {
class ThreadingTest : public ::testing::Test {};
TEST_F(ThreadingTest, TaskRunner) {
const int TasksCnt = 100;
// This should be const, but MSVC does not allow to use const vars in lambdas
// without capture. On the other hand, clang gives a warning that capture of
// const var is not required.
// Making it non-const makes both compilers happy.
int IncrementsPerTask = 1000;
std::mutex Mutex;
int Counter(0); /* GUARDED_BY(Mutex) */
{
AsyncTaskRunner Tasks;
auto scheduleIncrements = [&]() {
for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) {
Tasks.runAsync([&Counter, &Mutex, IncrementsPerTask]() {
for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) {
std::lock_guard<std::mutex> Lock(Mutex);
++Counter;
}
});
}
};
{
// Make sure runAsync is not running tasks synchronously on the same
// thread by locking the Mutex used for increments.
std::lock_guard<std::mutex> Lock(Mutex);
scheduleIncrements();
}
Tasks.waitForAll();
{
std::lock_guard<std::mutex> Lock(Mutex);
ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
}
{
std::lock_guard<std::mutex> Lock(Mutex);
Counter = 0;
scheduleIncrements();
}
}
// Check that destructor has waited for tasks to finish.
std::lock_guard<std::mutex> Lock(Mutex);
ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
}
} // namespace clangd
} // namespace clang