修复timer query卡死问题和concurrent queue优先级错误问题

Signed-off-by: chendi <cdrom000@126.com>
Change-Id: Ib2222a18f48345e2c1fddf39e34b308cf4b0fa72
This commit is contained in:
chendi 2024-06-05 15:13:33 +08:00
parent e2c6ac07be
commit 8037502139
11 changed files with 91 additions and 83 deletions

View File

@ -13,8 +13,8 @@
* limitations under the License.
*/
#ifndef FFRT_INNER_API_C_QUEUE_H
#define FFRT_INNER_API_C_QUEUE_H
#ifndef FFRT_INNER_API_C_QUEUE_EXT_H
#define FFRT_INNER_API_C_QUEUE_EXT_H
#include <stdbool.h>
#include "c/queue.h"
@ -26,15 +26,15 @@ typedef enum {
} ffrt_inner_queue_type_t;
typedef enum {
/** highest priority, should be distributed until the tasks in the queue are completed */
/* highest priority, should be distributed until the tasks in the queue are completed */
ffrt_inner_queue_priority_vip = 0,
/** should be distributed at once if possible, handle time equals to send time, prior to high level */
/* should be distributed at once if possible, handle time equals to send time, prior to high level */
ffrt_inner_queue_priority_immediate,
/** high priority, sorted by handle time, prior to low level. */
/* high priority, sorted by handle time, prior to low level. */
ffrt_inner_queue_priority_high,
/** low priority, sorted by handle time, prior to idle level. */
/* low priority, sorted by handle time, prior to idle level. */
ffrt_inner_queue_priority_low,
/** lowest priority, sorted by handle time, only distribute when there is no other level inside queue. */
/* lowest priority, sorted by handle time, only distribute when there is no other level inside queue. */
ffrt_inner_queue_priority_idle,
} ffrt_inner_queue_priority_t;
@ -83,7 +83,7 @@ FFRT_C_API void ffrt_queue_cancel_all(ffrt_queue_t queue);
* @brief Cancels a task with the given name in the queue.
*
* @param queue Indicates a queue handle.
* @param name Indicates name of the task to be canceled, regular expressions are supperted.
* @param name Indicates name of the task to be canceled, regular expressions are supported.
* @return Returns <b>0</b> if the task is canceled;
returns <b>1</b> otherwise.
* @version 1.0
@ -101,7 +101,7 @@ FFRT_C_API bool ffrt_queue_is_idle(ffrt_queue_t queue);
/**
* @brief Dumps queue information;
* including current execution, historical execution, and remaining unexecuted task information, etc.
including current execution, historical execution, and remaining unexecuted task information, etc.
*
* @param queue Indicates a queue handle.
* @param tag Indicates tag prefix for dump information.

View File

@ -24,7 +24,7 @@
namespace {
using CreateFunc = std::unique_ptr<ffrt::BaseQueue>(*)(uint32_t, const ffrt_queue_attr_t*);
const std::map<int, CreateFunc> CREATE_FUNC_MAP = {
const std::unordered_map<int, CreateFunc> CREATE_FUNC_MAP = {
{ ffrt_queue_serial, ffrt::CreateSerialQueue },
{ ffrt_queue_concurrent, ffrt::CreateConcurrentQueue },
{ ffrt_queue_eventhandler_interactive, ffrt::CreateEventHandlerInteractiveQueue },
@ -90,21 +90,6 @@ int BaseQueue::Remove(const QueueTask* task)
return FAILED;
}
int BaseQueue::GetNextTimeout()
{
std::unique_lock lock(mutex_);
if (whenMap_.empty()) {
return -1;
}
uint64_t now = GetNow();
if (now >= whenMap_.begin()->first) {
return 0;
}
uint64_t diff = whenMap_.begin()->first - now;
uint64_t timeout = (diff - 1) / 1000 + 1; // us->ms
return timeout > INT_MAX ? INT_MAX : static_cast<int>(timeout);
}
bool BaseQueue::HasTask(const char* name)
{
std::unique_lock lock(mutex_);
@ -119,6 +104,7 @@ void BaseQueue::ClearWhenMap()
if (it->second) {
it->second->Notify();
it->second->Destroy();
it->second = nullptr;
}
}
whenMap_.clear();
@ -128,9 +114,7 @@ void BaseQueue::ClearWhenMap()
std::unique_ptr<BaseQueue> CreateQueue(int queueType, uint32_t queueId, const ffrt_queue_attr_t* attr)
{
const auto iter = CREATE_FUNC_MAP.find(queueType);
if (iter == CREATE_FUNC_MAP.end()) {
return nullptr;
}
FFRT_COND_DO_ERR((iter == CREATE_FUNC_MAP.end()), return nullptr, "invalid queue type");
return iter->second(queueId, attr);
}

View File

@ -55,7 +55,6 @@ public:
return false;
}
int GetNextTimeout();
inline uint64_t GetMapSize()
{
std::unique_lock lock(mutex_);

View File

@ -34,6 +34,9 @@ int ConcurrentQueue::Push(QueueTask* task)
{
std::unique_lock lock(mutex_);
FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
if (task->GetPriority() > ffrt_queue_priority_idle) {
task->SetPriority(ffrt_queue_priority_low);
}
if (loop_ != nullptr) {
if (task->GetDelay() == 0) {

View File

@ -20,8 +20,8 @@
namespace ffrt {
class ConcurrentQueue : public BaseQueue {
public:
explicit ConcurrentQueue(const uint32_t queueId, const int maxConcurrency = 1) :
BaseQueue(queueId), maxConcurrency_(maxConcurrency)
explicit ConcurrentQueue(const uint32_t queueId, const int maxConcurrency = 1)
: BaseQueue(queueId), maxConcurrency_(maxConcurrency)
{
dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleByPriority;
}

View File

@ -17,49 +17,23 @@
#include <securec.h>
#include <sstream>
#include "dfx/log/ffrt_log_api.h"
#include "util/time_format.h"
namespace {
constexpr int DATETIME_STRING_LENGTH = 80;
constexpr int MAX_MS_LENGTH = 3;
constexpr int MS_PER_SECOND = 1000;
constexpr int MAX_DUMP_SIZE = 500;
constexpr uint8_t HISTORY_TASK_NUM_POWER = 32;
std::string FormatDateString(const std::chrono::system_clock::time_point& timePoint)
{
auto tp = std::chrono::time_point_cast<std::chrono::milliseconds>(timePoint);
auto tt = std::chrono::system_clock::to_time_t(timePoint);
auto ms = tp.time_since_epoch().count() % MS_PER_SECOND;
auto msString = std::to_string(ms);
if (msString.length() < MAX_MS_LENGTH) {
msString = std::string(MAX_MS_LENGTH - msString.length(), '0') + msString;
}
struct tm curTime = {0};
localtime_r(&tt, &curTime);
char sysTime[DATETIME_STRING_LENGTH];
std::strftime(sysTime, sizeof(char) * DATETIME_STRING_LENGTH, "%Y-%m-%d %I:%M:%S.", &curTime);
return std::string(sysTime) + msString;
}
std::string FormatDateString(uint64_t steadyClockTimeStamp)
{
std::chrono::microseconds ms(steadyClockTimeStamp - std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
auto tp = std::chrono::system_clock::now() + ms;
return FormatDateString(tp);
}
void DumpRunningTaskInfo(const char* tag, const ffrt::HistoryTask& currentRunningTask, std::ostringstream& oss)
{
oss << tag << " Current Running: ";
if (currentRunningTask.beginTime_ == std::numeric_limits<uint64_t>::max()) {
oss << "{}";
} else {
oss << "start at " << FormatDateString(currentRunningTask.beginTime_) << ", ";
oss << "start at " << ffrt::FormatDateString(currentRunningTask.beginTime_) << ", ";
oss << "Event { ";
oss << "send thread = " << currentRunningTask.senderKernelThreadId_;
oss << ", send time = " << FormatDateString(currentRunningTask.sendTime_);
oss << ", handle time = " << FormatDateString(currentRunningTask.handleTime_);
oss << ", send time = " << ffrt::FormatDateString(currentRunningTask.sendTime_);
oss << ", handle time = " << ffrt::FormatDateString(currentRunningTask.handleTime_);
oss << ", task name = " << currentRunningTask.taskName_;
oss << " }\n";
}
@ -76,10 +50,10 @@ void DumpHistoryTaskInfo(const char* tag, const std::vector<ffrt::HistoryTask>&
oss << tag << " No. " << i + 1 << " : Event { ";
oss << "send thread = " << historyTask.senderKernelThreadId_;
oss << ", send time = " << FormatDateString(historyTask.sendTime_);
oss << ", handle time = " << FormatDateString(historyTask.handleTime_);
oss << ", trigger time = " << FormatDateString(historyTask.triggerTime_);
oss << ", complete time = " << FormatDateString(historyTask.completeTime_);
oss << ", send time = " << ffrt::FormatDateString(historyTask.sendTime_);
oss << ", handle time = " << ffrt::FormatDateString(historyTask.handleTime_);
oss << ", trigger time = " << ffrt::FormatDateString(historyTask.triggerTime_);
oss << ", complete time = " << ffrt::FormatDateString(historyTask.completeTime_);
oss << ", task name = " << historyTask.taskName_;
oss << " }\n";
}
@ -104,8 +78,8 @@ void DumpUnexecutedTaskInfo(const char* tag,
auto taskDumpFun = [&](int n, ffrt::QueueTask* task) {
oss << tag << " No. " << n << " : Event { ";
oss << "send thread = " << task->GetSenderKernelThreadId();
oss << ", send time = " << FormatDateString(task->GetUptime() - task->GetDelay());
oss << ", handle time = " << FormatDateString(task->GetUptime());
oss << ", send time = " << ffrt::FormatDateString(task->GetUptime() - task->GetDelay());
oss << ", handle time = " << ffrt::FormatDateString(task->GetUptime());
oss << ", task name = " << task->label;
oss << " }\n";
dumpSize--;
@ -215,8 +189,8 @@ int EventHandlerAdapterQueue::DumpSize(ffrt_inner_queue_priority_t priority)
{
std::unique_lock lock(mutex_);
return std::count_if(whenMap_.begin(), whenMap_.end(), [=](const auto& pair) {
return static_cast<ffrt_inner_queue_priority_t>(pair.second->GetPriority()) == priority;
});
return static_cast<ffrt_inner_queue_priority_t>(pair.second->GetPriority()) == priority;
});
}
void EventHandlerAdapterQueue::SetCurrentRunningTask(QueueTask* task)

View File

@ -28,6 +28,7 @@ constexpr uint32_t STRING_SIZE_MAX = 128;
constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
std::atomic_uint32_t queueId(0);
}
namespace ffrt {
QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
: queueId_(queueId++)
@ -107,12 +108,6 @@ QueueTask* QueueHandler::PickUpTask()
return queue_->Pull();
}
uint64_t QueueHandler::GetNextTimeout()
{
FFRT_COND_DO_ERR((queue_ == nullptr), return 0, "[queueId=%u] constructed failed", queueId_);
return queue_->GetNextTimeout();
}
void QueueHandler::Submit(QueueTask* task)
{
FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", queueId_);

View File

@ -46,7 +46,6 @@ public:
bool ClearLoop();
QueueTask* PickUpTask();
uint64_t GetNextTimeout();
inline bool IsValidForLoop()
{

View File

@ -349,23 +349,23 @@ void Poller::ExecuteTimerCb(time_point_t timer) noexcept
break;
}
timerEmpty_.store(timerMap_.empty());
timerMutex_.unlock();
for (const auto& data : timerData) {
timerMutex_.unlock();
if (data.cb) {
data.cb(data.data);
} else if (data.task != nullptr) {
ProcessTimerDataCb(data.task);
}
timerMutex_.lock();
if (data.repeat) {
std::lock_guard lock(timerMutex_);
executedHandle_.erase(data.handle);
RegisterTimerImpl(data);
} else {
executedHandle_[data.handle] = TimerStatus::EXECUTED;
}
}
timerMutex_.unlock();
}
void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept

View File

@ -24,7 +24,11 @@ QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool
{
type = ffrt_queue_task;
if (handler) {
label = handler->GetName();
if (attr) {
label = handler->GetName() + "_" + attr->name_ + "_" + std::to_string(gid);
} else {
label = handler->GetName() + "_" + std::to_string(gid);
}
}
fq_we.task = reinterpret_cast<CPUEUTask*>(this);
@ -36,9 +40,7 @@ QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool
qos_ = attr->qos_;
uptime_ += delay_;
prio_ = attr->prio_;
label = label + "_" + attr->name_;
}
label = label + "_" + std::to_string(gid);
FFRT_LOGD("ctor task [gid=%llu], delay=%lluus, type=%llu, prio=%u", gid, delay_, type, prio_);
}

52
src/util/time_format.h Normal file
View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef UTIL_TIMER_FORMAT_H
#define UTIL_TIMER_FORMAT_H
#include <chrono>
#include <string>
namespace ffrt {
std::string FormatDateString(const std::chrono::system_clock::time_point& timePoint)
{
constexpr int MaxMsLength = 3;
constexpr int MsPerSecond = 1000;
constexpr int DatetimeStringLength = 80;
auto tp = std::chrono::time_point_cast<std::chrono::milliseconds>(timePoint);
auto tt = std::chrono::system_clock::to_time_t(timePoint);
auto ms = tp.time_since_epoch().count() % MsPerSecond;
auto msString = std::to_string(ms);
if (msString.length() < MaxMsLength) {
msString = std::string(MaxMsLength - msString.length(), '0') + msString;
}
struct tm curTime = {0};
localtime_r(&tt, &curTime);
char sysTime[DatetimeStringLength];
std::strftime(sysTime, sizeof(char) * DatetimeStringLength, "%Y-%m-%d %I:%M:%S.", &curTime);
return std::string(sysTime) + msString;
}
std::string FormatDateString(uint64_t steadyClockTimeStamp)
{
std::chrono::microseconds ms(steadyClockTimeStamp - std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
auto tp = std::chrono::system_clock::now() + ms;
return FormatDateString(tp);
}
}
#endif // UTIL_TIMER_FORMAT_H