!1450 fix::the problem of thread exiting

Merge pull request !1450 from yue/master
This commit is contained in:
openharmony_ci 2024-10-18 02:22:52 +00:00 committed by Gitee
commit 79aa5e9456
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
8 changed files with 120 additions and 66 deletions

View File

@ -209,10 +209,6 @@ public:
sptr<DBinderCallbackStub> QueryDBinderCallbackStub(sptr<IRemoteObject> rpcProxy);
sptr<IRemoteObject> QueryDBinderCallbackProxy(sptr<IRemoteObject> stub);
#endif
bool GetThreadStopFlag();
void IncreaseThreadCount();
void DecreaseThreadCount();
void NotifyChildThreadStop();
public:
static constexpr int DEFAULT_WORK_THREAD_NUM = 16;
@ -236,14 +232,7 @@ private:
class DestroyInstance {
public:
~DestroyInstance()
{
if (instance_ != nullptr) {
instance_->NotifyChildThreadStop();
delete instance_;
instance_ = nullptr;
}
}
~DestroyInstance();
};
static IPCProcessSkeleton *instance_;
@ -300,11 +289,6 @@ private:
std::atomic<int32_t> listenSocketId_ = 0;
uint64_t randNum_;
#endif
static constexpr size_t MAIN_THREAD_MAX_WAIT_TIME = 3;
std::atomic_bool stopThreadFlag_ = false;
std::mutex conMutex_;
std::condition_variable threadCountCon_;
std::atomic_size_t runningChildThreadNum_ = 0;
};
#ifdef CONFIG_IPC_SINGLE
} // namespace IPC_SINGLE

View File

@ -27,6 +27,11 @@ namespace OHOS {
namespace IPC_SINGLE {
#endif
enum class ThreadType {
NORMAL_THREAD = 0xB0B0B0B0,
IPC_THREAD = 0xB1B1B1B1,
};
class IPCThreadSkeleton {
public:
IPCThreadSkeleton();
@ -56,6 +61,8 @@ public:
static bool IsInstanceException(std::atomic<uint32_t> &flag);
static bool SetThreadType(ThreadType type);
bool IsSendRequesting();
// Joint Current thread into IPC Work Group
@ -76,6 +83,7 @@ private:
const pid_t tid_;
std::atomic<int32_t> sendRequestCount_ = 0;
std::string threadName_;
ThreadType threadType_ = ThreadType::NORMAL_THREAD;
};
#ifdef CONFIG_IPC_SINGLE
} // namespace IPC_SINGLE

View File

@ -67,6 +67,11 @@ public:
bool QueryInvokerProcInfo(bool isLocal, InvokerProcInfo &invokeInfo);
bool DetachInvokerProcInfo(bool isLocal);
bool GetThreadStopFlag();
void IncreaseThreadCount();
void DecreaseThreadCount();
void NotifyChildThreadStop();
private:
DISALLOW_COPY_AND_MOVE(ProcessSkeleton);
ProcessSkeleton() = default;
@ -103,6 +108,12 @@ private:
std::shared_mutex invokerProcMutex_;
std::unordered_map<std::string, InvokerProcInfo> invokerProcInfo_;
static constexpr size_t MAIN_THREAD_MAX_WAIT_TIME = 3;
std::atomic_bool stopThreadFlag_ = false;
std::mutex threadCountMutex_;
std::condition_variable threadCountCon_;
std::atomic_size_t runningChildThreadNum_ = 0;
};
} // namespace OHOS
#endif // OHOS_IPC_PROCESS_SKELETON_H

View File

@ -20,7 +20,6 @@
#include <sys/epoll.h>
#include <unistd.h>
#include "binder_connector.h"
#include "check_instance_exit.h"
#include "ipc_debug.h"
#include "ipc_thread_skeleton.h"
@ -1666,47 +1665,20 @@ sptr<IRemoteObject> IPCProcessSkeleton::QueryDBinderCallbackProxy(sptr<IRemoteOb
}
#endif
bool IPCProcessSkeleton::GetThreadStopFlag()
IPCProcessSkeleton::DestroyInstance::~DestroyInstance()
{
return stopThreadFlag_.load();
}
void IPCProcessSkeleton::IncreaseThreadCount()
{
runningChildThreadNum_.fetch_add(1);
}
void IPCProcessSkeleton::DecreaseThreadCount()
{
if (runningChildThreadNum_.load() > 0) {
runningChildThreadNum_.fetch_sub(1);
}
if (runningChildThreadNum_.load() == 0) {
threadCountCon_.notify_one();
}
}
void IPCProcessSkeleton::NotifyChildThreadStop()
{
// set child thread exit flag
stopThreadFlag_.store(true);
// after closeing fd, child threads will be not block in the 'WriteBinder' function
BinderConnector *connector = BinderConnector::GetInstance();
if (connector != nullptr) {
connector->CloseDriverFd();
}
ZLOGI(LOG_LABEL, "start waiting for child thread to exit, child thread num:%{public}zu",
runningChildThreadNum_.load());
std::unique_lock<std::mutex> lockGuard(conMutex_);
threadCountCon_.wait_for(lockGuard,
std::chrono::seconds(MAIN_THREAD_MAX_WAIT_TIME),
[&threadNum = this->runningChildThreadNum_] { return threadNum.load() == 0; });
if (runningChildThreadNum_.load() != 0) {
ZLOGE(LOG_LABEL, "wait timeout, %{public}zu child threads not exiting", runningChildThreadNum_.load());
if (instance_ == nullptr) {
return;
}
ZLOGI(LOG_LABEL, "wait finished, all child thread have exited");
// notify other threads to stop running
auto process = ProcessSkeleton::GetInstance();
if (process != nullptr) {
process->NotifyChildThreadStop();
}
delete instance_;
instance_ = nullptr;
}
#ifdef CONFIG_IPC_SINGLE
} // namespace IPC_SINGLE

View File

@ -165,6 +165,16 @@ IPCThreadSkeleton::~IPCThreadSkeleton()
delete invoker;
invoker = nullptr;
}
if (threadType_ == ThreadType::IPC_THREAD) {
// subtract thread count when thread exiting
auto process = ProcessSkeleton::GetInstance();
if (process != nullptr) {
process->DecreaseThreadCount();
}
}
ZLOGD(LOG_LABEL, "thread exit, threadName=%{public}s, tid=%{public}d, threadType=%{public}d",
threadName_.c_str(), tid_, threadType_);
}
bool IPCThreadSkeleton::IsInstanceException(std::atomic<uint32_t> &flag)
@ -297,6 +307,17 @@ bool IPCThreadSkeleton::IsSendRequesting()
{
return sendRequestCount_ > 0;
}
bool IPCThreadSkeleton::SetThreadType(ThreadType type)
{
IPCThreadSkeleton *current = IPCThreadSkeleton::GetCurrent();
if (current != nullptr) {
return false;
}
current->threadType_ = type;
return true;
}
#ifdef CONFIG_IPC_SINGLE
} // namespace IPC_SINGLE
#endif

View File

@ -96,13 +96,14 @@ void IPCWorkThread::JoinThread(int proto, int policy)
void *IPCWorkThread::ThreadHandler(void *args)
{
IPCProcessSkeleton *current = IPCProcessSkeleton::GetCurrent();
if (current == nullptr) {
ZLOGE(LOG_LABEL, "get IPCProcessSkeleton object failed");
(void)IPCThreadSkeleton::SetThreadType(ThreadType::IPC_THREAD);
ProcessSkeleton *process = ProcessSkeleton::GetInstance();
if (process == nullptr) {
ZLOGE(LOG_LABEL, "get ProcessSkeleton object failed");
return nullptr;
}
if (current->GetThreadStopFlag()) {
if (process->GetThreadStopFlag()) {
ZLOGW(LOG_LABEL, "the stop flag is true, thread start exit");
return nullptr;
}
@ -112,7 +113,6 @@ void *IPCWorkThread::ThreadHandler(void *args)
return nullptr;
}
current->IncreaseThreadCount();
std::string basicName = MakeBasicThreadName(param->proto, param->index);
std::string threadName = basicName + "_" + std::to_string(syscall(SYS_gettid));
int32_t ret = prctl(PR_SET_NAME, threadName.c_str());
@ -127,14 +127,13 @@ void *IPCWorkThread::ThreadHandler(void *args)
JoinThread(param->proto, param->policy);
current = IPCProcessSkeleton::GetCurrent();
IPCProcessSkeleton *current = IPCProcessSkeleton::GetCurrent();
if (current != nullptr) {
current->OnThreadTerminated(basicName);
}
ZLOGW(LOG_LABEL, "exit, proto:%{public}d policy:%{public}d name:%{public}s",
param->proto, param->policy, threadName.c_str());
delete param;
current->DecreaseThreadCount();
return nullptr;
}
@ -148,6 +147,17 @@ void IPCWorkThread::StopWorkThread()
void IPCWorkThread::Start(int policy, int proto, int threadIndex)
{
ProcessSkeleton *process = ProcessSkeleton::GetInstance();
if (process == nullptr) {
ZLOGE(LOG_LABEL, "get ProcessSkeleton object failed");
return;
}
if (process->GetThreadStopFlag()) {
ZLOGW(LOG_LABEL, "the stop flag is true, can not create other thread");
return;
}
auto param = new (std::nothrow) IPCWorkThreadParam();
if (param == nullptr) {
ZLOGE(LOG_LABEL, "create IPCWorkThreadParam failed");
@ -160,11 +170,13 @@ void IPCWorkThread::Start(int policy, int proto, int threadIndex)
param->proto = proto;
param->index = threadIndex;
pthread_t threadId;
int ret = pthread_create(&threadId, NULL, &IPCWorkThread::ThreadHandler, param);
if (ret != 0) {
ZLOGE(LOG_LABEL, "create thread failed, ret:%{public}d", ret);
return;
}
process->IncreaseThreadCount();
ZLOGD(LOG_LABEL, "create thread, policy:%{public}d proto:%{public}d", policy, proto);
if (pthread_detach(threadId) != 0) {
ZLOGE(LOG_LABEL, "detach error");

View File

@ -18,6 +18,7 @@
#include <cinttypes>
#include <unistd.h>
#include "binder_connector.h"
#include "check_instance_exit.h"
#include "ipc_debug.h"
#include "log_tags.h"
@ -414,4 +415,49 @@ bool ProcessSkeleton::IsNumStr(const std::string &str)
}
return std::all_of(str.begin(), str.end(), ::isdigit);
}
bool ProcessSkeleton::GetThreadStopFlag()
{
return stopThreadFlag_.load();
}
void ProcessSkeleton::IncreaseThreadCount()
{
std::unique_lock<std::mutex> lockGuard(threadCountMutex_);
runningChildThreadNum_.fetch_add(1);
}
void ProcessSkeleton::DecreaseThreadCount()
{
std::unique_lock<std::mutex> lockGuard(threadCountMutex_);
if (runningChildThreadNum_.load() > 0) {
runningChildThreadNum_.fetch_sub(1);
if (runningChildThreadNum_.load() == 0) {
threadCountCon_.notify_one();
}
}
}
void ProcessSkeleton::NotifyChildThreadStop()
{
// set child thread exit flag
stopThreadFlag_.store(true);
// after closeing fd, child threads will be not block in the 'WriteBinder' function
BinderConnector *connector = BinderConnector::GetInstance();
if (connector != nullptr) {
connector->CloseDriverFd();
}
ZLOGI(LOG_LABEL, "start waiting for child thread to exit, child thread num:%{public}zu",
runningChildThreadNum_.load());
std::unique_lock<std::mutex> lockGuard(threadCountMutex_);
threadCountCon_.wait_for(lockGuard,
std::chrono::seconds(MAIN_THREAD_MAX_WAIT_TIME),
[&threadNum = this->runningChildThreadNum_] { return threadNum.load() == 0; });
if (runningChildThreadNum_.load() != 0) {
ZLOGI(LOG_LABEL, "wait timeout, %{public}zu child threads not exiting", runningChildThreadNum_.load());
return;
}
ZLOGI(LOG_LABEL, "wait finished, all child thread have exited");
}
} // namespace OHOS

View File

@ -533,8 +533,8 @@ void BinderInvoker::StartWorkLoop()
}
int error;
do {
IPCProcessSkeleton *current = IPCProcessSkeleton::GetCurrent();
if (current == nullptr || current->GetThreadStopFlag()) {
ProcessSkeleton *process = ProcessSkeleton::GetInstance();
if (process == nullptr || process->GetThreadStopFlag()) {
break;
}
error = TransactWithDriver();
@ -546,7 +546,7 @@ void BinderInvoker::StartWorkLoop()
continue;
}
uint32_t cmd = input_.ReadUint32();
current = IPCProcessSkeleton::GetCurrent();
IPCProcessSkeleton *current = IPCProcessSkeleton::GetCurrent();
if (current != nullptr) {
current->LockForNumExecuting();
}