From 3693f5941ab6ecefd173c94c6be2ae56a51acdf7 Mon Sep 17 00:00:00 2001 From: Ilya Biryukov Date: Tue, 6 Feb 2018 19:22:40 +0000 Subject: [PATCH] Revert "[clangd] The new threading implementation" (r324356) And the follow-up changes r324361 and r324363. These changes seem to break two buildbots: - http://lab.llvm.org:8011/builders/clang-atom-d525-fedora-rel/builds/14091 - http://lab.llvm.org:8011/builders/clang-x86_64-linux-selfhost-modules-2/builds/16001 We will need to investigate what went wrong and resubmit the changes afterwards. llvm-svn: 324386 --- clang-tools-extra/clangd/CMakeLists.txt | 1 + clang-tools-extra/clangd/ClangdServer.h | 1 + clang-tools-extra/clangd/ClangdUnit.h | 4 - clang-tools-extra/clangd/ClangdUnitStore.cpp | 37 ++ clang-tools-extra/clangd/ClangdUnitStore.h | 73 +++ clang-tools-extra/clangd/TUScheduler.cpp | 418 +++--------------- clang-tools-extra/clangd/TUScheduler.h | 17 +- clang-tools-extra/clangd/Threading.cpp | 102 ++--- clang-tools-extra/clangd/Threading.h | 101 +++-- .../unittests/clangd/CMakeLists.txt | 1 - .../unittests/clangd/ThreadingTests.cpp | 65 --- 11 files changed, 283 insertions(+), 537 deletions(-) create mode 100644 clang-tools-extra/clangd/ClangdUnitStore.cpp create mode 100644 clang-tools-extra/clangd/ClangdUnitStore.h delete mode 100644 clang-tools-extra/unittests/clangd/ThreadingTests.cpp diff --git a/clang-tools-extra/clangd/CMakeLists.txt b/clang-tools-extra/clangd/CMakeLists.txt index 67530270ebb3..9c424391dd9b 100644 --- a/clang-tools-extra/clangd/CMakeLists.txt +++ b/clang-tools-extra/clangd/CMakeLists.txt @@ -6,6 +6,7 @@ add_clang_library(clangDaemon ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp + ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp diff --git a/clang-tools-extra/clangd/ClangdServer.h b/clang-tools-extra/clangd/ClangdServer.h index fffd46f15cb3..79dcf278457c 100644 --- a/clang-tools-extra/clangd/ClangdServer.h +++ b/clang-tools-extra/clangd/ClangdServer.h @@ -11,6 +11,7 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" +#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" diff --git a/clang-tools-extra/clangd/ClangdUnit.h b/clang-tools-extra/clangd/ClangdUnit.h index 12228bcfc7cb..bf1aced1009c 100644 --- a/clang-tools-extra/clangd/ClangdUnit.h +++ b/clang-tools-extra/clangd/ClangdUnit.h @@ -151,8 +151,6 @@ using ASTParsedCallback = std::function; /// Manages resources, required by clangd. Allows to rebuild file with new /// contents, and provides AST and Preamble for it. -/// NOTE: Threading-related bits of CppFile are now deprecated and will be -/// removed soon. class CppFile : public std::enable_shared_from_this { public: // We only allow to create CppFile as shared_ptr, because a future returned by @@ -180,7 +178,6 @@ public: /// that will wait for any ongoing rebuilds to finish and actually set the AST /// and Preamble to nulls. It can be run on a different thread. This function /// is useful to cancel ongoing rebuilds, if any, before removing CppFile. - /// DEPRECATED. This function will be removed soon, please do not use it. UniqueFunction deferCancelRebuild(); /// Rebuild AST and Preamble synchronously on the calling thread. @@ -203,7 +200,6 @@ public: /// The future to finish rebuild returns a list of diagnostics built during /// reparse, or None, if another deferRebuild was called before this /// rebuild was finished. - /// DEPRECATED. This function will be removed soon, please do not use it. UniqueFunction>()> deferRebuild(ParseInputs &&Inputs); diff --git a/clang-tools-extra/clangd/ClangdUnitStore.cpp b/clang-tools-extra/clangd/ClangdUnitStore.cpp new file mode 100644 index 000000000000..bc2479d669e2 --- /dev/null +++ b/clang-tools-extra/clangd/ClangdUnitStore.cpp @@ -0,0 +1,37 @@ +//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "ClangdUnitStore.h" +#include "llvm/Support/Path.h" +#include + +using namespace clang::clangd; +using namespace clang; + +std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { + std::lock_guard Lock(Mutex); + + auto It = OpenedFiles.find(File); + if (It == OpenedFiles.end()) + return nullptr; + + std::shared_ptr Result = It->second; + OpenedFiles.erase(It); + return Result; +} +std::vector> +CppFileCollection::getUsedBytesPerFile() const { + std::lock_guard Lock(Mutex); + std::vector> Result; + Result.reserve(OpenedFiles.size()); + for (auto &&PathAndFile : OpenedFiles) + Result.push_back( + {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); + return Result; +} diff --git a/clang-tools-extra/clangd/ClangdUnitStore.h b/clang-tools-extra/clangd/ClangdUnitStore.h new file mode 100644 index 000000000000..6ec03023299f --- /dev/null +++ b/clang-tools-extra/clangd/ClangdUnitStore.h @@ -0,0 +1,73 @@ +//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===---------------------------------------------------------------------===// + +#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H +#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H + +#include "ClangdUnit.h" +#include "GlobalCompilationDatabase.h" +#include "Logger.h" +#include "Path.h" +#include "clang/Tooling/CompilationDatabase.h" +#include + +namespace clang { +namespace clangd { + +class Logger; + +/// Thread-safe mapping from FileNames to CppFile. +class CppFileCollection { +public: + /// \p ASTCallback is called when a file is parsed synchronously. This should + /// not be expensive since it blocks diagnostics. + explicit CppFileCollection(bool StorePreamblesInMemory, + std::shared_ptr PCHs, + ASTParsedCallback ASTCallback) + : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), + StorePreamblesInMemory(StorePreamblesInMemory) {} + + std::shared_ptr getOrCreateFile(PathRef File) { + std::lock_guard Lock(Mutex); + auto It = OpenedFiles.find(File); + if (It == OpenedFiles.end()) { + It = OpenedFiles + .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, + PCHs, ASTCallback)) + .first; + } + return It->second; + } + + std::shared_ptr getFile(PathRef File) const { + std::lock_guard Lock(Mutex); + auto It = OpenedFiles.find(File); + if (It == OpenedFiles.end()) + return nullptr; + return It->second; + } + + /// Removes a CppFile, stored for \p File, if it's inside collection and + /// returns it. + std::shared_ptr removeIfPresent(PathRef File); + + /// Gets used memory for each of the stored files. + std::vector> getUsedBytesPerFile() const; + +private: + mutable std::mutex Mutex; + llvm::StringMap> OpenedFiles; + ASTParsedCallback ASTCallback; + std::shared_ptr PCHs; + bool StorePreamblesInMemory; +}; +} // namespace clangd +} // namespace clang + +#endif diff --git a/clang-tools-extra/clangd/TUScheduler.cpp b/clang-tools-extra/clangd/TUScheduler.cpp index e2393658ba01..4c18dcdab6be 100644 --- a/clang-tools-extra/clangd/TUScheduler.cpp +++ b/clang-tools-extra/clangd/TUScheduler.cpp @@ -1,303 +1,9 @@ -//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// -// For each file, managed by TUScheduler, we create a single ASTWorker that -// manages an AST for that file. All operations that modify or read the AST are -// run on a separate dedicated thread asynchronously in FIFO order. -// -// We start processing each update immediately after we receive it. If two or -// more updates come subsequently without reads in-between, we attempt to drop -// an older one to not waste time building the ASTs we don't need. -// -// The processing thread of the ASTWorker is also responsible for building the -// preamble. However, unlike AST, the same preamble can be read concurrently, so -// we run each of async preamble reads on its own thread. -// -// To limit the concurrent load that clangd produces we mantain a semaphore that -// keeps more than a fixed number of threads from running concurrently. -// -// Rationale for cancelling updates. -// LSP clients can send updates to clangd on each keystroke. Some files take -// significant time to parse (e.g. a few seconds) and clangd can get starved by -// the updates to those files. Therefore we try to process only the last update, -// if possible. -// Our current strategy to do that is the following: -// - For each update we immediately schedule rebuild of the AST. -// - Rebuild of the AST checks if it was cancelled before doing any actual work. -// If it was, it does not do an actual rebuild, only reports llvm::None to the -// callback -// - When adding an update, we cancel the last update in the queue if it didn't -// have any reads. -// There is probably a optimal ways to do that. One approach we might take is -// the following: -// - For each update we remember the pending inputs, but delay rebuild of the -// AST for some timeout. -// - If subsequent updates come before rebuild was started, we replace the -// pending inputs and reset the timer. -// - If any reads of the AST are scheduled, we start building the AST -// immediately. - #include "TUScheduler.h" #include "clang/Frontend/PCHContainerOperations.h" #include "llvm/Support/Errc.h" -#include -#include namespace clang { namespace clangd { -namespace { -class ASTWorkerHandle; - -/// Owns one instance of the AST, schedules updates and reads of it. -/// Also responsible for building and providing access to the preamble. -/// Each ASTWorker processes the async requests sent to it on a separate -/// dedicated thread. -/// The ASTWorker that manages the AST is shared by both the processing thread -/// and the TUScheduler. The TUScheduler should discard an ASTWorker when -/// remove() is called, but its thread may be busy and we don't want to block. -/// So the workers are accessed via an ASTWorkerHandle. Destroying the handle -/// signals the worker to exit its run loop and gives up shared ownership of the -/// worker. -class ASTWorker { - friend class ASTWorkerHandle; - ASTWorker(Semaphore &Barrier, std::shared_ptr AST, bool RunSync); - -public: - /// Create a new ASTWorker and return a handle to it. - /// The processing thread is spawned using \p Tasks. However, when \p Tasks - /// is null, all requests will be processed on the calling thread - /// synchronously instead. \p Barrier is acquired when processing each - /// request, it is be used to limit the number of actively running threads. - static ASTWorkerHandle Create(AsyncTaskRunner *Tasks, Semaphore &Barrier, - std::shared_ptr AST); - ~ASTWorker(); - - void update(ParseInputs Inputs, - UniqueFunction>)> - OnUpdated); - void runWithAST(UniqueFunction)> Action); - - std::shared_ptr getPossiblyStalePreamble() const; - std::size_t getUsedBytes() const; - -private: - // Must be called exactly once on processing thread. Will return after - // stop() is called on a separate thread and all pending requests are - // processed. - void run(); - /// Signal that run() should finish processing pending requests and exit. - void stop(); - /// Adds a new task to the end of the request queue. - void startTask(UniqueFunction Task, bool isUpdate, - llvm::Optional CF); - - using RequestWithCtx = std::pair, Context>; - - const bool RunSync; - Semaphore &Barrier; - // AST and FileInputs are only accessed on the processing thread from run(). - const std::shared_ptr AST; - // Inputs, corresponding to the current state of AST. - ParseInputs FileInputs; - // Guards members used by both TUScheduler and the worker thread. - mutable std::mutex Mutex; - // Set to true to signal run() to finish processing. - bool Done; /* GUARDED_BY(Mutex) */ - std::queue Requests; /* GUARDED_BY(Mutex) */ - // Only set when last request is an update. This allows us to cancel an update - // that was never read, if a subsequent update comes in. - llvm::Optional LastUpdateCF; /* GUARDED_BY(Mutex) */ - std::condition_variable RequestsCV; -}; - -/// A smart-pointer-like class that points to an active ASTWorker. -/// In destructor, signals to the underlying ASTWorker that no new requests will -/// be sent and the processing loop may exit (after running all pending -/// requests). -class ASTWorkerHandle { - friend class ASTWorker; - ASTWorkerHandle(std::shared_ptr Worker) - : Worker(std::move(Worker)) { - assert(this->Worker); - } - -public: - ASTWorkerHandle(const ASTWorkerHandle &) = delete; - ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete; - ASTWorkerHandle(ASTWorkerHandle &&) = default; - ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default; - - ~ASTWorkerHandle() { - if (Worker) - Worker->stop(); - } - - ASTWorker &operator*() { - assert(Worker && "Handle was moved from"); - return *Worker; - } - - ASTWorker *operator->() { - assert(Worker && "Handle was moved from"); - return Worker.get(); - } - - /// Returns an owning reference to the underlying ASTWorker that can outlive - /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can - /// be schedule via the returned reference, i.e. only reads of the preamble - /// are possible. - std::shared_ptr lock() { return Worker; } - -private: - std::shared_ptr Worker; -}; - -ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner *Tasks, Semaphore &Barrier, - std::shared_ptr AST) { - std::shared_ptr Worker( - new ASTWorker(Barrier, std::move(AST), /*RunSync=*/!Tasks)); - if (Tasks) - Tasks->runAsync([Worker]() { Worker->run(); }); - - return ASTWorkerHandle(std::move(Worker)); -} - -ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr AST, - bool RunSync) - : RunSync(RunSync), Barrier(Barrier), AST(std::move(AST)), Done(false) { - if (RunSync) - return; -} - -ASTWorker::~ASTWorker() { -#ifndef NDEBUG - std::lock_guard Lock(Mutex); - assert(Done && "handle was not destroyed"); - assert(Requests.empty() && "unprocessed requests when destroying ASTWorker"); -#endif -} - -void ASTWorker::update( - ParseInputs Inputs, - UniqueFunction>)> - OnUpdated) { - auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable { - if (CF.isCancelled()) { - OnUpdated(llvm::None); - return; - } - FileInputs = Inputs; - auto Diags = AST->rebuild(std::move(Inputs)); - // We want to report the diagnostics even if this update was cancelled. - // It seems more useful than making the clients wait indefinitely if they - // spam us with updates. - OnUpdated(std::move(Diags)); - }; - - CancellationFlag UpdateCF; - startTask(BindWithForward(Task, UpdateCF, std::move(OnUpdated)), - /*isUpdate=*/true, UpdateCF); -} - -void ASTWorker::runWithAST( - UniqueFunction)> Action) { - auto Task = [=](decltype(Action) Action) { - auto ASTWrapper = this->AST->getAST().get(); - // FIXME: no need to lock here, cleanup the CppFile interface to get rid of - // them. - ASTWrapper->runUnderLock([&](ParsedAST *AST) { - if (!AST) { - Action(llvm::make_error( - "invalid AST", llvm::errc::invalid_argument)); - return; - } - Action(InputsAndAST{FileInputs, *AST}); - }); - }; - - startTask(BindWithForward(Task, std::move(Action)), /*isUpdate=*/false, - llvm::None); -} - -std::shared_ptr -ASTWorker::getPossiblyStalePreamble() const { - return AST->getPossiblyStalePreamble(); -} - -std::size_t ASTWorker::getUsedBytes() const { - // FIXME(ibiryukov): we'll need to take locks here after we remove - // thread-safety from CppFile. For now, CppFile is thread-safe and we can - // safely call methods on it without acquiring a lock. - return AST->getUsedBytes(); -} - -void ASTWorker::stop() { - { - std::lock_guard Lock(Mutex); - assert(!Done && "stop() called twice"); - Done = true; - } - RequestsCV.notify_one(); -} - -void ASTWorker::startTask(UniqueFunction Task, bool isUpdate, - llvm::Optional CF) { - assert(isUpdate == CF.hasValue() && - "Only updates are expected to pass CancellationFlag"); - - if (RunSync) { - assert(!Done && "running a task after stop()"); - Task(); - return; - } - - { - std::lock_guard Lock(Mutex); - assert(!Done && "running a task after stop()"); - if (isUpdate) { - if (!Requests.empty() && LastUpdateCF) { - // There were no reads for the last unprocessed update, let's cancel it - // to not waste time on it. - LastUpdateCF->cancel(); - } - LastUpdateCF = std::move(*CF); - } else { - LastUpdateCF = llvm::None; - } - Requests.emplace(std::move(Task), Context::current().clone()); - } // unlock Mutex. - RequestsCV.notify_one(); -} - -void ASTWorker::run() { - while (true) { - RequestWithCtx Req; - { - std::unique_lock Lock(Mutex); - RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); }); - if (Requests.empty()) { - assert(Done); - return; - } - // Even when Done is true, we finish processing all pending requests - // before exiting the processing loop. - - Req = std::move(Requests.front()); - Requests.pop(); - } // unlock Mutex - - std::lock_guard BarrierLock(Barrier); - WithContext Guard(std::move(Req.second)); - Req.first(); - } -} -} // namespace - unsigned getDefaultAsyncThreadsCount() { unsigned HardwareConcurrency = std::thread::hardware_concurrency(); // C++ standard says that hardware_concurrency() @@ -308,114 +14,110 @@ unsigned getDefaultAsyncThreadsCount() { return HardwareConcurrency; } -struct TUScheduler::FileData { - /// Latest inputs, passed to TUScheduler::update(). - ParseInputs Inputs; - ASTWorkerHandle Worker; -}; - TUScheduler::TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback) - : StorePreamblesInMemory(StorePreamblesInMemory), - PCHOps(std::make_shared()), - ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) { - if (0 < AsyncThreadsCount) - Tasks.emplace(); -} -TUScheduler::~TUScheduler() { - // Notify all workers that they need to stop. - Files.clear(); - - // Wait for all in-flight tasks to finish. - if (Tasks) - Tasks->waitForAll(); -} + : Files(StorePreamblesInMemory, std::make_shared(), + std::move(ASTCallback)), + Threads(AsyncThreadsCount) {} void TUScheduler::update( PathRef File, ParseInputs Inputs, UniqueFunction>)> OnUpdated) { - std::unique_ptr &FD = Files[File]; - if (!FD) { - // Create a new worker to process the AST-related tasks. - ASTWorkerHandle Worker = ASTWorker::Create( - Tasks ? Tasks.getPointer() : nullptr, Barrier, - CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback)); - FD = std::unique_ptr(new FileData{Inputs, std::move(Worker)}); - } else { - FD->Inputs = Inputs; - } - FD->Worker->update(std::move(Inputs), std::move(OnUpdated)); + CachedInputs[File] = Inputs; + + auto Resources = Files.getOrCreateFile(File); + auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs)); + + Threads.addToFront( + [](decltype(OnUpdated) OnUpdated, + decltype(DeferredRebuild) DeferredRebuild) { + auto Diags = DeferredRebuild(); + OnUpdated(Diags); + }, + std::move(OnUpdated), std::move(DeferredRebuild)); } void TUScheduler::remove(PathRef File, UniqueFunction Action) { - auto It = Files.find(File); - if (It == Files.end()) { + CachedInputs.erase(File); + + auto Resources = Files.removeIfPresent(File); + if (!Resources) { Action(llvm::make_error( "trying to remove non-added document", llvm::errc::invalid_argument)); return; } - Files.erase(It); + + auto DeferredCancel = Resources->deferCancelRebuild(); + Threads.addToFront( + [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) { + DeferredCancel(); + Action(llvm::Error::success()); + }, + std::move(Action), std::move(DeferredCancel)); } void TUScheduler::runWithAST( PathRef File, UniqueFunction)> Action) { - auto It = Files.find(File); - if (It == Files.end()) { + auto Resources = Files.getFile(File); + if (!Resources) { Action(llvm::make_error( "trying to get AST for non-added document", llvm::errc::invalid_argument)); return; } - It->second->Worker->runWithAST(std::move(Action)); + const ParseInputs &Inputs = getInputs(File); + // We currently block the calling thread until AST is available and run the + // action on the calling thread to avoid inconsistent states coming from + // subsequent updates. + // FIXME(ibiryukov): this should be moved to the worker threads. + Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { + if (AST) + Action(InputsAndAST{Inputs, *AST}); + else + Action(llvm::make_error( + "Could not build AST for the latest file update", + llvm::errc::invalid_argument)); + }); } void TUScheduler::runWithPreamble( PathRef File, UniqueFunction)> Action) { - auto It = Files.find(File); - if (It == Files.end()) { + std::shared_ptr Resources = Files.getFile(File); + if (!Resources) { Action(llvm::make_error( "trying to get preamble for non-added document", llvm::errc::invalid_argument)); return; } - if (!Tasks) { - std::shared_ptr Preamble = - It->second->Worker->getPossiblyStalePreamble(); - Action(InputsAndPreamble{It->second->Inputs, Preamble.get()}); - return; - } + const ParseInputs &Inputs = getInputs(File); + std::shared_ptr Preamble = + Resources->getPossiblyStalePreamble(); + Threads.addToFront( + [Resources, Preamble, Inputs](decltype(Action) Action) mutable { + if (!Preamble) + Preamble = Resources->getPossiblyStalePreamble(); - ParseInputs InputsCopy = It->second->Inputs; - std::shared_ptr Worker = It->second->Worker.lock(); - auto Task = [InputsCopy, Worker, this](Context Ctx, - decltype(Action) Action) mutable { - std::lock_guard BarrierLock(Barrier); - WithContext Guard(std::move(Ctx)); - std::shared_ptr Preamble = - Worker->getPossiblyStalePreamble(); - Action(InputsAndPreamble{InputsCopy, Preamble.get()}); - }; + Action(InputsAndPreamble{Inputs, Preamble.get()}); + }, + std::move(Action)); +} - Tasks->runAsync( - BindWithForward(Task, Context::current().clone(), std::move(Action))); +const ParseInputs &TUScheduler::getInputs(PathRef File) { + auto It = CachedInputs.find(File); + assert(It != CachedInputs.end()); + return It->second; } std::vector> TUScheduler::getUsedBytesPerFile() const { - std::vector> Result; - Result.reserve(Files.size()); - for (auto &&PathAndFile : Files) - Result.push_back( - {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()}); - return Result; + return Files.getUsedBytesPerFile(); } - } // namespace clangd } // namespace clang diff --git a/clang-tools-extra/clangd/TUScheduler.h b/clang-tools-extra/clangd/TUScheduler.h index 41562ebff452..c7df8c4dba1b 100644 --- a/clang-tools-extra/clangd/TUScheduler.h +++ b/clang-tools-extra/clangd/TUScheduler.h @@ -11,9 +11,9 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H #include "ClangdUnit.h" +#include "ClangdUnitStore.h" #include "Function.h" #include "Threading.h" -#include "llvm/ADT/StringMap.h" namespace clang { namespace clangd { @@ -42,7 +42,6 @@ class TUScheduler { public: TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback); - ~TUScheduler(); /// Returns estimated memory usage for each of the currently open files. /// The order of results is unspecified. @@ -82,17 +81,11 @@ public: UniqueFunction)> Action); private: - /// This class stores per-file data in the Files map. - struct FileData; + const ParseInputs &getInputs(PathRef File); - const bool StorePreamblesInMemory; - const std::shared_ptr PCHOps; - const ASTParsedCallback ASTCallback; - Semaphore Barrier; - llvm::StringMap> Files; - // None when running tasks synchronously and non-None when running tasks - // asynchronously. - llvm::Optional Tasks; + llvm::StringMap CachedInputs; + CppFileCollection Files; + ThreadPool Threads; }; } // namespace clangd } // namespace clang diff --git a/clang-tools-extra/clangd/Threading.cpp b/clang-tools-extra/clangd/Threading.cpp index b067758a16aa..3c0c74bb803c 100644 --- a/clang-tools-extra/clangd/Threading.cpp +++ b/clang-tools-extra/clangd/Threading.cpp @@ -1,63 +1,63 @@ #include "Threading.h" -#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" -#include namespace clang { namespace clangd { - -CancellationFlag::CancellationFlag() - : WasCancelled(std::make_shared>(false)) {} - -Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} - -void Semaphore::lock() { - std::unique_lock Lock(Mutex); - SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); - --FreeSlots; -} - -void Semaphore::unlock() { - std::unique_lock Lock(Mutex); - ++FreeSlots; - Lock.unlock(); - - SlotsChanged.notify_one(); -} - -AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } - -void AsyncTaskRunner::waitForAll() { - std::unique_lock Lock(Mutex); - TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); -} - -void AsyncTaskRunner::runAsync(UniqueFunction Action) { - { - std::unique_lock Lock(Mutex); - ++InFlightTasks; +ThreadPool::ThreadPool(unsigned AsyncThreadsCount) + : RunSynchronously(AsyncThreadsCount == 0) { + if (RunSynchronously) { + // Don't start the worker thread if we're running synchronously + return; } - auto CleanupTask = llvm::make_scope_exit([this]() { - std::lock_guard Lock(Mutex); - int NewTasksCnt = --InFlightTasks; - if (NewTasksCnt == 0) { - // Note: we can't unlock here because we don't want the object to be - // destroyed before we notify. - TasksReachedZero.notify_one(); - } - }); + Workers.reserve(AsyncThreadsCount); + for (unsigned I = 0; I < AsyncThreadsCount; ++I) { + Workers.push_back(std::thread([this, I]() { + llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); + while (true) { + UniqueFunction Request; + Context Ctx; - std::thread( - [](decltype(Action) Action, decltype(CleanupTask)) { - Action(); - // Make sure function stored by Action is destroyed before CleanupTask - // is run. - Action = nullptr; - }, - std::move(Action), std::move(CleanupTask)) - .detach(); + // Pick request from the queue + { + std::unique_lock Lock(Mutex); + // Wait for more requests. + RequestCV.wait(Lock, + [this] { return !RequestQueue.empty() || Done; }); + if (RequestQueue.empty()) { + assert(Done); + return; + } + + // We process requests starting from the front of the queue. Users of + // ThreadPool have a way to prioritise their requests by putting + // them to the either side of the queue (using either addToEnd or + // addToFront). + std::tie(Request, Ctx) = std::move(RequestQueue.front()); + RequestQueue.pop_front(); + } // unlock Mutex + + WithContext WithCtx(std::move(Ctx)); + Request(); + } + })); + } +} + +ThreadPool::~ThreadPool() { + if (RunSynchronously) + return; // no worker thread is running in that case + + { + std::lock_guard Lock(Mutex); + // Wake up the worker thread + Done = true; + } // unlock Mutex + RequestCV.notify_all(); + + for (auto &Worker : Workers) + Worker.join(); } } // namespace clangd } // namespace clang diff --git a/clang-tools-extra/clangd/Threading.h b/clang-tools-extra/clangd/Threading.h index a24eed7bc5b1..123d17964ef3 100644 --- a/clang-tools-extra/clangd/Threading.h +++ b/clang-tools-extra/clangd/Threading.h @@ -12,65 +12,74 @@ #include "Context.h" #include "Function.h" -#include -#include #include -#include +#include #include +#include #include namespace clang { namespace clangd { - -/// A shared boolean flag indicating if the computation was cancelled. -/// Once cancelled, cannot be returned to the previous state. -class CancellationFlag { +/// A simple fixed-size thread pool implementation. +class ThreadPool { public: - CancellationFlag(); + /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd + /// will be processed synchronously on the calling thread. + // Otherwise, \p AsyncThreadsCount threads will be created to schedule the + // requests. + ThreadPool(unsigned AsyncThreadsCount); + /// Destructor blocks until all requests are processed and worker threads are + /// terminated. + ~ThreadPool(); - void cancel() { - assert(WasCancelled && "the object was moved"); - WasCancelled->store(true); + /// Add a new request to run function \p F with args \p As to the start of the + /// queue. The request will be run on a separate thread. + template + void addToFront(Func &&F, Args &&... As) { + if (RunSynchronously) { + std::forward(F)(std::forward(As)...); + return; + } + + { + std::lock_guard Lock(Mutex); + RequestQueue.emplace_front( + BindWithForward(std::forward(F), std::forward(As)...), + Context::current().clone()); + } + RequestCV.notify_one(); } - bool isCancelled() const { - assert(WasCancelled && "the object was moved"); - return WasCancelled->load(); + /// Add a new request to run function \p F with args \p As to the end of the + /// queue. The request will be run on a separate thread. + template void addToEnd(Func &&F, Args &&... As) { + if (RunSynchronously) { + std::forward(F)(std::forward(As)...); + return; + } + + { + std::lock_guard Lock(Mutex); + RequestQueue.emplace_back( + BindWithForward(std::forward(F), std::forward(As)...), + Context::current().clone()); + } + RequestCV.notify_one(); } private: - std::shared_ptr> WasCancelled; -}; - -/// Limits the number of threads that can acquire the lock at the same time. -class Semaphore { -public: - Semaphore(std::size_t MaxLocks); - - void lock(); - void unlock(); - -private: - std::mutex Mutex; - std::condition_variable SlotsChanged; - std::size_t FreeSlots; -}; - -/// Runs tasks on separate (detached) threads and wait for all tasks to finish. -/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they -/// all complete on destruction. -class AsyncTaskRunner { -public: - /// Destructor waits for all pending tasks to finish. - ~AsyncTaskRunner(); - - void waitForAll(); - void runAsync(UniqueFunction Action); - -private: - std::mutex Mutex; - std::condition_variable TasksReachedZero; - std::size_t InFlightTasks = 0; + bool RunSynchronously; + mutable std::mutex Mutex; + /// We run some tasks on separate threads(parsing, CppFile cleanup). + /// These threads looks into RequestQueue to find requests to handle and + /// terminate when Done is set to true. + std::vector Workers; + /// Setting Done to true will make the worker threads terminate. + bool Done = false; + /// A queue of requests. + std::deque, Context>> RequestQueue; + /// Condition variable to wake up worker threads. + std::condition_variable RequestCV; }; } // namespace clangd } // namespace clang diff --git a/clang-tools-extra/unittests/clangd/CMakeLists.txt b/clang-tools-extra/unittests/clangd/CMakeLists.txt index c0cba6c817f2..8f6125e192d1 100644 --- a/clang-tools-extra/unittests/clangd/CMakeLists.txt +++ b/clang-tools-extra/unittests/clangd/CMakeLists.txt @@ -21,7 +21,6 @@ add_extra_unittest(ClangdTests JSONExprTests.cpp URITests.cpp TestFS.cpp - ThreadingTests.cpp TraceTests.cpp TUSchedulerTests.cpp SourceCodeTests.cpp diff --git a/clang-tools-extra/unittests/clangd/ThreadingTests.cpp b/clang-tools-extra/unittests/clangd/ThreadingTests.cpp deleted file mode 100644 index 84e6512fe43a..000000000000 --- a/clang-tools-extra/unittests/clangd/ThreadingTests.cpp +++ /dev/null @@ -1,65 +0,0 @@ -//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "Threading.h" -#include "gtest/gtest.h" -#include - -namespace clang { -namespace clangd { -class ThreadingTest : public ::testing::Test {}; - -TEST_F(ThreadingTest, TaskRunner) { - const int TasksCnt = 100; - // This should be const, but MSVC does not allow to use const vars in lambdas - // without capture. On the other hand, clang gives a warning that capture of - // const var is not required. - // Making it non-const makes both compilers happy. - int IncrementsPerTask = 1000; - - std::mutex Mutex; - int Counter(0); /* GUARDED_BY(Mutex) */ - { - AsyncTaskRunner Tasks; - auto scheduleIncrements = [&]() { - for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) { - Tasks.runAsync([&Counter, &Mutex, IncrementsPerTask]() { - for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) { - std::lock_guard Lock(Mutex); - ++Counter; - } - }); - } - }; - - { - // Make sure runAsync is not running tasks synchronously on the same - // thread by locking the Mutex used for increments. - std::lock_guard Lock(Mutex); - scheduleIncrements(); - } - - Tasks.waitForAll(); - { - std::lock_guard Lock(Mutex); - ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask); - } - - { - std::lock_guard Lock(Mutex); - Counter = 0; - scheduleIncrements(); - } - } - // Check that destructor has waited for tasks to finish. - std::lock_guard Lock(Mutex); - ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask); -} -} // namespace clangd -} // namespace clang