add all connect for continue

Signed-off-by: t00605578 <tongyuejiao@huawei.com>
This commit is contained in:
t00605578 2024-07-07 15:39:37 +08:00
parent 85015a3264
commit ef12f5a71d
10 changed files with 590 additions and 15 deletions

View File

@ -462,6 +462,10 @@ enum {
* Result(29360230) for sink Ability abnormal termination during continuation.
*/
CONTINUE_SINK_ABILITY_TERMINATED = 29360230,
/**
* Result(29360231) for all connect manager reject connect apply.
*/
DMS_CONNECT_APPLY_REJECT_FAILED = 29360231,
};
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -0,0 +1,90 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_IDMS_SRV_COLLABORATION_MGR_CAPI_H
#define OHOS_IDMS_SRV_COLLABORATION_MGR_CAPI_H
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef enum ServiceCollaborationManagerHardwareType {
SCM_UNKNOWN_TYPE = 0,
SCM_DISPLAY = 1,
SCM_MIC = 2,
SCM_SPEAKER = 3,
SCM_CAMERA = 4,
} ServiceCollaborationManagerHardwareType;
typedef enum ServiceCollaborationManagerBussinessStatus {
SCM_IDLE = 1,
SCM_PREPARE = 2,
SCM_CONNECTING = 3,
SCM_CONNECTED = 4
} ServiceCollaborationManagerBussinessStatus;
typedef enum ServiceCollaborationManagerResultCode {
PASS = 1004720001,
REJECT = 1004720002
} ServiceCollaborationManagerResultCode;
typedef struct ServiceCollaborationManager_HardwareRequestInfo {
ServiceCollaborationManagerHardwareType hardWareType;
bool canShare;
} ServiceCollaborationManager_HardwareRequestInfo;
typedef struct ServiceCollaborationManager_CommunicationRequestInfo {
int32_t minBandwidth;
int32_t maxLatency;
int32_t minLatency;
int32_t maxWaitTime;
const char *dataType;
} ServiceCollaborationManager_CommunicationRequestInfo;
typedef struct ServiceCollaborationManager_ResourceRequestInfoSets {
uint32_t remoteHardwareListSize;
ServiceCollaborationManager_HardwareRequestInfo *remoteHardwareList;
uint32_t localHardwareListSize;
ServiceCollaborationManager_HardwareRequestInfo *localHardwareList;
ServiceCollaborationManager_CommunicationRequestInfo *communicationRequest;
} ServiceCollaborationManager_ResourceRequestInfoSets;
typedef struct ServiceCollaborationManager_Callback {
int32_t (*OnStop)(const char *peerNetworkId);
int32_t (*ApplyResult)(int32_t errorcode, int32_t result, const char *reason);
} ServiceCollaborationManager_Callback;
typedef struct ServiceCollaborationManager_API {
int32_t (*ServiceCollaborationManager_PublishServiceState)(const char *peerNetworkId, const char *serviceName,
const char *extraInfo, ServiceCollaborationManagerBussinessStatus state);
int32_t (*ServiceCollaborationManager_ApplyAdvancedResource)(const char *peerNetworkId, const char *serviceName,
ServiceCollaborationManager_ResourceRequestInfoSets *resourceRequest,
ServiceCollaborationManager_Callback *callback);
int32_t (*ServiceCollaborationManager_RegisterLifecycleCallback)(const char *serviceName,
ServiceCollaborationManager_Callback *callback);
int32_t (*ServiceCollaborationManager_UnRegisterLifecycleCallback)(const char *serviceName);
} ServiceCollaborationManager_API;
int32_t ServiceCollaborationManager_Export(ServiceCollaborationManager_API *exportapi);
#ifdef __cplusplus
}
#endif
#endif // OHOS_IDMS_SRV_COLLABORATION_MGR_CAPI_H

View File

@ -35,6 +35,7 @@ config("distributed_sched_config") {
"include/continue/state/sink_state",
"include/distributedWant",
"include/softbus_adapter/transport",
"include/softbus_adapter/allconnectmgr",
]
defines = []
if (dmsfwk_mission_manager) {
@ -77,6 +78,7 @@ ohos_shared_library("distributedschedsvr") {
cflags = [
"-DDMSFWK_SAME_ACCOUNT",
"-DDMSFWK_INTERACTIVE_ADAPTER",
"-DDMSFWK_ALL_CONNECT_MGR",
]
}
@ -130,6 +132,7 @@ ohos_shared_library("distributedschedsvr") {
"src/dms_token_callback.cpp",
"src/dms_version_manager.cpp",
"src/dtbschedmgr_device_info_storage.cpp",
"src/softbus_adapter/allconnectmgr/dsched_all_connect_manager.cpp",
"src/softbus_adapter/transport/dsched_data_buffer.cpp",
"src/softbus_adapter/transport/dsched_softbus_session.cpp",
"src/softbus_adapter/transport/dsched_transport_softbus_adapter.cpp",

View File

@ -51,9 +51,9 @@ public:
void Init();
void UnInit();
void NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport);
void OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
void OnShutdown(int32_t socket, bool isSelfCalled);
void OnBind();
int32_t GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId);
std::shared_ptr<DSchedContinue> GetDSchedContinueByWant(const OHOS::AAFwk::Want& want, int32_t missionId);
@ -77,6 +77,7 @@ private:
void NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
std::shared_ptr<DSchedDataBuffer> dataBuffer);
int32_t CheckContinuationLimit(const std::string& srcDeviceId, const std::string& dstDeviceId);
void WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout);
void SetTimeOut(const DSchedContinueInfo& info, int32_t timeout);
void RemoveTimeout(const DSchedContinueInfo& info);
@ -87,6 +88,10 @@ private:
};
private:
#ifdef DMSFWK_ALL_CONNECT_MGR
static constexpr int32_t CONNECT_DECISION_WAIT_S = 60;
#endif
std::thread eventThread_;
std::condition_variable eventCon_;
std::mutex eventMutex_;
@ -96,6 +101,10 @@ private:
std::map<DSchedContinueInfo, std::shared_ptr<DSchedContinue>> continues_;
std::mutex continueMutex_;
std::mutex connectDecisionMutex_;
std::condition_variable connectDecisionCond_;
std::map<std::string, std::atomic<bool>> peerConnectDecision_;
std::atomic<int32_t> cntSink_ {0};
std::atomic<int32_t> cntSource_ {0};
};

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_DSCHED_ALL_CONNECT_MANAGER_H
#define OHOS_DSCHED_ALL_CONNECT_MANAGER_H
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include "service_collaboration_manager_capi.h"
#include "single_instance.h"
namespace OHOS {
namespace DistributedSchedule {
class DSchedAllConnectManager {
DECLARE_SINGLE_INSTANCE_BASE(DSchedAllConnectManager);
public:
int32_t InitAllConnectManager();
int32_t UninitAllConnectManager();
int32_t PublishServiceState(const std::string &peerNetworkId, const std::string &extraInfo,
ServiceCollaborationManagerBussinessStatus state);
int32_t ApplyAdvanceResource(const std::string &peerNetworkId,
ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets);
void GetResourceRequest(ServiceCollaborationManager_ResourceRequestInfoSets &reqInfoSets);
private:
DSchedAllConnectManager() = default;
~DSchedAllConnectManager() = default;
int32_t GetServiceCollaborationManagerProxy();
int32_t RegistLifecycleCallback();
int32_t UnregistLifecycleCallback();
int32_t WaitAllConnectApplyCb(const std::string &peerNetworkId);
void NotifyAllConnectDecision(const std::string &peerNetworkId, bool isSupport);
static int32_t OnStop(const char *peerNetworkId);
static int32_t ApplyResult(int32_t errorcode, int32_t result, const char *reason);
private:
static constexpr int32_t DSCHED_QOS_TYPE_MIN_BW = 40 * 1024 * 1024;
static constexpr int32_t DSCHED_QOS_TYPE_MAX_LATENCY = 6000;
static constexpr int32_t DSCHED_QOS_TYPE_MIN_LATENCY = 1000;
static ServiceCollaborationManager_HardwareRequestInfo locReqInfo_;
static ServiceCollaborationManager_HardwareRequestInfo rmtReqInfo_;
static ServiceCollaborationManager_CommunicationRequestInfo communicationRequest_;
static std::queue<std::string> peerConnectCbQueue_;
const char *DMS_BIND_MGR_SRV_NAME = "TaskContinue";
std::mutex allConnectMgrLock_;
void *dllHandle_ = nullptr;
ServiceCollaborationManager_API allConnectMgrApi_ = {
.ServiceCollaborationManager_PublishServiceState = nullptr,
.ServiceCollaborationManager_ApplyAdvancedResource = nullptr,
.ServiceCollaborationManager_RegisterLifecycleCallback = nullptr,
.ServiceCollaborationManager_UnRegisterLifecycleCallback = nullptr,
};
std::mutex connectDecisionMutex_;
std::condition_variable connectDecisionCond_;
std::map<std::string, std::atomic<bool>> peerConnectDecision_;
};
} // namespace DistributedSchedule
} // namespace OHOS
#endif // OHOS_DSCHED_ALL_CONNECT_MANAGER_H

View File

@ -66,7 +66,7 @@ private:
~DSchedTransportSoftbusAdapter();
int32_t CreateServerSocket();
int32_t CreateClientSocket(const std::string &peerDeviceId);
int32_t AddClientSession(int32_t sessionId, std::string &peerDeviceId);
int32_t AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId);
bool GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId);
private:

View File

@ -15,8 +15,8 @@
#include "dsched_continue_manager.h"
#include <chrono>
#include <sys/prctl.h>
#include <map>
#include "cJSON.h"
@ -98,6 +98,15 @@ void DSchedContinueManager::UnInit()
HILOGI("UnInit end");
}
void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
{
HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), isSupport);
std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
peerConnectDecision_[peerDeviceId] = isSupport;
connectDecisionCond_.notify_all();
}
int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
{
@ -228,13 +237,41 @@ void DSchedContinueManager::HandleContinueMissionWithBundleName(const DSchedCont
auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
newContinue->Init();
continues_.insert(std::make_pair(info, newContinue));
SetTimeOut(info, CONTINUE_TIMEOUT);
newContinue->OnContinueMission(wantParams);
}
WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
subType, direction, info.toString().c_str());
}
void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
{
#ifdef DMSFWK_ALL_CONNECT_MGR
std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
{
std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
[this, peerDeviceId]() {
return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
peerConnectDecision_.at(peerDeviceId).load();
});
if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
return;
}
if (!peerConnectDecision_.at(peerDeviceId).load()) {
HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
peerConnectDecision_.erase(peerDeviceId);
SetTimeOut(info, 0);
return;
}
peerConnectDecision_.erase(peerDeviceId);
}
#endif
SetTimeOut(info, CONTINUE_TIMEOUT);
}
void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
{
auto func = [this, info]() {
@ -249,7 +286,8 @@ void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t t
HILOGE("eventHandler_ is nullptr");
return;
}
eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout);
timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
eventHandler_->PostTask(func);
}
int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,

View File

@ -29,7 +29,6 @@
#include "parcel_helper.h"
#include "softbus_adapter/softbus_adapter.h"
#include "switch_status_dependency.h"
#include "dsched_continue_manager.h"
#include "mission/dms_continue_recv_manager.h"
#include "mission/wifi_state_adapter.h"

View File

@ -0,0 +1,289 @@
/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dsched_all_connect_manager.h"
#include <dlfcn.h>
#include "distributed_sched_utils.h"
#include "dsched_continue_manager.h"
#include "dsched_transport_softbus_adapter.h"
#include "dtbschedmgr_log.h"
namespace OHOS {
namespace DistributedSchedule {
namespace {
const std::string TAG = "DSchedAllConnectManager";
constexpr int32_t CONNECT_DECISION_WAIT_S = 60;
}
IMPLEMENT_SINGLE_INSTANCE(DSchedAllConnectManager);
ServiceCollaborationManager_HardwareRequestInfo DSchedAllConnectManager::locReqInfo_ = {
.hardWareType = ServiceCollaborationManagerHardwareType::SCM_DISPLAY,
.canShare = true,
};
ServiceCollaborationManager_HardwareRequestInfo DSchedAllConnectManager::rmtReqInfo_ = {
.hardWareType = ServiceCollaborationManagerHardwareType::SCM_DISPLAY,
.canShare = true,
};
ServiceCollaborationManager_CommunicationRequestInfo DSchedAllConnectManager::communicationRequest_ = {
.minBandwidth = DSCHED_QOS_TYPE_MIN_BW,
.maxLatency = DSCHED_QOS_TYPE_MAX_LATENCY,
.minLatency = DSCHED_QOS_TYPE_MIN_LATENCY,
.maxWaitTime = 0,
.dataType = "DATA_TYPE_FILE",
};
std::queue<std::string> DSchedAllConnectManager::peerConnectCbQueue_;
int32_t DSchedAllConnectManager::InitAllConnectManager()
{
HILOGI("Init bind manager.");
int32_t ret = GetServiceCollaborationManagerProxy();
if (ret != ERR_OK) {
HILOGE("GetServiceCollaborationManagerProxy fail, ret %{public}d.", ret);
return ret;
}
ret = RegistLifecycleCallback();
if (ret != ERR_OK) {
HILOGE("Regist lifecycle callback fail, ret %{public}d.", ret);
return ret;
}
return ERR_OK;
}
int32_t DSchedAllConnectManager::UninitAllConnectManager()
{
HILOGI("Uninit bind manager enter.");
int32_t ret = UnregistLifecycleCallback();
if (ret != ERR_OK) {
HILOGE("Unregist lifecycle callback fail, ret %{public}d.", ret);
}
dlclose(dllHandle_);
dllHandle_ = nullptr;
allConnectMgrApi_ = {
.ServiceCollaborationManager_PublishServiceState = nullptr,
.ServiceCollaborationManager_ApplyAdvancedResource = nullptr,
.ServiceCollaborationManager_RegisterLifecycleCallback = nullptr,
.ServiceCollaborationManager_UnRegisterLifecycleCallback = nullptr,
};
return ERR_OK;
}
int32_t DSchedAllConnectManager::GetServiceCollaborationManagerProxy()
{
HILOGI("Get service collaboration manager proxy for all connect.");
std::lock_guard<std::mutex> autoLock(allConnectMgrLock_);
#if (defined(__aarch64__) || defined(__x86_64__))
std::string resolvedPath = "/system/lib64/libcfwk_allconnect_client.z.so";
#else
std::string resolvedPath = "/system/lib/libcfwk_allconnect_client.z.so";
#endif
char path[PATH_MAX + 1] = {0};
if (resolvedPath.empty() || resolvedPath.length() > PATH_MAX || realpath(resolvedPath.c_str(), path) == nullptr) {
HILOGE("Check all connect so real path failed, resolvedPath [%{public}s].", resolvedPath.c_str());
return INVALID_PARAMETERS_ERR;
}
int32_t (*ServiceCollaborationManagerExport)(ServiceCollaborationManager_API *exportapi) = nullptr;
dllHandle_ = dlopen(resolvedPath.c_str(), RTLD_LAZY);
if (dllHandle_ == nullptr) {
HILOGE("Open dms interactive adapter shared object fail, resolvedPath [%{public}s].", resolvedPath.c_str());
return NOT_FIND_SERVICE_REGISTRY;
}
int32_t ret = ERR_OK;
do {
ServiceCollaborationManagerExport =
reinterpret_cast<int32_t (*)(ServiceCollaborationManager_API *exportapi)>(
dlsym(dllHandle_, "ServiceCollaborationManager_Export"));
if (ServiceCollaborationManagerExport == nullptr) {
HILOGE("Link the ServiceCollaborationManagerExport symbol in dms interactive adapter fail.");
ret = NOT_FIND_SERVICE_REGISTRY;
break;
}
int32_t ret = ServiceCollaborationManagerExport(&allConnectMgrApi_);
if (ret != ERR_OK) {
HILOGE("Init remote dms interactive adapter proxy fail, ret %{public}d.", ret);
ret = INVALID_PARAMETERS_ERR;
break;
}
HILOGI("Init remote dms interactive adapter proxy success.");
ret = ERR_OK;
} while (false);
if (ret != ERR_OK) {
HILOGE("Get remote dms interactive adapter proxy fail, dlclose handle.");
dlclose(dllHandle_);
dllHandle_ = nullptr;
}
return ret;
}
int32_t DSchedAllConnectManager::RegistLifecycleCallback()
{
std::lock_guard<std::mutex> autoLock(allConnectMgrLock_);
HILOGI("Regist lifecycle callback.");
if (allConnectMgrApi_.ServiceCollaborationManager_RegisterLifecycleCallback == nullptr) {
HILOGE("Dms all connect manager RegisterLifecycleCallback api is null.");
return INVALID_PARAMETERS_ERR;
}
ServiceCollaborationManager_Callback bindMgrRegLfCB = {
.OnStop = &DSchedAllConnectManager::OnStop,
.ApplyResult = &DSchedAllConnectManager::ApplyResult,
};
int32_t ret = allConnectMgrApi_.ServiceCollaborationManager_RegisterLifecycleCallback(
DMS_BIND_MGR_SRV_NAME, &bindMgrRegLfCB);
if (ret != ERR_OK) {
HILOGE("Dms all connect manager regist lifecycle callback fail, ret %{public}d.", ret);
}
return ret;
}
int32_t DSchedAllConnectManager::UnregistLifecycleCallback()
{
std::lock_guard<std::mutex> autoLock(allConnectMgrLock_);
HILOGI("Unregist lifecycle callback.");
if (allConnectMgrApi_.ServiceCollaborationManager_UnRegisterLifecycleCallback == nullptr) {
HILOGE("Dms all connect manager UnRegisterLifecycleCallback api is null.");
return INVALID_PARAMETERS_ERR;
}
int32_t ret = allConnectMgrApi_.ServiceCollaborationManager_UnRegisterLifecycleCallback(
DMS_BIND_MGR_SRV_NAME);
if (ret != ERR_OK) {
HILOGE("Dms all connect manager unregist lifecycle callback fail, ret %{public}d.", ret);
}
return ret;
}
int32_t DSchedAllConnectManager::PublishServiceState(const std::string &peerNetworkId, const std::string &extraInfo,
ServiceCollaborationManagerBussinessStatus state)
{
std::lock_guard<std::mutex> autoLock(allConnectMgrLock_);
HILOGI("Publish service state enter, peerNetworkId %{public}s, extraInfo %{public}s, state %{public}d.",
GetAnonymStr(peerNetworkId).c_str(), extraInfo.c_str(), state);
if (allConnectMgrApi_.ServiceCollaborationManager_PublishServiceState == nullptr) {
HILOGE("Dms all connect manager PublishServiceState api is null.");
return INVALID_PARAMETERS_ERR;
}
int32_t ret = allConnectMgrApi_.ServiceCollaborationManager_PublishServiceState(peerNetworkId.c_str(),
DMS_BIND_MGR_SRV_NAME, extraInfo.c_str(), state);
if (ret != ERR_OK) {
HILOGE("Dms all connect manager publish service state fail, ret %{public}d.", ret);
}
return ret;
}
int32_t DSchedAllConnectManager::ApplyAdvanceResource(const std::string &peerNetworkId,
ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets)
{
int32_t ret = ERR_OK;
{
std::lock_guard<std::mutex> autoLock(allConnectMgrLock_);
HILOGI("Apply advance resource enter, peerNetworkId %{public}s.", GetAnonymStr(peerNetworkId).c_str());
if (allConnectMgrApi_.ServiceCollaborationManager_ApplyAdvancedResource == nullptr) {
HILOGE("Dms all connect manager ApplyAdvancedResource api is null.");
return ERR_OK;
}
peerConnectCbQueue_.push(peerNetworkId);
ServiceCollaborationManager_Callback applyScmCbApi = {
.OnStop = &DSchedAllConnectManager::OnStop,
.ApplyResult = &DSchedAllConnectManager::ApplyResult,
};
ret = allConnectMgrApi_.ServiceCollaborationManager_ApplyAdvancedResource(peerNetworkId.c_str(),
DMS_BIND_MGR_SRV_NAME, &reqInfoSets, &applyScmCbApi);
if (ret != ERR_OK) {
HILOGE("Dms all connect manager apply advanced resource fail, ret %{public}d.", ret);
return ret;
}
}
ret = WaitAllConnectApplyCb(peerNetworkId);
if (ret != ERR_OK) {
HILOGE("Wait all connect manager apply callback fail, ret %{public}d.", ret);
}
return ret;
}
int32_t DSchedAllConnectManager::WaitAllConnectApplyCb(const std::string &peerNetworkId)
{
std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
[this, peerNetworkId]() {
return peerConnectDecision_.find(peerNetworkId) != peerConnectDecision_.end();
});
if (peerConnectDecision_.find(peerNetworkId) == peerConnectDecision_.end()) {
HILOGE("Not find peerNetworkId %{public}s in peerConnectDecision.", GetAnonymStr(peerNetworkId).c_str());
return CONTINUE_ABILITY_TIMEOUT_ERR;
}
int32_t ret = peerConnectDecision_.at(peerNetworkId).load() ? ERR_OK : DMS_CONNECT_APPLY_REJECT_FAILED;
HILOGI("Wait all connect apply decision callback end, peerNetworkId %{public}s, isSupport %{public}d.",
GetAnonymStr(peerNetworkId).c_str(), peerConnectDecision_.at(peerNetworkId).load());
peerConnectDecision_.erase(peerNetworkId);
return ret;
}
void DSchedAllConnectManager::NotifyAllConnectDecision(const std::string &peerNetworkId, bool isSupport)
{
HILOGI("Notify all connect decision, peerNetworkId %{public}s, isSupport %{public}d.",
GetAnonymStr(peerNetworkId).c_str(), isSupport);
std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
peerConnectDecision_[peerNetworkId] = isSupport;
connectDecisionCond_.notify_all();
}
void DSchedAllConnectManager::GetResourceRequest(ServiceCollaborationManager_ResourceRequestInfoSets &reqInfoSets)
{
reqInfoSets.remoteHardwareListSize = 1;
reqInfoSets.remoteHardwareList = &rmtReqInfo_;
reqInfoSets.localHardwareListSize = 1;
reqInfoSets.localHardwareList = &locReqInfo_;
reqInfoSets.communicationRequest = &communicationRequest_;
}
int32_t DSchedAllConnectManager::OnStop(const char *peerNetworkId)
{
HILOGI("OnStop, when other task prepare to seize bind, disconnect DMS bind with peerNetworkId %{public}s.",
GetAnonymStr(peerNetworkId).c_str());
DSchedTransportSoftbusAdapter::GetInstance().DisconnectDevice(peerNetworkId);
return ERR_OK;
}
int32_t DSchedAllConnectManager::ApplyResult(int32_t errorcode, int32_t result, const char *reason)
{
HILOGI("Apply result start, errorcode %{public}d, result %{public}s, reason %{public}s.",
errorcode, result == ServiceCollaborationManagerResultCode::PASS ? "PASS" : "REJECT", reason);
bool isSupport = result == ServiceCollaborationManagerResultCode::PASS ? true : false;
if (peerConnectCbQueue_.empty()) {
HILOGE("Apply result start, peerConnectCbQueue is empty.");
return ERR_OK;
}
std::string peerNetworkId = peerConnectCbQueue_.front();
DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerNetworkId, isSupport);
DSchedAllConnectManager::GetInstance().NotifyAllConnectDecision(peerNetworkId, isSupport);
peerConnectCbQueue_.pop();
return ERR_OK;
}
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -16,6 +16,7 @@
#include "dsched_transport_softbus_adapter.h"
#include "distributed_sched_utils.h"
#include "dsched_all_connect_manager.h"
#include "dtbschedmgr_device_info_storage.h"
#include "dtbschedmgr_log.h"
#include "softbus_bus_center.h"
@ -83,6 +84,13 @@ int32_t DSchedTransportSoftbusAdapter::InitChannel()
HILOGE("service listen failed, ret: %{public}d", ret);
return ret;
}
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
if (ret != ERR_OK) {
HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
}
#endif
HILOGI("end");
return ERR_OK;
}
@ -117,13 +125,43 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
}
}
int32_t sessionId = CreateClientSocket(peerDeviceId);
int32_t ret = ERR_OK;
#ifdef DMSFWK_ALL_CONNECT_MGR
ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets;
DSchedAllConnectManager::GetInstance().GetResourceRequest(reqInfoSets);
ret = DSchedAllConnectManager::GetInstance().ApplyAdvanceResource(peerDeviceId, reqInfoSets);
if (ret != ERR_OK) {
HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
return INVALID_SESSION_ID;
}
#endif
int32_t sessionId = INVALID_SESSION_ID;
ret = AddNewPeerSession(peerDeviceId, sessionId);
if (ret != ERR_OK || sessionId <= 0) {
HILOGE("Add new peer connect session fail, ret: %{public}d, sessionId: %{public}d.", ret, sessionId);
}
return sessionId;
}
int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId)
{
int32_t ret = ERR_OK;
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
sessionId = CreateClientSocket(peerDeviceId);
if (sessionId <= 0) {
HILOGE("create socket failed, ret: %{public}d", sessionId);
return sessionId;
HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
return REMOTE_DEVICE_BIND_ABILITY_ERR;
}
int32_t ret = SetFirstCallerTokenID(callingTokenId_);
ret = SetFirstCallerTokenID(callingTokenId_);
HILOGD("SetFirstCallerTokenID callingTokenId: %{public}d, ret: %{public}d", callingTokenId_, ret);
callingTokenId_ = 0;
@ -133,13 +171,16 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
if (ret != ERR_OK) {
HILOGE("client bind failed, ret: %{public}d", ret);
Shutdown(sessionId);
return INVALID_SESSION_ID;
sessionId = INVALID_SESSION_ID;
return REMOTE_DEVICE_BIND_ABILITY_ERR;
}
std::string localDeviceId;
if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
HILOGE("GetLocalDeviceId failed");
Shutdown(sessionId);
return INVALID_SESSION_ID;
sessionId = INVALID_SESSION_ID;
return GET_LOCAL_DEVICE_ERR;
}
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
@ -147,7 +188,14 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
auto session = std::make_shared<DSchedSoftbusSession>(info);
sessions_[sessionId] = session;
}
return sessionId;
#ifdef DMSFWK_ALL_CONNECT_MGR
ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
return ERR_OK;
}
int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
@ -219,17 +267,28 @@ bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &pe
void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
{
std::string peerDeviceId;
{
std::lock_guard<std::mutex> sessionLock(sessionMutex_);
if (sessions_.empty() || sessions_.count(sessionId) == 0) {
HILOGE("error, invalid sessionId %{public}d", sessionId);
return;
}
HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
Shutdown(sessionId);
sessions_.erase(sessionId);
}
#ifdef DMSFWK_ALL_CONNECT_MGR
int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
if (ret != ERR_OK) {
HILOGE("Publish connect idle state fail, peerDeviceId: %{public}s, socket sessionId: %{public}d.",
GetAnonymStr(peerDeviceId).c_str(), sessionId);
}
#endif
NotifyListenersSessionShutdown(sessionId, isSelfcalled);
}
@ -264,6 +323,13 @@ int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
HILOGI("shutdown server, socket session id: %{public}d", serverSocket_);
Shutdown(serverSocket_);
serverSocket_ = 0;
#ifdef DMSFWK_ALL_CONNECT_MGR
int32_t ret = DSchedAllConnectManager::GetInstance().UninitAllConnectManager();
if (ret != ERR_OK) {
HILOGE("Uninit all connect manager fail, ret: %{public}d.", ret);
}
#endif
return ERR_OK;
}