fix taskpool lock 2

Signed-off-by: zhuruigan <zhuruigan@huawei.com>
Change-Id: Ie930fa75e89dce84626f77b20f312f1ee5f54311
zhuruigan 2024-11-11 10:56:12 +08:00
parent 27ecf90cec
commit 656fcfa38c
4 changed files with 125 additions and 86 deletions
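
The common theme of all four files: taskpool previously called into the JS engine (napi_reject_deferred on errors built with ErrorHelper::NewError) while holding taskMutex_ or taskGroupMutex_. Each site now copies what it needs (serialization pointers, napi_deferred handles) inside a short locked scope, and the promise rejections run after the lock is released, through the new TaskManager::BatchRejectDeferred helper. A minimal standalone sketch of that pattern follows; the names are illustrative, not the actual taskpool types:

#include <cstdio>
#include <list>
#include <mutex>

struct Deferred { int id; };                    // stand-in for napi_deferred

void Reject(Deferred* d, const char* reason)    // stand-in for napi_reject_deferred
{
    std::printf("reject %d: %s\n", d->id, reason);
    delete d;
}

class PendingQueue {
public:
    void Add(Deferred* d)
    {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        pending_.push_back(d);
    }

    void CancelAll()
    {
        std::list<Deferred*> deferreds;
        {   // lock held only while shared state is read and detached
            std::lock_guard<std::recursive_mutex> lock(mutex_);
            deferreds.swap(pending_);
        }
        // Rejection happens after the lock is released: an engine call
        // such as napi_reject_deferred can run arbitrary JS callbacks
        // that call back into this object, so doing it under the lock
        // risks deadlock across threads and mutation of pending_
        // mid-iteration.
        for (Deferred* d : deferreds) {
            Reject(d, "canceled");
        }
    }

private:
    std::recursive_mutex mutex_;
    std::list<Deferred*> pending_;
};

int main()
{
    PendingQueue queue;
    queue.Add(new Deferred{1});
    queue.Add(new Deferred{2});
    queue.CancelAll();
    return 0;
}

The locked region only detaches state; everything that can re-enter taskpool code runs lock-free afterwards, which is the shape each hunk below applies.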


@@ -1095,17 +1095,22 @@ bool Task::UpdateTask(uint64_t startTime, void* worker)
 napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* args)
 {
-    std::lock_guard<RECURSIVE_MUTEX> lock(taskMutex_);
-    if (UNLIKELY(currentTaskInfo_ == nullptr)) {
-        HILOG_ERROR("taskpool:: the currentTaskInfo is nullptr, the task may have been cancelled");
-        return nullptr;
+    void* serializationFunction = nullptr;
+    void* serializationArguments = nullptr;
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(taskMutex_);
+        if (UNLIKELY(currentTaskInfo_ == nullptr)) {
+            HILOG_ERROR("taskpool:: the currentTaskInfo is nullptr, the task may have been cancelled");
+            return nullptr;
+        }
+        serializationFunction = currentTaskInfo_->serializationFunction;
+        serializationArguments = currentTaskInfo_->serializationArguments;
     }
     napi_status status = napi_ok;
     std::string errMessage = "";
-    status = napi_deserialize(env, currentTaskInfo_->serializationFunction, func);
+    status = napi_deserialize(env, serializationFunction, func);
     if (!IsGroupFunctionTask()) {
-        napi_delete_serialization_data(env, currentTaskInfo_->serializationFunction);
+        napi_delete_serialization_data(env, serializationFunction);
     }
     if (status != napi_ok || func == nullptr) {
         errMessage = "taskpool:: failed to deserialize function.";
@@ -1115,9 +1120,9 @@ napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* args)
         return err;
     }
-    status = napi_deserialize(env, currentTaskInfo_->serializationArguments, args);
+    status = napi_deserialize(env, serializationArguments, args);
     if (!IsGroupFunctionTask()) {
-        napi_delete_serialization_data(env, currentTaskInfo_->serializationArguments);
+        napi_delete_serialization_data(env, serializationArguments);
     }
     if (status != napi_ok || args == nullptr) {
         errMessage = "taskpool:: failed to deserialize function.";
@@ -1354,24 +1359,28 @@ void Task::InitHandle(napi_env env)
 void Task::ClearDelayedTimers()
 {
     HILOG_DEBUG("taskpool:: task ClearDelayedTimers");
-    std::lock_guard<RECURSIVE_MUTEX> lock(taskMutex_);
-    TaskMessage *taskMessage = nullptr;
-    for (auto t: delayedTimers_) {
-        if (t == nullptr) {
-            continue;
+    std::list<napi_deferred> deferreds {};
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(taskMutex_);
+        TaskMessage *taskMessage = nullptr;
+        for (auto t: delayedTimers_) {
+            if (t == nullptr) {
+                continue;
+            }
+            taskMessage = static_cast<TaskMessage *>(t->data);
+            deferreds.push_back(taskMessage->deferred);
+            uv_timer_stop(t);
+            uv_close(reinterpret_cast<uv_handle_t*>(t), [](uv_handle_t* handle) {
+                delete (uv_timer_t*)handle;
+                handle = nullptr;
+            });
+            delete taskMessage;
+            taskMessage = nullptr;
         }
-        taskMessage = static_cast<TaskMessage *>(t->data);
-        napi_value error = ErrorHelper::NewError(env_, 0, "taskpool:: task has been canceled");
-        napi_reject_deferred(env_, taskMessage->deferred, error);
-        uv_timer_stop(t);
-        uv_close(reinterpret_cast<uv_handle_t*>(t), [](uv_handle_t* handle) {
-            delete (uv_timer_t*)handle;
-            handle = nullptr;
-        });
-        delete taskMessage;
-        taskMessage = nullptr;
+        delayedTimers_.clear();
     }
-    delayedTimers_.clear();
+    std::string error = "taskpool:: task has been canceled";
+    TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error);
 }
 
 bool Task::VerifyAndPostResult(Priority priority)
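
As an aside on the loop above: the uv_timer_t teardown follows the standard libuv contract that a handle may only be freed once its close callback has fired. A standalone sketch of just that contract (plain libuv, no taskpool types):

#include <uv.h>

// stand-in callback; the real code routes a TaskMessage through t->data
static void OnTimer(uv_timer_t* handle)
{
    (void)handle;
}

int main()
{
    uv_loop_t* loop = uv_default_loop();
    uv_timer_t* timer = new uv_timer_t;
    uv_timer_init(loop, timer);
    uv_timer_start(timer, OnTimer, 1000, 0);

    // Teardown: uv_close is asynchronous, and libuv may still touch the
    // handle until the close callback fires, so the delete lives inside
    // the callback. Freeing the handle right after uv_close would be a
    // use-after-free.
    uv_timer_stop(timer);
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
        delete reinterpret_cast<uv_timer_t*>(handle);
    });

    uv_run(loop, UV_RUN_DEFAULT);   // drives the close callback
    return 0;
}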


@@ -174,23 +174,27 @@ void TaskGroup::NotifyGroupTask(napi_env env)
 void TaskGroup::CancelPendingGroup(napi_env env)
 {
     HILOG_DEBUG("taskpool:: CancelPendingGroup");
-    if (pendingGroupInfos_.empty()) {
-        return;
-    }
-    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
-    auto pendingIter = pendingGroupInfos_.begin();
-    auto engine = reinterpret_cast<NativeEngine*>(env);
-    for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) {
-        for (size_t i = 0; i < taskIds_.size(); i++) {
-            engine->DecreaseSubEnvCounter();
+    std::list<napi_deferred> deferreds {};
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(taskGroupMutex_);
+        if (pendingGroupInfos_.empty()) {
+            return;
+        }
+        auto pendingIter = pendingGroupInfos_.begin();
+        auto engine = reinterpret_cast<NativeEngine*>(env);
+        for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) {
+            for (size_t i = 0; i < taskIds_.size(); i++) {
+                engine->DecreaseSubEnvCounter();
+            }
+            GroupInfo* info = *pendingIter;
+            deferreds.push_back(info->deferred);
+            napi_reference_unref(env, groupRef_, nullptr);
+            delete info;
         }
-        GroupInfo* info = *pendingIter;
-        napi_reject_deferred(env, info->deferred, error);
-        napi_reference_unref(env, groupRef_, nullptr);
-        delete info;
+        pendingIter = pendingGroupInfos_.begin();
+        pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end());
     }
-    pendingIter = pendingGroupInfos_.begin();
-    pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end());
+    TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: taskGroup has been canceled");
 }
 
 void TaskGroup::CancelGroupTask(napi_env env, uint64_t taskId)


@@ -618,8 +618,6 @@ void TaskManager::CancelTask(napi_env env, uint64_t taskId)
         }
         return taskGroup->CancelGroupTask(env, task->taskId_);
     }
-    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
     if (task->IsPeriodicTask()) {
         napi_reference_unref(env, task->taskRef_, nullptr);
         task->CancelPendingTask(env);
@@ -630,28 +628,35 @@ void TaskManager::CancelTask(napi_env env, uint64_t taskId)
         CancelSeqRunnerTask(env, task);
         return;
     }
-    if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
-        task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
-        task->taskState_ == ExecuteState::ENDING) {
-        std::string errMsg = "taskpool:: task is not executed or has been executed";
-        HILOG_ERROR("%{public}s", errMsg.c_str());
-        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
-        return;
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
+        if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
+            task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
+            task->taskState_ == ExecuteState::ENDING) {
+            std::string errMsg = "taskpool:: task is not executed or has been executed";
+            HILOG_ERROR("%{public}s", errMsg.c_str());
+            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
+            return;
+        }
     }
     task->ClearDelayedTimers();
     ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
     task->CancelPendingTask(env);
-    if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
-        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
-        task->DecreaseTaskRefCount();
-        EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
-        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
-        napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
-        napi_reference_unref(env, task->taskRef_, nullptr);
-        delete task->currentTaskInfo_;
-        task->currentTaskInfo_ = nullptr;
+    std::list<napi_deferred> deferreds {};
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
+        if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
+            reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
+            task->DecreaseTaskRefCount();
+            EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
+            deferreds.push_back(task->currentTaskInfo_->deferred);
+            napi_reference_unref(env, task->taskRef_, nullptr);
+            delete task->currentTaskInfo_;
+            task->currentTaskInfo_ = nullptr;
+        }
     }
+    std::string error = "taskpool:: task has been canceled";
+    BatchRejectDeferred(env, deferreds, error);
 }
 
 void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
@@ -1372,6 +1377,7 @@ void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
 void TaskManager::ReleaseCallBackInfo(Task* task)
 {
     HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
+    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
     if (task->onEnqueuedCallBackInfo_ != nullptr) {
         delete task->onEnqueuedCallBackInfo_;
         task->onEnqueuedCallBackInfo_ = nullptr;
@@ -1478,6 +1484,17 @@ bool TaskManager::CheckTask(uint64_t taskId)
     return item != tasks_.end();
 }
 
+void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
+{
+    if (deferreds.empty()) {
+        return;
+    }
+    napi_value message = ErrorHelper::NewError(env, 0, error.c_str());
+    for (auto deferred : deferreds) {
+        napi_reject_deferred(env, deferred, message);
+    }
+}
+
 // ----------------------------------- TaskGroupManager ----------------------------------------
 TaskGroupManager& TaskGroupManager::GetInstance()
 {
@@ -1507,19 +1524,20 @@ void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
 {
     HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
     TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
-    std::lock_guard<RECURSIVE_MUTEX> lock(group->taskGroupMutex_);
-    for (uint64_t taskId : group->taskIds_) {
-        Task* task = TaskManager::GetInstance().GetTask(taskId);
-        if (task == nullptr || !task->IsValid()) {
-            continue;
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(group->taskGroupMutex_);
+        for (uint64_t taskId : group->taskIds_) {
+            Task* task = TaskManager::GetInstance().GetTask(taskId);
+            if (task == nullptr || !task->IsValid()) {
+                continue;
+            }
+            napi_reference_unref(task->env_, task->taskRef_, nullptr);
         }
-        napi_reference_unref(task->env_, task->taskRef_, nullptr);
-    }
-    if (group->currentGroupInfo_ != nullptr) {
-        delete group->currentGroupInfo_;
+        if (group->currentGroupInfo_ != nullptr) {
+            delete group->currentGroupInfo_;
+        }
     }
     group->CancelPendingGroup(env);
 }
@@ -1536,17 +1554,20 @@ void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
     if (taskGroup->groupState_ == ExecuteState::CANCELED) {
         return;
     }
-    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
-    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
-        taskGroup->groupState_ == ExecuteState::FINISHED) {
-        std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
-        HILOG_ERROR("%{public}s", errMsg.c_str());
-        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
-        return;
+    {
+        std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
+        if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
+            taskGroup->groupState_ == ExecuteState::FINISHED) {
+            std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
+            HILOG_ERROR("%{public}s", errMsg.c_str());
+            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
+            return;
+        }
     }
     ExecuteState groupState = taskGroup->groupState_;
     taskGroup->groupState_ = ExecuteState::CANCELED;
     taskGroup->CancelPendingGroup(env);
+    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
     if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
         for (uint64_t taskId : taskGroup->taskIds_) {
             CancelGroupTask(env, taskId, taskGroup);
@@ -1641,40 +1662,44 @@ bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
         HILOG_ERROR("seqRunner:: only front task can trigger seqRunner.");
         return false;
     }
+    std::list<napi_deferred> deferreds {};
+    {
         std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
         if (seqRunner->seqRunnerTasks_.empty()) {
             HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
             seqRunner->currentTaskId_ = 0;
             return true;
         }
         Task* task = seqRunner->seqRunnerTasks_.front();
         seqRunner->seqRunnerTasks_.pop();
+        bool isEmpty = false;
         while (task->taskState_ == ExecuteState::CANCELED) {
+            deferreds.push_back(task->currentTaskInfo_->deferred);
             DisposeCanceledTask(env, task);
             if (seqRunner->seqRunnerTasks_.empty()) {
                 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
                     std::to_string(seqRunnerId).c_str());
                 seqRunner->currentTaskId_ = 0;
-                return true;
+                isEmpty = true;
+                break;
             }
             task = seqRunner->seqRunnerTasks_.front();
             seqRunner->seqRunnerTasks_.pop();
         }
-        seqRunner->currentTaskId_ = task->taskId_;
-        task->IncreaseRefCount();
-        task->taskState_ = ExecuteState::WAITING;
-        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
-            std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
-        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
+        if (!isEmpty) {
+            seqRunner->currentTaskId_ = task->taskId_;
+            task->IncreaseRefCount();
+            task->taskState_ = ExecuteState::WAITING;
+            HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
+                std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
+            TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
+        }
+    }
+    TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: sequenceRunner task has been canceled");
     return true;
 }
 
 void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
 {
-    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
-    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
     reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
     napi_reference_unref(env, task->taskRef_, nullptr);
     delete task->currentTaskInfo_;
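
Note the control-flow change in TriggerSeqRunner: the old early `return true` inside the cancel loop would now skip the BatchRejectDeferred call that has to run after the lock scope, so the loop records the condition in isEmpty and breaks instead. A standalone sketch of that shape (illustrative queue, not the taskpool types):

#include <cstdio>
#include <list>
#include <mutex>
#include <queue>

struct Item { int id; bool canceled; };

std::mutex queueMutex;              // guards pending
std::queue<Item> pending;           // stand-in for seqRunnerTasks_

void TriggerNext()
{
    std::list<int> toReject;        // collected under the lock, rejected after it
    bool isEmpty = false;
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        if (pending.empty()) {
            return;                 // nothing collected yet, so returning is safe
        }
        Item item = pending.front();
        pending.pop();
        while (item.canceled) {
            toReject.push_back(item.id);
            if (pending.empty()) {
                isEmpty = true;     // record and break instead of returning:
                break;              // the batch reject below must still run
            }
            item = pending.front();
            pending.pop();
        }
        if (!isEmpty) {
            std::printf("schedule %d\n", item.id);
        }
    }
    for (int id : toReject) {       // outside the lock
        std::printf("reject %d\n", id);
    }
}

int main()
{
    pending.push({1, true});
    pending.push({2, true});
    pending.push({3, false});
    TriggerNext();                  // rejects 1 and 2, schedules 3
    return 0;
}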


@@ -147,6 +147,7 @@ public:
     }
     bool CheckTask(uint64_t taskId);
+    void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error);
 
 private:
     TaskManager();