!1905 Enable awareness of socket session

Merge pull request !1905 from blueyouth/enable_session_awareness
This commit is contained in:
openharmony_ci 2024-09-09 03:58:49 +00:00 committed by Gitee
commit bd29ebe653
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
9 changed files with 106 additions and 23 deletions

View File

@ -288,7 +288,7 @@ int32_t CooperateClient::UnregisterEventListener(ITunnelClient &tunnel,
int32_t CooperateClient::SetDamplingCoefficient(ITunnelClient &tunnel, uint32_t direction, double coefficient)
{
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}lf)", direction, coefficient);
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}.3f)", direction, coefficient);
SetDamplingCoefficientParam param { direction, coefficient };
DefaultReply reply;

View File

@ -120,7 +120,7 @@ void InputEventBuilder::Thaw()
void InputEventBuilder::SetDamplingCoefficient(uint32_t direction, double coefficient)
{
coefficient = std::clamp(coefficient, MIN_DAMPLING_COEFFICENT, MAX_DAMPLING_COEFFICENT);
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}lf)", direction, coefficient);
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}.3f)", direction, coefficient);
if ((direction & COORDINATION_DAMPLING_UP) == COORDINATION_DAMPLING_UP) {
damplingCoefficients_[DamplingDirection::DAMPLING_DIRECTION_UP] = coefficient;
}

View File

@ -209,7 +209,7 @@ int32_t CooperateServer::SetParam(CallingContext &context, uint32_t id, MessageP
CHKPR(context_, RET_ERR);
ICooperate* cooperate = context_->GetPluginManager().LoadCooperate();
CHKPR(cooperate, RET_ERR);
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}lf)", param.direction, param.coefficient);
FI_HILOGI("SetDamplingCoefficient(0x%{public}x, %{public}.3f)", param.direction, param.coefficient);
return cooperate->SetDamplingCoefficient(param.direction, param.coefficient);
}

View File

@ -35,10 +35,11 @@ namespace DeviceStatus {
class SocketSessionManager final : public ISocketSessionManager, public IEpollEventSource {
public:
SocketSessionManager() = default;
~SocketSessionManager() = default;
~SocketSessionManager();
DISALLOW_COPY_AND_MOVE(SocketSessionManager);
int32_t Init();
int32_t Enable();
void Disable();
void AddSessionDeletedCallback(int32_t pid, std::function<void(SocketSessionPtr)> callback) override;
void RemoveSessionDeletedCallback(int32_t pid) override;
@ -65,6 +66,7 @@ private:
private:
bool SetBufferSize(int32_t sockFd, int32_t bufSize);
void DispatchOne();
void OnEpollIn(IEpollEventSource &source);
void ReleaseSession(int32_t fd);
void ReleaseSessionByPid(int32_t pid);
std::shared_ptr<SocketSession> FindSession(int32_t fd) const;
@ -73,7 +75,7 @@ private:
void DumpSession(const std::string& title) const;
void NotifySessionDeleted(std::shared_ptr<SocketSession> sessionPtr);
static std::recursive_mutex mutex_;
mutable std::recursive_mutex mutex_;
EpollManager epollMgr_;
std::map<int32_t, std::shared_ptr<SocketSession>> sessions_;
std::map<int32_t, std::function<void(SocketSessionPtr)>> callbacks_;

View File

@ -35,14 +35,30 @@ namespace {
constexpr int32_t MAX_EPOLL_EVENTS { 64 };
} // namespace
std::recursive_mutex SocketSessionManager::mutex_;
SocketSessionManager::~SocketSessionManager()
{
Disable();
}
int32_t SocketSessionManager::Init()
int32_t SocketSessionManager::Enable()
{
CALL_INFO_TRACE;
std::lock_guard<std::recursive_mutex> guard(mutex_);
return epollMgr_.Open();
}
void SocketSessionManager::Disable()
{
CALL_INFO_TRACE;
std::lock_guard<std::recursive_mutex> guard(mutex_);
epollMgr_.Close();
std::for_each(sessions_.cbegin(), sessions_.cend(), [this](const auto &item) {
CHKPV(item.second);
NotifySessionDeleted(item.second);
});
sessions_.clear();
}
void SocketSessionManager::RegisterApplicationState()
{
CALL_DEBUG_ENTER;
@ -146,14 +162,43 @@ void SocketSessionManager::DispatchOne()
IEpollEventSource *source = reinterpret_cast<IEpollEventSource *>(evs[index].data.ptr);
CHKPC(source);
if ((evs[index].events & EPOLLIN) == EPOLLIN) {
source->Dispatch(evs[index]);
OnEpollIn(*source);
} else if ((evs[index].events & (EPOLLHUP | EPOLLERR)) != 0) {
FI_HILOGE("Epoll hangup:%{public}s", ::strerror(errno));
FI_HILOGW("Epoll hangup:%{public}s", ::strerror(errno));
ReleaseSession(source->GetFd());
}
}
}
void SocketSessionManager::OnEpollIn(IEpollEventSource &source)
{
CALL_DEBUG_ENTER;
char buf[MAX_PACKET_BUF_SIZE] {};
ssize_t numRead {};
do {
numRead = ::recv(source.GetFd(), buf, sizeof(buf), MSG_DONTWAIT);
if (numRead > 0) {
FI_HILOGI("%{public}zd bytes received", numRead);
} else if (numRead < 0) {
if (errno == EINTR) {
FI_HILOGD("recv was interrupted, read again");
continue;
}
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
FI_HILOGW("No available data");
} else {
FI_HILOGE("recv failed:%{public}s", ::strerror(errno));
}
break;
} else {
FI_HILOGE("EOF happened");
ReleaseSession(source.GetFd());
break;
}
} while (numRead == sizeof(buf));
}
void SocketSessionManager::ReleaseSession(int32_t fd)
{
CALL_DEBUG_ENTER;
@ -279,8 +324,6 @@ void SocketSessionManager::RemoveSessionDeletedCallback(int32_t pid)
void SocketSessionManager::NotifySessionDeleted(std::shared_ptr<SocketSession> session)
{
std::lock_guard<std::recursive_mutex> guard(mutex_);
CHKPV(session);
FI_HILOGI("Session lost, pid:%{public}d", session->GetPid());
if (auto iter = callbacks_.find(session->GetPid()); iter != callbacks_.end()) {
if (iter->second) {

View File

@ -122,9 +122,12 @@ private:
int32_t InitTimerMgr();
int32_t InitMotionDrag();
void OnThread();
void OnSocketEvent(const struct epoll_event &ev);
void OnDelegateTask(const epoll_event &ev);
void OnTimeout(const epoll_event &ev);
void OnDeviceMgr(const epoll_event &ev);
int32_t EnableSocketSessionMgr(int32_t nRetries);
void DisableSocketSessionMgr();
int32_t EnableDevMgr(int32_t nRetries);
void DisableDevMgr();
void EnableDSoftbus();

View File

@ -282,13 +282,6 @@ bool DeviceStatusService::Init()
FI_HILOGE("Dump init failed");
goto INIT_FAIL;
}
#if defined(OHOS_BUILD_ENABLE_INTENTION_FRAMEWORK)
if (socketSessionMgr_.Init() != RET_OK) {
FI_HILOGE("Failed to initialize socket session manager");
goto INIT_FAIL;
}
#elif defined(OHOS_BUILD_ENABLE_COORDINATION)
#endif // OHOS_BUILD_ENABLE_COORDINATION
return true;
INIT_FAIL:
@ -512,6 +505,7 @@ void DeviceStatusService::OnThread()
uint64_t tid = GetThisThreadId();
delegateTasks_.SetWorkerThreadId(tid);
FI_HILOGD("Main worker thread start, tid:%{public}" PRId64 "", tid);
EnableSocketSessionMgr(MAX_N_RETRIES);
EnableDevMgr(MAX_N_RETRIES);
while (state_ == ServiceRunningState::STATE_RUNNING) {
@ -521,7 +515,7 @@ void DeviceStatusService::OnThread()
auto epollEvent = reinterpret_cast<device_status_epoll_event*>(ev[i].data.ptr);
CHKPC(epollEvent);
if (epollEvent->event_type == EPOLL_EVENT_SOCKET) {
OnEpollEvent(ev[i]);
OnSocketEvent(ev[i]);
} else if (epollEvent->event_type == EPOLL_EVENT_ETASK) {
OnDelegateTask(ev[i]);
} else if (epollEvent->event_type == EPOLL_EVENT_TIMER) {
@ -536,6 +530,16 @@ void DeviceStatusService::OnThread()
FI_HILOGD("Main worker thread stop, tid:%{public}" PRId64 "", tid);
}
void DeviceStatusService::OnSocketEvent(const struct epoll_event &ev)
{
CALL_INFO_TRACE;
if ((ev.events & EPOLLIN) == EPOLLIN) {
socketSessionMgr_.Dispatch(ev);
} else if ((ev.events & (EPOLLHUP | EPOLLERR)) != 0) {
FI_HILOGE("Epoll hangup:%{public}s", ::strerror(errno));
}
}
void DeviceStatusService::OnDelegateTask(const struct epoll_event &ev)
{
if ((ev.events & EPOLLIN) == 0) {
@ -577,6 +581,37 @@ void DeviceStatusService::OnDeviceMgr(const struct epoll_event &ev)
}
}
int32_t DeviceStatusService::EnableSocketSessionMgr(int32_t nRetries)
{
CALL_INFO_TRACE;
int32_t ret = socketSessionMgr_.Enable();
if (ret != RET_OK) {
FI_HILOGE("Failed to enable SocketSessionManager");
if (nRetries > 0) {
auto timerId = timerMgr_.AddTimer(DEFAULT_WAIT_TIME_MS, WAIT_FOR_ONCE,
[this, nRetries]() {
return EnableSocketSessionMgr(nRetries - 1);
});
if (timerId < 0) {
FI_HILOGE("AddTimer failed, Failed to enable SocketSessionManager");
}
} else {
FI_HILOGE("Maximum number of retries exceeded, Failed to enable SocketSessionManager");
}
return ret;
}
FI_HILOGI("Enable SocketSessionManager successfully");
AddEpoll(EPOLL_EVENT_SOCKET, socketSessionMgr_.GetFd());
return RET_OK;
}
void DeviceStatusService::DisableSocketSessionMgr()
{
CALL_INFO_TRACE;
DelEpoll(EPOLL_EVENT_SOCKET, socketSessionMgr_.GetFd());
socketSessionMgr_.Disable();
}
int32_t DeviceStatusService::EnableDevMgr(int32_t nRetries)
{
CALL_INFO_TRACE;

View File

@ -305,7 +305,7 @@ HWTEST_F(CooperatePluginTest, CooperatePluginTest2, TestSize.Level0)
CALL_TEST_DEBUG;
Cooperate::RegisterEventListenerEvent registerEventListenerEvent1 {IPCSkeleton::GetCallingPid(), "test"};
g_context->mouseLocation_.AddListener(registerEventListenerEvent1);
g_socketSessionMgr.Init();
g_socketSessionMgr.Enable();
g_socketSessionMgr.AddSession(g_session);
g_contextOne->mouseLocation_.ReportMouseLocationToListener("test", {}, IPCSkeleton::GetCallingPid());
g_context->mouseLocation_.ReportMouseLocationToListener("test", {}, IPCSkeleton::GetCallingPid());
@ -344,7 +344,7 @@ HWTEST_F(CooperatePluginTest, CooperatePluginTest3, TestSize.Level0)
HWTEST_F(CooperatePluginTest, CooperatePluginTest4, TestSize.Level0)
{
CALL_TEST_DEBUG;
g_socketSessionMgr.Init();
g_socketSessionMgr.Enable();
RegisterHotareaListenerEvent registerHotareaListenerEvent{IPCSkeleton::GetCallingPid(), 1};
g_context->hotArea_.AddListener(registerHotareaListenerEvent);
g_context->hotArea_.OnHotAreaMessage(HotAreaType::AREA_LEFT, true);

View File

@ -402,7 +402,7 @@ HWTEST_F(SocketSessionTest, SocketSessionTest18, TestSize.Level0)
{
CALL_TEST_DEBUG;
g_socketSessionManager->GetAppMgr();
int32_t ret = g_socketSessionManager->Init();
int32_t ret = g_socketSessionManager->Enable();
EXPECT_EQ(ret, RET_OK);
}