三级预留代码回退

Signed-off-by: baishimin <2258359443@qq.com>
This commit is contained in:
baishimin 2024-11-04 11:24:50 +08:00
parent a4e86ce6fd
commit 989fe2118f
8 changed files with 88 additions and 373 deletions

View File

@ -33,26 +33,7 @@ FFRT_C_API int ffrt_skip(ffrt_task_handle_t handle);
// config
FFRT_C_API int ffrt_set_cgroup_attr(ffrt_qos_t qos, ffrt_os_sched_attr* attr);
FFRT_C_API void ffrt_restore_qos_config(void);
/**
* @brief worker num setting.
*
* @param qosData param is default when value equal 0xffffffff.
* totalNum = lowQosReserveWorkerNum + highQosReserveWorkerNum + sum of all reserveNum
* totalNum is valid in (0,256].
* lowQosReserveWorkerNum is a low partition qos public resource.{[min, max], default} is {[0,256],12}.
* highQosReserveWorkerNum is a high partition qos public resource.{[min, max], default} is {[0,256],12}.
* globalReserveWorkerNum is a global qos public resource.{[min, max], default} is {[0,256],24}.
* qosConfigArray is an array of ffrt_qos_config.
* effectLen: param setting succeeds only when the qosConfigArray index is less than effectLen.
* qos valid in [0,5].
* reserveNum: minimum number of workers which the qos can create.{[min, max], default} is {[0,256],8}.
* maxConcurrency is the max concurrency num of the qos.{[min, max], default} is {[0,12],8}.
* hardLimit: max number of workers which the qos can create.{[min, max], default} is {[0,256],44}.
* @return return 0 when setting succeeds; return -1 when setting fails, and param is default.
* @version 1.0
*/
FFRT_C_API int ffrt_set_qos_worker_num(ffrt_worker_num_param* qosData);
FFRT_C_API int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num);
/**
* @brief Set the task execution timeout.

View File

@ -67,22 +67,6 @@ typedef struct {
char cpumap[MAX_CPUMAP_LENGTH];
} ffrt_os_sched_attr;
// Per-qos worker configuration entry, consumed by ffrt_set_qos_worker_num().
// A field set to 0xffffffff keeps its default value.
typedef struct {
// Max number of workers this qos may create; {[min, max], default} is {[0,256],44}.
unsigned int hardLimit;
// Max concurrency of this qos; {[min, max], default} is {[0,12],8}.
unsigned int maxConcurrency;
// Minimum number of workers this qos can create; {[min, max], default} is {[0,256],8}.
unsigned int reserveNum;
// QoS level this entry applies to; valid in [0,5].
unsigned int qos;
} ffrt_qos_config;
// Capacity of ffrt_worker_num_param::qosConfigArray.
#define MAX_QOS_LENGTH 19
// Aggregate worker-number configuration for ffrt_set_qos_worker_num().
// A field set to 0xffffffff keeps its default value.
typedef struct {
// Number of valid entries in qosConfigArray; must not exceed the qos count.
unsigned int effectLen;
// Shared worker budget for the low qos partition; {[0,256], default 12}.
unsigned int lowQosReserveWorkerNum;
// Shared worker budget for the high qos partition; {[0,256], default 12}.
unsigned int highQosReserveWorkerNum;
// Shared worker budget usable by any qos; {[0,256], default 24}.
unsigned int globalReserveWorkerNum;
// Per-qos entries; only the first effectLen elements are applied.
ffrt_qos_config qosConfigArray[MAX_QOS_LENGTH];
} ffrt_worker_num_param;
// Attribute object for ffrt read-write locks.
// NOTE(review): 'storage' appears to be opaque implementation state — confirm
// its meaning against the rwlock implementation before documenting further.
typedef struct {
long storage;
} ffrt_rwlockattr_t;

View File

@ -58,27 +58,9 @@ static inline void restore_qos_config()
ffrt_restore_qos_config();
}
/**
* @brief worker num setting.
*
* @param qosData param is default when value equal 0xffffffff.
* totalNum = lowQosReserveWorkerNum + highQosReserveWorkerNum + sum of all reserveNum
* totalNum is valid in (0,256].
* lowQosReserveWorkerNum is a low partition qos public resource.{[min, max], default} is {[0,256],12}.
* highQosReserveWorkerNum is a high partition qos public resource.{[min, max], default} is {[0,256],12}.
* globalReserveWorkerNum is a global qos public resource.{[min, max], default} is {[0,256],24}.
* qosConfigArray is an array of ffrt_qos_config.
* effectLen: param setting succeeds only when the qosConfigArray index is less than effectLen.
* qos valid in [0,5].
* reserveNum: minimum number of workers which the qos can create.{[min, max], default} is {[0,256],8}.
* maxConcurrency is the max concurrency num of the qos.{[min, max], default} is {[0,12],8}.
* hardLimit: max number of workers which the qos can create.{[min, max], default} is {[0,256],44}.
* @return return 0 when setting succeeds; return -1 when setting fails, and param is default.
* @version 1.0
*/
static inline int set_qos_worker_num(ffrt_worker_num_param* qosData)
static inline int set_cpu_worker_max_num(qos qos_, uint32_t num)
{
return ffrt_set_qos_worker_num(qosData);
return ffrt_set_cpu_worker_max_num(qos_, num);
}
/**

View File

@ -25,7 +25,6 @@
#include "internal_inc/config.h"
#include "eu/osattr_manager.h"
#include "eu/worker_thread.h"
#include "eu/cpu_monitor.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "dfx/watchdog/watchdog_util.h"
@ -407,10 +406,19 @@ void ffrt_restore_qos_config()
}
API_ATTRIBUTE((visibility("default")))
int ffrt_set_qos_worker_num(ffrt_worker_num_param *qosData)
int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num)
{
if (ffrt::GetFuncQosMap() == nullptr) {
FFRT_LOGE("FuncQosMap has not regist");
return -1;
}
ffrt::QoS _qos = ffrt::GetFuncQosMap()(qos);
if (((qos != ffrt::qos_default) && (_qos() == ffrt::qos_default)) || (qos <= ffrt::qos_inherit)) {
FFRT_LOGE("qos[%d] is invalid.", qos);
return -1;
}
ffrt::CPUMonitor *monitor = ffrt::FFRTFacade::GetEUInstance().GetCPUMonitor();
return monitor->QosWorkerNumSegment(qosData);
return monitor->SetWorkerMaxNum(_qos, num);
}
API_ATTRIBUTE((visibility("default")))

View File

@ -22,6 +22,7 @@
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
@ -35,9 +36,7 @@ constexpr int JITTER_DELAY_MS = 5;
}
namespace ffrt {
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops)
: ops(ops),
qosWorkerConfig(QoS::MaxNum())
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
{
SetupMonitor();
StartMonitor();
@ -45,7 +44,6 @@ CPUMonitor::CPUMonitor(CpuMonitorOps&& ops)
CPUMonitor::~CPUMonitor()
{
LogAllWorkerNum();
if (monitorThread != nullptr) {
monitorThread->join();
}
@ -55,27 +53,10 @@ CPUMonitor::~CPUMonitor()
void CPUMonitor::SetupMonitor()
{
globalReserveWorkerNum = DEFAULT_GLOBAL_RESERVE_NUM;
lowQosReserveWorkerNum = DEFAULT_LOW_RESERVE_NUM;
highQosReserveWorkerNum = DEFAULT_HIGH_RESERVE_NUM;
globalReserveWorkerToken = std::make_unique<Token>(globalReserveWorkerNum);
lowQosReserveWorkerToken = std::make_unique<Token>(lowQosReserveWorkerNum);
highQosReserveWorkerToken = std::make_unique<Token>(highQosReserveWorkerNum);
lowQosUseGlobalWorkerToken = std::make_unique<Token>(0);
highQosUseGlobalWorkerToken = std::make_unique<Token>(0);
for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
ctrlQueue[qos].maxConcurrency = DEFAULT_MAXCONCURRENCY;
if (qos > qos_max) {
ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT - DEFAULT_SINGLE_NUM;
ctrlQueue[qos].reserveNum = 0;
qosWorkerConfig.mQosWorkerCfg[qos].reserveNum = 0;
qosWorkerConfig.mQosWorkerCfg[qos].hardLimit = DEFAULT_HARDLIMIT - DEFAULT_SINGLE_NUM;
continue;
}
ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
ctrlQueue[qos].reserveNum = DEFAULT_SINGLE_NUM;
ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
setWorkerMaxNum[qos] = false;
}
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
@ -99,128 +80,6 @@ void CPUMonitor::SetupMonitor()
#endif
}
// Overwrites `param` with `value`, unless `value` is the sentinel
// DEFAULT_PARAMS_VALUE (0xffffffff), which means "keep the current default".
void CPUMonitor::SetWorkerPara(unsigned int& param, unsigned int value)
{
    if (value == DEFAULT_PARAMS_VALUE) {
        return;
    }
    param = value;
}
// Stages one per-qos entry into qosWorkerConfig and range-checks the result.
// Fields equal to DEFAULT_PARAMS_VALUE keep their current (default) value.
// Returns 0 when all staged values are in range, -1 otherwise.
// NOTE(review): qosCfg.qos indexes mQosWorkerCfg without a bounds check here;
// the caller (QosWorkerNumValid) validates it first — keep that call order.
// NOTE(review): values are written into qosWorkerConfig BEFORE validation, so
// a failing call leaves the staged config partially updated.
int CPUMonitor::SetQosWorkerPara(ffrt_qos_config& qosCfg)
{
SetWorkerPara(qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].maxConcurrency, qosCfg.maxConcurrency);
SetWorkerPara(qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].hardLimit, qosCfg.hardLimit);
SetWorkerPara(qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].reserveNum, qosCfg.reserveNum);
if ((qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].maxConcurrency > MAX_MAXCONCURRENCY) ||
(qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].hardLimit > GLOBAL_QOS_MAXNUM) ||
(qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].reserveNum > GLOBAL_QOS_MAXNUM)) {
FFRT_LOGE("qos[%d],maxConcurrency[%d],hardLimit[%d],reserveNum[%d] is invalid",
qosCfg.qos, qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].maxConcurrency,
qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].hardLimit,
qosWorkerConfig.mQosWorkerCfg[qosCfg.qos].reserveNum);
return -1;
}
return 0;
}
// Validates and stages a user-supplied worker-number configuration into
// qosWorkerConfig. Rejects out-of-range effectLen, invalid or duplicated qos
// entries, any staged value above its limit, and a total reserve budget
// outside (0, GLOBAL_QOS_MAXNUM]. Returns true only when everything passes.
// NOTE(review): staging happens before the final checks, so a failed call can
// leave qosWorkerConfig partially updated.
bool CPUMonitor::QosWorkerNumValid(ffrt_worker_num_param *qosData)
{
// Tracks which qos levels already appeared, to reject repeated settings.
// NOTE(review): runtime-sized local array — relies on QoS::MaxNum() being
// constexpr (or a compiler VLA extension); confirm which applies.
bool setWorkerNumQos[QoS::MaxNum()] = {false};
if (qosData->effectLen > QoS::MaxNum()) {
FFRT_LOGE("effectLen is invalid[%d]", qosData->effectLen);
return false;
}
for (unsigned int i = 0; i < qosData->effectLen; i++) {
unsigned int qos = qosData->qosConfigArray[i].qos;
if (qos >= QoS::MaxNum() || setWorkerNumQos[qos]) {
FFRT_LOGE("qos[%d] is invalid or repeat setting", qos);
return false;
}
setWorkerNumQos[qos] = true;
// Stage and range-check the per-qos limits for this entry.
if (SetQosWorkerPara(qosData->qosConfigArray[i]) != 0) {
return false;
}
}
// Stage the three shared (partition/global) reserve budgets.
SetWorkerPara(qosWorkerConfig.mLowQosReserveWorkerNum, qosData->lowQosReserveWorkerNum);
SetWorkerPara(qosWorkerConfig.mHighQosReserveWorkerNum, qosData->highQosReserveWorkerNum);
SetWorkerPara(qosWorkerConfig.mGlobalReserveWorkerNum, qosData->globalReserveWorkerNum);
if ((qosWorkerConfig.mLowQosReserveWorkerNum> GLOBAL_QOS_MAXNUM) ||
(qosWorkerConfig.mHighQosReserveWorkerNum > GLOBAL_QOS_MAXNUM) ||
(qosWorkerConfig.mGlobalReserveWorkerNum > GLOBAL_QOS_MAXNUM)) {
FFRT_LOGE("lowQosReserveWorkerNum[%d],highQosReserveWorkerNum[%d],globalReserveWorkerNum[%d]",
qosWorkerConfig.mLowQosReserveWorkerNum, qosWorkerConfig.mHighQosReserveWorkerNum,
qosWorkerConfig.mGlobalReserveWorkerNum);
return false;
}
// Sum of all staged reserve budgets (partition + global + per-qos) must be
// non-zero and fit within the global cap.
unsigned int totalReserveNum = qosWorkerConfig.GetGlobalMaxWorkerNum();
if (totalReserveNum == 0 || totalReserveNum > GLOBAL_QOS_MAXNUM) {
FFRT_LOGE("totalNum[%d],lowQosWorkerNum[%d],highQosWorkerNum[%d],globalWorkerNum[%d] invalid", totalReserveNum,
qosData->lowQosReserveWorkerNum, qosData->highQosReserveWorkerNum, qosData->globalReserveWorkerNum);
for (unsigned int i = 0; i < qosData->effectLen; i++) {
ffrt_qos_config* singleQos = &(qosData->qosConfigArray[i]);
FFRT_LOGE("totalReserveNum is check fail.reserveNum[%d]", singleQos->reserveNum);
}
return false;
}
return true;
}
// Applies a full worker-number configuration. Allowed exactly once per
// process, and only before any worker thread exists for any qos.
// Returns 0 on success, -1 on failure.
// NOTE(review): setWorkerNum stays true even when validation or the
// "no workers yet" check below fails, so a corrected retry is also rejected —
// confirm this one-shot-even-on-failure behavior is intended.
int CPUMonitor::QosWorkerNumSegment(ffrt_worker_num_param *qosData)
{
// Claim the one-shot flag under setWorkerNumLock.
setWorkerNumLock.lock();
if (setWorkerNum) {
setWorkerNumLock.unlock();
FFRT_LOGE("qos config data setting repeat");
return -1;
}
setWorkerNum = true;
setWorkerNumLock.unlock();
if (!QosWorkerNumValid(qosData)) {
return -1;
}
// Lock every qos control queue in order; abort if any worker already
// exists. On abort, release exactly the locks acquired so far (0..i).
for (int i = 0; i < QoS::MaxNum(); i++) {
WorkerCtrl &workerCtrl = ctrlQueue[i];
workerCtrl.lock.lock();
if (workerCtrl.sleepingWorkerNum != 0 || workerCtrl.executionNum != 0) {
// Inner workerCtrl intentionally shadows the outer one.
for (int j = 0;j <= i; j++) {
WorkerCtrl &workerCtrl = ctrlQueue[j];
workerCtrl.lock.unlock();
}
FFRT_LOGE("Can only be set during initiallization,qos[%d], executionNum[%d],sleepingNum[%d]",
i, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
return -1;
}
}
// All queues locked: commit the staged per-qos limits.
for (int i = 0; i < QoS::MaxNum(); i++) {
WorkerCtrl &workerCtrl = ctrlQueue[i];
workerCtrl.hardLimit = qosWorkerConfig.mQosWorkerCfg[i].hardLimit;
workerCtrl.maxConcurrency = qosWorkerConfig.mQosWorkerCfg[i].maxConcurrency;
workerCtrl.reserveNum = qosWorkerConfig.mQosWorkerCfg[i].reserveNum;
}
// Commit the shared budgets and rebuild their counting tokens.
lowQosReserveWorkerNum = qosWorkerConfig.mLowQosReserveWorkerNum;
highQosReserveWorkerNum = qosWorkerConfig.mHighQosReserveWorkerNum;
globalReserveWorkerNum = qosWorkerConfig.mGlobalReserveWorkerNum;
globalReserveWorkerToken = std::make_unique<Token>(globalReserveWorkerNum);
lowQosReserveWorkerToken = std::make_unique<Token>(lowQosReserveWorkerNum);
highQosReserveWorkerToken = std::make_unique<Token>(highQosReserveWorkerNum);
FFRT_LOGI("succ:globalReserveWorkerNum[%d],highQosReserveWorkerNum[%d],lowQosReserveWorkerNum[%d]",
globalReserveWorkerNum, highQosReserveWorkerNum, lowQosReserveWorkerNum);
// Log the committed values and release every queue lock.
for (int i = 0; i < QoS::MaxNum(); i++) {
WorkerCtrl &workerCtrl = ctrlQueue[i];
FFRT_LOGI("succ:qos[%d], reserveNum[%d], maxConcurrency[%d], hardLimit[%d]",
i, workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit);
workerCtrl.lock.unlock();
}
return 0;
}
void CPUMonitor::StartMonitor()
{
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
@ -235,6 +94,26 @@ void CPUMonitor::StartMonitor()
#endif
}
// Sets the maximum worker count (hardLimit) for the given qos.
// May be called at most once per qos; num must be in (0, QOS_WORKER_MAXNUM].
// Returns 0 on success, -1 on a repeated call or an out-of-range num.
int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
{
    WorkerCtrl& workerCtrl = ctrlQueue[qos()];
    // RAII lock (same style as DoDestroy) replaces the manual
    // lock/unlock pairs on each of the three exit paths.
    std::lock_guard lock(workerCtrl.lock);
    if (setWorkerMaxNum[qos()]) {
        FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
        return -1;
    }
    if (num <= 0 || num > QOS_WORKER_MAXNUM) {
        FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
        return -1;
    }
    workerCtrl.hardLimit = num;
    setWorkerMaxNum[qos()] = true;
    return 0;
}
uint32_t CPUMonitor::GetMonitorTid() const
{
return monitorTid;
@ -284,10 +163,6 @@ void CPUMonitor::TimeoutCount(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
if (totalNum > workerCtrl.reserveNum) {
ReleasePublicWorkerNum(qos);
}
workerCtrl.sleepingWorkerNum--;
workerCtrl.lock.unlock();
}
@ -301,16 +176,6 @@ void CPUMonitor::WakeupCount(const QoS& qos, bool isDeepSleepWork)
workerCtrl.lock.unlock();
}
void CPUMonitor::DoDestroy(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
std::lock_guard lk(workerCtrl.lock);
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
if (totalNum > workerCtrl.reserveNum) {
ReleasePublicWorkerNum(qos);
}
}
int CPUMonitor::WakedWorkerNum(const QoS& qos)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
@ -380,76 +245,6 @@ bool CPUMonitor::IsExceedDeepSleepThreshold()
return deepSleepingWorkerNum * 2 > totalWorker;
}
// Tries to reserve a worker slot for the low qos partition: first from the
// partition's own reserve token, then by borrowing from the global reserve
// (recording the borrow in lowQosUseGlobalWorkerToken so the release path can
// return it to the right pool). Returns true when a slot was acquired.
bool CPUMonitor::LowQosUseReserveWorkerNum()
{
    if (lowQosReserveWorkerToken->try_acquire()) {
        return true;
    }
    if (globalReserveWorkerToken->try_acquire()) {
        lowQosUseGlobalWorkerToken->release();
        return true;
    }
    // Fix: the previous log passed `qos()`, but this function has no qos
    // parameter or member — log only the values that exist in this scope.
    FFRT_LOGD("low qos worker unavailable, lowQosUse[%d], highQosUse[%d]",
        lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
    return false;
}
// Tries to reserve a worker slot for the high qos partition: first from the
// partition's own reserve token, then by borrowing from the global reserve
// (recording the borrow in highQosUseGlobalWorkerToken so the release path
// can return it to the right pool). Returns true when a slot was acquired.
bool CPUMonitor::HighQosUseReserveWorkerNum()
{
    if (highQosReserveWorkerToken->try_acquire()) {
        return true;
    }
    if (globalReserveWorkerToken->try_acquire()) {
        highQosUseGlobalWorkerToken->release();
        return true;
    }
    // Fix: the previous log passed `qos()`, but this function has no qos
    // parameter or member — log only the values that exist in this scope.
    FFRT_LOGD("high qos worker unavailable, lowQosUse[%d], highQosUse[%d]",
        lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
    return false;
}
// Routes a public-slot request to the matching partition: qos levels at or
// below ffrt_qos_default draw from the low partition, the rest from the high.
bool CPUMonitor::TryAcquirePublicWorkerNum(const QoS& qos)
{
    if (qos() <= ffrt_qos_default) {
        return LowQosUseReserveWorkerNum();
    }
    return HighQosUseReserveWorkerNum();
}
// Returns a previously acquired public worker slot. A slot that was borrowed
// from the global reserve (tracked via the partition's use-global token) goes
// back to the global pool; otherwise the partition's own reserve is credited.
void CPUMonitor::ReleasePublicWorkerNum(const QoS& qos)
{
    const bool isLowQos = qos() <= ffrt_qos_default;
    auto& borrowedToken = isLowQos ? lowQosUseGlobalWorkerToken : highQosUseGlobalWorkerToken;
    auto& partitionToken = isLowQos ? lowQosReserveWorkerToken : highQosReserveWorkerToken;
    if (borrowedToken->try_acquire()) {
        globalReserveWorkerToken->release();
    } else {
        partitionToken->release();
    }
}
// Debug-logs the shared reserve budgets, their token counters, and per-qos
// worker counters. Invoked from the destructor.
// NOTE(review): per-qos counters are read without taking workerCtrl.lock —
// acceptable for teardown diagnostics, but the values may be stale.
void CPUMonitor::LogAllWorkerNum()
{
FFRT_LOGD("globalReserveWorkerNum[%d],highQosReserveWorkerNum[%d],lowQosReserveWorkerNum[%d]",
globalReserveWorkerNum, highQosReserveWorkerNum, lowQosReserveWorkerNum);
FFRT_LOGD("globalReserveWorkerToken[%d],highQosReserveWorkerToken[%d],lowQosReserveWorkerToken[%d]",
globalReserveWorkerToken->load(), highQosReserveWorkerToken->load(), lowQosReserveWorkerToken->load());
FFRT_LOGD("lowQosUseGlobalWorkerToken[%d], highQosUseGlobalWorkerToken[%d]",
lowQosUseGlobalWorkerToken->load(), highQosUseGlobalWorkerToken->load());
for (int i = 0; i < QoS::MaxNum(); i++) {
WorkerCtrl& workerCtrl = ctrlQueue[i];
size_t runningNum = workerCtrl.executionNum;
size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
FFRT_LOGD("succ:qos[%d], reserveNum[%d], maxConcurrency[%d], hardLimit[%d], runningNum[%d], totalNum[%d]",
i, workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
}
}
void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
{
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
@ -481,8 +276,7 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
workerCtrl.lock.unlock();
ops.WakeupWorkers(qos);
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
workerCtrl.executionNum++;
FFRTTraceRecord::WorkRecord(static_cast<int>(qos), workerCtrl.executionNum);
workerCtrl.lock.unlock();
@ -491,8 +285,6 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
@ -502,26 +294,19 @@ void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
workerCtrl.lock.lock();
int maxWorkerLimit = static_cast<int>(std::min(workerCtrl.maxConcurrency, workerCtrl.hardLimit));
int increasableNumber = maxWorkerLimit - (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
(workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
for (int idx = 0; idx < wakeupNumber; idx++) {
ops.WakeupWorkers(qos);
}
int incPublicNum = workerCtrl.reserveNum - (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
int incNumber = std::min(number - wakeupNumber, increasableNumber);
for (int idx = 0; idx < incNumber; idx++) {
if (idx < incPublicNum || TryAcquirePublicWorkerNum(qos)) {
workerCtrl.executionNum++;
ops.IncWorker(qos);
} else {
FFRT_LOGD("Fail:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],totalNum[%d],idx[%d],inc[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit,
workerCtrl.executionNum + workerCtrl.sleepingWorkerNum, idx, incNumber);
}
workerCtrl.executionNum++;
ops.IncWorker(qos);
}
workerCtrl.lock.unlock();
FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
}
@ -564,8 +349,7 @@ void CPUMonitor::PokeAdd(const QoS& qos)
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
@ -573,8 +357,6 @@ void CPUMonitor::PokeAdd(const QoS& qos)
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
@ -603,8 +385,7 @@ void CPUMonitor::PokePick(const QoS& qos)
}
}
#endif
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit) &&
(totalNum < workerCtrl.reserveNum || TryAcquirePublicWorkerNum(qos))) {
if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
workerCtrl.executionNum++;
workerCtrl.lock.unlock();
ops.IncWorker(qos);
@ -612,11 +393,8 @@ void CPUMonitor::PokePick(const QoS& qos)
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
}
FFRT_LOGD("noInc:qos[%d],reserveNum[%d],maxConcurrency[%d],hardLimit[%d],runningNum[%d],totalNum[%d]",
qos(), workerCtrl.reserveNum, workerCtrl.maxConcurrency, workerCtrl.hardLimit, runningNum, totalNum);
workerCtrl.lock.unlock();
}
}
}
}

View File

@ -23,9 +23,6 @@
#include "qos.h"
#include "cpp/mutex.h"
#include "eu/cpu_manager_interface.h"
#include "c/type_def_ext.h"
#include "util/token.h"
#include "internal_inc/config.h"
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
@ -35,7 +32,6 @@ namespace ffrt {
struct WorkerCtrl {
size_t hardLimit = 0;
size_t maxConcurrency = 0;
size_t reserveNum = 0;
int executionNum = 0;
int sleepingWorkerNum = 0;
bool pollWaitFlag = false;
@ -60,7 +56,6 @@ public:
bool IsExceedDeepSleepThreshold();
void IntoPollWait(const QoS& qos);
void OutOfPollWait(const QoS& qos);
void DoDestroy(const QoS& qos);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool IsExceedRunningThreshold(const QoS& qos);
bool IsBlockAwareInit(void);
@ -69,8 +64,7 @@ public:
#endif
virtual void Notify(const QoS& qos, TaskNotifyType notifyType) = 0;
virtual void WorkerInit() = 0;
int QosWorkerNumSegment (ffrt_worker_num_param* qosData);
bool TryAcquirePublicWorkerNum(const QoS& qos);
int SetWorkerMaxNum(const QoS& qos, int num);
/* strategy options for handling task notify events */
static void HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType);
int WakedWorkerNum(const QoS& qos);
@ -93,24 +87,7 @@ private:
std::thread* monitorThread;
CpuMonitorOps ops;
bool setWorkerNum = false;
std::mutex setWorkerNumLock;
void SetWorkerPara(unsigned int& param, unsigned int value);
int SetQosWorkerPara(ffrt_qos_config& qosCfg);
bool QosWorkerNumValid(ffrt_worker_num_param* qosData);
bool LowQosUseReserveWorkerNum();
bool HighQosUseReserveWorkerNum();
void ReleasePublicWorkerNum(const QoS& qos);
void LogAllWorkerNum();
unsigned int globalReserveWorkerNum = 0;
unsigned int lowQosReserveWorkerNum = 0;
unsigned int highQosReserveWorkerNum = 0;
std::unique_ptr<Token> globalReserveWorkerToken = nullptr;
std::unique_ptr<Token> lowQosReserveWorkerToken = nullptr;
std::unique_ptr<Token> highQosReserveWorkerToken = nullptr;
std::unique_ptr<Token> lowQosUseGlobalWorkerToken = nullptr;
std::unique_ptr<Token> highQosUseGlobalWorkerToken = nullptr;
QosWorkerConfig qosWorkerConfig;
std::atomic<bool> setWorkerMaxNum[QoS::MaxNum()];
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
bool blockAwareInit = false;
bool stopMonitor = false;

View File

@ -20,51 +20,52 @@
#include "types.h"
namespace ffrt {
constexpr unsigned int DEFAULT_GLOBAL_HARDLIMIT = 96;
constexpr unsigned int DEFAULT_PARAMS_VALUE = 0Xffffffff;
constexpr int DEFAULT_MINCONCURRENCY = 4;
constexpr int INTERACTIVE_MAXCONCURRENCY = USE_COROUTINE ? 8 : 40000;
constexpr int DEFAULT_MAXCONCURRENCY = USE_COROUTINE ? 8 : 80000;
constexpr int DEFAULT_HARDLIMIT = 16;
constexpr int QOS_WORKER_MAXNUM = (8 * 16);
constexpr unsigned int DEFAULT_MAXCONCURRENCY = 8;
constexpr unsigned int MAX_MAXCONCURRENCY = 12;
constexpr unsigned int DEFAULT_HARDLIMIT = 44;
constexpr unsigned int DEFAULT_SINGLE_NUM = 8;
constexpr unsigned int DEFAULT_GLOBAL_RESERVE_NUM = 24;
constexpr unsigned int DEFAULT_LOW_RESERVE_NUM = 12;
constexpr unsigned int DEFAULT_HIGH_RESERVE_NUM = 12;
constexpr unsigned int GLOBAL_QOS_MAXNUM = 256;
class QosWorkerConfig {
class GlobalConfig {
public:
struct FfrtQosWorkerNumCfg {
unsigned int hardLimit = DEFAULT_HARDLIMIT;
unsigned int maxConcurrency = DEFAULT_MAXCONCURRENCY;
unsigned int reserveNum = DEFAULT_SINGLE_NUM;
};
GlobalConfig(const GlobalConfig&) = delete;
QosWorkerConfig(int workerNum)
GlobalConfig& operator=(const GlobalConfig&) = delete;
~GlobalConfig() {}
static inline GlobalConfig& Instance()
{
mQosWorkerCfg.resize(workerNum);
static GlobalConfig cfg;
return cfg;
}
QosWorkerConfig(const QosWorkerConfig&) = delete;
QosWorkerConfig& operator=(const QosWorkerConfig&) = delete;
~QosWorkerConfig() {}
unsigned int GetGlobalMaxWorkerNum() const
void setCpuWorkerNum(const QoS& qos, int num)
{
unsigned int ret = 0;
ret += mLowQosReserveWorkerNum;
ret += mHighQosReserveWorkerNum;
ret += mGlobalReserveWorkerNum;
for (const auto &tmpStru : mQosWorkerCfg) {
ret += tmpStru.reserveNum;
if ((num <= 0) || (num > DEFAULT_MAXCONCURRENCY)) {
num = DEFAULT_MAXCONCURRENCY;
}
return ret;
this->cpu_worker_num[qos()] = static_cast<size_t>(num);
}
std::vector<FfrtQosWorkerNumCfg> mQosWorkerCfg;
unsigned int mLowQosReserveWorkerNum = DEFAULT_LOW_RESERVE_NUM;
unsigned int mHighQosReserveWorkerNum = DEFAULT_HIGH_RESERVE_NUM;
unsigned int mGlobalReserveWorkerNum = DEFAULT_GLOBAL_RESERVE_NUM;
size_t getCpuWorkerNum(const QoS& qos)
{
return this->cpu_worker_num[qos()];
}
private:
GlobalConfig()
{
for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
if (qos == static_cast<int>(qos_user_interactive)) {
this->cpu_worker_num[qos] = INTERACTIVE_MAXCONCURRENCY;
} else {
this->cpu_worker_num[qos] = DEFAULT_MAXCONCURRENCY;
}
}
}
size_t cpu_worker_num[QoS::MaxNum()];
};
}

View File

@ -100,6 +100,8 @@ HWTEST_F(DependencyTest, update_qos_success_04, TestSize.Level1)
ffrt::submit([] {
printf("return %d\n", ffrt::this_task::update_qos(static_cast<int>(ffrt::qos_user_initiated)));
});
int ret2 = ffrt_set_cpu_worker_max_num(static_cast<int>(ffrt::qos_user_initiated), 4);
EXPECT_EQ(ret2, 0);
}
HWTEST_F(DependencyTest, update_qos_success_05, TestSize.Level1)
@ -163,6 +165,8 @@ HWTEST_F(DependencyTest, update_qos_failed_02, TestSize.Level1)
ffrt::submit([] {
printf("return %d\n", ffrt::this_task::update_qos(static_cast<int>(ffrt::qos_user_initiated)));
});
int ret1 = ffrt_set_cpu_worker_max_num(static_cast<int>(ffrt::qos_inherit), 4);
EXPECT_EQ(ret1, -1);
}
HWTEST_F(DependencyTest, executor_task_submit_success_01, TestSize.Level1)