!979 使用threadsafe_function 替换uv_work part2

Merge pull request !979 from CheerfulRicky/work
This commit is contained in:
openharmony_ci 2024-06-06 09:17:03 +00:00 committed by Gitee
commit 0f59fb150f
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 22 additions and 40 deletions

View File

@ -58,6 +58,7 @@ struct AsyncCallbackInfoSubscribe {
napi_ref callback = nullptr;
std::shared_ptr<SubscriberInstance> subscriber = nullptr;
int8_t errorCode = NO_ERROR;
napi_threadsafe_function tsfn = nullptr;
};
struct AsyncCallbackInfoUnsubscribe {
@ -215,6 +216,7 @@ public:
void SetEnv(const napi_env &env);
void SetCallbackRef(const napi_ref &ref);
void SetThreadSafeFunction(const napi_threadsafe_function &tsfn);
unsigned long long GetID();
private:
@ -223,6 +225,7 @@ private:
std::shared_ptr<bool> valid_;
std::atomic_ullong id_;
static std::atomic_ullong subscriberID_;
napi_threadsafe_function tsfn_ = nullptr;
};
class SubscriberInstanceWrapper {

View File

@ -54,6 +54,7 @@ SubscriberInstance::~SubscriberInstance()
{
EVENT_LOGD("destructor SubscriberInstance[%{public}llu]", id_.load());
*valid_ = false;
napi_release_threadsafe_function(tsfn_, napi_tsfn_release);
}
unsigned long long SubscriberInstance::GetID()
@ -130,18 +131,12 @@ napi_value SetCommonEventData(const CommonEventDataWorker *commonEventDataWorker
return NapiGetNull(commonEventDataWorkerData->env);
}
void UvQueueWorkOnReceiveEvent(uv_work_t *work, int status)
void ThreadSafeCallback(napi_env env, napi_value jsCallback, void* context, void* data)
{
EVENT_LOGD("OnReceiveEvent uv_work_t start");
if (work == nullptr) {
EVENT_LOGE("work is nullptr");
return;
}
CommonEventDataWorker *commonEventDataWorkerData = static_cast<CommonEventDataWorker *>(work->data);
CommonEventDataWorker *commonEventDataWorkerData = static_cast<CommonEventDataWorker *>(data);
if (commonEventDataWorkerData == nullptr) {
EVENT_LOGE("OnReceiveEvent commonEventDataWorkerData is nullptr");
delete work;
work = nullptr;
return;
}
if (commonEventDataWorkerData->ref == nullptr ||
@ -149,8 +144,6 @@ void UvQueueWorkOnReceiveEvent(uv_work_t *work, int status)
EVENT_LOGE("OnReceiveEvent commonEventDataWorkerData ref is null or invalid which may be previously released");
delete commonEventDataWorkerData;
commonEventDataWorkerData = nullptr;
delete work;
work = nullptr;
return;
}
napi_handle_scope scope;
@ -161,8 +154,6 @@ void UvQueueWorkOnReceiveEvent(uv_work_t *work, int status)
if (SetCommonEventData(commonEventDataWorkerData, result) == nullptr) {
EVENT_LOGE("failed to set common event data");
napi_close_handle_scope(commonEventDataWorkerData->env, scope);
delete work;
work = nullptr;
delete commonEventDataWorkerData;
commonEventDataWorkerData = nullptr;
return;
@ -184,30 +175,14 @@ void UvQueueWorkOnReceiveEvent(uv_work_t *work, int status)
napi_close_handle_scope(commonEventDataWorkerData->env, scope);
delete commonEventDataWorkerData;
commonEventDataWorkerData = nullptr;
delete work;
work = nullptr;
}
void SubscriberInstance::OnReceiveEvent(const CommonEventData &data)
{
EVENT_LOGD("OnReceiveEvent start");
uv_loop_s *loop = nullptr;
napi_get_uv_event_loop(env_, &loop);
if (loop == nullptr) {
EVENT_LOGE("loop instance is nullptr");
return;
}
uv_work_t *work = new (std::nothrow) uv_work_t;
if (work == nullptr) {
EVENT_LOGE("work is null");
return;
}
CommonEventDataWorker *commonEventDataWorker = new (std::nothrow) CommonEventDataWorker();
if (commonEventDataWorker == nullptr) {
EVENT_LOGE("commonEventDataWorker is null");
delete work;
work = nullptr;
return;
}
commonEventDataWorker->want = data.GetWant();
@ -218,8 +193,6 @@ void SubscriberInstance::OnReceiveEvent(const CommonEventData &data)
commonEventDataWorker->ref = ref_;
commonEventDataWorker->valid = valid_;
work->data = reinterpret_cast<void *>(commonEventDataWorker);
if (this->IsOrderedCommonEvent()) {
EVENT_LOGD("IsOrderedCommonEvent is true");
std::lock_guard<std::mutex> lock(subscriberInsMutex);
@ -232,18 +205,17 @@ void SubscriberInstance::OnReceiveEvent(const CommonEventData &data)
}
}
int ret = uv_queue_work_with_qos(loop, work, [](uv_work_t *work) {},
UvQueueWorkOnReceiveEvent, uv_qos_user_initiated);
if (ret != 0) {
EVENT_LOGE("failed to insert work into queue");
delete commonEventDataWorker;
commonEventDataWorker = nullptr;
delete work;
work = nullptr;
}
napi_acquire_threadsafe_function(tsfn_);
napi_call_threadsafe_function(tsfn_, commonEventDataWorker, napi_tsfn_nonblocking);
napi_release_threadsafe_function(tsfn_, napi_tsfn_release);
EVENT_LOGD("OnReceiveEvent end");
}
void ThreadFinished(napi_env env, void* data, [[maybe_unused]] void* context)
{
EVENT_LOGD("ThreadFinished");
}
napi_value ParseParametersByGetSubscribeInfo(
const napi_env &env, const size_t &argc, const napi_value (&argv)[1], napi_ref &callback)
{
@ -1309,7 +1281,8 @@ napi_value Subscribe(napi_env env, napi_callback_info info)
EVENT_LOGD("Create subscribe string.");
napi_value resourceName = nullptr;
napi_create_string_latin1(env, "Subscribe", NAPI_AUTO_LENGTH, &resourceName);
napi_create_threadsafe_function(env, argv[1], nullptr, resourceName, 0, 1, asyncCallbackInfo->callback,
ThreadFinished, nullptr, ThreadSafeCallback, &(asyncCallbackInfo->tsfn));
// Asynchronous function call
napi_create_async_work(env,
nullptr,
@ -1323,6 +1296,7 @@ napi_value Subscribe(napi_env env, napi_callback_info info)
}
asyncCallbackInfo->subscriber->SetEnv(env);
asyncCallbackInfo->subscriber->SetCallbackRef(asyncCallbackInfo->callback);
asyncCallbackInfo->subscriber->SetThreadSafeFunction(asyncCallbackInfo->tsfn);
asyncCallbackInfo->errorCode = CommonEventManager::SubscribeCommonEvent(asyncCallbackInfo->subscriber) ?
NO_ERROR : ERR_CES_FAILED;
},

View File

@ -356,6 +356,11 @@ void SubscriberInstance::SetCallbackRef(const napi_ref &ref)
*valid_ = ref_ != nullptr ? true : false;
}
void SubscriberInstance::SetThreadSafeFunction(const napi_threadsafe_function &tsfn)
{
tsfn_ = tsfn;
}
void SetCallback(const napi_env &env, const napi_ref &callbackIn, const int8_t &errorCode, const napi_value &result)
{
napi_value undefined = nullptr;