From 33afc8aba89cd5e15b04ba1baf6c7fff7e277a0f Mon Sep 17 00:00:00 2001 From: shengxia <496830733@qq.com> Date: Fri, 27 Oct 2023 10:15:36 +0800 Subject: [PATCH] reduce the queue_submit switchover probability Signed-off-by: shengxia <496830733@qq.com> --- src/queue/serial_looper.cpp | 2 ++ src/queue/serial_looper.h | 2 +- src/queue/serial_queue.cpp | 47 ++++++++++++++++++++----------------- src/queue/serial_queue.h | 6 +++++ 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/queue/serial_looper.cpp b/src/queue/serial_looper.cpp index 57ae0f1..0ee921f 100644 --- a/src/queue/serial_looper.cpp +++ b/src/queue/serial_looper.cpp @@ -83,6 +83,8 @@ void SerialLooper::Run() while (!isExit_.load()) { ITask* task = queue_->Next(); if (task) { + FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(), + queue_->GetMapSize()); SetTimeoutMonitor(task); FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr"); QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid); diff --git a/src/queue/serial_looper.h b/src/queue/serial_looper.h index c593062..bb39c79 100644 --- a/src/queue/serial_looper.h +++ b/src/queue/serial_looper.h @@ -50,7 +50,7 @@ private: void SetTimeoutMonitor(ITask* task); void RunTimeOutCallback(ITask* task); - std::string name_ = "serial_queue_"; + std::string name_; std::atomic_bool isExit_ = {false}; task_handle handle; std::shared_ptr queue_; diff --git a/src/queue/serial_queue.cpp b/src/queue/serial_queue.cpp index ec723e2..d69a987 100644 --- a/src/queue/serial_queue.cpp +++ b/src/queue/serial_queue.cpp @@ -47,36 +47,40 @@ void SerialQueue::Quit() int SerialQueue::PushTask(ITask* task, uint64_t upTime) { - std::unique_lock lock(mutex_); FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr"); - whenMap_[upTime].emplace_back(task); - if (upTime == whenMap_.begin()->first) { - cond_.notify_all(); + { + std::unique_lock lock(mutex_); + whenMap_[upTime].emplace_back(task); + if (upTime == whenMap_.begin()->first) { + cond_.notify_all(); + } } - FFRT_LOGI("push serial task gid=%llu into qid=%u [%s] succ", task->gid, qid_, name_.c_str()); + FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str()); return 0; } int SerialQueue::RemoveTask(const ITask* task) { - std::unique_lock lock(mutex_); FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr"); - FFRT_LOGI("remove serial task gid=%llu of qid=%u [%s] enter", task->gid, qid_, name_.c_str()); - for (auto it = whenMap_.begin(); it != whenMap_.end();) { - for (auto itList = it->second.begin(); itList != it->second.end();) { - if ((*itList) != task) { - itList++; - continue; + FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str()); + { + std::unique_lock lock(mutex_); + for (auto it = whenMap_.begin(); it != whenMap_.end();) { + for (auto itList = it->second.begin(); itList != it->second.end();) { + if ((*itList) != task) { + itList++; + continue; + } + it->second.erase(itList++); + // a task can be submitted only once through the C interface + return 0; } - it->second.erase(itList++); - // a task can be submitted only once through the C interface - return 0; - } - if (it->second.empty()) { - whenMap_.erase(it++); - } else { - it++; + if (it->second.empty()) { + whenMap_.erase(it++); + } else { + it++; + } } } FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str()); @@ -110,8 +114,7 @@ ITask* SerialQueue::Next() if (it->second.empty()) { (void)whenMap_.erase(it); } - FFRT_LOGI("get next serial task gid=%llu, qid=%u [%s] contains [%u] other timestamps", nextTask->gid, qid_, - name_.c_str(), whenMap_.size()); + mapSize_.store(whenMap_.size()); return nextTask; } else { uint64_t diff = it->first - now; diff --git a/src/queue/serial_queue.h b/src/queue/serial_queue.h index eed0bf5..7761c01 100644 --- a/src/queue/serial_queue.h +++ b/src/queue/serial_queue.h @@ -28,6 +28,11 @@ public: SerialQueue(const uint32_t qid, const std::string& name) : qid_(qid), name_(name) {} ~SerialQueue(); + inline uint32_t GetMapSize() const + { + return mapSize_.load(); + } + ITask* Next(); int PushTask(ITask* task, uint64_t upTime); int RemoveTask(const ITask* task); @@ -39,6 +44,7 @@ private: bool isExit_ = false; const uint32_t qid_; std::string name_; + std::atomic_uint32_t mapSize_ = {0}; std::map> whenMap_; }; } // namespace ffrt