!985 fix all connect idle notify

Merge pull request !985 from 仝月姣/master
This commit is contained in:
openharmony_ci 2024-07-15 09:40:27 +00:00 committed by Gitee
commit c66132342a
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
5 changed files with 94 additions and 76 deletions

View File

@ -52,22 +52,24 @@ public:
int32_t ReleaseChannel();
int32_t SendData(int32_t sessionId, int32_t dataType, std::shared_ptr<DSchedDataBuffer> dataBuffer);
int32_t SendBytesBySoftbus(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
int32_t CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId);
void OnBind(int32_t sessionId, const std::string &peerDeviceId);
void OnShutdown(int32_t sessionId, bool isSelfCalled);
void OnBytes(int32_t sessionId, const void *data, uint32_t dataLen);
void OnDataReady(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer, uint32_t dataType);
void RegisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener);
void NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfCalled);
void UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener);
void SetCallingTokenId(int32_t callingTokenId);
bool GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId);
private:
DSchedTransportSoftbusAdapter();
~DSchedTransportSoftbusAdapter();
int32_t CreateServerSocket();
int32_t CreateClientSocket(const std::string &peerDeviceId);
int32_t CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId, bool isServer);
int32_t AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId);
bool GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId);
void ShutdownSession(const std::string &peerDeviceId, int32_t sessionId);
void NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfCalled);
private:
std::map<int32_t, std::shared_ptr<DSchedSoftbusSession>> sessions_;

View File

@ -1047,11 +1047,7 @@ int32_t DSchedContinue::ExecuteContinueEnd(int32_t result)
DSchedTransportSoftbusAdapter::GetInstance().DisconnectDevice(peerDeviceId);
}
if (result != ERR_OK) {
eventData_.state_ = DMS_DSCHED_EVENT_STOP;
} else {
eventData_.state_ = DMS_DSCHED_EVENT_FINISH;
}
eventData_.state_ = result != ERR_OK ? DMS_DSCHED_EVENT_STOP : DMS_DSCHED_EVENT_FINISH;
if (result == ERR_OK && direction_ == CONTINUE_SOURCE && isSourceExit_) {
int32_t ret = AbilityManagerClient::GetInstance()->CleanMission(continueInfo_.missionId_);
HILOGD("ExecuteContinueEnd clean mission result: %{public}d", ret);

View File

@ -44,7 +44,7 @@ ServiceCollaborationManager_CommunicationRequestInfo DSchedAllConnectManager::co
.maxLatency = DSCHED_QOS_TYPE_MAX_LATENCY,
.minLatency = DSCHED_QOS_TYPE_MIN_LATENCY,
.maxWaitTime = 0,
.dataType = "DATA_TYPE_FILE",
.dataType = "DATA_TYPE_BYTES",
};
std::queue<std::string> DSchedAllConnectManager::peerConnectCbQueue_;
@ -265,7 +265,12 @@ int32_t DSchedAllConnectManager::OnStop(const char *peerNetworkId)
{
HILOGI("OnStop, when other task prepare to seize bind, disconnect DMS bind with peerNetworkId %{public}s.",
GetAnonymStr(peerNetworkId).c_str());
DSchedTransportSoftbusAdapter::GetInstance().DisconnectDevice(peerNetworkId);
int32_t sessionId = -1;
if (!DSchedTransportSoftbusAdapter::GetInstance().GetSessionIdByDeviceId(peerNetworkId, sessionId)) {
HILOGW("Not find any sessionId by peerNetworkId %{public}s.", GetAnonymStr(peerNetworkId).c_str());
return ERR_OK;
}
DSchedTransportSoftbusAdapter::GetInstance().OnShutdown(sessionId, false);
return ERR_OK;
}

View File

@ -43,7 +43,7 @@ static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) /
static void OnBind(int32_t socket, PeerSocketInfo info)
{
std::string peerDeviceId(info.networkId);
DSchedTransportSoftbusAdapter::GetInstance().CreateSessionRecord(socket, peerDeviceId);
DSchedTransportSoftbusAdapter::GetInstance().OnBind(socket, peerDeviceId);
}
static void OnShutdown(int32_t socket, ShutdownReason reason)
@ -118,7 +118,7 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
HILOGI("peer device already connected");
sessions_[iter->first]->OnConnect();
iter->second->OnConnect();
return iter->first;
}
}
@ -134,6 +134,12 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
return INVALID_SESSION_ID;
}
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
if (ret != ERR_OK) {
HILOGE("Publish prepare state fail, ret %{public}d, peerDeviceId %{public}s.",
ret, GetAnonymStr(peerDeviceId).c_str());
}
#endif
int32_t sessionId = INVALID_SESSION_ID;
@ -147,17 +153,16 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId)
{
int32_t ret = ERR_OK;
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
sessionId = CreateClientSocket(peerDeviceId);
if (sessionId <= 0) {
HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
if (ret != ERR_OK) {
HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
return REMOTE_DEVICE_BIND_ABILITY_ERR;
}
@ -165,37 +170,28 @@ int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peer
HILOGD("SetFirstCallerTokenID callingTokenId: %{public}d, ret: %{public}d", callingTokenId_, ret);
callingTokenId_ = 0;
HILOGI("bind begin");
ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
HILOGI("bind end");
if (ret != ERR_OK) {
HILOGE("client bind failed, ret: %{public}d", ret);
Shutdown(sessionId);
sessionId = INVALID_SESSION_ID;
return ret;
}
do {
HILOGI("bind begin");
ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
HILOGI("bind end");
if (ret != ERR_OK) {
HILOGE("client bind failed, ret: %{public}d", ret);
break;
}
ret = CreateSessionRecord(sessionId, peerDeviceId, false);
if (ret != ERR_OK) {
HILOGE("Client create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
break;
}
} while (false);
std::string localDeviceId;
if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
HILOGE("GetLocalDeviceId failed");
Shutdown(sessionId);
sessionId = INVALID_SESSION_ID;
return GET_LOCAL_DEVICE_ERR;
}
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
SessionInfo info = { sessionId, localDeviceId, peerDeviceId, SOCKET_DMS_SESSION_NAME, false };
auto session = std::make_shared<DSchedSoftbusSession>(info);
sessions_[sessionId] = session;
}
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
ShutdownSession(peerDeviceId, sessionId);
sessionId = INVALID_SESSION_ID;
}
#endif
return ERR_OK;
return ret;
}
int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
@ -213,22 +209,31 @@ int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &pee
return sessionId;
}
int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId)
int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId,
bool isServer)
{
std::string localDeviceId;
if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
HILOGE("GetLocalDeviceId failed");
Shutdown(sessionId);
return INVALID_SESSION_ID;
ShutdownSession(peerDeviceId, sessionId);
return GET_LOCAL_DEVICE_ERR;
}
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
std::string sessionName = SOCKET_DMS_SESSION_NAME;
SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, true };
SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, isServer };
auto session = std::make_shared<DSchedSoftbusSession>(info);
sessions_[sessionId] = session;
}
return sessionId;
#ifdef DMSFWK_ALL_CONNECT_MGR
int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
if (ret != ERR_OK) {
HILOGE("Publish connected state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
return ERR_OK;
}
void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDeviceId)
@ -245,7 +250,7 @@ void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDevi
if (sessionId != 0 && sessions_[sessionId] != nullptr && sessions_[sessionId]->OnDisconnect()) {
HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
Shutdown(sessionId);
ShutdownSession(peerDeviceId, sessionId);
sessions_.erase(sessionId);
NotifyListenersSessionShutdown(sessionId, true);
}
@ -253,6 +258,18 @@ void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDevi
return;
}
void DSchedTransportSoftbusAdapter::ShutdownSession(const std::string &peerDeviceId, int32_t sessionId)
{
Shutdown(sessionId);
#ifdef DMSFWK_ALL_CONNECT_MGR
int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
if (ret != ERR_OK) {
HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
}
bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId)
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
@ -265,6 +282,15 @@ bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &pe
return false;
}
void DSchedTransportSoftbusAdapter::OnBind(int32_t sessionId, const std::string &peerDeviceId)
{
int32_t ret = CreateSessionRecord(sessionId, peerDeviceId, true);
if (ret != ERR_OK) {
HILOGE("Service create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
}
void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
{
std::string peerDeviceId;
@ -277,18 +303,9 @@ void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcal
peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
Shutdown(sessionId);
ShutdownSession(peerDeviceId, sessionId);
sessions_.erase(sessionId);
}
#ifdef DMSFWK_ALL_CONNECT_MGR
int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
NotifyListenersSessionShutdown(sessionId, isSelfcalled);
}
@ -314,11 +331,10 @@ int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
if (iter->second != nullptr) {
HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(iter->second->GetPeerDeviceId()).c_str(), iter->first);
}
Shutdown(iter->first);
std::string peerDeviceId = (iter->second != nullptr) ? iter->second->GetPeerDeviceId() : "";
HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), iter->first);
ShutdownSession(peerDeviceId, iter->first);
}
sessions_.clear();
}

View File

@ -405,17 +405,16 @@ HWTEST_F(DSchedTransportSoftbusAdapterTest, InitChannel_001, TestSize.Level3)
}
/**
* @tc.name: CreateSessionRecord_001
* @tc.desc: call CreateSessionRecord
* @tc.name: OnBind_001
* @tc.desc: call OnBind with get local
* @tc.type: FUNC
*/
HWTEST_F(DSchedTransportSoftbusAdapterTest, CreateSessionRecord_001, TestSize.Level3)
HWTEST_F(DSchedTransportSoftbusAdapterTest, OnBind_001, TestSize.Level3)
{
DTEST_LOG << "DSchedTransportSoftbusAdapterTest CreateSessionRecord_001 begin" << std::endl;
DTEST_LOG << "DSchedTransportSoftbusAdapterTest OnBind_001 begin" << std::endl;
int32_t sessionId = 0;
int32_t ret = DSchedTransportSoftbusAdapter::GetInstance().CreateSessionRecord(sessionId, PEERDEVICEID);
EXPECT_EQ(ret, -1);
DTEST_LOG << "DSchedTransportSoftbusAdapterTest CreateSessionRecord_001 end" << std::endl;
DSchedTransportSoftbusAdapter::GetInstance().OnBind(sessionId, PEERDEVICEID);
DTEST_LOG << "DSchedTransportSoftbusAdapterTest OnBind_001 end" << std::endl;
}
/**