!449 bbox task uaf fix

Merge pull request !449 from 太帅太烦恼/master

Signed-off-by: baishimin <2258359443@qq.com>
This commit is contained in:
openharmony_ci 2024-10-18 10:11:59 +00:00 committed by baishimin
commit c82d8dc8e8
12 changed files with 443 additions and 104 deletions

View File

@ -33,7 +33,26 @@ FFRT_C_API int ffrt_skip(ffrt_task_handle_t handle);
// config
FFRT_C_API int ffrt_set_cgroup_attr(ffrt_qos_t qos, ffrt_os_sched_attr* attr);
FFRT_C_API void ffrt_restore_qos_config(void);
FFRT_C_API int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num);
/**
* @brief worker num setting.
*
* @param qosData Any field left equal to 0xffffffff keeps its default value.
* totalNum = lowQosReserveWorkerNum + highQosReserveWorkerNum + sum of all reserveNum
* totalNum is valid in (0,256].
* lowQosReserveWorkerNum is a low-partition qos public resource. {[min, max], default} is {[0,256],12}.
* highQosReserveWorkerNum is a high-partition qos public resource. {[min, max], default} is {[0,256],12}.
* globalReserveWorkerNum is a global qos public resource. {[min, max], default} is {[0,256],24}.
* qosConfigArray is an array of ffrt_qos_config_attr.
* effectLen: settings are applied only for qosConfigArray entries whose index is less than effectLen.
* qos valid in [0,5].
* reserveNum: minimum number of workers the qos can create. {[min, max], default} is {[0,256],8}.
* maxConcurrency is the max concurrency num of the qos. {[min, max], default} is {[0,12],8}.
* hardLimit: max number of workers the qos can create. {[min, max], default} is {[0,256],44}.
* @return Returns true when the setting succeeds; returns false when it fails, in which case the params keep their defaults.
* @version 1.0
*/
FFRT_C_API bool ffrt_set_qos_worker_num(ffrt_worker_num_attr* qosData);
/**
* @brief Set the task execution timeout.

View File

@ -67,6 +67,26 @@ typedef struct {
char cpumap[MAX_CPUMAP_LENGTH];
} ffrt_os_sched_attr;
typedef struct {
unsigned int hardLimit;
unsigned int maxConcurrency;
unsigned int reserveNum;
unsigned int qos;
} ffrt_qos_config_attr;
#ifdef OHOS_STANDARD_SYSTEM
#define MAX_QOS_LENGTH 15
#else
#define MAX_QOS_LENGTH 19
#endif
typedef struct {
ffrt_qos_config_attr qosConfigArray[MAX_QOS_LENGTH];
unsigned int effectLen;
unsigned int lowQosReserveWorkerNum;
unsigned int highQosReserveWorkerNum;
unsigned int globalReserveWorkerNum;
} ffrt_worker_num_attr;
typedef struct {
long storage;
} ffrt_rwlockattr_t;

View File

@ -58,9 +58,27 @@ static inline void restore_qos_config()
ffrt_restore_qos_config();
}
static inline int set_cpu_worker_max_num(qos qos_, uint32_t num)
/**
* @brief worker num setting.
*
* @param qosData Any field left equal to 0xffffffff keeps its default value.
* totalNum = lowQosReserveWorkerNum + highQosReserveWorkerNum + sum of all reserveNum
* totalNum is valid in (0,256].
* lowQosReserveWorkerNum is a low-partition qos public resource. {[min, max], default} is {[0,256],12}.
* highQosReserveWorkerNum is a high-partition qos public resource. {[min, max], default} is {[0,256],12}.
* globalReserveWorkerNum is a global qos public resource. {[min, max], default} is {[0,256],24}.
* qosConfigArray is an array of ffrt_qos_config_attr.
* effectLen: settings are applied only for qosConfigArray entries whose index is less than effectLen.
* qos valid in [0,5].
* reserveNum: minimum number of workers the qos can create. {[min, max], default} is {[0,256],8}.
* maxConcurrency is the max concurrency num of the qos. {[min, max], default} is {[0,12],8}.
* hardLimit: max number of workers the qos can create. {[min, max], default} is {[0,256],44}.
* @return Returns true when the setting succeeds; returns false when it fails, in which case the params keep their defaults.
* @version 1.0
*/
static inline bool set_qos_worker_num(ffrt_worker_num_attr* qosData)
{
return ffrt_set_cpu_worker_max_num(qos_, num);
return ffrt_set_qos_worker_num(qosData);
}
/**

View File

@ -25,6 +25,7 @@
#include "internal_inc/config.h"
#include "eu/osattr_manager.h"
#include "eu/worker_thread.h"
#include "eu/cpu_monitor.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "dfx/watchdog/watchdog_util.h"
@ -406,19 +407,10 @@ void ffrt_restore_qos_config()
}
API_ATTRIBUTE((visibility("default")))
int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num)
bool ffrt_set_qos_worker_num(ffrt_worker_num_attr *qosData)
{
if (ffrt::GetFuncQosMap() == nullptr) {
FFRT_LOGE("FuncQosMap has not regist");
return -1;
}
ffrt::QoS _qos = ffrt::GetFuncQosMap()(qos);
if (((qos != ffrt::qos_default) && (_qos() == ffrt::qos_default)) || (qos <= ffrt::qos_inherit)) {
FFRT_LOGE("qos[%d] is invalid.", qos);
return -1;
}
ffrt::CPUMonitor *monitor = ffrt::FFRTFacade::GetEUInstance().GetCPUMonitor();
return monitor->SetWorkerMaxNum(_qos, num);
return monitor->QosWorkerNumSegment(qosData);
}
API_ATTRIBUTE((visibility("default")))

View File

@ -131,6 +131,7 @@ static inline void SaveReadyQueueStatus()
static inline void SaveNormalTaskStatus()
{
TaskFactory::LockMem();
auto unfree = TaskFactory::GetUnfreedMem();
auto apply = [&](const char* tag, const std::function<bool(CPUEUTask*)>& filter) {
std::vector<CPUEUTask*> tmp;
@ -168,12 +169,13 @@ static inline void SaveNormalTaskStatus()
apply("pending task", [](CPUEUTask* t) {
return t->state == TaskState::PENDING;
});
TaskFactory::UnlockMem();
}
static inline void SaveQueueTaskStatus()
{
std::lock_guard lk(SimpleAllocator<QueueTask>::Instance()->lock);
auto unfreeQueueTask = SimpleAllocator<QueueTask>::getUnSafeUnfreedMem();
auto unfreeQueueTask = SimpleAllocator<QueueTask>::getUnfreedMem();
auto applyqueue = [&](const char* tag, const std::function<bool(QueueTask*)>& filter) {
std::vector<QueueTask*> tmp;
for (auto task : unfreeQueueTask) {
@ -490,6 +492,7 @@ std::string SaveNormalTaskStatusInfo(void)
{
std::string ffrtStackInfo;
std::ostringstream ss;
TaskFactory::LockMem();
auto unfree = TaskFactory::GetUnfreedMem();
auto apply = [&](const char* tag, const std::function<bool(CPUEUTask*)>& filter) {
std::vector<CPUEUTask*> tmp;
@ -532,6 +535,7 @@ std::string SaveNormalTaskStatusInfo(void)
apply("pending task", [](CPUEUTask* t) {
return t->state == TaskState::PENDING;
});
TaskFactory::UnlockMem();
return ffrtStackInfo;
}
@ -541,7 +545,7 @@ std::string SaveQueueTaskStatusInfo()
std::string ffrtStackInfo;
std::ostringstream ss;
std::lock_guard lk(SimpleAllocator<QueueTask>::Instance()->lock);
auto unfreeQueueTask = SimpleAllocator<QueueTask>::getUnSafeUnfreedMem();
auto unfreeQueueTask = SimpleAllocator<QueueTask>::getUnfreedMem();
auto applyqueue = [&](const char* tag, const std::function<bool(QueueTask*)>& filter) {
std::vector<QueueTask*> tmp;
for (auto task : unfreeQueueTask) {

View File

@ -38,6 +38,7 @@ constexpr int JITTER_DELAY_MS = 5;
namespace ffrt {
// Constructs the monitor: initializes per-QoS worker budgets and token
// buckets, then starts the background monitor thread.
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
{
    // Fix: SetupMonitor() creates globalReserveWorkerToken and friends; it
    // must run before LogAllWorkerNum(), which dereferences those tokens for
    // its debug output (they are nullptr until SetupMonitor runs).
    SetupMonitor();
    LogAllWorkerNum();
    StartMonitor();
}
@ -53,10 +54,25 @@ CPUMonitor::~CPUMonitor()
void CPUMonitor::SetupMonitor()
{
globalReserveWorkerNum = DEFAULT_GLOBAL_RESERVE_NUM;
lowQosReserveWorkerNum = DEFAULT_LOW_RESERVE_NUM;
highQosReserveWorkerNum = DEFAULT_HIGH_RESERVE_NUM;
globalReserveWorkerToken = std::make_unique<Token>(globalReserveWorkerNum);
lowQosReserveWorkerToken = std::make_unique<Token>(lowQosReserveWorkerNum);
highQosReserveWorkerToken = std::make_unique<Token>(highQosReserveWorkerNum);
lowQosUseGlobalWorkerToken = std::make_unique<Token>(0);
highQosUseGlobalWorkerToken = std::make_unique<Token>(0);
for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
ctrlQueue[qos].maxConcurrency = DEFAULT_MAXCONCURRENCY;
if (qos > qos_max) {
ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT - DEFAULT_SINGLE_NUM;
ctrlQueue[qos].reserveNum = 0;
continue;
}
ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
setWorkerMaxNum[qos] = false;
ctrlQueue[qos].reserveNum = DEFAULT_SINGLE_NUM;
}
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
@ -80,6 +96,121 @@ void CPUMonitor::SetupMonitor()
#endif
}
// Validates a user-supplied per-QoS worker-number configuration before it is
// applied. Fields equal to DEFAULT_PARAMS_VALUE (0xffffffff) mean "keep the
// default" and are exempt from range checks. Returns true iff every field is
// in range, no qos level is configured twice, and the resulting total worker
// budget lies in (0, GLOBAL_QOS_MAXNUM].
bool CPUMonitor::QosWorkerNumValid(ffrt_worker_num_attr *qosData)
{
    // Tracks which qos levels have already appeared in qosConfigArray,
    // to reject duplicate entries.
    bool setWorkerNumQos[QoS::MaxNum()] = {false};
    // effectLen left at the sentinel means "cover all standard qos levels".
    qosData->effectLen = qosData->effectLen == DEFAULT_PARAMS_VALUE ? qos_max + 1 : qosData->effectLen;
    if (qosData->effectLen > QoS::MaxNum()) {
        FFRT_LOGE("effectLen is invalid[%d]", qosData->effectLen);
        return false;
    }
    // The three shared (public) reserve pools are each capped at the global max.
    if (MaxValueInvalid(qosData->lowQosReserveWorkerNum, GLOBAL_QOS_MAXNUM) ||
        MaxValueInvalid(qosData->highQosReserveWorkerNum, GLOBAL_QOS_MAXNUM) ||
        MaxValueInvalid(qosData->globalReserveWorkerNum, GLOBAL_QOS_MAXNUM)) {
        FFRT_LOGE("lowQosReserveWorkerNum[%d],highQosReserveWorkerNum[%d],globalReserveWorkerNum[%d]",
            qosData->lowQosReserveWorkerNum, qosData->highQosReserveWorkerNum, qosData->globalReserveWorkerNum);
        return false;
    }
    // Start from the default total budget, then for every field the caller
    // overrides, swap the default contribution out and the new value in.
    unsigned int totalReserveNum = DEFAULT_GLOBAL_HARDLIMIT;
    totalReserveNum = qosData->lowQosReserveWorkerNum == DEFAULT_PARAMS_VALUE ?
        totalReserveNum : totalReserveNum - DEFAULT_LOW_RESERVE_NUM + qosData->lowQosReserveWorkerNum;
    totalReserveNum = qosData->highQosReserveWorkerNum == DEFAULT_PARAMS_VALUE ?
        totalReserveNum : totalReserveNum - DEFAULT_HIGH_RESERVE_NUM + qosData->highQosReserveWorkerNum;
    totalReserveNum = qosData->globalReserveWorkerNum == DEFAULT_PARAMS_VALUE ?
        totalReserveNum : totalReserveNum - DEFAULT_GLOBAL_RESERVE_NUM + qosData->globalReserveWorkerNum;
    for (unsigned int i = 0; i < qosData->effectLen; i++) {
        ffrt_qos_config_attr* singleQos = &(qosData->qosConfigArray[i]);
        unsigned int qos = singleQos->qos;
        if (qos >= QoS::MaxNum() || setWorkerNumQos[qos]) {
            FFRT_LOGE("qos[%d] is invalid or repeat setting", qos);
            return false;
        }
        setWorkerNumQos[qos] = true;
        if (MaxValueInvalid(singleQos->maxConcurrency, MAX_MAXCONCURRENCY) ||
            MaxValueInvalid(singleQos->hardLimit, GLOBAL_QOS_MAXNUM) ||
            MaxValueInvalid(singleQos->reserveNum, GLOBAL_QOS_MAXNUM)) {
            FFRT_LOGE("qos[%d],maxConcurrency[%d],hardLimit[%d],reserveNum[%d] is invalid",
                qos, singleQos->maxConcurrency, singleQos->hardLimit, singleQos->reserveNum);
            return false;
        }
        // Levels above qos_max default to reserveNum 0 (see SetupMonitor), so
        // an explicit value is purely additive; standard levels replace their
        // DEFAULT_SINGLE_NUM contribution.
        totalReserveNum = singleQos->reserveNum == DEFAULT_PARAMS_VALUE ? totalReserveNum : (qos > qos_max ?
            totalReserveNum + singleQos->reserveNum : totalReserveNum - DEFAULT_SINGLE_NUM + singleQos->reserveNum);
    }
    if (totalReserveNum == 0 || totalReserveNum > GLOBAL_QOS_MAXNUM) {
        FFRT_LOGE("totalNum[%d],lowQosWorkerNum[%d],highQosWorkerNum[%d],globalWorkerNum[%d] invalid", totalReserveNum,
            qosData->lowQosReserveWorkerNum, qosData->highQosReserveWorkerNum, qosData->globalReserveWorkerNum);
        for (unsigned int i = 0; i < qosData->effectLen; i++) {
            ffrt_qos_config_attr* singleQos = &(qosData->qosConfigArray[i]);
            FFRT_LOGE("totalReserveNum is check fail.reserveNum[%d]", singleQos->reserveNum);
        }
        return false;
    }
    return true;
}
// Reports whether a caller-supplied limit is out of range. A value equal to
// DEFAULT_PARAMS_VALUE means "use the default" and is always accepted.
bool CPUMonitor::MaxValueInvalid(unsigned int value, unsigned int default_value)
{
    if (value == DEFAULT_PARAMS_VALUE) {
        return false;
    }
    return value > default_value;
}
// Overwrites targetValue only when the caller supplied an explicit value;
// DEFAULT_PARAMS_VALUE (0xffffffff) means "keep the current setting".
template <typename T>
void CPUMonitor::Assignment(T& targetValue, unsigned int value)
{
    if (value != DEFAULT_PARAMS_VALUE) {
        targetValue = value;
    }
}
// Applies a user-supplied per-QoS worker-number configuration. May only
// succeed once, and only before any worker has been created. On success the
// per-QoS hardLimit/maxConcurrency/reserveNum and the shared reserve token
// buckets are rebuilt from qosData. Returns false (leaving defaults intact)
// on repeat setting, invalid parameters, or already-running workers.
bool CPUMonitor::QosWorkerNumSegment(ffrt_worker_num_attr *qosData)
{
    // Claim the one-shot flag first so a concurrent caller is rejected while
    // we validate and apply.
    setWorkerNumLock.lock();
    if (setWorkerNum) {
        setWorkerNumLock.unlock();
        FFRT_LOGE("qos config data setting repeat");
        return false;
    }
    setWorkerNum = true;
    setWorkerNumLock.unlock();
    if (!QosWorkerNumValid(qosData)) {
        // Fix: a rejected configuration must not permanently consume the
        // one-shot flag — otherwise a retry with corrected parameters is
        // reported as "setting repeat" forever. Release the claim again.
        setWorkerNumLock.lock();
        setWorkerNum = false;
        setWorkerNumLock.unlock();
        return false;
    }
    // Take every per-QoS lock; on success they stay held until the new
    // numbers have been applied and logged below.
    for (int i = 0; i < QoS::MaxNum(); i++) {
        WorkerCtrl &workerCtrl = ctrlQueue[i];
        workerCtrl.lock.lock();
        if (workerCtrl.sleepingWorkerNum != 0 || workerCtrl.executionNum != 0) {
            // Roll back: release the locks taken so far (indices 0..i).
            for (int j = 0; j <= i; j++) {
                ctrlQueue[j].lock.unlock();
            }
            FFRT_LOGE("Can only be set during initiallization,qos[%d], executionNum[%d],sleepingNum[%d]",
                i, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
            return false;
        }
    }
    // Apply per-QoS overrides; DEFAULT_PARAMS_VALUE fields keep defaults.
    for (unsigned int i = 0; i < qosData->effectLen; i++) {
        auto singleQos = qosData->qosConfigArray[i];
        int qos = singleQos.qos;
        WorkerCtrl &workerCtrl = ctrlQueue[qos];
        Assignment(workerCtrl.hardLimit, singleQos.hardLimit);
        Assignment(workerCtrl.maxConcurrency, singleQos.maxConcurrency);
        Assignment(workerCtrl.reserveNum, singleQos.reserveNum);
    }
    Assignment(lowQosReserveWorkerNum, qosData->lowQosReserveWorkerNum);
    Assignment(highQosReserveWorkerNum, qosData->highQosReserveWorkerNum);
    Assignment(globalReserveWorkerNum, qosData->globalReserveWorkerNum);
    // Rebuild the token buckets so they reflect the freshly applied budgets.
    globalReserveWorkerToken = std::make_unique<Token>(globalReserveWorkerNum);
    lowQosReserveWorkerToken = std::make_unique<Token>(lowQosReserveWorkerNum);
    highQosReserveWorkerToken = std::make_unique<Token>(highQosReserveWorkerNum);
    FFRT_LOGI("succ:globalReserveWorkerNum[%d],highQosReserveWorkerNum[%d],lowQosReserveWorkerNum[%d]",
        globalReserveWorkerNum, highQosReserveWorkerNum, lowQosReserveWorkerNum);
    for (int i = 0; i < QoS::MaxNum(); i++) {
        WorkerCtrl &workerCtrl = ctrlQueue[i];
        FFRT_LOGI("succ:qos[%d], reserveNum[%d], maxConcurrency[%d], hardLimit[%d]",
            i, workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit);
        workerCtrl.lock.unlock();
    }
    return true;
}
void CPUMonitor::StartMonitor()
{
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
@ -94,26 +225,6 @@ void CPUMonitor::StartMonitor()
#endif
}
int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
{
WorkerCtrl& workerCtrl = ctrlQueue[qos()];
workerCtrl.lock.lock();
if (setWorkerMaxNum[qos()]) {
FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
workerCtrl.lock.unlock();
return -1;
}
if (num <= 0 || num > QOS_WORKER_MAXNUM) {
FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
workerCtrl.lock.unlock();
return -1;
}
workerCtrl.hardLimit = num;
setWorkerMaxNum[qos()] = true;
workerCtrl.lock.unlock();
return 0;
}
uint32_t CPUMonitor::GetMonitorTid() const
{
return monitorTid;
@ -163,6 +274,10 @@ void CPUMonitor::TimeoutCount(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
if (totalNum > workerCtrl.reserveNum) {
ReleasePublicWorkerNum(qos);
}
workerCtrl.sleepingWorkerNum--;
workerCtrl.lock.unlock();
}
@ -176,6 +291,16 @@ void CPUMonitor::WakeupCount(const QoS& qos, bool isDeepSleepWork)
workerCtrl.lock.unlock();
}
void CPUMonitor::DoDestroy(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::unique_lock lk(workerCtrl.lock);
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
if (totalNum > workerCtrl.reserveNum) {
ReleasePublicWorkerNum(qos);
}
}
int CPUMonitor::WakedWorkerNum(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
@ -238,6 +363,76 @@ bool CPUMonitor::IsExceedDeepSleepThreshold()
return deepSleepingWorkerNum * 2 > totalWorker;
}
// Tries to obtain one worker slot for a low-partition qos: first from the
// low-QoS private budget, then by borrowing from the shared global budget.
// A borrow is recorded in lowQosUseGlobalWorkerToken so that
// ReleasePublicWorkerNum returns the slot to the right pool.
// Returns false when both budgets are exhausted.
bool CPUMonitor::LowQosUseReserveWorkerNum()
{
    if (lowQosReserveWorkerToken->try_acquire()) {
        return true;
    }
    if (globalReserveWorkerToken->try_acquire()) {
        lowQosUseGlobalWorkerToken->release();
        return true;
    }
    // Fix: the original log statement referenced qos(), which does not exist
    // in this parameterless member function (no such member is declared).
    FFRT_LOGD("low qos worker unavailable, lowQosUse[%d], highQosUse[%d]",
        lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
    return false;
}
// Tries to obtain one worker slot for a high-partition qos: first from the
// high-QoS private budget, then by borrowing from the shared global budget.
// A borrow is recorded in highQosUseGlobalWorkerToken so that
// ReleasePublicWorkerNum returns the slot to the right pool.
// Returns false when both budgets are exhausted.
bool CPUMonitor::HighQosUseReserveWorkerNum()
{
    if (highQosReserveWorkerToken->try_acquire()) {
        return true;
    }
    if (globalReserveWorkerToken->try_acquire()) {
        highQosUseGlobalWorkerToken->release();
        return true;
    }
    // Fix: the original log statement referenced qos(), which does not exist
    // in this parameterless member function (no such member is declared).
    FFRT_LOGD("high qos worker unavailable, lowQosUse[%d], highQosUse[%d]",
        lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
    return false;
}
// Routes a public worker-slot request to the budget matching the qos
// partition: levels at or below ffrt_qos_default draw from the low pool,
// everything above from the high pool.
bool CPUMonitor::TryAcquirePublicWorkerNum(const QoS& qos)
{
    if (qos() <= ffrt_qos_default) {
        return LowQosUseReserveWorkerNum();
    }
    return HighQosUseReserveWorkerNum();
}
// Returns one previously acquired public worker slot. If the slot was
// borrowed from the shared global pool (recorded in the matching
// *UseGlobalWorkerToken), it goes back to the global budget; otherwise it
// goes back to the partition's private reserve.
void CPUMonitor::ReleasePublicWorkerNum(const QoS& qos)
{
    const bool isLowPartition = qos() <= ffrt_qos_default;
    auto& borrowedToken = isLowPartition ? lowQosUseGlobalWorkerToken : highQosUseGlobalWorkerToken;
    auto& reserveToken = isLowPartition ? lowQosReserveWorkerToken : highQosReserveWorkerToken;
    if (borrowedToken->try_acquire()) {
        globalReserveWorkerToken->release();
    } else {
        reserveToken->release();
    }
}
// Debug dump of the full worker-budget state: the three shared reserve
// counts, the current token-bucket balances, the recorded global-pool
// borrows, and each qos level's configuration and live worker counts.
// NOTE(review): dereferences the reserve tokens — only safe after
// SetupMonitor() has created them.
void CPUMonitor::LogAllWorkerNum()
{
    FFRT_LOGD("globalReserveWorkerNum[%d],highQosReserveWorkerNum[%d],lowQosReserveWorkerNum[%d]",
        globalReserveWorkerNum, highQosReserveWorkerNum, lowQosReserveWorkerNum);
    FFRT_LOGD("globalReserveWorkerToken[%d],highQosReserveWorkerToken[%d],lowQosReserveWorkerToken[%d]",
        globalReserveWorkerToken->load(), highQosReserveWorkerToken->load(), lowQosReserveWorkerToken->load());
    FFRT_LOGD("lowQosUseGlobalWorkerToken[%d], highQosUseGlobalWorkerToken[%d]",
        lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
    for (int i = 0; i < QoS::MaxNum(); i++) {
        WorkerCtrl &workerCtrl = ctrlQueue[i];
        // Live counts are read without holding workerCtrl.lock — debug only.
        size_t runningNum = workerCtrl.executionNum;
        size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
        FFRT_LOGD("succ:qos[%d], reserveNum[%d], maxConcurrency[%d], hardLimit[%d], runningNum[%d], totalNum[%d]",
            i, workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
    }
}
void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
@ -264,7 +459,8 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
workerCtrl.lock.unlock();
ops.WakeupWorkers(qos);
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
workerCtrl.executionNum++;
FFRTTraceRecord::WorkRecord(static_cast<int>(qos), workerCtrl.executionNum);
workerCtrl.lock.unlock();
@ -273,6 +469,8 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
@ -282,19 +480,26 @@ void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
(workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int maxWorkerLimit = static_cast<int>(std::min(workerCtrl.maxConcurrency, workerCtrl.hardLimit));
int increasableNumber = maxWorkerLimit - (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
for (int idx = 0; idx < wakeupNumber; idx++) {
ops.WakeupWorkers(qos);
}
int incPublicNum = workerCtrl.reserveNum - (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int incNumber = std::min(number - wakeupNumber, increasableNumber);
for (int idx = 0; idx < incNumber; idx++) {
workerCtrl.executionNum++;
ops.IncWorker(qos);
if (idx < incPublicNum || TryAcquirePublicWorkerNum(qos)) {
workerCtrl.executionNum++;
ops.IncWorker(qos);
} else {
FFRT_LOGD("Fail:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],totalNum[%d],idx[%d],inc[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit,
workerCtrl.executionNum + workerCtrl.sleepingWorkerNum, idx, incNumber);
}
}
workerCtrl.lock.unlock();
FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
}
@ -337,7 +542,8 @@ void CPUMonitor::PokeAdd(const QoS& qos)
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
@ -345,6 +551,8 @@ void CPUMonitor::PokeAdd(const QoS& qos)
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
@ -373,7 +581,8 @@ void CPUMonitor::PokePick(const QoS& qos)
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
@ -381,8 +590,11 @@ void CPUMonitor::PokePick(const QoS& qos)
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
}
}

View File

@ -23,6 +23,8 @@
#include "qos.h"
#include "cpp/mutex.h"
#include "eu/cpu_manager_interface.h"
#include "c/type_def_ext.h"
#include "util/token.h"
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
@ -32,6 +34,7 @@ namespace ffrt {
struct WorkerCtrl {
size_t hardLimit = 0;
size_t maxConcurrency = 0;
size_t reserveNum = 0;
int executionNum = 0;
int sleepingWorkerNum = 0;
bool pollWaitFlag = false;
@ -56,6 +59,7 @@ public:
bool IsExceedDeepSleepThreshold();
void IntoPollWait(const QoS& qos);
void OutOfPollWait(const QoS& qos);
void DoDestroy(const QoS& qos);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool IsExceedRunningThreshold(const QoS& qos);
bool IsBlockAwareInit(void);
@ -64,7 +68,8 @@ public:
#endif
virtual void Notify(const QoS& qos, TaskNotifyType notifyType) = 0;
virtual void WorkerInit() = 0;
int SetWorkerMaxNum(const QoS& qos, int num);
bool QosWorkerNumSegment (ffrt_worker_num_attr* qosData);
bool TryAcquirePublicWorkerNum(const QoS& qos);
/* strategy options for handling task notify events */
static void HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType);
int WakedWorkerNum(const QoS& qos);
@ -86,7 +91,24 @@ private:
std::thread* monitorThread;
CpuMonitorOps ops;
std::atomic<bool> setWorkerMaxNum[QoS::MaxNum()];
bool setWorkerNum = false;
std::mutex setWorkerNumLock;
bool QosWorkerNumValid(ffrt_worker_num_attr* qosData);
bool MaxValueInvalid(unsigned int value, unsigned int default_value);
template <typename T>
void Assignment(T& targetValue, unsigned int value);
bool LowQosUseReserveWorkerNum();
bool HighQosUseReserveWorkerNum();
void ReleasePublicWorkerNum(const QoS& qos);
void LogAllWorkerNum();
unsigned int globalReserveWorkerNum = 0;
unsigned int lowQosReserveWorkerNum = 0;
unsigned int highQosReserveWorkerNum = 0;
std::unique_ptr<Token> globalReserveWorkerToken = nullptr;
std::unique_ptr<Token> lowQosReserveWorkerToken = nullptr;
std::unique_ptr<Token> highQosReserveWorkerToken = nullptr;
std::unique_ptr<Token> lowQosUseGlobalWorkerToken = nullptr;
std::unique_ptr<Token> highQosUseGlobalWorkerToken = nullptr;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool blockAwareInit = false;
bool stopMonitor = false;

View File

@ -20,53 +20,18 @@
#include "types.h"
namespace ffrt {
constexpr int DEFAULT_MINCONCURRENCY = 4;
constexpr int INTERACTIVE_MAXCONCURRENCY = USE_COROUTINE ? 8 : 40000;
constexpr int DEFAULT_MAXCONCURRENCY = USE_COROUTINE ? 8 : 80000;
constexpr int DEFAULT_HARDLIMIT = 16;
constexpr int QOS_WORKER_MAXNUM = (8 * 16);
constexpr unsigned int DEFAULT_GLOBAL_HARDLIMIT = 96;
constexpr unsigned int DEFAULT_PARAMS_VALUE = 0Xffffffff;
class GlobalConfig {
public:
GlobalConfig(const GlobalConfig&) = delete;
constexpr unsigned int DEFAULT_MAXCONCURRENCY = 8;
constexpr unsigned int MAX_MAXCONCURRENCY = 12;
constexpr unsigned int DEFAULT_HARDLIMIT = 44;
constexpr unsigned int DEFAULT_SINGLE_NUM = 8;
GlobalConfig& operator=(const GlobalConfig&) = delete;
~GlobalConfig() {}
static inline GlobalConfig& Instance()
{
static GlobalConfig cfg;
return cfg;
}
void setCpuWorkerNum(const QoS& qos, int num)
{
if ((num <= 0) || (num > DEFAULT_MAXCONCURRENCY)) {
num = DEFAULT_MAXCONCURRENCY;
}
this->cpu_worker_num[qos()] = static_cast<size_t>(num);
}
size_t getCpuWorkerNum(const QoS& qos)
{
return this->cpu_worker_num[qos()];
}
private:
GlobalConfig()
{
for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
if (qos == static_cast<int>(qos_user_interactive)) {
this->cpu_worker_num[qos] = INTERACTIVE_MAXCONCURRENCY;
} else {
this->cpu_worker_num[qos] = DEFAULT_MAXCONCURRENCY;
}
}
}
size_t cpu_worker_num[QoS::MaxNum()];
};
constexpr unsigned int DEFAULT_GLOBAL_RESERVE_NUM = 24;
constexpr unsigned int DEFAULT_LOW_RESERVE_NUM = 12;
constexpr unsigned int DEFAULT_HIGH_RESERVE_NUM = 12;
constexpr unsigned int GLOBAL_QOS_MAXNUM = 256;
}
#endif /* GLOBAL_CONFIG_H */

View File

@ -43,18 +43,33 @@ public:
return {};
}
static void LockMem()
{
return Instance().lockMem_();
}
static void UnlockMem()
{
return Instance().unlockMem_();
}
static void RegistCb(TaskAllocCB<CPUEUTask>::Alloc &&alloc, TaskAllocCB<CPUEUTask>::Free &&free,
TaskAllocCB<CPUEUTask>::GetUnfreedMem &&getUnfreedMem = nullptr)
TaskAllocCB<CPUEUTask>::GetUnfreedMem &&getUnfreedMem = nullptr,
TaskAllocCB<CPUEUTask>::LockMem &&lockMem = nullptr,
TaskAllocCB<CPUEUTask>::UnlockMem &&unlockMem = nullptr)
{
Instance().alloc_ = std::move(alloc);
Instance().free_ = std::move(free);
Instance().getUnfreedMem_ = std::move(getUnfreedMem);
Instance().lockMem_ = std::move(lockMem);
Instance().unlockMem_ = std::move(unlockMem);
}
private:
TaskAllocCB<CPUEUTask>::Free free_;
TaskAllocCB<CPUEUTask>::Alloc alloc_;
TaskAllocCB<CPUEUTask>::GetUnfreedMem getUnfreedMem_;
TaskAllocCB<CPUEUTask>::LockMem lockMem_;
TaskAllocCB<CPUEUTask>::UnlockMem unlockMem_;
};
} // namespace ffrt

View File

@ -28,6 +28,8 @@ struct TaskAllocCB {
using Alloc = std::function<T *()>;
using Free = std::function<void (T *)>;
using GetUnfreedMem = std::function<std::vector<void *> ()>;
using LockMem = std::function<void ()>;
using UnlockMem = std::function<void ()>;
};
#endif /* FFRT_CB_FUNC_H_ */

View File

@ -73,9 +73,14 @@ public:
return Instance()->getUnfreed();
}
static std::vector<void *> getUnSafeUnfreedMem()
static void LockMem()
{
return Instance()->getUnSafeUnfreed();
return Instance()->SimpleAllocatorLock();
}
static void UnlockMem()
{
return Instance()->SimpleAllocatorUnLock();
}
private:
SpmcQueue primaryCache;
@ -87,14 +92,6 @@ private:
std::size_t count = 0;
std::vector<void *> getUnfreed()
{
lock.lock();
std::vector<void *> ret = getUnSafeUnfreed();
lock.unlock();
return ret;
}
std::vector<void *> getUnSafeUnfreed()
{
std::vector<void *> ret;
#ifdef FFRT_BBOX_ENABLE
@ -113,6 +110,16 @@ private:
return ret;
}
void SimpleAllocatorLock()
{
lock.lock();
}
void SimpleAllocatorUnLock()
{
lock.unlock();
}
void init()
{
lock.lock();

63
src/util/token.h Normal file
View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef TOKEN_H
#define TOKEN_H
#include <atomic>
#include <cstdio>
#include <stdlib.h>
#include <new>
namespace ffrt {
/**
 * @brief Lock-free counting token bucket used to budget worker-thread slots.
 *
 * Starts with a fixed number of tokens; try_acquire() takes one without
 * blocking and release() puts one back. Thread-safe via atomic CAS.
 */
class Token {
public:
    using token_value_t = std::atomic<unsigned int>;

    Token() = delete;

    // explicit: a bare integer should not silently convert to a Token.
    // Member-initializer list replaces the store() in the original body.
    explicit Token(unsigned int init) : count(init)
    {
    }

    /**
     * @brief Try to take one token without blocking.
     * @return true if a token was taken; false if the bucket was empty.
     */
    inline bool try_acquire()
    {
        unsigned int v = count.load(std::memory_order_relaxed);
        while (v != 0) {
            // compare_exchange_weak may fail spuriously but reloads v on
            // failure, so the loop simply retries; weak is the idiomatic
            // (and on some targets cheaper) choice inside a CAS loop.
            if (count.compare_exchange_weak(v, v - 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }

    /** @brief Return one token to the bucket. */
    inline void release()
    {
        count.fetch_add(1);
    }

    /** @brief Current token balance (diagnostic use). */
    unsigned int load() const
    {
        return count.load();
    }

private:
    token_value_t count;
};
}
#endif