add continuation implementation

Signed-off-by: wangdongdong <wangdd_zju@126.com>
Change-Id: I7253fb92d87e94749dd539ed30708eff5bec44cb
This commit is contained in:
wangdongdong 2021-09-09 18:17:48 +08:00
parent 85db120c05
commit 2377847b3c
16 changed files with 913 additions and 9 deletions

View File

@ -12,6 +12,7 @@
分布式任务调度模块负责跨设备组件管理,提供访问和控制远程组件的能力,支持分布式场景下的应用协同。主要功能如下:
- 远程启动FA跨设备拉起远端设备上指定FA。
- 远程迁移FA将FA跨设备迁移到远端。
## 系统架构<a name="section13587185873516"></a>
@ -31,6 +32,9 @@
├── service # 核心代码
| ├── include
| | ├── caller_info.h # 调用方信息
| | ├── continuation_callback_death_recipient.h # 迁移回调死亡监听接口
| | ├── distributed_sched_ability_shell.h # 迁移回调管理接口
| | ├── distributed_sched_continuation.h # 迁移token管理接口
| | ├── distributed_sched_interface.h # 对外接口
| | ├── distributed_sched_proxy.h # 客户端接口
| | ├── distributed_sched_service.h # 服务端接口
@ -39,6 +43,9 @@
| | ├── parcel_helper.h # 分布式消息解析模块
| | ├── uri.h # uri接口头文件
| ├── src
| | ├── continuation_callback_death_recipient.cpp # 迁移回调死亡监听实现
| | ├── distributed_sched_ability_shell.cpp # 迁移回调管理实现
| | ├── distributed_sched_continuation.h # 迁移token管理实现
| | ├── distributed_sched_proxy.cpp # 客户端实现
| | ├── distributed_sched_service.cpp # 服务端实现
| | ├── distributed_sched_stub.cpp # 服务端父类实现

View File

@ -31,9 +31,12 @@ ohos_shared_library("distributedschedsvr") {
sources = [
"src/adapter/dnetwork_adapter.cpp",
"src/bundle/bundle_manager_internal.cpp",
"src/continuation_callback_death_recipient.cpp",
"src/deviceManager/dms_device_info.cpp",
"src/distributed_device_node_listener.cpp",
"src/distributed_sched_ability_shell.cpp",
"src/distributed_sched_adapter.cpp",
"src/distributed_sched_continuation.cpp",
"src/distributed_sched_proxy.cpp",
"src/distributed_sched_service.cpp",
"src/distributed_sched_stub.cpp",

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_DISTRIBUTED_CONTINUATION_CALLBACK_DEATH_RECIPIENT_H
#define OHOS_DISTRIBUTED_CONTINUATION_CALLBACK_DEATH_RECIPIENT_H
#include "iremote_object.h"
namespace OHOS {
namespace DistributedSchedule {
class ContinuationCallbackDeathRecipient : public IRemoteObject::DeathRecipient {
public:
void OnRemoteDied(const wptr<IRemoteObject> &remote) override;
ContinuationCallbackDeathRecipient() = default;
~ContinuationCallbackDeathRecipient() override = default;
};
} // namespace DistributedSchedule
} // namespace OHOS
#endif // OHOS_DISTRIBUTED_CONTINUATION_CALLBACK_DEATH_RECIPIENT_H

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_DISTRIBUTED_SCHED_ABILITY_SHELL_H
#define OHOS_DISTRIBUTED_SCHED_ABILITY_SHELL_H
#include <list>
#include <map>
#include <mutex>
#include "continuation_callback_death_recipient.h"
#include "iremote_object.h"
#include "ohos/aafwk/content/want.h"
#include "single_instance.h"
namespace OHOS {
namespace DistributedSchedule {
class DistributedSchedAbilityShell {
DECLARE_SINGLE_INSTANCE_BASE(DistributedSchedAbilityShell);
public:
int32_t RegisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback);
int32_t UnregisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback);
sptr<IRemoteObject> GetContinuationCallback(const sptr<IRemoteObject>& abilityToken);
int32_t ScheduleCompleteContinuation(const sptr<IRemoteObject>& abilityToken, int32_t isSuccess);
void RemoveContinuationCallback(const sptr<IRemoteObject>& continuationCallback);
private:
DistributedSchedAbilityShell();
~DistributedSchedAbilityShell() = default;
std::map<sptr<IRemoteObject>, std::list<sptr<IRemoteObject>>> regAbilityMap_;
std::mutex regAbilityLock_;
sptr<IRemoteObject::DeathRecipient> death_;
};
} // namespace DistributedSchedule
} // namespace OHOS
#endif // OHOS_DISTRIBUTED_SCHED_ABILITY_SHELL_H

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef OHOS_DISTRIBUTED_SCHED_CONTINUATION_H
#define OHOS_DISTRIBUTED_SCHED_CONTINUATION_H
#include <cstdint>
#include <map>
#include <mutex>
#include "event_handler.h"
#include "iremote_object.h"
#include "refbase.h"
namespace OHOS {
namespace DistributedSchedule {
using FuncContinuationCallback = std::function<void(const sptr<IRemoteObject>& abilityToken)>;
class DSchedContinuation : public std::enable_shared_from_this<DSchedContinuation> {
public:
void Init(const FuncContinuationCallback& contCallback);
bool PushAbilityToken(int32_t sessionId, const sptr<IRemoteObject>& abilityToken);
sptr<IRemoteObject> PopAbilityToken(int32_t sessionId);
int32_t GenerateSessionId();
private:
class ContinuationHandler : public AppExecFwk::EventHandler {
public:
ContinuationHandler(const std::shared_ptr<AppExecFwk::EventRunner>& runner,
const std::shared_ptr<DSchedContinuation>& continuationObj,
const FuncContinuationCallback& contCallback)
: AppExecFwk::EventHandler(runner), continuationObj_(continuationObj), contCallback_(contCallback) {}
~ContinuationHandler() = default;
void ProcessEvent(const OHOS::AppExecFwk::InnerEvent::Pointer& event) override;
private:
std::weak_ptr<DSchedContinuation> continuationObj_;
FuncContinuationCallback contCallback_;
};
std::shared_ptr<ContinuationHandler> continuationHandler_;
std::mutex continuationLock_;
int32_t currSessionId_ = 1;
std::map<int32_t, sptr<IRemoteObject>> continuationMap_;
};
} // namespace DistributedSchedule
} // namespace OHOS
#endif // OHOS_DISTRIBUTED_SCHED_CONTINUATION_H

View File

@ -22,6 +22,7 @@
#include <unordered_map>
#include "distributed_sched_stub.h"
#include "distributed_sched_continuation.h"
#include "iremote_object.h"
#include "iremote_proxy.h"
#include "nocopyable.h"
@ -64,7 +65,10 @@ public:
private:
DistributedSchedService();
bool Init();
bool GetLocalDeviceId(std::string& localDeviceId);
sptr<IDistributedSched> GetRemoteDms(const std::string& remoteDeviceId);
void NotifyContinuationCallbackResult(const sptr<IRemoteObject>& abilityToken, int32_t isSuccess);
std::shared_ptr<DSchedContinuation> dschedContinuation_;
};
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -31,6 +31,12 @@ public:
private:
int32_t StartRemoteAbilityInner(MessageParcel& data, MessageParcel& reply);
int32_t StartAbilityFromRemoteInner(MessageParcel& data, MessageParcel& reply);
int32_t StartContinuationInner(MessageParcel& data, MessageParcel& reply);
int32_t NotifyCompleteContinuationInner(MessageParcel& data, MessageParcel& reply);
int32_t NotifyContinuationResultFromRemoteInner(MessageParcel& data, MessageParcel& reply);
int32_t RegisterAbilityTokenInner(MessageParcel& data, MessageParcel& reply);
int32_t UnregisterAbilityTokenInner(MessageParcel& data, MessageParcel& reply);
bool CheckDmsRequestPermission();
bool EnforceInterfaceToken(MessageParcel& data);
using DistributedSchedFunc = int32_t(DistributedSchedStub::*)(MessageParcel& data, MessageParcel& reply);

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "continuation_callback_death_recipient.h"
#include "distributed_sched_ability_shell.h"
#include "dtbschedmgr_log.h"
#include "iremote_proxy.h"
namespace OHOS {
namespace DistributedSchedule {
void ContinuationCallbackDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
{
HILOGI("%s called", __func__);
DistributedSchedAbilityShell::GetInstance().RemoveContinuationCallback(remote.promote());
}
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -0,0 +1,152 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed_sched_ability_shell.h"
#include "dtbschedmgr_log.h"
#include "ipc_skeleton.h"
#include "ipc_types.h"
#include "iservice_registry.h"
#include "parcel_helper.h"
#include "string_ex.h"
namespace OHOS {
namespace DistributedSchedule {
namespace {
const std::u16string ABILITYSHELL_INTERFACE_TOKEN = u"ohos.abilityshell.ContinuationScheduler";
constexpr int64_t SCHEDULE_COMPLETE_CONTINUE = 1;
}
using namespace OHOS::HiviewDFX;
IMPLEMENT_SINGLE_INSTANCE(DistributedSchedAbilityShell);
DistributedSchedAbilityShell::DistributedSchedAbilityShell()
{
death_ = sptr<IRemoteObject::DeathRecipient>(new ContinuationCallbackDeathRecipient());
}
int32_t DistributedSchedAbilityShell::RegisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
if (abilityToken == nullptr || continuationCallback == nullptr) {
HILOGE("RegisterAbilityToken abilityToken or continuationCallback is null");
return INVALID_PARAMETERS_ERR;
}
std::lock_guard<std::mutex> autoLock(regAbilityLock_);
auto itAbility = regAbilityMap_.find(continuationCallback);
if (itAbility == regAbilityMap_.end()) {
// new continuationCallback, add death recipient
continuationCallback->AddDeathRecipient(death_);
}
auto& tokenList = regAbilityMap_[continuationCallback];
for (const auto& tokenItem : tokenList) {
if (tokenItem == abilityToken) {
// already have reg abilityToken
return REG_REPEAT_ABILITY_TOKEN_ERR;
}
}
// add abilityToken
tokenList.emplace_back(abilityToken);
return ERR_OK;
}
int32_t DistributedSchedAbilityShell::UnregisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
if (abilityToken == nullptr || continuationCallback == nullptr) {
HILOGE("UnregisterAbilityToken abilityToken or continuationCallback is null");
return INVALID_PARAMETERS_ERR;
}
std::lock_guard<std::mutex> autoLock(regAbilityLock_);
auto itAbility = regAbilityMap_.find(continuationCallback);
if (itAbility != regAbilityMap_.end()) {
std::list<sptr<IRemoteObject>>& tokenList = itAbility->second;
int32_t sizeBefore = tokenList.size();
if (sizeBefore == 0) {
return NO_ABILITY_TOKEN_ERR;
}
tokenList.remove(abilityToken);
int32_t sizeAfter = tokenList.size();
// if list is empty and erase it from map
if (sizeAfter == 0) {
if (itAbility->first != nullptr) {
itAbility->first->RemoveDeathRecipient(death_);
}
regAbilityMap_.erase(itAbility);
}
if (sizeBefore == sizeAfter) {
return NO_ABILITY_TOKEN_ERR;
}
return ERR_OK;
}
// not find continuationCallback, return NO_APP_THREAD_ERR
return NO_APP_THREAD_ERR;
}
sptr<IRemoteObject> DistributedSchedAbilityShell::GetContinuationCallback(const sptr<IRemoteObject>& abilityToken)
{
if (abilityToken == nullptr) {
HILOGE("GetContinuationCallback abilityToken is null");
return nullptr;
}
std::lock_guard<std::mutex> autoLock(regAbilityLock_);
for (const auto& regAbility : regAbilityMap_) {
const std::list<sptr<IRemoteObject>>& tokenList = regAbility.second;
for (const auto& tokenItem : tokenList) {
if (tokenItem == abilityToken) {
// find abilityToken
return regAbility.first;
}
}
}
return nullptr;
}
void DistributedSchedAbilityShell::RemoveContinuationCallback(const sptr<IRemoteObject>& continuationCallback)
{
if (continuationCallback == nullptr) {
HILOGE("RemoveContinuationCallback continuationCallback is null");
return;
}
std::lock_guard<std::mutex> autoLock(regAbilityLock_);
continuationCallback->RemoveDeathRecipient(death_);
regAbilityMap_.erase(continuationCallback);
}
int32_t DistributedSchedAbilityShell::ScheduleCompleteContinuation(const sptr<IRemoteObject>& abilityToken,
int32_t isSuccess)
{
if (abilityToken == nullptr) {
HILOGE("ScheduleCompleteContinuation ability abilityToken is null");
return INVALID_PARAMETERS_ERR;
}
sptr<IRemoteObject> continuationCallback = GetContinuationCallback(abilityToken);
if (continuationCallback == nullptr) {
HILOGE("ScheduleCompleteContinuation continuationCallback is null");
return INVALID_PARAMETERS_ERR;
}
MessageParcel data;
if (!data.WriteInterfaceToken(ABILITYSHELL_INTERFACE_TOKEN)) {
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, Int32, isSuccess);
MessageParcel reply;
MessageOption option;
int32_t error = continuationCallback->SendRequest(SCHEDULE_COMPLETE_CONTINUE, data, reply, option);
HILOGI("ScheduleCompleteContinuation transact result: %{public}d", error);
return error;
}
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -0,0 +1,130 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed_sched_continuation.h"
#include "dtbschedmgr_log.h"
using namespace std;
using namespace OHOS::AppExecFwk;
namespace OHOS {
namespace DistributedSchedule {
namespace {
constexpr int64_t CONTINUATION_DELAY_TIME = 20000;
}
void DSchedContinuation::Init(const FuncContinuationCallback& contCallback)
{
auto runner = EventRunner::Create("dsched_continuation");
continuationHandler_ = make_shared<ContinuationHandler>(runner, shared_from_this(), contCallback);
}
bool DSchedContinuation::PushAbilityToken(int32_t sessionId, const sptr<IRemoteObject>& abilityToken)
{
if (abilityToken == nullptr) {
HILOGE("DSchedContinuation::PushAbilityToken abilityToken null!");
return false;
}
if (sessionId <= 0) {
HILOGE("DSchedContinuation::PushAbilityToken sessionId invalid!");
return false;
}
if (continuationHandler_ == nullptr) {
HILOGE("DSchedContinuation::PushAbilityToken not initialized!");
return false;
}
lock_guard<mutex> autoLock(continuationLock_);
bool ret = true;
ret = continuationHandler_->SendEvent(sessionId, 0, CONTINUATION_DELAY_TIME);
if (!ret) {
HILOGE("DSchedContinuation::PushAbilityToken SendEvent failed!");
return false;
}
auto iterSession = continuationMap_.find(sessionId);
if (iterSession != continuationMap_.end()) {
HILOGE("DSchedContinuation::PushAbilityToken sessionId:%{public}d exist!", sessionId);
return false;
}
(void)continuationMap_.emplace(sessionId, abilityToken);
return true;
}
sptr<IRemoteObject> DSchedContinuation::PopAbilityToken(int32_t sessionId)
{
if (sessionId <= 0) {
HILOGE("DSchedContinuation::PopAbilityToken sessionId invalid");
return nullptr;
}
lock_guard<mutex> autoLock(continuationLock_);
auto iter = continuationMap_.find(sessionId);
if (iter == continuationMap_.end()) {
HILOGW("DSchedContinuation::PopAbilityToken not found sessionId:%{public}d", sessionId);
return nullptr;
}
sptr<IRemoteObject> abilityToken = iter->second;
(void)continuationMap_.erase(iter);
if (continuationHandler_ != nullptr) {
continuationHandler_->RemoveEvent(sessionId);
}
return abilityToken;
}
int32_t DSchedContinuation::GenerateSessionId()
{
lock_guard<mutex> autoLock(continuationLock_);
int32_t currValue = currSessionId_;
if (++currSessionId_ <= 0) {
currSessionId_ = 1;
}
return currValue;
}
void DSchedContinuation::ContinuationHandler::ProcessEvent(const InnerEvent::Pointer& event)
{
if (event == nullptr) {
HILOGE("ContinuationHandler::ProcessEvent event nullptr!");
return;
}
auto dSchedContinuation = continuationObj_.lock();
if (dSchedContinuation == nullptr) {
HILOGE("ContinuationHandler::ProcessEvent continuation object failed!");
return;
}
auto eventId = event->GetInnerEventId();
int32_t sessionId = static_cast<int32_t>(eventId);
if (sessionId <= 0) {
HILOGW("ContinuationHandler::ProcessEvent sessionId invalid!");
return;
}
auto abilityToken = dSchedContinuation->PopAbilityToken(sessionId);
if (abilityToken == nullptr) {
HILOGW("ContinuationHandler::ProcessEvent abilityToken nullptr!");
return;
}
if (contCallback_ != nullptr) {
contCallback_(abilityToken);
}
}
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -70,28 +70,103 @@ int32_t DistributedSchedProxy::StartAbilityFromRemote(const OHOS::AAFwk::Want& w
int32_t DistributedSchedProxy::StartContinuation(const OHOS::AAFwk::Want& want,
const OHOS::AppExecFwk::AbilityInfo& abilityInfo, const sptr<IRemoteObject>& abilityToken)
{
return 0;
if (abilityToken == nullptr) {
HILOGE("StartContinuation abilityToken null!");
return ERR_NULL_OBJECT;
}
sptr<IRemoteObject> remote = Remote();
if (remote == nullptr) {
HILOGE("StartContinuation remote service null");
return ERR_NULL_OBJECT;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, Parcelable, &want);
PARCEL_WRITE_HELPER(data, Parcelable, &abilityInfo);
PARCEL_WRITE_HELPER(data, RemoteObject, abilityToken);
MessageParcel reply;
PARCEL_TRANSACT_SYNC_RET_INT(remote, START_CONTINUATION, data, reply);
}
void DistributedSchedProxy::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess)
{
sptr<IRemoteObject> remote = Remote();
if (remote == nullptr) {
HILOGE("NotifyCompleteContinuation remote service null");
return;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
return;
}
PARCEL_WRITE_HELPER_NORET(data, String16, devId);
PARCEL_WRITE_HELPER_NORET(data, Int32, sessionId);
PARCEL_WRITE_HELPER_NORET(data, Bool, isSuccess);
MessageParcel reply;
PARCEL_TRANSACT_SYNC_NORET(remote, NOTIFY_COMPLETE_CONTINUATION, data, reply);
}
int32_t DistributedSchedProxy::NotifyContinuationResultFromRemote(int32_t sessionId, bool isSuccess)
{
return 0;
sptr<IRemoteObject> remote = Remote();
if (remote == nullptr) {
HILOGE("NotifyContinuationResultFromRemote remote service null");
return ERR_NULL_OBJECT;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, Int32, sessionId);
PARCEL_WRITE_HELPER(data, Bool, isSuccess);
MessageParcel reply;
PARCEL_TRANSACT_SYNC_RET_INT(remote, NOTIFY_CONTINUATION_RESULT_FROM_REMOTE, data, reply);
}
int32_t DistributedSchedProxy::RegisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
return 0;
if (abilityToken == nullptr || continuationCallback == nullptr) {
HILOGE("RegisterAbilityToken paramter null!");
return ERR_NULL_OBJECT;
}
sptr<IRemoteObject> remote = Remote();
if (remote == nullptr) {
HILOGE("RegisterAbilityToken remote service null");
return ERR_NULL_OBJECT;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, RemoteObject, abilityToken);
PARCEL_WRITE_HELPER(data, RemoteObject, continuationCallback);
MessageParcel reply;
PARCEL_TRANSACT_SYNC_RET_INT(remote, REGISTER_ABILITY_TOKEN, data, reply);
}
int32_t DistributedSchedProxy::UnregisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
return 0;
if (abilityToken == nullptr || continuationCallback == nullptr) {
HILOGE("UnregisterAbilityToken paramter null!");
return ERR_NULL_OBJECT;
}
sptr<IRemoteObject> remote = Remote();
if (remote == nullptr) {
HILOGE("UnregisterAbilityToken remote service null");
return ERR_NULL_OBJECT;
}
MessageParcel data;
if (!data.WriteInterfaceToken(DMS_PROXY_INTERFACE_TOKEN)) {
return ERR_FLATTEN_OBJECT;
}
PARCEL_WRITE_HELPER(data, RemoteObject, abilityToken);
PARCEL_WRITE_HELPER(data, RemoteObject, continuationCallback);
MessageParcel reply;
PARCEL_TRANSACT_SYNC_RET_INT(remote, UNREGISTER_ABILITY_TOKEN, data, reply);
}
} // namespace DistributedSchedule
} // namespace OHOS

View File

@ -20,6 +20,8 @@
#include "adapter/dnetwork_adapter.h"
#include "distributed_sched_adapter.h"
#include "distributed_sched_ability_shell.h"
#include "dtbschedmgr_device_info_storage.h"
#include "dtbschedmgr_log.h"
#include "ability_manager_client.h"
@ -59,6 +61,12 @@ void DistributedSchedService::OnStart()
HILOGE("failed to init DistributedSchedService");
return;
}
FuncContinuationCallback continuationCallback = [this] (const sptr<IRemoteObject>& abilityToken) {
HILOGW("continuationCallback timeout.");
NotifyContinuationCallbackResult(abilityToken, CONTINUE_ABILITY_TIMEOUT_ERR);
};
dschedContinuation_ = std::make_shared<DSchedContinuation>();
dschedContinuation_->Init(continuationCallback);
HILOGI("DistributedSchedService::OnStart start service success.");
}
@ -131,28 +139,122 @@ int32_t DistributedSchedService::StartAbilityFromRemote(const OHOS::AAFwk::Want&
int32_t DistributedSchedService::StartContinuation(const OHOS::AAFwk::Want& want,
const OHOS::AppExecFwk::AbilityInfo& abilityInfo, const sptr<IRemoteObject>& abilityToken)
{
return 0;
HILOGD("[PerformanceTest] DistributedSchedService StartContinuation begin");
if (abilityToken == nullptr) {
HILOGE("StartContinuation abilityToken is null!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
auto flags = want.GetFlags();
if ((flags & AAFwk::Want::FLAG_ABILITY_CONTINUATION) == 0) {
HILOGE("StartContinuation want continuation flags invalid!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
std::string devId;
if (!GetLocalDeviceId(devId)) {
HILOGE("StartContinuation get local deviceId failed!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
if (dschedContinuation_ == nullptr) {
HILOGE("StartContinuation continuation object null!");
return INVALID_PARAMETERS_ERR;
}
int32_t sessionId = dschedContinuation_->GenerateSessionId();
AAFwk::Want newWant = want;
newWant.SetParam("sessionId", sessionId);
newWant.SetParam("deviceId", devId);
int32_t result = ERR_OK;
result = StartRemoteAbility(newWant, abilityInfo, 0);
if (result != ERR_OK) {
HILOGE("DistributedSchedService:continue ability failed, errorCode = %{public}d", result);
return result;
}
bool ret = dschedContinuation_->PushAbilityToken(sessionId, abilityToken);
if (!ret) {
HILOGW("StartContinuation PushAbilityToken failed!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
HILOGD("[PerformanceTest] DistributedSchedService StartContinuation end");
return result;
}
void DistributedSchedService::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId, bool isSuccess)
{
if (!isSuccess) {
HILOGE("NotifyCompleteContinuation failed!");
}
if (sessionId <= 0) {
HILOGE("NotifyCompleteContinuation sessionId invalid!");
return;
}
std::string deviceId = Str16ToStr8(devId);
sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
if (remoteDms == nullptr) {
HILOGE("NotifyCompleteContinuation get remote dms null!");
return;
}
remoteDms->NotifyContinuationResultFromRemote(sessionId, isSuccess);
}
int32_t DistributedSchedService::NotifyContinuationResultFromRemote(int32_t sessionId, bool isSuccess)
{
return 0;
if (sessionId <= 0) {
HILOGE("NotifyContinuationResultFromRemote sessionId:%{public}d invalid!", sessionId);
return INVALID_REMOTE_PARAMETERS_ERR;
}
if (dschedContinuation_ == nullptr) {
HILOGE("NotifyContinuationResultFromRemote continuation object null!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
auto abilityToken = dschedContinuation_->PopAbilityToken(sessionId);
if (abilityToken == nullptr) {
HILOGE("DSchedContinuationCallback NotifyContinuationResultFromRemote abilityToken null!");
return INVALID_REMOTE_PARAMETERS_ERR;
}
NotifyContinuationCallbackResult(abilityToken, isSuccess ? 0 : NOTIFYCOMPLETECONTINUATION_FAILED);
return ERR_OK;
}
void DistributedSchedService::NotifyContinuationCallbackResult(const sptr<IRemoteObject>& abilityToken,
int32_t isSuccess)
{
if (isSuccess != ERR_OK) {
HILOGE("NotifyContinuationCallbackResult failed!");
}
HILOGD("NotifyContinuationCallbackResult ContinuationRet result:%{public}d", isSuccess);
if (abilityToken == nullptr) {
HILOGE("NotifyContinuationCallbackResult abilityToken null!");
return;
}
int32_t result = DistributedSchedAbilityShell::GetInstance().ScheduleCompleteContinuation(
abilityToken, isSuccess);
HILOGD("NotifyContinuationCallbackResult ScheduleCompleteContinuation result:%{public}d", result);
}
int32_t DistributedSchedService::RegisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
return 0;
return DistributedSchedAbilityShell::GetInstance().RegisterAbilityToken(abilityToken, continuationCallback);
}
int32_t DistributedSchedService::UnregisterAbilityToken(const sptr<IRemoteObject>& abilityToken,
const sptr<IRemoteObject>& continuationCallback)
{
return 0;
return DistributedSchedAbilityShell::GetInstance().UnregisterAbilityToken(abilityToken, continuationCallback);
}
bool DistributedSchedService::GetLocalDeviceId(std::string& localDeviceId)
{
if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
HILOGE("GetLocalDeviceId fail");
return false;
}
return true;
}
sptr<IDistributedSched> DistributedSchedService::GetRemoteDms(const std::string& remoteDeviceId)

View File

@ -17,6 +17,7 @@
#include "ability_info.h"
#include "caller_info.h"
#include "distributed_sched_ability_shell.h"
#include "dtbschedmgr_log.h"
#include "datetime_ex.h"
@ -31,14 +32,20 @@ using namespace AAFwk;
using namespace AppExecFwk;
namespace {
constexpr int32_t HID_HAP = 10000; /* first hap user */
const std::u16string DMS_STUB_INTERFACE_TOKEN = u"ohos.distributedschedule.accessToken";
}
DistributedSchedStub::DistributedSchedStub()
{
localFuncsMap_[START_REMOTE_ABILITY] = &DistributedSchedStub::StartRemoteAbilityInner;
localFuncsMap_[START_CONTINUATION] = &DistributedSchedStub::StartContinuationInner;
localFuncsMap_[NOTIFY_COMPLETE_CONTINUATION] = &DistributedSchedStub::NotifyCompleteContinuationInner;
localFuncsMap_[REGISTER_ABILITY_TOKEN] = &DistributedSchedStub::RegisterAbilityTokenInner;
localFuncsMap_[UNREGISTER_ABILITY_TOKEN] = &DistributedSchedStub::UnregisterAbilityTokenInner;
remoteFuncsMap_[START_ABILITY_FROM_REMOTE] = &DistributedSchedStub::StartAbilityFromRemoteInner;
remoteFuncsMap_[NOTIFY_CONTINUATION_RESULT_FROM_REMOTE] =
&DistributedSchedStub::NotifyContinuationResultFromRemoteInner;
}
DistributedSchedStub::~DistributedSchedStub()
@ -111,6 +118,80 @@ int32_t DistributedSchedStub::StartAbilityFromRemoteInner(MessageParcel& data, M
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
int32_t DistributedSchedStub::StartContinuationInner(MessageParcel& data, MessageParcel& reply)
{
shared_ptr<AAFwk::Want> want(data.ReadParcelable<AAFwk::Want>());
if (want == nullptr) {
HILOGW("DistributedSchedStub: StartContinuationInner want readParcelable failed!");
return ERR_NULL_OBJECT;
}
unique_ptr<AbilityInfo> spAbilityInfo(data.ReadParcelable<AbilityInfo>());
if (spAbilityInfo == nullptr) {
HILOGW("DistributedSchedStub: StartContinuationInner AbilityInfo readParcelable failed!");
return ERR_NULL_OBJECT;
}
sptr<IRemoteObject> abilityToken = data.ReadRemoteObject();
int32_t result = StartContinuation(*want, *spAbilityInfo, abilityToken);
HILOGI("DistributedSchedStub: StartContinuationInner result = %{public}d", result);
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
int32_t DistributedSchedStub::NotifyCompleteContinuationInner(MessageParcel& data,
[[maybe_unused]] MessageParcel& reply)
{
u16string devId = data.ReadString16();
if (devId.empty()) {
HILOGE("DistributedSchedStub: NotifyCompleteContinuationInner devId empty!");
return INVALID_PARAMETERS_ERR;
}
int32_t sessionId = 0;
PARCEL_READ_HELPER(data, Int32, sessionId);
bool continuationResult = false;
PARCEL_READ_HELPER(data, Bool, continuationResult);
NotifyCompleteContinuation(devId, sessionId, continuationResult);
return ERR_OK;
}
int32_t DistributedSchedStub::NotifyContinuationResultFromRemoteInner(MessageParcel& data,
[[maybe_unused]] MessageParcel& reply)
{
if (!CheckDmsRequestPermission()) {
HILOGW("DistributedSchedStub: NotifyContinuationResultFromRemoteInner request DENIED!");
return DMS_PERMISSION_DENIED;
}
int32_t sessionId = 0;
PARCEL_READ_HELPER(data, Int32, sessionId);
bool continuationResult = false;
PARCEL_READ_HELPER(data, Bool, continuationResult);
return NotifyContinuationResultFromRemote(sessionId, continuationResult);
}
int32_t DistributedSchedStub::RegisterAbilityTokenInner(MessageParcel& data, MessageParcel& reply)
{
sptr<IRemoteObject> abilityToken = data.ReadRemoteObject();
sptr<IRemoteObject> continuationCallback = data.ReadRemoteObject();
int32_t result = RegisterAbilityToken(abilityToken, continuationCallback);
HILOGI("DistributedSchedStub: RegisterAbilityTokenInner result = %{public}d", result);
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
int32_t DistributedSchedStub::UnregisterAbilityTokenInner(MessageParcel& data, MessageParcel& reply)
{
sptr<IRemoteObject> abilityToken = data.ReadRemoteObject();
sptr<IRemoteObject> continuationCallback = data.ReadRemoteObject();
int32_t result = UnregisterAbilityToken(abilityToken, continuationCallback);
HILOGI("DistributedSchedStub: UnregisterAbilityTokenInner result = %{public}d", result);
PARCEL_WRITE_REPLY_NOERROR(reply, Int32, result);
}
bool DistributedSchedStub::CheckDmsRequestPermission()
{
// never allow non-system uid distributed request
auto callingUid = IPCSkeleton::GetCallingUid();
return (callingUid < HID_HAP);
}
bool DistributedSchedStub::EnforceInterfaceToken(MessageParcel& data)
{
u16string interfaceToken = data.ReadInterfaceToken();

View File

@ -23,13 +23,17 @@ ohos_unittest("distributedschedsvrtest") {
sources = [
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/adapter/dnetwork_adapter.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/bundle/bundle_manager_internal.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/continuation_callback_death_recipient.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/deviceManager/dms_device_info.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_device_node_listener.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_ability_shell.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_adapter.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_continuation.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_proxy.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_service.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/distributed_sched_stub.cpp",
"//foundation/distributedschedule/dmsfwk/services/dtbschedmgr/src/dtbschedmgr_device_info_storage.cpp",
"unittest/distributed_sched_continuation_test.cpp",
"unittest/distributed_sched_service_test.cpp",
]

View File

@ -0,0 +1,113 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed_sched_continuation_test.h"
using namespace std;
using namespace testing;
using namespace testing::ext;
using namespace OHOS;
using namespace AAFwk;
using namespace AppExecFwk;
namespace OHOS {
namespace DistributedSchedule {
namespace {
}
void DSchedContinuationTest::SetUpTestCase()
{
}
void DSchedContinuationTest::TearDownTestCase()
{
}
void DSchedContinuationTest::SetUp()
{
dschedContinuation_ = make_shared<DSchedContinuation>();
}
void DSchedContinuationTest::TearDown()
{
dschedContinuation_ = nullptr;
}
sptr<IRemoteObject> DSchedContinuationTest::GetDSchedService() const
{
sptr<IRemoteObject> dsched;
return dsched;
}
int32_t DSchedContinuationTest::PushAbilityToken()
{
FuncContinuationCallback continuationCallback = [this] (const sptr<IRemoteObject>& abilityToken) {
if (abilityToken == nullptr) {
return;
}
timeoutFlag_ = true;
};
dschedContinuation_->Init(continuationCallback);
int32_t sessionId = dschedContinuation_->GenerateSessionId();
dschedContinuation_->PushAbilityToken(sessionId, GetDSchedService());
return sessionId;
}
std::shared_ptr<Want> DSchedContinuationTest::MockWant(const string& bundleName, const string& ability, int32_t flags)
{
ElementName element("", bundleName, ability);
shared_ptr<Want> spWant = make_shared<Want>();
spWant->SetElement(element);
spWant->SetFlags(flags);
return spWant;
}
std::shared_ptr<AbilityInfo> DSchedContinuationTest::MockAbilityInfo(const string& bundleName, const string& ability,
const string& devId)
{
shared_ptr<AbilityInfo> spAbility = make_shared<AbilityInfo>();
spAbility->bundleName = bundleName;
spAbility->deviceId = devId;
return spAbility;
}
int32_t DSchedContinuationTest::StartContinuation(const sptr<IRemoteObject>& abilityToken, int32_t flags)
{
string bundleName = "bundleName";
string abilityName = "abilityName";
string devId = "devId";
shared_ptr<Want> spWant = MockWant(bundleName, abilityName, flags);
shared_ptr<AbilityInfo> spAbility = MockAbilityInfo(bundleName, abilityName, devId);
return DistributedSchedService::GetInstance().StartContinuation(*spWant, *spAbility, abilityToken);
}
/**
* @tc.name: StartContinuation_001
* @tc.desc: input invalid params
* @tc.type: FUNC
*/
HWTEST_F(DSchedContinuationTest, StartContinuation_001, TestSize.Level0)
{
DTEST_LOG << "DSchedContinuationTest StartContinuation_001 start" << std::endl;
/**
* @tc.steps: step1. input invalid abilityToken
* @tc.expected: step1. return false.
*/
int32_t ret = StartContinuation(nullptr, Want::FLAG_ABILITY_CONTINUATION);
EXPECT_TRUE(ret != ERR_OK);
DTEST_LOG << "DSchedContinuationTest StartContinuation001 end" << std::endl;
}
}
} // namespace OHOS

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DISTRIBUTED_SCHED_CONTINUATION_TEST_H
#define DISTRIBUTED_SCHED_CONTINUATION_TEST_H
#include "distributed_sched_continuation.h"
#include "distributed_sched_proxy.h"
#include "distributed_sched_service.h"
#include "dtbschedmgr_log.h"
#include "gtest/gtest.h"
#include "if_system_ability_manager.h"
#include "iservice_registry.h"
#include "ohos/aafwk/content/want.h"
#include "system_ability_definition.h"
#include "test_log.h"
namespace OHOS {
namespace DistributedSchedule {
class DSchedContinuationTest : public testing::Test {
public:
static void SetUpTestCase();
static void TearDownTestCase();
void SetUp();
void TearDown();
protected:
sptr<IRemoteObject> GetDSchedService() const;
int32_t PushAbilityToken();
std::shared_ptr<AAFwk::Want> MockWant(const std::string& bundleName, const std::string& ability,
int32_t flags);
std::shared_ptr<AppExecFwk::AbilityInfo> MockAbilityInfo(const std::string& bundleName,
const std::string& ability, const std::string& devId);
int32_t StartContinuation(const sptr<IRemoteObject>& abilityToken, int32_t flags);
std::shared_ptr<DSchedContinuation> dschedContinuation_;
bool timeoutFlag_ = false;
};
} // namespace DistributedSchedule
} // namespace OHOS
#endif // DISTRIBUTED_SCHED_CONTINUATION_TEST_H