task add new priority

Signed-off-by: zhuruigan <zhuruigan@huawei.com>
Change-Id: Id5077eb08007afd5a6d8dc6795e5166c2342e783
This commit is contained in:
zhuruigan 2024-04-22 15:59:21 +08:00
parent 70cff5c420
commit 5ad8f0a8d8
8 changed files with 102 additions and 24 deletions

View File

@ -194,13 +194,13 @@ void TaskManager::UpdateExecutedInfo(uint64_t duration)
uint32_t TaskManager::ComputeSuitableThreadNum()
{
uint32_t targetNum = 0;
if (GetTaskNum() != 0 && totalExecCount_ == 0) {
if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
// this branch is used for avoiding time-consuming tasks that may block the taskpool
targetNum = std::min(STEP_SIZE, GetTaskNum());
targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
} else if (totalExecCount_ != 0) {
auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
uint32_t result = std::ceil(durationPerTask * GetTaskNum() / MAX_TASK_DURATION);
targetNum = std::min(result, GetTaskNum());
uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
targetNum = std::min(result, GetNonIdleTaskNum());
}
targetNum += GetRunningWorkers();
return targetNum | 1;
@ -455,7 +455,7 @@ void TaskManager::TryExpand()
// dispatch task in the TaskPoolManager thread
NotifyExecuteTask();
// do not trigger when there are more idleWorkers than tasks
if (GetIdleWorkers() > GetTaskNum()) {
if (GetIdleWorkers() > GetNonIdleTaskNum()) {
return;
}
needChecking_ = false; // do not need to check
@ -621,6 +621,25 @@ uint32_t TaskManager::GetTaskNum()
return sum;
}
uint32_t TaskManager::GetNonIdleTaskNum()
{
return nonIdleTaskNum_;
}
void TaskManager::IncreaseNumIfNoIdle(Priority priority)
{
if (priority != Priority::IDLE) {
++nonIdleTaskNum_;
}
}
void TaskManager::DecreaseNumIfNoIdle(Priority priority)
{
if (priority != Priority::IDLE) {
--nonIdleTaskNum_;
}
}
uint32_t TaskManager::GetThreadNum()
{
std::lock_guard<std::recursive_mutex> lock(workersMutex_);
@ -631,6 +650,7 @@ void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
{
{
std::lock_guard<std::mutex> lock(taskQueuesMutex_);
IncreaseNumIfNoIdle(priority);
taskQueues_[priority]->EnqueueTaskId(taskId);
}
TryTriggerExpand();
@ -645,36 +665,55 @@ std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
std::lock_guard<std::mutex> lock(taskQueuesMutex_);
auto& highTaskQueue = taskQueues_[Priority::HIGH];
if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
auto& highTaskQueue = taskQueues_[Priority::HIGH];
highPrioExecuteCount_++;
uint64_t taskId = highTaskQueue->DequeueTaskId();
if (IsDependendByTaskId(taskId)) {
EnqueuePendingTaskInfo(taskId, Priority::HIGH);
return std::make_pair(0, Priority::HIGH);
}
return std::make_pair(taskId, Priority::HIGH);
return GetTaskByPriority(highTaskQueue, Priority::HIGH);
}
highPrioExecuteCount_ = 0;
auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
mediumPrioExecuteCount_++;
uint64_t taskId = mediumTaskQueue->DequeueTaskId();
if (IsDependendByTaskId(taskId)) {
EnqueuePendingTaskInfo(taskId, Priority::MEDIUM);
return std::make_pair(0, Priority::MEDIUM);
}
return std::make_pair(taskId, Priority::MEDIUM);
return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
}
mediumPrioExecuteCount_ = 0;
auto& lowTaskQueue = taskQueues_[Priority::LOW];
uint64_t taskId = lowTaskQueue->DequeueTaskId();
if (IsDependendByTaskId(taskId)) {
EnqueuePendingTaskInfo(taskId, Priority::LOW);
return std::make_pair(0, Priority::LOW);
if (!lowTaskQueue->IsEmpty()) {
return GetTaskByPriority(lowTaskQueue, Priority::LOW);
}
return std::make_pair(taskId, Priority::LOW);
auto& idleTaskQueue = taskQueues_[Priority::IDLE];
if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
}
return std::make_pair(0, Priority::LOW);
}
bool TaskManager::IsChooseIdle()
{
std::lock_guard<std::recursive_mutex> lock(workersMutex_);
for (auto& worker : workers_) {
if (worker->state_ == WorkerState::IDLE) {
// If worker->state_ is WorkerState::IDLE, it means that the worker is free
continue;
}
// If there is a worker running a task, do not take the idle task.
return false;
}
// Only when all workers are free, will idle task be taken.
return true;
}
std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
Priority priority)
{
uint64_t taskId = taskQueue->DequeueTaskId();
if (IsDependendByTaskId(taskId)) {
EnqueuePendingTaskInfo(taskId, priority);
return std::make_pair(0, priority);
}
DecreaseNumIfNoIdle(priority);
return std::make_pair(taskId, priority);
}
void TaskManager::NotifyExecuteTask()
@ -703,6 +742,7 @@ void TaskManager::InitTaskManager(napi_env env)
}
if (EnableFfrt()) {
HILOG_INFO("taskpool:: apps use ffrt");
ffrt_set_cpu_worker_max_num(ffrt::qos_background, 1);
ffrt_set_cpu_worker_max_num(ffrt::qos_utility, 12); // 12 : worker max num
ffrt_set_cpu_worker_max_num(ffrt::qos_default, 12); // 12 : worker max num
ffrt_set_cpu_worker_max_num(ffrt::qos_user_initiated, 12); // 12 : worker max num

View File

@ -166,6 +166,12 @@ private:
static void NotifyExpand(const uv_async_t* req);
static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
bool IsChooseIdle();
uint32_t GetNonIdleTaskNum();
std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
void IncreaseNumIfNoIdle(Priority priority);
void DecreaseNumIfNoIdle(Priority priority);
// <taskId, Task>
std::unordered_map<uint64_t, Task*> tasks_ {};
std::recursive_mutex tasksMutex_;
@ -202,6 +208,7 @@ private:
uv_async_t* expandHandle_ = nullptr;
std::atomic<bool> suspend_ = false;
std::atomic<uint32_t> retryCount_ = 0;
std::atomic<uint32_t> nonIdleTaskNum_ = 0;
std::atomic<uint32_t> totalExecCount_ = 0;
std::atomic<uint64_t> totalExecTime_ = 0;
std::atomic<bool> needChecking_ = false;

View File

@ -56,10 +56,12 @@ napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
napi_property_descriptor exportPriority[] = {
DECLARE_NAPI_PROPERTY("HIGH", highPriority),
DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
DECLARE_NAPI_PROPERTY("LOW", lowPriority),
DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
};
napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);

View File

@ -706,4 +706,29 @@ HWTEST_F(NativeEngineTest, TaskpoolTest059, testing::ext::TestSize.Level0)
usleep(50000);
uint32_t result = taskManager.GetIdleWorkers();
ASSERT_TRUE(result == 0);
}
HWTEST_F(NativeEngineTest, TaskpoolTest060, testing::ext::TestSize.Level0)
{
napi_env env = reinterpret_cast<napi_env>(engine_);
TaskManager& taskManager = TaskManager::GetInstance();
taskManager.InitTaskManager(env);
uint64_t taskId = 36;
taskManager.EnqueueTaskId(taskId, Priority::LOW);
ASSERT_EQ(taskId, 36);
std::pair<uint64_t, Priority> result = taskManager.DequeueTaskId();
ASSERT_TRUE(result.first == 36);
ASSERT_TRUE(result.second == Priority::LOW);
taskId = 37;
taskManager.EnqueueTaskId(taskId, Priority::IDLE);
ASSERT_EQ(taskId, 37);
result = taskManager.DequeueTaskId();
ASSERT_TRUE(result.first == 37);
ASSERT_TRUE(result.second == Priority::IDLE);
result = taskManager.DequeueTaskId();
ASSERT_TRUE(result.first == 0);
ASSERT_TRUE(result.second == Priority::LOW);
}

View File

@ -157,6 +157,7 @@ void Worker::InitFfrtInfo()
{
if (TaskManager::GetInstance().EnableFfrt()) {
static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
{ffrt::qos_background, Priority::IDLE},
{ffrt::qos_utility, Priority::LOW},
{ffrt::qos_default, Priority::DEFAULT},
{ffrt::qos_user_initiated, Priority::HIGH},

View File

@ -44,6 +44,7 @@ enum class WorkerState { IDLE, RUNNING, BLOCKED };
#if defined(ENABLE_TASKPOOL_FFRT)
static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = {
{Priority::IDLE, ffrt::qos_background},
{Priority::LOW, ffrt::qos_utility},
{Priority::MEDIUM, ffrt::qos_default},
{Priority::HIGH, ffrt::qos_user_initiated},

View File

@ -28,6 +28,7 @@ using OHOS::QOS::QosLevel;
#ifdef ENABLE_QOS
static const std::map<Priority, QosLevel> WORKERPRIORITY_QOSLEVEL_MAP = {
{Priority::IDLE, OHOS::QOS::QosLevel::QOS_BACKGROUND},
{Priority::LOW, OHOS::QOS::QosLevel::QOS_UTILITY},
{Priority::DEFAULT, OHOS::QOS::QosLevel::QOS_DEFAULT},
{Priority::HIGH, OHOS::QOS::QosLevel::QOS_USER_INITIATED}

View File

@ -22,7 +22,8 @@ enum Priority {
HIGH = 0,
MEDIUM,
LOW,
NUMBER = 3,
IDLE,
NUMBER = 4,
DEFAULT = MEDIUM,
};
} // namespace Commonlibrary::Platform