!90 reduce the queue_submit switchover probability

Merge pull request !90 from 盛夏/master
This commit is contained in:
openharmony_ci 2023-10-27 03:32:48 +00:00 committed by Gitee
commit 030494dfe2
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
4 changed files with 34 additions and 23 deletions

View File

@ -83,6 +83,8 @@ void SerialLooper::Run()
while (!isExit_.load()) {
ITask* task = queue_->Next();
if (task) {
FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(),
queue_->GetMapSize());
SetTimeoutMonitor(task);
FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr");
QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid);

View File

@ -50,7 +50,7 @@ private:
void SetTimeoutMonitor(ITask* task);
void RunTimeOutCallback(ITask* task);
std::string name_ = "serial_queue_";
std::string name_;
std::atomic_bool isExit_ = {false};
task_handle handle;
std::shared_ptr<SerialQueue> queue_;

View File

@ -47,21 +47,24 @@ void SerialQueue::Quit()
int SerialQueue::PushTask(ITask* task, uint64_t upTime)
{
std::unique_lock lock(mutex_);
FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
{
std::unique_lock lock(mutex_);
whenMap_[upTime].emplace_back(task);
if (upTime == whenMap_.begin()->first) {
cond_.notify_all();
}
FFRT_LOGI("push serial task gid=%llu into qid=%u [%s] succ", task->gid, qid_, name_.c_str());
}
FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str());
return 0;
}
int SerialQueue::RemoveTask(const ITask* task)
{
std::unique_lock lock(mutex_);
FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
FFRT_LOGI("remove serial task gid=%llu of qid=%u [%s] enter", task->gid, qid_, name_.c_str());
FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str());
{
std::unique_lock lock(mutex_);
for (auto it = whenMap_.begin(); it != whenMap_.end();) {
for (auto itList = it->second.begin(); itList != it->second.end();) {
if ((*itList) != task) {
@ -79,6 +82,7 @@ int SerialQueue::RemoveTask(const ITask* task)
it++;
}
}
}
FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
return 1;
}
@ -110,8 +114,7 @@ ITask* SerialQueue::Next()
if (it->second.empty()) {
(void)whenMap_.erase(it);
}
FFRT_LOGI("get next serial task gid=%llu, qid=%u [%s] contains [%u] other timestamps", nextTask->gid, qid_,
name_.c_str(), whenMap_.size());
mapSize_.store(whenMap_.size());
return nextTask;
} else {
uint64_t diff = it->first - now;

View File

@ -28,6 +28,11 @@ public:
SerialQueue(const uint32_t qid, const std::string& name) : qid_(qid), name_(name) {}
~SerialQueue();
inline uint32_t GetMapSize() const
{
return mapSize_.load();
}
ITask* Next();
int PushTask(ITask* task, uint64_t upTime);
int RemoveTask(const ITask* task);
@ -39,6 +44,7 @@ private:
bool isExit_ = false;
const uint32_t qid_;
std::string name_;
std::atomic_uint32_t mapSize_ = {0};
std::map<uint64_t, std::list<ITask*>> whenMap_;
};
} // namespace ffrt