Merge branch 'master' of gitee.com:openharmony/resourceschedule_ffrt into master

Signed-off-by: 董洁 <dongjie52@h-partners.com>
This commit is contained in:
董洁 2024-11-13 10:33:22 +00:00 committed by Gitee
commit 797d3a8efd
9 changed files with 78 additions and 45 deletions

View File

@ -1,15 +1,15 @@
---
BasedOnStyle: Google
IndentWidth: 4
ColumnLimit: 120
---
Language: Cpp
AccessModifierOffset: -4
AllowShortIfStatementsOnASingleLine: Never
AlwaysBreakBeforeMultilineStrings: false
DerivePointerAlignment: false
IncludeBlocks: Preserve
IndentCaseBlocks: true
PackConstructorInitializers: CurrentLine
SpacesBeforeTrailingComments: 1
---
---
BasedOnStyle: Google
IndentWidth: 4
ColumnLimit: 120
---
Language: Cpp
AccessModifierOffset: -4
AllowShortIfStatementsOnASingleLine: Never
AlwaysBreakBeforeMultilineStrings: false
DerivePointerAlignment: false
IncludeBlocks: Preserve
IndentCaseBlocks: true
PackConstructorInitializers: CurrentLine
SpacesBeforeTrailingComments: 1
---

View File

@ -112,6 +112,7 @@ void ffrt_wake_coroutine(void* task)
ffrt::IOTaskExecutor* wakedTask = static_cast<ffrt::IOTaskExecutor*>(task);
wakedTask->status = ffrt::ExecTaskStatus::ET_READY;
#ifdef FFRT_LOCAL_QUEUE_ENABLE
// in self-wakeup scenario, tasks are placed in local fifo to delay scheduling, implementing the yeild function
bool selfWakeup = (ffrt::ExecuteCtx::Cur()->exec_task == task);
if (!selfWakeup) {
@ -127,6 +128,7 @@ void ffrt_wake_coroutine(void* task)
}
}
}
#endif
ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList *>(&wakedTask->wq);
if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, wakedTask->qos)) {
@ -136,4 +138,3 @@ void ffrt_wake_coroutine(void* task)
#ifdef __cplusplus
}
#endif

View File

@ -15,6 +15,7 @@
#ifdef FFRT_SEND_EVENT
#include "sysevent.h"
#include "hisysevent.h"
#include "dfx/log/ffrt_log_api.h"
namespace ffrt {
void TaskTimeoutReport(std::stringstream& ss, const std::string& processNameStr, const std::string& senarioName)
@ -27,5 +28,15 @@ void TaskTimeoutReport(std::stringstream& ss, const std::string& processNameStr,
OHOS::HiviewDFX::HiSysEvent::EventType::FAULT, "SENARIO", senarioName,
"PROCESS_NAME", processNameStr, "MSG", sendMsg);
}
void WorkerEscapeReport(const std::string& processName, int qos, size_t totalNum)
{
std::string msg = "qos: " + std::to_string(qos) + ", worker num: " + std::to_string(totalNum);
std::string eventName = "WORKER_ESCAPE";
HiSysEventWrite(OHOS::HiviewDFX::HiSysEvent::Domain::FFRT, eventName,
OHOS::HiviewDFX::HiSysEvent::EventType::FAULT, "SENARIO", "Trigger_Escape",
"PROCESS_NAME", processName, "MSG", msg);
FFRT_LOGW("Process: %s trigger escape. %s", processName.c_str(), msg.c_str());
}
}
#endif

View File

@ -103,7 +103,7 @@ static inline int BlockawareLeaveSleeping(void)
{
unsigned long *slot_ptr = curr_thread_tls_blockaware_slot_of();
int err = 0;
if (*slot_ptr == 0) {
err = -EINVAL;
} else {
@ -149,7 +149,7 @@ static inline int BlockawareLeaveSleeping(void)
{
unsigned long *slot_ptr = curr_thread_tls_blockaware_slot_of();
int err = 0;
if (*slot_ptr == 0) {
err = -EINVAL;
} else {
@ -234,6 +234,18 @@ static inline int BlockawareLoadSnapshot(unsigned long key, struct BlockawareDom
return 0;
}
static inline unsigned int BlockawareLoadSnapshotNrRunningFast(unsigned long key, int domainId)
{
BlockawareKinfoPageS* kinfoPage = reinterpret_cast<BlockawareKinfoPageS*>(key);
return kinfoPage->infoArea.localinfo[domainId].nrRunning;
}
static inline unsigned int BlockawareLoadSnapshotNrBlockedFast(unsigned long key, int domainId)
{
BlockawareKinfoPageS* kinfoPage = reinterpret_cast<BlockawareKinfoPageS*>(key);
return kinfoPage->infoArea.localinfo[domainId].nrBlocked;
}
static inline int BlockawareWaitCond(struct BlockawareWakeupCond *cond)
{
int rc = prctl(HM_PR_SILK_BLOCKAWARE_OPS, BLOCKAWARE_SUBOPS_WAIT, reinterpret_cast<unsigned long>(cond));

View File

@ -16,13 +16,16 @@
#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <climits>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/sysevent/sysevent.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "internal_inc/osal.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
@ -60,23 +63,19 @@ void CPUMonitor::SetupMonitor()
}
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
memset_s(&domainInfoNotify, sizeof(domainInfoNotify), 0, sizeof(domainInfoNotify));
wakeupCond.check_ahead = false;
wakeupCond.global.low = 0;
wakeupCond.global.high = 0;
for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
wakeupCond.local[i].low = 0;
if (i < qosMonitorMaxNum) {
wakeupCond.local[i].high = ctrlQueue[i].maxConcurrency;
wakeupCond.local[i].high = UNIT_MAX;
wakeupCond.global.low += wakeupCond.local[i].low;
wakeupCond.global.high += wakeupCond.local[i].high;
wakeupCond.global.high = UNIT_MAX;
} else {
wakeupCond.local[i].high = 0;
}
}
for (int i = 0; i < QoS::MaxNum(); i++) {
exceedUpperWaterLine[i] = false;
}
#endif
}
@ -138,20 +137,13 @@ void CPUMonitor::MonitorMain()
if (taskCount > 0 && domainInfoMonitor.localinfo[i].nrRunning <= wakeupCond.local[i].low) {
Poke(i, taskCount, TaskNotifyType::TASK_ADDED);
}
if (domainInfoMonitor.localinfo[i].nrRunning > wakeupCond.local[i].high) {
exceedUpperWaterLine[i] = true;
}
}
stopMonitor = true;
}
bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
{
if (blockAwareInit && exceedUpperWaterLine[qos()]) {
exceedUpperWaterLine[qos()] = false;
return true;
}
return false;
return blockAwareInit && (BlockawareLoadSnapshotNrRunningFast(keyPtr, qos()) > ctrlQueue[qos()].maxConcurrency);
}
bool CPUMonitor::IsBlockAwareInit(void)
@ -274,13 +266,14 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
/* There is no need to update running num when executionNum < maxConcurrency */
if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
if (blockAwareInit && !BlockawareLoadSnapshot(keyPtr, &domainInfoNotify)) {
if (workerCtrl.executionNum >= domainInfoNotify.localinfo[qos()].nrBlocked) {
if (blockAwareInit) {
auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(keyPtr, qos());
if (workerCtrl.executionNum >= nrBlocked) {
/* nrRunning may not be updated in a timely manner */
runningNum = workerCtrl.executionNum - domainInfoNotify.localinfo[qos()].nrBlocked;
runningNum = workerCtrl.executionNum - nrBlocked;
} else {
FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
qos(), domainInfoNotify.localinfo[qos()].nrBlocked, workerCtrl.executionNum);
qos(), nrBlocked, workerCtrl.executionNum);
}
}
}
@ -293,14 +286,23 @@ void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyT
workerCtrl.lock.unlock();
return;
}
if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
if ((static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
workerCtrl.lock.unlock();
ops.WakeupWorkers(qos);
} else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
} else if (((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) || (runningNum == 0)) {
workerCtrl.executionNum++;
FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
workerCtrl.lock.unlock();
ops.IncWorker(qos);
#ifdef FFRT_SEND_EVENT
if (!((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit))) {
constexpr int processNameLen = 1024;
static std::once_flag flag;
static char processName[processNameLen];
std::call_once(flag, []() { GetProcessName(processName, processNameLen); });
WorkerEscapeReport(processName, static_cast<int>(qos), totalNum);
}
#endif
} else {
if (workerCtrl.pollWaitFlag) {
FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();

View File

@ -95,8 +95,6 @@ protected:
int qosMonitorMaxNum = std::min(QoS::Max(), BLOCKAWARE_DOMAIN_ID_MAX + 1);
BlockawareWakeupCond wakeupCond;
BlockawareDomainInfoArea domainInfoMonitor;
BlockawareDomainInfoArea domainInfoNotify;
std::atomic<bool> exceedUpperWaterLine[QoS::MaxNum()];
#endif
private:
void SetupMonitor();

View File

@ -34,6 +34,7 @@
namespace {
int PLACE_HOLDER = 0;
const unsigned int TRY_POLL_FREQ = 51;
constexpr int CO_CREATE_RETRY_INTERVAL = 500 * 1000;
}
namespace ffrt {
@ -158,6 +159,7 @@ void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker)
void* CPUWorker::GetTask(CPUWorker* worker)
{
#ifdef FFRT_LOCAL_QUEUE_ENABLE
// periodically pick up tasks from the global queue to prevent global queue starvation
if (worker->tick % worker->global_interval == 0) {
worker->tick = 0;
@ -183,6 +185,14 @@ void* CPUWorker::GetTask(CPUWorker* worker)
}
return worker->localFifo.PopHead();
#else
CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
if (task != nullptr) {
worker->ops.NotifyTaskPicked(worker);
}
return task;
#endif
}
PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
@ -250,6 +260,7 @@ void CPUWorker::WorkerLooperDefault(WorkerThread* p)
continue;
}
#ifdef FFRT_LOCAL_QUEUE_ENABLE
// pick up tasks from global queue
CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
// the worker is not notified when the task attribute is set not to notify worker
@ -278,6 +289,7 @@ void CPUWorker::WorkerLooperDefault(WorkerThread* p)
worker->tick = 1;
continue;
}
#endif
// enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
// only one worker enters this state at a QoS level

View File

@ -65,7 +65,7 @@ public:
}
#endif
}
FFRT_LOGW("WorkerThread enter destruction");
FFRT_LOGI("Qos %d WorkerThread enter destruction.", qos());
Detach();
}

View File

@ -20,7 +20,6 @@
#include <fcntl.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <sys/prctl.h>
#define API_ATTRIBUTE(attr) __attribute__(attr)
#define unlikely(x) __builtin_expect(!!(x), 0)
@ -51,10 +50,8 @@ static inline void GetProcessName(char* processName, int bufferLength)
if (ret != -1) {
processName[ret] = 0;
}
syscall(SYS_close, fd);
} else {
// no permission for /proc/self/cmdline, try prctl system call
prctl(PR_GET_NAME, processName);
}
}
#endif