Merge pull request !1270 from m30043719/master
This commit is contained in:
openharmony_ci 2025-01-06 02:50:35 +00:00 committed by Gitee
commit bbec9b363b
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
21 changed files with 198 additions and 116 deletions

View File

@ -29,7 +29,8 @@
"dmsfwk_check_wifi",
"dmsfwk_recv_broadcast",
"dmsfwk_use_screenlock_icon_holdon",
"dmsfwk_sync_data_on_package_event"
"dmsfwk_sync_data_on_package_event",
"dmsfwk_all_connect_decisions"
],
"hisysevent_config": [
"//foundation/ability/dmsfwk/hisysevent.yaml"

View File

@ -574,9 +574,13 @@ enum {
*/
COLLAB_STATE_MACHINE_INVALID_STATE = 29360339,
/**
* Result(29360340) for DistributedSched Service Continue Ability Timeout Error.
* Result(29360340) for DistributedSched Service Collab Ability Timeout Error.
*/
COLLAB_ABILITY_TIMEOUT_ERR = 29360340,
/**
* Result(29360341) for DistributedSched Service Collab Ability Reject Error.
*/
COLLAB_ABILITY_REJECT_ERR = 29360341,
};
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -34,6 +34,7 @@ declare_args() {
dmsfwk_av_trans_stream_debug = false
dmsfwk_av_trans_pixel_map_debug = false
dmsfwk_continuous_task_enable = false
dmsfwk_all_connect_decisions = false
if (defined(global_parts_info)) {
if (defined(global_parts_info.multimedia_image_framework)) {
dmsfwk_mission_manager = true

View File

@ -34,15 +34,13 @@ public:
int32_t NotifyPrepareResult(const std::string& token, int32_t result, int32_t sessionId,
const std::string& serverSocketName);
int32_t NotifyCloseCollabSession(const std::string& token);
int32_t GetPeerSocketName(const std::string& token, std::string& peerSocketName);
int32_t NotifyRejectReason(const std::string& token, const std::string& reason);
enum {
COLLAB_MESSION = 330,
NOTIFY_PREPARE_RESULT = 331,
GET_SOURCE_SOCKET_NAME = 332,
NOTIFY_REJECT_REASON = 332,
BNOTIFY_CLOSE_COLLAB_SESSION = 333,
NOTIFY_REJECT_REASON = 335,
};
private:
sptr<IRemoteObject> GetDmsProxy();

View File

@ -65,33 +65,6 @@ int32_t DistributedClient::CollabMission(int32_t sessionId, const std::string& s
PARCEL_TRANSACT_SYNC_RET_INT(remote, COLLAB_MESSION, data, reply);
}
int32_t DistributedClient::GetPeerSocketName(const std::string& token, std::string& peerSocketName)
{
HILOGD("called.");
sptr<IRemoteObject> remote = GetDmsProxy();
if (remote == nullptr) {
HILOGW("remote is nullptr");
return INVALID_PARAMETERS_ERR;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
HILOGW("write token failed");
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, String, token);
MessageParcel reply;
MessageOption option;
int32_t ret = remote->SendRequest(GET_SOURCE_SOCKET_NAME, data, reply, option);
if (ret != ERR_NONE) {
HILOGW("error: %{public}d", ret);
return ret;
}
peerSocketName = reply.ReadString();
return ret;
}
int32_t DistributedClient::NotifyPrepareResult(const std::string& token, int32_t result,
int32_t sessionId, const std::string& serverSocketName)
{

View File

@ -88,6 +88,9 @@ config("distributed_sched_config") {
if (dmsfwk_continuous_task_enable) {
defines += [ "BGTASKMGR_CONTINUOUS_TASK_ENABLE" ]
}
if (dmsfwk_all_connect_decisions) {
defines += [ "COLLAB_ALL_CONNECT_DECISIONS" ]
}
}
ohos_shared_library("distributedschedsvr") {

View File

@ -75,17 +75,17 @@ struct CollabMessage : public Parcelable {
};
struct ConnectOpt : public Parcelable {
bool needStream_ = false;
bool needData_ = false;
bool needKeepLongAlive_ = false;
bool needSendBigData_ = false;
bool needSendStream_ = false;
bool needRecvStream_ = false;
AAFwk::WantParams startParams_;
AAFwk::WantParams messageParams_;
bool ReadFromParcel(Parcel &parcel)
{
needStream_ = parcel.ReadBool();
needData_ = parcel.ReadBool();
needKeepLongAlive_ = parcel.ReadBool();
needSendBigData_ = parcel.ReadBool();
needSendStream_ = parcel.ReadBool();
needRecvStream_ = parcel.ReadBool();
std::shared_ptr<AAFwk::WantParams> startParamsPtr(parcel.ReadParcelable<AAFwk::WantParams>());
if (startParamsPtr == nullptr) {
return false;
@ -136,9 +136,9 @@ public:
"srcAppVersion: " + std::to_string(this->srcAppVersion_) + " " +
"srcCollabSessionId: " + std::to_string(this->srcCollabSessionId_) + " " +
"collabToken: " + GetAnonymStr(this->collabToken_) + " " +
"needStream: " + std::to_string(this->srcOpt_.needStream_) + " " +
"needData: " + std::to_string(this->srcOpt_.needData_) + " " +
"needKeepLongAlive: " + std::to_string(this->srcOpt_.needKeepLongAlive_) + " " +
"needSendBigData: " + std::to_string(this->srcOpt_.needSendBigData_) + " " +
"needSendStream: " + std::to_string(this->srcOpt_.needSendStream_) + " " +
"needRecvStream: " + std::to_string(this->srcOpt_.needRecvStream_) + " " +
"srcDevId: " + GetAnonymStr(this->srcInfo_.deviceId_) + " " +
"srcBundle: " + this->srcInfo_.bundleName_ + " " +
"srcAbility: " + this->srcInfo_.abilityName_ + " " +
@ -209,24 +209,27 @@ private:
const std::string &socketName, const sptr<IRemoteObject> &clientCB);
int32_t PostSrcResultTask(std::shared_ptr<NotifyResultCmd> replyCmd);
int32_t PostErrEndTask(const int32_t &result);
int32_t PostAbilityRejectTask(const std::string &reason);
int32_t PostEndTask();
int32_t ExeSrcStart();
int32_t ExeStartAbility();
int32_t ExeAbilityRejectError(const std::string &reason);
int32_t ExeSinkPrepareResult(const int32_t &result);
int32_t ExeSrcCollabResult(const int32_t &result);
int32_t ExeSrcCollabResult(const int32_t &result, const std::string reason = "");
int32_t ExeSrcStartError(const int32_t &result);
int32_t ExeSrcWaitResultError(const int32_t &result);
int32_t ExeSinkStartError(const int32_t &result);
int32_t ExeSinkConnectError(const int32_t &result);
int32_t ExeSinkError(const int32_t &result);
int32_t ExeDisconnect();
int32_t ExeSrcClientNotify(const int32_t &result);
int32_t ExeSrcClientNotify(const int32_t &result, const std::string reason = "");
int32_t ExeClientDisconnectNotify();
int32_t PackStartCmd(std::shared_ptr<SinkStartCmd>& cmd);
int32_t PackPartCmd(std::shared_ptr<SinkStartCmd>& cmd);
int32_t PackNotifyResultCmd(std::shared_ptr<NotifyResultCmd> cmd, int32_t result);
int32_t PackNotifyResultCmd(std::shared_ptr<NotifyResultCmd> cmd, const int32_t &result,
const std::string &abilityRejectReason = "");
int32_t PackDisconnectCmd(std::shared_ptr<DisconnectCmd> cmd);
int32_t SendCommand(std::shared_ptr<BaseCmd> cmd);

View File

@ -33,6 +33,7 @@ typedef enum {
NOTIFY_PREPARE_RESULT_EVENT = 3,
ERR_END_EVENT = 4,
END_EVENT = 5,
ABILITY_REJECT_EVENT = 6,
} DSchedCollabEventType;
typedef enum {
@ -50,9 +51,9 @@ public:
virtual int32_t Marshal(std::string &jsonStr);
virtual int32_t Unmarshal(const std::string &jsonStr);
public:
bool needStream_ = false;
bool needData_ = false;
bool needKeepLongAlive_ = false;
bool needSendBigData_ = false;
bool needSendStream_ = false;
bool needRecvStream_ = false;
int32_t collabVersion_ = -1;
int32_t dmsVersion_ = -1;
int32_t command_ = -1;
@ -106,6 +107,7 @@ public:
int32_t result_ = -1;
int32_t sinkCollabSessionId_ = -1;
std::string sinkSocketName_;
std::string abilityRejectReason_;
};
class DisconnectCmd : public BaseCmd {

View File

@ -44,13 +44,13 @@ public:
const int32_t &sinkPid, const int32_t &sinkUid, const int32_t &sinkAccessTokenId);
int32_t NotifySinkPrepareResult(const std::string &collabToken, const int32_t &result,
const int32_t &collabSessionId, const std::string &socketName, const sptr<IRemoteObject> &clientCB);
int32_t NotifySinkRejectReason(const std::string& collabToken, const std::string& reason);
int32_t NotifyAbilityDied(const std::string &bundleName, const int32_t &pid);
int32_t NotifySessionClose(const std::string &collabToken);
int32_t CleanUpSession(const std::string &collabToken);
int32_t CheckCollabRelation(CollabInfo sourceInfo, CollabInfo sinkInfo);
int32_t ReleaseAbilityLink(const std::string &bundleName, const int32_t &pid);
int32_t CancleReleaseAbilityLink(const std::string &bundleName, const int32_t &pid);
std::string GetSrcSocketName(const std::string& collabToken);
void Init();
void UnInit();
@ -69,7 +69,7 @@ private:
void HandleReleaseAbilityLink(const std::string &bundleName, const int32_t &pid);
void HandleDataRecv(const int32_t &softbusSessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer);
void NotifyDataRecv(const int32_t &softbusSessionId, int32_t command, const std::string& jsonStr,
std::shared_ptr<DSchedDataBuffer> dataBuffer);
std::shared_ptr<DSchedDataBuffer> dataBuffer, const std::string& collabToken);
void WaitAllConnectDecision(const std::string &peerDeviceId, const std::shared_ptr<DSchedCollab> &dCollab);
void SetTimeOut(const std::string &collabToken, int32_t timeout);
void RemoveTimeout(const std::string &collabToken);

View File

@ -32,6 +32,7 @@ public:
private:
int32_t DoSinkPrepareResult(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
int32_t DoAbilityRejectError(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
int32_t DoConnectError(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
private:

View File

@ -32,6 +32,7 @@ public:
private:
int32_t DoSrcResultNotifyTask(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
int32_t DoAbilityRejectTask(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
int32_t DoSrcWaitResultError(std::shared_ptr<DSchedCollab> dCollab, const AppExecFwk::InnerEvent::Pointer &event);
private:

View File

@ -132,7 +132,7 @@ private:
int32_t NotifyStartAbilityResultInner(MessageParcel& data, MessageParcel& reply);
int32_t NotifyCollabPrepareResultInner(MessageParcel& data, MessageParcel& reply);
int32_t NotifyCloseCollabSessionInner(MessageParcel& data, MessageParcel& reply);
int32_t GetSrcSocketNameInner(MessageParcel& data, MessageParcel& reply);
int32_t NotifyRejectReason(MessageParcel& data, MessageParcel& reply);
private:
using DistributedSchedFunc = int32_t(DistributedSchedStub::*)(MessageParcel& data, MessageParcel& reply);

View File

@ -104,7 +104,7 @@ enum class IDSchedInterfaceCode : uint32_t {
// request code for collab
COLLAB_MISSION = 330,
NOTIFY_COLLAB_PREPARE_RESULT = 331,
GET_SOURCE_SOCKET_NAME = 332,
NOTIFY_REJECT_REASON = 332,
NOTIFY_CLOSE_COLLAB_SESSION = 333,
NOTIFY_START_ABILITY_RESULT = 334,
};

View File

@ -61,7 +61,7 @@ public:
void UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener);
void SetCallingTokenId(int32_t callingTokenId);
bool GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId);
bool IsNeedAllConnect();
bool IsNeedAllConnect(DSchedServiceType type);
private:
DSchedTransportSoftbusAdapter();

View File

@ -93,9 +93,9 @@ DSchedCollab::DSchedCollab(std::shared_ptr<SinkStartCmd> startCmd, const int32_t
collabInfo_.srcCollabSessionId_ = startCmd->srcCollabSessionId_;
collabInfo_.collabToken_ = startCmd->collabToken_;
collabInfo_.srcCollabVersion_ = startCmd->collabVersion_;
collabInfo_.srcOpt_.needStream_ = startCmd->needStream_;
collabInfo_.srcOpt_.needData_ = startCmd->needData_;
collabInfo_.srcOpt_.needKeepLongAlive_ = startCmd->needKeepLongAlive_;
collabInfo_.srcOpt_.needSendBigData_ = startCmd->needSendBigData_;
collabInfo_.srcOpt_.needSendStream_ = startCmd->needSendStream_;
collabInfo_.srcOpt_.needRecvStream_ = startCmd->needRecvStream_;
collabInfo_.srcOpt_.startParams_ = startCmd->startParams_;
collabInfo_.srcOpt_.messageParams_ = startCmd->messageParams_;
collabInfo_.srcInfo_.pid_ = startCmd->srcPid_;
@ -240,6 +240,16 @@ int32_t DSchedCollab::PostSrcResultTask(std::shared_ptr<NotifyResultCmd> notifyR
HILOGE("eventHandler is nullptr");
return INVALID_PARAMETERS_ERR;
}
if (!notifyResultCmd->abilityRejectReason_.empty()) {
DSchedCollabEventType eventType = ABILITY_REJECT_EVENT;
auto data = std::make_shared<std::string>(notifyResultCmd->abilityRejectReason_);
auto msgEvent = AppExecFwk::InnerEvent::Get(eventType, data, 0);
if (!eventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE)) {
HILOGE("send event type %{public}s fail", EVENTDATA[eventType].c_str());
return COLLAB_SEND_EVENT_FAILED;
}
return ERR_OK;
}
collabInfo_.sinkCollabSessionId_ = notifyResultCmd->sinkCollabSessionId_;
collabInfo_.sinkInfo_.socketName_ = notifyResultCmd->sinkSocketName_;
DSchedCollabEventType eventType = NOTIFY_RESULT_EVENT;
@ -269,6 +279,23 @@ int32_t DSchedCollab::PostErrEndTask(const int32_t &result)
return ERR_OK;
}
int32_t DSchedCollab::PostAbilityRejectTask(const std::string &reason)
{
HILOGI("called, reason: %{public}s", reason.c_str());
if (eventHandler_ == nullptr) {
HILOGE("eventHandler is nullptr");
return INVALID_PARAMETERS_ERR;
}
DSchedCollabEventType eventType = ABILITY_REJECT_EVENT;
auto data = std::make_shared<std::string>(reason);
auto msgEvent = AppExecFwk::InnerEvent::Get(eventType, data, 0);
if (!eventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE)) {
HILOGE("send event type %{public}s fail", EVENTDATA[eventType].c_str());
return COLLAB_SEND_EVENT_FAILED;
}
return ERR_OK;
}
int32_t DSchedCollab::PostEndTask()
{
HILOGI("called, collabInfo %{public}s", collabInfo_.ToString().c_str());
@ -357,9 +384,9 @@ int32_t DSchedCollab::PackStartCmd(std::shared_ptr<SinkStartCmd>& cmd)
cmd->sinkAbilityName_ = collabInfo_.sinkInfo_.abilityName_;
cmd->srcModuleName_ = collabInfo_.srcInfo_.moduleName_;
cmd->sinkModuleName_ = collabInfo_.sinkInfo_.moduleName_;
cmd->needStream_ = collabInfo_.srcOpt_.needStream_;
cmd->needData_ = collabInfo_.srcOpt_.needData_;
cmd->needKeepLongAlive_ = collabInfo_.srcOpt_.needKeepLongAlive_;
cmd->needSendBigData_ = collabInfo_.srcOpt_.needSendBigData_;
cmd->needSendStream_ = collabInfo_.srcOpt_.needSendStream_;
cmd->needRecvStream_ = collabInfo_.srcOpt_.needRecvStream_;
cmd->startParams_ = collabInfo_.srcOpt_.startParams_;
cmd->messageParams_ = collabInfo_.srcOpt_.messageParams_;
return PackPartCmd(cmd);
@ -447,9 +474,9 @@ AAFwk::Want DSchedCollab::GenerateCollabWant()
peerInfoParams.SetParam("serverId", AAFwk::String::Box(collabInfo_.srcInfo_.serverId_));
AAFwk::WantParams optParams;
optParams.SetParam("isNeedSendBigData", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needStream_));
optParams.SetParam("isNeedSendStream", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needData_));
optParams.SetParam("isNeedReceiveStream", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needKeepLongAlive_));
optParams.SetParam("isNeedSendBigData", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needSendBigData_));
optParams.SetParam("isNeedSendStream", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needSendStream_));
optParams.SetParam("isNeedReceiveStream", AAFwk::Boolean::Box(collabInfo_.srcOpt_.needRecvStream_));
optParams.SetParam("parameters", AAFwk::WantParamWrapper::Box(collabInfo_.srcOpt_.messageParams_));
AAFwk::WantParams collabParams;
@ -486,10 +513,21 @@ int32_t DSchedCollab::SaveSinkAbilityData(const std::string& collabToken, const
return ERR_OK;
}
int32_t DSchedCollab::ExeAbilityRejectError(const std::string &reason)
{
HILOGE("called");
auto cmd = std::make_shared<NotifyResultCmd>();
PackNotifyResultCmd(cmd, COLLAB_ABILITY_REJECT_ERR, reason);
SendCommand(cmd);
CleanUpSession();
HILOGI("end");
return ERR_OK;
}
int32_t DSchedCollab::ExeSinkPrepareResult(const int32_t &result)
{
HILOGI("called");
if (result != ERR_OK) {
if (result != ERR_OK && result != COLLAB_ABILITY_REJECT_ERR) {
HILOGE("failed %{public}d", result);
return PostErrEndTask(result);
}
@ -509,7 +547,8 @@ int32_t DSchedCollab::ExeSinkPrepareResult(const int32_t &result)
return ERR_OK;
}
int32_t DSchedCollab::PackNotifyResultCmd(std::shared_ptr<NotifyResultCmd> cmd, int32_t result)
int32_t DSchedCollab::PackNotifyResultCmd(std::shared_ptr<NotifyResultCmd> cmd, const int32_t &result,
const std::string &abilityRejectReason)
{
if (cmd == nullptr) {
HILOGE("cmd is null");
@ -517,15 +556,21 @@ int32_t DSchedCollab::PackNotifyResultCmd(std::shared_ptr<NotifyResultCmd> cmd,
}
cmd->command_ = NOTIFY_RESULT_CMD;
cmd->result_ = result;
cmd->collabToken_ = collabInfo_.collabToken_;
cmd->sinkSocketName_ = collabInfo_.sinkInfo_.socketName_;
cmd->sinkCollabSessionId_ = collabInfo_.sinkCollabSessionId_;
cmd->abilityRejectReason_ = abilityRejectReason;
return ERR_OK;
}
int32_t DSchedCollab::ExeSrcCollabResult(const int32_t &result)
int32_t DSchedCollab::ExeSrcCollabResult(const int32_t &result, const std::string reason)
{
HILOGI("called, result: %{public}d, collabInfo: %{public}s", result, collabInfo_.ToString().c_str());
int32_t ret = ExeSrcClientNotify(result);
HILOGI("called, collabInfo: %{public}s", collabInfo_.ToString().c_str());
if (result != ERR_OK && result != COLLAB_ABILITY_REJECT_ERR) {
HILOGE("failed, result: %{public}d", result);
return PostErrEndTask(result);
}
int32_t ret = ExeSrcClientNotify(result, reason);
if (ret != ERR_OK) {
HILOGE("failed, ret: %{public}d", ret);
return PostErrEndTask(result);
@ -546,7 +591,7 @@ int32_t DSchedCollab::CleanUpSession()
return DSchedCollabManager::GetInstance().CleanUpSession(collabInfo_.collabToken_);
}
int32_t DSchedCollab::ExeSrcClientNotify(const int32_t &result)
int32_t DSchedCollab::ExeSrcClientNotify(const int32_t &result, const std::string reason)
{
HILOGI("called, result: %{public}d, collabInfo: %{public}s", result, collabInfo_.ToString().c_str());
if (collabInfo_.srcClientCB_ == nullptr) {
@ -563,6 +608,7 @@ int32_t DSchedCollab::ExeSrcClientNotify(const int32_t &result)
PARCEL_WRITE_HELPER(data, Int32, result);
PARCEL_WRITE_HELPER(data, String, collabInfo_.sinkInfo_.socketName_);
PARCEL_WRITE_HELPER(data, String, collabInfo_.collabToken_);
PARCEL_WRITE_HELPER(data, String, reason);
MessageParcel reply;
MessageOption option;
int32_t ret = collabInfo_.srcClientCB_->SendRequest(NOTIFY_COLLAB_PREPARE_RESULT, data, reply, option);
@ -683,6 +729,7 @@ int32_t DSchedCollab::PackDisconnectCmd(std::shared_ptr<DisconnectCmd> cmd)
return INVALID_PARAMETERS_ERR;
}
cmd->command_ = DISCONNECT_CMD;
cmd->collabToken_ = collabInfo_.collabToken_;
return ERR_OK;
}
@ -750,8 +797,8 @@ void DSchedCollab::OnDataRecv(int32_t command, std::shared_ptr<DSchedDataBuffer>
case NOTIFY_RESULT_CMD: {
auto notifyResultCmd = std::make_shared<NotifyResultCmd>();
ret = notifyResultCmd->Unmarshal(jsonStr);
if (ret != ERR_OK || notifyResultCmd->result_ != ERR_OK) {
HILOGE("failed, ret: %{public}d", ret);
if (ret != ERR_OK) {
HILOGE("unmarshal cmd failed, ret: %{public}d", ret);
PostErrEndTask(ret);
return;
}

View File

@ -51,9 +51,9 @@ int32_t BaseCmd::Marshal(std::string &jsonStr)
cJSON_AddStringToObject(rootValue, "SinkModuleName", sinkModuleName_.c_str());
cJSON_AddStringToObject(rootValue, "SinkServiceId", sinkServerId_.c_str());
cJSON_AddBoolToObject(rootValue, "NeedStream", needStream_);
cJSON_AddBoolToObject(rootValue, "NeedData", needData_);
cJSON_AddBoolToObject(rootValue, "NeedKeepLongAlive", needKeepLongAlive_);
cJSON_AddBoolToObject(rootValue, "NeedSendBigData", needSendBigData_);
cJSON_AddBoolToObject(rootValue, "NeedSendStream_", needSendStream_);
cJSON_AddBoolToObject(rootValue, "NeedRecvStream", needRecvStream_);
char *data = cJSON_Print(rootValue);
if (data == nullptr) {
@ -103,8 +103,8 @@ int32_t BaseCmd::Unmarshal(const std::string &jsonStr)
*strValues[i] = item->valuestring;
}
const char *boolKeys[] = { "NeedStream", "NeedData", "NeedKeepLongAlive" };
bool *boolValues[] = { &needStream_, &needData_, &needKeepLongAlive_ };
const char *boolKeys[] = { "NeedSendBigData", "NeedSendStream_", "NeedRecvStream" };
bool *boolValues[] = { &needSendBigData_, &needSendStream_, &needRecvStream_ };
int32_t boolLength = sizeof(boolKeys) / sizeof(boolKeys[0]);
for (int32_t i = 0; i < boolLength; i++) {
cJSON *item = cJSON_GetObjectItemCaseSensitive(rootValue, boolKeys[i]);
@ -535,6 +535,7 @@ int32_t NotifyResultCmd::Marshal(std::string &jsonStr)
cJSON_AddNumberToObject(rootValue, "SinkCollabSessionId", sinkCollabSessionId_);
cJSON_AddNumberToObject(rootValue, "Result", result_);
cJSON_AddStringToObject(rootValue, "SinkSocketName", sinkSocketName_.c_str());
cJSON_AddStringToObject(rootValue, "AbilityRejectReason", abilityRejectReason_.c_str());
char *data = cJSON_Print(rootValue);
if (data == nullptr) {
@ -571,14 +572,12 @@ int32_t NotifyResultCmd::Unmarshal(const std::string &jsonStr)
return INVALID_PARAMETERS_ERR;
}
sinkCollabSessionId_ = sinkCollabSessionId->valueint;
cJSON *result = cJSON_GetObjectItemCaseSensitive(rootValue, "Result");
if (result == nullptr || !cJSON_IsNumber(result)) {
cJSON_Delete(rootValue);
return INVALID_PARAMETERS_ERR;
}
result_ = result->valueint;
cJSON *sinkSocketName = cJSON_GetObjectItemCaseSensitive(rootValue, "SinkSocketName");
if (sinkSocketName == nullptr || !cJSON_IsString(sinkSocketName)) {
cJSON_Delete(rootValue);
@ -586,6 +585,10 @@ int32_t NotifyResultCmd::Unmarshal(const std::string &jsonStr)
}
sinkSocketName_ = sinkSocketName->valuestring;
cJSON *abilityRejectReason = cJSON_GetObjectItemCaseSensitive(rootValue, "AbilityRejectReason");
if (abilityRejectReason != nullptr && cJSON_IsString(abilityRejectReason)) {
abilityRejectReason_ = abilityRejectReason->valuestring;
}
cJSON_Delete(rootValue);
HILOGD("end");
return ERR_OK;

View File

@ -34,10 +34,7 @@ namespace DistributedSchedule {
namespace {
const std::string TAG = "DSchedCollabManager";
const std::string DSCHED_COLLAB_MANAGER = "dsched_collab_manager";
const int32_t RANDOM_MAX = 100;
const int32_t RANDOM_STRING_LENGTH = 20;
const char* RANDOM_SOURCE =
"abcdefghijklmnopqrsstuvwwxyzABCDDEFGHIJKLMMNOPQRSTUVWXXYZ1234567890!@#$%^&*()_+-={}:<>?[]\\|;\"',./~`";
std::map<int32_t, std::string> CMDDATA = {
{MIN_CMD, "MIN_CMD"},
{SINK_START_CMD, "SINK_START_CMD"},
@ -221,8 +218,8 @@ void DSchedCollabManager::HandleCollabMission(const DSchedCollabInfo &info)
newCollab->PostSrcStartTask();
SetTimeOut(collabToken, COLLAB_TIMEOUT);
#ifdef DMSFWK_ALL_CONNECT_MGR
if (!DSchedTransportSoftbusAdapter::GetInstance().IsNeedAllConnect()) {
#ifdef COLLAB_ALL_CONNECT_DECISIONS
if (!DSchedTransportSoftbusAdapter::GetInstance().IsNeedAllConnect(SERVICE_TYPE_COLLAB)) {
HILOGW("don't need wait all connect decision");
return;
}
@ -238,35 +235,36 @@ void DSchedCollabManager::HandleCollabMission(const DSchedCollabInfo &info)
std::string DSchedCollabManager::GenerateCollabToken(const std::string &srcDeviceId)
{
HILOGI("called");
std::string collabToken;
std::string characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
std::default_random_engine gen;
std::random_device rd;
gen.seed(rd());
std::uniform_int_distribution<> dis(0, characters.size() - 1);
std::string randomValue;
randomValue.resize(RANDOM_STRING_LENGTH);
bool isUnique = false;
while (!isUnique) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, RANDOM_MAX);
for (size_t i = 0; i < RANDOM_STRING_LENGTH; ++i) {
randomValue[i] = RANDOM_SOURCE[dis(gen)];
for (int32_t i = 0; i < RANDOM_STRING_LENGTH; ++i) {
randomValue[i] = characters[dis(gen)];
}
collabToken = srcDeviceId + randomValue;
if (collabs_.count(collabToken) == 0) {
randomValue = srcDeviceId + randomValue;
if (collabs_.count(randomValue) == 0) {
isUnique = true;
}
}
HILOGI("end");
return collabToken;
return randomValue;
}
void DSchedCollabManager::SetTimeOut(const std::string &collabToken, int32_t timeout)
{
HILOGI("called, collabToken: %{public}s", collabToken.c_str());
HILOGI("called, collabToken: %{public}s", GetAnonymStr(collabToken).c_str());
auto func = [this, collabToken]() {
if (collabs_.count(collabToken) == 0) {
HILOGW("collab not exist.");
return;
}
HILOGE("collab timeout! info: %{public}s", collabToken.c_str());
HILOGE("collab timeout! info: %{public}s", GetAnonymStr(collabToken).c_str());
auto dCollab = collabs_[collabToken];
if (dCollab != nullptr) {
dCollab->PostErrEndTask(COLLAB_ABILITY_TIMEOUT_ERR);
@ -282,7 +280,7 @@ void DSchedCollabManager::SetTimeOut(const std::string &collabToken, int32_t tim
void DSchedCollabManager::RemoveTimeout(const std::string &collabToken)
{
HILOGI("called, collabToken: %{public}s", collabToken.c_str());
HILOGI("called, collabToken: %{public}s", GetAnonymStr(collabToken).c_str());
if (eventHandler_ == nullptr) {
HILOGE("eventHandler is nullptr");
return;
@ -346,7 +344,7 @@ int32_t DSchedCollabManager::NotifyStartAbilityResult(const std::string& collabT
}
if (result != ERR_OK) {
HILOGE("start ability failed");
return dCollab->PostErrEndTask(REMOTE_DEVICE_START_ABILITY_ERR);
return dCollab->PostErrEndTask(result);
}
dCollab->SaveSinkAbilityData(collabToken, result, sinkPid, sinkUid, sinkAccessTokenId);
HILOGI("end, info: %{public}s.", dCollab->GetCollabInfo().ToString().c_str());
@ -357,7 +355,7 @@ int32_t DSchedCollabManager::NotifySinkPrepareResult(const std::string &collabTo
const int32_t &collabSessionId, const std::string &socketName, const sptr<IRemoteObject> &clientCB)
{
HILOGI("called, collabToken: %{public}s, collabSessionId: %{public}d, result: %{public}d, socketName: %{public}s",
collabToken.c_str(), collabSessionId, result, socketName.c_str());
GetAnonymStr(collabToken).c_str(), collabSessionId, result, socketName.c_str());
auto func = [this, collabToken, result, collabSessionId, socketName, clientCB]() {
HandleCollabPrepareResult(collabToken, result, collabSessionId, socketName, clientCB);
};
@ -384,15 +382,15 @@ void DSchedCollabManager::HandleCollabPrepareResult(const std::string &collabTok
return;
}
std::string DSchedCollabManager::GetSrcSocketName(const std::string& collabToken)
int32_t DSchedCollabManager::NotifySinkRejectReason(const std::string& collabToken, const std::string& reason)
{
HILOGI("called");
HILOGI("called, resion: %{public}s", reason.c_str());
auto dCollab = GetDSchedCollabByTokenId(collabToken);
if (dCollab != nullptr) {
return dCollab->GetCollabInfo().srcInfo_.socketName_;
if (dCollab == nullptr) {
HILOGE("not find dCollab");
return INVALID_PARAMETERS_ERR;
}
HILOGE("not find dCollab");
return "";
return dCollab->PostAbilityRejectTask(reason);
}
int32_t DSchedCollabManager::NotifyAbilityDied(const std::string &bundleName, const int32_t &pid)
@ -479,7 +477,7 @@ int32_t DSchedCollabManager::CancleReleaseAbilityLink(const std::string &bundleN
int32_t DSchedCollabManager::NotifySessionClose(const std::string &collabToken)
{
HILOGI("called, collabToken: %{public}s", collabToken.c_str());
HILOGI("called, collabToken: %{public}s", GetAnonymStr(collabToken).c_str());
auto dCollab = GetDSchedCollabByTokenId(collabToken);
if (dCollab == nullptr) {
HILOGE("can't find collab");
@ -492,7 +490,7 @@ int32_t DSchedCollabManager::NotifySessionClose(const std::string &collabToken)
int32_t DSchedCollabManager::CleanUpSession(const std::string &collabToken)
{
HILOGI("called, collabToken: %{public}s", collabToken.c_str());
HILOGI("called, collabToken: %{public}s", GetAnonymStr(collabToken).c_str());
auto dCollab = GetDSchedCollabByTokenId(collabToken);
if (dCollab == nullptr) {
HILOGE("can't find collab");
@ -561,17 +559,25 @@ void DSchedCollabManager::HandleDataRecv(const int32_t &softbusSessionId, std::s
return;
}
int32_t command = comvalue->valueint;
cJSON *collabTokenvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "CollabToken");
if (collabTokenvalue == nullptr || !cJSON_IsString(collabTokenvalue)) {
cJSON_Delete(cmdValue);
HILOGE("parse collabToken failed");
return;
}
const std::string collabToken = collabTokenvalue->valuestring;
cJSON_Delete(cmdValue);
NotifyDataRecv(softbusSessionId, command, jsonStr, dataBuffer);
NotifyDataRecv(softbusSessionId, command, jsonStr, dataBuffer, collabToken);
HILOGI("end");
}
void DSchedCollabManager::NotifyDataRecv(const int32_t &softbusSessionId, int32_t command, const std::string& jsonStr,
std::shared_ptr<DSchedDataBuffer> dataBuffer)
std::shared_ptr<DSchedDataBuffer> dataBuffer, const std::string& collabToken)
{
HILOGI("called, parsed cmd %{public}s", CMDDATA[command].c_str());
for (auto iter = collabs_.begin(); iter != collabs_.end(); iter++) {
if (iter->second != nullptr && softbusSessionId == iter->second->GetSoftbusSessionId()) {
if (iter->second != nullptr && softbusSessionId == iter->second->GetSoftbusSessionId() &&
collabToken == iter->second->GetCollabInfo().collabToken_) {
HILOGI("softbusSessionId exist.");
iter->second->OnDataRecv(command, dataBuffer);
if (command == NOTIFY_RESULT_CMD) {

View File

@ -28,6 +28,7 @@ CollabSinkConnectState::CollabSinkConnectState(std::shared_ptr<DSchedCollabState
: stateMachine_(stateMachine)
{
memberFuncMap_[NOTIFY_PREPARE_RESULT_EVENT] = &CollabSinkConnectState::DoSinkPrepareResult;
memberFuncMap_[ABILITY_REJECT_EVENT] = &CollabSinkConnectState::DoAbilityRejectError;
memberFuncMap_[ERR_END_EVENT] = &CollabSinkConnectState::DoConnectError;
}
@ -76,6 +77,21 @@ int32_t CollabSinkConnectState::DoSinkPrepareResult(std::shared_ptr<DSchedCollab
return ret;
}
int32_t CollabSinkConnectState::DoAbilityRejectError(std::shared_ptr<DSchedCollab> dCollab,
const AppExecFwk::InnerEvent::Pointer &event)
{
if (dCollab == nullptr || event == nullptr) {
HILOGE("dCollab or event is null");
return INVALID_PARAMETERS_ERR;
}
auto syncCollabData = event->GetSharedObject<std::string>();
int32_t ret = dCollab->ExeAbilityRejectError(*syncCollabData);
if (ret != ERR_OK) {
HILOGE("failed, ret: %{public}d", ret);
}
return ret;
}
int32_t CollabSinkConnectState::DoConnectError(std::shared_ptr<DSchedCollab> dCollab,
const AppExecFwk::InnerEvent::Pointer &event)
{

View File

@ -28,6 +28,7 @@ CollabSrcWaitResultState::CollabSrcWaitResultState(std::shared_ptr<DSchedCollabS
: stateMachine_(stateMachine)
{
memberFuncMap_[NOTIFY_RESULT_EVENT] = &CollabSrcWaitResultState::DoSrcResultNotifyTask;
memberFuncMap_[ABILITY_REJECT_EVENT] = &CollabSrcWaitResultState::DoAbilityRejectTask;
memberFuncMap_[ERR_END_EVENT] = &CollabSrcWaitResultState::DoSrcWaitResultError;
}
@ -76,6 +77,21 @@ int32_t CollabSrcWaitResultState::DoSrcResultNotifyTask(std::shared_ptr<DSchedCo
}
return ret;
}
int32_t CollabSrcWaitResultState::DoAbilityRejectTask(std::shared_ptr<DSchedCollab> dCollab,
const AppExecFwk::InnerEvent::Pointer &event)
{
if (dCollab == nullptr || event == nullptr) {
HILOGE("dCollab or event is null");
return INVALID_PARAMETERS_ERR;
}
auto syncCollabData = event->GetSharedObject<std::string>();
int32_t ret = dCollab->ExeSrcCollabResult(COLLAB_ABILITY_REJECT_ERR, *syncCollabData);
if (ret != ERR_OK) {
HILOGE("failed, ret: %{public}d", ret);
}
return ret;
}
int32_t CollabSrcWaitResultState::DoSrcWaitResultError(std::shared_ptr<DSchedCollab> dCollab,
const AppExecFwk::InnerEvent::Pointer &event)

View File

@ -139,8 +139,8 @@ void DistributedSchedStub::InitLocalFuncsInner()
&DistributedSchedStub::StopRemoteExtensionAbilityInner;
localFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::COLLAB_MISSION)] =
&DistributedSchedStub::CollabMissionInner;
localFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::GET_SOURCE_SOCKET_NAME)] =
&DistributedSchedStub::GetSrcSocketNameInner;
localFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::NOTIFY_REJECT_REASON)] =
&DistributedSchedStub::NotifyRejectReason;
localFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::NOTIFY_START_ABILITY_RESULT)] =
&DistributedSchedStub::NotifyStartAbilityResultInner;
localFuncsMap_[static_cast<uint32_t>(IDSchedInterfaceCode::NOTIFY_COLLAB_PREPARE_RESULT)] =
@ -729,16 +729,18 @@ int32_t DistributedSchedStub::CollabMissionInner(MessageParcel& data, MessagePar
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
int32_t DistributedSchedStub::GetSrcSocketNameInner(MessageParcel& data, MessageParcel& reply)
int32_t DistributedSchedStub::NotifyRejectReason(MessageParcel& data, MessageParcel& reply)
{
HILOGI("called");
if (!IPCSkeleton::IsLocalCalling()) {
HILOGE("check permission failed!");
return DMS_PERMISSION_DENIED;
}
std::string collabToken = data.ReadString();
std::string srcSocketName = DSchedCollabManager::GetInstance().GetSrcSocketName(collabToken);
PARCEL_WRITE_REPLY_NOERROR(reply, String, srcSocketName);
std::string token = data.ReadString();
std::string reason = data.ReadString();
int32_t result = DSchedCollabManager::GetInstance().NotifySinkRejectReason(token, reason);
HILOGI("result = %{public}d", result);
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
int32_t DistributedSchedStub::NotifyStartAbilityResultInner(MessageParcel& data, MessageParcel& reply)

View File

@ -134,7 +134,7 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
}
}
int32_t ret = ERR_OK;
if (IsNeedAllConnect()) {
if (IsNeedAllConnect(type)) {
HILOGI("waiting all connect decision");
ret = DecisionByAllConnect(peerDeviceId, type);
if (ret != ERR_OK) {
@ -151,7 +151,7 @@ int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDevi
void DSchedTransportSoftbusAdapter::NotifyConnectDecision(const std::string &peerDeviceId, DSchedServiceType type)
{
if (!IsNeedAllConnect()) {
if (!IsNeedAllConnect(type)) {
HILOGW("don't need notify all connect decision");
return;
}
@ -183,9 +183,14 @@ int32_t DSchedTransportSoftbusAdapter::DecisionByAllConnect(const std::string &p
return ERR_OK;
}
bool DSchedTransportSoftbusAdapter::IsNeedAllConnect()
bool DSchedTransportSoftbusAdapter::IsNeedAllConnect(DSchedServiceType type)
{
#ifndef COLLAB_ALL_CONNECT_DECISIONS
if (type == SERVICE_TYPE_COLLAB) {
HILOGI("called, don't need all connect, type: collab");
return false;
}
#endif
bool result = isAllConnectExist_ && WifiStateAdapter::GetInstance().IsWifiActive();
HILOGI("called, result: %{public}d", result);
return result;