debug定位

Signed-off-by: 董洁 <dongjie52@h-partners.com>
This commit is contained in:
董洁 2024-11-12 16:42:32 +08:00
parent 693076b3f7
commit 32929ae634
77 changed files with 1049 additions and 594 deletions

15
.clang-format Normal file
View File

@ -0,0 +1,15 @@
---
BasedOnStyle: Google
IndentWidth: 4
ColumnLimit: 120
---
Language: Cpp
AccessModifierOffset: -4
AllowShortIfStatementsOnASingleLine: Never
AlwaysBreakBeforeMultilineStrings: false
DerivePointerAlignment: false
IncludeBlocks: Preserve
IndentCaseBlocks: true
PackConstructorInitializers: CurrentLine
SpacesBeforeTrailingComments: 1
---

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
#ignore files
.vscode
ffrt.cfg
build
test/build
benchmarks/output
output
trace.json

View File

@ -132,6 +132,7 @@ ohos_shared_library("libffrt") {
"src/eu/co2_context.c",
"src/eu/co_routine.cpp",
"src/eu/co_routine_factory.cpp",
"src/eu/cpu_manager_strategy.cpp",
"src/eu/cpu_monitor.cpp",
"src/eu/cpu_worker.cpp",
"src/eu/cpuworker_manager.cpp",
@ -208,7 +209,7 @@ ohos_shared_library("libffrt") {
defines = []
if (ffrt_async_stack_enable) {
defines += [ "ASYNC_STACKTRACE" ]
defines += [ "FFRT_ASYNC_STACKTRACE" ]
sources += [ "src/dfx/async_stack/ffrt_async_stack.cpp" ]
}
@ -240,22 +241,9 @@ ohos_prebuilt_etc("blacklist_cfg") {
]
}
ohos_prebuilt_etc("log_ctr_whitelist_cfg") {
relative_install_dir = "ffrt"
source = "log_ctr_whitelist.conf"
part_name = "ffrt"
subsystem_name = "resourceschedule"
install_enable = true
install_images = [
"system",
"updater",
]
}
group("ffrt_ndk") {
deps = [
":blacklist_cfg",
":libffrt",
":log_ctr_whitelist_cfg",
]
}

View File

@ -14,7 +14,7 @@
domain: FFRT
AI_KERN_POWER_UP_ERR:
__BASE: {type: FAULT, level: MINOR, desc: AI KERNEL ipu fail or status invalid, preserve: true}
__BASE: {type: FAULT, level: MINOR, desc: AI KERNEL ipu powerup fail or status invalid, preserve: true}
ERROR_LEVEL: {type: INT32, desc: error level}
IC_NAME: {type: STRING, desc: IC name}
MODULE_NAME: {type: STRING, desc: module name}
@ -24,7 +24,7 @@ AI_KERN_POWER_UP_ERR:
DEVICE_NAME: {type: STRING, desc: device name}
RUNNING_TEST_SWITCH: {type: BOOL, desc: running_test_switch}
FAULT_PHENOMENON: {type: STRING, desc: fault_phenomenon}
NFF_THRESHOLD_MOUTH: {type: INT32, desc: nff_threshold_mouth}
NFF_THRESHOLD_MONTH: {type: INT32, desc: nff_threshold_month}
NFF_THRESHOLD_DAY: {type: INT32, desc: nff_threshold_day}
AI_KERN_WTD_TIMEOUT_ERR:
@ -38,7 +38,7 @@ AI_KERN_WTD_TIMEOUT_ERR:
DEVICE_NAME: {type: STRING, desc: device name}
RUNNING_TEST_SWITCH: {type: BOOL, desc: running_test_switch}
FAULT_PHENOMENON: {type: STRING, desc: fault_phenomenon}
NFF_THRESHOLD_MOUTH: {type: INT32, desc: nff_threshold_mouth}
NFF_THRESHOLD_MONTH: {type: INT32, desc: nff_threshold_month}
NFF_THRESHOLD_DAY: {type: INT32, desc: nff_threshold_day}
TASK_TIMEOUT:

View File

@ -57,7 +57,7 @@ FFRT_C_API bool ffrt_queue_has_task(ffrt_queue_t queue, const char* name);
FFRT_C_API void ffrt_queue_cancel_all(ffrt_queue_t queue);
/**
* @brief Cancels all unexecuted tasks and wait for running tasks in the queue.
* @brief Cancels all unexecuted tasks and wait for running tasks in the queue. No new tasks will be accepted.
*
* @param queue Indicates a queue handle.
* @version 1.0

View File

@ -1,13 +1,14 @@
# -----------------------------------------------------------------------------
# ffrt code
# -----------------------------------------------------------------------------
set(FFRT_LOG_PLAT "linux")
set(FFRT_LOG_PLAT_PATH "${FFRT_CODE_PATH}/dfx/log/linux")
file(GLOB_RECURSE FFRT_SRC_LIST
"${FFRT_CODE_PATH}/core/*.cpp"
"${FFRT_CODE_PATH}/eu/*.cpp"
"${FFRT_CODE_PATH}/eu/*.c"
"${FFRT_CODE_PATH}/internal_inc/*.cpp"
"${FFRT_CODE_PATH}/ipc/*.cpp"
"${FFRT_CODE_PATH}/queue/*.cpp"
"${FFRT_CODE_PATH}/sched/*.cpp"
"${FFRT_CODE_PATH}/sync/*.cpp"

View File

@ -17,7 +17,11 @@
#define FFRT_TASK_IO_H
#include "internal_inc/types.h"
#include "sched/qos.h"
#ifdef USE_OHOS_QOS
#include "qos.h"
#else
#include "staging_qos/sched/qos.h"
#endif
#include "c/executor_task.h"
namespace ffrt {

View File

@ -186,7 +186,7 @@ static inline void SaveNormalTaskStatus()
}
if (t->coRoutine && (t->coRoutine->status.load() == static_cast<int>(CoStatus::CO_NOT_FINISH))
&& t != g_cur_task) {
CoStart(t);
CoStart(t, GetCoEnv());
}
}
};
@ -229,7 +229,7 @@ static inline void SaveQueueTaskStatus()
}
if (t->coRoutine && (t->coRoutine->status.load() == static_cast<int>(CoStatus::CO_NOT_FINISH))) {
CoStart(reinterpret_cast<CPUEUTask*>(t));
CoStart(reinterpret_cast<CPUEUTask*>(t), GetCoEnv());
}
}
};
@ -504,15 +504,6 @@ std::string SaveWorkerStatusInfo(void)
AppendTaskInfo(ss, t);
ss << std::endl;
}
if (t->type != ffrt_normal_task && t->type != ffrt_queue_task && t->type != ffrt_invalid_task) {
ss << " qos " << i << ": worker tid " << thread.first->Id();
if (t->type == ffrt_io_task) {
ss << " io task is running";
} else {
ss << " uv task is running";
}
ss << std::endl;
}
}
if (tidArr.size() == 0) {
continue;

View File

@ -125,7 +125,7 @@ int dump_info_all(char *buf, uint32_t len)
dumpInfo += SaveNormalTaskStatusInfo();
dumpInfo += SaveQueueTaskStatusInfo();
if (dumpInfo.length() > (len - 1)) {
FFRT_LOGW("dumpInfo exceeds the buffer length, length:%d", dumpInfo.length());
FFRT_LOGW("dumpInfo exceeds the buffer length, info length:%d, input len:%u", dumpInfo.length(), len);
}
return snprintf_s(buf, len, len - 1, "%s", dumpInfo.c_str());
} else {

View File

@ -17,7 +17,7 @@
#include "hisysevent.h"
namespace ffrt {
void TaskTimeoutReport(std::stringstream& ss, std::string& processNameStr, std::string& senarioName)
void TaskTimeoutReport(std::stringstream& ss, const std::string& processNameStr, const std::string& senarioName)
{
std::string msg = ss.str();
std::string eventName = "TASK_TIMEOUT";

View File

@ -19,7 +19,8 @@
#include <string>
namespace ffrt {
#ifdef FFRT_SEND_EVENT
void TaskTimeoutReport(std::stringstream& ss, std::string& processNameStr, std::string& senarioName);
void TaskTimeoutReport(std::stringstream& ss, const std::string& processNameStr, const std::string& senarioName);
void WorkerEscapeReport(const std::string& processName, int qos, size_t totalNum);
#endif
}
#endif

View File

@ -112,7 +112,7 @@ private:
return true;
}
handle = dlopen(TRACE_LIB_PATH, RTLD_NOW | RTLD_LOCAL);
handle = dlopen(TRACE_LIB_PATH, RTLD_NOW | RTLD_LOCAL | RTLD_NODELETE);
if (handle == nullptr) {
FFRT_LOGE("load so[%s] fail", TRACE_LIB_PATH);
return false;
@ -221,6 +221,36 @@ static bool _IsTagEnabled(uint64_t label)
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_TraceCount(HITRACE_TAG_FFRT, tag, value); \
} while (false)
#define FFRT_TASK_BEGIN(tag, gid) \
do { \
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_StartTrace(HITRACE_TAG_FFRT, ("FFRT" + (tag) + "|" + std::to_string(gid)).c_str(), -1); \
} while (false)
#define FFRT_BLOCK_TRACER(gid, tag) \
do { \
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_StartTrace(HITRACE_TAG_FFRT, ("FFBK" #tag "|" + std::to_string(gid)).c_str(), -1); \
FFRT_TRACE_END(); \
} while (false)
#define FFRT_WAKE_TRACER(gid) \
do { \
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_StartTrace(HITRACE_TAG_FFRT, ("FFWK|" + std::to_string(gid)).c_str(), -1); \
FFRT_TRACE_END(); \
} while (false)
#define FFRT_EXECUTOR_TASK_BEGIN(ptr) \
do { \
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_StartTrace(HITRACE_TAG_FFRT, ("FFRTex_task|" + \
std::to_string(((reinterpret_cast<uintptr_t>(ptr)) & 0xffffffff))).c_str(), -1); \
} while (false)
#define FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(qid, gid) \
do { \
if (__builtin_expect(!!(_IsTagEnabled(HITRACE_TAG_FFRT)), 0)) \
_StartTrace(HITRACE_TAG_FFRT, ("P[sq_" + \
std::to_string(qid) + "]|" + std::to_string(gid)).c_str(), -1); \
FFRT_TRACE_END(); \
} while (false)
#define FFRT_TRACE_SCOPE(level, tag) ffrt::ScopedTrace ___tracer##tag(level, #tag)
#else
#define FFRT_TRACE_BEGIN(tag)
@ -229,6 +259,11 @@ static bool _IsTagEnabled(uint64_t label)
#define FFRT_TRACE_ASYNC_END(tag, tid)
#define FFRT_TRACE_COUNT(tag, value)
#define FFRT_TRACE_SCOPE(level, tag)
#define FFRT_TASK_BEGIN(tag, gid)
#define FFRT_BLOCK_TRACER(gid, tag)
#define FFRT_WAKE_TRACER(gid)
#define FFRT_EXECUTOR_TASK_BEGIN(ptr)
#define FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(qid, gid)
#endif
// DFX Trace for FFRT Normal Task
@ -254,24 +289,10 @@ static bool _IsTagEnabled(uint64_t label)
{ \
FFRT_TRACE_ASYNC_END("Co", gid); \
}
#define FFRT_TASK_BEGIN(tag, gid) \
{ \
FFRT_TRACE_BEGIN(("FFRT" + (tag) + "|" + std::to_string(gid)).c_str()); \
}
#define FFRT_TASK_END() \
{ \
FFRT_TRACE_END(); \
}
#define FFRT_BLOCK_TRACER(gid, tag) \
do { \
FFRT_TRACE_BEGIN(("FFBK" #tag "|" + std::to_string(gid)).c_str()); \
FFRT_TRACE_END(); \
} while (false)
#define FFRT_WAKE_TRACER(gid) \
do { \
FFRT_TRACE_BEGIN(("FFWK|" + std::to_string(gid)).c_str()); \
FFRT_TRACE_END(); \
} while (false)
// DFX Trace for FFRT Executor Task
#define FFRT_EXECUTOR_TASK_SUBMIT_MARKER(ptr) \
@ -290,22 +311,11 @@ static bool _IsTagEnabled(uint64_t label)
{ \
FFRT_TRACE_ASYNC_END("F", ((reinterpret_cast<uintptr_t>(ptr)) & 0x7fffffff)); \
}
#define FFRT_EXECUTOR_TASK_BEGIN(ptr) \
{ \
FFRT_TRACE_BEGIN(("FFRTex_task|" + \
std::to_string(((reinterpret_cast<uintptr_t>(ptr)) & 0xffffffff))).c_str()); \
}
#define FFRT_EXECUTOR_TASK_END() \
{ \
FFRT_TRACE_END(); \
}
// DFX Trace for FFRT Serial Queue Task
#define FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(qid, gid) \
do { \
FFRT_TRACE_BEGIN(("P[sq_" + std::to_string(qid) + "]|" + std::to_string(gid)).c_str()); \
FFRT_TRACE_END(); \
} while (false)
#define FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(gid) \
{ \
FFRT_TRACE_ASYNC_END("E", gid); \

View File

@ -24,12 +24,12 @@
namespace ffrt {
typedef struct ffrt_record_task_counter {
std::atomic<unsigned int> submitCounter{0};
std::atomic<unsigned int> enqueueCounter{0};
std::atomic<unsigned int> coSwitchCounter{0};
std::atomic<unsigned int> runCounter{0};
std::atomic<unsigned int> doneCounter{0};
std::atomic<unsigned int> cancelCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> submitCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> enqueueCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> coSwitchCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> runCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> doneCounter{0};
alignas(cacheline_size) std::atomic<unsigned int> cancelCounter{0};
} ffrt_record_task_counter_t;
typedef struct ffrt_record_task_time {
@ -68,15 +68,22 @@ public:
return ffrt_be_used_;
}
static inline void UseFfrt()
{
if (unlikely(!ffrt_be_used_)) {
ffrt_be_used_ = true;
}
}
template<ffrt_executor_task_type_t taskType>
static inline void AddSubmitCounter(int qos)
static inline void TaskSubmit(int qos)
{
#if (FFRT_TRACE_RECORD_LEVEL >= FFRT_TRACE_RECORD_LEVEL_2)
g_recordTaskCounter_[taskType][qos].submitCounter.fetch_add(1, std::memory_order_relaxed);
#endif
}
static inline void RecordCreateTimeAndTid(uint64_t* createTime, int32_t* fromTid)
static inline void TaskSubmit(uint64_t* createTime, int32_t* fromTid)
{
#if (FFRT_TRACE_RECORD_LEVEL >= FFRT_TRACE_RECORD_LEVEL_1)
*createTime = TimeStamp();
@ -84,31 +91,16 @@ public:
#endif
}
template<ffrt_executor_task_type_t taskType>
static inline void TaskSubmit(int qos)
{
if (unlikely(!ffrt_be_used_)) {
ffrt_be_used_ = true;
}
AddSubmitCounter<taskType>(qos);
}
static inline void TaskSubmit(uint64_t* createTime, int32_t* fromTid)
{
if (unlikely(!ffrt_be_used_)) {
ffrt_be_used_ = true;
}
RecordCreateTimeAndTid(createTime, fromTid);
}
template<ffrt_executor_task_type_t taskType>
static inline void TaskSubmit(int qos, uint64_t* createTime, int32_t* fromTid)
{
if (unlikely(!ffrt_be_used_)) {
ffrt_be_used_ = true;
}
AddSubmitCounter<taskType>(qos);
RecordCreateTimeAndTid(createTime, fromTid);
#if (FFRT_TRACE_RECORD_LEVEL >= FFRT_TRACE_RECORD_LEVEL_2)
g_recordTaskCounter_[taskType][qos].submitCounter.fetch_add(1, std::memory_order_relaxed);
#endif
#if (FFRT_TRACE_RECORD_LEVEL >= FFRT_TRACE_RECORD_LEVEL_1)
*createTime = TimeStamp();
*fromTid = ExecuteCtx::Cur()->tid;
#endif
}
static inline void TaskExecute(uint64_t* executeTime)

View File

@ -97,13 +97,11 @@ void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, f
new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
}
FFRT_TRACE_BEGIN(("submit|" + std::to_string(task->gid)).c_str());
FFRT_LOGD("submit task[%lu], name[%s]", task->gid, task->label.c_str());
#ifdef FFRT_ASYNC_STACKTRACE
{
task->stackId = FFRTCollectAsyncStack();
}
#endif
QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
FFRTTraceRecord::TaskSubmit<ffrt_normal_task>(qos, &(task->createTime), &(task->fromTid));
@ -156,12 +154,11 @@ void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, f
return;
}
}
if (attr != nullptr) {
task->notifyWorker_ = attr->notifyWorker_;
}
FFRT_LOGD("Submit completed, enter ready queue, task[%lu], name[%s]", task->gid, task->label.c_str());
task->UpdateState(TaskState::READY);
FFRTTraceRecord::TaskEnqueue<ffrt_normal_task>(qos);
FFRT_TRACE_END();

View File

@ -61,10 +61,8 @@ static inline void CoStackCheck(CoRoutine* co)
}
extern pthread_key_t g_executeCtxTlsKey;
namespace {
pthread_key_t g_coThreadTlsKey = 0;
pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
void CoEnvDestructor(void* args)
{
auto coEnv = static_cast<CoRoutineEnv*>(args);
@ -92,7 +90,6 @@ CoRoutineEnv* GetCoEnv()
}
return coEnv;
}
} // namespace
#ifdef FFRT_TASK_LOCAL_ENABLE
namespace {
@ -166,9 +163,9 @@ void UpdateWorkerTsdValueToThread(void** taskTsd)
threadTsd[key] = taskVal;
} else {
FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)),
"FFRT abort: mismatch key = [%u]", key);
"FFRT abort: mismatch key=[%u]", key);
FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
"FFRT abort: unexpected: thread exist but task not exist, key = [%u]", key);
"FFRT abort: unexpected: thread exists but task not exist, key=[%u]", key);
}
taskTsd[key] = nullptr;
}
@ -268,8 +265,7 @@ static void CoSetStackProt(CoRoutine* co, int prot)
uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
mp = (mp + p_size - 1) / p_size * p_size;
int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
FFRT_UNLIKELY_COND_DO_ABORT(ret < 0,
"coroutine size:%lu, mp:0x%lx, page_size:%zu, result:%d, prot:%d, err:%d, %s",
FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu, result:%d, prot:%d, err:%d, %s",
static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
p_size, ret, prot, errno, strerror(errno));
}
@ -412,7 +408,7 @@ static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
}
// called by thread work
int CoStart(ffrt::CPUEUTask* task)
int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
{
if (task->coRoutine) {
int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
@ -455,11 +451,11 @@ int CoStart(ffrt::CPUEUTask* task)
// 2. couroutine task block, switch to thread
// need suspend the coroutine task or continue to execute the coroutine task.
auto pending = GetCoEnv()->pending;
auto pending = coRoutineEnv->pending;
if (pending == nullptr) {
return 0;
}
GetCoEnv()->pending = nullptr;
coRoutineEnv->pending = nullptr;
FFRTTraceRecord::TaskCoSwitchOut(task);
// Fast path: skip state transition
if ((*pending)(task)) {
@ -468,7 +464,7 @@ int CoStart(ffrt::CPUEUTask* task)
return 0;
}
FFRT_WAKE_TRACER(task->gid); // fast path wk
GetCoEnv()->runningCo = co;
coRoutineEnv->runningCo = co;
}
return 0;
}
@ -514,7 +510,6 @@ void CoWake(ffrt::CPUEUTask* task, bool timeOut)
return;
}
// Fast path: state transition without lock
FFRT_LOGD("Cowake task[%lu], name[%s], timeOut[%d]", task->gid, task->label.c_str(), timeOut);
task->wakeupTimeOut = timeOut;
FFRT_WAKE_TRACER(task->gid);
switch (task->type) {

View File

@ -18,6 +18,7 @@
#include <atomic>
#include <functional>
#include <thread>
#include <pthread.h>
#include "co2_context.h"
#if defined(__aarch64__)
@ -26,8 +27,6 @@ constexpr size_t STACK_MAGIC = 0x7BCDABCDABCDABCD;
constexpr size_t STACK_MAGIC = 0x7BCDABCD;
#elif defined(__x86_64__)
constexpr size_t STACK_MAGIC = 0x7BCDABCDABCDABCD;
#elif defined(__riscv) && __riscv_xlen == 64
constexpr size_t STACK_MAGIC = 0x7BCDABCDABCDABCD;
#endif
#ifndef FFRT_STACK_SIZE
@ -62,6 +61,8 @@ constexpr uint64_t MIN_STACK_SIZE = 32 * 1024;
using CoCtx = struct co2_context;
struct CoRoutineEnv {
// when task is running, runningCo same with task->co
// if task switch out, set to null. if task complete, be used as co cache for next task.
CoRoutine* runningCo = nullptr;
CoCtx schCtx;
const std::function<bool(ffrt::CPUEUTask*)>* pending = nullptr;
@ -125,12 +126,14 @@ private:
void CoStackFree(void);
void CoWorkerExit(void);
int CoStart(ffrt::CPUEUTask* task);
int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv);
void CoYield(void);
void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred);
void CoWake(ffrt::CPUEUTask* task, bool timeOut);
CoRoutineEnv* GetCoEnv(void);
#ifdef FFRT_TASK_LOCAL_ENABLE
void TaskTsdDeconstruct(ffrt::CPUEUTask* task);
#endif

View File

@ -0,0 +1,73 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cpu_manager_strategy.h"
#include "internal_inc/osal.h"
#include "eu/cpuworker_manager.h"
#include "eu/scpuworker_manager.h"
#include "eu/scpu_monitor.h"
#include <cstring>
namespace ffrt {
WorkerThread* CPUManagerStrategy::CreateCPUWorker(const QoS& qos, void* manager)
{
constexpr int processNameLen = 32;
static std::once_flag flag;
static char processName[processNameLen];
std::call_once(flag, []() {
GetProcessName(processName, processNameLen);
});
CPUWorkerManager* pIns = reinterpret_cast<CPUWorkerManager*>(manager);
// default strategy of worker ops
CpuWorkerOps ops {
CPUWorker::WorkerLooperDefault,
[pIns] (WorkerThread* thread) { return pIns->PickUpTaskFromGlobalQueue(thread); },
[pIns] (const WorkerThread* thread) { pIns->NotifyTaskPicked(thread); },
[pIns] (const WorkerThread* thread) { return pIns->WorkerIdleAction(thread); },
[pIns] (WorkerThread* thread) { pIns->WorkerRetired(thread); },
[pIns] (WorkerThread* thread) { pIns->WorkerPrepare(thread); },
[pIns] (const WorkerThread* thread, int timeout) { return pIns->TryPoll(thread, timeout); },
[pIns] (WorkerThread* thread) { return pIns->StealTaskBatch(thread); },
[pIns] (WorkerThread* thread) { return pIns->PickUpTaskBatch(thread); },
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
[pIns] (const WorkerThread* thread) { return pIns->IsExceedRuningThreshold(thread); },
[pIns] () { return pIns->IsBlockAwareInit(); },
#endif
};
return new (std::nothrow) CPUWorker(qos, std::move(ops), pIns);
}
CPUMonitor* CPUManagerStrategy::CreateCPUMonitor(void* manager)
{
constexpr int processNameLen = 32;
static std::once_flag flag;
static char processName[processNameLen];
std::call_once(flag, []() {
GetProcessName(processName, processNameLen);
});
SCPUWorkerManager* pIns = reinterpret_cast<SCPUWorkerManager*>(manager);
// default strategy of monitor ops
CpuMonitorOps ops {
[pIns] (const QoS& qos) { return pIns->IncWorker(qos); },
[pIns] (const QoS& qos) { pIns-> WakeupWorkers(qos); },
[pIns] (const QoS& qos) { return pIns->GetTaskCount(qos); },
[pIns] (const QoS& qos) { return pIns->GetWorkerCount(qos); },
CPUMonitor::HandleTaskNotifyDefault,
};
return new SCPUMonitor(std::move(ops));
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FFRT_CPU_MANAGER_INTERFACE_HPP
#define FFRT_CPU_MANAGER_INTERFACE_HPP
#ifndef FFRT_CPU_MANAGER_STRATEGY_HPP
#define FFRT_CPU_MANAGER_STRATEGY_HPP
#include "eu/worker_thread.h"
#include "qos.h"
#include "sync/poller.h"
#include "tm/cpu_task.h"
namespace ffrt {
enum class WorkerAction {
RETRY = 0,
RETIRE,
MAX,
};
enum class TaskNotifyType {
TASK_PICKED = 0,
TASK_ADDED,
TASK_LOCAL,
};
enum class SleepType {
SLEEP_UNTIL_WAKEUP = 0,
SLEEP_UNTIL_INTERRUPT,
SLEEP_BREAK,
};
struct CpuWorkerOps {
std::function<void (WorkerThread*)> WorkerLooper;
std::function<CPUEUTask* (WorkerThread*)> PickUpTask;
std::function<void (const WorkerThread*)> NotifyTaskPicked;
std::function<WorkerAction (const WorkerThread*)> WaitForNewAction;
std::function<void (WorkerThread*)> WorkerRetired;
std::function<void (WorkerThread*)> WorkerPrepare;
std::function<PollerRet (const WorkerThread*, int timeout)> TryPoll;
std::function<unsigned int (WorkerThread*)> StealTaskBatch;
std::function<CPUEUTask* (WorkerThread*)> PickUpTaskBatch;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
std::function<bool (WorkerThread*)> IsExceedRunningThreshold;
std::function<bool (void)> IsBlockAwareInit;
#endif
};
struct CpuMonitorOps {
std::function<bool (const QoS& qos)> IncWorker;
std::function<void (const QoS& qos)> WakeupWorkers;
std::function<int (const QoS& qos)> GetTaskCount;
std::function<int (const QoS& qos)> GetWorkerCount;
std::function<void (const QoS& qos, void*, TaskNotifyType)> HandleTaskNotity;
std::function<void (const QoS& qos, void*, TaskNotifyType)> HandleTaskNotify;
};
class CPUMonitor;
class CPUManagerStrategy {
public:
static WorkerThread* CreateCPUWorker(const QoS& qos, void* manager);
static CPUMonitor* CreateCPUMonitor(void* manager);
};
}
#endif
#endif

View File

@ -27,6 +27,7 @@
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
namespace {
const size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
const size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
@ -39,7 +40,6 @@ namespace ffrt {
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
{
SetupMonitor();
StartMonitor();
}
CPUMonitor::~CPUMonitor()
@ -142,6 +142,7 @@ void CPUMonitor::MonitorMain()
exceedUpperWaterLine[i] = true;
}
}
stopMonitor = true;
}
bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
@ -162,25 +163,46 @@ bool CPUMonitor::IsBlockAwareInit(void)
void CPUMonitor::TimeoutCount(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
workerCtrl.sleepingWorkerNum--;
workerCtrl.lock.unlock();
}
void CPUMonitor::WakeupCount(const QoS& qos, bool isDeepSleepWork)
void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
if (irqWake) {
workerCtrl.irqEnable = false;
}
workerCtrl.sleepingWorkerNum--;
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
}
int CPUMonitor::WakedWorkerNum(const QoS& qos)
int CPUMonitor::TotalCount(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::unique_lock lk(workerCtrl.lock);
return workerCtrl.executionNum;
workerCtrl.lock.lock();
int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
workerCtrl.lock.unlock();
return total;
}
void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::lock_guard lk(workerCtrl.lock);
if (irqWake) {
workerCtrl.irqEnable = false;
}
workerCtrl.executionNum++;
}
bool CPUMonitor::TryDestroy(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::lock_guard lk(workerCtrl.lock);
workerCtrl.sleepingWorkerNum--;
return workerCtrl.sleepingWorkerNum > 0;
}
int CPUMonitor::SleepingWorkerNum(const QoS& qos)
@ -190,45 +212,44 @@ int CPUMonitor::SleepingWorkerNum(const QoS& qos)
return workerCtrl.sleepingWorkerNum;
}
bool CPUMonitor::HasDeepSleepWork(const QoS& qos)
int CPUMonitor::WakedWorkerNum(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::lock_guard lock(workerCtrl.lock);
return workerCtrl.hasWorkDeepSleep;
std::lock_guard lk(workerCtrl.lock);
return workerCtrl.executionNum;
}
void CPUMonitor::IntoDeepSleep(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
workerCtrl.deepSleepingWorkerNum++;
workerCtrl.lock.unlock();
}
void CPUMonitor::OutOfDeepSleep(const QoS& qos)
void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
if (irqWake) {
workerCtrl.irqEnable = false;
}
workerCtrl.sleepingWorkerNum--;
workerCtrl.executionNum++;
workerCtrl.deepSleepingWorkerNum--;
workerCtrl.lock.unlock();
workerCtrl.executionNum++;
}
void CPUMonitor::IntoPollWait(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
workerCtrl.pollWaitFlag = true;
workerCtrl.lock.unlock();
}
void CPUMonitor::OutOfPollWait(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
workerCtrl.pollWaitFlag = false;
workerCtrl.lock.unlock();
}
bool CPUMonitor::IsExceedDeepSleepThreshold()
@ -237,10 +258,9 @@ bool CPUMonitor::IsExceedDeepSleepThreshold()
int deepSleepingWorkerNum = 0;
for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
WorkerCtrl& workerCtrl = ctrlQueue[i];
workerCtrl.lock.lock();
std::lock_guard lk(workerCtrl.lock);
deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
workerCtrl.lock.unlock();
}
return deepSleepingWorkerNum * 2 > totalWorker;
}
@ -278,7 +298,7 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
ops.WakeupWorkers(qos);
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
workerCtrl.executionNum++;
FFRTTraceRecord::WorkRecord(static_cast<int>(qos), workerCtrl.executionNum);
FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
workerCtrl.lock.unlock();
ops.IncWorker(qos);
} else {
@ -331,69 +351,83 @@ void CPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType
}
}
void CPUMonitor::PokeAdd(const QoS& qos)
// conservative strategy for poking workers
void CPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
workerCtrl.lock.unlock();
CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
int taskCount = monitor->ops.GetTaskCount(qos);
if (taskCount == 0) {
//no available task in global queue, skip
return;
} else {
size_t runningNum = workerCtrl.executionNum;
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
if (blockAwareInit && !BlockawareLoadSnapshot(keyPtr, &domainInfoNotify)) {
runningNum = workerCtrl.executionNum - domainInfoNotify.localinfo[qos()].nrBlocked;
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
} else {
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
workerCtrl.lock.unlock();
}
}
}
void CPUMonitor::PokePick(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
constexpr double thresholdTaskPick = 1.0;
WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
if (workerCtrl.hasWorkDeepSleep &&GetOps().GetTaskCount(qos) == 0) {
if (notifyType == TaskNotifyType::TASK_PICKED) {
int wakedWorkerCount = workerCtrl.executionNum;
double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
if (remainingLoadRatio <= thresholdTaskPick) {
//for task pick, wake worker when load ratio > 1
workerCtrl.lock.unlock();
return;
}
workerCtrl.lock.unlock();
ops.WakeupWorkers(qos);
} else {
size_t runningNum = workerCtrl.executionNum;
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
if (blockAwareInit && !BlockawareLoadSnapshot(keyPtr, &domainInfoNotify)) {
runningNum = workerCtrl.executionNum - domainInfoNotify.localinfo[qos()].nrBlocked;
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
}
if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
if (workerCtrl.sleepingWorkerNum == 0) {
FFRT_LOGI("begin to create worker, notifyType[%d]"
"execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
monitor->ops.IncWorker(qos);
} else {
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
workerCtrl.lock.unlock();
monitor->ops.WakeupWorkers(qos);
}
} else {
workerCtrl.lock.unlock();
}
}
void CPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
{
(void)notifyType;
CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
int taskCount = monitor->ops.GetTaskCount(qos);
if (taskCount == 0) {
// no available task in global queue, skip
return;
}
WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
std::lock_guard lock(workerCtrl.lock);
int runningNum = workerCtrl.executionNum;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
if (monitor->blockAwareInit && !BlockawareLoadSnapshot(monitor->keyPtr, &monitor->domainInfoNotify)) {
/* nrRunning may not be updated in a timely manner */
runningNum = workerCtrl.executionNum - monitor->domainInfoNotify.localinfo[qos()].nrBlocked;
if (!monitor->stopMonitor && taskCount == runningNum) {
BlockawareWake();
return;
}
}
#endif
if (taskCount < runningNum) {
return;
}
if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
if (workerCtrl.sleepingWorkerNum == 0) {
workerCtrl.executionNum++;
monitor->ops.IncWorker(qos);
} else {
monitor->ops.WakeupWorkers(qos);
}
}
}

View File

@ -22,23 +22,23 @@
#include <mutex>
#include "qos.h"
#include "cpp/mutex.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
#include "sync/sync.h"
namespace ffrt {
struct WorkerCtrl {
alignas(cacheline_size) fast_mutex lock;
alignas(cacheline_size) int executionNum = 0;
alignas(cacheline_size) int sleepingWorkerNum = 0;
alignas(cacheline_size) bool irqEnable = false;
size_t hardLimit = 0;
size_t maxConcurrency = 0;
int executionNum = 0;
int sleepingWorkerNum = 0;
bool pollWaitFlag = false;
int deepSleepingWorkerNum = 0;
bool hasWorkDeepSleep = false;
bool retryBeforeDeepSleep = true;
std::mutex lock;
};
class CPUMonitor {
@ -48,14 +48,18 @@ public:
CPUMonitor& operator=(const CPUMonitor&) = delete;
virtual ~CPUMonitor();
uint32_t GetMonitorTid() const;
virtual SleepType IntoSleep(const QoS& qos) = 0;
virtual void WakeupCount(const QoS& qos, bool isDeepSleepWork = false);
int TotalCount(const QoS& qos);
virtual void IntoSleep(const QoS& qos) = 0;
void WakeupSleep(const QoS& qos, bool irqWake = false);
void IntoDeepSleep(const QoS& qos);
void OutOfDeepSleep(const QoS& qos);
void WakeupDeepSleep(const QoS& qos, bool irqWake = false);
void TimeoutCount(const QoS& qos);
bool IsExceedDeepSleepThreshold();
void IntoPollWait(const QoS& qos);
void OutOfPollWait(const QoS& qos);
void RollbackDestroy(const QoS& qos, bool irqWake = false);
bool TryDestroy(const QoS& qos);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool IsExceedRunningThreshold(const QoS& qos);
bool IsBlockAwareInit(void);
@ -67,27 +71,23 @@ public:
int SetWorkerMaxNum(const QoS& qos, int num);
/* strategy options for handling task notify events */
static void HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType);
static void HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType);
static void HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType);
int WakedWorkerNum(const QoS& qos);
int SleepingWorkerNum(const QoS& qos);
void NotifyWorkers(const QoS& qos, int number);
bool HasDeepSleepWork(const QoS& qos);
void StartMonitor();
CpuMonitorOps ops;
std::thread* monitorThread = nullptr;
uint32_t monitorTid = 0;
protected:
WorkerCtrl ctrlQueue[QoS::MaxNum()];
void PokeAdd(const QoS& qos);
void PokePick(const QoS& qos);
void Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType);
CpuMonitorOps& GetOps()
{
return ops;
}
private:
void SetupMonitor();
void StartMonitor();
std::thread* monitorThread;
CpuMonitorOps ops;
std::atomic<bool> setWorkerMaxNum[QoS::MaxNum()];
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool blockAwareInit = false;
bool stopMonitor = false;
@ -98,6 +98,10 @@ private:
BlockawareDomainInfoArea domainInfoNotify;
std::atomic<bool> exceedUpperWaterLine[QoS::MaxNum()];
#endif
private:
void SetupMonitor();
std::atomic<bool> setWorkerMaxNum[QoS::MaxNum()];
};
}
#endif /* CPU_MONITOR_H */

View File

@ -17,28 +17,31 @@
#include "eu/worker_thread.h"
#include "ffrt_trace.h"
#include "sched/scheduler.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "dfx/bbox/bbox.h"
#include "eu/func_manager.h"
#include "dm/dependence_manager.h"
#include "dfx/perf/ffrt_perf.h"
#include "sync/poller.h"
#include "util/spmc_queue.h"
#include "util/ffrt_facade.h"
#include "tm/cpu_task.h"
#include "tm/queue_task.h"
#ifdef FFRT_ASYNC_STACKTRACE
#include "dfx/async_stack/ffrt_async_stack.h"
#endif
#include "eu/cpuworker_manager.h"
namespace {
int PLACE_HOLDER = 0;
const unsigned int TRY_POLL_FREQ = 51;
}
namespace ffrt {
void CPUWorker::Run(CPUEUTask* task, CPUWorker* worker)
void CPUWorker::Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker)
{
if constexpr(USE_COROUTINE) {
if (CoStart(task) != 0) {
worker->localFifo.PushTail(task);
while (CoStart(task, coRoutineEnv) != 0) {
usleep(CO_CREATE_RETRY_INTERVAL);
}
return;
}
@ -104,7 +107,13 @@ void* CPUWorker::WrapDispatch(void* worker)
void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
{
auto ctx = ExecuteCtx::Cur();
ExecuteCtx* ctx = ExecuteCtx::Cur();
CoRoutineEnv* coRoutineEnv = GetCoEnv();
RunTask(curtask, worker, ctx, coRoutineEnv);
}
void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv)
{
CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
worker->curTask = task;
worker->curTaskType_ = task->type;
@ -117,7 +126,7 @@ void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
#endif
ctx->task = task;
ctx->lastGid_ = task->gid;
Run(task, worker);
Run(task, coRoutineEnv, worker);
ctx->task = nullptr;
break;
}
@ -213,7 +222,6 @@ void CPUWorker::Dispatch(CPUWorker* worker)
FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
worker->ops.WorkerLooper(worker);
CoWorkerExit();
FFRT_LOGD("ExecutionThread exited");
worker->ops.WorkerRetired(worker);
}
@ -289,4 +297,37 @@ void CPUWorker::WorkerLooperDefault(WorkerThread* p)
}
}
}
// work looper with standard procedure which could be strategical
void CPUWorker::WorkerLooperStandard(WorkerThread* p)
{
CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
auto mgr = reinterpret_cast<CPUWorkerManager*>(p->worker_mgr);
auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(p->GetQos());
auto lock = mgr->GetSleepCtl(static_cast<int>(p->GetQos()));
ExecuteCtx* ctx = ExecuteCtx::Cur();
CoRoutineEnv* coRoutineEnv = GetCoEnv();
for (;;) {
// try get task
CPUEUTask* task = nullptr;
if (!mgr->tearDown) {
std::lock_guard lg(*lock);
task = sched.PickNextTask();
}
// if succ, notify picked and run task
if (task != nullptr) {
mgr->NotifyTaskPicked(worker);
RunTask(reinterpret_cast<ffrt_executor_task_t*>(task), worker, ctx, coRoutineEnv);
continue;
}
// otherwise, worker wait action
auto action = worker->ops.WaitForNewAction(worker);
if (action == WorkerAction::RETRY) {
continue;
} else if (action == WorkerAction::RETIRE) {
break;
}
}
}
} // namespace ffrt

View File

@ -17,7 +17,7 @@
#define FFRT_CPU_WORKER_HPP
#include "eu/worker_thread.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "c/executor_task.h"
#include "sync/poller.h"
#include "util/spmc_queue.h"
@ -26,10 +26,12 @@
namespace ffrt {
const unsigned int LOCAL_QUEUE_SIZE = 128;
const unsigned int STEAL_BUFFER_SIZE = LOCAL_QUEUE_SIZE / 2;
class CPUWorker : public WorkerThread {
public:
CPUWorker(const QoS& qos, CpuWorkerOps&& ops) : WorkerThread(qos), ops(ops)
CPUWorker(const QoS& qos, CpuWorkerOps&& ops, void* worker_mgr) : WorkerThread(qos), ops(ops)
{
this->worker_mgr = worker_mgr;
localFifo.Init(LOCAL_QUEUE_SIZE);
#ifdef FFRT_PTHREAD_ENABLE
Start(CPUWorker::WrapDispatch, this);
@ -48,13 +50,15 @@ public:
public:
/* strategy options for worklooper function */
static void WorkerLooperDefault(WorkerThread* p);
static void WorkerLooperStandard(WorkerThread* p);
static void Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker);
private:
static void* WrapDispatch(void* worker);
static void Dispatch(CPUWorker* worker);
static void Run(CPUEUTask* task, CPUWorker* worker);
static void Run(ffrt_executor_task_t* task, ffrt_qos_t qos);
static void RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker);
static void RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv);
static void RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker);
static void* GetTask(CPUWorker* worker);
static PollerRet TryPoll(CPUWorker* worker, int timeout);

View File

@ -17,8 +17,9 @@
#include <sys/stat.h>
#include "qos.h"
#include "dfx/perf/ffrt_perf.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "eu/cpu_monitor.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "sched/scheduler.h"
#include "sched/workgroup_internal.h"
#include "eu/qos_interface.h"
@ -72,8 +73,9 @@ bool CPUWorkerManager::IncWorker(const QoS& qos)
FFRT_PERF_WORKER_WAKE(workerQos);
lock.unlock();
#ifdef FFRT_WORKER_MONITOR
FFRTFFacade::GetWMInstance().SubmitTask();
FFRTFacade::GetWMInstance().SubmitTask();
#endif
FFRTTraceRecord::UseFfrt();
return true;
}

View File

@ -19,7 +19,7 @@
#include "eu/worker_manager.h"
#include "eu/cpu_worker.h"
#include "eu/cpu_monitor.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "sync/poller.h"
#include "util/spmc_queue.h"
#include "tm/cpu_task.h"
@ -72,7 +72,6 @@ public:
return monitor;
}
protected:
virtual void WorkerPrepare(WorkerThread* thread) = 0;
virtual void WakeupWorkers(const QoS& qos) = 0;
bool IncWorker(const QoS& qos) override;
@ -92,10 +91,10 @@ protected:
bool IsBlockAwareInit(void);
#endif
private:
bool WorkerTearDown();
bool DecWorker() override
{return false;}
virtual void WorkerRetiredSimplified(WorkerThread* thread) = 0;
void NotifyTaskPicked(const WorkerThread* thread);
/* strategy options for task pick up */
virtual CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread) = 0;
@ -103,6 +102,7 @@ private:
/* strategy options for worker wait action */
virtual WorkerAction WorkerIdleAction(const WorkerThread* thread) = 0;
virtual WorkerAction WorkerIdleActionSimplified(const WorkerThread* thread) = 0;
void WorkerSetup(WorkerThread* thread);
PollerRet TryPoll(const WorkerThread* thread, int timeout = -1);

View File

@ -12,19 +12,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "eu/scpu_monitor.h"
namespace ffrt {
SleepType SCPUMonitor::IntoSleep(const QoS& qos)
void SCPUMonitor::IntoSleep(const QoS& qos)
{
SleepType type = SleepType::SLEEP_UNTIL_WAKEUP;
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
workerCtrl.sleepingWorkerNum++;
workerCtrl.executionNum--;
workerCtrl.lock.unlock();
return type;
}
void SCPUMonitor::Notify(const QoS& qos, TaskNotifyType notifyType)

View File

@ -16,7 +16,7 @@
#ifndef SCPU_MONITOR_H
#define SCPU_MONITOR_H
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "eu/cpu_monitor.h"
namespace ffrt {
@ -24,8 +24,7 @@ namespace ffrt {
class SCPUMonitor : public CPUMonitor {
public:
SCPUMonitor(CpuMonitorOps&& ops) : CPUMonitor(std::move(ops)) {};
SleepType IntoSleep(const QoS& qos) override;
void IntoSleep(const QoS& qos) override;
void Notify(const QoS& qos, TaskNotifyType notifyType) override;
void WorkerInit() override;
};

View File

@ -18,24 +18,34 @@
#include <sys/stat.h>
#include "dfx/perf/ffrt_perf.h"
#include "eu/co_routine_factory.h"
#include "eu/cpu_manager_interface.h"
#include "eu/cpu_manager_strategy.h"
#include "eu/qos_interface.h"
#include "eu/scpu_monitor.h"
#include "sched/scheduler.h"
#include "sched/workgroup_internal.h"
#include "util/ffrt_facade.h"
#include "eu/scpuworker_manager.h"
#include "util/slab.h"
#include "eu/scpuworker_manager.h"
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
namespace {
#if !defined(OHOS_STANDARD_SYSTEM)
/* SUPPORT_WORKER_DESTRUCT indicates that the idle thread destruction function is supported.
* The stack canary is saved or destored during coroutine switch-out and switch-in,
* currently, only the stack canary used by the ohos compiler stack protection is global
* and is not affected by worker destruction.
*/
#if !defined(SUPPORT_WORKER_DESTRUCT)
constexpr int waiting_seconds = 10;
#else
constexpr int waiting_seconds = 5;
#endif
const std::map<std::string, void(*)(const ffrt::QoS&, void*, ffrt::TaskNotifyType)> NOTIFY_FUNCTION_FACTORY = {
{ "CameraDaemon", ffrt::CPUMonitor::HandleTaskNotifyConservative },
{ "bluetooth", ffrt::CPUMonitor::HandleTaskNotifyUltraConservative },
};
}
namespace ffrt {
@ -44,6 +54,7 @@ constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5 * 1000 * 1000;
SCPUWorkerManager::SCPUWorkerManager()
{
monitor = CPUManagerStrategy::CreateCPUMonitor(this);
(void)monitor->StartMonitor();
}
SCPUWorkerManager::~SCPUWorkerManager()
@ -71,81 +82,38 @@ SCPUWorkerManager::~SCPUWorkerManager()
delete monitor;
}
void SCPUWorkerManager::AddDelayedTask(int qos)
void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
{
WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
we->cb = ([this, qos](WaitEntry* we) {
int taskCount = GetTaskCount(QoS(qos));
pid_t pid = thread->Id();
int qos = static_cast<int>(thread->GetQos());
bool isEmptyQosThreads = false;
{
std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
bool isEmpty = groupCtl[qos].threads.empty();
lck.unlock();
if (!isEmpty) {
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
return;
thread->SetExited(true);
thread->Detach();
auto worker = std::move(groupCtl[qos].threads[thread]);
int ret = groupCtl[qos].threads.erase(thread);
if (ret != 1) {
FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
}
if (taskCount != 0) {
FFRT_LOGI("notify task, qos %d", qos);
FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
} else {
AddDelayedTask(qos);
isEmptyQosThreads = groupCtl[qos].threads.empty();
WorkerLeaveTg(QoS(qos), pid);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
if (IsBlockAwareInit()) {
ret = BlockawareUnregister();
if (ret != 0) {
FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
}
}
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
});
if (!DelayedWakeup(we->tp, we, we->cb)) {
SimpleAllocator<WaitUntilEntry>::FreeMem(we);
FFRT_LOGW("add delyaed task failed, qos %d", qos);
}
}
WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
{
if (tearDown) {
return WorkerAction::RETIRE;
#endif
worker = nullptr;
}
auto& ctl = sleepCtl[thread->GetQos()];
std::unique_lock lk(ctl.mutex);
(void)monitor->IntoSleep(thread->GetQos());
FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
BlockawareEnterSleeping();
#endif
if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
bool taskExistence = GetTaskCount(thread->GetQos()) ||
reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
(polling_[thread->GetQos()] == 0);
return tearDown || taskExistence || needPoll;
})) {
monitor->WakeupCount(thread->GetQos());
FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
BlockawareLeaveSleeping();
#endif
return WorkerAction::RETRY;
} else {
#if !defined(OHOS_STANDARD_SYSTEM)
monitor->IntoDeepSleep(thread->GetQos());
CoStackFree();
if (monitor->IsExceedDeepSleepThreshold()) {
ffrt::CoRoutineReleaseMem();
}
ctl.cv.wait(lk, [this, thread] {
return tearDown || GetTaskCount(thread->GetQos()) ||
reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
});
monitor->OutOfDeepSleep(thread->GetQos());
return WorkerAction::RETRY;
#else
monitor->TimeoutCount(thread->GetQos());
FFRT_LOGD("worker exit");
return WorkerAction::RETIRE;
#endif
//qos has no worker, start delay worker to monitor task
if (isEmptyQosThreads) {
FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
AddDelayedTask(qos);
}
}
@ -156,7 +124,7 @@ CPUEUTask* SCPUWorkerManager::PickUpTaskFromGlobalQueue(WorkerThread* thread)
return nullptr;
}
auto& sched = FFRTScheduler::Instance()->GetScheduler(thread->GetQos());
auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
std::lock_guard lg(*lock);
return sched.PickNextTask();
@ -168,10 +136,12 @@ CPUEUTask* SCPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
return nullptr;
}
auto& sched = FFRTScheduler::Instance()->GetScheduler(thread->GetQos());
auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
std::lock_guard lg(*lock);
CPUEUTask* task = sched.PickNextTask();
#ifdef FFRT_LOCAL_QUEUE_ENABLE
if (task == nullptr) {
return nullptr;
}
@ -197,10 +167,124 @@ CPUEUTask* SCPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
queue->PushTail(task2local);
}
#endif
return task;
}
void SCPUWorkerManager::AddDelayedTask(int qos)
{
WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
we->cb = ([this, qos](WaitEntry* we) {
int taskCount = GetTaskCount(QoS(qos));
std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
bool isEmpty = groupCtl[qos].threads.empty();
lck.unlock();
if (!isEmpty) {
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
return;
}
if (taskCount != 0) {
FFRT_LOGI("notify task, qos %d", qos);
FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
} else {
AddDelayedTask(qos);
}
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
});
if (!DelayedWakeup(we->tp, we, we->cb)) {
SimpleAllocator<WaitUntilEntry>::FreeMem(we);
FFRT_LOGW("add delyaed task failed, qos %d", qos);
}
}
WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
{
if (tearDown) {
return WorkerAction::RETIRE;
}
auto& ctl = sleepCtl[thread->GetQos()];
std::unique_lock lk(ctl.mutex);
monitor->IntoSleep(thread->GetQos());
FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
BlockawareEnterSleeping();
#endif
if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
bool taskExistence = GetTaskCount(thread->GetQos()) ||
reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
(polling_[thread->GetQos()] == 0);
return tearDown || taskExistence || needPoll;
})) {
monitor->WakeupSleep(thread->GetQos());
FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
BlockawareLeaveSleeping();
#endif
return WorkerAction::RETRY;
} else {
#if !defined(SUPPORT_WORKER_DESTRUCT)
monitor->IntoDeepSleep(thread->GetQos());
CoStackFree();
if (monitor->IsExceedDeepSleepThreshold()) {
ffrt::CoRoutineReleaseMem();
}
ctl.cv.wait(lk, [this, thread] {
return tearDown || GetTaskCount(thread->GetQos()) ||
reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
});
monitor->WakeupDeepSleep(thread->GetQos());
return WorkerAction::RETRY;
#else
monitor->TimeoutCount(thread->GetQos());
return WorkerAction::RETIRE;
#endif
}
}
WorkerAction SCPUWorkerManager::WorkerIdleActionSimplified(const WorkerThread* thread)
{
if (tearDown) {
return WorkerAction::RETIRE;
}
auto& ctl = sleepCtl[thread->GetQos()];
std::unique_lock lk(ctl.mutex);
monitor->IntoSleep(thread->GetQos());
FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
bool taskExistence = GetTaskCount(thread->GetQos());
return tearDown || taskExistence;
})) {
monitor->WakeupSleep(thread->GetQos());
FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
return WorkerAction::RETRY;
} else {
#if !defined(SUPPORT_WORKER_DESTRUCT)
monitor->IntoDeepSleep(thread->GetQos());
CoStackFree();
if (monitor->IsExceedDeepSleepThreshold()) {
ffrt::CoRoutineReleaseMem();
}
ctl.cv.wait(lk, [this, thread] {return tearDown || GetTaskCount(thread->GetQos());});
monitor->WakeupDeepSleep(thread->GetQos());
return WorkerAction::RETRY;
#else
monitor->TimeoutCount(thread->GetQos());
return WorkerAction::RETIRE;
#endif
}
}
void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
{
WorkerJoinTg(thread->GetQos(), thread->Id());
@ -217,54 +301,5 @@ void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
ctl.cv.notify_one();
FFRT_PERF_WORKER_WAKE(static_cast<int>(qos));
}
WorkerThread* CPUManagerStrategy::CreateCPUWorker(const QoS& qos, void* manager)
{
constexpr int processNameLen = 32;
static std::once_flag flag;
static char processName[processNameLen];
std::call_once(flag, []() {
GetProcessName(processName, processNameLen);
});
CPUWorkerManager* pIns = reinterpret_cast<CPUWorkerManager*>(manager);
// default strategy of worker ops
CpuWorkerOps ops {
CPUWorker::WorkerLooperDefault,
[pIns] (WorkerThread* thread) { return pIns->PickUpTaskFromGlobalQueue(thread); },
[pIns] (const WorkerThread* thread) { pIns->NotifyTaskPicked(thread); },
[pIns] (const WorkerThread* thread) { return pIns->WorkerIdleAction(thread); },
[pIns] (WorkerThread* thread) { pIns->WorkerRetired(thread); },
[pIns] (WorkerThread* thread) { pIns->WorkerPrepare(thread); },
[pIns] (const WorkerThread* thread, int timeout) { return pIns->TryPoll(thread, timeout); },
[pIns] (WorkerThread* thread) { return pIns->StealTaskBatch(thread); },
[pIns] (WorkerThread* thread) { return pIns->PickUpTaskBatch(thread); },
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
[pIns] (const WorkerThread* thread) { return pIns->IsExceedRunningThreshold(thread); },
[pIns] () { return pIns->IsBlockAwareInit(); },
#endif
};
return new (std::nothrow) CPUWorker(qos, std::move(ops));
}
CPUMonitor* CPUManagerStrategy::CreateCPUMonitor(void* manager)
{
constexpr int processNameLen = 32;
static std::once_flag flag;
static char processName[processNameLen];
std::call_once(flag, []() {
GetProcessName(processName, processNameLen);
});
SCPUWorkerManager* pIns = reinterpret_cast<SCPUWorkerManager*>(manager);
// default strategy of monitor ops
CpuMonitorOps ops {
[pIns] (const QoS& qos) { return pIns->IncWorker(qos); },
[pIns] (const QoS& qos) { pIns->WakeupWorkers(qos); },
[pIns] (const QoS& qos) { return pIns->GetTaskCount(qos); },
[pIns] (const QoS& qos) { return pIns->GetWorkerCount(qos); },
CPUMonitor::HandleTaskNotifyDefault,
};
return new SCPUMonitor(std::move(ops));
}
} // namespace ffrt

View File

@ -23,11 +23,12 @@ public:
SCPUWorkerManager();
~SCPUWorkerManager() override;
WorkerAction WorkerIdleAction(const WorkerThread* thread) override;
void WorkerPrepare(WorkerThread* thread) override;
void WakeupWorkers(const QoS& qos) override;
WorkerAction WorkerIdleActionSimplified(const WorkerThread* thread) override;
CPUEUTask* PickUpTaskFromGlobalQueue(WorkerThread* thread) override;
CPUEUTask* PickUpTaskBatch(WorkerThread* thread) override;
friend class CPUManagerStrategy;
void WorkerRetiredSimplified(WorkerThread* thread) override;
void WorkerPrepare(WorkerThread* thread) override;
void WakeupWorkers(const QoS& qos) override;
private:
void AddDelayedTask(int qos);
};

View File

@ -186,6 +186,7 @@ public:
void WorkerSetup(WorkerThread* wthread);
void NativeConfig();
void* worker_mgr;
private:
std::atomic_bool exited;

View File

@ -13,13 +13,14 @@
* limitations under the License.
*/
#include "base_queue.h"
#include <unordered_map>
#include "dfx/log/ffrt_log_api.h"
#include "tm/queue_task.h"
#include "serial_queue.h"
#include "concurrent_queue.h"
#include "eventhandler_adapter_queue.h"
#include "eventhandler_interactive_queue.h"
#include "base_queue.h"
namespace {
using CreateFunc = std::unique_ptr<ffrt::BaseQueue>(*)(const ffrt_queue_attr_t*);

View File

@ -13,6 +13,7 @@
* limitations under the License.
*/
#include "queue_handler.h"
#include <sys/syscall.h>
#include <sstream>
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
@ -63,7 +64,6 @@ QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, cons
QueueHandler::~QueueHandler()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot destruct, [queueId=%u] constructed failed", GetQueueId());
FFRT_LOGI("destruct %s enter", name_.c_str());
// clear tasks in queue
CancelAndWait();
@ -173,8 +173,8 @@ void QueueHandler::CancelAndWait()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
GetQueueId());
queue_->Remove();
while (FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) != 0 || queue_->GetActiveStatus()) {
queue_->Stop();
while (FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus()) {
std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
}
}
@ -200,6 +200,7 @@ int QueueHandler::Cancel(QueueTask* task)
if (task->GetSchedTimeout() > 0) {
RemoveSchedDeadline(task);
}
int ret = queue_->Remove(task);
if (ret == SUCC) {
FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
@ -245,7 +246,6 @@ void QueueHandler::Dispatch(QueueTask* inTask)
f->destroy(f);
task->Notify();
RemoveTimeoutMonitor(task);
// run task batch
nextTask = task->GetNextTask();
@ -299,25 +299,25 @@ void QueueHandler::SetTimeoutMonitor(QueueTask* task)
}
task->IncDeleteRef();
timeoutWe_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
// set delayed worker callback
timeoutWe_->cb = ([this, task](WaitEntry* timeoutWe_) {
we->cb = ([this, task](WaitEntry* we) {
if (!task->GetFinishStatus()) {
RunTimeOutCallback(task);
}
delayedCbCnt_.fetch_sub(1);
task->DecDeleteRef();
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
});
// set delayed worker wakeup time
std::chrono::microseconds timeout(timeout_);
auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
timeoutWe_->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
if (!DelayedWakeup(timeoutWe_->tp, timeoutWe_, timeoutWe_->cb)) {
if (!DelayedWakeup(we->tp, we, we->cb)) {
task->DecDeleteRef();
SimpleAllocator<WaitUntilEntry>::FreeMem(timeoutWe_);
SimpleAllocator<WaitUntilEntry>::FreeMem(we);
FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
name_.c_str(), timeout_);
return;
@ -327,20 +327,6 @@ void QueueHandler::SetTimeoutMonitor(QueueTask* task)
FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
}
void QueueHandler::RemoveTimeoutMonitor(QueueTask* task)
{
if (timeout_ <= 0) {
return;
}
WaitEntry* dwe = static_cast<WaitEntry*>(timeoutWe_);
if (DelayedRemove(timeoutWe_->tp, dwe)) {
delayedCbCnt_.fetch_sub(1);
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe_));
}
return;
}
void QueueHandler::RunTimeOutCallback(QueueTask* task)
{
std::stringstream ss;
@ -392,7 +378,7 @@ void QueueHandler::SetEventHandler(void* eventHandler)
void* QueueHandler::GetEventHandler()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
(queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
@ -422,7 +408,7 @@ void QueueHandler::SendSchedTimer(TimePoint delay)
we_->cb = ([this](WaitEntry* we_) { CheckSchedDeadline(); });
bool result = DelayedWakeup(we_->tp, we_, we_->cb);
while (!result) {
FFRT_LOGW("failed to set delayedwoker, retry");
FFRT_LOGW("failed to set delayedworker, retry");
we_->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(SCHED_TIME_ACC_ERROR_US);
result = DelayedWakeup(we_->tp, we_, we_->cb);
}

View File

@ -14,7 +14,7 @@
*/
#ifndef FFRT_QUEUE_HANDLER_H
#define FFRT_QUEUE_HANDLER_H
#include <atomic>
#include <memory>
#include <string>
@ -24,7 +24,6 @@
#include "cpp/task.h"
#include "base_queue.h"
#include "sched/execute_ctx.h"
#include "sched/execute_ctx.h"
namespace ffrt {
class QueueTask;
@ -48,21 +47,21 @@ public:
bool SetLoop(Loop* loop);
bool ClearLoop();
QueueTask* PickUpTask();
inline bool IsValidForLoop()
inline bool IsValidForLoop()
{
return !isUsed_.load() && (queue_->GetQueueType() == ffrt_queue_concurrent
|| queue_->GetQueueType() == ffrt_queue_eventhandler_interactive);
|| queue_->GetQueueType() == ffrt_queue_eventhandler_interactive);
}
inline std::string GetName()
inline std::string GetName()
{
return name_;
}
inline uint32_t GetQueueId()
inline uint32_t GetQueueId()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return 0, "queue construct failed");
return queue_->GetQueueId();
@ -81,7 +80,7 @@ public:
inline uint64_t GetTaskCnt()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueID=%u] constructed failed", GetQueueId());
FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
return queue_->GetMapSize();
}
@ -101,7 +100,6 @@ private:
void Deliver();
void TransferInitTask();
void SetTimeoutMonitor(QueueTask* task);
void RemoveTimeoutMonitor(QueueTask* task);
void RunTimeOutCallback(QueueTask* task);
void CheckSchedDeadline();
@ -120,7 +118,6 @@ private:
uint64_t timeout_ = 0;
std::atomic_int delayedCbCnt_ = {0};
ffrt_function_header_t* timeoutCb_ = nullptr;
WaitUntilEntry* timeoutWe_ = nullptr;
std::mutex mutex_;
bool initSchedTimer_ = false;

View File

@ -55,7 +55,7 @@ QueueMonitor::QueueMonitor()
QueueMonitor::~QueueMonitor()
{
FFRT_LOGI("destruction of QueueMonitor enter");
FFRT_LOGI("destruction of QueueMonitor");
SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
}

View File

@ -15,9 +15,10 @@
#include "execute_ctx.h"
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
pthread_key_t g_executeCtxTlsKey = 0;
pthread_once_t g_executeCtxKeyOnce = PTHREAD_ONCE_INIT;
namespace ffrt {
namespace {
void ExecuteCtxTlsDestructor(void* args)
@ -32,7 +33,7 @@ void MakeExecuteCtxTlsKey()
{
pthread_key_create(&g_executeCtxTlsKey, ExecuteCtxTlsDestructor);
}
} // namespace
}
ExecuteCtx::ExecuteCtx()
{

View File

@ -20,6 +20,7 @@
#include "dfx/log/ffrt_log_api.h"
#include "task_client_adapter.h"
#if (defined(QOS_WORKER_FRAME_RTG) || defined(QOS_FRAME_RTG))
constexpr int HWC_UID = 3039;
constexpr int ROOT_UID = 0;
@ -27,7 +28,7 @@ constexpr int RS_RTG_ID = 10;
namespace ffrt {
static int wgId = -1;
static WorkGroup *rsWorkGroup = nullptr;
static WorkGroup* rsWorkGroup = nullptr;
static int wgCount = 0;
static std::mutex wgLock;
@ -115,7 +116,7 @@ bool LeaveRSWorkGroup(int tid)
if (existIndex != -1) {
rsWorkGroup->tids[existIndex] = -1;
}
FFRT_LOGI("[RSWorkGroup] LeaveRSWorkGroup ,tid:%{public}d,existIndex:%{public}d", tid, existIndex);
FFRT_LOGI("[RSWorkGroup] LeaveRSWorkGroup ,tid: %{public}d, existIndex: %{public}d", tid, existIndex);
return true;
}
@ -165,7 +166,6 @@ bool JoinWG(int tid)
}
return false;
}
int uid = getuid();
if (uid == RS_UID) {
return JoinRSWorkGroup(tid);
@ -203,7 +203,7 @@ struct WorkGroup* WorkgroupCreate(uint64_t interval)
FFRT_LOGE("[WorkGroup] create rtg group %d failed", rtgId);
return nullptr;
}
FFRT_LOGI("[WorkGroup] create rtg group %{public}d success", rtgId);
FFRT_LOGI("[WorkGroup] create rtg group %d success", rtgId);
WorkGroup* wg = nullptr;
wg = new struct WorkGroup();
@ -225,7 +225,6 @@ int WorkgroupClear(struct WorkGroup* wg)
if (uid == RS_UID) {
return DestoryRSWorkGroup();
}
if (wg == nullptr) {
FFRT_LOGE("[WorkGroup] input workgroup is null");
return 0;
@ -306,6 +305,7 @@ void WorkgroupJoin(struct WorkGroup* wg, int tid)
FFRT_LOGE("[WorkGroup] join fail with %{public}d threads for %{public}d", addRet, tid);
}
}
#endif /* QOS_FRAME_RTG */
}

View File

@ -27,15 +27,15 @@ int QoSMap(int qos)
}
}
static FuncQosMap g_funcQosMap = nullptr;
static FuncQosMap funcQosMap = nullptr;
void SetFuncQosMap(FuncQosMap func)
{
g_funcQosMap = func;
funcQosMap = func;
}
FuncQosMap GetFuncQosMap(void)
{
return g_funcQosMap;
return funcQosMap;
}
int QoSMax(void)
@ -43,14 +43,14 @@ int QoSMax(void)
return qos_max + 1;
}
static FuncQosMax g_funcQosMax = nullptr;
static FuncQosMax funcQosMax = nullptr;
void SetFuncQosMax(FuncQosMax func)
{
g_funcQosMax = func;
funcQosMax = func;
}
FuncQosMax GetFuncQosMax(void)
{
return g_funcQosMax;
return funcQosMax;
}
}

View File

@ -65,12 +65,12 @@ public:
bool RemoveNode(LinkedList* node)
{
FFRT_PERF_TASK_NUM(qos, RQSize());
bool ret = false;
{
que->RmQueueNode(node);
ret = true;
}
FFRT_PERF_TASK_NUM(qos, RQSize());
return ret;
}

View File

@ -28,7 +28,6 @@ int TaskState::OnTransition(State state, CPUEUTask* task, Op&& op)
return -1;
}
if (task->IsRoot()) {
FFRT_LOGD("task root no state transition");
return 0;
}

View File

@ -20,6 +20,7 @@
#include <sys/prctl.h>
#include <sys/timerfd.h>
#include <thread>
#include <pthread.h>
#include "eu/blockaware.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
@ -27,12 +28,36 @@
#include "util/name_manager.h"
#include "sched/scheduler.h"
namespace {
const uintptr_t FFRT_DELAY_WORKER_MAGICNUM = 0x5aa5;
const int FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS = 3 * 60;
const int NS_PER_SEC = 1000 * 1000 * 1000;
const int WAIT_EVENT_SIZE = 5;
const int64_t EXECUTION_TIMEOUT_MILISECONDS = 500;
}
namespace ffrt {
pthread_key_t g_ffrtDelayWorkerFlagKey;
pthread_once_t g_ffrtDelayWorkerThreadKeyOnce = PTHREAD_ONCE_INIT;
void FFRTDelayWorkeEnvKeyCreate()
{
pthread_key_create(&g_ffrtDelayWorkerFlagKey, nullptr);
}
void DelayedWorker::ThreadEnvCreate()
{
pthread_once(&g_ffrtDelayWorkerThreadKeyOnce, FFRTDelayWorkeEnvKeyCreate);
}
bool DelayedWorker::IsDelayerWorkerThread()
{
bool isDelayerWorkerFlag = false;
void* flag = pthread_getspecific(g_ffrtDelayWorkerFlagKey);
if ((flag != nullptr) && (reinterpret_cast<uintptr_t>(flag) == FFRT_DELAY_WORKER_MAGICNUM)) {
isDelayerWorkerFlag = true;
}
return isDelayerWorkerFlag;
}
void DelayedWorker::ThreadInit()
{
if (delayWorker != nullptr && delayWorker->joinable()) {
@ -48,6 +73,7 @@ void DelayedWorker::ThreadInit()
FFRT_LOGI("thread init");
}
prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
pthread_setspecific(g_ffrtDelayWorkerFlagKey, reinterpret_cast<void*>(FFRT_DELAY_WORKER_MAGICNUM));
std::array<epoll_event, WAIT_EVENT_SIZE> waitedEvents;
for (;;) {
std::unique_lock lk(lock);
@ -115,7 +141,7 @@ DelayedWorker::DelayedWorker(): epollfd_ { ::epoll_create1(EPOLL_CLOEXEC) },
monitorfd_ = BlockawareMonitorfd(-1, monitor->WakeupCond());
FFRT_ASSERT(monitorfd_ >= 0);
FFRT_LOGI("timerfd:%d, monitorfd:%d", timerfd_, monitorfd_);
/* monitorfd does not support 'CLOEXEC', add current kernel does not inherit monitorfd after 'fork'.
/* monitorfd does not support 'CLOEXEC', and current kernel does not inherit monitorfd after 'fork'.
* 1. if user calls 'exec' directly after 'fork' and does not use ffrt, it's ok.
* 2. if user calls 'exec' directly, the original process cannot close monitorfd automatically, and
* it will be fail when new program use ffrt to create monitorfd.
@ -156,7 +182,7 @@ void CheckTimeInterval(const TimePoint& startTp, const TimePoint& endTp)
int64_t durationMs = duration.count();
if (durationMs > EXECUTION_TIMEOUT_MILISECONDS) {
FFRT_LOGW("handle work more than [%lld]ms", durationMs);
}
}
}
int DelayedWorker::HandleWork()

View File

@ -41,13 +41,15 @@ class DelayedWorker {
int timerfd_{-1};
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
int monitorfd_{-1};
CPUMonitor* monitor;
CPUMonitor* monitor = nullptr;
#endif
int HandleWork(void);
void ThreadInit();
public:
static DelayedWorker &GetInstance();
static void ThreadEnvCreate();
static bool IsDelayerWorkerThread();
DelayedWorker(DelayedWorker const&) = delete;
void operator=(DelayedWorker const&) = delete;

View File

@ -141,7 +141,7 @@ public:
bool try_lock() override;
RecursiveMutexPrivate() = default;
~RecursiveMutexPrivate() override = default;
~RecursiveMutexPrivate() = default;
RecursiveMutexPrivate(RecursiveMutexPrivate const&) = delete;
void operator = (RecursiveMutexPrivate const&) = delete;

View File

@ -60,12 +60,12 @@ int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_
wakeData->monitorEvents = events;
epoll_event ev = { .events = events, .data = { .ptr = ptr } };
std::unique_lock lock(m_mapMutex);
if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
return -1;
}
std::unique_lock lock(m_mapMutex);
if (op == EPOLL_CTL_ADD) {
m_wakeDataMap[fd].emplace_back(std::move(wakeData));
fdEmpty_.store(false);
@ -84,12 +84,12 @@ int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_
int Poller::DelFdEvent(int fd) noexcept
{
std::unique_lock lock(m_mapMutex);
if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
return -1;
}
std::unique_lock lock(m_mapMutex);
m_delCntMap[fd]++;
WakeUp();
return 0;
@ -301,14 +301,6 @@ void Poller::CacheEventsAndDoMask(CPUEUTask* task, EventVec& eventVec) noexcept
{
for (size_t i = 0; i < eventVec.size(); i++) {
int currFd = eventVec[i].data.fd;
auto delIter = m_delCntMap.find(currFd);
if (delIter != m_delCntMap.end()) {
unsigned int delCnt = static_cast<unsigned int>(delIter->second);
auto& WakeDataList = m_wakeDataMap[currFd];
if (WakeDataList.size() == delCnt) {
continue;
}
}
struct epoll_event maskEv;
maskEv.events = 0;
if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
@ -473,14 +465,11 @@ void Poller::ExecuteTimerCb(TimePoint timer) noexcept
timerMutex_.unlock();
if (data.cb != nullptr) {
data.cb(data.data);
executedHandle_[data.handle] = TimerStatus::EXECUTED;
} else if (data.task != nullptr) {
ProcessTimerDataCb(data.task);
}
if (data.cb != nullptr) {
executedHandle_[data.handle] = TimerStatus::EXECUTED;
}
timerMutex_.lock();
if (data.repeat && (executedHandle_.find(data.handle) != executedHandle_.end())) {

View File

@ -14,6 +14,7 @@
*/
#include "shared_mutex_private.h"
#include "dfx/log/ffrt_log_api.h"
#include "ffrt_trace.h"
#include "internal_inc/osal.h"

View File

@ -23,9 +23,9 @@
#include <map>
#include <functional>
#include <linux/futex.h>
#include "sync.h"
#include "delayed_worker.h"
#include "util/ffrt_facade.h"
#include "sync.h"
#ifdef NS_PER_SEC
#undef NS_PER_SEC
@ -60,9 +60,6 @@ static void spin()
asm volatile("isb sy");
#elif defined(__arm__)
asm volatile("yield");
#elif defined(__riscv)
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/riscv/include/asm/vdso/processor.h?h=v6.6#n25
asm volatile(".4byte 0x100000F");
#endif
}

View File

@ -59,10 +59,6 @@ struct TaskWithNode : public TaskListNode {
};
class WaitQueue {
private:
spin_mutex wqlock;
WaitUntilEntry* whead;
public:
using TimePoint = std::chrono::steady_clock::time_point;
void SuspendAndWait(mutexPrivate* lk);
@ -88,6 +84,10 @@ public:
wqlock.unlock();
}
private:
spin_mutex wqlock;
WaitUntilEntry* whead;
private:
bool WeNotifyProc(WaitUntilEntry* we);
void ThreadWait(WaitUntilEntry* wn, mutexPrivate* lk, bool legacyMode, CPUEUTask* task);

View File

@ -103,6 +103,5 @@ CPUEUTask::CPUEUTask(const task_attr_private *attr, CPUEUTask *parent, const uin
if (attr) {
stack_size = std::max(attr->stackSize_, MIN_STACK_SIZE);
}
FFRT_LOGD("create task name:%s gid=%lu taskLocal:%d", label.c_str(), gid, taskLocal);
}
} /* namespace ffrt */

View File

@ -55,8 +55,6 @@ void SCPUEUTask::DecDepRef()
void SCPUEUTask::DecChildRef()
{
SCPUEUTask* parent = reinterpret_cast<SCPUEUTask*>(this->parent);
FFRT_LOGD("DecChildRef parent task:%s, childRefCnt=%u, task[%lu], name[%s]",
parent->label.c_str(), parent->childRefCnt.load(), gid, label.c_str());
FFRT_TRACE_SCOPE(2, taskDecChildRef);
std::unique_lock<decltype(parent->mutex_)> lck(parent->mutex_);
parent->childRefCnt--;

View File

@ -24,6 +24,7 @@
namespace ffrt {
static std::atomic_uint64_t s_gid(0);
static constexpr uint64_t cacheline_size = 64;
class TaskBase {
public:
uintptr_t reserved = 0;

View File

@ -23,9 +23,8 @@
#include <unordered_set>
#endif
#include <sys/mman.h>
#include "dfx/log/ffrt_log_api.h"
#include "spmc_queue.h"
#include "sync/sync.h"
#include "dfx/log/ffrt_log_api.h"
namespace ffrt {
const std::size_t BatchAllocSize = 32 * 1024;
@ -83,7 +82,7 @@ public:
return Instance()->SimpleAllocatorUnLock();
}
private:
SpmcQueue primaryCache;
std::vector<T*> primaryCache;
#ifdef FFRT_BBOX_ENABLE
std::unordered_set<T*> secondaryCache;
#endif
@ -99,7 +98,8 @@ private:
char* p = reinterpret_cast<char*>(basePtr);
for (std::size_t i = 0; i + TSize <= MmapSz; i += TSize) {
if (basePtr != nullptr &&
primaryCache.FindElement(reinterpret_cast<void *>(p + i)) == false) {
std::find(primaryCache.begin(), primaryCache.end(),
reinterpret_cast<T*>(p + i)) == primaryCache.end()) {
ret.push_back(reinterpret_cast<void *>(p + i));
}
}
@ -122,56 +122,53 @@ private:
void init()
{
lock.lock();
if (basePtr) {
lock.unlock();
return;
}
char* p = reinterpret_cast<char*>(operator new(MmapSz));
count = MmapSz / TSize;
primaryCache.Init(count);
primaryCache.reserve(count);
for (std::size_t i = 0; i + TSize <= MmapSz; i += TSize) {
primaryCache.PushTail(p + i);
primaryCache.push_back(reinterpret_cast<T*>(p + i));
}
basePtr = reinterpret_cast<T*>(p);
lock.unlock();
}
T* Alloc()
{
// 未初始化
if (primaryCache.GetCapacity() == 0) {
lock.lock();
T* t = nullptr;
if (count == 0) {
if (basePtr != nullptr) {
t = reinterpret_cast<T*>(::operator new(TSize));
#ifdef FFRT_BBOX_ENABLE
secondaryCache.insert(t);
#endif
lock.unlock();
return t;
}
init();
}
// 一级cache已耗尽
if (primaryCache.GetLength()) {
T* t = reinterpret_cast<T*>(::operator new(TSize));
#ifdef FFRT_BBOX_ENABLE
lock.lock();
secondaryCache.insert(t);
lock.unlock();
#endif
return t;
}
return static_cast<T*>(primaryCache.PopHead());
t = primaryCache.back();
primaryCache.pop_back();
count--;
lock.unlock();
return t;
}
void free(T* t)
{
if (basePtr && basePtr <= t &&
lock.lock();
if (basePtr != nullptr &&
basePtr <= t &&
static_cast<size_t>(reinterpret_cast<uintptr_t>(t)) <
static_cast<size_t>(reinterpret_cast<uintptr_t>(basePtr)) + MmapSz) {
primaryCache.PushTail(t);
return;
}
primaryCache.push_back(t);
count++;
} else {
#ifdef FFRT_BBOX_ENABLE
lock.lock();
secondaryCache.erase(t);
lock.unlock();
#endif
::operator delete(t);
}
lock.unlock();
}
SimpleAllocator(std::size_t size = sizeof(T)) : TSize(size)
@ -187,7 +184,7 @@ private:
uint32_t try_cnt = ALLOCATOR_DESTRUCT_TIMESOUT;
std::size_t reserved = MmapSz / TSize;
while (try_cnt > 0) {
if (primaryCache.GetLength() == reserved && secondaryCache.size() == 0) {
if (primaryCache.size() == reserved && secondaryCache.size() == 0) {
break;
}
lck.unlock();

View File

@ -89,23 +89,6 @@ int SpmcQueue::PushTail(void* object)
return -1;
}
bool SpmcQueue::FindElement(void* target)
{
if (buf_ == nullptr) {
return false;
}
unsigned int head = head_.load();
unsigned int tail = tail_.load();
while (head != tail) {
void* element = buf_[head % capacity_];
if (target == element) {
return true;
}
head++;
}
return false;
}
unsigned int SpmcQueue::PopHeadToAnotherQueue(SpmcQueue& dstQueue, unsigned int elementNum, int qos, PushFunc func)
{
if (elementNum == 0) {

View File

@ -46,13 +46,6 @@ public:
*/
int PushTail(void* object);
/**
* @brief
* @param target
* @retval truefalse
*/
bool FindElement(void* target);
/**
* @brief
* @param dstQueue

View File

@ -35,7 +35,7 @@ static std::string FormatDateString(const std::chrono::system_clock::time_point&
constexpr int UsPerSecond = 1000 * 1000;
std::string remainder;
if (timeUnit == microsecond) {
if (microsecond == timeUnit) {
auto tp = std::chrono::time_point_cast<std::chrono::microseconds>(timePoint);
auto us = tp.time_since_epoch().count() % UsPerSecond;
remainder = std::to_string(us);

View File

@ -49,20 +49,20 @@ namespace ffrt {
WorkerMonitor::WorkerMonitor()
{
// 获取当前进程名称
char processName[PROCESS_NAME_BUFFER_LENGTH] = {0};
char processName[PROCESS_NAME_BUFFER_LENGTH] = "";
GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
if (strlen(processName) == 0) {
FFRT_LOGW("Can't get process name, no permission for /proc folder or prctl exec, skip worker monitor");
FFRT_LOGW("Get process name failed, skip worker monitor");
skipSampling_ = true;
return;
}
// 从配置文件中读取屏蔽进程
// 从配置文件读取黑名单
std::string skipProcess;
std::ifstream file(CONF_FILEPATH);
if (file.is_open()) {
while (std::getline(file, skipProcess)) {
if (std::string(processName).find(skipProcess) != std::string::npos) {
if (strstr(processName, skipProcess.c_str()) != nullptr) {
skipSampling_ = true;
return;
}

View File

@ -87,6 +87,7 @@ private:
CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions);
void RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction);
void RecordIpcInfo(const std::string& dumpInfo, int tid);
void RecordKeyInfo(const std::string& dumpInfo);
private:
std::mutex mutex_;

View File

@ -76,7 +76,7 @@ config("ffrt_test_config") {
defines += [ "TDD_MUSL" ]
}
if (ffrt_async_stack_enable) {
defines += [ "ASYNC_STACKTRACE" ]
defines += [ "FFRT_ASYNC_STACKTRACE" ]
}
if (target_cpu == "arm") {
defines += [ "APP_USE_ARM" ]

View File

@ -80,25 +80,6 @@ HWTEST_F(CpuMonitorTest, IntoSleep, TestSize.Level1)
cpu.IntoSleep(QoS(5));
}
#ifdef FFRT_GITEE
/**
* @tc.name: WakeupCount
* @tc.desc: Test whether the WakeupCount interface are normal.
* @tc.type: FUNC
*
*
*/
HWTEST_F(CpuMonitorTest, WakeupCount, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::GetTaskCount, it, std::placeholders::_1)});
cpu.WakeupCount(QoS(5));
}
#else
/**
* @tc.name: WakeupSleep
* @tc.desc: Test whether the WakeupSleep interface are normal.
@ -109,6 +90,7 @@ HWTEST_F(CpuMonitorTest, WakeupCount, TestSize.Level1)
HWTEST_F(CpuMonitorTest, WakeupSleep, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
@ -117,7 +99,7 @@ HWTEST_F(CpuMonitorTest, WakeupSleep, TestSize.Level1)
cpu.WakeupSleep(QoS(5));
}
#endif
/**
* @tc.name: TimeoutCount
@ -129,6 +111,7 @@ HWTEST_F(CpuMonitorTest, WakeupSleep, TestSize.Level1)
HWTEST_F(CpuMonitorTest, TimeoutCount, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
@ -147,6 +130,7 @@ HWTEST_F(CpuMonitorTest, TimeoutCount, TestSize.Level1)
HWTEST_F(CpuMonitorTest, Notify, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
@ -167,6 +151,7 @@ HWTEST_F(CpuMonitorTest, Notify, TestSize.Level1)
HWTEST_F(CpuMonitorTest, IntoDeepSleep, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
@ -175,28 +160,11 @@ HWTEST_F(CpuMonitorTest, IntoDeepSleep, TestSize.Level1)
cpu.IntoDeepSleep(QoS(5));
}
#ifdef FFRT_GITEE
/**
* @tc.name: OutOfDeepSleep
* @tc.desc: Test whether the OutOfDeepSleep interface are normal.
* @tc.type: FUNC
*
*
*/
HWTEST_F(CpuMonitorTest, OutOfDeepSleep, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::GetTaskCount, it, std::placeholders::_1)});
cpu.OutOfDeepSleep(QoS(5));
}
#else
HWTEST_F(CpuMonitorTest, WakeupDeepSleep, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),
@ -204,7 +172,6 @@ HWTEST_F(CpuMonitorTest, WakeupDeepSleep, TestSize.Level1)
cpu.WakeupDeepSleep(QoS(5));
}
#endif
/**
* @tc.name: IsExceedDeepSleepThreshold
@ -216,6 +183,7 @@ HWTEST_F(CpuMonitorTest, WakeupDeepSleep, TestSize.Level1)
HWTEST_F(CpuMonitorTest, IsExceedDeepSleepThreshold, TestSize.Level1)
{
CPUWorkerManager *it = new SCPUWorkerManager();
EXPECT_NE(it, nullptr);
SCPUMonitor cpu({
std::bind(&CPUWorkerManager::IncWorker, it, std::placeholders::_1),
std::bind(&CPUWorkerManager::WakeupWorkers, it, std::placeholders::_1),

View File

@ -59,11 +59,11 @@ HWTEST_F(DeadlineTest, qos_interval_create_test, TestSize.Level1)
qos qos = qos_deadline_request;
interval qi = qos_interval_create(deadline_us, qos);
EXPECT_NE(&qi, nullptr);
EXPECT_NE(qi, nullptr);
qos = qos_max + 1;
interval qi1 = qos_interval_create(deadline_us, qos);
EXPECT_NE(&qi1, nullptr);
EXPECT_EQ(qi1, nullptr);
}
/**
@ -74,12 +74,14 @@ HWTEST_F(DeadlineTest, qos_interval_create_test, TestSize.Level1)
HWTEST_F(DeadlineTest, qos_interval_destroy_test, TestSize.Level1)
{
interval* qi = new interval();
EXPECT_NE(qi, nullptr);
qos_interval_destroy(*qi);
uint64_t deadline_us = 50000;
qos qos = qos_max + 1;
interval qi1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(qi1, nullptr);
qos_interval_destroy(qi1);
}
@ -97,6 +99,7 @@ HWTEST_F(DeadlineTest, qos_interval_begin_test, TestSize.Level1)
qos qos = qos_max + 1;
interval qi1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(qi1, nullptr);
qos_interval_begin(qi1);
}
@ -114,6 +117,7 @@ HWTEST_F(DeadlineTest, qos_interval_update_test, TestSize.Level1)
qos_interval_update(*qi, new_deadline_us);
interval qi1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(qi1, nullptr);
qos_interval_update(qi1, new_deadline_us);
}
@ -130,6 +134,7 @@ HWTEST_F(DeadlineTest, qos_interval_end_test, TestSize.Level1)
qos_interval_end(*qi);
interval qi1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(qi1, nullptr);
qos_interval_end(qi1);
}
@ -143,10 +148,12 @@ HWTEST_F(DeadlineTest, qos_interval_join_test, TestSize.Level1)
uint64_t deadline_us = 50000;
qos qos = qos_deadline_request;
interval ret = qos_interval_create(deadline_us, qos);
EXPECT_NE(ret, nullptr);
qos_interval_join(ret);
qos = qos_max + 1;
interval ret1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(ret1, nullptr);
qos_interval_join(ret1);
}
@ -160,9 +167,11 @@ HWTEST_F(DeadlineTest, qos_interval_leave_test, TestSize.Level1)
uint64_t deadline_us = 50000;
qos qos = qos_deadline_request;
interval ret = qos_interval_create(deadline_us, qos);
EXPECT_NE(ret, nullptr);
qos_interval_leave(ret);
qos = qos_max + 1;
interval ret1 = qos_interval_create(deadline_us, qos);
EXPECT_EQ(ret1, nullptr);
qos_interval_leave(ret1);
}

View File

@ -69,6 +69,7 @@ HWTEST_F(ExecuteUnitTest, BindWG, TestSize.Level1)
{
QoS *qos1 = new QoS();
FFRTFacade::GetEUInstance().BindWG(DevType(0), *qos1);
EXPECT_EQ(*qos1, qos_default);
}
/**
@ -80,6 +81,7 @@ HWTEST_F(ExecuteUnitTest, UnbindTG, TestSize.Level1)
{
QoS *qos1 = new QoS();
FFRTFacade::GetEUInstance().UnbindTG(DevType(0), *qos1);
EXPECT_EQ(*qos1, qos_default);
}
/**
@ -91,6 +93,7 @@ HWTEST_F(ExecuteUnitTest, BindTG, TestSize.Level1)
{
QoS *qos1 = new QoS();
ThreadGroup* it = FFRTFacade::GetEUInstance().BindTG(DevType(0), *qos1);
EXPECT_EQ(*qos1, qos_default);
}
}

View File

@ -70,6 +70,7 @@ HWTEST_F(FrameIntervalTest, OnQoSIntervalsTest, TestSize.Level1)
FrameInterval* fi = new FrameInterval(100000, QoS(5));
fi->OnQoSIntervals(ffrt::IntervalState::DEADLINE_BEGIN);
fi->OnQoSIntervals(ffrt::IntervalState::DEADLINE_END);
EXPECT_NE(fi, nullptr);
}
/**
@ -110,6 +111,7 @@ HWTEST_F(FrameIntervalTest, EndTest, TestSize.Level1)
HWTEST_F(FrameIntervalTest, updateTest, TestSize.Level1)
{
FrameInterval* fi = new FrameInterval(100000, QoS(5));
EXPECT_NE(fi, nullptr);
uint64_t deadline = 900;
fi->Update(deadline);
deadline = 1500000;
@ -127,6 +129,7 @@ HWTEST_F(FrameIntervalTest, updateTest, TestSize.Level1)
HWTEST_F(FrameIntervalTest, JoinTest, TestSize.Level1)
{
FrameInterval* fi = new FrameInterval(100000, QoS(5));
EXPECT_NE(fi, nullptr);
fi->Join();
fi->Leave();
}

View File

@ -18,6 +18,7 @@
#include "c/ffrt_dump.h"
#include "sync/delayed_worker.h"
#include "../common.h"
#include "util/ffrt_facade.h"
using namespace std;
using namespace ffrt;
@ -59,7 +60,7 @@ void SendDelayedWorker(uint64_t timeoutUs)
g_delayWorkerThreadTestWe.tp = delay;
g_delayWorkerThreadTestWe.cb = ([](ffrt::WaitEntry* we) { CheckCallBackThreadName(); });
DelayedWorker::GetInstance().dispatch(g_delayWorkerThreadTestWe.tp,
FFRTFacade::GetDWInstance().dispatch(g_delayWorkerThreadTestWe.tp,
&g_delayWorkerThreadTestWe, g_delayWorkerThreadTestWe.cb);
}

View File

@ -13,11 +13,11 @@
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <cinttypes>
#include "ffrt_inner.h"
#include "c/executor_task.h"
#include "tm/scpu_task.h"
#include "dfx/log/ffrt_log_api.h"
#include <cinttypes>
#include "../common.h"
using namespace std;

View File

@ -73,32 +73,41 @@ HWTEST_F(CgroupQosTest, UpdateSchedAttr_test, TestSize.Level1)
HWTEST_F(CgroupQosTest, SetTidToCGroup_test, TestSize.Level1)
{
OSAttrManager::Instance()->SetTidToCGroup(100);
int32_t pid = 100;
OSAttrManager::Instance()->SetTidToCGroup(pid);
OSAttrManager::Instance()->SetTidToCGroup(-1);
EXPECT_EQ(pid, 100);
}
HWTEST_F(CgroupQosTest, SetCGroupCtlPara_test, TestSize.Level1)
{
OSAttrManager::Instance()->SetCGroupCtlPara("", 1);
OSAttrManager::Instance()->SetCGroupCtlPara("test", 1);
int32_t value = 1;
OSAttrManager::Instance()->SetCGroupCtlPara("", value);
OSAttrManager::Instance()->SetCGroupCtlPara("test", value);
EXPECT_EQ(value, 1);
}
HWTEST_F(CgroupQosTest, SetCGroupSetPara_test, TestSize.Level1)
{
OSAttrManager::Instance()->SetCGroupSetPara("", "1");
OSAttrManager::Instance()->SetCGroupSetPara("test", "1");
std::string value = "1";
OSAttrManager::Instance()->SetCGroupSetPara("", value);
OSAttrManager::Instance()->SetCGroupSetPara("test", value);
EXPECT_EQ(value, "1");
}
HWTEST_F(CgroupQosTest, SetTidToCGroupPrivate_test, TestSize.Level1)
{
OSAttrManager::Instance()->SetTidToCGroupPrivate("test", 100);
int32_t pid = 100;
OSAttrManager::Instance()->SetTidToCGroupPrivate("test", pid);
OSAttrManager::Instance()->SetTidToCGroupPrivate("test", -1);
EXPECT_EQ(pid, 100);
}
HWTEST_F(CgroupQosTest, SetCGroupPara_test, TestSize.Level1)
{
int a = 100;
OSAttrManager::Instance()->SetCGroupPara("/proc/cpuinfo", a);
EXPECT_EQ(a, 100);
}
HWTEST_F(CgroupQosTest, SetCGroupPara_err_test, TestSize.Level1)
@ -109,6 +118,7 @@ HWTEST_F(CgroupQosTest, SetCGroupPara_err_test, TestSize.Level1)
#endif
int a = 3;
OSAttrManager::Instance()->SetCGroupPara("/proc/cpuinfo", a);
EXPECT_EQ(a, 3);
#ifndef WITH_NO_MOCKER
MOCKER(write).stubs().will(returnValue(-1));
MOCKER(read).stubs().will(returnValue(0));
@ -143,7 +153,12 @@ protected:
HWTEST_F(QosTest, QosConfig_test, TestSize.Level1)
{
#ifndef WITH_NO_MOCKER
int i = 0;
auto handle = ffrt::submit_h([]{
QosConfig::Instance().setPolicySystem();
i++;
});
EXPECT_EQ(i, 1);
#endif
}
@ -171,13 +186,15 @@ HWTEST_F(QosInterfaceTest, QosPolicyTest, TestSize.Level1)
struct QosPolicyData qp = {0, 0, 0, 0, 0};
struct QosPolicyDatas policyDatas = {0, 0, {qp}};
QosPolicy(&policyDatas);
int ret = QosPolicy(&policyDatas);
EXPECT_NE(ret, 0);
}
HWTEST_F(QosInterfaceTest, FFRTEnableRtgTest, TestSize.Level1)
{
bool flag = false;
FFRTEnableRtg(flag);
EXPECT_EQ(flag, false);
}
HWTEST_F(QosInterfaceTest, FFRTAuthEnableTest, TestSize.Level1)
@ -186,6 +203,7 @@ HWTEST_F(QosInterfaceTest, FFRTAuthEnableTest, TestSize.Level1)
unsigned int uaFlag = 0x1fff;
unsigned int status = 3;
FFRTAuthEnable(uid, uaFlag, status);
EXPECT_EQ(status, 3);
}
HWTEST_F(QosInterfaceTest, FFRTAuthSwitchTest, TestSize.Level1)
@ -195,12 +213,14 @@ HWTEST_F(QosInterfaceTest, FFRTAuthSwitchTest, TestSize.Level1)
unsigned int qosFlag = 0x0003;
unsigned int status = 3;
FFRTAuthSwitch(uid, rtgFlag, qosFlag, status);
EXPECT_EQ(status, 3);
}
HWTEST_F(QosInterfaceTest, FFRTAuthDeleteTest, TestSize.Level1)
{
unsigned int uid = 3039;
FFRTAuthDelete(uid);
EXPECT_EQ(uid, 3039);
}
HWTEST_F(QosInterfaceTest, FFRTAuthPauseTest, TestSize.Level1)
@ -210,6 +230,7 @@ HWTEST_F(QosInterfaceTest, FFRTAuthPauseTest, TestSize.Level1)
unsigned int status = 3;
FFRTAuthEnable(uid, uaFlag, status);
FFRTAuthPause(uid);
EXPECT_EQ(uid, 3039);
}
HWTEST_F(QosInterfaceTest, FFRTAuthGetTest, TestSize.Level1)
@ -220,6 +241,7 @@ HWTEST_F(QosInterfaceTest, FFRTAuthGetTest, TestSize.Level1)
FFRTAuthEnable(uid, uaFlag, status);
FFRTAuthGet(uid, &uaFlag, &status);
EXPECT_EQ(status, 3);
}
HWTEST_F(QosInterfaceTest, FFRTQosApplyTest, TestSize.Level1)
@ -227,6 +249,7 @@ HWTEST_F(QosInterfaceTest, FFRTQosApplyTest, TestSize.Level1)
unsigned int level = 1;
FFRTQosApply(level);
EXPECT_EQ(level, 1);
}
HWTEST_F(QosInterfaceTest, FFRTQosApplyForOtherTest, TestSize.Level1)
@ -235,6 +258,7 @@ HWTEST_F(QosInterfaceTest, FFRTQosApplyForOtherTest, TestSize.Level1)
int tid = 0;
FFRTQosApplyForOther(level, tid);
EXPECT_EQ(level, 1);
}
HWTEST_F(QosInterfaceTest, FFRTQosLeaveTest, TestSize.Level1)
@ -249,6 +273,7 @@ HWTEST_F(QosInterfaceTest, FFRTQosLeaveForOtherTest, TestSize.Level1)
FFRTQosApplyForOther(level, tid);
FFRTQosLeaveForOther(tid);
EXPECT_EQ(level, 1);
}
HWTEST_F(QosInterfaceTest, FFRTQosConvertInt, TestSize.Level1)
@ -257,4 +282,5 @@ HWTEST_F(QosInterfaceTest, FFRTQosConvertInt, TestSize.Level1)
QoS qos2 = 2;
QoS qos3 = qos1 + qos2;
printf("qos3=%d", qos3());
EXPECT_EQ(qos3, 3);
}

View File

@ -56,7 +56,9 @@ protected:
HWTEST_F(CoreTest, core_test_success_01, TestSize.Level1)
{
sync_io(0);
int fd = 0;
sync_io(fd);
EXPECT_EQ(fd, 0);
}
HWTEST_F(CoreTest, task_ctx_success_01, TestSize.Level1)
@ -202,8 +204,10 @@ HWTEST_F(CoreTest, task_attr_set_timeout_nullptr, TestSize.Level1)
*/
HWTEST_F(CoreTest, ffrt_task_handle_ref_nullptr, TestSize.Level1)
{
ffrt_task_handle_inc_ref(nullptr);
ffrt_task_handle_dec_ref(nullptr);
ffrt_task_handle_t handle = nullptr;
ffrt_task_handle_inc_ref(handle);
ffrt_task_handle_dec_ref(handle);
EXPECT_EQ(handle, nullptr);
}
/**

View File

@ -24,8 +24,12 @@
#include "c/executor_task.h"
#include "ffrt_inner.h"
#include "eu/cpu_monitor.h"
#include "eu/cpu_worker.h"
#include "eu/co_routine.h"
#include "eu/scpuworker_manager.h"
#include "sched/scheduler.h"
#include "../common.h"
#include "tm/scpu_task.h"
using namespace std;
using namespace ffrt;
@ -154,3 +158,38 @@ HWTEST_F(CoroutineTest, ffrt_epoll_ctl_add_del, TestSize.Level1)
ffrt_epoll_ctl(ffrt_qos_default, EPOLL_CTL_DEL, testFd, 0, nullptr, nullptr);
close(testFd);
}
HWTEST_F(CoroutineTest, coroutine_alloc_fail, TestSize.Level1)
{
ffrt::task_attr attr;
const uint64_t id = 0;
ffrt::SCPUEUTask task(reinterpret_cast<ffrt::task_attr_private *>(&attr), nullptr, id, ffrt::QoS(2));
task.coRoutine = nullptr;
task.stack_size = 100 * (1ULL << 40); //100T
struct CoRoutineEnv env;
CPUWorkerManager* manager = new SCPUWorkerManager();
CpuWorkerOps ops {
CPUWorker::WorkerLooperStandard,
std::bind(&CPUWorkerManager::PickUpTaskFromGlobalQueue, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::NotifyTaskPicked, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::WorkerIdleActionSimplified, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::WorkerRetired, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::WorkerPrepare, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::TryPoll, manager, std::placeholders::_1, std::placeholders::_2),
std::bind(&CPUWorkerManager::StealTaskBatch, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::PickUpTaskBatch, manager, std::placeholders::_1),
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
std::bind(&CPUWorkerManager::IsExceedRunningThreshold, manager, std::placeholders::_1),
std::bind(&CPUWorkerManager::IsBlockAwareInit, manager),
#endif
};
CPUWorker* worker = new CPUWorker(QoS(2), std::move(ops), manager);
EXPECT_NE(worker, nullptr);
sleep(1); // wait worker into wait action
worker->Run(&task, &env, worker);
delete manager;
worker->Join();
delete worker;
}

View File

@ -21,6 +21,7 @@
#include "c/executor_task.h"
#include "tm/scpu_task.h"
#include "dfx/log/ffrt_log_api.h"
#include "dm/sdependence_manager.h"
#ifndef WITH_NO_MOCKER
extern "C" int ffrt_set_cgroup_attr(ffrt_qos_t qos, ffrt_os_sched_attr *attr);
#endif
@ -100,8 +101,6 @@ HWTEST_F(DependencyTest, update_qos_success_04, TestSize.Level1)
ffrt::submit([] {
printf("return %d\n", ffrt::this_task::update_qos(static_cast<int>(ffrt::qos_user_initiated)));
});
int ret2 = ffrt_set_cpu_worker_max_num(static_cast<int>(ffrt::qos_user_initiated), 4);
EXPECT_EQ(ret2, 0);
}
HWTEST_F(DependencyTest, update_qos_success_05, TestSize.Level1)
@ -165,28 +164,21 @@ HWTEST_F(DependencyTest, update_qos_failed_02, TestSize.Level1)
ffrt::submit([] {
printf("return %d\n", ffrt::this_task::update_qos(static_cast<int>(ffrt::qos_user_initiated)));
});
int ret1 = ffrt_set_cpu_worker_max_num(static_cast<int>(ffrt::qos_inherit), 4);
EXPECT_EQ(ret1, -1);
}
HWTEST_F(DependencyTest, executor_task_submit_success_01, TestSize.Level1)
HWTEST_F(DependencyTest, executor_task_submit_success_cancel_01, TestSize.Level1)
{
ffrt_task_attr_t attr;
static ffrt_executor_task_t work;
work.wq[0] = &work.wq;
work.wq[1] = &work.wq;
work.type = reinterpret_cast<uintptr_t>(&attr);
ffrt_executor_task_submit(nullptr, nullptr);
ffrt_executor_task_submit(&work, &attr);
}
HWTEST_F(DependencyTest, executor_task_submit_nullptr_01, TestSize.Level1)
{
ffrt_executor_task_submit(nullptr, nullptr);
}
HWTEST_F(DependencyTest, executor_task_submit_cancel_01, TestSize.Level1)
{
ffrt_executor_task_cancel(nullptr, static_cast<int>(ffrt::qos_user_initiated));
int ret = ffrt_executor_task_cancel(nullptr, static_cast<int>(ffrt::qos_user_initiated));
EXPECT_EQ(ret, 0);
}
HWTEST_F(DependencyTest, executor_task_submit_cancel_02, TestSize.Level1)
@ -205,19 +197,16 @@ HWTEST_F(DependencyTest, executor_task_submit_cancel_02, TestSize.Level1)
ffrt_task_attr_destroy(&attr);
}
HWTEST_F(DependencyTest, update_trace_tag_success_02, TestSize.Level1)
HWTEST_F(DependencyTest, update_trace_tag_task_attr_success, TestSize.Level1)
{
ffrt::set_trace_tag("TASK A");
ffrt::clear_trace_tag();
}
HWTEST_F(DependencyTest, task_attr_success_02, TestSize.Level1)
{
ffrt::task_attr tmpTask;
tmpTask.name("Task A");
tmpTask.qos(static_cast<int>(ffrt::qos_user_initiated));
tmpTask.name();
tmpTask.qos();
EXPECT_EQ(ffrt_task_attr_get_qos(&tmpTask), ffrt::qos_user_initiated);
}
HWTEST_F(DependencyTest, sample_pingpong_pipe_interval_checkpoint, TestSize.Level1)
@ -285,3 +274,15 @@ HWTEST_F(DependencyTest, sample_pingpong_pipe_interval_checkpoint, TestSize.Leve
ffrt::qos_interval_destroy(it);
}
void AddOne(void* args)
{
*(static_cast<int*>(args)) += 1;
}
HWTEST_F(DependencyTest, dependency_onsubmit_dev, TestSize.Level1)
{
int data = 0;
ffrt_task_handle_t handle = nullptr;
ffrt::SDependenceManager& dependenceManager = ffrt::SDependenceManager::Instance();
}

View File

@ -15,9 +15,13 @@
#include <gtest/gtest.h>
#include "ffrt_inner.h"
#include "../common.h"
using namespace std;
using namespace testing;
#ifdef HWTEST_TESTING_EXT_ENABLE
using namespace testing::ext;
#endif
class DumpTest : public testing::Test {
protected:
@ -37,3 +41,15 @@ protected:
{
}
};
/*
* dump_succ
* dump info失败
* 1ffrt_dump获取dump info
*
*/
HWTEST_F(DumpTest, dump_succ, TestSize.Level1)
{
char dumpinfo[1024 * 512] = {0};
ffrt_dump(ffrt_dump_cmd_t::DUMP_INFO_ALL, dumpinfo, 1024 * 512);
}

View File

@ -72,7 +72,7 @@ HWTEST_F(ExecuteUnitTest, submit_cancel_failed, TestSize.Level1)
auto h2 = ffrt::submit_h([&]() { x += 2; }, {&x}, {&x}, ffrt::task_attr().delay(1));
int cancel_ret = ffrt::skip(h2);
EXPECT_EQ(cancel_ret, 0);
ffrt::wait();
ffrt::wait({h1});
EXPECT_EQ(x, 1);
cancel_ret = ffrt::skip(h1);

View File

@ -582,6 +582,7 @@ HWTEST_F(ffrtIoTest, ffrt_task_attr_set_local_attr_invalid, TestSize.Level1)
{
bool isLocalSet = true;
ffrt_task_attr_set_local(nullptr, isLocalSet);
EXPECT_EQ(isLocalSet, true);
}
struct WakeData {

View File

@ -106,9 +106,12 @@ protected:
HWTEST_F(FloTest, FFRTFloApiSuccess, TestSize.Level1)
{
int i = 0;
InitCfg(1);
ffrt_flo_start(1);
i++;
ffrt_flo_end(1);
EXCEPT_EQ(i, 1);
}
HWTEST_F(FloTest, FFRTFloTaskWithoutYield, TestSize.Level1)

View File

@ -14,8 +14,8 @@
*/
#include <gtest/gtest.h>
#include "ffrt_inner.h"
#include <cstdlib>
#include "ffrt_inner.h"
#include "../common.h"
using namespace testing;

View File

@ -45,6 +45,7 @@ protected:
virtual void SetUp()
{
ffrt_task_timeout_set_threshold(1);
}
virtual void TearDown()

View File

@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <chrono>
#include <gtest/gtest.h>
@ -30,26 +30,26 @@ using namespace testing;
#ifdef HWTEST_TESTING_EXT_ENABLE
using namespace testing::ext;
#endif
class PollerTest : public testing::Test {
protected:
static void SetUpTestCase()
{
}
static void TearDownTestCase()
{
}
virtual void SetUp()
{
}
virtual void TearDown()
{
}
};
static void Testfun(void* data)
{
int* testData = static_cast<int*>(data);
@ -57,7 +57,7 @@ static void Testfun(void* data)
printf("%d, timeout callback\n", *testData);
}
static void (*g_cb)(void*) = Testfun;
/*
* poll_once_batch_timeout
* PollOnce批量超时测试
@ -85,7 +85,7 @@ HWTEST_F(PollerTest, poll_once_batch_timeout, TestSize.Level1)
usleep(sleepTime);
poller.PollOnce(1);
EXPECT_EQ(true, poller.DetermineEmptyMap());
uint64_t timeout3 = 10000;
uint64_t timeout4 = 100;
int loopNum = 2;
@ -99,7 +99,7 @@ HWTEST_F(PollerTest, poll_once_batch_timeout, TestSize.Level1)
// 预计等待时间为100可能有几毫秒的误差
EXPECT_EQ(true, m >= timeout4 && m < timeout3);
}
/*
* cache_events_mask_test
* events缓存
@ -117,7 +117,7 @@ HWTEST_F(PollerTest, cache_events_mask_test, TestSize.Level1)
poller.CacheEventsAndDoMask(currTask, eventVec);
EXPECT_EQ(1, poller.m_cachedTaskEvents[currTask].size());
}
/*
* fetch_cached_event_unmask
* events缓存event
@ -171,4 +171,97 @@ HWTEST_F(PollerTest, unregister_timer_001, TestSize.Level1)
poller.timerMap_.clear();
poller.executedHandle_.clear();
}
}
std::mutex g_mutexRegister;
std::condition_variable g_cvRegister;
void WaitCallback(void* data)
{
int* dependency = reinterpret_cast<int*>(data);
while (*dependency != 1) {
std::this_thread::yield();
}
}
void EmptyCallback(void* data) {}
/*
* : multi_timer_dependency
* : poller批量超时回调依赖测试
* : timer A依赖回调B执行完成B依赖timer C取消成功
* : 1PollOnce接口
* : 1
*/
HWTEST_F(PollerTest, multi_timer_dependency, TestSize.Level1)
{
int dependency = 0;
ffrt::task_handle handle = ffrt::submit_h([&] {
std::unique_lock lk(g_mutexRegister);
g_cvRegister.wait(lk);
dependency = 1;
});
TimePoint timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
TimerDataWithCb data(&dependency, WaitCallback, nullptr, false, 100);
data.handle = 0;
Poller poller;
poller.timerMap_.emplace(timeout, data);
data.handle++;
data.cb = EmptyCallback;
poller.timerMap_.emplace(timeout, data);
std::thread th1([&] { poller.PollOnce(-1); });
std::thread th2([&] {
usleep(100 * 1000);
poller.UnregisterTimer(1);
g_cvRegister.notify_all();
});
th1.join();
th2.join();
ffrt::wait({handle});
}
/*
* : multi_timer_dependency_unregister_self
* : poller批量超时回调,
* : timer A依赖回调B执行完成B依赖另一个线程的timer A取消成功
* : 1PollOnce接口
* : 1
*/
HWTEST_F(PollerTest, multi_timer_dependency_unregister_self, TestSize.Level1)
{
int dependency = 0;
TimePoint timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
TimerDataWithCb data(&dependency, WaitCallback, nullptr, false, 100);
data.handle = 0;
Poller poller;
poller.timerMap_.emplace(timeout, data);
data.handle++;
data.cb = EmptyCallback;
poller.timerMap_.emplace(timeout, data);
ffrt::task_handle handle = ffrt::submit_h([&] {
std::unique_lock lk(g_mutexRegister);
g_cvRegister.wait(lk);
poller.IsTimerReady();
dependency = 1;
});
std::thread th1([&] { poller.PollOnce(-1); });
std::thread th2([&] {
usleep(100 * 1000);
g_cvRegister.notify_all();
poller.UnregisterTimer(0);
});
th1.join();
th2.join();
ffrt::wait({handle});
}

View File

@ -19,6 +19,7 @@
#include "ffrt_inner.h"
#include "c/queue_ext.h"
#include "../common.h"
#include "queue/base_queue.h"
using namespace std;
using namespace ffrt;
@ -791,12 +792,12 @@ HWTEST_F(QueueTest, ffrt_get_current_queue, TestSize.Level1)
/*
* : ffrt_queue_set_eventhand
* eventhandler
* 1
2ffrt_queue_set_eventhandler接口设置串行队列的eventhandler
3
*
* : 1
2ffrt_queue_set_eventhandler接口设置串行队列的eventhandler
3
* :
*/
TEST_F(QueueTest, ffrt_queue_set_eventhand)
HWTEST_F(QueueTest, ffrt_queue_set_eventhand, TestSize.Level1)
{
ffrt_queue_attr_t queue_attr;
(void)ffrt_queue_attr_init(&queue_attr);
@ -807,4 +808,17 @@ TEST_F(QueueTest, ffrt_queue_set_eventhand)
EXPECT_EQ(temphandler, nullptr);
ffrt_queue_attr_destroy(&queue_attr);
ffrt_queue_destroy(queue_handle);
}
}
TEST_F(QueueTest, ffrt_queue_print_mutex_owner_info)
{
ffrt_queue_attr_t queue_attr;
(void)ffrt_queue_attr_init(&queue_attr);
std::unique_ptr<BaseQueue> queue = CreateQueue(ffrt_queue_serial, &queue_attr);
queue->PrintMutexOwner();
RecordMutex recordMutex;
(void)recordMutex.IsTimeout();
ffrt_queue_attr_destroy(&queue_attr);
}

View File

@ -54,6 +54,7 @@ HWTEST_F(RTGTest, rtg_init_test, TestSize.Level1)
{
bool enabled = RTGCtrl::Instance().Enabled();
FFRT_LOGE("RTGCtrl Init %s", enabled ? "Success" : "Failed");
EXPECT_EQ(enabled, false);
}
HWTEST_F(RTGTest, rtg_get_group_test, TestSize.Level1)
@ -62,11 +63,13 @@ HWTEST_F(RTGTest, rtg_get_group_test, TestSize.Level1)
if (tgid < 0) {
FFRT_LOGE("Failed to Get RTG id %d", tgid);
}
EXPECT_LE(tgid, 0);
bool ret = RTGCtrl::Instance().PutThreadGroup(tgid);
if (!ret) {
FFRT_LOGE("Failed to Put RTG id %d", tgid);
}
EXPECT_EQ(ret, false);
}
HWTEST_F(RTGTest, rtg_set_window_size_test, TestSize.Level1)
@ -77,16 +80,19 @@ HWTEST_F(RTGTest, rtg_set_window_size_test, TestSize.Level1)
if (tgid < 0) {
FFRT_LOGE("Failed to Get RTG id %d", tgid);
}
EXPECT_LE(tgid, 0);
bool ret = RTGCtrl::Instance().SetGroupWindowSize(tgid, WINDOW_SIZE);
if (!ret) {
FFRT_LOGE("Failed to Set Window Size %d", WINDOW_SIZE);
}
EXPECT_EQ(ret, false);
ret = RTGCtrl::Instance().PutThreadGroup(tgid);
if (!ret) {
FFRT_LOGE("Failed to Put RTG id %d", tgid);
}
EXPECT_EQ(ret, false);
}
HWTEST_F(RTGTest, rtg_set_invalid_interval_test, TestSize.Level1)
@ -97,16 +103,19 @@ HWTEST_F(RTGTest, rtg_set_invalid_interval_test, TestSize.Level1)
if (tgid < 0) {
FFRT_LOGE("Failed to Get RTG id %d", tgid);
}
EXPECT_LE(tgid, 0);
bool ret = RTGCtrl::Instance().SetInvalidInterval(tgid, INVALID_INTERVAL);
if (!ret) {
FFRT_LOGE("Failed to Set Invalid Interval %d", INVALID_INTERVAL);
}
EXPECT_EQ(ret, false);
ret = RTGCtrl::Instance().PutThreadGroup(tgid);
if (!ret) {
FFRT_LOGE("Failed to Put RTG id %d", tgid);
}
EXPECT_EQ(ret, false);
}
HWTEST_F(RTGTest, rtg_set_preferred_cluster_test, TestSize.Level1)
@ -117,16 +126,19 @@ HWTEST_F(RTGTest, rtg_set_preferred_cluster_test, TestSize.Level1)
if (tgid < 0) {
FFRT_LOGE("Failed to Get RTG id %d", tgid);
}
EXPECT_LE(tgid, 0);
bool ret = RTGCtrl::Instance().SetPreferredCluster(tgid, CLUSTER_ID);
if (!ret) {
FFRT_LOGE("Failed to Set Preferred Cluster %d", CLUSTER_ID);
}
EXPECT_EQ(ret, false);
ret = RTGCtrl::Instance().PutThreadGroup(tgid);
if (!ret) {
FFRT_LOGE("Failed to Put RTG id %d", tgid);
}
EXPECT_EQ(ret, false);
}
HWTEST_F(RTGTest, rtg_begin_end_test, TestSize.Level1)
@ -135,21 +147,25 @@ HWTEST_F(RTGTest, rtg_begin_end_test, TestSize.Level1)
if (tgid < 0) {
FFRT_LOGE("Failed to Get RTG id %d", tgid);
}
EXPECT_LE(tgid, 0);
bool ret = RTGCtrl::Instance().Begin(tgid);
if (!ret) {
FFRT_LOGE("Failed to Begin");
}
EXPECT_EQ(ret, false);
ret = RTGCtrl::Instance().End(tgid);
if (!ret) {
FFRT_LOGE("Failed to End");
}
EXPECT_EQ(ret, false);
ret = RTGCtrl::Instance().PutThreadGroup(tgid);
if (!ret) {
FFRT_LOGE("Failed to Put RTG id %d", tgid);
}
EXPECT_EQ(ret, false);
}
HWTEST_F(RTGTest, rtg_add_tread_test, TestSize.Level1)

View File

@ -172,6 +172,7 @@ HWTEST_F(SchedulerTest, taskstateCount_test, TestSize.Level1)
{
SCPUEUTask* task1 = new SCPUEUTask(nullptr, nullptr, 0, QoS(static_cast<int>(qos_user_interactive)));
SCPUEUTask *task2 = new SCPUEUTask(nullptr, task1, 0, QoS());
EXPECT_NE(task2, nullptr);
TaskManager::Instance().TaskStateCount(task2);
}

View File

@ -106,18 +106,21 @@ HWTEST_F(ThreadTest, SetExitedTest, TestSize.Level1)
HWTEST_F(ThreadTest, GetQosTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
QoS ret = wt->GetQos();
}
HWTEST_F(ThreadTest, JoinTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
wt->Join();
}
HWTEST_F(ThreadTest, DetachTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
wt->Detach();
}
@ -156,5 +159,8 @@ HWTEST_F(ThreadTest, c_api_thread_simple_test, TestSize.Level1)
HWTEST_F(ThreadTest, wait_queue_test, TestSize.Level1)
{
TaskWithNode node = TaskWithNode();
ffrt::submit([]{
TaskWithNode node = TaskWithNode();
EXPECT_NE(node.task, nullptr);
}, {}, {});
}

View File

@ -110,6 +110,7 @@ HWTEST_F(WorkerThreadTest, SetExitedTest, TestSize.Level1)
HWTEST_F(WorkerThreadTest, GetQosTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
QoS ret = wt->GetQos();
}
@ -121,6 +122,7 @@ HWTEST_F(WorkerThreadTest, GetQosTest, TestSize.Level1)
HWTEST_F(WorkerThreadTest, JoinTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
wt->Join();
}
@ -132,5 +134,6 @@ HWTEST_F(WorkerThreadTest, JoinTest, TestSize.Level1)
HWTEST_F(WorkerThreadTest, DetachTest, TestSize.Level1)
{
WorkerThread* wt = new WorkerThread(QoS(6));
EXPECT_NE(wt, nullptr);
wt->Detach();
}