From 80375021394983f5e3a8403eaad2edaa5a4ca06c Mon Sep 17 00:00:00 2001 From: chendi Date: Wed, 5 Jun 2024 15:13:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dtimer=20query=E5=8D=A1?= =?UTF-8?q?=E6=AD=BB=E9=97=AE=E9=A2=98=E5=92=8Cconcurrent=20queue=E4=BC=98?= =?UTF-8?q?=E5=85=88=E7=BA=A7=E9=94=99=E8=AF=AF=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: chendi Change-Id: Ib2222a18f48345e2c1fddf39e34b308cf4b0fa72 --- interfaces/inner_api/c/queue_ext.h | 18 ++++---- src/queue/base_queue.cpp | 22 ++-------- src/queue/base_queue.h | 1 - src/queue/concurrent_queue.cpp | 3 ++ src/queue/concurrent_queue.h | 4 +- src/queue/eventhandler_adapter_queue.cpp | 50 ++++++----------------- src/queue/queue_handler.cpp | 7 +--- src/queue/queue_handler.h | 1 - src/sync/poller.cpp | 8 ++-- src/tm/queue_task.cpp | 8 ++-- src/util/time_format.h | 52 ++++++++++++++++++++++++ 11 files changed, 91 insertions(+), 83 deletions(-) create mode 100644 src/util/time_format.h diff --git a/interfaces/inner_api/c/queue_ext.h b/interfaces/inner_api/c/queue_ext.h index 0ee460d..440f22a 100644 --- a/interfaces/inner_api/c/queue_ext.h +++ b/interfaces/inner_api/c/queue_ext.h @@ -13,8 +13,8 @@ * limitations under the License. */ -#ifndef FFRT_INNER_API_C_QUEUE_H -#define FFRT_INNER_API_C_QUEUE_H +#ifndef FFRT_INNER_API_C_QUEUE_EXT_H +#define FFRT_INNER_API_C_QUEUE_EXT_H #include #include "c/queue.h" @@ -26,15 +26,15 @@ typedef enum { } ffrt_inner_queue_type_t; typedef enum { - /** highest priority, should be distributed until the tasks in the queue are completed */ + /* highest priority, should be distributed until the tasks in the queue are completed */ ffrt_inner_queue_priority_vip = 0, - /** should be distributed at once if possible, handle time equals to send time, prior to high level */ + /* should be distributed at once if possible, handle time equals to send time, prior to high level */ ffrt_inner_queue_priority_immediate, - /** high priority, sorted by handle time, prior to low level. */ + /* high priority, sorted by handle time, prior to low level. */ ffrt_inner_queue_priority_high, - /** low priority, sorted by handle time, prior to idle level. */ + /* low priority, sorted by handle time, prior to idle level. */ ffrt_inner_queue_priority_low, - /** lowest priority, sorted by handle time, only distribute when there is no other level inside queue. */ + /* lowest priority, sorted by handle time, only distribute when there is no other level inside queue. */ ffrt_inner_queue_priority_idle, } ffrt_inner_queue_priority_t; @@ -83,7 +83,7 @@ FFRT_C_API void ffrt_queue_cancel_all(ffrt_queue_t queue); * @brief Cancels a task with the given name in the queue. * * @param queue Indicates a queue handle. - * @param name Indicates name of the task to be canceled, regular expressions are supperted. + * @param name Indicates name of the task to be canceled, regular expressions are supported. * @return Returns 0 if the task is canceled; returns 1 otherwise. * @version 1.0 @@ -101,7 +101,7 @@ FFRT_C_API bool ffrt_queue_is_idle(ffrt_queue_t queue); /** * @brief Dumps queue information; - * including current execution, historical execution, and remaining unexecuted task information, etc. + including current execution, historical execution, and remaining unexecuted task information, etc. * * @param queue Indicates a queue handle. * @param tag Indicates tag prefix for dump information. diff --git a/src/queue/base_queue.cpp b/src/queue/base_queue.cpp index baca8cd..5f4ca04 100644 --- a/src/queue/base_queue.cpp +++ b/src/queue/base_queue.cpp @@ -24,7 +24,7 @@ namespace { using CreateFunc = std::unique_ptr(*)(uint32_t, const ffrt_queue_attr_t*); -const std::map CREATE_FUNC_MAP = { +const std::unordered_map CREATE_FUNC_MAP = { { ffrt_queue_serial, ffrt::CreateSerialQueue }, { ffrt_queue_concurrent, ffrt::CreateConcurrentQueue }, { ffrt_queue_eventhandler_interactive, ffrt::CreateEventHandlerInteractiveQueue }, @@ -90,21 +90,6 @@ int BaseQueue::Remove(const QueueTask* task) return FAILED; } -int BaseQueue::GetNextTimeout() -{ - std::unique_lock lock(mutex_); - if (whenMap_.empty()) { - return -1; - } - uint64_t now = GetNow(); - if (now >= whenMap_.begin()->first) { - return 0; - } - uint64_t diff = whenMap_.begin()->first - now; - uint64_t timeout = (diff - 1) / 1000 + 1; // us->ms - return timeout > INT_MAX ? INT_MAX : static_cast(timeout); -} - bool BaseQueue::HasTask(const char* name) { std::unique_lock lock(mutex_); @@ -119,6 +104,7 @@ void BaseQueue::ClearWhenMap() if (it->second) { it->second->Notify(); it->second->Destroy(); + it->second = nullptr; } } whenMap_.clear(); @@ -128,9 +114,7 @@ void BaseQueue::ClearWhenMap() std::unique_ptr CreateQueue(int queueType, uint32_t queueId, const ffrt_queue_attr_t* attr) { const auto iter = CREATE_FUNC_MAP.find(queueType); - if (iter == CREATE_FUNC_MAP.end()) { - return nullptr; - } + FFRT_COND_DO_ERR((iter == CREATE_FUNC_MAP.end()), return nullptr, "invalid queue type"); return iter->second(queueId, attr); } diff --git a/src/queue/base_queue.h b/src/queue/base_queue.h index 4689f8c..6bf37e2 100644 --- a/src/queue/base_queue.h +++ b/src/queue/base_queue.h @@ -55,7 +55,6 @@ public: return false; } - int GetNextTimeout(); inline uint64_t GetMapSize() { std::unique_lock lock(mutex_); diff --git a/src/queue/concurrent_queue.cpp b/src/queue/concurrent_queue.cpp index 5221c2d..eafca7b 100644 --- a/src/queue/concurrent_queue.cpp +++ b/src/queue/concurrent_queue.cpp @@ -34,6 +34,9 @@ int ConcurrentQueue::Push(QueueTask* task) { std::unique_lock lock(mutex_); FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_); + if (task->GetPriority() > ffrt_queue_priority_idle) { + task->SetPriority(ffrt_queue_priority_low); + } if (loop_ != nullptr) { if (task->GetDelay() == 0) { diff --git a/src/queue/concurrent_queue.h b/src/queue/concurrent_queue.h index 2db6e7d..f51e4dd 100644 --- a/src/queue/concurrent_queue.h +++ b/src/queue/concurrent_queue.h @@ -20,8 +20,8 @@ namespace ffrt { class ConcurrentQueue : public BaseQueue { public: - explicit ConcurrentQueue(const uint32_t queueId, const int maxConcurrency = 1) : - BaseQueue(queueId), maxConcurrency_(maxConcurrency) + explicit ConcurrentQueue(const uint32_t queueId, const int maxConcurrency = 1) + : BaseQueue(queueId), maxConcurrency_(maxConcurrency) { dequeFunc_ = QueueStrategy::DequeSingleByPriority; } diff --git a/src/queue/eventhandler_adapter_queue.cpp b/src/queue/eventhandler_adapter_queue.cpp index 79efbcd..477a7fa 100644 --- a/src/queue/eventhandler_adapter_queue.cpp +++ b/src/queue/eventhandler_adapter_queue.cpp @@ -17,49 +17,23 @@ #include #include #include "dfx/log/ffrt_log_api.h" +#include "util/time_format.h" namespace { -constexpr int DATETIME_STRING_LENGTH = 80; -constexpr int MAX_MS_LENGTH = 3; -constexpr int MS_PER_SECOND = 1000; constexpr int MAX_DUMP_SIZE = 500; constexpr uint8_t HISTORY_TASK_NUM_POWER = 32; -std::string FormatDateString(const std::chrono::system_clock::time_point& timePoint) -{ - auto tp = std::chrono::time_point_cast(timePoint); - auto tt = std::chrono::system_clock::to_time_t(timePoint); - auto ms = tp.time_since_epoch().count() % MS_PER_SECOND; - auto msString = std::to_string(ms); - if (msString.length() < MAX_MS_LENGTH) { - msString = std::string(MAX_MS_LENGTH - msString.length(), '0') + msString; - } - struct tm curTime = {0}; - localtime_r(&tt, &curTime); - char sysTime[DATETIME_STRING_LENGTH]; - std::strftime(sysTime, sizeof(char) * DATETIME_STRING_LENGTH, "%Y-%m-%d %I:%M:%S.", &curTime); - return std::string(sysTime) + msString; -} - -std::string FormatDateString(uint64_t steadyClockTimeStamp) -{ - std::chrono::microseconds ms(steadyClockTimeStamp - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count()); - auto tp = std::chrono::system_clock::now() + ms; - return FormatDateString(tp); -} - void DumpRunningTaskInfo(const char* tag, const ffrt::HistoryTask& currentRunningTask, std::ostringstream& oss) { oss << tag << " Current Running: "; if (currentRunningTask.beginTime_ == std::numeric_limits::max()) { oss << "{}"; } else { - oss << "start at " << FormatDateString(currentRunningTask.beginTime_) << ", "; + oss << "start at " << ffrt::FormatDateString(currentRunningTask.beginTime_) << ", "; oss << "Event { "; oss << "send thread = " << currentRunningTask.senderKernelThreadId_; - oss << ", send time = " << FormatDateString(currentRunningTask.sendTime_); - oss << ", handle time = " << FormatDateString(currentRunningTask.handleTime_); + oss << ", send time = " << ffrt::FormatDateString(currentRunningTask.sendTime_); + oss << ", handle time = " << ffrt::FormatDateString(currentRunningTask.handleTime_); oss << ", task name = " << currentRunningTask.taskName_; oss << " }\n"; } @@ -76,10 +50,10 @@ void DumpHistoryTaskInfo(const char* tag, const std::vector& oss << tag << " No. " << i + 1 << " : Event { "; oss << "send thread = " << historyTask.senderKernelThreadId_; - oss << ", send time = " << FormatDateString(historyTask.sendTime_); - oss << ", handle time = " << FormatDateString(historyTask.handleTime_); - oss << ", trigger time = " << FormatDateString(historyTask.triggerTime_); - oss << ", complete time = " << FormatDateString(historyTask.completeTime_); + oss << ", send time = " << ffrt::FormatDateString(historyTask.sendTime_); + oss << ", handle time = " << ffrt::FormatDateString(historyTask.handleTime_); + oss << ", trigger time = " << ffrt::FormatDateString(historyTask.triggerTime_); + oss << ", complete time = " << ffrt::FormatDateString(historyTask.completeTime_); oss << ", task name = " << historyTask.taskName_; oss << " }\n"; } @@ -104,8 +78,8 @@ void DumpUnexecutedTaskInfo(const char* tag, auto taskDumpFun = [&](int n, ffrt::QueueTask* task) { oss << tag << " No. " << n << " : Event { "; oss << "send thread = " << task->GetSenderKernelThreadId(); - oss << ", send time = " << FormatDateString(task->GetUptime() - task->GetDelay()); - oss << ", handle time = " << FormatDateString(task->GetUptime()); + oss << ", send time = " << ffrt::FormatDateString(task->GetUptime() - task->GetDelay()); + oss << ", handle time = " << ffrt::FormatDateString(task->GetUptime()); oss << ", task name = " << task->label; oss << " }\n"; dumpSize--; @@ -215,8 +189,8 @@ int EventHandlerAdapterQueue::DumpSize(ffrt_inner_queue_priority_t priority) { std::unique_lock lock(mutex_); return std::count_if(whenMap_.begin(), whenMap_.end(), [=](const auto& pair) { - return static_cast(pair.second->GetPriority()) == priority; - }); + return static_cast(pair.second->GetPriority()) == priority; + }); } void EventHandlerAdapterQueue::SetCurrentRunningTask(QueueTask* task) diff --git a/src/queue/queue_handler.cpp b/src/queue/queue_handler.cpp index e3f0176..4e1892d 100644 --- a/src/queue/queue_handler.cpp +++ b/src/queue/queue_handler.cpp @@ -28,6 +28,7 @@ constexpr uint32_t STRING_SIZE_MAX = 128; constexpr uint32_t TASK_DONE_WAIT_UNIT = 10; std::atomic_uint32_t queueId(0); } + namespace ffrt { QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type) : queueId_(queueId++) @@ -107,12 +108,6 @@ QueueTask* QueueHandler::PickUpTask() return queue_->Pull(); } -uint64_t QueueHandler::GetNextTimeout() -{ - FFRT_COND_DO_ERR((queue_ == nullptr), return 0, "[queueId=%u] constructed failed", queueId_); - return queue_->GetNextTimeout(); -} - void QueueHandler::Submit(QueueTask* task) { FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", queueId_); diff --git a/src/queue/queue_handler.h b/src/queue/queue_handler.h index 20e2917..3bad98b 100644 --- a/src/queue/queue_handler.h +++ b/src/queue/queue_handler.h @@ -46,7 +46,6 @@ public: bool ClearLoop(); QueueTask* PickUpTask(); - uint64_t GetNextTimeout(); inline bool IsValidForLoop() { diff --git a/src/sync/poller.cpp b/src/sync/poller.cpp index 89e2cfc..a88b0e7 100644 --- a/src/sync/poller.cpp +++ b/src/sync/poller.cpp @@ -349,23 +349,23 @@ void Poller::ExecuteTimerCb(time_point_t timer) noexcept break; } timerEmpty_.store(timerMap_.empty()); - + + timerMutex_.unlock(); for (const auto& data : timerData) { - timerMutex_.unlock(); if (data.cb) { data.cb(data.data); } else if (data.task != nullptr) { ProcessTimerDataCb(data.task); } - timerMutex_.lock(); + if (data.repeat) { + std::lock_guard lock(timerMutex_); executedHandle_.erase(data.handle); RegisterTimerImpl(data); } else { executedHandle_[data.handle] = TimerStatus::EXECUTED; } } - timerMutex_.unlock(); } void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept diff --git a/src/tm/queue_task.cpp b/src/tm/queue_task.cpp index 1fad9d1..cb5e9d0 100644 --- a/src/tm/queue_task.cpp +++ b/src/tm/queue_task.cpp @@ -24,7 +24,11 @@ QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool { type = ffrt_queue_task; if (handler) { - label = handler->GetName(); + if (attr) { + label = handler->GetName() + "_" + attr->name_ + "_" + std::to_string(gid); + } else { + label = handler->GetName() + "_" + std::to_string(gid); + } } fq_we.task = reinterpret_cast(this); @@ -36,9 +40,7 @@ QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool qos_ = attr->qos_; uptime_ += delay_; prio_ = attr->prio_; - label = label + "_" + attr->name_; } - label = label + "_" + std::to_string(gid); FFRT_LOGD("ctor task [gid=%llu], delay=%lluus, type=%llu, prio=%u", gid, delay_, type, prio_); } diff --git a/src/util/time_format.h b/src/util/time_format.h new file mode 100644 index 0000000..3d3dcd7 --- /dev/null +++ b/src/util/time_format.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef UTIL_TIMER_FORMAT_H +#define UTIL_TIMER_FORMAT_H + +#include +#include + +namespace ffrt { +std::string FormatDateString(const std::chrono::system_clock::time_point& timePoint) +{ + constexpr int MaxMsLength = 3; + constexpr int MsPerSecond = 1000; + constexpr int DatetimeStringLength = 80; + + auto tp = std::chrono::time_point_cast(timePoint); + auto tt = std::chrono::system_clock::to_time_t(timePoint); + auto ms = tp.time_since_epoch().count() % MsPerSecond; + auto msString = std::to_string(ms); + if (msString.length() < MaxMsLength) { + msString = std::string(MaxMsLength - msString.length(), '0') + msString; + } + struct tm curTime = {0}; + localtime_r(&tt, &curTime); + char sysTime[DatetimeStringLength]; + std::strftime(sysTime, sizeof(char) * DatetimeStringLength, "%Y-%m-%d %I:%M:%S.", &curTime); + return std::string(sysTime) + msString; +} + +std::string FormatDateString(uint64_t steadyClockTimeStamp) +{ + std::chrono::microseconds ms(steadyClockTimeStamp - std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count()); + auto tp = std::chrono::system_clock::now() + ms; + return FormatDateString(tp); +} +} + +#endif // UTIL_TIMER_FORMAT_H \ No newline at end of file