Optimize all connect init and wait end

Signed-off-by: t00605578 <tongyuejiao@huawei.com>
This commit is contained in:
t00605578 2024-08-09 18:32:39 +08:00
parent 360c22962a
commit c95d4b0689
6 changed files with 35 additions and 16 deletions

View File

@ -467,7 +467,7 @@ enum {
*/
DMS_CONNECT_APPLY_REJECT_FAILED = 29360231,
/**
* Result(29360232) for all connect manager reject connect apply.
* Result(29360232) for all connect manager deal connect apply timeout.
*/
DMS_CONNECT_APPLY_TIMEOUT_FAILED = 29360232,
};

View File

@ -103,9 +103,11 @@ private:
std::map<DSchedContinueInfo, std::shared_ptr<DSchedContinue>> continues_;
std::mutex continueMutex_;
#ifdef DMSFWK_ALL_CONNECT_MGR
std::mutex connectDecisionMutex_;
std::condition_variable connectDecisionCond_;
std::map<std::string, std::atomic<bool>> peerConnectDecision_;
#endif
std::atomic<int32_t> cntSink_ {0};
std::atomic<int32_t> cntSource_ {0};

View File

@ -103,9 +103,11 @@ void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, b
{
HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), isSupport);
#ifdef DMSFWK_ALL_CONNECT_MGR
std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
peerConnectDecision_[peerDeviceId] = isSupport;
connectDecisionCond_.notify_all();
#endif
}
int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
@ -266,6 +268,15 @@ void DSchedContinueManager::HandleContinueMissionWithBundleName(const DSchedCont
auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
newContinue->Init();
continues_.insert(std::make_pair(info, newContinue));
#ifdef DMSFWK_ALL_CONNECT_MGR
{
std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
peerConnectDecision_.erase(peerDeviceId);
}
}
#endif
newContinue->OnContinueMission(wantParams);
}
WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
@ -287,6 +298,7 @@ void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSch
if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
SetTimeOut(info, 0);
return;
}
if (!peerConnectDecision_.at(peerDeviceId).load()) {

View File

@ -197,7 +197,7 @@ void DistributedSchedStub::InitRemoteFuncsInner()
&DistributedSchedStub::NotifyStateChangedFromRemoteInner;
#ifdef DMSFWK_INTERACTIVE_ADAPTER
remoteFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::NOTIFY_STATE_CHANGED_FROM_REMOTE)] =
remoteFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::NOTIFY_ABILITY_LIFECYCLE_CHANGED_FROM_REMOTE)] =
&DistributedSchedStub::NotifyAbilityLifecycleChangedFromRemoteAdapterInner;
#endif

View File

@ -18,7 +18,6 @@
#include <dlfcn.h>
#include "distributed_sched_utils.h"
#include "dsched_continue_manager.h"
#include "dsched_transport_softbus_adapter.h"
#include "dtbschedmgr_log.h"
@ -283,7 +282,6 @@ int32_t DSchedAllConnectManager::ApplyResult(int32_t errorcode, int32_t result,
return ERR_OK;
}
std::string peerNetworkId = peerConnectCbQueue_.front();
DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerNetworkId, isSupport);
DSchedAllConnectManager::GetInstance().NotifyAllConnectDecision(peerNetworkId, isSupport);
peerConnectCbQueue_.pop();
return ERR_OK;

View File

@ -17,6 +17,7 @@
#include "distributed_sched_utils.h"
#include "dsched_all_connect_manager.h"
#include "dsched_continue_manager.h"
#include "dtbschedmgr_device_info_storage.h"
#include "dtbschedmgr_log.h"
#include "softbus_bus_center.h"
@ -73,24 +74,25 @@ DSchedTransportSoftbusAdapter::~DSchedTransportSoftbusAdapter()
int32_t DSchedTransportSoftbusAdapter::InitChannel()
{
HILOGI("start");
serverSocket_ = CreateServerSocket();
if (serverSocket_ <= 0) {
HILOGE("create socket failed, ret: %{public}d", serverSocket_);
return serverSocket_;
}
int32_t ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
if (ret != ERR_OK) {
HILOGE("service listen failed, ret: %{public}d", ret);
return ret;
}
int32_t ret = ERR_OK;
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
if (ret != ERR_OK) {
HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
}
#endif
serverSocket_ = CreateServerSocket();
if (serverSocket_ <= 0) {
HILOGE("create socket failed, ret: %{public}d", serverSocket_);
return serverSocket_;
}
ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
if (ret != ERR_OK) {
HILOGE("service listen failed, ret: %{public}d", ret);
return ret;
}
HILOGI("end");
return ERR_OK;
}
@ -120,6 +122,9 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
HILOGI("peer device already connected");
iter->second->OnConnect();
sessionId = iter->first;
#ifdef DMSFWK_ALL_CONNECT_MGR
DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
#endif
return ERR_OK;
}
}
@ -134,8 +139,10 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
if (ret != ERR_OK) {
HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
sessionId = INVALID_SESSION_ID;
DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, false);
return ret;
}
DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
if (ret != ERR_OK) {