revert ffrt-based taskpool

Issue: https://gitee.com/openharmony/arkcompiler_ets_runtime/issues/IAPOOO
Signed-off-by: Gymee <yumeijie@huawei.com>
Change-Id: I0be3997f06b44a22a74e16185e0a0fd1320a6847
This commit is contained in:
Gymee 2024-09-08 14:01:51 +08:00
parent dab66c4a6f
commit c50c4cac25
24 changed files with 81 additions and 301 deletions

View File

@ -51,7 +51,7 @@ void DaemonThread::StartRunning()
ASSERT(thread_ == nullptr);
ASSERT(!IsRunning());
ASSERT(tasks_.empty());
GCWorkerPool::GetCurrentTaskpool()->Initialize();
Taskpool::GetCurrentTaskpool()->Initialize();
ASSERT(GetThreadId() == 0);
thread_ = std::make_unique<std::thread>([this] {this->Run();});
// Wait until daemon thread is running.
@ -78,7 +78,7 @@ void DaemonThread::WaitFinished()
CheckAndPostTask(TerminateDaemonTask(nullptr));
thread_->join();
thread_.reset();
GCWorkerPool::GetCurrentTaskpool()->Destroy(GetThreadId());
Taskpool::GetCurrentTaskpool()->Destroy(GetThreadId());
}
ASSERT(!IsInRunningState());
ASSERT(!IsRunning());

View File

@ -22,9 +22,9 @@ void EcmaStringTableCleaner::PostSweepWeakRefTask(const WeakRootVisitor &visitor
{
StartSweepWeakRefTask();
iter_ = std::make_shared<std::atomic<uint32_t>>(0U);
const uint32_t postTaskCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
const uint32_t postTaskCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
for (uint32_t i = 0U; i < postTaskCount; ++i) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(std::make_unique<SweepWeakRefTask>(iter_, this, visitor));
Taskpool::GetCurrentTaskpool()->PostTask(std::make_unique<SweepWeakRefTask>(iter_, this, visitor));
}
}

View File

@ -127,7 +127,6 @@ void EcmaVM::PostFork()
RandomGenerator::InitRandom(GetAssociatedJSThread());
heap_->SetHeapMode(HeapMode::SHARE);
GetAssociatedJSThread()->PostFork();
GCWorkerPool::GetCurrentTaskpool()->Initialize();
Taskpool::GetCurrentTaskpool()->Initialize();
SetPostForked(true);
LOG_ECMA(INFO) << "multi-thread check enabled: " << GetThreadCheckStatus();
@ -250,7 +249,6 @@ bool EcmaVM::Initialize()
ECMA_BYTRACE_NAME(HITRACE_TAG_ARK, "EcmaVM::Initialize");
stringTable_ = Runtime::GetInstance()->GetEcmaStringTable();
InitializePGOProfiler();
GCWorkerPool::GetCurrentTaskpool()->Initialize();
Taskpool::GetCurrentTaskpool()->Initialize();
#ifndef PANDA_TARGET_WINDOWS
RuntimeStubs::Initialize(thread_);
@ -335,7 +333,6 @@ EcmaVM::~EcmaVM()
// clear c_address: c++ pointer delete
ClearBufferData();
heap_->WaitAllTasksFinished();
GCWorkerPool::GetCurrentTaskpool()->Destroy(thread_->GetThreadId());
Taskpool::GetCurrentTaskpool()->Destroy(thread_->GetThreadId());
if (pgoProfiler_ != nullptr) {
@ -450,13 +447,12 @@ JSHandle<GlobalEnv> EcmaVM::GetGlobalEnv() const
void EcmaVM::CheckThread() const
{
// Exclude GC thread
if (thread_ == nullptr) {
LOG_FULL(FATAL) << "Fatal: ecma_vm has been destructed! vm address is: " << this;
UNREACHABLE();
}
// Exclude the threads in GCWorkerPool and Taskpool
if (!(GCWorkerPool::GetCurrentTaskpool()->IsDaemonThreadOrInThreadPool() ||
Taskpool::GetCurrentTaskpool()->IsInThreadPool()) &&
if (!Taskpool::GetCurrentTaskpool()->IsDaemonThreadOrInThreadPool(std::this_thread::get_id()) &&
thread_->GetThreadId() != JSThread::GetCurrentThreadId() && !thread_->IsCrossThreadExecutionEnable()) {
LOG_FULL(FATAL) << "Fatal: ecma_vm cannot run in multi-thread!"
<< " thread:" << thread_->GetThreadId()

View File

@ -28,7 +28,7 @@
#include "ecmascript/mem/heap_region_allocator.h"
#include "ecmascript/napi/include/dfx_jsnapi.h"
#include "ecmascript/napi/include/jsnapi.h"
#include "ecmascript/platform/mutex.h"
#include "ecmascript/taskpool/taskpool.h"
namespace panda {
class JSNApi;

View File

@ -173,7 +173,7 @@ void Jit::SetEnableOrDisable(const JSRuntimeOptions &options, bool isEnableFastJ
if (enableCodeSign && shouldCompileMain) {
JitFort::InitJitFortResource();
}
JitTaskpool::GetCurrentTaskpool()->Init(enableCodeSign && !shouldCompileMain);
JitTaskpool::GetCurrentTaskpool()->Initialize(enableCodeSign && !shouldCompileMain);
}
}
}
@ -186,7 +186,7 @@ void Jit::Destroy()
LockHolder holder(setEnableLock_);
JitTaskpool::GetCurrentTaskpool()->Finalize();
JitTaskpool::GetCurrentTaskpool()->Destroy();
initialized_ = false;
fastJitEnable_ = false;
baselineJitEnable_ = false;

View File

@ -37,7 +37,7 @@ enum RunState : uint8_t {
FINISH
};
class JitTaskpool : public ThreadedTaskpool {
class JitTaskpool : public Taskpool {
public:
PUBLIC_API static JitTaskpool *GetCurrentTaskpool();
JitTaskpool() = default;
@ -65,9 +65,9 @@ public:
}
}
void Init(bool needInitJitFort)
void Initialize(bool needInitJitFort)
{
ThreadedTaskpool::InitializeWithHooks(0, [needInitJitFort](os::thread::native_handle_type thread) {
Taskpool::Initialize(0, [needInitJitFort](os::thread::native_handle_type thread) {
os::thread::SetThreadName(thread, "OS_JIT_Thread");
constexpr int32_t priorityVal = 5; // 5: The priority can be set within range [-20, 19]
os::thread::SetPriority(os::thread::GetCurrentThreadId(), priorityVal);
@ -82,9 +82,9 @@ public:
});
}
void Finalize()
void Destroy()
{
ThreadedTaskpool::Destroy(threadId_);
Taskpool::Destroy(threadId_);
}
private:

View File

@ -48,7 +48,7 @@ public:
static bool TryIncreaseTaskCounts()
{
size_t taskPoolSize = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
size_t taskPoolSize = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
{
LockHolder holder(taskCountMutex_);
// total counts of running concurrent mark tasks should be less than taskPoolSize

View File

@ -29,12 +29,12 @@ void ConcurrentSweeper::PostTask(bool fullGC)
{
if (ConcurrentSweepEnabled()) {
if (!fullGC) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<SweeperTask>(heap_->GetJSThread()->GetThreadId(), this, OLD_SPACE));
}
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<SweeperTask>(heap_->GetJSThread()->GetThreadId(), this, NON_MOVABLE));
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<SweeperTask>(heap_->GetJSThread()->GetThreadId(), this, MACHINE_CODE_SPACE));
}
}

View File

@ -115,7 +115,7 @@ void FullGC::Sweep()
// process weak reference
uint32_t totalThreadCount = 1; // 1 : mainthread
if (heap_->IsParallelGCEnabled()) {
totalThreadCount += GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
totalThreadCount += Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
}
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = workManager_->GetWeakReferenceQueue(i);

View File

@ -283,7 +283,7 @@ void SharedHeap::Destroy()
void SharedHeap::PostInitialization(const GlobalEnvConstants *globalEnvConstants, const JSRuntimeOptions &option)
{
globalEnvConstants_ = globalEnvConstants;
uint32_t totalThreadNum = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
uint32_t totalThreadNum = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
maxMarkTaskCount_ = totalThreadNum - 1;
sWorkManager_ = new SharedGCWorkManager(this, totalThreadNum + 1);
sharedGCMarker_ = new SharedGCMarker(sWorkManager_);
@ -299,7 +299,7 @@ void SharedHeap::PostInitialization(const GlobalEnvConstants *globalEnvConstants
void SharedHeap::PostGCMarkingTask(SharedParallelMarkPhase sharedTaskPhase)
{
IncreaseTaskCount();
GCWorkerPool::GetCurrentTaskpool()->PostTask(std::make_unique<ParallelMarkTask>(dThread_->GetThreadId(),
Taskpool::GetCurrentTaskpool()->PostTask(std::make_unique<ParallelMarkTask>(dThread_->GetThreadId(),
this, sharedTaskPhase));
}
@ -475,7 +475,7 @@ void SharedHeap::Reclaim(TriggerGCType gcType)
if (parallelGC_) {
clearTaskFinished_ = false;
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<AsyncClearTask>(dThread_->GetThreadId(), this, gcType));
} else {
ReclaimRegions(gcType);
@ -511,7 +511,7 @@ void SharedHeap::DisableParallelGC(JSThread *thread)
void SharedHeap::EnableParallelGC(JSRuntimeOptions &option)
{
uint32_t totalThreadNum = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
uint32_t totalThreadNum = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
maxMarkTaskCount_ = totalThreadNum - 1;
parallelGC_ = option.EnableParallelGC();
if (auto workThreadNum = sWorkManager_->GetTotalThreadNum();
@ -726,7 +726,7 @@ void Heap::Initialize()
hugeObjectSpace_ = new HugeObjectSpace(this, heapRegionAllocator_, oldSpaceCapacity, oldSpaceCapacity);
hugeMachineCodeSpace_ = new HugeMachineCodeSpace(this, heapRegionAllocator_, oldSpaceCapacity, oldSpaceCapacity);
maxEvacuateTaskCount_ = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
maxEvacuateTaskCount_ = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
maxMarkTaskCount_ = std::min<size_t>(ecmaVm_->GetJSOptions().GetGcThreadNum(),
maxEvacuateTaskCount_ - 1);
@ -744,7 +744,7 @@ void Heap::Initialize()
#if ECMASCRIPT_DISABLE_CONCURRENT_MARKING
concurrentMarkerEnabled = false;
#endif
workManager_ = new WorkManager(this, GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1);
workManager_ = new WorkManager(this, Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1);
stwYoungGC_ = new STWYoungGC(this, parallelGC_);
fullGC_ = new FullGC(this);
@ -981,7 +981,7 @@ void Heap::Resume(TriggerGCType gcType)
hugeMachineCodeSpace_->ReclaimHugeRegion();
if (parallelGC_) {
clearTaskFinished_ = false;
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<AsyncClearTask>(GetJSThread()->GetThreadId(), this, gcType));
} else {
ReclaimRegions(gcType);
@ -1019,13 +1019,13 @@ void Heap::DisableParallelGC()
stwYoungGC_->ConfigParallelGC(false);
sweeper_->ConfigConcurrentSweep(false);
concurrentMarker_->ConfigConcurrentMark(false);
GCWorkerPool::GetCurrentTaskpool()->Destroy(GetJSThread()->GetThreadId());
Taskpool::GetCurrentTaskpool()->Destroy(GetJSThread()->GetThreadId());
}
void Heap::EnableParallelGC()
{
parallelGC_ = ecmaVm_->GetJSOptions().EnableParallelGC();
maxEvacuateTaskCount_ = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
maxEvacuateTaskCount_ = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
if (auto totalThreadNum = workManager_->GetTotalThreadNum();
totalThreadNum != maxEvacuateTaskCount_ + 1) {
LOG_ECMA_MEM(WARN) << "TheadNum mismatch, totalThreadNum(workerManager): " << totalThreadNum << ", "
@ -2054,7 +2054,7 @@ void Heap::WaitConcurrentMarkingFinished()
void Heap::PostParallelGCTask(ParallelGCTaskPhase gcTask)
{
IncreaseTaskCount();
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<ParallelGCTask>(GetJSThread()->GetThreadId(), this, gcTask));
}
@ -2082,7 +2082,7 @@ void Heap::ChangeGCParams(bool inBackground)
sweeper_->EnableConcurrentSweep(EnableConcurrentSweepType::DISABLE);
maxMarkTaskCount_ = 1;
maxEvacuateTaskCount_ = 1;
GCWorkerPool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::BACKGROUND);
Taskpool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::BACKGROUND);
} else {
LOG_GC(INFO) << "app is not inBackground";
if (GetMemGrowingType() != MemGrowingType::PRESSURE) {
@ -2092,9 +2092,9 @@ void Heap::ChangeGCParams(bool inBackground)
concurrentMarker_->EnableConcurrentMarking(EnableConcurrentMarkType::ENABLE);
sweeper_->EnableConcurrentSweep(EnableConcurrentSweepType::ENABLE);
maxMarkTaskCount_ = std::min<size_t>(ecmaVm_->GetJSOptions().GetGcThreadNum(),
GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() - 1);
maxEvacuateTaskCount_ = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum();
GCWorkerPool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::FOREGROUND);
Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() - 1);
maxEvacuateTaskCount_ = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum();
Taskpool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::FOREGROUND);
}
}
@ -2198,7 +2198,7 @@ void Heap::NotifyFinishColdStartSoon()
}
// post 2s task
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<FinishColdStartTask>(GetJSThread()->GetThreadId(), this));
}
@ -2326,7 +2326,7 @@ void Heap::CleanCallBack()
{
auto &concurrentCallbacks = this->GetEcmaVM()->GetConcurrentNativePointerCallbacks();
if (!concurrentCallbacks.empty()) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<DeleteCallbackTask>(thread_->GetThreadId(), concurrentCallbacks)
);
}

View File

@ -386,7 +386,7 @@ int ParallelEvacuator::CalculateEvacuationThreadNum()
uint32_t count = evacuateWorkloadSet_.GetWorkloadCount();
uint32_t regionPerThread = 8;
uint32_t maxThreadNum = std::min(heap_->GetMaxEvacuateTaskCount(),
GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum());
Taskpool::GetCurrentTaskpool()->GetTotalThreadNum());
return static_cast<int>(std::min(std::max(1U, count / regionPerThread), maxThreadNum));
}
@ -396,7 +396,7 @@ int ParallelEvacuator::CalculateUpdateThreadNum()
double regionPerThread = 1.0 / 4;
count = std::pow(count, regionPerThread);
uint32_t maxThreadNum = std::min(heap_->GetMaxEvacuateTaskCount(),
GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum());
Taskpool::GetCurrentTaskpool()->GetTotalThreadNum());
return static_cast<int>(std::min(std::max(1U, count), maxThreadNum));
}

View File

@ -95,7 +95,7 @@ void ParallelEvacuator::EvacuateSpace()
LockHolder holder(mutex_);
parallel_ = CalculateEvacuationThreadNum();
for (int i = 0; i < parallel_; i++) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<EvacuationTask>(heap_->GetJSThread()->GetThreadId(), this));
}
}
@ -246,7 +246,7 @@ void ParallelEvacuator::UpdateReference()
LockHolder holder(mutex_);
parallel_ = CalculateUpdateThreadNum();
for (int i = 0; i < parallel_; i++) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<UpdateReferenceTask>(heap_->GetJSThread()->GetThreadId(), this));
}
}
@ -297,7 +297,7 @@ void ParallelEvacuator::UpdateRoot()
void ParallelEvacuator::UpdateRecordWeakReference()
{
auto totalThreadCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
auto totalThreadCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = heap_->GetWorkManager()->GetWeakReferenceQueue(i);
@ -371,7 +371,7 @@ void ParallelEvacuator::UpdateWeakReference()
template<TriggerGCType gcType>
void ParallelEvacuator::UpdateRecordWeakReferenceOpt()
{
auto totalThreadCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
auto totalThreadCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = heap_->GetWorkManager()->GetWeakReferenceQueue(i);

View File

@ -43,7 +43,7 @@ void PartialGC::RunPhases()
bool needAjustGCThreadPrio = heap_->GetGCType() == TriggerGCType::OLD_GC ||
heap_->GetNewSpace()->GetCommittedSize() >= heap_->GetNewSpace()->GetMaximumCapacity();
if (mainThreadInForeground && needAjustGCThreadPrio) {
GCWorkerPool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::STW);
Taskpool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::STW);
}
markingInProgress_ = heap_->CheckOngoingConcurrentMarking();
LOG_GC(DEBUG) << "markingInProgress_" << markingInProgress_;
@ -63,7 +63,7 @@ void PartialGC::RunPhases()
}
Finish();
if (mainThreadInForeground && needAjustGCThreadPrio) {
GCWorkerPool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::FOREGROUND);
Taskpool::GetCurrentTaskpool()->SetThreadPriority(PriorityMode::FOREGROUND);
}
if (heap_->IsConcurrentFullMark()) {
heap_->NotifyHeapAliveSizeAfterGC(heap_->GetHeapObjectSize());

View File

@ -28,10 +28,10 @@ void SharedConcurrentSweeper::PostTask(bool isFullGC)
auto tid = DaemonThread::GetInstance()->GetThreadId();
if (ConcurrentSweepEnabled()) {
if (!isFullGC) {
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<SweeperTask>(tid, this, SHARED_OLD_SPACE));
}
GCWorkerPool::GetCurrentTaskpool()->PostTask(
Taskpool::GetCurrentTaskpool()->PostTask(
std::make_unique<SweeperTask>(tid, this, SHARED_NON_MOVABLE));
}
}

View File

@ -126,7 +126,7 @@ void SharedFullGC::Finish()
void SharedFullGC::UpdateRecordWeakReference()
{
auto totalThreadCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
auto totalThreadCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = sHeap_->GetWorkManager()->GetWeakReferenceQueue(i);

View File

@ -131,7 +131,7 @@ void SharedGC::Finish()
void SharedGC::UpdateRecordWeakReference()
{
auto totalThreadCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
auto totalThreadCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1;
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = sHeap_->GetWorkManager()->GetWeakReferenceQueue(i);

View File

@ -71,7 +71,7 @@ void STWYoungGC::Mark()
}
heap_->WaitRunningTaskFinished();
auto totalThreadCount = GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1; // gc thread and main thread
auto totalThreadCount = Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1; // gc thread and main thread
for (uint32_t i = 0; i < totalThreadCount; i++) {
SlotNeedUpdate needUpdate(nullptr, ObjectSlot(0));
while (workManager_->GetSlotNeedUpdate(i, &needUpdate)) {
@ -85,7 +85,7 @@ void STWYoungGC::Sweep()
ECMA_BYTRACE_NAME(HITRACE_TAG_ARK, "STWYoungGC::Sweep");
TRACE_GC(GCStats::Scope::ScopeId::Sweep, heap_->GetEcmaVM()->GetEcmaGCStats());
auto totalThreadCount = static_cast<uint32_t>(
GCWorkerPool::GetCurrentTaskpool()->GetTotalThreadNum() + 1); // gc thread and main thread
Taskpool::GetCurrentTaskpool()->GetTotalThreadNum() + 1); // gc thread and main thread
for (uint32_t i = 0; i < totalThreadCount; i++) {
ProcessQueue *queue = workManager_->GetWeakReferenceQueue(i);
while (true) {

View File

@ -31,7 +31,6 @@
#include "ecmascript/pgo_profiler/pgo_utils.h"
#include "ecmascript/platform/file.h"
#include "ecmascript/platform/mutex.h"
#include "ecmascript/taskpool/taskpool.h"
#include "ecmascript/platform/os.h"
namespace panda::ecmascript::pgo {

View File

@ -122,10 +122,7 @@ private:
class SaveTask : public Task {
public:
explicit SaveTask(PGOProfilerEncoder *encoder, int32_t id) : Task(id), encoder_(encoder)
{
isCancellable_ = true;
}
explicit SaveTask(PGOProfilerEncoder *encoder, int32_t id) : Task(id), encoder_(encoder) {};
virtual ~SaveTask() override = default;
bool Run([[maybe_unused]] uint32_t threadIndex) override

View File

@ -58,14 +58,6 @@ public:
return terminate_;
}
bool IsCancellable() const
{
return isCancellable_;
}
protected:
bool isCancellable_ {false};
private:
int32_t id_ {0};
volatile bool terminate_ {false};

View File

@ -17,24 +17,15 @@
#include "ecmascript/platform/os.h"
#if defined(ENABLE_FFRT_INTERFACES)
#include "ffrt_inner.h"
#include "c/executor_task.h"
#endif
namespace panda::ecmascript {
Taskpool *Taskpool::GetCurrentTaskpool()
{
#if defined(ENABLE_FFRT_INTERFACES)
static Taskpool *taskpool = new FFRTTaskpool();
#else
static Taskpool *taskpool = new ThreadedTaskpool();
#endif
static Taskpool *taskpool = new Taskpool();
return taskpool;
}
void ThreadedTaskpool::InitializeWithHooks(int32_t threadNum,
const std::function<void(os::thread::native_handle_type)> prologueHook,
void Taskpool::Initialize(int threadNum,
std::function<void(os::thread::native_handle_type)> prologueHook,
const std::function<void(os::thread::native_handle_type)> epilogueHook)
{
LockHolder lock(mutex_);
@ -43,7 +34,7 @@ void ThreadedTaskpool::InitializeWithHooks(int32_t threadNum,
}
}
void ThreadedTaskpool::Destroy(int32_t id)
void Taskpool::Destroy(int32_t id)
{
ASSERT(id != 0);
LockHolder lock(mutex_);
@ -58,7 +49,7 @@ void ThreadedTaskpool::Destroy(int32_t id)
}
}
void ThreadedTaskpool::TerminateTask(int32_t id, TaskType type)
void Taskpool::TerminateTask(int32_t id, TaskType type)
{
if (isInitialized_ <= 0) {
return;
@ -66,7 +57,7 @@ void ThreadedTaskpool::TerminateTask(int32_t id, TaskType type)
runner_->TerminateTask(id, type);
}
uint32_t ThreadedTaskpool::TheMostSuitableThreadNum(uint32_t threadNum) const
uint32_t Taskpool::TheMostSuitableThreadNum(uint32_t threadNum) const
{
if (threadNum > 0) {
return std::min<uint32_t>(threadNum, MAX_TASKPOOL_THREAD_NUM);
@ -78,116 +69,11 @@ uint32_t ThreadedTaskpool::TheMostSuitableThreadNum(uint32_t threadNum) const
return MIN_TASKPOOL_THREAD_NUM; // At least MIN_TASKPOOL_THREAD_NUM GC threads, and 1 extra daemon thread.
}
void ThreadedTaskpool::ForEachTask(const std::function<void(Task*)> &f)
void Taskpool::ForEachTask(const std::function<void(Task*)> &f)
{
if (isInitialized_ <= 0) {
return;
}
runner_->ForEachTask(f);
}
GCWorkerPool *GCWorkerPool::GetCurrentTaskpool()
{
static GCWorkerPool *taskpool = new GCWorkerPool();
return taskpool;
}
#if defined(ENABLE_FFRT_INTERFACES)
void FFRTTaskpool::TerminateTask(int32_t id, TaskType type)
{
LockHolder lock(mutex_);
for (auto &[task, handler] : cancellableTasks_) {
if (id != ALL_TASK_ID && id != task->GetId()) {
continue;
}
if (type != TaskType::ALL && type != task->GetTaskType()) {
continue;
}
// Return non-zero if the task is doing or has finished, so calling terminated is meaningless.
if (ffrt::skip(handler) == 0) {
task->Terminated();
}
}
}
void FFRTTaskpool::Destroy(int32_t id)
{
TerminateTask(id, TaskType::ALL);
}
uint32_t FFRTTaskpool::TheMostSuitableThreadNum(uint32_t threadNum) const
{
if (threadNum > 0) {
return std::min<uint32_t>(threadNum, MAX_TASKPOOL_THREAD_NUM);
}
uint32_t numOfThreads = std::min<uint32_t>(NumberOfCpuCore() / 2, MAX_TASKPOOL_THREAD_NUM);
return std::max<uint32_t>(numOfThreads, MIN_TASKPOOL_THREAD_NUM);
}
bool FFRTTaskpool::IsInThreadPool() const
{
auto tid = ffrt::this_task::get_id();
LockHolder lock(mutex_);
return ffrtTaskIds_.find(tid) != ffrtTaskIds_.end();
}
void FFRTTaskpool::PostTask(std::unique_ptr<Task> task)
{
constexpr uint32_t FFRT_TASK_STACK_SIZE = 8 * 1024 * 1024; // 8MB
ffrt::task_attr taskAttr;
ffrt_task_attr_init(&taskAttr);
ffrt_task_attr_set_name(&taskAttr, "Ark_FFRTTaskpool_Task");
ffrt_task_attr_set_qos(&taskAttr, ffrt_qos_user_initiated);
ffrt_task_attr_set_stack_size(&taskAttr, FFRT_TASK_STACK_SIZE);
if (LIKELY(!task->IsCancellable())) {
SubmitNonCancellableTask(std::move(task), taskAttr);
} else {
SubmitCancellableTask(std::move(task), taskAttr);
}
}
void FFRTTaskpool::SubmitNonCancellableTask(std::unique_ptr<Task> task, const ffrt::task_attr &taskAttr)
{
auto ffrtTask = [this, task = task.release()]() {
auto tid = ffrt::this_task::get_id();
{
LockHolder lock(mutex_);
ffrtTaskIds_.insert(tid);
}
task->Run(tid);
delete task;
LockHolder lock(mutex_);
if (auto iter = ffrtTaskIds_.find(tid); LIKELY(iter != ffrtTaskIds_.end())) {
ffrtTaskIds_.erase(iter);
}
};
ffrt::submit(ffrtTask, {}, {}, taskAttr);
}
void FFRTTaskpool::SubmitCancellableTask(std::unique_ptr<Task> task, const ffrt::task_attr &taskAttr)
{
std::shared_ptr<Task> sTask(std::move(task));
auto ffrtTask = [this, sTask]() {
auto tid = ffrt::this_task::get_id();
{
LockHolder lock(mutex_);
ffrtTaskIds_.insert(tid);
}
sTask->Run(tid);
LockHolder lock(mutex_);
cancellableTasks_.erase(sTask);
if (auto iter = ffrtTaskIds_.find(tid); LIKELY(iter != ffrtTaskIds_.end())) {
ffrtTaskIds_.erase(iter);
}
};
// When the ffrtTask is being scheduled, it may not hold the same potential lock as ffrt::submit_h;
// So it is safe to lock before ffrt::submit_h.
LockHolder lock(mutex_);
ffrt::task_handle handler = ffrt::submit_h(ffrtTask, {}, {}, taskAttr);
cancellableTasks_.emplace(sTask, std::move(handler));
}
#endif
} // namespace panda::ecmascript

View File

@ -29,49 +29,22 @@ public:
PUBLIC_API static Taskpool *GetCurrentTaskpool();
Taskpool() = default;
virtual ~Taskpool() = default;
NO_COPY_SEMANTIC(Taskpool);
NO_MOVE_SEMANTIC(Taskpool);
virtual void Initialize(int32_t threadNum = DEFAULT_TASKPOOL_THREAD_NUM) = 0;
virtual void PostTask(std::unique_ptr<Task> task) = 0;
virtual void Destroy(int32_t id) = 0;
// Terminate a task of a specified type
virtual void TerminateTask(int32_t id, TaskType type = TaskType::ALL) = 0;
virtual uint32_t GetTotalThreadNum() const = 0;
virtual bool IsInThreadPool() const = 0;
private:
virtual uint32_t TheMostSuitableThreadNum(uint32_t threadNum) const = 0;
};
class ThreadedTaskpool : public Taskpool {
public:
ThreadedTaskpool() = default;
~ThreadedTaskpool()
PUBLIC_API ~Taskpool()
{
LockHolder lock(mutex_);
runner_->TerminateThread();
isInitialized_ = 0;
}
NO_COPY_SEMANTIC(ThreadedTaskpool);
NO_MOVE_SEMANTIC(ThreadedTaskpool);
NO_COPY_SEMANTIC(Taskpool);
NO_MOVE_SEMANTIC(Taskpool);
void Initialize(int32_t threadNum = DEFAULT_TASKPOOL_THREAD_NUM) override
{
InitializeWithHooks(threadNum, nullptr, nullptr);
}
void Initialize(int threadNum = DEFAULT_TASKPOOL_THREAD_NUM,
std::function<void(os::thread::native_handle_type)> prologueHook = nullptr,
const std::function<void(os::thread::native_handle_type)> epilogueHook = nullptr);
void Destroy(int32_t id);
void Destroy(int32_t id) override;
void PostTask(std::unique_ptr<Task> task) override
void PostTask(std::unique_ptr<Task> task) const
{
if (isInitialized_ > 0) {
runner_->PostTask(std::move(task));
@ -79,95 +52,38 @@ public:
}
// Terminate a task of a specified type
void TerminateTask(int32_t id, TaskType type = TaskType::ALL) override;
void TerminateTask(int32_t id, TaskType type = TaskType::ALL);
uint32_t GetTotalThreadNum() const override
uint32_t GetTotalThreadNum() const
{
return runner_->GetTotalThreadNum();
}
bool IsInThreadPool(std::thread::id id) const
{
return runner_->IsInThreadPool(id);
}
bool IsDaemonThreadOrInThreadPool(std::thread::id id) const
{
DaemonThread *dThread = DaemonThread::GetInstance();
return IsInThreadPool(id) || (dThread != nullptr
&& dThread->GetThreadId() == JSThread::GetCurrentThreadId());
}
void SetThreadPriority(PriorityMode mode)
{
runner_->SetQosPriority(mode);
}
bool IsInThreadPool() const override
{
std::thread::id id = std::this_thread::get_id();
return runner_->IsInThreadPool(id);
}
void ForEachTask(const std::function<void(Task*)> &f);
protected:
void InitializeWithHooks(int32_t threadNum,
const std::function<void(os::thread::native_handle_type)> prologueHook,
const std::function<void(os::thread::native_handle_type)> epilogueHook);
private:
uint32_t TheMostSuitableThreadNum(uint32_t threadNum) const override;
virtual uint32_t TheMostSuitableThreadNum(uint32_t threadNum) const;
std::unique_ptr<Runner> runner_;
volatile int isInitialized_ = 0;
Mutex mutex_;
};
class GCWorkerPool : public ThreadedTaskpool {
public:
PUBLIC_API static GCWorkerPool *GetCurrentTaskpool();
GCWorkerPool() = default;
~GCWorkerPool() = default;
NO_COPY_SEMANTIC(GCWorkerPool);
NO_MOVE_SEMANTIC(GCWorkerPool);
bool IsDaemonThreadOrInThreadPool() const
{
DaemonThread *dThread = DaemonThread::GetInstance();
return IsInThreadPool() || (dThread != nullptr
&& dThread->GetThreadId() == JSThread::GetCurrentThreadId());
}
};
#if defined(ENABLE_FFRT_INTERFACES)
class FFRTTaskpool : public Taskpool {
public:
FFRTTaskpool() = default;
~FFRTTaskpool() = default;
NO_COPY_SEMANTIC(FFRTTaskpool);
NO_MOVE_SEMANTIC(FFRTTaskpool);
void Initialize(int32_t threadNum = DEFAULT_TASKPOOL_THREAD_NUM) override
{
totalThreadNum_ = TheMostSuitableThreadNum(threadNum);
}
void Destroy(int32_t id) override;
uint32_t GetTotalThreadNum() const override
{
return totalThreadNum_;
}
void PostTask(std::unique_ptr<Task> task) override;
void TerminateTask(int32_t id, TaskType type = TaskType::ALL) override;
bool IsInThreadPool() const override;
private:
uint32_t TheMostSuitableThreadNum(uint32_t threadNum) const override;
void SubmitNonCancellableTask(std::unique_ptr<Task> task, const ffrt::task_attr &taskAttr);
void SubmitCancellableTask(std::unique_ptr<Task> task, const ffrt::task_attr &taskAttr);
mutable Mutex mutex_;
std::unordered_map<std::shared_ptr<Task>, ffrt::task_handle> cancellableTasks_ {};
std::atomic<uint32_t> totalThreadNum_ {0};
std::unordered_multiset<uint32_t> ffrtTaskIds_ {};
};
#endif
} // namespace panda::ecmascript
#endif // ECMASCRIPT_PALTFORM_PLATFORM_H

View File

@ -318,11 +318,6 @@ template("libark_jsruntime_intl_common_set") {
external_deps = []
deps = []
defines = []
if (is_ohos && is_standard_system && !is_arkui_x) {
defines += [ "ENABLE_FFRT_INTERFACES" ]
external_deps += [ "ffrt:libffrt" ]
}
if (!is_arkui_x) {
external_deps += [ "runtime_core:arkfile_header_deps" ]
} else {

View File

@ -98,7 +98,6 @@
panda::ecmascript::DebugInfoExtractor::*;
panda::ecmascript::JSRuntimeOptions::*;
panda::ecmascript::Taskpool::*;
panda::ecmascript::GCWorkerPool::*;
panda::ecmascript::JSThread::*;
panda::ecmascript::Chunk::*;
panda::ecmascript::WorkSpaceChunk::*;