diff --git a/adapter/BUILD.gn b/adapter/BUILD.gn index c9c983c8f..a3e9582a8 100644 --- a/adapter/BUILD.gn +++ b/adapter/BUILD.gn @@ -296,6 +296,7 @@ if (defined(ohos_lite)) { public_external_deps += [ "hitrace:libhitracechain" ] if (is_standard_system) { external_deps += [ + "ffrt:libffrt", "hilog:libhilog", "hisysevent:libhisysevent", "hitrace:libhitracechain", diff --git a/adapter/common/include/softbus_adapter_timer.h b/adapter/common/include/softbus_adapter_timer.h index c01f1708c..b619fd3eb 100644 --- a/adapter/common/include/softbus_adapter_timer.h +++ b/adapter/common/include/softbus_adapter_timer.h @@ -17,6 +17,7 @@ #define SOFTBUS_ADAPTER_TIMER_H #include +#include #ifdef __cplusplus extern "C" { @@ -34,6 +35,10 @@ void SetTimerFunc(TimerFunc func); void *SoftBusCreateTimer(void **timerId, unsigned int type); int SoftBusStartTimer(void *timerId, unsigned int tickets); int SoftBusDeleteTimer(void *timerId); +#ifdef SOFTBUS_STANDARD_OS +int32_t SoftBusStartTimerWithFfrt(int32_t *timerHandle, uint64_t timeout, bool repeat); +void SoftBusStopTimerWithFfrt(int32_t timerHandle); +#endif /* Sleep */ int SoftBusSleepMs(unsigned int ms); diff --git a/adapter/common/kernel/posix/softbus_adapter_timer.c b/adapter/common/kernel/posix/softbus_adapter_timer.c index 3244519b7..1ea893d1f 100644 --- a/adapter/common/kernel/posix/softbus_adapter_timer.c +++ b/adapter/common/kernel/posix/softbus_adapter_timer.c @@ -22,6 +22,10 @@ #include #include +#ifdef SOFTBUS_STANDARD_OS +#include "ffrt.h" +#endif + #include "comm_log.h" #include "securec.h" #include "softbus_def.h" @@ -49,6 +53,46 @@ __attribute__((no_sanitize("hwaddress"))) void SetTimerFunc(TimerFunc func) g_timerFunc = func; } +#ifdef SOFTBUS_STANDARD_OS +static void HandleTimeoutFunWithFfrt(void *data) +{ + (void)data; + if (g_timerFunc != NULL) { + g_timerFunc(); + } +} + +int32_t SoftBusStartTimerWithFfrt(int32_t *timerHandle, uint64_t timeout, bool repeat) +{ + if (timerHandle == NULL) { + COMM_LOGE(COMM_ADAPTER, "timerHandle is null"); + return SOFTBUS_INVALID_PARAM; + } + int32_t ret = ffrt_timer_start(ffrt_qos_default, timeout, NULL, HandleTimeoutFunWithFfrt, repeat); + if (ret == ffrt_error) { + COMM_LOGE(COMM_ADAPTER, "timer start with ffrt fail, ret=%{public}d", ret); + return SOFTBUS_TIMER_ERR; + } + *timerHandle = ret; + COMM_LOGI(COMM_ADAPTER, "timer start with ffrt succ, timerHandle=%{public}d", *timerHandle); + return SOFTBUS_OK; +} + +void SoftBusStopTimerWithFfrt(int32_t timerHandle) +{ + if (timerHandle < 0) { + COMM_LOGE(COMM_ADAPTER, "invalid timerHandle=%{public}d", timerHandle); + return; + } + int32_t ret = ffrt_timer_stop(ffrt_qos_default, timerHandle); + if (ret != ffrt_success) { + COMM_LOGE(COMM_ADAPTER, "timer stop with ffrt fail, timerHandle=%{public}d, ret=%{public}d", timerHandle, ret); + return; + } + COMM_LOGI(COMM_ADAPTER, "timer stop with ffrt succ, timerHandle=%{public}d", timerHandle); +} +#endif + void *SoftBusCreateTimer(void **timerId, unsigned int type) { if (timerId == NULL) { diff --git a/bundle.json b/bundle.json index 7a2a33019..afb46e7f7 100644 --- a/bundle.json +++ b/bundle.json @@ -87,7 +87,8 @@ "libcoap", "zlib", "libnl", - "power_manager" + "power_manager", + "ffrt" ], "third_party": [ "cJSON", diff --git a/core/common/BUILD.gn b/core/common/BUILD.gn index 271888c3b..f75f3ea9f 100644 --- a/core/common/BUILD.gn +++ b/core/common/BUILD.gn @@ -24,7 +24,6 @@ common_utils_src = [ "bitmap/softbus_bitmap.c", "network/softbus_network_utils.c", "json_utils/softbus_json_utils.c", - "message_handler/message_handler.c", "queue/softbus_queue.c", "security/sequence_verification/softbus_sequence_verification.c", "softbus_property/softbus_feature_config.c", @@ -89,6 +88,7 @@ if (defined(ohos_lite)) { dfx_src = [ "dfx/hisysevent_adapter/softbus_hisysevt_nstack_virtual.c" ] sources = common_utils_src sources += conn_common_src + trans_common_src + dfx_src + sources += [ "message_handler/message_handler.c" ] if (board_toolchain_type != "iccarm") { cflags = [ "-Wall", @@ -202,10 +202,14 @@ if (defined(ohos_lite)) { ] } if (is_standard_system) { + sources += [ "message_handler/message_handler_ffrt.cpp" ] external_deps += [ + "ffrt:libffrt", "hilog:libhilog", "hisysevent:libhisysevent", ] + } else { + sources += [ "message_handler/message_handler.c" ] } innerapi_tags = [ "platformsdk_indirect" ] part_name = "dsoftbus" diff --git a/core/common/dfx/hidumper_adapter/BUILD.gn b/core/common/dfx/hidumper_adapter/BUILD.gn index c6c4da179..7abea0978 100644 --- a/core/common/dfx/hidumper_adapter/BUILD.gn +++ b/core/common/dfx/hidumper_adapter/BUILD.gn @@ -99,7 +99,6 @@ if (defined(ohos_lite)) { include_dirs += trans_common_inc sources = [ "$dsoftbus_root_path/core/bus_center/utils/src/lnn_map.c", - "$dsoftbus_root_path/core/common/message_handler/message_handler.c", "$dsoftbus_root_path/core/common/softbus_property/softbus_feature_config.c", "$dsoftbus_root_path/core/common/utils/softbus_utils.c", "softbus_hidumper.c", @@ -141,6 +140,14 @@ if (defined(ohos_lite)) { "hisysevent:libhisysevent", "hisysevent:libhisyseventmanager", ] + if (is_standard_system) { + sources += [ "$dsoftbus_root_path/core/common/message_handler/message_handler_ffrt.cpp" ] + external_deps += [ "ffrt:libffrt" ] + } else { + sources += [ + "$dsoftbus_root_path/core/common/message_handler/message_handler.c", + ] + } innerapi_tags = [ "platformsdk_indirect" ] part_name = "dsoftbus" diff --git a/core/common/include/message_handler.h b/core/common/include/message_handler.h index 68fe07611..20aed64e7 100644 --- a/core/common/include/message_handler.h +++ b/core/common/include/message_handler.h @@ -27,9 +27,11 @@ typedef struct SoftBusMessage SoftBusMessage; typedef struct SoftBusHandler SoftBusHandler; typedef struct SoftBusLooperContext SoftBusLooperContext; typedef struct SoftBusLooper SoftBusLooper; +typedef struct FfrtMsgQueue MsgQueue; struct SoftBusLooper { SoftBusLooperContext *context; + MsgQueue *queue; bool dumpable; void (*PostMessage)(const SoftBusLooper *looper, SoftBusMessage *msg); void (*PostMessageDelay)(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis); diff --git a/core/common/message_handler/message_handler.c b/core/common/message_handler/message_handler.c index d52a14394..b4cc95e35 100644 --- a/core/common/message_handler/message_handler.c +++ b/core/common/message_handler/message_handler.c @@ -33,6 +33,9 @@ static int8_t g_isNeedDestroy = 0; static int8_t g_isThreadStarted = 0; static uint32_t g_looperCnt = 0; +struct FfrtMsgQueue { +}; + typedef struct { SoftBusMessage *msg; ListNode node; diff --git a/core/common/message_handler/message_handler_ffrt.cpp b/core/common/message_handler/message_handler_ffrt.cpp new file mode 100644 index 000000000..1384ae48e --- /dev/null +++ b/core/common/message_handler/message_handler_ffrt.cpp @@ -0,0 +1,582 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "message_handler.h" + +#include +#include +#include "ffrt.h" +#include "c/ffrt_ipc.h" + +#include "common_list.h" +#include "comm_log.h" +#include "softbus_adapter_mem.h" +#include "softbus_def.h" +#include "softbus_error_code.h" + +#define LOOP_NAME_LEN 16 +#define TIME_THOUSANDS_MULTIPLIER 1000LL +#define MAX_LOOPER_CNT 30U +#define MAX_LOOPER_PRINT_CNT 64 + +static std::atomic g_looperCnt(0); + +struct FfrtMsgQueue { + ffrt::queue *msgQueue; +}; + +typedef struct { + SoftBusMessage *msg; + ListNode node; + ffrt::task_handle *msgHandle; +} SoftBusMessageNode; + +struct SoftBusLooperContext { + ListNode msgHead; + char name[LOOP_NAME_LEN]; + SoftBusMessage *currentMsg; + unsigned int msgSize; + ffrt::mutex *mtx; + volatile bool stop; // destroys looper, stop = true +}; + +static int64_t UptimeMicros(void) +{ + SoftBusSysTime t = { + .sec = 0, + .usec = 0, + }; + SoftBusGetTime(&t); + int64_t when = t.sec * TIME_THOUSANDS_MULTIPLIER * TIME_THOUSANDS_MULTIPLIER + t.usec; + return when; +} + +NO_SANITIZE("cfi") static void FreeSoftBusMsg(SoftBusMessage *msg) +{ + if (msg->FreeMessage == nullptr) { + SoftBusFree(msg); + } else { + msg->FreeMessage(msg); + } +} + +SoftBusMessage *MallocMessage(void) +{ + SoftBusMessage *msg = static_cast(SoftBusCalloc(sizeof(SoftBusMessage))); + if (msg == nullptr) { + COMM_LOGE(COMM_UTILS, "malloc SoftBusMessage failed"); + return nullptr; + } + return msg; +} + +void FreeMessage(SoftBusMessage *msg) +{ + if (msg != nullptr) { + FreeSoftBusMsg(msg); + msg = nullptr; + } +} + +static void DumpLooperLocked(const SoftBusLooperContext *context) +{ + int32_t i = 0; + ListNode *item = nullptr; + LIST_FOR_EACH(item, &context->msgHead) { + SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node); + SoftBusMessage *msg = itemNode->msg; + if (i > MAX_LOOPER_PRINT_CNT) { + COMM_LOGW(COMM_UTILS, "many messages left unprocessed, msgSize=%{public}u", + context->msgSize); + break; + } + COMM_LOGD(COMM_UTILS, + "DumpLooper. i=%{public}d, handler=%{public}s, what=%{public}" PRId32 ", arg1=%{public}" PRIu64 ", " + "arg2=%{public}" PRIu64 ", time=%{public}" PRId64 "", + i, msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time); + i++; + } +} + +void DumpLooper(const SoftBusLooper *looper) +{ + if (looper == nullptr) { + return; + } + SoftBusLooperContext *context = looper->context; + context->mtx->lock(); + if (looper->dumpable) { + DumpLooperLocked(context); + } + context->mtx->unlock(); +} + +struct LoopConfigItem { + int type; + SoftBusLooper *looper; +}; + +static struct LoopConfigItem g_loopConfig[] = { + {LOOP_TYPE_DEFAULT, nullptr}, + {LOOP_TYPE_CONN, nullptr}, + {LOOP_TYPE_LNN, nullptr}, +}; + +static void ReleaseLooper(const SoftBusLooper *looper) +{ + const uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem); + for (uint32_t i = 0; i < len; i++) { + if (g_loopConfig[i].looper == looper) { + g_loopConfig[i].looper = nullptr; + return; + } + } +} + +static void DumpMsgInfo(const SoftBusMessage *msg) +{ + if (msg->handler == nullptr) { + return; + } + COMM_LOGD(COMM_UTILS, "DumpMsgInfo.handler=%{public}s, what=%{public}" PRId32 ", arg1=%{public}" PRIu64 ", " + "arg2=%{public}" PRIu64 ", time=%{public}" PRId64 "", + msg->handler->name, msg->what, msg->arg1, msg->arg2, msg->time); +} + +static int32_t GetMsgNodeFromContext(SoftBusMessageNode **msgNode, + const SoftBusMessage *tmpMsg, const SoftBusLooper *looper) +{ + looper->context->mtx->lock(); + if (looper->context->stop) { + COMM_LOGE(COMM_UTILS, "cancel handle with looper is stop, name=%{public}s", looper->context->name); + looper->context->mtx->unlock(); + return SOFTBUS_LOOPER_ERR; + } + ListNode *item = nullptr; + ListNode *nextItem = nullptr; + LIST_FOR_EACH_SAFE(item, nextItem, &(looper->context->msgHead)) { + SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node); + SoftBusMessage *msg = itemNode->msg; + if (tmpMsg->what == msg->what && tmpMsg->arg1 == msg->arg1 && tmpMsg->arg2 == msg->arg2 && + tmpMsg->time == msg->time && tmpMsg->handler == msg->handler) { + ListDelete(&itemNode->node); + *msgNode = itemNode; + looper->context->msgSize--; + looper->context->mtx->unlock(); + return SOFTBUS_OK; + } + } + COMM_LOGE(COMM_UTILS, "no get correct msg from context, time=%{public}" PRId64"", tmpMsg->time); + looper->context->mtx->unlock(); + return SOFTBUS_LOOPER_ERR; +} + +static int32_t SubmitMsgToFfrt(SoftBusMessageNode *msgNode, const SoftBusLooper *looper, uint64_t delayMicros) +{ + msgNode->msgHandle = new (std::nothrow)ffrt::task_handle(); + if (msgNode->msgHandle == nullptr) { + COMM_LOGE(COMM_UTILS, "ffrt msgHandle SoftBusCalloc fail"); + return SOFTBUS_MALLOC_ERR; + } + SoftBusMessage tmpMsg = { + .what = msgNode->msg->what, + .arg1 = msgNode->msg->arg1, + .arg2 = msgNode->msg->arg2, + .time = msgNode->msg->time, + .handler = msgNode->msg->handler, + }; + *(msgNode->msgHandle) = looper->queue->msgQueue->submit_h([tmpMsg, looper] { + ffrt_this_task_set_legacy_mode(true); + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "invalid looper para when handle"); + return; + } + SoftBusMessageNode *currentMsgNode = nullptr; + if (GetMsgNodeFromContext(¤tMsgNode, &tmpMsg, looper) != SOFTBUS_OK) { + COMM_LOGE(COMM_UTILS, "get currentMsgNode from context fail"); + return; + } + SoftBusMessage *currentMsg = currentMsgNode->msg; + if (currentMsg->handler != nullptr && currentMsg->handler->HandleMessage != nullptr) { + DumpMsgInfo(currentMsg); + currentMsg->handler->HandleMessage(currentMsg); + } else { + COMM_LOGE(COMM_UTILS, "handler is null when handle msg, name=%{public}s", looper->context->name); + } + FreeSoftBusMsg(currentMsg); + delete (currentMsgNode->msgHandle); + SoftBusFree(currentMsgNode); + }, ffrt::task_attr().delay(delayMicros)); + return SOFTBUS_OK; +} + +static void InsertMsgWithTime(SoftBusLooperContext *context, SoftBusMessageNode *msgNode) +{ + ListNode *item = nullptr; + ListNode *nextItem = nullptr; + bool insert = false; + LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) { + SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node); + if (itemNode->msg->time > msgNode->msg->time) { + ListTailInsert(item, &(msgNode->node)); + insert = true; + break; + } + } + if (!insert) { + ListTailInsert(&(context->msgHead), &(msgNode->node)); + } +} + +static void PostMessageWithFfrt(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMicros) +{ + SoftBusMessageNode *msgNode = static_cast(SoftBusCalloc(sizeof(SoftBusMessageNode))); + if (msgNode == nullptr) { + COMM_LOGE(COMM_UTILS, "message node malloc failed"); + FreeSoftBusMsg(msg); + return; + } + ListInit(&msgNode->node); + msgNode->msg = msg; + SoftBusLooperContext *context = looper->context; + context->mtx->lock(); + if (context->stop) { + FreeSoftBusMsg(msg); + SoftBusFree(msgNode); + COMM_LOGE(COMM_UTILS, "cancel post with looper is stop, name=%{public}s", context->name); + context->mtx->unlock(); + return; + } + if (SubmitMsgToFfrt(msgNode, looper, delayMicros) != SOFTBUS_OK) { + FreeSoftBusMsg(msg); + SoftBusFree(msgNode); + COMM_LOGE(COMM_UTILS, "submit msg to ffrt fail, name=%{public}s", context->name); + context->mtx->unlock(); + return; + } + InsertMsgWithTime(context, msgNode); + context->msgSize++; + if (looper->dumpable) { + DumpLooperLocked(context); + } + context->mtx->unlock(); +} + +static void RemoveMessageWithFfrt(const SoftBusLooper *looper, const SoftBusHandler *handler, + int (*customFunc)(const SoftBusMessage*, void*), void *args) +{ + SoftBusLooperContext *context = looper->context; + context->mtx->lock(); + if (context->stop) { + COMM_LOGE(COMM_UTILS, "cancel remove with looper is stop, name=%{public}s", context->name); + context->mtx->unlock(); + return; + } + ListNode *item = nullptr; + ListNode *nextItem = nullptr; + LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) { + SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node); + SoftBusMessage *msg = itemNode->msg; + if (msg->handler == handler && customFunc(msg, args) == 0) { + looper->queue->msgQueue->cancel(*(itemNode->msgHandle)); // cancel fail when task is handling + COMM_LOGD(COMM_UTILS, "remove msg with ffrt succ, time=%{public}" PRId64"", msg->time); + FreeSoftBusMsg(msg); + ListDelete(&itemNode->node); + delete (itemNode->msgHandle); + SoftBusFree(itemNode); + context->msgSize--; + } + } + context->mtx->unlock(); +} + +static void DestroyLooperWithFfrt(SoftBusLooper *looper) +{ + SoftBusLooperContext *context = looper->context; + if (context != nullptr) { + context->mtx->lock(); + context->stop = true; + COMM_LOGI(COMM_UTILS, "looper is stop, name=%{public}s", looper->context->name); + context->mtx->unlock(); + delete (looper->queue->msgQueue); //if task is handling when delete, it will return after handle; + ListNode *item = nullptr; + ListNode *nextItem = nullptr; + LIST_FOR_EACH_SAFE(item, nextItem, &context->msgHead) { + SoftBusMessageNode *itemNode = LIST_ENTRY(item, SoftBusMessageNode, node); + SoftBusMessage *msg = itemNode->msg; + FreeSoftBusMsg(msg); + ListDelete(&itemNode->node); + delete (itemNode->msgHandle); + SoftBusFree(itemNode); + context->msgSize--; + } + delete (context->mtx); + SoftBusFree(context); + context = nullptr; + } else { + delete (looper->queue->msgQueue); + } + SoftBusFree(looper->queue); + ReleaseLooper(looper); + SoftBusFree(looper); + if (g_looperCnt.load(std::memory_order_acquire) != 0) { + g_looperCnt--; + } +} + +static void LooperPostMessage(const SoftBusLooper *looper, SoftBusMessage *msg) +{ + if (msg == nullptr || msg->handler == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessage with nullmsg"); + return; + } + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessage with nulllooper"); + return; + } + if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessage with nullqueue"); + return; + } + msg->time = UptimeMicros(); + PostMessageWithFfrt(looper, msg, 0); +} + +static void LooperPostMessageDelay(const SoftBusLooper *looper, SoftBusMessage *msg, uint64_t delayMillis) +{ + if (msg == nullptr || msg->handler == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nullmsg"); + return; + } + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nulllooper"); + return; + } + if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperPostMessageDelay with nullqueue"); + return; + } + msg->time = UptimeMicros() + (int64_t)delayMillis * TIME_THOUSANDS_MULTIPLIER; + PostMessageWithFfrt(looper, msg, delayMillis * TIME_THOUSANDS_MULTIPLIER); +} + +static int WhatRemoveFunc(const SoftBusMessage *msg, void *args) +{ + int32_t what = (int32_t)(intptr_t)args; + if (msg->what == what) { + return 0; + } + return 1; +} + +static void LoopRemoveMessageCustom(const SoftBusLooper *looper, const SoftBusHandler *handler, + int (*customFunc)(const SoftBusMessage*, void*), void *args) +{ + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nulllopper"); + return; + } + if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullqueue"); + return; + } + if (handler == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullhandler"); + return; + } + if (customFunc == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullcustomFunc"); + return; + } + RemoveMessageWithFfrt(looper, handler, customFunc, args); +} + +static void LooperRemoveMessage(const SoftBusLooper *looper, const SoftBusHandler *handler, int32_t what) +{ + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nulllopper"); + return; + } + if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullqueue"); + return; + } + if (handler == nullptr) { + COMM_LOGE(COMM_UTILS, "LooperRemoveMessage with nullhandler"); + return; + } + LoopRemoveMessageCustom(looper, handler, WhatRemoveFunc, (void*)(intptr_t)what); +} + +void SetLooperDumpable(SoftBusLooper *looper, bool dumpable) +{ + if (looper == nullptr || looper->context == nullptr) { + COMM_LOGE(COMM_UTILS, "looper param is invalid"); + return; + } + looper->context->mtx->lock(); + looper->dumpable = dumpable; + looper->context->mtx->unlock(); +} + +static int32_t CreateNewFfrtQueue(FfrtMsgQueue **ffrtQueue, const char *name) +{ + FfrtMsgQueue *tmpQueue = static_cast(SoftBusCalloc(sizeof(FfrtMsgQueue))); + if (tmpQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "softbus msgQueue SoftBusCalloc fail"); + return SOFTBUS_MALLOC_ERR; + } + tmpQueue->msgQueue = new (std::nothrow)ffrt::queue(name); + if (tmpQueue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "ffrt msgQueue SoftBusCalloc fail"); + SoftBusFree(tmpQueue); + return SOFTBUS_MALLOC_ERR; + } + *ffrtQueue = tmpQueue; + return SOFTBUS_OK; +} + +static int32_t CreateNewContext(SoftBusLooperContext **context, const char *name) +{ + SoftBusLooperContext *tmpContext = static_cast(SoftBusCalloc(sizeof(SoftBusLooperContext))); + if (tmpContext == nullptr) { + COMM_LOGE(COMM_UTILS, "context SoftBusCalloc fail"); + return SOFTBUS_MALLOC_ERR; + } + if (strncpy_s(tmpContext->name, LOOP_NAME_LEN, name, strlen(name)) != EOK) { + COMM_LOGE(COMM_UTILS, "memcpy context name fail"); + SoftBusFree(tmpContext); + return SOFTBUS_STRCPY_ERR; + } + ListInit(&tmpContext->msgHead); + // init mtx + tmpContext->mtx = new (std::nothrow)ffrt::mutex(); + if (tmpContext->mtx == nullptr) { + COMM_LOGE(COMM_UTILS, "context ffrt lock init fail"); + SoftBusFree(tmpContext); + return SOFTBUS_MALLOC_ERR; + } + tmpContext->stop = false; + *context = tmpContext; + return SOFTBUS_OK; +} + +SoftBusLooper *CreateNewLooper(const char *name) +{ + if (name == nullptr || strlen(name) >= LOOP_NAME_LEN) { + COMM_LOGE(COMM_UTILS, "invalid looper name=%{public}s", name); + return nullptr; + } + if (g_looperCnt.load(std::memory_order_acquire) >= MAX_LOOPER_CNT) { + COMM_LOGE(COMM_UTILS, "Looper exceeds the maximum, count=%{public}u,", + g_looperCnt.load(std::memory_order_acquire)); + return nullptr; + } + SoftBusLooper *looper = static_cast(SoftBusCalloc(sizeof(SoftBusLooper))); + if (looper == nullptr) { + COMM_LOGE(COMM_UTILS, "Looper SoftBusCalloc fail"); + return nullptr; + } + SoftBusLooperContext *context = nullptr; + if (CreateNewContext(&context, name) != SOFTBUS_OK) { + COMM_LOGE(COMM_UTILS, "create new context fail"); + SoftBusFree(looper); + return nullptr; + } + FfrtMsgQueue *ffrtQueue = nullptr; + if (CreateNewFfrtQueue(&ffrtQueue, name) != SOFTBUS_OK) { + COMM_LOGE(COMM_UTILS, "create new ffrtQueue fail"); + delete (context->mtx); + SoftBusFree(context); + SoftBusFree(looper); + return nullptr; + } + // init looper + looper->context = context; + looper->dumpable = true; + looper->PostMessage = LooperPostMessage; + looper->PostMessageDelay = LooperPostMessageDelay; + looper->RemoveMessage = LooperRemoveMessage; + looper->RemoveMessageCustom = LoopRemoveMessageCustom; + looper->queue = ffrtQueue; + COMM_LOGI(COMM_UTILS, "start looper with ffrt ok, name=%{public}s", context->name); + g_looperCnt++; + return looper; +} + +SoftBusLooper *GetLooper(int type) +{ + uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem); + for (uint32_t i = 0; i < len; i++) { + if (g_loopConfig[i].type == type) { + return g_loopConfig[i].looper; + } + } + return nullptr; +} + +void SetLooper(int type, SoftBusLooper *looper) +{ + uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem); + for (uint32_t i = 0; i < len; i++) { + if (g_loopConfig[i].type == type) { + g_loopConfig[i].looper = looper; + } + } +} + +void DestroyLooper(SoftBusLooper *looper) +{ + if (looper == nullptr) { + COMM_LOGE(COMM_UTILS, "DestroyLooper with nulllooper"); + return; + } + if (looper->queue == nullptr || looper->queue->msgQueue == nullptr) { + COMM_LOGE(COMM_UTILS, "DestroyLooper with nullqueue"); + return; + } + DestroyLooperWithFfrt(looper); +} + +int LooperInit(void) +{ + SoftBusLooper *looper = CreateNewLooper("BusCenter_Lp"); + if (!looper) { + COMM_LOGE(COMM_UTILS, "init BusCenter looper fail."); + return SOFTBUS_LOOPER_ERR; + } + SetLooper(LOOP_TYPE_DEFAULT, looper); + + SoftBusLooper *connLooper = CreateNewLooper("ReactorLink_Lp"); + if (!connLooper) { + COMM_LOGE(COMM_UTILS, "init connection looper fail."); + return SOFTBUS_LOOPER_ERR; + } + SetLooper(LOOP_TYPE_CONN, connLooper); + + COMM_LOGD(COMM_UTILS, "init looper success."); + return SOFTBUS_OK; +} + +void LooperDeinit(void) +{ + uint32_t len = sizeof(g_loopConfig) / sizeof(struct LoopConfigItem); + for (uint32_t i = 0; i < len; i++) { + if (g_loopConfig[i].looper == nullptr) { + continue; + } + DestroyLooper(g_loopConfig[i].looper); + } +} \ No newline at end of file diff --git a/core/common/utils/softbus_utils.c b/core/common/utils/softbus_utils.c index 7bfc83002..bff6d32f0 100644 --- a/core/common/utils/softbus_utils.c +++ b/core/common/utils/softbus_utils.c @@ -55,8 +55,11 @@ #define ONE_BYTE_SIZE 8 +#ifdef SOFTBUS_STANDARD_OS +static int32_t *g_timerHandle = NULL; +#else static void *g_timerId = NULL; -static int32_t g_handleTimes = 0; +#endif static TimerFunCallback g_timerFunList[SOFTBUS_MAX_TIMER_FUN_NUM] = {0}; static bool g_signalingMsgSwitch = false; @@ -114,22 +117,28 @@ static void HandleTimeoutFun(void) g_timerFunList[i](); } } - ++g_handleTimes; - if (g_handleTimes >= MAX_HANDLE_TIMES && g_timerId != NULL) { - (void)SoftBusDeleteTimer(g_timerId); - COMM_LOGI(COMM_UTILS, "update new timer"); - g_timerId = SoftBusCreateTimer(&g_timerId, TIMER_TYPE_PERIOD); - if (SoftBusStartTimer(g_timerId, TIMER_TIMEOUT) != SOFTBUS_OK) { - COMM_LOGE(COMM_UTILS, "start timer failed."); - (void)SoftBusDeleteTimer(g_timerId); - g_timerId = NULL; - } - g_handleTimes = 0; - } } int32_t SoftBusTimerInit(void) { +#ifdef SOFTBUS_STANDARD_OS + if (g_timerHandle != NULL) { + return SOFTBUS_OK; + } + SetTimerFunc(HandleTimeoutFun); + g_timerHandle = (int32_t *)SoftBusCalloc(sizeof(int32_t)); + if (g_timerHandle == NULL) { + COMM_LOGE(COMM_UTILS, "timerHandle calloc fail"); + return SOFTBUS_MALLOC_ERR; + } + int32_t ret = SoftBusStartTimerWithFfrt(g_timerHandle, TIMER_TIMEOUT, true); + if (ret != SOFTBUS_OK) { + SoftBusFree(g_timerHandle); + g_timerHandle = NULL; + COMM_LOGE(COMM_UTILS, "softbus timer init fail, ret=%{public}d", ret); + } + return ret; +#else if (g_timerId != NULL) { return SOFTBUS_OK; } @@ -142,14 +151,24 @@ int32_t SoftBusTimerInit(void) return SOFTBUS_ERR; } return SOFTBUS_OK; +#endif } void SoftBusTimerDeInit(void) { +#ifdef SOFTBUS_STANDARD_OS + if (g_timerHandle != NULL) { + SoftBusStopTimerWithFfrt(*g_timerHandle); + SoftBusFree(g_timerHandle); + g_timerHandle = NULL; + } + return; +#else if (g_timerId != NULL) { (void)SoftBusDeleteTimer(g_timerId); g_timerId = NULL; } +#endif } int32_t ConvertBytesToUpperCaseHexString(char *outBuf, uint32_t outBufLen, const unsigned char * inBuf, diff --git a/dsoftbus.gni b/dsoftbus.gni index d009f0c06..057e52a43 100644 --- a/dsoftbus.gni +++ b/dsoftbus.gni @@ -83,6 +83,9 @@ if (defined(ohos_lite)) { os_type = "standard" } defines = [ "SOFTBUS_LINUX" ] + if (is_standard_system) { + defines += [ "SOFTBUS_STANDARD_OS" ] + } import("//build/ohos.gni") import( "$dsoftbus_feature_product_config_path/feature_config/standard/config.gni")